You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
121 lines
3.9 KiB
121 lines
3.9 KiB
import os
|
|
import socket
|
|
from Crypto.Cipher import AES, PKCS1_OAEP
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.Signature import pkcs1_15
|
|
from Crypto.Hash import SHA256
|
|
from Crypto.Util.Padding import unpad
|
|
|
|
|
|
def generate_keys_if_not_exists():
|
|
"""生成公钥和私钥,如果文件不存在"""
|
|
if not os.path.exists('receiver_public.pem') or not os.path.exists('receiver_private.pem'):
|
|
print("正在生成公钥和私钥...")
|
|
generate_keys()
|
|
else:
|
|
print("公钥和私钥已存在。")
|
|
|
|
|
|
def generate_keys():
|
|
"""生成公钥和私钥并保存为文件"""
|
|
key = RSA.generate(2048)
|
|
|
|
# 导出私钥
|
|
private_key = key.export_key()
|
|
with open('receiver_private.pem', 'wb') as f:
|
|
f.write(private_key)
|
|
print("私钥已保存为 'receiver_private.pem'")
|
|
|
|
# 导出公钥
|
|
public_key = key.publickey().export_key()
|
|
with open('receiver_public.pem', 'wb') as f:
|
|
f.write(public_key)
|
|
print("公钥已保存为 'receiver_public.pem'")
|
|
|
|
# 读取私钥
|
|
def load_private_key():
|
|
with open('receiver_private.pem', 'rb') as f:
|
|
private_key = RSA.import_key(f.read())
|
|
return private_key
|
|
|
|
# 读取发送方公钥
|
|
def load_public_key():
|
|
with open('public.pem', 'rb') as f:
|
|
public_key = RSA.import_key(f.read())
|
|
return public_key
|
|
|
|
# 解密文件
|
|
def decrypt_file(encrypted_session_key, iv, ciphertext, private_key):
|
|
# 使用 RSA 解密 AES 密钥
|
|
|
|
cipher_rsa = PKCS1_OAEP.new(private_key)
|
|
session_key = cipher_rsa.decrypt(encrypted_session_key)
|
|
|
|
# 使用 AES 解密文件内容
|
|
mode = AES.MODE_CBC if iv else AES.MODE_ECB # 如果有 IV 则使用 CBC 模式,否则使用 ECB 模式
|
|
|
|
cipher_aes = AES.new(session_key, mode, iv=iv if iv else None)
|
|
|
|
# 解密并去除填充
|
|
try:
|
|
plaintext = unpad(cipher_aes.decrypt(ciphertext), AES.block_size)
|
|
except ValueError:
|
|
print("错误:填充数据不正确或密文已损坏。")
|
|
raise
|
|
|
|
return plaintext
|
|
|
|
# 验证签名
|
|
def verify_signature(filename, signature, public_key):
|
|
# 计算文件的 SHA-256 哈希值
|
|
hash_obj = SHA256.new()
|
|
with open(filename, 'rb') as f:
|
|
while chunk := f.read(4096):
|
|
hash_obj.update(chunk)
|
|
|
|
# 使用公钥验证签名
|
|
verifier = pkcs1_15.new(public_key)
|
|
try:
|
|
verifier.verify(hash_obj, signature)
|
|
print("签名验证成功。")
|
|
except (ValueError, TypeError):
|
|
print("签名验证失败。")
|
|
|
|
# 服务器接收加密文件并解密
|
|
def receive_encrypted_file(host, port):
|
|
# 监听客户端连接
|
|
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
server_socket.bind((host, port))
|
|
server_socket.listen(1)
|
|
|
|
print(f"服务器正在监听 {host}:{port} ...")
|
|
client_socket, addr = server_socket.accept()
|
|
print(f"与客户端 {addr} 建立了连接。")
|
|
|
|
# 接收加密的 AES 密钥、IV 和加密的文件内容
|
|
encrypted_session_key = client_socket.recv(256) # RSA 密钥长度 2048 位 (256 字节)
|
|
iv = client_socket.recv(16) # AES 块大小为 16 字节
|
|
ciphertext = client_socket.recv(16)
|
|
signature = client_socket.recv(256)
|
|
|
|
# 解密文件
|
|
private_key = load_private_key()
|
|
decrypted_file = decrypt_file(encrypted_session_key, iv, ciphertext, private_key)
|
|
# 保存解密后的文件
|
|
with open('received_file.txt', 'wb') as f:
|
|
f.write(decrypted_file)
|
|
with open('received_file.txt', 'r', encoding='utf-8') as f:
|
|
print("解密后的文件内容:")
|
|
print(f.read())
|
|
# 验证签名
|
|
verify_signature('received_file.txt', signature, load_public_key())
|
|
|
|
|
|
|
|
print("文件接收并解密成功。")
|
|
client_socket.close()
|
|
|
|
# 使用示例
|
|
generate_keys_if_not_exists()
|
|
receive_encrypted_file('127.0.0.1', 59290)
|