新版本代码

main
陈玉辉 5 months ago
parent 24b7906d62
commit 0c05efe3cc

@ -0,0 +1,107 @@
import socket
import struct
def format_mac_address(mac_bytes):
"""
Format a MAC address from bytes to a human-readable string.
Args:
mac_bytes (bytes): The MAC address in bytes format.
Returns:
str: The MAC address in human-readable format.
"""
mac_str = mac_bytes.hex()
mac_formatted = ':'.join(mac_str[i:i+2] for i in range(0, 12, 2))
return mac_formatted
class AgreementUtil():
@staticmethod
def create_udp_packet(data, dest_port, src_port):
"""
创建UDP数据包
Returns:
bytes: `UDP数据包`.
"""
# UDP header fields
udp_header = struct.pack('!HHHH', src_port, dest_port, 8 + len(data), 0) # Length includes header length
return udp_header + data.encode()
@staticmethod
def create_ip_packet(udp_packet, dest_ip, src_ip):
"""
创建IP数据包
Returns:
bytes: IP数据包
"""
# IP header fields
version = 4
ihl = 5 # Internet Header Length
type_of_service = 0
total_length = 20 + len(udp_packet) # 20 bytes IP header + UDP packet length
identification = 54321
flags = 0 # Don't fragment
fragment_offset = 0
ttl = 64 # Time to live
protocol = 17 # UDP
checksum = 0 # For simplicity, set to 0
source_ip = socket.inet_aton(src_ip)
dest_ip = socket.inet_aton(dest_ip)
# IP数据包头部字段
ip_header = struct.pack('!BBHHHBBH4s4s',
(version << 4) + ihl, type_of_service, total_length,
identification, (flags << 13) + fragment_offset,
ttl, protocol, checksum, source_ip, dest_ip)
return ip_header + udp_packet
@staticmethod
def create_ethernet_frame(ip_packet, dest_mac, src_mac):
"""
创建以太网帧数据包
Returns:
bytes: 以太网帧数据包
"""
# Convert MAC addresses from string format to bytes
dest_mac_bytes = bytes.fromhex(dest_mac.replace(':', ''))
src_mac_bytes = bytes.fromhex(src_mac.replace(':', ''))
# Ethernet type for IP (0x0800)
ethernet_type = 0x0800
# Pack Ethernet frame fields
ethernet_frame = struct.pack('!6s6sH', dest_mac_bytes, src_mac_bytes, ethernet_type)
return ethernet_frame + ip_packet
@staticmethod
def parse_ethernet_frame(frame):
"""
解析以太网帧数据包
Returns: 发送端MAC接收端MAC以太网类型IP数据包
"""
dest_mac, src_mac, ethertype = struct.unpack('!6s6sH', frame[:14])
payload = frame[14:]
return format_mac_address(src_mac), format_mac_address(dest_mac), ethertype, payload
@staticmethod
def parse_ip_packet(packet):
"""
解析 IP 数据包
Returns: 发送端IP接收端IP协议UDP数据包
"""
version_ihl, type_of_service, total_length, identification, flags_fragment_offset, \
ttl, protocol, checksum, source_ip, dest_ip = struct.unpack('!BBHHHBBH4s4s', packet[:20])
payload = packet[20:]
return socket.inet_ntoa(source_ip), socket.inet_ntoa(dest_ip), protocol, payload
@staticmethod
def parse_udp_packet(packet):
"""
解析UDP数据包
Returns:
tuple: 发送主机端口接收主机端口数据
"""
src_port, dest_port, length, checksum = struct.unpack('!HHHH', packet[:8])
data = packet[8:]
return src_port, dest_port, data.decode()

