You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
7.6 KiB
242 lines
7.6 KiB
from abc import ABC, abstractmethod
|
|
import json
|
|
import csv
|
|
|
|
|
|
class IStudentDAL(ABC):
    """Abstract interface for a student data-access layer.

    Every method below is abstract: a subclass can only be instantiated
    once it overrides all of them.
    """

    @abstractmethod
    def get_by_id(self, id_number):
        """Look up a student record by ``id_number``."""

    @abstractmethod
    def get_by_stu_id(self, stu_id):
        """Look up a student record by ``stu_id``."""

    @abstractmethod
    def get_all(self):
        """Return every stored student record."""

    @abstractmethod
    def add(self, student):
        """Store a new ``student`` record."""

    @abstractmethod
    def delete(self, id_number):
        """Remove the record identified by ``id_number``."""

    @abstractmethod
    def delete_by_stu_id(self, stu_id):
        """Remove the record identified by ``stu_id``."""

    @abstractmethod
    def update(self, student):
        """Replace the stored record that corresponds to ``student``."""

    @abstractmethod
    def is_exist(self, id_number):
        """Tell whether a record with ``id_number`` is stored."""

    @abstractmethod
    def is_exist_stu_id(self, stu_id):
        """Tell whether a record with ``stu_id`` is stored."""

    @abstractmethod
    def import_from_json(self, file_path):
        """Read student records from the JSON file at ``file_path``."""

    @abstractmethod
    def export_to_json(self, data, file_path):
        """Write ``data`` to the JSON file at ``file_path``."""

    @abstractmethod
    def import_from_csv(self, file_path):
        """Read student records from the CSV file at ``file_path``."""

    @abstractmethod
    def export_to_csv(self, data, file_path):
        """Write ``data`` to the CSV file at ``file_path``."""
