You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

145 lines
6.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timezone, timedelta
db = SQLAlchemy()
# 北京时间时区UTC+8
BEIJING_TIMEZONE = timezone(timedelta(hours=8))
def get_beijing_now():
"""获取当前北京时间UTC+8用于数据库存储和显示"""
# 获取UTC时间然后转换为北京时间
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(BEIJING_TIMEZONE)
# 返回naive datetime去掉时区信息因为MySQL DATETIME不支持时区
return beijing_now.replace(tzinfo=None)
def get_utc_now():
"""获取当前UTC时间用于证书生成证书标准要求使用UTC时间"""
return datetime.utcnow()
def format_datetime_for_api(dt):
"""格式化datetime为API返回格式假设数据库中的时间是北京时间"""
if dt is None:
return None
# 数据库中的时间是北京时间naive添加北京时间时区标识
# 这样前端JavaScript的Date对象能正确识别为北京时间
if dt.tzinfo is None:
# 假设是北京时间,添加时区标识
dt = dt.replace(tzinfo=BEIJING_TIMEZONE)
# 返回ISO格式字符串包含时区信息+08:00
return dt.isoformat()
class BaseModel(db.Model):
"""基础模型类"""
__abstract__ = True
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
created_at = db.Column(db.DateTime, default=get_beijing_now, comment='创建时间(北京时间)')
updated_at = db.Column(db.DateTime, default=get_beijing_now, onupdate=get_beijing_now, comment='更新时间(北京时间)')
deleted_at = db.Column(db.DateTime, nullable=True, comment='删除时间(北京时间)')
class User(BaseModel):
"""用户模型"""
__tablename__ = 'users'
username = db.Column(db.String(16), unique=True, nullable=False, comment='用户名')
password = db.Column(db.String(255), nullable=False, comment='密码哈希')
email = db.Column(db.String(255), nullable=True, comment='邮箱')
authority = db.Column(db.Integer, default=0, comment='权限1表示系统管理员')
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email,
'authority': self.authority,
'created_at': format_datetime_for_api(self.created_at)
}
class UserToken(BaseModel):
"""用户Token模型"""
__tablename__ = 'user_tokens'
user_id = db.Column(db.Integer, nullable=False, comment='用户ID')
token = db.Column(db.String(512), nullable=False, comment='Token') # JWT token通常需要512字符
expire_time = db.Column(db.BigInteger, nullable=False, comment='过期时间戳')
class CARequest(BaseModel):
"""CA证书请求模型"""
__tablename__ = 'ca_requests'
user_id = db.Column(db.Integer, nullable=False, comment='申请证书的用户ID')
state = db.Column(db.Integer, nullable=False, default=1, comment='证书状态1待审核2审核通过3审核未通过')
public_key = db.Column(db.Text, nullable=False, comment='公钥')
csr_content = db.Column(db.Text, nullable=True, comment='CSR文件内容')
country = db.Column(db.String(20), nullable=True, comment='国家')
province = db.Column(db.String(255), nullable=True, comment='省市')
locality = db.Column(db.String(255), nullable=True, comment='地区')
organization = db.Column(db.String(255), nullable=True, comment='组织')
organization_unit_name = db.Column(db.String(255), nullable=True, comment='部门')
common_name = db.Column(db.String(255), nullable=False, comment='域名')
email_address = db.Column(db.String(255), nullable=True, comment='邮箱')
def to_dict(self):
return {
'id': self.id,
'user_id': self.user_id,
'state': self.state,
'state_text': {1: '待审核', 2: '审核通过', 3: '审核未通过'}.get(self.state, '未知'),
'country': self.country,
'province': self.province,
'locality': self.locality,
'organization': self.organization,
'organization_unit_name': self.organization_unit_name,
'common_name': self.common_name,
'email_address': self.email_address,
'created_at': format_datetime_for_api(self.created_at),
'updated_at': format_datetime_for_api(self.updated_at)
}
class Certificate(BaseModel):
"""证书模型"""
__tablename__ = 'certificates'
user_id = db.Column(db.Integer, nullable=False, comment='证书拥有者ID')
state = db.Column(db.Integer, nullable=False, default=1, comment='状态1代表在使用中2代表已撤销或过期')
request_id = db.Column(db.Integer, nullable=False, comment='证书请求ID')
expire_time = db.Column(db.DateTime, nullable=False, comment='过期时间')
certificate_content = db.Column(db.Text, nullable=True, comment='证书内容(PEM格式)')
serial_number = db.Column(db.String(64), nullable=True, comment='证书序列号')
def to_dict(self):
# 判断证书状态文本
if self.state == 1:
state_text = '使用中'
elif self.state == 2:
# state=2 表示已吊销(手动吊销或过期)
# 检查是否过期(如果过期时间已过,说明是过期导致的,否则是手动吊销)
from datetime import datetime, timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
beijing_now = datetime.now(timezone.utc).astimezone(beijing_tz).replace(tzinfo=None)
if self.expire_time and self.expire_time < beijing_now:
state_text = '已过期'
else:
state_text = '已吊销'
else:
state_text = '未知'
return {
'id': self.id,
'user_id': self.user_id,
'state': self.state,
'state_text': state_text,
'request_id': self.request_id,
'expire_time': format_datetime_for_api(self.expire_time),
'serial_number': self.serial_number,
'created_at': format_datetime_for_api(self.created_at)
}
class CRL(BaseModel):
"""证书吊销列表模型"""
__tablename__ = 'crls'
certificate_id = db.Column(db.Integer, nullable=False, comment='证书ID')
input_time = db.Column(db.BigInteger, nullable=False, comment='加入时间戳')