parent
51adade261
commit
013f7a9067
@ -0,0 +1,120 @@
|
||||
import os
|
||||
import socket
|
||||
from Crypto.Cipher import AES, PKCS1_OAEP
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Signature import pkcs1_15
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.Util.Padding import unpad
|
||||
|
||||
|
||||
def generate_keys_if_not_exists():
|
||||
"""生成公钥和私钥,如果文件不存在"""
|
||||
if not os.path.exists('receiver_public.pem') or not os.path.exists('receiver_private.pem'):
|
||||
print("正在生成公钥和私钥...")
|
||||
generate_keys()
|
||||
else:
|
||||
print("公钥和私钥已存在。")
|
||||
|
||||
|
||||
def generate_keys():
|
||||
"""生成公钥和私钥并保存为文件"""
|
||||
key = RSA.generate(2048)
|
||||
|
||||
# 导出私钥
|
||||
private_key = key.export_key()
|
||||
with open('receiver_private.pem', 'wb') as f:
|
||||
f.write(private_key)
|
||||
print("私钥已保存为 'receiver_private.pem'")
|
||||
|
||||
# 导出公钥
|
||||
public_key = key.publickey().export_key()
|
||||
with open('receiver_public.pem', 'wb') as f:
|
||||
f.write(public_key)
|
||||
print("公钥已保存为 'receiver_public.pem'")
|
||||
|
||||
# 读取私钥
|
||||
def load_private_key():
|
||||
with open('receiver_private.pem', 'rb') as f:
|
||||
private_key = RSA.import_key(f.read())
|
||||
return private_key
|
||||
|
||||
# 读取发送方公钥
|
||||
def load_public_key():
|
||||
with open('public.pem', 'rb') as f:
|
||||
public_key = RSA.import_key(f.read())
|
||||
return public_key
|
||||
|
||||
# 解密文件
|
||||
def decrypt_file(encrypted_session_key, iv, ciphertext, private_key):
|
||||
# 使用 RSA 解密 AES 密钥
|
||||
|
||||
cipher_rsa = PKCS1_OAEP.new(private_key)
|
||||
session_key = cipher_rsa.decrypt(encrypted_session_key)
|
||||
|
||||
# 使用 AES 解密文件内容
|
||||
mode = AES.MODE_CBC if iv else AES.MODE_ECB # 如果有 IV 则使用 CBC 模式,否则使用 ECB 模式
|
||||
|
||||
cipher_aes = AES.new(session_key, mode, iv=iv if iv else None)
|
||||
|
||||
# 解密并去除填充
|
||||
try:
|
||||
plaintext = unpad(cipher_aes.decrypt(ciphertext), AES.block_size)
|
||||
except ValueError:
|
||||
print("错误:填充数据不正确或密文已损坏。")
|
||||
raise
|
||||
|
||||
return plaintext
|
||||
|
||||
# 验证签名
|
||||
def verify_signature(filename, signature, public_key):
|
||||
# 计算文件的 SHA-256 哈希值
|
||||
hash_obj = SHA256.new()
|
||||
with open(filename, 'rb') as f:
|
||||
while chunk := f.read(4096):
|
||||
hash_obj.update(chunk)
|
||||
|
||||
# 使用公钥验证签名
|
||||
verifier = pkcs1_15.new(public_key)
|
||||
try:
|
||||
verifier.verify(hash_obj, signature)
|
||||
print("签名验证成功。")
|
||||
except (ValueError, TypeError):
|
||||
print("签名验证失败。")
|
||||
|
||||
# 服务器接收加密文件并解密
|
||||
def receive_encrypted_file(host, port):
|
||||
# 监听客户端连接
|
||||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server_socket.bind((host, port))
|
||||
server_socket.listen(1)
|
||||
|
||||
print(f"服务器正在监听 {host}:{port} ...")
|
||||
client_socket, addr = server_socket.accept()
|
||||
print(f"与客户端 {addr} 建立了连接。")
|
||||
|
||||
# 接收加密的 AES 密钥、IV 和加密的文件内容
|
||||
encrypted_session_key = client_socket.recv(256) # RSA 密钥长度 2048 位 (256 字节)
|
||||
iv = client_socket.recv(16) # AES 块大小为 16 字节
|
||||
ciphertext = client_socket.recv(16)
|
||||
signature = client_socket.recv(256)
|
||||
|
||||
# 解密文件
|
||||
private_key = load_private_key()
|
||||
decrypted_file = decrypt_file(encrypted_session_key, iv, ciphertext, private_key)
|
||||
# 保存解密后的文件
|
||||
with open('received_file.txt', 'wb') as f:
|
||||
f.write(decrypted_file)
|
||||
with open('received_file.txt', 'r', encoding='utf-8') as f:
|
||||
print("解密后的文件内容:")
|
||||
print(f.read())
|
||||
# 验证签名
|
||||
verify_signature('received_file.txt', signature, load_public_key())
|
||||
|
||||
|
||||
|
||||
print("文件接收并解密成功。")
|
||||
client_socket.close()
|
||||
|
||||
# 使用示例
|
||||
generate_keys_if_not_exists()
|
||||
receive_encrypted_file('127.0.0.1', 59290)
|
Loading…
Reference in new issue