import sqlite3 import pandas as pd import random def init_database(): """初始化数据库和表""" conn = sqlite3.connect('database.db') cursor = conn.cursor() # 创建学生表 cursor.execute(''' CREATE TABLE IF NOT EXISTS students ( id INTEGER PRIMARY KEY AUTOINCREMENT, student_id TEXT UNIQUE, name TEXT, major TEXT, score INTEGER DEFAULT 0, call_count INTEGER DEFAULT 0 ) ''') # 创建点名记录表 cursor.execute(''' CREATE TABLE IF NOT EXISTS call_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, student_id TEXT, call_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, call_type TEXT ) ''') conn.commit() conn.close() print("数据库初始化完成!") def import_from_excel(file_path): """从Excel导入学生数据""" try: # 读取Excel文件 df = pd.read_excel(file_path) conn = sqlite3.connect('database.db') cursor = conn.cursor() success_count = 0 # 插入数据到数据库 for _, row in df.iterrows(): try: cursor.execute(''' INSERT OR REPLACE INTO students (student_id, name, major) VALUES (?, ?, ?) ''', (str(row['学号']), row['姓名'], row.get('专业', '未知'))) success_count += 1 except Exception as e: print(f"插入学生 {row['姓名']} 时出错: {e}") conn.commit() conn.close() print(f"成功导入 {success_count} 名学生数据") return True except Exception as e: print(f"导入错误: {e}") return False def get_all_students(): """获取所有学生""" conn = sqlite3.connect('database.db') cursor = conn.cursor() cursor.execute('SELECT student_id, name, score, call_count FROM students ORDER BY student_id') students = cursor.fetchall() conn.close() return students def get_random_student(): """随机获取一个学生(积分越低越容易被点到)""" conn = sqlite3.connect('database.db') cursor = conn.cursor() # 获取所有学生 cursor.execute('SELECT student_id, name, score, call_count FROM students') all_students = cursor.fetchall() conn.close() if not all_students: return None # 使用权重随机:积分越低,权重越高 weights = [] for student in all_students: score = student[2] # 积分 # 积分越低,权重越高(+1 避免除零) weight = 1.0 / (score + 1) weights.append(weight) # 根据权重随机选择 selected_student = random.choices(all_students, weights=weights, k=1)[0] # 更新点名次数 update_call_count(selected_student[0], 'random') return selected_student def get_sequential_student(): """按顺序获取学生""" conn = sqlite3.connect('database.db') cursor = conn.cursor() try: # 获取所有学生,按学号排序 cursor.execute('SELECT student_id, name FROM students ORDER BY student_id') all_students = cursor.fetchall() if not all_students: return None # 获取最后一次顺序点名的学生 cursor.execute(''' SELECT student_id FROM call_records WHERE call_type = 'sequential' ORDER BY call_time DESC LIMIT 1 ''') last_student = cursor.fetchone() next_student = None if last_student: last_id = last_student[0] # 找到当前学生在列表中的位置 current_index = None for i, (stu_id, _) in enumerate(all_students): if stu_id == last_id: current_index = i break # 取下一个学生 if current_index is not None and current_index + 1 < len(all_students): next_student = all_students[current_index + 1] else: # 如果是最后一个或没找到,返回第一个 next_student = all_students[0] else: # 如果没有记录,返回第一个学生 next_student = all_students[0] if not next_student: return None # 获取完整信息 cursor.execute('SELECT student_id, name, score, call_count FROM students WHERE student_id = ?', (next_student[0],)) full_info = cursor.fetchone() # 记录这次点名 if full_info: update_call_count(full_info[0], 'sequential') return full_info except Exception as e: print(f"顺序点名错误: {e}") return None finally: conn.close() def update_call_count(student_id, call_type): """更新点名次数和记录""" conn = sqlite3.connect('database.db') cursor = conn.cursor() try: # 更新点名次数 cursor.execute('UPDATE students SET call_count = call_count + 1 WHERE student_id = ?', (student_id,)) # 添加点名记录 cursor.execute('INSERT INTO call_records (student_id, call_type) VALUES (?, ?)', (student_id, call_type)) conn.commit() print(f"更新学生 {student_id} 的点名记录") except Exception as e: print(f"更新点名记录错误: {e}") finally: conn.close() def update_score(student_id, score_change): """更新学生积分""" conn = sqlite3.connect('database.db') cursor = conn.cursor() try: cursor.execute('UPDATE students SET score = score + ? WHERE student_id = ?', (score_change, student_id)) conn.commit() print(f"更新学生 {student_id} 积分: {score_change}") return True except Exception as e: print(f"更新积分错误: {e}") return False finally: conn.close() def get_ranking_data(limit=10): """获取积分排名数据""" students = get_all_students() # 按积分排序(降序) sorted_students = sorted(students, key=lambda x: x[2], reverse=True) ranking_data = [] for i, student in enumerate(sorted_students[:limit]): ranking_data.append({ 'rank': i + 1, 'name': student[1], 'score': student[2], 'call_count': student[3] }) return ranking_data def clear_all_data(): """清空所有数据(用于测试)""" conn = sqlite3.connect('database.db') cursor = conn.cursor() cursor.execute('DELETE FROM students') cursor.execute('DELETE FROM call_records') conn.commit() conn.close() print("所有数据已清空") # 初始化数据库(每次导入时自动执行) init_database()