You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

945 lines
45 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import ipaddress
import sys
import threading
from tkinter import filedialog
from SimUtil import *
from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase
from dbUtil import search, execute_sql, delete_obj, truncate_db
from PIL import Image as imim
class Message(Canvas):
    """Canvas that renders a growing list of log lines next to a vertical scrollbar."""

    def __init__(self, master, **kwargs):
        """
        Create the log canvas and pack a vertical scrollbar into the same master.

        :param master: parent widget; also receives the scrollbar
        :param kwargs: options forwarded to the Canvas constructor
        """
        super().__init__(master, **kwargs)
        self.master = master
        self.message = []  # every entry is {"message": text, "color": fill colour}
        bar = tk.Scrollbar(master, orient="vertical", command=self.yview)
        bar.pack(side="right", fill=BOTH)
        self.scrollbar = bar
        self.config(yscrollcommand=bar.set)

    def show_message(self, message, color="#5fa8fe"):
        """
        Append one log line, redraw the whole log and scroll to the bottom.

        :param message: text to display
        :param color: fill colour used for this line
        """
        self.message.append({"message": message, "color": color})
        # All lines share the "message" tag, so the log is redrawn from scratch.
        self.delete("message")
        for row, entry in enumerate(self.message):
            self.create_text(20, 25 * row + 10, anchor="nw", text=entry["message"],
                             font=("", 15), fill=entry["color"], tags="message")
        self.update()
        self.configure(scrollregion=self.bbox("all"))
        self.scrollbar.set(1.0, 1.0)
        self.yview_moveto(1.0)
class NetWorkAnalog(Canvas):
    """Main canvas of the simulator: device palette, work area, info panels and log."""

    def __init__(self, master, **kwargs):
        """
        Load the image resources, build the log panel and draw the static layout.

        :param master: parent widget
        :param kwargs: Canvas options; "width" and "height" are read back to size the layout
        """
        super().__init__(master, **kwargs)
        self.master = master

        # Every image is resolved against sys.path[0] so that loading does not
        # depend on the current working directory.
        image_dir = sys.path[0] + "/../datas/images/"

        def load_image(file_name, size):
            """Open one image from the resource directory, resize it and wrap it for Tk."""
            return ImageTk.PhotoImage(imim.open(image_dir + file_name).resize(size))

        device_size = (40, 40)
        marker_size = (30, 30)
        self.router_img = load_image("路由器@2x.png", device_size)
        self.switch_img = load_image("交换机@2x.png", device_size)
        self.hub_img = load_image("集线器@2x.png", device_size)
        self.host_img = load_image("主机@2x.png", device_size)
        self.pack_img = load_image("packet.png", marker_size)
        self.true_img = load_image("true.png", marker_size)
        self.false_img = load_image("false.png", marker_size)

        self.width = int(self.cget("width"))
        self.height = int(self.cget("height"))
        # [left, top, right, bottom] of the area components may be dropped into.
        self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)]

        panel_width = int(self.width - self.width * 0.85)
        self.label_top_img = load_image("右上框@2x.png", (panel_width, int(self.canvas_size[3] * 0.4)))
        self.label_bottom_img = load_image("右下框@2x.png", (panel_width, int(self.canvas_size[3] * 0.45)))

        log_width = self.width - 60
        log_height = self.height - self.canvas_size[3] - 30
        self.message_img = load_image("日志消息展示框@2x.png", (log_width, log_height))
        message_frame = Frame(self, width=log_width, height=log_height)
        message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW)
        self.message = Message(message_frame, width=log_width, height=log_height)
        self.message.pack()
        self.message.configure(bg="#edf8ff")
        self.configure(bg="#ddeafe")

        self.filename = load_image("背景@2x.png", (self.width, self.canvas_size[3]))
        self.check_filename = load_image("背景@2x.png", (self.width, self.height))
        self.create_image(0, 0, image=self.filename, anchor=NW)

        self.chose = self.host_img  # image shown while dragging from the palette
        self.AllSimObjs = {}  # ObjID -> component object
        self.receive = None
        self.conns = []  # connection objects currently on the canvas
        self.drawLine = True  # flag: a connection line is being drawn
        self.line_start_obj = None
        self.line_end_obj = None
        self.line_start_ifs = None
        self.line_end_ifs = None
        self.chose_obj = None  # currently selected component
        self.show_label_flag = True
        self.show_interface_flag = True
        self.check_error_obj = None
        self.create_widget()
def is_chose(self):
    """
    Draw the selection frame around the chosen object and refresh the info panels.

    :return: None
    """
    picked = self.chose_obj
    self.delete("rectangle")
    left, top = picked.ObjX - 35, picked.ObjY - 35
    right, bottom = picked.ObjX + 15, picked.ObjY + 15
    self.create_rectangle(left, top, right, bottom, outline="red", tags="rectangle")
    self.show_network_config()
    # Object types 2 and 3 additionally get their table shown.
    if picked.ObjType in (2, 3):
        self.show_router_config()
