From 658c939613b23b3a93eb11709d2446374f681666 Mon Sep 17 00:00:00 2001 From: Timmoc Date: Sun, 24 Nov 2024 14:28:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90RSA=E3=80=81=E5=AE=8C?= =?UTF-8?q?=E6=88=90SM4=E3=80=81=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=8E=A5=E6=94=B6=E8=80=85=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 ++++ config/config.py | 25 +++++++++++++- recv/recv.py | 44 ++++++++++++++++++++++++- recv/test_recv.py | 8 +++++ tool/PriKeyHelper.py | 28 +++++++++++++--- tool/asymmetric/RSA.py | 68 +++++++++++++++++++++++++++++++++++++++ tool/symmetric/SM4.py | 67 ++++++++++++++++++++++++++++++++++++++ tool/test_PriKeyHelper.py | 39 ++++++++++++++++++++++ 8 files changed, 279 insertions(+), 6 deletions(-) create mode 100644 recv/test_recv.py create mode 100644 tool/test_PriKeyHelper.py diff --git a/.gitignore b/.gitignore index a09c56d..56c7dcd 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,7 @@ /.idea +/tool/asymmetric/private.pem +/tool/asymmetric/public.pem +/tool/public.pem +/tool/private.pem +/private.pem +/public.pem diff --git a/config/config.py b/config/config.py index fb63607..6804a94 100644 --- a/config/config.py +++ b/config/config.py @@ -1,2 +1,25 @@ +from enum import Enum, auto port = 8426 -priKeySavePath = "./priKey.txt" \ No newline at end of file +priKeySavePath = "./private.pem" +pubKeySavePath = "./public.pem" + +class EncryptType(Enum): + AES_ECB = auto() + AES_CBC = auto() + AES_CFB = auto() + AES_OFB = auto() + SM4_ECB = auto() + SM4_CBC = auto() + +# 函数:通过文本获取枚举成员 +def getEncryptType(text): + try: + return EncryptType[text] + except KeyError: + raise ValueError(f"Invalid EncryptType name: {text}") + +# 函数:通过枚举成员获取文本 +def getEncryptTypeName(encryptType): + if not isinstance(encryptType, EncryptType): + raise TypeError("Expected a EncryptType enum member") + return encryptType.name \ No newline at end of file diff --git a/recv/recv.py b/recv/recv.py index d0c8c31..a215b8f 100644 --- a/recv/recv.py +++ b/recv/recv.py @@ -1,4 +1,11 @@ +import base64 + +from config import config from entity.Letter import Letter +from tool import PriKeyHelper +from tool.asymmetric import RSA +from tool.symmetric import SM4 + def getLetter(): # 阻塞自身 从指定端口获取信件 @@ -9,4 +16,39 @@ def getLetter(): pass def handleLetter(letter:Letter): - pass \ No newline at end of file + # 解析信件 确认收信人 + # 获取自身key + pki = PriKeyHelper.getUserKey() + if pki[1] != letter.recvPubKey: + raise Exception("信件不属于自己") + # 用自己的私钥解密key 获得对称加密秘钥。 + key = RSA.decrypt_message(letter.encryptKey,pki[0]) + + # 根据不同的对称加密算法 + try: + type = config.getEncryptType(letter.encryptType) + except KeyError: + raise KeyError("不支持的对称加密算法") + # 进行解密fileBase64 + data = None + if type == config.EncryptType.SM4_ECB: + SM4.decrypt_ecb(base64.b64decode(letter.fileBase64),key) + elif type == config.EncryptType.SM4_CBC: + SM4.decrypt_cbc_with_iv(base64.b64decode(letter.fileBase64),key) + elif type == config.EncryptType.AES_GCM: + raise NotImplementedError("未实现") + elif type == config.EncryptType.AES_CBC: + raise NotImplementedError("未实现") + else: + raise KeyError("不支持的对称加密算法") + + # 用发信人的公钥验签摘要 + result = RSA.verify_signature(data,letter.sign,letter.senderPubKey) + if not result: + raise Exception("签名验证失败,文件不可信") + + # 保存文件 + with open(f"./{letter.fileName}","wb") as f: + f.write(data) + + return \ No newline at end of file diff --git a/recv/test_recv.py b/recv/test_recv.py new file mode 100644 index 0000000..26601b1 --- /dev/null +++ b/recv/test_recv.py @@ -0,0 +1,8 @@ +from sender import sender +from tool import PriKeyHelper + + +def test_handleLetter(): + pki = PriKeyHelper.getUserKey() + + pass \ No newline at end of file diff --git a/tool/PriKeyHelper.py b/tool/PriKeyHelper.py index 23feb16..63b12d3 100644 --- a/tool/PriKeyHelper.py +++ b/tool/PriKeyHelper.py @@ -1,4 +1,24 @@ -def getUserKey() -> (str, str): - # 获取用户的公私钥对,若不存在,则生成之 - # 返回 私钥,公钥 格式 - pass +import base64 +import os + +from config import config +from tool.asymmetric import RSA + + +def getUserKey() -> (str, str): # 返回base64编码 + # 检查是否存在公私钥文件 + if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath): + # 生成新的密钥对 + RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath) + + # 读取私钥 + with open(config.priKeySavePath, "rb") as f: + data = f.read() + private_key = base64.b64encode(data).decode('utf-8') + + # 读取公钥 + with open(config.pubKeySavePath, "rb") as f: + data = f.read() + public_key = base64.b64encode(data).decode('utf-8') + + return private_key, public_key diff --git a/tool/asymmetric/RSA.py b/tool/asymmetric/RSA.py index e69de29..652e4f5 100644 --- a/tool/asymmetric/RSA.py +++ b/tool/asymmetric/RSA.py @@ -0,0 +1,68 @@ +import base64 +import binascii +from Crypto.Cipher import PKCS1_OAEP +from Crypto.PublicKey import RSA +from Crypto.Signature import pkcs1_15 +from Crypto.Hash import SHA256 + + +def generate_keys(priKeySavePath, pubKeySavePath): + # 生成一个RSA密钥对象 + key = RSA.generate(2048) + + # 导出私钥 + private_key = key.export_key() + with open(priKeySavePath, "wb") as f: + f.write(private_key) + + # 导出公钥 + public_key = key.publickey().export_key() + with open(pubKeySavePath, "wb") as f: + f.write(public_key) + + +def encrypt_message(message, public_key_base64): + # 加载公钥 + public_key_bin = base64.b64decode(public_key_base64) + public_key = RSA.import_key(public_key_bin) + # 使用公钥加密消息 + cipher_rsa = PKCS1_OAEP.new(public_key) + encrypted_message = cipher_rsa.encrypt(message.encode('utf-8')) + + return binascii.hexlify(encrypted_message).decode('utf-8') + + +def decrypt_message(encrypted_message, private_key_base64): + # 加载私钥 + private_key_bin = base64.b64decode(private_key_base64) + private_key = RSA.import_key(private_key_bin) + # 使用私钥解密消息 + cipher_rsa = PKCS1_OAEP.new(private_key) + decrypted_message = cipher_rsa.decrypt(binascii.unhexlify(encrypted_message)) + + return decrypted_message.decode('utf-8') + +def sign_message(message, private_key_base64): + # 加载私钥 + private_key_bin = base64.b64decode(private_key_base64) + private_key = RSA.import_key(private_key_bin) + # 计算消息的哈希值 + hash_obj = SHA256.new(message.encode('utf-8')) + # 使用私钥对哈希值进行签名 + signature = pkcs1_15.new(private_key).sign(hash_obj) + + return binascii.hexlify(signature).decode('utf-8') + + +def verify_signature(message, signature, public_key_base64): + # 加载公钥 + public_key_bin = base64.b64decode(public_key_base64) + public_key = RSA.import_key(public_key_bin) + # 计算消息的哈希值 + hash_obj = SHA256.new(message.encode('utf-8')) + # 使用公钥验证签名 + try: + pkcs1_15.new(public_key).verify(hash_obj, binascii.unhexlify(signature)) + return True + except (ValueError, TypeError): + return False diff --git a/tool/symmetric/SM4.py b/tool/symmetric/SM4.py index e69de29..d3d53e1 100644 --- a/tool/symmetric/SM4.py +++ b/tool/symmetric/SM4.py @@ -0,0 +1,67 @@ +import os + +from gmssl import sm4 + +def encrypt_ecb(data, key): + cipher = sm4.CryptSM4() + cipher.set_key(key.encode('utf-8'), sm4.SM4_ENCRYPT) + encrypted_data = cipher.crypt_ecb(data.encode('utf-8')) + return encrypted_data.hex() + +def decrypt_ecb(encrypted_hex, key): + cipher = sm4.CryptSM4() + cipher.set_key(key.encode('utf-8'), sm4.SM4_DECRYPT) + decrypted_data = cipher.crypt_ecb(bytes.fromhex(encrypted_hex)) + return decrypted_data.decode('utf-8') + +def encrypt_cbc_with_iv(data, key): + cipher = sm4.CryptSM4() + cipher.set_key(key.encode('utf-8'), sm4.SM4_ENCRYPT) + + # 生成随机的16字节IV + iv = os.urandom(16) + + # 加密数据 + encrypted_data = cipher.crypt_cbc(iv, data.encode('utf-8')) + + # 将IV和加密后的数据拼接在一起 + return iv + encrypted_data + +def decrypt_cbc_with_iv(encrypted_bytes, key): + cipher = sm4.CryptSM4() + cipher.set_key(key.encode('utf-8'), sm4.SM4_DECRYPT) + + # 提取IV + iv = encrypted_bytes[:16] + + # 提取加密后的数据 + encrypted_data = encrypted_bytes[16:] + + # 解密数据 + decrypted_data = cipher.crypt_cbc(iv, encrypted_data) + return decrypted_data.decode('utf-8') + +if __name__ == "__main__": + # 示例数据和密钥 + data = "Hello, SM4!" + key = "1234567890abcdef" + + # 加密 + encrypted_data = encrypt_ecb(data, key) + print(f"Encrypted: {encrypted_data}") + + # 解密 + decrypted_data = decrypt_ecb(encrypted_data, key) + print(f"Decrypted: {decrypted_data}") + + # 示例数据和密钥 + data = "Hello, SM4 CBC with random IV!" + key = "1234567890abcdef" + + # 加密 + encrypted_data = encrypt_cbc_with_iv(data, key) + print(f"Encrypted: {encrypted_data.hex()}") + + # 解密 + decrypted_data = decrypt_cbc_with_iv(encrypted_data, key) + print(f"Decrypted: {decrypted_data}") diff --git a/tool/test_PriKeyHelper.py b/tool/test_PriKeyHelper.py new file mode 100644 index 0000000..8de5054 --- /dev/null +++ b/tool/test_PriKeyHelper.py @@ -0,0 +1,39 @@ +import pytest + +from tool import PriKeyHelper +from tool.asymmetric import RSA + + +def test_encrypt(): + key = PriKeyHelper.getUserKey() + message = "Hello, this is a secret message." + # 打印keys + print(key[0]) + print(key[1]) + + # 加密消息 + encrypted = RSA.encrypt_message(message, key[1]) + print(f"Encrypted: {encrypted}") + + # 解密消息 + decrypted = RSA.decrypt_message(encrypted, key[0]) + print(f"Decrypted: {decrypted}") + + +def test_sign(): + + key = PriKeyHelper.getUserKey() + message = "This is a signed message." + # 打印keys + print(key[0]) + print(key[1]) + + + # 签名消息 + signature = RSA.sign_message(message, key[0]) + print(f"Signature: {signature}") + + # 验证签名 + is_valid = RSA.verify_signature(message, signature, key[1]) + print(f"Is Signature Valid? {is_valid}") + assert is_valid == True \ No newline at end of file