Merge remote-tracking branch 'origin/recv_timmoc' into sender_okToCheck

sender_okToCheck
UniDarkstars 8 months ago
commit 8e2b103830

6
.gitignore vendored

@ -1 +1,7 @@
/.idea
/tool/asymmetric/private.pem
/tool/asymmetric/public.pem
/tool/public.pem
/tool/private.pem
/private.pem
/public.pem

@ -1,2 +1,25 @@
from enum import Enum, auto
port = 8426
priKeySavePath = "./priKey.txt"
priKeySavePath = "./private.pem"
pubKeySavePath = "./public.pem"
class EncryptType(Enum):
AES_ECB = auto()
AES_CBC = auto()
AES_CFB = auto()
AES_OFB = auto()
SM4_ECB = auto()
SM4_CBC = auto()
# 函数:通过文本获取枚举成员
def getEncryptType(text):
try:
return EncryptType[text]
except KeyError:
raise ValueError(f"Invalid EncryptType name: {text}")
# 函数:通过枚举成员获取文本
def getEncryptTypeName(encryptType):
if not isinstance(encryptType, EncryptType):
raise TypeError("Expected a EncryptType enum member")
return encryptType.name

@ -1,4 +1,11 @@
import base64
from config import config
from entity.Letter import Letter
from tool import PriKeyHelper
from tool.asymmetric import RSA
from tool.symmetric import SM4
def getLetter():
# 阻塞自身 从指定端口获取信件
@ -9,4 +16,39 @@ def getLetter():
pass
def handleLetter(letter:Letter):
pass
# 解析信件 确认收信人
# 获取自身key
pki = PriKeyHelper.getUserKey()
if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。
key = RSA.decrypt_message(letter.encryptKey,pki[0])
# 根据不同的对称加密算法
try:
type = config.getEncryptType(letter.encryptType)
except KeyError:
raise KeyError("不支持的对称加密算法")
# 进行解密fileBase64
data = None
if type == config.EncryptType.SM4_ECB:
SM4.decrypt_ecb(base64.b64decode(letter.fileBase64),key)
elif type == config.EncryptType.SM4_CBC:
SM4.decrypt_cbc_with_iv(base64.b64decode(letter.fileBase64),key)
elif type == config.EncryptType.AES_GCM:
raise NotImplementedError("未实现")
elif type == config.EncryptType.AES_CBC:
raise NotImplementedError("未实现")
else:
raise KeyError("不支持的对称加密算法")
# 用发信人的公钥验签摘要
result = RSA.verify_signature(data,letter.sign,letter.senderPubKey)
if not result:
raise Exception("签名验证失败,文件不可信")
# 保存文件
with open(f"./{letter.fileName}","wb") as f:
f.write(data)
return

@ -0,0 +1,8 @@
from sender import sender
from tool import PriKeyHelper
def test_handleLetter():
pki = PriKeyHelper.getUserKey()
pass

@ -1,4 +1,24 @@
def getUserKey() -> (str, str):
# 获取用户的公私钥对,若不存在,则生成之
# 返回 私钥,公钥 格式
pass
import base64
import os
from config import config
from tool.asymmetric import RSA
def getUserKey() -> (str, str): # 返回base64编码
# 检查是否存在公私钥文件
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥
with open(config.priKeySavePath, "rb") as f:
data = f.read()
private_key = base64.b64encode(data).decode('utf-8')
# 读取公钥
with open(config.pubKeySavePath, "rb") as f:
data = f.read()
public_key = base64.b64encode(data).decode('utf-8')
return private_key, public_key

