You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

195 lines
6.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import pickle
import socket
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import hashlib
# Strip the trailing padding from decrypted data.
def unpad(data):
    """Remove trailing padding whose length is stored in the last byte.

    The final byte of ``data`` is read as the number of trailing bytes to
    drop. The padding bytes themselves are not validated.

    Args:
        data: Decrypted bytes, possibly ending in padding.

    Returns:
        ``data`` without its padding. ``data`` is returned unchanged when it
        is empty, or when the length byte is 0 or larger than ``len(data)``.
    """
    if not data:
        # Nothing to inspect; indexing data[-1] would raise IndexError.
        return data
    padding_length = data[-1]
    # A length byte of 0 would turn data[:-0] into an empty slice and
    # silently discard the whole payload, so treat it as "no padding".
    if padding_length == 0 or padding_length > len(data):
        return data
    return data[:-padding_length]
# Decrypt file contents.
def decrypt_file(encrypted_data, key, iv, algorithm, mode):
    """Decrypt ``encrypted_data`` with the selected symmetric cipher and mode.

    Args:
        encrypted_data: Ciphertext bytes.
        key: Symmetric key bytes handed to the cipher.
        iv: Initialization vector. For "DES" only its first 8 bytes are used.
        algorithm: "AES" or "DES". "DES" is implemented with
            ``algorithms.TripleDES``.
        mode: "CBC" or "CFB".

    Returns:
        The raw decrypted bytes; any padding is still attached.

    Raises:
        ValueError: If ``algorithm`` or ``mode`` is not supported.
    """
    # Validate both selectors up front so that an unknown mode fails with the
    # same ValueError as an unknown algorithm instead of a bare KeyError.
    if algorithm not in ("AES", "DES"):
        raise ValueError("不支持该算法")
    if mode not in ("CBC", "CFB"):
        raise ValueError(f"不支持该模式: {mode}")

    mode_mapping = {
        "CBC": modes.CBC,
        "CFB": modes.CFB,
    }
    if algorithm == "AES":
        # For AES the IV is 16 bytes long and is used as received.
        cipher = Cipher(algorithms.AES(key), mode_mapping[mode](iv))
    else:
        # Make sure the IV is 8 bytes long for the "DES" branch.
        iv = iv[:8]
        cipher = Cipher(algorithms.TripleDES(key), mode_mapping[mode](iv))

    decryptor = cipher.decryptor()
    return decryptor.update(encrypted_data) + decryptor.finalize()
# Compute the hash of the given data.
def calculate_hash(data):
    """Return the SHA-256 digest of ``data`` as raw bytes."""
    hasher = hashes.Hash(hashes.SHA256())
    hasher.update(data)
    sha256_digest = hasher.finalize()
    return sha256_digest
# Verify a signature.
def verify_signature(public_key, signature, hash_value):
    """Check ``signature`` over ``hash_value`` using PSS padding and SHA-256.

    ``hash_value`` is passed to ``public_key.verify`` as the message, so it
    is hashed again with SHA-256 during verification.

    Args:
        public_key: Public key object exposing ``verify``.
        signature: Signature bytes to check.
        hash_value: The signed message bytes.

    Returns:
        True if the signature verifies, False if it does not.

    Raises:
        Any exception other than ``InvalidSignature`` raised by
        ``public_key.verify`` (for example for arguments of the wrong type)
        propagates to the caller instead of being reported as a bad
        signature.
    """
    # Imported locally because the file's top-level imports do not bring
    # this name into scope.
    from cryptography.exceptions import InvalidSignature

    pss_padding = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    try:
        public_key.verify(signature, hash_value, pss_padding, hashes.SHA256())
    except InvalidSignature:
        # Only a genuine verification failure means "not valid"; other
        # errors are bugs or malformed input and should stay visible.
        return False
    return True
