from fastapi import FastAPI, HTTPException, Depends, File, UploadFile from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import random import time import pandas as pd import math import os app = FastAPI() # CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源 allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) DATA_PATH = "./back_end/data/" # 初始excel信息 initial_data = { '学号': ['*********'], '姓名': ['BOSS_K'], '积分': [9.9] } initial_info = { 'basicProbability': [1.0], 'maxPoint': [9.9], 'minPoint': [0.0] } initial_data_df = pd.DataFrame(initial_data) initial_info_df = pd.DataFrame(initial_info) # 定义请求体模型 class StudentRequest(BaseModel): id: str name: str points: float # 获取文件路径 async def get_file_path(class_name: str): file_path = DATA_PATH + class_name + ".xlsx" if not os.path.exists(file_path): raise HTTPException(status_code=403, detail="班级不存在") return file_path #查询班级是否存在 @app.post("/{class_name}/search-room") async def upload_file(class_name: str): file_path = DATA_PATH + class_name + ".xlsx" if not os.path.exists(file_path): return {"message": "班级不存在"} return {"message": "班级存在"} # 上传excel @app.post("/{class_name}/upload") async def upload_file(file_path: str = Depends(get_file_path), file: UploadFile = File(...)): # 检查文件类型 if not (file.filename.endswith('.xlsx') or file.filename.endswith('.xls')): return JSONResponse(content={"error": "无效的文件类型"}, status_code=400) try: # 将 Excel 文件读取为 DataFrame df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') df_info['basicProbability'] = df_info['basicProbability'].astype(float) df_upload = pd.read_excel(file.file) new_rows = [] # 新增信息 for index, row in df_upload.iterrows(): id = row['学号'] # 根据列名访问学号 name = row['姓名'] # 根据列名访问姓名 points = 0.0 if '积分' in row: points = row['积分'] new_rows.append({'学号': id, '姓名': name, '积分': points}) df_data = pd.concat([df_data, pd.DataFrame(new_rows)], ignore_index=True) # 合并 df_data = df_data.sort_values(by='积分', ascending=False) # 根据 '积分' 列降序排序 # 更新excel with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Data'] # 删除旧的工作表 df_data.to_excel(writer, sheet_name='Data', index=False) df_info.loc[0, 'basicProbability'] = 1.0 with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return {"message": "文件上传成功"} # 上传失败 except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) # 创建班级的接口 @app.post("/create-class/{class_name}") async def create_class(class_name: str): file_path = DATA_PATH + class_name + ".xlsx" if os.path.exists(file_path): return {"message": "班级已存在"} # 创建excel initial_data_df.to_excel(file_path, sheet_name='Data', index=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: initial_info_df.to_excel(writer, sheet_name='Info', index=False) return {"message": "班级创建成功"} # 随机点名的接口 @app.post("/{class_name}/random-call") async def random_call(file_path: str = Depends(get_file_path)): df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') n = len(df_data) #学生数量 if n <= 0: raise HTTPException(status_code=404, detail="没有学生可供点名") df_info['basicProbability'] = df_info['basicProbability'].astype(float) basicProbability = float(df_info.iloc[0]['basicProbability']) maxPoint = float(df_info.iloc[0]['maxPoint']) minPoint = float(df_info.iloc[0]['minPoint']) res = {} # 进行随机抽取 basicProbability *= 0.5 while (True): # 从下标[0, n-1]中随机选出一名学生, 并获取其当前积分 i = math.floor(random.random() * n) points = df_data.iloc[i]['积分'] # 根据当前班级中积分情况,计算该学生被抽中的相对概率, 分数越高抽中概率越低 hit_probability = (maxPoint - points + 1) / (maxPoint - minPoint + 10) * basicProbability if random.random() <= hit_probability: #成功命中幸运儿 res = {"name": str(df_data.iloc[i]['姓名']), "id": str(df_data.iloc[i]['学号']), "points": float(points)} break #未抽到学生时,提高基础概率,防止长时间抽不到人 #当前抽到一个名学生期望需要约35(log(1/0.5)/log(1.02))次git # 更新信息 df_info.loc[0, 'basicProbability'] = basicProbability with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return res # 修改积分的接口 @app.post("/{class_name}/change-points") async def change_points(student_request: StudentRequest, file_path: str = Depends(get_file_path)): #获取当前班级的数据 df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') df_data['积分'] = df_data['积分'].astype(float) df_info['maxPoint'] = df_info['maxPoint'].astype(float) df_info['minPoint'] = df_info['minPoint'].astype(float) index = -1 # 点到学生的序号 for i, row in df_data.iterrows(): if (str(row['学号']) == str(student_request.id) and str(row['姓名']) == str(student_request.name)): index = i break #未找到学生 if (index < 0): raise HTTPException(status_code=400, detail="学号姓名无效") # 更新信息,并导入excel df_data.loc[index, '积分'] += student_request.points df_info.loc[0, 'maxPoint'] = max(df_info.loc[0, 'maxPoint'], df_data.loc[index, '积分']) df_info.loc[0, 'minPoint'] = min(df_info.loc[0, 'minPoint'], df_data.loc[index, '积分']) df_data = df_data.sort_values(by='积分', ascending=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Data'] # 删除旧的工作表 df_data.to_excel(writer, sheet_name='Data', index=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return {"message": "积分已修改"} if __name__ == "__main__": import uvicorn import sys random.seed(int(time.time())) # 使用 uvicorn 启动服务器 uvicorn.run(app, host="0.0.0.0", port=8000)