"""Service layer for classroom roll call, score bookkeeping and score reports."""

import base64
import io
import logging

import matplotlib.pyplot as plt
import pandas as pd
from sqlalchemy import desc

from models import get_db, Student, RollCallRecord, OrderCallStatus
from utils import ProbabilityCalculator, ScoreCalculator, ExcelService

logger = logging.getLogger(__name__)

# Column headers that an uploaded student roster must contain.
_REQUIRED_COLUMNS = ['学号', '姓名', '专业', '班级']


class RollCallService:
    """Import students, pick a student for roll call, and record scores.

    Every public method obtains its own session from ``get_db()`` and closes
    it before returning.
    """

    def __init__(self):
        self.probability_calculator = ProbabilityCalculator()

    def import_students_from_excel(self, excel_file):
        """Create or update students from an Excel roster.

        Args:
            excel_file: Path or file-like object accepted by ``pd.read_excel``.

        Returns:
            ``(ok, message)`` where ``ok`` is a bool and ``message`` is the
            user-facing result text. Rows without a student ID are skipped
            and are not included in the reported count.
        """
        db = next(get_db())
        try:
            # Read every cell as text so numeric student IDs are not turned
            # into floats (e.g. "2021001.0") when the column has blank cells.
            df = pd.read_excel(excel_file, engine='openpyxl', dtype=str)
            if not all(col in df.columns for col in _REQUIRED_COLUMNS):
                return False, "Excel缺少必填列(学号、姓名、专业、班级)"

            # A row with no student ID cannot be matched or stored sensibly.
            df = df.dropna(subset=['学号'])

            # Students added during this import, keyed by ID, so a repeated
            # ID in the sheet updates the pending row instead of relying on
            # session autoflush to find it.
            pending = {}
            for _, row in df.iterrows():
                student_id = str(row['学号'])
                name = str(row['姓名'])
                major = str(row['专业'])
                class_name = str(row['班级'])

                target = pending.get(student_id)
                if target is None:
                    target = (
                        db.query(Student)
                        .filter_by(student_id=student_id)
                        .first()
                    )
                if target is not None:
                    target.name = name
                    target.major = major
                    target.class_name = class_name
                else:
                    student = Student(
                        student_id=student_id,
                        name=name,
                        major=major,
                        class_name=class_name,
                    )
                    db.add(student)
                    pending[student_id] = student

            db.commit()
            return True, f"成功导入 {len(df)} 名学生"
        except Exception as e:
            db.rollback()
            logger.exception("Student import failed")
            return False, f"导入失败:{e}"
        finally:
            db.close()

    def roll_call(self, class_name, call_mode='random'):
        """Pick one student of ``class_name`` and increment their call count.

        Args:
            class_name: Class whose students are eligible.
            call_mode: ``'random'`` delegates to
                ``ProbabilityCalculator.weighted_random_selection``; any other
                value walks the class in ``student_id`` order, using the
                per-class index stored in ``OrderCallStatus``.

        Returns:
            ``(student, "")`` on success, or ``(None, message)`` when the
            class has no students or an error occurred.
        """
        db = next(get_db())
        try:
            students = (
                db.query(Student)
                .filter_by(class_name=class_name)
                .order_by(Student.student_id)
                .all()
            )
            if not students:
                return None, "没有找到该班级学生"

            if call_mode == 'random':
                selected = self.probability_calculator.weighted_random_selection(students)
            else:
                status = (
                    db.query(OrderCallStatus)
                    .filter_by(class_name=class_name)
                    .first()
                )
                if not status:
                    status = OrderCallStatus(class_name=class_name, current_index=0)
                    db.add(status)
                selected = students[status.current_index % len(students)]
                status.current_index += 1

            # Treat a NULL counter as zero rather than failing on None + 1.
            selected.call_count = (selected.call_count or 0) + 1

            # Single commit: the index advance and the call-count increment
            # succeed or fail together.
            db.commit()

            # With SQLAlchemy's default expire_on_commit=True the commit
            # expires the instance; reload it now so the caller can still
            # read its attributes after the session is closed.
            db.refresh(selected)
            return selected, ""
        except Exception as e:
            db.rollback()
            logger.exception("Roll call failed for class %s", class_name)
            return None, f"点名失败:{e}"
        finally:
            db.close()

    def update_student_score(self, student_id, answer_type, performance,
                             status='present', call_mode='random'):
        """Apply a score change to a student and store a roll-call record.

        The delta comes from ``ScoreCalculator.calculate_score(answer_type,
        performance)``, plus 1 when ``status`` is ``'present'``.

        Args:
            student_id: ID of the student to update.
            answer_type: Passed to the score calculator and stored on the record.
            performance: Passed to the score calculator and stored on the record.
            status: Attendance status stored on the record.
            call_mode: Call mode stored on the record; defaults to
                ``'random'``, the value that was previously always recorded.

        Returns:
            ``(True, score_delta)`` on success; ``(False, 0)`` when the
            student does not exist or the update failed.
        """
        db = next(get_db())
        try:
            student = db.query(Student).filter_by(student_id=student_id).first()
            if not student:
                return False, 0

            score_delta = ScoreCalculator.calculate_score(answer_type, performance)
            if status == 'present':
                score_delta += 1

            # Treat a NULL total as zero rather than failing on None + delta.
            student.total_score = (student.total_score or 0) + score_delta

            record = RollCallRecord(
                student_id=student_id,
                call_mode=call_mode,
                status=status,
                answer_type=answer_type,
                performance=performance,
                score_delta=score_delta,
            )
            db.add(record)
            db.commit()
            return True, score_delta
        except Exception:
            db.rollback()
            # Keep the (False, 0) contract but leave a trace of the failure.
            logger.exception("Score update failed for student %s", student_id)
            return False, 0
        finally:
            db.close()