@ -0,0 +1,421 @@
import ipaddress
import sys
import threading
from tkinter import filedialog
from SimUtil import *
from SimObjs import SimPacket, SimHost, AllSimConnect, SimRouter, SimSwitch, SimHub, SimBase
from dbUtil import search, execute_sql, delete_obj, truncate_db
from PIL import Image as imim
from packet import *
class Message(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.message = []
self.master = master
self.scrollbar_y = tk.Scrollbar(master, orient="vertical", command=self.yview)
self.scrollbar_y.pack(side="right", fill=BOTH)
self.scrollbar_x = tk.Scrollbar(master, orient=HORIZONTAL, command=self.xview)
self.scrollbar_x.pack(side="bottom", fill=BOTH)
self.config(yscrollcommand=self.scrollbar_y.set, xscrollcommand=self.scrollbar_x.set)
def show_message(self, message, color="#5fa8fe"):
for mes in message.split("\n"):
self.message.append({"message": mes, "color": color})
num = 0
self.delete("message")
for function in self.message:
self.create_text(20, 25 * num + 10, anchor="nw", text=function["message"], font=("", 15),
fill=function["color"], tags="message")
num += 1
self.update()
self.configure(scrollregion=self.bbox("all"))
self.scrollbar_y.set(1.0, 1.0)
self.yview_moveto(1.0)
class NetWorkAnalog(Canvas):
def __init__(self, master, **kwargs):
super().__init__(master, **kwargs)
self.master = master
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40)))
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
self.width = int(self.cget("width"))
self.height = int(self.cget("height"))
self.canvas_size = [150, 0, int(self.width * 0.8), int(self.height * 0.8)]
self.label_top_img = ImageTk.PhotoImage(imim.open("../datas/images/右上框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.4))))
self.label_bottom_img = ImageTk.PhotoImage(imim.open("../datas/images/右下框@2x.png").resize((int(self.width - self.width * 0.85), int(self.canvas_size[3] * 0.45))))
self.message_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/日志消息展示框@2x.png").resize((self.width - 60, self.height - self.canvas_size[3] - 30))
)
self.text_back_img = ImageTk.PhotoImage(
Image.open("../images/文字背景@3x.png").resize((155, 90))
)
message_frame = Frame(self, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
message_frame.place(x=15, y=self.canvas_size[3] + 15, anchor=NW)
self.message = Message(message_frame, width=self.width - 60, height=self.height - self.canvas_size[3] - 30)
self.message.pack()
self.message.configure(bg="#edf8ff")
self.configure(bg="#ddeafe")
self.filename = ImageTk.PhotoImage(imim.open("../datas/images/背景@2x.png").resize((self.width, self.canvas_size[3])))
self.create_image(0, 0, image=self.filename, anchor=NW)
self.chose = self.host_img
self.AllSimObjs = {}
self.conns = []
self.drawLine = True # 画线标志
self.line_start_obj = None
self.line_end_obj = None
self.line_start_ifs = None
self.line_end_ifs = None
self.chose_obj = None
self.show_label_flag = True
self.show_interface_flag = True
self.trans_obj = set()
self.create_widget()
def is_chose(self):
"""
当被选中时绘制选中框
:return:
"""
self.delete("rectangle")
self.create_rectangle(self.chose_obj.ObjX - 35, self.chose_obj.ObjY - 35, self.chose_obj.ObjX + 15,
self.chose_obj.ObjY + 15, outline="red", tags="rectangle")
def reload_data(self):
# todo: 加载上一次程序运行的数据
"""
加载上一次程序运行时的数据
:return:
"""
self.AllSimObjs = {}
self.conns = []
sim_obj_sql = "select * from sim_objs"
sim_obj_data = search(sim_obj_sql)
for index, row in sim_obj_data.iterrows(): # 初始化组件对象
sim_type = row["ObjType"]
ObjX = row["ObjX"]
ObjY = row["ObjY"]
ConfigCorrect = row["ConfigCorrect"]
ObjLable = row["ObjLabel"]
ObjID = row["ObjID"]
if sim_type == 1:
tag = SimHost(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 2:
tag = SimRouter(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
elif sim_type == 3:
tag = SimSwitch(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
else:
tag = SimHub(self, ObjX, ObjY, ObjID, ConfigCorrect, ObjLable)
tag.create_img()
self.AllSimObjs[tag.ObjID] = tag
sim_conn_sql = "select s.conn_id, ConfigCorrect, node_id, node_ifs from sim_conn s join conn_config c on s.conn_id=c.conn_id"
sim_conn_data = search(sim_conn_sql)
conn_datas = {}
for index, conn in sim_conn_data.iterrows():
if (conn["conn_id"], conn["ConfigCorrect"]) not in conn_datas:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])] = [(conn["node_id"], conn["node_ifs"])]
else:
conn_datas[(conn["conn_id"], conn["ConfigCorrect"])].append((conn["node_id"], conn["node_ifs"]))
for key, value in conn_datas.items():
conn_obj = AllSimConnect(self, self.AllSimObjs[value[0][0]], value[0][1],
self.AllSimObjs[value[1][0]], value[1][1], key[1])
conn_obj.draw_line()
self.AllSimObjs[value[0][0]].connections[value[0][1] - 1] = conn_obj # 将连接对象传入组件对象
self.AllSimObjs[value[1][0]].connections[value[1][1] - 1] = conn_obj
self.conns.append(conn_obj)
conn_obj.ConfigCorrect = key[1]
def show_obj(self, AllSimObj, conns):
self.AllSimObjs = AllSimObj
self.conns = conns
for key, sim_obj in self.AllSimObjs.items():
sim_obj.create_img()
for conn in self.conns:
conn.draw_line()
def send_packet(self):
"""
发送数据包
:return:
"""
hosts = {}
for key, tag in self.AllSimObjs.items():
if tag.ObjType == 1 and tag.ConfigCorrect == 1:
hosts[tag.ObjLabel] = tag.ObjID
child2 = tk.Toplevel()
child2.title("数据包配置")
child2.geometry('392x306+200+110')
child2.grab_set() # 设置组件焦点抓取。使焦点在释放之前永远保持在这个组件上,只能在这个组件上操作
tk.Label(child2, text='发送主机:', font=('黑体', 16)).grid(row=0, column=0, columnspan=2, sticky='w', pady=10)
send_host = StringVar()
host_combobox = ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=send_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
)
host_combobox.grid(row=0, column=1, pady=10)
tk.Label(child2, text='接收主机:', font=('黑体', 16)).grid(row=1, column=0, sticky='w', pady=10) # ,sticky='w'靠左显示
receive_host = StringVar()
ttk.Combobox(
master=child2, # 父容器
height=5, # 高度,下拉显示的条目数量
width=18, # 宽度
state='readonly', # readonly(只可选)
font=('', 16), # 字体
textvariable=receive_host, # 通过StringVar设置可改变的值
values=list(hosts.keys()), # 设置下拉框的选项
).grid(row=1, column=1, pady=10)
tk.Label(child2, text='数据大小(kb):', font=('黑体', 16)).grid(row=2, column=0, sticky='w',
pady=10) # ,sticky='w'靠左显示
packet_size = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_size.grid(row=2, column=1, pady=10)
tk.Label(child2, text='数据标签:', font=('黑体', 16)).grid(row=3, column=0, sticky='w',
pady=10) # ,sticky='w'靠左显示
packet_label = tk.Entry(child2, font=('黑体', 16), textvariable=tk.StringVar())
packet_label.grid(row=3, column=1, pady=10)
def send():
"""
发送数据包
:return:
"""
if send_host.get() == "" or receive_host.get() == "" or packet_size.get() == "" or packet_label.get() == "":
messagebox.showerror("注意", message="信息不能为空")
return
if send_host.get() == receive_host.get():
messagebox.showerror("注意", message="发送主机和接收主机不能相同!")
send_message = """发送主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}"""
receive_message = """接收主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}"""
send_host_obj = self.AllSimObjs[hosts[send_host.get()]]
receive_host_obj = self.AllSimObjs[hosts[receive_host.get()]]
pack_label = packet_label.get()
pack_size = packet_size.get()
child2.destroy()
count = split_appdata(int(pack_size))
self.show_top_message(send_host_obj,
message=send_message.format(send_host_obj.ObjLabel, send_host_obj.interface[0]["ip"],
send_host_obj.interface[0]["mac"], pack_size, 0, count))
self.show_bottom_message(receive_host_obj, message=receive_message.format(receive_host_obj.ObjLabel,
receive_host_obj.interface[0][
"ip"],
receive_host_obj.interface[0][
"mac"], pack_size, 0, count))
for obj in self.trans_obj:
self.delete(obj.ObjID + "detail_button")
self.delete(obj.ObjID + "detail")
self.unbind(obj.ObjID, "<Button-1>")
send_host_obj.create_packet(receive_host_obj, pack_label, pack_size)
tk.Button(child2, text='开启模拟', font=('黑体', 16), height=1, command=send).grid(row=5, column=0, sticky='e', pady=10)
tk.Button(child2, text='取消', font=('黑体', 16), height=1, command=child2.destroy).grid(row=5, column=1, sticky='e', pady=10)
# 建立画出对象详情的函数
def draw_detail(self, obj):
# 判断type对象来画出详情表
if obj.ObjType == 1: # 当type对象为1 画出主机详情图
app_color = ("#94D050", '#00B0F0')
trans_color = ("#94D050", '#00B0F0')
ip_color = ("#94D050", '#00B0F0')
mac_color = ("#94D050", '#00B0F0')
elif obj.ObjType == 2:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#94D050", '#00B0F0')
mac_color = ("#94D050", '#00B0F0')
elif obj.ObjType == 3:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#D3D3D3", "#D3D3D3")
mac_color = ("#94D050", '#00B0F0')
else:
app_color = ("#D3D3D3", "#D3D3D3")
trans_color = ("#D3D3D3", "#D3D3D3")
ip_color = ("#D3D3D3", "#D3D3D3")
mac_color = ("#94D050", '#00B0F0')
frist_x = obj.ObjX # 获取frist_x
frist_y = obj.ObjY # 获取frist_y
# 这里缺少一个删除其他详情的步骤
# 画出连接详情表的线
tag_id = obj.ObjID + "detail"
if len(self.gettags(tag_id)) > 0:
self.delete(tag_id)
else:
self.create_line((frist_x + 40, frist_y - 20), (frist_x + 80, frist_y - 20),
(frist_x + 90, frist_y - 60),
fill='#50abf8', tags=tag_id, width=2)
# 画出详情表
self.create_image(frist_x + 50, frist_y - 140, image=self.text_back_img, anchor=NW, tags=tag_id)
# 画出相应的绿条数据
self.create_text(frist_x + 70, frist_y - 130, text='应用层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 107, text='传输层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 84, text='IP 层', tags=tag_id)
self.create_text(frist_x + 70, frist_y - 62, text='链路层', tags=tag_id)
# 画出 右侧绿色和蓝色的类进度条
# 应用层
self.create_rectangle(frist_x + 197, frist_y - 135, frist_x + 170, frist_y - 120, fill=app_color[0],
outline=app_color[0], tags=tag_id)
self.create_rectangle(frist_x + 168, frist_y - 135, frist_x + 148, frist_y - 120, fill=app_color[1],
outline=app_color[1], tags=tag_id)
# 传输层
self.create_rectangle(frist_x + 197, frist_y - 115, frist_x + 160, frist_y - 100,
fill=trans_color[0],
outline=trans_color[0], tags=tag_id)
self.create_rectangle(frist_x + 158, frist_y - 115, frist_x + 133, frist_y - 100,
fill=trans_color[1],
outline=trans_color[1], tags=tag_id)
# IP 层
self.create_rectangle(frist_x + 197, frist_y - 93, frist_x + 150, frist_y - 78, fill=ip_color[0],
outline=ip_color[0], tags=tag_id)
self.create_rectangle(frist_x + 148, frist_y - 93, frist_x + 118, frist_y - 78, fill=ip_color[1],
outline=ip_color[1], tags=tag_id)
# 链路层
self.create_rectangle(frist_x + 197, frist_y - 70, frist_x + 133, frist_y - 55, fill=mac_color[0],
outline=mac_color[0], tags=tag_id)
self.create_rectangle(frist_x + 131, frist_y - 70, frist_x + 98, frist_y - 55, fill=mac_color[1],
outline=mac_color[1], tags=tag_id)
def trans_over(self, objs):
print("传输完成")
self.trans_obj = objs
for obj in objs:
obj.create_detail_button()
self.tag_bind(obj.ObjID, "<Button-1>", lambda event, obj=obj: self.draw_detail(obj)) # 绑定右击事件
obj.is_packet = False
obj.is_un_packet = False
def clear_canvas(self):
"""
清除画布
:return:
"""
ask = messagebox.askquestion(title='确认操作', message='确认要清除画布吗?')
if ask == "no":
return
truncate_db() # 清除数据库
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id)
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line()
self.delete("rectangle")
self.AllSimObjs.clear()
self.conns.clear()
def export_data(self):
try:
export = ExportUtil(sys.path[0] + "/database.xlsx")
export.export()
self.message.show_message("文件导出成功!文件位置在{}".format(sys.path[0] + "/database.xlsx"))
except Exception as E:
self.message.show_message(str(E), color="red")
def import_data(self):
truncate_db()
for tag_id, tag in self.AllSimObjs.items():
self.delete(tag_id) # 删除画布中的所有组件
self.delete(tag_id + "text")
for conn in self.conns:
conn.delete_line() # 删除画布中的所有连接线
self.delete("rectangle")
self.AllSimObjs.clear() # 删除当前所有组件对象
self.conns.clear() # 删除所有连接配置
file_path = filedialog.askopenfilename() # 获取用户选择的文件
if file_path:
export = ExportUtil(file_path)
export.import_data()
self.message.show_message("文件导入成功!")
self.reload_data()
def show_top_message(self, obj, message):
# todo: 显示网络配置信息
"""
显示网络配置信息
:return:
"""
self.delete("netSet")
self.create_text(self.width * 0.82 + 120, 80, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="netSet")
self.create_text(self.width * 0.82 + 20, 80 + 25, text=message,
anchor="nw", font=('宋体', 14), tags="netSet")
def show_bottom_message(self, obj, message):
# todo: 显示路由表交换表信息
"""
显示路由交换表信息
:return:
"""
self.delete("routerSet")
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 50, text="(" + obj.ObjLabel + ")", anchor="n", font=('微软雅黑', 14, 'bold'),
fill="#7030a0", tags="routerSet")
self.create_text(self.width * 0.82 + 20, self.canvas_size[3] / 2 + 50 + 25, text=message,
anchor="nw", font=('宋体', 14), tags="routerSet")
def create_config_button(self):
# 创建一个菜单栏,这里我们可以把他理解成一个容器,在窗口的上方
menubar = tk.Menu(root)
# 定义一个空菜单单元
setMenu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label='发送数据包', menu=setMenu)
setMenu.add_command(label='发送数据包', command=self.send_packet)
menubar.add_command(label='清除屏幕', command=self.clear_canvas)
menubar.add_command(label='导出数据', command=self.export_data)
menubar.add_command(label='导入数据', command=self.import_data)
root.config(menu=menubar)
def create_widget(self):
# todo: 创建页面
"""
创建整体页面布局
:return:
"""
self.create_rectangle(self.canvas_size[0], self.canvas_size[1], self.canvas_size[2], self.canvas_size[3], outline="#7f6000", width=0) # 矩形框,左上角坐标,右下角坐标
self.create_image(self.width * 0.82, 30, image=self.label_top_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, 30 + 15, text="发送主机详情", anchor="n", font=('微软雅黑', 18, 'bold'))
self.create_image(self.width * 0.82, self.canvas_size[3] / 2, image=self.label_bottom_img, anchor=NW) # 矩形框,左上角坐标,右下角坐标
# round_rectangle(self, self.width * 0.82, self.canvas_size[3] / 2, self.width * 0.98, self.canvas_size[3] - 30, outline="#ffff00", width=2, fill="#f4b88e") # 矩形框,左上角坐标,右下角坐标
self.create_text(self.width * 0.82 + 120, self.canvas_size[3] / 2 + 15, text="接收主机详情", anchor="n", font=('微软雅黑', 18, 'bold'))
# 显示左边的固定图片
img_height, text_height, split_width = 50, 40, 120
self.create_text(text_height, split_width + 40, text="路由器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width, image=self.router_img, anchor="nw")
self.create_text(text_height, split_width * 2 + 40, text="交换机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 2, image=self.switch_img, anchor="nw")
self.create_text(text_height, split_width * 3 + 40, text="集线器", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 3, image=self.hub_img, anchor="nw")
self.create_text(text_height, split_width * 4 + 40, text="主机", anchor="nw", font=('微软雅黑', 18, 'bold')) # 显示文字
self.create_image(img_height, split_width * 4, image=self.host_img, anchor="nw")
if __name__ == '__main__':
root = Window()
root.title('网络拓扑图')
screen_width = root.winfo_screenwidth() # winfo方法来获取当前电脑屏幕大小
screen_height = root.winfo_screenheight()
root_attr = {
"width": screen_width * 0.83,
"height": screen_height * 0.85,
}
size = '%dx%d+%d+%d' % (root_attr['width'], root_attr['height'], (screen_width - root_attr['width']) / 2,
(screen_height - root_attr['height']) / 2 - 30)
canvas = NetWorkAnalog(root, width=root_attr['width'], heigh=root_attr['height'], bg="white")
canvas.place(x=0, y=0, anchor='nw')
canvas.create_config_button()
canvas.reload_data()
root.geometry(size)
root.mainloop()

@ -0,0 +1,600 @@
import math
import sys
import threading
from time import sleep
from tkinter import Tk
from tkinter import messagebox
from ttkbootstrap import *
from uuid import uuid4
import ipaddress
import time
from PIL import ImageTk, Image, ImageFont
from dbUtil import search
from packet import *
pause_event = threading.Event()
class SimBase():
# todo: 组件父类
"""
图标类所有组件的父类
"""
def __init__(self, canvas, x, y, id, config=None, label=None):
self.ConfigCorrect = 0 if config is None else config # 是否进行配置
self.ObjType = None # 组件类型 1->主机2->路由器3->交换机4->集线器
self.ObjLabel = ""
self.ObjID = id
self.ObjX = x
self.ObjY = y
self.canvas = canvas
self.is_packet = False
self.is_un_packet = False
self.host_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机@2x.png").resize((40, 40)))
self.host_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/主机--暗色@2x.png").resize((40, 40)))
self.router_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器@2x.png").resize((40, 40)))
self.router_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/路由器--暗色@2x.png").resize((40, 40)))
self.switch_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机@2x.png").resize((40, 40)))
self.switch_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/交换机--暗色@2x.png").resize((40, 40)))
self.hub_img = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器@2x.png").resize((40, 40)))
self.hub_img_tm = ImageTk.PhotoImage(
Image.open(sys.path[0] + "/../datas/images/集线器--暗色@2x.png").resize((40, 40)))
self.img = None
self.img_tm = None
self.interface = [{}, {}, {}, {}]
self.connections = ["1", "2", "3", "4"]
self.set_default_config()
self.packet_window = None
def set_default_name(self):
data_frame = search(f"select objid from sim_objs where objType={self.ObjType}")
num = data_frame.size + 1
if isinstance(self, SimHost):
name = "SHO%d" % num
elif isinstance(self, SimRouter):
name = "SRO%d" % num
elif isinstance(self, SimSwitch):
name = "SWI%d" % num
else:
name = "SHUB%d" % num
return name
def create_img(self):
"""
创建图片
:return:
"""
if self.ObjType == 1:
self.img = self.host_img
self.img_tm = self.host_img_tm
elif self.ObjType == 2:
self.img = self.router_img
self.img_tm = self.router_img_tm
elif self.ObjType == 3:
self.img = self.switch_img
self.img_tm = self.switch_img_tm
else:
self.img = self.hub_img
self.img_tm = self.hub_img_tm
self.canvas.delete("L")
id = self.canvas.create_image(self.ObjX - 30, self.ObjY - 30,
image=self.img if self.ConfigCorrect == 1 else self.img_tm, anchor="nw",
tags=self.ObjID)
self.canvas.dtag("L", "L")
self.canvas.create_text(self.ObjX - 20, self.ObjY - 60, text=self.ObjLabel, font=("", 16, "bold"),
fill="#9b78eb", tags=self.ObjID + "text", anchor="nw")
if self.ObjType == 1:
self.canvas.create_text(self.ObjX - 45, self.ObjY + 15, text=self.interface[0]["ip"]if len(self.interface) > 0 else "", font=("", 16, "bold"),
fill="#9b78eb", tags=self.ObjID + "text", anchor="nw")
self.canvas.create_text(self.ObjX - 45, self.ObjY + 35, text=self.interface[0]["mac"]if len(self.interface) > 0 else "", font=("", 16, "bold"),
fill="#9b78eb", tags=self.ObjID + "text", anchor="nw")
self.canvas.create_text(self.ObjX - 45, self.ObjY + 55, text=self.interface[0]["addr"]if len(self.interface) > 0 else "", font=("", 16, "bold"),
fill="#9b78eb", tags=self.ObjID + "text", anchor="nw")
self.canvas.tag_raise(id)
def set_default_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
for index, conn in conn_data.iterrows():
self.interface[int(conn["node_ifs"]) - 1] = {"ip": conn["ip"],
"mac": conn["mac"],
"conn_port": conn["conn_port"],
"addr": conn["addr"]}
def get_config(self):
sql = f"select * from conn_config where node_id='{self.ObjID}'"
conn_data = search(sql)
return conn_data
def transfer_animate(self, status, packet, packet_size, error_message=None):
if status:
text = f"消息大小: {packet_size}\n" \
f"消息内容: {packet}"
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY + 20,
outline="#92d050",
width=3, fill="#92d050", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
else:
text = f"传输失败\n" if error_message is None else error_message
self.canvas.create_rectangle(self.ObjX + 30, self.ObjY - 30, self.ObjX + 160, self.ObjY, outline="red",
width=3, fill="red", tags=self.ObjID + "packetData")
self.canvas.create_text(self.ObjX + 35, self.ObjY - 25, text=text, anchor="nw",
font=('', 10), tags=self.ObjID + "packetData") # 显示文字
self.canvas.update()
sleep(2)
self.canvas.delete(self.ObjID + "packetData") # 删除展示的数据包内容
def create_detail_button(self):
frist_x = self.ObjX # 获取frist_x
frist_y = self.ObjY # 获取frist_y
tag_id = self.ObjID + "detail_button"
self.canvas.create_rectangle(frist_x + 10, frist_y - 25, frist_x + 40, frist_y - 15, fill="#f0a732",
outline="#47b2ff",width=2, tags=tag_id)
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimPacket():
# todo: 数据包类
"""
数据包类
"""
def __init__(self, source_ip, source_mac, destination_ip, destination_mac, message, label, size, sendObj, receiveObj):
"""
:param source_mac: 源主机mac地址
:param source_ip: 源主机ip地址
:param destination_mac: 目的主机mac地址
:param destination_ip: 目的主机ip地址
:param message: 数据
"""
self.source_mac = source_mac
self.source_ip = source_ip
self.sendObj = sendObj
self.receiveObj = receiveObj
self.destination_mac = destination_mac
self.destination_ip = destination_ip
self.message = message
self.label = label
self.size = size
self.up_jump = None # 上一跳连接对象
self.objs = set()
self.img = None
self.id = str(uuid4())
self.process_img()
def process_img(self):
img = Image.open(sys.path[0] + "/../datas/images/packet.png").resize((30, 30))
draw = ImageDraw.Draw(img)
font = ImageFont.truetype("arial.ttf", size=16)
text_width, text_height = draw.textsize(str(self.label), font=font)
# 计算文本在右上角的位置
x = img.width - text_width - 10 # 右上角距离右边缘10像素
y = 10 # 距离上边缘10像素
# 绘制文本
draw.text((x, y), str(self.label), font=font, fill=(255, 0, 0)) # 红色字体
self.img = ImageTk.PhotoImage(img)
def move(self, id, cv, target_x, target_y, duration):
start_x, start_y = cv.coords(id)
distance_x = target_x - start_x
distance_y = target_y - start_y
steps = duration // 10 # 以10毫秒为间隔进行移动
step_x = distance_x / steps
step_y = distance_y / steps
self._move_step(id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps)
def _move_step(self,id, cv, start_x, start_y, target_x, target_y, step_x, step_y, steps):
if pause_event.is_set():
new_x = start_x + step_x
new_y = start_y + step_y
sleep(2)
self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps)
if steps > 0:
new_x = start_x + step_x
new_y = start_y + step_y
cv.coords(id, new_x, new_y)
cv.update() # 更新画布显示
sleep(0.01) # 添加延迟以控制动画速度
self._move_step(id, cv, new_x, new_y, target_x, target_y, step_x, step_y, steps - 1)
else:
cv.coords(id, target_x, target_y)
cv.delete(id)
def transfer_packet(self, cv: Canvas, nodex_tag: SimBase, nodey_tag: SimBase):
self.objs.add(nodex_tag)
self.objs.add(nodey_tag)
cv.create_image(nodex_tag.ObjX - 15, nodex_tag.ObjY - 15, image=self.img, anchor="nw", tags=self.id)
self.move(self.id, cv, nodey_tag.ObjX - 15, nodey_tag.ObjY - 15, 500)
class AllSimConnect():
# todo: 连接类
def __init__(self, canvas, nodex: SimBase, nodex_ifs, nodey: SimBase, nodey_ifs, config=None):
"""
连接对象
:param nodex: 节点
:param nodex_ifs: 节点接口
:param nodey: 节点
:param nodey_ifs: 节点接口
"""
self.canvas = canvas
self.ConfigCorrect = 0 if config is None else config
self.NobjS = nodex
self.NobjE = nodey
self.IfsS = nodex_ifs
self.IfsE = nodey_ifs
self.width = False if self.ConfigCorrect == 0 else True # 线的粗细
def transfer(self, source_node, packet: SimPacket):
"""
传输数据
:param packet: 数据包
:return:
"""
if source_node == self.NobjS:
packet.transfer_packet(self.canvas, self.NobjS, self.NobjE)
self.NobjE.receive(packet)
else:
packet.transfer_packet(self.canvas, self.NobjE, self.NobjS)
self.NobjS.receive(packet)
def draw_line(self):
if self.width:
line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY,
width=2, fill="#c372f0", tags=self.NobjS.ObjID + self.NobjE.ObjID + "line")
else:
line = self.canvas.create_line(self.NobjS.ObjX, self.NobjS.ObjY, self.NobjE.ObjX, self.NobjE.ObjY,
width=2, dash=(250, 4), fill="#c372f0",
tags=self.NobjS.ObjID + self.NobjE.ObjID + "line")
self.canvas.tag_lower(line, 2)
class SimHost(SimBase):
# todo: 主机类
"""
主机类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 1
self.ObjLabel = label if label is not None else self.set_default_name()
self.interface = [{}]
self.connections = [None]
self.set_default_config()
def packet_ok(self, receiveObj, label, size):
count = split_appdata(int(size))
for i in range(2, count + 1):
pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"],
receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], label, i, size, self,
receiveObj)
threading.Timer(1 * (i - 1), function=self.send, args=(pack,), ).start()
def create_packet(self, receiveObj, label, size):
"""
创建数据包
:param receiveObj: 目的主机对象
:param message: 消息
:return:
"""
def send(message):
print(message)
pack = SimPacket(self.interface[0]["ip"], self.interface[0]["mac"],
receiveObj.interface[0]["ip"], receiveObj.interface[0]["mac"], message, 1, size, self,
receiveObj)
self.send(pack)
message = """发送主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已发送数据数量:{}\n\n需要发送的数据包总数:{}"""
self.packet_window = PacketWindow(self.canvas, packet={"packet": True,
"size": int(size),
"sendObj": self,
"receiveObj": receiveObj,
"tag": label,
"send": send,
"mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]})
self.packet_window.title("{} 封包".format(self.ObjLabel))
def send(self, packet):
"""
发送数据包
:param packet:
:return:
"""
connection: AllSimConnect = self.connections[0]
print(f"数据包从 {self.ObjLabel} 发出")
packet.up_jump = connection
connection.transfer(self, packet)
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
data = ""
def receive(message):
data = message
self.is_un_packet = True
packet.sendObj.packet_ok(self, data, int(packet.size))
if not self.is_un_packet:
PacketWindow(self.canvas, {"packet": False,
"size": None,
"sendObj": packet.sendObj,
"receiveObj": self,
"tag": packet.message,
"send": receive,
"mode": [{"app": True},{"trans": True},{"ip": True},{"mac": True}]}, packet_step=["app", "trans", "ip", "mac"][::-1])
size = int(packet.size)
message = """接收主机:{}\n\nIP地址{}\n\nMac地址{}\n\n发送数据包大小:{}\n\n已接收数据数量:{}\n\n需要发送的数据包总数:{}"""
self.canvas.show_bottom_message(self, message=message.format(self.ObjLabel, self.interface[0]["ip"],
self.interface[0]["mac"],
size, packet.label, split_appdata(size)))
if split_appdata(size) == packet.label:
if packet.destination_ip == self.interface[0]["ip"]:
self.transfer_animate(True, packet.message, size)
self.canvas.message.show_message(f"{self.ObjLabel}接收到数据:{data}")
else:
self.transfer_animate(False, data, size)
self.canvas.trans_over(packet.objs)
def __str__(self):
str = ""
config = self.get_config()
for index, data in config.iterrows():
str += f"【接口{data['node_ifs']}\n"
str += f"AppAddr: /Root\n"
str += f"PORT: {data['conn_port']}\n"
str += f"IP: {data['ip']}\n"
str += f"MAC: {data['mac']}\n"
return str
class SimRouter(SimBase):
# todo: 路由类
"""
路由类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 2
self.ObjLabel = label if label is not None else self.set_default_name()
self.router_table = {}
self.set_default_router_table()
def set_default_router_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from router_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.router_table:
self.router_table[router_table["node_ifs"]].append(router_table["segment"])
else:
self.router_table[router_table["node_ifs"]] = [router_table["segment"]]
def check_destination_ip(self, destination_ip, network):
"""
检查目标ip是否属于网段范围内
:param destination_ip: 目标ip
:param network: 网段
:return:10.2.3.0/24
"""
ip = ipaddress.ip_address(destination_ip)
network = ipaddress.ip_network(network)
if ip in network:
return True
if network == "0.0.0.0/24": # 如果网段为 0.0.0.0/24 则为默认路由
return True
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
def transfer():
flag = False
next_hop_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
if conn == packet.up_jump:
continue
ifs = self.connections.index(conn) + 1
if self.router_table.get(ifs) is None:
continue
for network in self.router_table.get(ifs):
if self.check_destination_ip(packet.destination_ip, network):
flag = True
next_hop_ifs = ifs
if flag:
conn = self.connections[next_hop_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
else:
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.NobjS != self:
if conn.NobjE.ObjType == 1:
conn.transfer(self, packet)
break
error_message = "路由寻址失败"
self.transfer_animate(False, packet, error_message)
def receive(message):
packet.message = message
self.is_packet = True
transfer()
if not self.is_packet:
PacketWindow(self.canvas, {"packet": True,
"size": None,
"sendObj": packet.sendObj,
"receiveObj": packet.receiveObj,
"tag": packet.message,
"send": receive,
"mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]},
packet_step=["ip", "mac"])
else:
transfer()
def receive(self, packet):
"""
接收数据
:param packet: 数据包
:return:
"""
def receive(message):
print(message)
self.is_un_packet = True
packet.message = message
sleep(1.5)
self.transmit(packet)
if not self.is_un_packet:
PacketWindow(self.canvas, {"packet": False,
"size": None,
"sendObj": packet.sendObj,
"receiveObj": packet.receiveObj,
"tag": packet.message,
"send": receive,
"mode": [{"app": False}, {"trans": False}, {"ip": True}, {"mac": True}]},
packet_step=["ip", "mac"][::-1])
else:
self.transmit(packet)
class SimSwitch(SimBase):
# todo: 交换机类
"""
交换机类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 3
self.ObjLabel = label if label is not None else self.set_default_name()
self.mac_table = {}
self.set_default_mac_table()
def set_default_mac_table(self):
"""
将数据库中的路由表信息提取
:return:
"""
sql = f"select * from mac_table where obj_id='{self.ObjID}'"
router_tables = search(sql)
for index, router_table in router_tables.iterrows():
if router_table["node_ifs"] in self.mac_table:
self.mac_table[router_table["node_ifs"]].append(router_table["mac"])
else:
self.mac_table[router_table["node_ifs"]] = [router_table["mac"]]
def transmit(self, packet: SimPacket):
"""
转发数据包
:return:
"""
flag = False
next_hub_ifs = None
for conn in self.connections:
if isinstance(conn, AllSimConnect):
if conn.ConfigCorrect == 0:
continue
ifs = self.connections.index(conn) + 1
if packet.destination_mac in self.mac_table.get(ifs, []):
flag = True
next_hub_ifs = ifs
if flag:
conn = self.connections[next_hub_ifs - 1]
packet.up_jump = conn
conn.transfer(self, packet)
return
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"交换机{self.ObjLabel}接受到数据{packet.message}")
self.transmit(packet)
class SimHub(SimBase):
"""
集线器类
"""
def __init__(self, canvas, x, y, id=None, config=None, label=None, *args):
self.ObjID = str(uuid4()) if id is None else id
super().__init__(canvas, x, y, self.ObjID, config, label)
self.ObjType = 4
self.ObjLabel = label if label is not None else self.set_default_name()
def transmit(self, packet: SimPacket):
"""
集线器转发数据包
:return:
"""
for conn in self.connections: # 将数据包往所有接口进行转发
if isinstance(conn, AllSimConnect):
if conn == packet.up_jump:
continue
if conn.ConfigCorrect == 0:
continue
new_packet = SimPacket(packet.source_ip,
packet.source_mac,
packet.destination_ip,
packet.destination_mac,
packet.message)
new_packet.up_jump = conn
threading.Thread(target=conn.transfer, args=(self, new_packet)).start()
def receive(self, packet: SimPacket):
"""
接收数据
:param packet: 数据包
:return:
"""
print(f"集线器-{self.ObjLabel}接受到数据,将进行转发!")
self.transmit(packet)

@ -0,0 +1,267 @@
import ipaddress
import re
import sqlite3
from tkinter import messagebox
import ttkbootstrap as tk
from ttkbootstrap import *
from ttkbootstrap import ttk
import pandas as pd
from SimObjs import SimRouter
def round_rectangle(cv, x1, y1, x2, y2, radius=30, **kwargs):
"""
绘制圆角矩形
:param cv: canvas对象
:param radius: 圆角值
:return:
"""
points = [x1 + radius, y1,
x1 + radius, y1,
x2 - radius, y1,
x2 - radius, y1,
x2, y1,
x2, y1 + radius,
x2, y1 + radius,
x2, y2 - radius,
x2, y2 - radius,
x2, y2,
x2 - radius, y2,
x2 - radius, y2,
x1 + radius, y2,
x1 + radius, y2,
x1, y2,
x1, y2 - radius,
x1, y2 - radius,
x1, y1 + radius,
x1, y1 + radius,
x1, y1]
return cv.create_polygon(points, **kwargs, smooth=True)
def validate_ip_address(ip_address):
"""
匹配ip地址格式是否规范
:param ip_address: IP地址
:return: Boolean
"""
# 定义IP地址的正则表达式模式
pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})$'
pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$'
# 使用re模块进行匹配
match_with_subnet = re.match(pattern_with_subnet, ip_address)
match_without_subnet = re.match(pattern_without_subnet, ip_address)
if match_with_subnet:
# 带有子网掩码的IP地址
# 检查每个组件的取值范围是否在 0-255 之间
for group in match_with_subnet.groups()[:4]:
if not (0 <= int(group) <= 255):
return False
# 检查子网掩码的取值范围是否在 0-32 之间
subnet_mask = int(match_with_subnet.groups()[4])
if not (0 <= subnet_mask <= 32):
return False
return True
elif match_without_subnet:
# 不带子网掩码的IP地址
# 检查每个组件的取值范围是否在 0-255 之间
for group in match_without_subnet.groups():
if not (0 <= int(group) <= 255):
return False
return True
else:
# IP地址格式不正确
return False
class ExportUtil():
def __init__(self, path):
self.conn = sqlite3.connect('./network.db')
self.path = path
def get_table_names(self):
cursor = self.conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") # 如果你使用SQLite数据库
tables = cursor.fetchall()
cursor.close()
return [table[0] for table in tables]
def export(self):
tables = self.get_table_names()
with pd.ExcelWriter(self.path, engine='openpyxl') as writer:
for table in tables:
table_name = table
# a. 从数据库中获取表的数据并存储在DataFrame中
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, self.conn)
# b. 使用Pandas将数据写入Excel文件的不同sheet中
df.to_excel(writer, sheet_name=table_name, index=False)
def import_data(self):
excel_file = pd.ExcelFile(self.path)
sheet_names = excel_file.sheet_names
cursor = self.conn.cursor()
for sheet_name in sheet_names:
# 4. 使用 Pandas 读取工作表数据
df = pd.read_excel(excel_file, sheet_name=sheet_name)
# 5. 获取工作表的列名
columns = df.columns.tolist()
# 6. 构造插入语句
columns_str = ', '.join(columns)
placeholders = ', '.join(['?' for _ in range(len(columns))])
sql = f"INSERT INTO {sheet_name} ({columns_str}) VALUES ({placeholders})"
# 7. 将数据插入数据库
for index, row in df.iterrows():
# 8. 使用动态生成的 SQL 语句将数据插入数据库
cursor.execute(sql, tuple(row))
self.conn.commit()
class RouterConfigWindow(tk.Toplevel):
def __init__(self, parent, router_obj):
super().__init__(parent)
self.geometry("435x433+350+200")
self.title(f"{router_obj.ObjLabel}路由表配置")
self.router_obj = router_obj
self.interface_entries = []
self.router_table = {}
self.create_interface_inputs()
self.create_router_table()
def create_interface_inputs(self):
label_text = ["接口1", "接口2", "接口3", "接口4"]
for i in range(4):
label = tk.Label(self, text=label_text[i], font=("黑体", 16))
label.grid(row=i, column=0, padx=10, pady=5, sticky="w")
entry = tk.Entry(self, width=20, font=("黑体", 16),)
entry.grid(row=i, column=1, padx=10, pady=5, sticky="w")
self.interface_entries.append(entry)
button = tk.Button(self, text="添加", font=("黑体", 16), command=lambda index=i: self.add_router_entry(index))
button.grid(row=i, column=2, padx=10, pady=5)
lab = LabelFrame(self, text="示例")
lab.grid(row=4, column=0, columnspan=3, sticky=W, padx=20)
Label(lab, text="10.1.2.0/24 或者 10.1.2.12" if self.router_obj.ObjType == 2 else "MAC11", font=("黑体", 16)).pack()
def create_router_table(self):
def on_right_click(event):
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
if row:
self.router_treeview.selection_set(row) # 选中该行
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
def delete_row():
selected_items = self.router_treeview.selection() # 获取选中的行
for item in selected_items:
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
self.router_obj.delete_config(ifs, network)
self.router_treeview.delete(item)
self.router_table_frame = tk.Frame(self)
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
style = ttk.Style()
style.configure("Custom.Treeview.Heading", font=("宋体", 15))
style.configure("Custom.Treeview", rowheight=30, font=("宋体", 15))
self.router_treeview = ttk.Treeview(self.router_table_frame, style="Custom.Treeview", columns=("Interface", "Route"), show="headings")
self.router_treeview.heading("Interface", text="接口")
self.router_treeview.heading("Route", text="网段")
self.router_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
scrollbar.pack(side="right", fill="y")
self.router_treeview.configure(yscrollcommand=scrollbar.set)
self.router_table = self.router_obj.router_table
self.router_treeview.bind("<Button-3>", on_right_click)
# 创建删除菜单
delete_menu = tk.Menu(self.master, tearoff=False)
delete_menu.add_command(label="删除", command=delete_row)
self.update_router_table()
def add_router_entry(self, index):
entry_text = self.interface_entries[index].get()
try:
ipaddress.ip_network(entry_text)
if isinstance(self.router_obj, SimRouter):
if not validate_ip_address(entry_text):
messagebox.showerror("注意", message="添加的网段信息格式不合格")
self.interface_entries[index].delete(0, tk.END)
self.focus_set()
return
if entry_text:
if index + 1 in self.router_table:
self.router_table[index + 1].append(entry_text)
else:
self.router_table[index + 1] = [entry_text]
self.interface_entries[index].delete(0, tk.END)
self.router_obj.add_config(entry_text, index + 1)
self.update_router_table()
self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!")
except:
messagebox.showerror("注意", message="网段格式错误!网段示例如下:\n10.1.2.0/24\n10.1.2.12")
return
def update_router_table(self):
self.router_treeview.delete(*self.router_treeview.get_children())
for i, entrys in self.router_table.items():
for entry in entrys:
self.router_treeview.insert("", "end", values=(f"接口{i}", entry))
class SwitchConfigWindow(RouterConfigWindow):
def __init__(self, parent, router_obj):
super().__init__(parent, router_obj)
self.geometry("435x433+350+200")
self.title(f"{router_obj.ObjLabel}交换表配置")
self.router_obj = router_obj
self.interface_entries = []
self.router_table = {}
self.create_interface_inputs()
self.create_router_table()
def create_router_table(self):
def on_right_click(event):
row = self.router_treeview.identify_row(event.y) # 获取鼠标位置的行索引
if row:
self.router_treeview.selection_set(row) # 选中该行
delete_menu.post(event.x_root, event.y_root) # 在鼠标位置弹出删除菜单
def delete_row():
selected_items = self.router_treeview.selection() # 获取选中的行
for item in selected_items:
ifs, network = int(self.router_treeview.item(item)["values"][0][-1:]), self.router_treeview.item(item)["values"][1]
self.router_obj.delete_config(ifs, network)
self.router_treeview.delete(item)
self.router_table_frame = tk.Frame(self)
self.router_table_frame.grid(row=5, column=0, columnspan=3, padx=10, pady=5)
self.router_treeview = ttk.Treeview(self.router_table_frame, columns=("Interface", "Route"), show="headings")
self.router_treeview.heading("Interface", text="接口")
self.router_treeview.heading("Route", text="mac")
self.router_treeview.pack(side="left", fill="both")
scrollbar = ttk.Scrollbar(self.router_table_frame, orient="vertical", command=self.router_treeview.yview)
scrollbar.pack(side="right", fill="y")
self.router_treeview.configure(yscrollcommand=scrollbar.set)
self.router_table = self.router_obj.mac_table
self.router_treeview.bind("<Button-3>", on_right_click)
# 创建删除菜单
delete_menu = tk.Menu(self.master, tearoff=False)
delete_menu.add_command(label="删除", command=delete_row)
self.update_router_table()
def add_router_entry(self, index):
entry_text = self.interface_entries[index].get()
if isinstance(self.router_obj, SimRouter):
if not validate_ip_address(entry_text):
messagebox.showerror("注意", message="添加的网段信息格式不合格")
self.interface_entries[index].delete(0, tk.END)
self.focus_set()
return
if entry_text:
if index + 1 in self.router_table:
self.router_table[index + 1].append(entry_text)
else:
self.router_table[index + 1] = [entry_text]
self.interface_entries[index].delete(0, tk.END)
self.router_obj.add_config(entry_text, index + 1)
self.master.message.show_message(f"{self.router_obj.ObjLabel}添加配置{entry_text}成功!")
self.update_router_table()

Binary file not shown.

@ -0,0 +1,104 @@
import sqlite3
import sys
import pandas as pd
from pandas import DataFrame
conn = sqlite3.connect(sys.path[0]+"/network.db")
def execute_sql(sql):
"""
执行sql语句
:param sql:
:return:
"""
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
def search(sql) -> DataFrame:
return pd.read_sql(sql, conn)
def delete_obj(obj_id):
cursor = conn.cursor()
delete_obj_sql = f"delete from sim_objs where ObjID='{obj_id}'"
cursor.execute(delete_obj_sql)
delete_conn_sql = f"delete from sim_conn where conn_id in (select conn_id from conn_config where node_id='{obj_id}')"
cursor.execute(delete_conn_sql)
conn.commit()
def truncate_db():
init_database()
def init_database():
cursor = conn.cursor()
cursor.execute("""
DROP TABLE IF EXISTS `conn_config`;
""")
cursor.execute("""
CREATE TABLE `conn_config` (
`conn_id` varchar(55) NULL DEFAULT NULL,
`node_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`ip` varchar(55) NULL DEFAULT NULL,
`mac` varchar(128) NULL DEFAULT NULL,
`conn_port` varchar(32) NULL DEFAULT NULL,
`addr` varchar(255) NULL DEFAULT NULL,
CONSTRAINT `conn_config_sim_conn_conn_id_fk` FOREIGN KEY (`conn_id`) REFERENCES `sim_conn` (`conn_id`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `mac_table`;
""")
cursor.execute("""
CREATE TABLE `mac_table` (
`obj_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`mac` varchar(55) NULL DEFAULT NULL,
CONSTRAINT `mac_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `router_table`;
""")
cursor.execute("""
CREATE TABLE `router_table` (
`obj_id` varchar(55) NULL DEFAULT NULL,
`node_ifs` int(0) NULL DEFAULT NULL,
`segment` varchar(55) NULL DEFAULT NULL,
CONSTRAINT `router_table_sim_objs_ObjID_fk` FOREIGN KEY (`obj_id`) REFERENCES `sim_objs` (`ObjID`) ON DELETE CASCADE ON UPDATE CASCADE
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `sim_conn`;
""")
cursor.execute("""
CREATE TABLE `sim_conn` (
`conn_id` varchar(255) NOT NULL ,
`ConfigCorrect` int(0) NULL DEFAULT NULL ,
PRIMARY KEY (`conn_id`)
) ;
""")
cursor.execute("""
DROP TABLE IF EXISTS `sim_objs`;
""")
cursor.execute("""
CREATE TABLE `sim_objs` (
`ObjID` varchar(50) NOT NULL,
`ObjType` int(0) NULL DEFAULT NULL,
`ObjLabel` varchar(20) NULL DEFAULT NULL,
`ObjX` int(0) NULL DEFAULT NULL,
`ObjY` int(0) NULL DEFAULT NULL,
`ConfigCorrect` int(0) NULL DEFAULT NULL,
PRIMARY KEY (`ObjID`)
) ;
""")
conn.commit()
if __name__ == '__main__':
init_database()

Binary file not shown.

@ -0,0 +1,433 @@
import time
from SimObjs import *
from tkinter import *
from tkinter import messagebox
from PIL import ImageTk, Image
from AgreementUtil import AgreementUtil
def create_label(canvas: Canvas, x, y, width, height, text, rect_color, font_color, tag):
canvas.create_rectangle(x, y, x + width, y + height, fill=rect_color, tags=tag)
canvas.create_text(x + width / 2, y + height / 2, text=text, fill=font_color, tags=tag)
return tag
class PacketWindow(Toplevel):
def __init__(self, master, packet, packet_step=["app", "trans", "ip", "mac"], *args, **kwargs):
"""
packet{"packet": True, "mode": {"app": True, "trans": True, "ip": True, "mac": True}}
"""
super().__init__(master, *args, **kwargs)
self.master = master
self.width = 300
self.height = 400
self.geometry(f"{self.width}x{self.height}+200+200")
self.attributes("-topmost", True)
self.resizable(width=False, height=False)
self.packet_button_str = {"app": "应用层", "ip": "网络层", "mac": "链路层", "trans": "传输层"}
if not packet["packet"]:
packet["mode"] = packet["mode"][::-1] # True 为封包 False 为解包
self.packet_step = packet_step
self.step = 0
self.packet: dict[str: bool] = packet
self.sendObj: SimHost = packet["sendObj"]
self.receiveObj: SimHost = packet["receiveObj"]
self.packet_option = {"app": {"color": "#ff3b26", "command": self.app_packet if packet["packet"] else self.app_unpack},
"ip": {"color": "#00a2f2", "command": self.ip_packet if packet["packet"] else self.ip_unpack},
"mac": {"color": "#46707a", "command": self.mac_packet if packet["packet"] else self.mac_unpack},
"trans": {"color": "#710a00", "command": self.trans_packet if packet["packet"] else self.trans_unpack}}
self.canvas = Canvas(self, width=self.width, height=self.height)
self.canvas.place(x=0, y=0, anchor='nw')
self.packet_height, self.packet_init_width = 25, 100
self.background_img = ImageTk.PhotoImage(Image.open("../datas/images/背景@2x.png").resize((self.width, self.height)))
self.canvas.create_image(0, 0, image=self.background_img, anchor=NW)
# self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, 10, width=self.packet_init_width, height=self.packet_height, text=self.packet["tag"], rect_color="#dee1e6", font_color="black", tag="packet")
self.step_y = {}
self.message = packet["tag"]
self.create_widget()
def move_to(self, tag, target_x, target_y, speed=10):
# 获取当前位置
current_coords = self.canvas.coords(tag)
if len(current_coords) < 2:
return # 如果没有坐标,就退出函数
current_x, current_y = current_coords[0], current_coords[1]
# 计算移动方向
delta_x = (target_x if target_x else current_x) - current_x
delta_y = (target_y if target_y else current_y) - current_y
# 计算下一步的位置
next_x = current_x + (speed if delta_x > 0 else -speed) if abs(delta_x) > speed else (target_x if target_x else current_x)
next_y = current_y + (speed if delta_y > 0 else -speed) if abs(delta_y) > speed else (target_y if target_y else current_y)
# 移动对象
self.canvas.move(tag, next_x - current_x, next_y - current_y)
# 如果对象还没有到达目标,继续移动
if next_x != (target_x if target_x else current_x) or next_y != (target_y if target_y else current_y):
self.canvas.after(10, lambda: self.move_to(tag, target_x, target_y, speed))
def create_widget(self):
mode_text = "封包" if self.packet["packet"] else "解包"
num, margin_top, button_width, button_height = 1, 30, 120, 40
for data in self.packet["mode"]:
key = list(data.keys())[0]
value = list(data.values())[0]
if value:
Button(self, text=self.packet_button_str[key] + mode_text, command=self.packet_option[key]["command"],
font=("", 16)).place(x=40, y=30 + (margin_top + button_height) * (num - 1),
width=button_width, height=button_height)
num += 1
Button(self, text="发送", command=self.send_packet, font=("", 16)).place(x=self.width - 60, y=self.height - 40, anchor=NW)
Button(self, text="取消", command=self.destroy, font=("", 16)).place(x=self.width - 120, y=self.height - 40, anchor=NW)
def send_packet(self):
if self.step != len(self.packet_step):
messagebox.showerror("注意", "尚未完成{}".format("封包" if self.packet["packet"] else "解包"))
return
self.packet["send"](self.message)
self.destroy()
def create_window(self, option):
toplevel = Toplevel(self)
toplevel.title(option["title"])
toplevel.geometry("450x220+300+300")
for entry_option in option["entry"]:
index = option["entry"].index(entry_option)
key = list(entry_option.keys())[0]
value = list(entry_option.values())[0]
Label(toplevel, text=key, font=("", 16)).grid(row=index, column=0, padx=20, pady=10)
if value is None:
continue
Entry(toplevel, textvariable=value, font=("", 16), width=15).grid(row=index, column=1, pady=10)
Button(toplevel, text="提交", command=option["command"], font=("", 16)).place(x=450 - 60, y=220 - 40, anchor=NW)
Button(toplevel, text="取消", command=toplevel.destroy, font=("", 16)).place(x=450 - 120, y=220 - 40, anchor=NW)
toplevel.attributes("-topmost", True)
return toplevel
def app_packet(self):
"""
应用层封包
"""
if self.packet_step[self.step] != "app":
messagebox.showerror("注意", "封包顺序出错!")
return
SourceAppAddr = StringVar()
TargetAppAddr = StringVar()
def packet():
toplevel.destroy()
source_app_addr = SourceAppAddr.get()
target_app_addr = TargetAppAddr.get()
if source_app_addr == self.sendObj.interface[0]["addr"] \
and target_app_addr == self.receiveObj.interface[0]["addr"]:
# 动画
# app_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["app"]), 50, 25, "AH", "#ff3b26", "white", tag="app")
# self.move_to(self.packet_rect, None, int(self.step_y["app"]), speed=1)
# self.move_to(app_rect, self.width / 2 + self.packet_init_width / 2, int(self.step_y["app"]), speed=1)
self.message = target_app_addr + "&" + self.packet["tag"]
self.master.message.show_message("应用层封包成功!数据包如下: \n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")
toplevel = self.create_window({"title": "应用层封包", "entry": [{"发送主机应用层地址:": SourceAppAddr}, {"接收主机应用层地址:": TargetAppAddr}],
"command": packet})
def app_unpack(self):
"""
应用层解包
"""
if self.packet_step[self.step] != "app":
messagebox.showerror("注意", "解包顺序出错!")
return
TargetAppAddr = StringVar()
def packet():
toplevel.destroy()
target_app_addr = TargetAppAddr.get()
if target_app_addr == self.receiveObj.interface[0]["addr"]:
self.message = self.message.split("&")[1]
self.master.message.show_message("应用层解包成功!数据包如下: \n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "应用层地址填写有误,请仔细检查!")
toplevel = self.create_window({"title": "应用层解包", "entry": [{"接收主机应用层地址:": TargetAppAddr}],
"command": packet})
def trans_packet(self):
"""
传输层封包
"""
if self.packet_step[self.step] != "trans":
messagebox.showerror("注意", "封包顺序出错!")
return
SentPort = StringVar()
RcptPort = StringVar()
SplitCount = IntVar()
def packet():
sent_port = SentPort.get()
rcpt_port = RcptPort.get()
split_count = SplitCount.get()
count = split_appdata(self.packet["size"])
print(sent_port, self.sendObj.interface[0]["conn_port"])
if sent_port == self.sendObj.interface[0]["conn_port"] \
and rcpt_port == self.receiveObj.interface[0]["conn_port"]\
and split_count == count:
toplevel.destroy()
# 动画
# self.canvas.delete(self.packet_rect)
# self.canvas.delete("app")
# self.packet_rect = create_label(self.canvas, int(self.width / 2) - self.packet_init_width / 2, int(self.step_y["app"]),
# width=self.packet_init_width + 50, height=self.packet_height,
# text="A-" + self.packet["tag"], rect_color="#dee1e6", font_color="black",
# tag="packet")
# trans_rect = create_label(self.canvas, self.width / 2 + 200, int(self.step_y["trans"]), 50, 25, "PH", "#ff3b26",
# "white", tag="trans")
# self.move_to(self.packet_rect, None, int(self.step_y["trans"]), speed=1)
# self.move_to(trans_rect, self.width / 2 + self.packet_init_width / 2 + 50, int(self.step_y["trans"]), speed=1)
self.message = AgreementUtil.create_udp_packet(self.message, int(sent_port), int(rcpt_port))
self.master.message.show_message("传输层封包成功!数据包如下:\n{}".format(str([str(i + 1) + "-" + str(self.message) for i in range(split_count)])))
self.step += 1
else:
messagebox.showerror("提示", "传输层封包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "传输层封包", "entry": [{"发送主机端口:": SentPort},
{"接收主机端口:": RcptPort},
{"拆包数量": SplitCount},
{"每个包约2048": None}],
"command": packet})
def trans_unpack(self):
"""
传输层解包
"""
print(self.packet_step)
print(self.step)
if self.packet_step[self.step] != "trans":
messagebox.showerror("注意", "解包顺序出错!")
return
RcptPort = StringVar()
def packet():
rcpt_port = RcptPort.get()
if rcpt_port == self.receiveObj.interface[0]["conn_port"]:
toplevel.destroy()
data = AgreementUtil.parse_udp_packet(self.message)
print(data)
self.message = data[-1]
self.master.message.show_message("传输层解包成功!数据包如下:\n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "传输层解包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "传输层解包", "entry": [{"接收主机端口:": RcptPort}],
"command": packet})
def ip_packet(self):
"""
网络层封包
"""
if self.packet_step[self.step] != "ip":
messagebox.showerror("注意", "封包顺序出错!")
return
SourceIP = StringVar()
TargetIP = StringVar()
def packet():
source_ip = SourceIP.get()
target_ip = TargetIP.get()
if source_ip == self.sendObj.interface[0]["ip"] \
and target_ip == self.receiveObj.interface[0]["ip"]:
toplevel.destroy()
self.message = AgreementUtil.create_ip_packet(self.message, source_ip, target_ip)
self.master.message.show_message("网络层封包成功!数据包如下:\n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "网络层封包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "网络层封包", "entry": [{"发送主机IP": SourceIP},
{"接收主机IP": TargetIP}],
"command": packet})
def ip_unpack(self):
"""
网络层解包
"""
print(self.packet_step)
print(self.step)
if self.packet_step[self.step] != "ip":
messagebox.showerror("注意", "解包顺序出错!")
return
RcptIP = StringVar()
def packet():
rcpt_ip = RcptIP.get()
if rcpt_ip == self.receiveObj.interface[0]["ip"]:
toplevel.destroy()
data = AgreementUtil.parse_ip_packet(self.message)
print(data)
self.message = data[-1]
self.master.message.show_message("网络层解包成功!数据包如下:\n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "网络层解包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "网络层解包", "entry": [{"接收主机IP": RcptIP}],
"command": packet})
def mac_packet(self):
"""
链路层封包
"""
if self.packet_step[self.step] != "mac":
messagebox.showerror("注意", "封包顺序出错!")
return
SentMac = StringVar()
RcptMac = StringVar()
def packet():
sent_mac = SentMac.get()
rcpt_mac = RcptMac.get()
if sent_mac == self.sendObj.interface[0]["mac"] \
and rcpt_mac == self.receiveObj.interface[0]["mac"]:
toplevel.destroy()
self.message = AgreementUtil.create_ethernet_frame(self.message, sent_mac, rcpt_mac)
self.master.message.show_message("链路层封包成功!数据包如下:\n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "链路层封包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "链路层封包", "entry": [{"发送主机MAC": SentMac},
{"接收主机MAC": RcptMac}],
"command": packet})
def mac_unpack(self):
"""
链路层解包
"""
if self.packet_step[self.step] != "mac":
messagebox.showerror("注意", "解包顺序出错!")
return
RcptMac = StringVar()
def packet():
rcpt_mac = RcptMac.get()
if rcpt_mac == self.receiveObj.interface[0]["mac"]:
toplevel.destroy()
data = AgreementUtil.parse_ethernet_frame(self.packet["tag"])
print(data)
self.message = data[-1]
self.master.message.show_message("链路层解包成功!数据包如下:\n{}".format(self.message))
self.step += 1
else:
messagebox.showerror("提示", "链路层解包信息填写有误,请仔细检查!")
toplevel = self.create_window({"title": "链路层解包", "entry": [{"接收主机MAC": RcptMac}],
"command": packet})
class AppData:
def __init__(self, obj_id, id_key, app_pack_id, app_pack_size, app_pack_tag, source_app_addr, target_app_addr, app_packed_string, timestamp, canvas):
self.obj_id = obj_id
self.id_key = id_key
self.app_pack_id = app_pack_id
self.app_pack_size = app_pack_size
self.app_pack_tag = app_pack_tag
self.source_app_addr = source_app_addr
self.target_app_addr = target_app_addr
self.app_packed_string = app_packed_string
self.timestamp = timestamp
def pack(self):
# 为了简化,我们打包成一个字典
return vars(self)
@staticmethod
def unpack(packed_data):
# 解包为AppData对象
return AppData(**packed_data)
class TransData:
def __init__(self, obj_id, id_key, trans_pack_id, trans_seq, trans_tag, app_pack_id, sent_port, rcpt_port, source_app_addr, target_app_addr, trans_packed_string, timestamp):
self.obj_id = obj_id
self.id_key = id_key
self.trans_pack_id = trans_pack_id
self.trans_seq = trans_seq
self.trans_tag = trans_tag
self.app_pack_id = app_pack_id
self.sent_port = sent_port
self.rcpt_port = rcpt_port
self.source_app_addr = source_app_addr
self.target_app_addr = target_app_addr
self.trans_packed_string = trans_packed_string
self.timestamp = timestamp
def pack(self):
return vars(self)
@staticmethod
def unpack(packed_data):
return TransData(**packed_data)
class IPData:
def __init__(self, obj_id, id_key, ip_pack_id, trans_pack_id, source_ip, target_ip, source_app_addr, target_app_addr, ip_packed_string):
self.ObjID = obj_id
self.IDkey = id_key
self.IPPackID = ip_pack_id
self.TransPackID = trans_pack_id
self.SourceIP = source_ip
self.TargetIP = target_ip
self.SourceAppAddr = source_app_addr
self.TargetAppAddr = target_app_addr
self.IPPackedString = ip_packed_string
def pack(self):
return vars(self)
@staticmethod
def unpack(packed_data):
return IPData(**packed_data)
class MACData:
def __init__(self, obj_id, id_key, mac_pack_id, ip_pack_id, sent_mac, rcpt_mac, source_ip, target_ip, source_app_addr, target_app_addr, mac_packed_string):
self.ObjID = obj_id
self.IDkey = id_key
self.MacPackID = mac_pack_id
self.IPPackID = ip_pack_id
self.SentMAC = sent_mac
self.RcptMAC = rcpt_mac
self.SourceIP = source_ip
self.TargetIP = target_ip
self.SourceAppAddr = source_app_addr
self.TargetAppAddr = target_app_addr
self.MacPackedString = mac_packed_string
def pack(self):
return vars(self)
@staticmethod
def unpack(packed_data):
return MACData(**packed_data)
def split_appdata(AppPackSize):
MTU = 2048
return AppPackSize // MTU + (AppPackSize % MTU > 0)
if __name__ == '__main__':
# 假设的最大传输单元MTU
MTU = 2048 # bytes
# 模拟的数据和数据大小
SIMULATED_DATA = "hello"
SIMULATED_SIZE = 2049 # 10 MB
# 创建应用层数据包
app_packet = AppData("123", "123", "app1", SIMULATED_SIZE, "DATA", "192.0.2.1", "198.51.100.1", SIMULATED_DATA, time.time())
packed_app_data = app_packet.pack()
print(packed_app_data)

@ -0,0 +1,9 @@
import random
def random_mac():
# 生成六组两位的十六进制数
return ":".join(["%02x" % random.randint(0, 255) for _ in range(6)])
# 生成MAC地址
mac_address = random_mac()
print(mac_address)
Loading…
Cancel
Save