""" 用户认证控制器 处理注册、登录、密码修改等功能 """ from flask import Blueprint, request, jsonify from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity from app import db from app.database import User, UserConfig from functools import wraps import re def int_jwt_required(f): """获取JWT身份并转换为整数的装饰器""" @wraps(f) def wrapped(*args, **kwargs): try: current_user_id = int(get_jwt_identity()) return f(*args, current_user_id=current_user_id, **kwargs) except (TypeError, ValueError): return jsonify({'error': '无效的用户身份标识'}), 401 return jwt_required()(wrapped) auth_bp = Blueprint('auth', __name__) @auth_bp.route('/register', methods=['POST']) def register(): """用户注册""" try: data = request.get_json() username = data.get('username') password = data.get('password') email = data.get('email') # 验证输入 if not username or not password or not email: return jsonify({'error': '用户名、密码和邮箱不能为空'}), 400 # 验证邮箱格式 email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if not re.match(email_pattern, email): return jsonify({'error': '邮箱格式不正确'}), 400 # 检查用户名是否已存在 if User.query.filter_by(username=username).first(): return jsonify({'error': '用户名已存在'}), 400 # 检查邮箱是否已注册 if User.query.filter_by(email=email).first(): return jsonify({'error': '该邮箱已被注册,同一邮箱只能注册一次'}), 400 # 创建用户 user = User(username=username, email=email) user.set_password(password) db.session.add(user) db.session.commit() # 创建用户默认配置 user_config = UserConfig(user_id=user.id) db.session.add(user_config) db.session.commit() return jsonify({ 'message': '注册成功', 'user': user.to_dict() }), 201 except Exception as e: db.session.rollback() return jsonify({'error': f'注册失败: {str(e)}'}), 500 @auth_bp.route('/login', methods=['POST']) def login(): """用户登录""" try: data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'error': '用户名和密码不能为空'}), 400 # 查找用户 user = User.query.filter_by(username=username).first() if not user or not user.check_password(password): return jsonify({'error': '用户名或密码错误'}), 401 if not user.is_active: return jsonify({'error': '账户已被禁用'}), 401 # 创建访问令牌 - 确保用户ID为字符串类型 access_token = create_access_token(identity=str(user.id)) return jsonify({ 'message': '登录成功', 'access_token': access_token, 'user': user.to_dict() }), 200 except Exception as e: return jsonify({'error': f'登录失败: {str(e)}'}), 500 @auth_bp.route('/change-password', methods=['POST']) @int_jwt_required def change_password(current_user_id): """修改密码""" try: user = User.query.get(current_user_id) if not user: return jsonify({'error': '用户不存在'}), 404 data = request.get_json() old_password = data.get('old_password') new_password = data.get('new_password') if not old_password or not new_password: return jsonify({'error': '旧密码和新密码不能为空'}), 400 # 验证旧密码 if not user.check_password(old_password): return jsonify({'error': '旧密码错误'}), 401 # 设置新密码 user.set_password(new_password) db.session.commit() return jsonify({'message': '密码修改成功'}), 200 except Exception as e: db.session.rollback() return jsonify({'error': f'密码修改失败: {str(e)}'}), 500 @auth_bp.route('/profile', methods=['GET']) @int_jwt_required def get_profile(current_user_id): """获取用户信息""" try: user = User.query.get(current_user_id) if not user: return jsonify({'error': '用户不存在'}), 404 return jsonify({'user': user.to_dict()}), 200 except Exception as e: return jsonify({'error': f'获取用户信息失败: {str(e)}'}), 500 @auth_bp.route('/logout', methods=['POST']) @jwt_required() def logout(): """用户登出(客户端删除token即可)""" return jsonify({'message': '登出成功'}), 200