签名错误?

annn
vernuser 9 months ago
parent 16e4438379
commit 45f7cbb963

8
.idea/.gitignore vendored

@ -1,8 +0,0 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MaterialThemeProjectNewConfig">
<option name="metadata">
<MTProjectMetadataState>
<option name="userId" value="-a35cb17:193adf3a2bf:-7ff2" />
</MTProjectMetadataState>
</option>
</component>
</project>

@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13" project-jdk-type="Python SDK" />
</project>

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/应用密码学课设.iml" filepath="$PROJECT_DIR$/.idea/应用密码学课设.iml" />
</modules>
</component>
</project>

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.13" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1 @@
1

@ -1,6 +1,7 @@
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.Random import get_random_bytes
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
class AsymmetricEncryption:
@ -21,6 +22,19 @@ class AsymmetricEncryption:
cipher_rsa = PKCS1_OAEP.new(self.key_pair)
return cipher_rsa.decrypt(encrypted_data)
def sign_data(self, data):
h = SHA256.new(data)
signature = pkcs1_15.new(self.key_pair).sign(h)
return signature
def verify_signature(self, data, signature, public_key):
h = SHA256.new(data)
try:
pkcs1_15.new(public_key).verify(h, signature)
return True
except (ValueError, TypeError):
return False
class SymmetricEncryption:
def encrypt(self, data, key):
cipher_aes = AES.new(key, AES.MODE_GCM)
@ -30,8 +44,4 @@ class SymmetricEncryption:
def decrypt(self, nonce, ciphertext, tag, key):
cipher_aes = AES.new(key, AES.MODE_GCM, nonce=nonce)
return cipher_aes.decrypt_and_verify(ciphertext, tag)
def calculate_file_hash(file_data):
hash_obj = SHA256.new(file_data)
return hash_obj.digest()
return cipher_aes.decrypt_and_verify(ciphertext, tag)

@ -1,2 +0,0 @@
123456
asdfghjkl