# Decrypt the symmetric key.
def decrypt_symmetric_key(private_key, encrypted_key):
    """Decrypt ``encrypted_key`` with ``private_key`` using OAEP padding.

    OAEP is configured with SHA-256 for both the MGF1 mask generation and
    the main hash, and no label.

    Args:
        private_key: Private key object exposing ``decrypt``.
        encrypted_key: The encrypted symmetric key bytes.

    Returns:
        Whatever ``private_key.decrypt`` returns for the encrypted key.
    """
    oaep_padding = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    return private_key.decrypt(encrypted_key, oaep_padding)
# Compute the MD5 checksum of file data.
def calculate_md5(data):
    """Return the MD5 checksum of ``data`` as a hexadecimal string."""
    return hashlib.md5(data).hexdigest()
# Read everything the peer sends on an accepted connection.
def _recv_all(conn):
    """Read from ``conn`` in 4096-byte chunks until the peer closes it."""
    chunks = []
    while True:
        packet = conn.recv(4096)
        if not packet:
            break
        chunks.append(packet)
    # Joining once avoids re-copying the growing buffer on every packet.
    return b"".join(chunks)


# Receive a file over the network.
def receive_file(server_ip, server_port, output_dir, algorithm, mode):
    """Accept one TCP connection, receive a pickled package and decrypt it.

    Listens on ``(server_ip, server_port)``, accepts a single connection and
    reads until the peer closes it. The received bytes are unpickled into a
    mapping that must contain the encrypted file, IV, encrypted symmetric
    key, signature, algorithm and mode names, MD5 checksum, file extension
    and the sender's PEM public key. The symmetric key is decrypted with the
    unencrypted private key stored in ``receiver_private_key.pem`` (relative
    to the current working directory). The file is then decrypted and
    unpadded, and the signature is verified over the SHA-256 hash of the
    *encrypted* file.

    This call blocks until a sender has connected and closed the connection.

    Args:
        server_ip: Address to bind the listening socket to.
        server_port: TCP port to listen on.
        output_dir: Passed through unchanged in the return value.
        algorithm: Locally selected algorithm; must equal the sender's.
        mode: Locally selected cipher mode; must equal the sender's.

    Returns:
        A tuple ``(is_valid, output_dir, file_extension, decrypted_file)``.
        ``is_valid`` is the signature verification result.

    Raises:
        ValueError: If ``algorithm`` or ``mode`` differs from what the sender
            put in the package.
        Exception: Socket, unpickling, key-loading and decryption errors
            propagate unchanged.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((server_ip, server_port))
        s.listen(1)
        print(f"等待连接,监听端口 {server_port}...")
        conn, addr = s.accept()
        with conn:
            print(f"接收到连接来自: {addr}")
            data = _recv_all(conn)

    # SECURITY: pickle.loads on bytes received from the network can execute
    # arbitrary code chosen by the peer. Replacing it needs a matching change
    # on the sender side, so it is flagged here rather than silently changed.
    package = pickle.loads(data)
    encrypted_file = package['encrypted_file']
    iv = package['iv']
    encrypted_symmetric_key = package['encrypted_symmetric_key']
    signature = package['signature']
    received_algorithm = package['algorithm']
    received_mode = package['mode']
    # NOTE(review): the checksum is read but never compared with anything in
    # this function; it is unclear from this file which bytes the sender
    # hashed -- confirm against the sender before adding a check.
    received_md5_checksum = package['md5_checksum']
    file_extension = package['file_extension']

    # The sender's public key is taken from the package itself.
    # NOTE(review): a key supplied by the peer only proves the package is
    # self-consistent, not who sent it -- consider pinning a trusted key.
    sender_public_pem = package['sender_public_key']
    sender_public_key = serialization.load_pem_public_key(sender_public_pem)

    if algorithm != received_algorithm or mode != received_mode:
        raise ValueError("选择的解密算法或模式与发送方不一致")

    with open("receiver_private_key.pem", "rb") as f:
        receiver_private_key = serialization.load_pem_private_key(
            f.read(),
            password=None,
        )

    symmetric_key = decrypt_symmetric_key(receiver_private_key, encrypted_symmetric_key)
    decrypted_file = decrypt_file(encrypted_file, symmetric_key, iv, algorithm, mode)
    decrypted_file = unpad(decrypted_file)

    file_hash = calculate_hash(encrypted_file)
    is_valid = verify_signature(sender_public_key, signature, file_hash)
    return is_valid, output_dir, file_extension, decrypted_file
# Main function: builds and starts the GUI.
def receiver_main():
    """Build the receiver window and run the Tk event loop.

    The window collects the listen address and port, the output directory
    and the decryption algorithm and mode. Pressing the start button calls
    ``receive_file`` and, when the signature is valid, writes the decrypted
    bytes to ``decrypted_file.<extension>`` in the chosen directory.
    """

    def select_output_dir():
        """Ask the user for a directory and store it in ``output_dir``."""
        output_dir.set(filedialog.askdirectory())

    def start_receiving():
        """Receive one file, show the signature result and save the file."""
        if not output_dir.get():
            print("请选择保存文件的路径!")
            # Also tell the user in the GUI; stdout is usually not visible.
            messagebox.showwarning("提示", "请选择保存文件的路径!")
            return
        algorithm = algorithm_var.get()
        mode = mode_var.get()
        try:
            # NOTE: receive_file blocks in accept() until a sender connects,
            # so the window does not respond while waiting.
            is_valid, output_dir_path, file_extension, decrypted_file = receive_file(
                server_ip.get(), int(server_port.get()), output_dir.get(), algorithm, mode
            )
            result_label.config(text=f"接收方验证签名结果: {is_valid}")
            if is_valid:
                # The extension comes from the sender: keep only its final
                # path component so it cannot point outside the chosen
                # directory (e.g. "x/../../target").
                safe_extension = os.path.basename(str(file_extension))
                output_file_path = os.path.join(
                    output_dir_path, f"decrypted_file.{safe_extension}"
                )
                with open(output_file_path, "wb") as f:
                    f.write(decrypted_file)
                messagebox.showinfo("提示", "接收成功")
        except Exception as e:
            # Top-level GUI boundary: report the failure instead of crashing.
            print(f"接收过程中发生错误: {e}")
            messagebox.showerror("错误", f"接收过程中发生错误: {e}")

    root = tk.Tk()
    root.title("接收方")
    root.geometry("450x300")  # Window made a bit taller so the result label fits.

    server_ip = tk.StringVar(value="127.0.0.1")
    server_port = tk.StringVar(value="12345")
    output_dir = tk.StringVar()
    algorithm_var = tk.StringVar(value="AES")
    mode_var = tk.StringVar(value="CBC")

    ttk.Label(root, text="监听 IP").grid(row=0, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_ip).grid(row=0, column=1, pady=5)
    ttk.Label(root, text="监听端口:").grid(row=1, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=server_port).grid(row=1, column=1, pady=5)
    ttk.Label(root, text="选择保存文件的路径:").grid(row=2, column=0, sticky=tk.W, pady=5)
    ttk.Entry(root, textvariable=output_dir, width=30).grid(row=2, column=1, pady=5)
    ttk.Button(root, text="选择目录", command=select_output_dir).grid(row=2, column=2, pady=5)
    ttk.Label(root, text="选择解密算法:").grid(row=3, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(
        root, textvariable=algorithm_var, values=["AES", "DES"], state="readonly"
    ).grid(row=3, column=1, pady=5)
    ttk.Label(root, text="选择解密模式:").grid(row=4, column=0, sticky=tk.W, pady=5)
    ttk.Combobox(
        root, textvariable=mode_var, values=["CBC", "CFB"], state="readonly"
    ).grid(row=4, column=1, pady=5)
    ttk.Button(root, text="开始接收文件", command=start_receiving).grid(
        row=5, columnspan=3, pady=20
    )

    # Label that displays the signature verification result.
    result_label = ttk.Label(root, text="")
    result_label.grid(row=6, columnspan=3, pady=10)

    root.mainloop()
# Start the receiver GUI only when this file is run as a script.
if __name__ == "__main__":
    receiver_main()