Compare commits

...

36 Commits

Author SHA1 Message Date
Timmoc 5010769b54 尝试合并三大分支进度
3 months ago
Timmoc 753e20382a Merge branch 'sender_okToCheck' into test_merge
3 months ago
Timmoc 104a523445 Merge branch 'recyvan_2' into test_merge
3 months ago
UniDarkstars 2ebfc948dd 获得接收方公钥getRecvPubKey
3 months ago
Timmoc 2bbb0962a1 往appdata灌石,公私钥对现存储于appdata目录下
3 months ago
recyvan dc73c10e43 修复了一下不知名bug
3 months ago
recyvan e6c7588e2f Merge remote-tracking branch 'origin/recyvan_2' into recyvan_2
3 months ago
recyvan 50ccd97d6c 支持ip地址格式,局域网访问
3 months ago
UniDarkstars d0d152272e AES只留下CBC和ECB模式,删除了一些冗余代码
3 months ago
Timmoc 5d6226290d 禁止dangerous
3 months ago
recyvan 8227015134 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
Timmoc a14aa4bf31 使用readme进行测试
3 months ago
recyvan 3c34ce49a9 Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
Timmoc 9bd045678e 适配windows环境变量
3 months ago
recyvan b2a6196dbb 更新一下
3 months ago
Timmoc 632981180a Merge branch 'sender_okToCheck' into recv_timmoc
3 months ago
Timmoc 885d288719 增加pytest.ini
3 months ago
Timmoc 8ee4a2e136 修正base64问题
3 months ago
UniDarkstars a7e3cb67e8 更改了recv和AES,AES使用信封内的密钥进行解密
3 months ago
recyvan a67a749bb3 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
UniDarkstars ca27e6d24f 小改,无伤大雅
3 months ago
Timmoc 9a53fb2e12 python3.12惊喜双引号内双引号不报错
3 months ago
Timmoc 80a971702e aes base64 decode
3 months ago
recyvan 27766625bd Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
Timmoc b60d827192 提交一下未能通过测试的测试用例
3 months ago
Timmoc e1f32293e9 Merge branch 'sender_okToCheck' into recv_timmoc
3 months ago
UniDarkstars 20f2c46dfa 更新了一下sender.py的main(),没做太大改动,只是符合了一下测试用例
3 months ago
Timmoc b07196c651 Merge remote-tracking branch 'origin/recv_timmoc' into recv_timmoc
3 months ago
Timmoc b37acaaf22 文件现在不会覆盖现有的文件了
3 months ago
Timmoc 637fe0b0c8 文件现在不会覆盖现有的文件了
3 months ago
Timmoc a2162b4a3e 文件现在默认保存到download文件夹
3 months ago
UniDarkstars e939dd5cbc 增加了main文件里的greet,好玩爱玩
3 months ago
UniDarkstars 2d8ec4af3d 英文改中文,增加了文件选择的错误异常抛出
3 months ago
recyvan 8da58c145a 网络传输改为线程模式,这样就可同时接收和发送数据,没有先后顺序
3 months ago
UniDarkstars 4dd67a326e 英文改中文,增加了文件选择的错误异常抛出
3 months ago
UniDarkstars aef23d2af7 sender.py 增加了获取文件名
3 months ago

@ -1,13 +1,15 @@
from enum import Enum, auto from enum import Enum, auto
from tool.DownloadPathTool import get_config_path
port = 8426 port = 8426
priKeySavePath = "./private.pem" configPath = get_config_path()
pubKeySavePath = "./public.pem" priKeySavePath = f"{configPath}/private.pem"
pubKeySavePath = f"{configPath}/public.pem"
class EncryptType(Enum): class EncryptType(Enum):
AES_ECB = auto() AES_ECB = auto()
AES_CBC = auto() AES_CBC = auto()
AES_CFB = auto()
AES_OFB = auto()
SM4_ECB = auto() SM4_ECB = auto()
SM4_CBC = auto() SM4_CBC = auto()

