import bcrypt import time from datetime import datetime, timedelta, timezone from config import Config from models import UserToken, db def get_beijing_now(): """获取当前北京时间(UTC+8),用于数据库存储""" from datetime import timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) utc_now = datetime.now(timezone.utc) beijing_now = utc_now.astimezone(beijing_tz) return beijing_now.replace(tzinfo=None) # 使用 PyJWT(确保安装的是 PyJWT 而不是其他 jwt 包) # 如果导入失败或不是正确的模块,会给出明确的错误提示 try: import jwt # 验证是否是PyJWT(检查是否有encode和decode方法) if not hasattr(jwt, 'encode') or not hasattr(jwt, 'decode'): raise ImportError( "导入的jwt模块不正确。请卸载错误的jwt包并安装PyJWT:\n" " pip uninstall jwt\n" " pip install PyJWT" ) except ImportError as e: raise ImportError( f"无法导入PyJWT模块: {e}\n" "请安装 PyJWT: pip install PyJWT" ) def hash_password(password): """加密密码""" return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def verify_password(password, hashed): """验证密码""" return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) def generate_token(user_id, username): """生成JWT Token""" expire_time = int(time.time()) + int(Config.JWT_EXPIRATION_DELTA.total_seconds()) payload = { 'user_id': user_id, 'username': username, 'exp': expire_time } token = jwt.encode(payload, Config.SECRET_KEY, algorithm='HS256') # 保存Token到数据库 user_token = UserToken( user_id=user_id, token=token, expire_time=expire_time ) db.session.add(user_token) db.session.commit() return token, expire_time def verify_token(token): """验证Token""" try: payload = jwt.decode(token, Config.SECRET_KEY, algorithms=['HS256']) user_id = payload.get('user_id') # 检查Token是否在数据库中且未过期 user_token = UserToken.query.filter_by( token=token, user_id=user_id, deleted_at=None ).first() if not user_token: return None if user_token.expire_time < int(time.time()): return None return payload except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None def revoke_token(token): """撤销Token""" user_token = UserToken.query.filter_by(token=token, deleted_at=None).first() if user_token: user_token.deleted_at = get_beijing_now() db.session.commit()