Compare commits

..

No commits in common. '5010769b54dfb5e43bc263028d3e56411a9b43ee' and '26890907007c737de5b7dda590ddcf60d551d629' have entirely different histories.

@ -1,15 +1,13 @@
from enum import Enum, auto
from tool.DownloadPathTool import get_config_path
port = 8426
configPath = get_config_path()
priKeySavePath = f"{configPath}/private.pem"
pubKeySavePath = f"{configPath}/public.pem"
priKeySavePath = "./private.pem"
pubKeySavePath = "./public.pem"
class EncryptType(Enum):
AES_ECB = auto()
AES_CBC = auto()
AES_CFB = auto()
AES_OFB = auto()
SM4_ECB = auto()
SM4_CBC = auto()

@ -1,41 +1,9 @@
import json
class Letter:
sign = "计算得到"
encryptType = "SM4_GCM"
encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据
encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据
recvPubKey = ""
senderPubKey = ""
fileName = ""
fileBase64 = ""
def to_dict(self):
return {
"sign": self.sign,
"encryptType": self.encryptType,
"encryptKey": self.encryptKey,
"recvPubKey": self.recvPubKey,
"senderPubKey": self.senderPubKey,
"fileName": self.fileName,
"fileBase64": self.fileBase64
}
def json_to_obj(json_str):
new_obj= Letter()
new_obj.sign = json_str["sign"]
new_obj.encryptType = json_str["encryptType"]
new_obj.encryptKey = json_str["encryptKey"]
new_obj.recvPubKey = json_str["recvPubKey"]
new_obj.senderPubKey = json_str["senderPubKey"]
new_obj.fileName = json_str["fileName"]
new_obj.fileBase64 = json_str["fileBase64"]
return new_obj
# test code
if __name__ == '__main__':
letter = Letter()
# print(json.dumps(letter.to_dict(), indent=1))
json_dict = json.loads(json.dumps(letter.to_dict()))
print(json_dict.__dir__)

@ -1,10 +0,0 @@
import pyfiglet # 程序greet
# 程序greet
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
greet = pyfiglet.figlet_format("File Secure Transfer", font="slant" , width=250)
print(greet)
print(" <For Secure And Fast File Transfer>")
author = " <-Made By Li-Nuo-Cheng Tan-Jun-Wen Ren-Qing-Yu->"
print(author)
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")

@ -1,13 +1,10 @@
import base64
import logging
import os.path
from config import config
from entity.Letter import Letter
from tool import PriKeyHelper, DownloadPathTool
from tool import PriKeyHelper
from tool.asymmetric import RSA
from tool.hash import Segwit
from tool.symmetric import SM4, AES
from tool.symmetric import SM4
def getLetter():
@ -18,17 +15,14 @@ def getLetter():
handleLetter(letter)
pass
def handleLetter(letter: Letter):
def handleLetter(letter:Letter):
# 解析信件 确认收信人
# 获取自身key
pki = PriKeyHelper.getUserKey()
print("pki is: ",pki[1])
print("letter.recvPubKey is: ",letter.recvPubKey)
if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。
key = RSA.decrypt_message(base64.b64decode(letter.encryptKey), pki[0])
key = RSA.decrypt_message(base64.b64decode(letter.encryptKey),pki[0])
# 根据不同的对称加密算法
try:
@ -38,42 +32,23 @@ def handleLetter(letter: Letter):
# 进行解密fileBase64
data = b""
if type == config.EncryptType.SM4_ECB:
data = base64.b64decode(SM4.decrypt_ecb(letter.fileBase64, key))
data = base64.b64decode(SM4.decrypt_ecb(letter.fileBase64,key))
elif type == config.EncryptType.SM4_CBC:
data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64, key))
data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64,key))
elif type == config.EncryptType.AES_ECB:
data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64, "ecb"))
raise NotImplementedError("未实现")
elif type == config.EncryptType.AES_CBC:
data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64,"cbc"))
raise NotImplementedError("未实现")
else:
raise KeyError("不支持的对称加密算法")
# 用发信人的公钥验签摘要
result = RSA.verify_signature(data, letter.sign, letter.senderPubKey)
result = RSA.verify_signature(data,letter.sign,letter.senderPubKey)
if not result:
raise Exception("签名验证失败,文件不可信")
# 保存文件
# 默认下载目录
download_dir = DownloadPathTool.get_download_directory()
filename = letter.fileName
base_name, ext = os.path.splitext(filename) # 分离文件名和扩展名
newName = None
count = 0
while True:
if count:
newName = f"{base_name}({count}){ext}"
else:
newName = filename
path = f"{download_dir}/{newName}"
if os.path.exists(path):
logging.debug("文件已存在,自动避免覆盖写入")
count += 1
else:
break
with open(path, "wb") as f:
with open(f"./{letter.fileName}","wb") as f:
f.write(data)
print(f"确认收到来自 {Segwit.encodeSegwit(letter.senderPubKey.encode('utf-8'))} 的文件")
print(f"签名验证有效,已将文件 {newName} 保存至 {download_dir}")
return
print(f"签名验证有效,已将文件 {letter.fileName} 保存至当前目录下")
return