|
|
|
|
|
|
class JsonStudentDAL(IStudentDAL):
    """Student store persisted as JSON in a single file.

    The records are kept in memory in ``self.data`` and the whole file is
    rewritten after every successful ``add``, ``delete`` or ``update``.
    Records are dicts that must carry the keys ``'id_number'`` and
    ``'stu_id'``; a record lacking a key that is searched on raises
    ``KeyError``.
    """

    def __init__(self, file_path):
        """Create the backing file if it is missing, then load its records.

        Args:
            file_path: Path of the JSON file backing this store.
        """
        self.file_path = file_path
        self._ensure_file_exists()
        self.data = self._load()

    def _ensure_file_exists(self):
        """Create an empty backing file unless one already exists."""
        try:
            # Mode 'x' refuses to open an existing file, so stored data is
            # never truncated here.
            with open(self.file_path, 'x'):
                pass
        except FileExistsError:
            pass

    def _load(self):
        """Return the content of the backing file, or ``[]`` if unreadable.

        A freshly created (empty) file is not valid JSON and therefore
        loads as an empty list.
        """
        return self.import_from_json(self.file_path)

    def _save(self):
        """Rewrite the backing file with the current in-memory records."""
        with open(self.file_path, 'w', encoding='utf-8') as f:
            json.dump(self.data, f, ensure_ascii=False, indent=4)

    def _find_index(self, field, value):
        """Return the index of the first record whose ``field`` equals ``value``, or -1."""
        for index, student in enumerate(self.data):
            if student[field] == value:
                return index
        return -1

    def _delete_where(self, field, value):
        """Delete the first record whose ``field`` equals ``value`` and persist.

        Returns:
            True if a record was removed, False if none matched.
        """
        index = self._find_index(field, value)
        if index < 0:
            return False
        del self.data[index]
        self._save()
        return True

    def get_by_id(self, id_number):
        """Return the first record with this ``id_number``, or None."""
        return next(
            (s for s in self.data if s['id_number'] == id_number), None
        )

    def get_by_stu_id(self, stu_id):
        """Return the first record with this ``stu_id``, or None."""
        return next((s for s in self.data if s['stu_id'] == stu_id), None)

    def get_all(self):
        """Return the internal list of records (not a copy)."""
        return self.data

    def add(self, student):
        """Append ``student`` and persist, unless its ``id_number`` is taken.

        Returns:
            True if the record was added, False if a record with the same
            ``id_number`` already exists.
        """
        if self.is_exist(student['id_number']):
            return False
        self.data.append(student)
        self._save()
        return True

    def delete(self, id_number):
        """Delete the first record with this ``id_number``.

        Returns:
            True if a record was removed, False if none matched.
        """
        return self._delete_where('id_number', id_number)

    def delete_by_stu_id(self, stu_id):
        """Delete the first record with this ``stu_id``.

        Returns:
            True if a record was removed, False if none matched.
        """
        return self._delete_where('stu_id', stu_id)

    def update(self, student):
        """Replace the stored record sharing ``student['id_number']``.

        Returns:
            True if a record was replaced, False if none matched.
        """
        for index, existing_student in enumerate(self.data):
            if existing_student['id_number'] == student['id_number']:
                self.data[index] = student
                self._save()
                return True
        return False

    def is_exist(self, id_number):
        """Return True if some record has this ``id_number``."""
        return any(s['id_number'] == id_number for s in self.data)

    def is_exist_stu_id(self, stu_id):
        """Return True if some record has this ``stu_id``."""
        return any(s['stu_id'] == stu_id for s in self.data)

    def import_from_json(self, file_path):
        """Read and return the JSON content of ``file_path``.

        The store itself is not modified.

        Returns:
            The decoded JSON value, or ``[]`` if the file is missing or is
            not valid JSON.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return []

    def export_to_json(self, data, file_path):
        """Write ``data`` as JSON to ``file_path``.

        Returns:
            True on success, False if the file could not be written.
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            return True
        except IOError:
            return False

    # The two CSV methods are abstract in IStudentDAL. Without an override
    # here the class could not be instantiated at all (TypeError).

    def import_from_csv(self, file_path):
        """CSV import is not available on the JSON back end.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "JsonStudentDAL does not support CSV import"
        )

    def export_to_csv(self, data, file_path):
        """CSV export is not available on the JSON back end.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "JsonStudentDAL does not support CSV export"
        )