@ -1,15 +1,14 @@
import socket
import threading
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Random import get_random_bytes
from Crypto.Hash import SHA256
from sender import Sender
from receiver import Receiver
class FileTransferApp:
def __init__(self, host, port):
def __init__(self, host, port, role):
self.host = host
self.port = port
self.role = role
self.private_key = None
self.public_key = None
self.running = True
@ -18,55 +17,114 @@ class FileTransferApp:
key = RSA.generate(2048)
self.private_key = key.export_key()
self.public_key = key.publickey().export_key()
print("密钥生成成功。")
if self.role == 'sender':
priv_filename = 'send_private.pem'
pub_filename = 'send_public.pem'
else:
priv_filename = 'receive_private.pem'
pub_filename = 'receive_public.pem'
with open(priv_filename, 'wb') as priv_file:
priv_file.write(self.private_key)
with open(pub_filename, 'wb') as pub_file:
pub_file.write(self.public_key)
print(f"密钥生成成功,已保存到 {priv_filename}{pub_filename}")
def load_keys(self):
if self.role == 'sender':
priv_filename = 'send_private.pem'
pub_filename = 'send_public.pem'
else:
priv_filename = 'receive_private.pem'
pub_filename = 'receive_public.pem'
try:
with open(priv_filename, 'rb') as priv_file:
self.private_key = priv_file.read()
with open(pub_filename, 'rb') as pub_file:
self.public_key = pub_file.read()
print("密钥加载成功。")
except FileNotFoundError:
print("密钥文件未找到,请先生成密钥。")
def load_public_key(self, filepath):
try:
with open(filepath, 'rb') as pub_file:
return pub_file.read()
except FileNotFoundError:
print(f"公钥文件 {filepath} 未找到。")
return None
def send_file(self, filepath, receiver_host, receiver_public_key_path):
receiver_public_key = self.load_public_key(receiver_public_key_path)
if receiver_public_key is None:
return
def send_file(self, filepath, receiver_host):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((receiver_host, self.port))
sender = Sender(filepath, receiver_public_key=self.public_key)
sender = Sender(filepath, receiver_public_key, self.private_key)
data_to_send = sender.send_file()
s.sendall(data_to_send)
if data_to_send is None:
print("文件加密失败,未发送。")
return
s.sendall(data_to_send.encode())
print("文件发送成功。")
except Exception as e:
print(f"发送文件时发生错误: {e}")
def receive_file(self):
def receive_file(self, sender_public_key_path):
sender_public_key = self.load_public_key(sender_public_key_path)
if sender_public_key is None:
return
try:
if self.private_key is None:
raise ValueError("未生成私钥,请先生成密钥。")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((self.host, self.port))
s.listen()
print(f"服务器正在监听 {self.host}:{self.port}")
print(f"服务器正在 {self.host}:{self.port} 监听...")
conn, addr = s.accept()
with conn:
print(f"已连接 {addr}")
print(f"已连接 {addr}")
data = b''
while True:
packet = conn.recv(1024)
if not packet:
break
data += packet
receiver = Receiver(private_key=self.private_key)
decrypted_data = receiver.receive_file(data)
if decrypted_data:
# 保存或处理解密后的文件数据
receiver = Receiver(self.private_key, sender_public_key)
decrypted_data = receiver.receive_file(data.decode())
if decrypted_data is not None:
with open('received_file', 'wb') as f:
f.write(decrypted_data)
print("文件接收并解密成功。")
else:
print("解密失败,文件未保存。")
except ValueError as ve:
print(f"值错误: {ve}")
except Exception as e:
print(f"接收文件时发生错误: {e}")
def run(self):
while self.running:
command = input("输入 'generate_keys' 生成密钥,'send' 发送文件,'receive' 接收文件,或 'exit' 退出程序: ").strip().lower()
command = input("输入 'generate_keys' 生成密钥,'load_keys' 加载密钥,'send' 发送文件,'receive' 接收文件,或 'exit' 退出程序: ").strip().lower()
if command == 'generate_keys':
self.generate_keys()
elif command == 'load_keys':
self.load_keys()
elif command == 'send':
filepath = input("输入要发送的文件路径: ")
receiver_host = input("输入接收方的计算机IP地址: ")
threading.Thread(target=self.send_file, args=(filepath, receiver_host)).start()
receiver_public_key_path = input("输入接收方的公钥文件路径: ")
threading.Thread(target=self.send_file, args=(filepath, receiver_host, receiver_public_key_path)).start()
elif command == 'receive':
threading.Thread(target=self.receive_file).start()
sender_public_key_path = input("输入发送方的公钥文件路径: ")
threading.Thread(target=self.receive_file, args=(sender_public_key_path,)).start()
elif command == 'exit':
self.running = False
print("正在退出程序。")
@ -75,5 +133,6 @@ class FileTransferApp:
print("无效的命令。")
if __name__ == "__main__":
app = FileTransferApp(host='0.0.0.0', port=65432)
app.run()
role = input("输入角色 ('sender''receiver'): ").strip().lower()
app = FileTransferApp(host='0.0.0.0', port=65432, role=role)
app.run()

@ -1,31 +1,30 @@
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
import json
import base64
from encryption_utils import SymmetricEncryption, AsymmetricEncryption, calculate_file_hash
from encryption_utils import AsymmetricEncryption, SymmetricEncryption
class Receiver:
def __init__(self, private_key):
self.private_key = private_key
def receive_file(self, encoded_data_packet):
decoded_data_packet = base64.b64decode(encoded_data_packet)
nonce = decoded_data_packet[:16]
ciphertext = decoded_data_packet[16:-256-64]
tag = decoded_data_packet[-256-64:-256]
encrypted_symmetric_key = decoded_data_packet[-256:]
original_hash = decoded_data_packet[-256-64:-256] # 保持为字节
asymmetric_encryption = AsymmetricEncryption()
decrypted_symmetric_key = asymmetric_encryption.decrypt_with_private_key(encrypted_symmetric_key)
symmetric_encryption = SymmetricEncryption()
decrypted_file_data = symmetric_encryption.decrypt(nonce, ciphertext, tag, decrypted_symmetric_key)
# Recalculate hash of decrypted file data
received_file_hash = calculate_file_hash(decrypted_file_data).encode()
if received_file_hash == original_hash:
print("文件传输成功,文件完整性验证通过。")
def __init__(self, private_key, sender_public_key):
self.private_key = RSA.import_key(private_key)
self.sender_public_key = RSA.import_key(sender_public_key)
self.asym_enc = AsymmetricEncryption()
self.sym_enc = SymmetricEncryption()
def receive_file(self, data):
data = json.loads(data)
enc_session_key = base64.b64decode(data['enc_session_key'])
nonce = base64.b64decode(data['nonce'])
tag = base64.b64decode(data['tag'])
ciphertext = base64.b64decode(data['ciphertext'])
signature = base64.b64decode(data['signature'])
cipher_rsa = PKCS1_OAEP.new(self.private_key)
session_key = cipher_rsa.decrypt(enc_session_key)
file_data = self.sym_enc.decrypt(nonce, ciphertext, tag, session_key)
if self.asym_enc.verify_signature(file_data, signature, self.sender_public_key):
return file_data
else:
print("文件传输失败,文件可能已被篡改。")
return decrypted_file_data
raise ValueError("签名验证失败")

