You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
adss/student_dal_json.py

290 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import csv
import json
import os
import sys
from datetime import date
from typing import List, Optional

# Put this file's parent directory on sys.path BEFORE importing the project
# module below; the ``model`` package is presumably located there (verify
# against the repository layout). Doing the path setup after the import, as
# before, could not help that import succeed.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)

from model.Student import Student  # noqa: E402  (must follow the path setup)
class StudentDALJSON:
    """Data access layer that keeps Student records in a single JSON file.

    Records are converted with ``Student.to_dict()`` and
    ``Student.from_dict()``. The import/export helpers print a status message
    (in Chinese) describing the outcome instead of raising.
    """

    # Dictionary keys that hold dates and must be plain text in a CSV row.
    _DATE_FIELDS = ('birth_date', 'enrollment_date')

    # Column order of the CSV file written by export_to_csv().
    _CSV_FIELDNAMES = [
        'student_id', 'name', 'gender', 'age', 'class_name',
        'major', 'phone', 'email', 'id_number',
        'height', 'weight', 'birth_date', 'enrollment_date',
    ]

    def __init__(self, file_path="students.json"):
        """Remember the path of the JSON file used as the data store."""
        self.file_path = file_path

    def get_all_students(self):
        """Return all stored students.

        Returns an empty list when the file does not exist or does not
        contain valid JSON.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                student_dicts = json.load(file)
            return [Student.from_dict(s) for s in student_dicts]
        except (FileNotFoundError, json.JSONDecodeError):
            return []

    def save_students(self, students):
        """Overwrite the data file with the given students."""
        student_dicts = [student.to_dict() for student in students]
        with open(self.file_path, 'w', encoding='utf-8') as file:
            json.dump(student_dicts, file, ensure_ascii=False, indent=4)

    # Export to JSON
    def export_to_json(self, file_path=None):
        """Write all students to ``file_path`` as JSON.

        Falls back to the store's own file when ``file_path`` is empty.
        Nothing is written when there are no students.
        """
        if not file_path:
            file_path = self.file_path
        students = self.get_all_students()
        if not students:
            print("没有数据可导出")
            return
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump([s.to_dict() for s in students], f,
                          ensure_ascii=False, indent=4)
            print(f"数据已导出到 {file_path}")
        except Exception as e:
            print(f"导出失败: {e}")

    @staticmethod
    def _with_iso_dates(data):
        """Return a copy of ``data`` whose date fields are ISO-8601 strings.

        ``to_dict()`` output is also handed directly to ``json.dump`` in this
        class, so its date fields are presumably already strings; calling
        ``.isoformat()`` on them unconditionally would raise AttributeError
        and abort the export. Only genuine ``date`` objects are converted.
        """
        row = dict(data)
        for key in StudentDALJSON._DATE_FIELDS:
            value = row.get(key)
            if isinstance(value, date):
                row[key] = value.isoformat()
        return row

    # Export to CSV
    def export_to_csv(self, file_path="students.csv"):
        """Write all students to ``file_path`` as CSV with a header row.

        The file is written as UTF-8 with a BOM (``utf-8-sig``). Nothing is
        written when there are no students.
        """
        students = self.get_all_students()
        if not students:
            print("没有数据可导出")
            return
        try:
            with open(file_path, 'w', encoding='utf-8-sig', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=self._CSV_FIELDNAMES)
                writer.writeheader()
                for student in students:
                    # Dates must be text in the CSV row.
                    writer.writerow(self._with_iso_dates(student.to_dict()))
            print(f"数据已导出到 {file_path}")
        except Exception as e:
            print(f"导出失败: {e}")

    # Import from JSON
    def import_from_json(self, file_path):
        """Replace the stored students with those read from a JSON file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                student_dicts = json.load(f)
            students = []
            for data in student_dicts:
                # Convert ISO date strings into date objects.
                data['birth_date'] = date.fromisoformat(data['birth_date'])
                data['enrollment_date'] = date.fromisoformat(data['enrollment_date'])
                students.append(Student.from_dict(data))
            self.save_students(students)
            print(f"{file_path} 导入成功")
        except FileNotFoundError:
            print(f"文件不存在: {file_path}")
        except json.JSONDecodeError:
            print(f"JSON格式错误: {file_path}")
        except Exception as e:
            print(f"导入失败: {e}")

    # Import from CSV
    def import_from_csv(self, file_path):
        """Replace the stored students with those read from a CSV file."""
        try:
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                student_dicts = list(reader)
            students = []
            for data in student_dicts:
                # CSV cells are text: convert the numeric columns.
                data['age'] = int(data['age'])
                data['height'] = float(data['height'])
                data['weight'] = float(data['weight'])
                # Convert ISO date strings into date objects.
                data['birth_date'] = date.fromisoformat(data['birth_date'])
                data['enrollment_date'] = date.fromisoformat(data['enrollment_date'])
                students.append(Student.from_dict(data))
            self.save_students(students)
            print(f"{file_path} 导入成功")
        except FileNotFoundError:
            print(f"文件不存在: {file_path}")
        except Exception as e:
            print(f"导入失败: {e}")
