from test import Student import os.path from abc import ABC, abstractmethod from datetime import date, datetime from typing import List, Dict, Optional import json import csv class IStudentDAL(ABC): @abstractmethod def get_by_id(self, id: str) -> Optional[Student]: pass @abstractmethod def get_by_stu_id(self, stu_id: str) -> Optional[Student]: pass @abstractmethod def get_all(self) -> List[Student]: pass @abstractmethod def add(self, student: Student) -> bool: pass @abstractmethod def delete(self, id: str) -> bool: pass @abstractmethod def delete_by_stu_id(self, stu_id: str) -> bool: pass @abstractmethod def update(self, student: Student) -> bool: pass @abstractmethod def is_exist(self, id: str) -> bool: pass @abstractmethod def is_exist_stu_id(self, stu_id: str) -> bool: pass @abstractmethod def import_from_json(self, file_path: str) -> List[Dict]: pass @abstractmethod def export_to_json(self, file_path: str) -> bool: pass @abstractmethod def import_from_csv(self, file_path: str) -> List[Dict]: pass @abstractmethod def export_to_csv(self, file_path: str) -> bool: pass class JsonStudentDAL(IStudentDAL): def __init__(self, file_path: str): self.file_path = file_path self._ensure_file_exists() def _ensure_file_exists(self): if not os.path.exists(self.file_path): with open(self.file_path, 'w', encoding='utf-8') as f: json.dump([], f, ensure_ascii=False) def _load(self) -> List[Dict]: with open(self.file_path, 'r', encoding='utf-8') as f: return json.load(f) def _save(self, data: List[Dict]): with open(self.file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def get_by_id(self, id: str) -> Optional[Student]: data = self._load() for item in data: if item['id'] == id: return Student.from_dict(item) return None def get_by_stu_id(self, stu_id: str) -> Optional[Student]: data = self._load() for item in data: if item['stu_id'] == stu_id: return Student.from_dict(item) return None def get_all(self) -> List[Student]: return [Student.from_dict(item) for item in self._load()] def add(self, student: Student) -> bool: data = self._load() if self.is_exist(student.id): return False data.append(student.to_dict()) self._save(data) return True def delete(self, id: str) -> bool: data = self._load() new_data = [item for item in data if item['id'] != id] if len(data) == len(new_data): return False self._save(new_data) return True def delete_by_stu_id(self, stu_id: str) -> bool: data = self._load() new_data = [item for item in data if item['stu_id'] != stu_id] if len(data) == len(new_data): return False self._save(new_data) return True def update(self, student: Student) -> bool: data = self._load() updated = False for i, item in enumerate(data): if item['id'] == student.id: data[i] = student.to_dict() updated = True break if updated: self._save(data) return updated def is_exist(self, id: str) -> bool: return any(item['id'] == id for item in self._load()) def is_exist_stu_id(self, stu_id: str) -> bool: return any(item['stu_id'] == stu_id for item in self._load()) def import_from_json(self, file_path: str) -> List[Dict]: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) def export_to_json(self, file_path: str) -> bool: try: with open(file_path, 'w', encoding='utf-8') as f: json.dump(self._load(), f, ensure_ascii=False, indent=2) return True except: return False class CsvStudentDAL(IStudentDAL): """CSV格式数据访问实现类""" FIELD_TYPES = { 'id': str, 'stu_id': str, 'name': str, 'age': int, 'gender': str, 'height': float, 'weight': float } def __init__(self, file_path: str): self.file_path = file_path self._ensure_file_exists() def _ensure_file_exists(self): """确保CSV文件存在并包含表头""" if not os.path.exists(self.file_path): with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) writer.writeheader() def _convert_row(self, row: Dict) -> Dict: """转换CSV行的数据类型""" converted = {} for field, dtype in self.FIELD_TYPES.items(): value = row.get(field) if value is None or value == '': converted[field] = None else: try: converted[field] = dtype(value) except (ValueError, TypeError): converted[field] = None return converted def _load(self) -> List[Dict]: """从CSV文件加载数据""" with open(self.file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) return [self._convert_row(row) for row in reader] def _save(self, data: List[Dict]): """保存数据到CSV文件""" with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) writer.writeheader() writer.writerows(data) def get_by_id(self, id: str) -> Optional[Student]: data = self._load() for item in data: if item['id'] == id: return Student.from_dict(item) return None def get_by_stu_id(self, stu_id: str) -> Optional[Student]: data = self._load() for item in data: if item['stu_id'] == stu_id: return Student.from_dict(item) return None def get_all(self) -> List[Student]: return [Student.from_dict(item) for item in self._load()] def add(self, student: Student) -> bool: data = self._load() if self.is_exist(student.id): return False data.append(student.to_dict()) self._save(data) return True def delete(self, id: str) -> bool: data = self._load() new_data = [item for item in data if item['id'] != id] if len(data) == len(new_data): return False self._save(new_data) return True def delete_by_stu_id(self, stu_id: str) -> bool: data = self._load() new_data = [item for item in data if item['stu_id'] != stu_id] if len(data) == len(new_data): return False self._save(new_data) return True def update(self, student: Student) -> bool: data = self._load() updated = False for i, item in enumerate(data): if item['id'] == student.id: data[i] = student.to_dict() updated = True break if updated: self._save(data) return updated def is_exist(self, id: str) -> bool: return any(item['id'] == id for item in self._load()) def is_exist_stu_id(self, stu_id: str) -> bool: return any(item['stu_id'] == stu_id for item in self._load()) def import_from_csv(self, file_path: str) -> List[Dict]: with open(file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) return [self._convert_row(row) for row in reader] def export_to_csv(self, file_path: str) -> bool: try: data = self._load() with open(file_path, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys()) writer.writeheader() writer.writerows(data) return True except: return False