|
|
from flask import Blueprint, request, jsonify, send_file
|
|
|
from models import Certificate, CARequest, CRL, db
|
|
|
from utils.auth_utils import verify_token
|
|
|
from utils.cert_utils import (
|
|
|
generate_key_pair, create_csr_from_data, parse_csr,
|
|
|
sign_certificate, verify_certificate, parse_certificate
|
|
|
)
|
|
|
from middleware.auth_middleware import login_required
|
|
|
from datetime import datetime, timezone
|
|
|
from cryptography.hazmat.primitives import serialization
|
|
|
import os
|
|
|
|
|
|
def get_beijing_now():
    """Return the current Beijing time (UTC+8) as a naive datetime.

    The tzinfo is stripped before returning so the value can be stored in,
    and compared with, naive datetime columns in the database.
    """
    from datetime import timedelta, timezone

    beijing_offset = timezone(timedelta(hours=8))
    # Read the clock directly in the UTC+8 zone, then drop the zone marker.
    return datetime.now(beijing_offset).replace(tzinfo=None)
|
|
|
|
|
|
# Blueprint on which the certificate route handlers in this module are registered.
cert_bp = Blueprint('certificate', __name__)
|
|
|
|
|
|
@cert_bp.route('/list', methods=['GET'])
@login_required
def get_certificates():
    """Return all non-deleted certificates owned by the authenticated user.

    Certificates are ordered newest first. A certificate that is still marked
    as in use (state 1) but whose ``expire_time`` has already passed is
    switched to state 2 ("expired"), both in the response and in the database.

    Returns:
        A ``(response, 200)`` tuple whose JSON body is
        ``{'code': 200, 'data': [<certificate dict>, ...]}``.
    """
    user_id = request.user_id

    certificates = Certificate.query.filter_by(
        user_id=user_id,
        deleted_at=None
    ).order_by(Certificate.created_at.desc()).all()

    # Expiry times are stored as naive Beijing time. Evaluate "now" once so
    # that every row is judged against the same instant.
    now = get_beijing_now()
    state_changed = False
    cert_list = []
    for cert in certificates:
        cert_dict = cert.to_dict()
        if cert.expire_time and cert.expire_time < now and cert.state == 1:
            cert.state = 2  # persist the expired state
            cert_dict['state'] = 2
            cert_dict['state_text'] = '已过期'
            state_changed = True
        cert_list.append(cert_dict)

    # Commit a single time after the loop instead of once per expired row:
    # one transaction, and the remaining rows are not expired/reloaded by the
    # session between iterations.
    if state_changed:
        db.session.commit()

    return jsonify({
        'code': 200,
        'data': cert_list
    }), 200
|
|
|
|
|
|
@cert_bp.route('/detail/<int:cert_id>', methods=['GET'])
@login_required
def get_certificate_detail(cert_id):
    """Return the details of one certificate owned by the authenticated user.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``(response, 404)`` when no non-deleted certificate with this id
        belongs to the user; otherwise ``(response, 200)`` with the
        certificate dict. When the certificate has stored content, the dict
        also carries the parsed ``certificate_info`` and the raw
        ``certificate_content``.
    """
    record = Certificate.query.filter_by(
        id=cert_id,
        user_id=request.user_id,
        deleted_at=None,
    ).first()
    if not record:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    payload = record.to_dict()

    # Only certificates that actually have stored content can be parsed.
    stored_content = record.certificate_content
    if stored_content:
        payload['certificate_info'] = parse_certificate(stored_content)
        payload['certificate_content'] = stored_content

    return jsonify({'code': 200, 'data': payload}), 200