def tag_bind_event(self):
    """
    Bind the right-click interface menu to every component on the canvas.

    Picking an interface the first time starts a rubber-band line that follows
    the mouse; picking an interface on a second component creates the
    connection. Pressing Escape cancels the line being drawn.

    :return: None
    """

    def reset_pending():
        """Forget both endpoints of the connection that is being drawn."""
        self.line_start_obj = None
        self.line_end_obj = None
        self.line_start_ifs = None
        self.line_end_ifs = None

    def show_menu(event, tag: SimBase):
        """
        Pop up the list of unconnected interfaces of a component.

        :param event: right-click event (supplies the screen position)
        :param tag: component that was clicked
        """
        menu = Menu(self, tearoff=0)
        has_free_interface = False
        # enumerate() instead of list.index(): index() returns the first entry
        # that compares equal, so several empty slots would all be offered
        # under the same interface number.
        for position, conn in enumerate(tag.connections):
            if isinstance(conn, AllSimConnect):
                continue
            menu.add_command(label=f"接口{position + 1}",
                             command=lambda num=position + 1: interface_selected(num, tag))
            has_free_interface = True
        if not has_free_interface:
            menu.add_command(label="暂无可用接口", state="disabled")
        menu.post(event.x_root, event.y_root)

    def interface_selected(interface, tag):
        """
        Callback of the interface menu.

        :param interface: 1-based interface number that was chosen
        :param tag: component the interface belongs to
        """
        if self.line_start_ifs is None:
            # First endpoint: remember it and start following the mouse.
            self.line_start_ifs = interface
            self.line_start_obj = tag
            self.drawLine = True
            self.bind("<Motion>", lambda motion_event: right_motion(motion_event))
            self.bind("<Escape>", cancel_line)
            self.focus_set()  # take keyboard focus so <Escape> is delivered
            return

        # Second endpoint: try to create the connection.
        self.line_end_ifs = interface
        self.line_end_obj = tag
        self.delete("Line")
        if self.line_start_obj.ObjID == self.line_end_obj.ObjID:
            messagebox.showerror("注意!", message="不能连接自己")
            return
        conn = AllSimConnect(self, self.line_start_obj, self.line_start_ifs, self.line_end_obj,
                             self.line_end_ifs)
        # NOTE(review): the line is drawn before the duplicate check, as in the
        # original; confirm whether a rejected duplicate leaves items behind.
        conn.draw_line()
        # Check whether the two components are already connected to each other.
        already_connected = any(existing == conn for existing in self.line_start_obj.connections)
        if already_connected:
            del conn
            self.delete("Line")
            self.unbind("<Motion>")
            reset_pending()
            messagebox.showerror("注意!", message="已经连接过了")
            return
        self.line_start_obj.connections[self.line_start_ifs - 1] = conn
        self.line_end_obj.connections[self.line_end_ifs - 1] = conn
        self.message.show_message(f"{self.line_start_obj.ObjLabel}连接{self.line_end_obj.ObjLabel}成功!")
        conn.save()
        self.conns.append(conn)
        reset_pending()
        self.unbind("<Motion>")

    def right_motion(event):
        """
        Redraw the rubber-band line from the start component to the mouse.

        :param event: mouse motion event
        """
        if not self.drawLine:
            return
        self.delete("Line")
        # Keep the free end of the line inside the work area.
        x = min(max(event.x, 0), self.canvas_size[2])
        y = min(max(event.y, 0), self.canvas_size[3])
        self.create_line(self.line_start_obj.ObjX, self.line_start_obj.ObjY, x + 8, y + 8,
                         fill="#5b9bd5", width=1, tags="Line")

    def cancel_line(event):
        """Escape handler: drop the line being drawn and reset the pending endpoints."""
        self.delete("Line")  # remove the line that was just produced
        self.drawLine = False  # switch the drawing flag off
        reset_pending()

    for tag in self.AllSimObjs.values():
        # Bind the right-click event; tag=tag freezes the loop variable per binding.
        self.tag_bind(tag.ObjID, "<Button-3>", lambda event, tag=tag: show_menu(event, tag))
def add_sim_obj(self, component, name):
    """
    Make a palette icon draggable so that dropping it creates a new component.

    :param component: canvas item (tag or id) of the palette icon
    :param name: component name shown in the palette; selects image and class
    """

    def inside_work_area(event):
        """Tell whether the pointer is inside the region where dropping is allowed."""
        left, top, right, bottom = self.canvas_size
        return 20 + left < event.x < right + 20 and 20 + top < event.y < bottom - 20

    def on_release(event, kind):
        """
        Left button released: create the component when dropped inside the work area.

        :param event: event object
        :param kind: palette name of the dragged component
        """
        if not inside_work_area(event):
            self.delete("L")
            return
        factories = {"路由器": SimRouter, "集线器": SimHub, "交换机": SimSwitch}
        tag = factories.get(kind, SimHost)(self, event.x, event.y)
        tag.create_img()
        self.AllSimObjs[tag.ObjID] = tag
        tag.save()
        self.message.show_message(f"组件 {tag.ObjLabel} 创建成功!")
        self.tag_bind_event()

    def on_drag(event, kind):
        """
        Mouse dragged with the left button down: move the preview image.

        :param event: event object
        :param kind: palette name of the dragged component
        """
        previews = {"路由器": self.router_img, "集线器": self.hub_img, "交换机": self.switch_img}
        self.chose = previews.get(kind, self.host_img)
        self.delete("L")
        if not inside_work_area(event):
            # Outside the work area: show the "placing not allowed" sign.
            self.create_oval(event.x - 10, event.y - 45, event.x + 10, event.y - 25,
                             outline="red", width=2, tags="L")
            self.create_line(event.x - 7, event.y - 42, event.x + 8, event.y - 27,
                             fill="red", width=2, tags="L")
        self.create_image(event.x - 30, event.y - 30, image=self.chose, anchor="nw", tags="L")

    self.tag_bind(component, "<ButtonRelease-1>", lambda event: on_release(event, name))
    self.tag_bind(component, "<B1-Motion>", lambda event: on_drag(event, name))
