yangyixuan_branch
yyx 7 months ago
parent 4e9059953c
commit a1fbb41d79

@ -7,7 +7,7 @@
3. **接口实现** 提供获取下一题和提交答案的逻辑
"""
import uuid
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional, Tuple
# 引入新的试卷生成模块
from . import paper_gen
@ -15,6 +15,10 @@ from . import paper_gen
# --- 内存会话存储 ---
_sessions: Dict[str, Dict[str, Any]] = {}
# 【新增】缓存上一次成功生成的试卷题目
# 键结构: (grade, count)
_last_generated_papers: Dict[Tuple[str, int], List[Dict]] = {}
# ------------------------------
@ -22,7 +26,7 @@ _sessions: Dict[str, Dict[str, Any]] = {}
def create_session(grade: str, count: int) -> dict:
"""
创建测验会话
首先调用 paper_gen.py 生成题目然后初始化会话数据到内存
核心修改实现再答一次功能优先从缓存中获取题目
Args:
grade (str): 年级
@ -31,22 +35,35 @@ def create_session(grade: str, count: int) -> dict:
dict: { success, session_id, total_questions, source } 或错误信息
"""
# 核心:调用 paper_gen 模块生成题目 (已将逻辑从本文件移除)
gen_result = paper_gen.generate_test(grade, count)
questions_key = (grade, count)
generated_questions = _last_generated_papers.get(questions_key)
source = "generator" # 默认来源
# --- 1. 检查缓存,尝试重用题目 ---
if generated_questions:
print(f"INFO: Reusing questions from last paper cache for grade={grade}, count={count}")
source = "reused" # 标记来源为重用
else:
# --- 2. 缓存未命中,调用 paper_gen 模块生成新题目 ---
gen_result = paper_gen.generate_test(grade, count)
if not gen_result["success"]:
return gen_result # 返回生成失败的消息
if not gen_result["success"]:
return gen_result # 返回生成失败的消息
generated_questions = gen_result["questions"]
source = gen_result["source"]
generated_questions = gen_result["questions"]
source = gen_result["source"]
# 【更新缓存】生成成功后,将新的题目缓存起来
_last_generated_papers[questions_key] = generated_questions
# 3. 初始化新会话数据
session_id = str(uuid.uuid4())
_sessions[session_id] = {
"questions": generated_questions,
"answers": {}, # {question_id: answer_key}
"index": 0, # 当前题目索引
"index": 0, # 当前题目索引,新会话从 0 开始
"grade": grade,
"source": source # 记录题目来源(例如: generator, external, fallback
"source": source # 记录题目来源(generator, reused 等
}
return {
@ -71,6 +88,7 @@ def get_next_question(session_id: str) -> dict:
return {"success": False, "message": "无效会话"}
# 检查是否完成
# 【注意】这里访问的是 'index',确保和 create_session 中的键名一致
if s["index"] >= len(s["questions"]):
return {"success": False, "finished": True}
@ -119,14 +137,7 @@ def submit_answer(session_id: str, question_id: str, answer: str) -> dict:
def get_session_data_for_scoring(session_id: str) -> Optional[Dict[str, Any]]:
"""
新增公共接口
paper_scoring 模块提供会话的只读数据副本
职责根据 session_id 获取会话的题目和用户答案防止外部直接修改 _sessions
Args:
session_id (str): 会话 ID
Returns:
Optional[Dict[str, Any]]: 包含 'questions' 'answers' 的数据字典 None
"""
s = _sessions.get(session_id)
if not s:
@ -141,7 +152,6 @@ def get_session_data_for_scoring(session_id: str) -> Optional[Dict[str, Any]]:
def clear_session(session_id: str) -> bool:
"""
新增公共接口
评分完成后清理内存中的会话
Args:
@ -151,6 +161,7 @@ def clear_session(session_id: str) -> bool:
"""
if session_id in _sessions:
del _sessions[session_id]
# 注意: 这里不清理 _last_generated_papers因为我们希望保留它用于“再答一次”
return True
return False

@ -10,6 +10,8 @@ from typing import Dict, Any, List
# 引入策略模块
from . import question_filter
from . import paper_save # 引入试卷保存模块
from . import user_service # 引入用户服务模块,用于获取当前用户
# 实例化策略服务
quiz_generator_strategy = question_filter.DefaultQuizStrategy()
@ -39,6 +41,20 @@ def generate_test(grade: str, count: int) -> Dict[str, Any]:
if not generated_questions:
return {"success": False, "message": "无法生成任何题目"}
# 仅当试卷由生成器生成(即不是 paper_act.py 的重用逻辑)时,才保存
if source == "generator":
# 1. 获取当前登录的用户名 (假设 login_status 返回有效的用户名)
current_user = user_service.get_current_user_info()
username = current_user.get('username')
if username:
# 2. 调用保存函数
paper_save.save_paper(username, generated_questions)
else:
# 警告:未登录状态下生成的试卷不保存
print("WARNING: User not logged in. Skipping paper save.")
# ==========================================================
return {
"success": True,
"questions": generated_questions,

@ -0,0 +1,97 @@
# backend/paper_save.py
"""
试卷持久化存储模块 (paper_save.py)
... (其他说明保持不变)
"""
import json
import time
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime
from . import user_service
# --- 存储路径配置 (保持不变) ---
def _resolve_paper_dir(username: str) -> Path:
"""解析并创建试卷存储目录shared/paper/username"""
shared_dir = user_service._resolve_shared_dir()
paper_dir = shared_dir / 'paper' / username
paper_dir.mkdir(parents=True, exist_ok=True)
return paper_dir
# --------------------
def _format_questions_to_text(questions: List[Dict[str, Any]]) -> str:
"""
将题目列表格式化为用户指定的 TXT 文本格式
"""
lines = []
option_keys = ['A', 'B', 'C', 'D'] # 定义选项字母
# 遍历所有题目i 从 0 开始,题号从 1 开始
for i, q in enumerate(questions):
# 1. 构造题号和题干
question_text = f"{i + 1}. {q.get('text', '【题干缺失】')}"
lines.append(question_text)
# 2. 构造选项行
options_list = q.get('options', [])
options_text = []
# 【核心修正】:假设 options_list 是一个字符串列表。
# 使用 enumerate 安全地将选项键 ('A', 'B', ...) 与选项文本 (value) 组合。
for j, value in enumerate(options_list):
# 确保选项键没有越界
if j < len(option_keys):
key = option_keys[j]
# 假设 value 是选项文本 (str)
options_text.append(f"{key}. {value}")
else:
break
# 使用四个空格连接选项
lines.append(" ".join(options_text))
# 3. 题目之间空一行
lines.append("")
return "\n".join(lines)
def save_paper(username: str, questions: List[Dict[str, Any]]) -> None:
"""
保存试卷数据到用户对应的文件夹
... (函数体保持不变只调用了 _format_questions_to_text)
"""
if not username or not questions:
print("WARNING: Cannot save paper. Username or questions list is empty.")
return
try:
# 1. 解析目标目录
target_dir = _resolve_paper_dir(username)
# 2. 文件名格式:年-月-日-时-分-秒.txt
current_time = datetime.now()
filename_base = current_time.strftime("%Y-%m-%d-%H-%M-%S")
filename = f"{filename_base}.txt"
target_file = target_dir / filename
# 3. 格式化内容
# 【注意】这里调用了修正后的格式化函数
formatted_content = _format_questions_to_text(questions)
# 4. 写入 TXT 文件
if target_file.exists():
print(f"WARNING: File {filename} already exists. Overwriting the existing paper.")
with open(target_file, 'w', encoding='utf-8') as f:
f.write(formatted_content)
print(f"INFO: Paper saved successfully for user {username} to {target_file}")
except Exception as e:
# 这里捕获了新的错误,请确保使用新的代码,否则还会出错
print(f"ERROR: Failed to save paper for user {username}: {e}")

@ -26,6 +26,9 @@ import sys
PASSWORD_PATTERN = re.compile(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[A-Za-z\d]{6,10}$')
EMAIL_PATTERN = re.compile(r'^[^@\s]+@[^@\s]+\.com$')
# 结构: { 'email': str, 'username': str }
_current_user_info: Dict[str, str] = {}
# 确保在打包或运行环境中找到正确的共享目录
def _resolve_shared_dir() -> Path:
@ -44,6 +47,8 @@ USERS_FILE = _SHARED_DIR / 'users.txt'
if not USERS_FILE.exists():
USERS_FILE.touch()
# 注意:移除了 _codes 的定义,因为 user_service 不再管理验证码
# --- 辅助函数 ---
@ -124,6 +129,8 @@ def verify_registration_code(email: str, code: str) -> Dict[str, Any]:
def complete_registration(email: str, username: str, pw1: str, pw2: str) -> Dict[str, Any]:
"""完成注册,设置用户名和密码。"""
# 【已修正】删除了对 _codes 的检查,因为 verify_registration_code 已经完成了验证
# 且验证成功后验证码记录已被清除。
if pw1 != pw2:
return {'success': False, 'message': '两次输入的密码不匹配'}
@ -141,9 +148,10 @@ def complete_registration(email: str, username: str, pw1: str, pw2: str) -> Dict
# 存储新用户
users[email] = {
'username': username,
'password_hash': _hash_password(pw1)
'password_hash': _hash_password(pw1) # _hash_password 现在直接返回明文密码
}
_save_users(users)
# 【已修正】移除了 _codes.pop(email, None)
return {'success': True, 'message': '注册成功'}
@ -158,11 +166,22 @@ def login(identifier: str, password: str) -> Dict[str, Any]:
user = users[target_email]
# 比较明文密码
if user['password_hash'] == _hash_password(password):
if user['password_hash'] == _hash_password(password): # _hash_password 现在直接返回明文密码
_current_user_info['email'] = target_email
_current_user_info['username'] = user['username']
return {'success': True, 'email': target_email, 'username': user['username']}
else:
return {'success': False, 'message': '密码错误'}
#供外部模块(如 paper_gen.py获取当前用户信息
def get_current_user_info() -> Dict[str, str]:
"""
返回当前登录用户的邮箱和用户名
Returns:
Dict[str, str]: {'email': '...', 'username': '...'} 或空字典 {}
"""
return _current_user_info.copy()
def change_password(email: str, old_pw: str, pw1: str, pw2: str) -> Dict[str, Any]:
"""修改密码(前端调用名为 change_password_full"""
@ -173,7 +192,7 @@ def change_password(email: str, old_pw: str, pw1: str, pw2: str) -> Dict[str, An
return {'success': False, 'message': '用户未登录或邮箱不存在'}
# 比较明文密码
if user['password_hash'] != _hash_password(old_pw):
if user['password_hash'] != _hash_password(old_pw): # _hash_password 现在直接返回明文密码
return {'success': False, 'message': '原密码错误'}
if pw1 != pw2:
@ -183,7 +202,7 @@ def change_password(email: str, old_pw: str, pw1: str, pw2: str) -> Dict[str, An
if pw_error:
return {'success': False, 'message': '新密码校验失败: ' + pw_error}
user['password_hash'] = _hash_password(pw1)
user['password_hash'] = _hash_password(pw1) # _hash_password 现在直接返回明文密码
_save_users(users)
return {'success': True, 'message': '密码修改成功'}
Loading…
Cancel
Save