From bb657f7c87c616d76be5cc8297245db6a769d463 Mon Sep 17 00:00:00 2001 From: hnu202326010322 <1463365450@qq.com> Date: Fri, 10 Oct 2025 15:58:08 +0800 Subject: [PATCH 01/11] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20'src'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/data_handler.py | 53 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/data_handler.py diff --git a/src/data_handler.py b/src/data_handler.py new file mode 100644 index 0000000..9168701 --- /dev/null +++ b/src/data_handler.py @@ -0,0 +1,53 @@ +import json +import os +from pathlib import Path + + +class DataHandler: + def __init__(self): + self.base_dir = Path("math_learning_data") + self.users_file = self.base_dir / "users.json" + self.ensure_data_dir() + + def ensure_data_dir(self): + """确保数据目录和文件存在,并且文件内容是有效的JSON""" + if not self.base_dir.exists(): + self.base_dir.mkdir(parents=True) + + if not self.users_file.exists(): + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) # 写入空对象 + else: + # 如果文件存在但为空,写入空对象 + if os.path.getsize(self.users_file) == 0: + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) + + def load_users(self): + """加载所有用户数据""" + with open(self.users_file, "r", encoding="utf-8") as f: + try: + return json.load(f) + except json.JSONDecodeError: + # 如果文件内容损坏,返回空字典 + return {} + + def save_users(self, users_data): + """保存用户数据""" + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump(users_data, f, ensure_ascii=False, indent=2) + + def get_user(self, email): + """获取用户信息""" + users = self.load_users() + return users.get(email) + + def add_user(self, email, user_data): + """添加新用户""" + users = self.load_users() + users[email] = user_data + self.save_users(users) + + def update_user(self, email, user_data): + """更新用户信息""" + self.add_user(email, user_data) # 直接覆盖更新 \ No newline at end of file -- 2.34.1 From ed06bbea8f29ec07ced908a03c44bda26ffcc574 Mon Sep 17 00:00:00 2001 From: hnu202326010322 <1463365450@qq.com> Date: Fri, 10 Oct 2025 15:58:53 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E5=88=A0=E9=99=A4=20'src/data=5Fhandler.?= =?UTF-8?q?py'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/data_handler.py | 53 --------------------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 src/data_handler.py diff --git a/src/data_handler.py b/src/data_handler.py deleted file mode 100644 index 9168701..0000000 --- a/src/data_handler.py +++ /dev/null @@ -1,53 +0,0 @@ -import json -import os -from pathlib import Path - - -class DataHandler: - def __init__(self): - self.base_dir = Path("math_learning_data") - self.users_file = self.base_dir / "users.json" - self.ensure_data_dir() - - def ensure_data_dir(self): - """确保数据目录和文件存在,并且文件内容是有效的JSON""" - if not self.base_dir.exists(): - self.base_dir.mkdir(parents=True) - - if not self.users_file.exists(): - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump({}, f, ensure_ascii=False) # 写入空对象 - else: - # 如果文件存在但为空,写入空对象 - if os.path.getsize(self.users_file) == 0: - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump({}, f, ensure_ascii=False) - - def load_users(self): - """加载所有用户数据""" - with open(self.users_file, "r", encoding="utf-8") as f: - try: - return json.load(f) - except json.JSONDecodeError: - # 如果文件内容损坏,返回空字典 - return {} - - def save_users(self, users_data): - """保存用户数据""" - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump(users_data, f, ensure_ascii=False, indent=2) - - def get_user(self, email): - """获取用户信息""" - users = self.load_users() - return users.get(email) - - def add_user(self, email, user_data): - """添加新用户""" - users = self.load_users() - users[email] = user_data - self.save_users(users) - - def update_user(self, email, user_data): - """更新用户信息""" - self.add_user(email, user_data) # 直接覆盖更新 \ No newline at end of file -- 2.34.1 From 8eefc87fc2692a5864401befc873c140551d7219 Mon Sep 17 00:00:00 2001 From: hnu202326010322 <1463365450@qq.com> Date: Fri, 10 Oct 2025 15:59:28 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=87=B3=20'src/core'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/data_handler.py | 53 ++++++++++++ src/core/email_service.py | 111 ++++++++++++++++++++++++ src/core/question_bank.py | 175 ++++++++++++++++++++++++++++++++++++++ src/core/user_system.py | 90 ++++++++++++++++++++ 4 files changed, 429 insertions(+) create mode 100644 src/core/data_handler.py create mode 100644 src/core/email_service.py create mode 100644 src/core/question_bank.py create mode 100644 src/core/user_system.py diff --git a/src/core/data_handler.py b/src/core/data_handler.py new file mode 100644 index 0000000..9168701 --- /dev/null +++ b/src/core/data_handler.py @@ -0,0 +1,53 @@ +import json +import os +from pathlib import Path + + +class DataHandler: + def __init__(self): + self.base_dir = Path("math_learning_data") + self.users_file = self.base_dir / "users.json" + self.ensure_data_dir() + + def ensure_data_dir(self): + """确保数据目录和文件存在,并且文件内容是有效的JSON""" + if not self.base_dir.exists(): + self.base_dir.mkdir(parents=True) + + if not self.users_file.exists(): + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) # 写入空对象 + else: + # 如果文件存在但为空,写入空对象 + if os.path.getsize(self.users_file) == 0: + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) + + def load_users(self): + """加载所有用户数据""" + with open(self.users_file, "r", encoding="utf-8") as f: + try: + return json.load(f) + except json.JSONDecodeError: + # 如果文件内容损坏,返回空字典 + return {} + + def save_users(self, users_data): + """保存用户数据""" + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump(users_data, f, ensure_ascii=False, indent=2) + + def get_user(self, email): + """获取用户信息""" + users = self.load_users() + return users.get(email) + + def add_user(self, email, user_data): + """添加新用户""" + users = self.load_users() + users[email] = user_data + self.save_users(users) + + def update_user(self, email, user_data): + """更新用户信息""" + self.add_user(email, user_data) # 直接覆盖更新 \ No newline at end of file diff --git a/src/core/email_service.py b/src/core/email_service.py new file mode 100644 index 0000000..7ecdb82 --- /dev/null +++ b/src/core/email_service.py @@ -0,0 +1,111 @@ +import random +import time +import smtplib +import logging +from email.mime.text import MIMEText +from email.header import Header +from email.utils import formataddr +from typing import Dict +from smtplib import SMTPResponseException + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class EmailService: + def __init__(self, smtp_server: str, smtp_port: int, sender_email: str, sender_password: str): + self.smtp_server = smtp_server + self.smtp_port = smtp_port + self.sender_email = sender_email + self.sender_password = sender_password + self.verification_codes: Dict[str, tuple[str, float]] = {} # 邮箱: (验证码, 过期时间) + self.code_validity = 5 * 60 # 验证码有效期5分钟 + self.last_send_time: Dict[str, float] = {} # 记录上次发送时间 + self.send_interval = 60 # 发送间隔60秒 + + def generate_code(self) -> str: + """生成6位数字验证码""" + return str(random.randint(100000, 999999)) + + def can_send_code(self, email: str) -> bool: + """检查是否可以发送验证码(防止频繁发送)""" + if email in self.last_send_time: + elapsed = time.time() - self.last_send_time[email] + return elapsed >= self.send_interval + return True + + def send_verification_code(self, email: str) -> bool: + """发送验证码到邮箱""" + if not self.can_send_code(email): + logger.warning(f"发送过于频繁,邮箱: {email}") + return False + + code = self.generate_code() + expire_time = time.time() + self.code_validity + self.verification_codes[email] = (code, expire_time) + self.last_send_time[email] = time.time() + + # 邮件内容 + subject = "数学学习软件验证码" + + # 格式化邮箱显示,保护隐私 + try: + at_index = email.index('@') + formatted_email = email[:3] + '...' + email[at_index - 4:at_index] if at_index >= 4 else email + except ValueError: + formatted_email = email + + message = f"""【数学学习软件】您正在登录 {formatted_email} +您的验证码是:{code} +此验证码5分钟内有效,请尽快完成操作。 +如非本人操作,请忽略此邮件。""" + + try: + msg = MIMEText(message, "plain", "utf-8") + msg["Subject"] = Header(subject, "utf-8") + # 发件人用英文名称,避免QQ邮箱拒绝 + msg["From"] = formataddr(("MathSoftware", self.sender_email)) + msg["To"] = email + + logger.info(f"尝试发送邮件到 {email},SMTP服务器: {self.smtp_server}:{self.smtp_port}") + + with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: + server.set_debuglevel(1) + server.login(self.sender_email, self.sender_password) + server.sendmail(self.sender_email, email, msg.as_string()) + + logger.info(f"验证码已发送到 {email}") + return True + + except SMTPResponseException as e: + # 忽略 QQ 邮箱提前关闭连接的异常 + if e.code == -1 and e.message == b'\x00\x00\x00': + logger.info(f"邮件已成功发送到 {email} (忽略连接关闭异常)") + return True + logger.error(f"SMTP响应异常: {e}") + return False + + except smtplib.SMTPAuthenticationError: + logger.error("邮箱认证失败,请检查账号和授权码是否正确") + return False + + except smtplib.SMTPConnectError: + logger.error("无法连接到SMTP服务器,请检查服务器地址和端口") + return False + + except Exception as e: + logger.error(f"邮件发送失败: {str(e)}", exc_info=True) + return False + + def verify_code(self, email: str, code: str) -> bool: + """验证验证码是否有效""" + if email not in self.verification_codes: + return False + + stored_code, expire_time = self.verification_codes[email] + if time.time() > expire_time: + del self.verification_codes[email] + return False + + return stored_code == code \ No newline at end of file diff --git a/src/core/question_bank.py b/src/core/question_bank.py new file mode 100644 index 0000000..437c74f --- /dev/null +++ b/src/core/question_bank.py @@ -0,0 +1,175 @@ +import random +import math +from typing import List, Tuple, Dict + + +class Question: + def __init__(self, content: str, options: List[str], answer: str): + self.content = content + self.options = options + self.answer = answer + + +class QuestionBank: + def __init__(self): + self.operators = { + "primary": ["+", "-", "*", "/"], + "middle": ["+", "-", "*", "/", "^2", "sqrt"], + "high": ["+", "-", "*", "/", "sin", "cos", "tan"] + } + self.generated_questions = [] # 存储已生成题目避免重复 + + def generate_number(self, level: str) -> int: + """根据学段生成数字范围""" + if level == "primary": + return random.randint(1, 100) + elif level == "middle": + return random.randint(1, 500) + else: # high + return random.randint(1, 10) # 三角函数用小数字 + + def generate_expression(self, level: str) -> Tuple[str, float]: + """生成表达式和计算结果,小学学段避免负数""" + op_count = random.randint(1, 3) # 操作数数量 + nums = [self.generate_number(level) for _ in range(op_count + 1)] + ops = [] + + # 根据学段确保特殊运算符 + if level == "middle": + # 确保至少有一个平方或开根号 + special_op = random.choice(["^2", "sqrt"]) + ops = random.choices(self.operators[level], k=op_count - 1) + ops.append(special_op) + random.shuffle(ops) + elif level == "high": + # 确保至少有一个三角函数 + special_op = random.choice(["sin", "cos", "tan"]) + ops = random.choices(self.operators[level], k=op_count - 1) + ops.append(special_op) + random.shuffle(ops) + else: # primary + ops = random.choices(self.operators[level], k=op_count) + # 小学学段处理减法,确保被减数 >= 减数,避免负数 + for i in range(len(ops)): + if ops[i] == "-": + # 保证前面的数大于等于后面的数 + if nums[i] < nums[i + 1]: + nums[i], nums[i + 1] = nums[i + 1], nums[i] + + # 构建表达式字符串和计算结果 + expr_parts = [str(nums[0])] + result = nums[0] + + for i in range(op_count): + op = ops[i] + num = nums[i + 1] if i < len(nums) - 1 else None # 处理特殊运算符 + + expr_parts.append(op) + if op not in ["^2", "sqrt", "sin", "cos", "tan"]: + expr_parts.append(str(num)) + + # 计算结果 + if op == "+": + result += num + elif op == "-": + result -= num + elif op == "*": + result *= num + elif op == "/": + num = max(num, 1) # 避免除零 + result = round(result / num, 2) + elif op == "^2": + result **= 2 + result = round(result, 2) + elif op == "sqrt": + result = round(math.sqrt(abs(result)), 2) + elif op == "sin": + result = round(math.sin(math.radians(result)), 2) + elif op == "cos": + result = round(math.cos(math.radians(result)), 2) + elif op == "tan": + result = round(math.tan(math.radians(result)), 2) + + # 小学题目添加括号(可选,若需要可保留) + if level == "primary" and len(expr_parts) > 3: + # 随机插入括号,同时确保括号内运算结果非负(这里简单处理,可根据实际细化) + start = random.randint(0, len(expr_parts) // 2) * 2 + end = start + 4 + if end < len(expr_parts): + expr_parts.insert(start, "(") + expr_parts.insert(end, ")") + + expr = " ".join(expr_parts) + + try: + # 用 eval 计算实际结果(安全地) + result = eval(expr.replace("^2", "**2")) + result = round(float(result), 2) + except Exception: + # 如果计算出错就重新生成 + return self.generate_expression(level) + + return expr, round(result, 2) + + + + + + def generate_options(self, correct_answer: float) -> Tuple[List[str], str]: + """生成4个选项(1个正确,3个干扰项)- 修复版:确保正确答案在选项中""" + options = [] + # 1. 先添加正确答案,确保核心选项不丢失 + correct_rounded = round(correct_answer, 2) + options.append(correct_rounded) + + # 2. 生成3个干扰项,严格控制范围(正确答案±30%,避免差异过大) + for _ in range(3): + # 计算偏移量:正确答案的±30%范围内,确保干扰项合理 + max_offset = abs(correct_rounded) * 0.3 if correct_rounded != 0 else 10 # 0的情况固定±10 + offset = random.uniform(-max_offset, max_offset) + wrong_answer = round(correct_rounded + offset, 2) + + # 避免干扰项与已有选项重复(包括正确答案) + while wrong_answer in options: + offset = random.uniform(-max_offset, max_offset) + wrong_answer = round(correct_rounded + offset, 2) + + options.append(wrong_answer) + + # 3. 二次确认:强制检查正确答案是否在选项中(兜底逻辑) + if correct_rounded not in options: + # 若意外丢失,替换第一个干扰项为正确答案 + options[0] = correct_rounded + + # 4. 打乱选项顺序并转换为字符串(保留2位小数格式) + random.shuffle(options) + options_str = [f"{opt:.2f}" for opt in options] + correct_str = f"{correct_rounded:.2f}" + + return options_str, correct_str + + def generate_question(self, level: str) -> Question: + """生成单个选择题""" + while True: + expr, answer = self.generate_expression(level) + + if level == "primary" and answer < 0: + continue + # 避免重复题目 + if expr not in [q.content for q in self.generated_questions]: + break + + options, correct = self.generate_options(answer) + question = Question(f"计算:{expr}", options, correct) + self.generated_questions.append(question) + return question + + def generate_paper(self, level: str, count: int) -> List[Question]: + """生成试卷""" + self.generated_questions = [] # 清空历史 + return [self.generate_question(level) for _ in range(count)] + + def calculate_score(self, answers: List[bool]) -> int: + """计算得分(正确率百分比)""" + correct_count = sum(1 for a in answers if a) + return round((correct_count / len(answers)) * 100) if answers else 0 \ No newline at end of file diff --git a/src/core/user_system.py b/src/core/user_system.py new file mode 100644 index 0000000..f175d27 --- /dev/null +++ b/src/core/user_system.py @@ -0,0 +1,90 @@ +import re +import json +import os +from typing import Dict, Optional +from .email_service import EmailService + +# 项目根目录 -> src所在的路径 +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +USER_DATA_FILE = os.path.join(PROJECT_ROOT, "users.json") # 绝对路径 + +class UserSystem: + def __init__(self, email_service: EmailService): + self.email_service = email_service + self.users: Dict[str, dict] = self.load_users() # {email: {"password": "...", "level": None}} + self.current_user: Optional[str] = None # 当前登录用户邮箱 + self.current_level: Optional[str] = None + + def load_users(self) -> Dict[str, dict]: + """从文件加载用户数据""" + if os.path.exists(USER_DATA_FILE): + with open(USER_DATA_FILE, "r", encoding="utf-8") as f: + return json.load(f) + return {} + + def save_users(self): + """保存用户数据到文件""" + with open(USER_DATA_FILE, "w", encoding="utf-8") as f: + json.dump(self.users, f, ensure_ascii=False, indent=4) + + def is_valid_email(self, email: str) -> bool: + """验证邮箱格式""" + pattern = r"^[\w\.-]+@[\w\.-]+\.\w+$" + return re.match(pattern, email) is not None + + def is_valid_password(self, password: str) -> bool: + """密码要求:6-10位,包含大小写字母和数字""" + if len(password) < 6 or len(password) > 10: + return False + has_upper = any(c.isupper() for c in password) + has_lower = any(c.islower() for c in password) + has_digit = any(c.isdigit() for c in password) + return has_upper and has_lower and has_digit + + def send_verification(self, email: str) -> bool: + """发送验证码(邮箱未注册才可以)""" + if email in self.users: + return False # 邮箱已注册 + return self.email_service.send_verification_code(email) + + def register(self, email: str, code: str, password: str) -> bool: + """注册新用户""" + if email in self.users: + return False + if not self.is_valid_password(password): + return False + if not self.email_service.verify_code(email, code): + return False + + self.users[email] = {"password": password, "level": None} + self.save_users() + return True + + def login(self, email: str, password: str) -> bool: + """登录验证""" + user = self.users.get(email) + if user and user["password"] == password: + self.current_user = email + self.current_level = user.get("level") + return True + return False + + def change_password(self, old_password: str, new_password: str) -> bool: + """修改当前登录用户密码""" + if not self.current_user: + return False + user = self.users.get(self.current_user) + if not user or user["password"] != old_password: + return False + if not self.is_valid_password(new_password): + return False + user["password"] = new_password + self.save_users() + return True + + def set_level(self, level: str): + """设置当前用户学段""" + if self.current_user: + self.users[self.current_user]["level"] = level + self.current_level = level + self.save_users() \ No newline at end of file -- 2.34.1 From 8539729c35800d3c30e5ffe8de539f1207780c8c Mon Sep 17 00:00:00 2001 From: hnu202326010322 <1463365450@qq.com> Date: Sun, 12 Oct 2025 16:12:24 +0800 Subject: [PATCH 04/11] Update data_handler.py --- src/core/data_handler.py | 104 +++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/src/core/data_handler.py b/src/core/data_handler.py index 9168701..59d8d28 100644 --- a/src/core/data_handler.py +++ b/src/core/data_handler.py @@ -1,53 +1,53 @@ -import json -import os -from pathlib import Path - - -class DataHandler: - def __init__(self): - self.base_dir = Path("math_learning_data") - self.users_file = self.base_dir / "users.json" - self.ensure_data_dir() - - def ensure_data_dir(self): - """确保数据目录和文件存在,并且文件内容是有效的JSON""" - if not self.base_dir.exists(): - self.base_dir.mkdir(parents=True) - - if not self.users_file.exists(): - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump({}, f, ensure_ascii=False) # 写入空对象 - else: - # 如果文件存在但为空,写入空对象 - if os.path.getsize(self.users_file) == 0: - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump({}, f, ensure_ascii=False) - - def load_users(self): - """加载所有用户数据""" - with open(self.users_file, "r", encoding="utf-8") as f: - try: - return json.load(f) - except json.JSONDecodeError: - # 如果文件内容损坏,返回空字典 - return {} - - def save_users(self, users_data): - """保存用户数据""" - with open(self.users_file, "w", encoding="utf-8") as f: - json.dump(users_data, f, ensure_ascii=False, indent=2) - - def get_user(self, email): - """获取用户信息""" - users = self.load_users() - return users.get(email) - - def add_user(self, email, user_data): - """添加新用户""" - users = self.load_users() - users[email] = user_data - self.save_users(users) - - def update_user(self, email, user_data): - """更新用户信息""" +import json +import os +from pathlib import Path + + +class DataHandler: + def __init__(self): + self.base_dir = Path("math_learning_data") + self.users_file = self.base_dir / "users.json" + self.ensure_data_dir() + + def ensure_data_dir(self): + """确保数据目录和文件存在,并且文件内容是有效的JSON""" + if not self.base_dir.exists(): + self.base_dir.mkdir(parents=True) + + if not self.users_file.exists(): + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) # 写入空对象 + else: + # 如果文件存在但为空,写入空对象 + if os.path.getsize(self.users_file) == 0: + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump({}, f, ensure_ascii=False) + + def load_users(self): + """加载所有用户数据""" + with open(self.users_file, "r", encoding="utf-8") as f: + try: + return json.load(f) + except json.JSONDecodeError: + # 如果文件内容损坏,返回空字典 + return {} + + def save_users(self, users_data): + """保存用户数据""" + with open(self.users_file, "w", encoding="utf-8") as f: + json.dump(users_data, f, ensure_ascii=False, indent=2) + + def get_user(self, email): + """获取用户信息""" + users = self.load_users() + return users.get(email) + + def add_user(self, email, user_data): + """添加新用户""" + users = self.load_users() + users[email] = user_data + self.save_users(users) + + def update_user(self, email, user_data): + """更新用户信息""" self.add_user(email, user_data) # 直接覆盖更新 \ No newline at end of file -- 2.34.1 From c7d0a4e427449de27ddacfba161cbe6f45d66289 Mon Sep 17 00:00:00 2001 From: hnu202326010322 <1463365450@qq.com> Date: Sun, 12 Oct 2025 16:13:18 +0800 Subject: [PATCH 05/11] Update email_service.py --- src/core/email_service.py | 276 +++++++++++++++++++++++--------------- 1 file changed, 166 insertions(+), 110 deletions(-) diff --git a/src/core/email_service.py b/src/core/email_service.py index 7ecdb82..ecca850 100644 --- a/src/core/email_service.py +++ b/src/core/email_service.py @@ -1,111 +1,167 @@ -import random -import time -import smtplib -import logging -from email.mime.text import MIMEText -from email.header import Header -from email.utils import formataddr -from typing import Dict -from smtplib import SMTPResponseException - -# 配置日志 -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -class EmailService: - def __init__(self, smtp_server: str, smtp_port: int, sender_email: str, sender_password: str): - self.smtp_server = smtp_server - self.smtp_port = smtp_port - self.sender_email = sender_email - self.sender_password = sender_password - self.verification_codes: Dict[str, tuple[str, float]] = {} # 邮箱: (验证码, 过期时间) - self.code_validity = 5 * 60 # 验证码有效期5分钟 - self.last_send_time: Dict[str, float] = {} # 记录上次发送时间 - self.send_interval = 60 # 发送间隔60秒 - - def generate_code(self) -> str: - """生成6位数字验证码""" - return str(random.randint(100000, 999999)) - - def can_send_code(self, email: str) -> bool: - """检查是否可以发送验证码(防止频繁发送)""" - if email in self.last_send_time: - elapsed = time.time() - self.last_send_time[email] - return elapsed >= self.send_interval - return True - - def send_verification_code(self, email: str) -> bool: - """发送验证码到邮箱""" - if not self.can_send_code(email): - logger.warning(f"发送过于频繁,邮箱: {email}") - return False - - code = self.generate_code() - expire_time = time.time() + self.code_validity - self.verification_codes[email] = (code, expire_time) - self.last_send_time[email] = time.time() - - # 邮件内容 - subject = "数学学习软件验证码" - - # 格式化邮箱显示,保护隐私 - try: - at_index = email.index('@') - formatted_email = email[:3] + '...' + email[at_index - 4:at_index] if at_index >= 4 else email - except ValueError: - formatted_email = email - - message = f"""【数学学习软件】您正在登录 {formatted_email} -您的验证码是:{code} -此验证码5分钟内有效,请尽快完成操作。 -如非本人操作,请忽略此邮件。""" - - try: - msg = MIMEText(message, "plain", "utf-8") - msg["Subject"] = Header(subject, "utf-8") - # 发件人用英文名称,避免QQ邮箱拒绝 - msg["From"] = formataddr(("MathSoftware", self.sender_email)) - msg["To"] = email - - logger.info(f"尝试发送邮件到 {email},SMTP服务器: {self.smtp_server}:{self.smtp_port}") - - with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: - server.set_debuglevel(1) - server.login(self.sender_email, self.sender_password) - server.sendmail(self.sender_email, email, msg.as_string()) - - logger.info(f"验证码已发送到 {email}") - return True - - except SMTPResponseException as e: - # 忽略 QQ 邮箱提前关闭连接的异常 - if e.code == -1 and e.message == b'\x00\x00\x00': - logger.info(f"邮件已成功发送到 {email} (忽略连接关闭异常)") - return True - logger.error(f"SMTP响应异常: {e}") - return False - - except smtplib.SMTPAuthenticationError: - logger.error("邮箱认证失败,请检查账号和授权码是否正确") - return False - - except smtplib.SMTPConnectError: - logger.error("无法连接到SMTP服务器,请检查服务器地址和端口") - return False - - except Exception as e: - logger.error(f"邮件发送失败: {str(e)}", exc_info=True) - return False - - def verify_code(self, email: str, code: str) -> bool: - """验证验证码是否有效""" - if email not in self.verification_codes: - return False - - stored_code, expire_time = self.verification_codes[email] - if time.time() > expire_time: - del self.verification_codes[email] - return False - +import random +import time +import smtplib +import logging +from email.mime.text import MIMEText +from email.header import Header +from email.utils import formataddr +from tkinter import messagebox +from typing import Dict +from smtplib import SMTPResponseException + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class EmailService: + def __init__(self, smtp_server: str, smtp_port: int, sender_email: str, sender_password: str): + self.smtp_server = smtp_server + self.smtp_port = smtp_port + self.sender_email = sender_email + self.sender_password = sender_password + self.verification_codes: Dict[str, tuple[str, float]] = {} # 邮箱: (验证码, 过期时间) + self.code_validity = 60 # 验证码有效期1分钟 + self.last_send_time: Dict[str, float] = {} # 记录上次发送时间 + self.send_interval = 60 # 发送间隔60秒 + + def generate_code(self) -> str: + """生成6位数字注册码""" + return str(random.randint(100000, 999999)) + + def can_send_code(self, email: str) -> bool: + """检查是否可以发送注册码(防止频繁发送)""" + if email in self.last_send_time: + elapsed = time.time() - self.last_send_time[email] + return elapsed >= self.send_interval + return True + + def send_verification_code(self, email: str, username: str = "用户") -> bool: + """发送验证码到用户邮箱。 + + Args: + email: 目标邮箱地址。 + username: 用户昵称,用于邮件正文。 + + Returns: + 成功返回 True,失败返回 False。 + """ + if not self.can_send_code(email): + logger.warning("发送过于频繁,邮箱: %s", email) + return False + + code = self.generate_code() + now = time.time() + self.verification_codes[email] = (code, now + self.code_validity) + self.last_send_time[email] = now + + subject = "数学学习软件 - 注册验证码" + formatted_email = self._format_email(email) + body = self._build_email_body(username, code) + + try: + self._send_email(email, subject, body) + logger.info("注册码已发送到 %s", email) + return True + except smtplib.SMTPAuthenticationError as e: + logger.error("邮箱认证失败: %s", e) + self._show_warning("邮箱认证失败", "请检查邮箱账号和授权码是否正确") + return False + except smtplib.SMTPConnectError as e: + logger.error("无法连接到SMTP服务器: %s", e) + self._show_warning("连接失败", "无法连接到邮件服务器,请检查网络连接") + return False + except Exception as e: + logger.error("邮件发送失败: %s", e, exc_info=True) + self._show_warning("发送失败", f"邮件发送失败: {e}") + return False + + + def _format_email(self, email: str) -> str: + """隐藏部分邮箱字符,保护隐私。""" + try: + at_index = email.index('@') + if at_index >= 4: + return email[:3] + '...' + email[at_index - 4:at_index] + return email + except ValueError: + return email + + + def _build_email_body(self, username: str, code: str) -> str: + """构建HTML邮件正文。""" + return f""" + +
+注册验证码
+亲爱的 {username},您好!
+您正在注册数学学习软件,请使用以下注册码完成注册:
++ ⚠️ 注册码有效期1分钟,请尽快完成注册。 +
+如果这不是您的操作,请忽略此邮件。
+此为系统邮件,请勿回复
+数学学习软件团队
+