From 0c05efe3cca2c9d2c04bcb74ce7a3a8b50915df2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=8E=89=E8=BE=89?= <2207153529@qq.com> Date: Tue, 2 Jul 2024 09:00:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E7=89=88=E6=9C=AC=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AgreementUtil.py | 107 +++++++++ NetworkAnalog.py | 421 +++++++++++++++++++++++++++++++++ SimObjs.py | 600 +++++++++++++++++++++++++++++++++++++++++++++++ SimUtil.py | 267 +++++++++++++++++++++ database.xlsx | Bin 0 -> 9289 bytes dbUtil.py | 104 ++++++++ network.db | Bin 0 -> 40960 bytes packet.py | 433 ++++++++++++++++++++++++++++++++++ random_mac.py | 9 + 9 files changed, 1941 insertions(+) create mode 100644 AgreementUtil.py create mode 100644 NetworkAnalog.py create mode 100644 SimObjs.py create mode 100644 SimUtil.py create mode 100644 database.xlsx create mode 100644 dbUtil.py create mode 100644 network.db create mode 100644 packet.py create mode 100644 random_mac.py diff --git a/AgreementUtil.py b/AgreementUtil.py new file mode 100644 index 0000000..d9e330f --- /dev/null +++ b/AgreementUtil.py @@ -0,0 +1,107 @@ +import socket +import struct + +def format_mac_address(mac_bytes): + """ + Format a MAC address from bytes to a human-readable string. + + Args: + mac_bytes (bytes): The MAC address in bytes format. + + Returns: + str: The MAC address in human-readable format. + """ + mac_str = mac_bytes.hex() + mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2)) + return mac_formatted + +class AgreementUtil(): + + @staticmethod + def create_udp_packet(data, dest_port, src_port): + """ + 创建UDP数据包 + Returns: + bytes: `UDP数据包`. + """ + # UDP header fields + udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length + return udp_header + data.encode() + + @staticmethod + def create_ip_packet(udp_packet, dest_ip, src_ip): + """ + 创建IP数据包 + Returns: + bytes: IP数据包 + """ + # IP header fields + version = 4 + ihl = 5 # Internet Header Length + type_of_service = 0 + total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length + identification = 54321 + flags = 0 # Don't fragment + fragment_offset = 0 + ttl = 64 # Time to live + protocol = 17 # UDP + checksum = 0 # For simplicity, set to 0 + source_ip = socket.inet_aton(src_ip) + dest_ip = socket.inet_aton(dest_ip) + # IP数据包头部字段 + ip_header = struct.pack('!BBHHHBBH4s4s', + (version << 4) + ihl, type_of_service, total_length, + identification, (flags << 13) + fragment_offset, + ttl, protocol, checksum, source_ip, dest_ip) + return ip_header + udp_packet + + @staticmethod + def create_ethernet_frame(ip_packet, dest_mac, src_mac): + """ + 创建以太网帧数据包 + Returns: + bytes: 以太网帧数据包 + """ + # Convert MAC addresses from string format to bytes + dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', '')) + src_mac_bytes = bytes.fromhex(src_mac.replace(':', '')) + # Ethernet type for IP (0x0800) + ethernet_type = 0x0800 + # Pack Ethernet frame fields + ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type) + return ethernet_frame + ip_packet + + @staticmethod + def parse_ethernet_frame(frame): + """ + 解析以太网帧数据包 + Returns: 发送端MAC,接收端MAC,以太网类型,IP数据包 + """ + dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14]) + payload = frame[14:] + return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload + + @staticmethod + def parse_ip_packet(packet): + """ + 解析 IP 数据包 + Returns: 发送端IP,接收端IP,协议,UDP数据包 + """ + version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \ + ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20]) + payload = packet[20:] + return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload + + @staticmethod + def parse_udp_packet(packet): + """ + 解析UDP数据包 + Returns: + tuple: 发送主机端口,接收主机端口,数据 + """ + src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8]) + data = packet[8:] + return src_port, dest_port, data.decode() + + + diff --git a/NetworkAnalog.py b/NetworkAnalog.py new file mode 100644 index 0000000..8f350b4 --- /dev/null +++ b/NetworkAnalog.py @@ -0,0 +1,421 @@ +import ipaddress +import sys +import threading +from tkinter import filedialog + +from SimUtil import * + +from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase +from dbUtil import search, execute_sql, delete_obj, truncate_db +from PIL import Image as imim +from packet import * + + +class Message(Canvas): + def __init__(self, master, **kwargs): + super().__init__(master, **kwargs) + self.message = [] + self.master = master + self.scrollbar_y = tk.Scrollbar(master, orient="vertical", command=self.yview) + self.scrollbar_y.pack(side="right", fill=BOTH) + self.scrollbar_x = tk.Scrollbar(master, orient=HORIZONTAL, command=self.xview) + self.scrollbar_x.pack(side="bottom", fill=BOTH) + self.config(yscrollcommand=self.scrollbar_y.set, xscrollcommand=self.scrollbar_x.set) + + def show_message(self, message, color="#5fa8fe"): + for mes in message.split("\n"): + self.message.append({"message": mes, "color": color}) + num = 0 + self.delete("message") + for function in self.message: + self.create_text(20, 25 * num + 10, anchor="nw", text=function["message"], font=("", 15), + fill=function["color"], tags="message") + num += 1 + self.update() + self.configure(scrollregion=self.bbox("all")) + self.scrollbar_y.set(1.0, 1.0) + self.yview_moveto(1.0) + + +class NetWorkAnalog(Canvas): + def __init__(self, master, **kwargs): + super().__init__(master, **kwargs) + self.master = master + self.router_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40))) + self.switch_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40))) + self.hub_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40))) + self.host_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40))) + + self.width = int(self.cget("width")) + self.height = int(self.cget("height")) + self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)] + self.label_top_img = ImageTk.PhotoImage(imim.open("../datas/images/右上框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.4)))) + self.label_bottom_img = ImageTk.PhotoImage(imim.open("../datas/images/右下框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.45)))) + self.message_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/日志消息展示框@2x.png").resize((self.width - 60, self.height - self.canvas_size[3] - 30)) + ) + self.text_back_img = ImageTk.PhotoImage( + Image.open("../images/文字背景@3x.png").resize((155, 90)) + ) + message_frame = Frame(self, width=self.width - 60, height=self.height - self.canvas_size[3] - 30) + message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW) + self.message = Message(message_frame, width=self.width - 60, height=self.height - self.canvas_size[3] - 30) + self.message.pack() + self.message.configure(bg="#edf8ff") + self.configure(bg="#ddeafe") + self.filename = ImageTk.PhotoImage(imim.open("../datas/images/背景@2x.png").resize((self.width, self.canvas_size[3]))) + self.create_image(0, 0, image=self.filename, anchor=NW) + self.chose = self.host_img + self.AllSimObjs = {} + self.conns = [] + self.drawLine = True # 画线标志 + self.line_start_obj = None + self.line_end_obj = None + self.line_start_ifs = None + self.line_end_ifs = None + self.chose_obj = None + self.show_label_flag = True + self.show_interface_flag = True + self.trans_obj = set() + self.create_widget() + + def is_chose(self): + """ + 当被选中时,绘制选中框 + :return: + """ + self.delete("rectangle") + self.create_rectangle(self.chose_obj.ObjX - 35, self.chose_obj.ObjY - 35, self.chose_obj.ObjX + 15, + self.chose_obj.ObjY + 15, outline="red", tags="rectangle") + + def reload_data(self): + # todo: 加载上一次程序运行的数据 + """ + 加载上一次程序运行时的数据 + :return: + """ + self.AllSimObjs = {} + self.conns = [] + sim_obj_sql = "select * from sim_objs" + sim_obj_data = search(sim_obj_sql) + for index, row in sim_obj_data.iterrows(): # 初始化组件对象 + sim_type = row["ObjType"] + ObjX = row["ObjX"] + ObjY = row["ObjY"] + ConfigCorrect = row["ConfigCorrect"] + ObjLable = row["ObjLabel"] + ObjID = row["ObjID"] + if sim_type == 1: + tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + elif sim_type == 2: + tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + elif sim_type == 3: + tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + else: + tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) + tag.create_img() + self.AllSimObjs[tag.ObjID] = tag + + sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id" + sim_conn_data = search(sim_conn_sql) + conn_datas = {} + for index, conn in sim_conn_data.iterrows(): + if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas: + conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])] + else: + conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"])) + for key, value in conn_datas.items(): + conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1], + self.AllSimObjs[value[1][0]], value[1][1], key[1]) + conn_obj.draw_line() + self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象 + self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj + self.conns.append(conn_obj) + conn_obj.ConfigCorrect = key[1] + + def show_obj(self, AllSimObj, conns): + self.AllSimObjs = AllSimObj + self.conns = conns + for key, sim_obj in self.AllSimObjs.items(): + sim_obj.create_img() + for conn in self.conns: + conn.draw_line() + + def send_packet(self): + """ + 发送数据包 + :return: + """ + hosts = {} + for key, tag in self.AllSimObjs.items(): + if tag.ObjType == 1 and tag.ConfigCorrect == 1: + hosts[tag.ObjLabel] = tag.ObjID + child2 = tk.Toplevel() + child2.title("数据包配置") + child2.geometry('392x306+200+110') + child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作 + tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10) + send_host = StringVar() + host_combobox = ttk.Combobox( + master=child2, # 父容器 + height=5, # 高度,下拉显示的条目数量 + width=18, # 宽度 + state='readonly', # readonly(只可选) + font=('', 16), # 字体 + textvariable=send_host, # 通过StringVar设置可改变的值 + values=list(hosts.keys()), # 设置下拉框的选项 + ) + + host_combobox.grid(row=0, column=1, pady=10) + tk.Label(child2, text='接收主机:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=10) # ,sticky='w'靠左显示 + receive_host = StringVar() + ttk.Combobox( + master=child2, # 父容器 + height=5, # 高度,下拉显示的条目数量 + width=18, # 宽度 + state='readonly', # readonly(只可选) + font=('', 16), # 字体 + textvariable=receive_host, # 通过StringVar设置可改变的值 + values=list(hosts.keys()), # 设置下拉框的选项 + ).grid(row=1, column=1, pady=10) + tk.Label(child2, text='数据大小(kb):', font=('黑体', 16)).grid(row=2, column=0, sticky='w', + pady=10) # ,sticky='w'靠左显示 + packet_size = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar()) + packet_size.grid(row=2, column=1, pady=10) + tk.Label(child2, text='数据标签:', font=('黑体', 16)).grid(row=3, column=0, sticky='w', + pady=10) # ,sticky='w'靠左显示 + packet_label = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar()) + packet_label.grid(row=3, column=1, pady=10) + + def send(): + """ + 发送数据包 + :return: + """ + if send_host.get() == "" or receive_host.get() == "" or packet_size.get() == "" or packet_label.get() == "": + messagebox.showerror("注意", message="信息不能为空") + return + if send_host.get() == receive_host.get(): + messagebox.showerror("注意", message="发送主机和接收主机不能相同!") + send_message = """发送主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}""" + receive_message = """接收主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}""" + send_host_obj = self.AllSimObjs[hosts[send_host.get()]] + receive_host_obj = self.AllSimObjs[hosts[receive_host.get()]] + pack_label = packet_label.get() + pack_size = packet_size.get() + child2.destroy() + count = split_appdata(int(pack_size)) + self.show_top_message(send_host_obj, + message=send_message.format(send_host_obj.ObjLabel, send_host_obj.interface[0]["ip"], + send_host_obj.interface[0]["mac"], pack_size, 0, count)) + self.show_bottom_message(receive_host_obj, message=receive_message.format(receive_host_obj.ObjLabel, + receive_host_obj.interface[0][ + "ip"], + receive_host_obj.interface[0][ + "mac"], pack_size, 0, count)) + for obj in self.trans_obj: + self.delete(obj.ObjID + "detail_button") + self.delete(obj.ObjID + "detail") + self.unbind(obj.ObjID, "") + send_host_obj.create_packet(receive_host_obj, pack_label, pack_size) + + tk.Button(child2, text='开启模拟', font=('黑体', 16), height=1, command=send).grid(row=5, column=0, sticky='e', pady=10) + tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=5, column=1, sticky='e', pady=10) + # 建立画出对象详情的函数 + def draw_detail(self, obj): + # 判断type对象来画出详情表 + if obj.ObjType == 1: # 当type对象为1 画出主机详情图 + app_color = ("#94D050", '#00B0F0') + trans_color = ("#94D050", '#00B0F0') + ip_color = ("#94D050", '#00B0F0') + mac_color = ("#94D050", '#00B0F0') + elif obj.ObjType == 2: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#94D050", '#00B0F0') + mac_color = ("#94D050", '#00B0F0') + elif obj.ObjType == 3: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#D3D3D3", "#D3D3D3") + mac_color = ("#94D050", '#00B0F0') + else: + app_color = ("#D3D3D3", "#D3D3D3") + trans_color = ("#D3D3D3", "#D3D3D3") + ip_color = ("#D3D3D3", "#D3D3D3") + mac_color = ("#94D050", '#00B0F0') + frist_x = obj.ObjX # 获取frist_x + frist_y = obj.ObjY # 获取frist_y + # 这里缺少一个删除其他详情的步骤 + # 画出连接详情表的线 + tag_id = obj.ObjID + "detail" + if len(self.gettags(tag_id)) > 0: + self.delete(tag_id) + else: + self.create_line((frist_x + 40, frist_y - 20), (frist_x + 80, frist_y - 20), + (frist_x + 90, frist_y - 60), + fill='#50abf8', tags=tag_id, width=2) + # 画出详情表 + self.create_image(frist_x + 50, frist_y - 140, image=self.text_back_img, anchor=NW, tags=tag_id) + # 画出相应的绿条数据 + self.create_text(frist_x + 70, frist_y - 130, text='应用层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 107, text='传输层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 84, text='IP 层', tags=tag_id) + self.create_text(frist_x + 70, frist_y - 62, text='链路层', tags=tag_id) + # 画出 右侧绿色和蓝色的类进度条 + # 应用层 + self.create_rectangle(frist_x + 197, frist_y - 135, frist_x + 170, frist_y - 120, fill=app_color[0], + outline=app_color[0], tags=tag_id) + self.create_rectangle(frist_x + 168, frist_y - 135, frist_x + 148, frist_y - 120, fill=app_color[1], + outline=app_color[1], tags=tag_id) + # 传输层 + self.create_rectangle(frist_x + 197, frist_y - 115, frist_x + 160, frist_y - 100, + fill=trans_color[0], + outline=trans_color[0], tags=tag_id) + self.create_rectangle(frist_x + 158, frist_y - 115, frist_x + 133, frist_y - 100, + fill=trans_color[1], + outline=trans_color[1], tags=tag_id) + # IP 层 + self.create_rectangle(frist_x + 197, frist_y - 93, frist_x + 150, frist_y - 78, fill=ip_color[0], + outline=ip_color[0], tags=tag_id) + self.create_rectangle(frist_x + 148, frist_y - 93, frist_x + 118, frist_y - 78, fill=ip_color[1], + outline=ip_color[1], tags=tag_id) + # 链路层 + self.create_rectangle(frist_x + 197, frist_y - 70, frist_x + 133, frist_y - 55, fill=mac_color[0], + outline=mac_color[0], tags=tag_id) + self.create_rectangle(frist_x + 131, frist_y - 70, frist_x + 98, frist_y - 55, fill=mac_color[1], + outline=mac_color[1], tags=tag_id) + + def trans_over(self, objs): + print("传输完成") + self.trans_obj = objs + for obj in objs: + obj.create_detail_button() + self.tag_bind(obj.ObjID, "", lambda event, obj=obj: self.draw_detail(obj)) # 绑定右击事件 + obj.is_packet = False + obj.is_un_packet = False + + def clear_canvas(self): + """ + 清除画布 + :return: + """ + ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?') + if ask == "no": + return + truncate_db() # 清除数据库 + for tag_id, tag in self.AllSimObjs.items(): + self.delete(tag_id) + self.delete(tag_id + "text") + for conn in self.conns: + conn.delete_line() + self.delete("rectangle") + self.AllSimObjs.clear() + self.conns.clear() + + def export_data(self): + try: + export = ExportUtil(sys.path[0] + "/database.xlsx") + export.export() + self.message.show_message("文件导出成功!文件位置在{}".format(sys.path[0] + "/database.xlsx")) + except Exception as E: + self.message.show_message(str(E), color="red") + + def import_data(self): + truncate_db() + for tag_id, tag in self.AllSimObjs.items(): + self.delete(tag_id) # 删除画布中的所有组件 + self.delete(tag_id + "text") + for conn in self.conns: + conn.delete_line() # 删除画布中的所有连接线 + self.delete("rectangle") + self.AllSimObjs.clear() # 删除当前所有组件对象 + self.conns.clear() # 删除所有连接配置 + file_path = filedialog.askopenfilename() # 获取用户选择的文件 + if file_path: + export = ExportUtil(file_path) + export.import_data() + self.message.show_message("文件导入成功!") + self.reload_data() + + def show_top_message(self, obj, message): + # todo: 显示网络配置信息 + """ + 显示网络配置信息 + :return: + """ + self.delete("netSet") + self.create_text(self.width * 0.82 + 120, 80, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'), + fill="#7030a0", tags="netSet") + self.create_text(self.width * 0.82 + 20, 80 + 25, text=message, + anchor="nw", font=('宋体', 14), tags="netSet") + + def show_bottom_message(self, obj, message): + # todo: 显示路由表交换表信息 + """ + 显示路由交换表信息 + :return: + """ + self.delete("routerSet") + self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 50, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'), + fill="#7030a0", tags="routerSet") + self.create_text(self.width * 0.82 + 20, self.canvas_size[3] / 2 + 50 + 25, text=message, + anchor="nw", font=('宋体', 14), tags="routerSet") + + def create_config_button(self): + # 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方 + menubar = tk.Menu(root) + # 定义一个空菜单单元 + setMenu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label='发送数据包', menu=setMenu) + setMenu.add_command(label='发送数据包', command=self.send_packet) + menubar.add_command(label='清除屏幕', command=self.clear_canvas) + menubar.add_command(label='导出数据', command=self.export_data) + menubar.add_command(label='导入数据', command=self.import_data) + root.config(menu=menubar) + + def create_widget(self): + # todo: 创建页面 + """ + 创建整体页面布局 + :return: + """ + self.create_rectangle(self.canvas_size[0], self.canvas_size[1], self.canvas_size[2], self.canvas_size[3], outline="#7f6000", width=0) # 矩形框,左上角坐标,右下角坐标 + self.create_image(self.width * 0.82, 30, image=self.label_top_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标 + self.create_text(self.width * 0.82 + 120, 30 + 15, text="发送主机详情", anchor="n", font=('微软雅黑', 18, 'bold')) + self.create_image(self.width * 0.82, self.canvas_size[3] / 2, image=self.label_bottom_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标 + # round_rectangle(self, self.width * 0.82, self.canvas_size[3] / 2, self.width * 0.98, self.canvas_size[3] - 30, outline="#ffff00", width=2, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标 + self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 15, text="接收主机详情", anchor="n", font=('微软雅黑', 18, 'bold')) + # 显示左边的固定图片 + img_height, text_height, split_width = 50, 40, 120 + self.create_text(text_height, split_width + 40, text="路由器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width, image=self.router_img, anchor="nw") + self.create_text(text_height, split_width * 2 + 40, text="交换机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 2, image=self.switch_img, anchor="nw") + self.create_text(text_height, split_width * 3 + 40, text="集线器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 3, image=self.hub_img, anchor="nw") + self.create_text(text_height, split_width * 4 + 40, text="主机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字 + self.create_image(img_height, split_width * 4, image=self.host_img, anchor="nw") + + +if __name__ == '__main__': + root = Window() + root.title('网络拓扑图') + screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小 + screen_height = root.winfo_screenheight() + root_attr = { + "width": screen_width * 0.83, + "height": screen_height * 0.85, + } + size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'], (screen_width - root_attr['width']) / 2, + (screen_height - root_attr['height']) / 2 - 30) + canvas = NetWorkAnalog(root, width=root_attr['width'], heigh=root_attr['height'], bg="white") + canvas.place(x=0, y=0, anchor='nw') + canvas.create_config_button() + canvas.reload_data() + root.geometry(size) + root.mainloop() diff --git a/SimObjs.py b/SimObjs.py new file mode 100644 index 0000000..47c7f04 --- /dev/null +++ b/SimObjs.py @@ -0,0 +1,600 @@ +import math +import sys +import threading +from time import sleep +from tkinter import Tk +from tkinter import messagebox + +from ttkbootstrap import * +from uuid import uuid4 +import ipaddress +import time + +from PIL import ImageTk, Image, ImageFont + +from dbUtil import search +from packet import * + +pause_event = threading.Event() + +class SimBase(): + # todo: 组件父类 + """ + 图标类,所有组件的父类 + """ + def __init__(self, canvas, x, y, id, config=None, label=None): + self.ConfigCorrect = 0 if config is None else config # 是否进行配置 + self.ObjType = None # 组件类型 1->主机,2->路由器,3->交换机,4->集线器 + self.ObjLabel = "" + self.ObjID = id + self.ObjX = x + self.ObjY = y + self.canvas = canvas + self.is_packet = False + self.is_un_packet = False + self.host_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40))) + self.host_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/主机--暗色@2x.png").resize((40, 40))) + self.router_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40))) + self.router_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/路由器--暗色@2x.png").resize((40, 40))) + self.switch_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40))) + self.switch_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/交换机--暗色@2x.png").resize((40, 40))) + self.hub_img = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40))) + self.hub_img_tm = ImageTk.PhotoImage( + Image.open(sys.path[0] + "/../datas/images/集线器--暗色@2x.png").resize((40, 40))) + self.img = None + self.img_tm = None + self.interface = [{}, {}, {}, {}] + self.connections = ["1", "2", "3", "4"] + self.set_default_config() + self.packet_window = None + + def set_default_name(self): + data_frame = search(f"select objid from sim_objs where objType={self.ObjType}") + num = data_frame.size + 1 + if isinstance(self, SimHost): + name = "SHO%d" % num + elif isinstance(self, SimRouter): + name = "SRO%d" % num + elif isinstance(self, SimSwitch): + name = "SWI%d" % num + else: + name = "SHUB%d" % num + return name + + def create_img(self): + """ + 创建图片 + :return: + """ + if self.ObjType == 1: + self.img = self.host_img + self.img_tm = self.host_img_tm + elif self.ObjType == 2: + self.img = self.router_img + self.img_tm = self.router_img_tm + elif self.ObjType == 3: + self.img = self.switch_img + self.img_tm = self.switch_img_tm + else: + self.img = self.hub_img + self.img_tm = self.hub_img_tm + self.canvas.delete("L") + id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30, + image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw", + tags=self.ObjID) + self.canvas.dtag("L", "L") + self.canvas.create_text(self.ObjX - 20, self.ObjY - 60, text=self.ObjLabel, font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + if self.ObjType == 1: + self.canvas.create_text(self.ObjX - 45, self.ObjY + 15, text=self.interface[0]["ip"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.create_text(self.ObjX - 45, self.ObjY + 35, text=self.interface[0]["mac"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.create_text(self.ObjX - 45, self.ObjY + 55, text=self.interface[0]["addr"]if len(self.interface) > 0 else "", font=("", 16, "bold"), + fill="#9b78eb", tags=self.ObjID + "text", anchor="nw") + self.canvas.tag_raise(id) + + def set_default_config(self): + sql = f"select * from conn_config where node_id='{self.ObjID}'" + conn_data = search(sql) + for index, conn in conn_data.iterrows(): + self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"], + "mac": conn["mac"], + "conn_port": conn["conn_port"], + "addr": conn["addr"]} + + def get_config(self): + sql = f"select * from conn_config where node_id='{self.ObjID}'" + conn_data = search(sql) + return conn_data + + def transfer_animate(self, status, packet, packet_size, error_message=None): + if status: + text = f"消息大小: {packet_size}\n" \ + f"消息内容: {packet}" + self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20, + outline="#92d050", + width=3, fill="#92d050", tags=self.ObjID + "packetData") + self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", + font=('', 10), tags=self.ObjID + "packetData") # 显示文字 + self.canvas.update() + sleep(2) + self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 + else: + text = f"传输失败\n" if error_message is None else error_message + self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red", + width=3, fill="red", tags=self.ObjID + "packetData") + self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", + font=('', 10), tags=self.ObjID + "packetData") # 显示文字 + self.canvas.update() + sleep(2) + self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 + + def create_detail_button(self): + frist_x = self.ObjX # 获取frist_x + frist_y = self.ObjY # 获取frist_y + tag_id = self.ObjID + "detail_button" + self.canvas.create_rectangle(frist_x + 10, frist_y - 25, frist_x + 40, frist_y - 15, fill="#f0a732", + outline="#47b2ff",width=2, tags=tag_id) + + def __str__(self): + str = "" + config = self.get_config() + for index, data in config.iterrows(): + str += f"【接口{data['node_ifs']}】\n" + str += f"IP: {data['ip']}\n" + str += f"MAC: {data['mac']}\n" + return str + + +class SimPacket(): + # todo: 数据包类 + """ + 数据包类 + """ + def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message, label, size, sendObj, receiveObj): + """ + :param source_mac: 源主机mac地址 + :param source_ip: 源主机ip地址 + :param destination_mac: 目的主机mac地址 + :param destination_ip: 目的主机ip地址 + :param message: 数据 + """ + self.source_mac = source_mac + self.source_ip = source_ip + self.sendObj = sendObj + self.receiveObj = receiveObj + self.destination_mac = destination_mac + self.destination_ip = destination_ip + self.message = message + self.label = label + self.size = size + self.up_jump = None # 上一跳连接对象 + self.objs = set() + self.img = None + self.id = str(uuid4()) + self.process_img() + + def process_img(self): + img = Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30)) + draw = ImageDraw.Draw(img) + font = ImageFont.truetype("arial.ttf", size=16) + text_width, text_height = draw.textsize(str(self.label), font=font) + # 计算文本在右上角的位置 + x = img.width - text_width - 10 # 右上角,距离右边缘10像素 + y = 10 # 距离上边缘10像素 + # 绘制文本 + draw.text((x, y), str(self.label), font=font, fill=(255, 0, 0)) # 红色字体 + self.img = ImageTk.PhotoImage(img) + + def move(self, id, cv, target_x, target_y, duration): + start_x, start_y = cv.coords(id) + distance_x = target_x - start_x + distance_y = target_y - start_y + steps = duration // 10 # 以10毫秒为间隔进行移动 + step_x = distance_x / steps + step_y = distance_y / steps + self._move_step(id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps) + + def _move_step(self,id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps): + if pause_event.is_set(): + new_x = start_x + step_x + new_y = start_y + step_y + sleep(2) + self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps) + if steps > 0: + new_x = start_x + step_x + new_y = start_y + step_y + cv.coords(id, new_x, new_y) + cv.update() # 更新画布显示 + sleep(0.01) # 添加延迟以控制动画速度 + self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1) + else: + cv.coords(id, target_x, target_y) + cv.delete(id) + + def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase): + self.objs.add(nodex_tag) + self.objs.add(nodey_tag) + cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id) + self.move(self.id, cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500) + + +class AllSimConnect(): + # todo: 连接类 + def __init__(self, canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None): + """ + 连接对象 + :param nodex: 节点 + :param nodex_ifs: 节点接口 + :param nodey: 节点 + :param nodey_ifs: 节点接口 + """ + self.canvas = canvas + self.ConfigCorrect = 0 if config is None else config + self.NobjS = nodex + self.NobjE = nodey + self.IfsS = nodex_ifs + self.IfsE = nodey_ifs + self.width = False if self.ConfigCorrect == 0 else True # 线的粗细 + + def transfer(self, source_node, packet: SimPacket): + """ + 传输数据 + :param packet: 数据包 + :return: + """ + if source_node == self.NobjS: + packet.transfer_packet(self.canvas, self.NobjS, self.NobjE) + self.NobjE.receive(packet) + else: + packet.transfer_packet(self.canvas, self.NobjE, self.NobjS) + self.NobjS.receive(packet) + + + def draw_line(self): + if self.width: + line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY, + width=2, fill="#c372f0", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line") + else: + line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY, + width=2, dash=(250, 4), fill="#c372f0", + tags=self.NobjS.ObjID + self.NobjE.ObjID + "line") + self.canvas.tag_lower(line, 2) + + +class SimHost(SimBase): + # todo: 主机类 + """ + 主机类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 1 + self.ObjLabel = label if label is not None else self.set_default_name() + self.interface = [{}] + self.connections = [None] + self.set_default_config() + + def packet_ok(self, receiveObj, label, size): + count = split_appdata(int(size)) + for i in range(2, count + 1): + pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], + receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], label, i, size, self, + receiveObj) + threading.Timer(1 * (i - 1), function=self.send, args=(pack,), ).start() + + def create_packet(self, receiveObj, label, size): + """ + 创建数据包 + :param receiveObj: 目的主机对象 + :param message: 消息 + :return: + """ + def send(message): + print(message) + pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], + receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], message, 1, size, self, + receiveObj) + self.send(pack) + message = """发送主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}""" + self.packet_window = PacketWindow(self.canvas, packet={"packet": True, + "size": int(size), + "sendObj": self, + "receiveObj": receiveObj, + "tag": label, + "send": send, + "mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]}) + self.packet_window.title("{} 封包".format(self.ObjLabel)) + + def send(self, packet): + """ + 发送数据包 + :param packet: + :return: + """ + connection: AllSimConnect = self.connections[0] + print(f"数据包从 {self.ObjLabel} 发出") + packet.up_jump = connection + connection.transfer(self, packet) + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + data = "" + def receive(message): + data = message + self.is_un_packet = True + packet.sendObj.packet_ok(self, data, int(packet.size)) + if not self.is_un_packet: + PacketWindow(self.canvas, {"packet": False, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": self, + "tag": packet.message, + "send": receive, + "mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]}, packet_step=["app", "trans", "ip", "mac"][::-1]) + size = int(packet.size) + message = """接收主机:{}\n\nIP地址:{}\n\nMac地址:{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}""" + self.canvas.show_bottom_message(self, message=message.format(self.ObjLabel, self.interface[0]["ip"], + self.interface[0]["mac"], + size, packet.label, split_appdata(size))) + if split_appdata(size) == packet.label: + if packet.destination_ip == self.interface[0]["ip"]: + self.transfer_animate(True, packet.message, size) + self.canvas.message.show_message(f"{self.ObjLabel}接收到数据:{data}") + else: + self.transfer_animate(False, data, size) + self.canvas.trans_over(packet.objs) + + + def __str__(self): + str = "" + config = self.get_config() + for index, data in config.iterrows(): + str += f"【接口{data['node_ifs']}】\n" + str += f"AppAddr: /Root\n" + str += f"PORT: {data['conn_port']}\n" + str += f"IP: {data['ip']}\n" + str += f"MAC: {data['mac']}\n" + return str + + +class SimRouter(SimBase): + # todo: 路由类 + """ + 路由类 + """ + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 2 + self.ObjLabel = label if label is not None else self.set_default_name() + self.router_table = {} + self.set_default_router_table() + + def set_default_router_table(self): + """ + 将数据库中的路由表信息提取 + :return: + """ + sql = f"select * from router_table where obj_id='{self.ObjID}'" + router_tables = search(sql) + for index, router_table in router_tables.iterrows(): + if router_table["node_ifs"] in self.router_table: + self.router_table[router_table["node_ifs"]].append(router_table["segment"]) + else: + self.router_table[router_table["node_ifs"]] = [router_table["segment"]] + + def check_destination_ip(self, destination_ip, network): + """ + 检查目标ip是否属于网段范围内 + :param destination_ip: 目标ip + :param network: 网段 + :return:10.2.3.0/24 + """ + ip = ipaddress.ip_address(destination_ip) + network = ipaddress.ip_network(network) + if ip in network: + return True + if network == "0.0.0.0/24": # 如果网段为 0.0.0.0/24 则为默认路由 + return True + + def transmit(self, packet: SimPacket): + """ + 转发数据包 + :return: + """ + def transfer(): + flag = False + next_hop_ifs = None + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn.ConfigCorrect == 0: + continue + if conn == packet.up_jump: + continue + ifs = self.connections.index(conn) + 1 + if self.router_table.get(ifs) is None: + continue + for network in self.router_table.get(ifs): + if self.check_destination_ip(packet.destination_ip, network): + flag = True + next_hop_ifs = ifs + if flag: + conn = self.connections[next_hop_ifs - 1] + packet.up_jump = conn + conn.transfer(self, packet) + else: + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.NobjS != self: + if conn.NobjE.ObjType == 1: + conn.transfer(self, packet) + break + error_message = "路由寻址失败" + self.transfer_animate(False, packet, error_message) + def receive(message): + packet.message = message + self.is_packet = True + transfer() + if not self.is_packet: + PacketWindow(self.canvas, {"packet": True, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": packet.receiveObj, + "tag": packet.message, + "send": receive, + "mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]}, + packet_step=["ip", "mac"]) + else: + transfer() + + def receive(self, packet): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + def receive(message): + print(message) + self.is_un_packet = True + packet.message = message + sleep(1.5) + self.transmit(packet) + if not self.is_un_packet: + PacketWindow(self.canvas, {"packet": False, + "size": None, + "sendObj": packet.sendObj, + "receiveObj": packet.receiveObj, + "tag": packet.message, + "send": receive, + "mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]}, + packet_step=["ip", "mac"][::-1]) + else: + self.transmit(packet) + + +class SimSwitch(SimBase): + # todo: 交换机类 + """ + 交换机类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 3 + self.ObjLabel = label if label is not None else self.set_default_name() + self.mac_table = {} + self.set_default_mac_table() + + def set_default_mac_table(self): + """ + 将数据库中的路由表信息提取 + :return: + """ + sql = f"select * from mac_table where obj_id='{self.ObjID}'" + router_tables = search(sql) + for index, router_table in router_tables.iterrows(): + if router_table["node_ifs"] in self.mac_table: + self.mac_table[router_table["node_ifs"]].append(router_table["mac"]) + else: + self.mac_table[router_table["node_ifs"]] = [router_table["mac"]] + + def transmit(self, packet: SimPacket): + """ + 转发数据包 + :return: + """ + flag = False + next_hub_ifs = None + for conn in self.connections: + if isinstance(conn, AllSimConnect): + if conn.ConfigCorrect == 0: + continue + ifs = self.connections.index(conn) + 1 + if packet.destination_mac in self.mac_table.get(ifs, []): + flag = True + next_hub_ifs = ifs + if flag: + conn = self.connections[next_hub_ifs - 1] + packet.up_jump = conn + conn.transfer(self, packet) + return + for conn in self.connections: # 将数据包往所有接口进行转发 + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.ConfigCorrect == 0: + continue + new_packet = SimPacket(packet.source_ip, + packet.source_mac, + packet.destination_ip, + packet.destination_mac, + packet.message) + new_packet.up_jump = conn + threading.Thread(target=conn.transfer, args=(self, new_packet)).start() + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + print(f"交换机{self.ObjLabel}接受到数据{packet.message}") + self.transmit(packet) + + +class SimHub(SimBase): + """ + 集线器类 + """ + + def __init__(self, canvas, x, y, id=None, config=None, label=None, *args): + self.ObjID = str(uuid4()) if id is None else id + super().__init__(canvas, x, y, self.ObjID, config, label) + self.ObjType = 4 + self.ObjLabel = label if label is not None else self.set_default_name() + + def transmit(self, packet: SimPacket): + """ + 集线器转发数据包 + :return: + """ + for conn in self.connections: # 将数据包往所有接口进行转发 + if isinstance(conn, AllSimConnect): + if conn == packet.up_jump: + continue + if conn.ConfigCorrect == 0: + continue + new_packet = SimPacket(packet.source_ip, + packet.source_mac, + packet.destination_ip, + packet.destination_mac, + packet.message) + new_packet.up_jump = conn + threading.Thread(target=conn.transfer, args=(self, new_packet)).start() + + def receive(self, packet: SimPacket): + """ + 接收数据 + :param packet: 数据包 + :return: + """ + print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!") + self.transmit(packet) diff --git a/SimUtil.py b/SimUtil.py new file mode 100644 index 0000000..3b3c268 --- /dev/null +++ b/SimUtil.py @@ -0,0 +1,267 @@ +import ipaddress +import re +import sqlite3 +from tkinter import messagebox + +import ttkbootstrap as tk +from ttkbootstrap import * +from ttkbootstrap import ttk +import pandas as pd + +from SimObjs import SimRouter + + +def round_rectangle(cv, x1, y1, x2, y2, radius=30, **kwargs): + """ + 绘制圆角矩形 + :param cv: canvas对象 + :param radius: 圆角值 + :return: + """ + points = [x1 + radius, y1, + x1 + radius, y1, + x2 - radius, y1, + x2 - radius, y1, + x2, y1, + x2, y1 + radius, + x2, y1 + radius, + x2, y2 - radius, + x2, y2 - radius, + x2, y2, + x2 - radius, y2, + x2 - radius, y2, + x1 + radius, y2, + x1 + radius, y2, + x1, y2, + x1, y2 - radius, + x1, y2 - radius, + x1, y1 + radius, + x1, y1 + radius, + x1, y1] + + return cv.create_polygon(points, **kwargs, smooth=True) + +def validate_ip_address(ip_address): + """ + 匹配ip地址格式是否规范 + :param ip_address: IP地址 + :return: Boolean + """ + # 定义IP地址的正则表达式模式 + pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$' + pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$' + # 使用re模块进行匹配 + match_with_subnet = re.match(pattern_with_subnet, ip_address) + match_without_subnet = re.match(pattern_without_subnet, ip_address) + if match_with_subnet: + # 带有子网掩码的IP地址 + # 检查每个组件的取值范围是否在 0-255 之间 + for group in match_with_subnet.groups()[:4]: + if not (0 <= int(group) <= 255): + return False + # 检查子网掩码的取值范围是否在 0-32 之间 + subnet_mask = int(match_with_subnet.groups()[4]) + if not (0 <= subnet_mask <= 32): + return False + return True + elif match_without_subnet: + # 不带子网掩码的IP地址 + # 检查每个组件的取值范围是否在 0-255 之间 + for group in match_without_subnet.groups(): + if not (0 <= int(group) <= 255): + return False + return True + else: + # IP地址格式不正确 + return False + + +class ExportUtil(): + def __init__(self, path): + self.conn = sqlite3.connect('./network.db') + self.path = path + + def get_table_names(self): + cursor = self.conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") # 如果你使用SQLite数据库 + tables = cursor.fetchall() + cursor.close() + return [table[0] for table in tables] + + def export(self): + tables = self.get_table_names() + with pd.ExcelWriter(self.path, engine='openpyxl') as writer: + for table in tables: + table_name = table + # a. 从数据库中获取表的数据并存储在DataFrame中 + query = f"SELECT * FROM {table_name}" + df = pd.read_sql(query, self.conn) + # b. 使用Pandas将数据写入Excel文件的不同sheet中 + df.to_excel(writer, sheet_name=table_name, index=False) + + def import_data(self): + excel_file = pd.ExcelFile(self.path) + sheet_names = excel_file.sheet_names + cursor = self.conn.cursor() + for sheet_name in sheet_names: + # 4. 使用 Pandas 读取工作表数据 + df = pd.read_excel(excel_file, sheet_name=sheet_name) + # 5. 获取工作表的列名 + columns = df.columns.tolist() + # 6. 构造插入语句 + columns_str = ', '.join(columns) + placeholders = ', '.join(['?' for _ in range(len(columns))]) + sql = f"INSERT INTO {sheet_name} ({columns_str}) VALUES ({placeholders})" + # 7. 将数据插入数据库 + for index, row in df.iterrows(): + # 8. 使用动态生成的 SQL 语句将数据插入数据库 + cursor.execute(sql, tuple(row)) + self.conn.commit() + + +class RouterConfigWindow(tk.Toplevel): + def __init__(self, parent, router_obj): + super().__init__(parent) + self.geometry("435x433+350+200") + self.title(f"{router_obj.ObjLabel}路由表配置") + self.router_obj = router_obj + self.interface_entries = [] + self.router_table = {} + self.create_interface_inputs() + self.create_router_table() + + def create_interface_inputs(self): + label_text = ["接口1", "接口2", "接口3", "接口4"] + for i in range(4): + label = tk.Label(self, text=label_text[i], font=("黑体", 16)) + label.grid(row=i, column=0, padx=10, pady=5, sticky="w") + entry = tk.Entry(self, width=20, font=("黑体", 16),) + entry.grid(row=i, column=1, padx=10, pady=5, sticky="w") + self.interface_entries.append(entry) + button = tk.Button(self, text="添加", font=("黑体", 16), command=lambda index=i: self.add_router_entry(index)) + button.grid(row=i, column=2, padx=10, pady=5) + lab = LabelFrame(self, text="示例") + lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20) + Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11", font=("黑体", 16)).pack() + + def create_router_table(self): + def on_right_click(event): + row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 + if row: + self.router_treeview.selection_set(row) # 选中该行 + delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 + + def delete_row(): + selected_items = self.router_treeview.selection() # 获取选中的行 + for item in selected_items: + ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] + self.router_obj.delete_config(ifs, network) + self.router_treeview.delete(item) + self.router_table_frame = tk.Frame(self) + self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) + style = ttk.Style() + style.configure("Custom.Treeview.Heading", font=("宋体", 15)) + style.configure("Custom.Treeview", rowheight=30, font=("宋体", 15)) + self.router_treeview = ttk.Treeview(self.router_table_frame, style="Custom.Treeview", columns=("Interface", "Route"), show="headings") + self.router_treeview.heading("Interface", text="接口") + self.router_treeview.heading("Route", text="网段") + self.router_treeview.pack(side="left", fill="both") + scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) + scrollbar.pack(side="right", fill="y") + self.router_treeview.configure(yscrollcommand=scrollbar.set) + self.router_table = self.router_obj.router_table + self.router_treeview.bind("", on_right_click) + # 创建删除菜单 + delete_menu = tk.Menu(self.master, tearoff=False) + delete_menu.add_command(label="删除", command=delete_row) + self.update_router_table() + + def add_router_entry(self, index): + entry_text = self.interface_entries[index].get() + try: + ipaddress.ip_network(entry_text) + if isinstance(self.router_obj, SimRouter): + if not validate_ip_address(entry_text): + messagebox.showerror("注意", message="添加的网段信息格式不合格") + self.interface_entries[index].delete(0, tk.END) + self.focus_set() + return + if entry_text: + if index + 1 in self.router_table: + self.router_table[index + 1].append(entry_text) + else: + self.router_table[index + 1] = [entry_text] + self.interface_entries[index].delete(0, tk.END) + self.router_obj.add_config(entry_text, index + 1) + self.update_router_table() + self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!") + except: + messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12") + return + + def update_router_table(self): + self.router_treeview.delete(*self.router_treeview.get_children()) + for i, entrys in self.router_table.items(): + for entry in entrys: + self.router_treeview.insert("", "end", values=(f"接口{i}", entry)) + + +class SwitchConfigWindow(RouterConfigWindow): + def __init__(self, parent, router_obj): + super().__init__(parent, router_obj) + self.geometry("435x433+350+200") + self.title(f"{router_obj.ObjLabel}交换表配置") + self.router_obj = router_obj + self.interface_entries = [] + self.router_table = {} + self.create_interface_inputs() + self.create_router_table() + + def create_router_table(self): + def on_right_click(event): + row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 + if row: + self.router_treeview.selection_set(row) # 选中该行 + delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 + + def delete_row(): + selected_items = self.router_treeview.selection() # 获取选中的行 + for item in selected_items: + ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] + self.router_obj.delete_config(ifs, network) + self.router_treeview.delete(item) + + self.router_table_frame = tk.Frame(self) + self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) + self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings") + self.router_treeview.heading("Interface", text="接口") + self.router_treeview.heading("Route", text="mac") + self.router_treeview.pack(side="left", fill="both") + scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) + scrollbar.pack(side="right", fill="y") + self.router_treeview.configure(yscrollcommand=scrollbar.set) + self.router_table = self.router_obj.mac_table + self.router_treeview.bind("", on_right_click) + # 创建删除菜单 + delete_menu = tk.Menu(self.master, tearoff=False) + delete_menu.add_command(label="删除", command=delete_row) + self.update_router_table() + + + def add_router_entry(self, index): + entry_text = self.interface_entries[index].get() + if isinstance(self.router_obj, SimRouter): + if not validate_ip_address(entry_text): + messagebox.showerror("注意", message="添加的网段信息格式不合格") + self.interface_entries[index].delete(0, tk.END) + self.focus_set() + return + if entry_text: + if index + 1 in self.router_table: + self.router_table[index + 1].append(entry_text) + else: + self.router_table[index + 1] = [entry_text] + self.interface_entries[index].delete(0, tk.END) + self.router_obj.add_config(entry_text, index + 1) + self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!") + self.update_router_table() \ No newline at end of file diff --git a/database.xlsx b/database.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..1f560c62cb9906c53c0918cd7e1e404bd8a5277d GIT binary patch literal 9289 zcmZ{K1yEes)^+2qjRa3{Y1|1AB)Gc=ZQLzL&>+Fx-2+LmpuyeU9fG?{(7%)UUd~5bW zCN~>vxkx#bb{5RHPtvmk*vn4MQrW zBS&azfU7~SlEFDnImwX^Tu#L$d!y3Q`W5P6%s3l07q4}apOv(z`6G%!5EgY-3yQhM zPvr+-gR^f&tct6ZC2^03b&~F!aO=Xv`aq@arU*3&YNGy}6rcEF`T54o)sgpIt7=DD z4R)^Ouf!GU;mvBdEIpqay?_UUrQrwT_3znxt226^ng6*qDZ1$tnD78VIt>5-gshF5 zHM6rh*arO9o%Q$X9BS)0&GX=RAD4}~nO)hSbaVODQ~{kVTolfRK>;;{sp~&go&B=zx}RSb1dr_Uk+AvMvJh#$^cl7* z+K|`&w5HnL*=lL_M%}7sY+TaUzt1Zk?uXP>-UKcp8`^8W49#KJ4_BRsS>gTj&Kx#` ztLvw3JT(L3VfMtZt4a--?HL!=E&0MoRe2VpCbL)br7?vi5h@B3*Iq08Y3Aw35jqMJ zR_~W4B15{rjk=*VpDF}*l!kPF8Fk*bpr&~!waVYcGZztG-}k`At=<%xcnQ$7cby&_ zhe>=#ck2>QI7ia3B&>U)dBhun_1N77J8dyekjNqKm=O(!4&d$_lU4jsCR`J`+8#Tu z8$CTjSFK2hnA+=iDdGRBYq9L%GO3|2pWP$Fi*MK}-pAq#RMBBW+DP2J)6Pa#QQVB2 z$gq7Q2MLZVE56wuvyEWpP`b&bw09 z`56u-T4N2zq23j*`fVQhY6Eb{SWCdbqxPbf#* zb^O}xawGi7I|p0um8@a2Jv%mbx80^xo_EZjNYeA^Ecew>PEhhlhiI_7r+%_oLboCZ zj*azKN|1Q1LEX$L2xryAfH524W0?~4nX;Mzt4=ozLv-#3*>JHP`|TuM{q?cf^@T!k zwQU*QdjcDBW!`p+WT4rX{$+4Qn;OZ8n$B5yv!y^4uKdWO*0F@? z+;akz=4}S4PBoquG4>klq!*3rTRjf)Ds|YdH5kxr^$onm9$;3p!m|}dR3wnjCoSxs za(ahT+iB^g9^|gKN$=YX&K~UhBrrB#Z|h&^iI#d_Ev?_$#=Vx5O?U6Yf^rsvrbr-$ z$HQmaP&|r_h6h3o#rM5Ef%Z2kB=$|dMQJ)it*65s{wB!lBL?~yB>!%l3L7W192M>r zzT^7wah&D*@R?hXpG|-&o5>V@zi3FC0K22;tSZ%c={z+`;KEPUp$5WcY6F=L58Dhz znx}<#lLA%v0^;X+DXXO4gPqbm>e|J=K50&q{$g94p5OrB&1qlV7w8Oc=1<$RK<=9`5{H1jJo4&oTNcdRKjTzMPv$oz|s z$7tp4osnyYVJ%Sq3?T7|$(aOmj40d*6{`2)Gie#iL*e>uu zVrH+sbug%qu4b;<>TNGZEbEuwMUwrnRgt6CIbT)x@Eo&V+umIe1_U(rqs*NE{~4)A?nO*JC#v<^tPP{nHI@PhM&V zG22~P{G!pd7Mk~)i32<;nmsFJagle)N_9T>Tz4PdpRO5MHo0gFos9$@k}~x;@>dS; z_kgeS+?!p>@Kvu+?sH!Yq}2?ae~!TCOG|Yt4{M^pYHre$kbSR5B#!?|mz#aa<6)*< zo<>ho>fYSwU0~KkyNfXisF6=>r`GpOM!S9%kh))GT7dR4H46Zqs3LjUBqdW_eVAj3 z%{E>XcxhqS3;VuyzXc(@H6Ce{ZR@dj2Jo|=A7y13*bfh7SqAxTl0r6NFN=|f#}C^mujQ>g}%LO$iO zN^Djfwya6MMm1<}kjU`6{{)-LkL;9+d0xHSSbB^CN`-vbumo8o>f77hP!SD|H+ZO{ zYZWk=Vk6&M-QeS6Xg<0bafl&uP;uN`YMT~j=LwOu%%|7{mZ4w}_CR0!Vse_nIsx0X^t$p_xTI!8FwU9eNtuYx1B3CKv_0BPlbzEh@%# z^)Bt)>_lfOY~0{3kF~yN>fmDNQDSSy9)BoWepi6%mCz=F5DFqKHD9gJCh6!si6UrP z$4`WyrTcA#P!2;X847&n$t;w@)Vb>RN$1;4%$#~d0w8!6-$BWg6PI44UUk{ZLPkY| zUoBUtLi2TIj$n>sL-$)h8kqg_{}WG{?cfPXzUr-=yJ}ZhdqaDQ&_w^&UO8HI4IHh~ zudFo}{Y2$&^=Sf9a>w30{G&+N^Sw zb~A5hNOhDjud3w&TqanlQuRy6&~vNNF%&W&q)k7WMGLC!H=KLQ&Phr3Uv3dau5oRPV*w2V#*VEmb=f7~v$F3Vj~=cFBBu3u7Ui+= zm=PFBvu8Ve{QL4g4&))FB>}>UrI|Q^A|_0=IeThu{?pTQ;Zw?vYQR{Tx;+(fjEJl8 z!ag|Ni{$>^5+?e#l5{nf8;y_QtenahWK5@%sbu%$Vrc6&mNUur;cq2LYdF9=(t&tf zUj)`uD#oSS-Q7~#ee#luCn*IeMBhgwgO03pH#NbkV3DBJ+nz~DTfukh#eD>WG-69; zY`n>P2SHWGZ^EwoqzFf6qlF}*`i(@P83iB`XR+nPHC-#6f)_NFmYJs)#>RhTzrrWy zmz__#)?lX5`##sCT%1;FrQXniQ1s1e9lyQKF7@b;vRQC{g?#uC=AZ65_Ob3l6$$_d zfYd_&!(FrePWru+^<+9&a2^<_-Wco#6o=QW(&u8d&nl8h%rDnCNZHCAeSP$dvnD~V zELtveI^wPE6R+}|fl)jwNO0iY4+)~$E1PA;`8#hKGSS>64sv5J{8$^_4p>ts>y%xFg7XE< z7$x3(`qEGmI6Vrc3|st~LEb+O*?}y!h}z5o6BgML8MWOEZtokxD7$$UIGaP%Qo>*W09#05 z{XaCp{#z5@l69i;xNv-*q&r`?ac$K|le-j^%m+*7=~a8_5`0J>g-iE08-KhbWP@>z zUw@felpHql%bMhlVp-4muIV*Y^3aOQl(Xlt$>1*)Oy@2n%Q2fv0>ZnuyES;-?`nFo zU)R3e7%eV^7MCQR%(ZUpu&ujZBz3**QU$||92R;OTa7)Q6VQ9jc}{w^oEGqB%KA|7 z91Q_>-6W&J;l)VVvyU6n^&x@>}wuQEh+>bPR4@%Nb^Pa`EG1;9m z;O3oykrS?LbJ%;O7Y?)4x0M!mex$BqD=!=|*bP^h(!5ifQ>O(PZTOr9mX-c}FCkd{bSD$W%DWBq2WJNPBWc1zq(Hi@OGO(kdrx(v5hjQI2o3M%>HV z@8K&0K1qsksMK~lN6aJ*@B1D4&+u|4hQ%^2u~kw*t)uVQ2cVOolJI3$=)!P-ud{z4 z4w0b_k&J4V?#r$OMvCtsBE<`37Ku@R5ca*{|Cy7;wvg)}OGy#kiL~e+X*#-wU=EtW zy(M+xXY0W*jWip)fMHME*Cx46j*H=Sw|AgKHwDeGZ8@9B$MA$kzwLxO*TO~`NPp`s zA2&GC&Y>cm&qklCGgzDl5dcNQkW0i2BWRcy7_q6%H5M=H{|ckf8^w)?XsE_MtB{@D zC{!CwUT;;iCUAi?qe_xh;~TsglYJ4vaT0gNq$I2Ux21>AGLhyr1qE!oF-F7MXh+zT zsWukHvZ47=u9OkFtMW2Tv0)v>|qoJ!Q>D8pZ!h5XBu`wy=x;wdS(+w=$Lbk}Ky?3l#&at56*U~9P{|4oT+;J(dNXLE)GG-tsjDwy*@J%X;RJPa za7<|mA!4%2ox_{vNo(w5iTehRp|@&lx4fBI)yER^ZEK6^=&4wa%OcK@TnCM`zIAE7 zaX>&T&X82c%Luz6k=jzugE3xbwC$k0QJ6LGHNoMf{l16cg*%U2 ze>Lk5(o`Bu?DBoeo7p+QIx*C_m2zzmsJQ*Oec}#; z7iT%Tkur$JFdot5b$B%Ow^KVpC?wBTq>qE%AAze8D(=hQzwwgU(<#Ifj69xeiQKAA z0VbAjUf>t2rrj~De97)MD?$G-+cuwaB0ug>O$t|Q}s(khUlhEj4AYNj*Jy5wQ zxQpM=0!ijtjz%71z`ea6Tb;~fR&M3OGPArUpgIkcsC~mqZv@!B`jkabH`3x^0*lIQ&!=x5KuAYWfkaAE{5d*e{x#*MDCWFwr6=FzmJy96^)XZ^(V z_{^oVa^%lXLR`8F+W)i&&Oa8BWD}Rhh1t^*+H;HxyTiJEE_>V-H&_*y*1%<>FKrH< zFjzM_TKsSa5+sA7pDT;jylVMEHZHAAyuY6VrnxxIavLo6S?Ev^6p+uQI&$kX$sd!ZZRUDgd~rmG8agR_+(IUplCplS+bs4agTe`!1I+miAW>#bWr6Fh;hFT+C$ zf2hTcqcgMeW7zCHdy$^FqVSk2HauCGJ%ZX4ogwK{rr5UE;IPB2w}yTOeALH^Ex>!1 zOS9eip&{KNGxSFH;`yT68>C^FpI-jgg`5LoDr9DI+@qVhJA_5EG}t#T_4_OWwP@Nh zs54A0vp#|=%&~aqJMSpF<_@n)7LHf0G3^^0_HMpZD8lg{mw7zmo_N&lUA}L%a+%vN zUH;|hrNNE%I8@>3@9WvGrNA3T>yRT$T|gP#;LkF|W<{)t)8Y4k|C9(bcpmJ>Ovu) zcurm35wU0yZ$A+bo^r1UPLOtLF;YNI!WX1mHJ&Ie*lAINn9eF+S)@duW&A5ya6E0X z-^O@IyGkYjvk2xa9Abqg;wuc{7nT*yd_eJ-x^yALZC3wQPelIMvabNmOH#h9!tZTO%OA8va{IPIrQ|4$~=ZOX;4ks0f` z2I721UeuCnIJER{>8nwcF!5;Zlbm2=()eqE~YUZM- zT^_wWi3kaX47mP3!u@gBss@%zP#_RYk#aWlY7@c0Y;jC#7pH=`gaHQNNeV<@b#M-+*+fcqnW*IM2n<0){uW(= zI1WJ{We=K&q&!F+I@hV7iM#)lyeigi#NQBo%p8$ilSIN&S}yUiqc=UY&sr;D2n+9H zhs6Ob^^5b??AIWkz`M7846rXK>RrXw>Sme|OT)e!1c-qMD-oxAV^U==(ek_uh;<{brkWt!l3@s2meVdrRoA8^Ow$ZUu7(!bP%ia|fYAD;k zYMRhvy-k}wL-=Q!y?Uf5Q4C43@lXH&h%@-zR(EoCw+1`?-u_GK3!w`vm=CyX7@y;k zQ|mhYxG-s}z=qd3WzwJ?Xe~!DFXi23Mhx+GWop5pF_4RNXe@<2m0(Tp^5|=)6Geb= zk@{}4EaCPK1&jG(qOr5bkGlcz`CK)d=!sX+R~fjCdohzG&a7#h76S$H(+mPt3)7d@ zcJPP&CW-m`YBm>B-zEKFYR=TlDKx;9YtlzwFFf5Drg-b{cV>yeAgAb1+Bja1+Zn(} z2l98~^(tkeuwf3`fR&}X2l$gHQZC7YBX*b`k12=kfGHhKx5)R+ezWE+>8~XZ4Su;d z-z4iIJwlPcHfAt9Hu1M=QqFZ1DnA=tok(F48|9twSCaB(NxdUx3fG}N|Hc5E5!z)o zB+bA)AH#(uZHntO7uxd7nPfk@)l%3UdG(+&B$CylK5G;-)tGF*HHK=~4!yo2j#6RQ z&h$tJ+TR$fV}?nWxVtQC<2f}EV^r|=DxvYBzfJ#y?!D}?cdym=(N>J-jtr$#j%UA2H4HLe*5_ow#fvq$SGgrlkQALfxrT@g0P)$8K10!h=rsKJNn7cxDVU)IVz{vrZh!@4x!t` z=A-MX)L0I=CSIyo-$X7+fe%T~RrJJenK5~lhtAF~2Xj^A$VFqSa0jERM^67fj011L zKR2)90QaN|B(Vd-0st6)ZQkE4O(Q!ytKZwVI&KA08e{f6CB(jIwRIxmIe3TT@{*q3 zlf=S9Pa#fMimm8wzo|+UMdY-AS?<_#?W|ruNy71%cg*w5qUti`v&d%+i#;_s@muzi zw#p((GZTY}Llynk(Sc>OD zD1ie=2%xcFql@N0WlZk@&5PQ>LfoHW>Z}M>r^j-{8=?$(!m5(QRf3PK!|EKL=qM7X z=vSsA=dRjLT!#rLNMzD1MRs&GWFLlG3a z>JHp&U<_LW%($4)uY(pYf_Fan?tIWCkD}%pyfWW{;W$6Fp!GQyNbLd)<))n2enp`j z6FF^-;oomG+Uuh_H4n1q(wAj+OF-AwuedTlmcm;~bS7$kg#V|80(m7fdLWwNhiHiK zFAe==jsLBuKbF}QFNf~Wf+=1m;sFqcVr6aZN7Gq{OXX8+S4bOvae9>t$__!{4pKXp zo49To;%G3Ssb_`R?N(;FS`w-s9lU}Sk~U_h5vEo3WA;uq8Sm{5|yDN?miusV&Z zWMXA%@1=boXkJpHVR4{dcs?QEUFw$A#h?)G3Oz28PU6fdU$*^xm{(o_4Brr}XA z+V7!C%VB(8Rvi&?>W4OsJ$NpM(yKrNCaG=kT z!#gM4%ZcgqSYmHpaJ3IKCK6$W{kj+47{=C={yg^qNdH;>02 zP)8k4=D50_%0c(Q`t)NgF(RFYE3YP96ubPVE-ewFUnKUQJr{%#OMJ~4VVa~nv~tIw zEy|!K3`5vS^|VTek(2DulJa+8dZ8HrY$SWmFiWpzuot3rh8RWJV1o0eP1+Co9&0RH#!2FL>c`S?Os{Qp1f@Era8u)$wg z0KgZj{J+uvIePFM{(K()H+&HC_y01Te@^gx9`hf98b36|KLr0eoq5jkyiNEwOCuzm zfr#;(=Pu`O9zU{wbwG+TaFAmqzq4O7fI6fmUV8ca>Hh)oPkkN$ literal 0 HcmV?d00001 diff --git a/dbUtil.py b/dbUtil.py new file mode 100644 index 0000000..f2e20ef --- /dev/null +++ b/dbUtil.py @@ -0,0 +1,104 @@ +import sqlite3 +import sys + +import pandas as pd +from pandas import DataFrame + +conn = sqlite3.connect(sys.path[0]+"/network.db") + +def execute_sql(sql): + """ + 执行sql语句 + :param sql: + :return: + """ + cursor = conn.cursor() + cursor.execute(sql) + conn.commit() + + +def search(sql) -> DataFrame: + return pd.read_sql(sql, conn) + + +def delete_obj(obj_id): + cursor = conn.cursor() + delete_obj_sql = f"delete from sim_objs where ObjID='{obj_id}'" + cursor.execute(delete_obj_sql) + delete_conn_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{obj_id}')" + cursor.execute(delete_conn_sql) + conn.commit() + + +def truncate_db(): + init_database() + +def init_database(): + cursor = conn.cursor() + cursor.execute(""" + DROP TABLE IF EXISTS `conn_config`; + """) + cursor.execute(""" + CREATE TABLE `conn_config` ( + `conn_id` varchar(55) NULL DEFAULT NULL, + `node_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `ip` varchar(55) NULL DEFAULT NULL, + `mac` varchar(128) NULL DEFAULT NULL, + `conn_port` varchar(32) NULL DEFAULT NULL, + `addr` varchar(255) NULL DEFAULT NULL, + CONSTRAINT `conn_config_sim_conn_conn_id_fk` FOREIGN KEY (`conn_id`) REFERENCES `sim_conn` (`conn_id`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + """) + cursor.execute(""" + DROP TABLE IF EXISTS `mac_table`; + """) + cursor.execute(""" + CREATE TABLE `mac_table` ( + `obj_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `mac` varchar(55) NULL DEFAULT NULL, + CONSTRAINT `mac_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + """) + cursor.execute(""" + DROP TABLE IF EXISTS `router_table`; + """) + cursor.execute(""" + CREATE TABLE `router_table` ( + `obj_id` varchar(55) NULL DEFAULT NULL, + `node_ifs` int(0) NULL DEFAULT NULL, + `segment` varchar(55) NULL DEFAULT NULL, + CONSTRAINT `router_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE +) ; + + """) + cursor.execute(""" + DROP TABLE IF EXISTS `sim_conn`; + """) + cursor.execute(""" + CREATE TABLE `sim_conn` ( + `conn_id` varchar(255) NOT NULL , + `ConfigCorrect` int(0) NULL DEFAULT NULL , + PRIMARY KEY (`conn_id`) +) ; + + """) + cursor.execute(""" + DROP TABLE IF EXISTS `sim_objs`; + """) + cursor.execute(""" + CREATE TABLE `sim_objs` ( + `ObjID` varchar(50) NOT NULL, + `ObjType` int(0) NULL DEFAULT NULL, + `ObjLabel` varchar(20) NULL DEFAULT NULL, + `ObjX` int(0) NULL DEFAULT NULL, + `ObjY` int(0) NULL DEFAULT NULL, + `ConfigCorrect` int(0) NULL DEFAULT NULL, + PRIMARY KEY (`ObjID`) +) ; + """) + conn.commit() + +if __name__ == '__main__': + init_database() diff --git a/network.db b/network.db new file mode 100644 index 0000000000000000000000000000000000000000..2b5d9a04f6d34077c3c8698d65578381828c6cd6 GIT binary patch literal 40960 zcmeI5dyE}b8Nl!ReeblCvQ{j6OD*kEW;^H1nVBy&d$-%tv$u$1 z&fVVLnLG1+=R3dio#)J*Z||P**#)oe&&?mU7V2tZMIxC@Y^~Q5iTr^?B2j{O5#A|y zKLPIyyyI6g-Z=Ls=ooa8)$;9$d~tK4majZpK3@8Kd24ZV$v?N0I3frL0)l`bAP5Ko zg24X>ffMw7`M$MllP8-CmVKLdZ1(Wob$~Ejx#2>UUc6&LL}llco&R>l2gv>JzQ;@vEW}_8mX!&D3X)ENp_K>_B~ZWOS%C zzWA(h%l2;TXu^S}bDjL|^Xt3mJf}40j`*_&n{)H?p0n`!=kA``xoc?Zruy|GH`O;q zSstjD2DZFE*VkN|Ow1l}y}OUyej5xqr>#2{=7Rm{7}Ye2w+bJ;;=KXu6a8)0&fJkB z>^S?(E@jQWj#;zq=~=gnk*0Owt_QrEeAx<4UYxI3eUP(z&zG~{>>y`sD+kV8U=Dp^ z{GNQ@h7HM+`-2?L&)u=$%}+1c&MeMp_i1r%!3h^-Hq7f;9pcEG>rK!4$6mh_XO4LX z4|_+TtlQoi;5(bj=H$fQeN#g_C-&86x{d1;YT)?O{;f0h(aEWio!ciO?}*GEs85ZI zj!cbAG)MM!*P4#|Oin;G7$1SE)g0Q}92y>}!#Azn!>qDJ-;@SUTzx#>*WaH!xjjI5 z*m6Ra?Tz$eknQjPa}*(o*|N_yUN&+E6dM|ptHEGig;DQg}W8Y=R(@RhRAqbl=jG3quPlA5bQf#3#7htm1Ox#=KoAfF1OY)n5a@xx$VG+zR&rfm zBC&VJq$cm3np9O^L$-=ESu-t7CK}dc({nx9r--aL-tN)D|A zHN|yXR+?=!hG#1V)v;`9I+BTHxU%hIU3OgRQAO90VY-193|f%tG&HZ_;f7;29NbV< zs&o!EkV28tn6M6es;BxZSX%vCRvOG&84^UAPAzO0hU}@XBNLNg*>*iw#x|w8X)B6K zNvKX4)EwHdNyC6daKnawcAOL(Tb$B})P@tM!~-YYhJzU#^-!%1XNoc2F^N6jC$-m?72-yJ5KvjW%?r;kpeIMHUAK&oH^#WpbF7PKfUj zk0s+$nW&B>TOJ{@YPyc&YQE_^UMe(Mg^~c12_CL6HT|Jm5at%dR}{n29V}x*vp|=j z$u^XsY^v1seT%x9qlId~WR`J^{W8mVP*OW4LoJr=U`KbsAvCOjF;G&btzjA2y6IV# zXBvJ~QY_zE!&Dl~HE07{4bO>73XYwT?b70GbGmVqZRUWir0{T&864Dxprk+z54|=# zg~3D3LXd3+Z)y^A!wVfWGzFvKJj+o~0TDTXbi>xzpN&(IAo z^1ZNLA~Kxo%1~emSM_c?Fw2mNhO=kdqVm`h(G9%;sal5F!rCanf*2OeqeV}%#21M< z)HiKPY}t39RUtaGRi;G{G%c3rSsJuork`r2SaOjILo)$s^2mi*A4DwY2>HoC32SS1 zu(k%2n7W6l4(t_+LOr11#?&x9Sv6eU@f8hQzTQ?s?1pJJG{0d$%{3Z6jcYC>u-Jpy z{2wm*i)6JYcIHKnH$(hsE1Nc*J?wbyFT z)K1nWYip{%sXku4vwB^%Pb@&Uip*d8_VlTe*_Wn5(ESRK|l}?1Ox#= zKoAfF-aG-n#&4k}wPA$=gUfm;afDqvFTCCf@G z6e6pamy{ADrCkglxD0Gxc4ii8tC$$ieFclfF5(pn@70KSXe)We!Yj!*%$vG`VdOC~ zx{4jJmzE`jLW(tJAkT&2#WHcJU{$C9C;`x6e+6(9Z`RL4W{-5g{gVVFo zUO4~HbnmI@XfK@qrz+j^e|G)9RDC5OJqJ$!w4|$Q|EPVp_K8}n)(>BbmmnYr2m*qD zARq_`0)l`bAP5Kog1}pk04e2L${r@DYycyrJhf1BgIIH>+XDzBI%=v)1(kd z@%(awrKP5Rk=#%6krdC}`AiC@ew8>ya*-6*jrJggQd5aX)NH<0WHcbH^FA(=P0o{_ zOCC@&kr2<*BteK|Qi;dZbR@*{06%tFKKUq^mx_dV9@q&&IQ?X5r<#m}cpgg%LMZ)e zvc<0dmud$SwWp<@)ecIe_LB5r>1L@cJyCn$t>@&zVnIL<5CjAPK|l}?1Ox#=KoAfF z1cCoDflcXFl4s$e#7^W{Ybdf4d6pLn>_nawg*-cvXAvP6X*}<`L6)7!vsRE{C-STh zq}ho)YXYf_=~j_v9U$@x7U+O21+dVh`Rbpldn(_k+*-M;{BrpV<(cxv(#I0NE?tvf zRs2_RzPPFI=fY!!U4=~khxrHd`}3FQ{*e1x?s#r%u8@5``%v~kc0=aX%-1vbW}2CD z`i1nvX*)fT`b+Am)CW@AQ&RH9K`zx&EQsUYyJi=hLueYa9rhl(n1xb^l))VkoXWa& zp;%Y(%h{J(#VWkm>JCIyX>CIUVnUeIqB2!gLnfXLF<>;+ktsxMfCv#9_6RnvF05-M z7t0yM_6gQ>7RUs`k~kn@D;Ud)tGYe}W>E~RWb}7|;28;G2P>MT*s`u;2(F^35Y0t% z9NBa|6GA}fx(_jF8oefHE=>>-4T9M~oD0qLAcsWvWNIQ;)*<+X3tPlP#Gt(McQC>Bxa&vw%T_tL(RQ42|WuyxSP;2*%ZkOHh|Pmd1iS zmg-k^JA>b>T$ ze`$imGc9UkDjT|Mvrt?R@yk{{SwV(kI|$lv)7gF*OEAcA@UuYHt?@Wbx;wXQ`-Zj5 zfd0PK1JHJ`5NV(YQBSvRTaz({PLe=)H88+{ux`*fdb)uS1TKmm-lgg+_z|W`JKmP6 zH`v~ecNEsg2DY}rh@fW+jY~mO)S6RNgp>(sR@-#}?&MO!dH#O&b_OkFQA!46v{*fU(c z#wuno01aZG@mg_xwaWWG##k)BxZJ{8+A}ej6U8+?be;<~E{N{eQ4br|BQB%5 z?P1%ylT1(01;F+DuphJu#>DiXbJrZ0NUFq?Et7b%Ye4g4+DLPupO5+gco1sX2cY<~ zn)_&CbM|yz8mf&;h3bRV_f#*hyi)mUtXrO*w?GzR9i3}wQ}`W8Dn>kg0+t)Xgo2C*B_{l49|052I0VR zBQaU%XAi+VhalTlVFmzyWTJ3s$Yv<)4tYj_8DqRhGmWvf;Eg+%F%iws9c=y>mSH%7 z;GAS!MY-XH7bjRtXFMaMyjS7uomkPyM3iJo@^S! z?LLmJ_k_{sITn4=I`0bm85a+wdnD 0 else -speed) if abs(delta_x) > speed else (target_x if target_x else current_x) + next_y = current_y + (speed if delta_y > 0 else -speed) if abs(delta_y) > speed else (target_y if target_y else current_y) + # 移动对象 + self.canvas.move(tag, next_x - current_x, next_y - current_y) + # 如果对象还没有到达目标,继续移动 + if next_x != (target_x if target_x else current_x) or next_y != (target_y if target_y else current_y): + self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed)) + + def create_widget(self): + mode_text = "封包" if self.packet["packet"] else "解包" + num, margin_top, button_width, button_height = 1, 30, 120, 40 + for data in self.packet["mode"]: + key = list(data.keys())[0] + value = list(data.values())[0] + if value: + Button(self, text=self.packet_button_str[key] + mode_text, command=self.packet_option[key]["command"], + font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1), + width=button_width, height=button_height) + + num += 1 + Button(self, text="发送", command=self.send_packet, font=("", 16)).place(x=self.width - 60, y=self.height - 40, anchor=NW) + Button(self, text="取消", command=self.destroy, font=("", 16)).place(x=self.width - 120, y=self.height - 40, anchor=NW) + + def send_packet(self): + if self.step != len(self.packet_step): + messagebox.showerror("注意", "尚未完成{}!".format("封包" if self.packet["packet"] else "解包")) + return + self.packet["send"](self.message) + self.destroy() + + def create_window(self, option): + toplevel = Toplevel(self) + toplevel.title(option["title"]) + toplevel.geometry("450x220+300+300") + for entry_option in option["entry"]: + index = option["entry"].index(entry_option) + key = list(entry_option.keys())[0] + value = list(entry_option.values())[0] + Label(toplevel, text=key, font=("", 16)).grid(row=index, column=0, padx=20, pady=10) + if value is None: + continue + Entry(toplevel, textvariable=value, font=("", 16), width=15).grid(row=index, column=1, pady=10) + Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(x=450 - 60, y=220 - 40, anchor=NW) + Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(x=450 - 120, y=220 - 40, anchor=NW) + toplevel.attributes("-topmost", True) + return toplevel + + def app_packet(self): + """ + 应用层封包 + """ + if self.packet_step[self.step] != "app": + messagebox.showerror("注意", "封包顺序出错!") + return + SourceAppAddr = StringVar() + TargetAppAddr = StringVar() + def packet(): + toplevel.destroy() + source_app_addr = SourceAppAddr.get() + target_app_addr = TargetAppAddr.get() + if source_app_addr == self.sendObj.interface[0]["addr"] \ + and target_app_addr == self.receiveObj.interface[0]["addr"]: + + # 动画 + # app_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["app"]), 50, 25, "AH", "#ff3b26", "white", tag="app") + # self.move_to(self.packet_rect, None, int(self.step_y["app"]), speed=1) + # self.move_to(app_rect, self.width / 2 + self.packet_init_width / 2, int(self.step_y["app"]), speed=1) + + self.message = target_app_addr + "&" + self.packet["tag"] + self.master.message.show_message("应用层封包成功!数据包如下: \n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "应用层封包", "entry": [{"发送主机应用层地址:": SourceAppAddr}, {"接收主机应用层地址:": TargetAppAddr}], + "command": packet}) + + def app_unpack(self): + """ + 应用层解包 + """ + if self.packet_step[self.step] != "app": + messagebox.showerror("注意", "解包顺序出错!") + return + TargetAppAddr = StringVar() + def packet(): + toplevel.destroy() + target_app_addr = TargetAppAddr.get() + if target_app_addr == self.receiveObj.interface[0]["addr"]: + self.message = self.message.split("&")[1] + self.master.message.show_message("应用层解包成功!数据包如下: \n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!") + toplevel = self.create_window({"title": "应用层解包", "entry": [{"接收主机应用层地址:": TargetAppAddr}], + "command": packet}) + + def trans_packet(self): + """ + 传输层封包 + """ + if self.packet_step[self.step] != "trans": + messagebox.showerror("注意", "封包顺序出错!") + return + SentPort = StringVar() + RcptPort = StringVar() + SplitCount = IntVar() + + def packet(): + + sent_port = SentPort.get() + rcpt_port = RcptPort.get() + split_count = SplitCount.get() + count = split_appdata(self.packet["size"]) + print(sent_port, self.sendObj.interface[0]["conn_port"]) + if sent_port == self.sendObj.interface[0]["conn_port"] \ + and rcpt_port == self.receiveObj.interface[0]["conn_port"]\ + and split_count == count: + toplevel.destroy() + # 动画 + # self.canvas.delete(self.packet_rect) + # self.canvas.delete("app") + # self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, int(self.step_y["app"]), + # width=self.packet_init_width + 50, height=self.packet_height, + # text="A-" + self.packet["tag"], rect_color="#dee1e6", font_color="black", + # tag="packet") + # trans_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["trans"]), 50, 25, "PH", "#ff3b26", + # "white", tag="trans") + # self.move_to(self.packet_rect, None, int(self.step_y["trans"]), speed=1) + # self.move_to(trans_rect, self.width / 2 + self.packet_init_width / 2 + 50, int(self.step_y["trans"]), speed=1) + + + self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port)) + self.master.message.show_message("传输层封包成功!数据包如下:\n{}".format(str([str(i + 1) + "-" + str(self.message) for i in range(split_count)]))) + self.step += 1 + else: + messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "传输层封包", "entry": [{"发送主机端口:": SentPort}, + {"接收主机端口:": RcptPort}, + {"拆包数量": SplitCount}, + {"每个包约2048": None}], + "command": packet}) + + def trans_unpack(self): + """ + 传输层解包 + """ + print(self.packet_step) + print(self.step) + if self.packet_step[self.step] != "trans": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptPort = StringVar() + + def packet(): + rcpt_port = RcptPort.get() + if rcpt_port == self.receiveObj.interface[0]["conn_port"]: + toplevel.destroy() + data = AgreementUtil.parse_udp_packet(self.message) + print(data) + self.message = data[-1] + self.master.message.show_message("传输层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "传输层解包", "entry": [{"接收主机端口:": RcptPort}], + "command": packet}) + + def ip_packet(self): + """ + 网络层封包 + """ + if self.packet_step[self.step] != "ip": + messagebox.showerror("注意", "封包顺序出错!") + return + SourceIP = StringVar() + TargetIP = StringVar() + def packet(): + source_ip = SourceIP.get() + target_ip = TargetIP.get() + if source_ip == self.sendObj.interface[0]["ip"] \ + and target_ip == self.receiveObj.interface[0]["ip"]: + toplevel.destroy() + self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip) + self.master.message.show_message("网络层封包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "网络层封包", "entry": [{"发送主机IP:": SourceIP}, + {"接收主机IP:": TargetIP}], + "command": packet}) + + def ip_unpack(self): + """ + 网络层解包 + """ + print(self.packet_step) + print(self.step) + if self.packet_step[self.step] != "ip": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptIP = StringVar() + + def packet(): + rcpt_ip = RcptIP.get() + if rcpt_ip == self.receiveObj.interface[0]["ip"]: + toplevel.destroy() + data = AgreementUtil.parse_ip_packet(self.message) + print(data) + self.message = data[-1] + self.master.message.show_message("网络层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!") + + toplevel = self.create_window({"title": "网络层解包", "entry": [{"接收主机IP:": RcptIP}], + "command": packet}) + + def mac_packet(self): + """ + 链路层封包 + """ + if self.packet_step[self.step] != "mac": + messagebox.showerror("注意", "封包顺序出错!") + return + SentMac = StringVar() + RcptMac = StringVar() + def packet(): + sent_mac = SentMac.get() + rcpt_mac = RcptMac.get() + if sent_mac == self.sendObj.interface[0]["mac"] \ + and rcpt_mac == self.receiveObj.interface[0]["mac"]: + toplevel.destroy() + self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac) + self.master.message.show_message("链路层封包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "链路层封包", "entry": [{"发送主机MAC:": SentMac}, + {"接收主机MAC:": RcptMac}], + "command": packet}) + + def mac_unpack(self): + """ + 链路层解包 + """ + if self.packet_step[self.step] != "mac": + messagebox.showerror("注意", "解包顺序出错!") + return + RcptMac = StringVar() + def packet(): + rcpt_mac = RcptMac.get() + if rcpt_mac == self.receiveObj.interface[0]["mac"]: + toplevel.destroy() + data = AgreementUtil.parse_ethernet_frame(self.packet["tag"]) + print(data) + self.message = data[-1] + self.master.message.show_message("链路层解包成功!数据包如下:\n{}".format(self.message)) + self.step += 1 + else: + messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!") + toplevel = self.create_window({"title": "链路层解包", "entry": [{"接收主机MAC:": RcptMac}], + "command": packet}) + + +class AppData: + def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag, source_app_addr, target_app_addr, app_packed_string, timestamp, canvas): + self.obj_id = obj_id + self.id_key = id_key + self.app_pack_id = app_pack_id + self.app_pack_size = app_pack_size + self.app_pack_tag = app_pack_tag + self.source_app_addr = source_app_addr + self.target_app_addr = target_app_addr + self.app_packed_string = app_packed_string + self.timestamp = timestamp + + + def pack(self): + # 为了简化,我们打包成一个字典 + return vars(self) + + @staticmethod + def unpack(packed_data): + # 解包为AppData对象 + return AppData(**packed_data) + +class TransData: + def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id, sent_port, rcpt_port, source_app_addr, target_app_addr, trans_packed_string, timestamp): + self.obj_id = obj_id + self.id_key = id_key + self.trans_pack_id = trans_pack_id + self.trans_seq = trans_seq + self.trans_tag = trans_tag + self.app_pack_id = app_pack_id + self.sent_port = sent_port + self.rcpt_port = rcpt_port + self.source_app_addr = source_app_addr + self.target_app_addr = target_app_addr + self.trans_packed_string = trans_packed_string + self.timestamp = timestamp + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return TransData(**packed_data) + + +class IPData: + def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip, source_app_addr, target_app_addr, ip_packed_string): + self.ObjID = obj_id + self.IDkey = id_key + self.IPPackID = ip_pack_id + self.TransPackID = trans_pack_id + self.SourceIP = source_ip + self.TargetIP = target_ip + self.SourceAppAddr = source_app_addr + self.TargetAppAddr = target_app_addr + self.IPPackedString = ip_packed_string + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return IPData(**packed_data) + + +class MACData: + def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac, source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string): + self.ObjID = obj_id + self.IDkey = id_key + self.MacPackID = mac_pack_id + self.IPPackID = ip_pack_id + self.SentMAC = sent_mac + self.RcptMAC = rcpt_mac + self.SourceIP = source_ip + self.TargetIP = target_ip + self.SourceAppAddr = source_app_addr + self.TargetAppAddr = target_app_addr + self.MacPackedString = mac_packed_string + + def pack(self): + return vars(self) + + @staticmethod + def unpack(packed_data): + return MACData(**packed_data) + + +def split_appdata(AppPackSize): + MTU = 2048 + return AppPackSize // MTU + (AppPackSize % MTU > 0) + + +if __name__ == '__main__': + # 假设的最大传输单元(MTU) + MTU = 2048 # bytes + + # 模拟的数据和数据大小 + SIMULATED_DATA = "hello" + SIMULATED_SIZE = 2049 # 10 MB + + # 创建应用层数据包 + app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1", SIMULATED_DATA, time.time()) + packed_app_data = app_packet.pack() + print(packed_app_data) + diff --git a/random_mac.py b/random_mac.py new file mode 100644 index 0000000..a033311 --- /dev/null +++ b/random_mac.py @@ -0,0 +1,9 @@ +import random + +def random_mac(): + # 生成六组两位的十六进制数 + return ":".join(["%02x" % random.randint(0, 255) for _ in range(6)]) + +# 生成MAC地址 +mac_address = random_mac() +print(mac_address)