|
|
class CsvStudentDAL(IStudentDAL):
    """Student store persisted as a CSV file with a fixed set of columns.

    The records are kept in memory in ``self.data`` (a list of dicts) and
    the whole file is rewritten after every successful ``add``, ``delete``
    or ``update``. Files are opened with the ``utf-8-sig`` codec.
    """

    # Columns of the CSV file, in header order, mapped to the callable used
    # to convert each cell when a file is read.
    FIELD_TYPES = {
        "id_number": str,
        "stu_id": str,
        "name": str,
        "age": int,
        "gender": str
    }

    def __init__(self, file_path):
        """Create the backing file if it is missing, then load its records.

        Args:
            file_path: Path of the CSV file backing this store.
        """
        self.file_path = file_path
        self._ensure_file_exists()
        self.data = self._load()

    def _ensure_file_exists(self):
        """Create a header-only backing file unless one already exists."""
        try:
            # Mode 'x' refuses to open an existing file, so stored data is
            # never truncated here.
            with open(self.file_path, 'x', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=list(self.FIELD_TYPES))
                writer.writeheader()
        except FileExistsError:
            pass

    def _load(self):
        """Return the records of the backing file (``[]`` if it is missing)."""
        return self.import_from_csv(self.file_path)

    def _write_rows(self, file_path, rows):
        """Write the header followed by ``rows`` to ``file_path``.

        Errors from opening or writing the file propagate to the caller.
        """
        with open(file_path, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=list(self.FIELD_TYPES))
            writer.writeheader()
            writer.writerows(rows)

    def _save(self):
        """Rewrite the backing file with the current in-memory records."""
        self._write_rows(self.file_path, self.data)

    def _convert_row(self, row):
        """Convert the cells of one CSV row using ``FIELD_TYPES``.

        A cell whose text cannot be converted keeps its raw value. A cell
        that is missing because the row is shorter than the header stays
        ``None``: converting it would turn it into the text ``"None"`` for
        ``str`` columns and raise ``TypeError`` for ``int`` columns.

        Raises:
            KeyError: If the row has a column that is not in ``FIELD_TYPES``.
        """
        converted_row = {}
        for key, value in row.items():
            convert = self.FIELD_TYPES[key]
            if value is None:
                converted_row[key] = None
                continue
            try:
                converted_row[key] = convert(value)
            except ValueError:
                converted_row[key] = value
        return converted_row

    def _find_index(self, field, value):
        """Return the index of the first record whose ``field`` equals ``value``, or -1."""
        for index, student in enumerate(self.data):
            if student[field] == value:
                return index
        return -1

    def _delete_where(self, field, value):
        """Delete the first record whose ``field`` equals ``value`` and persist.

        Returns:
            True if a record was removed, False if none matched.
        """
        index = self._find_index(field, value)
        if index < 0:
            return False
        del self.data[index]
        self._save()
        return True

    def get_by_id(self, id_number):
        """Return the first record with this ``id_number``, or None."""
        return next(
            (s for s in self.data if s['id_number'] == id_number), None
        )

    def get_by_stu_id(self, stu_id):
        """Return the first record with this ``stu_id``, or None."""
        return next((s for s in self.data if s['stu_id'] == stu_id), None)

    def get_all(self):
        """Return the internal list of records (not a copy)."""
        return self.data

    def add(self, student):
        """Append ``student`` and persist, unless its ``id_number`` is taken.

        Returns:
            True if the record was added, False if a record with the same
            ``id_number`` already exists.
        """
        if self.is_exist(student['id_number']):
            return False
        self.data.append(student)
        self._save()
        return True

    def delete(self, id_number):
        """Delete the first record with this ``id_number``.

        Returns:
            True if a record was removed, False if none matched.
        """
        return self._delete_where('id_number', id_number)

    def delete_by_stu_id(self, stu_id):
        """Delete the first record with this ``stu_id``.

        Returns:
            True if a record was removed, False if none matched.
        """
        return self._delete_where('stu_id', stu_id)

    def update(self, student):
        """Replace the stored record sharing ``student['id_number']``.

        Returns:
            True if a record was replaced, False if none matched.
        """
        for index, existing_student in enumerate(self.data):
            if existing_student['id_number'] == student['id_number']:
                self.data[index] = student
                self._save()
                return True
        return False

    def is_exist(self, id_number):
        """Return True if some record has this ``id_number``."""
        return any(s['id_number'] == id_number for s in self.data)

    def is_exist_stu_id(self, stu_id):
        """Return True if some record has this ``stu_id``."""
        return any(s['stu_id'] == stu_id for s in self.data)

    def import_from_csv(self, file_path):
        """Read ``file_path`` and return its rows as converted dicts.

        The store itself is not modified.

        Returns:
            A list with one dict per data row, or ``[]`` if the file does
            not exist.
        """
        try:
            with open(file_path, 'r', newline='', encoding='utf-8-sig') as f:
                return [self._convert_row(row) for row in csv.DictReader(f)]
        except FileNotFoundError:
            return []

    def export_to_csv(self, data, file_path):
        """Write ``data`` (an iterable of record dicts) to ``file_path``.

        Returns:
            True on success, False if the file could not be written.
        """
        try:
            self._write_rows(file_path, data)
            return True
        except IOError:
            return False

    # The two JSON methods are abstract in IStudentDAL. Without an override
    # here the class could not be instantiated at all (TypeError).

    def import_from_json(self, file_path):
        """JSON import is not available on the CSV back end.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "CsvStudentDAL does not support JSON import"
        )

    def export_to_json(self, data, file_path):
        """JSON export is not available on the CSV back end.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "CsvStudentDAL does not support JSON export"
        )
|