You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
abc/student/dal/csv_student_dal.py

181 lines
6.4 KiB

import csv
from typing import List, Optional
from .student_dal import IStudentDAL
from ..model.student import Student
from datetime import datetime
class CsvStudentDAL(IStudentDAL):
    """CSV-file-backed implementation of the student data access layer.

    Each student is stored as one row of ``file_path``; the first row is a
    header naming the columns listed in ``self.headers``. Every operation
    re-reads the file, and every successful mutation other than ``add``
    rewrites it completely.
    """

    def __init__(self, file_path: str = 'students.csv'):
        """Remember the storage path and make sure the file is usable.

        Args:
            file_path: Path of the CSV file holding the student records.
        """
        self.file_path = file_path
        self.headers = ['name', 'id_card', 'stu_id', 'gender', 'height', 'weight',
                        'enrollment_date', 'class_name', 'major', 'email', 'phone']
        # Create the file (or add the header row) before anything reads it.
        self._ensure_file_exists()

    # ------------------------------------------------------------------
    # File helpers
    # ------------------------------------------------------------------
    def _ensure_file_exists(self):
        """Write a header-only file if the CSV is missing or has no non-blank row."""
        try:
            with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
                has_content = any(csv.reader(file))
        except FileNotFoundError:
            has_content = False
        # The read handle is closed before the file is reopened for writing.
        if not has_content:
            self._write_headers()

    def _write_headers(self):
        """Overwrite the CSV file so that it contains only the header row."""
        with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.headers)
            writer.writeheader()

    def _write_all(self, students: List[Student]) -> None:
        """Replace the file contents with the header row plus ``students``."""
        # Convert every record *before* opening the file: mode 'w' truncates
        # it, so a conversion error raised afterwards would lose stored data.
        rows = [self._student_to_row(s) for s in students]
        with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.headers)
            writer.writeheader()
            writer.writerows(rows)

    def _find_by(self, column: str, value: str) -> Optional[Student]:
        """Return the first student whose ``column`` cell equals ``value``, else None."""
        # newline='' lets the csv module handle line endings itself, which is
        # required for quoted fields that contain embedded newlines.
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            for row in csv.DictReader(file):
                if row[column] == value:
                    return self._row_to_student(row)
        return None

    def _delete_where(self, attr: str, value: str) -> bool:
        """Drop every student whose attribute ``attr`` equals ``value``.

        Returns:
            True if at least one record was removed, False otherwise. The
            file is only rewritten when something was actually removed.
        """
        students = self.get_all()
        remaining = [s for s in students if getattr(s, attr) != value]
        if len(remaining) == len(students):
            return False
        self._write_all(remaining)
        return True

    # ------------------------------------------------------------------
    # Row <-> Student conversion
    # ------------------------------------------------------------------
    def _student_to_row(self, student: Student) -> dict:
        """Convert a Student object into a dict keyed by the CSV column names."""
        return {
            'name': student.name,
            'id_card': student.id_card,
            'stu_id': student.stu_id,
            'gender': student.gender,
            'height': student.height,
            'weight': student.weight,
            # A missing date is stored as an empty cell.
            'enrollment_date': str(student.enrollment_date) if student.enrollment_date else '',
            'class_name': student.class_name,
            'major': student.major,
            'email': student.email,
            'phone': student.phone,
        }

    def _row_to_student(self, row: dict) -> Student:
        """Convert one CSV row (as produced by ``csv.DictReader``) into a Student.

        Empty cells of the typed columns (``enrollment_date``, ``gender``,
        ``height``, ``weight``) become ``None``; text columns are passed
        through unchanged.

        Raises:
            ValueError: If a non-empty date or numeric cell cannot be parsed.
        """
        # Date: parsed as 'YYYY-MM-DD'; an empty cell means "no date".
        raw_date = row['enrollment_date']
        enrollment_date = (
            datetime.strptime(raw_date, '%Y-%m-%d').date() if raw_date else None
        )

        # Boolean: any casing of 'true' is True, any other non-empty text is False.
        raw_gender = row['gender']
        gender = (raw_gender.lower() == 'true') if raw_gender else None

        # Numbers.
        height = int(row['height']) if row['height'] else None
        weight = float(row['weight']) if row['weight'] else None

        return Student(
            name=row['name'],
            id_card=row['id_card'],
            stu_id=row['stu_id'],
            gender=gender,
            height=height,
            weight=weight,
            enrollment_date=enrollment_date,
            class_name=row['class_name'],
            major=row['major'],
            email=row['email'],
            phone=row['phone'],
        )

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_by_id(self, id_card: str) -> Optional[Student]:
        """Return the student with the given ID-card number, or None."""
        return self._find_by('id_card', id_card)

    def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
        """Return the student with the given student number, or None."""
        return self._find_by('stu_id', stu_id)

    def get_all(self) -> List[Student]:
        """Return every stored student, in file order."""
        with open(self.file_path, 'r', encoding='utf-8', newline='') as file:
            return [self._row_to_student(row) for row in csv.DictReader(file)]

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------
    def add(self, student: Student) -> bool:
        """Append a student to the file.

        Returns:
            False (and writes nothing) if a record with the same ID-card
            number or the same student number already exists, True otherwise.
        """
        # Compare against None explicitly so the check does not depend on
        # how Student objects evaluate in a boolean context.
        if (self.get_by_id(student.id_card) is not None
                or self.get_by_stu_id(student.stu_id) is not None):
            return False
        row = self._student_to_row(student)
        with open(self.file_path, 'a', encoding='utf-8', newline='') as file:
            csv.DictWriter(file, fieldnames=self.headers).writerow(row)
        return True

    def update(self, student: Student) -> bool:
        """Replace the stored record(s) whose ID-card number matches ``student``.

        Returns:
            True if at least one record was replaced, False otherwise. The
            file is left untouched when nothing matched.
        """
        students = self.get_all()
        updated = False
        for index, existing in enumerate(students):
            if existing.id_card == student.id_card:
                students[index] = student
                updated = True
        if updated:
            self._write_all(students)
        return updated

    def delete_by_id(self, id_card: str) -> bool:
        """Delete the student(s) with the given ID-card number; True if any was removed."""
        return self._delete_where('id_card', id_card)

    def delete_by_stu_id(self, stu_id: str) -> bool:
        """Delete the student(s) with the given student number; True if any was removed."""
        return self._delete_where('stu_id', stu_id)

    # ------------------------------------------------------------------
    # Substring searches
    # ------------------------------------------------------------------
    def search_by_name(self, name: str) -> List[Student]:
        """Return the students whose name contains ``name`` as a substring."""
        # `or ''` mirrors the other searches so a missing name cannot raise.
        return [s for s in self.get_all() if name in (s.name or '')]

    def search_by_class(self, class_name: str) -> List[Student]:
        """Return the students whose class name contains ``class_name`` as a substring."""
        return [s for s in self.get_all() if class_name in (s.class_name or '')]

    def search_by_major(self, major: str) -> List[Student]:
        """Return the students whose major contains ``major`` as a substring."""
        return [s for s in self.get_all() if major in (s.major or '')]