You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
175 lines
6.0 KiB
175 lines
6.0 KiB
import json
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from dal.interfaces import IStudentDAL
|
|
from models.student import Student
|
|
|
|
|
|
class JSONStudentDAL(IStudentDAL):
|
|
def __init__(self, file_path: str = "students.json"):
|
|
self.file_path = Path(file_path)
|
|
self._ensure_file_exists()
|
|
|
|
def _ensure_file_exists(self):
|
|
"""确保JSON文件存在"""
|
|
if not self.file_path.exists():
|
|
with open(self.file_path, 'w', encoding='utf-8') as f:
|
|
json.dump([], f, ensure_ascii=False, indent=2)
|
|
|
|
def _load_data(self) -> List[Dict[str, Any]]:
|
|
"""从JSON文件加载所有数据"""
|
|
try:
|
|
with open(self.file_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, FileNotFoundError):
|
|
return []
|
|
|
|
def _save_data(self, data: List[Dict[str, Any]]) -> bool:
|
|
"""保存数据到JSON文件"""
|
|
try:
|
|
with open(self.file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
return True
|
|
except Exception as e:
|
|
print(f"保存数据时出错: {e}")
|
|
return False
|
|
|
|
def add_student(self, student: Student) -> bool:
|
|
"""添加学生到JSON文件"""
|
|
students_data = self._load_data()
|
|
|
|
# 检查是否已存在
|
|
if any(s['id_card'] == student.id_card or s['stu_id'] == student.stu_id
|
|
for s in students_data):
|
|
return False
|
|
|
|
students_data.append(student.to_dict())
|
|
return self._save_data(students_data)
|
|
|
|
def delete_student(self, id_card: str) -> bool:
|
|
"""从JSON文件删除学生"""
|
|
students_data = self._load_data()
|
|
original_count = len(students_data)
|
|
|
|
students_data = [s for s in students_data if s['id_card'] != id_card]
|
|
|
|
if len(students_data) < original_count:
|
|
return self._save_data(students_data)
|
|
return False
|
|
|
|
def update_student(self, student: Student) -> bool:
|
|
"""更新JSON文件中的学生信息"""
|
|
students_data = self._load_data()
|
|
updated = False
|
|
|
|
for i, s in enumerate(students_data):
|
|
if s['id_card'] == student.id_card:
|
|
students_data[i] = student.to_dict()
|
|
updated = True
|
|
break
|
|
|
|
if updated:
|
|
return self._save_data(students_data)
|
|
return False
|
|
|
|
def get_by_id(self, id_card: str) -> Optional[Student]:
|
|
"""根据身份证号获取学生"""
|
|
students_data = self._load_data()
|
|
for s in students_data:
|
|
if s['id_card'] == id_card:
|
|
return Student.from_dict(s)
|
|
return None
|
|
|
|
def get_by_stu_id(self, stu_id: str) -> Optional[Student]:
|
|
"""根据学号获取学生"""
|
|
students_data = self._load_data()
|
|
for s in students_data:
|
|
if s['stu_id'] == stu_id:
|
|
return Student.from_dict(s)
|
|
return None
|
|
|
|
def get_all(self) -> List[Student]:
|
|
"""获取所有学生"""
|
|
students_data = self._load_data()
|
|
return [Student.from_dict(s) for s in students_data]
|
|
|
|
def get_by_name(self, name: str) -> List[Student]:
|
|
"""根据姓名查询学生信息(模糊查询)"""
|
|
students_data = self._load_data()
|
|
return [Student.from_dict(s) for s in students_data
|
|
if name.lower() in s['name'].lower()]
|
|
|
|
def get_by_class(self, class_name: str) -> List[Student]:
|
|
"""根据班级查询学生信息(模糊查询)"""
|
|
students_data = self._load_data()
|
|
return [Student.from_dict(s) for s in students_data
|
|
if s['class_name'] and class_name.lower() in s['class_name'].lower()]
|
|
|
|
def get_by_major(self, major: str) -> List[Student]:
|
|
"""根据专业查询学生信息(模糊查询)"""
|
|
students_data = self._load_data()
|
|
return [Student.from_dict(s) for s in students_data
|
|
if s['major'] and major.lower() in s['major'].lower()]
|
|
|
|
def count_students(self) -> int:
|
|
"""统计学生总数"""
|
|
students_data = self._load_data()
|
|
return len(students_data)
|
|
|
|
def count_by_major(self) -> Dict[str, int]:
|
|
"""按专业统计学生人数"""
|
|
students_data = self._load_data()
|
|
result = {}
|
|
|
|
for s in students_data:
|
|
major = s.get('major', '未指定')
|
|
result[major] = result.get(major, 0) + 1
|
|
|
|
return result
|
|
|
|
def calculate_avg_height(self, group_by: str = None) -> Dict[str, float]:
|
|
"""计算平均身高"""
|
|
students_data = self._load_data()
|
|
|
|
if not group_by:
|
|
heights = [s['height'] for s in students_data if s.get('height') is not None]
|
|
return {'all': sum(heights) / len(heights)} if heights else {}
|
|
|
|
# 按指定字段分组计算
|
|
groups = {}
|
|
for s in students_data:
|
|
if s.get('height') is None:
|
|
continue
|
|
|
|
key = s.get(group_by, '未指定')
|
|
if key not in groups:
|
|
groups[key] = []
|
|
groups[key].append(s['height'])
|
|
|
|
return {k: sum(v) / len(v) for k, v in groups.items()}
|
|
|
|
def calculate_avg_weight(self, group_by: str = None) -> Dict[str, float]:
|
|
"""计算平均体重"""
|
|
students_data = self._load_data()
|
|
|
|
if not group_by:
|
|
weights = [s['weight'] for s in students_data if s.get('weight') is not None]
|
|
return {'all': sum(weights) / len(weights)} if weights else {}
|
|
|
|
# 按指定字段分组计算
|
|
groups = {}
|
|
for s in students_data:
|
|
if s.get('weight') is None:
|
|
continue
|
|
|
|
key = s.get(group_by, '未指定')
|
|
if key not in groups:
|
|
groups[key] = []
|
|
groups[key].append(s['weight'])
|
|
|
|
return {k: sum(v) / len(v) for k, v in groups.items()}
|
|
|
|
def clear_all(self) -> bool:
|
|
"""清空所有学生数据"""
|
|
return self._save_data([]) |