master
bettleChen 1 year ago
parent 0230d4834c
commit 60b4414c1f

@ -1,954 +0,0 @@
import ipaddress
import sys
import threading
import ttkbootstrap as tk
from ttkbootstrap import *
from ttkbootstrap import ttk
from tkinter import messagebox
import re
from PIL import ImageTk, Image
import platform
from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase
from dbUtil import search, execute_sql, delete_obj, truncate_db
def validate_ip_address(ip_address):
"""
匹配ip地址格式是否规范
:param ip_address: IP地址
:return: Boolean
"""
# 定义IP地址的正则表达式模式
pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$'
pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$'
# 使用re模块进行匹配
match_with_subnet = re.match(pattern_with_subnet, ip_address)
match_without_subnet = re.match(pattern_without_subnet, ip_address)
if match_with_subnet:
# 带有子网掩码的IP地址
# 检查每个组件的取值范围是否在 0-255 之间
for group in match_with_subnet.groups()[:4]:
if not (0 <= int(group) <= 255):
return False
# 检查子网掩码的取值范围是否在 0-32 之间
subnet_mask = int(match_with_subnet.groups()[4])
if not (0 <= subnet_mask <= 32):
return False
return True
elif match_without_subnet:
# 不带子网掩码的IP地址
# 检查每个组件的取值范围是否在 0-255 之间
for group in match_without_subnet.groups():
if not (0 <= int(group) <= 255):
return False
return True
else:
# IP地址格式不正确
return False
class RouterConfigWindow(tk.Toplevel):
def __init__(self, parent, router_obj):
super().__init__(parent)
self.geometry("435x433+350+200")
self.title(f"{router_obj.ObjLabel}路由表配置")
self.router_obj = router_obj
self.interface_entries = []
self.router_table = {}
self.create_interface_inputs()
self.create_router_table()
def create_interface_inputs(self):
label_text = ["接口1", "接口2", "接口3", "接口4"]
for i in range(4):
label = tk.Label(self, text=label_text[i])
label.grid(row=i, column=0, padx=10, pady=5, sticky="w")
entry = tk.Entry(self, width=20)
entry.grid(row=i, column=1, padx=10, pady=5, sticky="w")
self.interface_entries.append(entry)
button = tk.Button(self, text="添加", command=lambda index=i: self.add_router_entry(index))
button.grid(row=i, column=2, padx=10, pady=5)
lab = LabelFrame(self, text="示例")
lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20)
Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11").pack()
def create_router_table(self):
def on_right_click(event):
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
if row:
self.router_treeview.selection_set(row) # 选中该行
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
def delete_row():
selected_items = self.router_treeview.selection() # 获取选中的行
for item in selected_items:
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
self.router_obj.delete_config(ifs, network)
self.router_treeview.delete(item)
self.router_table_frame = tk.Frame(self)
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings")
self.router_treeview.heading("Interface", text="接口")
self.router_treeview.heading("Route", text="网段")
self.router_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
scrollbar.pack(side="right", fill="y")
self.router_treeview.configure(yscrollcommand=scrollbar.set)
self.router_table = self.router_obj.router_table
self.router_treeview.bind("<Button-3>", on_right_click)
# 创建删除菜单
delete_menu = tk.Menu(root, tearoff=False)
delete_menu.add_command(label="删除", command=delete_row)
self.update_router_table()
def add_router_entry(self, index):
entry_text = self.interface_entries[index].get()
try:
ipaddress.ip_network(entry_text)
if isinstance(self.router_obj, SimRouter):
if not validate_ip_address(entry_text):
messagebox.showerror("注意", message="添加的网段信息格式不合格")
self.interface_entries[index].delete(0, tk.END)
self.focus_set()
return
if entry_text:
if index + 1 in self.router_table:
self.router_table[index + 1].append(entry_text)
else:
self.router_table[index + 1] = [entry_text]
self.interface_entries[index].delete(0, tk.END)
self.router_obj.add_config(entry_text, index + 1)
self.update_router_table()
except:
messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12")
return
def update_router_table(self):
self.router_treeview.delete(*self.router_treeview.get_children())
for i, entrys in self.router_table.items():
for entry in entrys:
self.router_treeview.insert("", "end", values=(f"接口{i}", entry))
class SwitchConfigWindow(RouterConfigWindow):
def __init__(self, parent, router_obj):
super().__init__(parent, router_obj)
self.geometry("435x433+350+200")
self.title(f"{router_obj.ObjLabel}交换表配置")
self.router_obj = router_obj
self.interface_entries = []
self.router_table = {}
self.create_interface_inputs()
self.create_router_table()
def create_router_table(self):
def on_right_click(event):
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
if row:
self.router_treeview.selection_set(row) # 选中该行
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
def delete_row():
selected_items = self.router_treeview.selection() # 获取选中的行
for item in selected_items:
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
self.router_obj.delete_config(ifs, network)
self.router_treeview.delete(item)
self.router_table_frame = tk.Frame(self)
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings")
self.router_treeview.heading("Interface", text="接口")
self.router_treeview.heading("Route", text="mac")
self.router_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
scrollbar.pack(side="right", fill="y")
self.router_treeview.configure(yscrollcommand=scrollbar.set)
self.router_table = self.router_obj.mac_table
self.router_treeview.bind("<Button-3>", on_right_click)
# 创建删除菜单
delete_menu = tk.Menu(root, tearoff=False)
delete_menu.add_command(label="删除", command=delete_row)
self.update_router_table()
def add_router_entry(self, index):
entry_text = self.interface_entries[index].get()
if isinstance(self.router_obj, SimRouter):
if not validate_ip_address(entry_text):
messagebox.showerror("注意", message="添加的网段信息格式不合格")
self.interface_entries[index].delete(0, tk.END)
self.focus_set()
return
if entry_text:
if index + 1 in self.router_table:
self.router_table[index + 1].append(entry_text)
else:
self.router_table[index + 1] = [entry_text]
self.interface_entries[index].delete(0, tk.END)
self.router_obj.add_config(entry_text, index + 1)
self.update_router_table()
class NetWorkAnalog(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.master = master
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/路由器.png").resize((60, 60)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/交换机.png").resize((60, 60)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/集线器.png").resize((60, 60)))
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/主机.png").resize((60, 60)))
self.chose = self.host_img
self.AllSimObjs = {}
self.conns = []
self.drawLine = True # 画线标志
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
self.chose_obj = None
self.show_label_flag = True
self.show_interface_flag = True
self.create_widget()
def is_chose(self):
"""
当被选中时绘制选中框
:return:
"""
self.delete("rectangle")
self.create_rectangle(self.chose_obj.ObjX - 25, self.chose_obj.ObjY - 25, self.chose_obj.ObjX + 25,
self.chose_obj.ObjY + 25, outline="red", tags="rectangle")
def tag_bind_event(self):
"""
为每个子组件绑定事件
:return:
"""
def init():
"""
初始化方法将连接对象赋值为初始化值
:return:
"""
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
def show_menu(event, tag: SimBase):
"""
右击组件弹出选择接口框
:param event:
:param tag:
:return:
"""
menu = Menu(self, tearoff=0)
flag = False
for conn in tag.connections:
i = tag.connections.index(conn)
if not isinstance(conn, AllSimConnect):
menu.add_command(label=f"接口{i + 1}", command=lambda num=i + 1: interface_selected(num, tag))
flag = True
if not flag:
menu.add_command(label="暂无可用接口", state="disabled")
menu.post(event.x_root, event.y_root)
def interface_selected(interface, tag):
"""
选择接口回调
:param interface:
:return:
"""
if self.line_start_ifs is None:
self.line_start_ifs = interface
self.line_start_obj = tag
self.drawLine = True
self.bind("<Motion>", lambda event: right_motion(event))
self.bind("<Escape>", quit)
self.focus_set() # 获取焦点
else:
self.line_end_ifs = interface
self.line_end_obj = tag
self.delete("Line")
flag = False
if self.line_start_obj.ObjID == self.line_end_obj.ObjID:
messagebox.showerror("注意!", message="不能连接自己")
return
conn = AllSimConnect(self, self.line_start_obj, self.line_start_ifs, self.line_end_obj,
self.line_end_ifs)
for conn_obj in self.line_start_obj.connections: # 判断两个连接对象是否已经连接过了
if conn_obj == conn:
flag = True
if not flag:
self.line_start_obj.connections[self.line_start_ifs - 1] = conn
self.line_end_obj.connections[self.line_end_ifs - 1] = conn
conn.save()
self.conns.append(conn)
init()
self.unbind("<Motion>")
else:
del conn
self.delete("Line")
self.unbind("<Motion>")
init()
messagebox.showerror("注意!", message="已经连接过了")
def right_motion(event):
"""
移动鼠标
:param event:
:param tag:
:return:
"""
if self.drawLine:
self.delete("Line")
x, y = event.x, event.y
if not (150 < x < 650 and 40 < y < 490):
if x < 150:
x = 150
elif x > 650:
x = 650
if y < 40:
y = 40
elif y > 490:
y = 490
self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5",
width=1,
tags="Line")
return
self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5",
width=1, tags="Line")
def quit(event):
self.delete("Line") # 删除刚刚产生的连接线
self.drawLine = False # 关闭画线标志
init()
for tag_id, tag in self.AllSimObjs.items():
self.tag_bind(tag.ObjID, "<Button-3>", lambda event, tag=tag: show_menu(event, tag)) # 绑定右击事件
def bind_event(self, component, name):
# todo: 绑定事件
"""
绑定事件
:param component: 组件对象
:param name: 组件名称
"""
def move(event, name):
"""
鼠标左键松开事件
:param event: 事件对象
"""
if 170 < event.x < 630 and 60 < event.y < 470: # 在方框内,无变化
if name == "路由器":
tag = SimRouter(self, event.x, event.y)
elif name == "集线器":
tag = SimHub(self, event.x, event.y)
elif name == "交换机":
tag = SimSwitch(self, event.x, event.y)
else:
tag = SimHost(self, event.x, event.y)
self.AllSimObjs[tag.ObjID] = tag
tag.save()
self.tag_bind_event()
else:
self.delete("L")
def motion(event, name):
"""
鼠标拖动事件
:param event: 事件对象
"""
if name == "路由器":
self.chose = self.router_img
elif name == "集线器":
self.chose = self.hub_img
elif name == "交换机":
self.chose = self.switch_img
else:
self.chose = self.host_img
self.delete("L")
if 170 < event.x < 630 and 60 < event.y < 470: # 在方框内,无变化
pass
else: # 在方框外,显示禁止放置标识
self.create_oval(event.x - 10, event.y - 45, event.x + 10, event.y - 25, outline="red", width=2,
tags="L")
self.create_line(event.x - 7, event.y - 42, event.x + 8, event.y - 27, fill="red", width=2, tags="L")
self.create_image(event.x - 30, event.y - 30, image=self.chose, anchor="nw", tags="L")
self.tag_bind(component, "<ButtonRelease-1>", lambda event: move(event, name))
self.tag_bind(component, "<B1-Motion>", lambda event: motion(event, name))
def reload_data(self):
# todo: 加载上一次程序运行的数据
"""
加载上一次程序运行时的数据
:return:
"""
self.AllSimObjs = {}
self.conns = []
sim_obj_sql = "select * from sim_objs"
sim_obj_data = search(sim_obj_sql)
for index, row in sim_obj_data.iterrows(): # 初始化组件对象
sim_type = row["ObjType"]
ObjX = row["ObjX"]
ObjY = row["ObjY"]
ConfigCorrect = row["ConfigCorrect"]
ObjLable = row["ObjLabel"]
ObjID = row["ObjID"]
if sim_type == 1:
tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 2:
tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 3:
tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
else:
tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
self.AllSimObjs[tag.ObjID] = tag
sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id"
sim_conn_data = search(sim_conn_sql)
conn_datas = {}
for index, conn in sim_conn_data.iterrows():
if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"]))
for key, value in conn_datas.items():
conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1],
self.AllSimObjs[value[1][0]], value[1][1], key[1])
self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象
self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj
self.conns.append(conn_obj)
conn_obj.ConfigCorrect = key[1]
self.tag_bind_event()
def delete_obj(self):
# todo: 删除对象
"""
选中删除对象
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要删除的对象!")
return
ask = messagebox.askquestion(title='确认操作', message='确认删除该对象吗?')
if ask == "no":
return
self.delete(self.chose_obj.ObjID) # 删除图片
self.delete(self.chose_obj.ObjID + "text") # 删除标签
for conn in self.chose_obj.connections: # 删除该对象的所有连接线
if isinstance(conn, AllSimConnect):
conn.delete_line()
delete_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{self.chose_obj.ObjID}')"
execute_sql(delete_sql)
delete_obj(self.chose_obj.ObjID)
self.delete("rectangle")
self.reload_data()
def delete_line(self):
# todo: 删除连接线
"""
删除连接线
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要删除连接线的对象!")
return
conn_sql = f"""
select
s.conn_id, ConfigCorrect, node_id, node_ifs
from sim_conn s
join conn_config c on s.conn_id=c.conn_id
where node_id='{self.chose_obj.ObjID}'
"""
conn_data = search(conn_sql)
conn_names = {}
for index, conn in conn_data.iterrows():
if conn["conn_id"] not in conn_names:
conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"]))
child_d = tk.Toplevel()
child_d.title(f"{self.chose_obj.ObjLabel}的连接线配置")
child_d.geometry('300x200+450+200')
child_d.grab_set()
cv1 = Canvas(child_d)
cv1.pack()
value = StringVar()
combobox = ttk.Combobox(
master=child_d, # 父容器
height=12, # 高度,下拉显示的条目数量
width=15, # 宽度
state='readonly', # readonly(只可选)
font=('', 18), # 字体
textvariable=value, # 通过StringVar设置可改变的值
values=list(conn_names.keys()), # 设置下拉框的选项
)
def del_line():
if value.get() == "":
messagebox.showerror("注意", message="请选择需要删除的连接线!")
return
conn_sql = f"""
select
conn_id, node_id, node_ifs
from conn_config
where conn_id='{value.get()}'
"""
conn_data = search(conn_sql)
delete_sql = f"delete from sim_conn where conn_id='{value.get()}'"
execute_sql(delete_sql)
conn_names = {}
for index, conn in conn_data.iterrows():
if conn["conn_id"] not in conn_names:
conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"]))
for data in conn_names[value.get()]:
self.AllSimObjs[data[0]].connections[data[1] - 1].delete_line()
self.AllSimObjs[data[0]].connections[data[1] - 1] = None
child_d.destroy()
messagebox.showinfo("提示", f"连接线{value.get()}删除成功!")
btn_yes = tk.Button(child_d, text='确定', font=('黑体', 12), height=1, command=del_line)
btn_yes.place(x=240, y=20)
combobox.place(x=20, y=20)
def update_tag_name(self):
# todo: 更新组件名称
"""
更新组件名称
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要更新的对象!")
return
child1 = tk.Toplevel()
child1.title(self.chose_obj.ObjLabel + "的标签信息")
child1.geometry('240x100+450+250')
child1.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child1, text='原标签:' + self.chose_obj.ObjLabel,
font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w')
tk.Label(child1, text='新标签:', font=('黑体', 12)).grid(row=1, column=0, sticky='w') # ,sticky='w'靠左显示
new_name = tk.Entry(child1, font=('黑体', 12), textvariable=tk.StringVar())
new_name.grid(row=1, column=1)
def update_tag():
name = new_name.get()
if name == "":
messagebox.showerror("注意", message="请输入新名称!")
return
self.chose_obj.ObjLabel = name
self.chose_obj.update()
for conn in self.chose_obj.connections:
if isinstance(conn, AllSimConnect):
conn.update_info(self.chose_obj.ObjID)
child1.destroy()
messagebox.showinfo("提示", message="修改成功!")
tk.Button(child1, text='确定', font=('黑体', 10), height=1, command=update_tag).grid(row=2, column=0,
sticky='e')
tk.Button(child1, text='取消', font=('黑体', 10), height=1, command=child1.destroy).grid(row=2, column=1,
sticky='e')
def network_config(self):
# todo: 网络配置
"""
网络配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if len(self.chose_obj.get_config()) == 0:
messagebox.showerror("注意", message="请先给对象添加连接线!")
return
child_r = tk.Toplevel()
child_r.title(self.chose_obj.ObjLabel + "的网络配置信息")
child_r.geometry('530x395+350+200')
child_r.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
ifs_frame = tk.Frame(child_r)
ifs_frame.grid(row=1, column=0, columnspan=4) # 装接口的框架
ifs_frame_YN = tk.Frame(child_r)
ifs_frame_YN.grid(row=2, column=0, columnspan=4)
num = 0
datas = {}
for conn in self.chose_obj.connections:
if isinstance(conn, AllSimConnect):
index = self.chose_obj.connections.index(conn)
num_label = tk.LabelFrame(ifs_frame, text=f'接口{index + 1}')
num_label.grid(row=num, column=0, padx=18, ipady=5)
tk.Label(num_label, text='MAC:', font=('黑体', 10)).grid(row=0, column=0)
mac_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2, 3] else NORMAL)
mac_en.insert('0', str(self.chose_obj.interface[index].get("mac", "")))
mac_en.grid(row=0, column=1, padx=10)
tk.Label(num_label, text='IP:', font=('黑体', 10)).grid(row=1, column=0)
ip_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2] else NORMAL)
ip_en.insert('0', str(self.chose_obj.interface[index].get("ip", "")))
ip_en.grid(row=1, column=1, padx=10)
tk.Label(num_label, text='端口:', font=('黑体', 10)).grid(row=0, column=2)
port_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL)
port_en.insert('0', str(self.chose_obj.interface[index].get("conn_port", "")))
port_en.grid(row=0, column=3, padx=10)
tk.Label(num_label, text='应用层地址:', font=('黑体', 10)).grid(row=1, column=2)
addr_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL)
addr_en.insert('0', str(self.chose_obj.interface[index].get("addr", "")))
addr_en.grid(row=1, column=3, padx=10)
num += 1
datas[index + 1] = {"mac": mac_en, "ip": ip_en, "conn_port": port_en, "addr": addr_en}
num_label = tk.LabelFrame(ifs_frame, text=f'示例')
num_label.grid(row=num, column=0, padx=18, ipady=5)
tk.Label(num_label, text='MAC:', font=('黑体', 10)).grid(row=0, column=0)
mac_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar())
mac_en.insert('0', "MAC11")
mac_en.config(state=READONLY)
mac_en.grid(row=0, column=1, padx=10)
tk.Label(num_label, text='IP:', font=('黑体', 10)).grid(row=1, column=0)
ip_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar())
ip_en.insert(0, "10.1.1.10")
ip_en.config(state=READONLY)
ip_en.grid(row=1, column=1, padx=10)
tk.Label(num_label, text='端口:', font=('黑体', 10)).grid(row=0, column=2)
port_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar())
port_en.insert('0', "80")
port_en.config(state=READONLY)
port_en.grid(row=0, column=3, padx=10)
tk.Label(num_label, text='应用层地址:', font=('黑体', 10)).grid(row=1, column=2)
addr_en = tk.Entry(num_label, font=('黑体', 12), textvariable=tk.StringVar())
addr_en.insert('0', "10.1.2.10:10810:Name3")
addr_en.config(state=READONLY)
addr_en.grid(row=1, column=3, padx=10)
def commit():
self.chose_obj.config(datas)
child_r.destroy()
tk.Button(ifs_frame_YN, text='确定', font=('黑体', 10), height=1, command=commit).grid(row=0, column=0,
padx=20) # ,sticky="w"
tk.Button(ifs_frame_YN, text='取消', font=('黑体', 10), height=1,
command=child_r.destroy).grid(row=0, column=2, padx=20)
def router_table_config(self):
# todo: 路由表配置
"""
路由表配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if self.chose_obj.ObjType != 2:
messagebox.showerror("注意", message="请选择路由器对象!")
return
RouterConfigWindow(self, self.chose_obj)
def mac_table_config(self):
# todo: 交换表配置
"""
交换表配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if self.chose_obj.ObjType != 3:
messagebox.showerror("注意", message="请选择交换机对象!")
return
SwitchConfigWindow(self, self.chose_obj)
def show_label(self):
"""
显示/不显示标签
:return:
"""
self.show_label_flag = not self.show_label_flag
if self.show_label_flag:
for tag in self.AllSimObjs.values():
tag.create_img()
else:
for tag in self.AllSimObjs.values():
self.delete(tag.ObjID + "text")
def show_interface(self):
# todo: 显示/不显示接口
"""
显示/不显示接口
:return:
"""
self.show_interface_flag = not self.show_interface_flag
if self.show_interface_flag:
for conn in self.conns:
conn.draw_line()
else:
for conn in self.conns:
self.delete(conn.NobjS.ObjID + conn.NobjE.ObjID)
def show_network_config(self):
# todo: 显示网络配置信息
"""
显示网络配置信息
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
self.delete("netSet")
self.create_text(740, 120, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 10, 'bold'),
fill="#7030a0", tags="netSet")
self.create_text(675, 145, text=self.chose_obj,
anchor="nw", font=('宋体', 11), tags="netSet")
def show_router_config(self):
# todo: 显示路由表交换表信息
"""
显示路由交换表信息
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
if self.chose_obj.ObjType != 2 and self.chose_obj.ObjType != 3:
messagebox.showerror("注意", message="请选择路由器/交换机对象!")
return
self.delete("routerSet")
self.create_text(905, 120, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 10, 'bold'),
fill="#7030a0", tags="routerSet")
self.create_text(835, 145, text=self.chose_obj.get_table_config(),
anchor="nw", font=('宋体', 11), tags="routerSet")
def send_packet(self):
"""
发送数据包
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
if self.chose_obj.ConfigCorrect != 1:
messagebox.showerror("注意", message="请先对选择对象进行网络配置!")
return
if self.chose_obj.ObjType != 1:
messagebox.showerror("注意", message="请选择主机对象!")
return
child2 = tk.Toplevel()
child2.title("数据包配置")
child2.geometry('240x100+450+250')
child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child2, text='目的IP:', font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w')
packet_ip = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_ip.grid(row=0, column=1)
tk.Label(child2, text='目的MAC:', font=('黑体', 12)).grid(row=1, column=0, sticky='w') # ,sticky='w'靠左显示
packet_mac = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_mac.grid(row=1, column=1)
tk.Label(child2, text='消息:', font=('黑体', 12)).grid(row=2, column=0, sticky='w') # ,sticky='w'靠左显示
packet_message = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_message.grid(row=2, column=1)
def send():
"""
发送数据包
:return:
"""
if packet_ip.get() == "":
messagebox.showerror("注意", message="ip地址不能为空")
return
if not validate_ip_address(packet_ip.get()):
messagebox.showerror("注意", message="IP地址不规范")
return
if packet_mac.get() == "":
messagebox.showerror("注意", message="mac地址不能为空")
return
if packet_message.get() == "":
messagebox.showerror("注意", message="消息不能为空!")
return
self.chose_obj.create_packet(packet_ip.get(),
packet_mac.get(),
packet_message.get())
child2.destroy()
tk.Button(child2, text='确定', font=('黑体', 10), height=1, command=send).grid(row=3, column=0, sticky='e')
tk.Button(child2, text='取消', font=('黑体', 10), height=1, command=child2.destroy).grid(row=3, column=1,
sticky='e')
def send_packet_list(self):
"""
批量发送数据包
:return:
"""
hosts = {}
for tag in self.AllSimObjs.values():
if tag.ObjType == 1 and tag.ConfigCorrect == 1:
hosts[tag.ObjLabel] = tag.ObjID
child2 = tk.Toplevel()
child2.title("批量数据包配置")
child2.geometry('400x420+450+200')
tk.Label(child2, text='目的IP:', font=('黑体', 12)).grid(row=0, column=0, columnspan=2, sticky='w', ipady=10)
packet_ip = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_ip.grid(row=0, column=1, pady=5)
tk.Label(child2, text='目的MAC:', font=('黑体', 12)).grid(row=1, column=0, sticky='w',
pady=5) # ,sticky='w'靠左显示
packet_mac = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_mac.grid(row=1, column=1, pady=5)
tk.Label(child2, text='消息:', font=('黑体', 12)).grid(row=2, column=0, sticky='w', pady=5) # ,sticky='w'靠左显示
packet_message = tk.Entry(child2, font=('黑体', 12), textvariable=tk.StringVar())
packet_message.grid(row=2, column=1, pady=5)
host = StringVar()
tk.Label(child2, text='发送主机:', font=('黑体', 12)).grid(row=3, column=0, sticky='w',
pady=5) # ,sticky='w'靠左显示
combobox = ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=12, # 宽度
state='readonly', # readonly(只可选)
font=('', 18), # 字体
textvariable=host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
)
combobox.grid(row=3, column=1, pady=5)
packet_frame = tk.Frame(child2)
packet_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
packet_treeview = ttk.Treeview(packet_frame, columns=("source_host", "ip", "mac", "message"), show="headings")
packet_treeview.heading("source_host", text="发送主机")
packet_treeview.column("source_host", width=60, anchor=CENTER)
packet_treeview.heading("ip", text="IP")
packet_treeview.column("ip", width=120, anchor=CENTER)
packet_treeview.heading("mac", text="MAC")
packet_treeview.column("mac", width=60, anchor=CENTER)
packet_treeview.heading("message", text="消息")
packet_treeview.column("message", width=120, anchor=CENTER)
packet_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(packet_frame, orient="vertical", command=packet_treeview.yview)
scrollbar.pack(side="right", fill="y")
packet_treeview.configure(yscrollcommand=scrollbar.set)
packets = []
def add():
ip = packet_ip.get()
mac = packet_mac.get()
message = packet_message.get()
chose_host = host.get()
if ip == "" or mac == "" or message == "" or chose_host == "":
messagebox.showerror("注意", message="输入框不能为空")
return
packet = SimPacket(self.AllSimObjs[hosts[chose_host]].interface[0]["ip"],
self.AllSimObjs[hosts[chose_host]].interface[0]["mac"],
ip, mac, message)
packets.append((self.AllSimObjs[hosts[chose_host]], packet))
packet_treeview.delete(*packet_treeview.get_children())
for datas in packets:
tag: SimBase = datas[0]
packet_data: SimPacket = datas[1]
packet_treeview.insert("", "end", values=(
tag.ObjLabel, packet_data.destination_ip, packet_data.destination_mac, packet_data.message))
def send():
if len(packets) == 0:
messagebox.showerror("注意", message="请至少添加一条数据包!")
return
for data in packets:
threading.Thread(target=data[0].send, args=(data[1],)).start()
child2.destroy()
tk.Button(child2, text="添加数据包", font=('黑体', 12), height=3, command=add).grid(row=0, column=2, rowspan=4)
button_frame = Frame(child2)
tk.Button(button_frame, text="发送", font=('黑体', 12), height=1, command=send).pack(side=RIGHT, padx=20)
tk.Button(button_frame, text="取消", font=('黑体', 12), height=1, command=child2.destroy).pack(side=RIGHT)
button_frame.grid(row=6, column=0, columnspan=3)
def clear_canvas(self):
"""
清除画布
:return:
"""
ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?')
if ask == "no":
return
truncate_db() # 清除数据库
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
def create_widget(self):
# todo: 创建页面
"""
创建整体页面布局
:return:
"""
self.create_rectangle(150, 40, 650, 490, outline="#7f6000", width=3) # 矩形框,左上角坐标,右下角坐标
self.create_rectangle(660, 100, 815, 400, outline="#ffff00", width=3, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标
self.create_text(735, 105, text="网络配置信息", anchor="n", font=('微软雅黑', 11, 'bold'))
self.create_rectangle(825, 100, 1000, 400, outline="#ffff00", width=3, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标
self.create_text(915, 105, text="路由/交换表信息", anchor="n", font=('微软雅黑', 11, 'bold'))
# 显示左边的固定图片
self.create_text(80 - 55, 120 + 15, text="路由器", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字
router = self.create_image(80, 120, image=self.router_img, anchor="nw")
self.create_text(80 - 55, 190 + 15, text="交换机", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字
switch = self.create_image(80, 190, image=self.switch_img, anchor="nw")
self.create_text(80 - 55, 260 + 15, text="集线器", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字
hub = self.create_image(80, 260, image=self.hub_img, anchor="nw")
self.create_text(80 - 55, 330 + 15, text="主机", anchor="nw", font=('微软雅黑', 14, 'bold')) # 显示文字
host = self.create_image(80, 330, image=self.host_img, anchor="nw")
self.bind_event(router, "路由器")
self.bind_event(switch, "交换机")
self.bind_event(hub, "集线器")
self.bind_event(host, "主机")
# 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方
menubar = tk.Menu(root)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='删除与修改', menu=setMenu)
setMenu.add_command(label='删除对象', command=self.delete_obj)
setMenu.add_command(label='删除连接线', command=self.delete_line)
setMenu.add_command(label='修改标签', command=self.update_tag_name)
# # 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='基础配置', menu=setMenu)
setMenu.add_command(label='网络配置', command=self.network_config)
setMenu.add_command(label='路由表配置', command=self.router_table_config)
setMenu.add_command(label='交换表配置', command=self.mac_table_config)
root.config(menu=menubar)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='显示设置', menu=setMenu)
setMenu.add_command(label='显示/不显示标签', command=self.show_label)
setMenu.add_command(label='显示/不显示接口', command=self.show_interface)
setMenu.add_command(label='显示网络配置信息', command=self.show_network_config)
setMenu.add_command(label='显示路由/交换表信息', command=self.show_router_config)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='发送数据包', menu=setMenu)
setMenu.add_command(label='发送数据包', command=self.send_packet)
setMenu.add_command(label='批量发送数据包', command=self.send_packet_list)
menubar.add_command(label='清除屏幕', command=self.clear_canvas)
root.config(menu=menubar)
self.reload_data()
if __name__ == '__main__':
root = Window()
root.title('网络拓扑图')
width = 1030
height = 530 # 窗口大小
screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小
screen_height = root.winfo_screenheight()
x = int((screen_width - width) / 2)
y = int((screen_height - height) / 2) - 40
size = '{}x{}+{}+{}'.format(width, height, x, y)
canvas = NetWorkAnalog(root, width=1030, heigh=height, bg="white")
canvas.place(x=0, y=0, anchor='nw')
root.geometry(size)
root.mainloop()

@ -1,757 +0,0 @@
import math
import sys
import threading
from time import sleep
from ttkbootstrap import *
from uuid import uuid4
import ipaddress
from PIL import ImageTk, Image
from dbUtil import search, execute_sql
class SimBase():
# todo: 组件父类
"""
图标类所有组件的父类
"""
def __init__(self, canvas: Canvas, x, y, id, config=None, label=None):
self.ConfigCorrect = 0 if config is None else config # 是否进行配置
self.ObjType = None # 组件类型 1->主机2->路由器3->交换机4->集线器
self.ObjLabel = ""
self.ObjID = id
self.ObjX = x
self.ObjY = y
self.canvas = canvas
self.img = None
self.img_tm = None
self.interface = [{}, {}, {}, {}]
self.connections = ["1", "2", "3", "4"]
self.set_default_config()
def bind_event(self):
self.canvas.tag_bind(self.ObjID, "<Button-1>", self.start_drag)
self.canvas.tag_bind(self.ObjID, "<B1-Motion>", self.drag)
self.canvas.tag_bind(self.ObjID, "<ButtonRelease-1>", self.release)
def set_default_name(self):
data_frame = search(f"select objid from sim_objs where objType={self.ObjType}")
num = data_frame.size + 1
if isinstance(self, SimHost):
name = "SHO%d" % num
elif isinstance(self, SimRouter):
name = "SRO%d" % num
elif isinstance(self, SimSwitch):
name = "SWI%d" % num
else:
name = "SHUB%d" % num
return name
def release(self, event):
if self.click_x == event.x and self.click_y == event.y: # 鼠标左键单击
self.canvas.chose_obj = self
self.canvas.is_chose()
else:
self.update()
def start_drag(self, event):
self.canvas.tag_raise(self.ObjID) # 将 SimBase 组件置于最上层
self.start_x = event.x
self.start_y = event.y
self.click_x = event.x
self.click_y = event.y
def drag(self, event):
"""
移动图标
:param event:
:return:
"""
self.canvas.delete("rectangle")
self.canvas.chose_obj = None
dx = event.x - self.start_x
dy = event.y - self.start_y
# 移动范围限制, 超出移动范围则直接返回
if not (170 <= self.ObjX + dx <= 630 and 60 <= self.ObjY + dy <= 470):
return
self.ObjX += dx
self.ObjY += dy
self.canvas.move(self.ObjID, dx, dy) # 移动 SimBase 组件
self.canvas.move(self.ObjID + "text", dx, dy) # 移动 SimBase 组件
self.start_x = event.x
self.start_y = event.y
for conn in self.connections:
if isinstance(conn, AllSimConnect):
conn.update_line()
def create_img(self):
"""
创建图片
:return:
"""
self.canvas.delete("L")
id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30,
image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw",
tags=self.ObjID)
self.canvas.dtag("L", "L")
self.canvas.create_text(self.ObjX, self.ObjY - 40, text=self.ObjLabel, font=("", 12, "bold"),
fill="#7030a0", tags=self.ObjID + "text", anchor="nw")
self.canvas.tag_raise(id)
self.bind_event()
def config(self, interface):
"""
网络配置方法
:param interface: 传入配置数据
:return:
"""
for key, value in interface.items():
self.interface[key - 1] = {key: value.get() if not value.get() == "" else "NULL" for key, value in value.items()}
self.ConfigCorrect = 1
self.create_img()
for conn in self.connections:
if isinstance(conn, AllSimConnect):
index = self.connections.index(conn) + 1
conn.update_node_config(self, index)
self.update()
def set_default_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
for index, conn in conn_data.iterrows():
self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"],
"mac": conn["mac"],
"conn_port": conn["conn_port"],
"addr": conn["addr"]}
def get_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
return conn_data
def save(self):
"""
将对象存储至mysql当中
:return:
"""
sql = f"insert into sim_objs values ('{self.ObjID}', {self.ObjType}, '{self.ObjLabel}'," \
f"{self.ObjX}, {self.ObjY}, {self.ConfigCorrect})"
execute_sql(sql)
def update(self):
"""
当坐标发生改变时修改数据库数据
:return:
"""
self.canvas.delete(self.ObjID + "text")
self.canvas.create_text(self.ObjX, self.ObjY - 40, text=self.ObjLabel, font=("", 12, "bold"),
fill="#7030a0", tags=self.ObjID + "text", anchor="nw")
sql = f"update sim_objs set objLabel='{self.ObjLabel}', ObjX={self.ObjX}," \
f"ObjY={self.ObjY}, ConfigCorrect={self.ConfigCorrect} where ObjID='{self.ObjID}'"
execute_sql(sql)
def transfer_animate(self, status, packet, error_message=None):
if status:
text = f"目的IP: {str(packet.destination_ip)}\n" \
f"目的MAC: {packet.destination_mac}\n" \
f"消息内容: {packet.message}"
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20, outline="#92d050",
width=3, fill="#92d050", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
else:
text = f"传输失败\n" if error_message is None else error_message
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red",
width=3, fill="red", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimPacket():
# todo: 数据包类
"""
数据包类
"""
def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message):
"""
:param source_mac: 源主机mac地址
:param source_ip: 源主机ip地址
:param destination_mac: 目的主机mac地址
:param destination_ip: 目的主机ip地址
:param message: 数据
"""
self.source_mac = source_mac
self.source_ip = source_ip
self.destination_mac = destination_mac
self.destination_ip = destination_ip
self.message = message
self.up_jump = None # 上一跳连接对象
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30)))
self.id = str(uuid4())
def move(self, cv, target_x, target_y, duration):
start_x, start_y = cv.coords(self.id)
distance_x = target_x - start_x
distance_y = target_y - start_y
steps = duration // 10 # 以10毫秒为间隔进行移动
step_x = distance_x / steps
step_y = distance_y / steps
self._move_step(cv, start_x, start_y, target_x, target_y, step_x, step_y, steps)
def _move_step(self, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps):
if steps > 0:
new_x = start_x + step_x
new_y = start_y + step_y
cv.coords(self.id, new_x, new_y)
cv.update() # 更新画布显示
sleep(0.01) # 添加延迟以控制动画速度
self._move_step(cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1)
else:
cv.coords(self.id, target_x, target_y)
cv.delete(self.id)
def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase):
cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id)
self.move(cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500)
class AllSimConnect():
# todo: 连接类
def __init__(self, canvas: Canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None):
"""
连接对象
:param nodex: 节点
:param nodex_ifs: 节点接口
:param nodey: 节点
:param nodey_ifs: 节点接口
"""
self.canvas = canvas
self.ConfigCorrect = 0 if config is None else config
self.NobjS = nodex
self.NobjE = nodey
self.IfsS = nodex_ifs
self.IfsE = nodey_ifs
self.width = 1 if self.ConfigCorrect == 0 else 4 # 线的粗细
self.draw_line()
def is_connected_to(self, other):
"""
判断两个连接对象是否已经连接
:param other: 另一个连接对象
:return: 如果已连接则返回True否则返回False
"""
return (
self.NobjS.ObjID == other.NobjS.ObjID
and self.NobjE.ObjID == other.NobjE.ObjID
)
def update_node_config(self, node, ifs):
"""
当节点进行网络配置后将节点的接口配置保存
:return:
"""
try:
nodex_update_sql = f"""
update conn_config set
ip='{node.interface[ifs - 1]["ip"]}',
mac='{node.interface[ifs - 1]["mac"]}',
conn_port='{node.interface[ifs - 1]["conn_port"]}',
addr='{node.interface[ifs - 1]["addr"]}'
where node_id='{node.ObjID}' and node_ifs={ifs}
"""
execute_sql(nodex_update_sql)
self.ConfigCorrect = 1
self.check_config()
update_sql = f"""
update sim_conn set ConfigCorrect={self.ConfigCorrect}
where conn_id in (
select distinct conn_id from conn_config where node_id='{node.ObjID}'
)
"""
execute_sql(update_sql)
except Exception as E:
pass
def check_config(self):
"""
检查两边节点的配置是否正确
:return:
"""
if self.NobjS.interface[self.IfsS - 1]["mac"] != "" and self.NobjS.interface[self.IfsS - 1][
"ip"] != "" \
and self.NobjE.interface[self.IfsE - 1]["mac"] != "" and self.NobjE.interface[self.IfsE - 1][
"ip"] != "":
self.width = 4
self.draw_line()
else:
self.width = 1
self.draw_line()
def transfer(self, source_node, packet: SimPacket):
"""
传输数据
:param packet: 数据包
:return:
"""
if source_node == self.NobjS:
packet.transfer_packet(self.canvas, self.NobjS, self.NobjE)
self.NobjE.receive(packet)
else:
packet.transfer_packet(self.canvas, self.NobjE, self.NobjS)
self.NobjS.receive(packet)
def draw_line(self):
line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY,
width=self.width, fill="#5b9bd5", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line")
self.canvas.tag_lower(line)
self.analyseIFS(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjS.ObjLabel, self.IfsS,
self.NobjE.ObjX, self.NobjE.ObjY, self.NobjE.ObjLabel, self.IfsE)
def update_line(self):
"""
当组件移动时重新绘制线
:return:
"""
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line")
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID)
self.draw_line()
def update_info(self, obj_id):
"""
修改数据库中的连接信息
:return:
"""
update_sql = f"""
update sim_conn set conn_id='{self.NobjS.ObjLabel}-{self.NobjE.ObjLabel}', ConfigCorrect={self.ConfigCorrect}
where conn_id in (
select distinct conn_id from conn_config where node_id='{obj_id}'
)
"""
execute_sql(update_sql)
def analyseIFS(self, SX, SY, SLabel, IfsS, EX, EY, ELabel, IfsE): # NobjS的x,y;NobjE的x,y; #分析接口在哪个位置,再显示
'''
:param SX: S对象的x坐标
:param SY: S对象的y坐标
:param SLabel: S对象的标签
:param IfsS: S对象要显示的接口号
'''
if (EX - SX) == 0: # 即垂直的时候,NobjS在NobjE对象的正上方和正下方x坐标无变化只需要y变化
R = 18
x = 0 # x无偏移量
y = R
else:
k = (EY - SY) / (EX - SX) # ey-sy/sx-ex
reat = math.atan(k) # 根据斜率计算弧度
# 连线与图标交接点坐标
R = 18
x = abs(math.cos(reat) * R) + 6 # python math中三角函数中的数值是弧度而计算器中的数值是角度
y = abs(math.sin(reat) * R) + 6
if SX <= EX and SY <= EY: # NobjS在NobjE的左上角NobjS的右下角连接NobjE的左上角
x_S = SX + x
y_S = SY + y # NobjS的连接点坐标
x_E = EX - x
y_E = EY - y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text((x_S - 5, y_S - 5), text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E - 10, y_E - 10, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX < EX and SY > EY: # NobjS在NobjE的左下角NobjS的右上角连接NobjE的左下角
x_S = SX + x
y_S = SY - y # NobjS的连接点坐标
x_E = EX - x
y_E = EY + y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S + 5, y_S - 10, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E - 5, y_E, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX > EX and SY < EY: # NobjS在NobjE的右上角NobjS的左下角连接NobjE的右上角
x_S = SX - x
y_S = SY + y # NobjS的连接点坐标
x_E = EX + x
y_E = EY - y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S - 5, y_S, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E + 5, y_E, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX >= EX and SY >= EY: # NobjS在NobjE的右下角NobjS的左上角连接NobjE的右下角
x_S = SX - x
y_S = SY - y # NobjS的连接点坐标
x_E = EX + x
y_E = EY + y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S - 5, y_S - 15, text=IfsS, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E - 5, y_E - 5, text=IfsE, anchor="nw", font=("幼圆", 12, "bold"), fill="red",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
def save(self):
"""
将连接对象保存至数据库
:return:
"""
conn_id = self.NobjS.ObjLabel + "-" + self.NobjE.ObjLabel
sql = f"insert into sim_conn values ('{conn_id}', {self.ConfigCorrect})"
execute_sql(sql)
execute_sql(
f"insert into conn_config values ('{conn_id}', '{self.NobjS.ObjID}', {self.IfsS}, '', '', '', '')")
execute_sql(
f"insert into conn_config values ('{conn_id}', '{self.NobjE.ObjID}', {self.IfsE}, '', '', '', '')")
def delete_line(self):
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID)
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line")
def __eq__(self, other):
"""
重写equals方法判断两个连接对象是否相同
:param other: 连接对象
:return: Boolean
"""
if isinstance(other, AllSimConnect):
other_ids = other.NobjS.ObjID + other.NobjE.ObjID
return (self.NobjS.ObjID in other_ids
and self.NobjE.ObjID in other_ids)
return False
class SimHost(SimBase):
# todo: 主机类
"""
主机类
"""
def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 1
self.ObjLabel = label if label is not None else self.set_default_name()
self.interface = [{}]
self.connections = [None]
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/主机.png").resize((60, 60)))
self.img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/主机_tm.png").resize((60, 60)))
self.set_default_config()
self.create_img()
def create_packet(self, ip, mac, message):
"""
创建数据包
:param ip: 目的主机ip
:param mac: 目的主机mac
:param message: 消息
:return:
"""
packet = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], ip, mac, message)
print(f"创建数据包成功,数据包由{packet.source_ip} 发往 {packet.destination_ip}")
self.send(packet)
def send(self, packet):
"""
发送数据包
:param packet:
:return:
"""
connection: AllSimConnect = self.connections[0]
print(f"数据包从 {self.ObjLabel} 发出")
packet.up_jump = connection
connection.transfer(self, packet)
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"主机{self.ObjLabel}接受到数据{packet.message}")
if packet.destination_ip == self.interface[0]["ip"]:
self.transfer_animate(True, packet)
else:
self.transfer_animate(False, packet)
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"AppAddr: /Root\n"
str += f"PORT: {data['conn_port']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimRouter(SimBase):
# todo: 路由类
"""
路由类
"""
def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 2
self.ObjLabel = label if label is not None else self.set_default_name()
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/路由器.png").resize((60, 60)))
self.img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/路由器_tm.png").resize((60, 60)))
self.create_img()
self.router_table = {}
self.set_default_router_table()
def set_default_router_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from router_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.router_table:
self.router_table[router_table["node_ifs"]].append(router_table["segment"])
else:
self.router_table[router_table["node_ifs"]] = [router_table["segment"]]
def check_destination_ip(self, destination_ip, network):
"""
检查目标ip是否属于网段范围内
:param destination_ip: 目标ip
:param network: 网段
:return:10.2.3.0/24
"""
ip = ipaddress.ip_address(destination_ip)
network = ipaddress.ip_network(network)
if ip in network:
return True
if network == "0.0.0.0/24": # 如果网段为0.0.0.0/24 则为默认路由
return True
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
flag = False
next_hop_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
if conn == packet.up_jump:
continue
ifs = self.connections.index(conn) + 1
for network in self.router_table[ifs]:
if self.check_destination_ip(packet.destination_ip, network):
flag = True
next_hop_ifs = ifs
if flag:
conn = self.connections[next_hop_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
else:
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.NobjS != self:
if conn.NobjE.ObjType == 1:
conn.transfer(self, packet)
break
error_message = "路由寻址失败"
self.transfer_animate(False, packet, error_message)
def receive(self, packet):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"{self.ObjLabel}-路由器接受到数据{packet.message}")
self.transmit(packet)
def add_config(self, router, router_ifs):
sql = f"insert into router_table values ('{self.ObjID}', {router_ifs}, '{router}')"
execute_sql(sql)
def delete_config(self, ifs, network):
sql = f"delete from router_table where obj_id='{self.ObjID}' and node_ifs={ifs} and segment='{network}'"
execute_sql(sql)
def get_table_config(self):
"""
返回对象的路由表配置信息,用于展示
:return:
"""
str = ""
sql = f"select * from router_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router in router_tables.iterrows():
str += f"网段号: {router['segment']}\n"
str += f"接口号: {router['node_ifs']}\n"
return str
class SimSwitch(SimBase):
# todo: 交换机类
"""
交换机类
"""
def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 3
self.ObjLabel = label if label is not None else self.set_default_name()
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/交换机.png").resize((60, 60)))
self.img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/交换机_tm.png").resize((60, 60)))
self.create_img()
self.mac_table = {}
self.set_default_mac_table()
def set_default_mac_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from mac_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.mac_table:
self.mac_table[router_table["node_ifs"]].append(router_table["mac"])
else:
self.mac_table[router_table["node_ifs"]] = [router_table["mac"]]
def add_config(self, router, router_ifs):
sql = f"insert into mac_table values ('{self.ObjID}', {router_ifs}, '{router}')"
execute_sql(sql)
def delete_config(self, ifs, mac):
sql = f"delete from mac_table where obj_id='{self.ObjID}' and node_ifs={ifs} and mac='{mac}'"
execute_sql(sql)
def get_table_config(self):
"""
返回对象的交换表配置信息,用于展示
:return:
"""
str = ""
sql = f"select * from mac_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router in router_tables.iterrows():
str += f"网段号: {router['mac']}\n"
str += f"接口号: {router['node_ifs']}\n"
return str
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
flag = False
next_hub_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
ifs = self.connections.index(conn) + 1
if packet.destination_mac in self.mac_table.get(ifs, []):
flag = True
next_hub_ifs = ifs
if flag:
conn = self.connections[next_hub_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
return
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"交换机{self.ObjLabel}接受到数据{packet.message}")
self.transmit(packet)
class SimHub(SimBase):
# todo: 集线器类
"""
集线器类
"""
def __init__(self, canvas: Canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 4
self.ObjLabel = label if label is not None else self.set_default_name()
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/集线器.png").resize((60, 60)))
self.img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/datas/images/集线器_tm.png").resize((60, 60)))
self.create_img()
def transmit(self, packet: SimPacket):
"""
集线器转发数据包
:return:
"""
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!")
self.transmit(packet)

@ -1,104 +0,0 @@
import sqlite3
import sys
import pandas as pd
from pandas import DataFrame
conn = sqlite3.connect(sys.path[0]+"/network.db")
def execute_sql(sql):
"""
执行sql语句
:param sql:
:return:
"""
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
def search(sql) -> DataFrame:
return pd.read_sql(sql, conn)
def delete_obj(obj_id):
cursor = conn.cursor()
delete_obj_sql = f"delete from sim_objs where ObjID='{obj_id}'"
cursor.execute(delete_obj_sql)
delete_conn_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{obj_id}')"
cursor.execute(delete_conn_sql)
conn.commit()
def truncate_db():
init_database()
def init_database():
cursor = conn.cursor()
cursor.execute("""
DROP TABLE IF EXISTS `conn_config`;
""")
cursor.execute("""
CREATE TABLE `conn_config` (
`conn_id` varchar(55) NULL DEFAULT NULL,
`node_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`ip` varchar(55) NULL DEFAULT NULL,
`mac` varchar(128) NULL DEFAULT NULL,
`conn_port` varchar(32) NULL DEFAULT NULL,
`addr` varchar(255) NULL DEFAULT NULL,
CONSTRAINT `conn_config_sim_conn_conn_id_fk` FOREIGN KEY (`conn_id`) REFERENCES `sim_conn` (`conn_id`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `mac_table`;
""")
cursor.execute("""
CREATE TABLE `mac_table` (
`obj_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`mac` varchar(55) NULL DEFAULT NULL,
CONSTRAINT `mac_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `router_table`;
""")
cursor.execute("""
CREATE TABLE `router_table` (
`obj_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`segment` varchar(55) NULL DEFAULT NULL,
CONSTRAINT `router_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `sim_conn`;
""")
cursor.execute("""
CREATE TABLE `sim_conn` (
`conn_id` varchar(255) NOT NULL ,
`ConfigCorrect` int(0) NULL DEFAULT NULL ,
PRIMARY KEY (`conn_id`)
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `sim_objs`;
""")
cursor.execute("""
CREATE TABLE `sim_objs` (
`ObjID` varchar(50) NOT NULL,
`ObjType` int(0) NULL DEFAULT NULL,
`ObjLabel` varchar(20) NULL DEFAULT NULL,
`ObjX` int(0) NULL DEFAULT NULL,
`ObjY` int(0) NULL DEFAULT NULL,
`ConfigCorrect` int(0) NULL DEFAULT NULL,
PRIMARY KEY (`ObjID`)
) ;
""")
conn.commit()
if __name__ == '__main__':
init_database()

Binary file not shown.

@ -1,19 +0,0 @@
from tkinter import *
import platform
def get_platform():
import platform
sys_platform = platform.platform().lower()
if "windows" in sys_platform:
print("Windows")
elif "macos" in sys_platform:
print("Mac os")
elif "linux" in sys_platform:
print("Linux")
else:
print("其他系统")
if __name__ == "__main__":
get_platform()
Loading…
Cancel
Save