Compare commits
36 Commits
2689090700
...
5010769b54
@ -0,0 +1,10 @@
|
||||
import pyfiglet # 程序greet
|
||||
|
||||
# 程序greet
|
||||
print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
|
||||
greet = pyfiglet.figlet_format("File Secure Transfer", font="slant" , width=250)
|
||||
print(greet)
|
||||
print(" <For Secure And Fast File Transfer>")
|
||||
author = " <-Made By Li-Nuo-Cheng Tan-Jun-Wen Ren-Qing-Yu->"
|
||||
print(author)
|
||||
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
|
@ -0,0 +1,141 @@
|
||||
import hashlib
|
||||
import json
|
||||
import selectors
|
||||
import socket
|
||||
import threading
|
||||
from time import sleep
|
||||
|
||||
import select
|
||||
|
||||
from entity.Letter import Letter, json_to_obj
|
||||
import recv
|
||||
from sender import sender
|
||||
|
||||
|
||||
# from entity.Letter import Letter
|
||||
|
||||
|
||||
# def main():
|
||||
# # 用户输入各种数据填充letter字段
|
||||
# # 获取用户的公私钥对进行签名
|
||||
# # 使用对方的公钥进行加密
|
||||
# # 发送信件
|
||||
# pass
|
||||
#
|
||||
#
|
||||
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
|
||||
# # 向目标ip和端口发送指定的信件
|
||||
# pass
|
||||
|
||||
class recver_net():
|
||||
def __init__(self, port, client_host, client_port):
|
||||
self.client_host = client_host
|
||||
self.client_port = client_port
|
||||
self.letter = Letter()
|
||||
self.port = port
|
||||
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.server.bind(('0.0.0.0', self.port))
|
||||
self.server.listen(10)
|
||||
|
||||
def recver_thread(self):
|
||||
while True:
|
||||
conn, addr = self.server.accept()
|
||||
data = conn.recv(10240)
|
||||
msg = json.loads(data.decode())
|
||||
if int(msg['flag']) == 0:
|
||||
print("[+] haved received data:" + msg["data"])
|
||||
elif int(msg['flag']) == 1:
|
||||
self.recv_file(msg['data'])
|
||||
else:
|
||||
print("[x] Error")
|
||||
|
||||
def send_thread(self, conn):
|
||||
while True:
|
||||
try:
|
||||
flag: int = int(input())
|
||||
if flag == 0:
|
||||
data = input("[-] Enter data to send: ")
|
||||
msg = {"flag": 0, "data": data}
|
||||
msg = json.dumps(msg).encode("utf-8")
|
||||
self.send_data(conn, msg)
|
||||
elif flag == 1:
|
||||
print("[+] Calling file transfer module...")
|
||||
# file_name = input("Enter file name to send: ")
|
||||
self.send_file(conn)
|
||||
else:
|
||||
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
|
||||
except ValueError:
|
||||
print("[x] Error, please enter a valid number.")
|
||||
|
||||
def send_data(self, conn, data):
|
||||
conn.sendall(data)
|
||||
|
||||
def send_file(self, conn):
|
||||
self.letter: Letter = sender.main()
|
||||
letter = self.letter.to_dict()
|
||||
msg = {"flag": 1, "data": letter}
|
||||
msg = json.dumps(msg).encode("utf-8")
|
||||
conn.sendall(msg)
|
||||
|
||||
def recv_data(self, conn):
|
||||
con, addr = conn.accept()
|
||||
data = con.recv(1024)
|
||||
if not data:
|
||||
return None
|
||||
# print(data.decode())
|
||||
return data.decode()
|
||||
|
||||
def recv_file(self, data):
|
||||
letter: Letter = json_to_obj(data)
|
||||
recv.handleLetter(letter)
|
||||
|
||||
def main(self):
|
||||
|
||||
while True:
|
||||
try:
|
||||
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
|
||||
client.connect((self.client_host, self.client_port))
|
||||
print("[*] Connected...")
|
||||
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
|
||||
sleep(1)
|
||||
break
|
||||
except socket.error:
|
||||
print("[*] Waiting for ...")
|
||||
# 加入线程
|
||||
threading.Thread(target=self.recver_thread, ).start()
|
||||
threading.Thread(target=self.send_thread, args=(client,)).start()
|
||||
|
||||
def run(self):
|
||||
threading.Thread(target=self.main).start()
|
||||
|
||||
|
||||
def input_verify():
|
||||
while True:
|
||||
try:
|
||||
port = int(input("[*] Enter to the listen port: "))
|
||||
break
|
||||
except ValueError:
|
||||
print("[x] Error, please enter a valid port number.")
|
||||
|
||||
while True:
|
||||
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
|
||||
if ':' in addr:
|
||||
client_host, client_port = addr.split(":")
|
||||
if client_host and client_port.isdigit():
|
||||
client_port = int(client_port)
|
||||
if 0 <= client_port <= 65535: # 检查端口范围
|
||||
break # 输入有效,跳出循环
|
||||
else:
|
||||
print("Port must be between 0 and 65535.")
|
||||
else:
|
||||
print("Invalid address format. Please enter in the format 'host:port'.")
|
||||
else:
|
||||
print("Invalid address format. Please enter in the format 'host:port'.")
|
||||
return port, client_host, client_port
|
||||
|
||||
|
||||
# test
|
||||
if __name__ == '__main__':
|
||||
port, client_host, client_port = input_verify()
|
||||
a = recver_net(port, client_host, client_port)
|
||||
a.run()
|
@ -0,0 +1,143 @@
|
||||
import hashlib
|
||||
import json
|
||||
import selectors
|
||||
import socket
|
||||
import threading
|
||||
from time import sleep
|
||||
from entity.Letter import Letter,json_to_obj
|
||||
import select
|
||||
import sender
|
||||
from recv import recv
|
||||
|
||||
|
||||
# from entity.Letter import Letter
|
||||
|
||||
|
||||
# def main():
|
||||
# # 用户输入各种数据填充letter字段
|
||||
# # 获取用户的公私钥对进行签名
|
||||
# # 使用对方的公钥进行加密
|
||||
# # 发送信件
|
||||
# pass
|
||||
#
|
||||
#
|
||||
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
|
||||
# # 向目标ip和端口发送指定的信件
|
||||
# pass
|
||||
|
||||
class sender_net():
|
||||
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
|
||||
self.client_host = client_host
|
||||
self.client_port = client_port
|
||||
self.letter = Letter()
|
||||
self.port=port
|
||||
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.server.bind(('0.0.0.0', self.port))
|
||||
self.server.listen(10)
|
||||
|
||||
|
||||
def recver_thread(self):
|
||||
while True:
|
||||
conn, addr = self.server.accept()
|
||||
data = conn.recv(10240)
|
||||
msg = json.loads(data.decode())
|
||||
if int(msg['flag']) == 0:
|
||||
print("[+] haved received data:" + msg["data"])
|
||||
elif int(msg['flag']) == 1:
|
||||
self.recv_file(msg['data'])
|
||||
else:
|
||||
print("[x] Error")
|
||||
|
||||
def send_thread(self, conn):
|
||||
while True:
|
||||
try:
|
||||
flag: int = int(input())
|
||||
if flag == 0:
|
||||
data = input("[-] Enter data to send: ")
|
||||
msg = {"flag": 0, "data": data}
|
||||
msg = json.dumps(msg).encode("utf-8")
|
||||
self.send_data(conn, msg)
|
||||
elif flag == 1:
|
||||
print("[+] Calling file transfer module...")
|
||||
# file_name = input("Enter file name to send: ")
|
||||
self.send_file(conn)
|
||||
else:
|
||||
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
|
||||
except ValueError:
|
||||
print("[x] Error, please enter a valid number.")
|
||||
|
||||
|
||||
def send_data(self, conn, msg):
|
||||
conn.sendall(msg)
|
||||
|
||||
def send_file(self, conn):
|
||||
self.letter: Letter = sender.main()
|
||||
letter = self.letter.to_dict()
|
||||
msg = {"flag": 1, "data": letter}
|
||||
print(letter)
|
||||
msg = json.dumps(msg).encode("utf-8")
|
||||
conn.sendall(msg)
|
||||
|
||||
def recv_data(self, conn):
|
||||
con, addr = conn.accept()
|
||||
data = con.recv(1024)
|
||||
if not data:
|
||||
return None
|
||||
# print(data.decode())
|
||||
return data.decode()
|
||||
|
||||
def recv_file(self, data):
|
||||
letter: Letter = json_to_obj(data)
|
||||
recv.handleLetter(letter)
|
||||
|
||||
|
||||
def run(self):
|
||||
threading.Thread(target=self.main).start()
|
||||
|
||||
def main(self):
|
||||
while True:
|
||||
try:
|
||||
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
|
||||
client.connect((self.client_host, self.client_port))
|
||||
print(" [*] Connected...")
|
||||
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
|
||||
sleep(1)
|
||||
break
|
||||
except socket.error:
|
||||
print("[*] Waiting for ...")
|
||||
#加入线程
|
||||
threading.Thread(target=self.recver_thread, ).start()
|
||||
threading.Thread(target=self.send_thread, args=(client,)).start()
|
||||
|
||||
|
||||
|
||||
def input_verify():
|
||||
while True:
|
||||
try:
|
||||
port = int(input("[*] Enter to the listen port: "))
|
||||
break
|
||||
except ValueError:
|
||||
print("[x] Error, please enter a valid port number.")
|
||||
|
||||
while True:
|
||||
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
|
||||
if ':' in addr:
|
||||
client_host, client_port = addr.split(":")
|
||||
if client_host and client_port.isdigit():
|
||||
client_port = int(client_port)
|
||||
if 0 <= client_port <= 65535: # 检查端口范围
|
||||
break # 输入有效,跳出循环
|
||||
else:
|
||||
print("Port must be between 0 and 65535.")
|
||||
else:
|
||||
print("Invalid address format. Please enter in the format 'host:port'.")
|
||||
else:
|
||||
print("Invalid address format. Please enter in the format 'host:port'.")
|
||||
return port, client_host, client_port
|
||||
|
||||
|
||||
#test
|
||||
if __name__ == '__main__':
|
||||
port, client_host, client_port = input_verify()
|
||||
a = sender_net(port, client_host, client_port)
|
||||
a.run()
|
@ -0,0 +1,42 @@
|
||||
import os
|
||||
import sys
|
||||
import winreg
|
||||
from pathlib import Path
|
||||
|
||||
def get_config_path():
|
||||
if sys.platform == 'win32':
|
||||
# 展开环境变量并转换为标准格式
|
||||
download_dir = Path(os.path.expandvars("%appdata%/fst")).resolve()
|
||||
else:
|
||||
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
|
||||
home_dir = Path.home()
|
||||
download_dir = home_dir / 'fst'
|
||||
dir_path = Path(download_dir)
|
||||
if not dir_path.exists():
|
||||
dir_path.mkdir(parents=True, exist_ok=True)
|
||||
return str(download_dir)
|
||||
|
||||
def get_download_directory():
|
||||
if sys.platform == 'win32':
|
||||
# 打开注册表项
|
||||
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
|
||||
r'Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders')
|
||||
|
||||
try:
|
||||
# 读取 "Downloads" 路径
|
||||
download_dir, _ = winreg.QueryValueEx(key, '{374DE290-123F-4565-9164-39C4925E467B}')
|
||||
|
||||
# 展开环境变量并转换为标准格式
|
||||
download_dir = Path(os.path.expandvars(download_dir)).resolve()
|
||||
|
||||
finally:
|
||||
# 关闭注册表项
|
||||
winreg.CloseKey(key)
|
||||
else:
|
||||
# 对于非 Windows 系统,使用 ~\Downloads 作为默认下载目录
|
||||
home_dir = Path.home()
|
||||
download_dir = home_dir / 'Downloads'
|
||||
dir_path = Path(download_dir)
|
||||
if not dir_path.exists():
|
||||
dir_path.mkdir(parents=True, exist_ok=True)
|
||||
return str(download_dir)
|
@ -1,50 +1,55 @@
|
||||
# "ecb", "cbc", "cfb", "ofb"
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
from Crypto.Random import get_random_bytes
|
||||
import base64
|
||||
from fontTools.misc.eexec import encrypt
|
||||
|
||||
|
||||
class AESUtils:
|
||||
def __init__(self, key: bytes = None):
|
||||
"""生成一个随机密钥"""
|
||||
self.key = get_random_bytes(32)
|
||||
self.key = key if key else get_random_bytes(16)
|
||||
|
||||
def encrypt(self, data: str, mode: str = 'ECB') -> (str,str):
|
||||
def encrypt(self, data1: str, mode: str = 'ECB') -> (str, str):
|
||||
"""加密数据"""
|
||||
cipher = self._get_cipher(mode)
|
||||
data = pad(data.encode(), AES.block_size) # 填充数据
|
||||
ciphertext = cipher.encrypt(data)
|
||||
return base64.b64encode(ciphertext).decode(), self.key
|
||||
cipher = self._get_encipher(mode)
|
||||
data1 = pad(data1.encode('utf-8'), AES.block_size) # 填充数据
|
||||
ciphertext = cipher.encrypt(data1)
|
||||
return base64.b64encode(ciphertext).decode('utf-8'), self.key
|
||||
|
||||
def decrypt(self, encrypted_data: str, mode: str = 'ECB') -> str:
|
||||
def decrypt(self, dekey, encrypted_data: str, mode: str = 'ECB') -> str:
|
||||
"""解密数据"""
|
||||
cipher = self._get_cipher(mode)
|
||||
encrypted_data = base64.b64decode(encrypted_data)
|
||||
cipher = self._get_decipher(dekey, mode)
|
||||
encrypted_data = base64.b64decode(encrypted_data) # 解码密文
|
||||
plaintext = unpad(cipher.decrypt(encrypted_data), AES.block_size)
|
||||
return plaintext.decode()
|
||||
|
||||
def _get_cipher(self, mode: str) -> AES:
|
||||
"""根据模式返回相应的 AES cipher"""
|
||||
iv = None
|
||||
def _get_encipher(self, mode: str) -> AES:
|
||||
"""根据模式返回相应的加密 AES cipher"""
|
||||
iv = b'abcdefghijklmnop'
|
||||
if mode == 'CBC':
|
||||
iv = get_random_bytes(AES.block_size)
|
||||
return AES.new(self.key, AES.MODE_CBC, iv)
|
||||
elif mode == 'CFB':
|
||||
return AES.new(self.key, AES.MODE_CFB)
|
||||
elif mode == 'OFB':
|
||||
return AES.new(self.key, AES.MODE_OFB)
|
||||
else: # 默认是 ECB
|
||||
return AES.new(self.key, AES.MODE_ECB)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 示例数据和密钥
|
||||
data = "Hello, SM4!"
|
||||
key = "1234567890abcdef"
|
||||
def _get_decipher(self, dekey, mode: str) -> AES:
|
||||
"""根据模式返回相应的解密 AES cipher"""
|
||||
iv = b'abcdefghijklmnop'
|
||||
if mode == 'CBC':
|
||||
iv = get_random_bytes(AES.block_size)
|
||||
return AES.new(dekey, AES.MODE_CBC, iv)
|
||||
else: # 默认是 ECB
|
||||
return AES.new(dekey, AES.MODE_ECB)
|
||||
|
||||
enData , akey = AESUtils(key)
|
||||
if __name__ == '__main__':
|
||||
key = b'1234567890abcdef' # AES 密钥必须是 16 字节
|
||||
aes = AESUtils(key)
|
||||
|
||||
print(enData)
|
||||
# 加密
|
||||
plaintext = "Hello, AES!"
|
||||
enData, akey = aes.encrypt(plaintext, mode='ECB')
|
||||
print("Encrypted Data:", enData)
|
||||
print("Key:", akey)
|
||||
|
||||
# 解密
|
||||
decrypted = aes.decrypt(aes.key, enData, mode='ECB')
|
||||
print("Decrypted:", decrypted)
|
Loading…
Reference in new issue