|
|
|
|
|
|
@cert_bp.route('/download/<int:cert_id>', methods=['GET'])
@login_required
def download_certificate(cert_id):
    """Send a certificate owned by the authenticated user as a ``.cer`` download.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        The file response on success; ``(response, 404)`` when the
        certificate does not exist or has no content; ``(response, 500)``
        when anything raises while building the download.
    """
    try:
        record = Certificate.query.filter_by(
            id=cert_id,
            user_id=request.user_id,
            deleted_at=None,
        ).first()

        if not record:
            return jsonify({'code': 404, 'message': '证书不存在'}), 404
        if not record.certificate_content:
            return jsonify({'code': 404, 'message': '证书内容为空'}), 404

        # Serve the content from an in-memory buffer; no temporary file needed.
        from io import BytesIO
        buffer = BytesIO(record.certificate_content.encode('utf-8'))

        return send_file(
            buffer,
            mimetype='application/x-x509-ca-cert',
            as_attachment=True,
            download_name=f'certificate_{cert_id}.cer',
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'code': 500, 'message': f'下载失败: {str(e)}'}), 500
|
|
|
|
|
|
@cert_bp.route('/revoke/<int:cert_id>', methods=['POST'])
@login_required
def revoke_certificate(cert_id):
    """Revoke a certificate owned by the authenticated user.

    The certificate is moved to state 2 and a CRL row referencing it is
    inserted in the same commit.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``(response, 404)`` when the certificate does not exist,
        ``(response, 400)`` when it is already in state 2, otherwise
        ``(response, 200)``.
    """
    target = Certificate.query.filter_by(
        id=cert_id,
        user_id=request.user_id,
        deleted_at=None,
    ).first()

    if not target:
        return jsonify({'code': 404, 'message': '证书不存在'}), 404

    # NOTE(review): get_certificates also uses state 2 for "expired", so an
    # expired certificate is reported here as already revoked - confirm intent.
    if target.state == 2:
        return jsonify({'code': 400, 'message': '证书已被吊销'}), 400

    # Mark the certificate itself; updated_at uses naive Beijing time.
    target.state = 2
    target.updated_at = get_beijing_now()

    # Record the revocation in the CRL; its timestamp is UTC epoch seconds.
    revoked_at = int(datetime.now(timezone.utc).timestamp())
    db.session.add(CRL(certificate_id=cert_id, input_time=revoked_at))
    db.session.commit()

    return jsonify({'code': 200, 'message': '证书已吊销'}), 200
|
|
|
|
|
|
def _uploaded_certificate_to_pem(file_content):
    """Convert the raw bytes of an uploaded certificate into PEM text.

    DER is tried first. If that fails the bytes are treated as PEM: content
    that decodes as UTF-8 and starts with ``-----BEGIN`` is returned
    unchanged (the caller passes it on to ``verify_certificate``); anything
    else is handed to the PEM loader.

    Args:
        file_content: Raw bytes read from the uploaded file.

    Returns:
        The certificate as a PEM string.

    Raises:
        Exception: Whatever the PEM loader raises when the bytes are neither
            a DER nor a PEM certificate.
    """
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend

    try:
        der_cert = x509.load_der_x509_certificate(file_content, default_backend())
        return der_cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')
    except Exception:
        # Not DER; fall back to the PEM handling below.
        pass

    try:
        pem_text = file_content.decode('utf-8')
    except UnicodeDecodeError:
        # Not UTF-8 text; let the PEM loader have the final say on the bytes.
        pem_text = None
    if pem_text is not None and pem_text.strip().startswith('-----BEGIN'):
        return pem_text

    pem_cert = x509.load_pem_x509_certificate(file_content, default_backend())
    return pem_cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')