@ -0,0 +1,68 @@
import base64
import binascii
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
def generate_keys(priKeySavePath, pubKeySavePath):
# 生成一个RSA密钥对象
key = RSA.generate(2048)
# 导出私钥
private_key = key.export_key()
with open(priKeySavePath, "wb") as f:
f.write(private_key)
# 导出公钥
public_key = key.publickey().export_key()
with open(pubKeySavePath, "wb") as f:
f.write(public_key)
def encrypt_message(message, public_key_base64):
# 加载公钥
public_key_bin = base64.b64decode(public_key_base64)
public_key = RSA.import_key(public_key_bin)
# 使用公钥加密消息
cipher_rsa = PKCS1_OAEP.new(public_key)
encrypted_message = cipher_rsa.encrypt(message.encode('utf-8'))
return binascii.hexlify(encrypted_message).decode('utf-8')
def decrypt_message(encrypted_message, private_key_base64):
# 加载私钥
private_key_bin = base64.b64decode(private_key_base64)
private_key = RSA.import_key(private_key_bin)
# 使用私钥解密消息
cipher_rsa = PKCS1_OAEP.new(private_key)
decrypted_message = cipher_rsa.decrypt(binascii.unhexlify(encrypted_message))
return decrypted_message.decode('utf-8')
def sign_message(message, private_key_base64):
# 加载私钥
private_key_bin = base64.b64decode(private_key_base64)
private_key = RSA.import_key(private_key_bin)
# 计算消息的哈希值
hash_obj = SHA256.new(message.encode('utf-8'))
# 使用私钥对哈希值进行签名
signature = pkcs1_15.new(private_key).sign(hash_obj)
return binascii.hexlify(signature).decode('utf-8')
def verify_signature(message, signature, public_key_base64):
# 加载公钥
public_key_bin = base64.b64decode(public_key_base64)
public_key = RSA.import_key(public_key_bin)
# 计算消息的哈希值
hash_obj = SHA256.new(message.encode('utf-8'))
# 使用公钥验证签名
try:
pkcs1_15.new(public_key).verify(hash_obj, binascii.unhexlify(signature))
return True
except (ValueError, TypeError):
return False

@ -0,0 +1,67 @@
import os
from gmssl import sm4
def encrypt_ecb(data, key):
cipher = sm4.CryptSM4()
cipher.set_key(key.encode('utf-8'), sm4.SM4_ENCRYPT)
encrypted_data = cipher.crypt_ecb(data.encode('utf-8'))
return encrypted_data.hex()
def decrypt_ecb(encrypted_hex, key):
cipher = sm4.CryptSM4()
cipher.set_key(key.encode('utf-8'), sm4.SM4_DECRYPT)
decrypted_data = cipher.crypt_ecb(bytes.fromhex(encrypted_hex))
return decrypted_data.decode('utf-8')
def encrypt_cbc_with_iv(data, key):
cipher = sm4.CryptSM4()
cipher.set_key(key.encode('utf-8'), sm4.SM4_ENCRYPT)
# 生成随机的16字节IV
iv = os.urandom(16)
# 加密数据
encrypted_data = cipher.crypt_cbc(iv, data.encode('utf-8'))
# 将IV和加密后的数据拼接在一起
return iv + encrypted_data
def decrypt_cbc_with_iv(encrypted_bytes, key):
cipher = sm4.CryptSM4()
cipher.set_key(key.encode('utf-8'), sm4.SM4_DECRYPT)
# 提取IV
iv = encrypted_bytes[:16]
# 提取加密后的数据
encrypted_data = encrypted_bytes[16:]
# 解密数据
decrypted_data = cipher.crypt_cbc(iv, encrypted_data)
return decrypted_data.decode('utf-8')
if __name__ == "__main__":
# 示例数据和密钥
data = "Hello, SM4!"
key = "1234567890abcdef"
# 加密
encrypted_data = encrypt_ecb(data, key)
print(f"Encrypted: {encrypted_data}")
# 解密
decrypted_data = decrypt_ecb(encrypted_data, key)
print(f"Decrypted: {decrypted_data}")
# 示例数据和密钥
data = "Hello, SM4 CBC with random IV!"
key = "1234567890abcdef"
# 加密
encrypted_data = encrypt_cbc_with_iv(data, key)
print(f"Encrypted: {encrypted_data.hex()}")
# 解密
decrypted_data = decrypt_cbc_with_iv(encrypted_data, key)
print(f"Decrypted: {decrypted_data}")

@ -0,0 +1,39 @@
import pytest
from tool import PriKeyHelper
from tool.asymmetric import RSA
def test_encrypt():
key = PriKeyHelper.getUserKey()
message = "Hello, this is a secret message."
# 打印keys
print(key[0])
print(key[1])
# 加密消息
encrypted = RSA.encrypt_message(message, key[1])
print(f"Encrypted: {encrypted}")
# 解密消息
decrypted = RSA.decrypt_message(encrypted, key[0])
print(f"Decrypted: {decrypted}")
def test_sign():
key = PriKeyHelper.getUserKey()
message = "This is a signed message."
# 打印keys
print(key[0])
print(key[1])
# 签名消息
signature = RSA.sign_message(message, key[0])
print(f"Signature: {signature}")
# 验证签名
is_valid = RSA.verify_signature(message, signature, key[1])
print(f"Is Signature Valid? {is_valid}")
assert is_valid == True
Loading…
Cancel
Save