Compare commits

..

21 Commits

Author SHA1 Message Date
UniDarkstars ebddfeff74 修改了两次输入接收方公钥的问题
3 months ago
Timmoc 5010769b54 尝试合并三大分支进度
3 months ago
Timmoc 753e20382a Merge branch 'sender_okToCheck' into test_merge
3 months ago
Timmoc 104a523445 Merge branch 'recyvan_2' into test_merge
3 months ago
Timmoc 2bbb0962a1 往appdata灌石,公私钥对现存储于appdata目录下
3 months ago
recyvan dc73c10e43 修复了一下不知名bug
3 months ago
recyvan e6c7588e2f Merge remote-tracking branch 'origin/recyvan_2' into recyvan_2
3 months ago
recyvan 50ccd97d6c 支持ip地址格式,局域网访问
3 months ago
Timmoc 5d6226290d 禁止dangerous
3 months ago
recyvan 8227015134 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
Timmoc a14aa4bf31 使用readme进行测试
3 months ago
recyvan 3c34ce49a9 Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
Timmoc 9bd045678e 适配windows环境变量
3 months ago
recyvan b2a6196dbb 更新一下
3 months ago
Timmoc 632981180a Merge branch 'sender_okToCheck' into recv_timmoc
3 months ago
Timmoc 885d288719 增加pytest.ini
3 months ago
recyvan a67a749bb3 Merge remote-tracking branch 'origin/recv_timmoc' into recyvan_2
3 months ago
Timmoc 9a53fb2e12 python3.12惊喜双引号内双引号不报错
3 months ago
Timmoc 80a971702e aes base64 decode
3 months ago
recyvan 27766625bd Merge remote-tracking branch 'origin/sender_okToCheck' into recyvan_2
3 months ago
recyvan 8da58c145a 网络传输改为线程模式,这样就可同时接收和发送数据,没有先后顺序
3 months ago

@ -1,7 +1,11 @@
from enum import Enum, auto
from tool.DownloadPathTool import get_config_path
port = 8426
priKeySavePath = "./private.pem"
pubKeySavePath = "./public.pem"
configPath = get_config_path()
priKeySavePath = f"{configPath}/private.pem"
pubKeySavePath = f"{configPath}/public.pem"
class EncryptType(Enum):
AES_ECB = auto()

@ -1,9 +1,41 @@
import json
class Letter:
sign = "计算得到"
encryptType = "SM4_GCM"
encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据
encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据
recvPubKey = ""
senderPubKey = ""
fileName = ""
fileBase64 = ""
def to_dict(self):
return {
"sign": self.sign,
"encryptType": self.encryptType,
"encryptKey": self.encryptKey,
"recvPubKey": self.recvPubKey,
"senderPubKey": self.senderPubKey,
"fileName": self.fileName,
"fileBase64": self.fileBase64
}
def json_to_obj(json_str):
new_obj= Letter()
new_obj.sign = json_str["sign"]
new_obj.encryptType = json_str["encryptType"]
new_obj.encryptKey = json_str["encryptKey"]
new_obj.recvPubKey = json_str["recvPubKey"]
new_obj.senderPubKey = json_str["senderPubKey"]
new_obj.fileName = json_str["fileName"]
new_obj.fileBase64 = json_str["fileBase64"]
return new_obj
# test code
if __name__ == '__main__':
letter = Letter()
# print(json.dumps(letter.to_dict(), indent=1))
json_dict = json.loads(json.dumps(letter.to_dict()))
print(json_dict.__dir__)

@ -23,6 +23,8 @@ def handleLetter(letter: Letter):
# 解析信件 确认收信人
# 获取自身key
pki = PriKeyHelper.getUserKey()
print("pki is: ",pki[1])
print("letter.recvPubKey is: ",letter.recvPubKey)
if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。

@ -0,0 +1,141 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -4,28 +4,28 @@ import recv
from sender import sender
@patch('builtins.input', side_effect=['./public.pem', 'aes', 'ecb'])
@patch('builtins.input', side_effect=['../README.md', 'aes', 'ecb'])
def test_handleLetter_aes_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['./public.pem', 'aes', 'cbc'])
@patch('builtins.input', side_effect=['../README.md', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'ecb'])
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['./public.pem', 'sm4', 'cbc'])
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)

@ -6,7 +6,6 @@ import os
from entity.Letter import Letter
from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA
@ -44,7 +43,7 @@ def main():
letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
letter.encryptKey = getEncryptKey()
letter.encryptKey = getEncryptKey(letter.recvPubKey)
letter.encryptType = getEncryptType()
letter.sign = getSign(data)
@ -167,16 +166,16 @@ def getEncryptType():
return encryType
# 对称密钥,返回的是使用接收方公钥加密后的密钥
def getEncryptKey():
rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey())
# 对称密钥,返回的是使用接收方公钥加密后的对称密钥
def getEncryptKey(getRecvPubKey):
rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey)
return base64.b64encode(rsaEncrySymKey).decode("utf-8")
# 获得接收方的公钥
def getRecvPubKey():
# recPubKey = input("plz input Receiver's Public Key: ")
# 在某某地方获得对方的公钥,然后保存到某个地方,输入路径uoqu
# recPubKey = input(" plz input Receiver's Public Key: ")
# 在某某地方获得对方的公钥,然后保存到某个地方,输入路径
recPubKeyPath = input("请输入接受方的公钥文件路径:")
with open(recPubKeyPath, "rb") as f:
data = f.read()

@ -0,0 +1,143 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
from entity.Letter import Letter,json_to_obj
import select
import sender
from recv import recv
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class sender_net():
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port=port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, msg):
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def run(self):
threading.Thread(target=self.main).start()
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print(" [*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
#加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
#test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = sender_net(port, client_host, client_port)
a.run()

@ -1,7 +1,20 @@
import os
import sys
import winreg
from pathlib import Path
def get_config_path():
if sys.platform == 'win32':
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars("%appdata%/fst")).resolve()
else:
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
home_dir = Path.home()
download_dir = home_dir / 'fst'
dir_path = Path(download_dir)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
return str(download_dir)
def get_download_directory():
if sys.platform == 'win32':
@ -13,8 +26,8 @@ def get_download_directory():
# 读取 "Downloads" 路径
download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}')
# 将路径从注册表中的格式转换为标准格式
download_dir = Path(download_dir).resolve()
# 展开环境变量并转换为标准格式
download_dir = Path(os.path.expandvars(download_dir)).resolve()
finally:
# 关闭注册表项

@ -10,7 +10,6 @@ def getUserKey() -> (str, str): # 返回base64编码
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥
with open(config.priKeySavePath, "rb") as f:
data = f.read()

Loading…
Cancel
Save