|
|
import time
|
|
|
|
|
|
|
|
|
from SimObjs import *
|
|
|
from tkinter import *
|
|
|
from tkinter import messagebox
|
|
|
|
|
|
from PIL import ImageTk, Image
|
|
|
from AgreementUtil import AgreementUtil
|
|
|
|
|
|
|
|
|
def create_label(canvas: Canvas, x, y, width, height, text, rect_color, font_color, tag):
|
|
|
canvas.create_rectangle(x, y, x + width, y + height, fill=rect_color, tags=tag)
|
|
|
canvas.create_text(x + width / 2, y + height / 2, text=text, fill=font_color, tags=tag)
|
|
|
return tag
|
|
|
|
|
|
|
|
|
class PacketWindow(Toplevel):
|
|
|
def __init__(self, master, packet, packet_step=["app", "trans", "ip", "mac"], *args, **kwargs):
|
|
|
"""
|
|
|
packet:{"packet": True, "mode": {"app": True, "trans": True, "ip": True, "mac": True}}
|
|
|
"""
|
|
|
super().__init__(master, *args, **kwargs)
|
|
|
self.master = master
|
|
|
self.width = 300
|
|
|
self.height = 400
|
|
|
self.geometry(f"{self.width}x{self.height}+200+200")
|
|
|
self.attributes("-topmost", True)
|
|
|
self.resizable(width=False, height=False)
|
|
|
self.packet_button_str = {"app": "应用层", "ip": "网络层", "mac": "链路层", "trans": "传输层"}
|
|
|
if not packet["packet"]:
|
|
|
packet["mode"] = packet["mode"][::-1] # True 为封包 False 为解包
|
|
|
self.packet_step = packet_step
|
|
|
self.step = 0
|
|
|
self.packet: dict[str: bool] = packet
|
|
|
self.sendObj: SimHost = packet["sendObj"]
|
|
|
self.receiveObj: SimHost = packet["receiveObj"]
|
|
|
self.packet_option = {"app": {"color": "#ff3b26", "command": self.app_packet if packet["packet"] else self.app_unpack},
|
|
|
"ip": {"color": "#00a2f2", "command": self.ip_packet if packet["packet"] else self.ip_unpack},
|
|
|
"mac": {"color": "#46707a", "command": self.mac_packet if packet["packet"] else self.mac_unpack},
|
|
|
"trans": {"color": "#710a00", "command": self.trans_packet if packet["packet"] else self.trans_unpack}}
|
|
|
self.canvas = Canvas(self, width=self.width, height=self.height)
|
|
|
self.canvas.place(x=0, y=0, anchor='nw')
|
|
|
self.packet_height, self.packet_init_width = 25, 100
|
|
|
self.background_img = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height)))
|
|
|
self.canvas.create_image(0, 0, image=self.background_img, anchor=NW)
|
|
|
# self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, 10, width=self.packet_init_width, height=self.packet_height, text=self.packet["tag"], rect_color="#dee1e6", font_color="black", tag="packet")
|
|
|
self.step_y = {}
|
|
|
self.message = packet["tag"]
|
|
|
self.create_widget()
|
|
|
|
|
|
def move_to(self, tag, target_x, target_y, speed=10):
|
|
|
# 获取当前位置
|
|
|
current_coords = self.canvas.coords(tag)
|
|
|
if len(current_coords) < 2:
|
|
|
return # 如果没有坐标,就退出函数
|
|
|
current_x, current_y = current_coords[0], current_coords[1]
|
|
|
# 计算移动方向
|
|
|
delta_x = (target_x if target_x else current_x) - current_x
|
|
|
delta_y = (target_y if target_y else current_y) - current_y
|
|
|
# 计算下一步的位置
|
|
|
next_x = current_x + (speed if delta_x > 0 else -speed) if abs(delta_x) > speed else (target_x if target_x else current_x)
|
|
|
next_y = current_y + (speed if delta_y > 0 else -speed) if abs(delta_y) > speed else (target_y if target_y else current_y)
|
|
|
# 移动对象
|
|
|
self.canvas.move(tag, next_x - current_x, next_y - current_y)
|
|
|
# 如果对象还没有到达目标,继续移动
|
|
|
if next_x != (target_x if target_x else current_x) or next_y != (target_y if target_y else current_y):
|
|
|
self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed))
|
|
|
|
|
|
def create_widget(self):
|
|
|
mode_text = "封包" if self.packet["packet"] else "解包"
|
|
|
num, margin_top, button_width, button_height = 1, 30, 120, 40
|
|
|
for data in self.packet["mode"]:
|
|
|
key = list(data.keys())[0]
|
|
|
value = list(data.values())[0]
|
|
|
if value:
|
|
|
Button(self, text=self.packet_button_str[key] + mode_text, command=self.packet_option[key]["command"],
|
|
|
font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1),
|
|
|
width=button_width, height=button_height)
|
|
|
|
|
|
num += 1
|
|
|
Button(self, text="发送", command=self.send_packet, font=("", 16)).place(x=self.width - 60, y=self.height - 40, anchor=NW)
|
|
|
Button(self, text="取消", command=self.destroy, font=("", 16)).place(x=self.width - 120, y=self.height - 40, anchor=NW)
|
|
|
|
|
|
def send_packet(self):
|
|
|
if self.step != len(self.packet_step):
|
|
|
messagebox.showerror("注意", "尚未完成{}!".format("封包" if self.packet["packet"] else "解包"))
|
|
|
return
|
|
|
self.packet["send"](self.message)
|
|
|
self.destroy()
|
|
|
|
|
|
def create_window(self, option):
|
|
|
toplevel = Toplevel(self)
|
|
|
toplevel.title(option["title"])
|
|
|
toplevel.geometry("450x220+300+300")
|
|
|
for entry_option in option["entry"]:
|
|
|
index = option["entry"].index(entry_option)
|
|
|
key = list(entry_option.keys())[0]
|
|
|
value = list(entry_option.values())[0]
|
|
|
Label(toplevel, text=key, font=("", 16)).grid(row=index, column=0, padx=20, pady=10)
|
|
|
if value is None:
|
|
|
continue
|
|
|
Entry(toplevel, textvariable=value, font=("", 16), width=15).grid(row=index, column=1, pady=10)
|
|
|
Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(x=450 - 60, y=220 - 40, anchor=NW)
|
|
|
Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(x=450 - 120, y=220 - 40, anchor=NW)
|
|
|
toplevel.attributes("-topmost", True)
|
|
|
return toplevel
|
|
|
|
|
|
def app_packet(self):
|
|
|
"""
|
|
|
应用层封包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "app":
|
|
|
messagebox.showerror("注意", "封包顺序出错!")
|
|
|
return
|
|
|
SourceAppAddr = StringVar()
|
|
|
TargetAppAddr = StringVar()
|
|
|
def packet():
|
|
|
toplevel.destroy()
|
|
|
source_app_addr = SourceAppAddr.get()
|
|
|
target_app_addr = TargetAppAddr.get()
|
|
|
if source_app_addr == self.sendObj.interface[0]["addr"] \
|
|
|
and target_app_addr == self.receiveObj.interface[0]["addr"]:
|
|
|
|
|
|
# 动画
|
|
|
# app_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["app"]), 50, 25, "AH", "#ff3b26", "white", tag="app")
|
|
|
# self.move_to(self.packet_rect, None, int(self.step_y["app"]), speed=1)
|
|
|
# self.move_to(app_rect, self.width / 2 + self.packet_init_width / 2, int(self.step_y["app"]), speed=1)
|
|
|
|
|
|
self.message = target_app_addr + "&" + self.packet["tag"]
|
|
|
self.master.message.show_message("应用层封包成功!数据包如下: \n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")
|
|
|
|
|
|
toplevel = self.create_window({"title": "应用层封包", "entry": [{"发送主机应用层地址:": SourceAppAddr}, {"接收主机应用层地址:": TargetAppAddr}],
|
|
|
"command": packet})
|
|
|
|
|
|
def app_unpack(self):
|
|
|
"""
|
|
|
应用层解包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "app":
|
|
|
messagebox.showerror("注意", "解包顺序出错!")
|
|
|
return
|
|
|
TargetAppAddr = StringVar()
|
|
|
def packet():
|
|
|
toplevel.destroy()
|
|
|
target_app_addr = TargetAppAddr.get()
|
|
|
if target_app_addr == self.receiveObj.interface[0]["addr"]:
|
|
|
self.message = self.message.split("&")[1]
|
|
|
self.master.message.show_message("应用层解包成功!数据包如下: \n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")
|
|
|
toplevel = self.create_window({"title": "应用层解包", "entry": [{"接收主机应用层地址:": TargetAppAddr}],
|
|
|
"command": packet})
|
|
|
|
|
|
def trans_packet(self):
|
|
|
"""
|
|
|
传输层封包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "trans":
|
|
|
messagebox.showerror("注意", "封包顺序出错!")
|
|
|
return
|
|
|
SentPort = StringVar()
|
|
|
RcptPort = StringVar()
|
|
|
SplitCount = IntVar()
|
|
|
|
|
|
def packet():
|
|
|
|
|
|
sent_port = SentPort.get()
|
|
|
rcpt_port = RcptPort.get()
|
|
|
split_count = SplitCount.get()
|
|
|
count = split_appdata(self.packet["size"])
|
|
|
print(sent_port, self.sendObj.interface[0]["conn_port"])
|
|
|
if sent_port == self.sendObj.interface[0]["conn_port"] \
|
|
|
and rcpt_port == self.receiveObj.interface[0]["conn_port"]\
|
|
|
and split_count == count:
|
|
|
toplevel.destroy()
|
|
|
# 动画
|
|
|
# self.canvas.delete(self.packet_rect)
|
|
|
# self.canvas.delete("app")
|
|
|
# self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, int(self.step_y["app"]),
|
|
|
# width=self.packet_init_width + 50, height=self.packet_height,
|
|
|
# text="A-" + self.packet["tag"], rect_color="#dee1e6", font_color="black",
|
|
|
# tag="packet")
|
|
|
# trans_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["trans"]), 50, 25, "PH", "#ff3b26",
|
|
|
# "white", tag="trans")
|
|
|
# self.move_to(self.packet_rect, None, int(self.step_y["trans"]), speed=1)
|
|
|
# self.move_to(trans_rect, self.width / 2 + self.packet_init_width / 2 + 50, int(self.step_y["trans"]), speed=1)
|
|
|
|
|
|
|
|
|
self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port))
|
|
|
self.master.message.show_message("传输层封包成功!数据包如下:\n{}".format(str([str(i + 1) + "-" + str(self.message) for i in range(split_count)])))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!")
|
|
|
|
|
|
toplevel = self.create_window({"title": "传输层封包", "entry": [{"发送主机端口:": SentPort},
|
|
|
{"接收主机端口:": RcptPort},
|
|
|
{"拆包数量": SplitCount},
|
|
|
{"每个包约2048": None}],
|
|
|
"command": packet})
|
|
|
|
|
|
def trans_unpack(self):
|
|
|
"""
|
|
|
传输层解包
|
|
|
"""
|
|
|
print(self.packet_step)
|
|
|
print(self.step)
|
|
|
if self.packet_step[self.step] != "trans":
|
|
|
messagebox.showerror("注意", "解包顺序出错!")
|
|
|
return
|
|
|
RcptPort = StringVar()
|
|
|
|
|
|
def packet():
|
|
|
rcpt_port = RcptPort.get()
|
|
|
if rcpt_port == self.receiveObj.interface[0]["conn_port"]:
|
|
|
toplevel.destroy()
|
|
|
data = AgreementUtil.parse_udp_packet(self.message)
|
|
|
print(data)
|
|
|
self.message = data[-1]
|
|
|
self.master.message.show_message("传输层解包成功!数据包如下:\n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!")
|
|
|
|
|
|
toplevel = self.create_window({"title": "传输层解包", "entry": [{"接收主机端口:": RcptPort}],
|
|
|
"command": packet})
|
|
|
|
|
|
def ip_packet(self):
|
|
|
"""
|
|
|
网络层封包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "ip":
|
|
|
messagebox.showerror("注意", "封包顺序出错!")
|
|
|
return
|
|
|
SourceIP = StringVar()
|
|
|
TargetIP = StringVar()
|
|
|
def packet():
|
|
|
source_ip = SourceIP.get()
|
|
|
target_ip = TargetIP.get()
|
|
|
if source_ip == self.sendObj.interface[0]["ip"] \
|
|
|
and target_ip == self.receiveObj.interface[0]["ip"]:
|
|
|
toplevel.destroy()
|
|
|
self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip)
|
|
|
self.master.message.show_message("网络层封包成功!数据包如下:\n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!")
|
|
|
toplevel = self.create_window({"title": "网络层封包", "entry": [{"发送主机IP:": SourceIP},
|
|
|
{"接收主机IP:": TargetIP}],
|
|
|
"command": packet})
|
|
|
|
|
|
def ip_unpack(self):
|
|
|
"""
|
|
|
网络层解包
|
|
|
"""
|
|
|
print(self.packet_step)
|
|
|
print(self.step)
|
|
|
if self.packet_step[self.step] != "ip":
|
|
|
messagebox.showerror("注意", "解包顺序出错!")
|
|
|
return
|
|
|
RcptIP = StringVar()
|
|
|
|
|
|
def packet():
|
|
|
rcpt_ip = RcptIP.get()
|
|
|
if rcpt_ip == self.receiveObj.interface[0]["ip"]:
|
|
|
toplevel.destroy()
|
|
|
data = AgreementUtil.parse_ip_packet(self.message)
|
|
|
print(data)
|
|
|
self.message = data[-1]
|
|
|
self.master.message.show_message("网络层解包成功!数据包如下:\n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!")
|
|
|
|
|
|
toplevel = self.create_window({"title": "网络层解包", "entry": [{"接收主机IP:": RcptIP}],
|
|
|
"command": packet})
|
|
|
|
|
|
def mac_packet(self):
|
|
|
"""
|
|
|
链路层封包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "mac":
|
|
|
messagebox.showerror("注意", "封包顺序出错!")
|
|
|
return
|
|
|
SentMac = StringVar()
|
|
|
RcptMac = StringVar()
|
|
|
def packet():
|
|
|
sent_mac = SentMac.get()
|
|
|
rcpt_mac = RcptMac.get()
|
|
|
if sent_mac == self.sendObj.interface[0]["mac"] \
|
|
|
and rcpt_mac == self.receiveObj.interface[0]["mac"]:
|
|
|
toplevel.destroy()
|
|
|
self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac)
|
|
|
self.master.message.show_message("链路层封包成功!数据包如下:\n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!")
|
|
|
toplevel = self.create_window({"title": "链路层封包", "entry": [{"发送主机MAC:": SentMac},
|
|
|
{"接收主机MAC:": RcptMac}],
|
|
|
"command": packet})
|
|
|
|
|
|
def mac_unpack(self):
|
|
|
"""
|
|
|
链路层解包
|
|
|
"""
|
|
|
if self.packet_step[self.step] != "mac":
|
|
|
messagebox.showerror("注意", "解包顺序出错!")
|
|
|
return
|
|
|
RcptMac = StringVar()
|
|
|
def packet():
|
|
|
rcpt_mac = RcptMac.get()
|
|
|
if rcpt_mac == self.receiveObj.interface[0]["mac"]:
|
|
|
toplevel.destroy()
|
|
|
data = AgreementUtil.parse_ethernet_frame(self.packet["tag"])
|
|
|
print(data)
|
|
|
self.message = data[-1]
|
|
|
self.master.message.show_message("链路层解包成功!数据包如下:\n{}".format(self.message))
|
|
|
self.step += 1
|
|
|
else:
|
|
|
messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!")
|
|
|
toplevel = self.create_window({"title": "链路层解包", "entry": [{"接收主机MAC:": RcptMac}],
|
|
|
"command": packet})
|
|
|
|
|
|
|
|
|
class AppData:
|
|
|
def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag, source_app_addr, target_app_addr, app_packed_string, timestamp, canvas):
|
|
|
self.obj_id = obj_id
|
|
|
self.id_key = id_key
|
|
|
self.app_pack_id = app_pack_id
|
|
|
self.app_pack_size = app_pack_size
|
|
|
self.app_pack_tag = app_pack_tag
|
|
|
self.source_app_addr = source_app_addr
|
|
|
self.target_app_addr = target_app_addr
|
|
|
self.app_packed_string = app_packed_string
|
|
|
self.timestamp = timestamp
|
|
|
|
|
|
|
|
|
def pack(self):
|
|
|
# 为了简化,我们打包成一个字典
|
|
|
return vars(self)
|
|
|
|
|
|
@staticmethod
|
|
|
def unpack(packed_data):
|
|
|
# 解包为AppData对象
|
|
|
return AppData(**packed_data)
|
|
|
|
|
|
class TransData:
|
|
|
def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id, sent_port, rcpt_port, source_app_addr, target_app_addr, trans_packed_string, timestamp):
|
|
|
self.obj_id = obj_id
|
|
|
self.id_key = id_key
|
|
|
self.trans_pack_id = trans_pack_id
|
|
|
self.trans_seq = trans_seq
|
|
|
self.trans_tag = trans_tag
|
|
|
self.app_pack_id = app_pack_id
|
|
|
self.sent_port = sent_port
|
|
|
self.rcpt_port = rcpt_port
|
|
|
self.source_app_addr = source_app_addr
|
|
|
self.target_app_addr = target_app_addr
|
|
|
self.trans_packed_string = trans_packed_string
|
|
|
self.timestamp = timestamp
|
|
|
|
|
|
def pack(self):
|
|
|
return vars(self)
|
|
|
|
|
|
@staticmethod
|
|
|
def unpack(packed_data):
|
|
|
return TransData(**packed_data)
|
|
|
|
|
|
|
|
|
class IPData:
|
|
|
def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip, source_app_addr, target_app_addr, ip_packed_string):
|
|
|
self.ObjID = obj_id
|
|
|
self.IDkey = id_key
|
|
|
self.IPPackID = ip_pack_id
|
|
|
self.TransPackID = trans_pack_id
|
|
|
self.SourceIP = source_ip
|
|
|
self.TargetIP = target_ip
|
|
|
self.SourceAppAddr = source_app_addr
|
|
|
self.TargetAppAddr = target_app_addr
|
|
|
self.IPPackedString = ip_packed_string
|
|
|
|
|
|
def pack(self):
|
|
|
return vars(self)
|
|
|
|
|
|
@staticmethod
|
|
|
def unpack(packed_data):
|
|
|
return IPData(**packed_data)
|
|
|
|
|
|
|
|
|
class MACData:
|
|
|
def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac, source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string):
|
|
|
self.ObjID = obj_id
|
|
|
self.IDkey = id_key
|
|
|
self.MacPackID = mac_pack_id
|
|
|
self.IPPackID = ip_pack_id
|
|
|
self.SentMAC = sent_mac
|
|
|
self.RcptMAC = rcpt_mac
|
|
|
self.SourceIP = source_ip
|
|
|
self.TargetIP = target_ip
|
|
|
self.SourceAppAddr = source_app_addr
|
|
|
self.TargetAppAddr = target_app_addr
|
|
|
self.MacPackedString = mac_packed_string
|
|
|
|
|
|
def pack(self):
|
|
|
return vars(self)
|
|
|
|
|
|
@staticmethod
|
|
|
def unpack(packed_data):
|
|
|
return MACData(**packed_data)
|
|
|
|
|
|
|
|
|
def split_appdata(AppPackSize):
|
|
|
MTU = 2048
|
|
|
return AppPackSize // MTU + (AppPackSize % MTU > 0)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
# 假设的最大传输单元(MTU)
|
|
|
MTU = 2048 # bytes
|
|
|
|
|
|
# 模拟的数据和数据大小
|
|
|
SIMULATED_DATA = "hello"
|
|
|
SIMULATED_SIZE = 2049 # 10 MB
|
|
|
|
|
|
# 创建应用层数据包
|
|
|
app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1", SIMULATED_DATA, time.time())
|
|
|
packed_app_data = app_packet.pack()
|
|
|
print(packed_app_data)
|
|
|
|