|
|
from flask import Blueprint, request, jsonify
|
|
|
from models import CARequest, Certificate, CRL, User, db
|
|
|
from utils.cert_utils import sign_certificate, sign_certificate_from_request, parse_certificate
|
|
|
from middleware.auth_middleware import admin_required
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def get_beijing_now():
    """Return the current Beijing time (UTC+8) as a naive datetime.

    The tzinfo is stripped before returning; the value is intended for
    database storage.
    """
    # Only ``datetime`` and ``timezone`` are imported at module level.
    from datetime import timedelta

    utc_plus_8 = timezone(timedelta(hours=8))
    # Asking for "now" directly in the target zone yields the same wall-clock
    # reading as taking UTC now and converting it.
    return datetime.now(utc_plus_8).replace(tzinfo=None)
|
|
|
|
|
|
# Blueprint that the admin views in this module register their routes on;
# every view visible in this file is additionally wrapped with admin_required.
admin_bp = Blueprint('admin', __name__)
|
|
|
|
|
|
@admin_bp.route('/requests', methods=['GET'])
@admin_required
def get_pending_requests():
    """List certificate requests in one review state, newest first.

    Query parameters:
        state: Integer review state to filter on. Defaults to 1 (pending
            review); a missing or non-integer value falls back to 1.

    Returns:
        A ``(response, 200)`` tuple whose JSON body is
        ``{'code': 200, 'data': [...]}``. Each entry is the request's
        ``to_dict()`` output plus ``username`` (when the user row exists),
        ``has_certificate`` and, when a certificate exists,
        ``certificate_id``.
    """
    # MultiDict.get() returns ``default`` unconverted when the key is missing
    # or the ``type`` conversion fails, so the default must already be an int;
    # a string default would reach the filter below as '1' instead of 1.
    state = request.args.get('state', 1, type=int)

    matching = (
        CARequest.query
        .filter_by(state=state, deleted_at=None)
        .order_by(CARequest.created_at.desc())
        .all()
    )

    request_list = []
    for req in matching:
        req_dict = req.to_dict()

        # Attach the requester's username when the user row can be found.
        user = User.query.get(req.user_id)
        if user:
            req_dict['username'] = user.username

        # Report whether a certificate row (deleted_at is NULL) already
        # exists for this request.
        cert = Certificate.query.filter_by(
            request_id=req.id,
            deleted_at=None,
        ).first()
        req_dict['has_certificate'] = cert is not None
        if cert:
            req_dict['certificate_id'] = cert.id

        request_list.append(req_dict)

    return jsonify({
        'code': 200,
        'data': request_list,
    }), 200
|
|
|
|
|
|
@admin_bp.route('/request/<int:request_id>', methods=['GET'])
@admin_required
def get_request_detail(request_id):
    """Return one certificate request together with its certificate, if any.

    Args:
        request_id: Primary key of the certificate request, taken from the URL.

    Returns:
        ``(response, 404)`` when no request with that id and a NULL
        ``deleted_at`` exists; otherwise ``(response, 200)`` whose ``data`` is
        the request's ``to_dict()`` output, plus ``username`` when the user
        row exists and ``certificate`` when a certificate row exists.
        ``certificate['certificate_info']`` is added only when the stored
        certificate content parses successfully.
    """
    found = CARequest.query.filter_by(id=request_id, deleted_at=None).first()
    if not found:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    payload = found.to_dict()

    # Requester's username, when the user row can be found.
    owner = User.query.get(found.user_id)
    if owner:
        payload['username'] = owner.username

    # Certificate details, present only once a certificate row exists.
    issued = Certificate.query.filter_by(request_id=request_id, deleted_at=None).first()
    if issued:
        cert_payload = issued.to_dict()
        payload['certificate'] = cert_payload
        if issued.certificate_content:
            try:
                cert_payload['certificate_info'] = parse_certificate(issued.certificate_content)
            except Exception as e:
                # Best effort: a parse failure is printed and the response is
                # still returned without certificate_info.
                print(f'解析证书信息失败: {e}')

    return jsonify({'code': 200, 'data': payload}), 200
