"""Sender side of a hybrid-encryption file transfer tool (tkinter GUI).

A file is encrypted with a freshly generated symmetric key, the ciphertext is
hashed and signed with the sender's RSA private key, the symmetric key is
wrapped with the receiver's RSA public key, and the resulting package is
pickled and sent over a TCP connection.
"""

import hashlib
import os
import pickle
import socket
import tkinter as tk
from tkinter import filedialog, messagebox, ttk

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

SENDER_PRIVATE_KEY_PATH = 'sender_private_key.pem'
SENDER_PUBLIC_KEY_PATH = 'sender_public_key.pem'

# Seconds to wait for the TCP connection to be established.
CONNECT_TIMEOUT_SECONDS = 10

# Per-algorithm parameters: (block size in bytes, symmetric key size in bytes).
# NOTE: the "DES" option is implemented with TripleDES and a 24-byte key.
_CIPHER_PARAMS = {
    "AES": (16, 32),
    "DES": (8, 24),
}

# Names of the supported block-cipher modes.
_SUPPORTED_MODES = ("CBC", "CFB")


def _save_sender_public_key(public_key):
    """Write *public_key* to SENDER_PUBLIC_KEY_PATH as PEM."""
    with open(SENDER_PUBLIC_KEY_PATH, 'wb') as f:
        f.write(public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ))


