You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

367 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
数字信封协议模块
负责封装和解析数字信封格式
数字信封格式v2.0 - 包含数字签名):
{
"version": "2.0",
"algorithm": "AES-256-CBC/GCM/CTR",
"encrypted_key": "base64编码的RSA加密的AES密钥",
"iv": "base64编码的初始化向量",
"encrypted_data": "base64编码的加密文件内容",
"digest": "base64编码的SHA256文件摘要",
"filename": "原始文件名",
"filesize": 原始文件大小(字节),
"sender_id": "发送方身份标识",
"signature": "base64编码的发送方数字签名使用Encrypt-then-Sign模式"
}
安全说明:
- 所有数字信封必须包含签名v2.0强制要求)
- 不再在信封中包含sender_public_key字段防止中间人攻击
- 接收方必须使用本地预存的受信任公钥验证签名
- 签名基于加密数据的哈希Encrypt-then-Sign
"""
import json
import base64
import hashlib
import os
from typing import Dict, Any, Tuple
from cryptography.hazmat.primitives.asymmetric import rsa
from . import crypto as crypto_mod
import utils
def compute_file_digest(filepath: str, algorithm: str = 'sha256') -> bytes:
"""
计算文件的哈希摘要
Args:
filepath: 文件路径
algorithm: 哈希算法sha256/sha512
Returns:
bytes: 文件摘要
"""
if algorithm.lower() == 'sha256':
hash_func = hashlib.sha256()
elif algorithm.lower() == 'sha512':
hash_func = hashlib.sha512()
else:
raise ValueError(f"Unsupported hash algorithm: {algorithm}")
with open(filepath, 'rb') as f:
while True:
chunk = f.read(8192)
if not chunk:
break
hash_func.update(chunk)
return hash_func.digest()
def verify_file_digest(filepath: str, expected_digest: bytes, algorithm: str = 'sha256') -> bool:
"""
验证文件摘要
Args:
filepath: 文件路径
expected_digest: 期望的摘要值
algorithm: 哈希算法
Returns:
bool: 验证是否通过
"""
actual_digest = compute_file_digest(filepath, algorithm)
return actual_digest == expected_digest
def create_digital_envelope(
file_path: str,
recipient_public_key_path: str,
sender_private_key_path: str,
sender_id: str,
algorithm: str = 'AES-256-GCM'
) -> Dict[str, Any]:
"""
创建数字信封v2.0 - 强制包含数字签名)
安全说明:
- 所有数字信封必须包含发送方的数字签名
- 使用 Encrypt-then-Sign 模式(先加密,后签名)
- 接收方必须使用本地预存的公钥验证签名
Args:
file_path: 要加密的文件路径
recipient_public_key_path: 接收方公钥PEM文件路径
sender_private_key_path: 发送方私钥PEM文件路径用于签名
sender_id: 发送方身份标识(如用户名)
algorithm: 对称加密算法AES-256-CBC/AES-256-GCM/AES-256-CTR
Returns:
dict: 数字信封字典v2.0格式,包含签名)
"""
# 1. 读取原始文件
file_data = utils.read_file_bytes(file_path)
file_size = len(file_data)
filename = utils.get_filename_without_ext(file_path) + utils.get_file_extension(file_path)
# 2. 计算文件摘要(用于完整性校验)
digest = hashlib.sha256(file_data).digest()
# 3. 生成随机AES密钥和IV
aes_key = crypto_mod.generate_aes_key(256)
iv = crypto_mod.generate_iv(16)
# 4. 使用AES加密文件内容
if algorithm == 'AES-256-CBC':
encrypted_data = crypto_mod.aes_encrypt_cbc(file_data, aes_key, iv)
tag = None
elif algorithm == 'AES-256-GCM':
encrypted_data, tag = crypto_mod.aes_encrypt_gcm(file_data, aes_key, iv)
elif algorithm == 'AES-256-CTR':
encrypted_data = crypto_mod.aes_encrypt_ctr(file_data, aes_key, iv)
tag = None
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
# 5. 加载接收方公钥
recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path)
# 6. 使用接收方RSA公钥加密AES密钥
encrypted_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key)
# 7. 加载发送方私钥并签名Encrypt-then-Sign
sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path)
data_to_sign = hashlib.sha256(encrypted_data).digest()
signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key)
# 8. 构建数字信封v2.0格式)
envelope = {
"version": "2.0",
"algorithm": algorithm,
"encrypted_key": base64.b64encode(encrypted_key).decode('utf-8'),
"iv": base64.b64encode(iv).decode('utf-8'),
"encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'),
"digest": base64.b64encode(digest).decode('utf-8'),
"filename": filename,
"filesize": file_size,
"sender_id": sender_id,
"signature": base64.b64encode(signature).decode('utf-8')
}
if tag is not None:
envelope["tag"] = base64.b64encode(tag).decode('utf-8')
return envelope
def parse_digital_envelope(
envelope_data: Dict[str, Any],
recipient_private_key_path: str,
sender_public_key_path: str,
output_path: str
) -> Tuple[bool, str]:
"""
解析数字信封、验证签名并解密文件
安全说明:
- sender_public_key_path 必须是本地预存的受信任公钥
- 绝不能使用信封内携带的公钥(可被中间人伪造)
- 所有数字信封必须包含签名,否则拒绝解析
Args:
envelope_data: 数字信封字典
recipient_private_key_path: 接收方私钥PEM文件路径
sender_public_key_path: 发送方公钥PEM文件路径必填必须是本地受信任的公钥
output_path: 解密后的输出文件路径
Returns:
Tuple[bool, str]: (是否成功, 验证信息)
"""
try:
# 1. 解析数字信封字段
algorithm = envelope_data.get("algorithm")
encrypted_key = base64.b64decode(envelope_data["encrypted_key"])
iv = base64.b64decode(envelope_data["iv"])
encrypted_data = base64.b64decode(envelope_data["encrypted_data"])
expected_digest = base64.b64decode(envelope_data["digest"])
tag = None
if "tag" in envelope_data:
tag = base64.b64decode(envelope_data["tag"])
# 2. 加载接收方私钥
private_key = crypto_mod.load_private_key_from_pem(recipient_private_key_path)
# 3. 使用RSA私钥解密AES密钥
aes_key = crypto_mod.rsa_decrypt(encrypted_key, private_key)
# 4. 使用AES密钥解密文件内容
if algorithm == 'AES-256-CBC':
decrypted_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv)
elif algorithm == 'AES-256-GCM':
if tag is None:
raise ValueError("GCM tag is missing")
decrypted_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag)
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
# 5. 验证文件摘要
actual_digest = hashlib.sha256(decrypted_data).digest()
if actual_digest != expected_digest:
raise ValueError("File digest verification failed")
# 6. 强制验证数字签名(安全要求:所有信封必须包含签名)
if "signature" not in envelope_data:
return False, "🔒 安全性错误数字信封缺失签名拒绝解析可能是v1.0格式或被篡改)"
signature = base64.b64decode(envelope_data["signature"])
sender_id = envelope_data.get("sender_id", "未知")
# 始终从本地加载受信任的公钥(绝不使用信封内的公钥!)
sender_public_key = crypto_mod.load_public_key_from_pem(sender_public_key_path)
# 对加密数据的哈希进行签名验证Encrypt-then-Sign
data_to_verify = hashlib.sha256(encrypted_data).digest()
is_valid = crypto_mod.rsa_verify(data_to_verify, signature, sender_public_key)
if not is_valid:
return False, f"⚠️ 警告:数字签名验证失败!文件可能被篡改或身份伪造(声称发送方: {sender_id}"
verify_msg = f"✓ 签名验证成功 (发送方: {sender_id})"
# 7. 保存解密后的文件
utils.write_file_bytes(output_path, decrypted_data)
return True, verify_msg
except Exception as e:
print(f"Error parsing digital envelope: {e}")
return False, str(e)
def save_envelope_to_file(envelope: Dict[str, Any], output_path: str):
"""
保存数字信封到JSON文件
Args:
envelope: 数字信封字典
output_path: 输出文件路径
"""
envelope_json = json.dumps(envelope, indent=2, ensure_ascii=False)
utils.write_file_text(output_path, envelope_json)
def load_envelope_from_file(envelope_path: str) -> Dict[str, Any]:
"""
从JSON文件加载数字信封
Args:
envelope_path: 数字信封文件路径
Returns:
dict: 数字信封字典
"""
envelope_json = utils.read_file_text(envelope_path)
return json.loads(envelope_json)
def create_digital_envelope_from_encrypted_file(
encrypted_file_path: str,
sender_id: str,
sender_private_key_path: str,
recipient_public_key_path: str
) -> Dict[str, Any]:
"""
从加密文件(.enc)创建数字信封,并添加发送方数字签名
适用场景:用户先用【加密/解密】页面用AES加密文件
然后用【数字信封】页面选择.enc文件添加签名和接收方公钥加密
最后通过P2P发送。
Args:
encrypted_file_path: .enc文件路径格式模式|密钥|IV|TAG?|密文)
sender_id: 发送方身份标识(如用户名)
sender_private_key_path: 发送方RSA私钥路径用于签名
recipient_public_key_path: 接收方RSA公钥路径用于加密AES密钥
Returns:
dict: 数字信封字典(包含签名)
"""
# 1. 读取.enc文件
with open(encrypted_file_path, 'rb') as f:
enc_data = f.read()
if len(enc_data) < 50:
raise ValueError("加密文件格式不正确")
# 2. 解析.enc文件格式
mode_byte = enc_data[0:1]
mode_map = {b'\x01': 'AES-256-CBC', b'\x02': 'AES-256-GCM', b'\x03': 'AES-256-CTR'}
if mode_byte not in mode_map:
raise ValueError(f"未知的加密模式标识: {mode_byte.hex()}")
algorithm = mode_map[mode_byte]
aes_key = enc_data[1:33]
iv = enc_data[33:49]
tag = None
if algorithm == 'AES-256-GCM':
tag = enc_data[49:65]
encrypted_data = enc_data[65:]
else:
encrypted_data = enc_data[49:]
# 3. 解密原文件以计算正确的digest和filesize
if algorithm == 'AES-256-CBC':
original_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv)
elif algorithm == 'AES-256-GCM':
original_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag)
elif algorithm == 'AES-256-CTR':
original_data = crypto_mod.aes_decrypt_ctr(encrypted_data, aes_key, iv)
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")
# 4. 计算原始文件的digest和大小
filename = os.path.basename(encrypted_file_path).replace('.enc', '')
filesize = len(original_data) # 原始文件大小
digest = hashlib.sha256(original_data).digest() # 原始文件的摘要
# 5. 加载发送方私钥
sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path)
# 6. 加载接收方公钥
recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path)
# 7. 用接收方公钥加密 AES 密钥(保护对称密钥)
encrypted_aes_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key)
# 8. 构建待签名的内容encrypted_data的哈希Encrypt-then-Sign
data_to_sign = hashlib.sha256(encrypted_data).digest()
# 9. 使用发送方私钥签名
signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key)
# 10. 构建数字信封v2.0格式不包含sender_public_key字段
envelope = {
"version": "2.0",
"algorithm": algorithm,
"encrypted_key": base64.b64encode(encrypted_aes_key).decode('utf-8'),
"iv": base64.b64encode(iv).decode('utf-8'),
"encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'),
"digest": base64.b64encode(digest).decode('utf-8'), # 原始文件的摘要
"filename": filename,
"filesize": filesize,
"sender_id": sender_id,
"signature": base64.b64encode(signature).decode('utf-8')
}
if algorithm == 'AES-256-GCM':
envelope["tag"] = base64.b64encode(tag).decode('utf-8')
return envelope