|
|
import os
|
|
|
import pickle
|
|
|
import socket
|
|
|
import tkinter as tk
|
|
|
from tkinter import filedialog, ttk, messagebox
|
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
import hashlib
|
|
|
|
|
|
|
|
|
# 去除填充函数
|
|
|
def unpad(data):
    """Remove length-suffixed padding from the end of ``data``.

    The last byte is read as the number of trailing bytes to drop (the
    layout PKCS#7 uses). The padding bytes themselves are not validated.

    Args:
        data: Decrypted bytes that may end with padding.

    Returns:
        ``data`` with the trailing padding removed. ``data`` is returned
        unchanged when it is empty, when the final byte is zero, or when
        the final byte is larger than the length of ``data``.
    """
    if not data:
        return data

    padding_length = data[-1]
    # A length of zero would turn ``data[:-0]`` into an empty slice and
    # silently throw the whole payload away, so treat it as "no padding".
    if padding_length == 0 or padding_length > len(data):
        return data
    return data[:-padding_length]
|
|
|
|
|
|
|
|
|
# 解密文件函数
|
|
|
def decrypt_file(encrypted_data, key, iv, algorithm, mode):
    """Decrypt ``encrypted_data`` with the selected symmetric cipher and mode.

    Args:
        encrypted_data: Ciphertext bytes.
        key: Symmetric key, handed directly to the cipher algorithm.
        iv: Initialization vector. Used as supplied for ``"AES"``; only its
            first 8 bytes are used for ``"DES"``.
        algorithm: ``"AES"`` or ``"DES"``. ``"DES"`` selects
            ``algorithms.TripleDES``.
        mode: ``"CBC"`` or ``"CFB"``.

    Returns:
        The raw decrypted bytes. Padding is not removed here.

    Raises:
        ValueError: If ``algorithm`` or ``mode`` is not one of the supported
            names.
    """
    if algorithm not in ("AES", "DES"):
        raise ValueError("不支持该算法")
    # Report an unknown mode with the same exception type as an unknown
    # algorithm instead of leaking a bare KeyError from the lookup below.
    if mode not in ("CBC", "CFB"):
        raise ValueError(f"不支持该模式: {mode}")

    mode_mapping = {
        "CBC": modes.CBC,
        "CFB": modes.CFB,
    }

    if algorithm == "AES":
        # AES takes the IV as supplied (expected to be 16 bytes).
        block_cipher = algorithms.AES(key)
    else:
        # Keep only the first 8 bytes of the IV for the 64-bit block cipher.
        iv = iv[:8]
        block_cipher = algorithms.TripleDES(key)

    decryptor = Cipher(block_cipher, mode_mapping[mode](iv)).decryptor()
    return decryptor.update(encrypted_data) + decryptor.finalize()
|
|
|
|
|
|
|
|
|
# 计算哈希值函数
|
|
|
def calculate_hash(data):
    """Return the SHA-256 digest of ``data`` as raw bytes.

    Args:
        data: Bytes to hash.

    Returns:
        The finalized digest produced by ``hashes.Hash(hashes.SHA256())``.
    """
    hasher = hashes.Hash(hashes.SHA256())
    hasher.update(data)
    return hasher.finalize()
|
|
|
|
|
|
|
|
|
# 验证签名函数
|
|
|
def verify_signature(public_key, signature, hash_value):
    """Check ``signature`` against ``hash_value`` using PSS padding and SHA-256.

    Args:
        public_key: Key object exposing ``verify(signature, data, padding,
            algorithm)``.
        signature: Signature bytes to check.
        hash_value: The bytes that were signed. It is passed to ``verify``
            as the message together with ``hashes.SHA256()``.
            NOTE(review): this presumably means the value is hashed once
            more by the library; it has to mirror the sender's signing call.

    Returns:
        ``True`` when ``verify`` completes without raising, ``False`` when
        anything at all goes wrong during verification.
    """
    try:
        pss_padding = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        public_key.verify(signature, hash_value, pss_padding, hashes.SHA256())
    except Exception:
        # Every failure is reported as "not valid" rather than propagated.
        return False
    else:
        return True