# Generate and save the sender's key pair
def generate_and_save_sender_keys():
    """Generate a 2048-bit RSA key pair and store both halves as PEM files.

    The private key is written without encryption (NoEncryption).

    Returns:
        A ``(private_key, public_key)`` tuple.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    public_key = private_key.public_key()

    with open(SENDER_PRIVATE_KEY_PATH, 'wb') as f:
        f.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ))
    _save_sender_public_key(public_key)

    return private_key, public_key


# Load the sender's key pair
def load_sender_keys():
    """Load the sender's key pair from disk, creating what is missing.

    If the private key file does not exist, a new pair is generated and saved.
    If only the public key file is missing, it is derived from the existing
    private key and rewritten, so the existing private key is never replaced
    just because its public half was deleted.

    Returns:
        A ``(private_key, public_key)`` tuple.
    """
    try:
        with open(SENDER_PRIVATE_KEY_PATH, 'rb') as f:
            private_key = serialization.load_pem_private_key(
                f.read(),
                password=None
            )
    except FileNotFoundError:
        return generate_and_save_sender_keys()

    try:
        with open(SENDER_PUBLIC_KEY_PATH, 'rb') as f:
            public_key = serialization.load_pem_public_key(f.read())
    except FileNotFoundError:
        public_key = private_key.public_key()
        _save_sender_public_key(public_key)

    return private_key, public_key


sender_private_key, sender_public_key = load_sender_keys()

# Holds the receiver's public key once it has been imported
receiver_public_key = None


# Padding helper
def pad(data, block_size):
    """Append PKCS#7-style padding so len(result) is a multiple of block_size.

    Between 1 and ``block_size`` bytes are always appended, each holding the
    padding length; already-aligned input receives one full block of padding.
    """
    padding_length = block_size - (len(data) % block_size)
    return data + bytes([padding_length]) * padding_length


# File encryption
def encrypt_file(file_path, key, algorithm, mode):
    """Encrypt the contents of *file_path* with a random IV.

    Args:
        file_path: Path of the file to read and encrypt.
        key: Symmetric key bytes (32 bytes are used for "AES" and 24 for
            "DES" by the GUI in this file).
        algorithm: "AES" or "DES" (the latter uses TripleDES).
        mode: "CBC" or "CFB".

    Returns:
        A ``(encrypted_data, iv)`` tuple of bytes.

    Raises:
        ValueError: If *algorithm* or *mode* is not supported.
        OSError: If the file cannot be read.
    """
    if algorithm not in _CIPHER_PARAMS:
        raise ValueError("Unsupported algorithm")
    mode_mapping = {
        "CBC": modes.CBC,
        "CFB": modes.CFB
    }
    if mode not in mode_mapping:
        raise ValueError("Unsupported mode")

    block_size, _ = _CIPHER_PARAMS[algorithm]

    with open(file_path, 'rb') as f:
        file_data = f.read()

    # Padding is applied for both modes; the IV length equals the block size.
    # NOTE(review): CFB does not require padding, but it is kept because the
    # receiver presumably strips it for every mode -- confirm before changing.
    file_data = pad(file_data, block_size)
    iv = os.urandom(block_size)

    if algorithm == "AES":
        cipher_algorithm = algorithms.AES(key)
    else:
        cipher_algorithm = algorithms.TripleDES(key)

    cipher = Cipher(cipher_algorithm, mode_mapping[mode](iv))
    encryptor = cipher.encryptor()
    encrypted_data = encryptor.update(file_data) + encryptor.finalize()
    return encrypted_data, iv


# Hash computation
def calculate_hash(data):
    """Return the SHA-256 digest of *data* as bytes."""
    digest = hashes.Hash(hashes.SHA256())
    digest.update(data)
    return digest.finalize()


# Sign a hash value
def sign_hash(private_key, hash_value):
    """Sign *hash_value* with RSA-PSS (MGF1/SHA-256, maximum salt length).

    *hash_value* is passed to ``sign`` as the message, so it is hashed with
    SHA-256 once more as part of signing.
    """
    signature = private_key.sign(
        hash_value,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    return signature


# Encrypt the symmetric key
def encrypt_symmetric_key(public_key, symmetric_key):
    """Encrypt *symmetric_key* with *public_key* using RSA-OAEP (SHA-256)."""
    encrypted_key = public_key.encrypt(
        symmetric_key,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )
    )
    return encrypted_key


# MD5 checksum of the data
def calculate_md5(data):
    """Return the hexadecimal MD5 digest of *data*."""
    md5 = hashlib.md5()
    md5.update(data)
    return md5.hexdigest()


# Send the package
def send_file(server_ip, server_port, package):
    """Pickle *package* and send it to ``(server_ip, server_port)`` over TCP.

    Returns:
        True when everything was sent, False on any failure (the error is
        printed to stdout).
    """
    # SECURITY NOTE(review): the payload is a pickle. The receiver presumably
    # calls pickle.loads on data read from the network, which allows arbitrary
    # code execution by whoever can connect to it. Consider a safer wire
    # format; it is left unchanged here to stay compatible with the receiver.
    try:
        payload = pickle.dumps(package)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Bound only the connect step so an unreachable host cannot freeze
            # the GUI; the timeout is cleared again because for sendall() it
            # would limit the total transfer time of a large file.
            s.settimeout(CONNECT_TIMEOUT_SECONDS)
            s.connect((server_ip, server_port))
            s.settimeout(None)
            s.sendall(payload)
        return True
    except Exception as e:  # best-effort boundary: report and return False
        print(f"连接失败: {e}")
        return False


# Import the receiver's public key
def import_receiver_public_key():
    """Ask the user for a PEM file and store it as the receiver's public key.

    Only RSA public keys are accepted, because the symmetric key is wrapped
    with RSA-OAEP. The module-level ``receiver_public_key`` is updated only
    when loading and validation succeed.
    """
    global receiver_public_key
    file_path = filedialog.askopenfilename(filetypes=[("PEM files", "*.pem")])
    if file_path:
        try:
            with open(file_path, 'rb') as f:
                loaded_key = serialization.load_pem_public_key(f.read())
            if not isinstance(loaded_key, rsa.RSAPublicKey):
                raise ValueError("不是 RSA 公钥")
            receiver_public_key = loaded_key
            messagebox.showinfo("提示", "接收方公钥导入成功")
        except Exception as e:
            messagebox.showerror("错误", f"导入接收方公钥失败: {e}")


# Main entry point: builds and runs the GUI
def sender_main():
    """Build the sender window and run the tkinter main loop."""

    def select_file():
        """Let the user pick the file to send; cancelling keeps the old path."""
        chosen = filedialog.askopenfilename()
        if chosen:
            file_path.set(chosen)

    def send():
        """Validate the form, build the encrypted package and send it."""
        selected_path = file_path.get()
        algorithm = algorithm_var.get()
        mode = mode_var.get()

        if not selected_path:
            messagebox.showerror("错误", "请选择文件")
            return

        cipher_params = _CIPHER_PARAMS.get(algorithm)
        if cipher_params is None:
            messagebox.showerror("错误", "不支持的算法!")
            return
        _, key_size = cipher_params

        if mode not in _SUPPORTED_MODES:
            messagebox.showerror("错误", "不支持的模式!")
            return

        if receiver_public_key is None:
            messagebox.showerror("错误", "请导入证书")
            return

        port_text = server_port.get().strip()
        try:
            port = int(port_text)
        except ValueError:
            port = -1
        if not 1 <= port <= 65535:
            messagebox.showerror("错误", f"无效的端口: {port_text}")
            return

        try:
            symmetric_key = os.urandom(key_size)
            encrypted_file, iv = encrypt_file(
                selected_path, symmetric_key, algorithm, mode
            )
            # The signature covers the SHA-256 digest of the ciphertext.
            file_hash = calculate_hash(encrypted_file)
            signature = sign_hash(sender_private_key, file_hash)
            sender_public_pem = sender_public_key.public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            )
            encrypted_symmetric_key = encrypt_symmetric_key(
                receiver_public_key, symmetric_key
            )
        except (OSError, ValueError) as e:
            # Unreadable file or a failure reported by the crypto layer.
            messagebox.showerror("错误", f"加密失败: {e}")
            return

        md5_checksum = calculate_md5(encrypted_file)
        # Extension without the leading dot ('' when the file has none).
        file_extension = os.path.splitext(selected_path)[1][1:]

        # The sender's public key is included in the package
        package = {
            'encrypted_file': encrypted_file,
            'iv': iv,
            'encrypted_symmetric_key': encrypted_symmetric_key,
            'signature': signature,
            'algorithm': algorithm,
            'mode': mode,
            'md5_checksum': md5_checksum,
            'file_extension': file_extension,
            'sender_public_key': sender_public_pem
        }

        if send_file(server_ip.get(), port, package):
            messagebox.showinfo("提示", "发送成功")
        else:
            messagebox.showerror("错误", "连接失败")

    root = tk.Tk()
    root.title("发送方")
    root.geometry("490x250")

    file_path = tk.StringVar()
    algorithm_var = tk.StringVar(value="AES")
    mode_var = tk.StringVar(value="CBC")
    server_ip = tk.StringVar(value="127.0.0.1")
    server_port = tk.StringVar(value="12345")

    ttk.Label(root, text="选择文件:").grid(row=0, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=file_path, width=30).grid(row=0, column=1, pady=5)
    ttk.Button(root, text="浏览", command=select_file).grid(row=0, column=2, pady=5)
    ttk.Button(root, text="导入证书", command=import_receiver_public_key).grid(row=0, column=3, pady=5)

    ttk.Label(root, text="选择加密算法:").grid(row=1, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(root, textvariable=algorithm_var, values=["AES", "DES"], state="readonly").grid(row=1, column=1, pady=5)

    ttk.Label(root, text="选择加密模式:").grid(row=2, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(root, textvariable=mode_var, values=["CBC", "CFB"], state="readonly").grid(row=2, column=1, pady=5)

    ttk.Label(root, text="接收方 IP:").grid(row=3, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_ip).grid(row=3, column=1, pady=5)

    ttk.Label(root, text="接收方端口:").grid(row=4, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_port).grid(row=4, column=1, pady=5)

    ttk.Button(root, text="发送文件", command=send).grid(row=5, columnspan=3, pady=20)

    root.mainloop()


if __name__ == "__main__":
    sender_main()