You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
8.3 KiB

from test import Student
import os.path
from abc import ABC, abstractmethod
from datetime import date, datetime
from typing import List, Dict, Optional
import json
import csv
class IStudentDAL(ABC):
@abstractmethod
def get_by_id(self, id: str) -> Optional[Student]:
pass
@abstractmethod
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
pass
@abstractmethod
def get_all(self) -> List[Student]:
pass
@abstractmethod
def add(self, student: Student) -> bool:
pass
@abstractmethod
def delete(self, id: str) -> bool:
pass
@abstractmethod
def delete_by_stu_id(self, stu_id: str) -> bool:
pass
@abstractmethod
def update(self, student: Student) -> bool:
pass
@abstractmethod
def is_exist(self, id: str) -> bool:
pass
@abstractmethod
def is_exist_stu_id(self, stu_id: str) -> bool:
pass
@abstractmethod
def import_from_json(self, file_path: str) -> List[Dict]:
pass
@abstractmethod
def export_to_json(self, file_path: str) -> bool:
pass
@abstractmethod
def import_from_csv(self, file_path: str) -> List[Dict]:
pass
@abstractmethod
def export_to_csv(self, file_path: str) -> bool:
pass
class JsonStudentDAL(IStudentDAL):
def __init__(self, file_path: str):
self.file_path = file_path
self._ensure_file_exists()
def _ensure_file_exists(self):
if not os.path.exists(self.file_path):
with open(self.file_path, 'w', encoding='utf-8') as f:
json.dump([], f, ensure_ascii=False)
def _load(self) -> List[Dict]:
with open(self.file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def _save(self, data: List[Dict]):
with open(self.file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_by_id(self, id: str) -> Optional[Student]:
data = self._load()
for item in data:
if item['id'] == id:
return Student.from_dict(item)
return None
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
data = self._load()
for item in data:
if item['stu_id'] == stu_id:
return Student.from_dict(item)
return None
def get_all(self) -> List[Student]:
return [Student.from_dict(item) for item in self._load()]
def add(self, student: Student) -> bool:
data = self._load()
if self.is_exist(student.id):
return False
data.append(student.to_dict())
self._save(data)
return True
def delete(self, id: str) -> bool:
data = self._load()
new_data = [item for item in data if item['id'] != id]
if len(data) == len(new_data):
return False
self._save(new_data)
return True
def delete_by_stu_id(self, stu_id: str) -> bool:
data = self._load()
new_data = [item for item in data if item['stu_id'] != stu_id]
if len(data) == len(new_data):
return False
self._save(new_data)
return True
def update(self, student: Student) -> bool:
data = self._load()
updated = False
for i, item in enumerate(data):
if item['id'] == student.id:
data[i] = student.to_dict()
updated = True
break
if updated:
self._save(data)
return updated
def is_exist(self, id: str) -> bool:
return any(item['id'] == id for item in self._load())
def is_exist_stu_id(self, stu_id: str) -> bool:
return any(item['stu_id'] == stu_id for item in self._load())
def import_from_json(self, file_path: str) -> List[Dict]:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def export_to_json(self, file_path: str) -> bool:
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(self._load(), f, ensure_ascii=False, indent=2)
return True
except:
return False
class CsvStudentDAL(IStudentDAL):
"""CSV格式数据访问实现类"""
FIELD_TYPES = {
'id': str,
'stu_id': str,
'name': str,
'age': int,
'gender': str,
'height': float,
'weight': float
}
def __init__(self, file_path: str):
self.file_path = file_path
self._ensure_file_exists()
def _ensure_file_exists(self):
"""确保CSV文件存在并包含表头"""
if not os.path.exists(self.file_path):
with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
def _convert_row(self, row: Dict) -> Dict:
"""转换CSV行的数据类型"""
converted = {}
for field, dtype in self.FIELD_TYPES.items():
value = row.get(field)
if value is None or value == '':
converted[field] = None
else:
try:
converted[field] = dtype(value)
except (ValueError, TypeError):
converted[field] = None
return converted
def _load(self) -> List[Dict]:
"""从CSV文件加载数据"""
with open(self.file_path, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
return [self._convert_row(row) for row in reader]
def _save(self, data: List[Dict]):
"""保存数据到CSV文件"""
with open(self.file_path, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
writer.writerows(data)
def get_by_id(self, id: str) -> Optional[Student]:
data = self._load()
for item in data:
if item['id'] == id:
return Student.from_dict(item)
return None
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
data = self._load()
for item in data:
if item['stu_id'] == stu_id:
return Student.from_dict(item)
return None
def get_all(self) -> List[Student]:
return [Student.from_dict(item) for item in self._load()]
def add(self, student: Student) -> bool:
data = self._load()
if self.is_exist(student.id):
return False
data.append(student.to_dict())
self._save(data)
return True
def delete(self, id: str) -> bool:
data = self._load()
new_data = [item for item in data if item['id'] != id]
if len(data) == len(new_data):
return False
self._save(new_data)
return True
def delete_by_stu_id(self, stu_id: str) -> bool:
data = self._load()
new_data = [item for item in data if item['stu_id'] != stu_id]
if len(data) == len(new_data):
return False
self._save(new_data)
return True
def update(self, student: Student) -> bool:
data = self._load()
updated = False
for i, item in enumerate(data):
if item['id'] == student.id:
data[i] = student.to_dict()
updated = True
break
if updated:
self._save(data)
return updated
def is_exist(self, id: str) -> bool:
return any(item['id'] == id for item in self._load())
def is_exist_stu_id(self, stu_id: str) -> bool:
return any(item['stu_id'] == stu_id for item in self._load())
def import_from_csv(self, file_path: str) -> List[Dict]:
with open(file_path, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
return [self._convert_row(row) for row in reader]
def export_to_csv(self, file_path: str) -> bool:
try:
data = self._load()
with open(file_path, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
writer.writerows(data)
return True
except:
return False