import time from SimObjs import * from tkinter import * from tkinter import messagebox from PIL import ImageTk, Image from AgreementUtil import AgreementUtil def create_label(canvas: Canvas, x, y, width, height, text, rect_color, font_color, tag): canvas.create_rectangle(x, y, x + width, y + height, fill=rect_color, tags=tag) canvas.create_text(x + width / 2, y + height / 2, text=text, fill=font_color, tags=tag) return tag class PacketWindow(Toplevel): def __init__(self, master, packet, packet_step=["app", "trans", "ip", "mac"], *args, **kwargs): """ packet:{"packet": True, "mode": {"app": True, "trans": True, "ip": True, "mac": True}} """ super().__init__(master, *args, **kwargs) self.master = master self.width = 300 self.height = 400 self.geometry(f"{self.width}x{self.height}+200+200") self.attributes("-topmost", True) self.resizable(width=False, height=False) self.packet_button_str = {"app": "应用层", "ip": "网络层", "mac": "链路层", "trans": "传输层"} if not packet["packet"]: packet["mode"] = packet["mode"][::-1] # True 为封包 False 为解包 self.packet_step = packet_step self.step = 0 self.packet: dict[str: bool] = packet self.sendObj: SimHost = packet["sendObj"] self.receiveObj: SimHost = packet["receiveObj"] self.packet_option = {"app": {"color": "#ff3b26", "command": self.app_packet if packet["packet"] else self.app_unpack}, "ip": {"color": "#00a2f2", "command": self.ip_packet if packet["packet"] else self.ip_unpack}, "mac": {"color": "#46707a", "command": self.mac_packet if packet["packet"] else self.mac_unpack}, "trans": {"color": "#710a00", "command": self.trans_packet if packet["packet"] else self.trans_unpack}} self.canvas = Canvas(self, width=self.width, height=self.height) self.canvas.place(x=0, y=0, anchor='nw') self.packet_height, self.packet_init_width = 25, 100 self.background_img = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height))) self.canvas.create_image(0, 0, image=self.background_img, anchor=NW) # self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, 10, width=self.packet_init_width, height=self.packet_height, text=self.packet["tag"], rect_color="#dee1e6", font_color="black", tag="packet") self.step_y = {} self.message = packet["tag"] self.create_widget() def move_to(self, tag, target_x, target_y, speed=10): # 获取当前位置 current_coords = self.canvas.coords(tag) if len(current_coords) < 2: return # 如果没有坐标,就退出函数 current_x, current_y = current_coords[0], current_coords[1] # 计算移动方向 delta_x = (target_x if target_x else current_x) - current_x delta_y = (target_y if target_y else current_y) - current_y # 计算下一步的位置 next_x = current_x + (speed if delta_x > 0 else -speed) if abs(delta_x) > speed else (target_x if target_x else current_x) next_y = current_y + (speed if delta_y > 0 else -speed) if abs(delta_y) > speed else (target_y if target_y else current_y) # 移动对象 self.canvas.move(tag, next_x - current_x, next_y - current_y) # 如果对象还没有到达目标,继续移动 if next_x != (target_x if target_x else current_x) or next_y != (target_y if target_y else current_y): self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed)) def create_widget(self): mode_text = "封包" if self.packet["packet"] else "解包" num, margin_top, button_width, button_height = 1, 30, 120, 40 for data in self.packet["mode"]: key = list(data.keys())[0] value = list(data.values())[0] if value: Button(self, text=self.packet_button_str[key] + mode_text, command=self.packet_option[key]["command"], font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1), width=button_width, height=button_height) num += 1 Button(self, text="发送", command=self.send_packet, font=("", 16)).place(x=self.width - 60, y=self.height - 40, anchor=NW) Button(self, text="取消", command=self.destroy, font=("", 16)).place(x=self.width - 120, y=self.height - 40, anchor=NW) def send_packet(self): if self.step != len(self.packet_step): messagebox.showerror("注意", "尚未完成{}!".format("封包" if self.packet["packet"] else "解包")) return self.packet["send"](self.message) self.destroy() def create_window(self, option): toplevel = Toplevel(self) toplevel.title(option["title"]) toplevel.geometry("450x220+300+300") for entry_option in option["entry"]: index = option["entry"].index(entry_option) key = list(entry_option.keys())[0] value = list(entry_option.values())[0] Label(toplevel, text=key, font=("", 16)).grid(row=index, column=0, padx=20, pady=10) if value is None: continue Entry(toplevel, textvariable=value, font=("", 16), width=15).grid(row=index, column=1, pady=10) Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(x=450 - 60, y=220 - 40, anchor=NW) Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(x=450 - 120, y=220 - 40, anchor=NW) toplevel.attributes("-topmost", True) return toplevel def app_packet(self): """ 应用层封包 """ if self.packet_step[self.step] != "app": messagebox.showerror("注意", "封包顺序出错!") return SourceAppAddr = StringVar() TargetAppAddr = StringVar() def packet(): toplevel.destroy() source_app_addr = SourceAppAddr.get() target_app_addr = TargetAppAddr.get() if source_app_addr == self.sendObj.interface[0]["addr"] \ and target_app_addr == self.receiveObj.interface[0]["addr"]: # 动画 # app_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["app"]), 50, 25, "AH", "#ff3b26", "white", tag="app") # self.move_to(self.packet_rect, None, int(self.step_y["app"]), speed=1) # self.move_to(app_rect, self.width / 2 + self.packet_init_width / 2, int(self.step_y["app"]), speed=1) self.message = target_app_addr + "&" + self.packet["tag"] self.master.message.show_message("应用层封包成功!数据包如下: \n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") toplevel = self.create_window({"title": "应用层封包", "entry": [{"发送主机应用层地址:": SourceAppAddr}, {"接收主机应用层地址:": TargetAppAddr}], "command": packet}) def app_unpack(self): """ 应用层解包 """ if self.packet_step[self.step] != "app": messagebox.showerror("注意", "解包顺序出错!") return TargetAppAddr = StringVar() def packet(): toplevel.destroy() target_app_addr = TargetAppAddr.get() if target_app_addr == self.receiveObj.interface[0]["addr"]: self.message = self.message.split("&")[1] self.master.message.show_message("应用层解包成功!数据包如下: \n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") toplevel = self.create_window({"title": "应用层解包", "entry": [{"接收主机应用层地址:": TargetAppAddr}], "command": packet}) def trans_packet(self): """ 传输层封包 """ if self.packet_step[self.step] != "trans": messagebox.showerror("注意", "封包顺序出错!") return SentPort = StringVar() RcptPort = StringVar() SplitCount = IntVar() def packet(): sent_port = SentPort.get() rcpt_port = RcptPort.get() split_count = SplitCount.get() count = split_appdata(self.packet["size"]) print(sent_port, self.sendObj.interface[0]["conn_port"]) if sent_port == self.sendObj.interface[0]["conn_port"] \ and rcpt_port == self.receiveObj.interface[0]["conn_port"]\ and split_count == count: toplevel.destroy() # 动画 # self.canvas.delete(self.packet_rect) # self.canvas.delete("app") # self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, int(self.step_y["app"]), # width=self.packet_init_width + 50, height=self.packet_height, # text="A-" + self.packet["tag"], rect_color="#dee1e6", font_color="black", # tag="packet") # trans_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["trans"]), 50, 25, "PH", "#ff3b26", # "white", tag="trans") # self.move_to(self.packet_rect, None, int(self.step_y["trans"]), speed=1) # self.move_to(trans_rect, self.width / 2 + self.packet_init_width / 2 + 50, int(self.step_y["trans"]), speed=1) self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port)) self.master.message.show_message("传输层封包成功!数据包如下:\n{}".format(str([str(i + 1) + "-" + str(self.message) for i in range(split_count)]))) self.step += 1 else: messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "传输层封包", "entry": [{"发送主机端口:": SentPort}, {"接收主机端口:": RcptPort}, {"拆包数量": SplitCount}, {"每个包约2048": None}], "command": packet}) def trans_unpack(self): """ 传输层解包 """ print(self.packet_step) print(self.step) if self.packet_step[self.step] != "trans": messagebox.showerror("注意", "解包顺序出错!") return RcptPort = StringVar() def packet(): rcpt_port = RcptPort.get() if rcpt_port == self.receiveObj.interface[0]["conn_port"]: toplevel.destroy() data = AgreementUtil.parse_udp_packet(self.message) print(data) self.message = data[-1] self.master.message.show_message("传输层解包成功!数据包如下:\n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "传输层解包", "entry": [{"接收主机端口:": RcptPort}], "command": packet}) def ip_packet(self): """ 网络层封包 """ if self.packet_step[self.step] != "ip": messagebox.showerror("注意", "封包顺序出错!") return SourceIP = StringVar() TargetIP = StringVar() def packet(): source_ip = SourceIP.get() target_ip = TargetIP.get() if source_ip == self.sendObj.interface[0]["ip"] \ and target_ip == self.receiveObj.interface[0]["ip"]: toplevel.destroy() self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip) self.master.message.show_message("网络层封包成功!数据包如下:\n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "网络层封包", "entry": [{"发送主机IP:": SourceIP}, {"接收主机IP:": TargetIP}], "command": packet}) def ip_unpack(self): """ 网络层解包 """ print(self.packet_step) print(self.step) if self.packet_step[self.step] != "ip": messagebox.showerror("注意", "解包顺序出错!") return RcptIP = StringVar() def packet(): rcpt_ip = RcptIP.get() if rcpt_ip == self.receiveObj.interface[0]["ip"]: toplevel.destroy() data = AgreementUtil.parse_ip_packet(self.message) print(data) self.message = data[-1] self.master.message.show_message("网络层解包成功!数据包如下:\n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "网络层解包", "entry": [{"接收主机IP:": RcptIP}], "command": packet}) def mac_packet(self): """ 链路层封包 """ if self.packet_step[self.step] != "mac": messagebox.showerror("注意", "封包顺序出错!") return SentMac = StringVar() RcptMac = StringVar() def packet(): sent_mac = SentMac.get() rcpt_mac = RcptMac.get() if sent_mac == self.sendObj.interface[0]["mac"] \ and rcpt_mac == self.receiveObj.interface[0]["mac"]: toplevel.destroy() self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac) self.master.message.show_message("链路层封包成功!数据包如下:\n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "链路层封包", "entry": [{"发送主机MAC:": SentMac}, {"接收主机MAC:": RcptMac}], "command": packet}) def mac_unpack(self): """ 链路层解包 """ if self.packet_step[self.step] != "mac": messagebox.showerror("注意", "解包顺序出错!") return RcptMac = StringVar() def packet(): rcpt_mac = RcptMac.get() if rcpt_mac == self.receiveObj.interface[0]["mac"]: toplevel.destroy() data = AgreementUtil.parse_ethernet_frame(self.packet["tag"]) print(data) self.message = data[-1] self.master.message.show_message("链路层解包成功!数据包如下:\n{}".format(self.message)) self.step += 1 else: messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!") toplevel = self.create_window({"title": "链路层解包", "entry": [{"接收主机MAC:": RcptMac}], "command": packet}) class AppData: def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag, source_app_addr, target_app_addr, app_packed_string, timestamp, canvas): self.obj_id = obj_id self.id_key = id_key self.app_pack_id = app_pack_id self.app_pack_size = app_pack_size self.app_pack_tag = app_pack_tag self.source_app_addr = source_app_addr self.target_app_addr = target_app_addr self.app_packed_string = app_packed_string self.timestamp = timestamp def pack(self): # 为了简化,我们打包成一个字典 return vars(self) @staticmethod def unpack(packed_data): # 解包为AppData对象 return AppData(**packed_data) class TransData: def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id, sent_port, rcpt_port, source_app_addr, target_app_addr, trans_packed_string, timestamp): self.obj_id = obj_id self.id_key = id_key self.trans_pack_id = trans_pack_id self.trans_seq = trans_seq self.trans_tag = trans_tag self.app_pack_id = app_pack_id self.sent_port = sent_port self.rcpt_port = rcpt_port self.source_app_addr = source_app_addr self.target_app_addr = target_app_addr self.trans_packed_string = trans_packed_string self.timestamp = timestamp def pack(self): return vars(self) @staticmethod def unpack(packed_data): return TransData(**packed_data) class IPData: def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip, source_app_addr, target_app_addr, ip_packed_string): self.ObjID = obj_id self.IDkey = id_key self.IPPackID = ip_pack_id self.TransPackID = trans_pack_id self.SourceIP = source_ip self.TargetIP = target_ip self.SourceAppAddr = source_app_addr self.TargetAppAddr = target_app_addr self.IPPackedString = ip_packed_string def pack(self): return vars(self) @staticmethod def unpack(packed_data): return IPData(**packed_data) class MACData: def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac, source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string): self.ObjID = obj_id self.IDkey = id_key self.MacPackID = mac_pack_id self.IPPackID = ip_pack_id self.SentMAC = sent_mac self.RcptMAC = rcpt_mac self.SourceIP = source_ip self.TargetIP = target_ip self.SourceAppAddr = source_app_addr self.TargetAppAddr = target_app_addr self.MacPackedString = mac_packed_string def pack(self): return vars(self) @staticmethod def unpack(packed_data): return MACData(**packed_data) def split_appdata(AppPackSize): MTU = 2048 return AppPackSize // MTU + (AppPackSize % MTU > 0) if __name__ == '__main__': # 假设的最大传输单元(MTU) MTU = 2048 # bytes # 模拟的数据和数据大小 SIMULATED_DATA = "hello" SIMULATED_SIZE = 2049 # 10 MB # 创建应用层数据包 app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1", SIMULATED_DATA, time.time()) packed_app_data = app_packet.pack() print(packed_app_data)