You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

93 lines
2.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import bcrypt
import time
from datetime import datetime, timedelta, timezone
from config import Config
from models import UserToken, db
def get_beijing_now():
"""获取当前北京时间UTC+8用于数据库存储"""
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(beijing_tz)
return beijing_now.replace(tzinfo=None)
# 使用 PyJWT确保安装的是 PyJWT 而不是其他 jwt 包)
# 如果导入失败或不是正确的模块,会给出明确的错误提示
try:
import jwt
# 验证是否是PyJWT检查是否有encode和decode方法
if not hasattr(jwt, 'encode') or not hasattr(jwt, 'decode'):
raise ImportError(
"导入的jwt模块不正确。请卸载错误的jwt包并安装PyJWT:\n"
" pip uninstall jwt\n"
" pip install PyJWT"
)
except ImportError as e:
raise ImportError(
f"无法导入PyJWT模块: {e}\n"
"请安装 PyJWT: pip install PyJWT"
)
def hash_password(password):
"""加密密码"""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password, hashed):
"""验证密码"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def generate_token(user_id, username):
"""生成JWT Token"""
expire_time = int(time.time()) + int(Config.JWT_EXPIRATION_DELTA.total_seconds())
payload = {
'user_id': user_id,
'username': username,
'exp': expire_time
}
token = jwt.encode(payload, Config.SECRET_KEY, algorithm='HS256')
# 保存Token到数据库
user_token = UserToken(
user_id=user_id,
token=token,
expire_time=expire_time
)
db.session.add(user_token)
db.session.commit()
return token, expire_time
def verify_token(token):
"""验证Token"""
try:
payload = jwt.decode(token, Config.SECRET_KEY, algorithms=['HS256'])
user_id = payload.get('user_id')
# 检查Token是否在数据库中且未过期
user_token = UserToken.query.filter_by(
token=token,
user_id=user_id,
deleted_at=None
).first()
if not user_token:
return None
if user_token.expire_time < int(time.time()):
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def revoke_token(token):
"""撤销Token"""
user_token = UserToken.query.filter_by(token=token, deleted_at=None).first()
if user_token:
user_token.deleted_at = get_beijing_now()
db.session.commit()