class StudentDAL:
    """Data access layer that keeps Student records in a single JSON file.

    Every operation re-reads the file, and every mutation rewrites it in
    full. Records are converted with ``Student.to_dict()`` and
    ``Student.from_dict()``.
    """

    def __init__(self, file_path: str):
        """Remember the data file path and create the file if it is missing."""
        self.file_path = file_path
        self._ensure_file_exists()

    def _ensure_file_exists(self):
        """Create the data file holding an empty JSON list if it is absent."""
        if not os.path.exists(self.file_path):
            with open(self.file_path, mode='w', encoding='utf-8') as file:
                json.dump([], file)

    def save_students(self, students: List["Student"]) -> None:
        """Overwrite the data file with the given students."""
        student_dicts = [student.to_dict() for student in students]
        with open(self.file_path, mode='w', encoding='utf-8') as file:
            json.dump(student_dicts, file, ensure_ascii=False, indent=4)

    def load_students(self) -> List["Student"]:
        """Read the data file and return its records as Student objects."""
        with open(self.file_path, mode='r', encoding='utf-8') as file:
            student_dicts = json.load(file)
        return [Student.from_dict(student_dict) for student_dict in student_dicts]

    def add_student(self, student: "Student") -> bool:
        """Append ``student``; return False if its ``sid`` is already stored."""
        students = self.load_students()
        if any(existing.sid == student.sid for existing in students):
            return False
        students.append(student)
        self.save_students(students)
        return True

    def find_student_by_sid(self, sid: str) -> Optional["Student"]:
        """Return the first student whose ``sid`` equals ``sid``, else None."""
        for student in self.load_students():
            if student.sid == sid:
                return student
        return None

    def find_students_by_name(self, name: str) -> List["Student"]:
        """Return students whose name contains ``name`` (case-insensitive)."""
        needle = name.lower()
        return [s for s in self.load_students() if needle in s.name.lower()]

    def find_students_by_class_name(self, class_name: str) -> List["Student"]:
        """Return students whose class name contains ``class_name`` (case-insensitive)."""
        needle = class_name.lower()
        return [s for s in self.load_students() if needle in s.class_name.lower()]

    def get_all_students(self) -> List["Student"]:
        """Return every stored student."""
        return self.load_students()

    def update_student(self, sid: str, updated_student: "Student") -> bool:
        """Replace the first student matching ``sid``; return whether one was found."""
        students = self.load_students()
        for i, student in enumerate(students):
            if student.sid == sid:
                students[i] = updated_student
                self.save_students(students)
                return True
        return False

    def update_student_partial(self, sid: str, name: Optional[str] = None,
                               height: Optional[float] = None,
                               birth_date: Optional[date] = None,
                               enrollment_date: Optional[date] = None,
                               class_name: Optional[str] = None) -> bool:
        """Update only the fields that are not None on the student matching ``sid``.

        Returns True when a matching student was found and saved, else False.
        """
        changes = {
            'name': name,
            'height': height,
            'birth_date': birth_date,
            'enrollment_date': enrollment_date,
            'class_name': class_name,
        }
        students = self.load_students()
        for student in students:
            if student.sid == sid:
                for attribute, value in changes.items():
                    if value is not None:
                        setattr(student, attribute, value)
                self.save_students(students)
                return True
        return False

    def delete_student(self, sid: str) -> bool:
        """Delete the student(s) with the given student ID.

        Returns True when at least one record was removed, else False.
        """
        students = self.load_students()
        remaining = [student for student in students if student.sid != sid]
        if len(remaining) < len(students):
            self.save_students(remaining)
            return True
        return False

    def clear_all_students(self) -> None:
        """Remove every student from the data file."""
        self.save_students([])

    def get_student_count(self) -> int:
        """Return the number of stored students."""
        return len(self.load_students())

    def check_student_exists(self, sid: str) -> bool:
        """Return whether a student with the given ``sid`` is stored."""
        return self.find_student_by_sid(sid) is not None

    def export_to_json(self, file_path):
        """Write all students to ``file_path`` as JSON."""
        student_dicts = [student.to_dict() for student in self.load_students()]
        with open(file_path, mode='w', encoding='utf-8') as file:
            json.dump(student_dicts, file, ensure_ascii=False, indent=4)

    def import_from_json(self, file_path):
        """Replace the stored students with those read from a JSON file.

        Problems are reported by printing a message; the store is left
        unchanged in that case.
        """
        try:
            with open(file_path, mode='r', encoding='utf-8') as file:
                student_dicts = json.load(file)
            students = [Student.from_dict(student_dict) for student_dict in student_dicts]
            self.save_students(students)
        except FileNotFoundError:
            print(f"文件 {file_path} 未找到。")
        except json.JSONDecodeError:
            print(f"文件 {file_path} 不是有效的 JSON 文件。")
        except ValueError as e:
            print(f"数据转换错误: {e}")

    def export_to_csv(self, file_path):
        """Write all students to ``file_path`` as CSV with a header row."""
        students = self.load_students()
        with open(file_path, mode='w', encoding='utf-8', newline='') as file:
            fieldnames = ['sid', 'name', 'height', 'birth_date', 'enrollment_date', 'class_name']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            for student in students:
                writer.writerow(student.to_dict())

    def import_from_csv(self, file_path):
        """Replace the stored students with those read from a CSV file.

        Problems are reported by printing a message; the store is left
        unchanged in that case.
        """
        try:
            students = []
            with open(file_path, mode='r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    row['birth_date'] = date.fromisoformat(row['birth_date'])
                    row['enrollment_date'] = date.fromisoformat(row['enrollment_date'])
                    # CSV cells are always text. Height is summed numerically
                    # by get_average_height(), so convert it here instead of
                    # storing a string.
                    if row.get('height'):
                        row['height'] = float(row['height'])
                    students.append(Student.from_dict(row))
            self.save_students(students)
        except FileNotFoundError:
            print(f"文件 {file_path} 未找到。")
        except ValueError as e:
            print(f"日期格式错误: {e}")

    def get_average_height(self):
        """Return the mean height of all students, or 0 when there are none."""
        students = self.load_students()
        if not students:
            return 0
        return sum(student.height for student in students) / len(students)

    def count_students_by_height_range(self, min_height, max_height):
        """Count students with ``min_height <= height <= max_height``."""
        return sum(1 for student in self.load_students()
                   if min_height <= student.height <= max_height)

    def count_students_by_enrollment_year(self, year):
        """Count students whose enrollment date falls in ``year``."""
        return sum(1 for student in self.load_students()
                   if student.enrollment_date.year == year)

    @staticmethod
    def _age_on(birth_date: date, today: date) -> int:
        """Return the age in completed years on ``today``.

        A plain year difference counts a birthday that has not happened yet
        this year, so one year is subtracted until the birthday is reached.
        """
        had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
        return today.year - birth_date.year - (0 if had_birthday else 1)

    def count_students_by_age_range(self, min_age, max_age):
        """Count students whose current age lies in ``[min_age, max_age]``."""
        today = date.today()
        return sum(1 for student in self.load_students()
                   if min_age <= self._age_on(student.birth_date, today) <= max_age)

    @staticmethod
    def _copy_text_file(src_path, dst_path):
        """Copy a UTF-8 text file, reading it fully before writing.

        Opening the destination in 'w' mode truncates it immediately, so the
        source is read and closed first; otherwise copying a file onto itself
        would wipe its contents.
        """
        with open(src_path, mode='r', encoding='utf-8') as src_file:
            content = src_file.read()
        with open(dst_path, mode='w', encoding='utf-8') as dst_file:
            dst_file.write(content)

    def backup_data(self, backup_path):
        """Copy the data file to ``backup_path`` and print the outcome."""
        try:
            self._copy_text_file(self.file_path, backup_path)
            print(f"数据备份到 {backup_path} 成功。")
        except Exception as e:
            print(f"数据备份失败: {e}")

    def restore_data(self, backup_path):
        """Overwrite the data file with ``backup_path`` and print the outcome."""
        try:
            self._copy_text_file(backup_path, self.file_path)
            print(f"数据从 {backup_path} 恢复成功。")
        except FileNotFoundError:
            print(f"备份文件 {backup_path} 未找到。")
        except Exception as e:
            print(f"数据恢复失败: {e}")