diff --git a/NetworkAnalog.py b/NetworkAnalog.py deleted file mode 100644 index 866c8a6..0000000 --- a/NetworkAnalog.py +++ /dev/null @@ -1,954 +0,0 @@ -import ipaddress -import sys -import threading -import ttkbootstrap as tk -from ttkbootstrap import * -from ttkbootstrap import ttk -from tkinter import messagebox -import re -from PIL import ImageTk, Image -import platform - -from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase -from dbUtil import search, execute_sql, delete_obj, truncate_db - - -def validate_ip_address(ip_address): - """ - 匹配ip地址格式是否规范 - :param ip_address: IP地址 - :return: Boolean - """ - # 定义IP地址的正则表达式模式 - pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$' - pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$' - # 使用re模块进行匹配 - match_with_subnet = re.match(pattern_with_subnet, ip_address) - match_without_subnet = re.match(pattern_without_subnet, ip_address) - if match_with_subnet: - # 带有子网掩码的IP地址 - # 检查每个组件的取值范围是否在 0-255 之间 - for group in match_with_subnet.groups()[:4]: - if not (0 <= int(group) <= 255): - return False - # 检查子网掩码的取值范围是否在 0-32 之间 - subnet_mask = int(match_with_subnet.groups()[4]) - if not (0 <= subnet_mask <= 32): - return False - return True - elif match_without_subnet: - # 不带子网掩码的IP地址 - # 检查每个组件的取值范围是否在 0-255 之间 - for group in match_without_subnet.groups(): - if not (0 <= int(group) <= 255): - return False - return True - else: - # IP地址格式不正确 - return False - - -class RouterConfigWindow(tk.Toplevel): - def __init__(self, parent, router_obj): - super().__init__(parent) - self.geometry("435x433+350+200") - self.title(f"{router_obj.ObjLabel}路由表配置") - self.router_obj = router_obj - self.interface_entries = [] - self.router_table = {} - self.create_interface_inputs() - self.create_router_table() - - def create_interface_inputs(self): - label_text = ["接口1", "接口2", "接口3", "接口4"] - for i in range(4): - label = tk.Label(self, text=label_text[i]) - label.grid(row=i, column=0, padx=10, pady=5, sticky="w") - entry = tk.Entry(self, width=20) - entry.grid(row=i, column=1, padx=10, pady=5, sticky="w") - self.interface_entries.append(entry) - button = tk.Button(self, text="添加", command=lambda index=i: self.add_router_entry(index)) - button.grid(row=i, column=2, padx=10, pady=5) - lab = LabelFrame(self, text="示例") - lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20) - Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11").pack() - - def create_router_table(self): - def on_right_click(event): - row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 - if row: - self.router_treeview.selection_set(row) # 选中该行 - delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 - - def delete_row(): - selected_items = self.router_treeview.selection() # 获取选中的行 - for item in selected_items: - ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] - self.router_obj.delete_config(ifs, network) - self.router_treeview.delete(item) - self.router_table_frame = tk.Frame(self) - self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) - self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings") - self.router_treeview.heading("Interface", text="接口") - self.router_treeview.heading("Route", text="网段") - self.router_treeview.pack(side="left", fill="both") - scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) - scrollbar.pack(side="right", fill="y") - self.router_treeview.configure(yscrollcommand=scrollbar.set) - self.router_table = self.router_obj.router_table - self.router_treeview.bind("", on_right_click) - # 创建删除菜单 - delete_menu = tk.Menu(root, tearoff=False) - delete_menu.add_command(label="删除", command=delete_row) - self.update_router_table() - - def add_router_entry(self, index): - entry_text = self.interface_entries[index].get() - try: - ipaddress.ip_network(entry_text) - if isinstance(self.router_obj, SimRouter): - if not validate_ip_address(entry_text): - messagebox.showerror("注意", message="添加的网段信息格式不合格") - self.interface_entries[index].delete(0, tk.END) - self.focus_set() - return - if entry_text: - if index + 1 in self.router_table: - self.router_table[index + 1].append(entry_text) - else: - self.router_table[index + 1] = [entry_text] - self.interface_entries[index].delete(0, tk.END) - self.router_obj.add_config(entry_text, index + 1) - self.update_router_table() - except: - messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12") - return - - def update_router_table(self): - self.router_treeview.delete(*self.router_treeview.get_children()) - for i, entrys in self.router_table.items(): - for entry in entrys: - self.router_treeview.insert("", "end", values=(f"接口{i}", entry)) - - -class SwitchConfigWindow(RouterConfigWindow): - def __init__(self, parent, router_obj): - super().__init__(parent, router_obj) - self.geometry("435x433+350+200") - self.title(f"{router_obj.ObjLabel}交换表配置") - self.router_obj = router_obj - self.interface_entries = [] - self.router_table = {} - self.create_interface_inputs() - self.create_router_table() - - def create_router_table(self): - def on_right_click(event): - row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引 - if row: - self.router_treeview.selection_set(row) # 选中该行 - delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单 - - def delete_row(): - selected_items = self.router_treeview.selection() # 获取选中的行 - for item in selected_items: - ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1] - self.router_obj.delete_config(ifs, network) - self.router_treeview.delete(item) - - self.router_table_frame = tk.Frame(self) - self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) - self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings") - self.router_treeview.heading("Interface", text="接口") - self.router_treeview.heading("Route", text="mac") - self.router_treeview.pack(side="left", fill="both") - scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview) - scrollbar.pack(side="right", fill="y") - self.router_treeview.configure(yscrollcommand=scrollbar.set) - self.router_table = self.router_obj.mac_table - self.router_treeview.bind("", on_right_click) - # 创建删除菜单 - delete_menu = tk.Menu(root, tearoff=False) - delete_menu.add_command(label="删除", command=delete_row) - self.update_router_table() - - - def add_router_entry(self, index): - entry_text = self.interface_entries[index].get() - if isinstance(self.router_obj, SimRouter): - if not validate_ip_address(entry_text): - messagebox.showerror("注意", message="添加的网段信息格式不合格") - self.interface_entries[index].delete(0, tk.END) - self.focus_set() - return - if entry_text: - if index + 1 in self.router_table: - self.router_table[index + 1].append(entry_text) - else: - self.router_table[index + 1] = [entry_text] - self.interface_entries[index].delete(0, tk.END) - self.router_obj.add_config(entry_text, index + 1) - self.update_router_table() - - -class NetWorkAnalog(Canvas): - def __init__(self, master, **kwargs): - super().__init__(master, **kwargs) - self.master = master - self.router_img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/路由器.png").resize((60, 60))) - self.switch_img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/交换机.png").resize((60, 60))) - self.hub_img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/集线器.png").resize((60, 60))) - self.host_img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/主机.png").resize((60, 60))) - self.chose = self.host_img - self.AllSimObjs = {} - self.conns = [] - self.drawLine = True # 画线标志 - self.line_start_obj = None - self.line_end_obj = None - self.line_start_ifs = None - self.line_end_ifs = None - self.chose_obj = None - self.show_label_flag = True - self.show_interface_flag = True - self.create_widget() - - def is_chose(self): - """ - 当被选中时,绘制选中框 - :return: - """ - self.delete("rectangle") - self.create_rectangle(self.chose_obj.ObjX - 25, self.chose_obj.ObjY - 25, self.chose_obj.ObjX + 25, - self.chose_obj.ObjY + 25, outline="red", tags="rectangle") - - def tag_bind_event(self): - """ - 为每个子组件绑定事件 - :return: - """ - - def init(): - """ - 初始化方法,将连接对象赋值为初始化值 - :return: - """ - self.line_start_obj = None - self.line_end_obj = None - self.line_start_ifs = None - self.line_end_ifs = None - - def show_menu(event, tag: SimBase): - """ - 右击组件弹出选择接口框 - :param event: - :param tag: - :return: - """ - menu = Menu(self, tearoff=0) - flag = False - for conn in tag.connections: - i = tag.connections.index(conn) - if not isinstance(conn, AllSimConnect): - menu.add_command(label=f"接口{i + 1}", command=lambda num=i + 1: interface_selected(num, tag)) - flag = True - if not flag: - menu.add_command(label="暂无可用接口", state="disabled") - menu.post(event.x_root, event.y_root) - - def interface_selected(interface, tag): - """ - 选择接口回调 - :param interface: - :return: - """ - if self.line_start_ifs is None: - self.line_start_ifs = interface - self.line_start_obj = tag - self.drawLine = True - self.bind("", lambda event: right_motion(event)) - self.bind("", quit) - self.focus_set() # 获取焦点 - else: - self.line_end_ifs = interface - self.line_end_obj = tag - self.delete("Line") - flag = False - if self.line_start_obj.ObjID == self.line_end_obj.ObjID: - messagebox.showerror("注意!", message="不能连接自己") - return - conn = AllSimConnect(self, self.line_start_obj, self.line_start_ifs, self.line_end_obj, - self.line_end_ifs) - for conn_obj in self.line_start_obj.connections: # 判断两个连接对象是否已经连接过了 - if conn_obj == conn: - flag = True - if not flag: - self.line_start_obj.connections[self.line_start_ifs - 1] = conn - self.line_end_obj.connections[self.line_end_ifs - 1] = conn - conn.save() - self.conns.append(conn) - init() - self.unbind("") - else: - del conn - self.delete("Line") - self.unbind("") - init() - messagebox.showerror("注意!", message="已经连接过了") - - def right_motion(event): - """ - 移动鼠标 - :param event: - :param tag: - :return: - """ - if self.drawLine: - self.delete("Line") - x, y = event.x, event.y - if not (150 < x < 650 and 40 < y < 490): - if x < 150: - x = 150 - elif x > 650: - x = 650 - if y < 40: - y = 40 - elif y > 490: - y = 490 - self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5", - width=1, - tags="Line") - return - self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5", - width=1, tags="Line") - - def quit(event): - self.delete("Line") # 删除刚刚产生的连接线 - self.drawLine = False # 关闭画线标志 - init() - - for tag_id, tag in self.AllSimObjs.items(): - self.tag_bind(tag.ObjID, "", lambda event, tag=tag: show_menu(event, tag)) # 绑定右击事件 - - def bind_event(self, component, name): - # todo: 绑定事件 - """ - 绑定事件 - :param component: 组件对象 - :param name: 组件名称 - """ - def move(event, name): - """ - 鼠标左键松开事件 - :param event: 事件对象 - """ - if 170 < event.x < 630 and 60 < event.y < 470: # 在方框内,无变化 - if name == "路由器": - tag = SimRouter(self, event.x, event.y) - elif name == "集线器": - tag = SimHub(self, event.x, event.y) - elif name == "交换机": - tag = SimSwitch(self, event.x, event.y) - else: - tag = SimHost(self, event.x, event.y) - self.AllSimObjs[tag.ObjID] = tag - tag.save() - self.tag_bind_event() - else: - self.delete("L") - - def motion(event, name): - """ - 鼠标拖动事件 - :param event: 事件对象 - """ - if name == "路由器": - self.chose = self.router_img - elif name == "集线器": - self.chose = self.hub_img - elif name == "交换机": - self.chose = self.switch_img - else: - self.chose = self.host_img - self.delete("L") - if 170 < event.x < 630 and 60 < event.y < 470: # 在方框内,无变化 - pass - else: # 在方框外,显示禁止放置标识 - self.create_oval(event.x - 10, event.y - 45, event.x + 10, event.y - 25, outline="red", width=2, - tags="L") - self.create_line(event.x - 7, event.y - 42, event.x + 8, event.y - 27, fill="red", width=2, tags="L") - self.create_image(event.x - 30, event.y - 30, image=self.chose, anchor="nw", tags="L") - - self.tag_bind(component, "", lambda event: move(event, name)) - self.tag_bind(component, "", lambda event: motion(event, name)) - - def reload_data(self): - # todo: 加载上一次程序运行的数据 - """ - 加载上一次程序运行时的数据 - :return: - """ - self.AllSimObjs = {} - self.conns = [] - sim_obj_sql = "select * from sim_objs" - sim_obj_data = search(sim_obj_sql) - for index, row in sim_obj_data.iterrows(): # 初始化组件对象 - sim_type = row["ObjType"] - ObjX = row["ObjX"] - ObjY = row["ObjY"] - ConfigCorrect = row["ConfigCorrect"] - ObjLable = row["ObjLabel"] - ObjID = row["ObjID"] - if sim_type == 1: - tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) - elif sim_type == 2: - tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) - elif sim_type == 3: - tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) - else: - tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable) - self.AllSimObjs[tag.ObjID] = tag - - sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id" - sim_conn_data = search(sim_conn_sql) - conn_datas = {} - for index, conn in sim_conn_data.iterrows(): - if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas: - conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])] - else: - conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"])) - for key, value in conn_datas.items(): - conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1], - self.AllSimObjs[value[1][0]], value[1][1], key[1]) - self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象 - self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj - self.conns.append(conn_obj) - conn_obj.ConfigCorrect = key[1] - self.tag_bind_event() - - def delete_obj(self): - # todo: 删除对象 - """ - 选中删除对象 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要删除的对象!") - return - ask = messagebox.askquestion(title='确认操作', message='确认删除该对象吗?') - if ask == "no": - return - self.delete(self.chose_obj.ObjID) # 删除图片 - self.delete(self.chose_obj.ObjID + "text") # 删除标签 - for conn in self.chose_obj.connections: # 删除该对象的所有连接线 - if isinstance(conn, AllSimConnect): - conn.delete_line() - delete_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{self.chose_obj.ObjID}')" - execute_sql(delete_sql) - delete_obj(self.chose_obj.ObjID) - self.delete("rectangle") - self.reload_data() - - def delete_line(self): - # todo: 删除连接线 - """ - 删除连接线 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要删除连接线的对象!") - return - conn_sql = f""" - select - s.conn_id, ConfigCorrect, node_id, node_ifs - from sim_conn s - join conn_config c on s.conn_id=c.conn_id - where node_id='{self.chose_obj.ObjID}' - """ - conn_data = search(conn_sql) - conn_names = {} - for index, conn in conn_data.iterrows(): - if conn["conn_id"] not in conn_names: - conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])] - else: - conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"])) - child_d = tk.Toplevel() - child_d.title(f"{self.chose_obj.ObjLabel}的连接线配置") - child_d.geometry('300x200+450+200') - child_d.grab_set() - cv1 = Canvas(child_d) - cv1.pack() - value = StringVar() - combobox = ttk.Combobox( - master=child_d, # 父容器 - height=12, # 高度,下拉显示的条目数量 - width=15, # 宽度 - state='readonly', # readonly(只可选) - font=('', 18), # 字体 - textvariable=value, # 通过StringVar设置可改变的值 - values=list(conn_names.keys()), # 设置下拉框的选项 - ) - - def del_line(): - if value.get() == "": - messagebox.showerror("注意", message="请选择需要删除的连接线!") - return - conn_sql = f""" - select - conn_id, node_id, node_ifs - from conn_config - where conn_id='{value.get()}' - """ - conn_data = search(conn_sql) - delete_sql = f"delete from sim_conn where conn_id='{value.get()}'" - execute_sql(delete_sql) - conn_names = {} - for index, conn in conn_data.iterrows(): - if conn["conn_id"] not in conn_names: - conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])] - else: - conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"])) - for data in conn_names[value.get()]: - self.AllSimObjs[data[0]].connections[data[1] - 1].delete_line() - self.AllSimObjs[data[0]].connections[data[1] - 1] = None - child_d.destroy() - messagebox.showinfo("提示", f"连接线{value.get()}删除成功!") - - btn_yes = tk.Button(child_d, text='确定', font=('黑体', 12), height=1, command=del_line) - btn_yes.place(x=240, y=20) - combobox.place(x=20, y=20) - - def update_tag_name(self): - # todo: 更新组件名称 - """ - 更新组件名称 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要更新的对象!") - return - child1 = tk.Toplevel() - child1.title(self.chose_obj.ObjLabel + "的标签信息") - child1.geometry('240x100+450+250') - child1.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作 - tk.Label(child1, text='原标签:' + self.chose_obj.ObjLabel, - font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w') - tk.Label(child1, text='新标签:', font=('黑体', 12)).grid(row=1, column=0, sticky='w') # ,sticky='w'靠左显示 - new_name = tk.Entry(child1, font=('黑体', 12), textvariable=tk.StringVar()) - new_name.grid(row=1, column=1) - - def update_tag(): - name = new_name.get() - if name == "": - messagebox.showerror("注意", message="请输入新名称!") - return - self.chose_obj.ObjLabel = name - self.chose_obj.update() - for conn in self.chose_obj.connections: - if isinstance(conn, AllSimConnect): - conn.update_info(self.chose_obj.ObjID) - child1.destroy() - messagebox.showinfo("提示", message="修改成功!") - - tk.Button(child1, text='确定', font=('黑体', 10), height=1, command=update_tag).grid(row=2, column=0, - sticky='e') - tk.Button(child1, text='取消', font=('黑体', 10), height=1, command=child1.destroy).grid(row=2, column=1, - sticky='e') - - def network_config(self): - # todo: 网络配置 - """ - 网络配置 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要配置的对象!") - return - if len(self.chose_obj.get_config()) == 0: - messagebox.showerror("注意", message="请先给对象添加连接线!") - return - child_r = tk.Toplevel() - child_r.title(self.chose_obj.ObjLabel + "的网络配置信息") - child_r.geometry('530x395+350+200') - child_r.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作 - ifs_frame = tk.Frame(child_r) - ifs_frame.grid(row=1, column=0, columnspan=4) # 装接口的框架 - ifs_frame_YN = tk.Frame(child_r) - ifs_frame_YN.grid(row=2, column=0, columnspan=4) - num = 0 - datas = {} - for conn in self.chose_obj.connections: - if isinstance(conn, AllSimConnect): - index = self.chose_obj.connections.index(conn) - num_label = tk.LabelFrame(ifs_frame, text=f'接口{index + 1}') - num_label.grid(row=num, column=0, padx=18, ipady=5) - tk.Label(num_label, text='MAC:', font=('黑体', 10)).grid(row=0, column=0) - mac_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2, 3] else NORMAL) - mac_en.insert('0', str(self.chose_obj.interface[index].get("mac", ""))) - mac_en.grid(row=0, column=1, padx=10) - tk.Label(num_label, text='IP:', font=('黑体', 10)).grid(row=1, column=0) - ip_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2] else NORMAL) - ip_en.insert('0', str(self.chose_obj.interface[index].get("ip", ""))) - ip_en.grid(row=1, column=1, padx=10) - tk.Label(num_label, text='端口:', font=('黑体', 10)).grid(row=0, column=2) - port_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL) - port_en.insert('0', str(self.chose_obj.interface[index].get("conn_port", ""))) - port_en.grid(row=0, column=3, padx=10) - tk.Label(num_label, text='应用层地址:', font=('黑体', 10)).grid(row=1, column=2) - addr_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL) - addr_en.insert('0', str(self.chose_obj.interface[index].get("addr", ""))) - addr_en.grid(row=1, column=3, padx=10) - num += 1 - datas[index + 1] = {"mac": mac_en, "ip": ip_en, "conn_port": port_en, "addr": addr_en} - - num_label = tk.LabelFrame(ifs_frame, text=f'示例') - num_label.grid(row=num, column=0, padx=18, ipady=5) - tk.Label(num_label, text='MAC:', font=('黑体', 10)).grid(row=0, column=0) - mac_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar()) - mac_en.insert('0', "MAC11") - mac_en.config(state=READONLY) - mac_en.grid(row=0, column=1, padx=10) - tk.Label(num_label, text='IP:', font=('黑体', 10)).grid(row=1, column=0) - ip_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar()) - ip_en.insert(0, "10.1.1.10") - ip_en.config(state=READONLY) - ip_en.grid(row=1, column=1, padx=10) - tk.Label(num_label, text='端口:', font=('黑体', 10)).grid(row=0, column=2) - port_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar()) - port_en.insert('0', "80") - port_en.config(state=READONLY) - port_en.grid(row=0, column=3, padx=10) - tk.Label(num_label, text='应用层地址:', font=('黑体', 10)).grid(row=1, column=2) - addr_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar()) - addr_en.insert('0', "10.1.2.10:10810:Name3") - addr_en.config(state=READONLY) - addr_en.grid(row=1, column=3, padx=10) - def commit(): - self.chose_obj.config(datas) - child_r.destroy() - - tk.Button(ifs_frame_YN, text='确定', font=('黑体', 10), height=1, command=commit).grid(row=0, column=0, - padx=20) # ,sticky="w" - tk.Button(ifs_frame_YN, text='取消', font=('黑体', 10), height=1, - command=child_r.destroy).grid(row=0, column=2, padx=20) - - def router_table_config(self): - # todo: 路由表配置 - """ - 路由表配置 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要配置的对象!") - return - if self.chose_obj.ObjType != 2: - messagebox.showerror("注意", message="请选择路由器对象!") - return - RouterConfigWindow(self, self.chose_obj) - - def mac_table_config(self): - # todo: 交换表配置 - """ - 交换表配置 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要配置的对象!") - return - if self.chose_obj.ObjType != 3: - messagebox.showerror("注意", message="请选择交换机对象!") - return - SwitchConfigWindow(self, self.chose_obj) - - def show_label(self): - """ - 显示/不显示标签 - :return: - """ - self.show_label_flag = not self.show_label_flag - if self.show_label_flag: - for tag in self.AllSimObjs.values(): - tag.create_img() - else: - for tag in self.AllSimObjs.values(): - self.delete(tag.ObjID + "text") - - def show_interface(self): - # todo: 显示/不显示接口 - """ - 显示/不显示接口 - :return: - """ - self.show_interface_flag = not self.show_interface_flag - if self.show_interface_flag: - for conn in self.conns: - conn.draw_line() - else: - for conn in self.conns: - self.delete(conn.NobjS.ObjID + conn.NobjE.ObjID) - - def show_network_config(self): - # todo: 显示网络配置信息 - """ - 显示网络配置信息 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要显示的对象!") - return - self.delete("netSet") - self.create_text(740, 120, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 10, 'bold'), - fill="#7030a0", tags="netSet") - self.create_text(675, 145, text=self.chose_obj, - anchor="nw", font=('宋体', 11), tags="netSet") - - def show_router_config(self): - # todo: 显示路由表交换表信息 - """ - 显示路由交换表信息 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要显示的对象!") - return - if self.chose_obj.ObjType != 2 and self.chose_obj.ObjType != 3: - messagebox.showerror("注意", message="请选择路由器/交换机对象!") - return - self.delete("routerSet") - self.create_text(905, 120, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 10, 'bold'), - fill="#7030a0", tags="routerSet") - self.create_text(835, 145, text=self.chose_obj.get_table_config(), - anchor="nw", font=('宋体', 11), tags="routerSet") - - def send_packet(self): - """ - 发送数据包 - :return: - """ - if self.chose_obj is None: - messagebox.showerror("注意", message="请先选择要显示的对象!") - return - if self.chose_obj.ConfigCorrect != 1: - messagebox.showerror("注意", message="请先对选择对象进行网络配置!") - return - if self.chose_obj.ObjType != 1: - messagebox.showerror("注意", message="请选择主机对象!") - return - child2 = tk.Toplevel() - child2.title("数据包配置") - child2.geometry('240x100+450+250') - child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作 - tk.Label(child2, text='目的IP:', font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w') - packet_ip = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_ip.grid(row=0, column=1) - tk.Label(child2, text='目的MAC:', font=('黑体', 12)).grid(row=1, column=0, sticky='w') # ,sticky='w'靠左显示 - packet_mac = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_mac.grid(row=1, column=1) - tk.Label(child2, text='消息:', font=('黑体', 12)).grid(row=2, column=0, sticky='w') # ,sticky='w'靠左显示 - packet_message = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_message.grid(row=2, column=1) - - def send(): - """ - 发送数据包 - :return: - """ - if packet_ip.get() == "": - messagebox.showerror("注意", message="ip地址不能为空") - return - if not validate_ip_address(packet_ip.get()): - messagebox.showerror("注意", message="IP地址不规范!") - return - if packet_mac.get() == "": - messagebox.showerror("注意", message="mac地址不能为空!") - return - if packet_message.get() == "": - messagebox.showerror("注意", message="消息不能为空!") - return - self.chose_obj.create_packet(packet_ip.get(), - packet_mac.get(), - packet_message.get()) - child2.destroy() - - tk.Button(child2, text='确定', font=('黑体', 10), height=1, command=send).grid(row=3, column=0, sticky='e') - tk.Button(child2, text='取消', font=('黑体', 10), height=1, command=child2.destroy).grid(row=3, column=1, - sticky='e') - - def send_packet_list(self): - """ - 批量发送数据包 - :return: - """ - hosts = {} - for tag in self.AllSimObjs.values(): - if tag.ObjType == 1 and tag.ConfigCorrect == 1: - hosts[tag.ObjLabel] = tag.ObjID - child2 = tk.Toplevel() - child2.title("批量数据包配置") - child2.geometry('400x420+450+200') - tk.Label(child2, text='目的IP:', font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w', ipady=10) - packet_ip = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_ip.grid(row=0, column=1, pady=5) - tk.Label(child2, text='目的MAC:', font=('黑体', 12)).grid(row=1, column=0, sticky='w', - pady=5) # ,sticky='w'靠左显示 - packet_mac = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_mac.grid(row=1, column=1, pady=5) - tk.Label(child2, text='消息:', font=('黑体', 12)).grid(row=2, column=0, sticky='w', pady=5) # ,sticky='w'靠左显示 - packet_message = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar()) - packet_message.grid(row=2, column=1, pady=5) - host = StringVar() - tk.Label(child2, text='发送主机:', font=('黑体', 12)).grid(row=3, column=0, sticky='w', - pady=5) # ,sticky='w'靠左显示 - combobox = ttk.Combobox( - master=child2, # 父容器 - height=5, # 高度,下拉显示的条目数量 - width=12, # 宽度 - state='readonly', # readonly(只可选) - font=('', 18), # 字体 - textvariable=host, # 通过StringVar设置可改变的值 - values=list(hosts.keys()), # 设置下拉框的选项 - ) - combobox.grid(row=3, column=1, pady=5) - packet_frame = tk.Frame(child2) - packet_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5) - packet_treeview = ttk.Treeview(packet_frame, columns=("source_host", "ip", "mac", "message"), show="headings") - packet_treeview.heading("source_host", text="发送主机") - packet_treeview.column("source_host", width=60, anchor=CENTER) - packet_treeview.heading("ip", text="IP") - packet_treeview.column("ip", width=120, anchor=CENTER) - packet_treeview.heading("mac", text="MAC") - packet_treeview.column("mac", width=60, anchor=CENTER) - packet_treeview.heading("message", text="消息") - packet_treeview.column("message", width=120, anchor=CENTER) - packet_treeview.pack(side="left", fill="both") - scrollbar = ttk.Scrollbar(packet_frame, orient="vertical", command=packet_treeview.yview) - scrollbar.pack(side="right", fill="y") - packet_treeview.configure(yscrollcommand=scrollbar.set) - packets = [] - - def add(): - ip = packet_ip.get() - mac = packet_mac.get() - message = packet_message.get() - chose_host = host.get() - if ip == "" or mac == "" or message == "" or chose_host == "": - messagebox.showerror("注意", message="输入框不能为空") - return - packet = SimPacket(self.AllSimObjs[hosts[chose_host]].interface[0]["ip"], - self.AllSimObjs[hosts[chose_host]].interface[0]["mac"], - ip, mac, message) - packets.append((self.AllSimObjs[hosts[chose_host]], packet)) - packet_treeview.delete(*packet_treeview.get_children()) - for datas in packets: - tag: SimBase = datas[0] - packet_data: SimPacket = datas[1] - packet_treeview.insert("", "end", values=( - tag.ObjLabel, packet_data.destination_ip, packet_data.destination_mac, packet_data.message)) - - def send(): - if len(packets) == 0: - messagebox.showerror("注意", message="请至少添加一条数据包!") - return - for data in packets: - threading.Thread(target=data[0].send, args=(data[1],)).start() - child2.destroy() - - tk.Button(child2, text="添加数据包", font=('黑体', 12), height=3, command=add).grid(row=0, column=2, rowspan=4) - button_frame = Frame(child2) - tk.Button(button_frame, text="发送", font=('黑体', 12), height=1, command=send).pack(side=RIGHT, padx=20) - tk.Button(button_frame, text="取消", font=('黑体', 12), height=1, command=child2.destroy).pack(side=RIGHT) - button_frame.grid(row=6, column=0, columnspan=3) - - def clear_canvas(self): - """ - 清除画布 - :return: - """ - ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?') - if ask == "no": - return - truncate_db() # 清除数据库 - for tag_id, tag in self.AllSimObjs.items(): - self.delete(tag_id) - self.delete(tag_id + "text") - for conn in self.conns: - conn.delete_line() - self.delete("rectangle") - self.AllSimObjs.clear() - self.conns.clear() - - def create_widget(self): - # todo: 创建页面 - """ - 创建整体页面布局 - :return: - """ - self.create_rectangle(150, 40, 650, 490, outline="#7f6000", width=3) # 矩形框,左上角坐标,右下角坐标 - self.create_rectangle(660, 100, 815, 400, outline="#ffff00", width=3, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标 - self.create_text(735, 105, text="网络配置信息", anchor="n", font=('微软雅黑', 11, 'bold')) - self.create_rectangle(825, 100, 1000, 400, outline="#ffff00", width=3, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标 - self.create_text(915, 105, text="路由/交换表信息", anchor="n", font=('微软雅黑', 11, 'bold')) - # 显示左边的固定图片 - self.create_text(80 - 55, 120 + 15, text="路由器", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字 - router = self.create_image(80, 120, image=self.router_img, anchor="nw") - self.create_text(80 - 55, 190 + 15, text="交换机", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字 - switch = self.create_image(80, 190, image=self.switch_img, anchor="nw") - self.create_text(80 - 55, 260 + 15, text="集线器", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字 - hub = self.create_image(80, 260, image=self.hub_img, anchor="nw") - self.create_text(80 - 55, 330 + 15, text="主机", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字 - host = self.create_image(80, 330, image=self.host_img, anchor="nw") - self.bind_event(router, "路由器") - self.bind_event(switch, "交换机") - self.bind_event(hub, "集线器") - self.bind_event(host, "主机") - - # 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方 - menubar = tk.Menu(root) - # 定义一个空菜单单元 - setMenu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label='删除与修改', menu=setMenu) - setMenu.add_command(label='删除对象', command=self.delete_obj) - setMenu.add_command(label='删除连接线', command=self.delete_line) - setMenu.add_command(label='修改标签', command=self.update_tag_name) - # # 定义一个空菜单单元 - setMenu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label='基础配置', menu=setMenu) - setMenu.add_command(label='网络配置', command=self.network_config) - setMenu.add_command(label='路由表配置', command=self.router_table_config) - setMenu.add_command(label='交换表配置', command=self.mac_table_config) - root.config(menu=menubar) - # 定义一个空菜单单元 - setMenu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label='显示设置', menu=setMenu) - setMenu.add_command(label='显示/不显示标签', command=self.show_label) - setMenu.add_command(label='显示/不显示接口', command=self.show_interface) - setMenu.add_command(label='显示网络配置信息', command=self.show_network_config) - setMenu.add_command(label='显示路由/交换表信息', command=self.show_router_config) - # 定义一个空菜单单元 - setMenu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label='发送数据包', menu=setMenu) - setMenu.add_command(label='发送数据包', command=self.send_packet) - setMenu.add_command(label='批量发送数据包', command=self.send_packet_list) - - menubar.add_command(label='清除屏幕', command=self.clear_canvas) - root.config(menu=menubar) - self.reload_data() - - -if __name__ == '__main__': - root = Window() - root.title('网络拓扑图') - width = 1030 - height = 530 # 窗口大小 - screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小 - screen_height = root.winfo_screenheight() - x = int((screen_width - width) / 2) - y = int((screen_height - height) / 2) - 40 - size = '{}x{}+{}+{}'.format(width, height, x, y) - canvas = NetWorkAnalog(root, width=1030, heigh=height, bg="white") - canvas.place(x=0, y=0, anchor='nw') - root.geometry(size) - root.mainloop() diff --git a/SimObjs.py b/SimObjs.py deleted file mode 100644 index 8c1555c..0000000 --- a/SimObjs.py +++ /dev/null @@ -1,757 +0,0 @@ -import math -import sys -import threading -from time import sleep - -from ttkbootstrap import * -from uuid import uuid4 -import ipaddress - -from PIL import ImageTk, Image - -from dbUtil import search, execute_sql - - -class SimBase(): - # todo: 组件父类 - """ - 图标类,所有组件的父类 - """ - def __init__(self, canvas: Canvas, x, y, id, config=None, label=None): - self.ConfigCorrect = 0 if config is None else config # 是否进行配置 - self.ObjType = None # 组件类型 1->主机,2->路由器,3->交换机,4->集线器 - self.ObjLabel = "" - self.ObjID = id - self.ObjX = x - self.ObjY = y - self.canvas = canvas - self.img = None - self.img_tm = None - self.interface = [{}, {}, {}, {}] - self.connections = ["1", "2", "3", "4"] - self.set_default_config() - - def bind_event(self): - self.canvas.tag_bind(self.ObjID, "", self.start_drag) - self.canvas.tag_bind(self.ObjID, "", self.drag) - self.canvas.tag_bind(self.ObjID, "", self.release) - - def set_default_name(self): - data_frame = search(f"select objid from sim_objs where objType={self.ObjType}") - num = data_frame.size + 1 - if isinstance(self, SimHost): - name = "SHO%d" % num - elif isinstance(self, SimRouter): - name = "SRO%d" % num - elif isinstance(self, SimSwitch): - name = "SWI%d" % num - else: - name = "SHUB%d" % num - return name - - def release(self, event): - if self.click_x == event.x and self.click_y == event.y: # 鼠标左键单击 - self.canvas.chose_obj = self - self.canvas.is_chose() - else: - self.update() - - def start_drag(self, event): - self.canvas.tag_raise(self.ObjID) # 将 SimBase 组件置于最上层 - self.start_x = event.x - self.start_y = event.y - self.click_x = event.x - self.click_y = event.y - - def drag(self, event): - """ - 移动图标 - :param event: - :return: - """ - self.canvas.delete("rectangle") - self.canvas.chose_obj = None - dx = event.x - self.start_x - dy = event.y - self.start_y - # 移动范围限制, 超出移动范围则直接返回 - if not (170 <= self.ObjX + dx <= 630 and 60 <= self.ObjY + dy <= 470): - return - self.ObjX += dx - self.ObjY += dy - self.canvas.move(self.ObjID, dx, dy) # 移动 SimBase 组件 - self.canvas.move(self.ObjID + "text", dx, dy) # 移动 SimBase 组件 - self.start_x = event.x - self.start_y = event.y - for conn in self.connections: - if isinstance(conn, AllSimConnect): - conn.update_line() - - def create_img(self): - """ - 创建图片 - :return: - """ - self.canvas.delete("L") - id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30, - image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw", - tags=self.ObjID) - self.canvas.dtag("L", "L") - self.canvas.create_text(self.ObjX, self.ObjY - 40, text=self.ObjLabel, font=("", 12, "bold"), - fill="#7030a0", tags=self.ObjID + "text", anchor="nw") - self.canvas.tag_raise(id) - self.bind_event() - - def config(self, interface): - """ - 网络配置方法, - :param interface: 传入配置数据 - :return: - """ - for key, value in interface.items(): - self.interface[key - 1] = {key: value.get() if not value.get() == "" else "NULL" for key, value in value.items()} - self.ConfigCorrect = 1 - self.create_img() - for conn in self.connections: - if isinstance(conn, AllSimConnect): - index = self.connections.index(conn) + 1 - conn.update_node_config(self, index) - self.update() - - def set_default_config(self): - sql = f"select * from conn_config where node_id='{self.ObjID}'" - conn_data = search(sql) - for index, conn in conn_data.iterrows(): - self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"], - "mac": conn["mac"], - "conn_port": conn["conn_port"], - "addr": conn["addr"]} - - def get_config(self): - sql = f"select * from conn_config where node_id='{self.ObjID}'" - conn_data = search(sql) - return conn_data - - def save(self): - """ - 将对象存储至mysql当中 - :return: - """ - sql = f"insert into sim_objs values ('{self.ObjID}', {self.ObjType}, '{self.ObjLabel}'," \ - f"{self.ObjX}, {self.ObjY}, {self.ConfigCorrect})" - execute_sql(sql) - - def update(self): - """ - 当坐标发生改变时修改数据库数据 - :return: - """ - self.canvas.delete(self.ObjID + "text") - self.canvas.create_text(self.ObjX, self.ObjY - 40, text=self.ObjLabel, font=("", 12, "bold"), - fill="#7030a0", tags=self.ObjID + "text", anchor="nw") - sql = f"update sim_objs set objLabel='{self.ObjLabel}', ObjX={self.ObjX}," \ - f"ObjY={self.ObjY}, ConfigCorrect={self.ConfigCorrect} where ObjID='{self.ObjID}'" - execute_sql(sql) - - def transfer_animate(self, status, packet, error_message=None): - if status: - text = f"目的IP: {str(packet.destination_ip)}\n" \ - f"目的MAC: {packet.destination_mac}\n" \ - f"消息内容: {packet.message}" - self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20, outline="#92d050", - width=3, fill="#92d050", tags=self.ObjID + "packetData") - self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", - font=('', 10), tags=self.ObjID + "packetData") # 显示文字 - self.canvas.update() - sleep(2) - self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 - else: - text = f"传输失败\n" if error_message is None else error_message - self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red", - width=3, fill="red", tags=self.ObjID + "packetData") - self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw", - font=('', 10), tags=self.ObjID + "packetData") # 显示文字 - self.canvas.update() - sleep(2) - self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容 - - def __str__(self): - str = "" - config = self.get_config() - for index, data in config.iterrows(): - str += f"【接口{data['node_ifs']}】\n" - str += f"IP: {data['ip']}\n" - str += f"MAC: {data['mac']}\n" - return str - - -class SimPacket(): - # todo: 数据包类 - """ - 数据包类 - """ - def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message): - """ - :param source_mac: 源主机mac地址 - :param source_ip: 源主机ip地址 - :param destination_mac: 目的主机mac地址 - :param destination_ip: 目的主机ip地址 - :param message: 数据 - """ - self.source_mac = source_mac - self.source_ip = source_ip - self.destination_mac = destination_mac - self.destination_ip = destination_ip - self.message = message - self.up_jump = None # 上一跳连接对象 - self.img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30))) - self.id = str(uuid4()) - - def move(self, cv, target_x, target_y, duration): - start_x, start_y = cv.coords(self.id) - distance_x = target_x - start_x - distance_y = target_y - start_y - steps = duration // 10 # 以10毫秒为间隔进行移动 - step_x = distance_x / steps - step_y = distance_y / steps - self._move_step(cv, start_x, start_y, target_x, target_y, step_x, step_y, steps) - - def _move_step(self, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps): - if steps > 0: - new_x = start_x + step_x - new_y = start_y + step_y - cv.coords(self.id, new_x, new_y) - cv.update() # 更新画布显示 - sleep(0.01) # 添加延迟以控制动画速度 - self._move_step(cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1) - else: - cv.coords(self.id, target_x, target_y) - cv.delete(self.id) - - def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase): - cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id) - self.move(cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500) - - -class AllSimConnect(): - # todo: 连接类 - def __init__(self, canvas: Canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None): - """ - 连接对象 - :param nodex: 节点 - :param nodex_ifs: 节点接口 - :param nodey: 节点 - :param nodey_ifs: 节点接口 - """ - self.canvas = canvas - self.ConfigCorrect = 0 if config is None else config - self.NobjS = nodex - self.NobjE = nodey - self.IfsS = nodex_ifs - self.IfsE = nodey_ifs - self.width = 1 if self.ConfigCorrect == 0 else 4 # 线的粗细 - self.draw_line() - - def is_connected_to(self, other): - """ - 判断两个连接对象是否已经连接 - :param other: 另一个连接对象 - :return: 如果已连接,则返回True;否则返回False - """ - return ( - self.NobjS.ObjID == other.NobjS.ObjID - and self.NobjE.ObjID == other.NobjE.ObjID - ) - - def update_node_config(self, node, ifs): - """ - 当节点进行网络配置后将节点的接口配置保存 - :return: - """ - try: - nodex_update_sql = f""" - update conn_config set - ip='{node.interface[ifs - 1]["ip"]}', - mac='{node.interface[ifs - 1]["mac"]}', - conn_port='{node.interface[ifs - 1]["conn_port"]}', - addr='{node.interface[ifs - 1]["addr"]}' - where node_id='{node.ObjID}' and node_ifs={ifs} - """ - execute_sql(nodex_update_sql) - self.ConfigCorrect = 1 - self.check_config() - update_sql = f""" - update sim_conn set ConfigCorrect={self.ConfigCorrect} - where conn_id in ( - select distinct conn_id from conn_config where node_id='{node.ObjID}' - ) - """ - execute_sql(update_sql) - except Exception as E: - pass - - def check_config(self): - """ - 检查两边节点的配置是否正确 - :return: - """ - if self.NobjS.interface[self.IfsS - 1]["mac"] != "" and self.NobjS.interface[self.IfsS - 1][ - "ip"] != "" \ - and self.NobjE.interface[self.IfsE - 1]["mac"] != "" and self.NobjE.interface[self.IfsE - 1][ - "ip"] != "": - self.width = 4 - self.draw_line() - else: - self.width = 1 - self.draw_line() - - def transfer(self, source_node, packet: SimPacket): - """ - 传输数据 - :param packet: 数据包 - :return: - """ - if source_node == self.NobjS: - packet.transfer_packet(self.canvas, self.NobjS, self.NobjE) - self.NobjE.receive(packet) - else: - packet.transfer_packet(self.canvas, self.NobjE, self.NobjS) - self.NobjS.receive(packet) - - def draw_line(self): - line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY, - width=self.width, fill="#5b9bd5", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line") - self.canvas.tag_lower(line) - self.analyseIFS(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjS.ObjLabel, self.IfsS, - self.NobjE.ObjX, self.NobjE.ObjY, self.NobjE.ObjLabel, self.IfsE) - - def update_line(self): - """ - 当组件移动时重新绘制线 - :return: - """ - self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line") - self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID) - self.draw_line() - - def update_info(self, obj_id): - """ - 修改数据库中的连接信息 - :return: - """ - update_sql = f""" - update sim_conn set conn_id='{self.NobjS.ObjLabel}-{self.NobjE.ObjLabel}', ConfigCorrect={self.ConfigCorrect} - where conn_id in ( - select distinct conn_id from conn_config where node_id='{obj_id}' - ) - """ - execute_sql(update_sql) - - def analyseIFS(self, SX, SY, SLabel, IfsS, EX, EY, ELabel, IfsE): # NobjS的x,y;NobjE的x,y; #分析接口在哪个位置,再显示 - ''' - :param SX: S对象的x坐标 - :param SY: S对象的y坐标 - :param SLabel: S对象的标签 - :param IfsS: S对象要显示的接口号 - ''' - if (EX - SX) == 0: # 即垂直的时候,NobjS在NobjE对象的正上方和正下方,x坐标无变化,只需要y变化 - R = 18 - x = 0 # x无偏移量 - y = R - else: - k = (EY - SY) / (EX - SX) # ey-sy/sx-ex - reat = math.atan(k) # 根据斜率计算弧度 - # 连线与图标交接点坐标 - R = 18 - x = abs(math.cos(reat) * R) + 6 # python math中三角函数中的数值是弧度,而计算器中的数值是角度 - y = abs(math.sin(reat) * R) + 6 - if SX <= EX and SY <= EY: # NobjS在NobjE的左上角,NobjS的右下角连接NobjE的左上角 - x_S = SX + x - y_S = SY + y # NobjS的连接点坐标 - x_E = EX - x - y_E = EY - y # NobjE的连接点坐标 - # 显示S接口号 - self.canvas.create_text((x_S - 5, y_S - 5), text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - # 显示E接口号 - self.canvas.create_text(x_E - 10, y_E - 10, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - elif SX < EX and SY > EY: # NobjS在NobjE的左下角,NobjS的右上角连接NobjE的左下角 - x_S = SX + x - y_S = SY - y # NobjS的连接点坐标 - x_E = EX - x - y_E = EY + y # NobjE的连接点坐标 - # 显示S接口号 - self.canvas.create_text(x_S + 5, y_S - 10, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - # 显示E接口号 - self.canvas.create_text(x_E - 5, y_E, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - elif SX > EX and SY < EY: # NobjS在NobjE的右上角,NobjS的左下角连接NobjE的右上角 - x_S = SX - x - y_S = SY + y # NobjS的连接点坐标 - x_E = EX + x - y_E = EY - y # NobjE的连接点坐标 - # 显示S接口号 - self.canvas.create_text(x_S - 5, y_S, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - # 显示E接口号 - self.canvas.create_text(x_E + 5, y_E, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - elif SX >= EX and SY >= EY: # NobjS在NobjE的右下角,NobjS的左上角连接NobjE的右下角 - x_S = SX - x - y_S = SY - y # NobjS的连接点坐标 - x_E = EX + x - y_E = EY + y # NobjE的连接点坐标 - # 显示S接口号 - self.canvas.create_text(x_S - 5, y_S - 15, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - # 显示E接口号 - self.canvas.create_text(x_E - 5, y_E - 5, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red", - tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字 - - def save(self): - """ - 将连接对象保存至数据库 - :return: - """ - conn_id = self.NobjS.ObjLabel + "-" + self.NobjE.ObjLabel - sql = f"insert into sim_conn values ('{conn_id}', {self.ConfigCorrect})" - execute_sql(sql) - execute_sql( - f"insert into conn_config values ('{conn_id}', '{self.NobjS.ObjID}', {self.IfsS}, '', '', '', '')") - execute_sql( - f"insert into conn_config values ('{conn_id}', '{self.NobjE.ObjID}', {self.IfsE}, '', '', '', '')") - - def delete_line(self): - self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID) - self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line") - - def __eq__(self, other): - """ - 重写equals方法,判断两个连接对象是否相同 - :param other: 连接对象 - :return: Boolean - """ - if isinstance(other, AllSimConnect): - other_ids = other.NobjS.ObjID + other.NobjE.ObjID - return (self.NobjS.ObjID in other_ids - and self.NobjE.ObjID in other_ids) - return False - - -class SimHost(SimBase): - # todo: 主机类 - """ - 主机类 - """ - def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None): - self.ObjID = str(uuid4()) if id is None else id - super().__init__(canvas, x, y, self.ObjID, config, label) - self.ObjType = 1 - self.ObjLabel = label if label is not None else self.set_default_name() - self.interface = [{}] - self.connections = [None] - self.img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/主机.png").resize((60, 60))) - self.img_tm = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/主机_tm.png").resize((60, 60))) - self.set_default_config() - self.create_img() - - def create_packet(self, ip, mac, message): - """ - 创建数据包 - :param ip: 目的主机ip - :param mac: 目的主机mac - :param message: 消息 - :return: - """ - packet = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], ip, mac, message) - print(f"创建数据包成功,数据包由{packet.source_ip} 发往 {packet.destination_ip}") - self.send(packet) - - def send(self, packet): - """ - 发送数据包 - :param packet: - :return: - """ - connection: AllSimConnect = self.connections[0] - print(f"数据包从 {self.ObjLabel} 发出") - packet.up_jump = connection - connection.transfer(self, packet) - - def receive(self, packet: SimPacket): - """ - 接收数据 - :param packet: 数据包 - :return: - """ - print(f"主机{self.ObjLabel}接受到数据{packet.message}") - if packet.destination_ip == self.interface[0]["ip"]: - self.transfer_animate(True, packet) - else: - self.transfer_animate(False, packet) - - def __str__(self): - str = "" - config = self.get_config() - for index, data in config.iterrows(): - str += f"【接口{data['node_ifs']}】\n" - str += f"AppAddr: /Root\n" - str += f"PORT: {data['conn_port']}\n" - str += f"IP: {data['ip']}\n" - str += f"MAC: {data['mac']}\n" - return str - - -class SimRouter(SimBase): - # todo: 路由类 - """ - 路由类 - """ - def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args): - self.ObjID = str(uuid4()) if id is None else id - super().__init__(canvas, x, y, self.ObjID, config, label) - self.ObjType = 2 - self.ObjLabel = label if label is not None else self.set_default_name() - self.img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/路由器.png").resize((60, 60))) - self.img_tm = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/路由器_tm.png").resize((60, 60))) - self.create_img() - self.router_table = {} - self.set_default_router_table() - - def set_default_router_table(self): - """ - 将数据库中的路由表信息提取 - :return: - """ - sql = f"select * from router_table where obj_id='{self.ObjID}'" - router_tables = search(sql) - for index, router_table in router_tables.iterrows(): - if router_table["node_ifs"] in self.router_table: - self.router_table[router_table["node_ifs"]].append(router_table["segment"]) - else: - self.router_table[router_table["node_ifs"]] = [router_table["segment"]] - - def check_destination_ip(self, destination_ip, network): - """ - 检查目标ip是否属于网段范围内 - :param destination_ip: 目标ip - :param network: 网段 - :return:10.2.3.0/24 - """ - ip = ipaddress.ip_address(destination_ip) - network = ipaddress.ip_network(network) - if ip in network: - return True - if network == "0.0.0.0/24": # 如果网段为0.0.0.0/24 则为默认路由 - return True - - def transmit(self, packet: SimPacket): - """ - 转发数据包 - :return: - """ - flag = False - next_hop_ifs = None - for conn in self.connections: - if isinstance(conn, AllSimConnect): - if conn.ConfigCorrect == 0: - continue - if conn == packet.up_jump: - continue - ifs = self.connections.index(conn) + 1 - for network in self.router_table[ifs]: - if self.check_destination_ip(packet.destination_ip, network): - flag = True - next_hop_ifs = ifs - if flag: - conn = self.connections[next_hop_ifs - 1] - packet.up_jump = conn - conn.transfer(self, packet) - else: - for conn in self.connections: - if isinstance(conn, AllSimConnect): - if conn == packet.up_jump: - continue - if conn.NobjS != self: - if conn.NobjE.ObjType == 1: - conn.transfer(self, packet) - break - error_message = "路由寻址失败" - self.transfer_animate(False, packet, error_message) - - def receive(self, packet): - """ - 接收数据 - :param packet: 数据包 - :return: - """ - print(f"{self.ObjLabel}-路由器接受到数据{packet.message}") - self.transmit(packet) - - def add_config(self, router, router_ifs): - sql = f"insert into router_table values ('{self.ObjID}', {router_ifs}, '{router}')" - execute_sql(sql) - - def delete_config(self, ifs, network): - sql = f"delete from router_table where obj_id='{self.ObjID}' and node_ifs={ifs} and segment='{network}'" - execute_sql(sql) - - def get_table_config(self): - """ - 返回对象的路由表配置信息,用于展示 - :return: - """ - str = "" - sql = f"select * from router_table where obj_id='{self.ObjID}'" - router_tables = search(sql) - for index, router in router_tables.iterrows(): - str += f"网段号: {router['segment']}\n" - str += f"接口号: {router['node_ifs']}\n" - return str - - -class SimSwitch(SimBase): - # todo: 交换机类 - """ - 交换机类 - """ - def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args): - self.ObjID = str(uuid4()) if id is None else id - super().__init__(canvas, x, y, self.ObjID, config, label) - self.ObjType = 3 - self.ObjLabel = label if label is not None else self.set_default_name() - self.img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/交换机.png").resize((60, 60))) - self.img_tm = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/交换机_tm.png").resize((60, 60))) - self.create_img() - self.mac_table = {} - self.set_default_mac_table() - - def set_default_mac_table(self): - """ - 将数据库中的路由表信息提取 - :return: - """ - sql = f"select * from mac_table where obj_id='{self.ObjID}'" - router_tables = search(sql) - for index, router_table in router_tables.iterrows(): - if router_table["node_ifs"] in self.mac_table: - self.mac_table[router_table["node_ifs"]].append(router_table["mac"]) - else: - self.mac_table[router_table["node_ifs"]] = [router_table["mac"]] - - def add_config(self, router, router_ifs): - sql = f"insert into mac_table values ('{self.ObjID}', {router_ifs}, '{router}')" - execute_sql(sql) - - def delete_config(self, ifs, mac): - sql = f"delete from mac_table where obj_id='{self.ObjID}' and node_ifs={ifs} and mac='{mac}'" - execute_sql(sql) - - def get_table_config(self): - """ - 返回对象的交换表配置信息,用于展示 - :return: - """ - str = "" - sql = f"select * from mac_table where obj_id='{self.ObjID}'" - router_tables = search(sql) - for index, router in router_tables.iterrows(): - str += f"网段号: {router['mac']}\n" - str += f"接口号: {router['node_ifs']}\n" - return str - - def transmit(self, packet: SimPacket): - """ - 转发数据包 - :return: - """ - flag = False - next_hub_ifs = None - for conn in self.connections: - if isinstance(conn, AllSimConnect): - if conn.ConfigCorrect == 0: - continue - ifs = self.connections.index(conn) + 1 - if packet.destination_mac in self.mac_table.get(ifs, []): - flag = True - next_hub_ifs = ifs - if flag: - conn = self.connections[next_hub_ifs - 1] - packet.up_jump = conn - conn.transfer(self, packet) - return - for conn in self.connections: # 将数据包往所有接口进行转发 - if isinstance(conn, AllSimConnect): - if conn == packet.up_jump: - continue - if conn.ConfigCorrect == 0: - continue - new_packet = SimPacket(packet.source_ip, - packet.source_mac, - packet.destination_ip, - packet.destination_mac, - packet.message) - new_packet.up_jump = conn - threading.Thread(target=conn.transfer, args=(self, new_packet)).start() - - def receive(self, packet: SimPacket): - """ - 接收数据 - :param packet: 数据包 - :return: - """ - print(f"交换机{self.ObjLabel}接受到数据{packet.message}") - self.transmit(packet) - - -class SimHub(SimBase): - # todo: 集线器类 - """ - 集线器类 - """ - def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args): - self.ObjID = str(uuid4()) if id is None else id - super().__init__(canvas, x, y, self.ObjID, config, label) - self.ObjType = 4 - self.ObjLabel = label if label is not None else self.set_default_name() - self.img = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/集线器.png").resize((60, 60))) - self.img_tm = ImageTk.PhotoImage( - Image.open(sys.path[0] + "/datas/images/集线器_tm.png").resize((60, 60))) - self.create_img() - - def transmit(self, packet: SimPacket): - """ - 集线器转发数据包 - :return: - """ - for conn in self.connections: # 将数据包往所有接口进行转发 - if isinstance(conn, AllSimConnect): - if conn == packet.up_jump: - continue - if conn.ConfigCorrect == 0: - continue - new_packet = SimPacket(packet.source_ip, - packet.source_mac, - packet.destination_ip, - packet.destination_mac, - packet.message) - new_packet.up_jump = conn - threading.Thread(target=conn.transfer, args=(self, new_packet)).start() - - def receive(self, packet: SimPacket): - """ - 接收数据 - :param packet: 数据包 - :return: - """ - print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!") - self.transmit(packet) diff --git a/dbUtil.py b/dbUtil.py deleted file mode 100644 index f2e20ef..0000000 --- a/dbUtil.py +++ /dev/null @@ -1,104 +0,0 @@ -import sqlite3 -import sys - -import pandas as pd -from pandas import DataFrame - -conn = sqlite3.connect(sys.path[0]+"/network.db") - -def execute_sql(sql): - """ - 执行sql语句 - :param sql: - :return: - """ - cursor = conn.cursor() - cursor.execute(sql) - conn.commit() - - -def search(sql) -> DataFrame: - return pd.read_sql(sql, conn) - - -def delete_obj(obj_id): - cursor = conn.cursor() - delete_obj_sql = f"delete from sim_objs where ObjID='{obj_id}'" - cursor.execute(delete_obj_sql) - delete_conn_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{obj_id}')" - cursor.execute(delete_conn_sql) - conn.commit() - - -def truncate_db(): - init_database() - -def init_database(): - cursor = conn.cursor() - cursor.execute(""" - DROP TABLE IF EXISTS `conn_config`; - """) - cursor.execute(""" - CREATE TABLE `conn_config` ( - `conn_id` varchar(55) NULL DEFAULT NULL, - `node_id` varchar(55) NULL DEFAULT NULL, - `node_ifs` int(0) NULL DEFAULT NULL, - `ip` varchar(55) NULL DEFAULT NULL, - `mac` varchar(128) NULL DEFAULT NULL, - `conn_port` varchar(32) NULL DEFAULT NULL, - `addr` varchar(255) NULL DEFAULT NULL, - CONSTRAINT `conn_config_sim_conn_conn_id_fk` FOREIGN KEY (`conn_id`) REFERENCES `sim_conn` (`conn_id`) ON DELETE CASCADE ON UPDATE CASCADE -) ; - """) - cursor.execute(""" - DROP TABLE IF EXISTS `mac_table`; - """) - cursor.execute(""" - CREATE TABLE `mac_table` ( - `obj_id` varchar(55) NULL DEFAULT NULL, - `node_ifs` int(0) NULL DEFAULT NULL, - `mac` varchar(55) NULL DEFAULT NULL, - CONSTRAINT `mac_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE -) ; - """) - cursor.execute(""" - DROP TABLE IF EXISTS `router_table`; - """) - cursor.execute(""" - CREATE TABLE `router_table` ( - `obj_id` varchar(55) NULL DEFAULT NULL, - `node_ifs` int(0) NULL DEFAULT NULL, - `segment` varchar(55) NULL DEFAULT NULL, - CONSTRAINT `router_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE -) ; - - """) - cursor.execute(""" - DROP TABLE IF EXISTS `sim_conn`; - """) - cursor.execute(""" - CREATE TABLE `sim_conn` ( - `conn_id` varchar(255) NOT NULL , - `ConfigCorrect` int(0) NULL DEFAULT NULL , - PRIMARY KEY (`conn_id`) -) ; - - """) - cursor.execute(""" - DROP TABLE IF EXISTS `sim_objs`; - """) - cursor.execute(""" - CREATE TABLE `sim_objs` ( - `ObjID` varchar(50) NOT NULL, - `ObjType` int(0) NULL DEFAULT NULL, - `ObjLabel` varchar(20) NULL DEFAULT NULL, - `ObjX` int(0) NULL DEFAULT NULL, - `ObjY` int(0) NULL DEFAULT NULL, - `ConfigCorrect` int(0) NULL DEFAULT NULL, - PRIMARY KEY (`ObjID`) -) ; - """) - conn.commit() - -if __name__ == '__main__': - init_database() diff --git a/network.db b/network.db deleted file mode 100644 index aeddd48..0000000 Binary files a/network.db and /dev/null differ diff --git a/tkTest.py b/tkTest.py deleted file mode 100644 index 8cd1853..0000000 --- a/tkTest.py +++ /dev/null @@ -1,19 +0,0 @@ -from tkinter import * -import platform - - -def get_platform(): - import platform - sys_platform = platform.platform().lower() - if "windows" in sys_platform: - print("Windows") - elif "macos" in sys_platform: - print("Mac os") - elif "linux" in sys_platform: - print("Linux") - else: - print("其他系统") - - -if __name__ == "__main__": - get_platform()