parent
45f7cbb963
commit
ebd59a33e5
@ -1,138 +0,0 @@
|
|||||||
import socket
|
|
||||||
import threading
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
from sender import Sender
|
|
||||||
from receiver import Receiver
|
|
||||||
|
|
||||||
class FileTransferApp:
|
|
||||||
def __init__(self, host, port, role):
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.role = role
|
|
||||||
self.private_key = None
|
|
||||||
self.public_key = None
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def generate_keys(self):
|
|
||||||
key = RSA.generate(2048)
|
|
||||||
self.private_key = key.export_key()
|
|
||||||
self.public_key = key.publickey().export_key()
|
|
||||||
if self.role == 'sender':
|
|
||||||
priv_filename = 'send_private.pem'
|
|
||||||
pub_filename = 'send_public.pem'
|
|
||||||
else:
|
|
||||||
priv_filename = 'receive_private.pem'
|
|
||||||
pub_filename = 'receive_public.pem'
|
|
||||||
with open(priv_filename, 'wb') as priv_file:
|
|
||||||
priv_file.write(self.private_key)
|
|
||||||
with open(pub_filename, 'wb') as pub_file:
|
|
||||||
pub_file.write(self.public_key)
|
|
||||||
print(f"密钥生成成功,已保存到 {priv_filename} 和 {pub_filename}。")
|
|
||||||
|
|
||||||
def load_keys(self):
|
|
||||||
if self.role == 'sender':
|
|
||||||
priv_filename = 'send_private.pem'
|
|
||||||
pub_filename = 'send_public.pem'
|
|
||||||
else:
|
|
||||||
priv_filename = 'receive_private.pem'
|
|
||||||
pub_filename = 'receive_public.pem'
|
|
||||||
try:
|
|
||||||
with open(priv_filename, 'rb') as priv_file:
|
|
||||||
self.private_key = priv_file.read()
|
|
||||||
with open(pub_filename, 'rb') as pub_file:
|
|
||||||
self.public_key = pub_file.read()
|
|
||||||
print("密钥加载成功。")
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("密钥文件未找到,请先生成密钥。")
|
|
||||||
|
|
||||||
def load_public_key(self, filepath):
|
|
||||||
try:
|
|
||||||
with open(filepath, 'rb') as pub_file:
|
|
||||||
return pub_file.read()
|
|
||||||
except FileNotFoundError:
|
|
||||||
print(f"公钥文件 {filepath} 未找到。")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def send_file(self, filepath, receiver_host, receiver_public_key_path):
|
|
||||||
receiver_public_key = self.load_public_key(receiver_public_key_path)
|
|
||||||
if receiver_public_key is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
||||||
s.connect((receiver_host, self.port))
|
|
||||||
|
|
||||||
sender = Sender(filepath, receiver_public_key, self.private_key)
|
|
||||||
data_to_send = sender.send_file()
|
|
||||||
if data_to_send is None:
|
|
||||||
print("文件加密失败,未发送。")
|
|
||||||
return
|
|
||||||
|
|
||||||
s.sendall(data_to_send.encode())
|
|
||||||
print("文件发送成功。")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"发送文件时发生错误: {e}")
|
|
||||||
|
|
||||||
def receive_file(self, sender_public_key_path):
|
|
||||||
sender_public_key = self.load_public_key(sender_public_key_path)
|
|
||||||
if sender_public_key is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
if self.private_key is None:
|
|
||||||
raise ValueError("未生成私钥,请先生成密钥。")
|
|
||||||
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
||||||
s.bind((self.host, self.port))
|
|
||||||
s.listen()
|
|
||||||
print(f"服务器正在 {self.host}:{self.port} 监听...")
|
|
||||||
conn, addr = s.accept()
|
|
||||||
with conn:
|
|
||||||
print(f"已连接到 {addr}")
|
|
||||||
data = b''
|
|
||||||
while True:
|
|
||||||
packet = conn.recv(1024)
|
|
||||||
if not packet:
|
|
||||||
break
|
|
||||||
data += packet
|
|
||||||
|
|
||||||
receiver = Receiver(self.private_key, sender_public_key)
|
|
||||||
decrypted_data = receiver.receive_file(data.decode())
|
|
||||||
|
|
||||||
if decrypted_data is not None:
|
|
||||||
with open('received_file', 'wb') as f:
|
|
||||||
f.write(decrypted_data)
|
|
||||||
print("文件接收并解密成功。")
|
|
||||||
else:
|
|
||||||
print("解密失败,文件未保存。")
|
|
||||||
except ValueError as ve:
|
|
||||||
print(f"值错误: {ve}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"接收文件时发生错误: {e}")
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
while self.running:
|
|
||||||
command = input("输入 'generate_keys' 生成密钥,'load_keys' 加载密钥,'send' 发送文件,'receive' 接收文件,或 'exit' 退出程序: ").strip().lower()
|
|
||||||
if command == 'generate_keys':
|
|
||||||
self.generate_keys()
|
|
||||||
elif command == 'load_keys':
|
|
||||||
self.load_keys()
|
|
||||||
elif command == 'send':
|
|
||||||
filepath = input("输入要发送的文件路径: ")
|
|
||||||
receiver_host = input("输入接收方的计算机IP地址: ")
|
|
||||||
receiver_public_key_path = input("输入接收方的公钥文件路径: ")
|
|
||||||
threading.Thread(target=self.send_file, args=(filepath, receiver_host, receiver_public_key_path)).start()
|
|
||||||
elif command == 'receive':
|
|
||||||
sender_public_key_path = input("输入发送方的公钥文件路径: ")
|
|
||||||
threading.Thread(target=self.receive_file, args=(sender_public_key_path,)).start()
|
|
||||||
elif command == 'exit':
|
|
||||||
self.running = False
|
|
||||||
print("正在退出程序。")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("无效的命令。")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
role = input("输入角色 ('sender' 或 'receiver'): ").strip().lower()
|
|
||||||
app = FileTransferApp(host='0.0.0.0', port=65432, role=role)
|
|
||||||
app.run()
|
|
@ -0,0 +1,9 @@
|
|||||||
|
from utils import generate_rsa_keypair, save_key
|
||||||
|
|
||||||
|
sender_private, sender_public = generate_rsa_keypair()
|
||||||
|
receiver_private, receiver_public = generate_rsa_keypair()
|
||||||
|
|
||||||
|
save_key('sender_private.pem', sender_private)
|
||||||
|
save_key('sender_public.pem', sender_public)
|
||||||
|
save_key('receiver_private.pem', receiver_private)
|
||||||
|
save_key('receiver_public.pem', receiver_public)
|
@ -1,30 +1,55 @@
|
|||||||
from Crypto.Cipher import AES, PKCS1_OAEP
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
import json
|
import json
|
||||||
import base64
|
import base64
|
||||||
from encryption_utils import AsymmetricEncryption, SymmetricEncryption
|
import socket
|
||||||
|
from utils import aes_decrypt, rsa_decrypt, verify_signature, load_key
|
||||||
class Receiver:
|
|
||||||
def __init__(self, private_key, sender_public_key):
|
def receive_file(port, receiver_private_key, sender_public_key):
|
||||||
self.private_key = RSA.import_key(private_key)
|
# 启动服务监听
|
||||||
self.sender_public_key = RSA.import_key(sender_public_key)
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
self.asym_enc = AsymmetricEncryption()
|
s.bind(('', port))
|
||||||
self.sym_enc = SymmetricEncryption()
|
s.listen(1)
|
||||||
|
print(f"接收方正在监听端口 {port}...")
|
||||||
def receive_file(self, data):
|
|
||||||
data = json.loads(data)
|
conn, addr = s.accept()
|
||||||
enc_session_key = base64.b64decode(data['enc_session_key'])
|
print(f"来自 {addr} 的连接已建立!")
|
||||||
nonce = base64.b64decode(data['nonce'])
|
|
||||||
tag = base64.b64decode(data['tag'])
|
with conn:
|
||||||
ciphertext = base64.b64decode(data['ciphertext'])
|
# 接收数据
|
||||||
signature = base64.b64decode(data['signature'])
|
data = conn.recv(65536) # 最大接收 64KB 数据
|
||||||
|
data_packet = json.loads(data.decode('utf-8'))
|
||||||
cipher_rsa = PKCS1_OAEP.new(self.private_key)
|
|
||||||
session_key = cipher_rsa.decrypt(enc_session_key)
|
# 解码 Base64 数据
|
||||||
|
nonce = base64.b64decode(data_packet['nonce'])
|
||||||
file_data = self.sym_enc.decrypt(nonce, ciphertext, tag, session_key)
|
ciphertext = base64.b64decode(data_packet['ciphertext'])
|
||||||
|
tag = base64.b64decode(data_packet['tag'])
|
||||||
if self.asym_enc.verify_signature(file_data, signature, self.sender_public_key):
|
encrypted_key = base64.b64decode(data_packet['encrypted_key'])
|
||||||
return file_data
|
signature = base64.b64decode(data_packet['signature'])
|
||||||
|
filename = data_packet['filename']
|
||||||
|
|
||||||
|
# 使用接收方私钥解密AES密钥
|
||||||
|
aes_key = rsa_decrypt(encrypted_key, receiver_private_key)
|
||||||
|
|
||||||
|
# 使用AES密钥解密文件内容
|
||||||
|
try:
|
||||||
|
file_data = aes_decrypt(nonce, ciphertext, tag, aes_key)
|
||||||
|
|
||||||
|
# 验证签名
|
||||||
|
if verify_signature(sender_public_key, file_data, signature):
|
||||||
|
print("签名验证成功,文件未被篡改!")
|
||||||
|
# 保存解密后的文件
|
||||||
|
with open(f"received_{filename}", 'wb') as f:
|
||||||
|
f.write(file_data)
|
||||||
|
print(f"文件已保存为 received_{filename}")
|
||||||
else:
|
else:
|
||||||
raise ValueError("签名验证失败")
|
print("签名验证失败,文件可能被篡改!")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"文件解密或验证过程中出错:{e}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# 加载密钥
|
||||||
|
receiver_private_key = load_key('receiver_private.pem')
|
||||||
|
sender_public_key = load_key('sender_public.pem')
|
||||||
|
|
||||||
|
# 监听端口
|
||||||
|
port = 12345
|
||||||
|
receive_file(port, receiver_private_key, sender_public_key)
|
||||||
|
@ -1,138 +0,0 @@
|
|||||||
import socket
|
|
||||||
import threading
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
from sender import Sender
|
|
||||||
from receiver import Receiver
|
|
||||||
|
|
||||||
class FileTransferApp:
|
|
||||||
def __init__(self, host, port, role):
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.role = role
|
|
||||||
self.private_key = None
|
|
||||||
self.public_key = None
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def generate_keys(self):
|
|
||||||
key = RSA.generate(2048)
|
|
||||||
self.private_key = key.export_key()
|
|
||||||
self.public_key = key.publickey().export_key()
|
|
||||||
if self.role == 'sender':
|
|
||||||
priv_filename = 'send_private.pem'
|
|
||||||
pub_filename = 'send_public.pem'
|
|
||||||
else:
|
|
||||||
priv_filename = 'receive_private.pem'
|
|
||||||
pub_filename = 'receive_public.pem'
|
|
||||||
with open(priv_filename, 'wb') as priv_file:
|
|
||||||
priv_file.write(self.private_key)
|
|
||||||
with open(pub_filename, 'wb') as pub_file:
|
|
||||||
pub_file.write(self.public_key)
|
|
||||||
print(f"密钥生成成功,已保存到 {priv_filename} 和 {pub_filename}。")
|
|
||||||
|
|
||||||
def load_keys(self):
|
|
||||||
if self.role == 'sender':
|
|
||||||
priv_filename = 'send_private.pem'
|
|
||||||
pub_filename = 'send_public.pem'
|
|
||||||
else:
|
|
||||||
priv_filename = 'receive_private.pem'
|
|
||||||
pub_filename = 'receive_public.pem'
|
|
||||||
try:
|
|
||||||
with open(priv_filename, 'rb') as priv_file:
|
|
||||||
self.private_key = priv_file.read()
|
|
||||||
with open(pub_filename, 'rb') as pub_file:
|
|
||||||
self.public_key = pub_file.read()
|
|
||||||
print("密钥加载成功。")
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("密钥文件未找到,请先生成密钥。")
|
|
||||||
|
|
||||||
def load_public_key(self, filepath):
|
|
||||||
try:
|
|
||||||
with open(filepath, 'rb') as pub_file:
|
|
||||||
return pub_file.read()
|
|
||||||
except FileNotFoundError:
|
|
||||||
print(f"公钥文件 {filepath} 未找到。")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def send_file(self, filepath, receiver_host, receiver_public_key_path):
|
|
||||||
receiver_public_key = self.load_public_key(receiver_public_key_path)
|
|
||||||
if receiver_public_key is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
||||||
s.connect((receiver_host, self.port))
|
|
||||||
|
|
||||||
sender = Sender(filepath, receiver_public_key, self.private_key)
|
|
||||||
data_to_send = sender.send_file()
|
|
||||||
if data_to_send is None:
|
|
||||||
print("文件加密失败,未发送。")
|
|
||||||
return
|
|
||||||
|
|
||||||
s.sendall(data_to_send.encode())
|
|
||||||
print("文件发送成功。")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"发送文件时发生错误: {e}")
|
|
||||||
|
|
||||||
def receive_file(self, sender_public_key_path):
|
|
||||||
sender_public_key = self.load_public_key(sender_public_key_path)
|
|
||||||
if sender_public_key is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
if self.private_key is None:
|
|
||||||
raise ValueError("未生成私钥,请先生成密钥。")
|
|
||||||
|
|
||||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
||||||
s.bind((self.host, self.port))
|
|
||||||
s.listen()
|
|
||||||
print(f"服务器正在 {self.host}:{self.port} 监听...")
|
|
||||||
conn, addr = s.accept()
|
|
||||||
with conn:
|
|
||||||
print(f"已连接到 {addr}")
|
|
||||||
data = b''
|
|
||||||
while True:
|
|
||||||
packet = conn.recv(1024)
|
|
||||||
if not packet:
|
|
||||||
break
|
|
||||||
data += packet
|
|
||||||
|
|
||||||
receiver = Receiver(self.private_key, sender_public_key)
|
|
||||||
decrypted_data = receiver.receive_file(data.decode())
|
|
||||||
|
|
||||||
if decrypted_data is not None:
|
|
||||||
with open('received_file', 'wb') as f:
|
|
||||||
f.write(decrypted_data)
|
|
||||||
print("文件接收并解密成功。")
|
|
||||||
else:
|
|
||||||
print("解密失败,文件未保存。")
|
|
||||||
except ValueError as ve:
|
|
||||||
print(f"值错误: {ve}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"接收文件时发生错误: {e}")
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
while self.running:
|
|
||||||
command = input("输入 'generate_keys' 生成密钥,'load_keys' 加载密钥,'send' 发送文件,'receive' 接收文件,或 'exit' 退出程序: ").strip().lower()
|
|
||||||
if command == 'generate_keys':
|
|
||||||
self.generate_keys()
|
|
||||||
elif command == 'load_keys':
|
|
||||||
self.load_keys()
|
|
||||||
elif command == 'send':
|
|
||||||
filepath = input("输入要发送的文件路径: ")
|
|
||||||
receiver_host = input("输入接收方的计算机IP地址: ")
|
|
||||||
receiver_public_key_path = input("输入接收方的公钥文件路径: ")
|
|
||||||
threading.Thread(target=self.send_file, args=(filepath, receiver_host, receiver_public_key_path)).start()
|
|
||||||
elif command == 'receive':
|
|
||||||
sender_public_key_path = input("输入发送方的公钥文件路径: ")
|
|
||||||
threading.Thread(target=self.receive_file, args=(sender_public_key_path,)).start()
|
|
||||||
elif command == 'exit':
|
|
||||||
self.running = False
|
|
||||||
print("正在退出程序。")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
print("无效的命令。")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
role = input("输入角色 ('sender' 或 'receiver'): ").strip().lower()
|
|
||||||
app = FileTransferApp(host='0.0.0.0', port=65432, role=role)
|
|
||||||
app.run()
|
|
@ -1,35 +1,53 @@
|
|||||||
from Crypto.Cipher import AES, PKCS1_OAEP
|
import os
|
||||||
from Crypto.Random import get_random_bytes
|
|
||||||
from Crypto.PublicKey import RSA
|
|
||||||
import json
|
import json
|
||||||
import base64
|
import base64
|
||||||
from encryption_utils import AsymmetricEncryption, SymmetricEncryption
|
import socket
|
||||||
|
from utils import aes_encrypt, rsa_encrypt, generate_signature, load_key
|
||||||
class Sender:
|
|
||||||
def __init__(self, filepath, receiver_public_key, sender_private_key):
|
def send_file(filename, receiver_ip, receiver_port, receiver_public_key, sender_private_key):
|
||||||
self.filepath = filepath
|
# 读取文件内容
|
||||||
self.receiver_public_key = RSA.import_key(receiver_public_key)
|
with open(filename, 'rb') as f:
|
||||||
self.sender_private_key = RSA.import_key(sender_private_key)
|
|
||||||
self.asym_enc = AsymmetricEncryption()
|
|
||||||
self.sym_enc = SymmetricEncryption()
|
|
||||||
|
|
||||||
def send_file(self):
|
|
||||||
with open(self.filepath, 'rb') as f:
|
|
||||||
file_data = f.read()
|
file_data = f.read()
|
||||||
|
|
||||||
session_key = get_random_bytes(16)
|
# 生成随机AES密钥
|
||||||
cipher_rsa = PKCS1_OAEP.new(self.receiver_public_key)
|
aes_key = os.urandom(16)
|
||||||
enc_session_key = cipher_rsa.encrypt(session_key)
|
|
||||||
|
# 加密文件
|
||||||
|
nonce, ciphertext, tag = aes_encrypt(file_data, aes_key)
|
||||||
|
|
||||||
|
# 使用接收方公钥加密AES密钥
|
||||||
|
encrypted_aes_key = rsa_encrypt(aes_key, receiver_public_key)
|
||||||
|
|
||||||
nonce, ciphertext, tag = self.sym_enc.encrypt(file_data, session_key)
|
# 生成数字签名(针对原始文件内容)
|
||||||
signature = self.asym_enc.sign_data(file_data)
|
signature = generate_signature(sender_private_key, file_data)
|
||||||
|
|
||||||
data_to_send = {
|
# 使用 Base64 对所有数据进行编码
|
||||||
'enc_session_key': base64.b64encode(enc_session_key).decode(),
|
data_packet = {
|
||||||
'nonce': base64.b64encode(nonce).decode(),
|
'nonce': base64.b64encode(nonce).decode(),
|
||||||
'tag': base64.b64encode(tag).decode(),
|
|
||||||
'ciphertext': base64.b64encode(ciphertext).decode(),
|
'ciphertext': base64.b64encode(ciphertext).decode(),
|
||||||
'signature': base64.b64encode(signature).decode()
|
'tag': base64.b64encode(tag).decode(),
|
||||||
|
'encrypted_key': base64.b64encode(encrypted_aes_key).decode(),
|
||||||
|
'signature': base64.b64encode(signature).decode(),
|
||||||
|
'filename': os.path.basename(filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
return json.dumps(data_to_send)
|
# 转换为 JSON 格式
|
||||||
|
data_packet_json = json.dumps(data_packet)
|
||||||
|
|
||||||
|
# 建立网络连接并发送数据
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||||
|
s.connect((receiver_ip, receiver_port))
|
||||||
|
s.sendall(data_packet_json.encode('utf-8'))
|
||||||
|
print("文件已发送!")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# 加载密钥
|
||||||
|
receiver_public_key = load_key('receiver_public.pem')
|
||||||
|
sender_private_key = load_key('sender_private.pem')
|
||||||
|
|
||||||
|
# 文件路径和接收方信息
|
||||||
|
filename = '1.txt'
|
||||||
|
receiver_ip = '192.168.199.134' # 替换为接收方的IP地址
|
||||||
|
receiver_port = 12345
|
||||||
|
|
||||||
|
send_file(filename, receiver_ip, receiver_port, receiver_public_key, sender_private_key)
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
from utils import aes_encrypt, rsa_encrypt, generate_signature, load_key
|
||||||
|
|
||||||
|
def send_file(filename, receiver_public_key, sender_private_key):
|
||||||
|
# 读取文件内容
|
||||||
|
with open(filename, 'rb') as f:
|
||||||
|
file_data = f.read()
|
||||||
|
|
||||||
|
# 生成随机AES密钥
|
||||||
|
aes_key = os.urandom(16)
|
||||||
|
|
||||||
|
# 加密文件
|
||||||
|
nonce, ciphertext, tag = aes_encrypt(file_data, aes_key)
|
||||||
|
|
||||||
|
# 使用接收方公钥加密AES密钥
|
||||||
|
encrypted_aes_key = rsa_encrypt(aes_key, receiver_public_key)
|
||||||
|
|
||||||
|
# 生成数字签名(针对原始文件内容)
|
||||||
|
signature = generate_signature(sender_private_key, file_data)
|
||||||
|
|
||||||
|
# 封装数据包
|
||||||
|
data_packet = {
|
||||||
|
'nonce': base64.b64encode(nonce).decode(),
|
||||||
|
'ciphertext': base64.b64encode(ciphertext).decode(),
|
||||||
|
'tag': base64.b64encode(tag).decode(),
|
||||||
|
'encrypted_key': base64.b64encode(encrypted_aes_key).decode(),
|
||||||
|
'signature': base64.b64encode(signature).decode(),
|
||||||
|
'filename': os.path.basename(filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
# 保存数据包到文件
|
||||||
|
with open('data_packet.json', 'w') as f:
|
||||||
|
json.dump(data_packet, f)
|
||||||
|
|
||||||
|
print("文件已加密并发送。")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
receiver_public_key = load_key('receiver_public.pem')
|
||||||
|
sender_private_key = load_key('sender_private.pem')
|
||||||
|
send_file('example.txt', receiver_public_key, sender_private_key)
|
||||||
|
|
@ -0,0 +1,70 @@
|
|||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.Cipher import PKCS1_OAEP, AES
|
||||||
|
from Crypto.Signature import pkcs1_15
|
||||||
|
from Crypto.Hash import SHA256
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
# 生成RSA密钥对
|
||||||
|
def generate_rsa_keypair():
|
||||||
|
key = RSA.generate(2048)
|
||||||
|
private_key = key.export_key()
|
||||||
|
public_key = key.publickey().export_key()
|
||||||
|
return private_key, public_key
|
||||||
|
|
||||||
|
|
||||||
|
# 保存密钥到文件
|
||||||
|
def save_key(filename, key):
|
||||||
|
with open(filename, 'wb') as f:
|
||||||
|
f.write(key)
|
||||||
|
|
||||||
|
|
||||||
|
# 加载密钥
|
||||||
|
def load_key(filename):
|
||||||
|
with open(filename, 'rb') as f:
|
||||||
|
return f.read()
|
||||||
|
|
||||||
|
|
||||||
|
# AES加密文件内容
|
||||||
|
def aes_encrypt(data, key):
|
||||||
|
cipher = AES.new(key, AES.MODE_EAX)
|
||||||
|
ciphertext, tag = cipher.encrypt_and_digest(data)
|
||||||
|
return cipher.nonce, ciphertext, tag
|
||||||
|
|
||||||
|
|
||||||
|
# AES解密文件内容
|
||||||
|
def aes_decrypt(nonce, ciphertext, tag, key):
|
||||||
|
cipher = AES.new(key, AES.MODE_EAX, nonce=nonce)
|
||||||
|
return cipher.decrypt_and_verify(ciphertext, tag)
|
||||||
|
|
||||||
|
|
||||||
|
# 使用RSA加密对称密钥
|
||||||
|
def rsa_encrypt(data, public_key):
|
||||||
|
cipher = PKCS1_OAEP.new(RSA.import_key(public_key))
|
||||||
|
return cipher.encrypt(data)
|
||||||
|
|
||||||
|
|
||||||
|
# 使用RSA解密对称密钥
|
||||||
|
def rsa_decrypt(data, private_key):
|
||||||
|
cipher = PKCS1_OAEP.new(RSA.import_key(private_key))
|
||||||
|
return cipher.decrypt(data)
|
||||||
|
|
||||||
|
|
||||||
|
# 生成数字签名
|
||||||
|
def generate_signature(private_key, data):
|
||||||
|
key = RSA.import_key(private_key)
|
||||||
|
h = SHA256.new(data)
|
||||||
|
signature = pkcs1_15.new(key).sign(h)
|
||||||
|
return signature
|
||||||
|
|
||||||
|
|
||||||
|
# 验证数字签名
|
||||||
|
def verify_signature(public_key, data, signature):
|
||||||
|
key = RSA.import_key(public_key)
|
||||||
|
h = SHA256.new(data)
|
||||||
|
try:
|
||||||
|
pkcs1_15.new(key).verify(h, signature)
|
||||||
|
return True
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return False
|
Loading…
Reference in new issue