|
|
|
|
|
|
|
|
|
# 解密对称密钥函数
|
|
|
def decrypt_symmetric_key(private_key, encrypted_key):
    """Decrypt the wrapped symmetric key with ``private_key``.

    Uses OAEP padding with SHA-256 for both the MGF1 mask function and the
    main hash, and no label.

    Args:
        private_key: Key object exposing ``decrypt(ciphertext, padding)``.
        encrypted_key: The encrypted symmetric key bytes.

    Returns:
        The decrypted symmetric key bytes.
    """
    oaep_padding = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(encrypted_key, oaep_padding)
|
|
|
|
|
|
|
|
|
# 计算文件的 MD5 校验和
|
|
|
def calculate_md5(data):
    """Return the MD5 checksum of ``data`` as a lowercase hex string.

    Args:
        data: Bytes to checksum.

    Returns:
        The hexadecimal digest string.
    """
    return hashlib.md5(data).hexdigest()
|
|
|
|
|
|
|
|
|
# 接收文件函数
|
|
|
def _recv_until_closed(conn, chunk_size=4096):
    """Read from ``conn`` until ``recv`` returns no data; return all bytes."""
    # Joining once at the end avoids re-copying the growing buffer on every
    # chunk, which ``data += packet`` does for bytes objects.
    return b"".join(iter(lambda: conn.recv(chunk_size), b""))


