You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
student/stumis/dal/studentDAL.py

242 lines
7.6 KiB

from abc import ABC, abstractmethod
import json
import csv
class IStudentDAL(ABC):
@abstractmethod
def get_by_id(self, id_number):
pass
@abstractmethod
def get_by_stu_id(self, stu_id):
pass
@abstractmethod
def get_all(self):
pass
@abstractmethod
def add(self, student):
pass
@abstractmethod
def delete(self, id_number):
pass
@abstractmethod
def delete_by_stu_id(self, stu_id):
pass
@abstractmethod
def update(self, student):
pass
@abstractmethod
def is_exist(self, id_number):
pass
@abstractmethod
def is_exist_stu_id(self, stu_id):
pass
@abstractmethod
def import_from_json(self, file_path):
pass
@abstractmethod
def export_to_json(self, data, file_path):
pass
@abstractmethod
def import_from_csv(self, file_path):
pass
@abstractmethod
def export_to_csv(self, data, file_path):
pass
class JsonStudentDAL(IStudentDAL):
def __init__(self, file_path):
self.file_path = file_path
self._ensure_file_exists()
self.data = self._load()
def _ensure_file_exists(self):
try:
with open(self.file_path, 'x'):
pass
except FileExistsError:
pass
def _load(self):
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return []
def _save(self):
with open(self.file_path, 'w', encoding='utf-8') as f:
json.dump(self.data, f, ensure_ascii=False, indent=4)
def get_by_id(self, id_number):
for student in self.data:
if student['id_number'] == id_number:
return student
return None
def get_by_stu_id(self, stu_id):
for student in self.data:
if student['stu_id'] == stu_id:
return student
return None
def get_all(self):
return self.data
def add(self, student):
if self.is_exist(student['id_number']):
return False
self.data.append(student)
self._save()
return True
def delete(self, id_number):
for index, student in enumerate(self.data):
if student['id_number'] == id_number:
del self.data[index]
self._save()
return True
return False
def delete_by_stu_id(self, stu_id):
for index, student in enumerate(self.data):
if student['stu_id'] == stu_id:
del self.data[index]
self._save()
return True
return False
def update(self, student):
for index, existing_student in enumerate(self.data):
if existing_student['id_number'] == student['id_number']:
self.data[index] = student
self._save()
return True
return False
def is_exist(self, id_number):
for student in self.data:
if student['id_number'] == id_number:
return True
return False
def is_exist_stu_id(self, stu_id):
for student in self.data:
if student['stu_id'] == stu_id:
return True
return False
def import_from_json(self, file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return []
def export_to_json(self, data, file_path):
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except IOError:
return False
class CsvStudentDAL(IStudentDAL):
FIELD_TYPES = {
"id_number": str,
"stu_id": str,
"name": str,
"age": int,
"gender": str
}
def __init__(self, file_path):
self.file_path = file_path
self._ensure_file_exists()
self.data = self._load()
def _ensure_file_exists(self):
try:
with open(self.file_path, 'x', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
except FileExistsError:
pass
def _load(self):
data = []
try:
with open(self.file_path, 'r', newline='', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
for row in reader:
data.append(self._convert_row(row))
except FileNotFoundError:
pass
return data
def _save(self):
with open(self.file_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
for student in self.data:
writer.writerow(student)
def _convert_row(self, row):
converted_row = {}
for key, value in row.items():
try:
converted_row[key] = self.FIELD_TYPES[key](value)
except ValueError:
converted_row[key] = value
return converted_row
def get_by_id(self, id_number):
for student in self.data:
if student['id_number'] == id_number:
return student
return None
def get_by_stu_id(self, stu_id):
for student in self.data:
if student['stu_id'] == stu_id:
return student
return None
def get_all(self):
return self.data
def add(self, student):
if self.is_exist(student['id_number']):
return False
self.data.append(student)
self._save()
return True
def delete(self, id_number):
for index, student in enumerate(self.data):
if student['id_number'] == id_number:
del self.data[index]
self._save()
return True
return False
def delete_by_stu_id(self, stu_id):
for index, student in enumerate(self.data):
if student['stu_id'] == stu_id:
del self.data[index]
self._save()
return True
return False
def update(self, student):
for index, existing_student in enumerate(self.data):
if existing_student['id_number'] == student['id_number']:
self.data[index] = student
self._save()
return True
return False
def is_exist(self, id_number):
for student in self.data:
if student['id_number'] == id_number:
return True
return False
def is_exist_stu_id(self, stu_id):
for student in self.data:
if student['stu_id'] == stu_id:
return True
return False
def import_from_csv(self, file_path):
data = []
try:
with open(file_path, 'r', newline='', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
for row in reader:
data.append(self._convert_row(row))
except FileNotFoundError:
pass
return data
def export_to_csv(self, data, file_path):
try:
with open(file_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=self.FIELD_TYPES.keys())
writer.writeheader()
for student in data:
writer.writerow(student)
return True
except IOError:
return False