@ -1,141 +0,0 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -2,31 +2,12 @@ from unittest.mock import patch
import recv
from sender import sender
from tool import PriKeyHelper
@patch('builtins.input', side_effect=['../README.md', 'aes', 'ecb'])
def test_handleLetter_aes_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input):
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'cbc'])
def test_handleLetter(mock_input):
pki = PriKeyHelper.getUserKey()
letter = sender.main()
recv.handleLetter(letter)
pass

@ -1,55 +1,40 @@
# 模式,文件,自己的公钥从哪里来,别人的公钥从哪里来
import pyfiglet
import base64
import os
from entity.Letter import Letter
from itsdangerous import base64_encode
from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA
from tool.symmetric.AES import AESUtils
from tool.symmetric.SM4 import encrypt_ecb, encrypt_cbc_with_iv
from tool.symmetric.SM4 import encrypt_ecb, decrypt_cbc_with_iv, encrypt_cbc_with_iv
from Crypto.Random import get_random_bytes
# 三个全局变量,记录信封
# 两个变量,记录信封
letterWay = ""
letterMode = ""
letterSymKey = b""
def main():
# greet
print("")
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
greet = pyfiglet.figlet_format("File Secure Transfer", font="slant", width=250)
print(greet)
print(" <For Secure And Fast File Transfer>")
author = " <-Made By Li-Nuo-Cheng Tan-Jun-Wen Ren-Qing-Yu->"
print(author)
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
letter = Letter()
# 用户输入各种数据填充letter字段
path = selectFile()
with open(path,"rb") as f:
data = f.read()
letter.fileName = getFileName(path)
letter.fileName = "交给你了"
letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
letter.fileBase64, akey = SymEncryption(base64_encode(data).decode("utf-8"),letterSymKey)
# data = "Hello, AES!"
letter.encryptKey = getEncryptKey()
letter.encryptType = getEncryptType()
letter.sign = getSign(data)
print(letter.fileName)
print(letter.fileBase64)
print(letter.sign)
print(letter.encryptType)
print(letter.encryptKey)
@ -63,24 +48,8 @@ def main():
return letter # 方便recv测试以后可以删除。
pass
def selectFile() -> str:
# s = "public.pem"
while True:
try:
s = input("输入文件路径:")
with open(s, "rb") as _:
pass
return s
except FileNotFoundError:
print("错误: 文件未找到,请输入正确的文件路径。")
except PermissionError:
print("错误: 无法访问该文件,请检查权限。")
except Exception as e:
print(f"发生未知错误: {e}")
# 获得文件名
def getFileName(fName:str) -> str:
filePath = os.path.split(fName)
return filePath[-1]
s = input("输入文件路径:")
return s
def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# 向目标ip和端口发送指定的信件
@ -95,35 +64,34 @@ def selectSymEncryptionChoice():
# 选择加密算法
while True:
encryWay = input("选择加密算法 (aes/sm4): ").strip().lower() # 统一转成小写
encryWay = input("Choose the way for encryption (aes/sm4): ").strip().lower() # 统一转成小写
if encryWay in ["aes", "sm4"]:
letterWay = encryWay
print(f"已经选择 '{encryWay}'.")
print(f"You have selected '{encryWay}' encryption.")
break # 输入有效后退出循环
else:
print("非法选择,请输入 'aes' 'sm4'.")
print("Invalid choice. Please enter 'aes' or 'sm4'.")
# 选择加密算法的模式
while True:
if encryWay == "aes":
encryMode = input("选择加密算法模式 (ecb/cbc): ").strip().lower()
if encryMode in ["ecb", "cbc"]:
encryMode = input("Choose the encryption mode (ecb/cbc/cfb/ofb): ").strip().lower()
if encryMode in ["ecb", "cbc", "cfb", "ofb"]:
letterMode = encryMode
print(f"已选择 '{encryMode}' 加密模式.")
print(f"You have selected '{encryMode}' encryption mode.")
break # 输入有效后退出循环
else:
print("非法输入,请输入 ecb/cbc")
print("Invalid choice. Please enter ecb/cbc/cfb/ofb")
elif encryWay == "sm4":
encryMode = input("选择加密模式 (ecb/cbc): ").strip().lower()
encryMode = input("Choose the encryption mode (ecb/cbc): ").strip().lower()
if encryMode in ["ecb", "cbc"]:
letterMode = encryMode
print(f"已选择 '{encryMode}' 加密模式.")
print(f"You have selected '{encryMode}' encryption mode.")
break # 输入有效后退出循环
else:
print("非法输入,请输入 enter ecb/cbc")
print("Invalid choice. Please enter ecb/cbc")
# 返回加密方法和加密方法的模式
return encryWay, encryMode
@ -160,7 +128,6 @@ def getSign(document_bytes):
return signDocuHash
# 获得加密的方法和模式,封装信封
def getEncryptType():
encryType = f"{letterWay}_{letterMode}".upper()
@ -175,11 +142,7 @@ def getEncryptKey():
# 获得接收方的公钥
def getRecvPubKey():
# recPubKey = input("plz input Receiver's Public Key: ")
# 在某某地方获得对方的公钥然后保存到某个地方输入路径uoqu
recPubKeyPath = input("请输入接受方的公钥文件路径:")
with open(recPubKeyPath, "rb") as f:
data = f.read()
recPubKey = base64.b64encode(data).decode('utf-8')
recPubKey = getUserKey()[1]
return recPubKey
# 获得发送方的公钥
@ -198,4 +161,5 @@ if __name__ == "__main__":
#
# encryptType = f"{letterWay}_{letterMode}".upper()
# print(encryptType)
main()

