diff --git a/AgreementUtil.py b/AgreementUtil.py new file mode 100644 index 0000000..d9e330f --- /dev/null +++ b/AgreementUtil.py @@ -0,0 +1,107 @@ +import socket +import struct + +def format_mac_address(mac_bytes): + """ + Format a MAC address from bytes to a human-readable string. + + Args: + mac_bytes (bytes): The MAC address in bytes format. + + Returns: + str: The MAC address in human-readable format. + """ + mac_str = mac_bytes.hex() + mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2)) + return mac_formatted + +class AgreementUtil(): + + @staticmethod + def create_udp_packet(data, dest_port, src_port): + """ + 创建UDP数据包 + Returns: + bytes: `UDP数据包`. + """ + # UDP header fields + udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length + return udp_header + data.encode() + + @staticmethod + def create_ip_packet(udp_packet, dest_ip, src_ip): + """ + 创建IP数据包 + Returns: + bytes: IP数据包 + """ + # IP header fields + version = 4 + ihl = 5 # Internet Header Length + type_of_service = 0 + total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length + identification = 54321 + flags = 0 # Don't fragment + fragment_offset = 0 + ttl = 64 # Time to live + protocol = 17 # UDP + checksum = 0 # For simplicity, set to 0 + source_ip = socket.inet_aton(src_ip) + dest_ip = socket.inet_aton(dest_ip) + # IP数据包头部字段 + ip_header = struct.pack('!BBHHHBBH4s4s', + (version << 4) + ihl, type_of_service, total_length, + identification, (flags << 13) + fragment_offset, + ttl, protocol, checksum, source_ip, dest_ip) + return ip_header + udp_packet + + @staticmethod + def create_ethernet_frame(ip_packet, dest_mac, src_mac): + """ + 创建以太网帧数据包 + Returns: + bytes: 以太网帧数据包 + """ + # Convert MAC addresses from string format to bytes + dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', '')) + src_mac_bytes = bytes.fromhex(src_mac.replace(':', '')) + # Ethernet type for IP (0x0800) + ethernet_type = 0x0800 + # Pack Ethernet frame fields + ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type) + return ethernet_frame + ip_packet + + @staticmethod + def parse_ethernet_frame(frame): + """ + 解析以太网帧数据包 + Returns: 发送端MAC,接收端MAC,以太网类型,IP数据包 + """ + dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14]) + payload = frame[14:] + return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload + + @staticmethod + def parse_ip_packet(packet): + """ + 解析 IP 数据包 + Returns: 发送端IP,接收端IP,协议,UDP数据包 + """ + version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \ + ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20]) + payload = packet[20:] + return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload + + @staticmethod + def parse_udp_packet(packet): + """ + 解析UDP数据包 + Returns: + tuple: 发送主机端口,接收主机端口,数据 + """ + src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8]) + data = packet[8:] + return src_port, dest_port, data.decode() + + + diff --git a/NetworkAnalog.py b/NetworkAnalog.py new file mode 100644 index 0000000..8f350b4 --- /dev/null +++ b/NetworkAnalog.py @@ -0,0 +1,421 @@ +import ipaddress +import sys +import threading +from tkinter import filedialog + +from SimUtil import * + +from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase +from dbUtil import search, execute_sql, delete_obj, truncate_db +from PIL import Image as imim +from packet import * + + +class Message(Canvas): + def __init__(self, master, **kwargs): + super().__init__(master, **kwargs) + self.message = [] + self.master = master + self.scrollbar_y = tk.Scrollbar(master, orient="vertical", command=self.yview) + self.scrollbar_y.pack(side="right", fill=BOTH) + self.scrollbar_x = tk.Scrollbar(master, orient=HORIZONTAL, command=self.xview) + self.scrollbar_x.pack(side="bottom", fill=BOTH) + self.config(yscrollcommand=self.scrollbar_y.set, xscrollcommand=self.scrollbar_x.set) + + def show_message(self, message, color="#5fa8fe"): + for mes in message.split("\n"): + self.message.append({"message": mes, "color": color}) + num = 0 + self.delete("message") + for function in self.message: + self.create_text(20, 25 * num + 10, anchor="nw", text=function["message"], font=("", 15), + fill=function["color"], tags="message") + num += 1 + self.update() + self.configure(scrollregion=self.bbox("all")) + self.scrollbar_y.set(1.0, 1.0) + self.yview_moveto(1.0) + + +class NetWorkAnalog(Canvas): + def __init__(self, master, **kwargs): + super().__init__(master, **kwargs) + self.master = master + self.router_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40))) + self.switch_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40))) + self.hub_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40))) + self.host_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40))) + + self.width = int(self.cget("width")) + self.height = int(self.cget("height")) + self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)] + self.label_top_img = ImageTk.PhotoImage(imim.open("../datas/images/右上框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.4)))) + self.label_bottom_img = ImageTk.PhotoImage(imim.open("../datas/images/右下框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.45)))) + self.message_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/日志消息展示框@2x.png").resize((self.width - 60, self.height - self.canvas_size[3] - 30)) + ) + self.text_back_img = ImageTk.PhotoImage( + Image.open("../images/文字背景@3x.png").resize((155, 90)) + ) + message_frame = Frame(self, width=self.width - 60, height=self.height - self.canvas_size[3] - 30) + message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW) + self.message = Message(message_frame, width=self.width - 60, height=self.height - self.canvas_size[3] - 30) + self.message.pack() + self.message.configure(bg="#edf8ff") + self.configure(bg="#ddeafe") + self.filename = ImageTk.PhotoImage(imim.open("../datas/images/背景@2x.png").resize((self.width, self.canvas_size[3]))) + self.create_image(0, 0, image=self.filename, anchor=NW) + self.chose = self.host_img + self.AllSimObjs = {} + self.conns = [] + self.drawLine = True # 画线标志 + self.line_start_obj = None + self.line_end_obj = None + self.line_start_ifs = None + self.line_end_ifs = None + self.chose_obj = None + self.show_label_flag = True + self.show_interface_flag = True + self.trans_obj = set() + self.create_widget() + + def is_chose(self): + """ + 当被选中时,绘制选中框 + :return: + """ + self.delete("rectangle") + self.create_rectangle(self.chose_obj.ObjX - 35, self.chose_obj.ObjY - 35, self.chose_obj.ObjX + 15, + self.chose_obj.ObjY + 15, outline="red", tags="rectangle") + + def reload_data(self): + # todo: 加载上一次程序运行的数据 + """ + 加载上一次程序运行时的数据 + :return: + """ + self.AllSimObjs = {} + self.conns = [] + sim_obj_sql = "select * from sim_objs" + sim_obj_data = search(sim_obj_sql) + for index, row in sim_obj_data.iterrows(): # 初始化组件对象 + sim_type = row["ObjType"] + ObjX = row["ObjX"] + ObjY = row["ObjY"] + ConfigCorrect = row["ConfigCorrect"] + ObjLable = row["ObjLabel"] + ObjID = row["ObjID"] + if sim_type == 1: + tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + elif sim_type == 2: + tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + elif sim_type == 3: + tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + else: + tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + tag.create_img() + self.AllSimObjs[tag.ObjID] = tag + + sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id" + sim_conn_data = search(sim_conn_sql) + conn_datas = {} + for index, conn in sim_conn_data.iterrows(): + if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas: + conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])] + else: + conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"])) + for key, value in conn_datas.items(): + conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1], + self.AllSimObjs[value[1][0]], value[1][1], key[1]) + conn_obj.draw_line() + self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象 + self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj + self.conns.append(conn_obj) + conn_obj.ConfigCorrect = key[1] + + def show_obj(self, AllSimObj, conns): + self.AllSimObjs = AllSimObj + self.conns = conns + for key, sim_obj in self.AllSimObjs.items(): + sim_obj.create_img() + for conn in self.conns: + conn.draw_line() + + def send_packet(self): + """ + 发送数据包 + :return: + """ + hosts = {} + for key, tag in self.AllSimObjs.items(): + if tag.ObjType == 1 and tag.ConfigCorrect == 1: + hosts[tag.ObjLabel] = tag.ObjID + child2 = tk.Toplevel() + child2.title("数据包配置") + child2.geometry('392x306+200+110') + child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作 + tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10) + send_host = StringVar() + host_combobox = ttk.Combobox( + master=child2, # 父容器 + height=5, # 高度,下拉显示的条目数量 + width=18, # 宽度 + state='readonly', # readonly(只可选) + font=('', 16), # 字体 + textvariable=send_host, # 通过StringVar设置可改变的值 + values=list(hosts.keys()), # 设置下拉框的选项 + ) + + host_combobox.grid(row=0, column=1, pady=10) + tk.Label(child2, text='接收主机:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=10) # ,sticky='w'靠左显示 + receive_host = StringVar() + ttk.Combobox( + master=child2, # 父容器 + height=5, # 高度,下拉显示的条目数量 + width=18, # 宽度 + state='readonly', # readonly(只可选) + font=('', 16), # 字体 + textvariable=receive_host, # 通过StringVar设置可改变的值 + values=list(hosts.keys()), # 设置下拉框的选项 + ).grid(row=1, column=1, pady=10) + tk.Label(child2, text='数据大小(kb):', font=('黑体', 16)).grid(row=2, column=0, sticky='w', + pady=10) # ,sticky='w'靠左显示 + packet_size = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar()) + packet_size.grid(row=2, column=1, pady=10) + tk.Label(child2, text='数据标签:', font=('黑体', 16)).grid(row=3, column=0, sticky='w', + pady=10) # ,sticky='w'靠左显示 + packet_label = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar()) + packet_label.grid(row=3, column=1, pady=10) + + def send(): + """ + 发送数据包 + :return: + """ + if send_host.get() == "" or receive_host.get() == "" or packet_size.get() == "" or packet_label.get() == "": + messagebox.showerror("注意", message="信息不能为空") + return + if send_host.get() == receive_host.get(): + messagebox.showerror("注意", message="发送主机和接收主机不能相同!") + send_message = """发送主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}""" + receive_message = """接收主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}""" + send_host_obj = self.AllSimObjs[hosts[send_host.get()]] + receive_host_obj = self.AllSimObjs[hosts[receive_host.get()]] + pack_label = packet_label.get() + pack_size = packet_size.get() + child2.destroy() + count = split_appdata(int(pack_size)) + self.show_top_message(send_host_obj, + message=send_message.format(send_host_obj.ObjLabel, send_host_obj.interface[0]["ip"], + send_host_obj.interface[0]["mac"], pack_size, 0, count)) + self.show_bottom_message(receive_host_obj, message=receive_message.format(receive_host_obj.ObjLabel, + receive_host_obj.interface[0][ + "ip"], + receive_host_obj.interface[0][ + "mac"], pack_size, 0, count)) + for obj in self.trans_obj: + self.delete(obj.ObjID + "detail_button") + self.delete(obj.ObjID + "detail") + self.unbind(obj.ObjID, "") + send_host_obj.create_packet(receive_host_obj, pack_label, pack_size) + + tk.Button(child2, text='开启模拟', font=('黑体', 16), height=1, command=send).grid(row=5, column=0, sticky='e', pady=10) + tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=5, column=1, sticky='e', pady=10) + # 建立画出对象详情的函数 + def draw_detail(self, obj): + # 判断type对象来画出详情表 + if obj.ObjType == 1: # 当type对象为1 画出主机详情图 + app_color = ("#94D050", '#00B0F0') + trans_color = ("#94D050", '#00B0F0') + ip_color = ("#94D050", '#00B0F0') + mac_color = ("#94D050", '#00B0F0') + elif obj.ObjType == 2: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#94D050", '#00B0F0') + mac_color = ("#94D050", '#00B0F0') + elif obj.ObjType == 3: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#D3D3D3", "#D3D3D3") + mac_color = ("#94D050", '#00B0F0') + else: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#D3D3D3", "#D3D3D3") + mac_color = ("#94D050", '#00B0F0') + frist_x = obj.ObjX # 获取frist_x + frist_y = obj.ObjY # 获取frist_y + # 这里缺少一个删除其他详情的步骤 + # 画出连接详情表的线 + tag_id = obj.ObjID + "detail" + if len(self.gettags(tag_id)) > 0: + self.delete(tag_id) + else: + self.create_line((frist_x + 40, frist_y - 20), (frist_x + 80, frist_y - 20), + (frist_x + 90, frist_y - 60), + fill='#50abf8', tags=tag_id, width=2) + # 画出详情表 + self.create_image(frist_x + 50, frist_y - 140, image=self.text_back_img, anchor=NW, tags=tag_id) + # 画出相应的绿条数据 + self.create_text(frist_x + 70, frist_y - 130, text='应用层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 107, text='传输层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 84, text='IP 层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 62, text='链路层', tags=tag_id) + # 画出 右侧绿色和蓝色的类进度条 + # 应用层 + self.create_rectangle(frist_x + 197, frist_y - 135, frist_x + 170, frist_y - 120, fill=app_color[0], + outline=app_color[0], tags=tag_id) + self.create_rectangle(frist_x + 168, frist_y - 135, frist_x + 148, frist_y - 120, fill=app_color[1], + outline=app_color[1], tags=tag_id) + # 传输层 + self.create_rectangle(frist_x + 197, frist_y - 115, frist_x + 160, frist_y - 100, + fill=trans_color[0], + outline=trans_color[0], tags=tag_id) + self.create_rectangle(frist_x + 158, frist_y - 115, frist_x + 133, frist_y - 100, + fill=trans_color[1], + outline=trans_color[1], tags=tag_id) + # IP 层 + self.create_rectangle(frist_x + 197, frist_y - 93, frist_x + 150, frist_y - 78, fill=ip_color[0], + outline=ip_color[0], tags=tag_id) + self.create_rectangle(frist_x + 148, frist_y - 93, frist_x + 118, frist_y - 78, fill=ip_color[1], + outline=ip_color[1], tags=tag_id) + # 链路层 + self.create_rectangle(frist_x + 197, frist_y - 70, frist_x + 133, frist_y - 55, fill=mac_color[0], + outline=mac_color[0], tags=tag_id) + self.create_rectangle(frist_x + 131, frist_y - 70, frist_x + 98, frist_y - 55, fill=mac_color[1], + outline=mac_color[1], tags=tag_id) + + def trans_over(self, objs): + print("传输完成") + self.trans_obj = objs + for obj in objs: + obj.create_detail_button() + self.tag_bind(obj.ObjID, "", lambda event, obj=obj: self.draw_detail(obj)) # 绑定右击事件 + obj.is_packet = False + obj.is_un_packet = False + + def clear_canvas(self): + """ + 清除画布 + :return: + """ + ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?') + if ask == "no": + return + truncate_db() # 清除数据库 + for tag_id, tag in self.AllSimObjs.items(): + self.delete(tag_id) + self.delete(tag_id + "text") + for conn in self.conns: + conn.delete_line() + self.delete("rectangle") + self.AllSimObjs.clear() + self.conns.clear() + + def export_data(self): + try: + export = ExportUtil(sys.path[0] + "/database.xlsx") + export.export() + self.message.show_message("文件导出成功!文件位置在{}".format(sys.path[0] + "/database.xlsx")) + except Exception as E: + self.message.show_message(str(E), color="red") + + def import_data(self): + truncate_db() + for tag_id, tag in self.AllSimObjs.items(): + self.delete(tag_id) # 删除画布中的所有组件 + self.delete(tag_id + "text") + for conn in self.conns: + conn.delete_line() # 删除画布中的所有连接线 + self.delete("rectangle") + self.AllSimObjs.clear() # 删除当前所有组件对象 + self.conns.clear() # 删除所有连接配置 + file_path = filedialog.askopenfilename() # 获取用户选择的文件 + if file_path: + export = ExportUtil(file_path) + export.import_data() + self.message.show_message("文件导入成功!") + self.reload_data() + + def show_top_message(self, obj, message): + # todo: 显示网络配置信息 + """ + 显示网络配置信息 + :return: + """ + self.delete("netSet") + self.create_text(self.width * 0.82 + 120, 80, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'), + fill="#7030a0", tags="netSet") + self.create_text(self.width * 0.82 + 20, 80 + 25, text=message, + anchor="nw", font=('宋体', 14), tags="netSet") + + def show_bottom_message(self, obj, message): + # todo: 显示路由表交换表信息 + """ + 显示路由交换表信息 + :return: + """ + self.delete("routerSet") + self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 50, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'), + fill="#7030a0", tags="routerSet") + self.create_text(self.width * 0.82 + 20, self.canvas_size[3] / 2 + 50 + 25, text=message, + anchor="nw", font=('宋体', 14), tags="routerSet") + + def create_config_button(self): + # 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方 + menubar = tk.Menu(root) + # 定义一个空菜单单元 + setMenu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label='发送数据包', menu=setMenu) + setMenu.add_command(label='发送数据包', command=self.send_packet) + menubar.add_command(label='清除屏幕', command=self.clear_canvas) + menubar.add_command(label='导出数据', command=self.export_data) + menubar.add_command(label='导入数据', command=self.import_data) + root.config(menu=menubar) + + def create_widget(self): + # todo: 创建页面 + """ + 创建整体页面布局 + :return: + """ + self.create_rectangle(self.canvas_size[0], self.canvas_size[1], self.canvas_size[2], self.canvas_size[3], outline="#7f6000", width=0) # 矩形框,左上角坐标,右下角坐标 + self.create_image(self.width * 0.82, 30, image=self.label_top_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标 + self.create_text(self.width * 0.82 + 120, 30 + 15, text="发送主机详情", anchor="n", font=('微软雅黑', 18, 'bold')) + self.create_image(self.width * 0.82, self.canvas_size[3] / 2, image=self.label_bottom_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标 + # round_rectangle(self, self.width * 0.82, self.canvas_size[3] / 2, self.width * 0.98, self.canvas_size[3] - 30, outline="#ffff00", width=2, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标 + self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 15, text="接收主机详情", anchor="n", font=('微软雅黑', 18, 'bold')) + # 显示左边的固定图片 + img_height, text_height, split_width = 50, 40, 120 + self.create_text(text_height, split_width + 40, text="路由器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width, image=self.router_img, anchor="nw") + self.create_text(text_height, split_width * 2 + 40, text="交换机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 2, image=self.switch_img, anchor="nw") + self.create_text(text_height, split_width * 3 + 40, text="集线器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 3, image=self.hub_img, anchor="nw") + self.create_text(text_height, split_width * 4 + 40, text="主机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 4, image=self.host_img, anchor="nw") + + +if __name__ == '__main__': + root = Window() + root.title('网络拓扑图') + screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小 + screen_height = root.winfo_screenheight() + root_attr = { + "width": screen_width * 0.83, + "height": screen_height * 0.85, + } + size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'], (screen_width - root_attr['width']) / 2, + (screen_height - root_attr['height']) / 2 - 30) + canvas = NetWorkAnalog(root, width=root_attr['width'], heigh=root_attr['height'], bg="white") + canvas.place(x=0, y=0, anchor='nw') + canvas.create_config_button() + canvas.reload_data() + root.geometry(size) + root.mainloop() diff --git a/SimObjs.py b/SimObjs.py new file mode 100644 index 0000000..47c7f04 --- /dev/null +++ b/SimObjs.py @@ -0,0 +1,600 @@ +import math +import sys +import threading +from time import sleep +from tkinter import Tk +from tkinter import messagebox + +from ttkbootstrap import * +from uuid import uuid4 +import ipaddress +import time + +from PIL import ImageTk, Image, ImageFont + +from dbUtil import search +from packet import * + +pause_event = threading.Event() + +class SimBase(): + # todo: 组件父类 + """ + 图标类,所有组件的父类 + """ + def __init__(self, canvas, x, y, id, config=None, label=None): + self.ConfigCorrect = 0 if config is None else config # 是否进行配置 + self.ObjType = None # 组件类型 1->主机,2->路由器,3->交换机,4->集线器 + self.ObjLabel = "" + self.ObjID = id + self.ObjX = x + self.ObjY = y + self.canvas = canvas + self.is_packet = False + self.is_un_packet = False + self.host_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40))) + self.host_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机--暗色@2x.png").resize((40, 40))) + self.router_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40))) + self.router_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器--暗色@2x.png").resize((40, 40))) + self.switch_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40))) + self.switch_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机--暗色@2x.png").resize((40, 40))) + self.hub_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40))) + self.hub_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器--暗色@2x.png").resize((40, 40))) + self.img = None + self.img_tm = None + self.interface = [{}, {}, {}, {}] + self.connections = ["1", "2", "3", "4"] + self.set_default_config() + self.packet_window = None + + def set_default_name(self): + data_frame = search(f"select objid from sim_objs where objType={self.ObjType}") + num = data_frame.size + 1 + if isinstance(self, SimHost): + name = "SHO%d" % num + elif isinstance(self, SimRouter): + name = "SRO%d" % num + elif isinstance(self, SimSwitch): + name = "SWI%d" % num + else: + name = "SHUB%d" % num + return name + + def create_img(self): + """ + 创建图片 + :return: + """ + if self.ObjType == 1: + self.img = self.host_img + self.img_tm = self.host_img_tm + elif self.ObjType == 2: + self.img = self.router_img + self.img_tm = self.router_img_tm + elif self.ObjType == 3: + self.img = self.switch_img + self.img_tm = self.switch_img_tm + else: + self.img = self.hub_img + self.img_tm = self.hub_img_tm + self.canvas.delete("L") + id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30, + image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw", + tags=self.ObjID) + self.canvas.dtag("L", "L") + self.canvas.create_text(self.ObjX - 20, self.ObjY - 60, text=self.ObjLabel, font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + if self.ObjType == 1: + self.canvas.create_text(self.ObjX - 45, self.ObjY + 15, text=self.interface[0]["ip"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.create_text(self.ObjX - 45, self.ObjY + 35, text=self.interface[0]["mac"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.create_text(self.ObjX - 45, self.ObjY + 55, text=self.interface[0]["addr"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.tag_raise(id) + + def set_default_config(self): + sql = f"select * from conn_config where node_id='{self.ObjID}'" + conn_data = search(sql) + for index, conn in conn_data.iterrows(): + self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"], + "mac": conn["mac"], + "conn_port": conn["conn_port"], + "addr": conn["addr"]} + + def get_config(self): + sql = f"select * from conn_config where node_id='{self.ObjID}'" + conn_data = search(sql) + return conn_data + + def transfer_animate(self, status, packet, packet_size, error_message=None): + if status: + text = f"消息大小: {packet_size}\n" \ + f"消息内容: {packet}" + self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20, + outline="#92d050", + width=3, fill="#92d050", tags=self.ObjID + "packetData") + self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", + font=('', 10), tags=self.ObjID + "packetData") # 显示文字 + self.canvas.update() + sleep(2) + self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 + else: + text = f"传输失败\n" if error_message is None else error_message + self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red", + width=3, fill="red", tags=self.ObjID + "packetData") + self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", + font=('', 10), tags=self.ObjID + "packetData") # 显示文字 + self.canvas.update() + sleep(2) + self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 + + def create_detail_button(self): + frist_x = self.ObjX # 获取frist_x + frist_y = self.ObjY # 获取frist_y + tag_id = self.ObjID + "detail_button" + self.canvas.create_rectangle(frist_x + 10, frist_y - 25, frist_x + 40, frist_y - 15, fill="#f0a732", + outline="#47b2ff",width=2, tags=tag_id) + + def __str__(self): + str = "" + config = self.get_config() + for index, data in config.iterrows(): + str += f"【接口{data['node_ifs']}】\n" + str += f"IP: {data['ip']}\n" + str += f"MAC: {data['mac']}\n" + return str + + +class SimPacket(): + # todo: 数据包类 + """ + 数据包类 + """ + def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message, label, size, sendObj, receiveObj): + """ + :param source_mac: 源主机mac地址 + :param source_ip: 源主机ip地址 + :param destination_mac: 目的主机mac地址 + :param destination_ip: 目的主机ip地址 + :param message: 数据 + """ + self.source_mac = source_mac + self.source_ip = source_ip + self.sendObj = sendObj + self.receiveObj = receiveObj + self.destination_mac = destination_mac + self.destination_ip = destination_ip + self.message = message + self.label = label + self.size = size + self.up_jump = None # 上一跳连接对象 + self.objs = set() + self.img = None + self.id = str(uuid4()) + self.process_img() + + def process_img(self): + img = Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30)) + draw = ImageDraw.Draw(img) + font = ImageFont.truetype("arial.ttf", size=16) + text_width, text_height = draw.textsize(str(self.label), font=font) + # 计算文本在右上角的位置 + x = img.width - text_width - 10 # 右上角,距离右边缘10像素 + y = 10 # 距离上边缘10像素 + # 绘制文本 + draw.text((x, y), str(self.label), font=font, fill=(255, 0, 0)) # 红色字体 + self.img = ImageTk.PhotoImage(img) + + def move(self, id, cv, target_x, target_y, duration): + start_x, start_y = cv.coords(id) + distance_x = target_x - start_x + distance_y = target_y - start_y + steps = duration // 10 # 以10毫秒为间隔进行移动 + step_x = distance_x / steps + step_y = distance_y / steps + self._move_step(id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps) + + def _move_step(self,id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps): + if pause_event.is_set(): + new_x = start_x + step_x + new_y = start_y + step_y + sleep(2) + self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps) + if steps > 0: + new_x = start_x + step_x + new_y = start_y + step_y + cv.coords(id, new_x, new_y) + cv.update() # 更新画布显示 + sleep(0.01) # 添加延迟以控制动画速度 + self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1) + else: + cv.coords(id, target_x, target_y) + cv.delete(id) + + def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase): + self.objs.add(nodex_tag) + self.objs.add(nodey_tag) + cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id) + self.move(self.id, cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500) + + +class AllSimConnect(): + # todo: 连接类 + def __init__(self, canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None): + """ + 连接对象 + :param nodex: 节点 + :param nodex_ifs: 节点接口 + :param nodey: 节点 + :param nodey_ifs: 节点接口 + """ + self.canvas = canvas + self.ConfigCorrect = 0 if config is None else config + self.NobjS = nodex + self.NobjE = nodey + self.IfsS = nodex_ifs + self.IfsE = nodey_ifs + self.width = False if self.ConfigCorrect == 0 else True # 线的粗细 + + def transfer(self, source_node, packet: SimPacket): + """ + 传输数据 + :param packet: 数据包 + :return: + """ + if source_node == self.NobjS: + packet.transfer_packet(self.canvas, self.NobjS, self.NobjE) + self.NobjE.receive(packet) + else: + packet.transfer_packet(self.canvas, self.NobjE, self.NobjS) + self.NobjS.receive(packet) + + + def draw_line(self): + if self.width: + line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY, + width=2, fill="#c372f0", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line") + else: + line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY, + width=2, dash=(250, 4), fill="#c372f0", + tags=self.NobjS.ObjID + self.NobjE.ObjID + "line") + self.canvas.tag_lower(line, 2) + + +class SimHost(SimBase): + # todo: 主机类 + """ + 主机类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 1 + self.ObjLabel = label if label is not None else self.set_default_name() + self.interface = [{}] + self.connections = [None] + self.set_default_config() + + def packet_ok(self, receiveObj, label, size): + count = split_appdata(int(size)) + for i in range(2, count + 1): + pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], + receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], label, i, size, self, + receiveObj) + threading.Timer(1 * (i - 1), function=self.send, args=(pack,), ).start() + + def create_packet(self, receiveObj, label, size): + """ + 创建数据包 + :param receiveObj: 目的主机对象 + :param message: 消息 + :return: + """ + def send(message): + print(message) + pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], + receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], message, 1, size, self, + receiveObj) + self.send(pack) + message = """发送主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}""" + self.packet_window = PacketWindow(self.canvas, packet={"packet": True, + "size": int(size), + "sendObj": self, + "receiveObj": receiveObj, + "tag": label, + "send": send, + "mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]}) + self.packet_window.title("{} 封包".format(self.ObjLabel)) + + def send(self, packet): + """ + 发送数据包 + :param packet: + :return: + """ + connection: AllSimConnect = self.connections[0] + print(f"数据包从 {self.ObjLabel} 发出") + packet.up_jump = connection + connection.transfer(self, packet) + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + data = "" + def receive(message): + data = message + self.is_un_packet = True + packet.sendObj.packet_ok(self, data, int(packet.size)) + if not self.is_un_packet: + PacketWindow(self.canvas, {"packet": False, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": self, + "tag": packet.message, + "send": receive, + "mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]}, packet_step=["app", "trans", "ip", "mac"][::-1]) + size = int(packet.size) + message = """接收主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}""" + self.canvas.show_bottom_message(self, message=message.format(self.ObjLabel, self.interface[0]["ip"], + self.interface[0]["mac"], + size, packet.label, split_appdata(size))) + if split_appdata(size) == packet.label: + if packet.destination_ip == self.interface[0]["ip"]: + self.transfer_animate(True, packet.message, size) + self.canvas.message.show_message(f"{self.ObjLabel}接收到数据:{data}") + else: + self.transfer_animate(False, data, size) + self.canvas.trans_over(packet.objs) + + + def __str__(self): + str = "" + config = self.get_config() + for index, data in config.iterrows(): + str += f"【接口{data['node_ifs']}】\n" + str += f"AppAddr: /Root\n" + str += f"PORT: {data['conn_port']}\n" + str += f"IP: {data['ip']}\n" + str += f"MAC: {data['mac']}\n" + return str + + +class SimRouter(SimBase): + # todo: 路由类 + """ + 路由类 + """ + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 2 + self.ObjLabel = label if label is not None else self.set_default_name() + self.router_table = {} + self.set_default_router_table() + + def set_default_router_table(self): + """ + 将数据库中的路由表信息提取 + :return: + """ + sql = f"select * from router_table where obj_id='{self.ObjID}'" + router_tables = search(sql) + for index, router_table in router_tables.iterrows(): + if router_table["node_ifs"] in self.router_table: + self.router_table[router_table["node_ifs"]].append(router_table["segment"]) + else: + self.router_table[router_table["node_ifs"]] = [router_table["segment"]] + + def check_destination_ip(self, destination_ip, network): + """ + 检查目标ip是否属于网段范围内 + :param destination_ip: 目标ip + :param network: 网段 + :return:10.2.3.0/24 + """ + ip = ipaddress.ip_address(destination_ip) + network = ipaddress.ip_network(network) + if ip in network: + return True + if network == "0.0.0.0/24": # 如果网段为 0.0.0.0/24 则为默认路由 + return True + + def transmit(self, packet: SimPacket): + """ + 转发数据包 + :return: + """ + def transfer(): + flag = False + next_hop_ifs = None + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn.ConfigCorrect == 0: + continue + if conn == packet.up_jump: + continue + ifs = self.connections.index(conn) + 1 + if self.router_table.get(ifs) is None: + continue + for network in self.router_table.get(ifs): + if self.check_destination_ip(packet.destination_ip, network): + flag = True + next_hop_ifs = ifs + if flag: + conn = self.connections[next_hop_ifs - 1] + packet.up_jump = conn + conn.transfer(self, packet) + else: + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.NobjS != self: + if conn.NobjE.ObjType == 1: + conn.transfer(self, packet) + break + error_message = "路由寻址失败" + self.transfer_animate(False, packet, error_message) + def receive(message): + packet.message = message + self.is_packet = True + transfer() + if not self.is_packet: + PacketWindow(self.canvas, {"packet": True, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": packet.receiveObj, + "tag": packet.message, + "send": receive, + "mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]}, + packet_step=["ip", "mac"]) + else: + transfer() + + def receive(self, packet): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + def receive(message): + print(message) + self.is_un_packet = True + packet.message = message + sleep(1.5) + self.transmit(packet) + if not self.is_un_packet: + PacketWindow(self.canvas, {"packet": False, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": packet.receiveObj, + "tag": packet.message, + "send": receive, + "mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]}, + packet_step=["ip", "mac"][::-1]) + else: + self.transmit(packet) + + +class SimSwitch(SimBase): + # todo: 交换机类 + """ + 交换机类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 3 + self.ObjLabel = label if label is not None else self.set_default_name() + self.mac_table = {} + self.set_default_mac_table() + + def set_default_mac_table(self): + """ + 将数据库中的路由表信息提取 + :return: + """ + sql = f"select * from mac_table where obj_id='{self.ObjID}'" + router_tables = search(sql) + for index, router_table in router_tables.iterrows(): + if router_table["node_ifs"] in self.mac_table: + self.mac_table[router_table["node_ifs"]].append(router_table["mac"]) + else: + self.mac_table[router_table["node_ifs"]] = [router_table["mac"]] + + def transmit(self, packet: SimPacket): + """ + 转发数据包 + :return: + """ + flag = False + next_hub_ifs = None + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn.ConfigCorrect == 0: + continue + ifs = self.connections.index(conn) + 1 + if packet.destination_mac in self.mac_table.get(ifs, []): + flag = True + next_hub_ifs = ifs + if flag: + conn = self.connections[next_hub_ifs - 1] + packet.up_jump = conn + conn.transfer(self, packet) + return + for conn in self.connections: # 将数据包往所有接口进行转发 + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.ConfigCorrect == 0: + continue + new_packet = SimPacket(packet.source_ip, + packet.source_mac, + packet.destination_ip, + packet.destination_mac, + packet.message) + new_packet.up_jump = conn + threading.Thread(target=conn.transfer, args=(self, new_packet)).start() + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + print(f"交换机{self.ObjLabel}接受到数据{packet.message}") + self.transmit(packet) + + +class SimHub(SimBase): + """ + 集线器类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 4 + self.ObjLabel = label if label is not None else self.set_default_name() + + def transmit(self, packet: SimPacket): + """ + 集线器转发数据包 + :return: + """ + for conn in self.connections: # 将数据包往所有接口进行转发 + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.ConfigCorrect == 0: + continue + new_packet = SimPacket(packet.source_ip, + packet.source_mac, + packet.destination_ip, + packet.destination_mac, + packet.message) + new_packet.up_jump = conn + threading.Thread(target=conn.transfer, args=(self, new_packet)).start() + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!") + self.transmit(packet) diff --git a/SimUtil.py b/SimUtil.py new file mode 100644 index 0000000..3b3c268 --- /dev/null +++ b/SimUtil.py @@ -0,0 +1,267 @@ +import ipaddress +import re +import sqlite3 +from tkinter import messagebox + +import ttkbootstrap as tk +from ttkbootstrap import * +from ttkbootstrap import ttk +import pandas as pd + +from SimObjs import SimRouter + + +def round_rectangle(cv, x1, y1, x2, y2, radius=30, **kwargs): + """ + 绘制圆角矩形 + :param cv: canvas对象 + :param radius: 圆角值 + :return: + """ + points = [x1 + radius, y1, + x1 + radius, y1, + x2 - radius, y1, + x2 - radius, y1, + x2, y1, + x2, y1 + radius, + x2, y1 + radius, + x2, y2 - radius, + x2, y2 - radius, + x2, y2, + x2 - radius, y2, + x2 - radius, y2, + x1 + radius, y2, + x1 + radius, y2, + x1, y2, + x1, y2 - radius, + x1, y2 - radius, + x1, y1 + radius, + x1, y1 + radius, + x1, y1] + + return cv.create_polygon(points, **kwargs, smooth=True) + +def validate_ip_address(ip_address): + """ + 匹配ip地址格式是否规范 + :param ip_address: IP地址 + :return: Boolean + """ + # 定义IP地址的正则表达式模式 + pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$' + pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$' + # 使用re模块进行匹配 + match_with_subnet = re.match(pattern_with_subnet, ip_address) + match_without_subnet = re.match(pattern_without_subnet, ip_address) + if match_with_subnet: + # 带有子网掩码的IP地址 + # 检查每个组件的取值范围是否在 0-255 之间 + for group in match_with_subnet.groups()[:4]: + if not (0 <= int(group) <= 255): + return False + # 检查子网掩码的取值范围是否在 0-32 之间 + subnet_mask = int(match_with_subnet.groups()[4]) + if not (0 <= subnet_mask <= 32): + return False + return True + elif match_without_subnet: + # 不带子网掩码的IP地址 + # 检查每个组件的取值范围是否在 0-255 之间 + for group in match_without_subnet.groups(): + if not (0 <= int(group) <= 255): + return False + return True + else: + # IP地址格式不正确 + return False + + +class ExportUtil(): + def __init__(self, path): + self.conn = sqlite3.connect('./network.db') + self.path = path + + def get_table_names(self): + cursor = self.conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") # 如果你使用SQLite数据库 + tables = cursor.fetchall() + cursor.close() + return [table[0] for table in tables] + + def export(self): + tables = self.get_table_names() + with pd.ExcelWriter(self.path, engine='openpyxl') as writer: + for table in tables: + table_name = table + # a. 从数据库中获取表的数据并存储在DataFrame中 + query = f"SELECT * FROM {table_name}" + df = pd.read_sql(query, self.conn) + # b. 使用Pandas将数据写入Excel文件的不同sheet中 + df.to_excel(writer, sheet_name=table_name, index=False) + + def import_data(self): + excel_file = pd.ExcelFile(self.path) + sheet_names = excel_file.sheet_names + cursor = self.conn.cursor() + for sheet_name in sheet_names: + # 4. 使用 Pandas 读取工作表数据 + df = pd.read_excel(excel_file, sheet_name=sheet_name) + # 5. 获取工作表的列名 + columns = df.columns.tolist() + # 6. 构造插入语句 + columns_str = ', '.join(columns) + placeholders = ', '.join(['?' for _ in range(len(columns))]) + sql = f"INSERT INTO {sheet_name} ({columns_str}) VALUES ({placeholders})" + # 7. 将数据插入数据库 + for index, row in df.iterrows(): + # 8. 使用动态生成的 SQL 语句将数据插入数据库 + cursor.execute(sql, tuple(row)) + self.conn.commit() + + +class RouterConfigWindow(tk.Toplevel): + def __init__(self, parent, router_obj): + super().__init__(parent) + self.geometry("435x433+350+200") + self.title(f"{router_obj.ObjLabel}路由表配置") + self.router_obj = router_obj + self.interface_entries = [] + self.router_table = {} + self.create_interface_inputs() + self.create_router_table() + + def create_interface_inputs(self): + label_text = ["接口1", "接口2", "接口3", "接口4"] + for i in range(4): + label = tk.Label(self, text=label_text[i], font=("黑体", 16)) + label.grid(row=i, column=0, padx=10, pady=5, sticky="w") + entry = tk.Entry(self, width=20, font=("黑体", 16),) + entry.grid(row=i, column=1, padx=10, pady=5, sticky="w") + self.interface_entries.append(entry) + button = tk.Button(self, text="添加", font=("黑体", 16), command=lambda index=i: self.add_router_entry(index)) + button.grid(row=i, column=2, padx=10, pady=5) + lab = LabelFrame(self, text="示例") + lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20) + Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11", font=("黑体", 16)).pack() + + def create_router_table(self): + def on_right_click(event): + row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 + if row: + self.router_treeview.selection_set(row) # 选中该行 + delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 + + def delete_row(): + selected_items = self.router_treeview.selection() # 获取选中的行 + for item in selected_items: + ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] + self.router_obj.delete_config(ifs, network) + self.router_treeview.delete(item) + self.router_table_frame = tk.Frame(self) + self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) + style = ttk.Style() + style.configure("Custom.Treeview.Heading", font=("宋体", 15)) + style.configure("Custom.Treeview", rowheight=30, font=("宋体", 15)) + self.router_treeview = ttk.Treeview(self.router_table_frame, style="Custom.Treeview", columns=("Interface", "Route"), show="headings") + self.router_treeview.heading("Interface", text="接口") + self.router_treeview.heading("Route", text="网段") + self.router_treeview.pack(side="left", fill="both") + scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) + scrollbar.pack(side="right", fill="y") + self.router_treeview.configure(yscrollcommand=scrollbar.set) + self.router_table = self.router_obj.router_table + self.router_treeview.bind("", on_right_click) + # 创建删除菜单 + delete_menu = tk.Menu(self.master, tearoff=False) + delete_menu.add_command(label="删除", command=delete_row) + self.update_router_table() + + def add_router_entry(self, index): + entry_text = self.interface_entries[index].get() + try: + ipaddress.ip_network(entry_text) + if isinstance(self.router_obj, SimRouter): + if not validate_ip_address(entry_text): + messagebox.showerror("注意", message="添加的网段信息格式不合格") + self.interface_entries[index].delete(0, tk.END) + self.focus_set() + return + if entry_text: + if index + 1 in self.router_table: + self.router_table[index + 1].append(entry_text) + else: + self.router_table[index + 1] = [entry_text] + self.interface_entries[index].delete(0, tk.END) + self.router_obj.add_config(entry_text, index + 1) + self.update_router_table() + self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!") + except: + messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12") + return + + def update_router_table(self): + self.router_treeview.delete(*self.router_treeview.get_children()) + for i, entrys in self.router_table.items(): + for entry in entrys: + self.router_treeview.insert("", "end", values=(f"接口{i}", entry)) + + +class SwitchConfigWindow(RouterConfigWindow): + def __init__(self, parent, router_obj): + super().__init__(parent, router_obj) + self.geometry("435x433+350+200") + self.title(f"{router_obj.ObjLabel}交换表配置") + self.router_obj = router_obj + self.interface_entries = [] + self.router_table = {} + self.create_interface_inputs() + self.create_router_table() + + def create_router_table(self): + def on_right_click(event): + row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 + if row: + self.router_treeview.selection_set(row) # 选中该行 + delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 + + def delete_row(): + selected_items = self.router_treeview.selection() # 获取选中的行 + for item in selected_items: + ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] + self.router_obj.delete_config(ifs, network) + self.router_treeview.delete(item) + + self.router_table_frame = tk.Frame(self) + self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) + self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings") + self.router_treeview.heading("Interface", text="接口") + self.router_treeview.heading("Route", text="mac") + self.router_treeview.pack(side="left", fill="both") + scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) + scrollbar.pack(side="right", fill="y") + self.router_treeview.configure(yscrollcommand=scrollbar.set) + self.router_table = self.router_obj.mac_table + self.router_treeview.bind("", on_right_click) + # 创建删除菜单 + delete_menu = tk.Menu(self.master, tearoff=False) + delete_menu.add_command(label="删除", command=delete_row) + self.update_router_table() + + + def add_router_entry(self, index): + entry_text = self.interface_entries[index].get() + if isinstance(self.router_obj, SimRouter): + if not validate_ip_address(entry_text): + messagebox.showerror("注意", message="添加的网段信息格式不合格") + self.interface_entries[index].delete(0, tk.END) + self.focus_set() + return + if entry_text: + if index + 1 in self.router_table: + self.router_table[index + 1].append(entry_text) + else: + self.router_table[index + 1] = [entry_text] + self.interface_entries[index].delete(0, tk.END) + self.router_obj.add_config(entry_text, index + 1) + self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!") + self.update_router_table() \ No newline at end of file diff --git a/database.xlsx b/database.xlsx new file mode 100644 index 0000000..1f560c6 Binary files /dev/null and b/database.xlsx differ diff --git a/dbUtil.py b/dbUtil.py new file mode 100644 index 0000000..f2e20ef --- /dev/null +++ b/dbUtil.py @@ -0,0 +1,104 @@ +import sqlite3 +import sys + +import pandas as pd +from pandas import DataFrame + +conn = sqlite3.connect(sys.path[0]+"/network.db") + +def execute_sql(sql): + """ + 执行sql语句 + :param sql: + :return: + """ + cursor = conn.cursor() + cursor.execute(sql) + conn.commit() + + +def search(sql) -> DataFrame: + return pd.read_sql(sql, conn) + + +def delete_obj(obj_id): + cursor = conn.cursor() + delete_obj_sql = f"delete from sim_objs where ObjID='{obj_id}'" + cursor.execute(delete_obj_sql) + delete_conn_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{obj_id}')" + cursor.execute(delete_conn_sql) + conn.commit() + + +def truncate_db(): + init_database() + +def init_database(): + cursor = conn.cursor() + cursor.execute(""" + DROP TABLE IF EXISTS `conn_config`; + """) + cursor.execute(""" + CREATE TABLE `conn_config` ( + `conn_id` varchar(55) NULL DEFAULT NULL, + `node_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `ip` varchar(55) NULL DEFAULT NULL, + `mac` varchar(128) NULL DEFAULT NULL, + `conn_port` varchar(32) NULL DEFAULT NULL, + `addr` varchar(255) NULL DEFAULT NULL, + CONSTRAINT `conn_config_sim_conn_conn_id_fk` FOREIGN KEY (`conn_id`) REFERENCES `sim_conn` (`conn_id`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + """) + cursor.execute(""" + DROP TABLE IF EXISTS `mac_table`; + """) + cursor.execute(""" + CREATE TABLE `mac_table` ( + `obj_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `mac` varchar(55) NULL DEFAULT NULL, + CONSTRAINT `mac_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + """) + cursor.execute(""" + DROP TABLE IF EXISTS `router_table`; + """) + cursor.execute(""" + CREATE TABLE `router_table` ( + `obj_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `segment` varchar(55) NULL DEFAULT NULL, + CONSTRAINT `router_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + + """) + cursor.execute(""" + DROP TABLE IF EXISTS `sim_conn`; + """) + cursor.execute(""" + CREATE TABLE `sim_conn` ( + `conn_id` varchar(255) NOT NULL , + `ConfigCorrect` int(0) NULL DEFAULT NULL , + PRIMARY KEY (`conn_id`) +) ; + + """) + cursor.execute(""" + DROP TABLE IF EXISTS `sim_objs`; + """) + cursor.execute(""" + CREATE TABLE `sim_objs` ( + `ObjID` varchar(50) NOT NULL, + `ObjType` int(0) NULL DEFAULT NULL, + `ObjLabel` varchar(20) NULL DEFAULT NULL, + `ObjX` int(0) NULL DEFAULT NULL, + `ObjY` int(0) NULL DEFAULT NULL, + `ConfigCorrect` int(0) NULL DEFAULT NULL, + PRIMARY KEY (`ObjID`) +) ; + """) + conn.commit() + +if __name__ == '__main__': + init_database() diff --git a/network.db b/network.db new file mode 100644 index 0000000..2b5d9a0 Binary files /dev/null and b/network.db differ diff --git a/packet.py b/packet.py new file mode 100644 index 0000000..012a16b --- /dev/null +++ b/packet.py @@ -0,0 +1,433 @@ +import time + + +from SimObjs import * +from tkinter import * +from tkinter import messagebox + +from PIL import ImageTk, Image +from AgreementUtil import AgreementUtil + + +def create_label(canvas: Canvas, x, y, width, height, text, rect_color, font_color, tag): + canvas.create_rectangle(x, y, x + width, y + height, fill=rect_color, tags=tag) + canvas.create_text(x + width / 2, y + height / 2, text=text, fill=font_color, tags=tag) + return tag + + +class PacketWindow(Toplevel): + def __init__(self, master, packet, packet_step=["app", "trans", "ip", "mac"], *args, **kwargs): + """ + packet:{"packet": True, "mode": {"app": True, "trans": True, "ip": True, "mac": True}} + """ + super().__init__(master, *args, **kwargs) + self.master = master + self.width = 300 + self.height = 400 + self.geometry(f"{self.width}x{self.height}+200+200") + self.attributes("-topmost", True) + self.resizable(width=False, height=False) + self.packet_button_str = {"app": "应用层", "ip": "网络层", "mac": "链路层", "trans": "传输层"} + if not packet["packet"]: + packet["mode"] = packet["mode"][::-1] # True 为封包 False 为解包 + self.packet_step = packet_step + self.step = 0 + self.packet: dict[str: bool] = packet + self.sendObj: SimHost = packet["sendObj"] + self.receiveObj: SimHost = packet["receiveObj"] + self.packet_option = {"app": {"color": "#ff3b26", "command": self.app_packet if packet["packet"] else self.app_unpack}, + "ip": {"color": "#00a2f2", "command": self.ip_packet if packet["packet"] else self.ip_unpack}, + "mac": {"color": "#46707a", "command": self.mac_packet if packet["packet"] else self.mac_unpack}, + "trans": {"color": "#710a00", "command": self.trans_packet if packet["packet"] else self.trans_unpack}} + self.canvas = Canvas(self, width=self.width, height=self.height) + self.canvas.place(x=0, y=0, anchor='nw') + self.packet_height, self.packet_init_width = 25, 100 + self.background_img = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height))) + self.canvas.create_image(0, 0, image=self.background_img, anchor=NW) + # self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, 10, width=self.packet_init_width, height=self.packet_height, text=self.packet["tag"], rect_color="#dee1e6", font_color="black", tag="packet") + self.step_y = {} + self.message = packet["tag"] + self.create_widget() + + def move_to(self, tag, target_x, target_y, speed=10): + # 获取当前位置 + current_coords = self.canvas.coords(tag) + if len(current_coords) < 2: + return # 如果没有坐标,就退出函数 + current_x, current_y = current_coords[0], current_coords[1] + # 计算移动方向 + delta_x = (target_x if target_x else current_x) - current_x + delta_y = (target_y if target_y else current_y) - current_y + # 计算下一步的位置 + next_x = current_x + (speed if delta_x > 0 else -speed) if abs(delta_x) > speed else (target_x if target_x else current_x) + next_y = current_y + (speed if delta_y > 0 else -speed) if abs(delta_y) > speed else (target_y if target_y else current_y) + # 移动对象 + self.canvas.move(tag, next_x - current_x, next_y - current_y) + # 如果对象还没有到达目标,继续移动 + if next_x != (target_x if target_x else current_x) or next_y != (target_y if target_y else current_y): + self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed)) + + def create_widget(self): + mode_text = "封包" if self.packet["packet"] else "解包" + num, margin_top, button_width, button_height = 1, 30, 120, 40 + for data in self.packet["mode"]: + key = list(data.keys())[0] + value = list(data.values())[0] + if value: + Button(self, text=self.packet_button_str[key] + mode_text, command=self.packet_option[key]["command"], + font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1), + width=button_width, height=button_height) + + num += 1 + Button(self, text="发送", command=self.send_packet, font=("", 16)).place(x=self.width - 60, y=self.height - 40, anchor=NW) + Button(self, text="取消", command=self.destroy, font=("", 16)).place(x=self.width - 120, y=self.height - 40, anchor=NW) + + def send_packet(self): + if self.step != len(self.packet_step): + messagebox.showerror("注意", "尚未完成{}!".format("封包" if self.packet["packet"] else "解包")) + return + self.packet["send"](self.message) + self.destroy() + + def create_window(self, option): + toplevel = Toplevel(self) + toplevel.title(option["title"]) + toplevel.geometry("450x220+300+300") + for entry_option in option["entry"]: + index = option["entry"].index(entry_option) + key = list(entry_option.keys())[0] + value = list(entry_option.values())[0] + Label(toplevel, text=key, font=("", 16)).grid(row=index, column=0, padx=20, pady=10) + if value is None: + continue + Entry(toplevel, textvariable=value, font=("", 16), width=15).grid(row=index, column=1, pady=10) + Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(x=450 - 60, y=220 - 40, anchor=NW) + Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(x=450 - 120, y=220 - 40, anchor=NW) + toplevel.attributes("-topmost", True) + return toplevel + + def app_packet(self): + """ + 应用层封包 + """ + if self.packet_step[self.step] != "app": + messagebox.showerror("注意", "封包顺序出错!") + return + SourceAppAddr = StringVar() + TargetAppAddr = StringVar() + def packet(): + toplevel.destroy() + source_app_addr = SourceAppAddr.get() + target_app_addr = TargetAppAddr.get() + if source_app_addr == self.sendObj.interface[0]["addr"] \ + and target_app_addr == self.receiveObj.interface[0]["addr"]: + + # 动画 + # app_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["app"]), 50, 25, "AH", "#ff3b26", "white", tag="app") + # self.move_to(self.packet_rect, None, int(self.step_y["app"]), speed=1) + # self.move_to(app_rect, self.width / 2 + self.packet_init_width / 2, int(self.step_y["app"]), speed=1) + + self.message = target_app_addr + "&" + self.packet["tag"] + self.master.message.show_message("应用层封包成功!数据包如下: \n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "应用层封包", "entry": [{"发送主机应用层地址:": SourceAppAddr}, {"接收主机应用层地址:": TargetAppAddr}], + "command": packet}) + + def app_unpack(self): + """ + 应用层解包 + """ + if self.packet_step[self.step] != "app": + messagebox.showerror("注意", "解包顺序出错!") + return + TargetAppAddr = StringVar() + def packet(): + toplevel.destroy() + target_app_addr = TargetAppAddr.get() + if target_app_addr == self.receiveObj.interface[0]["addr"]: + self.message = self.message.split("&")[1] + self.master.message.show_message("应用层解包成功!数据包如下: \n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") + toplevel = self.create_window({"title": "应用层解包", "entry": [{"接收主机应用层地址:": TargetAppAddr}], + "command": packet}) + + def trans_packet(self): + """ + 传输层封包 + """ + if self.packet_step[self.step] != "trans": + messagebox.showerror("注意", "封包顺序出错!") + return + SentPort = StringVar() + RcptPort = StringVar() + SplitCount = IntVar() + + def packet(): + + sent_port = SentPort.get() + rcpt_port = RcptPort.get() + split_count = SplitCount.get() + count = split_appdata(self.packet["size"]) + print(sent_port, self.sendObj.interface[0]["conn_port"]) + if sent_port == self.sendObj.interface[0]["conn_port"] \ + and rcpt_port == self.receiveObj.interface[0]["conn_port"]\ + and split_count == count: + toplevel.destroy() + # 动画 + # self.canvas.delete(self.packet_rect) + # self.canvas.delete("app") + # self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, int(self.step_y["app"]), + # width=self.packet_init_width + 50, height=self.packet_height, + # text="A-" + self.packet["tag"], rect_color="#dee1e6", font_color="black", + # tag="packet") + # trans_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["trans"]), 50, 25, "PH", "#ff3b26", + # "white", tag="trans") + # self.move_to(self.packet_rect, None, int(self.step_y["trans"]), speed=1) + # self.move_to(trans_rect, self.width / 2 + self.packet_init_width / 2 + 50, int(self.step_y["trans"]), speed=1) + + + self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port)) + self.master.message.show_message("传输层封包成功!数据包如下:\n{}".format(str([str(i + 1) + "-" + str(self.message) for i in range(split_count)]))) + self.step += 1 + else: + messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "传输层封包", "entry": [{"发送主机端口:": SentPort}, + {"接收主机端口:": RcptPort}, + {"拆包数量": SplitCount}, + {"每个包约2048": None}], + "command": packet}) + + def trans_unpack(self): + """ + 传输层解包 + """ + print(self.packet_step) + print(self.step) + if self.packet_step[self.step] != "trans": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptPort = StringVar() + + def packet(): + rcpt_port = RcptPort.get() + if rcpt_port == self.receiveObj.interface[0]["conn_port"]: + toplevel.destroy() + data = AgreementUtil.parse_udp_packet(self.message) + print(data) + self.message = data[-1] + self.master.message.show_message("传输层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "传输层解包", "entry": [{"接收主机端口:": RcptPort}], + "command": packet}) + + def ip_packet(self): + """ + 网络层封包 + """ + if self.packet_step[self.step] != "ip": + messagebox.showerror("注意", "封包顺序出错!") + return + SourceIP = StringVar() + TargetIP = StringVar() + def packet(): + source_ip = SourceIP.get() + target_ip = TargetIP.get() + if source_ip == self.sendObj.interface[0]["ip"] \ + and target_ip == self.receiveObj.interface[0]["ip"]: + toplevel.destroy() + self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip) + self.master.message.show_message("网络层封包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "网络层封包", "entry": [{"发送主机IP:": SourceIP}, + {"接收主机IP:": TargetIP}], + "command": packet}) + + def ip_unpack(self): + """ + 网络层解包 + """ + print(self.packet_step) + print(self.step) + if self.packet_step[self.step] != "ip": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptIP = StringVar() + + def packet(): + rcpt_ip = RcptIP.get() + if rcpt_ip == self.receiveObj.interface[0]["ip"]: + toplevel.destroy() + data = AgreementUtil.parse_ip_packet(self.message) + print(data) + self.message = data[-1] + self.master.message.show_message("网络层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "网络层解包", "entry": [{"接收主机IP:": RcptIP}], + "command": packet}) + + def mac_packet(self): + """ + 链路层封包 + """ + if self.packet_step[self.step] != "mac": + messagebox.showerror("注意", "封包顺序出错!") + return + SentMac = StringVar() + RcptMac = StringVar() + def packet(): + sent_mac = SentMac.get() + rcpt_mac = RcptMac.get() + if sent_mac == self.sendObj.interface[0]["mac"] \ + and rcpt_mac == self.receiveObj.interface[0]["mac"]: + toplevel.destroy() + self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac) + self.master.message.show_message("链路层封包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "链路层封包", "entry": [{"发送主机MAC:": SentMac}, + {"接收主机MAC:": RcptMac}], + "command": packet}) + + def mac_unpack(self): + """ + 链路层解包 + """ + if self.packet_step[self.step] != "mac": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptMac = StringVar() + def packet(): + rcpt_mac = RcptMac.get() + if rcpt_mac == self.receiveObj.interface[0]["mac"]: + toplevel.destroy() + data = AgreementUtil.parse_ethernet_frame(self.packet["tag"]) + print(data) + self.message = data[-1] + self.master.message.show_message("链路层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "链路层解包", "entry": [{"接收主机MAC:": RcptMac}], + "command": packet}) + + +class AppData: + def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag, source_app_addr, target_app_addr, app_packed_string, timestamp, canvas): + self.obj_id = obj_id + self.id_key = id_key + self.app_pack_id = app_pack_id + self.app_pack_size = app_pack_size + self.app_pack_tag = app_pack_tag + self.source_app_addr = source_app_addr + self.target_app_addr = target_app_addr + self.app_packed_string = app_packed_string + self.timestamp = timestamp + + + def pack(self): + # 为了简化,我们打包成一个字典 + return vars(self) + + @staticmethod + def unpack(packed_data): + # 解包为AppData对象 + return AppData(**packed_data) + +class TransData: + def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id, sent_port, rcpt_port, source_app_addr, target_app_addr, trans_packed_string, timestamp): + self.obj_id = obj_id + self.id_key = id_key + self.trans_pack_id = trans_pack_id + self.trans_seq = trans_seq + self.trans_tag = trans_tag + self.app_pack_id = app_pack_id + self.sent_port = sent_port + self.rcpt_port = rcpt_port + self.source_app_addr = source_app_addr + self.target_app_addr = target_app_addr + self.trans_packed_string = trans_packed_string + self.timestamp = timestamp + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return TransData(**packed_data) + + +class IPData: + def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip, source_app_addr, target_app_addr, ip_packed_string): + self.ObjID = obj_id + self.IDkey = id_key + self.IPPackID = ip_pack_id + self.TransPackID = trans_pack_id + self.SourceIP = source_ip + self.TargetIP = target_ip + self.SourceAppAddr = source_app_addr + self.TargetAppAddr = target_app_addr + self.IPPackedString = ip_packed_string + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return IPData(**packed_data) + + +class MACData: + def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac, source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string): + self.ObjID = obj_id + self.IDkey = id_key + self.MacPackID = mac_pack_id + self.IPPackID = ip_pack_id + self.SentMAC = sent_mac + self.RcptMAC = rcpt_mac + self.SourceIP = source_ip + self.TargetIP = target_ip + self.SourceAppAddr = source_app_addr + self.TargetAppAddr = target_app_addr + self.MacPackedString = mac_packed_string + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return MACData(**packed_data) + + +def split_appdata(AppPackSize): + MTU = 2048 + return AppPackSize // MTU + (AppPackSize % MTU > 0) + + +if __name__ == '__main__': + # 假设的最大传输单元(MTU) + MTU = 2048 # bytes + + # 模拟的数据和数据大小 + SIMULATED_DATA = "hello" + SIMULATED_SIZE = 2049 # 10 MB + + # 创建应用层数据包 + app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1", SIMULATED_DATA, time.time()) + packed_app_data = app_packet.pack() + print(packed_app_data) + diff --git a/random_mac.py b/random_mac.py new file mode 100644 index 0000000..a033311 --- /dev/null +++ b/random_mac.py @@ -0,0 +1,9 @@ +import random + +def random_mac(): + # 生成六组两位的十六进制数 + return ":".join(["%02x" % random.randint(0, 255) for _ in range(6)]) + +# 生成MAC地址 +mac_address = random_mac() +print(mac_address)