from fastapi import FastAPI, HTTPException from pydantic import BaseModel from fastapi.middleware.cors import CORSMiddleware import random import time from fastapi import FastAPI, File, UploadFile from fastapi.responses import JSONResponse import pandas as pd # from openpyxl import Workbook, load_workbook import math import os from fastapi import Depends app = FastAPI() # CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源 allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # DATA_PATH = os.path.abspath(__file__) + "\\data\\" DATA_PATH = "./demo/Data/" initial_data = { '学号': ['*********'], '姓名': ['BOSS_K'], '积分': [9.9] } initial_info = { 'basicProbability': [1.0], # 'lastRow': [0], 'maxPoint': [0.0], 'minPoint': [0.0] } initial_data_df = pd.DataFrame(initial_data) initial_info_df = pd.DataFrame(initial_info) # 定义请求体模型 class StudentRequest(BaseModel): id: str name: str points: float # 获取文件路径 async def get_file_path(class_name: str): file_path = DATA_PATH + class_name + ".xlsx" if not os.path.exists(file_path): raise HTTPException(status_code=403, detail="Not authenticated") return file_path # 上传excel @app.post("/{class_name}/upload") async def upload_file(file_path: str = Depends(get_file_path), file: UploadFile = File(...)): # 检查文件类型 if not (file.filename.endswith('.xlsx') or file.filename.endswith('.xls')): return JSONResponse(content={"error": "无效的文件类型"}, status_code=400) try: # 将 Excel 文件读取为 DataFrame df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') df_upload = pd.read_excel(file.file) new_rows = [] # maxPoint = df_info.loc[0, 'maxPoint'] # minPoint = df_info.loc[0, 'minPoint'] for index, row in df_upload.iterrows(): id = row['学号'] # 根据列名访问学号 name = row['姓名'] # 根据列名访问姓名 points = 0.0 if '积分' in row: points = row['积分'] # maxPoint = math.max(points, maxPoint) # minPoint = math.min(points, minPoint) new_rows.append({'学号': id, '姓名': name, '积分': points}) df_data = pd.concat([df_data, pd.DataFrame(new_rows)], ignore_index=True) # 根据 '积分' 列降序排序 df_data = df_data.sort_values(by='积分', ascending=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Data'] # 删除旧的工作表 df_data.to_excel(writer, sheet_name='Data', index=False) df_info.loc[0, 'basicProbability'] = 1.0 # df_info.loc[0, 'maxPoint'] = maxPoint # df_info.loc[0, 'minPoint'] = minPoint with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return {"message": "文件上传成功"} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) # 创建班级的接口 @app.get("/create-class/{class_name}") async def create_class(class_name: str): # return {"ok":"ok"} file_path = DATA_PATH + class_name + ".xlsx" if os.path.exists(file_path): return {"message": "班级已存在"} # 创建excel initial_data_df.to_excel(file_path, sheet_name='Data', index=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: initial_info_df.to_excel(writer, sheet_name='Info', index=False) return {"message": "班级创建成功"} # 随机点名的接口 @app.get("/{class_name}/random-call") async def random_call(file_path: str = Depends(get_file_path)): df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') n = len(df_data) if n <= 0: raise HTTPException(status_code=404, detail="没有学生可供点名") # 进行随机 basicProbability = float(df_info.iloc[0]['basicProbability']) # lastRow = int(df_info.iloc[0]['lastRow']) maxPoint = float(df_info.iloc[0]['maxPoint']) minPoint = float(df_info.iloc[0]['minPoint']) res = {} basicProbability *= 0.8 while (True): # print(basicProbability) # lastRow = (lastRow + 1) % n i = math.floor(random.random() * n) points = df_data.iloc[i]['积分'] hit_probability = (maxPoint - points + 1) / (maxPoint - minPoint + 20) * basicProbability if random.random() <= hit_probability: #成功命中倒霉蛋 res = {"name": str(df_data.iloc[i]['姓名']), "id": str(df_data.iloc[i]['学号']), "points": float(points)} break basicProbability *= 1.03 # 更新信息 df_info.loc[0, 'basicProbability'] = basicProbability # df_info.loc[0, 'lastRow'] = lastRow with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return res # 修改积分的接口 @app.post("/{class_name}/change-points") async def change_points(student_request: StudentRequest, file_path: str = Depends(get_file_path)): df_data = pd.read_excel(file_path, sheet_name='Data') df_info = pd.read_excel(file_path, sheet_name='Info') print(df_data) index = -1 for i, row in df_data.iterrows(): print(row['学号'], student_request.id) print(row['姓名'], student_request.name) if (str(row['学号']) == str(student_request.id) and str(row['姓名']) == str(student_request.name)): index = i break if (index < 0): raise HTTPException(status_code=400, detail="学号姓名无效") df_data.loc[index, '积分'] = student_request.points df_data = df_data.sort_values(by='积分', ascending=False) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Data'] # 删除旧的工作表 df_data.to_excel(writer, sheet_name='Data', index=False) df_info.loc[0, 'maxPoint'] = max(df_info.loc[0, 'maxPoint'], student_request.points) df_info.loc[0, 'minPoint'] = min(df_info.loc[0, 'minPoint'], student_request.points) with pd.ExcelWriter(file_path, mode='a', engine='openpyxl') as writer: del writer.book['Info'] # 删除旧的工作表 df_info.to_excel(writer, sheet_name='Info', index=False) return {"message": "积分已修改"} if __name__ == "__main__": import uvicorn import sys random.seed(int(time.time())) # 使用 uvicorn 启动服务器 uvicorn.run(app, host="0.0.0.0", port=8000)