You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
7.3 KiB

4 months ago
from fastapi import FastAPI, HTTPException, Depends, File, UploadFile
from fastapi.responses import JSONResponse
4 months ago
from fastapi.middleware.cors import CORSMiddleware
4 months ago
from pydantic import BaseModel
4 months ago
import random
import time
import pandas as pd
import math
import os
app = FastAPI()

# CORS configuration: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged for CORS — confirm whether credentialed requests are needed.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
4 months ago
# Directory (relative to the working directory) holding one workbook per class.
DATA_PATH = "./back_end/data/"

# Seed content for a freshly created class workbook.
# "Data" sheet: one placeholder student row (student id, name, points).
initial_data = {
    '学号': ['*********'],
    '姓名': ['BOSS_K'],
    '积分': [9.9],
}
# "Info" sheet: roll-call base probability and the known points range.
initial_info = {
    'basicProbability': [1.0],
    'maxPoint': [9.9],
    'minPoint': [0.0],
}
initial_data_df = pd.DataFrame(data=initial_data)
initial_info_df = pd.DataFrame(data=initial_info)
class StudentRequest(BaseModel):
    """Request body identifying a student and a points value.

    In this file it is consumed by the change-points endpoint, which adds
    ``points`` to the matching student's current score.
    """

    # Student id, compared as a string against the '学号' column.
    id: str
    # Student name, compared as a string against the '姓名' column.
    name: str
    # Amount added to the student's score.
    points: float
async def get_file_path(class_name: str):
    """Resolve the workbook path for ``class_name`` (used as a dependency).

    Returns:
        The path ``DATA_PATH + class_name + ".xlsx"`` when that file exists.

    Raises:
        HTTPException: status 403 when no workbook exists for the class.
    """
    workbook_path = f"{DATA_PATH}{class_name}.xlsx"
    if os.path.exists(workbook_path):
        return workbook_path
    raise HTTPException(status_code=403, detail="班级不存在")
4 months ago
# Check whether a class workbook exists
# NOTE(review): this handler shares the name `upload_file` with the upload
# endpoint defined right after it; the route is still registered by the
# decorator, but the module-level name ends up bound to the later function.
@app.post("/{class_name}/search-room")
async def upload_file(class_name: str):
    """Report whether the workbook for ``class_name`` exists.

    Always answers with a ``{"message": ...}`` payload; a missing class is
    reported in the message rather than through an error status.
    """
    exists = os.path.exists(f"{DATA_PATH}{class_name}.xlsx")
    return {"message": "班级存在" if exists else "班级不存在"}
