总结合并 #9

Merged
ppy4sjqvf merged 5 commits from ybw-branch into develop 1 month ago

8
.gitignore vendored

@ -26,4 +26,10 @@ uploads/
*.txt
# vscode 配置
.vscode/
.vscode/
#github 工作流配置
.github/
#pycharm 配置
.idea/

@ -26,4 +26,10 @@ uploads/
*.txt
# 模型文件
hf_models/
hf_models/
#数据库迁移文件
migrations/
#测试文件
test/

@ -14,6 +14,7 @@ from config.settings import Config
db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()
mail = Mail()
def create_app(config_class=Config):
"""Flask应用工厂函数"""
@ -24,6 +25,7 @@ def create_app(config_class=Config):
db.init_app(app)
migrate.init_app(app, db)
jwt.init_app(app)
mail.init_app(app)
CORS(app)
# 注册蓝图

@ -8,12 +8,14 @@ from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from flask_mail import Mail
import os
# 初始化扩展
db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()
mail = Mail()
cors = CORS()
def create_app(config_name=None):
@ -35,6 +37,7 @@ def create_app(config_name=None):
migrate.init_app(app, db)
jwt.init_app(app)
cors.init_app(app)
mail.init_app(app)
# 注册蓝图
from app.controllers.auth_controller import auth_bp

@ -19,7 +19,7 @@ def admin_required(f):
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)
if not user or user.role.code != 'admin':
if not user or user.role.role_code != 'admin':
return jsonify({'error': '需要管理员权限'}), 403
return f(*args, **kwargs)
@ -85,16 +85,11 @@ def create_user():
username = data.get('username')
password = data.get('password')
email = data.get('email')
role_code = data.get('role', 'user')
current_user_id = get_jwt_identity()
current_user = User.query.get(current_user_id)
current_user_role = current_user.role.code
if current_user.role == 'admin':
role = data.get('role', 'user')
else:
role = 'user'
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
@ -154,7 +149,7 @@ def update_user(user_id):
user.email = new_email
if 'role' in data:
user.role = user.role_to_id(data['role'])
user.role_id = user.role_to_id(data['role'])
if 'is_active' in data:
user.is_active = bool(data['is_active'])

