import csv from typing import List, Optional from .student_dal import IStudentDAL from ..model.student import Student from datetime import datetime class CsvStudentDAL(IStudentDAL): """学生信息的CSV文件存储实现""" def __init__(self, file_path: str = 'students.csv'): self.file_path = file_path self.headers = ['name', 'id_card', 'stu_id', 'gender', 'height', 'weight', 'enrollment_date', 'class_name', 'major', 'email', 'phone'] # 确保文件存在 self._ensure_file_exists() def _ensure_file_exists(self): """确保CSV文件存在并包含表头""" try: with open(self.file_path, 'r', encoding='utf-8') as file: reader = csv.reader(file) if not any(reader): # 文件为空 self._write_headers() except FileNotFoundError: self._write_headers() def _write_headers(self): """写入CSV文件表头""" with open(self.file_path, 'w', encoding='utf-8', newline='') as file: writer = csv.DictWriter(file, fieldnames=self.headers) writer.writeheader() def _student_to_row(self, student: Student) -> dict: """将Student对象转换为CSV行数据""" return { 'name': student.name, 'id_card': student.id_card, 'stu_id': student.stu_id, 'gender': student.gender, 'height': student.height, 'weight': student.weight, 'enrollment_date': str(student.enrollment_date) if student.enrollment_date else '', 'class_name': student.class_name, 'major': student.major, 'email': student.email, 'phone': student.phone } def _row_to_student(self, row: dict) -> Student: """将CSV行数据转换为Student对象""" # 处理日期类型,将字符串转换为datetime.date对象 enrollment_date = row['enrollment_date'] if enrollment_date: enrollment_date = datetime.strptime(enrollment_date, '%Y-%m-%d').date() # 处理布尔类型,将字符串转换为布尔值 gender = row['gender'] if gender: gender = gender.lower() == 'true' # 处理数值类型,将字符串转换为数值 height = int(row['height']) if row['height'] else None weight = float(row['weight']) if row['weight'] else None # 创建并返回学生对象 return Student( name=row['name'], id_card=row['id_card'], stu_id=row['stu_id'], gender=gender, height=height, weight=weight, enrollment_date=enrollment_date, class_name=row['class_name'], major=row['major'], email=row['email'], phone=row['phone'] ) def get_by_id(self, id_card: str) -> Optional[Student]: """根据身份证号获取学生信息""" with open(self.file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) for row in reader: if row['id_card'] == id_card: return self._row_to_student(row) return None def get_by_stu_id(self, stu_id: str) -> Optional[Student]: """根据学号获取学生信息""" with open(self.file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) for row in reader: if row['stu_id'] == stu_id: return self._row_to_student(row) return None def get_all(self) -> List[Student]: """获取所有学生信息""" students = [] with open(self.file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) for row in reader: students.append(self._row_to_student(row)) return students def add(self, student: Student) -> bool: """添加学生信息""" # 检查学号和身份证号是否已存在 if self.get_by_id(student.id_card) or self.get_by_stu_id(student.stu_id): return False # 将学生信息追加到CSV文件 with open(self.file_path, 'a', encoding='utf-8', newline='') as file: writer = csv.DictWriter(file, fieldnames=self.headers) writer.writerow(self._student_to_row(student)) return True def update(self, student: Student) -> bool: """ 更新学生信息 通过身份证号匹配要更新的记录 """ students = self.get_all() # 获取所有学生信息 updated = False # 重写整个CSV文件,替换匹配的学生记录 with open(self.file_path, 'w', encoding='utf-8', newline='') as file: writer = csv.DictWriter(file, fieldnames=self.headers) writer.writeheader()# 写入表头 for s in students: if s.id_card == student.id_card: # 找到匹配的学生记录 writer.writerow(self._student_to_row(student)) updated = True else: writer.writerow(self._student_to_row(s)) # 保持原有记录不变 return updated def delete_by_id(self, id_card: str) -> bool: """ 根据身份证号删除学生信息 通过重写整个文件排除要删除的记录 """ students = self.get_all() # 获取所有学生信息 deleted = False # 重写整个CSV文件,排除要删除的学生记录 with open(self.file_path, 'w', encoding='utf-8', newline='') as file: writer = csv.DictWriter(file, fieldnames=self.headers) writer.writeheader() # 写入表头 for s in students: if s.id_card == id_card: # 找到匹配的学生记录 deleted = True # 标记已删除 else: writer.writerow(self._student_to_row(s)) # 保留其他记录 return deleted def delete_by_stu_id(self, stu_id: str) -> bool: """ 根据学号删除学生信息 通过重写整个文件排除要删除的记录 """ students = self.get_all() # 获取所有学生信息 deleted = False # 重写整个CSV文件,排除要删除的学生记录 with open(self.file_path, 'w', encoding='utf-8', newline='') as file: writer = csv.DictWriter(file, fieldnames=self.headers) writer.writeheader() # 写入表头 for s in students: if s.stu_id == stu_id: # 找到匹配的学生记录 deleted = True # 标记已删除 else: writer.writerow(self._student_to_row(s)) # 保留其他记录 return deleted def search_by_name(self, name: str) -> List[Student]: """根据姓名模糊查询学生信息""" return [s for s in self.get_all() if name in s.name] def search_by_class(self, class_name: str) -> List[Student]: """根据班级模糊查询学生信息""" return [s for s in self.get_all() if class_name in (s.class_name or '')] def search_by_major(self, major: str) -> List[Student]: """根据专业模糊查询学生信息""" return [s for s in self.get_all() if major in (s.major or '')]