You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
abc/student/dal/csv_student_dal.py

192 lines
7.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import csv
from typing import List, Optional
from .student_dal import IStudentDAL
from ..model.student import Student
from datetime import datetime
class CsvStudentDAL(IStudentDAL):
"""学生信息的CSV文件存储实现"""
def __init__(self, file_path: str = 'students.csv'):
self.file_path = file_path
self.headers = ['name', 'id_card', 'stu_id', 'gender', 'height', 'weight',
'enrollment_date', 'class_name', 'major', 'email', 'phone']
# 确保文件存在
self._ensure_file_exists()
def _ensure_file_exists(self):
"""确保CSV文件存在并包含表头"""
try:
with open(self.file_path, 'r', encoding='utf-8') as file:
reader = csv.reader(file)
if not any(reader): # 文件为空
self._write_headers()
except FileNotFoundError:
self._write_headers()
def _write_headers(self):
"""写入CSV文件表头"""
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=self.headers)
writer.writeheader()
def _student_to_row(self, student: Student) -> dict:
"""将Student对象转换为CSV行数据"""
return {
'name': student.name,
'id_card': student.id_card,
'stu_id': student.stu_id,
'gender': student.gender,
'height': student.height,
'weight': student.weight,
'enrollment_date': str(student.enrollment_date) if student.enrollment_date else '',
'class_name': student.class_name,
'major': student.major,
'email': student.email,
'phone': student.phone
}
def _row_to_student(self, row: dict) -> Student:
"""将CSV行数据转换为Student对象"""
# 处理日期类型将字符串转换为datetime.date对象
enrollment_date = row['enrollment_date']
if enrollment_date:
enrollment_date = datetime.strptime(enrollment_date, '%Y-%m-%d').date()
# 处理布尔类型,将字符串转换为布尔值
gender = row['gender']
if gender:
gender = gender.lower() == 'true'
# 处理数值类型,将字符串转换为数值
height = int(row['height']) if row['height'] else None
weight = float(row['weight']) if row['weight'] else None
# 创建并返回学生对象
return Student(
name=row['name'],
id_card=row['id_card'],
stu_id=row['stu_id'],
gender=gender,
height=height,
weight=weight,
enrollment_date=enrollment_date,
class_name=row['class_name'],
major=row['major'],
email=row['email'],
phone=row['phone']
)
def get_by_id(self, id_card: str) -> Optional[Student]:
"""根据身份证号获取学生信息"""
with open(self.file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
if row['id_card'] == id_card:
return self._row_to_student(row)
return None
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
"""根据学号获取学生信息"""
with open(self.file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
if row['stu_id'] == stu_id:
return self._row_to_student(row)
return None
def get_all(self) -> List[Student]:
"""获取所有学生信息"""
students = []
with open(self.file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
students.append(self._row_to_student(row))
return students
def add(self, student: Student) -> bool:
"""添加学生信息"""
# 检查学号和身份证号是否已存在
if self.get_by_id(student.id_card) or self.get_by_stu_id(student.stu_id):
return False
# 将学生信息追加到CSV文件
with open(self.file_path, 'a', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=self.headers)
writer.writerow(self._student_to_row(student))
return True
def update(self, student: Student) -> bool:
"""
更新学生信息
通过身份证号匹配要更新的记录
"""
students = self.get_all() # 获取所有学生信息
updated = False
# 重写整个CSV文件替换匹配的学生记录
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=self.headers)
writer.writeheader()# 写入表头
for s in students:
if s.id_card == student.id_card: # 找到匹配的学生记录
writer.writerow(self._student_to_row(student))
updated = True
else:
writer.writerow(self._student_to_row(s)) # 保持原有记录不变
return updated
def delete_by_id(self, id_card: str) -> bool:
"""
根据身份证号删除学生信息
通过重写整个文件排除要删除的记录
"""
students = self.get_all() # 获取所有学生信息
deleted = False
# 重写整个CSV文件排除要删除的学生记录
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=self.headers)
writer.writeheader() # 写入表头
for s in students:
if s.id_card == id_card: # 找到匹配的学生记录
deleted = True # 标记已删除
else:
writer.writerow(self._student_to_row(s)) # 保留其他记录
return deleted
def delete_by_stu_id(self, stu_id: str) -> bool:
"""
根据学号删除学生信息
通过重写整个文件排除要删除的记录
"""
students = self.get_all() # 获取所有学生信息
deleted = False
# 重写整个CSV文件排除要删除的学生记录
with open(self.file_path, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=self.headers)
writer.writeheader() # 写入表头
for s in students:
if s.stu_id == stu_id: # 找到匹配的学生记录
deleted = True # 标记已删除
else:
writer.writerow(self._student_to_row(s)) # 保留其他记录
return deleted
def search_by_name(self, name: str) -> List[Student]:
"""根据姓名模糊查询学生信息"""
return [s for s in self.get_all() if name in s.name]
def search_by_class(self, class_name: str) -> List[Student]:
"""根据班级模糊查询学生信息"""
return [s for s in self.get_all() if class_name in (s.class_name or '')]
def search_by_major(self, major: str) -> List[Student]:
"""根据专业模糊查询学生信息"""
return [s for s in self.get_all() if major in (s.major or '')]