@ -1,143 +0,0 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
from entity.Letter import Letter,json_to_obj
import select
import sender
from recv import recv
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class sender_net():
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port=port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, msg):
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def run(self):
threading.Thread(target=self.main).start()
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print(" [*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
#加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
#test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = sender_net(port, client_host, client_port)
a.run()

@ -1,42 +0,0 @@
import os
import sys
import winreg
from pathlib import Path
def get_config_path():
if sys.platform == 'win32':
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars("%appdata%/fst")).resolve()
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'fst'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)
def get_download_directory():
if sys.platform == 'win32':
# 打开注册表项
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders')
try:
# 读取 "Downloads" 路径
download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}')
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars(download_dir)).resolve()
finally:
# 关闭注册表项
winreg.CloseKey(key)
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'Downloads'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)

@ -10,6 +10,7 @@ def getUserKey() -> (str, str): # 返回base64编码
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥
with open(config.priKeySavePath, "rb") as f:
data = f.read()

@ -1,72 +0,0 @@
import hashlib
import bech32
# https://liaoxuefeng.com/books/blockchain/bitcoin/segwit/index.html
def hash160(data):
"""
计算输入数据的SHA-160哈希值
参数:
data (bytes): 输入数据
返回:
bytes: SHA-160哈希值
"""
return hashlib.new('ripemd160', hashlib.sha256(data).digest()).digest()
def encodeSegwit(data_bytes, hrp="fst"):
"""
将字节数据编码为Bech32格式
参数:
hrp (str): 人类可读部分 (Human-readable part)
data_bytes (bytes): 要编码的数据
返回:
str: Bech32编码后的字符串
"""
# 将字节转换为5位整数列表
data_ints = bech32.convertbits(hash160(data_bytes), 8, 5)
if not data_ints:
raise ValueError("Failed to convert bytes to 5-bit integers")
return bech32.bech32_encode(hrp, data_ints)
def verifySegwit(bech32_str):
"""
验证Bech32编码的字符串是否有效
参数:
bech32_str (str): Bech32编码的字符串
返回:
bool: 如果字符串有效则返回True否则返回False
"""
try:
hrp, data_ints = bech32.bech32_decode(bech32_str)
if not hrp or not data_ints:
return False
# 将5位整数列表转换回字节
data_bytes = bech32.convertbits(data_ints, 5, 8, False)
if not data_bytes:
return False
return True
except Exception as e:
print(f"Verification failed: {e}")
return False
# 示例用法
if __name__ == "__main__":
message = b"Hello, Bech32!"
encoded = encodeSegwit(message)
print(f"Encoded: {encoded}")
is_valid = verifySegwit(encoded)
print(f"Is valid: {is_valid}")

