You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

796 lines
31 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import math
import sys
import threading
from time import sleep
from tkinter import Tk
from ttkbootstrap import *
from uuid import uuid4
import ipaddress
from PIL import ImageTk, Image
from dbUtil import search, execute_sql
class SimBase():
# todo: 组件父类
"""
图标类,所有组件的父类
"""
def __init__(self, canvas, x, y, id, config=None, label=None):
self.ConfigCorrect = 0 if config is None else config # 是否进行配置
self.ObjType = None # 组件类型 1->主机2->路由器3->交换机4->集线器
self.ObjLabel = ""
self.ObjID = id
self.ObjX = x
self.ObjY = y
self.canvas = canvas
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
self.host_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机--暗色@2x.png").resize((40, 40)))
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40)))
self.router_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器--暗色@2x.png").resize((40, 40)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40)))
self.switch_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机--暗色@2x.png").resize((40, 40)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40)))
self.hub_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器--暗色@2x.png").resize((40, 40)))
self.img = None
self.img_tm = None
self.interface = [{}, {}, {}, {}]
self.connections = ["1", "2", "3", "4"]
self.set_default_config()
def bind_event(self):
self.canvas.tag_bind(self.ObjID, "<Button-1>", self.start_drag)
self.canvas.tag_bind(self.ObjID, "<B1-Motion>", self.drag)
self.canvas.tag_bind(self.ObjID, "<ButtonRelease-1>", self.release)
def set_default_name(self):
data_frame = search(f"select objid from sim_objs where objType={self.ObjType}")
num = data_frame.size + 1
if isinstance(self, SimHost):
name = "SHO%d" % num
elif isinstance(self, SimRouter):
name = "SRO%d" % num
elif isinstance(self, SimSwitch):
name = "SWI%d" % num
else:
name = "SHUB%d" % num
return name
def release(self, event):
if self.click_x == event.x and self.click_y == event.y: # 鼠标左键单击
self.canvas.chose_obj = self
self.canvas.is_chose()
else:
self.update()
def start_drag(self, event):
self.canvas.tag_raise(self.ObjID) # 将 SimBase 组件置于最上层
self.start_x = event.x
self.start_y = event.y
self.click_x = event.x
self.click_y = event.y
def drag(self, event):
"""
移动图标
:param event:
:return:
"""
self.canvas.delete("rectangle")
self.canvas.chose_obj = None
dx = event.x - self.start_x
dy = event.y - self.start_y
# 移动范围限制, 超出移动范围则直接返回
if not (self.canvas.canvas_size[0] + 20 <= self.ObjX + dx <= self.canvas.canvas_size[2] - 20 and 20 + self.canvas.canvas_size[1] <= self.ObjY + dy <= self.canvas.canvas_size[3] - 20):
return
self.ObjX += dx
self.ObjY += dy
self.canvas.move(self.ObjID, dx, dy) # 移动 SimBase 组件
self.canvas.move(self.ObjID + "text", dx, dy) # 移动 SimBase 组件
self.start_x = event.x
self.start_y = event.y
for conn in self.connections:
if isinstance(conn, AllSimConnect):
conn.update_line()
def create_img(self):
"""
创建图片
:return:
"""
if self.ObjType == 1:
self.img = self.host_img
self.img_tm = self.host_img_tm
elif self.ObjType == 2:
self.img = self.router_img
self.img_tm = self.router_img_tm
elif self.ObjType == 3:
self.img = self.switch_img
self.img_tm = self.switch_img_tm
else:
self.img = self.hub_img
self.img_tm = self.hub_img_tm
self.canvas.delete("L")
id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30,
image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw",
tags=self.ObjID)
self.canvas.dtag("L", "L")
self.canvas.create_text(self.ObjX - 20, self.ObjY - 60, text=self.ObjLabel, font=("", 16, "bold"),
fill="#9b78eb", tags=self.ObjID + "text", anchor="nw")
self.canvas.tag_raise(id)
self.bind_event()
def config(self, interface):
"""
网络配置方法,
:param interface: 传入配置数据
:return:
"""
for key, value in interface.items():
self.interface[key - 1] = {key: value.get() if not value.get() == "" else "NULL" for key, value in value.items()}
self.ConfigCorrect = 1
self.create_img()
for conn in self.connections:
if isinstance(conn, AllSimConnect):
index = self.connections.index(conn) + 1
conn.update_node_config(self, index)
self.update()
def set_default_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
for index, conn in conn_data.iterrows():
self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"],
"mac": conn["mac"],
"conn_port": conn["conn_port"],
"addr": conn["addr"]}
def get_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
return conn_data
def save(self):
"""
将对象存储至mysql当中
:return:
"""
sql = f"insert into sim_objs values ('{self.ObjID}', {self.ObjType}, '{self.ObjLabel}'," \
f"{self.ObjX}, {self.ObjY}, {self.ConfigCorrect})"
execute_sql(sql)
def update(self):
"""
当坐标发生改变时修改数据库数据
:return:
"""
self.canvas.delete(self.ObjID + "text")
self.canvas.create_text(self.ObjX - 20, self.ObjY - 60, text=self.ObjLabel, font=("", 16, "bold"),
fill="#7030a0", tags=self.ObjID + "text", anchor="nw")
sql = f"update sim_objs set objLabel='{self.ObjLabel}', ObjX={self.ObjX}," \
f"ObjY={self.ObjY}, ConfigCorrect={self.ConfigCorrect} where ObjID='{self.ObjID}'"
execute_sql(sql)
def transfer_animate(self, status, packet, error_message=None):
if status:
text = f"目的IP: {str(packet.destination_ip)}\n" \
f"目的MAC: {packet.destination_mac}\n" \
f"消息内容: {packet.message}"
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20, outline="#92d050",
width=3, fill="#92d050", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
else:
text = f"传输失败\n" if error_message is None else error_message
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red",
width=3, fill="red", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimPacket():
# todo: 数据包类
"""
数据包类
"""
def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message, ping=False):
"""
:param source_mac: 源主机mac地址
:param source_ip: 源主机ip地址
:param destination_mac: 目的主机mac地址
:param destination_ip: 目的主机ip地址
:param message: 数据
"""
self.source_mac = source_mac
self.source_ip = source_ip
self.destination_mac = destination_mac
self.destination_ip = destination_ip
self.message = message
self.up_jump = None # 上一跳连接对象
self.ping = ping
self.flag = False
self.img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30)))
self.id = str(uuid4())
def move(self, cv, target_x, target_y, duration):
start_x, start_y = cv.coords(self.id)
distance_x = target_x - start_x
distance_y = target_y - start_y
steps = duration // 10 # 以10毫秒为间隔进行移动
step_x = distance_x / steps
step_y = distance_y / steps
self._move_step(cv, start_x, start_y, target_x, target_y, step_x, step_y, steps)
def _move_step(self, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps):
if steps > 0:
new_x = start_x + step_x
new_y = start_y + step_y
cv.coords(self.id, new_x, new_y)
cv.update() # 更新画布显示
sleep(0.01) # 添加延迟以控制动画速度
self._move_step(cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1)
else:
cv.coords(self.id, target_x, target_y)
cv.delete(self.id)
def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase):
if self.ping:
return
cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id)
self.move(cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500)
class AllSimConnect():
# todo: 连接类
def __init__(self, canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None):
"""
连接对象
:param nodex: 节点
:param nodex_ifs: 节点接口
:param nodey: 节点
:param nodey_ifs: 节点接口
"""
self.canvas = canvas
self.ConfigCorrect = 0 if config is None else config
self.NobjS = nodex
self.NobjE = nodey
self.IfsS = nodex_ifs
self.IfsE = nodey_ifs
self.width = False if self.ConfigCorrect == 0 else True # 线的粗细
def is_connected_to(self, other):
"""
判断两个连接对象是否已经连接
:param other: 另一个连接对象
:return: 如果已连接则返回True否则返回False
"""
return (
self.NobjS.ObjID == other.NobjS.ObjID
and self.NobjE.ObjID == other.NobjE.ObjID
)
def update_node_config(self, node, ifs):
"""
当节点进行网络配置后将节点的接口配置保存
:return:
"""
try:
nodex_update_sql = f"""
update conn_config set
ip='{node.interface[ifs - 1]["ip"]}',
mac='{node.interface[ifs - 1]["mac"]}'
"""
if not node.interface[ifs - 1]["conn_port"] == "NULL":
nodex_update_sql += f"""
,conn_port='{int(node.interface[ifs - 1]["conn_port"])}'
"""
if not node.interface[ifs - 1]["addr"] == "NULL":
nodex_update_sql += f"""
,addr='{node.interface[ifs - 1]["addr"]}'
"""
nodex_update_sql += f"""
where node_id='{node.ObjID}' and node_ifs={ifs}
"""
execute_sql(nodex_update_sql)
self.ConfigCorrect = 1
self.check_config()
update_sql = f"""
update sim_conn set ConfigCorrect={self.ConfigCorrect}
where conn_id in (
select distinct conn_id from conn_config where node_id='{node.ObjID}'
)
"""
execute_sql(update_sql)
except Exception as E:
print(E)
self.canvas.message.show_message(f"{E}")
def check_config(self):
"""
检查两边节点的配置是否正确
:return:
"""
if self.NobjS.interface[self.IfsS - 1]["mac"] != "" and self.NobjS.interface[self.IfsS - 1][
"ip"] != "" \
and self.NobjE.interface[self.IfsE - 1]["mac"] != "" and self.NobjE.interface[self.IfsE - 1][
"ip"] != "":
self.width = 4
self.draw_line()
else:
self.width = 1
self.draw_line()
def transfer(self, source_node, packet: SimPacket):
"""
传输数据
:param packet: 数据包
:return:
"""
if source_node == self.NobjS:
if not packet.ping:
packet.transfer_packet(self.canvas, self.NobjS, self.NobjE)
self.NobjE.receive(packet)
else:
packet.transfer_packet(self.canvas, self.NobjE, self.NobjS)
self.NobjS.receive(packet)
def draw_line(self):
if self.width:
line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY,
width=2, fill="#c372f0", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line")
else:
line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY,
width=2, dash=(250, 4), fill="#c372f0", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line")
self.canvas.tag_lower(line, 2)
self.analyseIFS(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjS.ObjLabel, self.IfsS,
self.NobjE.ObjX, self.NobjE.ObjY, self.NobjE.ObjLabel, self.IfsE)
def update_line(self):
"""
当组件移动时重新绘制线
:return:
"""
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line")
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID)
self.draw_line()
def update_info(self, obj_id):
"""
修改数据库中的连接信息
:return:
"""
update_sql = f"""
update sim_conn set conn_id='{self.NobjS.ObjLabel}-{self.NobjE.ObjLabel}', ConfigCorrect={self.ConfigCorrect}
where conn_id in (
select distinct conn_id from conn_config where node_id='{obj_id}'
)
"""
execute_sql(update_sql)
def analyseIFS(self, SX, SY, SLabel, IfsS, EX, EY, ELabel, IfsE): # NobjS的x,y;NobjE的x,y; #分析接口在哪个位置,再显示
'''
:param SX: S对象的x坐标
:param SY: S对象的y坐标
:param SLabel: S对象的标签
:param IfsS: S对象要显示的接口号
'''
offset = 15
if (EX - SX) == 0: # 即垂直的时候,NobjS在NobjE对象的正上方和正下方x坐标无变化只需要y变化
R = 18
x = 0 # x无偏移量
y = R
else:
k = (EY - SY) / (EX - SX) # ey-sy/sx-ex
reat = math.atan(k) # 根据斜率计算弧度
# 连线与图标交接点坐标
R = 18
x = abs(math.cos(reat) * R) + 6 # python math中三角函数中的数值是弧度而计算器中的数值是角度
y = abs(math.sin(reat) * R) + 6
if SX <= EX and SY <= EY: # NobjS在NobjE的左上角NobjS的右下角连接NobjE的左上角
x_S = SX + x
y_S = SY + y # NobjS的连接点坐标
x_E = EX - x
y_E = EY - y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text((x_S + offset, y_S - offset), text=IfsS, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E - offset, y_E - offset, text=IfsE, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX < EX and SY > EY: # NobjS在NobjE的左下角NobjS的右上角连接NobjE的左下角
x_S = SX + x
y_S = SY - y # NobjS的连接点坐标
x_E = EX - x
y_E = EY + y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S + offset, y_S - offset, text=IfsS, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E - offset, y_E, text=IfsE, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX > EX and SY < EY: # NobjS在NobjE的右上角NobjS的左下角连接NobjE的右上角
x_S = SX - x
y_S = SY + y # NobjS的连接点坐标
x_E = EX + x
y_E = EY - y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S - offset, y_S, text=IfsS, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E + offset, y_E, text=IfsE, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
elif SX >= EX and SY >= EY: # NobjS在NobjE的右下角NobjS的左上角连接NobjE的右下角
x_S = SX - x
y_S = SY - y # NobjS的连接点坐标
x_E = EX + x
y_E = EY + y # NobjE的连接点坐标
# 显示S接口号
self.canvas.create_text(x_S - offset, y_S - offset, text=IfsS, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
# 显示E接口号
self.canvas.create_text(x_E + offset, y_E - offset, text=IfsE, anchor="nw", font=("幼圆", 16, "bold"), fill="#5fa6d6",
tag=self.NobjS.ObjID + self.NobjE.ObjID) # 显示文字
def save(self):
"""
将连接对象保存至数据库
:return:
"""
conn_id = self.NobjS.ObjLabel + "-" + self.NobjE.ObjLabel
sql = f"insert into sim_conn values ('{conn_id}', {self.ConfigCorrect})"
execute_sql(sql)
execute_sql(
f"insert into conn_config values ('{conn_id}', '{self.NobjS.ObjID}', {self.IfsS}, '', '', '', '')")
execute_sql(
f"insert into conn_config values ('{conn_id}', '{self.NobjE.ObjID}', {self.IfsE}, '', '', '', '')")
def delete_line(self):
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID)
self.canvas.delete(self.NobjS.ObjID + self.NobjE.ObjID + "line")
def __eq__(self, other):
"""
重写equals方法判断两个连接对象是否相同
:param other: 连接对象
:return: Boolean
"""
if isinstance(other, AllSimConnect):
other_ids = other.NobjS.ObjID + other.NobjE.ObjID
return (self.NobjS.ObjID in other_ids
and self.NobjE.ObjID in other_ids)
return False
class SimHost(SimBase):
# todo: 主机类
"""
主机类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 1
self.ObjLabel = label if label is not None else self.set_default_name()
self.interface = [{}]
self.connections = [None]
self.set_default_config()
def create_packet(self, ip, mac, message, ping=False):
"""
创建数据包
:param ip: 目的主机ip
:param mac: 目的主机mac
:param message: 消息
:return:
"""
if ping:
packet = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], ip, mac, message, True)
else:
packet = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"], ip, mac, message)
print(f"创建数据包成功,数据包由{packet.source_ip} 发往 {packet.destination_ip}")
self.send(packet)
def send(self, packet):
"""
发送数据包
:param packet:
:return:
"""
connection: AllSimConnect = self.connections[0]
print(f"数据包从 {self.ObjLabel} 发出")
packet.up_jump = connection
connection.transfer(self, packet)
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"主机{self.ObjLabel}接受到数据{packet.message}")
if packet.destination_ip == self.interface[0]["ip"]:
if not packet.ping:
self.transfer_animate(True, packet)
else:
self.canvas.receive = self
else:
self.transfer_animate(False, packet)
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"AppAddr: /Root\n"
str += f"PORT: {data['conn_port']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimRouter(SimBase):
# todo: 路由类
"""
路由类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 2
self.ObjLabel = label if label is not None else self.set_default_name()
self.router_table = {}
self.set_default_router_table()
def set_default_router_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from router_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.router_table:
self.router_table[router_table["node_ifs"]].append(router_table["segment"])
else:
self.router_table[router_table["node_ifs"]] = [router_table["segment"]]
def check_destination_ip(self, destination_ip, network):
"""
检查目标ip是否属于网段范围内
:param destination_ip: 目标ip
:param network: 网段
:return:10.2.3.0/24
"""
ip = ipaddress.ip_address(destination_ip)
network = ipaddress.ip_network(network)
if ip in network:
return True
if network == "0.0.0.0/24": # 如果网段为 0.0.0.0/24 则为默认路由
return True
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
flag = False
next_hop_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
if conn == packet.up_jump:
continue
ifs = self.connections.index(conn) + 1
if self.router_table.get(ifs) is None:
continue
for network in self.router_table.get(ifs):
if self.check_destination_ip(packet.destination_ip, network):
flag = True
next_hop_ifs = ifs
if flag:
conn = self.connections[next_hop_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
else:
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.NobjS != self:
if conn.NobjE.ObjType == 1:
conn.transfer(self, packet)
break
error_message = "路由寻址失败"
if packet.ping:
self.canvas.check_error_obj = self
return
self.transfer_animate(False, packet, error_message)
def receive(self, packet):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"{self.ObjLabel}-路由器接受到数据{packet.message}")
self.transmit(packet)
def add_config(self, router, router_ifs):
sql = f"insert into router_table values ('{self.ObjID}', {router_ifs}, '{router}')"
execute_sql(sql)
def delete_config(self, ifs, network):
sql = f"delete from router_table where obj_id='{self.ObjID}' and node_ifs={ifs} and segment='{network}'"
execute_sql(sql)
def get_table_config(self):
"""
返回对象的路由表配置信息,用于展示
:return:
"""
str = ""
sql = f"select * from router_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router in router_tables.iterrows():
str += f"网段号: {router['segment']}\n"
str += f"接口号: {router['node_ifs']}\n"
return str
class SimSwitch(SimBase):
# todo: 交换机类
"""
交换机类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 3
self.ObjLabel = label if label is not None else self.set_default_name()
self.mac_table = {}
self.set_default_mac_table()
def set_default_mac_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from mac_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.mac_table:
self.mac_table[router_table["node_ifs"]].append(router_table["mac"])
else:
self.mac_table[router_table["node_ifs"]] = [router_table["mac"]]
def add_config(self, router, router_ifs):
sql = f"insert into mac_table values ('{self.ObjID}', {router_ifs}, '{router}')"
execute_sql(sql)
def delete_config(self, ifs, mac):
sql = f"delete from mac_table where obj_id='{self.ObjID}' and node_ifs={ifs} and mac='{mac}'"
execute_sql(sql)
def get_table_config(self):
"""
返回对象的交换表配置信息,用于展示
:return:
"""
str = ""
sql = f"select * from mac_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router in router_tables.iterrows():
str += f"网段号: {router['mac']}\n"
str += f"接口号: {router['node_ifs']}\n"
return str
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
flag = False
next_hub_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
ifs = self.connections.index(conn) + 1
if packet.destination_mac in self.mac_table.get(ifs, []):
flag = True
next_hub_ifs = ifs
if flag:
conn = self.connections[next_hub_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
return
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"交换机{self.ObjLabel}接受到数据{packet.message}")
self.transmit(packet)
class SimHub(SimBase):
"""
集线器类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 4
self.ObjLabel = label if label is not None else self.set_default_name()
def transmit(self, packet: SimPacket):
"""
集线器转发数据包
:return:
"""
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!")
self.transmit(packet)