"""Admin endpoints for reviewing certificate requests and managing certificates."""
import logging
from datetime import datetime, timedelta, timezone

from flask import Blueprint, request, jsonify

from models import CARequest, Certificate, CRL, User, db
from utils.cert_utils import sign_certificate, sign_certificate_from_request, parse_certificate
from middleware.auth_middleware import admin_required

logger = logging.getLogger(__name__)

# Fixed UTC+8 offset; timestamps are stored as naive Beijing-time datetimes.
_BEIJING_TZ = timezone(timedelta(hours=8))

# Request states, as used by the views below.
_REQUEST_PENDING = 1
_REQUEST_APPROVED = 2
_REQUEST_REJECTED = 3

# Certificate states, as used by the views below.
# NOTE(review): state 2 is written both when a certificate is found to be
# expired (get_all_certificates) and when it is revoked (revoke_certificate);
# confirm against the Certificate model whether these are meant to share a
# value.
_CERT_IN_USE = 1
_CERT_EXPIRED_OR_REVOKED = 2


def get_beijing_now():
    """Return the current Beijing time (UTC+8) as a naive datetime for DB storage."""
    return datetime.now(_BEIJING_TZ).replace(tzinfo=None)


def _to_beijing_naive(moment):
    """Convert *moment* to a naive Beijing-time datetime.

    A naive input is assumed to already be in UTC; an aware input is
    converted from whatever zone it carries.
    """
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(_BEIJING_TZ).replace(tzinfo=None)


def _attach_username(payload, user_id):
    """Add a ``username`` key to *payload* when the user with *user_id* exists."""
    user = User.query.get(user_id)
    if user:
        payload['username'] = user.username


admin_bp = Blueprint('admin', __name__)


@admin_bp.route('/requests', methods=['GET'])
@admin_required
def get_pending_requests():
    """List certificate requests in a given state (pending by default).

    The optional ``state`` query parameter selects the request state; a
    missing or non-integer value falls back to the pending state.
    """
    # The default must itself be an int: ``args.get`` applies ``type`` only to
    # values actually present in the query string, never to the default.
    state = request.args.get('state', default=_REQUEST_PENDING, type=int)

    matching = CARequest.query.filter_by(
        state=state,
        deleted_at=None
    ).order_by(CARequest.created_at.desc()).all()

    request_list = []
    for req in matching:
        req_dict = req.to_dict()
        _attach_username(req_dict, req.user_id)

        # Report whether a certificate has already been issued for the request.
        cert = Certificate.query.filter_by(
            request_id=req.id,
            deleted_at=None
        ).first()
        req_dict['has_certificate'] = cert is not None
        if cert:
            req_dict['certificate_id'] = cert.id

        request_list.append(req_dict)

    return jsonify({
        'code': 200,
        'data': request_list
    }), 200


# NOTE(review): the URL variables in the rules below were missing from the
# route strings although every view requires them; the ``int`` converter is
# assumed from how the ids are used -- confirm against the models/clients.
@admin_bp.route('/request/<int:request_id>', methods=['GET'])
@admin_required
def get_request_detail(request_id):
    """Return one certificate request, including its certificate if issued."""
    cert_request = CARequest.query.filter_by(
        id=request_id,
        deleted_at=None
    ).first()

    if not cert_request:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    req_dict = cert_request.to_dict()
    _attach_username(req_dict, cert_request.user_id)

    cert = Certificate.query.filter_by(
        request_id=request_id,
        deleted_at=None
    ).first()

    if cert:
        req_dict['certificate'] = cert.to_dict()
        if cert.certificate_content:
            # Parsing is best-effort: a failure only omits 'certificate_info'.
            try:
                cert_info = parse_certificate(cert.certificate_content)
            except Exception as exc:
                logger.warning('解析证书信息失败: %s', exc)
            else:
                req_dict['certificate']['certificate_info'] = cert_info

    return jsonify({
        'code': 200,
        'data': req_dict
    }), 200