@ -1,3 +1,6 @@
import json
class Letter: class Letter:
sign = "计算得到" sign = "计算得到"
encryptType = "SM4_GCM" encryptType = "SM4_GCM"
@ -7,3 +10,32 @@ class Letter:
fileName = "" fileName = ""
fileBase64 = "" fileBase64 = ""
def to_dict(self):
return {
"sign": self.sign,
"encryptType": self.encryptType,
"encryptKey": self.encryptKey,
"recvPubKey": self.recvPubKey,
"senderPubKey": self.senderPubKey,
"fileName": self.fileName,
"fileBase64": self.fileBase64
}
def json_to_obj(json_str):
new_obj= Letter()
new_obj.sign = json_str["sign"]
new_obj.encryptType = json_str["encryptType"]
new_obj.encryptKey = json_str["encryptKey"]
new_obj.recvPubKey = json_str["recvPubKey"]
new_obj.senderPubKey = json_str["senderPubKey"]
new_obj.fileName = json_str["fileName"]
new_obj.fileBase64 = json_str["fileBase64"]
return new_obj
# test code
if __name__ == '__main__':
letter = Letter()
# print(json.dumps(letter.to_dict(), indent=1))
json_dict = json.loads(json.dumps(letter.to_dict()))
print(json_dict.__dir__)

@ -0,0 +1,10 @@
import pyfiglet # 程序greet
# 程序greet
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
greet = pyfiglet.figlet_format("File Secure Transfer", font="slant" , width=250)
print(greet)
print(" <For Secure And Fast File Transfer>")
author = " <-Made By Li-Nuo-Cheng Tan-Jun-Wen Ren-Qing-Yu->"
print(author)
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")

@ -1,10 +1,13 @@
import base64 import base64
import logging
import os.path
from config import config from config import config
from entity.Letter import Letter from entity.Letter import Letter
from tool import PriKeyHelper from tool import PriKeyHelper, DownloadPathTool
from tool.asymmetric import RSA from tool.asymmetric import RSA
from tool.symmetric import SM4 from tool.hash import Segwit
from tool.symmetric import SM4, AES
def getLetter(): def getLetter():
@ -15,10 +18,13 @@ def getLetter():
handleLetter(letter) handleLetter(letter)
pass pass
def handleLetter(letter: Letter): def handleLetter(letter: Letter):
# 解析信件 确认收信人 # 解析信件 确认收信人
# 获取自身key # 获取自身key
pki = PriKeyHelper.getUserKey() pki = PriKeyHelper.getUserKey()
print("pki is: ",pki[1])
print("letter.recvPubKey is: ",letter.recvPubKey)
if pki[1] != letter.recvPubKey: if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己") raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。 # 用自己的私钥解密key 获得对称加密秘钥。
@ -36,9 +42,9 @@ def handleLetter(letter:Letter):
elif type == config.EncryptType.SM4_CBC: elif type == config.EncryptType.SM4_CBC:
data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64, key)) data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64, key))
elif type == config.EncryptType.AES_ECB: elif type == config.EncryptType.AES_ECB:
raise NotImplementedError("未实现") data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64, "ecb"))
elif type == config.EncryptType.AES_CBC: elif type == config.EncryptType.AES_CBC:
raise NotImplementedError("未实现") data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64,"cbc"))
else: else:
raise KeyError("不支持的对称加密算法") raise KeyError("不支持的对称加密算法")
@ -48,7 +54,26 @@ def handleLetter(letter:Letter):
raise Exception("签名验证失败,文件不可信") raise Exception("签名验证失败,文件不可信")
# 保存文件 # 保存文件
with open(f"./{letter.fileName}","wb") as f:
# 默认下载目录
download_dir = DownloadPathTool.get_download_directory()
filename = letter.fileName
base_name, ext = os.path.splitext(filename) # 分离文件名和扩展名
newName = None
count = 0
while True:
if count:
newName = f"{base_name}({count}){ext}"
else:
newName = filename
path = f"{download_dir}/{newName}"
if os.path.exists(path):
logging.debug("文件已存在,自动避免覆盖写入")
count += 1
else:
break
with open(path, "wb") as f:
f.write(data) f.write(data)
print(f"签名验证有效,已将文件 {letter.fileName} 保存至当前目录下") print(f"确认收到来自 {Segwit.encodeSegwit(letter.senderPubKey.encode('utf-8'))} 的文件")
print(f"签名验证有效,已将文件 {newName} 保存至 {download_dir}")
return return

@ -0,0 +1,141 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -2,12 +2,31 @@ from unittest.mock import patch
import recv import recv
from sender import sender from sender import sender
from tool import PriKeyHelper
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'cbc']) @patch('builtins.input', side_effect=['../README.md', 'aes', 'ecb'])
def test_handleLetter(mock_input): def test_handleLetter_aes_ecb(mock_input):
pki = PriKeyHelper.getUserKey() letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input):
letter = sender.main() letter = sender.main()
recv.handleLetter(letter) recv.handleLetter(letter)
pass pass