@ -1,35 +1,138 @@
from encryption_utils import AsymmetricEncryption, SymmetricEncryption
from Crypto.Random import get_random_bytes
import socket
import threading
from Crypto.PublicKey import RSA
from sender import Sender
from receiver import Receiver
def run_example():
# 生成接收方的密钥对
receiver_asymmetric = AsymmetricEncryption()
receiver_public_key = receiver_asymmetric.get_public_key()
receiver_private_key = receiver_asymmetric.get_private_key()
class FileTransferApp:
def __init__(self, host, port, role):
self.host = host
self.port = port
self.role = role
self.private_key = None
self.public_key = None
self.running = True
# 生成对称密钥
symmetric_key = get_random_bytes(16) # AES 128 位密钥
def generate_keys(self):
key = RSA.generate(2048)
self.private_key = key.export_key()
self.public_key = key.publickey().export_key()
if self.role == 'sender':
priv_filename = 'send_private.pem'
pub_filename = 'send_public.pem'
else:
priv_filename = 'receive_private.pem'
pub_filename = 'receive_public.pem'
with open(priv_filename, 'wb') as priv_file:
priv_file.write(self.private_key)
with open(pub_filename, 'wb') as pub_file:
pub_file.write(self.public_key)
print(f"密钥生成成功,已保存到 {priv_filename}{pub_filename}")
# 使用接收方的公钥加密对称密钥
encrypted_symmetric_key = receiver_asymmetric.encrypt_with_public_key(symmetric_key, receiver_public_key)
def load_keys(self):
if self.role == 'sender':
priv_filename = 'send_private.pem'
pub_filename = 'send_public.pem'
else:
priv_filename = 'receive_private.pem'
pub_filename = 'receive_public.pem'
try:
with open(priv_filename, 'rb') as priv_file:
self.private_key = priv_file.read()
with open(pub_filename, 'rb') as pub_file:
self.public_key = pub_file.read()
print("密钥加载成功。")
except FileNotFoundError:
print("密钥文件未找到,请先生成密钥。")
# 读取文件内容
with open('example.txt', 'rb') as file:
file_data = file.read()
def load_public_key(self, filepath):
try:
with open(filepath, 'rb') as pub_file:
return pub_file.read()
except FileNotFoundError:
print(f"公钥文件 {filepath} 未找到。")
return None
# 使用对称密钥加密文件内容
symmetric_encryption = SymmetricEncryption()
nonce, ciphertext, tag = symmetric_encryption.encrypt(file_data, symmetric_key)
def send_file(self, filepath, receiver_host, receiver_public_key_path):
receiver_public_key = self.load_public_key(receiver_public_key_path)
if receiver_public_key is None:
return
# 模拟发送和接收
encoded_packet = (encrypted_symmetric_key, nonce, ciphertext, tag)
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((receiver_host, self.port))
# 使用接收方私钥解密对称密钥
decrypted_symmetric_key = receiver_asymmetric.decrypt_with_private_key(encoded_packet[0])
sender = Sender(filepath, receiver_public_key, self.private_key)
data_to_send = sender.send_file()
if data_to_send is None:
print("文件加密失败,未发送。")
return
# 使用解密后的对称密钥解密文件内容
decrypted_data = symmetric_encryption.decrypt(encoded_packet[1], encoded_packet[2], encoded_packet[3], decrypted_symmetric_key)
print("Decrypted Data:", decrypted_data.decode('utf-8'))
s.sendall(data_to_send.encode())
print("文件发送成功。")
except Exception as e:
print(f"发送文件时发生错误: {e}")
def receive_file(self, sender_public_key_path):
sender_public_key = self.load_public_key(sender_public_key_path)
if sender_public_key is None:
return
try:
if self.private_key is None:
raise ValueError("未生成私钥,请先生成密钥。")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((self.host, self.port))
s.listen()
print(f"服务器正在 {self.host}:{self.port} 监听...")
conn, addr = s.accept()
with conn:
print(f"已连接到 {addr}")
data = b''
while True:
packet = conn.recv(1024)
if not packet:
break
data += packet
receiver = Receiver(self.private_key, sender_public_key)
decrypted_data = receiver.receive_file(data.decode())
if decrypted_data is not None:
with open('received_file', 'wb') as f:
f.write(decrypted_data)
print("文件接收并解密成功。")
else:
print("解密失败,文件未保存。")
except ValueError as ve:
print(f"值错误: {ve}")
except Exception as e:
print(f"接收文件时发生错误: {e}")
def run(self):
while self.running:
command = input("输入 'generate_keys' 生成密钥,'load_keys' 加载密钥,'send' 发送文件,'receive' 接收文件,或 'exit' 退出程序: ").strip().lower()
if command == 'generate_keys':
self.generate_keys()
elif command == 'load_keys':
self.load_keys()
elif command == 'send':
filepath = input("输入要发送的文件路径: ")
receiver_host = input("输入接收方的计算机IP地址: ")
receiver_public_key_path = input("输入接收方的公钥文件路径: ")
threading.Thread(target=self.send_file, args=(filepath, receiver_host, receiver_public_key_path)).start()
elif command == 'receive':
sender_public_key_path = input("输入发送方的公钥文件路径: ")
threading.Thread(target=self.receive_file, args=(sender_public_key_path,)).start()
elif command == 'exit':
self.running = False
print("正在退出程序。")
break
else:
print("无效的命令。")
if __name__ == "__main__":
run_example()
role = input("输入角色 ('sender''receiver'): ").strip().lower()
app = FileTransferApp(host='0.0.0.0', port=65432, role=role)
app.run()

