You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

439 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
from SimObjs import *
from tkinter import *
from tkinter import messagebox
from PIL import ImageTk, Image
from AgreementUtil import AgreementUtil
def create_label(canvas: Canvas, x, y, width, height, text, rect_color, font_color, tag):
    """Draw a filled rectangle with centred text on ``canvas`` and return ``tag``.

    The rectangle spans ``(x, y)`` to ``(x + width, y + height)``. Both canvas
    items are created with the same ``tag`` so they can be addressed together.
    """
    right_edge, bottom_edge = x + width, y + height
    canvas.create_rectangle(x, y, right_edge, bottom_edge, fill=rect_color, tags=tag)

    # The text is anchored at the geometric centre of the rectangle.
    centre = (x + width / 2, y + height / 2)
    canvas.create_text(*centre, text=text, fill=font_color, tags=tag)
    return tag
class PacketWindow(Toplevel):
    """Dialog that walks the user through packing or unpacking one packet.

    One button is shown per enabled layer. Each button opens a small form in
    which the user must enter the correct addresses/ports for that layer; a
    correct answer transforms ``self.message`` and advances ``self.step``.
    Layers must be completed in the order given by ``packet_step``.
    """

    def __init__(self, master, packet, packet_step=None, *args, **kwargs):
        """Build the window.

        Args:
            master: Parent widget. The handlers call
                ``master.message.show_message(text)`` to report success.
            packet: Dict read with these keys:
                ``"packet"`` (truthy = pack, falsy = unpack),
                ``"mode"`` (sequence of single-entry dicts ``{layer: enabled}``),
                ``"sendObj"`` / ``"receiveObj"`` (hosts whose ``interface[0]``
                holds ``"addr"``, ``"conn_port"``, ``"ip"`` and ``"mac"``),
                ``"tag"`` (initial message), ``"size"`` (used to compute the
                expected split count) and ``"send"`` (callback invoked with the
                final message).
            packet_step: Ordered layer names that must be completed. Defaults
                to ``["app", "trans", "ip", "mac"]``.
            *args, **kwargs: Forwarded to ``Toplevel``.
        """
        super().__init__(master, *args, **kwargs)
        self.master = master
        self.width = 300
        self.height = 400
        self.geometry(f"{self.width}x{self.height}+200+200")
        self.attributes("-topmost", True)
        self.resizable(width=False, height=False)
        self.packet_button_str = {"app": "应用层", "ip": "网络层", "mac": "链路层", "trans": "传输层"}

        # Work on a shallow copy so that reversing "mode" below does not leak
        # into the caller's dict (reusing the dict would otherwise flip the
        # order every time a window is opened).
        packet = dict(packet)
        packing = packet["packet"]  # True = pack (encapsulate), False = unpack
        if not packing:
            packet["mode"] = packet["mode"][::-1]

        # A fresh list per instance instead of a shared mutable default.
        self.packet_step = ["app", "trans", "ip", "mac"] if packet_step is None else packet_step
        self.step = 0
        self.packet: dict = packet
        self.sendObj: SimHost = packet["sendObj"]
        self.receiveObj: SimHost = packet["receiveObj"]
        self.packet_option = {
            "app": {"color": "#ff3b26", "command": self.app_packet if packing else self.app_unpack},
            "ip": {"color": "#00a2f2", "command": self.ip_packet if packing else self.ip_unpack},
            "mac": {"color": "#46707a", "command": self.mac_packet if packing else self.mac_unpack},
            "trans": {"color": "#710a00", "command": self.trans_packet if packing else self.trans_unpack},
        }

        self.canvas = Canvas(self, width=self.width, height=self.height)
        self.canvas.place(x=0, y=0, anchor='nw')
        self.packet_height, self.packet_init_width = 25, 100
        # The PhotoImage is kept on the instance so it is not garbage collected
        # while the canvas still displays it.
        self.background_img = ImageTk.PhotoImage(
            Image.open("../datas/images/背景@2x.png").resize((self.width, self.height)))
        self.canvas.create_image(0, 0, image=self.background_img, anchor=NW)
        # NOTE: the per-layer animation (packet rectangle plus header labels) is
        # currently disabled; step_y is kept for it.
        self.step_y = {}
        self.message = packet["tag"]
        self.create_widget()

    def _expects(self, layer, order_error):
        """Return True if ``layer`` is the next pending step, else show ``order_error``.

        Also covers the case where every step is already done, which would
        otherwise index past the end of ``packet_step``.
        """
        if self.step < len(self.packet_step) and self.packet_step[self.step] == layer:
            return True
        messagebox.showerror("注意", order_error)
        return False

    def _complete_step(self, success_text):
        """Report ``success_text`` through the master and advance to the next step."""
        self.master.message.show_message(success_text)
        self.step += 1

    def move_to(self, tag, target_x, target_y, speed=10):
        """Animate the canvas item(s) tagged ``tag`` towards a target position.

        Args:
            tag: Canvas tag or item id.
            target_x: Target x coordinate, or ``None`` to keep the current x.
            target_y: Target y coordinate, or ``None`` to keep the current y.
            speed: Maximum distance moved per axis on each 10 ms tick.
        """
        current_coords = self.canvas.coords(tag)
        if len(current_coords) < 2:
            return  # nothing to move
        current_x, current_y = current_coords[0], current_coords[1]

        # Only None means "leave this axis alone"; 0 is a valid coordinate.
        goal_x = current_x if target_x is None else target_x
        goal_y = current_y if target_y is None else target_y
        delta_x = goal_x - current_x
        delta_y = goal_y - current_y

        # Step by `speed` towards the goal, snapping onto it once within range.
        if abs(delta_x) > speed:
            next_x = current_x + (speed if delta_x > 0 else -speed)
        else:
            next_x = goal_x
        if abs(delta_y) > speed:
            next_y = current_y + (speed if delta_y > 0 else -speed)
        else:
            next_y = goal_y

        self.canvas.move(tag, next_x - current_x, next_y - current_y)

        # Not there yet: schedule the next tick.
        if next_x != goal_x or next_y != goal_y:
            self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed))

    def create_widget(self):
        """Create one button per enabled layer plus the send and cancel buttons."""
        mode_text = "封包" if self.packet["packet"] else "解包"
        num, margin_top, button_width, button_height = 1, 30, 120, 40
        for data in self.packet["mode"]:
            # Each entry is a single-item dict: {layer_name: enabled}.
            key, value = next(iter(data.items()))
            if value:
                Button(self, text=self.packet_button_str[key] + mode_text,
                       command=self.packet_option[key]["command"],
                       font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1),
                                            width=button_width, height=button_height)
                # NOTE(review): the row counter is advanced only for enabled
                # layers so buttons stay contiguous - confirm against the
                # original layout (identical when every layer is enabled).
                num += 1
        Button(self, text="发送", command=self.send_packet, font=("", 16)).place(
            x=self.width - 60, y=self.height - 40, anchor=NW)
        Button(self, text="取消", command=self.destroy, font=("", 16)).place(
            x=self.width - 120, y=self.height - 40, anchor=NW)

    def send_packet(self):
        """Hand the finished message to ``packet["send"]`` and close the window.

        Shows an error instead if not every step has been completed.
        """
        if self.step != len(self.packet_step):
            messagebox.showerror("注意", "尚未完成{}".format("封包" if self.packet["packet"] else "解包"))
            return
        self.packet["send"](self.message)
        self.destroy()

    def create_window(self, option):
        """Open a form dialog and return it.

        Args:
            option: Dict with ``"title"``, ``"entry"`` (list of single-entry
                dicts ``{label_text: tk_variable_or_None}``; ``None`` produces
                a label-only row) and ``"command"`` (submit callback).
        """
        dialog_width, dialog_height = 450, 220
        toplevel = Toplevel(self)
        toplevel.title(option["title"])
        toplevel.geometry(f"{dialog_width}x{dialog_height}+300+300")
        for row, entry_option in enumerate(option["entry"]):
            label_text, variable = next(iter(entry_option.items()))
            Label(toplevel, text=label_text, font=("", 16)).grid(row=row, column=0, padx=20, pady=10)
            if variable is None:
                continue
            Entry(toplevel, textvariable=variable, font=("", 16), width=15).grid(row=row, column=1, pady=10)
        Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(
            x=dialog_width - 60, y=dialog_height - 40, anchor=NW)
        Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(
            x=dialog_width - 120, y=dialog_height - 40, anchor=NW)
        toplevel.attributes("-topmost", True)
        return toplevel

    def app_packet(self):
        """Application-layer packing: prefix the message with the target address."""
        if not self._expects("app", "封包顺序出错!"):
            return
        SourceAppAddr = StringVar()
        TargetAppAddr = StringVar()

        def packet():
            # This dialog closes before validation, so a wrong answer requires
            # reopening it (the lower layers keep their dialog open instead).
            toplevel.destroy()
            source_app_addr = SourceAppAddr.get()
            target_app_addr = TargetAppAddr.get()
            if source_app_addr == self.sendObj.interface[0]["addr"] \
                    and target_app_addr == self.receiveObj.interface[0]["addr"]:
                self.message = target_app_addr + "&" + self.packet["tag"]
                self._complete_step("应用层封包成功!数据包如下: \n{}".format(self.message))
            else:
                messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "应用层封包",
                                       "entry": [{"发送主机应用层地址:": SourceAppAddr},
                                                 {"接收主机应用层地址:": TargetAppAddr}],
                                       "command": packet})

    def app_unpack(self):
        """Application-layer unpacking: strip the ``<address>&`` prefix."""
        if not self._expects("app", "解包顺序出错!"):
            return
        TargetAppAddr = StringVar()

        def packet():
            toplevel.destroy()
            target_app_addr = TargetAppAddr.get()
            if target_app_addr == self.receiveObj.interface[0]["addr"]:
                # Split on the first "&" only, so a payload that itself contains
                # "&" survives; a message without a prefix is kept unchanged.
                self.message = self.message.split("&", 1)[-1]
                self._complete_step("应用层解包成功!数据包如下: \n{}".format(self.message))
            else:
                messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "应用层解包",
                                       "entry": [{"接收主机应用层地址:": TargetAppAddr}],
                                       "command": packet})

    def trans_packet(self):
        """Transport-layer packing: check ports and split count, then wrap the message."""
        if not self._expects("trans", "封包顺序出错!"):
            return
        SentPort = StringVar()
        RcptPort = StringVar()
        SplitCount = IntVar()

        def packet():
            sent_port = SentPort.get()
            rcpt_port = RcptPort.get()
            try:
                split_count = SplitCount.get()
            except TclError:
                # Blank or non-integer entry: treat as a wrong answer.
                split_count = None
            count = split_appdata(self.packet["size"])
            # NOTE(review): ports are compared as entered (strings); this
            # assumes interface[0]["conn_port"] is stored as str - confirm.
            if sent_port == self.sendObj.interface[0]["conn_port"] \
                    and rcpt_port == self.receiveObj.interface[0]["conn_port"] \
                    and split_count == count:
                toplevel.destroy()
                self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port))
                # The same packet text is listed once per segment, numbered from 1.
                segments = [str(i + 1) + "-" + str(self.message) for i in range(split_count)]
                self._complete_step("传输层封包成功!数据包如下:\n{}".format(str(segments)))
            else:
                messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "传输层封包",
                                       "entry": [{"发送主机端口:": SentPort},
                                                 {"接收主机端口:": RcptPort},
                                                 {"拆包数量": SplitCount},
                                                 {"每个包约2048": None}],
                                       "command": packet})

    def trans_unpack(self):
        """Transport-layer unpacking: check the receiving port, then unwrap."""
        if not self._expects("trans", "解包顺序出错!"):
            return
        RcptPort = StringVar()

        def packet():
            rcpt_port = RcptPort.get()
            if rcpt_port == self.receiveObj.interface[0]["conn_port"]:
                toplevel.destroy()
                data = AgreementUtil.parse_udp_packet(self.message)
                self.message = data[-1]  # last parsed element is used as the payload
                self._complete_step("传输层解包成功!数据包如下:\n{}".format(self.message))
            else:
                messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "传输层解包",
                                       "entry": [{"接收主机端口:": RcptPort}],
                                       "command": packet})

    def ip_packet(self):
        """Network-layer packing: check both IP addresses, then wrap the message."""
        if not self._expects("ip", "封包顺序出错!"):
            return
        SourceIP = StringVar()
        TargetIP = StringVar()

        def packet():
            source_ip = SourceIP.get()
            target_ip = TargetIP.get()
            if source_ip == self.sendObj.interface[0]["ip"] \
                    and target_ip == self.receiveObj.interface[0]["ip"]:
                toplevel.destroy()
                self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip)
                self._complete_step("网络层封包成功!数据包如下:\n{}".format(self.message))
            else:
                messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "网络层封包",
                                       "entry": [{"发送主机IP": SourceIP},
                                                 {"接收主机IP": TargetIP}],
                                       "command": packet})

    def ip_unpack(self):
        """Network-layer unpacking: check the receiving IP, then unwrap."""
        if not self._expects("ip", "解包顺序出错!"):
            return
        RcptIP = StringVar()

        def packet():
            rcpt_ip = RcptIP.get()
            if rcpt_ip == self.receiveObj.interface[0]["ip"]:
                toplevel.destroy()
                data = AgreementUtil.parse_ip_packet(self.message)
                self.message = data[-1]  # last parsed element is used as the payload
                self._complete_step("网络层解包成功!数据包如下:\n{}".format(self.message))
            else:
                messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "网络层解包",
                                       "entry": [{"接收主机IP": RcptIP}],
                                       "command": packet})

    def mac_packet(self):
        """Link-layer packing: check both MAC addresses, then wrap the message."""
        if not self._expects("mac", "封包顺序出错!"):
            return
        SentMac = StringVar()
        RcptMac = StringVar()

        def packet():
            sent_mac = SentMac.get()
            rcpt_mac = RcptMac.get()
            if sent_mac == self.sendObj.interface[0]["mac"] \
                    and rcpt_mac == self.receiveObj.interface[0]["mac"]:
                toplevel.destroy()
                self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac)
                self._complete_step("链路层封包成功!数据包如下:\n{}".format(self.message))
            else:
                messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "链路层封包",
                                       "entry": [{"发送主机MAC": SentMac},
                                                 {"接收主机MAC": RcptMac}],
                                       "command": packet})

    def mac_unpack(self):
        """Link-layer unpacking: check the receiving MAC, then unwrap."""
        if not self._expects("mac", "解包顺序出错!"):
            return
        RcptMac = StringVar()

        def packet():
            rcpt_mac = RcptMac.get()
            if rcpt_mac == self.receiveObj.interface[0]["mac"]:
                toplevel.destroy()
                # Parse the current message, like the other unpack steps. It
                # equals packet["tag"] whenever this layer is the first step.
                data = AgreementUtil.parse_ethernet_frame(self.message)
                self.message = data[-1]  # last parsed element is used as the payload
                self._complete_step("链路层解包成功!数据包如下:\n{}".format(self.message))
            else:
                messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!")

        toplevel = self.create_window({"title": "链路层解包",
                                       "entry": [{"接收主机MAC": RcptMac}],
                                       "command": packet})
