|
|
import bcrypt
|
|
|
import time
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
from config import Config
|
|
|
from models import UserToken, db
|
|
|
|
|
|
def get_beijing_now():
|
|
|
"""获取当前北京时间(UTC+8),用于数据库存储"""
|
|
|
from datetime import timezone, timedelta
|
|
|
beijing_tz = timezone(timedelta(hours=8))
|
|
|
utc_now = datetime.now(timezone.utc)
|
|
|
beijing_now = utc_now.astimezone(beijing_tz)
|
|
|
return beijing_now.replace(tzinfo=None)
|
|
|
|
|
|
# 使用 PyJWT(确保安装的是 PyJWT 而不是其他 jwt 包)
|
|
|
# 如果导入失败或不是正确的模块,会给出明确的错误提示
|
|
|
try:
|
|
|
import jwt
|
|
|
# 验证是否是PyJWT(检查是否有encode和decode方法)
|
|
|
if not hasattr(jwt, 'encode') or not hasattr(jwt, 'decode'):
|
|
|
raise ImportError(
|
|
|
"导入的jwt模块不正确。请卸载错误的jwt包并安装PyJWT:\n"
|
|
|
" pip uninstall jwt\n"
|
|
|
" pip install PyJWT"
|
|
|
)
|
|
|
except ImportError as e:
|
|
|
raise ImportError(
|
|
|
f"无法导入PyJWT模块: {e}\n"
|
|
|
"请安装 PyJWT: pip install PyJWT"
|
|
|
)
|
|
|
|
|
|
def hash_password(password):
|
|
|
"""加密密码"""
|
|
|
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
|
|
|
def verify_password(password, hashed):
|
|
|
"""验证密码"""
|
|
|
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
|
|
|
|
|
|
def generate_token(user_id, username):
|
|
|
"""生成JWT Token"""
|
|
|
expire_time = int(time.time()) + int(Config.JWT_EXPIRATION_DELTA.total_seconds())
|
|
|
payload = {
|
|
|
'user_id': user_id,
|
|
|
'username': username,
|
|
|
'exp': expire_time
|
|
|
}
|
|
|
token = jwt.encode(payload, Config.SECRET_KEY, algorithm='HS256')
|
|
|
|
|
|
# 保存Token到数据库
|
|
|
user_token = UserToken(
|
|
|
user_id=user_id,
|
|
|
token=token,
|
|
|
expire_time=expire_time
|
|
|
)
|
|
|
db.session.add(user_token)
|
|
|
db.session.commit()
|
|
|
|
|
|
return token, expire_time
|
|
|
|
|
|
def verify_token(token):
|
|
|
"""验证Token"""
|
|
|
try:
|
|
|
payload = jwt.decode(token, Config.SECRET_KEY, algorithms=['HS256'])
|
|
|
user_id = payload.get('user_id')
|
|
|
|
|
|
# 检查Token是否在数据库中且未过期
|
|
|
user_token = UserToken.query.filter_by(
|
|
|
token=token,
|
|
|
user_id=user_id,
|
|
|
deleted_at=None
|
|
|
).first()
|
|
|
|
|
|
if not user_token:
|
|
|
return None
|
|
|
|
|
|
if user_token.expire_time < int(time.time()):
|
|
|
return None
|
|
|
|
|
|
return payload
|
|
|
except jwt.ExpiredSignatureError:
|
|
|
return None
|
|
|
except jwt.InvalidTokenError:
|
|
|
return None
|
|
|
|
|
|
def revoke_token(token):
|
|
|
"""撤销Token"""
|
|
|
user_token = UserToken.query.filter_by(token=token, deleted_at=None).first()
|
|
|
if user_token:
|
|
|
user_token.deleted_at = get_beijing_now()
|
|
|
db.session.commit()
|
|
|
|