@ -1,40 +1,55 @@
# 模式,文件,自己的公钥从哪里来,别人的公钥从哪里来 # 模式,文件,自己的公钥从哪里来,别人的公钥从哪里来
import pyfiglet
import base64 import base64
import os
from entity.Letter import Letter from entity.Letter import Letter
from itsdangerous import base64_encode
from tool import PriKeyHelper from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA from tool.asymmetric import RSA
from tool.symmetric.AES import AESUtils from tool.symmetric.AES import AESUtils
from tool.symmetric.SM4 import encrypt_ecb, decrypt_cbc_with_iv, encrypt_cbc_with_iv from tool.symmetric.SM4 import encrypt_ecb, encrypt_cbc_with_iv
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
# 两个变量,记录信封 # 三个全局变量,记录信封
letterWay = "" letterWay = ""
letterMode = "" letterMode = ""
letterSymKey = b"" letterSymKey = b""
def main(): def main():
# greet
print("")
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
greet = pyfiglet.figlet_format("File Secure Transfer", font="slant", width=250)
print(greet)
print(" <For Secure And Fast File Transfer>")
author = " <-Made By Li-Nuo-Cheng Tan-Jun-Wen Ren-Qing-Yu->"
print(author)
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
letter = Letter() letter = Letter()
# 用户输入各种数据填充letter字段 # 用户输入各种数据填充letter字段
path = selectFile() path = selectFile()
with open(path,"rb") as f: with open(path,"rb") as f:
data = f.read() data = f.read()
letter.fileName = "交给你了"
letter.fileName = getFileName(path)
letter.recvPubKey = getRecvPubKey() letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey() letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64_encode(data).decode("utf-8"),letterSymKey) letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
# data = "Hello, AES!"
letter.encryptKey = getEncryptKey() letter.encryptKey = getEncryptKey()
letter.encryptType = getEncryptType() letter.encryptType = getEncryptType()
letter.sign = getSign(data) letter.sign = getSign(data)
print(letter.fileName)
print(letter.fileBase64)
print(letter.sign) print(letter.sign)
print(letter.encryptType) print(letter.encryptType)
print(letter.encryptKey) print(letter.encryptKey)
@ -48,8 +63,24 @@ def main():
return letter # 方便recv测试以后可以删除。 return letter # 方便recv测试以后可以删除。
pass pass
def selectFile() -> str: def selectFile() -> str:
# s = "public.pem"
while True:
try:
s = input("输入文件路径:") s = input("输入文件路径:")
with open(s, "rb") as _:
pass
return s return s
except FileNotFoundError:
print("错误: 文件未找到,请输入正确的文件路径。")
except PermissionError:
print("错误: 无法访问该文件,请检查权限。")
except Exception as e:
print(f"发生未知错误: {e}")
# 获得文件名
def getFileName(fName:str) -> str:
filePath = os.path.split(fName)
return filePath[-1]
def sendLetter(letter: Letter, target="192.168.195.162:8426"): def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# 向目标ip和端口发送指定的信件 # 向目标ip和端口发送指定的信件
@ -64,34 +95,35 @@ def selectSymEncryptionChoice():
# 选择加密算法 # 选择加密算法
while True: while True:
encryWay = input("Choose the way for encryption (aes/sm4): ").strip().lower() # 统一转成小写 encryWay = input("选择加密算法 (aes/sm4): ").strip().lower() # 统一转成小写
if encryWay in ["aes", "sm4"]: if encryWay in ["aes", "sm4"]:
letterWay = encryWay letterWay = encryWay
print(f"You have selected '{encryWay}' encryption.") print(f"已经选择 '{encryWay}'.")
break # 输入有效后退出循环 break # 输入有效后退出循环
else: else:
print("Invalid choice. Please enter 'aes' or 'sm4'.") print("非法选择,请输入 'aes' 'sm4'.")
# 选择加密算法的模式 # 选择加密算法的模式
while True: while True:
if encryWay == "aes": if encryWay == "aes":
encryMode = input("Choose the encryption mode (ecb/cbc/cfb/ofb): ").strip().lower() encryMode = input("选择加密算法模式 (ecb/cbc): ").strip().lower()
if encryMode in ["ecb", "cbc", "cfb", "ofb"]: if encryMode in ["ecb", "cbc"]:
letterMode = encryMode letterMode = encryMode
print(f"You have selected '{encryMode}' encryption mode.") print(f"已选择 '{encryMode}' 加密模式.")
break # 输入有效后退出循环 break # 输入有效后退出循环
else: else:
print("Invalid choice. Please enter ecb/cbc/cfb/ofb") print("非法输入,请输入 ecb/cbc")
elif encryWay == "sm4": elif encryWay == "sm4":
encryMode = input("Choose the encryption mode (ecb/cbc): ").strip().lower() encryMode = input("选择加密模式 (ecb/cbc): ").strip().lower()
if encryMode in ["ecb", "cbc"]: if encryMode in ["ecb", "cbc"]:
letterMode = encryMode letterMode = encryMode
print(f"You have selected '{encryMode}' encryption mode.") print(f"已选择 '{encryMode}' 加密模式.")
break # 输入有效后退出循环 break # 输入有效后退出循环
else: else:
print("Invalid choice. Please enter ecb/cbc") print("非法输入,请输入 enter ecb/cbc")
# 返回加密方法和加密方法的模式
return encryWay, encryMode return encryWay, encryMode
@ -128,6 +160,7 @@ def getSign(document_bytes):
return signDocuHash return signDocuHash
# 获得加密的方法和模式,封装信封
def getEncryptType(): def getEncryptType():
encryType = f"{letterWay}_{letterMode}".upper() encryType = f"{letterWay}_{letterMode}".upper()
@ -142,7 +175,11 @@ def getEncryptKey():
# 获得接收方的公钥 # 获得接收方的公钥
def getRecvPubKey(): def getRecvPubKey():
# recPubKey = input("plz input Receiver's Public Key: ") # recPubKey = input("plz input Receiver's Public Key: ")
recPubKey = getUserKey()[1] # 在某某地方获得对方的公钥然后保存到某个地方输入路径uoqu
recPubKeyPath = input("请输入接受方的公钥文件路径:")
with open(recPubKeyPath, "rb") as f:
data = f.read()
recPubKey = base64.b64encode(data).decode('utf-8')
return recPubKey return recPubKey
# 获得发送方的公钥 # 获得发送方的公钥
@ -161,5 +198,4 @@ if __name__ == "__main__":
# #
# encryptType = f"{letterWay}_{letterMode}".upper() # encryptType = f"{letterWay}_{letterMode}".upper()
# print(encryptType) # print(encryptType)
main() main()