def reload_data(self):
    """
    Rebuild components and connections from the data stored by the previous run.

    :return: None
    """
    self.AllSimObjs = {}
    self.conns = []

    # Recreate the component objects.
    classes_by_type = {1: SimHost, 2: SimRouter, 3: SimSwitch}
    for _, record in search("select * from sim_objs").iterrows():
        factory = classes_by_type.get(record["ObjType"], SimHub)
        tag = factory(self, record["ObjX"], record["ObjY"], record["ObjID"],
                      record["ConfigCorrect"], record["ObjLabel"])
        tag.create_img()
        self.AllSimObjs[tag.ObjID] = tag

    # Group the endpoint rows by (conn_id, ConfigCorrect).
    sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id"
    endpoints_by_conn = {}
    for _, record in search(sim_conn_sql).iterrows():
        group_key = (record["conn_id"], record["ConfigCorrect"])
        endpoints_by_conn.setdefault(group_key, []).append((record["node_id"], record["node_ifs"]))

    for (_, config_correct), endpoints in endpoints_by_conn.items():
        start_id, start_ifs = endpoints[0]
        end_id, end_ifs = endpoints[1]
        conn_obj = AllSimConnect(self, self.AllSimObjs[start_id], start_ifs,
                                 self.AllSimObjs[end_id], end_ifs, config_correct)
        conn_obj.draw_line()
        # Hand the connection object to both components (interfaces are 1-based).
        self.AllSimObjs[start_id].connections[start_ifs - 1] = conn_obj
        self.AllSimObjs[end_id].connections[end_ifs - 1] = conn_obj
        self.conns.append(conn_obj)
        conn_obj.ConfigCorrect = config_correct

    self.tag_bind_event()
def show_obj(self, AllSimObj, AllSimConn):
    """
    Adopt the given components and connections and draw all of them.

    :param AllSimObj: mapping whose values provide ``create_img()``
    :param AllSimConn: iterable of connections providing ``draw_line()``
    """
    self.AllSimObjs, self.conns = AllSimObj, AllSimConn
    for sim_obj in self.AllSimObjs.values():
        sim_obj.create_img()
    for link in self.conns:
        link.draw_line()
def delete_obj(self):
    """
    Delete the selected component, its connection lines and its stored rows.

    Asks for confirmation first. Afterwards the selection is cleared so that no
    later menu action operates on the deleted component, and the canvas data is
    reloaded from the database.

    :return: None
    """
    target = self.chose_obj
    if target is None:
        messagebox.showerror("注意", message="请先选择要删除的对象!")
        return
    ask = messagebox.askquestion(title='确认操作', message='确认删除该对象吗?')
    if ask == "no":
        return
    self.delete(target.ObjID)  # remove the image
    self.delete(target.ObjID + "text")  # remove the label
    for conn in target.connections:  # remove every line attached to the component
        if isinstance(conn, AllSimConnect):
            conn.delete_line()
    delete_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{target.ObjID}')"
    delete_config_sql = f"delete from conn_config where node_id='{target.ObjID}'"
    execute_sql(delete_sql)
    execute_sql(delete_config_sql)
    delete_obj(target.ObjID)  # database helper imported from dbUtil
    self.message.show_message(f"组件 {target.ObjLabel} 删除成功!")
    self.delete("rectangle")
    # Drop the stale selection and the panel texts that described it.
    self.chose_obj = None
    self.delete("netSet")
    self.delete("routerSet")
    self.reload_data()
def delete_line(self):
    """
    Open a dialog that lets the user delete one connection of the selected component.

    The dialog lists the ids of the connections attached to the selected
    component; confirming removes the chosen connection from the database, from
    both endpoint components and from ``self.conns``.

    :return: None
    """
    if self.chose_obj is None:
        messagebox.showerror("注意", message="请先选择要删除连接线的对象!")
        return
    conn_sql = f"""
        select
            s.conn_id, ConfigCorrect, node_id, node_ifs
        from sim_conn s
        join conn_config c on s.conn_id=c.conn_id
        where node_id='{self.chose_obj.ObjID}'
    """
    # Only the distinct connection ids are needed; dict keys keep query order.
    conn_ids = list(dict.fromkeys(row["conn_id"] for _, row in search(conn_sql).iterrows()))

    child_d = tk.Toplevel()
    child_d.title(f"{self.chose_obj.ObjLabel}的连接线配置")
    child_d.geometry('300x200+450+200')
    child_d.grab_set()
    cv1 = Canvas(child_d)
    cv1.pack()
    value = StringVar()
    combobox = ttk.Combobox(
        master=child_d,  # parent container
        height=12,  # number of entries shown in the drop-down
        width=15,  # width
        state='readonly',  # selection only, no typing
        font=('', 18),  # font
        textvariable=value,  # holds the chosen connection id
        values=conn_ids,  # drop-down entries
    )

    def del_line():
        """Delete the connection chosen in the combobox."""
        conn_id = value.get()
        if conn_id == "":
            messagebox.showerror("注意", message="请选择需要删除的连接线!")
            return
        endpoint_sql = f"""
            select
                conn_id, node_id, node_ifs
            from conn_config
            where conn_id='{conn_id}'
        """
        endpoints = search(endpoint_sql)
        execute_sql(f"delete from sim_conn where conn_id='{conn_id}'")
        # NOTE(review): the conn_config rows of this connection are left in
        # place, as before; delete_obj removes them - confirm which is intended.
        # The query is already filtered by conn_id, so the rows are used directly.
        for _, row in endpoints.iterrows():
            owner = self.AllSimObjs[row["node_id"]]
            slot = row["node_ifs"] - 1  # interface numbers are 1-based
            conn = owner.connections[slot]
            if isinstance(conn, AllSimConnect):
                conn.delete_line()
                # Also forget the connection here, otherwise show_interface()
                # would draw the deleted line again. Compared by identity.
                self.conns[:] = [kept for kept in self.conns if kept is not conn]
            owner.connections[slot] = None
        child_d.destroy()
        messagebox.showinfo("提示", f"连接线{conn_id}删除成功!")
        self.message.show_message(f"连接线{conn_id}删除成功!")

    btn_yes = tk.Button(child_d, text='确定', font=('黑体', 12), height=1, command=del_line)
    btn_yes.place(x=240, y=20)
    combobox.place(x=20, y=20)
