You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

269 lines
8.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import pickle
import socket
import tkinter as tk
from tkinter import filedialog, ttk, messagebox
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import hashlib
# 生成并保存发送方密钥对
def generate_and_save_sender_keys():
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
public_key = private_key.public_key()
with open('sender_private_key.pem', 'wb') as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
with open('sender_public_key.pem', 'wb') as f:
f.write(public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
return private_key, public_key
# 加载发送方密钥对
def load_sender_keys():
try:
with open('sender_private_key.pem', 'rb') as f:
private_key = serialization.load_pem_private_key(
f.read(),
password=None
)
with open('sender_public_key.pem', 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
return private_key, public_key
except FileNotFoundError:
return generate_and_save_sender_keys()
sender_private_key, sender_public_key = load_sender_keys()
# 用于存储接收方公钥的变量
receiver_public_key = None
# 填充数据函数
def pad(data, block_size):
padding_length = block_size - (len(data) % block_size)
if padding_length == 0:
padding_length = block_size
padding = bytes([padding_length]) * padding_length
return data + padding
# 加密文件函数
def encrypt_file(file_path, key, algorithm, mode):
with open(file_path, 'rb') as f:
file_data = f.read()
if algorithm == "AES":
block_size = 16
elif algorithm == "DES":
block_size = 8
else:
raise ValueError("Unsupported algorithm")
file_data = pad(file_data, block_size)
mode_mapping = {
"CBC": modes.CBC,
"CFB": modes.CFB
}
if algorithm == "AES":
iv = os.urandom(16)
elif algorithm == "DES":
iv = os.urandom(8) # 生成8字节的IV用于DES算法
else:
raise ValueError("Unsupported algorithm")
if algorithm == "AES":
cipher = Cipher(algorithms.AES(key), mode_mapping[mode](iv))
elif algorithm == "DES":
cipher = Cipher(algorithms.TripleDES(key), mode_mapping[mode](iv))
else:
raise ValueError("Unsupported algorithm")
encryptor = cipher.encryptor()
encrypted_data = encryptor.update(file_data) + encryptor.finalize()
return encrypted_data, iv
# 计算哈希值函数
def calculate_hash(data):
digest = hashes.Hash(hashes.SHA256())
digest.update(data)
return digest.finalize()
# 对哈希值进行签名函数
def sign_hash(private_key, hash_value):
signature = private_key.sign(
hash_value,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return signature
# 加密对称密钥函数
def encrypt_symmetric_key(public_key, symmetric_key):
encrypted_key = public_key.encrypt(
symmetric_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return encrypted_key
# 计算文件的 MD5 校验和
def calculate_md5(data):
md5 = hashlib.md5()
md5.update(data)
return md5.hexdigest()
# 发送文件函数
def send_file(server_ip, server_port, package):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((server_ip, server_port))
s.sendall(pickle.dumps(package))
return True
except Exception as e:
print(f"连接失败: {e}")
return False
# 导入接收方公钥的函数
def import_receiver_public_key():
global receiver_public_key
file_path = filedialog.askopenfilename(filetypes=[("PEM files", "*.pem")])
if file_path:
try:
with open(file_path, 'rb') as f:
receiver_public_key = serialization.load_pem_public_key(f.read())
messagebox.showinfo("提示", "接收方公钥导入成功")
except Exception as e:
messagebox.showerror("错误", f"导入接收方公钥失败: {e}")
# 主函数,用于启动 GUI 界面
def sender_main():
def select_file():
file_path.set(filedialog.askopenfilename())
def send():
global receiver_public_key
file = file_path.get()
algorithm = algorithm_var.get()
mode = mode_var.get()
if not file:
messagebox.showerror("错误", "请选择文件")
return
if algorithm == "AES":
key_size = 32
elif algorithm == "DES":
key_size = 24
else:
print("不支持的算法!")
return
if mode == "CBC":
pass
elif mode == "CFB":
pass
else:
print("不支持的模式!")
return
if not receiver_public_key:
messagebox.showerror("错误", "请导入证书")
return
symmetric_key = os.urandom(key_size)
encrypted_file, iv = encrypt_file(file, symmetric_key, algorithm, mode)
file_hash = calculate_hash(encrypted_file)
signature = sign_hash(sender_private_key, file_hash)
sender_public_pem = sender_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
encrypted_symmetric_key = encrypt_symmetric_key(receiver_public_key, symmetric_key)
md5_checksum = calculate_md5(encrypted_file)
file_extension = os.path.splitext(file)[1][1:]
# 在封装包中添加发送方公钥
package = {
'encrypted_file': encrypted_file,
'iv': iv,
'encrypted_symmetric_key': encrypted_symmetric_key,
'signature': signature,
'algorithm': algorithm,
'mode': mode,
'md5_checksum': md5_checksum,
'file_extension': file_extension,
'sender_public_key': sender_public_pem
}
if send_file(server_ip.get(), int(server_port.get()), package):
messagebox.showinfo("提示", "发送成功")
else:
messagebox.showerror("错误", "连接失败")
root = tk.Tk()
root.title("发送方")
root.geometry("490x250")
file_path = tk.StringVar()
algorithm_var = tk.StringVar(value="AES")
mode_var = tk.StringVar(value="CBC")
server_ip = tk.StringVar(value="127.0.0.1")
server_port = tk.StringVar(value="12345")
ttk.Label(root, text="选择文件:").grid(row=0, column=0, sticky=tk.W, pady=5)
ttk.Entry(root, textvariable=file_path, width=30).grid(row=0, column=1, pady=5)
ttk.Button(root, text="浏览", command=select_file).grid(row=0, column=2, pady=5)
ttk.Button(root, text="导入证书", command=import_receiver_public_key).grid(row=0, column=3, pady=5)
ttk.Label(root, text="选择加密算法:").grid(row=1, column=0, sticky=tk.W, pady=5)
ttk.Combobox(root, textvariable=algorithm_var, values=["AES", "DES"], state="readonly").grid(row=1, column=1, pady=5)
ttk.Label(root, text="选择加密模式:").grid(row=2, column=0, sticky=tk.W, pady=5)
ttk.Combobox(root, textvariable=mode_var, values=["CBC", "CFB"], state="readonly").grid(row=2, column=1, pady=5)
ttk.Label(root, text="接收方 IP").grid(row=3, column=0, sticky=tk.W, pady=5)
ttk.Entry(root, textvariable=server_ip).grid(row=3, column=1, pady=5)
ttk.Label(root, text="接收方端口:").grid(row=4, column=0, sticky=tk.W, pady=5)
ttk.Entry(root, textvariable=server_port).grid(row=4, column=1, pady=5)
ttk.Button(root, text="发送文件", command=send).grid(row=5, columnspan=3, pady=20)
root.mainloop()
if __name__ == "__main__":
sender_main()