"""API routes for student management."""
import logging
import os
import tempfile
from typing import List, Tuple

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from sqlalchemy.orm import Session

from backend.database import get_db
from backend.models import RollCallRecord, ScoreRecord, Student
from backend.schemas import ExcelImportResponse, StudentCreate
from backend.schemas import Student as StudentSchema
from backend.utils.excel import read_students_from_excel

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/students", tags=["students"])


@router.get("/", response_model=List[StudentSchema])
def get_students(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of students.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session injected by FastAPI.

    Returns:
        The selected ``Student`` rows, ordered by primary key.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so
    # consecutive pages could overlap or skip rows; order by primary key.
    return (
        db.query(Student)
        .order_by(Student.id)
        .offset(skip)
        .limit(limit)
        .all()
    )


@router.get("/{student_id}", response_model=StudentSchema)
def get_student(student_id: int, db: Session = Depends(get_db)):
    """Return a single student looked up by primary key.

    Args:
        student_id: Value matched against ``Student.id``.
        db: Database session injected by FastAPI.

    Raises:
        HTTPException: 404 when no student has that primary key.
    """
    student = db.query(Student).filter(Student.id == student_id).first()
    if student is None:
        raise HTTPException(status_code=404, detail="学生不存在")
    return student


@router.post("/", response_model=StudentSchema)
def create_student(student: StudentCreate, db: Session = Depends(get_db)):
    """Create a new student.

    Args:
        student: Payload for the new student; its ``student_id`` field is
            matched against ``Student.student_id`` to detect duplicates.
        db: Database session injected by FastAPI.

    Returns:
        The newly persisted ``Student`` row, refreshed from the database.

    Raises:
        HTTPException: 400 when a student with the same ``student_id``
            already exists.
    """
    # Reject the request when the student number is already taken.
    duplicate = (
        db.query(Student)
        .filter(Student.student_id == student.student_id)
        .first()
    )
    if duplicate is not None:
        raise HTTPException(status_code=400, detail="学号已存在")

    # NOTE(review): ``.dict()`` is the Pydantic v1 spelling (deprecated in v2
    # in favour of ``.model_dump()``) - confirm the pinned Pydantic version.
    db_student = Student(**student.dict())
    db.add(db_student)
    db.commit()
    db.refresh(db_student)
    return db_student


def _upsert_students(db: Session, rows) -> Tuple[int, int]:
    """Insert or update one ``Student`` per row; return (imported, failed).

    Each row is read as a mapping with ``'student_id'`` and ``'name'`` keys
    and an optional ``'major'`` key. Nothing is committed here; the caller
    owns the transaction.
    """
    imported_count = 0
    failed_count = 0
    # Students added during this batch, keyed by student number. A repeated
    # student number inside one file must update the pending object: if the
    # session does not autoflush, the database lookup below cannot see it and
    # the row would be inserted twice.
    added_in_batch = {}

    for row_number, student_data in enumerate(rows, start=1):
        try:
            student_number = student_data['student_id']
            name = student_data['name']

            existing = added_in_batch.get(student_number)
            if existing is None:
                existing = (
                    db.query(Student)
                    .filter(Student.student_id == student_number)
                    .first()
                )

            if existing is None:
                db_student = Student(
                    student_id=student_number,
                    name=name,
                    major=student_data.get('major', ''),
                )
                db.add(db_student)
                added_in_batch[student_number] = db_student
            else:
                # Refresh the details of a student that already exists.
                existing.name = name
                existing.major = student_data.get('major', existing.major)
            imported_count += 1
        except Exception:
            # Best-effort import: one bad row must not abort the whole file.
            # Log by row index only so student details stay out of the logs.
            failed_count += 1
            logger.warning(
                "Skipping row %d during Excel student import",
                row_number,
                exc_info=True,
            )

    return imported_count, failed_count


@router.post("/import-excel", response_model=ExcelImportResponse)
async def import_students_from_excel(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Import students from an uploaded Excel file.

    The upload is written to a temporary ``.xlsx`` file, parsed with
    ``read_students_from_excel``, and each parsed row is inserted or, when
    its student number already exists, used to update that student.

    Args:
        file: The uploaded Excel file.
        db: Database session injected by FastAPI.

    Returns:
        An ``ExcelImportResponse`` carrying the imported and failed counts.

    Raises:
        HTTPException: 400 when parsing the file or committing fails; the
            session is rolled back first.
    """
    tmp_path = None
    try:
        # The temp file is created inside the try so the finally clause
        # removes it even when reading the upload or writing it out fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp_file:
            tmp_path = tmp_file.name
            tmp_file.write(await file.read())

        try:
            students_data = read_students_from_excel(tmp_path)
            imported_count, failed_count = _upsert_students(db, students_data)
            db.commit()
        except Exception as e:
            db.rollback()
            raise HTTPException(status_code=400, detail=f"导入失败: {str(e)}") from e

        return ExcelImportResponse(
            success=True,
            message=f"成功导入{imported_count}条记录,失败{failed_count}条",
            imported_count=imported_count,
            failed_count=failed_count,
        )
    finally:
        # Always remove the temporary file.
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)


@router.delete("/{student_id}")
def delete_student(student_id: int, db: Session = Depends(get_db)):
    """Delete a student together with their roll-call and score records.

    Args:
        student_id: Value matched against ``Student.id``.
        db: Database session injected by FastAPI.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 404 when no student has that primary key.
    """
    student = db.query(Student).filter(Student.id == student_id).first()
    if student is None:
        raise HTTPException(status_code=404, detail="学生不存在")

    # Remove dependent rows before the student itself.
    # NOTE(review): this assumes RollCallRecord.student_id and
    # ScoreRecord.student_id hold Student.id (the primary key) rather than
    # the student number stored in Student.student_id - confirm in the models.
    db.query(RollCallRecord).filter(RollCallRecord.student_id == student_id).delete()
    db.query(ScoreRecord).filter(ScoreRecord.student_id == student_id).delete()
    db.delete(student)
    db.commit()
    return {"message": "删除成功"}