4 months ago
# Upload an Excel roster into an existing class
@app.post("/{class_name}/upload")
async def upload_file(file_path: str = Depends(get_file_path), file: UploadFile = File(...)):
    """Append the students of an uploaded Excel file to a class workbook.

    The uploaded sheet must contain the columns '学号' (student id) and
    '姓名' (name); an optional '积分' (points) column supplies initial scores,
    otherwise every new student starts at 0.0.

    Args:
        file_path: Workbook path resolved by ``get_file_path``.
        file: The uploaded ``.xlsx`` / ``.xls`` file.

    Returns:
        ``{"message": ...}`` on success, or a ``JSONResponse`` carrying an
        ``error`` field with status 400 (bad file type / missing columns) or
        500 (any failure while reading or writing the workbooks).
    """
    # filename can be absent on a malformed multipart upload.
    filename = file.filename or ""
    if not filename.endswith(('.xlsx', '.xls')):
        return JSONResponse(content={"error": "无效的文件类型"}, status_code=400)

    try:
        df_data = pd.read_excel(file_path, sheet_name='Data')
        df_info = pd.read_excel(file_path, sheet_name='Info')
        for column in ('basicProbability', 'maxPoint', 'minPoint'):
            df_info[column] = df_info[column].astype(float)

        df_upload = pd.read_excel(file.file)

        # Reject sheets without the required columns instead of failing
        # later with an opaque KeyError.
        missing = [col for col in ('学号', '姓名') if col not in df_upload.columns]
        if missing:
            return JSONResponse(
                content={"error": "缺少必要的列: " + ", ".join(missing)},
                status_code=400,
            )

        # Build the new rows column-wise; blank or non-numeric scores fall
        # back to the same 0.0 default used when the column is absent.
        df_new = df_upload[['学号', '姓名']].copy()
        if '积分' in df_upload.columns:
            df_new['积分'] = pd.to_numeric(df_upload['积分'], errors='coerce').fillna(0.0)
        else:
            df_new['积分'] = 0.0

        # Keep the stored score range covering the uploaded scores, since
        # the roll-call probability is derived from maxPoint/minPoint.
        if not df_new.empty:
            df_info.loc[0, 'maxPoint'] = max(df_info.loc[0, 'maxPoint'], float(df_new['积分'].max()))
            df_info.loc[0, 'minPoint'] = min(df_info.loc[0, 'minPoint'], float(df_new['积分'].min()))

        # Merge and keep the sheet ordered by score, highest first.
        df_data = pd.concat([df_data, df_new], ignore_index=True)
        df_data = df_data.sort_values(by='积分', ascending=False)

        # A new roster resets the roll-call base probability.
        df_info.loc[0, 'basicProbability'] = 1.0

        # Replace both sheets in a single pass over the workbook.
        with pd.ExcelWriter(file_path, mode='a', engine='openpyxl',
                            if_sheet_exists='replace') as writer:
            df_data.to_excel(writer, sheet_name='Data', index=False)
            df_info.to_excel(writer, sheet_name='Info', index=False)
        return {"message": "文件上传成功"}
    except Exception as e:
        # Endpoint boundary: report the failure to the client.
        return JSONResponse(content={"error": str(e)}, status_code=500)
# Create-class endpoint
@app.post("/create-class/{class_name}")
async def create_class(class_name: str):
    """Create the workbook for a new class, seeded with the initial sheets.

    Args:
        class_name: Name of the class; the workbook is stored as
            ``DATA_PATH + class_name + ".xlsx"``.

    Returns:
        ``{"message": ...}`` stating whether the class was created or
        already existed.
    """
    file_path = DATA_PATH + class_name + ".xlsx"
    if os.path.exists(file_path):
        return {"message": "班级已存在"}

    # The data directory may not exist yet on a fresh deployment.
    os.makedirs(DATA_PATH, exist_ok=True)

    # Write both sheets with one writer instead of creating the file and
    # reopening it in append mode.
    with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
        initial_data_df.to_excel(writer, sheet_name='Data', index=False)
        initial_info_df.to_excel(writer, sheet_name='Info', index=False)
    return {"message": "班级创建成功"}
