You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask import Flask, request, jsonify, send_file
from flask_sqlalchemy import SQLAlchemy
import pandas as pd
import random
from datetime import datetime
import os
from sqlalchemy import desc
import math
from flask_cors import CORS
# ---------------------- 1. 应用初始化核心修改适配db4free在线MySQL ----------------------
app = Flask(__name__)
CORS(app)
# ---------------------- 关键配置替换为你的db4free账号信息 ----------------------
DB_USER = 'bagood'
DB_PASSWORD = 'czb098221'
DB_NAME = 'rollcall_db'
DB_HOST = 'db4free.net'
DB_PORT = 3306
# 配置db4free在线MySQL连接核心修改部分
app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# db4free专属适配配置解决连接超时/断开问题)
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10, # 连接池大小适配db4free并发限制
'pool_recycle': 180, # 3分钟回收连接db4free空闲连接10分钟自动断开
'pool_pre_ping': True, # 执行查询前检测连接,无效则自动重连
'connect_args': {
'charset': 'utf8mb4', # 支持中文和特殊字符
'connect_timeout': 10, # 连接超时时间10秒
'read_timeout': 15, # 读取超时时间15秒
'write_timeout': 15 # 写入超时时间15秒
}
}
db = SQLAlchemy(app)
# 临时文件存储路径用于Excel导出
EXPORT_DIR = 'exports'
os.makedirs(EXPORT_DIR, exist_ok=True)
# ---------------------- 2. 数据库模型设计无修改兼容MySQL ----------------------
class Student(db.Model):
"""学生表:存储学生基础信息与积分"""
__tablename__ = 'students'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
student_id = db.Column(db.String(20), unique=True, nullable=False, comment='学号')
name = db.Column(db.String(20), nullable=False, comment='姓名')
major = db.Column(db.String(50), nullable=False, comment='专业')
points = db.Column(db.Float, default=0.0, comment='总积分')
call_count = db.Column(db.Integer, default=0, comment='被点名次数')
create_time = db.Column(db.DateTime, default=datetime.now, comment='创建时间')
class RollCallRecord(db.Model):
"""点名记录表:存储每次点名详情"""
__tablename__ = 'roll_call_records'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
student_id = db.Column(db.String(20), db.ForeignKey('students.student_id'), nullable=False, comment='学号')
call_mode = db.Column(db.String(10), nullable=False, comment='点名模式random/order')
call_time = db.Column(db.DateTime, default=datetime.now, comment='点名时间')
is_answer = db.Column(db.Boolean, default=False, comment='是否回答问题')
repeat_question = db.Column(db.Boolean, nullable=True, comment='是否准确重复问题')
answer_correct = db.Column(db.Boolean, nullable=True, comment='回答是否正确')
point_change = db.Column(db.Float, default=0.0, comment='本次积分变化')
class PointRule(db.Model):
"""积分规则表:可灵活配置积分规则(避免硬编码)"""
__tablename__ = 'point_rules'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
rule_type = db.Column(db.String(30), unique=True, nullable=False, comment='规则类型')
point_value = db.Column(db.Float, nullable=False, comment='对应分值')
description = db.Column(db.String(100), comment='规则描述')
# ---------------------- 3. 初始化数据库表与默认规则无修改兼容MySQL ----------------------
def init_db():
with app.app_context():
db.create_all() # 自动创建所有表(首次运行时执行)
# 插入默认积分规则(若不存在)
default_rules = [
('attend_call', 1.0, '到课被点到加分'),
('repeat_correct', 0.5, '准确重复问题加分'),
('repeat_incorrect', -1.0, '未准确重复问题扣分'),
('answer_correct_low', 0.5, '回答正确(基础分)'),
('answer_correct_mid', 1.5, '回答正确(中等分)'),
('answer_correct_high', 3.0, '回答正确(高分)')
]
for rule_type, point_value, desc in default_rules:
if not PointRule.query.filter_by(rule_type=rule_type).first():
new_rule = PointRule(rule_type=rule_type, point_value=point_value, description=desc)
db.session.add(new_rule)
db.session.commit()
print("数据库初始化完成数据已存储到db4free在线MySQL中")
# ---------------------- 4. 工具函数(无修改,逻辑不变) ----------------------
def get_point_rule(rule_type):
"""获取指定类型的积分规则分值"""
rule = PointRule.query.filter_by(rule_type=rule_type).first()
return rule.point_value if rule else 0.0
def calculate_random_weight(students):
"""计算随机点名权重:积分越高,权重越低(权重=1/(积分+1)"""
total_weight = sum(1/(student.points + 1) for student in students)
weights = [(1/(s.points + 1))/total_weight for s in students]
return weights
# ---------------------- 5. 核心接口(无修改,所有功能保持不变) ----------------------
@app.route('/api/student/import', methods=['POST'])
def import_students():
"""导入学生信息Excel文件- 支持.xlsx格式含格式校验"""
if 'file' not in request.files:
return jsonify({'code': 400, 'msg': '未上传文件'}), 400
file = request.files['file']
try:
if not file.filename.endswith('.xlsx'):
return jsonify({'code': 400, 'msg': '仅支持.xlsx格式的Excel文件请将文件另存为.xlsx后上传'}), 400
df = pd.read_excel(file, engine='openpyxl', dtype=str)
df = df.dropna(how='all')
required_cols = ['学号', '姓名', '专业']
if not all(col in df.columns for col in required_cols):
missing_cols = [col for col in required_cols if col not in df.columns]
return jsonify({'code': 400, 'msg': f'Excel缺少必填列{", ".join(missing_cols)}(表头必须严格为「学号、姓名、专业」)'}), 400
success_count = 0
fail_count = 0
fail_msg = []
existing_student_ids = [s.student_id for s in Student.query.with_entities(Student.student_id).all()]
for idx, row in df.iterrows():
student_id = str(row['学号']).strip() if pd.notna(row['学号']) else ''
name = str(row['姓名']).strip() if pd.notna(row['姓名']) else ''
major = str(row['专业']).strip() if pd.notna(row['专业']) else ''
if not student_id or not name or not major:
fail_count += 1
fail_msg.append(f'{idx+1}行:存在空数据,跳过')
continue
if not student_id.isdigit() or len(student_id) != 9:
fail_count += 1
fail_msg.append(f'{idx+1}行:学号{student_id}格式错误需9位数字跳过')
continue
if student_id in existing_student_ids:
fail_count += 1
fail_msg.append(f'{idx+1}行:学号{student_id}已存在,跳过')
continue
new_student = Student(student_id=student_id, name=name, major=major)
db.session.add(new_student)
success_count += 1
existing_student_ids.append(student_id)
db.session.commit()
return jsonify({
'code': 200,
'msg': f'导入完成!成功导入{success_count}条,失败{fail_count}',
'data': {
'success_count': success_count,
'fail_count': fail_count,
'fail_msg': fail_msg
}
}), 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
# db4free专属错误提示
if 'Lost connection' in error_msg or '10060' in error_msg or '10061' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时/失败建议1. 检查账号密码/数据库名是否正确2. 1分钟后重试3. 检查网络是否稳定'}), 500
return jsonify({'code': 500, 'msg': f'导入失败:{error_msg}'}), 500
@app.route('/api/rollcall/random', methods=['GET'])
def random_rollcall():
"""随机点名(按积分权重)"""
try:
students = Student.query.all()
if not students:
return jsonify({'code': 400, 'msg': '暂无学生信息,请先导入'}), 400
weights = calculate_random_weight(students)
selected_student = random.choices(students, weights=weights, k=1)[0]
new_record = RollCallRecord(
student_id=selected_student.student_id,
call_mode='random',
is_answer=False,
point_change=get_point_rule('attend_call')
)
selected_student.call_count += 1
selected_student.points = round(selected_student.points + new_record.point_change, 1)
db.session.add(new_record)
db.session.commit()
return jsonify({
'code': 200,
'msg': '随机点名成功',
'data': {
'student_id': selected_student.student_id,
'name': selected_student.name,
'major': selected_student.major,
'current_points': selected_student.points,
'call_count': selected_student.call_count,
'call_time': new_record.call_time.strftime('%Y-%m-%d %H:%M:%S')
}
}), 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'点名失败:{error_msg}'}), 500
@app.route('/api/rollcall/order', methods=['GET'])
def order_rollcall():
"""顺序点名(按学号升序循环)"""
try:
last_record = RollCallRecord.query.filter_by(call_mode='order').order_by(desc('call_time')).first()
students = Student.query.order_by(Student.student_id).all()
if not students:
return jsonify({'code': 400, 'msg': '暂无学生信息,请先导入'}), 400
if not last_record:
selected_student = students[0]
else:
last_index = next((i for i, s in enumerate(students) if s.student_id == last_record.student_id), -1)
selected_student = students[(last_index + 1) % len(students)]
new_record = RollCallRecord(
student_id=selected_student.student_id,
call_mode='order',
is_answer=False,
point_change=get_point_rule('attend_call')
)
selected_student.call_count += 1
selected_student.points = round(selected_student.points + new_record.point_change, 1)
db.session.add(new_record)
db.session.commit()
return jsonify({
'code': 200,
'msg': '顺序点名成功',
'data': {
'student_id': selected_student.student_id,
'name': selected_student.name,
'major': selected_student.major,
'current_points': selected_student.points,
'call_count': selected_student.call_count,
'call_time': new_record.call_time.strftime('%Y-%m-%d %H:%M:%S')
}
}), 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'点名失败:{error_msg}'}), 500
@app.route('/api/student/update-points', methods=['POST'])
def update_student_points():
"""更新学生积分(回答问题后)"""
data = request.json
required_fields = ['student_id', 'is_answer', 'repeat_question', 'answer_correct_level']
if not all(field in data for field in required_fields):
return jsonify({'code': 400, 'msg': '缺少必填参数student_id/is_answer/repeat_question/answer_correct_level'}), 400
try:
student = Student.query.filter_by(student_id=data['student_id']).first()
if not student:
return jsonify({'code': 404, 'msg': '学生不存在'}), 404
last_record = RollCallRecord.query.filter_by(
student_id=data['student_id'],
is_answer=False
).order_by(desc('call_time')).first()
if not last_record:
return jsonify({'code': 400, 'msg': '无未完成的点名记录(需先点名再更新积分)'}), 400
point_change = last_record.point_change
if data['is_answer']:
if data['repeat_question']:
point_change += get_point_rule('repeat_correct')
else:
point_change += get_point_rule('repeat_incorrect')
level_map = {
'low': 'answer_correct_low',
'mid': 'answer_correct_mid',
'high': 'answer_correct_high'
}
rule_type = level_map.get(data['answer_correct_level'], 'answer_correct_low')
point_change += get_point_rule(rule_type)
points_to_add = point_change - last_record.point_change
student.points = round(student.points + points_to_add, 1)
last_record.is_answer = data['is_answer']
last_record.repeat_question = data['repeat_question']
last_record.answer_correct = (data['answer_correct_level'] != 'none') if data['is_answer'] else None
last_record.point_change = point_change
db.session.commit()
return jsonify({
'code': 200,
'msg': '积分更新成功',
'data': {
'student_id': student.student_id,
'name': student.name,
'current_points': student.points,
'total_point_change': round(point_change, 1)
}
}), 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'积分更新失败:{error_msg}'}), 500
@app.route('/api/student/rank', methods=['GET'])
def get_student_rank():
"""获取学生积分排名(全量)"""
try:
students = Student.query.order_by(desc(Student.points)).all()
rank_list = [
{
'rank': i+1,
'student_id': s.student_id,
'name': s.name,
'major': s.major,
'points': round(s.points, 1),
'call_count': s.call_count
} for i, s in enumerate(students)
]
return jsonify({
'code': 200,
'msg': '排名查询成功',
'data': rank_list
}), 200
except Exception as e:
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'排名查询失败:{error_msg}'}), 500
@app.route('/api/student/rank/top/<int:top_n>', methods=['GET'])
def get_top_student_rank(top_n):
"""获取积分最高的top_n名学生前端可视化专用"""
try:
top_students = Student.query.order_by(desc(Student.points)).limit(top_n).all()
if not top_students:
return jsonify({'code': 400, 'msg': '暂无学生信息,请先导入'}), 400
rank_list = []
for idx, student in enumerate(top_students):
random_call_count = RollCallRecord.query.filter_by(
student_id=student.student_id,
call_mode='random'
).count()
rank_list.append({
'rank': idx + 1,
'student_id': student.student_id,
'name': student.name,
'major': student.major,
'points': round(student.points, 1),
'call_count': student.call_count,
'random_call_count': random_call_count
})
return jsonify({
'code': 200,
'msg': f'获取Top{top_n}积分排名成功',
'data': rank_list
}), 200
except Exception as e:
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'Top{top_n}排名查询失败:{error_msg}'}), 500
@app.route('/api/student/export', methods=['GET'])
def export_student_points():
"""导出积分详单Excel"""
try:
students = Student.query.order_by(desc(Student.points)).all()
export_data = [
{
'学号': s.student_id,
'姓名': s.name,
'专业': s.major,
'随机点名次数': len(RollCallRecord.query.filter_by(student_id=s.student_id, call_mode='random').all()),
'总点名次数': s.call_count,
'总积分': round(s.points, 1)
} for s in students
]
df = pd.DataFrame(export_data)
export_filename = f'积分详单_{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
export_path = os.path.join(EXPORT_DIR, export_filename)
df.to_excel(export_path, index=False, engine='openpyxl')
return send_file(export_path, as_attachment=True, download_name=export_filename)
except Exception as e:
error_msg = str(e)
if 'Lost connection' in error_msg:
return jsonify({'code': 500, 'msg': 'db4free连接超时请1分钟后重试'}), 500
return jsonify({'code': 500, 'msg': f'导出失败:{error_msg}'}), 500
# ---------------------- 6. 启动入口(无修改) ----------------------
if __name__ == '__main__':
init_db() # 首次运行自动创建表和默认规则db4free云端
app.run(host='0.0.0.0', port=5000, debug=True)