@ -9,6 +9,7 @@ from app import db
from app.database import User, UserConfig
from functools import wraps
import re
from app.services.email_service import send_verification_code, verify_code
def int_jwt_required(f):
"""获取JWT身份并转换为整数的装饰器"""
@ -23,6 +24,12 @@ def int_jwt_required(f):
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/code', methods=['GET'])
def send_email_verification_code(email: str = "3310207578@qq.com", purpose: str = 'register'):
email = "3310207578@qq.com"
send_verification_code(email, purpose=purpose)
return jsonify({'message': '验证码已发送'}), 200
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册"""
@ -31,7 +38,7 @@ def register():
username = data.get('username')
password = data.get('password')
email = data.get('email')
code = data.get('code')
# 验证输入
if not username or not password or not email:
return jsonify({'error': '用户名、密码和邮箱不能为空'}), 400
@ -48,7 +55,11 @@ def register():
# 检查邮箱是否已注册
if User.query.filter_by(email=email).first():
return jsonify({'error': '该邮箱已被注册,同一邮箱只能注册一次'}), 400
# 验证验证码
if not code or not verify_code(email, code, purpose='register'):
return jsonify({'error': '验证码无效或已过期'}), 400
# 创建用户
user = User(username=username, email=email)
user.set_password(password)

Binary file not shown.

@ -0,0 +1,132 @@
"""
验证码服务Redis 存储 + Flask-Mail 发送
提供
- `send_verification_code(email, purpose='register', length=6, expire_seconds=None)`
- `verify_code(email, code, purpose='register')`
依赖
- `redis`通过 `REDIS_URL` 配置默认为 redis://localhost:6379/0
- Flask-Mail 已在应用中初始化通过 `current_app.extensions['mail']` 获取
"""
import random
import string
import logging
from typing import Optional
import redis
from flask import current_app
from flask_mail import Message
logger = logging.getLogger(__name__)
pool = redis.ConnectionPool().from_url('redis://localhost:6379/0', decode_responses=True)
def _get_redis_client() -> redis.Redis:
"""根据 REDIS_URL 创建 redis 客户端"""
redis_url = current_app.config.get('REDIS_URL', 'redis://localhost:6379/0')
# 不再使用全局的 pool而是每次根据 URL 获取连接redis-py 内部自己会管理连接池)
# 或者如果你想复用连接池,应该在 app 启动时初始化 pool而不是在全局写死 localhost
return redis.Redis.from_url(redis_url, decode_responses=True)
def _generate_code(length: int = 6) -> str:
"""生成指定长度的数字验证码(默认 6 位)。"""
# 只产生数字字符串更常用于验证码
return ''.join(random.choices(string.digits, k=length))
def send_verification_code(email: str,
purpose: str = 'register',
length: int = 6,
expire_seconds: Optional[int] = None) -> bool:
"""生成验证码,保存到 Redis并使用 Flask-Mail 发送给 `email`。
返回 True 表示发送成功False 表示失败或抛出异常时捕获后返回 False
"""
# 读取过期时间(秒),优先使用传入值,其次使用 app config
if expire_seconds is None:
expire_seconds = current_app.config.get('VERIFICATION_CODE_EXPIRES', 300)
code = _generate_code(length)
key = f"verify:{purpose}:{email}"
try:
r = _get_redis_client()
# 使用字符串保存验证码,并设置过期时间
r.set(key, code, ex=expire_seconds)
except Exception as e:
logger.exception("保存验证码到 Redis 失败")
return False
# 尝试发送邮件
try:
mail = current_app.extensions.get('mail')
subject = "您的验证码" # current_app.config.get('VERIFICATION_EMAIL_SUBJECT', '您的验证码')
sender = '1798231811@qq.com' # current_app.config.get('MAIL_DEFAULT_SENDER') or current_app.config.get('MAIL_USERNAME')
body = f'您的验证码为:{code},有效期 {expire_seconds} 秒。' # current_app.config.get('VERIFICATION_EMAIL_TEMPLATE',
# f'您的验证码为:{code},有效期 {expire_seconds} 秒。')
# 优先使用简单文本邮件,项目中可按需替换为 HTML 模板
msg = Message(subject=subject, recipients=[email], body=body, sender=sender)
if mail is None:
# 如果 Flask-Mail 未初始化,记录日志并返回 False
logger.error('Flask-Mail 未初始化,无法发送邮件')
return False
mail.send(msg)
logger.info('已发送验证码到 %s (purpose=%s)', email, purpose)
return True
except Exception:
logger.exception('发送验证码邮件失败')
return False
def verify_code(email: str, code: str, purpose: str = 'register') -> bool:
"""校验验证码是否正确。成功可配置是否从 Redis 删除该 key。
返回 True 表示校验通过False 表示失败或异常
"""
key = f"verify:{purpose}:{email}"
try:
r = _get_redis_client()
stored = r.get(key)
if stored is None:
return False
matched = (str(stored) == str(code))
if matched :
try:
r.delete(key)
except Exception:
logger.warning('校验成功,但删除 Redis key 失败: %s', key)
return matched
except Exception:
logger.exception('校验验证码时发生异常')
return False
def clear_verification_code(email: str, purpose: str = 'register') -> bool:
"""显式删除指定 email 的验证码(例如用于管理员撤销)。"""
key = f"verify:{purpose}:{email}"
try:
r = _get_redis_client()
return r.delete(key) == 1
except Exception:
logger.exception('删除验证码失败')
return False
if __name__ == '__main__':
# 简单测试发送和验证功能
test_email = "3310207578@qq.com"
if send_verification_code(test_email, expire_seconds=600):
print("验证码发送成功")
code = input("请输入收到的验证码: ")
if verify_code(test_email, code):
print("验证码验证成功")
else:
print("验证码验证失败")
else:
print("验证码发送失败")

@ -40,6 +40,17 @@ class Config:
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=24)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
# 邮件服务
MAIL_SERVER = os.environ.get('MAIL_SERVER')
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 465)
MAIL_USE_SSL = os.environ.get('MAIL_USE_SSL', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER') or MAIL_USERNAME
# redis配置
REDIS_URL = os.environ.get('REDIS_URL')
# 静态文件根目录
STATIC_ROOT = 'static'
@ -70,13 +81,6 @@ class Config:
DEMO_PERTURBED_FOLDER = os.path.join(DEMO_IMAGES_FOLDER, 'perturbed') # 演示加噪图片
DEMO_COMPARISONS_FOLDER = os.path.join(DEMO_IMAGES_FOLDER, 'comparisons') # 演示对比图
# 邮件配置(用于注册验证)
MAIL_SERVER = os.environ.get('MAIL_SERVER') or 'smtp.gmail.com'
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 587)
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
# 算法配置
ALGORITHMS = {
'simac': {

Loading…
Cancel
Save