import os import socket from Crypto.Cipher import AES, PKCS1_OAEP from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 from Crypto.Hash import SHA256 from Crypto.Util.Padding import unpad def generate_keys_if_not_exists(): """生成公钥和私钥,如果文件不存在""" if not os.path.exists('receiver_public.pem') or not os.path.exists('receiver_private.pem'): print("正在生成公钥和私钥...") generate_keys() else: print("公钥和私钥已存在。") def generate_keys(): """生成公钥和私钥并保存为文件""" key = RSA.generate(2048) # 导出私钥 private_key = key.export_key() with open('receiver_private.pem', 'wb') as f: f.write(private_key) print("私钥已保存为 'receiver_private.pem'") # 导出公钥 public_key = key.publickey().export_key() with open('receiver_public.pem', 'wb') as f: f.write(public_key) print("公钥已保存为 'receiver_public.pem'") # 读取私钥 def load_private_key(): with open('receiver_private.pem', 'rb') as f: private_key = RSA.import_key(f.read()) return private_key # 读取发送方公钥 def load_public_key(): with open('public.pem', 'rb') as f: public_key = RSA.import_key(f.read()) return public_key # 解密文件 def decrypt_file(encrypted_session_key, iv, ciphertext, private_key): # 使用 RSA 解密 AES 密钥 cipher_rsa = PKCS1_OAEP.new(private_key) session_key = cipher_rsa.decrypt(encrypted_session_key) # 使用 AES 解密文件内容 mode = AES.MODE_CBC if iv else AES.MODE_ECB # 如果有 IV 则使用 CBC 模式,否则使用 ECB 模式 cipher_aes = AES.new(session_key, mode, iv=iv if iv else None) # 解密并去除填充 try: plaintext = unpad(cipher_aes.decrypt(ciphertext), AES.block_size) except ValueError: print("错误:填充数据不正确或密文已损坏。") raise return plaintext # 验证签名 def verify_signature(filename, signature, public_key): # 计算文件的 SHA-256 哈希值 hash_obj = SHA256.new() with open(filename, 'rb') as f: while chunk := f.read(4096): hash_obj.update(chunk) # 使用公钥验证签名 verifier = pkcs1_15.new(public_key) try: verifier.verify(hash_obj, signature) print("签名验证成功。") except (ValueError, TypeError): print("签名验证失败。") # 服务器接收加密文件并解密 def receive_encrypted_file(host, port): # 监听客户端连接 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((host, port)) server_socket.listen(1) print(f"服务器正在监听 {host}:{port} ...") client_socket, addr = server_socket.accept() print(f"与客户端 {addr} 建立了连接。") # 接收加密的 AES 密钥、IV 和加密的文件内容 encrypted_session_key = client_socket.recv(256) # RSA 密钥长度 2048 位 (256 字节) iv = client_socket.recv(16) # AES 块大小为 16 字节 ciphertext = client_socket.recv(16) signature = client_socket.recv(256) # 解密文件 private_key = load_private_key() decrypted_file = decrypt_file(encrypted_session_key, iv, ciphertext, private_key) # 保存解密后的文件 with open('received_file.txt', 'wb') as f: f.write(decrypted_file) with open('received_file.txt', 'r', encoding='utf-8') as f: print("解密后的文件内容:") print(f.read()) # 验证签名 verify_signature('received_file.txt', signature, load_public_key()) print("文件接收并解密成功。") client_socket.close() # 使用示例 generate_keys_if_not_exists() receive_encrypted_file('127.0.0.1', 59290)