Compare commits

...

16 Commits

@ -10,8 +10,6 @@ pubKeySavePath = f"{configPath}/public.pem"
class EncryptType(Enum): class EncryptType(Enum):
AES_ECB = auto() AES_ECB = auto()
AES_CBC = auto() AES_CBC = auto()
AES_CFB = auto()
AES_OFB = auto()
SM4_ECB = auto() SM4_ECB = auto()
SM4_CBC = auto() SM4_CBC = auto()

@ -1,9 +1,41 @@
import json
class Letter: class Letter:
sign = "计算得到" sign = "计算得到"
encryptType = "SM4_GCM" encryptType = "SM4_GCM"
encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据 encryptKey = "计算获得" # recvPubKey 加密后的 对称加密秘钥 数据
recvPubKey = "" recvPubKey = ""
senderPubKey = "" senderPubKey = ""
fileName = "" fileName = ""
fileBase64 = "" fileBase64 = ""
def to_dict(self):
return {
"sign": self.sign,
"encryptType": self.encryptType,
"encryptKey": self.encryptKey,
"recvPubKey": self.recvPubKey,
"senderPubKey": self.senderPubKey,
"fileName": self.fileName,
"fileBase64": self.fileBase64
}
def json_to_obj(json_str):
new_obj= Letter()
new_obj.sign = json_str["sign"]
new_obj.encryptType = json_str["encryptType"]
new_obj.encryptKey = json_str["encryptKey"]
new_obj.recvPubKey = json_str["recvPubKey"]
new_obj.senderPubKey = json_str["senderPubKey"]
new_obj.fileName = json_str["fileName"]
new_obj.fileBase64 = json_str["fileBase64"]
return new_obj
# test code
if __name__ == '__main__':
letter = Letter()
# print(json.dumps(letter.to_dict(), indent=1))
json_dict = json.loads(json.dumps(letter.to_dict()))
print(json_dict.__dir__)

@ -23,6 +23,8 @@ def handleLetter(letter: Letter):
# 解析信件 确认收信人 # 解析信件 确认收信人
# 获取自身key # 获取自身key
pki = PriKeyHelper.getUserKey() pki = PriKeyHelper.getUserKey()
print("pki is: ",pki[1])
print("letter.recvPubKey is: ",letter.recvPubKey)
if pki[1] != letter.recvPubKey: if pki[1] != letter.recvPubKey:
raise Exception("信件不属于自己") raise Exception("信件不属于自己")
# 用自己的私钥解密key 获得对称加密秘钥。 # 用自己的私钥解密key 获得对称加密秘钥。
@ -40,9 +42,9 @@ def handleLetter(letter: Letter):
elif type == config.EncryptType.SM4_CBC: elif type == config.EncryptType.SM4_CBC:
data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64, key)) data = base64.b64decode(SM4.decrypt_cbc_with_iv(letter.fileBase64, key))
elif type == config.EncryptType.AES_ECB: elif type == config.EncryptType.AES_ECB:
data = base64.b64decode(AES.AESUtils(key).decrypt(key, letter.fileBase64, "ecb")) data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64, "ecb"))
elif type == config.EncryptType.AES_CBC: elif type == config.EncryptType.AES_CBC:
data = base64.b64decode(AES.AESUtils(key).decrypt(key, letter.fileBase64,"cbc")) data = base64.b64decode(AES.AESUtils().decrypt(key, letter.fileBase64,"cbc"))
else: else:
raise KeyError("不支持的对称加密算法") raise KeyError("不支持的对称加密算法")

