from flask_sqlalchemy import SQLAlchemy from datetime import datetime, timezone, timedelta db = SQLAlchemy() # 北京时间时区(UTC+8) BEIJING_TIMEZONE = timezone(timedelta(hours=8)) def get_beijing_now(): """获取当前北京时间(UTC+8),用于数据库存储和显示""" # 获取UTC时间,然后转换为北京时间 utc_now = datetime.now(timezone.utc) beijing_now = utc_now.astimezone(BEIJING_TIMEZONE) # 返回naive datetime(去掉时区信息),因为MySQL DATETIME不支持时区 return beijing_now.replace(tzinfo=None) def get_utc_now(): """获取当前UTC时间,用于证书生成(证书标准要求使用UTC时间)""" return datetime.utcnow() def format_datetime_for_api(dt): """格式化datetime为API返回格式,假设数据库中的时间是北京时间""" if dt is None: return None # 数据库中的时间是北京时间(naive),添加北京时间时区标识 # 这样前端JavaScript的Date对象能正确识别为北京时间 if dt.tzinfo is None: # 假设是北京时间,添加时区标识 dt = dt.replace(tzinfo=BEIJING_TIMEZONE) # 返回ISO格式字符串,包含时区信息(+08:00) return dt.isoformat() class BaseModel(db.Model): """基础模型类""" __abstract__ = True id = db.Column(db.Integer, primary_key=True, autoincrement=True) created_at = db.Column(db.DateTime, default=get_beijing_now, comment='创建时间(北京时间)') updated_at = db.Column(db.DateTime, default=get_beijing_now, onupdate=get_beijing_now, comment='更新时间(北京时间)') deleted_at = db.Column(db.DateTime, nullable=True, comment='删除时间(北京时间)') class User(BaseModel): """用户模型""" __tablename__ = 'users' username = db.Column(db.String(16), unique=True, nullable=False, comment='用户名') password = db.Column(db.String(255), nullable=False, comment='密码哈希') email = db.Column(db.String(255), nullable=True, comment='邮箱') authority = db.Column(db.Integer, default=0, comment='权限,1表示系统管理员') def to_dict(self): return { 'id': self.id, 'username': self.username, 'email': self.email, 'authority': self.authority, 'created_at': format_datetime_for_api(self.created_at) } class UserToken(BaseModel): """用户Token模型""" __tablename__ = 'user_tokens' user_id = db.Column(db.Integer, nullable=False, comment='用户ID') token = db.Column(db.String(512), nullable=False, comment='Token') # JWT token通常需要512字符 expire_time = db.Column(db.BigInteger, nullable=False, comment='过期时间戳') class CARequest(BaseModel): """CA证书请求模型""" __tablename__ = 'ca_requests' user_id = db.Column(db.Integer, nullable=False, comment='申请证书的用户ID') state = db.Column(db.Integer, nullable=False, default=1, comment='证书状态(1:待审核,2:审核通过,3:审核未通过)') public_key = db.Column(db.Text, nullable=False, comment='公钥') csr_content = db.Column(db.Text, nullable=True, comment='CSR文件内容') country = db.Column(db.String(20), nullable=True, comment='国家') province = db.Column(db.String(255), nullable=True, comment='省市') locality = db.Column(db.String(255), nullable=True, comment='地区') organization = db.Column(db.String(255), nullable=True, comment='组织') organization_unit_name = db.Column(db.String(255), nullable=True, comment='部门') common_name = db.Column(db.String(255), nullable=False, comment='域名') email_address = db.Column(db.String(255), nullable=True, comment='邮箱') def to_dict(self): return { 'id': self.id, 'user_id': self.user_id, 'state': self.state, 'state_text': {1: '待审核', 2: '审核通过', 3: '审核未通过'}.get(self.state, '未知'), 'country': self.country, 'province': self.province, 'locality': self.locality, 'organization': self.organization, 'organization_unit_name': self.organization_unit_name, 'common_name': self.common_name, 'email_address': self.email_address, 'created_at': format_datetime_for_api(self.created_at), 'updated_at': format_datetime_for_api(self.updated_at) } class Certificate(BaseModel): """证书模型""" __tablename__ = 'certificates' user_id = db.Column(db.Integer, nullable=False, comment='证书拥有者ID') state = db.Column(db.Integer, nullable=False, default=1, comment='状态(1代表在使用中,2代表已撤销或过期)') request_id = db.Column(db.Integer, nullable=False, comment='证书请求ID') expire_time = db.Column(db.DateTime, nullable=False, comment='过期时间') certificate_content = db.Column(db.Text, nullable=True, comment='证书内容(PEM格式)') serial_number = db.Column(db.String(64), nullable=True, comment='证书序列号') def to_dict(self): # 判断证书状态文本 if self.state == 1: state_text = '使用中' elif self.state == 2: # state=2 表示已吊销(手动吊销或过期) # 检查是否过期(如果过期时间已过,说明是过期导致的,否则是手动吊销) from datetime import datetime, timezone, timedelta beijing_tz = timezone(timedelta(hours=8)) beijing_now = datetime.now(timezone.utc).astimezone(beijing_tz).replace(tzinfo=None) if self.expire_time and self.expire_time < beijing_now: state_text = '已过期' else: state_text = '已吊销' else: state_text = '未知' return { 'id': self.id, 'user_id': self.user_id, 'state': self.state, 'state_text': state_text, 'request_id': self.request_id, 'expire_time': format_datetime_for_api(self.expire_time), 'serial_number': self.serial_number, 'created_at': format_datetime_for_api(self.created_at) } class CRL(BaseModel): """证书吊销列表模型""" __tablename__ = 'crls' certificate_id = db.Column(db.Integer, nullable=False, comment='证书ID') input_time = db.Column(db.BigInteger, nullable=False, comment='加入时间戳')