Compare commits

...

21 Commits

Author SHA1 Message Date
UniDarkstars ebddfeff74 修改了两次输入接收方公钥的问题
3 months ago
Timmoc 5010769b54 尝试合并三大分支进度
3 months ago
Timmoc 753e20382a Merge branch 'sender_okToCheck' into test_merge
3 months ago
Timmoc 104a523445 Merge branch 'recyvan_2' into test_merge
3 months ago
Timmoc 2bbb0962a1 往appdata灌石,公私钥对现存储于appdata目录下
3 months ago
recyvan dc73c10e43 修复了一下不知名bug
3 months ago
recyvan e6c7588e2f Merge remote-tracking branch 'origin/recyvan_2' into recyvan_2
3 months ago
recyvan 50ccd97d6c 支持ip地址格式,局域网访问
3 months ago
Timmoc 5d6226290d 禁止dangerous
3 months ago
recyvan 8227015134 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
Timmoc a14aa4bf31 使用readme进行测试
3 months ago
recyvan 3c34ce49a9 Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
Timmoc 9bd045678e 适配windows环境变量
3 months ago
recyvan b2a6196dbb 更新一下
3 months ago
Timmoc 632981180a Merge branch 'sender_okToCheck' into recv_timmoc
3 months ago
Timmoc 885d288719 增加pytest.ini
3 months ago
recyvan a67a749bb3 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
Timmoc 9a53fb2e12 python3.12惊喜双引号内双引号不报错
3 months ago
Timmoc 80a971702e aes base64 decode
3 months ago
recyvan 27766625bd Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
recyvan 8da58c145a 网络传输改为线程模式,这样就可同时接收和发送数据,没有先后顺序
3 months ago

@ -1,7 +1,11 @@
from enum import Enum, auto from enum import Enum, auto
from tool.DownloadPathTool import get_config_path
port = 8426 port = 8426
priKeySavePath = "./private.pem" configPath = get_config_path()
pubKeySavePath = "./public.pem" priKeySavePath = f"{configPath}/private.pem"
pubKeySavePath = f"{configPath}/public.pem"
class EncryptType(Enum): class EncryptType(Enum):
AES_ECB = auto() AES_ECB = auto()

@ -1,3 +1,6 @@
import json
class Letter: class Letter:
sign = "计算得到" sign = "计算得到"
encryptType = "SM4_GCM" encryptType = "SM4_GCM"
@ -7,3 +10,32 @@ class Letter:
fileName = "" fileName = ""
fileBase64 = "" fileBase64 = ""
def to_dict(self):
return {
"sign": self.sign,
"encryptType": self.encryptType,
"encryptKey": self.encryptKey,
"recvPubKey": self.recvPubKey,
"senderPubKey": self.senderPubKey,
"fileName": self.fileName,
"fileBase64": self.fileBase64
}
def json_to_obj(json_str):
new_obj= Letter()
new_obj.sign = json_str["sign"]
new_obj.encryptType = json_str["encryptType"]
new_obj.encryptKey = json_str["encryptKey"]
new_obj.recvPubKey = json_str["recvPubKey"]
new_obj.senderPubKey = json_str["senderPubKey"]
new_obj.fileName = json_str["fileName"]
new_obj.fileBase64 = json_str["fileBase64"]
return new_obj
# test code
if __name__ == '__main__':
letter = Letter()
# print(json.dumps(letter.to_dict(), indent=1))
json_dict = json.loads(json.dumps(letter.to_dict()))
print(json_dict.__dir__)

@ -23,6 +23,8 @@ def handleLetter(letter: Letter):
# 解析信件 确认收信人 # 解析信件 确认收信人
# 获取自身key # 获取自身key
pki = PriKeyHelper.getUserKey() pki = PriKeyHelper.getUserKey()
print("pki is: ",pki[1])
print("letter.recvPubKey is: ",letter.recvPubKey)
if pki[1] != letter.recvPubKey: if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己") raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。 # 用自己的私钥解密key 获得对称加密秘钥。

