You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
6.9 KiB

import sqlite3
import pandas as pd
import random
def init_database():
"""初始化数据库和表"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
# 创建学生表
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT UNIQUE,
name TEXT,
major TEXT,
score INTEGER DEFAULT 0,
call_count INTEGER DEFAULT 0
)
''')
# 创建点名记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS call_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT,
call_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
call_type TEXT
)
''')
conn.commit()
conn.close()
print("数据库初始化完成!")
def import_from_excel(file_path):
"""从Excel导入学生数据"""
try:
# 读取Excel文件
df = pd.read_excel(file_path)
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
success_count = 0
# 插入数据到数据库
for _, row in df.iterrows():
try:
cursor.execute('''
INSERT OR REPLACE INTO students (student_id, name, major)
VALUES (?, ?, ?)
''', (str(row['学号']), row['姓名'], row.get('专业', '未知')))
success_count += 1
except Exception as e:
print(f"插入学生 {row['姓名']} 时出错: {e}")
conn.commit()
conn.close()
print(f"成功导入 {success_count} 名学生数据")
return True
except Exception as e:
print(f"导入错误: {e}")
return False
def get_all_students():
"""获取所有学生"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute('SELECT student_id, name, score, call_count FROM students ORDER BY student_id')
students = cursor.fetchall()
conn.close()
return students
def get_random_student():
"""随机获取一个学生(积分越低越容易被点到)"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
# 获取所有学生
cursor.execute('SELECT student_id, name, score, call_count FROM students')
all_students = cursor.fetchall()
conn.close()
if not all_students:
return None
# 使用权重随机:积分越低,权重越高
weights = []
for student in all_students:
score = student[2] # 积分
# 积分越低,权重越高(+1 避免除零)
weight = 1.0 / (score + 1)
weights.append(weight)
# 根据权重随机选择
selected_student = random.choices(all_students, weights=weights, k=1)[0]
# 更新点名次数
update_call_count(selected_student[0], 'random')
return selected_student
def get_sequential_student():
"""按顺序获取学生"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
try:
# 获取所有学生,按学号排序
cursor.execute('SELECT student_id, name FROM students ORDER BY student_id')
all_students = cursor.fetchall()
if not all_students:
return None
# 获取最后一次顺序点名的学生
cursor.execute('''
SELECT student_id FROM call_records
WHERE call_type = 'sequential'
ORDER BY call_time DESC LIMIT 1
''')
last_student = cursor.fetchone()
next_student = None
if last_student:
last_id = last_student[0]
# 找到当前学生在列表中的位置
current_index = None
for i, (stu_id, _) in enumerate(all_students):
if stu_id == last_id:
current_index = i
break
# 取下一个学生
if current_index is not None and current_index + 1 < len(all_students):
next_student = all_students[current_index + 1]
else:
# 如果是最后一个或没找到,返回第一个
next_student = all_students[0]
else:
# 如果没有记录,返回第一个学生
next_student = all_students[0]
if not next_student:
return None
# 获取完整信息
cursor.execute('SELECT student_id, name, score, call_count FROM students WHERE student_id = ?', (next_student[0],))
full_info = cursor.fetchone()
# 记录这次点名
if full_info:
update_call_count(full_info[0], 'sequential')
return full_info
except Exception as e:
print(f"顺序点名错误: {e}")
return None
finally:
conn.close()
def update_call_count(student_id, call_type):
"""更新点名次数和记录"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
try:
# 更新点名次数
cursor.execute('UPDATE students SET call_count = call_count + 1 WHERE student_id = ?', (student_id,))
# 添加点名记录
cursor.execute('INSERT INTO call_records (student_id, call_type) VALUES (?, ?)', (student_id, call_type))
conn.commit()
print(f"更新学生 {student_id} 的点名记录")
except Exception as e:
print(f"更新点名记录错误: {e}")
finally:
conn.close()
def update_score(student_id, score_change):
"""更新学生积分"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
try:
cursor.execute('UPDATE students SET score = score + ? WHERE student_id = ?', (score_change, student_id))
conn.commit()
print(f"更新学生 {student_id} 积分: {score_change}")
return True
except Exception as e:
print(f"更新积分错误: {e}")
return False
finally:
conn.close()
def get_ranking_data(limit=10):
"""获取积分排名数据"""
students = get_all_students()
# 按积分排序(降序)
sorted_students = sorted(students, key=lambda x: x[2], reverse=True)
ranking_data = []
for i, student in enumerate(sorted_students[:limit]):
ranking_data.append({
'rank': i + 1,
'name': student[1],
'score': student[2],
'call_count': student[3]
})
return ranking_data
def clear_all_data():
"""清空所有数据(用于测试)"""
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute('DELETE FROM students')
cursor.execute('DELETE FROM call_records')
conn.commit()
conn.close()
print("所有数据已清空")
# 初始化数据库(每次导入时自动执行)
init_database()