Compare commits

..

2 Commits

Author SHA1 Message Date
recyvan 10ad84e676 1111
3 months ago
recyvan 2a470a47cf 修复了断开重连失效bug
3 months ago

@ -1,6 +1,6 @@
import pyfiglet # 程序greet
from senders import sender_net
from sender import sender_net
# 程序greet
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")

@ -0,0 +1,128 @@
import json
import socket
import threading
from time import sleep
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
class RecverNet:
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
self.is_running = True
def recver_thread(self):
while self.is_running:
try:
conn, addr = self.server.accept()
print(f"\n[*] Connection from {addr}")
with conn:
while self.is_running:
data = conn.recv(10240)
if not data:
print("\n[x] Disconnected from client.")
break
msg = json.loads(data.decode())
self.handle_message(msg)
except Exception as e:
print(f"\n[x] Error in recver_thread: {e}")
def handle_message(self, msg):
if msg['flag'] == 0:
print("\n[+] Received data:", msg["data"])
elif msg['flag'] == 1:
self.recv_file(msg['data'])
else:
print("\n[x] Error: Unknown flag")
def send_thread(self, conn):
while self.is_running:
try:
flag = input("[*] Enter 0 to send data, 1 to send file: ")
if flag == "0":
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
self.send_data(conn, msg)
elif flag == "1":
print("[+] Calling file transfer module...")
self.send_file(conn)
else:
print("[x] Invalid input. Please enter 0 or 1.")
except Exception as e:
print(f"[x] Error in send_thread: {e}")
self.is_running = False
def send_data(self, conn, msg):
conn.sendall(json.dumps(msg).encode("utf-8"))
def send_file(self, conn):
self.letter = sender.main()
letter_data = self.letter.to_dict()
msg = {"flag": 1, "data": letter_data}
self.send_data(conn, msg)
def recv_file(self, data):
letter = json_to_obj(data)
recv.handleLetter(letter)
def connect(self):
while True:
try:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((self.client_host, self.client_port))
return client
except socket.error:
print("[*] Connection failed. Retrying...")
sleep(2)
def main(self):
client = self.connect()
threading.Thread(target=self.recver_thread, daemon=True).start()
self.send_thread(client)
while self.is_running==False:
self.is_running=True
client=self.connect()
self.send_thread(client)
sleep(1)
def run(self):
self.main()
def input_verify():
while True:
try:
port = int(input("[*] Enter the listen port: "))
if 0 <= port <= 65535:
break
else:
print("[x] Port must be between 0 and 65535.")
except ValueError:
print("[x] Invalid port number. Please try again.")
while True:
addr = input("[*] Enter the address to connect to (e.g., 127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535:
return port, client_host, client_port
else:
print("[x] Port must be between 0 and 65535.")
else:
print("[x] Invalid address format.")
else:
print("[x] Invalid address format.")
# Test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
recver_net = RecverNet(port, client_host, client_port)
recver_net.run()

@ -0,0 +1,32 @@
from unittest.mock import patch
import recv
from sender import sender
@patch('builtins.input', side_effect=['../README.md', 'aes', 'ecb'])
def test_handleLetter_aes_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass

@ -1,141 +0,0 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recvs.recv
from senders import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recvs.recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -1,32 +0,0 @@
from unittest.mock import patch
import recv
from senders import sender
@patch('builtins.input', side_effect=['../README.md',r'D:\Users\Timmoc\AppData\Roaming\fst\public.pem','aes', 'ecb'])
def test_handleLetter_aes_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md',r'D:\Users\Timmoc\AppData\Roaming\fst\public.pem', 'aes', 'cbc'])
def test_handleLetter_aes_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md',r'D:\Users\Timmoc\AppData\Roaming\fst\public.pem', 'sm4', 'ecb'])
def test_handleLetter_sm4_ecb(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass
@patch('builtins.input', side_effect=['../README.md',r'D:\Users\Timmoc\AppData\Roaming\fst\public.pem', 'sm4', 'cbc'])
def test_handleLetter_sm4_cbc(mock_input):
letter = sender.main()
recv.handleLetter(letter)
pass

@ -1,25 +1,26 @@
# 模式,文件,自己的公钥从哪里来,别人的公钥从哪里来
import pyfiglet
import base64
import os
import pyfiglet
from Crypto.Random import get_random_bytes
from entity.Letter import Letter
from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA
from tool.hash import Segwit
from tool.symmetric.AES import AESUtils
from tool.symmetric.SM4 import encrypt_ecb, encrypt_cbc_with_iv
from Crypto.Random import get_random_bytes
# 三个全局变量,记录信封
letterWay = ""
letterMode = ""
letterSymKey = b""
def main():
# greet
print("")
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
@ -34,13 +35,14 @@ def main():
# 用户输入各种数据填充letter字段
path = selectFile()
with open(path, "rb") as f:
with open(path,"rb") as f:
data = f.read()
letter.fileName = getFileName(path)
letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"), letterSymKey)
letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
letter.encryptKey = getEncryptKey(letter.recvPubKey)
letter.encryptType = getEncryptType()
@ -54,19 +56,12 @@ def main():
print(letter.recvPubKey)
print(letter.senderPubKey)
# by Timmoc
print(
f"已以 {Segwit.encodeSegwit(letter.senderPubKey.encode('utf-8'))} 的身份向"
f" {Segwit.encodeSegwit(letter.recvPubKey.encode('utf-8'))} 生成加密信件")
# by Timmoc
# 获取用户的公私钥对进行签名
# 使用对方的公钥进行加密
# 发送信件
return letter # 方便recv测试以后可以删除。
return letter # 方便recv测试以后可以删除。
pass
def selectFile() -> str:
# s = "public.pem"
while True:
@ -74,7 +69,7 @@ def selectFile() -> str:
s = input("输入文件路径:")
with open(s, "rb") as _:
pass
return s
return s
except FileNotFoundError:
print("错误: 文件未找到,请输入正确的文件路径。")
except PermissionError:
@ -82,13 +77,11 @@ def selectFile() -> str:
except Exception as e:
print(f"发生未知错误: {e}")
# 获得文件名
def getFileName(fName: str) -> str:
def getFileName(fName:str) -> str:
filePath = os.path.split(fName)
return filePath[-1]
def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# 向目标ip和端口发送指定的信件
pass
@ -136,6 +129,7 @@ def selectSymEncryptionChoice():
# 使用对称加密,返回加密后的数据和随机生成的密钥
def SymEncryption(encryData, key: bytes = None):
global letterSymKey
# 获得加密的方法和加密的模式
way, mode = selectSymEncryptionChoice()
@ -145,7 +139,7 @@ def SymEncryption(encryData, key: bytes = None):
encryptedData, trueKey = aesUtils.encrypt(encryData, mode=mode) # 这里encryData要改为文件内容
letterSymKey = trueKey
return encryptedData, trueKey
return encryptedData,trueKey
if way == "sm4":
key = get_random_bytes(16)
@ -157,30 +151,27 @@ def SymEncryption(encryData, key: bytes = None):
letterSymKey = key
return encrypted_data, key
# 获得签名
def getSign(document_bytes):
# 计算文件bytes
#计算文件bytes
priKey, pubKey = PriKeyHelper.getUserKey()
signDocuHash = RSA.sign_message(document_bytes, priKey)
return signDocuHash
# 获得加密的方法和模式,封装信封
def getEncryptType():
encryType = f"{letterWay}_{letterMode}".upper()
return encryType
# 对称密钥,返回的是使用接收方公钥加密后的对称密钥
def getEncryptKey(getRecvPubKey):
rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey)
return base64.b64encode(rsaEncrySymKey).decode("utf-8")
# 获得接收方的公钥
def getRecvPubKey():
# recPubKey = input(" plz input Receiver's Public Key: ")
@ -191,7 +182,6 @@ def getRecvPubKey():
recPubKey = base64.b64encode(data).decode('utf-8')
return recPubKey
# 获得发送方的公钥
def getSenderPubKey():
privateKey, publicKey = getUserKey()
@ -208,4 +198,4 @@ if __name__ == "__main__":
#
# encryptType = f"{letterWay}_{letterMode}".upper()
# print(encryptType)
main()
main()

@ -4,9 +4,9 @@ import socket
import threading
from time import sleep
import senders.sender
import sender
from entity.Letter import Letter, json_to_obj
from recvs import recv
from recv import recv
# from entity.Letter import Letter
@ -70,7 +70,7 @@ class sender_net():
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = senders.sender.main()
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
Loading…
Cancel
Save