@ -0,0 +1,141 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -4,28 +4,28 @@ import recv
from sender import sender from sender import sender
@patch('builtins.input', side_effect=['./public.pem', 'aes', 'ecb']) @patch('builtins.input', side_effect=['../README.md', 'aes', 'ecb'])
def test_handleLetter_aes_ecb(mock_input): def test_handleLetter_aes_ecb(mock_input):
letter = sender.main() letter = sender.main()
recv.handleLetter(letter) recv.handleLetter(letter)
pass pass
@patch('builtins.input', side_effect=['./public.pem', 'aes', 'cbc']) @patch('builtins.input', side_effect=['../README.md', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input): def test_handleLetter_aes_cbc(mock_input):
letter = sender.main() letter = sender.main()
recv.handleLetter(letter) recv.handleLetter(letter)
pass pass
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'ecb']) @patch('builtins.input', side_effect=['../README.md', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input): def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main() letter = sender.main()
recv.handleLetter(letter) recv.handleLetter(letter)
pass pass
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'cbc']) @patch('builtins.input', side_effect=['../README.md', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input): def test_handleLetter_sm4_cbc(mock_input):
letter = sender.main() letter = sender.main()
recv.handleLetter(letter) recv.handleLetter(letter)

@ -6,7 +6,6 @@ import os
from entity.Letter import Letter from entity.Letter import Letter
from tool import PriKeyHelper from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA from tool.asymmetric import RSA
@ -44,7 +43,7 @@ def main():
letter.recvPubKey = getRecvPubKey() letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey() letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey) letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
letter.encryptKey = getEncryptKey() letter.encryptKey = getEncryptKey(letter.recvPubKey)
letter.encryptType = getEncryptType() letter.encryptType = getEncryptType()
letter.sign = getSign(data) letter.sign = getSign(data)
@ -167,16 +166,16 @@ def getEncryptType():
return encryType return encryType
# 对称密钥,返回的是使用接收方公钥加密后的密钥 # 对称密钥,返回的是使用接收方公钥加密后的对称密钥
def getEncryptKey(): def getEncryptKey(getRecvPubKey):
rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey()) rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey)
return base64.b64encode(rsaEncrySymKey).decode("utf-8") return base64.b64encode(rsaEncrySymKey).decode("utf-8")
# 获得接收方的公钥 # 获得接收方的公钥
def getRecvPubKey(): def getRecvPubKey():
# recPubKey = input("plz input Receiver's Public Key: ") # recPubKey = input(" plz input Receiver's Public Key: ")
# 在某某地方获得对方的公钥,然后保存到某个地方,输入路径uoqu # 在某某地方获得对方的公钥,然后保存到某个地方,输入路径
recPubKeyPath = input("请输入接受方的公钥文件路径:") recPubKeyPath = input("请输入接受方的公钥文件路径:")
with open(recPubKeyPath, "rb") as f: with open(recPubKeyPath, "rb") as f:
data = f.read() data = f.read()

@ -0,0 +1,143 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
from entity.Letter import Letter,json_to_obj
import select
import sender
from recv import recv
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class sender_net():
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port=port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, msg):
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def run(self):
threading.Thread(target=self.main).start()
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print(" [*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
#加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
#test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = sender_net(port, client_host, client_port)
a.run()

@ -1,7 +1,20 @@
import os
import sys import sys
import winreg import winreg
from pathlib import Path from pathlib import Path
def get_config_path():
if sys.platform == 'win32':
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars("%appdata%/fst")).resolve()
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'fst'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)
def get_download_directory(): def get_download_directory():
if sys.platform == 'win32': if sys.platform == 'win32':
@ -13,8 +26,8 @@ def get_download_directory():
# 读取 "Downloads" 路径 # 读取 "Downloads" 路径
download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}') download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}')
# 将路径从注册表中的格式转换为标准格式 # 展开环境变量并转换为标准格式
download_dir = Path(download_dir).resolve() download_dir = Path(os.path.expandvars(download_dir)).resolve()
finally: finally:
# 关闭注册表项 # 关闭注册表项

@ -10,7 +10,6 @@ def getUserKey() -> (str, str): # 返回base64编码
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath): if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对 # 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath) RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥 # 读取私钥
with open(config.priKeySavePath, "rb") as f: with open(config.priKeySavePath, "rb") as f:
data = f.read() data = f.read()

Loading…
Cancel
Save