@ -0,0 +1,143 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
from entity.Letter import Letter,json_to_obj
import select
import sender
from recv import recv
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class sender_net():
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port=port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, msg):
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def run(self):
threading.Thread(target=self.main).start()
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print(" [*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
#加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
#test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = sender_net(port, client_host, client_port)
a.run()

@ -0,0 +1,42 @@
import os
import sys
import winreg
from pathlib import Path
def get_config_path():
if sys.platform == 'win32':
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars("%appdata%/fst")).resolve()
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'fst'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)
def get_download_directory():
if sys.platform == 'win32':
# 打开注册表项
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders')
try:
# 读取 "Downloads" 路径
download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}')
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars(download_dir)).resolve()
finally:
# 关闭注册表项
winreg.CloseKey(key)
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'Downloads'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)

@ -10,7 +10,6 @@ def getUserKey() -> (str, str): # 返回base64编码
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath): if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对 # 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath) RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥 # 读取私钥
with open(config.priKeySavePath, "rb") as f: with open(config.priKeySavePath, "rb") as f:
data = f.read() data = f.read()

@ -0,0 +1,72 @@
import hashlib
import bech32
# https://liaoxuefeng.com/books/blockchain/bitcoin/segwit/index.html
def hash160(data):
"""
计算输入数据的SHA-160哈希值
参数:
data (bytes): 输入数据
返回:
bytes: SHA-160哈希值
"""
return hashlib.new('ripemd160', hashlib.sha256(data).digest()).digest()
def encodeSegwit(data_bytes, hrp="fst"):
"""
将字节数据编码为Bech32格式
参数:
hrp (str): 人类可读部分 (Human-readable part)
data_bytes (bytes): 要编码的数据
返回:
str: Bech32编码后的字符串
"""
# 将字节转换为5位整数列表
data_ints = bech32.convertbits(hash160(data_bytes), 8, 5)
if not data_ints:
raise ValueError("Failed to convert bytes to 5-bit integers")
return bech32.bech32_encode(hrp, data_ints)
def verifySegwit(bech32_str):
"""
验证Bech32编码的字符串是否有效
参数:
bech32_str (str): Bech32编码的字符串
返回:
bool: 如果字符串有效则返回True否则返回False
"""
try:
hrp, data_ints = bech32.bech32_decode(bech32_str)
if not hrp or not data_ints:
return False
# 将5位整数列表转换回字节
data_bytes = bech32.convertbits(data_ints, 5, 8, False)
if not data_bytes:
return False
return True
except Exception as e:
print(f"Verification failed: {e}")
return False
# 示例用法
if __name__ == "__main__":
message = b"Hello, Bech32!"
encoded = encodeSegwit(message)
print(f"Encoded: {encoded}")
is_valid = verifySegwit(encoded)
print(f"Is valid: {is_valid}")

