ADD file via upload

main
p6s4t3fer 4 months ago
parent bdeaf1032a
commit 5e0dd1245c

@ -0,0 +1,288 @@
from flask import Blueprint, request, jsonify
from models import CARequest, Certificate, CRL, User, db
from utils.cert_utils import sign_certificate, sign_certificate_from_request, parse_certificate
from middleware.auth_middleware import admin_required
from datetime import datetime, timezone
def get_beijing_now():
"""获取当前北京时间UTC+8用于数据库存储"""
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(beijing_tz)
return beijing_now.replace(tzinfo=None)
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/requests', methods=['GET'])
@admin_required
def get_pending_requests():
"""获取待审核的证书请求"""
state = request.args.get('state', '1', type=int) # 默认为待审核
requests = CARequest.query.filter_by(
state=state,
deleted_at=None
).order_by(CARequest.created_at.desc()).all()
request_list = []
for req in requests:
req_dict = req.to_dict()
# 获取用户信息
user = User.query.get(req.user_id)
if user:
req_dict['username'] = user.username
# 检查是否已生成证书
cert = Certificate.query.filter_by(
request_id=req.id,
deleted_at=None
).first()
req_dict['has_certificate'] = cert is not None
if cert:
req_dict['certificate_id'] = cert.id
request_list.append(req_dict)
return jsonify({
'code': 200,
'data': request_list
}), 200
@admin_bp.route('/request/<int:request_id>', methods=['GET'])
@admin_required
def get_request_detail(request_id):
"""获取证书请求详细信息(包括证书信息)"""
cert_request = CARequest.query.filter_by(
id=request_id,
deleted_at=None
).first()
if not cert_request:
return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
req_dict = cert_request.to_dict()
# 获取用户信息
user = User.query.get(cert_request.user_id)
if user:
req_dict['username'] = user.username
# 获取证书信息(如果已签发)
cert = Certificate.query.filter_by(
request_id=request_id,
deleted_at=None
).first()
if cert:
req_dict['certificate'] = cert.to_dict()
# 解析证书内容
if cert.certificate_content:
try:
cert_info = parse_certificate(cert.certificate_content)
req_dict['certificate']['certificate_info'] = cert_info
except Exception as e:
print(f'解析证书信息失败: {e}')
return jsonify({
'code': 200,
'data': req_dict
}), 200
@admin_bp.route('/request/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_request(request_id):
"""同意证书申请"""
cert_request = CARequest.query.filter_by(
id=request_id,
deleted_at=None
).first()
if not cert_request:
return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
if cert_request.state != 1:
return jsonify({'code': 400, 'message': '该请求已处理'}), 400
# 检查必要字段
if not cert_request.common_name:
return jsonify({'code': 400, 'message': '证书请求缺少域名(common_name)'}), 400
if not cert_request.public_key:
return jsonify({'code': 400, 'message': '证书请求缺少公钥'}), 400
try:
# 如果有CSR内容使用CSR签署证书
if cert_request.csr_content:
cert_pem, serial_number, expire_time = sign_certificate(
cert_request.csr_content,
request_id
)
else:
# 如果没有CSR根据请求信息直接生成证书
cert_pem, serial_number, expire_time = sign_certificate_from_request(cert_request)
# 创建证书记录
# expire_time 是UTC时间从证书中获取需要转换为北京时间存储
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
if expire_time.tzinfo is None:
# 如果是naive datetime假设是UTC时间
expire_time_utc = expire_time.replace(tzinfo=timezone.utc)
else:
expire_time_utc = expire_time.astimezone(timezone.utc)
# 转换为北京时间并去掉时区信息因为数据库存储naive datetime
expire_time_beijing = expire_time_utc.astimezone(beijing_tz).replace(tzinfo=None)
certificate = Certificate(
user_id=cert_request.user_id,
state=1, # 使用中
request_id=request_id,
expire_time=expire_time_beijing, # 存储为北京时间
certificate_content=cert_pem,
serial_number=serial_number
)
db.session.add(certificate)
# 更新请求状态
cert_request.state = 2 # 审核通过
cert_request.updated_at = get_beijing_now()
db.session.commit()
return jsonify({
'code': 200,
'message': '证书已签发',
'data': {
'certificate_id': certificate.id,
'serial_number': serial_number
}
}), 200
except Exception as e:
db.session.rollback()
import traceback
traceback.print_exc()
return jsonify({'code': 500, 'message': f'签发证书失败: {str(e)}'}), 500
@admin_bp.route('/request/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_request(request_id):
"""拒绝证书申请"""
cert_request = CARequest.query.filter_by(
id=request_id,
deleted_at=None
).first()
if not cert_request:
return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
if cert_request.state != 1:
return jsonify({'code': 400, 'message': '该请求已处理'}), 400
# 更新请求状态
cert_request.state = 3 # 审核未通过
cert_request.updated_at = get_beijing_now()
db.session.commit()
return jsonify({
'code': 200,
'message': '申请已拒绝'
}), 200
@admin_bp.route('/certificates', methods=['GET'])
@admin_required
def get_all_certificates():
"""获取所有证书"""
certificates = Certificate.query.filter_by(
deleted_at=None
).order_by(Certificate.created_at.desc()).all()
cert_list = []
for cert in certificates:
cert_dict = cert.to_dict()
# 获取用户信息
user = User.query.get(cert.user_id)
if user:
cert_dict['username'] = user.username
# 检查是否已过期(证书过期时间存储在数据库中,是北京时间)
# 如果证书在使用中但已过期,更新状态
if cert.expire_time and cert.expire_time < get_beijing_now() and cert.state == 1:
cert.state = 2 # 更新数据库状态为已过期
cert_dict['state'] = 2
cert_dict['state_text'] = '已过期'
db.session.commit()
cert_list.append(cert_dict)
return jsonify({
'code': 200,
'data': cert_list
}), 200
@admin_bp.route('/certificate/<int:cert_id>', methods=['GET'])
@admin_required
def get_certificate_detail(cert_id):
"""获取证书详细信息"""
cert = Certificate.query.filter_by(
id=cert_id,
deleted_at=None
).first()
if not cert:
return jsonify({'code': 404, 'message': '证书不存在'}), 404
cert_dict = cert.to_dict()
# 获取用户信息
user = User.query.get(cert.user_id)
if user:
cert_dict['username'] = user.username
# 解析证书内容
if cert.certificate_content:
try:
cert_info = parse_certificate(cert.certificate_content)
cert_dict['certificate_info'] = cert_info
cert_dict['certificate_content'] = cert.certificate_content
except Exception as e:
print(f'解析证书信息失败: {e}')
return jsonify({
'code': 200,
'data': cert_dict
}), 200
@admin_bp.route('/certificate/<int:cert_id>/revoke', methods=['POST'])
@admin_required
def revoke_certificate(cert_id):
"""管理员吊销证书"""
cert = Certificate.query.filter_by(
id=cert_id,
deleted_at=None
).first()
if not cert:
return jsonify({'code': 404, 'message': '证书不存在'}), 404
if cert.state == 2:
return jsonify({'code': 400, 'message': '证书已被吊销'}), 400
# 更新证书状态
cert.state = 2
cert.updated_at = get_beijing_now()
# 添加到CRL
from datetime import timezone
crl = CRL(
certificate_id=cert_id,
input_time=int(datetime.now(timezone.utc).timestamp()) # CRL时间戳使用UTC
)
db.session.add(crl)
db.session.commit()
return jsonify({
'code': 200,
'message': '证书已吊销'
}), 200
Loading…
Cancel
Save