@ -1,55 +1,50 @@
# "ecb", "cbc", "cfb", "ofb"
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes
import base64
from fontTools.misc.eexec import encrypt
class AESUtils:
def __init__(self, key: bytes = None):
"""生成一个随机密钥"""
self.key = key if key else get_random_bytes(16)
self.key = get_random_bytes(32)
def encrypt(self, data1: str, mode: str = 'ECB') -> (str, str):
def encrypt(self, data: str, mode: str = 'ECB') -> (str,str):
"""加密数据"""
cipher = self._get_encipher(mode)
data1 = pad(data1.encode('utf-8'), AES.block_size) # 填充数据
ciphertext = cipher.encrypt(data1)
return base64.b64encode(ciphertext).decode('utf-8'), self.key
cipher = self._get_cipher(mode)
data = pad(data.encode(), AES.block_size) # 填充数据
ciphertext = cipher.encrypt(data)
return base64.b64encode(ciphertext).decode(), self.key
def decrypt(self, dekey, encrypted_data: str, mode: str = 'ECB') -> str:
def decrypt(self, encrypted_data: str, mode: str = 'ECB') -> str:
"""解密数据"""
cipher = self._get_decipher(dekey, mode)
encrypted_data = base64.b64decode(encrypted_data) # 解码密文
cipher = self._get_cipher(mode)
encrypted_data = base64.b64decode(encrypted_data)
plaintext = unpad(cipher.decrypt(encrypted_data), AES.block_size)
return plaintext.decode()
def _get_encipher(self, mode: str) -> AES:
"""根据模式返回相应的加密 AES cipher"""
iv = b'abcdefghijklmnop'
def _get_cipher(self, mode: str) -> AES:
"""根据模式返回相应的 AES cipher"""
iv = None
if mode == 'CBC':
iv = get_random_bytes(AES.block_size)
return AES.new(self.key, AES.MODE_CBC, iv)
elif mode == 'CFB':
return AES.new(self.key, AES.MODE_CFB)
elif mode == 'OFB':
return AES.new(self.key, AES.MODE_OFB)
else: # 默认是 ECB
return AES.new(self.key, AES.MODE_ECB)
def _get_decipher(self, dekey, mode: str) -> AES:
"""根据模式返回相应的解密 AES cipher"""
iv = b'abcdefghijklmnop'
if mode == 'CBC':
iv = get_random_bytes(AES.block_size)
return AES.new(dekey, AES.MODE_CBC, iv)
else: # 默认是 ECB
return AES.new(dekey, AES.MODE_ECB)
if __name__ == "__main__":
# 示例数据和密钥
data = "Hello, SM4!"
key = "1234567890abcdef"
if __name__ == '__main__':
key = b'1234567890abcdef' # AES 密钥必须是 16 字节
aes = AESUtils(key)
enData , akey = AESUtils(key)
# 加密
plaintext = "Hello, AES!"
enData, akey = aes.encrypt(plaintext, mode='ECB')
print("Encrypted Data:", enData)
print("Key:", akey)
print(enData)
# 解密
decrypted = aes.decrypt(aes.key, enData, mode='ECB')
print("Decrypted:", decrypted)
Loading…
Cancel
Save