@ -1,50 +1,55 @@
# "ecb", "cbc", "cfb", "ofb"
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
import base64 import base64
from fontTools.misc.eexec import encrypt
class AESUtils: class AESUtils:
def __init__(self, key: bytes = None): def __init__(self, key: bytes = None):
"""生成一个随机密钥""" """生成一个随机密钥"""
self.key = get_random_bytes(32) self.key = key if key else get_random_bytes(16)
def encrypt(self, data: str, mode: str = 'ECB') -> (str,str): def encrypt(self, data1: str, mode: str = 'ECB') -> (str, str):
"""加密数据""" """加密数据"""
cipher = self._get_cipher(mode) cipher = self._get_encipher(mode)
data = pad(data.encode(), AES.block_size) # 填充数据 data1 = pad(data1.encode('utf-8'), AES.block_size) # 填充数据
ciphertext = cipher.encrypt(data) ciphertext = cipher.encrypt(data1)
return base64.b64encode(ciphertext).decode(), self.key return base64.b64encode(ciphertext).decode('utf-8'), self.key
def decrypt(self, encrypted_data: str, mode: str = 'ECB') -> str: def decrypt(self, dekey, encrypted_data: str, mode: str = 'ECB') -> str:
"""解密数据""" """解密数据"""
cipher = self._get_cipher(mode) cipher = self._get_decipher(dekey, mode)
encrypted_data = base64.b64decode(encrypted_data) encrypted_data = base64.b64decode(encrypted_data) # 解码密文
plaintext = unpad(cipher.decrypt(encrypted_data), AES.block_size) plaintext = unpad(cipher.decrypt(encrypted_data), AES.block_size)
return plaintext.decode() return plaintext.decode()
def _get_cipher(self, mode: str) -> AES: def _get_encipher(self, mode: str) -> AES:
"""根据模式返回相应的 AES cipher""" """根据模式返回相应的加密 AES cipher"""
iv = None iv = b'abcdefghijklmnop'
if mode == 'CBC': if mode == 'CBC':
iv = get_random_bytes(AES.block_size) iv = get_random_bytes(AES.block_size)
return AES.new(self.key, AES.MODE_CBC, iv) return AES.new(self.key, AES.MODE_CBC, iv)
elif mode == 'CFB':
return AES.new(self.key, AES.MODE_CFB)
elif mode == 'OFB':
return AES.new(self.key, AES.MODE_OFB)
else: # 默认是 ECB else: # 默认是 ECB
return AES.new(self.key, AES.MODE_ECB) return AES.new(self.key, AES.MODE_ECB)
if __name__ == "__main__": def _get_decipher(self, dekey, mode: str) -> AES:
# 示例数据和密钥 """根据模式返回相应的解密 AES cipher"""
data = "Hello, SM4!" iv = b'abcdefghijklmnop'
key = "1234567890abcdef" if mode == 'CBC':
iv = get_random_bytes(AES.block_size)
return AES.new(dekey, AES.MODE_CBC, iv)
else: # 默认是 ECB
return AES.new(dekey, AES.MODE_ECB)
enData , akey = AESUtils(key) if __name__ == '__main__':
key = b'1234567890abcdef' # AES 密钥必须是 16 字节
aes = AESUtils(key)
print(enData) # 加密
plaintext = "Hello, AES!"
enData, akey = aes.encrypt(plaintext, mode='ECB')
print("Encrypted Data:", enData)
print("Key:", akey)
# 解密
decrypted = aes.decrypt(aes.key, enData, mode='ECB')
print("Decrypted:", decrypted)
Loading…
Cancel
Save