You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

439 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask import Blueprint, request, jsonify, send_file
from models import Certificate, CARequest, CRL, db
from utils.auth_utils import verify_token
from utils.cert_utils import (
generate_key_pair, create_csr_from_data, parse_csr,
sign_certificate, verify_certificate, parse_certificate
)
from middleware.auth_middleware import login_required
from datetime import datetime, timezone
from cryptography.hazmat.primitives import serialization
import os
def get_beijing_now():
"""获取当前北京时间UTC+8用于数据库存储"""
from datetime import timezone, timedelta
beijing_tz = timezone(timedelta(hours=8))
utc_now = datetime.now(timezone.utc)
beijing_now = utc_now.astimezone(beijing_tz)
return beijing_now.replace(tzinfo=None)
cert_bp = Blueprint('certificate', __name__)
@cert_bp.route('/list', methods=['GET'])
@login_required
def get_certificates():
"""获取用户的所有证书"""
user_id = request.user_id
certificates = Certificate.query.filter_by(
user_id=user_id,
deleted_at=None
).order_by(Certificate.created_at.desc()).all()
cert_list = []
for cert in certificates:
cert_dict = cert.to_dict()
# 检查是否已过期(证书过期时间存储在数据库中,是北京时间)
# 如果证书在使用中但已过期,更新状态
if cert.expire_time and cert.expire_time < get_beijing_now() and cert.state == 1:
cert.state = 2 # 更新数据库状态为已过期
cert_dict['state'] = 2
cert_dict['state_text'] = '已过期'
db.session.commit()
cert_list.append(cert_dict)
return jsonify({
'code': 200,
'data': cert_list
}), 200
@cert_bp.route('/detail/<int:cert_id>', methods=['GET'])
@login_required
def get_certificate_detail(cert_id):
"""获取证书详细信息"""
user_id = request.user_id
cert = Certificate.query.filter_by(
id=cert_id,
user_id=user_id,
deleted_at=None
).first()
if not cert:
return jsonify({'code': 404, 'message': '证书不存在'}), 404
cert_dict = cert.to_dict()
# 解析证书内容
if cert.certificate_content:
cert_info = parse_certificate(cert.certificate_content)
cert_dict['certificate_info'] = cert_info
cert_dict['certificate_content'] = cert.certificate_content
return jsonify({
'code': 200,
'data': cert_dict
}), 200
@cert_bp.route('/download/<int:cert_id>', methods=['GET'])
@login_required
def download_certificate(cert_id):
"""下载证书"""
try:
user_id = request.user_id
cert = Certificate.query.filter_by(
id=cert_id,
user_id=user_id,
deleted_at=None
).first()
if not cert:
return jsonify({'code': 404, 'message': '证书不存在'}), 404
if not cert.certificate_content:
return jsonify({'code': 404, 'message': '证书内容为空'}), 404
# 使用StringIO直接在内存中创建文件不需要临时文件
from io import BytesIO
filename = f'certificate_{cert_id}.cer'
# 创建文件流
file_data = BytesIO()
file_data.write(cert.certificate_content.encode('utf-8'))
file_data.seek(0)
return send_file(
file_data,
mimetype='application/x-x509-ca-cert',
as_attachment=True,
download_name=filename
)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({'code': 500, 'message': f'下载失败: {str(e)}'}), 500
@cert_bp.route('/revoke/<int:cert_id>', methods=['POST'])
@login_required
def revoke_certificate(cert_id):
"""吊销证书"""
user_id = request.user_id
cert = Certificate.query.filter_by(
id=cert_id,
user_id=user_id,
deleted_at=None
).first()
if not cert:
return jsonify({'code': 404, 'message': '证书不存在'}), 404
if cert.state == 2:
return jsonify({'code': 400, 'message': '证书已被吊销'}), 400
# 更新证书状态
cert.state = 2
cert.updated_at = get_beijing_now()
# 添加到CRL
from datetime import timezone
crl = CRL(
certificate_id=cert_id,
input_time=int(datetime.now(timezone.utc).timestamp()) # CRL时间戳使用UTC
)
db.session.add(crl)
db.session.commit()
return jsonify({
'code': 200,
'message': '证书已吊销'
}), 200
@cert_bp.route('/verify', methods=['POST'])
@login_required
def verify_cert():
"""验证证书"""
if 'file' not in request.files:
return jsonify({'code': 400, 'message': '未上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'code': 400, 'message': '未选择文件'}), 400
# 支持多种证书文件格式
allowed_extensions = ['.cer', '.crt', '.pem', '.der']
if not any(file.filename.lower().endswith(ext) for ext in allowed_extensions):
return jsonify({'code': 400, 'message': '文件格式错误,请上传.cer、.crt、.pem或.der文件'}), 400
try:
file_content = file.read()
cert_pem = None
# 尝试判断文件格式PEM格式是文本DER格式是二进制
# PEM格式通常以"-----BEGIN"开头
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# 首先尝试作为DER格式二进制处理
try:
cert = x509.load_der_x509_certificate(file_content, default_backend())
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')
except Exception:
# 如果DER格式失败尝试作为PEM格式文本处理
try:
# 尝试UTF-8解码
cert_pem_text = file_content.decode('utf-8')
# 验证是否是PEM格式
if cert_pem_text.strip().startswith('-----BEGIN'):
cert_pem = cert_pem_text
else:
# 如果不是PEM格式尝试直接加载可能是其他编码
cert = x509.load_pem_x509_certificate(file_content, default_backend())
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')
except UnicodeDecodeError:
# UTF-8解码失败说明是二进制格式但DER也失败了
# 尝试其他编码或直接作为PEM二进制处理
try:
cert = x509.load_pem_x509_certificate(file_content, default_backend())
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')
except Exception as pem_error:
return jsonify({
'code': 400,
'message': f'无法解析证书文件。请确保文件是有效的PEM或DER格式证书。错误: {str(pem_error)}'
}), 400
except Exception as pem_error:
# PEM格式加载失败
return jsonify({
'code': 400,
'message': f'无法解析证书文件。请确保文件是有效的PEM或DER格式证书。错误: {str(pem_error)}'
}), 400
if not cert_pem:
return jsonify({'code': 400, 'message': '无法解析证书文件'}), 400
# 验证证书包括检查CRL
is_valid, message = verify_certificate(cert_pem, check_crl=True)
cert_info = None
if is_valid:
cert_info = parse_certificate(cert_pem)
return jsonify({
'code': 200,
'data': {
'is_valid': is_valid,
'message': message,
'certificate_info': cert_info
}
}), 200
except Exception as e:
return jsonify({'code': 500, 'message': f'验证失败: {str(e)}'}), 500
@cert_bp.route('/request', methods=['POST'])
@login_required
def request_certificate():
"""申请证书(第一步:提交基本信息)"""
user_id = request.user_id
data = request.get_json()
# 验证必填字段
if not data.get('common_name'):
return jsonify({'code': 400, 'message': '域名为必填项'}), 400
# 检查是否有待审核的请求
existing_request = CARequest.query.filter_by(
user_id=user_id,
state=1,
deleted_at=None
).first()
if existing_request:
return jsonify({'code': 400, 'message': '您有正在审核中的申请,请等待审核完成'}), 400
# 创建证书请求
cert_request = CARequest(
user_id=user_id,
state=1, # 待审核
country=data.get('country'),
province=data.get('province'),
locality=data.get('locality'),
organization=data.get('organization'),
organization_unit_name=data.get('organization_unit_name'),
common_name=data.get('common_name'),
email_address=data.get('email_address'),
public_key='', # 第二步提交
csr_content=data.get('csr_content') # 如果上传了CSR
)
db.session.add(cert_request)
db.session.commit()
return jsonify({
'code': 200,
'message': '基本信息提交成功,请继续提交公钥',
'data': {
'request_id': cert_request.id
}
}), 200
@cert_bp.route('/request/upload-csr', methods=['POST'])
@login_required
def upload_csr():
"""上传CSR文件"""
user_id = request.user_id
if 'file' not in request.files:
return jsonify({'code': 400, 'message': '未上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'code': 400, 'message': '未选择文件'}), 400
if not file.filename.endswith('.csr'):
return jsonify({'code': 400, 'message': '文件格式错误,请上传.csr文件'}), 400
try:
csr_pem = file.read().decode('utf-8')
data, public_key = parse_csr(csr_pem)
# 检查是否有待审核的请求
existing_request = CARequest.query.filter_by(
user_id=user_id,
state=1,
deleted_at=None
).order_by(CARequest.created_at.desc()).first()
if existing_request:
# 更新现有请求
existing_request.csr_content = csr_pem
existing_request.public_key = public_key
existing_request.country = data.get('country')
existing_request.province = data.get('province')
existing_request.locality = data.get('locality')
existing_request.organization = data.get('organization')
existing_request.organization_unit_name = data.get('organization_unit_name')
existing_request.common_name = data.get('common_name')
existing_request.email_address = data.get('email_address')
existing_request.state = 1 # 待审核
else:
# 创建新请求
existing_request = CARequest(
user_id=user_id,
state=1,
csr_content=csr_pem,
public_key=public_key,
country=data.get('country'),
province=data.get('province'),
locality=data.get('locality'),
organization=data.get('organization'),
organization_unit_name=data.get('organization_unit_name'),
common_name=data.get('common_name'),
email_address=data.get('email_address')
)
db.session.add(existing_request)
db.session.commit()
return jsonify({
'code': 200,
'message': 'CSR文件上传成功等待审核',
'data': {
'request_id': existing_request.id,
'parsed_data': data
}
}), 200
except Exception as e:
return jsonify({'code': 500, 'message': f'解析CSR失败: {str(e)}'}), 500
@cert_bp.route('/request/submit-key', methods=['POST'])
@login_required
def submit_key():
"""提交公钥(第二步)"""
user_id = request.user_id
data = request.get_json()
request_id = data.get('request_id')
public_key = data.get('public_key')
private_key = data.get('private_key') # 可选,如果自动生成
if not request_id:
return jsonify({'code': 400, 'message': '请求ID不能为空'}), 400
cert_request = CARequest.query.filter_by(
id=request_id,
user_id=user_id,
deleted_at=None
).first()
if not cert_request:
return jsonify({'code': 404, 'message': '证书请求不存在'}), 404
if cert_request.state != 1:
return jsonify({'code': 400, 'message': '该请求已处理,无法修改'}), 400
# 如果提供了私钥创建CSR
if private_key and not cert_request.csr_content:
try:
csr, _ = create_csr_from_data({
'private_key': private_key,
'country': cert_request.country,
'province': cert_request.province,
'locality': cert_request.locality,
'organization': cert_request.organization,
'organization_unit_name': cert_request.organization_unit_name,
'common_name': cert_request.common_name,
'email_address': cert_request.email_address
})
cert_request.csr_content = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8')
except Exception as e:
return jsonify({'code': 500, 'message': f'创建CSR失败: {str(e)}'}), 500
# 更新公钥
if public_key:
cert_request.public_key = public_key
cert_request.state = 1 # 保持待审核状态
db.session.commit()
return jsonify({
'code': 200,
'message': '公钥提交成功,等待审核'
}), 200
@cert_bp.route('/generate-keypair', methods=['POST'])
@login_required
def generate_keypair():
"""自动生成密钥对"""
try:
private_key, public_key = generate_key_pair()
return jsonify({
'code': 200,
'data': {
'private_key': private_key,
'public_key': public_key
}
}), 200
except Exception as e:
return jsonify({'code': 500, 'message': f'生成密钥对失败: {str(e)}'}), 500
@cert_bp.route('/request/status', methods=['GET'])
@login_required
def get_request_status():
"""获取证书申请状态"""
user_id = request.user_id
requests = CARequest.query.filter_by(
user_id=user_id,
deleted_at=None
).order_by(CARequest.created_at.desc()).all()
request_list = [req.to_dict() for req in requests]
return jsonify({
'code': 200,
'data': request_list
}), 200