You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
11 KiB
275 lines
11 KiB
import json
|
|
import os
|
|
import csv
|
|
from datetime import date
|
|
from typing import List, Optional
|
|
from model.Student import Student
|
|
|
|
class StudentDAL:
    """Data access layer that persists ``Student`` records in a JSON file.

    The store is a single JSON array of the dictionaries produced by
    ``Student.to_dict()``. Every query re-reads the whole file and every
    mutation rewrites the whole file.
    """

    def __init__(self, file_path: str):
        """Bind the DAL to *file_path*, creating an empty store if it is missing."""
        self.file_path = file_path
        self._ensure_file_exists()

    def _ensure_file_exists(self) -> None:
        """Create the store (an empty JSON array) if the file does not exist yet."""
        if os.path.exists(self.file_path):
            return
        # A store path inside a not-yet-created directory should still work.
        parent_dir = os.path.dirname(self.file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(self.file_path, mode='w', encoding='utf-8') as file:
            json.dump([], file)

    def save_students(self, students: List[Student]) -> None:
        """Replace the store's content with *students*.

        Raises whatever ``json.dumps`` raises when a record cannot be
        serialized; in that case the existing file is left untouched.
        """
        student_dicts = [student.to_dict() for student in students]
        # Serialize BEFORE opening the file: mode='w' truncates immediately,
        # so a failure while dumping would otherwise destroy the old data.
        payload = json.dumps(student_dicts, ensure_ascii=False, indent=4)
        with open(self.file_path, mode='w', encoding='utf-8') as file:
            file.write(payload)

    def load_students(self) -> List[Student]:
        """Read the store and return every record as a ``Student``."""
        with open(self.file_path, mode='r', encoding='utf-8') as file:
            student_dicts = json.load(file)
        return [Student.from_dict(student_dict) for student_dict in student_dicts]

    def add_student(self, student: Student) -> bool:
        """Append *student* unless its ``sid`` is already present.

        Returns True when the student was added, False on a duplicate ``sid``.
        """
        students = self.load_students()
        if any(existing.sid == student.sid for existing in students):
            return False
        students.append(student)
        self.save_students(students)
        return True

    def find_student_by_sid(self, sid: str) -> Optional[Student]:
        """Return the first student whose ``sid`` equals *sid*, or None."""
        students = self.load_students()
        return next((student for student in students if student.sid == sid), None)

    def find_student_by_id_number(self, id_number: str) -> Optional[Student]:
        """Return the first student whose ``id_number`` equals *id_number*, or None."""
        students = self.load_students()
        return next(
            (student for student in students if student.id_number == id_number),
            None,
        )

    def find_students_by_name(self, name: str) -> List[Student]:
        """Return students whose name contains *name* (case-insensitive)."""
        needle = name.lower()
        return [s for s in self.load_students() if needle in s.name.lower()]

    def find_students_by_class_name(self, class_name: str) -> List[Student]:
        """Return students whose class name contains *class_name* (case-insensitive)."""
        needle = class_name.lower()
        return [s for s in self.load_students() if needle in s.class_name.lower()]

    def find_students_by_major(self, major: str) -> List[Student]:
        """Return students whose major contains *major* (case-insensitive)."""
        needle = major.lower()
        return [s for s in self.load_students() if needle in s.major.lower()]

    def get_all_students(self) -> List[Student]:
        """Return every student in the store."""
        return self.load_students()

    def update_student(self, sid: str, updated_student: Student) -> bool:
        """Replace the first student whose ``sid`` equals *sid* with *updated_student*.

        Returns True when a record was replaced, False when *sid* is unknown.
        """
        students = self.load_students()
        for index, student in enumerate(students):
            if student.sid == sid:
                students[index] = updated_student
                self.save_students(students)
                return True
        return False

    def update_student_partial(self, sid: str, name: Optional[str] = None, height: Optional[int] = None,
                               birth_date: Optional[date] = None, enrollment_date: Optional[date] = None,
                               class_name: Optional[str] = None, major: Optional[str] = None,
                               phone: Optional[str] = None, email: Optional[str] = None) -> bool:
        """Update only the given fields of the first student whose ``sid`` equals *sid*.

        Arguments left as None are not touched. Returns True when a matching
        student was found (and the store rewritten), False otherwise.
        """
        updates = {
            'name': name,
            'height': height,
            'birth_date': birth_date,
            'enrollment_date': enrollment_date,
            'class_name': class_name,
            'major': major,
            'phone': phone,
            'email': email,
        }
        students = self.load_students()
        for student in students:
            if student.sid != sid:
                continue
            for field_name, value in updates.items():
                if value is not None:
                    setattr(student, field_name, value)
            self.save_students(students)
            return True
        return False

    def delete_student(self, sid: str) -> bool:
        """Remove every student whose ``sid`` equals *sid*; return True if any was removed."""
        students = self.load_students()
        remaining = [student for student in students if student.sid != sid]
        if len(remaining) == len(students):
            return False
        self.save_students(remaining)
        return True

    def delete_student_by_id_number(self, id_number: str) -> bool:
        """Remove every student with the given ``id_number``; return True if any was removed."""
        students = self.load_students()
        remaining = [student for student in students if student.id_number != id_number]
        if len(remaining) == len(students):
            return False
        self.save_students(remaining)
        return True

    def clear_all_students(self) -> None:
        """Empty the store."""
        self.save_students([])

    def get_student_count(self) -> int:
        """Return the number of students in the store."""
        return len(self.load_students())

    def check_student_exists(self, sid: str) -> bool:
        """Return True when a student with the given ``sid`` exists."""
        return self.find_student_by_sid(sid) is not None

    def export_to_json(self, file_path):
        """Write all students to *file_path* as an indented JSON array."""
        student_dicts = [student.to_dict() for student in self.load_students()]
        # Serialize first so a failure cannot leave a truncated export behind.
        payload = json.dumps(student_dicts, ensure_ascii=False, indent=4)
        with open(file_path, mode='w', encoding='utf-8') as file:
            file.write(payload)

    def import_from_json(self, file_path):
        """Replace the store with the students read from the JSON file *file_path*.

        A missing file, invalid JSON, or a ValueError during conversion is
        reported on stdout and leaves the store unchanged.
        """
        try:
            with open(file_path, mode='r', encoding='utf-8') as file:
                student_dicts = json.load(file)
            students = [Student.from_dict(student_dict) for student_dict in student_dicts]
            self.save_students(students)
        except FileNotFoundError:
            print(f"文件 {file_path} 未找到。")
        except json.JSONDecodeError:
            print(f"文件 {file_path} 不是有效的 JSON 文件。")
        except ValueError as e:
            print(f"数据转换错误: {e}")

    def export_to_csv(self, file_path):
        """Write all students to *file_path* as CSV (UTF-8 with BOM).

        Prints a notice and writes nothing when the store is empty. Any
        failure while writing is reported on stdout rather than raised.
        """
        students = self.load_students()
        if not students:
            print("没有数据可导出")
            return
        fieldnames = [
            'sid', 'name', 'gender', 'height', 'weight', 'birth_date', 'enrollment_date',
            'class_name', 'id_number', 'phone', 'email', 'major',
        ]
        try:
            with open(file_path, 'w', encoding='utf-8-sig', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for student in students:
                    data = student.to_dict()
                    # NOTE(review): save_students JSON-dumps to_dict() output,
                    # which suggests the dates may already be strings there --
                    # confirm against Student.to_dict. Convert only real date
                    # objects so both representations export correctly.
                    for key in ('birth_date', 'enrollment_date'):
                        if isinstance(data[key], date):
                            data[key] = data[key].isoformat()
                    writer.writerow(data)
            print(f"数据已导出到 {file_path}")
        except Exception as e:
            print(f"导出失败: {e}")

    def import_from_csv(self, file_path):
        """Replace the store with the students read from the CSV file *file_path*.

        A missing file or a ValueError (e.g. a malformed ISO date) is reported
        on stdout and leaves the store unchanged.
        """
        try:
            students = []
            # 'utf-8-sig' strips the BOM that export_to_csv writes (and still
            # reads BOM-less UTF-8); with plain 'utf-8' the first column name
            # would become '\ufeffsid'. newline='' is what the csv module
            # expects for files it parses.
            with open(file_path, mode='r', encoding='utf-8-sig', newline='') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    row['birth_date'] = date.fromisoformat(row['birth_date'])
                    row['enrollment_date'] = date.fromisoformat(row['enrollment_date'])
                    # NOTE(review): every other column reaches from_dict as a
                    # string -- verify Student.from_dict converts numeric fields.
                    students.append(Student.from_dict(row))
            self.save_students(students)
        except FileNotFoundError:
            print(f"文件 {file_path} 未找到。")
        except ValueError as e:
            print(f"日期格式错误: {e}")

    def get_average_height(self):
        """Return the mean height of all students, or 0 when the store is empty."""
        students = self.load_students()
        if not students:
            return 0
        return sum(student.height for student in students) / len(students)

    def count_students_by_height_range(self, min_height: int, max_height: int):
        """Count students with ``min_height <= height <= max_height``."""
        return sum(
            1 for student in self.load_students()
            if min_height <= student.height <= max_height
        )

    def count_students_by_enrollment_year(self, year: int):
        """Count students whose enrollment date falls in *year*."""
        return sum(
            1 for student in self.load_students()
            if student.enrollment_date.year == year
        )

    @staticmethod
    def _age_on(birth_date: date, today: date) -> int:
        """Return the age in completed years of someone born on *birth_date* as of *today*."""
        had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
        return today.year - birth_date.year - (0 if had_birthday else 1)

    def count_students_by_age_range(self, min_age: int, max_age: int):
        """Count students whose age today lies in ``[min_age, max_age]``.

        Age is measured in completed years, so a student whose birthday has
        not yet occurred this year is counted one year younger than the plain
        difference of calendar years.
        """
        today = date.today()
        return sum(
            1 for student in self.load_students()
            if min_age <= self._age_on(student.birth_date, today) <= max_age
        )

    def backup_data(self, backup_path):
        """Copy the store to *backup_path*; failures are reported on stdout."""
        try:
            # Read everything before opening the destination: opening with
            # mode='w' truncates, which would wipe the data if both paths
            # refer to the same file.
            with open(self.file_path, mode='r', encoding='utf-8') as src_file:
                content = src_file.read()
            with open(backup_path, mode='w', encoding='utf-8') as dst_file:
                dst_file.write(content)
            print(f"数据备份到 {backup_path} 成功。")
        except Exception as e:
            print(f"数据备份失败: {e}")

    def restore_data(self, backup_path):
        """Overwrite the store with the content of *backup_path*.

        A missing backup or any other failure is reported on stdout.
        """
        try:
            # Read the backup completely first so a read/decode error cannot
            # leave the live store truncated.
            with open(backup_path, mode='r', encoding='utf-8') as src_file:
                content = src_file.read()
            with open(self.file_path, mode='w', encoding='utf-8') as dst_file:
                dst_file.write(content)
            print(f"数据从 {backup_path} 恢复成功。")
        except FileNotFoundError:
            print(f"备份文件 {backup_path} 未找到。")
        except Exception as e:
            print(f"数据恢复失败: {e}")

    def count_students_by_major(self):
        """Return a dict mapping each major to its number of students."""
        major_count = {}
        for student in self.load_students():
            major_count[student.major] = major_count.get(student.major, 0) + 1
        return major_count

    def calculate_average_height_weight(self, group_by='major'):
        """Return average height and weight per group.

        *group_by* names the ``Student`` attribute used as the grouping key
        (default ``'major'``). The result maps each key to
        ``{'average_height': ..., 'average_weight': ...}``.
        """
        group_info = {}
        for student in self.load_students():
            totals = group_info.setdefault(
                getattr(student, group_by),
                {'height_sum': 0, 'weight_sum': 0, 'count': 0},
            )
            totals['height_sum'] += student.height
            totals['weight_sum'] += student.weight
            totals['count'] += 1

        return {
            group: {
                'average_height': info['height_sum'] / info['count'],
                'average_weight': info['weight_sum'] / info['count'],
            }
            for group, info in group_info.items()
        }

    def calculate_gender_ratio(self):
        """Return the male/female share among students whose gender is '男' or '女'.

        Students with any other gender value are ignored. When no student is
        counted both ratios are 0.
        """
        male_count = 0
        female_count = 0
        for student in self.load_students():
            if student.gender == '男':
                male_count += 1
            elif student.gender == '女':
                female_count += 1

        total_count = male_count + female_count
        if total_count == 0:
            return {'male_ratio': 0, 'female_ratio': 0}
        return {
            'male_ratio': male_count / total_count,
            'female_ratio': female_count / total_count,
        }