class ScoreService:
    """Read-side helpers: class ranking, score chart and Excel export."""

    def __init__(self):
        # Do not acquire a DB connection at initialization time.
        self.excel_service = ExcelService()

    def get_class_ranking(self, class_name):
        """Return the students of ``class_name`` ordered by total score, highest first."""
        db = next(get_db())
        try:
            return (
                db.query(Student)
                .filter_by(class_name=class_name)
                .order_by(desc(Student.total_score))
                .all()
            )
        finally:
            db.close()

    def generate_score_chart(self, class_name, top_n=5):
        """Render a chart of the top ``top_n`` students as a base64 PNG.

        Bars show total score; a line on a secondary axis shows call counts.

        Returns:
            The base64-encoded PNG as a ``str``, or ``None`` when the class
            has no students.
        """
        # get_class_ranking manages its own session; none is needed here.
        students = self.get_class_ranking(class_name)[:top_n]
        if not students:
            return None

        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False

        fig, ax = plt.subplots(figsize=(8, 5))
        try:
            names = [s.name for s in students]
            scores = [s.total_score for s in students]
            call_counts = [s.call_count for s in students]
            x = list(range(len(names)))

            ax.bar(x, scores, alpha=0.7, label='总积分', color='#3498db')
            ax2 = ax.twinx()
            ax2.plot(x, call_counts, color='#e74c3c', marker='o', label='点名次数')

            ax.set_xlabel('学生姓名')
            ax.set_ylabel('总积分', color='#3498db')
            ax2.set_ylabel('点名次数', color='#e74c3c')
            ax.set_title(f'{class_name} 积分TOP{top_n}及点名次数')
            ax.set_xticks(x)
            ax.set_xticklabels(names, rotation=45)

            fig.tight_layout()
            with io.BytesIO() as buffer:
                fig.savefig(buffer, format='png', dpi=100, bbox_inches='tight')
                return base64.b64encode(buffer.getvalue()).decode('utf-8')
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)

    def export_scores_excel(self, class_name):
        """Export the class ranking via ``ExcelService.export_detailed_scores``."""
        students = self.get_class_ranking(class_name)
        return self.excel_service.export_detailed_scores(students)