""" 数字信封协议模块 负责封装和解析数字信封格式 数字信封格式(v2.0 - 包含数字签名): { "version": "2.0", "algorithm": "AES-256-CBC/GCM/CTR", "encrypted_key": "base64编码的RSA加密的AES密钥", "iv": "base64编码的初始化向量", "encrypted_data": "base64编码的加密文件内容", "digest": "base64编码的SHA256文件摘要", "filename": "原始文件名", "filesize": 原始文件大小(字节), "sender_id": "发送方身份标识", "signature": "base64编码的发送方数字签名(使用Encrypt-then-Sign模式)" } 安全说明: - 所有数字信封必须包含签名(v2.0强制要求) - 不再在信封中包含sender_public_key字段(防止中间人攻击) - 接收方必须使用本地预存的受信任公钥验证签名 - 签名基于加密数据的哈希(Encrypt-then-Sign) """ import json import base64 import hashlib import os from typing import Dict, Any, Tuple from cryptography.hazmat.primitives.asymmetric import rsa from . import crypto as crypto_mod import utils def compute_file_digest(filepath: str, algorithm: str = 'sha256') -> bytes: """ 计算文件的哈希摘要 Args: filepath: 文件路径 algorithm: 哈希算法(sha256/sha512) Returns: bytes: 文件摘要 """ if algorithm.lower() == 'sha256': hash_func = hashlib.sha256() elif algorithm.lower() == 'sha512': hash_func = hashlib.sha512() else: raise ValueError(f"Unsupported hash algorithm: {algorithm}") with open(filepath, 'rb') as f: while True: chunk = f.read(8192) if not chunk: break hash_func.update(chunk) return hash_func.digest() def verify_file_digest(filepath: str, expected_digest: bytes, algorithm: str = 'sha256') -> bool: """ 验证文件摘要 Args: filepath: 文件路径 expected_digest: 期望的摘要值 algorithm: 哈希算法 Returns: bool: 验证是否通过 """ actual_digest = compute_file_digest(filepath, algorithm) return actual_digest == expected_digest def create_digital_envelope( file_path: str, recipient_public_key_path: str, sender_private_key_path: str, sender_id: str, algorithm: str = 'AES-256-GCM' ) -> Dict[str, Any]: """ 创建数字信封(v2.0 - 强制包含数字签名) 安全说明: - 所有数字信封必须包含发送方的数字签名 - 使用 Encrypt-then-Sign 模式(先加密,后签名) - 接收方必须使用本地预存的公钥验证签名 Args: file_path: 要加密的文件路径 recipient_public_key_path: 接收方公钥PEM文件路径 sender_private_key_path: 发送方私钥PEM文件路径(用于签名) sender_id: 发送方身份标识(如用户名) algorithm: 对称加密算法(AES-256-CBC/AES-256-GCM/AES-256-CTR) Returns: dict: 数字信封字典(v2.0格式,包含签名) """ # 1. 读取原始文件 file_data = utils.read_file_bytes(file_path) file_size = len(file_data) filename = utils.get_filename_without_ext(file_path) + utils.get_file_extension(file_path) # 2. 计算文件摘要(用于完整性校验) digest = hashlib.sha256(file_data).digest() # 3. 生成随机AES密钥和IV aes_key = crypto_mod.generate_aes_key(256) iv = crypto_mod.generate_iv(16) # 4. 使用AES加密文件内容 if algorithm == 'AES-256-CBC': encrypted_data = crypto_mod.aes_encrypt_cbc(file_data, aes_key, iv) tag = None elif algorithm == 'AES-256-GCM': encrypted_data, tag = crypto_mod.aes_encrypt_gcm(file_data, aes_key, iv) elif algorithm == 'AES-256-CTR': encrypted_data = crypto_mod.aes_encrypt_ctr(file_data, aes_key, iv) tag = None else: raise ValueError(f"Unsupported algorithm: {algorithm}") # 5. 加载接收方公钥 recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path) # 6. 使用接收方RSA公钥加密AES密钥 encrypted_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key) # 7. 加载发送方私钥并签名(Encrypt-then-Sign) sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path) data_to_sign = hashlib.sha256(encrypted_data).digest() signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key) # 8. 构建数字信封(v2.0格式) envelope = { "version": "2.0", "algorithm": algorithm, "encrypted_key": base64.b64encode(encrypted_key).decode('utf-8'), "iv": base64.b64encode(iv).decode('utf-8'), "encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'), "digest": base64.b64encode(digest).decode('utf-8'), "filename": filename, "filesize": file_size, "sender_id": sender_id, "signature": base64.b64encode(signature).decode('utf-8') } if tag is not None: envelope["tag"] = base64.b64encode(tag).decode('utf-8') return envelope def parse_digital_envelope( envelope_data: Dict[str, Any], recipient_private_key_path: str, sender_public_key_path: str, output_path: str ) -> Tuple[bool, str]: """ 解析数字信封、验证签名并解密文件 安全说明: - sender_public_key_path 必须是本地预存的受信任公钥 - 绝不能使用信封内携带的公钥(可被中间人伪造) - 所有数字信封必须包含签名,否则拒绝解析 Args: envelope_data: 数字信封字典 recipient_private_key_path: 接收方私钥PEM文件路径 sender_public_key_path: 发送方公钥PEM文件路径(必填,必须是本地受信任的公钥) output_path: 解密后的输出文件路径 Returns: Tuple[bool, str]: (是否成功, 验证信息) """ try: # 1. 解析数字信封字段 algorithm = envelope_data.get("algorithm") encrypted_key = base64.b64decode(envelope_data["encrypted_key"]) iv = base64.b64decode(envelope_data["iv"]) encrypted_data = base64.b64decode(envelope_data["encrypted_data"]) expected_digest = base64.b64decode(envelope_data["digest"]) tag = None if "tag" in envelope_data: tag = base64.b64decode(envelope_data["tag"]) # 2. 加载接收方私钥 private_key = crypto_mod.load_private_key_from_pem(recipient_private_key_path) # 3. 使用RSA私钥解密AES密钥 aes_key = crypto_mod.rsa_decrypt(encrypted_key, private_key) # 4. 使用AES密钥解密文件内容 if algorithm == 'AES-256-CBC': decrypted_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv) elif algorithm == 'AES-256-GCM': if tag is None: raise ValueError("GCM tag is missing") decrypted_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag) else: raise ValueError(f"Unsupported algorithm: {algorithm}") # 5. 验证文件摘要 actual_digest = hashlib.sha256(decrypted_data).digest() if actual_digest != expected_digest: raise ValueError("File digest verification failed") # 6. 强制验证数字签名(安全要求:所有信封必须包含签名) if "signature" not in envelope_data: return False, "🔒 安全性错误:数字信封缺失签名,拒绝解析(可能是v1.0格式或被篡改)" signature = base64.b64decode(envelope_data["signature"]) sender_id = envelope_data.get("sender_id", "未知") # 始终从本地加载受信任的公钥(绝不使用信封内的公钥!) sender_public_key = crypto_mod.load_public_key_from_pem(sender_public_key_path) # 对加密数据的哈希进行签名验证(Encrypt-then-Sign) data_to_verify = hashlib.sha256(encrypted_data).digest() is_valid = crypto_mod.rsa_verify(data_to_verify, signature, sender_public_key) if not is_valid: return False, f"⚠️ 警告:数字签名验证失败!文件可能被篡改或身份伪造(声称发送方: {sender_id})" verify_msg = f"✓ 签名验证成功 (发送方: {sender_id})" # 7. 保存解密后的文件 utils.write_file_bytes(output_path, decrypted_data) return True, verify_msg except Exception as e: print(f"Error parsing digital envelope: {e}") return False, str(e) def save_envelope_to_file(envelope: Dict[str, Any], output_path: str): """ 保存数字信封到JSON文件 Args: envelope: 数字信封字典 output_path: 输出文件路径 """ envelope_json = json.dumps(envelope, indent=2, ensure_ascii=False) utils.write_file_text(output_path, envelope_json) def load_envelope_from_file(envelope_path: str) -> Dict[str, Any]: """ 从JSON文件加载数字信封 Args: envelope_path: 数字信封文件路径 Returns: dict: 数字信封字典 """ envelope_json = utils.read_file_text(envelope_path) return json.loads(envelope_json) def create_digital_envelope_from_encrypted_file( encrypted_file_path: str, sender_id: str, sender_private_key_path: str, recipient_public_key_path: str ) -> Dict[str, Any]: """ 从加密文件(.enc)创建数字信封,并添加发送方数字签名 适用场景:用户先用【加密/解密】页面用AES加密文件, 然后用【数字信封】页面选择.enc文件添加签名和接收方公钥加密, 最后通过P2P发送。 Args: encrypted_file_path: .enc文件路径(格式:模式|密钥|IV|TAG?|密文) sender_id: 发送方身份标识(如用户名) sender_private_key_path: 发送方RSA私钥路径(用于签名) recipient_public_key_path: 接收方RSA公钥路径(用于加密AES密钥) Returns: dict: 数字信封字典(包含签名) """ # 1. 读取.enc文件 with open(encrypted_file_path, 'rb') as f: enc_data = f.read() if len(enc_data) < 50: raise ValueError("加密文件格式不正确") # 2. 解析.enc文件格式 mode_byte = enc_data[0:1] mode_map = {b'\x01': 'AES-256-CBC', b'\x02': 'AES-256-GCM', b'\x03': 'AES-256-CTR'} if mode_byte not in mode_map: raise ValueError(f"未知的加密模式标识: {mode_byte.hex()}") algorithm = mode_map[mode_byte] aes_key = enc_data[1:33] iv = enc_data[33:49] tag = None if algorithm == 'AES-256-GCM': tag = enc_data[49:65] encrypted_data = enc_data[65:] else: encrypted_data = enc_data[49:] # 3. 解密原文件以计算正确的digest和filesize if algorithm == 'AES-256-CBC': original_data = crypto_mod.aes_decrypt_cbc(encrypted_data, aes_key, iv) elif algorithm == 'AES-256-GCM': original_data = crypto_mod.aes_decrypt_gcm(encrypted_data, aes_key, iv, tag) elif algorithm == 'AES-256-CTR': original_data = crypto_mod.aes_decrypt_ctr(encrypted_data, aes_key, iv) else: raise ValueError(f"Unsupported algorithm: {algorithm}") # 4. 计算原始文件的digest和大小 filename = os.path.basename(encrypted_file_path).replace('.enc', '') filesize = len(original_data) # 原始文件大小 digest = hashlib.sha256(original_data).digest() # 原始文件的摘要 # 5. 加载发送方私钥 sender_private_key = crypto_mod.load_private_key_from_pem(sender_private_key_path) # 6. 加载接收方公钥 recipient_public_key = crypto_mod.load_public_key_from_pem(recipient_public_key_path) # 7. 用接收方公钥加密 AES 密钥(保护对称密钥) encrypted_aes_key = crypto_mod.rsa_encrypt(aes_key, recipient_public_key) # 8. 构建待签名的内容(encrypted_data的哈希,Encrypt-then-Sign) data_to_sign = hashlib.sha256(encrypted_data).digest() # 9. 使用发送方私钥签名 signature = crypto_mod.rsa_sign(data_to_sign, sender_private_key) # 10. 构建数字信封(v2.0格式,不包含sender_public_key字段) envelope = { "version": "2.0", "algorithm": algorithm, "encrypted_key": base64.b64encode(encrypted_aes_key).decode('utf-8'), "iv": base64.b64encode(iv).decode('utf-8'), "encrypted_data": base64.b64encode(encrypted_data).decode('utf-8'), "digest": base64.b64encode(digest).decode('utf-8'), # 原始文件的摘要 "filename": filename, "filesize": filesize, "sender_id": sender_id, "signature": base64.b64encode(signature).decode('utf-8') } if algorithm == 'AES-256-GCM': envelope["tag"] = base64.b64encode(tag).decode('utf-8') return envelope