def update_tag_name(self):
    """
    Open a dialog for renaming the label of the selected component.

    :return: None
    """
    if self.chose_obj is None:
        messagebox.showerror("注意", message="请先选择要更新的对象!")
        return
    heiti = ('黑体', 16)
    dialog = tk.Toplevel()
    dialog.title(self.chose_obj.ObjLabel + "的标签信息")
    dialog.geometry('360x150+200+150')
    dialog.grab_set()  # modal: keep input on this dialog until it is closed

    old_label = tk.Label(dialog, text='原标签:' + self.chose_obj.ObjLabel, font=heiti)
    old_label.grid(row=0, column=0, columnspan=2, sticky='w')
    new_label = tk.Label(dialog, text='新标签:', font=heiti)
    new_label.grid(row=1, column=0, sticky='w')  # sticky='w' aligns to the left
    new_name = tk.Entry(dialog, font=heiti, textvariable=tk.StringVar())
    new_name.grid(row=1, column=1)

    def update_tag():
        """Apply the entered name to the component and refresh its connections."""
        name = new_name.get()
        if not name:
            messagebox.showerror("注意", message="请输入新名称!")
            return
        self.chose_obj.ObjLabel = name
        self.chose_obj.update()
        attached = [c for c in self.chose_obj.connections if isinstance(c, AllSimConnect)]
        for conn in attached:
            conn.update_info(self.chose_obj.ObjID)
        dialog.destroy()
        messagebox.showinfo("提示", message="修改成功!")
        self.message.show_message(f"{name} 修改名称成功!")

    confirm = tk.Button(dialog, text='确定', font=heiti, height=1, command=update_tag)
    confirm.grid(row=2, column=0, sticky='e', pady=10)
    cancel = tk.Button(dialog, text='取消', font=heiti, height=1, command=dialog.destroy)
    cancel.grid(row=2, column=1, sticky='e', pady=10)
def network_config(self):
    """
    Open the network configuration dialog of the selected component.

    One group of MAC / IP / port / application-address fields is shown per
    connected interface, followed by a read-only example group. Confirming
    passes the entry widgets, keyed by 1-based interface number, to the
    component's ``config`` method.

    :return: None
    """
    target = self.chose_obj
    if target is None:
        messagebox.showerror("注意", message="请先选择要配置的对象!")
        return
    if len(target.get_config()) == 0:
        messagebox.showerror("注意", message="请先给对象添加连接线!")
        return
    child_r = tk.Toplevel()
    child_r.title(target.ObjLabel + "的网络配置信息")
    child_r.geometry('713x522+350+200')
    child_r.grab_set()  # modal: keep input on this dialog until it is closed
    ifs_frame = tk.Frame(child_r)
    ifs_frame.grid(row=1, column=0, columnspan=4)  # frame holding the interface groups
    ifs_frame_YN = tk.Frame(child_r)
    ifs_frame_YN.grid(row=2, column=0, columnspan=4)
    field_font = ('黑体', 16)

    def add_field(box, caption, row, column, text, state=NORMAL, lock_after_fill=False):
        """Create one caption + entry pair inside *box* and return the entry."""
        tk.Label(box, text=caption, font=field_font).grid(row=row, column=column)
        entry = tk.Entry(box, font=field_font, textvariable=tk.StringVar(), state=state)
        entry.insert('0', text)
        if lock_after_fill:
            # Fill first, then lock, so the text is visible but not editable.
            entry.config(state=READONLY)
        entry.grid(row=row, column=column + 1, padx=10)
        return entry

    # Which fields are editable depends on the object type.
    mac_state = NORMAL if target.ObjType in [1, 2, 3] else DISABLED
    ip_state = NORMAL if target.ObjType in [1, 2] else DISABLED
    type_one_only_state = NORMAL if target.ObjType == 1 else DISABLED

    group_row = 0
    datas = {}
    # enumerate() gives the real slot position; list.index() would return the
    # first entry that merely compares equal.
    for index, conn in enumerate(target.connections):
        if not isinstance(conn, AllSimConnect):
            continue
        settings = target.interface[index]
        box = tk.LabelFrame(ifs_frame, text=f'接口{index + 1}')
        box.grid(row=group_row, column=0, padx=18, ipady=5)
        datas[index + 1] = {
            "mac": add_field(box, 'MAC:', 0, 0, str(settings.get("mac", "")), mac_state),
            "ip": add_field(box, 'IP:', 1, 0, str(settings.get("ip", "")), ip_state),
            "conn_port": add_field(box, '端口:', 0, 2, str(settings.get("conn_port", "")),
                                   type_one_only_state),
            "addr": add_field(box, '应用层地址:', 1, 2, str(settings.get("addr", "")),
                              type_one_only_state),
        }
        group_row += 1

    # Read-only example showing the expected format of each field.
    example_box = tk.LabelFrame(ifs_frame, text=f'示例')
    example_box.grid(row=group_row, column=0, padx=18, ipady=5)
    add_field(example_box, 'MAC:', 0, 0, "MAC11", lock_after_fill=True)
    add_field(example_box, 'IP:', 1, 0, "10.1.1.10", lock_after_fill=True)
    add_field(example_box, '端口:', 0, 2, "80", lock_after_fill=True)
    add_field(example_box, '应用层地址:', 1, 2, "10.1.2.10:10810:Name3", lock_after_fill=True)

    def commit():
        """Hand the entry widgets to the component and close the dialog."""
        self.chose_obj.config(datas)
        self.message.show_message(f"组件 {self.chose_obj.ObjLabel} 网络配置成功!")
        child_r.destroy()

    tk.Button(ifs_frame_YN, text='确定', font=field_font, height=1, command=commit).grid(
        row=0, column=0, padx=20, pady=20)
    tk.Button(ifs_frame_YN, text='取消', font=field_font, height=1, command=child_r.destroy).grid(
        row=0, column=2, padx=20, pady=20)
