import contextlib
import csv
import datetime
import random
import shutil
import sqlite3
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple


class PointsSystem:
    """SQLite-backed store of students and their classroom points.

    Every public method opens its own short-lived connection to
    ``db_path``; no connection is kept open between calls.
    """

    # Points added to a student for each supported event type.
    POINTS_MAP = {
        "attendance": 1,         # regular roll call
        "repeat_question": 0.5,  # asked to repeat the question
        "answer_correct": 2,     # answered correctly
        "answer_wrong": -1,      # answered incorrectly
    }

    def __init__(self, db_path: str = 'students.db') -> None:
        """Remember the database path and make sure the table exists."""
        self.db_path = db_path
        self.init_database()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @contextlib.contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed, rolled back and closed.

        ``with conn`` commits when the body succeeds and rolls back when
        it raises; the ``finally`` clause closes the connection in both
        cases so a failing statement can no longer leak it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _execute(self, sql: str, params: Sequence[Any] = ()) -> None:
        """Run one data-modifying statement inside its own transaction."""
        with self._connect() as conn:
            conn.execute(sql, params)

    def _fetchall(self, sql: str, params: Sequence[Any] = ()) -> List[tuple]:
        """Run a query and return every result row."""
        with self._connect() as conn:
            return conn.execute(sql, params).fetchall()

    def _fetchone(self, sql: str, params: Sequence[Any] = ()) -> Optional[tuple]:
        """Run a query and return its first row, or None if it has none."""
        with self._connect() as conn:
            return conn.execute(sql, params).fetchone()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def init_database(self) -> None:
        """Create the ``students`` table if it does not exist yet."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS students (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL,
                name TEXT NOT NULL,
                major TEXT,
                points INTEGER DEFAULT 0
            )
        ''')

    # ------------------------------------------------------------------
    # Points
    # ------------------------------------------------------------------
    def add_points(self, student_id, points_type="attendance"):
        """Add the points for ``points_type`` to one student.

        Args:
            student_id: Value matched against the ``student_id`` column.
            points_type: A key of ``POINTS_MAP``; unknown keys count as 1.

        Returns:
            The number of points that was added (may be negative or
            fractional). The value is returned even when no row matched
            ``student_id``.
        """
        points = self.POINTS_MAP.get(points_type, 1)
        self._execute(
            "UPDATE students SET points = points + ? WHERE student_id = ?",
            (points, student_id),
        )
        return points

    def reset_all_points(self) -> bool:
        """Set every student's points back to 0. Always returns True."""
        self._execute("UPDATE students SET points = 0")
        return True

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_ranking(self, top_n=5):
        """Return up to ``top_n`` ``(name, student_id, points)`` rows,
        highest points first."""
        return self._fetchall('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points DESC
            LIMIT ?
        ''', (top_n,))

    def get_weighted_random_student(self):
        """Pick a random student, favouring those with fewer points.

        A student's weight is ``10 - min(points, 9)`` with a floor of 1,
        so 0 points gives weight 10 and 9 or more points gives weight 1.
        The weights and the selected student are printed for debugging.

        Returns:
            A ``(student_id, name, points)`` row, or None when the table
            is empty.
        """
        students = self._fetchall(
            "SELECT student_id, name, points FROM students"
        )
        if not students:
            return None

        weights = [
            max(10 - min(points, 9), 1) for _, _, points in students
        ]

        print("权重点名调试信息:")
        for (_, name, points), weight in zip(students, weights):
            print(f" {name}({points}分) - 权重:{weight}")

        # Every weight is >= 1, so the total is positive and
        # random.choices always has something to pick from.
        index = random.choices(
            range(len(students)), weights=weights, k=1
        )[0]
        chosen = students[index]
        print(f" 选中:{chosen[1]},权重:{weights[index]}")
        return chosen

    def get_all_students_data(self):
        """Return ``(name, points)`` for every student, highest first
        (intended for charts)."""
        return self._fetchall('''
            SELECT name, points
            FROM students
            ORDER BY points DESC
        ''')

    def has_students(self) -> bool:
        """Return True when the table holds at least one student."""
        return self.get_student_count() > 0

    def get_student_count(self) -> int:
        """Return the number of rows in the ``students`` table."""
        return self._fetchone("SELECT COUNT(*) FROM students")[0]

    def get_total_points(self):
        """Return the sum of all points, or 0 for an empty table."""
        total = self._fetchone("SELECT SUM(points) FROM students")[0]
        return total if total else 0

    def get_average_points(self):
        """Return the mean points per student rounded to 2 decimals.

        Returns 0 when there are no students.
        """
        count = self.get_student_count()
        if count == 0:
            return 0
        return round(self.get_total_points() / count, 2)

    def get_top_student(self):
        """Return ``(name, student_id, points)`` of the student with the
        most points, or None when the table is empty."""
        return self._fetchone('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points DESC
            LIMIT 1
        ''')

    def get_lowest_student(self):
        """Return ``(name, student_id, points)`` of the student with the
        fewest points, or None when the table is empty."""
        return self._fetchone('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points ASC
            LIMIT 1
        ''')

    def get_student_by_id(self, student_id):
        """Return ``(student_id, name, major, points)`` for the first row
        matching ``student_id``, or None when there is no match."""
        return self._fetchone('''
            SELECT student_id, name, major, points
            FROM students
            WHERE student_id = ?
        ''', (student_id,))

    def get_statistics(self) -> Dict[str, Any]:
        """Return summary statistics over all students.

        Returns:
            A dict with the keys ``total_students``, ``total_points``,
            ``average_points`` (rounded to 2 decimals), ``max_points``
            and ``min_points``. The point values are 0 for an empty
            table.
        """
        # One aggregate query gives a consistent snapshot of all values.
        count, total, average, highest, lowest = self._fetchone('''
            SELECT COUNT(*), SUM(points), AVG(points),
                   MAX(points), MIN(points)
            FROM students
        ''')
        return {
            'total_students': count,
            'total_points': total or 0,
            'average_points': round(average or 0, 2),
            'max_points': highest or 0,
            'min_points': lowest or 0,
        }

    # ------------------------------------------------------------------
    # Student maintenance
    # ------------------------------------------------------------------
    def add_student(self, student_id, name, major) -> bool:
        """Insert a new student with the default points. Returns True."""
        self._execute('''
            INSERT INTO students (student_id, name, major)
            VALUES (?, ?, ?)
        ''', (student_id, name, major))
        return True

    def update_student_info(self, student_id, name=None, major=None) -> bool:
        """Update the name and/or major of a student.

        Only arguments with a truthy value are written; passing neither
        leaves the row untouched. Always returns True.
        """
        assignments = []
        params = []
        if name:
            assignments.append("name = ?")
            params.append(name)
        if major:
            assignments.append("major = ?")
            params.append(major)

        if assignments:
            params.append(student_id)
            # Only fixed column names are interpolated; values are bound.
            self._execute(
                f"UPDATE students SET {', '.join(assignments)} "
                "WHERE student_id = ?",
                params,
            )
        return True

    def delete_student(self, student_id) -> bool:
        """Delete every row matching ``student_id``. Always returns True."""
        self._execute(
            "DELETE FROM students WHERE student_id = ?", (student_id,)
        )
        return True

    # ------------------------------------------------------------------
    # Import / export
    # ------------------------------------------------------------------
    def export_to_csv(self, filename=None):
        """Write all students and their points to a CSV file.

        Args:
            filename: Target path. Defaults to a timestamped name in the
                current working directory.

        Returns:
            The path that was written.
        """
        now = datetime.datetime.now()
        if filename is None:
            filename = f"积分详单_{now.strftime('%Y%m%d_%H%M%S')}.csv"

        students = self._fetchall('''
            SELECT student_id, name, major, points
            FROM students
            ORDER BY points DESC
        ''')

        # Taken once so every row of one export carries the same time.
        export_time = now.strftime("%Y-%m-%d %H:%M:%S")
        # utf-8-sig writes a BOM at the start of the file.
        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['学号', '姓名', '专业', '总积分', '导出时间'])
            writer.writerows(
                [student_id, name, major, points, export_time]
                for student_id, name, major, points in students
            )
        return filename

    def backup_database(self, backup_path=None):
        """Copy the database file and return the path of the copy.

        Args:
            backup_path: Target path. Defaults to a timestamped name in
                the current working directory.
        """
        if backup_path is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"backup_students_{timestamp}.db"
        shutil.copy2(self.db_path, backup_path)
        return backup_path

    def import_from_csv(self, csv_filepath) -> Tuple[bool, str]:
        """Replace all students with the rows of a CSV file.

        The file must contain the columns ``学号`` and ``姓名``; ``专业`` is
        optional and falls back to ``未指定`` when the column or the cell
        is missing. Existing rows are deleted, and imported students
        start with the default points.

        The delete and the inserts share one transaction, so a failed
        import leaves the previous data in place.

        Returns:
            ``(success, message)``; errors are reported through the
            message rather than raised.
        """
        try:
            import pandas as pd
        except ImportError:
            return False, "需要安装pandas库来支持CSV导入"

        def value_or(value, default=None):
            """Map a missing CSV cell (NaN/NA) to ``default``."""
            return default if pd.isna(value) else value

        try:
            # dtype=str keeps IDs such as "007" intact instead of letting
            # pandas turn them into integers.
            df = pd.read_csv(csv_filepath, dtype=str)

            required_columns = ['学号', '姓名']
            if not all(col in df.columns for col in required_columns):
                return False, "CSV文件必须包含'学号'和'姓名'列"

            if '专业' in df.columns:
                majors = df['专业']
            else:
                majors = [None] * len(df)

            rows = [
                (value_or(sid), value_or(name), value_or(major, '未指定'))
                for sid, name, major in zip(df['学号'], df['姓名'], majors)
            ]

            with self._connect() as conn:
                conn.execute("DELETE FROM students")
                conn.executemany(
                    "INSERT INTO students (student_id, name, major) "
                    "VALUES (?, ?, ?)",
                    rows,
                )
            return True, f"成功导入 {len(df)} 名学生数据"
        except Exception as e:
            # Boundary of the (success, message) contract: report, don't raise.
            return False, f"导入失败:{str(e)}"


# Manual smoke test
if __name__ == "__main__":
    ps = PointsSystem()
    print("=== 积分系统测试 ===")
    print(f"是否有学生数据: {ps.has_students()}")
    print(f"学生总数: {ps.get_student_count()}")

    stats = ps.get_statistics()
    print(f"统计信息: {stats}")
    print("测试完成!")