You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

768 lines
30 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import os
import bcrypt
import uuid
from datetime import datetime, timedelta
# 导入优化后的待办事项和日程管理模块
from todo.todo_data import TodoManager as OptimizedTodoManager
from schedule.schedule_data import ScheduleManager as OptimizedScheduleManager
"""用户认证与数据管理模块
该模块实现了完整的本地用户认证系统和个人数据管理功能,包括:
1. 用户注册与登录认证
2. 密码加密与验证
3. 会话管理与有效性验证
4. 日程管理(创建、查询、更新、删除)
5. 待办事项管理(创建、标记完成、更新、删除)
主要特点:
- 使用bcrypt进行密码安全加密
- 基于文件的本地数据存储
- 支持会话过期机制
- 为每个用户提供独立的数据文件
- 包含完善的日志记录功能
"""
class UserAuth:
"""本地用户认证与数据管理系统
该类提供了完整的用户认证和个人数据管理功能,包括:
1. 用户注册、登录、登出功能
2. 密码加密与验证
3. 会话创建与管理
4. 日程管理功能
5. 待办事项管理功能
属性:
users_file (str): 用户数据存储文件路径
sessions_file (str): 会话数据存储文件路径
logger: 日志记录器对象(可选)
users (dict): 加载的用户数据
sessions (dict): 加载的会话数据
"""
def __init__(self, logger=None):
"""初始化用户认证系统
Args:
logger: 日志记录器对象,用于记录系统操作日志
"""
self.users_file = "data/users.json" # 用户数据文件路径
self.sessions_file = "data/sessions.json" # 会话数据文件路径
self.logger = logger # 日志记录器
self.users = self._load_users() # 加载用户数据
self.sessions = self._load_sessions() # 加载会话数据
# 初始化优化后的待办事项和日程管理模块
self.todo_manager = OptimizedTodoManager(logger=logger)
self.schedule_manager = OptimizedScheduleManager(logger=logger)
def _load_users(self):
"""加载用户数据
从本地JSON文件加载所有用户数据确保数据目录存在。
Returns:
dict: 包含所有用户信息的字典,格式为{username: user_info}
"""
try:
# 确保data目录存在如果不存在则创建
os.makedirs(os.path.dirname(self.users_file), exist_ok=True)
if os.path.exists(self.users_file):
# 读取并解析用户数据文件
with open(self.users_file, "r", encoding="utf-8") as f:
return json.load(f)
return {} # 文件不存在时返回空字典
except Exception as e:
print(f"加载用户数据失败: {e}")
return {} # 出错时返回空字典
def _save_users(self):
"""保存用户数据
将当前用户数据保存到本地JSON文件。
Returns:
bool: 保存成功返回True失败返回False
"""
try:
# 确保data目录存在
os.makedirs(os.path.dirname(self.users_file), exist_ok=True)
# 将用户数据写入JSON文件
with open(self.users_file, "w", encoding="utf-8") as f:
json.dump(self.users, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"保存用户数据失败: {e}")
return False
def _load_sessions(self):
"""加载会话数据
从本地JSON文件加载所有会话数据确保数据目录存在。
Returns:
dict: 包含所有会话信息的字典,格式为{session_id: session_info}
"""
try:
# 确保data目录存在
os.makedirs(os.path.dirname(self.sessions_file), exist_ok=True)
if os.path.exists(self.sessions_file):
# 读取并解析会话数据文件
with open(self.sessions_file, "r", encoding="utf-8") as f:
return json.load(f)
return {} # 文件不存在时返回空字典
except Exception as e:
print(f"加载会话数据失败: {e}")
return {} # 出错时返回空字典
def _save_sessions(self):
"""保存会话数据
将当前会话数据保存到本地JSON文件。
Returns:
bool: 保存成功返回True失败返回False
"""
try:
# 确保data目录存在
os.makedirs(os.path.dirname(self.sessions_file), exist_ok=True)
# 将会话数据写入JSON文件
with open(self.sessions_file, "w", encoding="utf-8") as f:
json.dump(self.sessions, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"保存会话数据失败: {e}")
return False
def hash_password(self, password):
"""使用bcrypt算法对密码进行加密
该方法使用bcrypt算法为密码生成安全的哈希值包括
1. 生成随机盐值
2. 使用盐值对密码进行加密
3. 返回加密后的字符串
Args:
password (str): 原始密码字符串
Returns:
str: 加密后的密码哈希值
"""
# 生成随机盐值,增加密码安全性
salt = bcrypt.gensalt()
# 使用盐值和密码生成哈希值
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
# 将字节字符串转换为普通字符串返回
return hashed.decode('utf-8')
def check_password(self, password, hashed_password):
"""验证密码是否与哈希值匹配
使用bcrypt算法验证原始密码与存储的哈希值是否匹配。
Args:
password (str): 原始密码字符串
hashed_password (str): 存储的密码哈希值
Returns:
bool: 密码匹配返回True不匹配或出错返回False
"""
try:
# 验证密码与哈希值是否匹配
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
except Exception:
# 任何异常都返回False提高安全性
return False
def register(self, username, password, email=""):
"""用户注册功能
该方法实现了完整的用户注册流程,包括:
1. 用户名唯一性检查
2. 密码强度验证长度不少于6位
3. 密码加密处理
4. 用户信息创建
5. 用户数据保存
6. 注册结果记录(如配置了日志)
7. 注册结果返回
Args:
username (str): 用户名
password (str): 密码
email (str, optional): 电子邮箱,默认为空
Returns:
tuple: (bool, str)
- bool: 注册成功返回True失败返回False
- str: 注册结果描述
"""
if username in self.users:
if self.logger:
self.logger.log("warning", f"用户注册失败:用户名 {username} 已存在", username)
return False, "用户名已存在"
# 验证密码强度
if len(password) < 6:
if self.logger:
self.logger.log("warning", f"用户注册失败:密码长度不足", username)
return False, "密码长度不能少于6位"
# 创建用户
self.users[username] = {
"password": self.hash_password(password),
"email": email,
"created_at": datetime.now().isoformat(),
"last_login": None
}
if self._save_users():
if self.logger:
self.logger.log("info", f"用户 {username} 注册成功", username)
return True, "注册成功"
else:
if self.logger:
self.logger.log("error", f"用户 {username} 注册失败:保存用户数据失败", username)
return False, "注册失败,请重试"
def login(self, username, password):
"""用户登录功能
该方法实现了完整的用户登录流程,包括:
1. 用户名存在性检查
2. 密码正确性验证
3. 最后登录时间更新
4. 用户数据保存
5. 会话创建(包含过期时间)
6. 会话数据保存
7. 登录结果记录(如配置了日志)
8. 登录结果与会话ID返回
Args:
username (str): 用户名
password (str): 密码
Returns:
tuple: (bool, str)
- bool: 登录成功返回True失败返回False
- str: 登录成功返回会话ID失败返回错误描述
"""
if username not in self.users:
if self.logger:
self.logger.log("warning", f"用户登录失败:用户名 {username} 不存在")
return False, "用户名不存在"
# 验证密码
if not self.check_password(password, self.users[username]["password"]):
if self.logger:
self.logger.log("warning", f"用户登录失败:用户名 {username} 密码错误")
return False, "密码错误"
# 更新最后登录时间
self.users[username]["last_login"] = datetime.now().isoformat()
self._save_users()
# 创建会话
session_id = username + "_" + datetime.now().strftime("%Y%m%d%H%M%S")
self.sessions[session_id] = {
"username": username,
"created_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(days=7)).isoformat()
}
self._save_sessions()
if self.logger:
self.logger.log("info", f"用户 {username} 登录成功")
return True, session_id
def logout(self, session_id):
"""用户登出功能
该方法实现了完整的用户登出流程,包括:
1. 会话ID有效性检查
2. 会话删除
3. 会话数据保存
4. 登出结果记录(如配置了日志)
5. 登出结果返回
Args:
session_id (str): 用户会话ID
Returns:
bool: 登出成功返回True失败返回False
"""
if session_id in self.sessions:
username = self.sessions[session_id]["username"]
del self.sessions[session_id]
self._save_sessions()
if self.logger:
self.logger.log("info", f"用户 {username} 登出成功", username)
return True
if self.logger:
self.logger.log("warning", f"用户登出失败无效的会话ID {session_id}")
return False
def validate_session(self, session_id):
"""验证会话ID的有效性
该方法验证用户会话的有效性,包括:
1. 会话ID存在性检查
2. 会话过期时间验证
3. 过期会话自动清理
4. 验证结果返回
Args:
session_id (str): 用户会话ID
Returns:
tuple: (bool, str)
- bool: 会话有效返回True无效返回False
- str: 会话有效时返回用户名无效时返回None
"""
# 检查会话ID是否存在
if session_id not in self.sessions:
return False, None
# 获取会话信息
session = self.sessions[session_id]
# 解析过期时间
expires_at = datetime.fromisoformat(session["expires_at"])
# 检查会话是否过期
if datetime.now() > expires_at:
# 清理过期会话
del self.sessions[session_id]
# 保存会话数据
self._save_sessions()
return False, None
# 返回会话有效性和用户名
return True, session["username"]
def get_current_user(self, session_id):
"""获取当前用户信息
该方法获取指定会话ID对应的用户详细信息不包含密码包括
1. 会话有效性验证
2. 用户存在性检查
3. 用户信息获取
4. 敏感信息(密码)移除
5. 用户信息返回
Args:
session_id (str): 用户会话ID
Returns:
dict: 用户信息字典包含email、created_at、last_login等不包含密码
会话无效或用户不存在时返回None
"""
valid, username = self.validate_session(session_id)
if not valid:
return None
if username in self.users:
user_info = self.users[username].copy()
# 不返回密码
user_info.pop("password", None)
return user_info
return None
# ==================== 日程管理功能 ====================
def _get_schedule_file(self, user_identifier):
"""(兼容方法) 获取用户日程文件路径
根据用户标识符session_id或用户名获取对应的日程文件路径。
此方法现在仅用于向后兼容实际操作由schedule_manager处理。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
Returns:
str: 用户日程文件的完整路径
"""
# 这个方法在新版存储系统中不再需要实际文件路径,
# 但为了保持API兼容返回一个模拟路径
return self.schedule_manager._get_schedule_file(user_identifier)
def load_schedules(self, user_identifier):
"""加载用户日程数据
从用户的日程存储中加载所有日程数据。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
Returns:
list: 日程数据列表,格式为[{"id": str, "title": str, ...}, ...]
加载失败返回空列表
"""
return self.schedule_manager.load_schedules(user_identifier)
def save_schedules(self, user_identifier, schedules):
"""保存用户日程数据
将用户的日程数据保存到优化的存储系统中。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
schedules (list): 日程数据列表,格式为[{"id": str, "title": str, ...}, ...]
Returns:
bool: 保存成功返回True失败返回False
"""
return self.schedule_manager.save_schedules(user_identifier, schedules)
def create_schedule(self, user_identifier, schedule_data):
"""创建新日程
创建一个新的日程条目,并保存到优化的存储系统中。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
schedule_data (dict): 日程数据包含title、description、start_time等字段
Returns:
tuple: (bool, str, dict)
- bool: 创建成功返回True失败返回False
- str: 操作结果描述
- dict: 创建的日程数据失败时返回None
"""
# 使用优化的日程管理器创建新日程
success, schedule = self.schedule_manager.create_schedule(user_identifier, schedule_data)
if success:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 创建了新日程:{schedule_data.get('title')}", username)
else:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 创建日程失败:{schedule_data.get('title')}", username)
return success, schedule
def update_schedule(self, user_identifier, schedule_id, schedule_data):
"""更新日程信息
更新指定ID的日程条目并保存到用户的日程列表中。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
schedule_id (str): 要更新的日程ID
schedule_data (dict): 更新后的日程数据,包含需要修改的字段
Returns:
tuple: (bool, str, dict)
- bool: 更新成功返回True失败返回False
- str: 操作结果描述
- dict: 更新后的日程数据失败时返回None
"""
# 加载用户的所有日程
schedules = self.load_schedules(user_identifier)
for i, schedule in enumerate(schedules):
if schedule["id"] == schedule_id:
schedules[i].update(schedule_data)
success = self.save_schedules(user_identifier, schedules)
if success:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 更新了日程:{schedule.get('title')}", username)
else:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 更新日程失败:{schedule.get('title')}", username)
return success
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("warning", f"用户 {username} 更新日程失败找不到日程ID {schedule_id}", username)
return False
def delete_schedule(self, user_identifier, schedule_id):
"""删除指定ID的日程
删除用户日程列表中指定ID的日程条目。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
schedule_id (str): 要删除的日程ID
Returns:
bool: 删除成功返回True失败返回False
"""
# 加载用户的所有日程
schedules = self.load_schedules(user_identifier)
# 获取要删除的日程标题,用于日志记录
schedule_title = ""
for schedule in schedules:
if schedule["id"] == schedule_id:
schedule_title = schedule.get("title", "未知日程")
break
# 过滤掉要删除的日程
schedules = [s for s in schedules if s["id"] != schedule_id]
# 保存更新后的日程列表
success = self.save_schedules(user_identifier, schedules)
if success:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 删除了日程:{schedule_title}", username)
else:
# 获取用户名
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 删除日程失败:{schedule_title}", username)
return success
def get_schedules_by_date(self, user_identifier, date_str):
"""获取指定日期的所有日程
从用户的日程列表中筛选出指定日期的所有日程,并按开始时间排序。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
date_str (str): 日期字符串格式与日程的start_time字段匹配
Returns:
list: 指定日期的日程列表,按开始时间排序
"""
# 加载用户的所有日程
schedules = self.load_schedules(user_identifier)
result = []
# 筛选出指定日期的日程
for schedule in schedules:
if date_str in schedule["start_time"]:
result.append(schedule)
# 按开始时间排序
result.sort(key=lambda x: x["start_time"])
return result
# ==================== 待办事项管理功能 ====================
def _get_todo_file(self, user_identifier):
"""(兼容方法) 获取用户待办事项文件路径
根据用户标识符session_id或用户名获取对应的待办事项文件路径。
此方法现在仅用于向后兼容实际操作由todo_manager处理。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
Returns:
str: 用户待办事项文件的完整路径
"""
# 这个方法在新版存储系统中不再需要实际文件路径,
# 但为了保持API兼容返回一个模拟路径
return self.todo_manager._get_todo_file(user_identifier)
def load_todos(self, user_identifier):
"""加载用户待办事项数据
从用户的待办事项存储中加载所有待办事项数据。
支持通过session_id或用户名加载对应用户的待办事项。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
Returns:
list: 待办事项数据列表,格式为[{"id": str, "title": str, "description": str, "due_date": str,
"priority": str, "status": str, "created_at": str}, ...]
加载失败时返回空列表
"""
return self.todo_manager.load_todos(user_identifier)
def save_todos(self, user_identifier, todos):
"""保存用户待办事项
将用户的待办事项数据保存到优化的存储系统中。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
todos (list): 待办事项数据列表,格式为[{"id": str, "title": str, ...}, ...]
Returns:
bool: 保存成功返回True失败返回False
"""
return self.todo_manager.save_todos(user_identifier, todos)
def create_todo(self, user_identifier, todo_data):
"""创建新待办事项
为用户创建一个新的待办事项,并保存到优化的存储系统中。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
todo_data (dict): 待办事项数据包含title、description、due_date等字段
Returns:
tuple: 创建成功返回(success, todo)元组,失败返回(success, None)
"""
# 使用优化的待办事项管理器创建新待办事项
success, new_todo = self.todo_manager.create_todo(user_identifier, todo_data)
if success:
# 记录操作日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 创建了新待办事项:{todo_data.get('title')}", username)
else:
# 记录失败日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 创建待办事项失败:{todo_data.get('title')}", username)
return success, new_todo
def update_todo(self, user_identifier, todo_id, todo_data):
"""更新待办事项
根据ID更新用户的待办事项信息。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
todo_id (str): 待办事项的唯一ID
todo_data (dict): 要更新的待办事项数据,如{"title": "新标题", "status": "completed"}
Returns:
bool: 更新成功返回True失败返回False
"""
# 加载用户的所有待办事项
todos = self.load_todos(user_identifier)
# 查找匹配的待办事项
for i, todo in enumerate(todos):
if todo["id"] == todo_id:
# 更新待办事项信息
todos[i].update(todo_data)
# 保存更新后的待办事项列表
success = self.save_todos(user_identifier, todos)
if success:
# 记录更新成功日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 更新了待办事项:{todo.get('title')}", username)
else:
# 记录更新失败日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 更新待办事项失败:{todo.get('title')}", username)
return success
# 未找到指定ID的待办事项
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("warning", f"用户 {username} 更新待办事项失败找不到待办事项ID {todo_id}", username)
return False
def complete_todo(self, user_identifier, todo_id):
"""标记待办事项为已完成
将指定ID的待办事项状态更新为"completed"
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
todo_id (str): 待办事项的唯一ID
Returns:
bool: 操作成功返回True失败返回False
"""
# 获取待办事项信息,用于日志记录
todos = self.load_todos(user_identifier)
todo_title = ""
for todo in todos:
if todo["id"] == todo_id:
todo_title = todo.get("title", "未知待办事项")
break
# 调用update_todo方法更新状态
success = self.update_todo(user_identifier, todo_id, {"status": "completed"})
if success:
# 记录完成成功日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 完成了待办事项:{todo_title}", username)
else:
# 记录完成失败日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 标记待办事项完成失败:{todo_title}", username)
return success
def delete_todo(self, user_identifier, todo_id):
"""删除待办事项
根据ID删除用户的待办事项。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
todo_id (str): 待办事项的唯一ID
Returns:
bool: 删除成功返回True失败返回False
"""
# 加载用户的所有待办事项
todos = self.load_todos(user_identifier)
# 获取要删除的待办事项标题,用于日志记录
todo_title = ""
for todo in todos:
if todo["id"] == todo_id:
todo_title = todo.get("title", "未知待办事项")
break
# 过滤掉指定ID的待办事项
todos = [t for t in todos if t["id"] != todo_id]
# 保存更新后的待办事项列表
success = self.save_todos(user_identifier, todos)
if success:
# 记录删除成功日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("info", f"用户 {username} 删除了待办事项:{todo_title}", username)
else:
# 记录删除失败日志
valid, username = self.validate_session(user_identifier)
if valid and self.logger:
self.logger.log("error", f"用户 {username} 删除待办事项失败:{todo_title}", username)
return success
def get_pending_todos(self, user_identifier):
"""获取所有待处理的待办事项
获取用户所有状态为"pending"的待办事项。
Args:
user_identifier (str): 用户标识符可以是session_id或用户名
Returns:
list: 待处理的待办事项列表
"""
# 加载用户的所有待办事项
todos = self.load_todos(user_identifier)
# 过滤出待处理的待办事项
return [t for t in todos if t["status"] == "pending"]