def router_table_config(self):
    """
    Open the routing table configuration window for the selected object.

    Shows an error instead when nothing is selected or the object type is not 2.

    :return: None
    """
    picked = self.chose_obj
    if picked is None:
        messagebox.showerror("注意", message="请先选择要配置的对象!")
    elif picked.ObjType != 2:
        messagebox.showerror("注意", message="请选择路由器对象!")
    else:
        RouterConfigWindow(self, picked)
def mac_table_config(self):
    """
    Open the switching table configuration window for the selected object.

    Shows an error instead when nothing is selected or the object type is not 3.

    :return: None
    """
    picked = self.chose_obj
    if picked is None:
        messagebox.showerror("注意", message="请先选择要配置的对象!")
    elif picked.ObjType != 3:
        messagebox.showerror("注意", message="请选择交换机对象!")
    else:
        SwitchConfigWindow(self, picked)
def show_label(self):
    """
    Toggle whether component labels are shown.

    :return: None
    """
    visible = not self.show_label_flag
    self.show_label_flag = visible
    for tag in self.AllSimObjs.values():
        if visible:
            tag.create_img()
        else:
            # Label items carry the tag "<ObjID>text".
            self.delete(tag.ObjID + "text")
def show_interface(self):
    """
    Toggle whether interface information of the connections is shown.

    :return: None
    """
    visible = not self.show_interface_flag
    self.show_interface_flag = visible
    for conn in self.conns:
        if visible:
            conn.draw_line()
        else:
            # Remove the items tagged with both endpoint ids joined together.
            self.delete(conn.NobjS.ObjID + conn.NobjE.ObjID)
def show_network_config(self):
    """
    Show the network configuration of the selected component in the upper panel.

    :return: None
    """
    picked = self.chose_obj
    if picked is None:
        messagebox.showerror("注意", message="请先选择要显示的对象!")
        return
    panel_left = self.width * 0.82
    self.delete("netSet")
    # Heading: the component label in brackets.
    self.create_text(panel_left + 120, 80, text="(" + picked.ObjLabel + ")", anchor="n",
                     font=('微软雅黑', 14, 'bold'), fill="#7030a0", tags="netSet")
    # Body: the component object itself is passed as the text.
    self.create_text(panel_left + 20, 80 + 25, text=picked, anchor="nw",
                     font=('宋体', 14), tags="netSet")
def show_router_config(self):
    """
    Show the routing / switching table of the selected object in the lower panel.

    :return: None
    """
    picked = self.chose_obj
    if picked is None:
        messagebox.showerror("注意", message="请先选择要显示的对象!")
        return
    if picked.ObjType not in (2, 3):
        messagebox.showerror("注意", message="请选择路由器/交换机对象!")
        return
    panel_left = self.width * 0.82
    panel_top = self.canvas_size[3] / 2 + 50
    self.delete("routerSet")
    # Heading: the object label in brackets.
    self.create_text(panel_left + 120, panel_top, text="(" + picked.ObjLabel + ")", anchor="n",
                     font=('微软雅黑', 14, 'bold'), fill="#7030a0", tags="routerSet")
    # Body: the table text supplied by the object.
    self.create_text(panel_left + 20, panel_top + 25, text=picked.get_table_config(),
                     anchor="nw", font=('宋体', 14), tags="routerSet")
