|
|
import csv
|
|
|
from typing import List, Optional
|
|
|
from .student_dal import IStudentDAL
|
|
|
from ..model.student import Student
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
class CsvStudentDAL(IStudentDAL):
|
|
|
"""学生信息的CSV文件存储实现"""
|
|
|
|
|
|
def __init__(self, file_path: str = 'students.csv'):
|
|
|
self.file_path = file_path
|
|
|
self.headers = ['name', 'id_card', 'stu_id', 'gender', 'height', 'weight',
|
|
|
'enrollment_date', 'class_name', 'major', 'email', 'phone']
|
|
|
|
|
|
# 确保文件存在
|
|
|
self._ensure_file_exists()
|
|
|
|
|
|
def _ensure_file_exists(self):
|
|
|
"""确保CSV文件存在并包含表头"""
|
|
|
try:
|
|
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
|
|
reader = csv.reader(file)
|
|
|
if not any(reader): # 文件为空
|
|
|
self._write_headers()
|
|
|
except FileNotFoundError:
|
|
|
self._write_headers()
|
|
|
|
|
|
def _write_headers(self):
|
|
|
"""写入CSV文件表头"""
|
|
|
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
|
|
|
writer = csv.DictWriter(file, fieldnames=self.headers)
|
|
|
writer.writeheader()
|
|
|
|
|
|
def _student_to_row(self, student: Student) -> dict:
|
|
|
"""将Student对象转换为CSV行数据"""
|
|
|
return {
|
|
|
'name': student.name,
|
|
|
'id_card': student.id_card,
|
|
|
'stu_id': student.stu_id,
|
|
|
'gender': student.gender,
|
|
|
'height': student.height,
|
|
|
'weight': student.weight,
|
|
|
'enrollment_date': str(student.enrollment_date) if student.enrollment_date else '',
|
|
|
'class_name': student.class_name,
|
|
|
'major': student.major,
|
|
|
'email': student.email,
|
|
|
'phone': student.phone
|
|
|
}
|
|
|
|
|
|
def _row_to_student(self, row: dict) -> Student:
|
|
|
"""将CSV行数据转换为Student对象"""
|
|
|
# 处理日期类型,将字符串转换为datetime.date对象
|
|
|
enrollment_date = row['enrollment_date']
|
|
|
if enrollment_date:
|
|
|
enrollment_date = datetime.strptime(enrollment_date, '%Y-%m-%d').date()
|
|
|
|
|
|
# 处理布尔类型,将字符串转换为布尔值
|
|
|
gender = row['gender']
|
|
|
if gender:
|
|
|
gender = gender.lower() == 'true'
|
|
|
|
|
|
# 处理数值类型,将字符串转换为数值
|
|
|
height = int(row['height']) if row['height'] else None
|
|
|
weight = float(row['weight']) if row['weight'] else None
|
|
|
# 创建并返回学生对象
|
|
|
return Student(
|
|
|
name=row['name'],
|
|
|
id_card=row['id_card'],
|
|
|
stu_id=row['stu_id'],
|
|
|
gender=gender,
|
|
|
height=height,
|
|
|
weight=weight,
|
|
|
enrollment_date=enrollment_date,
|
|
|
class_name=row['class_name'],
|
|
|
major=row['major'],
|
|
|
email=row['email'],
|
|
|
phone=row['phone']
|
|
|
)
|
|
|
|
|
|
def get_by_id(self, id_card: str) -> Optional[Student]:
|
|
|
"""根据身份证号获取学生信息"""
|
|
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
|
|
reader = csv.DictReader(file)
|
|
|
for row in reader:
|
|
|
if row['id_card'] == id_card:
|
|
|
return self._row_to_student(row)
|
|
|
return None
|
|
|
|
|
|
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
|
|
|
"""根据学号获取学生信息"""
|
|
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
|
|
reader = csv.DictReader(file)
|
|
|
for row in reader:
|
|
|
if row['stu_id'] == stu_id:
|
|
|
return self._row_to_student(row)
|
|
|
return None
|
|
|
|
|
|
def get_all(self) -> List[Student]:
|
|
|
"""获取所有学生信息"""
|
|
|
students = []
|
|
|
with open(self.file_path, 'r', encoding='utf-8') as file:
|
|
|
reader = csv.DictReader(file)
|
|
|
for row in reader:
|
|
|
students.append(self._row_to_student(row))
|
|
|
return students
|
|
|
|
|
|
def add(self, student: Student) -> bool:
|
|
|
"""添加学生信息"""
|
|
|
# 检查学号和身份证号是否已存在
|
|
|
if self.get_by_id(student.id_card) or self.get_by_stu_id(student.stu_id):
|
|
|
return False
|
|
|
# 将学生信息追加到CSV文件
|
|
|
with open(self.file_path, 'a', encoding='utf-8', newline='') as file:
|
|
|
writer = csv.DictWriter(file, fieldnames=self.headers)
|
|
|
writer.writerow(self._student_to_row(student))
|
|
|
return True
|
|
|
|
|
|
def update(self, student: Student) -> bool:
|
|
|
"""
|
|
|
更新学生信息
|
|
|
通过身份证号匹配要更新的记录
|
|
|
"""
|
|
|
students = self.get_all() # 获取所有学生信息
|
|
|
updated = False
|
|
|
|
|
|
# 重写整个CSV文件,替换匹配的学生记录
|
|
|
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
|
|
|
writer = csv.DictWriter(file, fieldnames=self.headers)
|
|
|
writer.writeheader()# 写入表头
|
|
|
|
|
|
for s in students:
|
|
|
if s.id_card == student.id_card: # 找到匹配的学生记录
|
|
|
writer.writerow(self._student_to_row(student))
|
|
|
updated = True
|
|
|
else:
|
|
|
writer.writerow(self._student_to_row(s)) # 保持原有记录不变
|
|
|
|
|
|
return updated
|
|
|
|
|
|
def delete_by_id(self, id_card: str) -> bool:
|
|
|
"""
|
|
|
根据身份证号删除学生信息
|
|
|
通过重写整个文件排除要删除的记录
|
|
|
"""
|
|
|
students = self.get_all() # 获取所有学生信息
|
|
|
deleted = False
|
|
|
|
|
|
# 重写整个CSV文件,排除要删除的学生记录
|
|
|
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
|
|
|
writer = csv.DictWriter(file, fieldnames=self.headers)
|
|
|
writer.writeheader() # 写入表头
|
|
|
|
|
|
for s in students:
|
|
|
if s.id_card == id_card: # 找到匹配的学生记录
|
|
|
deleted = True # 标记已删除
|
|
|
else:
|
|
|
writer.writerow(self._student_to_row(s)) # 保留其他记录
|
|
|
|
|
|
return deleted
|
|
|
|
|
|
def delete_by_stu_id(self, stu_id: str) -> bool:
|
|
|
"""
|
|
|
根据学号删除学生信息
|
|
|
通过重写整个文件排除要删除的记录
|
|
|
"""
|
|
|
students = self.get_all() # 获取所有学生信息
|
|
|
deleted = False
|
|
|
|
|
|
# 重写整个CSV文件,排除要删除的学生记录
|
|
|
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
|
|
|
writer = csv.DictWriter(file, fieldnames=self.headers)
|
|
|
writer.writeheader() # 写入表头
|
|
|
|
|
|
for s in students:
|
|
|
if s.stu_id == stu_id: # 找到匹配的学生记录
|
|
|
deleted = True # 标记已删除
|
|
|
else:
|
|
|
writer.writerow(self._student_to_row(s)) # 保留其他记录
|
|
|
|
|
|
return deleted
|
|
|
def search_by_name(self, name: str) -> List[Student]:
|
|
|
"""根据姓名模糊查询学生信息"""
|
|
|
return [s for s in self.get_all() if name in s.name]
|
|
|
|
|
|
def search_by_class(self, class_name: str) -> List[Student]:
|
|
|
"""根据班级模糊查询学生信息"""
|
|
|
return [s for s in self.get_all() if class_name in (s.class_name or '')]
|
|
|
|
|
|
def search_by_major(self, major: str) -> List[Student]:
|
|
|
"""根据专业模糊查询学生信息"""
|
|
|
return [s for s in self.get_all() if major in (s.major or '')] |