|
|
|
|
|
|
@admin_bp.route('/request/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_request(request_id):
    """Approve a pending certificate request and issue its certificate.

    Args:
        request_id: Primary key of the certificate request, taken from the URL.

    Returns:
        ``(response, status)`` where status is:
        404 when the request does not exist (or has ``deleted_at`` set);
        400 when the request is not in state 1, or lacks ``common_name`` or
        ``public_key``;
        500 when signing or persisting fails (the session is rolled back and
        the traceback is printed);
        200 on success, with ``certificate_id`` and ``serial_number`` in
        ``data``.
    """
    # Only ``datetime`` and ``timezone`` are imported at module level.
    from datetime import timedelta
    import traceback

    pending = CARequest.query.filter_by(id=request_id, deleted_at=None).first()

    if not pending:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
    if pending.state != 1:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    # Required fields.
    if not pending.common_name:
        return jsonify({'code': 400, 'message': '证书请求缺少域名(common_name)'}), 400
    if not pending.public_key:
        return jsonify({'code': 400, 'message': '证书请求缺少公钥'}), 400

    try:
        # Sign from the CSR when one was supplied; otherwise build the
        # certificate straight from the request's fields.
        if pending.csr_content:
            signed = sign_certificate(pending.csr_content, request_id)
        else:
            signed = sign_certificate_from_request(pending)
        cert_pem, serial_number, expire_time = signed

        # A naive expiry is assumed to be UTC; an aware one is normalised to UTC.
        aware_expiry = (
            expire_time.replace(tzinfo=timezone.utc)
            if expire_time.tzinfo is None
            else expire_time.astimezone(timezone.utc)
        )
        # Shift to Beijing time (UTC+8) and drop tzinfo so a naive datetime
        # is what gets stored.
        utc_plus_8 = timezone(timedelta(hours=8))
        stored_expiry = aware_expiry.astimezone(utc_plus_8).replace(tzinfo=None)

        certificate = Certificate(
            user_id=pending.user_id,
            state=1,  # in use
            request_id=request_id,
            expire_time=stored_expiry,  # Beijing time, naive
            certificate_content=cert_pem,
            serial_number=serial_number,
        )
        db.session.add(certificate)

        # Mark the request as approved in the same transaction.
        pending.state = 2  # approved
        pending.updated_at = get_beijing_now()

        db.session.commit()

        return jsonify({
            'code': 200,
            'message': '证书已签发',
            'data': {
                'certificate_id': certificate.id,
                'serial_number': serial_number,
            },
        }), 200
    except Exception as e:
        db.session.rollback()
        traceback.print_exc()
        return jsonify({'code': 500, 'message': f'签发证书失败: {str(e)}'}), 500
|
|
|
|
|
|
@admin_bp.route('/request/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_request(request_id):
    """Reject a pending certificate request.

    Args:
        request_id: Primary key of the certificate request, taken from the URL.

    Returns:
        ``(response, 404)`` when the request does not exist (or has
        ``deleted_at`` set), ``(response, 400)`` when it is not in state 1,
        otherwise ``(response, 200)`` after the request has been moved to
        state 3 and committed.
    """
    lookup = CARequest.query.filter_by(id=request_id, deleted_at=None)
    pending = lookup.first()

    if not pending:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
    if pending.state != 1:
        return jsonify({'code': 400, 'message': '该请求已处理'}), 400

    # Record the rejection and its timestamp (Beijing time, naive).
    pending.state = 3  # review not passed
    pending.updated_at = get_beijing_now()
    db.session.commit()

    return jsonify({'code': 200, 'message': '申请已拒绝'}), 200
|
|
|
|
|
|
@admin_bp.route('/certificates', methods=['GET'])
@admin_required
def get_all_certificates():
    """List every certificate whose ``deleted_at`` is NULL, newest first.

    Side effect: a certificate still in state 1 (in use) whose
    ``expire_time`` is already in the past is moved to state 2 and the
    change is persisted, even though this is a GET endpoint.

    Returns:
        A ``(response, 200)`` tuple whose JSON body is
        ``{'code': 200, 'data': [...]}``. Each entry is the certificate's
        ``to_dict()`` output plus ``username`` when the user row exists; for
        certificates expired by this call, ``state`` and ``state_text`` are
        overridden to reflect the new state.
    """
    certificates = (
        Certificate.query
        .filter_by(deleted_at=None)
        .order_by(Certificate.created_at.desc())
        .all()
    )

    # expire_time is stored as naive Beijing time, so compare against the same
    # representation. Evaluate "now" once so every row is judged against the
    # same instant.
    now = get_beijing_now()
    state_changed = False

    cert_list = []
    for cert in certificates:
        cert_dict = cert.to_dict()

        # Attach the owner's username when the user row can be found.
        user = User.query.get(cert.user_id)
        if user:
            cert_dict['username'] = user.username

        # NOTE(review): revoke_certificate in this file also writes state 2;
        # confirm that "expired" and "revoked" are meant to share one code.
        if cert.expire_time and cert.expire_time < now and cert.state == 1:
            cert.state = 2  # expired
            cert_dict['state'] = 2
            cert_dict['state_text'] = '已过期'
            state_changed = True

        cert_list.append(cert_dict)

    # Commit once after the loop instead of once per row, and only when a row
    # actually changed.
    if state_changed:
        db.session.commit()

    return jsonify({
        'code': 200,
        'data': cert_list,
    }), 200
|
|
|
|
|
|
@admin_bp.route('/certificate/<int:cert_id>', methods=['GET'])
@admin_required
def get_certificate_detail(cert_id):
    """Return one certificate with its owner and parsed contents.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``(response, 404)`` when no certificate with that id and a NULL
        ``deleted_at`` exists; otherwise ``(response, 200)`` whose ``data`` is
        the certificate's ``to_dict()`` output plus ``username`` when the
        user row exists. When the certificate has stored content,
        ``certificate_content`` is always included and ``certificate_info``
        is included when that content parses successfully.
    """
    cert = Certificate.query.filter_by(
        id=cert_id,
        deleted_at=None,
    ).first()

    if not cert:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    cert_dict = cert.to_dict()

    # Attach the owner's username when the user row can be found.
    user = User.query.get(cert.user_id)
    if user:
        cert_dict['username'] = user.username

    if cert.certificate_content:
        # Attach the raw content before parsing so that a parse failure does
        # not also drop the certificate text from the response.
        cert_dict['certificate_content'] = cert.certificate_content
        try:
            cert_dict['certificate_info'] = parse_certificate(cert.certificate_content)
        except Exception as e:
            # Best effort: the failure is printed and the response is returned
            # without certificate_info.
            print(f'解析证书信息失败: {e}')

    return jsonify({
        'code': 200,
        'data': cert_dict,
    }), 200
|
|
|
|
|
|
@admin_bp.route('/certificate/<int:cert_id>/revoke', methods=['POST'])
@admin_required
def revoke_certificate(cert_id):
    """Revoke a certificate and record it in the CRL table.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``(response, 404)`` when no certificate with that id and a NULL
        ``deleted_at`` exists, ``(response, 400)`` when it is already in
        state 2, otherwise ``(response, 200)`` after the state change and the
        new CRL row have been committed together.
    """
    target = Certificate.query.filter_by(id=cert_id, deleted_at=None).first()

    if not target:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    # NOTE(review): get_all_certificates in this file also sets state 2 for
    # expired certificates, so an expired certificate is reported here as
    # already revoked - confirm whether that is intended.
    if target.state == 2:
        return jsonify({'code': 400, 'message': '证书已被吊销'}), 400

    # Flip the certificate state and stamp the change (Beijing time, naive).
    target.state = 2
    target.updated_at = get_beijing_now()

    # The CRL entry carries a UTC epoch timestamp in whole seconds.
    revoked_at = int(datetime.now(timezone.utc).timestamp())
    db.session.add(CRL(certificate_id=cert_id, input_time=revoked_at))
    db.session.commit()

    return jsonify({'code': 200, 'message': '证书已吊销'}), 200
|