def send_packet(self):
    """
    Open a dialog for sending a single packet from the selected object.

    :return: None
    """
    picked = self.chose_obj
    if picked is None:
        messagebox.showerror("注意", message="请先选择要显示的对象!")
        return
    if picked.ConfigCorrect != 1:
        messagebox.showerror("注意", message="请先对选择对象进行网络配置!")
        return
    if picked.ObjType != 1:
        messagebox.showerror("注意", message="请选择主机对象!")
        return

    heiti = ('黑体', 16)
    dialog = tk.Toplevel()
    dialog.title("数据包配置")
    dialog.geometry('330x195+200+110')
    dialog.grab_set()  # modal: keep input on this dialog until it is closed

    ip_caption = tk.Label(dialog, text='目的IP:', font=heiti)
    ip_caption.grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
    packet_ip = tk.Entry(dialog, font=heiti, textvariable=tk.StringVar())
    packet_ip.grid(row=0, column=1, pady=10)
    message_caption = tk.Label(dialog, text='消息:', font=heiti)
    message_caption.grid(row=1, column=0, sticky='w', pady=10)  # sticky='w' aligns to the left
    packet_message = tk.Entry(dialog, font=heiti, textvariable=tk.StringVar())
    packet_message.grid(row=1, column=1, pady=10)

    def send():
        """Validate the form, close the dialog and create the packet."""
        ip = packet_ip.get()
        if ip == "":
            messagebox.showerror("注意", message="ip地址不能为空")
            return
        if not validate_ip_address(ip):
            messagebox.showerror("注意", message="IP地址不规范")
            return
        message = packet_message.get()
        if message == "":
            messagebox.showerror("注意", message="消息不能为空!")
            return
        dialog.destroy()
        self.chose_obj.create_packet(ip, None, message)

    confirm = tk.Button(dialog, text='确定', font=heiti, height=1, command=send)
    confirm.grid(row=3, column=0, sticky='e', pady=10)
    cancel = tk.Button(dialog, text='取消', font=heiti, height=1, command=dialog.destroy)
    cancel.grid(row=3, column=1, sticky='e', pady=10)
def send_packet_list(self):
    """
    Open a dialog for queueing several packets and sending them together.

    Packets are added one at a time (destination IP, message, sending host) and
    listed in a table. "Send" starts one thread per queued packet.

    :return: None
    """
    # Objects of type 1 with ConfigCorrect == 1 may send: label -> ObjID.
    hosts = {}
    for tag in self.AllSimObjs.values():
        if tag.ObjType == 1 and tag.ConfigCorrect == 1:
            hosts[tag.ObjLabel] = tag.ObjID

    heiti = ('黑体', 16)
    child2 = tk.Toplevel()
    child2.title("批量数据包配置")
    child2.geometry('462x452+200+110')
    tk.Label(child2, text='目的IP:', font=heiti).grid(row=0, column=0, columnspan=2, sticky='w', ipady=10)
    packet_ip = tk.Entry(child2, font=heiti, textvariable=tk.StringVar())
    packet_ip.grid(row=0, column=1, pady=5)
    tk.Label(child2, text='消息:', font=heiti).grid(row=1, column=0, sticky='w', pady=5)
    packet_message = tk.Entry(child2, font=heiti, textvariable=tk.StringVar())
    packet_message.grid(row=1, column=1, pady=5)
    host = StringVar()
    tk.Label(child2, text='发送主机:', font=heiti).grid(row=2, column=0, sticky='w', pady=5)
    combobox = ttk.Combobox(
        master=child2,  # parent container
        height=5,  # number of entries shown in the drop-down
        width=12,  # width
        state='readonly',  # selection only, no typing
        font=('', 16),  # font
        textvariable=host,  # holds the chosen host label
        values=list(hosts.keys()),  # drop-down entries
    )
    combobox.grid(row=2, column=1, pady=5)

    packet_frame = tk.Frame(child2)
    packet_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
    packet_treeview = ttk.Treeview(packet_frame, columns=("source_host", "ip", "mac", "message"),
                                   show="headings")
    for column_id, heading, column_width in (("source_host", "发送主机", 60), ("ip", "IP", 120),
                                             ("mac", "MAC", 60), ("message", "消息", 120)):
        packet_treeview.heading(column_id, text=heading)
        packet_treeview.column(column_id, width=column_width, anchor=CENTER)
    packet_treeview.pack(side="left", fill="both")
    scrollbar = ttk.Scrollbar(packet_frame, orient="vertical", command=packet_treeview.yview)
    scrollbar.pack(side="right", fill="y")
    packet_treeview.configure(yscrollcommand=scrollbar.set)

    packets = []  # queued (sending object, packet) pairs

    def add():
        """Validate the form and append one packet to the queue and the table."""
        ip = packet_ip.get()
        message = packet_message.get()
        chose_host = host.get()
        if ip == "" or message == "" or chose_host == "":
            messagebox.showerror("注意", message="输入框不能为空")
            return
        # Same address check as the single-packet dialog.
        if not validate_ip_address(ip):
            messagebox.showerror("注意", message="IP地址不规范")
            return
        sender = self.AllSimObjs[hosts[chose_host]]
        # The packet source is taken from the sender's first interface.
        packet = SimPacket(sender.interface[0]["ip"], sender.interface[0]["mac"], ip, None, message)
        packets.append((sender, packet))
        packet_treeview.insert("", "end", values=(
            sender.ObjLabel, packet.destination_ip, packet.destination_mac, packet.message))

    def send():
        """Start one sending thread per queued packet and close the dialog."""
        if len(packets) == 0:
            messagebox.showerror("注意", message="请至少添加一条数据包!")
            return
        for sender, packet in packets:
            threading.Thread(target=sender.send, args=(packet,)).start()
        child2.destroy()

    tk.Button(child2, text="添加数据包", font=('黑体', 14), height=3, command=add).grid(
        row=0, column=2, rowspan=4, padx=10)
    button_frame = Frame(child2)
    tk.Button(button_frame, text="发送", font=heiti, height=1, command=send).pack(side=RIGHT, padx=20)
    tk.Button(button_frame, text="取消", font=heiti, height=1, command=child2.destroy).pack(side=RIGHT)
    button_frame.grid(row=6, column=0, columnspan=3)