class AppData:
    """Plain record describing one application-layer packet."""

    def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag,
                 source_app_addr, target_app_addr, app_packed_string, timestamp, canvas=None):
        """Store the packet fields.

        ``canvas`` is accepted for compatibility but not stored. It defaults to
        ``None`` so that ``unpack(pack())`` round-trips, since the packed dict
        has no ``canvas`` entry.
        """
        self.obj_id = obj_id
        self.id_key = id_key
        self.app_pack_id = app_pack_id
        self.app_pack_size = app_pack_size
        self.app_pack_tag = app_pack_tag
        self.source_app_addr = source_app_addr
        self.target_app_addr = target_app_addr
        self.app_packed_string = app_packed_string
        self.timestamp = timestamp

    def pack(self):
        """Return the fields as a new dict (for simplicity, packing is just a dict).

        A copy is returned so that editing the packed dict cannot silently
        change this object.
        """
        return dict(vars(self))

    @staticmethod
    def unpack(packed_data):
        """Rebuild an ``AppData`` from a dict produced by ``pack``."""
        return AppData(**packed_data)
class TransData:
    """Plain record describing one transport-layer segment."""

    def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id,
                 sent_port, rcpt_port, source_app_addr, target_app_addr,
                 trans_packed_string, timestamp):
        """Store every argument on the instance under the same name."""
        self.obj_id = obj_id
        self.id_key = id_key
        self.trans_pack_id = trans_pack_id
        self.trans_seq = trans_seq
        self.trans_tag = trans_tag
        self.app_pack_id = app_pack_id
        self.sent_port = sent_port
        self.rcpt_port = rcpt_port
        self.source_app_addr = source_app_addr
        self.target_app_addr = target_app_addr
        self.trans_packed_string = trans_packed_string
        self.timestamp = timestamp

    def pack(self):
        """Return the fields as a new dict.

        A copy is returned so that editing the packed dict cannot silently
        change this object.
        """
        return dict(vars(self))

    @staticmethod
    def unpack(packed_data):
        """Rebuild a ``TransData`` from a dict produced by ``pack``."""
        return TransData(**packed_data)