def receive_file(server_ip, server_port, output_dir, algorithm, mode):
    """Accept one connection and decrypt and verify the package it sends.

    Binds to ``(server_ip, server_port)``, accepts a single client, reads
    everything it sends until the connection is closed, and unpickles the
    bytes into a package. The package must provide the keys
    ``encrypted_file``, ``iv``, ``encrypted_symmetric_key``, ``signature``,
    ``algorithm``, ``mode``, ``md5_checksum``, ``file_extension`` and
    ``sender_public_key`` (PEM).

    The symmetric key is decrypted with the unencrypted private key read
    from ``receiver_private_key.pem`` (relative to the working directory),
    the file is decrypted and unpadded, and the signature is verified over
    the SHA-256 hash of the *encrypted* file.

    Args:
        server_ip: Address to listen on.
        server_port: TCP port to listen on.
        output_dir: Not used here; returned unchanged for the caller.
        algorithm: Locally selected cipher name; must equal the sender's.
        mode: Locally selected cipher mode; must equal the sender's.

    Returns:
        A tuple ``(is_valid, output_dir, file_extension, decrypted_file)``.
        The file is decrypted whether or not the signature is valid, so
        callers must check ``is_valid`` before using ``decrypted_file``.

    Raises:
        ValueError: If ``algorithm`` or ``mode`` differs from the values in
            the received package.
        KeyError: If the package lacks one of the required keys.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((server_ip, server_port))
        s.listen(1)
        print(f"等待连接,监听端口 {server_port}...")
        conn, addr = s.accept()
        with conn:
            print(f"接收到连接来自: {addr}")
            data = _recv_until_closed(conn)

    # SECURITY: pickle.loads can execute arbitrary code embedded in the
    # payload, and ``data`` comes straight from whoever connected. The wire
    # format is shared with the sender, so it is flagged here rather than
    # replaced; prefer a data-only format (e.g. JSON with base64 fields).
    package = pickle.loads(data)

    encrypted_file = package['encrypted_file']
    iv = package['iv']
    encrypted_symmetric_key = package['encrypted_symmetric_key']
    signature = package['signature']
    received_algorithm = package['algorithm']
    received_mode = package['mode']
    # NOTE(review): the checksum is extracted but never compared against
    # anything in this function; what it covers is not visible here.
    received_md5_checksum = package['md5_checksum']
    file_extension = package['file_extension']

    # NOTE(review): the verification key is taken from the same package as
    # the signature, so a valid signature shows the package is internally
    # consistent, not who sent it.
    sender_public_pem = package['sender_public_key']
    sender_public_key = serialization.load_pem_public_key(sender_public_pem)

    if algorithm != received_algorithm or mode != received_mode:
        raise ValueError("选择的解密算法或模式与发送方不一致")

    with open("receiver_private_key.pem", "rb") as f:
        receiver_private_key = serialization.load_pem_private_key(
            f.read(),
            password=None,
        )

    symmetric_key = decrypt_symmetric_key(receiver_private_key, encrypted_symmetric_key)

    decrypted_file = decrypt_file(encrypted_file, symmetric_key, iv, algorithm, mode)
    decrypted_file = unpad(decrypted_file)

    # The signature is checked against the hash of the ciphertext.
    file_hash = calculate_hash(encrypted_file)
    is_valid = verify_signature(sender_public_key, signature, file_hash)

    return is_valid, output_dir, file_extension, decrypted_file
|
|
|
|
|
|
|
|
|
# 主函数,用于启动 GUI 界面
|
|
|
def receiver_main():
    """Build the receiver window and run the Tk event loop.

    The window collects the listen address and port, the output directory
    and the cipher algorithm and mode. The start button receives one file
    via ``receive_file`` and, if the signature is valid, writes it to
    ``decrypted_file.<extension>`` inside the chosen directory.
    """

    def select_output_dir():
        """Let the user pick the output directory."""
        chosen = filedialog.askdirectory()
        # A cancelled dialog yields an empty value; keep the previous
        # selection instead of wiping it.
        if chosen:
            output_dir.set(chosen)

    def start_receiving():
        """Receive one file and save it when its signature verifies.

        NOTE: ``receive_file`` blocks in ``accept()`` on the GUI thread, so
        the window does not respond until a sender connects and finishes.
        """
        if not output_dir.get():
            print("请选择保存文件的路径!")
            # Also tell the user in the window; stdout is usually not
            # visible when the GUI is running.
            messagebox.showwarning("提示", "请选择保存文件的路径!")
            return

        algorithm = algorithm_var.get()
        mode = mode_var.get()

        try:
            is_valid, output_dir_path, file_extension, decrypted_file = receive_file(
                server_ip.get(), int(server_port.get()), output_dir.get(), algorithm, mode
            )
            result_label.config(text=f"接收方验证签名结果: {is_valid}")

            if is_valid:
                file_name = f"decrypted_file.{file_extension}"
                # The extension comes from the received package; refuse
                # values containing path separators so the file cannot be
                # written outside the chosen directory.
                if os.path.basename(file_name) != file_name:
                    raise ValueError("文件扩展名不合法")
                output_file_path = os.path.join(output_dir_path, file_name)
                with open(output_file_path, "wb") as f:
                    f.write(decrypted_file)
                messagebox.showinfo("提示", "接收成功")
        except Exception as e:
            print(f"接收过程中发生错误: {e}")
            messagebox.showerror("错误", f"接收过程中发生错误: {e}")

    root = tk.Tk()
    root.title("接收方")
    root.geometry("450x300")  # tall enough to fit the result label

    server_ip = tk.StringVar(value="127.0.0.1")
    server_port = tk.StringVar(value="12345")
    output_dir = tk.StringVar()
    algorithm_var = tk.StringVar(value="AES")
    mode_var = tk.StringVar(value="CBC")

    ttk.Label(root, text="监听 IP:").grid(row=0, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_ip).grid(row=0, column=1, pady=5)

    ttk.Label(root, text="监听端口:").grid(row=1, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_port).grid(row=1, column=1, pady=5)

    ttk.Label(root, text="选择保存文件的路径:").grid(row=2, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=output_dir, width=30).grid(row=2, column=1, pady=5)
    ttk.Button(root, text="选择目录", command=select_output_dir).grid(row=2, column=2, pady=5)

    ttk.Label(root, text="选择解密算法:").grid(row=3, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(root, textvariable=algorithm_var, values=["AES", "DES"], state="readonly").grid(row=3, column=1, pady=5)

    ttk.Label(root, text="选择解密模式:").grid(row=4, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(root, textvariable=mode_var, values=["CBC", "CFB"], state="readonly").grid(row=4, column=1, pady=5)

    ttk.Button(root, text="开始接收文件", command=start_receiving).grid(row=5, columnspan=3, pady=20)

    # Label that displays the signature verification result.
    result_label = ttk.Label(root, text="")
    result_label.grid(row=6, columnspan=3, pady=10)

    root.mainloop()
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the receiver GUI only when this file is run as a script.
    receiver_main()