You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

945 lines
45 KiB

5 months ago
import ipaddress
import sys
import threading
from tkinter import filedialog
from SimUtil import *
from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase
from dbUtil import search, execute_sql, delete_obj, truncate_db
from PIL import Image as imim
class Message(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.message = []
self.master = master
self.scrollbar = tk.Scrollbar(master, orient="vertical", command=self.yview)
self.scrollbar.pack(side="right", fill=BOTH)
self.config(yscrollcommand=self.scrollbar.set)
def show_message(self, message, color="#5fa8fe"):
self.message.append({"message": message, "color": color})
num = 0
self.delete("message")
for function in self.message:
self.create_text(20, 25 * num + 10, anchor="nw", text=function["message"], font=("", 15),
fill=function["color"], tags="message")
num += 1
self.update()
self.configure(scrollregion=self.bbox("all"))
self.scrollbar.set(1.0, 1.0)
self.yview_moveto(1.0)
class NetWorkAnalog(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.master = master
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40)))
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
self.pack_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30)))
self.true_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/true.png").resize((30, 30)))
self.false_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/false.png").resize((30, 30)))
self.width = int(self.cget("width"))
self.height = int(self.cget("height"))
self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)]
self.label_top_img = ImageTk.PhotoImage(imim.open("../datas/images/右上框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.4))))
self.label_bottom_img = ImageTk.PhotoImage(imim.open("../datas/images/右下框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.45))))
self.message_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/日志消息展示框@2x.png").resize((self.width - 60, self.height - self.canvas_size[3] - 30))
)
message_frame = Frame(self, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW)
self.message = Message(message_frame, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
self.message.pack()
self.message.configure(bg="#edf8ff")
self.configure(bg="#ddeafe")
self.filename = ImageTk.PhotoImage(imim.open("../datas/images/背景@2x.png").resize((self.width, self.canvas_size[3])))
self.check_filename = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height)))
self.create_image(0, 0, image=self.filename, anchor=NW)
self.chose = self.host_img
self.AllSimObjs = {}
self.receive = None
self.conns = []
self.drawLine = True # 画线标志
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
self.chose_obj = None
self.show_label_flag = True
self.show_interface_flag = True
self.check_error_obj = None
self.create_widget()
def is_chose(self):
"""
当被选中时绘制选中框
:return:
"""
self.delete("rectangle")
self.create_rectangle(self.chose_obj.ObjX - 35, self.chose_obj.ObjY - 35, self.chose_obj.ObjX + 15,
self.chose_obj.ObjY + 15, outline="red", tags="rectangle")
self.show_network_config()
if self.chose_obj.ObjType == 2 or self.chose_obj.ObjType == 3:
self.show_router_config()
def tag_bind_event(self):
"""
为每个子组件绑定事件
:return:
"""
def init():
"""
初始化方法将连接对象赋值为初始化值
:return:
"""
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
def show_menu(event, tag: SimBase):
"""
右击组件弹出选择接口框
:param event:
:param tag:
:return:
"""
menu = Menu(self, tearoff=0)
flag = False
for conn in tag.connections:
i = tag.connections.index(conn)
if not isinstance(conn, AllSimConnect):
menu.add_command(label=f"接口{i + 1}", command=lambda num=i + 1: interface_selected(num, tag))
flag = True
if not flag:
menu.add_command(label="暂无可用接口", state="disabled")
menu.post(event.x_root, event.y_root)
def interface_selected(interface, tag):
"""
选择接口回调
:param interface:
:return:
"""
if self.line_start_ifs is None:
self.line_start_ifs = interface
self.line_start_obj = tag
self.drawLine = True
self.bind("<Motion>", lambda event: right_motion(event))
self.bind("<Escape>", quit)
self.focus_set() # 获取焦点
else:
self.line_end_ifs = interface
self.line_end_obj = tag
self.delete("Line")
flag = False
if self.line_start_obj.ObjID == self.line_end_obj.ObjID:
messagebox.showerror("注意!", message="不能连接自己")
return
conn = AllSimConnect(self, self.line_start_obj, self.line_start_ifs, self.line_end_obj,
self.line_end_ifs)
conn.draw_line()
for conn_obj in self.line_start_obj.connections: # 判断两个连接对象是否已经连接过了
if conn_obj == conn:
flag = True
if not flag:
self.line_start_obj.connections[self.line_start_ifs - 1] = conn
self.line_end_obj.connections[self.line_end_ifs - 1] = conn
self.message.show_message(f"{self.line_start_obj.ObjLabel}连接{self.line_end_obj.ObjLabel}成功!")
conn.save()
self.conns.append(conn)
init()
self.unbind("<Motion>")
else:
del conn
self.delete("Line")
self.unbind("<Motion>")
init()
messagebox.showerror("注意!", message="已经连接过了")
def right_motion(event):
"""
移动鼠标
:param event:
:param tag:
:return:
"""
if self.drawLine:
self.delete("Line")
x, y = event.x, event.y
if not (0 < x < self.canvas_size[2] and 0 < y < self.canvas_size[3]):
if x < 0:
x = 0
elif x > self.canvas_size[2]:
x = self.canvas_size[2]
if y < 0:
y = 0
elif y > self.canvas_size[3]:
y = self.canvas_size[3]
self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5",
width=1,
tags="Line")
return
self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8, fill="#5b9bd5",
width=1, tags="Line")
def quit(event):
self.delete("Line") # 删除刚刚产生的连接线
self.drawLine = False # 关闭画线标志
init()
for tag_id, tag in self.AllSimObjs.items():
self.tag_bind(tag.ObjID, "<Button-3>", lambda event, tag=tag: show_menu(event, tag)) # 绑定右击事件
def add_sim_obj(self, component, name):
# todo: 绑定事件
"""
绑定事件
:param component: 组件对象
:param name: 组件名称
"""
def move(event, name):
"""
鼠标左键松开事件
:param event: 事件对象
"""
if 20 + self.canvas_size[0] < event.x < self.canvas_size[2] + 20 and 20 + self.canvas_size[1] < event.y < self.canvas_size[3] - 20: # 在方框内,无变化
if name == "路由器":
tag = SimRouter(self, event.x, event.y)
elif name == "集线器":
tag = SimHub(self, event.x, event.y)
elif name == "交换机":
tag = SimSwitch(self, event.x, event.y)
else:
tag = SimHost(self, event.x, event.y)
tag.create_img()
self.AllSimObjs[tag.ObjID] = tag
tag.save()
self.message.show_message(f"组件 {tag.ObjLabel} 创建成功!")
self.tag_bind_event()
else:
self.delete("L")
def motion(event, name):
"""
鼠标拖动事件
:param event: 事件对象
"""
if name == "路由器":
self.chose = self.router_img
elif name == "集线器":
self.chose = self.hub_img
elif name == "交换机":
self.chose = self.switch_img
else:
self.chose = self.host_img
self.delete("L")
if 20 + self.canvas_size[0] < event.x < self.canvas_size[2] + 20 and 20 + self.canvas_size[1] < event.y < self.canvas_size[3] - 20: # 在方框内,无变化
pass
else: # 在方框外,显示禁止放置标识
self.create_oval(event.x - 10, event.y - 45, event.x + 10, event.y - 25, outline="red", width=2,
tags="L")
self.create_line(event.x - 7, event.y - 42, event.x + 8, event.y - 27, fill="red", width=2, tags="L")
self.create_image(event.x - 30, event.y - 30, image=self.chose, anchor="nw", tags="L")
self.tag_bind(component, "<ButtonRelease-1>", lambda event: move(event, name))
self.tag_bind(component, "<B1-Motion>", lambda event: motion(event, name))
def reload_data(self):
# todo: 加载上一次程序运行的数据
"""
加载上一次程序运行时的数据
:return:
"""
self.AllSimObjs = {}
self.conns = []
sim_obj_sql = "select * from sim_objs"
sim_obj_data = search(sim_obj_sql)
for index, row in sim_obj_data.iterrows(): # 初始化组件对象
sim_type = row["ObjType"]
ObjX = row["ObjX"]
ObjY = row["ObjY"]
ConfigCorrect = row["ConfigCorrect"]
ObjLable = row["ObjLabel"]
ObjID = row["ObjID"]
if sim_type == 1:
tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 2:
tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 3:
tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
else:
tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
tag.create_img()
self.AllSimObjs[tag.ObjID] = tag
sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id"
sim_conn_data = search(sim_conn_sql)
conn_datas = {}
for index, conn in sim_conn_data.iterrows():
if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"]))
for key, value in conn_datas.items():
conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1],
self.AllSimObjs[value[1][0]], value[1][1], key[1])
conn_obj.draw_line()
self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象
self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj
self.conns.append(conn_obj)
conn_obj.ConfigCorrect = key[1]
self.tag_bind_event()
def show_obj(self, AllSimObj, AllSimConn):
self.AllSimObjs = AllSimObj
self.conns = AllSimConn
for key, sim_obj in self.AllSimObjs.items():
sim_obj.create_img()
for conn in self.conns:
conn.draw_line()
def delete_obj(self):
# todo: 删除对象
"""
选中删除对象
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要删除的对象!")
return
ask = messagebox.askquestion(title='确认操作', message='确认删除该对象吗?')
if ask == "no":
return
self.delete(self.chose_obj.ObjID) # 删除图片
self.delete(self.chose_obj.ObjID + "text") # 删除标签
for conn in self.chose_obj.connections: # 删除该对象的所有连接线
if isinstance(conn, AllSimConnect):
conn.delete_line()
delete_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{self.chose_obj.ObjID}')"
delete_config_sql = f"delete from conn_config where node_id='{self.chose_obj.ObjID}'"
execute_sql(delete_sql)
execute_sql(delete_config_sql)
delete_obj(self.chose_obj.ObjID)
self.message.show_message(f"组件 {self.chose_obj.ObjLabel} 删除成功!")
self.delete("rectangle")
self.reload_data()
def delete_line(self):
# todo: 删除连接线
"""
删除连接线
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要删除连接线的对象!")
return
conn_sql = f"""
select
s.conn_id, ConfigCorrect, node_id, node_ifs
from sim_conn s
join conn_config c on s.conn_id=c.conn_id
where node_id='{self.chose_obj.ObjID}'
"""
conn_data = search(conn_sql)
conn_names = {}
for index, conn in conn_data.iterrows():
if conn["conn_id"] not in conn_names:
conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"]))
child_d = tk.Toplevel()
child_d.title(f"{self.chose_obj.ObjLabel}的连接线配置")
child_d.geometry('300x200+450+200')
child_d.grab_set()
cv1 = Canvas(child_d)
cv1.pack()
value = StringVar()
combobox = ttk.Combobox(
master=child_d, # 父容器
height=12, # 高度,下拉显示的条目数量
width=15, # 宽度
state='readonly', # readonly(只可选)
font=('', 18), # 字体
textvariable=value, # 通过StringVar设置可改变的值
values=list(conn_names.keys()), # 设置下拉框的选项
)
def del_line():
if value.get() == "":
messagebox.showerror("注意", message="请选择需要删除的连接线!")
return
conn_sql = f"""
select
conn_id, node_id, node_ifs
from conn_config
where conn_id='{value.get()}'
"""
conn_data = search(conn_sql)
delete_sql = f"delete from sim_conn where conn_id='{value.get()}'"
execute_sql(delete_sql)
conn_names = {}
for index, conn in conn_data.iterrows():
if conn["conn_id"] not in conn_names:
conn_names[conn["conn_id"]] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_names[conn["conn_id"]].append((conn["node_id"], conn["node_ifs"]))
for data in conn_names[value.get()]:
self.AllSimObjs[data[0]].connections[data[1] - 1].delete_line()
self.AllSimObjs[data[0]].connections[data[1] - 1] = None
child_d.destroy()
messagebox.showinfo("提示", f"连接线{value.get()}删除成功!")
self.message.show_message(f"连接线{value.get()}删除成功!")
btn_yes = tk.Button(child_d, text='确定', font=('黑体', 12), height=1, command=del_line)
btn_yes.place(x=240, y=20)
combobox.place(x=20, y=20)
def update_tag_name(self):
# todo: 更新组件名称
"""
更新组件名称
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要更新的对象!")
return
child1 = tk.Toplevel()
child1.title(self.chose_obj.ObjLabel + "的标签信息")
child1.geometry('360x150+200+150')
child1.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child1, text='原标签:' + self.chose_obj.ObjLabel,
font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w')
tk.Label(child1, text='新标签:', font=('黑体', 16)).grid(row=1, column=0, sticky='w') # ,sticky='w'靠左显示
new_name = tk.Entry(child1, font=('黑体', 16), textvariable=tk.StringVar())
new_name.grid(row=1, column=1)
def update_tag():
name = new_name.get()
if name == "":
messagebox.showerror("注意", message="请输入新名称!")
return
self.chose_obj.ObjLabel = name
self.chose_obj.update()
for conn in self.chose_obj.connections:
if isinstance(conn, AllSimConnect):
conn.update_info(self.chose_obj.ObjID)
child1.destroy()
messagebox.showinfo("提示", message="修改成功!")
self.message.show_message(f"{name} 修改名称成功!")
tk.Button(child1, text='确定', font=('黑体', 16), height=1, command=update_tag).grid(row=2, column=0,
sticky='e', pady=10)
tk.Button(child1, text='取消', font=('黑体', 16), height=1, command=child1.destroy).grid(row=2, column=1,
sticky='e', pady=10)
def network_config(self):
# todo: 网络配置
"""
网络配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if len(self.chose_obj.get_config()) == 0:
messagebox.showerror("注意", message="请先给对象添加连接线!")
return
child_r = tk.Toplevel()
child_r.title(self.chose_obj.ObjLabel + "的网络配置信息")
child_r.geometry('713x522+350+200')
child_r.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
ifs_frame = tk.Frame(child_r)
ifs_frame.grid(row=1, column=0, columnspan=4) # 装接口的框架
ifs_frame_YN = tk.Frame(child_r)
ifs_frame_YN.grid(row=2, column=0, columnspan=4)
num = 0
datas = {}
for conn in self.chose_obj.connections:
if isinstance(conn, AllSimConnect):
index = self.chose_obj.connections.index(conn)
num_label = tk.LabelFrame(ifs_frame, text=f'接口{index + 1}')
num_label.grid(row=num, column=0, padx=18, ipady=5)
tk.Label(num_label, text='MAC:', font=('黑体', 16)).grid(row=0, column=0)
mac_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2, 3] else NORMAL)
mac_en.insert('0', str(self.chose_obj.interface[index].get("mac", "")))
mac_en.grid(row=0, column=1, padx=10)
tk.Label(num_label, text='IP:', font=('黑体', 16)).grid(row=1, column=0)
ip_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType not in [1, 2] else NORMAL)
ip_en.insert('0', str(self.chose_obj.interface[index].get("ip", "")))
ip_en.grid(row=1, column=1, padx=10)
tk.Label(num_label, text='端口:', font=('黑体', 16)).grid(row=0, column=2)
port_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL)
port_en.insert('0', str(self.chose_obj.interface[index].get("conn_port", "")))
port_en.grid(row=0, column=3, padx=10)
tk.Label(num_label, text='应用层地址:', font=('黑体', 16)).grid(row=1, column=2)
addr_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar(), state=DISABLED if self.chose_obj.ObjType != 1 else NORMAL)
addr_en.insert('0', str(self.chose_obj.interface[index].get("addr", "")))
addr_en.grid(row=1, column=3, padx=10)
num += 1
datas[index + 1] = {"mac": mac_en, "ip": ip_en, "conn_port": port_en, "addr": addr_en}
num_label = tk.LabelFrame(ifs_frame, text=f'示例')
num_label.grid(row=num, column=0, padx=18, ipady=5)
tk.Label(num_label, text='MAC:', font=('黑体', 16)).grid(row=0, column=0)
mac_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar())
mac_en.insert('0', "MAC11")
mac_en.config(state=READONLY)
mac_en.grid(row=0, column=1, padx=10)
tk.Label(num_label, text='IP:', font=('黑体', 16)).grid(row=1, column=0)
ip_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar())
ip_en.insert(0, "10.1.1.10")
ip_en.config(state=READONLY)
ip_en.grid(row=1, column=1, padx=10)
tk.Label(num_label, text='端口:', font=('黑体', 16)).grid(row=0, column=2)
port_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar())
port_en.insert('0', "80")
port_en.config(state=READONLY)
port_en.grid(row=0, column=3, padx=10)
tk.Label(num_label, text='应用层地址:', font=('黑体', 16)).grid(row=1, column=2)
addr_en = tk.Entry(num_label, font=('黑体', 16), textvariable=tk.StringVar())
addr_en.insert('0', "10.1.2.10:10810:Name3")
addr_en.config(state=READONLY)
addr_en.grid(row=1, column=3, padx=10)
def commit():
self.chose_obj.config(datas)
self.message.show_message(f"组件 {self.chose_obj.ObjLabel} 网络配置成功!")
child_r.destroy()
tk.Button(ifs_frame_YN, text='确定', font=('黑体', 16), height=1, command=commit).grid(row=0, column=0,
padx=20, pady=20) # ,sticky="w"
tk.Button(ifs_frame_YN, text='取消', font=('黑体', 16), height=1,
command=child_r.destroy).grid(row=0, column=2, padx=20, pady=20)
def router_table_config(self):
# todo: 路由表配置
"""
路由表配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if self.chose_obj.ObjType != 2:
messagebox.showerror("注意", message="请选择路由器对象!")
return
RouterConfigWindow(self, self.chose_obj)
def mac_table_config(self):
# todo: 交换表配置
"""
交换表配置
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要配置的对象!")
return
if self.chose_obj.ObjType != 3:
messagebox.showerror("注意", message="请选择交换机对象!")
return
SwitchConfigWindow(self, self.chose_obj)
def show_label(self):
"""
显示/不显示标签
:return:
"""
self.show_label_flag = not self.show_label_flag
if self.show_label_flag:
for tag in self.AllSimObjs.values():
tag.create_img()
else:
for tag in self.AllSimObjs.values():
self.delete(tag.ObjID + "text")
def show_interface(self):
# todo: 显示/不显示接口
"""
显示/不显示接口
:return:
"""
self.show_interface_flag = not self.show_interface_flag
if self.show_interface_flag:
for conn in self.conns:
conn.draw_line()
else:
for conn in self.conns:
self.delete(conn.NobjS.ObjID + conn.NobjE.ObjID)
def show_network_config(self):
# todo: 显示网络配置信息
"""
显示网络配置信息
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
self.delete("netSet")
self.create_text(self.width * 0.82 + 120, 80, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="netSet")
self.create_text(self.width * 0.82 + 20, 80 + 25, text=self.chose_obj,
anchor="nw", font=('宋体', 14), tags="netSet")
def show_router_config(self):
# todo: 显示路由表交换表信息
"""
显示路由交换表信息
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
if self.chose_obj.ObjType != 2 and self.chose_obj.ObjType != 3:
messagebox.showerror("注意", message="请选择路由器/交换机对象!")
return
self.delete("routerSet")
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 50, text="(" + self.chose_obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="routerSet")
self.create_text(self.width * 0.82 + 20, self.canvas_size[3] / 2 + 50 + 25, text=self.chose_obj.get_table_config(),
anchor="nw", font=('宋体', 14), tags="routerSet")
def send_packet(self):
"""
发送数据包
:return:
"""
if self.chose_obj is None:
messagebox.showerror("注意", message="请先选择要显示的对象!")
return
if self.chose_obj.ConfigCorrect != 1:
messagebox.showerror("注意", message="请先对选择对象进行网络配置!")
return
if self.chose_obj.ObjType != 1:
messagebox.showerror("注意", message="请选择主机对象!")
return
child2 = tk.Toplevel()
child2.title("数据包配置")
child2.geometry('330x195+200+110')
child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child2, text='目的IP:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
packet_ip = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_ip.grid(row=0, column=1, pady=10)
tk.Label(child2, text='消息:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=10) # ,sticky='w'靠左显示
packet_message = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_message.grid(row=1, column=1, pady=10)
def send():
"""
发送数据包
:return:
"""
if packet_ip.get() == "":
messagebox.showerror("注意", message="ip地址不能为空")
return
if not validate_ip_address(packet_ip.get()):
messagebox.showerror("注意", message="IP地址不规范")
return
if packet_message.get() == "":
messagebox.showerror("注意", message="消息不能为空!")
return
ip = packet_ip.get()
message = packet_message.get()
child2.destroy()
self.chose_obj.create_packet(ip,
None,
message)
tk.Button(child2, text='确定', font=('黑体', 16), height=1, command=send).grid(row=3, column=0, sticky='e', pady=10)
tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=3, column=1, sticky='e', pady=10)
def send_packet_list(self):
"""
批量发送数据包
:return:
"""
hosts = {}
for tag in self.AllSimObjs.values():
if tag.ObjType == 1 and tag.ConfigCorrect == 1:
hosts[tag.ObjLabel] = tag.ObjID
child2 = tk.Toplevel()
child2.title("批量数据包配置")
child2.geometry('462x452+200+110')
tk.Label(child2, text='目的IP:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', ipady=10)
packet_ip = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_ip.grid(row=0, column=1, pady=5)
tk.Label(child2, text='消息:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=5) # ,sticky='w'靠左显示
packet_message = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_message.grid(row=1, column=1, pady=5)
host = StringVar()
tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=2, column=0, sticky='w',
pady=5) # ,sticky='w'靠左显示
combobox = ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=12, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
)
combobox.grid(row=2, column=1, pady=5)
packet_frame = tk.Frame(child2)
packet_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
packet_treeview = ttk.Treeview(packet_frame, columns=("source_host", "ip", "mac", "message"), show="headings")
packet_treeview.heading("source_host", text="发送主机")
packet_treeview.column("source_host", width=60, anchor=CENTER)
packet_treeview.heading("ip", text="IP")
packet_treeview.column("ip", width=120, anchor=CENTER)
packet_treeview.heading("mac", text="MAC")
packet_treeview.column("mac", width=60, anchor=CENTER)
packet_treeview.heading("message", text="消息")
packet_treeview.column("message", width=120, anchor=CENTER)
packet_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(packet_frame, orient="vertical", command=packet_treeview.yview)
scrollbar.pack(side="right", fill="y")
packet_treeview.configure(yscrollcommand=scrollbar.set)
packets = []
def add():
ip = packet_ip.get()
message = packet_message.get()
chose_host = host.get()
if ip == "" or message == "" or chose_host == "":
messagebox.showerror("注意", message="输入框不能为空")
return
packet = SimPacket(self.AllSimObjs[hosts[chose_host]].interface[0]["ip"],
self.AllSimObjs[hosts[chose_host]].interface[0]["mac"],
ip, None, message)
packets.append((self.AllSimObjs[hosts[chose_host]], packet))
packet_treeview.delete(*packet_treeview.get_children())
for datas in packets:
tag: SimBase = datas[0]
packet_data: SimPacket = datas[1]
packet_treeview.insert("", "end", values=(
tag.ObjLabel, packet_data.destination_ip, packet_data.destination_mac, packet_data.message))
def send():
if len(packets) == 0:
messagebox.showerror("注意", message="请至少添加一条数据包!")
return
for data in packets:
threading.Thread(target=data[0].send, args=(data[1],)).start()
child2.destroy()
tk.Button(child2, text="添加数据包", font=('黑体', 14), height=3, command=add).grid(row=0, column=2, rowspan=4, padx=10)
button_frame = Frame(child2)
tk.Button(button_frame, text="发送", font=('黑体', 16), height=1, command=send).pack(side=RIGHT, padx=20)
tk.Button(button_frame, text="取消", font=('黑体', 16), height=1, command=child2.destroy).pack(side=RIGHT)
button_frame.grid(row=6, column=0, columnspan=3)
def clear_canvas(self):
"""
清除画布
:return:
"""
ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?')
if ask == "no":
return
truncate_db() # 清除数据库
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
def export_data(self):
try:
export = ExportUtil(sys.path[0] + "/database.xlsx")
export.export()
self.message.show_message("文件导出成功!文件位置在{}".format(sys.path[0] + "/database.xlsx"))
except Exception as E:
self.message.show_message(str(E), color="red")
def import_data(self):
truncate_db()
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
file_path = filedialog.askopenfilename()
if file_path:
export = ExportUtil(file_path)
export.import_data()
self.message.show_message("文件导入成功!")
self.reload_data()
def check_network(self):
"""
校验
:return:
"""
hosts = {}
for key, tag in self.AllSimObjs.items():
if tag.ObjType == 1 and tag.ConfigCorrect == 1:
hosts[tag.ObjLabel] = tag.ObjID
child2 = tk.Toplevel()
child2.title("数据包配置")
child2.geometry('392x168+200+110')
child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
send_host = StringVar()
host_combobox = ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=send_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
)
host_combobox.grid(row=0, column=1, pady=10)
tk.Label(child2, text='接收主机:', font=('黑体', 16)).grid(row=1, column=0, sticky='w',
pady=10) # ,sticky='w'靠左显示
receive_host = StringVar()
ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=receive_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
).grid(row=1, column=1, pady=10)
def check():
"""
发送数据包
:return:
"""
if send_host.get() == "" or receive_host.get() == "":
messagebox.showerror("注意", message="信息不能为空")
return
if send_host.get() == receive_host.get():
messagebox.showerror("注意", message="发送主机和接收主机不能相同!")
send_host_obj: SimHost = self.AllSimObjs[hosts[send_host.get()]]
receive_host_obj: SimHost = self.AllSimObjs[hosts[receive_host.get()]]
child2.destroy()
send_host_obj.create_packet(receive_host_obj.interface[0]["ip"], receive_host_obj.interface[0]["mac"], "ping", True)
TransferAnimate(self, self.receive == receive_host_obj)
if self.receive == receive_host_obj:
self.message.show_message("校验成功:{}{} 能正常连通".format(send_host_obj.ObjLabel, receive_host_obj.ObjLabel))
else:
self.message.show_message(
"校验成功:{}{} 无法正常连通,请检查一下组件 {} 的配置".format(send_host_obj.ObjLabel, receive_host_obj.ObjLabel, self.check_error_obj.ObjLabel), "red")
self.check_error_obj = None
self.receive = None
tk.Button(child2, text='校验', font=('黑体', 16), height=1, command=check).grid(row=5, column=0, sticky='e',
pady=10)
tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=5, column=1,
sticky='e', pady=10)
def create_config_button(self):
font_size = 16
font_style = ("Arial", font_size)
style = ttk.Style()
style.configure("TMenubutton", font=font_style)
# 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方
menubar = tk.Menu(root, font=font_style)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0, font=font_style)
menubar.add_cascade(label='删除与修改', menu=setMenu, font=font_style)
setMenu.add_command(label='删除对象', command=self.delete_obj)
setMenu.add_command(label='删除连接线', command=self.delete_line)
setMenu.add_command(label='修改标签', command=self.update_tag_name)
# # 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0, font=font_style)
menubar.add_cascade(label='基础配置', menu=setMenu, font=font_style)
setMenu.add_command(label='网络配置', command=self.network_config)
setMenu.add_command(label='路由表配置', command=self.router_table_config)
setMenu.add_command(label='交换表配置', command=self.mac_table_config)
root.config(menu=menubar)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0, font=font_style)
menubar.add_cascade(label='显示设置', menu=setMenu, font=font_style)
setMenu.add_command(label='显示/不显示标签', command=self.show_label)
setMenu.add_command(label='显示/不显示接口', command=self.show_interface)
setMenu.add_command(label='显示网络配置信息', command=self.show_network_config)
setMenu.add_command(label='显示路由/交换表信息', command=self.show_router_config)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0, font=font_style)
menubar.add_cascade(label='发送数据包', menu=setMenu, font=font_style)
setMenu.add_command(label='发送数据包', command=self.send_packet)
setMenu.add_command(label='批量发送数据包', command=self.send_packet_list)
menubar.add_command(label='清除屏幕', command=self.clear_canvas, font=font_style)
menubar.add_command(label='导出数据', command=self.export_data, font=font_style)
menubar.add_command(label='导入数据', command=self.import_data, font=font_style)
menubar.add_command(label='校验', command=self.check_network, font=font_style)
# 设置字体大小
for menu in menubar.winfo_children():
menu.config(font=font_style)
for item in menu.winfo_children():
item.config(font=font_style)
root.config(menu=menubar)
def create_widget(self):
"""
创建整体页面布局
:return:
"""
self.create_rectangle(self.canvas_size[0], self.canvas_size[1], self.canvas_size[2], self.canvas_size[3], outline="#7f6000", width=0) # 矩形框,左上角坐标,右下角坐标
self.create_image(self.width * 0.82, 30, image=self.label_top_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, 30 + 15, text="网络配置信息", anchor="n", font=('微软雅黑', 18, 'bold'))
self.create_image(self.width * 0.82, self.canvas_size[3] / 2, image=self.label_bottom_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
# round_rectangle(self, self.width * 0.82, self.canvas_size[3] / 2, self.width * 0.98, self.canvas_size[3] - 30, outline="#ffff00", width=2, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 15, text="路由/交换表信息", anchor="n", font=('微软雅黑', 18, 'bold'))
# 显示左边的固定图片
img_height, text_height, split_width = 50, 40, 120
self.create_text(text_height, split_width + 40, text="路由器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
router = self.create_image(img_height, split_width, image=self.router_img, anchor="nw")
self.create_text(text_height, split_width * 2 + 40, text="交换机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
switch = self.create_image(img_height, split_width * 2, image=self.switch_img, anchor="nw")
self.create_text(text_height, split_width * 3 + 40, text="集线器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
hub = self.create_image(img_height, split_width * 3, image=self.hub_img, anchor="nw")
self.create_text(text_height, split_width * 4 + 40, text="主机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
host = self.create_image(img_height, split_width * 4, image=self.host_img, anchor="nw")
self.add_sim_obj(router, "路由器")
self.add_sim_obj(switch, "交换机")
self.add_sim_obj(hub, "集线器")
self.add_sim_obj(host, "主机")
if __name__ == '__main__':
root = Window()
root.title('网络拓扑图')
screen_width = root.winfo_screenwidth() # winfo 方法来获取当前电脑屏幕大小
screen_height = root.winfo_screenheight()
root_attr = {
"width": screen_width * 0.83,
"height": screen_height * 0.85,
}
size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'], (screen_width - root_attr['width']) / 2,
(screen_height - root_attr['height']) / 2 - 30)
canvas = NetWorkAnalog(root, width=root_attr['width'], heigh=root_attr['height'], bg="white")
canvas.place(x=0, y=0, anchor='nw')
canvas.create_config_button()
canvas.reload_data()
root.geometry(size)
root.mainloop()