You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
333 lines
15 KiB
333 lines
15 KiB
5 months ago
|
import ipaddress
|
||
|
import re
|
||
|
import sqlite3
|
||
|
from time import sleep
|
||
|
from tkinter import messagebox
|
||
|
|
||
|
import ttkbootstrap as tk
|
||
|
from ttkbootstrap import *
|
||
|
from ttkbootstrap import ttk
|
||
|
import pandas as pd
|
||
|
import sys
|
||
|
from PIL import Image, ImageTk
|
||
|
|
||
|
from SimObjs import SimRouter
|
||
|
|
||
|
|
||
|
def round_rectangle(cv, x1, y1, x2, y2, radius=30, **kwargs):
|
||
|
"""
|
||
|
绘制圆角矩形
|
||
|
:param cv: canvas对象
|
||
|
:param radius: 圆角值
|
||
|
:return:
|
||
|
"""
|
||
|
points = [x1 + radius, y1,
|
||
|
x1 + radius, y1,
|
||
|
x2 - radius, y1,
|
||
|
x2 - radius, y1,
|
||
|
x2, y1,
|
||
|
x2, y1 + radius,
|
||
|
x2, y1 + radius,
|
||
|
x2, y2 - radius,
|
||
|
x2, y2 - radius,
|
||
|
x2, y2,
|
||
|
x2 - radius, y2,
|
||
|
x2 - radius, y2,
|
||
|
x1 + radius, y2,
|
||
|
x1 + radius, y2,
|
||
|
x1, y2,
|
||
|
x1, y2 - radius,
|
||
|
x1, y2 - radius,
|
||
|
x1, y1 + radius,
|
||
|
x1, y1 + radius,
|
||
|
x1, y1]
|
||
|
|
||
|
return cv.create_polygon(points, **kwargs, smooth=True)
|
||
|
|
||
|
def validate_ip_address(ip_address):
|
||
|
"""
|
||
|
匹配ip地址格式是否规范
|
||
|
:param ip_address: IP地址
|
||
|
:return: Boolean
|
||
|
"""
|
||
|
# 定义IP地址的正则表达式模式
|
||
|
pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$'
|
||
|
pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$'
|
||
|
# 使用re模块进行匹配
|
||
|
match_with_subnet = re.match(pattern_with_subnet, ip_address)
|
||
|
match_without_subnet = re.match(pattern_without_subnet, ip_address)
|
||
|
if match_with_subnet:
|
||
|
# 带有子网掩码的IP地址
|
||
|
# 检查每个组件的取值范围是否在 0-255 之间
|
||
|
for group in match_with_subnet.groups()[:4]:
|
||
|
if not (0 <= int(group) <= 255):
|
||
|
return False
|
||
|
# 检查子网掩码的取值范围是否在 0-32 之间
|
||
|
subnet_mask = int(match_with_subnet.groups()[4])
|
||
|
if not (0 <= subnet_mask <= 32):
|
||
|
return False
|
||
|
return True
|
||
|
elif match_without_subnet:
|
||
|
# 不带子网掩码的IP地址
|
||
|
# 检查每个组件的取值范围是否在 0-255 之间
|
||
|
for group in match_without_subnet.groups():
|
||
|
if not (0 <= int(group) <= 255):
|
||
|
return False
|
||
|
return True
|
||
|
else:
|
||
|
# IP地址格式不正确
|
||
|
return False
|
||
|
|
||
|
|
||
|
class ExportUtil():
|
||
|
def __init__(self, path):
|
||
|
self.conn = sqlite3.connect('./network.db')
|
||
|
self.path = path
|
||
|
|
||
|
def get_table_names(self):
|
||
|
cursor = self.conn.cursor()
|
||
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") # 如果你使用SQLite数据库
|
||
|
tables = cursor.fetchall()
|
||
|
cursor.close()
|
||
|
return [table[0] for table in tables]
|
||
|
|
||
|
def export(self):
|
||
|
tables = self.get_table_names()
|
||
|
with pd.ExcelWriter(self.path, engine='openpyxl') as writer:
|
||
|
for table in tables:
|
||
|
table_name = table
|
||
|
# a. 从数据库中获取表的数据并存储在DataFrame中
|
||
|
query = f"SELECT * FROM {table_name}"
|
||
|
df = pd.read_sql(query, self.conn)
|
||
|
# b. 使用Pandas将数据写入Excel文件的不同sheet中
|
||
|
df.to_excel(writer, sheet_name=table_name, index=False)
|
||
|
|
||
|
def import_data(self):
|
||
|
excel_file = pd.ExcelFile(self.path)
|
||
|
sheet_names = excel_file.sheet_names
|
||
|
print(sheet_names)
|
||
|
cursor = self.conn.cursor()
|
||
|
for sheet_name in sheet_names:
|
||
|
# 4. 使用 Pandas 读取工作表数据
|
||
|
df = pd.read_excel(excel_file, sheet_name=sheet_name)
|
||
|
# 5. 获取工作表的列名
|
||
|
columns = df.columns.tolist()
|
||
|
# 6. 构造插入语句
|
||
|
columns_str = ', '.join(columns)
|
||
|
placeholders = ', '.join(['?' for _ in range(len(columns))])
|
||
|
sql = f"INSERT INTO {sheet_name} ({columns_str}) VALUES ({placeholders})"
|
||
|
# 7. 将数据插入数据库
|
||
|
for index, row in df.iterrows():
|
||
|
# 8. 使用动态生成的 SQL 语句将数据插入数据库
|
||
|
cursor.execute(sql, tuple(row))
|
||
|
self.conn.commit()
|
||
|
|
||
|
|
||
|
class RouterConfigWindow(tk.Toplevel):
|
||
|
def __init__(self, parent, router_obj):
|
||
|
super().__init__(parent)
|
||
|
self.geometry("435x433+350+200")
|
||
|
self.title(f"{router_obj.ObjLabel}路由表配置")
|
||
|
self.router_obj = router_obj
|
||
|
self.interface_entries = []
|
||
|
self.router_table = {}
|
||
|
self.create_interface_inputs()
|
||
|
self.create_router_table()
|
||
|
|
||
|
def create_interface_inputs(self):
|
||
|
label_text = ["接口1", "接口2", "接口3", "接口4"]
|
||
|
for i in range(4):
|
||
|
label = tk.Label(self, text=label_text[i], font=("黑体", 16))
|
||
|
label.grid(row=i, column=0, padx=10, pady=5, sticky="w")
|
||
|
entry = tk.Entry(self, width=20, font=("黑体", 16),)
|
||
|
entry.grid(row=i, column=1, padx=10, pady=5, sticky="w")
|
||
|
self.interface_entries.append(entry)
|
||
|
button = tk.Button(self, text="添加", font=("黑体", 16), command=lambda index=i: self.add_router_entry(index))
|
||
|
button.grid(row=i, column=2, padx=10, pady=5)
|
||
|
lab = LabelFrame(self, text="示例")
|
||
|
lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20)
|
||
|
Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11", font=("黑体", 16)).pack()
|
||
|
|
||
|
def create_router_table(self):
|
||
|
def on_right_click(event):
|
||
|
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
|
||
|
if row:
|
||
|
self.router_treeview.selection_set(row) # 选中该行
|
||
|
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
|
||
|
|
||
|
def delete_row():
|
||
|
selected_items = self.router_treeview.selection() # 获取选中的行
|
||
|
for item in selected_items:
|
||
|
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
|
||
|
self.router_obj.delete_config(ifs, network)
|
||
|
self.router_treeview.delete(item)
|
||
|
self.router_table_frame = tk.Frame(self)
|
||
|
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
|
||
|
style = ttk.Style()
|
||
|
style.configure("Custom.Treeview.Heading", font=("宋体", 15))
|
||
|
style.configure("Custom.Treeview", rowheight=30, font=("宋体", 15))
|
||
|
self.router_treeview = ttk.Treeview(self.router_table_frame, style="Custom.Treeview", columns=("Interface", "Route"), show="headings")
|
||
|
self.router_treeview.heading("Interface", text="接口")
|
||
|
self.router_treeview.heading("Route", text="网段")
|
||
|
self.router_treeview.pack(side="left", fill="both")
|
||
|
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
|
||
|
scrollbar.pack(side="right", fill="y")
|
||
|
self.router_treeview.configure(yscrollcommand=scrollbar.set)
|
||
|
self.router_table = self.router_obj.router_table
|
||
|
self.router_treeview.bind("<Button-3>", on_right_click)
|
||
|
# 创建删除菜单
|
||
|
delete_menu = tk.Menu(self.master, tearoff=False)
|
||
|
delete_menu.add_command(label="删除", command=delete_row)
|
||
|
self.update_router_table()
|
||
|
|
||
|
def add_router_entry(self, index):
|
||
|
entry_text = self.interface_entries[index].get()
|
||
|
try:
|
||
|
ipaddress.ip_network(entry_text)
|
||
|
if isinstance(self.router_obj, SimRouter):
|
||
|
if not validate_ip_address(entry_text):
|
||
|
messagebox.showerror("注意", message="添加的网段信息格式不合格")
|
||
|
self.interface_entries[index].delete(0, tk.END)
|
||
|
self.focus_set()
|
||
|
return
|
||
|
if entry_text:
|
||
|
if index + 1 in self.router_table:
|
||
|
self.router_table[index + 1].append(entry_text)
|
||
|
else:
|
||
|
self.router_table[index + 1] = [entry_text]
|
||
|
self.interface_entries[index].delete(0, tk.END)
|
||
|
self.router_obj.add_config(entry_text, index + 1)
|
||
|
self.update_router_table()
|
||
|
self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!")
|
||
|
except:
|
||
|
messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12")
|
||
|
return
|
||
|
|
||
|
def update_router_table(self):
|
||
|
self.router_treeview.delete(*self.router_treeview.get_children())
|
||
|
for i, entrys in self.router_table.items():
|
||
|
for entry in entrys:
|
||
|
self.router_treeview.insert("", "end", values=(f"接口{i}", entry))
|
||
|
|
||
|
|
||
|
class SwitchConfigWindow(RouterConfigWindow):
|
||
|
def __init__(self, parent, router_obj):
|
||
|
super().__init__(parent, router_obj)
|
||
|
self.geometry("435x433+350+200")
|
||
|
self.title(f"{router_obj.ObjLabel}交换表配置")
|
||
|
self.router_obj = router_obj
|
||
|
self.interface_entries = []
|
||
|
self.router_table = {}
|
||
|
self.create_interface_inputs()
|
||
|
self.create_router_table()
|
||
|
|
||
|
def create_router_table(self):
|
||
|
def on_right_click(event):
|
||
|
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
|
||
|
if row:
|
||
|
self.router_treeview.selection_set(row) # 选中该行
|
||
|
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
|
||
|
|
||
|
def delete_row():
|
||
|
selected_items = self.router_treeview.selection() # 获取选中的行
|
||
|
for item in selected_items:
|
||
|
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
|
||
|
self.router_obj.delete_config(ifs, network)
|
||
|
self.router_treeview.delete(item)
|
||
|
|
||
|
self.router_table_frame = tk.Frame(self)
|
||
|
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
|
||
|
self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings")
|
||
|
self.router_treeview.heading("Interface", text="接口")
|
||
|
self.router_treeview.heading("Route", text="mac")
|
||
|
self.router_treeview.pack(side="left", fill="both")
|
||
|
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
|
||
|
scrollbar.pack(side="right", fill="y")
|
||
|
self.router_treeview.configure(yscrollcommand=scrollbar.set)
|
||
|
self.router_table = self.router_obj.mac_table
|
||
|
self.router_treeview.bind("<Button-3>", on_right_click)
|
||
|
# 创建删除菜单
|
||
|
delete_menu = tk.Menu(self.master, tearoff=False)
|
||
|
delete_menu.add_command(label="删除", command=delete_row)
|
||
|
self.update_router_table()
|
||
|
|
||
|
|
||
|
def add_router_entry(self, index):
|
||
|
entry_text = self.interface_entries[index].get()
|
||
|
if isinstance(self.router_obj, SimRouter):
|
||
|
if not validate_ip_address(entry_text):
|
||
|
messagebox.showerror("注意", message="添加的网段信息格式不合格")
|
||
|
self.interface_entries[index].delete(0, tk.END)
|
||
|
self.focus_set()
|
||
|
return
|
||
|
if entry_text:
|
||
|
if index + 1 in self.router_table:
|
||
|
self.router_table[index + 1].append(entry_text)
|
||
|
else:
|
||
|
self.router_table[index + 1] = [entry_text]
|
||
|
self.interface_entries[index].delete(0, tk.END)
|
||
|
self.router_obj.add_config(entry_text, index + 1)
|
||
|
self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!")
|
||
|
self.update_router_table()
|
||
|
|
||
|
|
||
|
class TransferAnimate:
|
||
|
"""
|
||
|
校验动画
|
||
|
"""
|
||
|
def __init__(self, master, flag=False):
|
||
|
self.master = master
|
||
|
self.flag = flag
|
||
|
self.width, self.height = 320, 140
|
||
|
self.host_img = ImageTk.PhotoImage(
|
||
|
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
|
||
|
self.filename = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height)))
|
||
|
self.toplevel = Toplevel(alpha=0.9)
|
||
|
self.toplevel.geometry("{}x{}+200+150".format(self.width, self.height))
|
||
|
self.toplevel.title("校验")
|
||
|
self.toplevel.resizable(width=False, height=False)
|
||
|
self.canvas = Canvas(self.toplevel, width=self.width, height=self.height, bg="red")
|
||
|
self.canvas.place(x=0, y=0, anchor=NW)
|
||
|
back = self.canvas.create_image(10, 20, image=master.check_filename)
|
||
|
self.send_x, self.receive_x, self.obj_y = 50, self.width - 50, self.height / 2 - 20
|
||
|
self.canvas.create_image(self.send_x, self.obj_y, image=master.host_img)
|
||
|
self.canvas.create_image(self.receive_x, self.obj_y, image=master.host_img)
|
||
|
line = self.canvas.create_line(self.send_x, self.obj_y, self.receive_x, self.obj_y, width=2, fill="#c372f0")
|
||
|
self.canvas.tag_lower(line)
|
||
|
self.canvas.tag_lower(back)
|
||
|
self.animate()
|
||
|
|
||
|
def animate(self):
|
||
|
for i in range(3):
|
||
|
self.canvas.create_image(self.send_x, self.obj_y, image=self.master.pack_img, tags="check_img")
|
||
|
self.move(self.canvas, self.receive_x, self.obj_y, 500)
|
||
|
self.canvas.delete("check_img")
|
||
|
self.canvas.update()
|
||
|
self.canvas.create_image(self.receive_x - self.send_x - 45, self.obj_y, image=self.master.true_img if self.flag else self.master.false_img)
|
||
|
self.canvas.update()
|
||
|
sleep(2)
|
||
|
if not self.flag:
|
||
|
messagebox.showerror("提示", "校验失败,请检查组件{}的相关配置!".format(self.master.check_error_obj.ObjLabel))
|
||
|
self.toplevel.destroy()
|
||
|
def move(self, cv, target_x, target_y, duration):
|
||
|
start_x, start_y = cv.coords("check_img")
|
||
|
distance_x = target_x - start_x
|
||
|
distance_y = target_y - start_y
|
||
|
steps = duration // 10 # 以10毫秒为间隔进行移动
|
||
|
step_x = distance_x / steps
|
||
|
step_y = distance_y / steps
|
||
|
self._move_step(cv, start_x, start_y, target_x, target_y, step_x, step_y, steps)
|
||
|
|
||
|
def _move_step(self, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps):
|
||
|
if steps > 0:
|
||
|
new_x = start_x + step_x
|
||
|
new_y = start_y + step_y
|
||
|
cv.coords("check_img", new_x, new_y)
|
||
|
cv.update() # 更新画布显示
|
||
|
sleep(0.01) # 添加延迟以控制动画速度
|
||
|
self._move_step(cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1)
|
||
|
else:
|
||
|
cv.coords("check_img", target_x, target_y)
|
||
|
cv.delete("check_img")
|
||
|
|