You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

406 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sqlite3
import random
import csv
import datetime
class PointsSystem:
    """SQLite-backed store of students and their classroom points.

    Each public method opens its own short-lived connection to ``db_path``
    and always closes it before returning, including when a statement fails.
    """

    # Point delta applied by add_points() for each recognised event type.
    _POINTS_BY_TYPE = {
        "attendance": 1,         # normal roll call
        "repeat_question": 0.5,  # repeated the question
        "answer_correct": 2,     # answered the question correctly
        "answer_wrong": -1,      # answered the question incorrectly
    }

    def __init__(self, db_path='students.db'):
        """Remember the database path and make sure the table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.init_database()

    # ------------------------------------------------------------------
    # Private connection helpers: open, run, and always close.
    # ------------------------------------------------------------------
    def _fetch_all(self, sql, params=()):
        """Run a query and return all rows as a list of tuples."""
        conn = sqlite3.connect(self.db_path)
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _fetch_one(self, sql, params=()):
        """Run a query and return its first row (or None)."""
        conn = sqlite3.connect(self.db_path)
        try:
            return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    def _execute(self, sql, params=()):
        """Run one modifying statement; commit on success, roll back on error."""
        conn = sqlite3.connect(self.db_path)
        try:
            # The connection context manager commits or rolls back; it does
            # not close the connection, hence the surrounding try/finally.
            with conn:
                conn.execute(sql, params)
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def init_database(self):
        """Create the ``students`` table if it does not exist yet."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS students (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL,
                name TEXT NOT NULL,
                major TEXT,
                points INTEGER DEFAULT 0
            )
        ''')

    # ------------------------------------------------------------------
    # Points
    # ------------------------------------------------------------------
    def add_points(self, student_id, points_type="attendance"):
        """Add the points associated with ``points_type`` to a student.

        Args:
            student_id: Value matched against the ``student_id`` column.
            points_type: One of "attendance", "repeat_question",
                "answer_correct" or "answer_wrong". Any other value is
                treated as worth 1 point.

        Returns:
            The point delta that was applied (returned even when no row
            matched ``student_id``).
        """
        points = self._POINTS_BY_TYPE.get(points_type, 1)
        self._execute(
            "UPDATE students SET points = points + ? WHERE student_id = ?",
            (points, student_id),
        )
        return points

    def get_ranking(self, top_n=5):
        """Return the ``top_n`` students by points, highest first.

        Returns:
            A list of ``(name, student_id, points)`` tuples.
        """
        return self._fetch_all('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points DESC
            LIMIT ?
        ''', (top_n,))

    def get_weighted_random_student(self):
        """Pick a random student, favouring those with fewer points.

        The weight of a student is ``10 - min(points, 9)`` and never less
        than 1, so 0 points gives weight 10 and 9 or more points gives
        weight 1. Debug information about the draw is printed to stdout.

        Returns:
            A ``(student_id, name, points)`` tuple, or None when the table
            is empty.
        """
        students = self._fetch_all(
            "SELECT student_id, name, points FROM students"
        )
        if not students:
            return None

        weights = []
        for _, _, points in students:
            # A NULL points value would make min() raise; count it as 0.
            score = points if points is not None else 0
            weights.append(max(10 - min(score, 9), 1))

        print("权重点名调试信息:")
        for (_, name, points), weight in zip(students, weights):
            print(f" {name}({points}分) - 权重:{weight}")

        # random.choices performs the weighted draw; drawing an index lets
        # the chosen weight be reported alongside the chosen row.
        index = random.choices(range(len(students)), weights=weights, k=1)[0]
        print(f" 选中:{students[index][1]},权重:{weights[index]}")
        return students[index]

    def reset_all_points(self):
        """Set every student's points to 0. Always returns True."""
        self._execute("UPDATE students SET points = 0")
        return True

    # ------------------------------------------------------------------
    # Export / bulk reads
    # ------------------------------------------------------------------
    def export_to_csv(self, filename=None):
        """Write all students, highest points first, to a CSV file.

        Args:
            filename: Target path. When None, a timestamped name of the form
                ``积分详单_YYYYmmdd_HHMMSS.csv`` is used.

        Returns:
            The path that was written.
        """
        if filename is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"积分详单_{timestamp}.csv"

        students = self._fetch_all('''
            SELECT student_id, name, major, points
            FROM students
            ORDER BY points DESC
        ''')

        # One timestamp for the whole export, so every row agrees.
        export_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.writer(csvfile)
            # Header row
            writer.writerow(['学号', '姓名', '专业', '总积分', '导出时间'])
            # Data rows
            writer.writerows(
                [student_id, name, major, points, export_time]
                for student_id, name, major, points in students
            )
        return filename

    def get_all_students_data(self):
        """Return ``(name, points)`` for every student, highest points first."""
        return self._fetch_all('''
            SELECT name, points
            FROM students
            ORDER BY points DESC
        ''')

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------
    def has_students(self):
        """Return True when the table contains at least one student."""
        return self.get_student_count() > 0

    def get_student_count(self):
        """Return the number of rows in the ``students`` table."""
        return self._fetch_one("SELECT COUNT(*) FROM students")[0]

    def get_total_points(self):
        """Return the sum of all points, or 0 when there is nothing to sum."""
        total = self._fetch_one("SELECT SUM(points) FROM students")[0]
        return total if total else 0

    def get_average_points(self):
        """Return the mean points per student rounded to 2 decimals (0 if empty)."""
        count = self.get_student_count()
        if count == 0:
            return 0
        return round(self.get_total_points() / count, 2)

    def get_top_student(self):
        """Return ``(name, student_id, points)`` of the highest scorer, or None."""
        return self._fetch_one('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points DESC
            LIMIT 1
        ''')

    def get_lowest_student(self):
        """Return ``(name, student_id, points)`` of the lowest scorer, or None."""
        return self._fetch_one('''
            SELECT name, student_id, points
            FROM students
            ORDER BY points ASC
            LIMIT 1
        ''')

    def get_statistics(self):
        """Return summary statistics over all students.

        Returns:
            A dict with the keys ``total_students``, ``total_points``,
            ``average_points`` (rounded to 2 decimals), ``max_points`` and
            ``min_points``. Aggregates that SQLite reports as NULL (empty
            table) are returned as 0.
        """
        count, total, average, highest, lowest = self._fetch_one(
            "SELECT COUNT(*), SUM(points), AVG(points), "
            "MAX(points), MIN(points) FROM students"
        )
        return {
            'total_students': count,
            'total_points': total or 0,
            'average_points': round(average or 0, 2),
            'max_points': highest or 0,
            'min_points': lowest or 0,
        }

    # ------------------------------------------------------------------
    # Single-student CRUD
    # ------------------------------------------------------------------
    def get_student_by_id(self, student_id):
        """Return ``(student_id, name, major, points)`` for an ID, or None."""
        return self._fetch_one('''
            SELECT student_id, name, major, points
            FROM students
            WHERE student_id = ?
        ''', (student_id,))

    def update_student_info(self, student_id, name=None, major=None):
        """Update a student's name and/or major.

        Only arguments with a truthy value are written; when neither is
        given nothing is changed. Always returns True.
        """
        assignments = []
        values = []
        if name:
            assignments.append("name = ?")
            values.append(name)
        if major:
            assignments.append("major = ?")
            values.append(major)

        if assignments:
            # Only fixed column names are interpolated; values stay bound.
            self._execute(
                f"UPDATE students SET {', '.join(assignments)} "
                "WHERE student_id = ?",
                (*values, student_id),
            )
        return True

    def delete_student(self, student_id):
        """Delete every row whose ``student_id`` matches. Always returns True."""
        self._execute(
            "DELETE FROM students WHERE student_id = ?",
            (student_id,),
        )
        return True

    def add_student(self, student_id, name, major):
        """Insert a new student (points start at the column default, 0).

        Always returns True; database errors propagate as exceptions.
        """
        self._execute(
            "INSERT INTO students (student_id, name, major) VALUES (?, ?, ?)",
            (student_id, name, major),
        )
        return True

    # ------------------------------------------------------------------
    # Backup / import
    # ------------------------------------------------------------------
    def backup_database(self, backup_path=None):
        """Copy the database file to ``backup_path``.

        Args:
            backup_path: Target path. When None, a timestamped name of the
                form ``backup_students_YYYYmmdd_HHMMSS.db`` is used.

        Returns:
            The path of the backup file.
        """
        if backup_path is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"backup_students_{timestamp}.db"
        import shutil
        shutil.copy2(self.db_path, backup_path)
        return backup_path

    def import_from_csv(self, csv_filepath):
        """Replace all students with the rows of a CSV file.

        The CSV must contain the columns ``学号`` and ``姓名``; ``专业`` is
        optional. Existing rows are deleted and the new rows inserted inside
        a single transaction, so a failed import leaves the table unchanged.

        Returns:
            A ``(success, message)`` tuple. Failures (including a missing
            pandas installation) are reported through the tuple rather than
            raised.
        """
        try:
            import pandas as pd

            # Read everything as text: the columns are TEXT in the database
            # and type inference would turn an ID like "007" into 7.
            df = pd.read_csv(csv_filepath, dtype=str)

            # Check for the mandatory columns
            required_columns = ['学号', '姓名']
            if not all(col in df.columns for col in required_columns):
                return False, "CSV文件必须包含'学号''姓名'"

            def _cells(column):
                # Blank cells come back as NaN; map them to None (NULL).
                return [None if pd.isna(v) else v for v in df[column].tolist()]

            if '专业' in df.columns:
                # A blank major gets the same placeholder as a missing column.
                majors = ['未指定' if m is None else m for m in _cells('专业')]
            else:
                majors = ['未指定'] * len(df)
            records = list(zip(_cells('学号'), _cells('姓名'), majors))

            conn = sqlite3.connect(self.db_path)
            try:
                with conn:
                    # Clear existing data, then insert the new rows; both
                    # are committed together or rolled back together.
                    conn.execute("DELETE FROM students")
                    conn.executemany(
                        "INSERT INTO students (student_id, name, major) "
                        "VALUES (?, ?, ?)",
                        records,
                    )
            finally:
                conn.close()
            return True, f"成功导入 {len(df)} 名学生数据"
        except ImportError:
            return False, "需要安装pandas库来支持CSV导入"
        except Exception as e:
            # Boundary of the import feature: callers expect a status tuple.
            return False, f"导入失败:{str(e)}"
# Manual smoke test: run this module directly to exercise the points system.
if __name__ == "__main__":
    # Uses the default database path of PointsSystem.
    points_system = PointsSystem()
    print("=== 积分系统测试 ===")

    any_students = points_system.has_students()
    print(f"是否有学生数据: {any_students}")

    student_total = points_system.get_student_count()
    print(f"学生总数: {student_total}")

    # Summary statistics as returned by get_statistics()
    summary = points_system.get_statistics()
    print(f"统计信息: {summary}")

    print("测试完成!")