@ -0,0 +1,141 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
import select
from entity.Letter import Letter, json_to_obj
import recv
from sender import sender
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class recver_net():
def __init__(self, port, client_host, client_port):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, data):
conn.sendall(data)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print("[*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
# 加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def run(self):
threading.Thread(target=self.main).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
# test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = recver_net(port, client_host, client_port)
a.run()

@ -6,12 +6,11 @@ import os
from entity.Letter import Letter from entity.Letter import Letter
from itsdangerous import base64_encode
from tool import PriKeyHelper from tool import PriKeyHelper
from tool.PriKeyHelper import getUserKey from tool.PriKeyHelper import getUserKey
from tool.asymmetric import RSA from tool.asymmetric import RSA
from tool.symmetric.AES import AESUtils from tool.symmetric.AES import AESUtils
from tool.symmetric.SM4 import encrypt_ecb, decrypt_cbc_with_iv, encrypt_cbc_with_iv from tool.symmetric.SM4 import encrypt_ecb, encrypt_cbc_with_iv
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
@ -43,8 +42,8 @@ def main():
letter.fileName = getFileName(path) letter.fileName = getFileName(path)
letter.recvPubKey = getRecvPubKey() letter.recvPubKey = getRecvPubKey()
letter.senderPubKey = getSenderPubKey() letter.senderPubKey = getSenderPubKey()
letter.fileBase64, akey = SymEncryption(base64_encode(data).decode("utf-8"),letterSymKey) letter.fileBase64, akey = SymEncryption(base64.b64encode(data).decode("utf-8"),letterSymKey)
letter.encryptKey = getEncryptKey() letter.encryptKey = getEncryptKey(letter.recvPubKey)
letter.encryptType = getEncryptType() letter.encryptType = getEncryptType()
letter.sign = getSign(data) letter.sign = getSign(data)
@ -107,13 +106,13 @@ def selectSymEncryptionChoice():
# 选择加密算法的模式 # 选择加密算法的模式
while True: while True:
if encryWay == "aes": if encryWay == "aes":
encryMode = input("选择加密算法模式 (ecb/cbc/cfb/ofb): ").strip().lower() encryMode = input("选择加密算法模式 (ecb/cbc): ").strip().lower()
if encryMode in ["ecb", "cbc", "cfb", "ofb"]: if encryMode in ["ecb", "cbc"]:
letterMode = encryMode letterMode = encryMode
print(f"已选择 '{encryMode}' 加密模式.") print(f"已选择 '{encryMode}' 加密模式.")
break # 输入有效后退出循环 break # 输入有效后退出循环
else: else:
print("非法输入,请输入 ecb/cbc/cfb/ofb") print("非法输入,请输入 ecb/cbc")
elif encryWay == "sm4": elif encryWay == "sm4":
encryMode = input("选择加密模式 (ecb/cbc): ").strip().lower() encryMode = input("选择加密模式 (ecb/cbc): ").strip().lower()
@ -167,16 +166,20 @@ def getEncryptType():
return encryType return encryType
# 对称密钥,返回的是使用接收方公钥加密后的密钥 # 对称密钥,返回的是使用接收方公钥加密后的对称密钥
def getEncryptKey(): def getEncryptKey(getRecvPubKey):
rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey()) rsaEncrySymKey = RSA.encrypt_message(letterSymKey, getRecvPubKey)
return base64.b64encode(rsaEncrySymKey).decode("utf-8") return base64.b64encode(rsaEncrySymKey).decode("utf-8")
# 获得接收方的公钥 # 获得接收方的公钥
def getRecvPubKey(): def getRecvPubKey():
# recPubKey = input("plz input Receiver's Public Key: ") # recPubKey = input(" plz input Receiver's Public Key: ")
recPubKey = getUserKey()[1] # 在某某地方获得对方的公钥,然后保存到某个地方,输入路径
recPubKeyPath = input("请输入接受方的公钥文件路径:")
with open(recPubKeyPath, "rb") as f:
data = f.read()
recPubKey = base64.b64encode(data).decode('utf-8')
return recPubKey return recPubKey
# 获得发送方的公钥 # 获得发送方的公钥