def clear_canvas(self):
    """
    Remove every component and connection from the canvas and the database.

    Asks for confirmation first. The current selection is cleared as well so
    that no later menu action operates on a removed component.

    :return: None
    """
    ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?')
    if ask == "no":
        return
    truncate_db()  # empty the database
    for tag_id in self.AllSimObjs:
        self.delete(tag_id)
        self.delete(tag_id + "text")
    for conn in self.conns:
        conn.delete_line()
    self.delete("rectangle")
    self.AllSimObjs.clear()
    self.conns.clear()
    # Drop the stale selection and the panel texts that described it.
    self.chose_obj = None
    self.delete("netSet")
    self.delete("routerSet")
def export_data(self):
    """
    Export the stored data to ``database.xlsx`` next to the script.

    Success and failure are both reported in the log panel; a failure is shown
    in red with the exception text.
    """
    try:
        target_path = sys.path[0] + "/database.xlsx"
        ExportUtil(target_path).export()
        self.message.show_message("文件导出成功!文件位置在{}".format(target_path))
    except Exception as E:
        self.message.show_message(str(E), color="red")
def import_data(self):
    """
    Replace the current topology with the contents of a file chosen by the user.

    The file dialog is shown first. Nothing is deleted when the user cancels
    it; only after a file has been picked are the database and the canvas
    cleared and the file imported.

    :return: None
    """
    file_path = filedialog.askopenfilename()
    if not file_path:
        # Cancelled: keep the existing data untouched.
        return
    truncate_db()
    for tag_id in self.AllSimObjs:
        self.delete(tag_id)
        self.delete(tag_id + "text")
    for conn in self.conns:
        conn.delete_line()
    self.delete("rectangle")
    self.AllSimObjs.clear()
    self.conns.clear()
    self.chose_obj = None  # the previously selected component no longer exists
    export = ExportUtil(file_path)
    export.import_data()
    self.message.show_message("文件导入成功!")
    self.reload_data()
def check_network(self):
    """
    Open a dialog that checks connectivity between two objects.

    A "ping" packet is created on the sending object towards the first
    interface of the receiving object; the outcome is animated and written to
    the log panel.

    :return: None
    """
    # Objects of type 1 with ConfigCorrect == 1 are offered: label -> ObjID.
    hosts = {}
    for tag in self.AllSimObjs.values():
        if tag.ObjType == 1 and tag.ConfigCorrect == 1:
            hosts[tag.ObjLabel] = tag.ObjID

    heiti = ('黑体', 16)
    child2 = tk.Toplevel()
    child2.title("数据包配置")
    child2.geometry('392x168+200+110')
    child2.grab_set()  # modal: keep input on this dialog until it is closed
    tk.Label(child2, text='发送主机:', font=heiti).grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
    send_host = StringVar()
    host_combobox = ttk.Combobox(
        master=child2,  # parent container
        height=5,  # number of entries shown in the drop-down
        width=18,  # width
        state='readonly',  # selection only, no typing
        font=('', 16),  # font
        textvariable=send_host,  # holds the chosen sender label
        values=list(hosts.keys()),  # drop-down entries
    )
    host_combobox.grid(row=0, column=1, pady=10)
    tk.Label(child2, text='接收主机:', font=heiti).grid(row=1, column=0, sticky='w', pady=10)
    receive_host = StringVar()
    ttk.Combobox(
        master=child2,  # parent container
        height=5,  # number of entries shown in the drop-down
        width=18,  # width
        state='readonly',  # selection only, no typing
        font=('', 16),  # font
        textvariable=receive_host,  # holds the chosen receiver label
        values=list(hosts.keys()),  # drop-down entries
    ).grid(row=1, column=1, pady=10)

    def check():
        """Validate the selection, send the test packet and report the result."""
        sender_label = send_host.get()
        receiver_label = receive_host.get()
        if sender_label == "" or receiver_label == "":
            messagebox.showerror("注意", message="信息不能为空")
            return
        if sender_label == receiver_label:
            messagebox.showerror("注意", message="发送主机和接收主机不能相同!")
            # Stop here; previously the check carried on after the error.
            return
        send_host_obj: SimHost = self.AllSimObjs[hosts[sender_label]]
        receive_host_obj: SimHost = self.AllSimObjs[hosts[receiver_label]]
        child2.destroy()
        send_host_obj.create_packet(receive_host_obj.interface[0]["ip"],
                                    receive_host_obj.interface[0]["mac"], "ping", True)
        reached = self.receive == receive_host_obj
        TransferAnimate(self, reached)
        if reached:
            self.message.show_message(
                "校验成功:{}{} 能正常连通".format(send_host_obj.ObjLabel, receive_host_obj.ObjLabel))
        else:
            # check_error_obj may still be None when no component was recorded.
            failing = self.check_error_obj
            failing_label = failing.ObjLabel if failing is not None else "未知"
            self.message.show_message(
                "校验成功:{}{} 无法正常连通,请检查一下组件 {} 的配置".format(
                    send_host_obj.ObjLabel, receive_host_obj.ObjLabel, failing_label), "red")
        self.check_error_obj = None
        self.receive = None

    tk.Button(child2, text='校验', font=heiti, height=1, command=check).grid(
        row=5, column=0, sticky='e', pady=10)
    tk.Button(child2, text='取消', font=heiti, height=1, command=child2.destroy).grid(
        row=5, column=1, sticky='e', pady=10)
