You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
4.1 KiB

"""
学生管理相关API路由
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from sqlalchemy.orm import Session
from typing import List
from backend.database import get_db
from backend.models import Student, RollCallRecord, ScoreRecord
from backend.schemas import Student as StudentSchema, StudentCreate, ExcelImportResponse
from backend.utils.excel import read_students_from_excel
import os
import tempfile
router = APIRouter(prefix="/api/students", tags=["students"])
@router.get("/", response_model=List[StudentSchema])
def get_students(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""获取学生列表"""
students = db.query(Student).offset(skip).limit(limit).all()
return students
@router.get("/{student_id}", response_model=StudentSchema)
def get_student(student_id: int, db: Session = Depends(get_db)):
"""获取单个学生信息"""
student = db.query(Student).filter(Student.id == student_id).first()
if not student:
raise HTTPException(status_code=404, detail="学生不存在")
return student
@router.post("/", response_model=StudentSchema)
def create_student(student: StudentCreate, db: Session = Depends(get_db)):
"""创建新学生"""
# 检查学号是否已存在
existing = db.query(Student).filter(Student.student_id == student.student_id).first()
if existing:
raise HTTPException(status_code=400, detail="学号已存在")
db_student = Student(**student.dict())
db.add(db_student)
db.commit()
db.refresh(db_student)
return db_student
@router.post("/import-excel", response_model=ExcelImportResponse)
async def import_students_from_excel(file: UploadFile = File(...), db: Session = Depends(get_db)):
"""从Excel文件导入学生"""
# 保存上传的文件到临时目录
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp_file:
content = await file.read()
tmp_file.write(content)
tmp_path = tmp_file.name
try:
# 读取Excel文件
students_data = read_students_from_excel(tmp_path)
imported_count = 0
failed_count = 0
for student_data in students_data:
try:
# 检查学号是否已存在
existing = db.query(Student).filter(Student.student_id == student_data['student_id']).first()
if not existing:
db_student = Student(student_id=student_data['student_id'],
name=student_data['name'],
major=student_data.get('major', ''))
db.add(db_student)
imported_count += 1
else:
# 更新已存在学生的信息
existing.name = student_data['name']
existing.major = student_data.get('major', existing.major)
imported_count += 1
except Exception as e:
failed_count += 1
continue
db.commit()
return ExcelImportResponse(success=True,
message=f"成功导入{imported_count}条记录,失败{failed_count}",
imported_count=imported_count,
failed_count=failed_count)
except Exception as e:
db.rollback()
raise HTTPException(status_code=400, detail=f"导入失败: {str(e)}")
finally:
# 删除临时文件
if os.path.exists(tmp_path):
os.remove(tmp_path)
@router.delete("/{student_id}")
def delete_student(student_id: int, db: Session = Depends(get_db)):
"""删除学生"""
student = db.query(Student).filter(Student.id == student_id).first()
if not student:
raise HTTPException(status_code=404, detail="学生不存在")
db.query(RollCallRecord).filter(RollCallRecord.student_id == student_id).delete()
db.query(ScoreRecord).filter(ScoreRecord.student_id == student_id).delete()
db.delete(student)
db.commit()
return {"message": "删除成功"}