|
|
from flask_sqlalchemy import SQLAlchemy
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
|
|
db = SQLAlchemy()
|
|
|
|
|
|
# 北京时间时区(UTC+8)
|
|
|
BEIJING_TIMEZONE = timezone(timedelta(hours=8))
|
|
|
|
|
|
def get_beijing_now():
|
|
|
"""获取当前北京时间(UTC+8),用于数据库存储和显示"""
|
|
|
# 获取UTC时间,然后转换为北京时间
|
|
|
utc_now = datetime.now(timezone.utc)
|
|
|
beijing_now = utc_now.astimezone(BEIJING_TIMEZONE)
|
|
|
# 返回naive datetime(去掉时区信息),因为MySQL DATETIME不支持时区
|
|
|
return beijing_now.replace(tzinfo=None)
|
|
|
|
|
|
def get_utc_now():
|
|
|
"""获取当前UTC时间,用于证书生成(证书标准要求使用UTC时间)"""
|
|
|
return datetime.utcnow()
|
|
|
|
|
|
def format_datetime_for_api(dt):
|
|
|
"""格式化datetime为API返回格式,假设数据库中的时间是北京时间"""
|
|
|
if dt is None:
|
|
|
return None
|
|
|
# 数据库中的时间是北京时间(naive),添加北京时间时区标识
|
|
|
# 这样前端JavaScript的Date对象能正确识别为北京时间
|
|
|
if dt.tzinfo is None:
|
|
|
# 假设是北京时间,添加时区标识
|
|
|
dt = dt.replace(tzinfo=BEIJING_TIMEZONE)
|
|
|
# 返回ISO格式字符串,包含时区信息(+08:00)
|
|
|
return dt.isoformat()
|
|
|
|
|
|
class BaseModel(db.Model):
|
|
|
"""基础模型类"""
|
|
|
__abstract__ = True
|
|
|
|
|
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
|
created_at = db.Column(db.DateTime, default=get_beijing_now, comment='创建时间(北京时间)')
|
|
|
updated_at = db.Column(db.DateTime, default=get_beijing_now, onupdate=get_beijing_now, comment='更新时间(北京时间)')
|
|
|
deleted_at = db.Column(db.DateTime, nullable=True, comment='删除时间(北京时间)')
|
|
|
|
|
|
class User(BaseModel):
|
|
|
"""用户模型"""
|
|
|
__tablename__ = 'users'
|
|
|
|
|
|
username = db.Column(db.String(16), unique=True, nullable=False, comment='用户名')
|
|
|
password = db.Column(db.String(255), nullable=False, comment='密码哈希')
|
|
|
email = db.Column(db.String(255), nullable=True, comment='邮箱')
|
|
|
authority = db.Column(db.Integer, default=0, comment='权限,1表示系统管理员')
|
|
|
|
|
|
def to_dict(self):
|
|
|
return {
|
|
|
'id': self.id,
|
|
|
'username': self.username,
|
|
|
'email': self.email,
|
|
|
'authority': self.authority,
|
|
|
'created_at': format_datetime_for_api(self.created_at)
|
|
|
}
|
|
|
|
|
|
class UserToken(BaseModel):
|
|
|
"""用户Token模型"""
|
|
|
__tablename__ = 'user_tokens'
|
|
|
|
|
|
user_id = db.Column(db.Integer, nullable=False, comment='用户ID')
|
|
|
token = db.Column(db.String(512), nullable=False, comment='Token') # JWT token通常需要512字符
|
|
|
expire_time = db.Column(db.BigInteger, nullable=False, comment='过期时间戳')
|
|
|
|
|
|
class CARequest(BaseModel):
|
|
|
"""CA证书请求模型"""
|
|
|
__tablename__ = 'ca_requests'
|
|
|
|
|
|
user_id = db.Column(db.Integer, nullable=False, comment='申请证书的用户ID')
|
|
|
state = db.Column(db.Integer, nullable=False, default=1, comment='证书状态(1:待审核,2:审核通过,3:审核未通过)')
|
|
|
public_key = db.Column(db.Text, nullable=False, comment='公钥')
|
|
|
csr_content = db.Column(db.Text, nullable=True, comment='CSR文件内容')
|
|
|
country = db.Column(db.String(20), nullable=True, comment='国家')
|
|
|
province = db.Column(db.String(255), nullable=True, comment='省市')
|
|
|
locality = db.Column(db.String(255), nullable=True, comment='地区')
|
|
|
organization = db.Column(db.String(255), nullable=True, comment='组织')
|
|
|
organization_unit_name = db.Column(db.String(255), nullable=True, comment='部门')
|
|
|
common_name = db.Column(db.String(255), nullable=False, comment='域名')
|
|
|
email_address = db.Column(db.String(255), nullable=True, comment='邮箱')
|
|
|
|
|
|
def to_dict(self):
|
|
|
return {
|
|
|
'id': self.id,
|
|
|
'user_id': self.user_id,
|
|
|
'state': self.state,
|
|
|
'state_text': {1: '待审核', 2: '审核通过', 3: '审核未通过'}.get(self.state, '未知'),
|
|
|
'country': self.country,
|
|
|
'province': self.province,
|
|
|
'locality': self.locality,
|
|
|
'organization': self.organization,
|
|
|
'organization_unit_name': self.organization_unit_name,
|
|
|
'common_name': self.common_name,
|
|
|
'email_address': self.email_address,
|
|
|
'created_at': format_datetime_for_api(self.created_at),
|
|
|
'updated_at': format_datetime_for_api(self.updated_at)
|
|
|
}
|
|
|
|
|
|
class Certificate(BaseModel):
|
|
|
"""证书模型"""
|
|
|
__tablename__ = 'certificates'
|
|
|
|
|
|
user_id = db.Column(db.Integer, nullable=False, comment='证书拥有者ID')
|
|
|
state = db.Column(db.Integer, nullable=False, default=1, comment='状态(1代表在使用中,2代表已撤销或过期)')
|
|
|
request_id = db.Column(db.Integer, nullable=False, comment='证书请求ID')
|
|
|
expire_time = db.Column(db.DateTime, nullable=False, comment='过期时间')
|
|
|
certificate_content = db.Column(db.Text, nullable=True, comment='证书内容(PEM格式)')
|
|
|
serial_number = db.Column(db.String(64), nullable=True, comment='证书序列号')
|
|
|
|
|
|
def to_dict(self):
|
|
|
# 判断证书状态文本
|
|
|
if self.state == 1:
|
|
|
state_text = '使用中'
|
|
|
elif self.state == 2:
|
|
|
# state=2 表示已吊销(手动吊销或过期)
|
|
|
# 检查是否过期(如果过期时间已过,说明是过期导致的,否则是手动吊销)
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
beijing_tz = timezone(timedelta(hours=8))
|
|
|
beijing_now = datetime.now(timezone.utc).astimezone(beijing_tz).replace(tzinfo=None)
|
|
|
if self.expire_time and self.expire_time < beijing_now:
|
|
|
state_text = '已过期'
|
|
|
else:
|
|
|
state_text = '已吊销'
|
|
|
else:
|
|
|
state_text = '未知'
|
|
|
|
|
|
return {
|
|
|
'id': self.id,
|
|
|
'user_id': self.user_id,
|
|
|
'state': self.state,
|
|
|
'state_text': state_text,
|
|
|
'request_id': self.request_id,
|
|
|
'expire_time': format_datetime_for_api(self.expire_time),
|
|
|
'serial_number': self.serial_number,
|
|
|
'created_at': format_datetime_for_api(self.created_at)
|
|
|
}
|
|
|
|
|
|
class CRL(BaseModel):
|
|
|
"""证书吊销列表模型"""
|
|
|
__tablename__ = 'crls'
|
|
|
|
|
|
certificate_id = db.Column(db.Integer, nullable=False, comment='证书ID')
|
|
|
input_time = db.Column(db.BigInteger, nullable=False, comment='加入时间戳') |