def create_config_button(self):
    """
    Build the menu bar and attach it to the window that contains this canvas.

    The window is looked up with ``winfo_toplevel()`` rather than through a
    module-level ``root`` variable, so the method also works when the class is
    used from another module.

    :return: None
    """
    font_size = 16
    font_style = ("Arial", font_size)
    style = ttk.Style()
    style.configure("TMenubutton", font=font_style)
    window = self.winfo_toplevel()
    # The menu bar is a container shown at the top of the window.
    menubar = tk.Menu(window, font=font_style)

    # (cascade title, [(entry label, callback), ...]) in display order.
    cascades = [
        ('删除与修改', [
            ('删除对象', self.delete_obj),
            ('删除连接线', self.delete_line),
            ('修改标签', self.update_tag_name),
        ]),
        ('基础配置', [
            ('网络配置', self.network_config),
            ('路由表配置', self.router_table_config),
            ('交换表配置', self.mac_table_config),
        ]),
        ('显示设置', [
            ('显示/不显示标签', self.show_label),
            ('显示/不显示接口', self.show_interface),
            ('显示网络配置信息', self.show_network_config),
            ('显示路由/交换表信息', self.show_router_config),
        ]),
        ('发送数据包', [
            ('发送数据包', self.send_packet),
            ('批量发送数据包', self.send_packet_list),
        ]),
    ]
    for title, entries in cascades:
        submenu = tk.Menu(menubar, tearoff=0, font=font_style)
        menubar.add_cascade(label=title, menu=submenu, font=font_style)
        for label, callback in entries:
            submenu.add_command(label=label, command=callback)

    # Entries placed directly on the menu bar.
    direct_entries = [
        ('清除屏幕', self.clear_canvas),
        ('导出数据', self.export_data),
        ('导入数据', self.import_data),
        ('校验', self.check_network),
    ]
    for label, callback in direct_entries:
        menubar.add_command(label=label, command=callback, font=font_style)

    # Apply the font to the child menus as well.
    for menu in menubar.winfo_children():
        menu.config(font=font_style)
        for item in menu.winfo_children():
            item.config(font=font_style)
    window.config(menu=menubar)
def create_widget(self):
    """
    Draw the static page layout: work area, two info panels and the device palette.

    :return: None
    """
    title_font = ('微软雅黑', 18, 'bold')
    left, top, right, bottom = self.canvas_size
    panel_left = self.width * 0.82
    lower_panel_top = bottom / 2

    # Work area frame (top-left and bottom-right corner).
    self.create_rectangle(left, top, right, bottom, outline="#7f6000", width=0)
    # Upper panel with its heading.
    self.create_image(panel_left, 30, image=self.label_top_img, anchor=NW)
    self.create_text(panel_left + 120, 30 + 15, text="网络配置信息", anchor="n", font=title_font)
    # Lower panel with its heading.
    self.create_image(panel_left, lower_panel_top, image=self.label_bottom_img, anchor=NW)
    self.create_text(panel_left + 120, lower_panel_top + 15, text="路由/交换表信息", anchor="n",
                     font=title_font)

    # Fixed palette on the left: one caption and one icon per component kind.
    icon_x, caption_x, row_gap = 50, 40, 120
    palette = [
        ("路由器", self.router_img),
        ("交换机", self.switch_img),
        ("集线器", self.hub_img),
        ("主机", self.host_img),
    ]
    icons = []
    for slot, (caption, picture) in enumerate(palette, start=1):
        self.create_text(caption_x, row_gap * slot + 40, text=caption, anchor="nw", font=title_font)
        icon = self.create_image(icon_x, row_gap * slot, image=picture, anchor="nw")
        icons.append((icon, caption))
    # Make every palette icon draggable.
    for icon, caption in icons:
        self.add_sim_obj(icon, caption)
if __name__ == '__main__':
    # Script entry point: create the window, size it relative to the screen,
    # build the canvas and menu, then restore the data of the previous run.
    root = Window()
    root.title('网络拓扑图')
    screen_width = root.winfo_screenwidth()  # winfo methods report the screen size
    screen_height = root.winfo_screenheight()
    root_attr = {
        "width": screen_width * 0.83,
        "height": screen_height * 0.85,
    }
    # Centre the window horizontally and place it slightly above the vertical centre.
    size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'],
                            (screen_width - root_attr['width']) / 2,
                            (screen_height - root_attr['height']) / 2 - 30)
    # "height" spelled out in full; the former "heigh" relied on option abbreviation.
    canvas = NetWorkAnalog(root, width=root_attr['width'], height=root_attr['height'], bg="white")
    canvas.place(x=0, y=0, anchor='nw')
    canvas.create_config_button()
    canvas.reload_data()
    root.geometry(size)
    root.mainloop()