You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

201 lines
7.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import FastAPI, HTTPException, Depends, File, UploadFile
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import random
import time
import pandas as pd
import math
import os
app = FastAPI()

# CORS middleware: accept cross-origin requests from any origin, using any
# HTTP method and any request header, with credentials allowed.
# NOTE(review): a wildcard origin together with allow_credentials=True is very
# permissive -- confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # every origin is accepted
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Directory that holds one workbook per class, named "<class_name>.xlsx".
DATA_PATH = "./back_end/data/"

# Seed rows for a newly created class workbook.
# Roster columns: 学号 = student ID, 姓名 = name, 积分 = points.
initial_data = {
    "学号": ["*********"],
    "姓名": ["BOSS_K"],
    "积分": [9.9],
}
# Draw settings: the base hit probability plus the point range used when
# weighting students during a random call.
initial_info = {
    "basicProbability": [1.0],
    "maxPoint": [9.9],
    "minPoint": [0.0],
}

# DataFrame versions of the seed rows, ready to be written to Excel.
initial_data_df = pd.DataFrame.from_dict(initial_data)
initial_info_df = pd.DataFrame.from_dict(initial_info)
# Request body model: identifies one student and carries a point delta.
# (Documented with comments rather than a docstring so the generated schema
# description stays unchanged.)
class StudentRequest(BaseModel):
    id: str  # student ID, matched against the 学号 column
    name: str  # student name, matched against the 姓名 column
    points: float  # amount added to the student's current 积分
# Resolve the workbook path for a class (used as a route dependency).
async def get_file_path(class_name: str):
    """Return the path of the workbook that stores ``class_name``.

    Raises:
        HTTPException: status 403 with detail "班级不存在" when the workbook
            does not exist.
    """
    # NOTE(review): class_name comes straight from the URL and is used
    # unvalidated in a filesystem path -- confirm the router cannot pass
    # path separators through.
    workbook = f"{DATA_PATH}{class_name}.xlsx"
    if os.path.exists(workbook):
        return workbook
    raise HTTPException(status_code=403, detail="班级不存在")
# Check whether a class exists.
# NOTE(review): this handler shares the name `upload_file` with the upload
# handler defined later in the file, which rebinds the module-level name;
# the name is kept as-is here.
@app.post("/{class_name}/search-room")
async def upload_file(class_name: str):
    # Existence is reported in the message body, never as an HTTP error.
    workbook = f"{DATA_PATH}{class_name}.xlsx"
    found = os.path.exists(workbook)
    return {"message": "班级存在" if found else "班级不存在"}