@cert_bp.route('/verify', methods=['POST'])
@login_required
def verify_cert():
    """Verify an uploaded certificate file, including a CRL check.

    Expects a multipart upload under the ``file`` key with a ``.cer``,
    ``.crt``, ``.pem`` or ``.der`` extension (case-insensitive).

    Returns:
        ``(response, 400)`` for a missing/unnamed file, a wrong extension or
        an unparsable certificate; ``(response, 500)`` when verification
        itself raises; otherwise ``(response, 200)`` with ``is_valid``,
        ``message`` and, for valid certificates, the parsed
        ``certificate_info``.
    """
    if 'file' not in request.files:
        return jsonify({'code': 400, 'message': '未上传文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'code': 400, 'message': '未选择文件'}), 400

    # Several certificate file extensions are accepted.
    allowed_extensions = ('.cer', '.crt', '.pem', '.der')
    if not file.filename.lower().endswith(allowed_extensions):
        return jsonify({'code': 400, 'message': '文件格式错误,请上传.cer、.crt、.pem或.der文件'}), 400

    try:
        file_content = file.read()

        # A parse failure is a client error (400), unlike the generic 500
        # produced by the outer handler.
        try:
            cert_pem = _uploaded_certificate_to_pem(file_content)
        except Exception as pem_error:
            return jsonify({
                'code': 400,
                'message': f'无法解析证书文件。请确保文件是有效的PEM或DER格式证书。错误: {str(pem_error)}'
            }), 400

        if not cert_pem:
            return jsonify({'code': 400, 'message': '无法解析证书文件'}), 400

        # Verify the certificate, including the CRL check.
        is_valid, message = verify_certificate(cert_pem, check_crl=True)

        # Details are only parsed and returned for certificates that verified.
        cert_info = parse_certificate(cert_pem) if is_valid else None

        return jsonify({
            'code': 200,
            'data': {
                'is_valid': is_valid,
                'message': message,
                'certificate_info': cert_info
            }
        }), 200
    except Exception as e:
        return jsonify({'code': 500, 'message': f'验证失败: {str(e)}'}), 500
|
|
|
|
|
|
@cert_bp.route('/request', methods=['POST'])
@login_required
def request_certificate():
    """Create a certificate request (step 1: submit the subject information).

    Reads the subject fields from the JSON body. ``common_name`` is required.
    A user may have only one request in state 1 (pending review) at a time.

    Returns:
        ``(response, 400)`` when ``common_name`` is missing or a pending
        request already exists; otherwise ``(response, 200)`` carrying the
        new ``request_id``.
    """
    user_id = request.user_id

    # get_json() may yield None or a non-object JSON value; normalise to a
    # dict so a bad body gets the 400 below instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Validate required fields.
    if not data.get('common_name'):
        return jsonify({'code': 400, 'message': '域名为必填项'}), 400

    # Refuse a new request while another one is still pending review.
    existing_request = CARequest.query.filter_by(
        user_id=user_id,
        state=1,
        deleted_at=None
    ).first()

    if existing_request:
        return jsonify({'code': 400, 'message': '您有正在审核中的申请,请等待审核完成'}), 400

    cert_request = CARequest(
        user_id=user_id,
        state=1,  # pending review
        country=data.get('country'),
        province=data.get('province'),
        locality=data.get('locality'),
        organization=data.get('organization'),
        organization_unit_name=data.get('organization_unit_name'),
        common_name=data.get('common_name'),
        email_address=data.get('email_address'),
        public_key='',  # supplied in the second step
        csr_content=data.get('csr_content')  # present when a CSR was uploaded
    )

    db.session.add(cert_request)
    db.session.commit()

    return jsonify({
        'code': 200,
        'message': '基本信息提交成功,请继续提交公钥',
        'data': {
            'request_id': cert_request.id
        }
    }), 200
|
|
|
|
|
|
@cert_bp.route('/request/upload-csr', methods=['POST'])
@login_required
def upload_csr():
    """Accept an uploaded ``.csr`` file and store it as a certificate request.

    The CSR is parsed with ``parse_csr``. If the user already has a request
    in state 1 (pending review), the most recent one is overwritten with the
    CSR data; otherwise a new pending request is created.

    Returns:
        ``(response, 400)`` for a missing/unnamed file or a wrong extension;
        ``(response, 500)`` when reading, parsing or saving raises;
        otherwise ``(response, 200)`` with the ``request_id`` and the parsed
        CSR data.
    """
    user_id = request.user_id

    if 'file' not in request.files:
        return jsonify({'code': 400, 'message': '未上传文件'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'code': 400, 'message': '未选择文件'}), 400

    # Case-insensitive, matching the extension check used by verify_cert.
    if not file.filename.lower().endswith('.csr'):
        return jsonify({'code': 400, 'message': '文件格式错误,请上传.csr文件'}), 400

    # Subject fields copied verbatim from the parsed CSR onto the request.
    subject_fields = (
        'country',
        'province',
        'locality',
        'organization',
        'organization_unit_name',
        'common_name',
        'email_address',
    )

    try:
        csr_pem = file.read().decode('utf-8')
        data, public_key = parse_csr(csr_pem)
        subject_values = {field: data.get(field) for field in subject_fields}

        # Look for the user's most recent pending request.
        existing_request = CARequest.query.filter_by(
            user_id=user_id,
            state=1,
            deleted_at=None
        ).order_by(CARequest.created_at.desc()).first()

        if existing_request:
            # Overwrite the pending request with the CSR's data.
            existing_request.csr_content = csr_pem
            existing_request.public_key = public_key
            for field, value in subject_values.items():
                setattr(existing_request, field, value)
            existing_request.state = 1  # pending review
        else:
            existing_request = CARequest(
                user_id=user_id,
                state=1,
                csr_content=csr_pem,
                public_key=public_key,
                **subject_values
            )
            db.session.add(existing_request)

        db.session.commit()

        return jsonify({
            'code': 200,
            'message': 'CSR文件上传成功,等待审核',
            'data': {
                'request_id': existing_request.id,
                'parsed_data': data
            }
        }), 200
    except Exception as e:
        return jsonify({'code': 500, 'message': f'解析CSR失败: {str(e)}'}), 500
|
|
|
|
|
|
@cert_bp.route('/request/submit-key', methods=['POST'])
@login_required
def submit_key():
    """Attach key material to a pending certificate request (step 2).

    Reads ``request_id``, ``public_key`` and an optional ``private_key`` from
    the JSON body. When a private key is supplied and the request has no CSR
    yet, a CSR is built from the request's stored subject fields.

    Returns:
        ``(response, 400)`` when ``request_id`` is missing or the request is
        no longer in state 1; ``(response, 404)`` when the request does not
        belong to the user; ``(response, 500)`` when CSR creation fails;
        otherwise ``(response, 200)``.
    """
    user_id = request.user_id

    # get_json() may yield None or a non-object JSON value; normalise to a
    # dict so a bad body gets the 400 below instead of an AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    request_id = data.get('request_id')
    public_key = data.get('public_key')
    # Optional; sent when the key pair was auto-generated.
    # NOTE(review): this means the client transmits its private key to the
    # server - confirm that this is intended.
    private_key = data.get('private_key')

    if not request_id:
        return jsonify({'code': 400, 'message': '请求ID不能为空'}), 400

    cert_request = CARequest.query.filter_by(
        id=request_id,
        user_id=user_id,
        deleted_at=None
    ).first()

    if not cert_request:
        return jsonify({'code': 404, 'message': '证书请求不存在'}), 404

    if cert_request.state != 1:
        return jsonify({'code': 400, 'message': '该请求已处理,无法修改'}), 400

    # Build a CSR only when a private key was given and none is stored yet.
    if private_key and not cert_request.csr_content:
        try:
            csr, _ = create_csr_from_data({
                'private_key': private_key,
                'country': cert_request.country,
                'province': cert_request.province,
                'locality': cert_request.locality,
                'organization': cert_request.organization,
                'organization_unit_name': cert_request.organization_unit_name,
                'common_name': cert_request.common_name,
                'email_address': cert_request.email_address
            })
            cert_request.csr_content = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8')
        except Exception as e:
            return jsonify({'code': 500, 'message': f'创建CSR失败: {str(e)}'}), 500

    if public_key:
        cert_request.public_key = public_key

    cert_request.state = 1  # stays pending review
    db.session.commit()

    return jsonify({
        'code': 200,
        'message': '公钥提交成功,等待审核'
    }), 200
|
|
|
|
|
|
@cert_bp.route('/generate-keypair', methods=['POST'])
@login_required
def generate_keypair():
    """Generate a new key pair and return both keys to the caller.

    Returns:
        ``(response, 200)`` with ``private_key`` and ``public_key`` under
        ``data``, or ``(response, 500)`` when generation or serialisation of
        the response raises.
    """
    try:
        private_pem, public_pem = generate_key_pair()
        key_payload = {
            'private_key': private_pem,
            'public_key': public_pem,
        }
        return jsonify({'code': 200, 'data': key_payload}), 200
    except Exception as e:
        return jsonify({'code': 500, 'message': f'生成密钥对失败: {str(e)}'}), 500
|
|
|
|
|
|
@cert_bp.route('/request/status', methods=['GET'])
@login_required
def get_request_status():
    """Return every non-deleted certificate request of the authenticated user.

    Returns:
        ``(response, 200)`` whose ``data`` is the list of request dicts,
        newest first.
    """
    own_requests = CARequest.query.filter_by(
        user_id=request.user_id,
        deleted_at=None,
    )
    newest_first = own_requests.order_by(CARequest.created_at.desc())

    request_list = []
    for ca_request in newest_first.all():
        request_list.append(ca_request.to_dict())

    return jsonify({'code': 200, 'data': request_list}), 200
|
|
|
|