# Random roll-call endpoint
@app.post("/{class_name}/random-call")
async def random_call(file_path: str = Depends(get_file_path)):
    """Pick one student at random, favouring students with lower scores.

    A candidate is drawn uniformly and accepted with probability
    ``(maxPoint - points + 1) / (maxPoint - minPoint + 10) * basicProbability``.
    The base probability is halved at the start of each call and raised by
    2% after every rejected draw; the resulting value is stored back in the
    'Info' sheet.

    Args:
        file_path: Workbook path resolved by ``get_file_path``.

    Returns:
        ``{"name": str, "id": str, "points": float}`` for the chosen student.

    Raises:
        HTTPException: status 404 when the class has no students.
    """
    df_data = pd.read_excel(file_path, sheet_name='Data')
    df_info = pd.read_excel(file_path, sheet_name='Info')

    n = len(df_data)  # number of students
    if n <= 0:
        raise HTTPException(status_code=404, detail="没有学生可供点名")

    df_info['basicProbability'] = df_info['basicProbability'].astype(float)
    basicProbability = float(df_info.iloc[0]['basicProbability'])
    maxPoint = float(df_info.iloc[0]['maxPoint'])
    minPoint = float(df_info.iloc[0]['minPoint'])

    # Blank or non-numeric scores count as 0 so they cannot poison the
    # probability with NaN.
    scores = pd.to_numeric(df_data['积分'], errors='coerce').fillna(0.0)

    # Never trust a stored range narrower than the actual scores: otherwise
    # the acceptance probability could be <= 0 and the loop could not end.
    maxPoint = max(maxPoint, float(scores.max()))
    minPoint = min(minPoint, float(scores.min()))
    spread = maxPoint - minPoint + 10

    # Recover from a stored base probability that is zero, negative or NaN;
    # multiplicative growth could never lift it.
    if not basicProbability > 0:
        basicProbability = 1.0

    basicProbability *= 0.5
    while True:
        # Draw a student uniformly from positions [0, n-1].
        i = random.randrange(n)
        points = float(scores.iloc[i])

        # The higher the score, the lower the chance of being accepted.
        hit_probability = (maxPoint - points + 1) / spread * basicProbability
        if random.random() <= hit_probability:
            student = df_data.iloc[i]
            res = {"name": str(student['姓名']),
                   "id": str(student['学号']),
                   "points": points}
            break

        # Rejected draw: raise the base probability so the search cannot
        # drag on; about 35 misses (log(1/0.5) / log(1.02)) undo the halving.
        basicProbability *= 1.02

    # Persist the updated base probability.
    df_info.loc[0, 'basicProbability'] = basicProbability
    with pd.ExcelWriter(file_path, mode='a', engine='openpyxl',
                        if_sheet_exists='replace') as writer:
        df_info.to_excel(writer, sheet_name='Info', index=False)
    return res
# Change-points endpoint
@app.post("/{class_name}/change-points")
async def change_points(student_request: StudentRequest, file_path: str = Depends(get_file_path)):
    """Add ``student_request.points`` to the matching student's score.

    The student is matched on both id ('学号') and name ('姓名'), compared as
    strings. The stored maxPoint/minPoint range is widened to include the
    new score and the 'Data' sheet is kept sorted by score, highest first.

    Args:
        student_request: Student id, name and the points delta to apply.
        file_path: Workbook path resolved by ``get_file_path``.

    Returns:
        ``{"message": ...}`` confirming the change.

    Raises:
        HTTPException: status 400 when no student matches the id and name.
    """
    # Load the current class data.
    df_data = pd.read_excel(file_path, sheet_name='Data')
    df_info = pd.read_excel(file_path, sheet_name='Info')
    df_data['积分'] = df_data['积分'].astype(float)
    df_info['maxPoint'] = df_info['maxPoint'].astype(float)
    df_info['minPoint'] = df_info['minPoint'].astype(float)

    wanted_id = str(student_request.id)
    wanted_name = str(student_request.name)

    # Row label of the first student matching both id and name, if any.
    index = next(
        (i for i, row in df_data.iterrows()
         if str(row['学号']) == wanted_id and str(row['姓名']) == wanted_name),
        None,
    )
    if index is None:
        raise HTTPException(status_code=400, detail="学号姓名无效")

    # Apply the delta and widen the stored score range if needed.
    df_data.loc[index, '积分'] += student_request.points
    new_points = df_data.loc[index, '积分']
    df_info.loc[0, 'maxPoint'] = max(df_info.loc[0, 'maxPoint'], new_points)
    df_info.loc[0, 'minPoint'] = min(df_info.loc[0, 'minPoint'], new_points)

    df_data = df_data.sort_values(by='积分', ascending=False)

    # Replace both sheets in a single pass over the workbook.
    with pd.ExcelWriter(file_path, mode='a', engine='openpyxl',
                        if_sheet_exists='replace') as writer:
        df_data.to_excel(writer, sheet_name='Data', index=False)
        df_info.to_excel(writer, sheet_name='Info', index=False)
    return {"message": "积分已修改"}
if __name__ == "__main__":
    import uvicorn

    # Seed the shared random generator from the current time (whole seconds).
    random.seed(int(time.time()))

    # Start the server with uvicorn, listening on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)