# Upload an Excel roster and merge its students into an existing class.
@app.post("/{class_name}/upload")
async def upload_file(file_path: str = Depends(get_file_path), file: UploadFile = File(...)):
    """Append the students from an uploaded Excel roster to the class workbook.

    The roster must contain the columns 学号 (student ID) and 姓名 (name). An
    optional 积分 (points) column supplies starting points; when the column is
    absent, or a cell in it is blank, the student starts at 0.0. After the
    merge the Data sheet is sorted by points (descending), maxPoint/minPoint
    in the Info sheet are widened to cover the new students, and
    basicProbability is reset to 1.0.

    Args:
        file_path: Path of the class workbook, resolved by ``get_file_path``.
        file: The uploaded ``.xlsx`` / ``.xls`` roster.

    Returns:
        ``{"message": "文件上传成功"}`` on success. Otherwise a JSON
        ``{"error": ...}`` response: status 400 for a wrong file type or
        missing required columns, status 500 for any other failure.
    """
    # Check the file type by extension; the filename may be missing entirely.
    filename = file.filename or ""
    if not filename.endswith(('.xlsx', '.xls')):
        return JSONResponse(content={"error": "无效的文件类型"}, status_code=400)
    try:
        # Load the current class state and the uploaded roster.
        df_data = pd.read_excel(file_path, sheet_name='Data')
        df_info = pd.read_excel(file_path, sheet_name='Info')
        df_info['basicProbability'] = df_info['basicProbability'].astype(float)
        df_upload = pd.read_excel(file.file)

        # A roster without the identifying columns is a client error, not a
        # server failure.
        missing = [col for col in ('学号', '姓名') if col not in df_upload.columns]
        if missing:
            return JSONResponse(
                content={"error": "缺少必需的列: " + ", ".join(missing)},
                status_code=400,
            )

        # Build the new rows column-wise instead of row by row.
        new_rows = pd.DataFrame({
            '学号': df_upload['学号'],
            '姓名': df_upload['姓名'],
        })
        if '积分' in df_upload.columns:
            # Blank cells would otherwise become NaN points, which can never
            # be selected by the random call.
            new_rows['积分'] = df_upload['积分'].fillna(0.0).astype(float)
        else:
            new_rows['积分'] = 0.0

        df_data = pd.concat([df_data, new_rows], ignore_index=True)
        df_data = df_data.sort_values(by='积分', ascending=False)

        # Keep the recorded point range in step with the new students, the
        # same way the change-points endpoint does for a single student.
        if not new_rows.empty:
            df_info['maxPoint'] = df_info['maxPoint'].astype(float)
            df_info['minPoint'] = df_info['minPoint'].astype(float)
            df_info.loc[0, 'maxPoint'] = max(
                float(df_info.loc[0, 'maxPoint']), float(new_rows['积分'].max())
            )
            df_info.loc[0, 'minPoint'] = min(
                float(df_info.loc[0, 'minPoint']), float(new_rows['积分'].min())
            )
        df_info.loc[0, 'basicProbability'] = 1.0

        # Replace both sheets in a single open/save cycle.
        with pd.ExcelWriter(
            file_path, mode='a', engine='openpyxl', if_sheet_exists='replace'
        ) as writer:
            df_data.to_excel(writer, sheet_name='Data', index=False)
            df_info.to_excel(writer, sheet_name='Info', index=False)
        return {"message": "文件上传成功"}
    # Upload failed: report the reason to the client.
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
# Endpoint for creating a class.
@app.post("/create-class/{class_name}")
async def create_class(class_name: str):
    """Create the workbook for a new class, seeded with the initial sheets.

    Args:
        class_name: Name of the class; the workbook is stored as
            ``DATA_PATH + class_name + ".xlsx"``.

    Returns:
        ``{"message": "班级已存在"}`` when the workbook already exists,
        otherwise ``{"message": "班级创建成功"}`` after writing it.
    """
    file_path = DATA_PATH + class_name + ".xlsx"
    if os.path.exists(file_path):
        return {"message": "班级已存在"}
    # The data directory may not exist yet (e.g. on a fresh checkout).
    os.makedirs(DATA_PATH, exist_ok=True)
    # Write both sheets in one pass instead of creating the file and then
    # reopening it to append the second sheet.
    with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
        initial_data_df.to_excel(writer, sheet_name='Data', index=False)
        initial_info_df.to_excel(writer, sheet_name='Info', index=False)
    return {"message": "班级创建成功"}
