You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

289 lines
9.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask import Blueprint, request, jsonify
from models import CARequest, Certificate, CRL, User, db
from utils.cert_utils import sign_certificate, sign_certificate_from_request, parse_certificate
from middleware.auth_middleware import admin_required
from datetime import datetime, timezone
def get_beijing_now():
    """Return the current Beijing time (UTC+8) as a naive datetime.

    The timezone info is stripped from the result so the value can be
    written to database columns that store naive datetimes.
    """
    from datetime import timedelta

    beijing_offset = timezone(timedelta(hours=8))
    # Ask for "now" directly in UTC+8, then drop tzinfo for storage.
    return datetime.now(beijing_offset).replace(tzinfo=None)
# Flask blueprint named 'admin'; the admin routes in this module are
# registered on it via @admin_bp.route(...).
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/requests', methods=['GET'])
@admin_required
def get_pending_requests():
    """List certificate requests in a given review state.

    Query parameters:
        state: Integer review state to filter on. Defaults to 1 (pending
            review) when the parameter is missing or not a valid integer.

    Returns:
        A ``(response, 200)`` tuple. The JSON ``data`` field is a list of
        request dicts ordered by creation time (newest first), each extended
        with ``username`` (when the user exists), ``has_certificate`` and,
        when a certificate exists, ``certificate_id``.
    """
    # The default must already be an int: ``args.get`` returns the default
    # unchanged (it does not run it through ``type``) when the key is missing
    # or conversion fails, so a '1' string default would leak into the filter.
    state = request.args.get('state', 1, type=int)

    matching_requests = (
        CARequest.query
        .filter_by(state=state, deleted_at=None)
        .order_by(CARequest.created_at.desc())
        .all()
    )

    request_list = []
    for req in matching_requests:
        req_dict = req.to_dict()

        # Attach the requesting user's name when the user record exists.
        user = User.query.get(req.user_id)
        if user:
            req_dict['username'] = user.username

        # Report whether a (non-deleted) certificate was already issued.
        cert = Certificate.query.filter_by(
            request_id=req.id,
            deleted_at=None,
        ).first()
        req_dict['has_certificate'] = cert is not None
        if cert:
            req_dict['certificate_id'] = cert.id

        request_list.append(req_dict)

    return jsonify({
        'code': 200,
        'data': request_list,
    }), 200
@admin_bp.route('/request/<int:request_id>', methods=['GET'])
@admin_required
def get_request_detail(request_id):
    """Return one certificate request, including its certificate if issued.

    Args:
        request_id: Primary key of the certificate request (from the URL).

    Returns:
        ``(response, 404)`` when no non-deleted request has this id;
        otherwise ``(response, 200)`` whose JSON ``data`` is the request
        dict, extended with ``username`` (when the user exists) and
        ``certificate`` (when a non-deleted certificate exists for the
        request). The certificate dict gains ``certificate_info`` when its
        content can be parsed.
    """
    record = CARequest.query.filter_by(id=request_id, deleted_at=None).first()
    if not record:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    payload = record.to_dict()

    # Requesting user's name, if the user record exists.
    owner = User.query.get(record.user_id)
    if owner:
        payload['username'] = owner.username

    # Certificate details, present only once the request has been signed.
    issued = Certificate.query.filter_by(
        request_id=request_id,
        deleted_at=None,
    ).first()
    if issued:
        cert_payload = issued.to_dict()
        payload['certificate'] = cert_payload
        if issued.certificate_content:
            # Parsing is best-effort: a failure is printed and the response
            # simply omits 'certificate_info'.
            try:
                parsed = parse_certificate(issued.certificate_content)
                cert_payload['certificate_info'] = parsed
            except Exception as e:
                print(f'解析证书信息失败: {e}')

    return jsonify({'code': 200, 'data': payload}), 200