class IPData:
    """Plain record describing one network-layer packet."""

    def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip,
                 source_app_addr, target_app_addr, ip_packed_string):
        """Store every argument on the instance under the same name."""
        self.obj_id = obj_id
        self.id_key = id_key
        self.ip_pack_id = ip_pack_id
        self.trans_pack_id = trans_pack_id
        self.source_ip = source_ip
        self.target_ip = target_ip
        self.source_app_addr = source_app_addr
        self.target_app_addr = target_app_addr
        self.ip_packed_string = ip_packed_string

    def pack(self):
        """Return the fields as a new dict.

        A copy is returned so that editing the packed dict cannot silently
        change this object.
        """
        return dict(vars(self))

    @staticmethod
    def unpack(packed_data):
        """Rebuild an ``IPData`` from a dict produced by ``pack``."""
        return IPData(**packed_data)
class MACData:
    """Plain record describing one link-layer frame."""

    def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac,
                 source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string):
        """Store every argument on the instance under the same name."""
        self.obj_id = obj_id
        self.id_key = id_key
        self.mac_pack_id = mac_pack_id
        self.ip_pack_id = ip_pack_id
        self.sent_mac = sent_mac
        self.rcpt_mac = rcpt_mac
        self.source_ip = source_ip
        self.target_ip = target_ip
        self.source_app_addr = source_app_addr
        self.target_app_addr = target_app_addr
        self.mac_packed_string = mac_packed_string

    def pack(self):
        """Return the fields as a new dict.

        A copy is returned so that editing the packed dict cannot silently
        change this object.
        """
        return dict(vars(self))

    @staticmethod
    def unpack(packed_data):
        """Rebuild a ``MACData`` from a dict produced by ``pack``."""
        return MACData(**packed_data)
def split_appdata(AppPackSize, mtu=2048):
    """Return how many segments of at most ``mtu`` bytes are needed for the data.

    Args:
        AppPackSize: Size of the application data, in the same unit as ``mtu``.
        mtu: Maximum size of one segment; defaults to 2048.

    Returns:
        ``AppPackSize / mtu`` rounded up (0 for an empty payload).

    Raises:
        ValueError: If ``mtu`` is not positive.
    """
    if mtu <= 0:
        raise ValueError("mtu must be positive")
    # Ceiling division without floats: negate, floor-divide, negate back.
    return -(-AppPackSize // mtu)
if __name__ == '__main__':
    # Small manual demo: build one application-layer record and print it.
    # Assumed maximum transmission unit.
    MTU = 2048  # bytes
    # Simulated payload and payload size.
    SIMULATED_DATA = "hello"
    SIMULATED_SIZE = 2049  # bytes, i.e. one byte more than MTU
    # Create the application-layer packet. AppData takes a trailing `canvas`
    # argument; no canvas exists in this demo, so None is passed explicitly.
    app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1",
                         SIMULATED_DATA, time.time(), None)
    packed_app_data = app_packet.pack()
    print(packed_app_data)