@admin_bp.route('/request/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_request(request_id):
    """Approve a pending certificate request and issue its certificate.

    Responds with 404 when the request does not exist, 400 when it was
    already handled or lacks a common name / public key, and 500 (after a
    rollback) when signing or persisting the certificate fails.
    """
    cert_request = CARequest.query.filter_by(
        id=request_id,
        deleted_at=None
    ).first()

    if not cert_request:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    if cert_request.state != _REQUEST_PENDING:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    # Required fields.
    if not cert_request.common_name:
        return jsonify({'code': 400, 'message': '证书请求缺少域名(common_name)'}), 400

    if not cert_request.public_key:
        return jsonify({'code': 400, 'message': '证书请求缺少公钥'}), 400

    try:
        if cert_request.csr_content:
            # Sign from the submitted CSR when one is available.
            cert_pem, serial_number, expire_time = sign_certificate(
                cert_request.csr_content,
                request_id
            )
        else:
            # No CSR: build the certificate from the request fields.
            cert_pem, serial_number, expire_time = sign_certificate_from_request(cert_request)

        certificate = Certificate(
            user_id=cert_request.user_id,
            state=_CERT_IN_USE,
            request_id=request_id,
            # Stored as naive Beijing time; a naive expire_time is treated as UTC.
            expire_time=_to_beijing_naive(expire_time),
            certificate_content=cert_pem,
            serial_number=serial_number
        )
        db.session.add(certificate)

        cert_request.state = _REQUEST_APPROVED
        cert_request.updated_at = get_beijing_now()

        db.session.commit()

        return jsonify({
            'code': 200,
            'message': '证书已签发',
            'data': {
                'certificate_id': certificate.id,
                'serial_number': serial_number
            }
        }), 200
    except Exception as e:
        db.session.rollback()
        logger.exception('签发证书失败')
        return jsonify({'code': 500, 'message': f'签发证书失败: {str(e)}'}), 500


@admin_bp.route('/request/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_request(request_id):
    """Reject a pending certificate request."""
    cert_request = CARequest.query.filter_by(
        id=request_id,
        deleted_at=None
    ).first()

    if not cert_request:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    if cert_request.state != _REQUEST_PENDING:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    cert_request.state = _REQUEST_REJECTED
    cert_request.updated_at = get_beijing_now()

    db.session.commit()

    return jsonify({
        'code': 200,
        'message': '申请已拒绝'
    }), 200


@admin_bp.route('/certificates', methods=['GET'])
@admin_required
def get_all_certificates():
    """List all non-deleted certificates, newest first.

    Certificates still marked as in use whose stored expiry has passed are
    switched to state 2 in the database and reported as expired.
    """
    certificates = Certificate.query.filter_by(
        deleted_at=None
    ).order_by(Certificate.created_at.desc()).all()

    # expire_time is compared against the current naive Beijing time; the
    # timestamp is taken once so every certificate sees the same "now".
    now = get_beijing_now()
    state_changed = False

    cert_list = []
    for cert in certificates:
        cert_dict = cert.to_dict()
        _attach_username(cert_dict, cert.user_id)

        if cert.state == _CERT_IN_USE and cert.expire_time and cert.expire_time < now:
            cert.state = _CERT_EXPIRED_OR_REVOKED
            cert_dict['state'] = _CERT_EXPIRED_OR_REVOKED
            cert_dict['state_text'] = '已过期'
            state_changed = True

        cert_list.append(cert_dict)

    # Persist all state transitions with a single commit.
    if state_changed:
        db.session.commit()

    return jsonify({
        'code': 200,
        'data': cert_list
    }), 200


@admin_bp.route('/certificate/<int:cert_id>', methods=['GET'])
@admin_required
def get_certificate_detail(cert_id):
    """Return one certificate, with parsed details when its content parses."""
    cert = Certificate.query.filter_by(
        id=cert_id,
        deleted_at=None
    ).first()

    if not cert:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    cert_dict = cert.to_dict()
    _attach_username(cert_dict, cert.user_id)

    if cert.certificate_content:
        # Best-effort: on a parse failure neither extra key is added.
        try:
            cert_info = parse_certificate(cert.certificate_content)
        except Exception as exc:
            logger.warning('解析证书信息失败: %s', exc)
        else:
            cert_dict['certificate_info'] = cert_info
            cert_dict['certificate_content'] = cert.certificate_content

    return jsonify({
        'code': 200,
        'data': cert_dict
    }), 200


@admin_bp.route('/certificate/<int:cert_id>/revoke', methods=['POST'])
@admin_required
def revoke_certificate(cert_id):
    """Revoke a certificate and record it in the CRL."""
    cert = Certificate.query.filter_by(
        id=cert_id,
        deleted_at=None
    ).first()

    if not cert:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    # NOTE(review): state 2 is also what get_all_certificates writes for an
    # expired certificate, so an expired certificate is answered with
    # "already revoked" here -- confirm this is intended.
    if cert.state == _CERT_EXPIRED_OR_REVOKED:
        return jsonify({'code': 400, 'message': '证书已被吊销'}), 400

    cert.state = _CERT_EXPIRED_OR_REVOKED
    cert.updated_at = get_beijing_now()

    # The CRL entry uses a UTC epoch timestamp, unlike the Beijing-time columns.
    crl = CRL(
        certificate_id=cert_id,
        input_time=int(datetime.now(timezone.utc).timestamp())
    )
    db.session.add(crl)

    db.session.commit()

    return jsonify({
        'code': 200,
        'message': '证书已吊销'
    }), 200