@ -0,0 +1,143 @@
import hashlib
import json
import selectors
import socket
import threading
from time import sleep
from entity.Letter import Letter,json_to_obj
import select
import sender
from recv import recv
# from entity.Letter import Letter
# def main():
# # 用户输入各种数据填充letter字段
# # 获取用户的公私钥对进行签名
# # 使用对方的公钥进行加密
# # 发送信件
# pass
#
#
# def sendLetter(letter: Letter, target="192.168.195.162:8426"):
# # 向目标ip和端口发送指定的信件
# pass
class sender_net():
def __init__(self,port=8424,client_host='127.0.0.1',client_port=8424):
self.client_host = client_host
self.client_port = client_port
self.letter = Letter()
self.port=port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind(('0.0.0.0', self.port))
self.server.listen(10)
def recver_thread(self):
while True:
conn, addr = self.server.accept()
data = conn.recv(10240)
msg = json.loads(data.decode())
if int(msg['flag']) == 0:
print("[+] haved received data:" + msg["data"])
elif int(msg['flag']) == 1:
self.recv_file(msg['data'])
else:
print("[x] Error")
def send_thread(self, conn):
while True:
try:
flag: int = int(input())
if flag == 0:
data = input("[-] Enter data to send: ")
msg = {"flag": 0, "data": data}
msg = json.dumps(msg).encode("utf-8")
self.send_data(conn, msg)
elif flag == 1:
print("[+] Calling file transfer module...")
# file_name = input("Enter file name to send: ")
self.send_file(conn)
else:
print("[x] Error,plesae enter 0 to send data, or 1 to send file.")
except ValueError:
print("[x] Error, please enter a valid number.")
def send_data(self, conn, msg):
conn.sendall(msg)
def send_file(self, conn):
self.letter: Letter = sender.main()
letter = self.letter.to_dict()
msg = {"flag": 1, "data": letter}
print(letter)
msg = json.dumps(msg).encode("utf-8")
conn.sendall(msg)
def recv_data(self, conn):
con, addr = conn.accept()
data = con.recv(1024)
if not data:
return None
# print(data.decode())
return data.decode()
def recv_file(self, data):
letter: Letter = json_to_obj(data)
recv.handleLetter(letter)
def run(self):
threading.Thread(target=self.main).start()
def main(self):
while True:
try:
client = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
client.connect((self.client_host, self.client_port))
print(" [*] Connected...")
print("[*] if you want to send data, enter 0, if you want to send file, enter 1.")
sleep(1)
break
except socket.error:
print("[*] Waiting for ...")
#加入线程
threading.Thread(target=self.recver_thread, ).start()
threading.Thread(target=self.send_thread, args=(client,)).start()
def input_verify():
while True:
try:
port = int(input("[*] Enter to the listen port: "))
break
except ValueError:
print("[x] Error, please enter a valid port number.")
while True:
addr = input("[*] Enter the address to connect to(127.0.0.1:8424): ")
if ':' in addr:
client_host, client_port = addr.split(":")
if client_host and client_port.isdigit():
client_port = int(client_port)
if 0 <= client_port <= 65535: # 检查端口范围
break # 输入有效,跳出循环
else:
print("Port must be between 0 and 65535.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
else:
print("Invalid address format. Please enter in the format 'host:port'.")
return port, client_host, client_port
#test
if __name__ == '__main__':
port, client_host, client_port = input_verify()
a = sender_net(port, client_host, client_port)
a.run()

@ -10,7 +10,6 @@ def getUserKey() -> (str, str): # 返回base64编码
if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath): if not os.path.exists(config.priKeySavePath) or not os.path.exists(config.pubKeySavePath):
# 生成新的密钥对 # 生成新的密钥对
RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath) RSA.generate_keys(config.priKeySavePath,config.pubKeySavePath)
# 读取私钥 # 读取私钥
with open(config.priKeySavePath, "rb") as f: with open(config.priKeySavePath, "rb") as f:
data = f.read() data = f.read()

@ -24,27 +24,19 @@ class AESUtils:
def _get_encipher(self, mode: str) -> AES: def _get_encipher(self, mode: str) -> AES:
"""根据模式返回相应的加密 AES cipher""" """根据模式返回相应的加密 AES cipher"""
iv = None iv = b'abcdefghijklmnop'
if mode == 'CBC': if mode == 'CBC':
iv = get_random_bytes(AES.block_size) iv = get_random_bytes(AES.block_size)
return AES.new(self.key, AES.MODE_CBC, iv) return AES.new(self.key, AES.MODE_CBC, iv)
elif mode == 'CFB':
return AES.new(self.key, AES.MODE_CFB)
elif mode == 'OFB':
return AES.new(self.key, AES.MODE_OFB)
else: # 默认是 ECB else: # 默认是 ECB
return AES.new(self.key, AES.MODE_ECB) return AES.new(self.key, AES.MODE_ECB)
def _get_decipher(self, dekey, mode: str) -> AES: def _get_decipher(self, dekey, mode: str) -> AES:
"""根据模式返回相应的解密 AES cipher""" """根据模式返回相应的解密 AES cipher"""
iv = None iv = b'abcdefghijklmnop'
if mode == 'CBC': if mode == 'CBC':
iv = get_random_bytes(AES.block_size) iv = get_random_bytes(AES.block_size)
return AES.new(dekey, AES.MODE_CBC, iv) return AES.new(dekey, AES.MODE_CBC, iv)
elif mode == 'CFB':
return AES.new(dekey, AES.MODE_CFB)
elif mode == 'OFB':
return AES.new(dekey, AES.MODE_OFB)
else: # 默认是 ECB else: # 默认是 ECB
return AES.new(dekey, AES.MODE_ECB) return AES.new(dekey, AES.MODE_ECB)

Loading…
Cancel
Save