# Endpoint for a random roll call.
@app.post("/{class_name}/random-call")
async def random_call(file_path: str = Depends(get_file_path)):
    """Pick one student at random, favouring students with fewer points.

    Students are drawn by rejection sampling: a uniformly chosen student is
    accepted with probability
    ``(maxPoint - points + 1) / (maxPoint - minPoint + 10) * basicProbability``.
    basicProbability is halved at the start of each call and raised by 2%
    after every rejected draw; the final value is saved back to the Info sheet.

    Args:
        file_path: Path of the class workbook, resolved by ``get_file_path``.

    Returns:
        ``{"name": str, "id": str, "points": float}`` for the chosen student.

    Raises:
        HTTPException: status 404 when the class has no students.
    """
    df_data = pd.read_excel(file_path, sheet_name='Data')
    df_info = pd.read_excel(file_path, sheet_name='Info')
    n = len(df_data)  # number of students
    if n <= 0:
        raise HTTPException(status_code=404, detail="没有学生可供点名")
    df_info['basicProbability'] = df_info['basicProbability'].astype(float)
    basicProbability = float(df_info.iloc[0]['basicProbability'])
    maxPoint = float(df_info.iloc[0]['maxPoint'])
    minPoint = float(df_info.iloc[0]['minPoint'])

    # Relative acceptance weight per student: the higher the points, the
    # lower the chance of being picked.
    weights = (maxPoint - df_data['积分'].astype(float) + 1) / (maxPoint - minPoint + 10)

    halved = basicProbability * 0.5
    if (weights > 0).any() and halved > 0:
        basicProbability = halved
        while True:
            # Draw a candidate uniformly from positions [0, n-1].
            i = random.randrange(n)
            if random.random() <= weights.iloc[i] * basicProbability:
                break  # candidate accepted
            # Rejected: raise the base probability so a draw cannot take
            # arbitrarily long. Undoing the halving above takes about 35
            # rejections (log(1/0.5) / log(1.02)).
            basicProbability *= 1.02
    else:
        # No student could ever be accepted (no positive weight, or a
        # non-positive base probability), so the sampling loop would never
        # end. Fall back to a plain uniform pick and leave the stored base
        # probability untouched.
        i = random.randrange(n)

    chosen = df_data.iloc[i]
    res = {"name": str(chosen['姓名']),
           "id": str(chosen['学号']),
           "points": float(chosen['积分'])}

    # Persist the updated base probability.
    df_info.loc[0, 'basicProbability'] = basicProbability
    with pd.ExcelWriter(
        file_path, mode='a', engine='openpyxl', if_sheet_exists='replace'
    ) as writer:
        df_info.to_excel(writer, sheet_name='Info', index=False)
    return res
# Endpoint for changing a student's points.
@app.post("/{class_name}/change-points")
async def change_points(student_request: StudentRequest, file_path: str = Depends(get_file_path)):
    """Add ``student_request.points`` to one student's points.

    The student is the first row whose 学号 and 姓名 both match the request
    (compared as strings). The Data sheet is re-sorted by points (descending)
    and maxPoint/minPoint in the Info sheet are widened to include the new
    value.

    Args:
        student_request: Student ID, name and the point delta to apply.
        file_path: Path of the class workbook, resolved by ``get_file_path``.

    Returns:
        ``{"message": "积分已修改"}`` after the workbook has been saved.

    Raises:
        HTTPException: status 400 when no row matches the ID and name.
    """
    # Load the current class data.
    df_data = pd.read_excel(file_path, sheet_name='Data')
    df_info = pd.read_excel(file_path, sheet_name='Info')
    df_data['积分'] = df_data['积分'].astype(float)
    df_info['maxPoint'] = df_info['maxPoint'].astype(float)
    df_info['minPoint'] = df_info['minPoint'].astype(float)

    # Locate the student with a vectorized comparison instead of iterating
    # row by row.
    matches = df_data.index[
        (df_data['学号'].astype(str) == str(student_request.id))
        & (df_data['姓名'].astype(str) == str(student_request.name))
    ]
    # Student not found.
    if len(matches) == 0:
        raise HTTPException(status_code=400, detail="学号姓名无效")
    index = matches[0]

    # Apply the delta and widen the recorded point range if needed.
    # NOTE(review): the range only ever grows; it is not recomputed when the
    # top or bottom student moves back inside it -- confirm that is intended.
    df_data.loc[index, '积分'] += student_request.points
    new_points = df_data.loc[index, '积分']
    df_info.loc[0, 'maxPoint'] = max(df_info.loc[0, 'maxPoint'], new_points)
    df_info.loc[0, 'minPoint'] = min(df_info.loc[0, 'minPoint'], new_points)
    df_data = df_data.sort_values(by='积分', ascending=False)

    # Replace both sheets in a single open/save cycle so Data and Info are
    # saved together.
    with pd.ExcelWriter(
        file_path, mode='a', engine='openpyxl', if_sheet_exists='replace'
    ) as writer:
        df_data.to_excel(writer, sheet_name='Data', index=False)
        df_info.to_excel(writer, sheet_name='Info', index=False)
    return {"message": "积分已修改"}
if __name__ == "__main__":
    import uvicorn

    # No manual seeding: the random module seeds itself from OS entropy when
    # available, which is stronger than a second-resolution timestamp.
    # Start the server with uvicorn on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)