|
|
import os
|
|
|
import pickle
|
|
|
import socket
|
|
|
import tkinter as tk
|
|
|
from tkinter import filedialog, ttk, messagebox
|
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
import hashlib
|
|
|
|
|
|
|
|
|
# 生成并保存发送方密钥对
|
|
|
def generate_and_save_sender_keys():
|
|
|
private_key = rsa.generate_private_key(
|
|
|
public_exponent=65537,
|
|
|
key_size=2048,
|
|
|
)
|
|
|
public_key = private_key.public_key()
|
|
|
|
|
|
with open('sender_private_key.pem', 'wb') as f:
|
|
|
f.write(private_key.private_bytes(
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
format=serialization.PrivateFormat.PKCS8,
|
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
|
))
|
|
|
|
|
|
with open('sender_public_key.pem', 'wb') as f:
|
|
|
f.write(public_key.public_bytes(
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
|
))
|
|
|
|
|
|
return private_key, public_key
|
|
|
|
|
|
|
|
|
# 加载发送方密钥对
|
|
|
def load_sender_keys():
|
|
|
try:
|
|
|
with open('sender_private_key.pem', 'rb') as f:
|
|
|
private_key = serialization.load_pem_private_key(
|
|
|
f.read(),
|
|
|
password=None
|
|
|
)
|
|
|
with open('sender_public_key.pem', 'rb') as f:
|
|
|
public_key = serialization.load_pem_public_key(f.read())
|
|
|
return private_key, public_key
|
|
|
except FileNotFoundError:
|
|
|
return generate_and_save_sender_keys()
|
|
|
|
|
|
|
|
|
sender_private_key, sender_public_key = load_sender_keys()
|
|
|
|
|
|
# 用于存储接收方公钥的变量
|
|
|
receiver_public_key = None
|
|
|
|
|
|
|
|
|
# 填充数据函数
|
|
|
def pad(data, block_size):
|
|
|
padding_length = block_size - (len(data) % block_size)
|
|
|
if padding_length == 0:
|
|
|
padding_length = block_size
|
|
|
padding = bytes([padding_length]) * padding_length
|
|
|
return data + padding
|
|
|
|
|
|
|
|
|
# 加密文件函数
|
|
|
def encrypt_file(file_path, key, algorithm, mode):
|
|
|
with open(file_path, 'rb') as f:
|
|
|
file_data = f.read()
|
|
|
if algorithm == "AES":
|
|
|
block_size = 16
|
|
|
elif algorithm == "DES":
|
|
|
block_size = 8
|
|
|
else:
|
|
|
raise ValueError("Unsupported algorithm")
|
|
|
file_data = pad(file_data, block_size)
|
|
|
|
|
|
mode_mapping = {
|
|
|
"CBC": modes.CBC,
|
|
|
"CFB": modes.CFB
|
|
|
}
|
|
|
if algorithm == "AES":
|
|
|
iv = os.urandom(16)
|
|
|
elif algorithm == "DES":
|
|
|
iv = os.urandom(8) # 生成8字节的IV用于DES算法
|
|
|
else:
|
|
|
raise ValueError("Unsupported algorithm")
|
|
|
|
|
|
if algorithm == "AES":
|
|
|
cipher = Cipher(algorithms.AES(key), mode_mapping[mode](iv))
|
|
|
elif algorithm == "DES":
|
|
|
cipher = Cipher(algorithms.TripleDES(key), mode_mapping[mode](iv))
|
|
|
else:
|
|
|
raise ValueError("Unsupported algorithm")
|
|
|
|
|
|
encryptor = cipher.encryptor()
|
|
|
encrypted_data = encryptor.update(file_data) + encryptor.finalize()
|
|
|
return encrypted_data, iv
|
|
|
|
|
|
|
|
|
# 计算哈希值函数
|
|
|
def calculate_hash(data):
|
|
|
digest = hashes.Hash(hashes.SHA256())
|
|
|
digest.update(data)
|
|
|
return digest.finalize()
|
|
|
|
|
|
|
|
|
# 对哈希值进行签名函数
|
|
|
def sign_hash(private_key, hash_value):
|
|
|
signature = private_key.sign(
|
|
|
hash_value,
|
|
|
padding.PSS(
|
|
|
mgf=padding.MGF1(hashes.SHA256()),
|
|
|
salt_length=padding.PSS.MAX_LENGTH
|
|
|
),
|
|
|
hashes.SHA256()
|
|
|
)
|
|
|
return signature
|
|
|
|
|
|
|
|
|
# 加密对称密钥函数
|
|
|
def encrypt_symmetric_key(public_key, symmetric_key):
|
|
|
encrypted_key = public_key.encrypt(
|
|
|
symmetric_key,
|
|
|
padding.OAEP(
|
|
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
|
|
algorithm=hashes.SHA256(),
|
|
|
label=None
|
|
|
)
|
|
|
)
|
|
|
return encrypted_key
|
|
|
|
|
|
|
|
|
# 计算文件的 MD5 校验和
|
|
|
def calculate_md5(data):
|
|
|
md5 = hashlib.md5()
|
|
|
md5.update(data)
|
|
|
return md5.hexdigest()
|
|
|
|
|
|
|
|
|
# 发送文件函数
|
|
|
def send_file(server_ip, server_port, package):
|
|
|
try:
|
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
|
s.connect((server_ip, server_port))
|
|
|
s.sendall(pickle.dumps(package))
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"连接失败: {e}")
|
|
|
return False
|
|
|
|
|
|
|
|
|
# 导入接收方公钥的函数
|
|
|
def import_receiver_public_key():
|
|
|
global receiver_public_key
|
|
|
file_path = filedialog.askopenfilename(filetypes=[("PEM files", "*.pem")])
|
|
|
if file_path:
|
|
|
try:
|
|
|
with open(file_path, 'rb') as f:
|
|
|
receiver_public_key = serialization.load_pem_public_key(f.read())
|
|
|
messagebox.showinfo("提示", "接收方公钥导入成功")
|
|
|
except Exception as e:
|
|
|
messagebox.showerror("错误", f"导入接收方公钥失败: {e}")
|
|
|
|
|
|
|
|
|
# 主函数,用于启动 GUI 界面
|
|
|
def sender_main():
|
|
|
def select_file():
|
|
|
file_path.set(filedialog.askopenfilename())
|
|
|
|
|
|
def send():
|
|
|
global receiver_public_key
|
|
|
file = file_path.get()
|
|
|
algorithm = algorithm_var.get()
|
|
|
mode = mode_var.get()
|
|
|
|
|
|
if not file:
|
|
|
messagebox.showerror("错误", "请选择文件")
|
|
|
return
|
|
|
|
|
|
if algorithm == "AES":
|
|
|
key_size = 32
|
|
|
elif algorithm == "DES":
|
|
|
key_size = 24
|
|
|
else:
|
|
|
print("不支持的算法!")
|
|
|
return
|
|
|
|
|
|
if mode == "CBC":
|
|
|
pass
|
|
|
elif mode == "CFB":
|
|
|
pass
|
|
|
else:
|
|
|
print("不支持的模式!")
|
|
|
return
|
|
|
|
|
|
if not receiver_public_key:
|
|
|
messagebox.showerror("错误", "请导入证书")
|
|
|
return
|
|
|
|
|
|
symmetric_key = os.urandom(key_size)
|
|
|
encrypted_file, iv = encrypt_file(file, symmetric_key, algorithm, mode)
|
|
|
|
|
|
file_hash = calculate_hash(encrypted_file)
|
|
|
signature = sign_hash(sender_private_key, file_hash)
|
|
|
|
|
|
sender_public_pem = sender_public_key.public_bytes(
|
|
|
encoding=serialization.Encoding.PEM,
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
|
|
)
|
|
|
|
|
|
encrypted_symmetric_key = encrypt_symmetric_key(receiver_public_key, symmetric_key)
|
|
|
|
|
|
md5_checksum = calculate_md5(encrypted_file)
|
|
|
|
|
|
file_extension = os.path.splitext(file)[1][1:]
|
|
|
# 在封装包中添加发送方公钥
|
|
|
package = {
|
|
|
'encrypted_file': encrypted_file,
|
|
|
'iv': iv,
|
|
|
'encrypted_symmetric_key': encrypted_symmetric_key,
|
|
|
'signature': signature,
|
|
|
'algorithm': algorithm,
|
|
|
'mode': mode,
|
|
|
'md5_checksum': md5_checksum,
|
|
|
'file_extension': file_extension,
|
|
|
'sender_public_key': sender_public_pem
|
|
|
}
|
|
|
|
|
|
if send_file(server_ip.get(), int(server_port.get()), package):
|
|
|
messagebox.showinfo("提示", "发送成功")
|
|
|
else:
|
|
|
messagebox.showerror("错误", "连接失败")
|
|
|
|
|
|
|
|
|
root = tk.Tk()
|
|
|
root.title("发送方")
|
|
|
root.geometry("490x250")
|
|
|
|
|
|
file_path = tk.StringVar()
|
|
|
algorithm_var = tk.StringVar(value="AES")
|
|
|
mode_var = tk.StringVar(value="CBC")
|
|
|
server_ip = tk.StringVar(value="127.0.0.1")
|
|
|
server_port = tk.StringVar(value="12345")
|
|
|
|
|
|
ttk.Label(root, text="选择文件:").grid(row=0, column=0, sticky=tk.W, pady=5)
|
|
|
ttk.Entry(root, textvariable=file_path, width=30).grid(row=0, column=1, pady=5)
|
|
|
ttk.Button(root, text="浏览", command=select_file).grid(row=0, column=2, pady=5)
|
|
|
ttk.Button(root, text="导入证书", command=import_receiver_public_key).grid(row=0, column=3, pady=5)
|
|
|
|
|
|
ttk.Label(root, text="选择加密算法:").grid(row=1, column=0, sticky=tk.W, pady=5)
|
|
|
ttk.Combobox(root, textvariable=algorithm_var, values=["AES", "DES"], state="readonly").grid(row=1, column=1, pady=5)
|
|
|
|
|
|
ttk.Label(root, text="选择加密模式:").grid(row=2, column=0, sticky=tk.W, pady=5)
|
|
|
ttk.Combobox(root, textvariable=mode_var, values=["CBC", "CFB"], state="readonly").grid(row=2, column=1, pady=5)
|
|
|
|
|
|
ttk.Label(root, text="接收方 IP:").grid(row=3, column=0, sticky=tk.W, pady=5)
|
|
|
ttk.Entry(root, textvariable=server_ip).grid(row=3, column=1, pady=5)
|
|
|
|
|
|
ttk.Label(root, text="接收方端口:").grid(row=4, column=0, sticky=tk.W, pady=5)
|
|
|
ttk.Entry(root, textvariable=server_port).grid(row=4, column=1, pady=5)
|
|
|
|
|
|
ttk.Button(root, text="发送文件", command=send).grid(row=5, columnspan=3, pady=20)
|
|
|
|
|
|
root.mainloop()
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
sender_main() |