@admin_bp.route('/request/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_request(request_id):
    """Approve a pending certificate request and issue its certificate.

    Args:
        request_id: Primary key of the certificate request (from the URL).

    Returns:
        ``(response, 404)`` when the request does not exist,
        ``(response, 400)`` when it is not in state 1 (pending) or lacks a
        common name or public key, ``(response, 500)`` when signing or the
        database write fails (the session is rolled back), and
        ``(response, 200)`` with ``certificate_id`` and ``serial_number`` on
        success.
    """
    from datetime import timedelta

    pending = CARequest.query.filter_by(id=request_id, deleted_at=None).first()
    if not pending:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
    if pending.state != 1:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    # Fields that must be present before a certificate can be signed.
    if not pending.common_name:
        return jsonify({'code': 400, 'message': '证书请求缺少域名(common_name)'}), 400
    if not pending.public_key:
        return jsonify({'code': 400, 'message': '证书请求缺少公钥'}), 400

    try:
        # Sign from the CSR when one was submitted; otherwise build the
        # certificate straight from the request's fields.
        if pending.csr_content:
            signed = sign_certificate(pending.csr_content, request_id)
        else:
            signed = sign_certificate_from_request(pending)
        cert_pem, serial_number, expire_time = signed

        # Normalise the expiry to aware UTC (a naive value is assumed to be
        # UTC), then convert to naive Beijing time (UTC+8) for storage.
        expiry_utc = (
            expire_time.replace(tzinfo=timezone.utc)
            if expire_time.tzinfo is None
            else expire_time.astimezone(timezone.utc)
        )
        beijing_offset = timezone(timedelta(hours=8))
        expiry_for_storage = expiry_utc.astimezone(beijing_offset).replace(tzinfo=None)

        certificate = Certificate(
            user_id=pending.user_id,
            state=1,  # in use
            request_id=request_id,
            expire_time=expiry_for_storage,  # stored as Beijing time
            certificate_content=cert_pem,
            serial_number=serial_number,
        )
        db.session.add(certificate)

        # Mark the request as approved.
        pending.state = 2
        pending.updated_at = get_beijing_now()
        db.session.commit()

        issued_info = {
            'certificate_id': certificate.id,
            'serial_number': serial_number,
        }
        return jsonify({
            'code': 200,
            'message': '证书已签发',
            'data': issued_info,
        }), 200
    except Exception as e:
        # Undo the partial write and surface the failure to the caller.
        db.session.rollback()
        import traceback
        traceback.print_exc()
        return jsonify({'code': 500, 'message': f'签发证书失败: {str(e)}'}), 500
@admin_bp.route('/request/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_request(request_id):
    """Reject a pending certificate request.

    Args:
        request_id: Primary key of the certificate request (from the URL).

    Returns:
        ``(response, 404)`` when the request does not exist,
        ``(response, 400)`` when it is not in state 1 (pending), otherwise
        ``(response, 200)`` after the request is moved to state 3.
    """
    target = CARequest.query.filter_by(id=request_id, deleted_at=None).first()

    if not target:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    if target.state != 1:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    # State 3 marks the request as not approved.
    target.state = 3
    target.updated_at = get_beijing_now()
    db.session.commit()

    return jsonify({'code': 200, 'message': '申请已拒绝'}), 200
@admin_bp.route('/certificates', methods=['GET'])
@admin_required
def get_all_certificates():
    """List every non-deleted certificate, newest first.

    As a side effect, certificates that are still in state 1 (in use) but
    whose stored expiry time is in the past are moved to state 2 and
    reported with ``state_text`` set to the expired label.

    Returns:
        A ``(response, 200)`` tuple whose JSON ``data`` is a list of
        certificate dicts, each extended with ``username`` when the owning
        user exists.
    """
    certificates = (
        Certificate.query
        .filter_by(deleted_at=None)
        .order_by(Certificate.created_at.desc())
        .all()
    )

    # Expiry times are stored as naive Beijing time, so compare against the
    # same representation. Computed once so every row sees the same instant.
    now = get_beijing_now()

    cert_list = []
    any_expired = False
    for cert in certificates:
        cert_dict = cert.to_dict()

        # Attach the owning user's name when the user record exists.
        user = User.query.get(cert.user_id)
        if user:
            cert_dict['username'] = user.username

        # NOTE(review): state 2 is written here for "expired", while
        # revoke_certificate in this file also uses state 2 for "revoked" —
        # confirm the two meanings are intended to share one value.
        if cert.state == 1 and cert.expire_time and cert.expire_time < now:
            cert.state = 2
            cert_dict['state'] = 2
            cert_dict['state_text'] = '已过期'
            any_expired = True

        cert_list.append(cert_dict)

    # Persist all expiry transitions in a single transaction instead of
    # committing once per expired certificate inside the loop.
    if any_expired:
        db.session.commit()

    return jsonify({
        'code': 200,
        'data': cert_list,
    }), 200
@admin_bp.route('/certificate/<int:cert_id>', methods=['GET'])
@admin_required
def get_certificate_detail(cert_id):
    """Return the details of a single certificate.

    Args:
        cert_id: Primary key of the certificate (from the URL).

    Returns:
        ``(response, 404)`` when no non-deleted certificate has this id;
        otherwise ``(response, 200)`` whose JSON ``data`` is the certificate
        dict, extended with ``username`` (when the user exists) and, when
        the stored content parses successfully, ``certificate_info`` and
        ``certificate_content``.
    """
    found = Certificate.query.filter_by(id=cert_id, deleted_at=None).first()
    if not found:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    details = found.to_dict()

    # Owning user's name, if the user record exists.
    owner = User.query.get(found.user_id)
    if owner:
        details['username'] = owner.username

    content = found.certificate_content
    if content:
        # Both keys are added only when parsing succeeds; a parse failure is
        # printed and the response is returned without them.
        try:
            details.update(
                certificate_info=parse_certificate(content),
                certificate_content=content,
            )
        except Exception as e:
            print(f'解析证书信息失败: {e}')

    return jsonify({'code': 200, 'data': details}), 200
@admin_bp.route('/certificate/<int:cert_id>/revoke', methods=['POST'])
@admin_required
def revoke_certificate(cert_id):
    """Revoke a certificate as an administrator and record it in the CRL.

    Args:
        cert_id: Primary key of the certificate (from the URL).

    Returns:
        ``(response, 404)`` when no non-deleted certificate has this id,
        ``(response, 400)`` when the certificate is already in state 2,
        otherwise ``(response, 200)`` after the certificate is moved to
        state 2 and a CRL row is added.
    """
    target = Certificate.query.filter_by(id=cert_id, deleted_at=None).first()

    if not target:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    if target.state == 2:
        return jsonify({'code': 400, 'message': '证书已被吊销'}), 400

    # Flip the certificate's state and stamp the change in Beijing time.
    target.state = 2
    target.updated_at = get_beijing_now()

    # The CRL entry stores the revocation moment as a UTC epoch timestamp.
    revoked_at = int(datetime.now(timezone.utc).timestamp())
    db.session.add(CRL(certificate_id=cert_id, input_time=revoked_at))
    db.session.commit()

    return jsonify({'code': 200, 'message': '证书已吊销'}), 200