|
|
"""
|
|
|
数字信封协议模块
|
|
|
负责封装和解析数字信封格式
|
|
|
|
|
|
数字信封格式(v2.0 - 包含数字签名):
|
|
|
{
|
|
|
"version": "2.0",
|
|
|
"algorithm": "AES-256-CBC/GCM/CTR",
|
|
|
"encrypted_key": "base64编码的RSA加密的AES密钥",
|
|
|
"iv": "base64编码的初始化向量",
|
|
|
"encrypted_data": "base64编码的加密文件内容",
|
|
|
"digest": "base64编码的SHA256文件摘要",
|
|
|
"filename": "原始文件名",
|
|
|
"filesize": 原始文件大小(字节),
|
|
|
"sender_id": "发送方身份标识",
|
|
|
"signature": "base64编码的发送方数字签名(使用Encrypt-then-Sign模式)"
|
|
|
}
|
|
|
|
|
|
安全说明:
|
|
|
- 所有数字信封必须包含签名(v2.0强制要求)
|
|
|
- 不再在信封中包含sender_public_key字段(防止中间人攻击)
|
|
|
- 接收方必须使用本地预存的受信任公钥验证签名
|
|
|
- 签名基于加密数据的哈希(Encrypt-then-Sign)
|
|
|
"""
|
|
|
import json
|
|
|
import base64
|
|
|
import hashlib
|
|
|
import os
|
|
|
from typing import Dict, Any, Tuple
|
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
|
|
|
|
from . import crypto as crypto_mod
|
|
|
import utils
|
|
|
|
|
|
|
|
|
def compute_file_digest(filepath: str, algorithm: str = 'sha256') -> bytes:
|
|
|
"""
|
|
|
计算文件的哈希摘要
|
|
|
|
|
|
Args:
|
|
|
filepath: 文件路径
|
|
|
algorithm: 哈希算法(sha256/sha512)
|
|
|
|
|
|
Returns:
|
|
|
bytes: 文件摘要
|
|
|
"""
|
|
|
if algorithm.lower() == 'sha256':
|
|
|
hash_func = hashlib.sha256()
|
|
|
elif algorithm.lower() == 'sha512':
|
|
|
hash_func = hashlib.sha512()
|
|
|
else:
|
|
|
raise ValueError(f"Unsupported hash algorithm: {algorithm}")
|
|
|
|
|
|
with open(filepath, 'rb') as f:
|
|
|
while True:
|
|
|
chunk = f.read(8192)
|
|
|
if not chunk:
|
|
|
break
|
|
|
hash_func.update(chunk)
|
|
|
|
|
|
return hash_func.digest()
|
|
|
|
|
|
|
|
|
def verify_file_digest(filepath: str, expected_digest: bytes, algorithm: str = 'sha256') -> bool:
|
|
|
"""
|
|
|
验证文件摘要
|
|
|
|
|
|
Args:
|
|
|
filepath: 文件路径
|
|
|
expected_digest: 期望的摘要值
|
|
|
algorithm: 哈希算法
|
|
|
|
|
|
Returns:
|
|
|
bool: 验证是否通过
|
|
|
"""
|
|
|
actual_digest = compute_file_digest(filepath, algorithm)
|
|
|
return actual_digest == expected_digest
|
|
|
|
|
|
|
|
|
def create_digital_envelope(
|
|
|
file_path: str,
|
|
|
recipient_public_key_path: str,
|
|
|
sender_private_key_path: str,
|
|
|
sender_id: str,
|
|
|
algorithm: str = 'AES-256-GCM'
|
|
|
) -> Dict[str, Any]:
|
|
|
"""
|
|
|
创建数字信封(v2.0 - 强制包含数字签名)
|
|
|
|
|
|
安全说明:
|
|
|
- 所有数字信封必须包含发送方的数字签名
|
|
|
- 使用 Encrypt-then-Sign 模式(先加密,后签名)
|
|
|
- 接收方必须使用本地预存的公钥验证签名
|
|
|
|
|
|
Args:
|
|
|
file_path: 要加密的文件路径
|
|
|
recipient_public_key_path: 接收方公钥PEM文件路径
|
|
|
sender_private_key_path: 发送方私钥PEM文件路径(用于签名)
|
|
|
sender_id: 发送方身份标识(如用户名)
|
|
|
algorithm: 对称加密算法(AES-256-CBC/AES-256-GCM/AES-256-CTR)
|
|
|
|
|
|
Returns:
|
|
|
dict: 数字信封字典(v2.0格式,包含签名)
|
|
|
"""
|
|
|
# 1. 读取原始文件
|
|
|
file_data = utils.read_file_bytes(file_path)
|
|
|
file_size = len(file_data)
|
|
|
filename = utils.get_filename_without_ext(file_path) + utils.get_file_extension(file_path)
|
|
|
|
|
|
# 2. 计算文件摘要(用于完整性校验)
|
|
|
digest = hashlib.sha256(file_data).digest()
|
|
|
|
|
|
# 3. 生成随机AES密钥和IV
|
|
|
aes_key = crypto_mod.generate_aes_key(256)
|
|
|
iv = crypto_mod.generate_iv(16)
|
|
|
|
|
|
# 4. 使用AES加密文件内容
|
|
|
if algorithm == 'AES-256-CBC':
|
|
|
encrypted_data = crypto_mod.aes_encrypt_cbc(file_data, aes_key, iv)
|
|
|
tag = None
|
|
|
elif algorithm == 'AES-256-GCM':
|
|
|
encrypted_data, tag = crypto_mod.aes_encrypt_gcm(file_data, aes_key, iv)
|
|
|
elif algorithm == 'AES-256-CTR':
|
|
|
encrypted_data = crypto_mod.aes_encrypt_ctr(file_data, aes_key, iv)
|
|
|
tag = None
|
|
|
else:
|
|
|
raise ValueError(f"Unsupported algorithm: {algorithm}")
|
|
|
|
|
|
# 5. 加载接收方公钥
|
|
|
recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path)
|
|
|
|
|
|
# 6. 使用接收方RSA公钥加密AES密钥
|
|
|
encrypted_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key)
|
|
|
|
|
|
# 7. 加载发送方私钥并签名(Encrypt-then-Sign)
|
|
|
sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path)
|
|
|
data_to_sign = hashlib.sha256(encrypted_data).digest()
|
|
|
signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key)
|
|
|
|
|
|
# 8. 构建数字信封(v2.0格式)
|
|
|
envelope = {
|
|
|
"version": "2.0",
|
|
|
"algorithm": algorithm,
|
|
|
"encrypted_key": base64.b64encode(encrypted_key).decode('utf-8'),
|
|
|
"iv": base64.b64encode(iv).decode('utf-8'),
|
|
|
"encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'),
|
|
|
"digest": base64.b64encode(digest).decode('utf-8'),
|
|
|
"filename": filename,
|
|
|
"filesize": file_size,
|
|
|
"sender_id": sender_id,
|
|
|
"signature": base64.b64encode(signature).decode('utf-8')
|
|
|
}
|
|
|
|
|
|
if tag is not None:
|
|
|
envelope["tag"] = base64.b64encode(tag).decode('utf-8')
|
|
|
|
|
|
return envelope
|
|
|
|
|
|
|
|
|
def parse_digital_envelope(
|
|
|
envelope_data: Dict[str, Any],
|
|
|
recipient_private_key_path: str,
|
|
|
sender_public_key_path: str,
|
|
|
output_path: str
|
|
|
) -> Tuple[bool, str]:
|
|
|
"""
|
|
|
解析数字信封、验证签名并解密文件
|
|
|
|
|
|
安全说明:
|
|
|
- sender_public_key_path 必须是本地预存的受信任公钥
|
|
|
- 绝不能使用信封内携带的公钥(可被中间人伪造)
|
|
|
- 所有数字信封必须包含签名,否则拒绝解析
|
|
|
|
|
|
Args:
|
|
|
envelope_data: 数字信封字典
|
|
|
recipient_private_key_path: 接收方私钥PEM文件路径
|
|
|
sender_public_key_path: 发送方公钥PEM文件路径(必填,必须是本地受信任的公钥)
|
|
|
output_path: 解密后的输出文件路径
|
|
|
|
|
|
Returns:
|
|
|
Tuple[bool, str]: (是否成功, 验证信息)
|
|
|
"""
|
|
|
try:
|
|
|
# 1. 解析数字信封字段
|
|
|
algorithm = envelope_data.get("algorithm")
|
|
|
encrypted_key = base64.b64decode(envelope_data["encrypted_key"])
|
|
|
iv = base64.b64decode(envelope_data["iv"])
|
|
|
encrypted_data = base64.b64decode(envelope_data["encrypted_data"])
|
|
|
expected_digest = base64.b64decode(envelope_data["digest"])
|
|
|
|
|
|
tag = None
|
|
|
if "tag" in envelope_data:
|
|
|
tag = base64.b64decode(envelope_data["tag"])
|
|
|
|
|
|
# 2. 加载接收方私钥
|
|
|
private_key = crypto_mod.load_private_key_from_pem(recipient_private_key_path)
|
|
|
|
|
|
# 3. 使用RSA私钥解密AES密钥
|
|
|
aes_key = crypto_mod.rsa_decrypt(encrypted_key, private_key)
|
|
|
|
|
|
# 4. 使用AES密钥解密文件内容
|
|
|
if algorithm == 'AES-256-CBC':
|
|
|
decrypted_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv)
|
|
|
elif algorithm == 'AES-256-GCM':
|
|
|
if tag is None:
|
|
|
raise ValueError("GCM tag is missing")
|
|
|
decrypted_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag)
|
|
|
else:
|
|
|
raise ValueError(f"Unsupported algorithm: {algorithm}")
|
|
|
|
|
|
# 5. 验证文件摘要
|
|
|
actual_digest = hashlib.sha256(decrypted_data).digest()
|
|
|
if actual_digest != expected_digest:
|
|
|
raise ValueError("File digest verification failed")
|
|
|
|
|
|
# 6. 强制验证数字签名(安全要求:所有信封必须包含签名)
|
|
|
if "signature" not in envelope_data:
|
|
|
return False, "🔒 安全性错误:数字信封缺失签名,拒绝解析(可能是v1.0格式或被篡改)"
|
|
|
|
|
|
signature = base64.b64decode(envelope_data["signature"])
|
|
|
sender_id = envelope_data.get("sender_id", "未知")
|
|
|
|
|
|
# 始终从本地加载受信任的公钥(绝不使用信封内的公钥!)
|
|
|
sender_public_key = crypto_mod.load_public_key_from_pem(sender_public_key_path)
|
|
|
|
|
|
# 对加密数据的哈希进行签名验证(Encrypt-then-Sign)
|
|
|
data_to_verify = hashlib.sha256(encrypted_data).digest()
|
|
|
|
|
|
is_valid = crypto_mod.rsa_verify(data_to_verify, signature, sender_public_key)
|
|
|
|
|
|
if not is_valid:
|
|
|
return False, f"⚠️ 警告:数字签名验证失败!文件可能被篡改或身份伪造(声称发送方: {sender_id})"
|
|
|
|
|
|
verify_msg = f"✓ 签名验证成功 (发送方: {sender_id})"
|
|
|
|
|
|
# 7. 保存解密后的文件
|
|
|
utils.write_file_bytes(output_path, decrypted_data)
|
|
|
|
|
|
return True, verify_msg
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"Error parsing digital envelope: {e}")
|
|
|
return False, str(e)
|
|
|
|
|
|
|
|
|
def save_envelope_to_file(envelope: Dict[str, Any], output_path: str):
|
|
|
"""
|
|
|
保存数字信封到JSON文件
|
|
|
|
|
|
Args:
|
|
|
envelope: 数字信封字典
|
|
|
output_path: 输出文件路径
|
|
|
"""
|
|
|
envelope_json = json.dumps(envelope, indent=2, ensure_ascii=False)
|
|
|
utils.write_file_text(output_path, envelope_json)
|
|
|
|
|
|
|
|
|
def load_envelope_from_file(envelope_path: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
从JSON文件加载数字信封
|
|
|
|
|
|
Args:
|
|
|
envelope_path: 数字信封文件路径
|
|
|
|
|
|
Returns:
|
|
|
dict: 数字信封字典
|
|
|
"""
|
|
|
envelope_json = utils.read_file_text(envelope_path)
|
|
|
return json.loads(envelope_json)
|
|
|
|
|
|
|
|
|
def create_digital_envelope_from_encrypted_file(
|
|
|
encrypted_file_path: str,
|
|
|
sender_id: str,
|
|
|
sender_private_key_path: str,
|
|
|
recipient_public_key_path: str
|
|
|
) -> Dict[str, Any]:
|
|
|
"""
|
|
|
从加密文件(.enc)创建数字信封,并添加发送方数字签名
|
|
|
|
|
|
适用场景:用户先用【加密/解密】页面用AES加密文件,
|
|
|
然后用【数字信封】页面选择.enc文件添加签名和接收方公钥加密,
|
|
|
最后通过P2P发送。
|
|
|
|
|
|
Args:
|
|
|
encrypted_file_path: .enc文件路径(格式:模式|密钥|IV|TAG?|密文)
|
|
|
sender_id: 发送方身份标识(如用户名)
|
|
|
sender_private_key_path: 发送方RSA私钥路径(用于签名)
|
|
|
recipient_public_key_path: 接收方RSA公钥路径(用于加密AES密钥)
|
|
|
|
|
|
Returns:
|
|
|
dict: 数字信封字典(包含签名)
|
|
|
"""
|
|
|
# 1. 读取.enc文件
|
|
|
with open(encrypted_file_path, 'rb') as f:
|
|
|
enc_data = f.read()
|
|
|
|
|
|
if len(enc_data) < 50:
|
|
|
raise ValueError("加密文件格式不正确")
|
|
|
|
|
|
# 2. 解析.enc文件格式
|
|
|
mode_byte = enc_data[0:1]
|
|
|
mode_map = {b'\x01': 'AES-256-CBC', b'\x02': 'AES-256-GCM', b'\x03': 'AES-256-CTR'}
|
|
|
|
|
|
if mode_byte not in mode_map:
|
|
|
raise ValueError(f"未知的加密模式标识: {mode_byte.hex()}")
|
|
|
|
|
|
algorithm = mode_map[mode_byte]
|
|
|
aes_key = enc_data[1:33]
|
|
|
iv = enc_data[33:49]
|
|
|
|
|
|
tag = None
|
|
|
if algorithm == 'AES-256-GCM':
|
|
|
tag = enc_data[49:65]
|
|
|
encrypted_data = enc_data[65:]
|
|
|
else:
|
|
|
encrypted_data = enc_data[49:]
|
|
|
|
|
|
# 3. 解密原文件以计算正确的digest和filesize
|
|
|
if algorithm == 'AES-256-CBC':
|
|
|
original_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv)
|
|
|
elif algorithm == 'AES-256-GCM':
|
|
|
original_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag)
|
|
|
elif algorithm == 'AES-256-CTR':
|
|
|
original_data = crypto_mod.aes_decrypt_ctr(encrypted_data, aes_key, iv)
|
|
|
else:
|
|
|
raise ValueError(f"Unsupported algorithm: {algorithm}")
|
|
|
|
|
|
# 4. 计算原始文件的digest和大小
|
|
|
filename = os.path.basename(encrypted_file_path).replace('.enc', '')
|
|
|
filesize = len(original_data) # 原始文件大小
|
|
|
digest = hashlib.sha256(original_data).digest() # 原始文件的摘要
|
|
|
|
|
|
# 5. 加载发送方私钥
|
|
|
sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path)
|
|
|
|
|
|
# 6. 加载接收方公钥
|
|
|
recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path)
|
|
|
|
|
|
# 7. 用接收方公钥加密 AES 密钥(保护对称密钥)
|
|
|
encrypted_aes_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key)
|
|
|
|
|
|
# 8. 构建待签名的内容(encrypted_data的哈希,Encrypt-then-Sign)
|
|
|
data_to_sign = hashlib.sha256(encrypted_data).digest()
|
|
|
|
|
|
# 9. 使用发送方私钥签名
|
|
|
signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key)
|
|
|
|
|
|
# 10. 构建数字信封(v2.0格式,不包含sender_public_key字段)
|
|
|
envelope = {
|
|
|
"version": "2.0",
|
|
|
"algorithm": algorithm,
|
|
|
"encrypted_key": base64.b64encode(encrypted_aes_key).decode('utf-8'),
|
|
|
"iv": base64.b64encode(iv).decode('utf-8'),
|
|
|
"encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'),
|
|
|
"digest": base64.b64encode(digest).decode('utf-8'), # 原始文件的摘要
|
|
|
"filename": filename,
|
|
|
"filesize": filesize,
|
|
|
"sender_id": sender_id,
|
|
|
"signature": base64.b64encode(signature).decode('utf-8')
|
|
|
}
|
|
|
|
|
|
if algorithm == 'AES-256-GCM':
|
|
|
envelope["tag"] = base64.b64encode(tag).decode('utf-8')
|
|
|
|
|
|
return envelope
|