commit ec327db173fbbca2fb795db14ec61f9be4ef1441 Author: Puhan Zhao <3107249095@qq.com> Date: Tue Jun 24 10:52:07 2025 +0800 1 diff --git a/DAL/studentDAL.py b/DAL/studentDAL.py new file mode 100644 index 0000000..4d47216 --- /dev/null +++ b/DAL/studentDAL.py @@ -0,0 +1,288 @@ +from test import Student +import os.path +from abc import ABC, abstractmethod +from datetime import date, datetime +from typing import List, Dict, Optional +import json +import csv + +class IStudentDAL(ABC): + + @abstractmethod + def get_by_id(self, id: str) -> Optional[[Student]]: + pass + + @abstractmethod + def get_by_stu_id(self, stu_id: str) -> Optional[Student]: + pass + + @abstractmethod + def get_all(self) -> List[Student]: + pass + + @abstractmethod + def add(self, student: Student) -> bool: + pass + + @abstractmethod + def delete(self, id: str) -> bool: + pass + + @abstractmethod + def delete_by_stu_id(self, stu_id: str) -> bool: + pass + + @abstractmethod + def update(self, student: Student) -> bool: + pass + + @abstractmethod + def is_exist(self, id: str) -> bool: + pass + + @abstractmethod + def is_exist_stu_id(self, stu_id: str) -> bool: + pass + + @abstractmethod + def import_from_json(self, file_path: str) -> List[Dict]: + pass + + @abstractmethod + def export_to_json(self, file_path: str) -> bool: + pass + + @abstractmethod + def import_from_csv(self, file_path: str) -> List[Dict]: + pass + + @abstractmethod + def export_to_csv(self, file_path: str) -> bool: + pass + +class JsonStudentDAL(IStudentDAL): + + def __init__(self, file_path: str): + self.file_path = file_path + self._ensure_file_exists() + + def _ensure_file_exists(self): + if not os.path.exists(self.file_path): + with open(self.file_path, 'w', encoding='utf-8') as f: + json.dump([], f, ensure_ascii=False) + + def _load(self) -> List[Dict]: + with open(self.file_path, 'r', encoding='utf-8') as f: + return json.load(f) + + def _save(self, data: List[Dict]): + with open(self.file_path, 'w', encoding='utf-8') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + def get_by_id(self, id: str) -> Optional[Student]: + data = self._load() + for item in data: + if item['id'] == id: + return Student.from_dict(item) + return None + + def get_by_stu_id(self, stu_id: str) -> Optional[Student]: + data = self._load() + for item in data: + if item['stu_id'] == stu_id: + return Student.from_dict(item) + return None + + def get_all(self) -> List[Student]: + return [Student.from_dict(item) for item in self._load()] + + def add(self, student: Student) -> bool: + data = self._load() + if self.is_exist(student.id): + return False + + data.append(student.to_dict()) + self._save(data) + return True + + def delete(self, id: str) -> bool: + data = self._load() + new_data = [item for item in data if item['id'] != id] + if len(data) == len(new_data): + return False + + self._save(new_data) + return True + + def delete_by_stu_id(self, stu_id: str) -> bool: + data = self._load() + new_data = [item for item in data if item['stu_id'] != stu_id] + if len(data) == len(new_data): + return False + + self._save(new_data) + return True + + def update(self, student: Student) -> bool: + data = self._load() + updated = False + + for i, item in enumerate(data): + if item['id'] == student.id: + data[i] = student.to_dict() + updated = True + break + + if updated: + self._save(data) + return updated + + def is_exist(self, id: str) -> bool: + return any(item['id'] == id for item in self._load()) + + def is_exist_stu_id(self, stu_id: str) -> bool: + return any(item['stu_id'] == stu_id for item in self._load()) + + def import_from_json(self, file_path: str) -> List[Dict]: + with open(file_path, 'r', encoding='utf-8') as f: + return json.load(f) + + def export_to_json(self, file_path: str) -> bool: + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(self._load(), f, ensure_ascii=False, indent=2) + return True + except: + return False + +class CsvStudentDAL(IStudentDAL): + """CSV格式数据访问实现类""" + + FIELD_TYPES = { + 'id': str, + 'stu_id': str, + 'name': str, + 'age': int, + 'gender': str, + 'height': float, + 'weight': float + } + + def __init__(self, file_path: str): + self.file_path = file_path + self._ensure_file_exists() + + def _ensure_file_exists(self): + """确保CSV文件存在并包含表头""" + if not os.path.exists(self.file_path): + with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f: + writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) + writer.writeheader() + + def _convert_row(self, row: Dict) -> Dict: + """转换CSV行的数据类型""" + converted = {} + for field, dtype in self.FIELD_TYPES.items(): + value = row.get(field) + if value is None or value == '': + converted[field] = None + else: + try: + converted[field] = dtype(value) + except (ValueError, TypeError): + converted[field] = None + return converted + + def _load(self) -> List[Dict]: + """从CSV文件加载数据""" + with open(self.file_path, 'r', encoding='utf-8-sig') as f: + reader = csv.DictReader(f) + return [self._convert_row(row) for row in reader] + + def _save(self, data: List[Dict]): + """保存数据到CSV文件""" + with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f: + writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) + writer.writeheader() + writer.writerows(data) + + def get_by_id(self, id: str) -> Optional[Student]: + data = self._load() + for item in data: + if item['id'] == id: + return Student.from_dict(item) + return None + + def get_by_stu_id(self, stu_id: str) -> Optional[Student]: + data = self._load() + for item in data: + if item['stu_id'] == stu_id: + return Student.from_dict(item) + return None + + def get_all(self) -> List[Student]: + return [Student.from_dict(item) for item in self._load()] + + def add(self, student: Student) -> bool: + data = self._load() + if self.is_exist(student.id): + return False + + data.append(student.to_dict()) + self._save(data) + return True + + def delete(self, id: str) -> bool: + data = self._load() + new_data = [item for item in data if item['id'] != id] + if len(data) == len(new_data): + return False + + self._save(new_data) + return True + + def delete_by_stu_id(self, stu_id: str) -> bool: + data = self._load() + new_data = [item for item in data if item['stu_id'] != stu_id] + if len(data) == len(new_data): + return False + + self._save(new_data) + return True + + def update(self, student: Student) -> bool: + data = self._load() + updated = False + + for i, item in enumerate(data): + if item['id'] == student.id: + data[i] = student.to_dict() + updated = True + break + + if updated: + self._save(data) + return updated + + def is_exist(self, id: str) -> bool: + return any(item['id'] == id for item in self._load()) + + def is_exist_stu_id(self, stu_id: str) -> bool: + return any(item['stu_id'] == stu_id for item in self._load()) + + def import_from_csv(self, file_path: str) -> List[Dict]: + with open(file_path, 'r', encoding='utf-8-sig') as f: + reader = csv.DictReader(f) + return [self._convert_row(row) for row in reader] + + def export_to_csv(self, file_path: str) -> bool: + try: + data = self._load() + with open(file_path, 'w', encoding='utf-8-sig', newline='') as f: + writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) + writer.writeheader() + writer.writerows(data) + return True + except: + return False +