You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

422 lines
22 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import ipaddress
import sys
import threading
from tkinter import filedialog
from SimUtil import *
from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase
from dbUtil import search, execute_sql, delete_obj, truncate_db
from PIL import Image as imim
from packet import *
class Message(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.message = []
self.master = master
self.scrollbar_y = tk.Scrollbar(master, orient="vertical", command=self.yview)
self.scrollbar_y.pack(side="right", fill=BOTH)
self.scrollbar_x = tk.Scrollbar(master, orient=HORIZONTAL, command=self.xview)
self.scrollbar_x.pack(side="bottom", fill=BOTH)
self.config(yscrollcommand=self.scrollbar_y.set, xscrollcommand=self.scrollbar_x.set)
def show_message(self, message, color="#5fa8fe"):
for mes in message.split("\n"):
self.message.append({"message": mes, "color": color})
num = 0
self.delete("message")
for function in self.message:
self.create_text(20, 25 * num + 10, anchor="nw", text=function["message"], font=("", 15),
fill=function["color"], tags="message")
num += 1
self.update()
self.configure(scrollregion=self.bbox("all"))
self.scrollbar_y.set(1.0, 1.0)
self.yview_moveto(1.0)
class NetWorkAnalog(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.master = master
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40)))
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
self.width = int(self.cget("width"))
self.height = int(self.cget("height"))
self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)]
self.label_top_img = ImageTk.PhotoImage(imim.open("../datas/images/右上框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.4))))
self.label_bottom_img = ImageTk.PhotoImage(imim.open("../datas/images/右下框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.45))))
self.message_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/日志消息展示框@2x.png").resize((self.width - 60, self.height - self.canvas_size[3] - 30))
)
self.text_back_img = ImageTk.PhotoImage(
Image.open("../images/文字背景@3x.png").resize((155, 90))
)
message_frame = Frame(self, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW)
self.message = Message(message_frame, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
self.message.pack()
self.message.configure(bg="#edf8ff")
self.configure(bg="#ddeafe")
self.filename = ImageTk.PhotoImage(imim.open("../datas/images/背景@2x.png").resize((self.width, self.canvas_size[3])))
self.create_image(0, 0, image=self.filename, anchor=NW)
self.chose = self.host_img
self.AllSimObjs = {}
self.conns = []
self.drawLine = True # 画线标志
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
self.chose_obj = None
self.show_label_flag = True
self.show_interface_flag = True
self.trans_obj = set()
self.create_widget()
def is_chose(self):
"""
当被选中时,绘制选中框
:return:
"""
self.delete("rectangle")
self.create_rectangle(self.chose_obj.ObjX - 35, self.chose_obj.ObjY - 35, self.chose_obj.ObjX + 15,
self.chose_obj.ObjY + 15, outline="red", tags="rectangle")
def reload_data(self):
# todo: 加载上一次程序运行的数据
"""
加载上一次程序运行时的数据
:return:
"""
self.AllSimObjs = {}
self.conns = []
sim_obj_sql = "select * from sim_objs"
sim_obj_data = search(sim_obj_sql)
for index, row in sim_obj_data.iterrows(): # 初始化组件对象
sim_type = row["ObjType"]
ObjX = row["ObjX"]
ObjY = row["ObjY"]
ConfigCorrect = row["ConfigCorrect"]
ObjLable = row["ObjLabel"]
ObjID = row["ObjID"]
if sim_type == 1:
tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 2:
tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 3:
tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
else:
tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
tag.create_img()
self.AllSimObjs[tag.ObjID] = tag
sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id"
sim_conn_data = search(sim_conn_sql)
conn_datas = {}
for index, conn in sim_conn_data.iterrows():
if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"]))
for key, value in conn_datas.items():
conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1],
self.AllSimObjs[value[1][0]], value[1][1], key[1])
conn_obj.draw_line()
self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象
self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj
self.conns.append(conn_obj)
conn_obj.ConfigCorrect = key[1]
def show_obj(self, AllSimObj, conns):
self.AllSimObjs = AllSimObj
self.conns = conns
for key, sim_obj in self.AllSimObjs.items():
sim_obj.create_img()
for conn in self.conns:
conn.draw_line()
def send_packet(self):
"""
发送数据包
:return:
"""
hosts = {}
for key, tag in self.AllSimObjs.items():
if tag.ObjType == 1 and tag.ConfigCorrect == 1:
hosts[tag.ObjLabel] = tag.ObjID
child2 = tk.Toplevel()
child2.title("数据包配置")
child2.geometry('392x306+200+110')
child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
send_host = StringVar()
host_combobox = ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=send_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
)
host_combobox.grid(row=0, column=1, pady=10)
tk.Label(child2, text='接收主机:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=10) # ,sticky='w'靠左显示
receive_host = StringVar()
ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=receive_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
).grid(row=1, column=1, pady=10)
tk.Label(child2, text='数据大小(kb):', font=('黑体', 16)).grid(row=2, column=0, sticky='w',
pady=10) # ,sticky='w'靠左显示
packet_size = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_size.grid(row=2, column=1, pady=10)
tk.Label(child2, text='数据标签:', font=('黑体', 16)).grid(row=3, column=0, sticky='w',
pady=10) # ,sticky='w'靠左显示
packet_label = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_label.grid(row=3, column=1, pady=10)
def send():
"""
发送数据包
:return:
"""
if send_host.get() == "" or receive_host.get() == "" or packet_size.get() == "" or packet_label.get() == "":
messagebox.showerror("注意", message="信息不能为空")
return
if send_host.get() == receive_host.get():
messagebox.showerror("注意", message="发送主机和接收主机不能相同!")
send_message = """发送主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}"""
receive_message = """接收主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}"""
send_host_obj = self.AllSimObjs[hosts[send_host.get()]]
receive_host_obj = self.AllSimObjs[hosts[receive_host.get()]]
pack_label = packet_label.get()
pack_size = packet_size.get()
child2.destroy()
count = split_appdata(int(pack_size))
self.show_top_message(send_host_obj,
message=send_message.format(send_host_obj.ObjLabel, send_host_obj.interface[0]["ip"],
send_host_obj.interface[0]["mac"], pack_size, 0, count))
self.show_bottom_message(receive_host_obj, message=receive_message.format(receive_host_obj.ObjLabel,
receive_host_obj.interface[0][
"ip"],
receive_host_obj.interface[0][
"mac"], pack_size, 0, count))
for obj in self.trans_obj:
self.delete(obj.ObjID + "detail_button")
self.delete(obj.ObjID + "detail")
self.unbind(obj.ObjID, "<Button-1>")
send_host_obj.create_packet(receive_host_obj, pack_label, pack_size)
tk.Button(child2, text='开启模拟', font=('黑体', 16), height=1, command=send).grid(row=5, column=0, sticky='e', pady=10)
tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=5, column=1, sticky='e', pady=10)
# 建立画出对象详情的函数
def draw_detail(self, obj):
# 判断type对象来画出详情表
if obj.ObjType == 1: # 当type对象为1 画出主机详情图
app_color = ("#94D050", '#00B0F0')
trans_color = ("#94D050", '#00B0F0')
ip_color = ("#94D050", '#00B0F0')
mac_color = ("#94D050", '#00B0F0')
elif obj.ObjType == 2:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#94D050", '#00B0F0')
mac_color = ("#94D050", '#00B0F0')
elif obj.ObjType == 3:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#D3D3D3", "#D3D3D3")
mac_color = ("#94D050", '#00B0F0')
else:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#D3D3D3", "#D3D3D3")
mac_color = ("#94D050", '#00B0F0')
frist_x = obj.ObjX # 获取frist_x
frist_y = obj.ObjY # 获取frist_y
# 这里缺少一个删除其他详情的步骤
# 画出连接详情表的线
tag_id = obj.ObjID + "detail"
if len(self.gettags(tag_id)) > 0:
self.delete(tag_id)
else:
self.create_line((frist_x + 40, frist_y - 20), (frist_x + 80, frist_y - 20),
(frist_x + 90, frist_y - 60),
fill='#50abf8', tags=tag_id, width=2)
# 画出详情表
self.create_image(frist_x + 50, frist_y - 140, image=self.text_back_img, anchor=NW, tags=tag_id)
# 画出相应的绿条数据
self.create_text(frist_x + 70, frist_y - 130, text='应用层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 107, text='传输层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 84, text='IP 层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 62, text='链路层', tags=tag_id)
# 画出 右侧绿色和蓝色的类进度条
# 应用层
self.create_rectangle(frist_x + 197, frist_y - 135, frist_x + 170, frist_y - 120, fill=app_color[0],
outline=app_color[0], tags=tag_id)
self.create_rectangle(frist_x + 168, frist_y - 135, frist_x + 148, frist_y - 120, fill=app_color[1],
outline=app_color[1], tags=tag_id)
# 传输层
self.create_rectangle(frist_x + 197, frist_y - 115, frist_x + 160, frist_y - 100,
fill=trans_color[0],
outline=trans_color[0], tags=tag_id)
self.create_rectangle(frist_x + 158, frist_y - 115, frist_x + 133, frist_y - 100,
fill=trans_color[1],
outline=trans_color[1], tags=tag_id)
# IP 层
self.create_rectangle(frist_x + 197, frist_y - 93, frist_x + 150, frist_y - 78, fill=ip_color[0],
outline=ip_color[0], tags=tag_id)
self.create_rectangle(frist_x + 148, frist_y - 93, frist_x + 118, frist_y - 78, fill=ip_color[1],
outline=ip_color[1], tags=tag_id)
# 链路层
self.create_rectangle(frist_x + 197, frist_y - 70, frist_x + 133, frist_y - 55, fill=mac_color[0],
outline=mac_color[0], tags=tag_id)
self.create_rectangle(frist_x + 131, frist_y - 70, frist_x + 98, frist_y - 55, fill=mac_color[1],
outline=mac_color[1], tags=tag_id)
def trans_over(self, objs):
print("传输完成")
self.trans_obj = objs
for obj in objs:
obj.create_detail_button()
self.tag_bind(obj.ObjID, "<Button-1>", lambda event, obj=obj: self.draw_detail(obj)) # 绑定右击事件
obj.is_packet = False
obj.is_un_packet = False
def clear_canvas(self):
"""
清除画布
:return:
"""
ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?')
if ask == "no":
return
truncate_db() # 清除数据库
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
def export_data(self):
try:
export = ExportUtil(sys.path[0] + "/database.xlsx")
export.export()
self.message.show_message("文件导出成功!文件位置在{}".format(sys.path[0] + "/database.xlsx"))
except Exception as E:
self.message.show_message(str(E), color="red")
def import_data(self):
truncate_db()
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
file_path = filedialog.askopenfilename()
if file_path:
export = ExportUtil(file_path)
export.import_data()
self.message.show_message("文件导入成功!")
self.reload_data()
def show_top_message(self, obj, message):
# todo: 显示网络配置信息
"""
显示网络配置信息
:return:
"""
self.delete("netSet")
self.create_text(self.width * 0.82 + 120, 80, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="netSet")
self.create_text(self.width * 0.82 + 20, 80 + 25, text=message,
anchor="nw", font=('宋体', 14), tags="netSet")
def show_bottom_message(self, obj, message):
# todo: 显示路由表交换表信息
"""
显示路由交换表信息
:return:
"""
self.delete("routerSet")
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 50, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="routerSet")
self.create_text(self.width * 0.82 + 20, self.canvas_size[3] / 2 + 50 + 25, text=message,
anchor="nw", font=('宋体', 14), tags="routerSet")
def create_config_button(self):
# 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方
menubar = tk.Menu(root)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='发送数据包', menu=setMenu)
setMenu.add_command(label='发送数据包', command=self.send_packet)
menubar.add_command(label='清除屏幕', command=self.clear_canvas)
menubar.add_command(label='导出数据', command=self.export_data)
menubar.add_command(label='导入数据', command=self.import_data)
root.config(menu=menubar)
def create_widget(self):
# todo: 创建页面
"""
创建整体页面布局
:return:
"""
self.create_rectangle(self.canvas_size[0], self.canvas_size[1], self.canvas_size[2], self.canvas_size[3], outline="#7f6000", width=0) # 矩形框,左上角坐标,右下角坐标
self.create_image(self.width * 0.82, 30, image=self.label_top_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, 30 + 15, text="发送主机详情", anchor="n", font=('微软雅黑', 18, 'bold'))
self.create_image(self.width * 0.82, self.canvas_size[3] / 2, image=self.label_bottom_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
# round_rectangle(self, self.width * 0.82, self.canvas_size[3] / 2, self.width * 0.98, self.canvas_size[3] - 30, outline="#ffff00", width=2, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 15, text="接收主机详情", anchor="n", font=('微软雅黑', 18, 'bold'))
# 显示左边的固定图片
img_height, text_height, split_width = 50, 40, 120
self.create_text(text_height, split_width + 40, text="路由器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width, image=self.router_img, anchor="nw")
self.create_text(text_height, split_width * 2 + 40, text="交换机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 2, image=self.switch_img, anchor="nw")
self.create_text(text_height, split_width * 3 + 40, text="集线器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 3, image=self.hub_img, anchor="nw")
self.create_text(text_height, split_width * 4 + 40, text="主机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 4, image=self.host_img, anchor="nw")
if __name__ == '__main__':
root = Window()
root.title('网络拓扑图')
screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小
screen_height = root.winfo_screenheight()
root_attr = {
"width": screen_width * 0.83,
"height": screen_height * 0.85,
}
size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'], (screen_width - root_attr['width']) / 2,
(screen_height - root_attr['height']) / 2 - 30)
canvas = NetWorkAnalog(root, width=root_attr['width'], heigh=root_attr['height'], bg="white")
canvas.place(x=0, y=0, anchor='nw')
canvas.create_config_button()
canvas.reload_data()
root.geometry(size)
root.mainloop()