@ -1,25 +1,35 @@
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Random import get_random_bytes
from Crypto.PublicKey import RSA
import json
import base64
from Crypto.Random import get_random_bytes # 确保导入随机字节生成函数
from encryption_utils import SymmetricEncryption, AsymmetricEncryption, calculate_file_hash
from encryption_utils import AsymmetricEncryption, SymmetricEncryption
class Sender:
def __init__(self, filepath, receiver_public_key):
def __init__(self, filepath, receiver_public_key, sender_private_key):
self.filepath = filepath
self.receiver_public_key = receiver_public_key
self.receiver_public_key = RSA.import_key(receiver_public_key)
self.sender_private_key = RSA.import_key(sender_private_key)
self.asym_enc = AsymmetricEncryption()
self.sym_enc = SymmetricEncryption()
def send_file(self):
with open(self.filepath, 'rb') as f:
file_data = f.read()
symmetric_key = get_random_bytes(16)
symmetric_encryption = SymmetricEncryption()
nonce, ciphertext, tag = symmetric_encryption.encrypt(file_data, symmetric_key)
session_key = get_random_bytes(16)
cipher_rsa = PKCS1_OAEP.new(self.receiver_public_key)
enc_session_key = cipher_rsa.encrypt(session_key)
file_hash = calculate_file_hash(file_data)
nonce, ciphertext, tag = self.sym_enc.encrypt(file_data, session_key)
signature = self.asym_enc.sign_data(file_data)
asymmetric_encryption = AsymmetricEncryption()
encrypted_symmetric_key = asymmetric_encryption.encrypt_with_public_key(symmetric_key, self.receiver_public_key)
data_to_send = {
'enc_session_key': base64.b64encode(enc_session_key).decode(),
'nonce': base64.b64encode(nonce).decode(),
'tag': base64.b64encode(tag).decode(),
'ciphertext': base64.b64encode(ciphertext).decode(),
'signature': base64.b64encode(signature).decode()
}
encoded_data_packet = base64.b64encode(nonce + ciphertext + tag + encrypted_symmetric_key + file_hash.encode())
return encoded_data_packet
return json.dumps(data_to_send)
Loading…
Cancel
Save