|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
from NetworkAnalog.SimObjs import SimBase, AllSimConnect, SimRouter, SimHost
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SimPacket(): # 需要补充至编程13.7当中
|
|
|
|
|
def __init__(self, source_ip, source_mac, destination_ip, message=None, ping=False):
|
|
|
|
|
"""
|
|
|
|
|
:param source_mac: 源主机mac地址
|
|
|
|
|
:param source_ip: 源主机ip地址
|
|
|
|
|
:param destination_mac: 目的主机mac地址
|
|
|
|
|
:param destination_ip: 目的主机ip地址
|
|
|
|
|
:param message: 数据
|
|
|
|
|
"""
|
|
|
|
|
self.source_mac = source_mac
|
|
|
|
|
self.source_ip = source_ip
|
|
|
|
|
self.destination_ip = destination_ip
|
|
|
|
|
self.message = message
|
|
|
|
|
self.up_jump = None # 上一跳连接对象
|
|
|
|
|
self.ping = ping
|
|
|
|
|
self.flag = False # 是否传输成功
|
|
|
|
|
|
|
|
|
|
class SimHost(SimBase):
|
|
|
|
|
def receive(self, packet: SimPacket): # 修改编程13.7中SimHost的receive方法
|
|
|
|
|
# 判断目的IP是否与本机IP一致,以及判断当前主机是否联网
|
|
|
|
|
if packet.destination_ip == self.interface[0]["ip"] and self.network_online:
|
|
|
|
|
print(f"主机{self.ObjLabel}接受到数据{packet.message}")
|
|
|
|
|
packet.flag = True # 将packet的传输状态改为True,视为传输成功
|
|
|
|
|
else:
|
|
|
|
|
print(f"数据传输失败")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_ip_address(ip_address):# 定义IP地址的正则表达式模式
|
|
|
|
|
pattern_with_subnet = r'^(\d{1,3})\.(\d{1,3})\.(' \
|
|
|
|
|
r'\d{1,3})\.(\d{1,3})/(\d{1,2})$'
|
|
|
|
|
pattern_without_subnet = r'^(\d{1,3})\.(\d{1,3})' \
|
|
|
|
|
r'\.(\d{1,3})\.(\d{1,3})$'
|
|
|
|
|
match_with_subnet = re.match(pattern_with_subnet,
|
|
|
|
|
ip_address) # 使用re模块进行匹配
|
|
|
|
|
match_without_subnet = re.match(pattern_without_subnet, ip_address)
|
|
|
|
|
if match_with_subnet: # 带有子网掩码的IP地址
|
|
|
|
|
# 检查每个组件的取值范围是否在 0-255 之间
|
|
|
|
|
for group in match_with_subnet.groups()[:4]:
|
|
|
|
|
if not (0 <= int(group) <= 255):
|
|
|
|
|
return False
|
|
|
|
|
subnet_mask = int(match_with_subnet.groups()[4])
|
|
|
|
|
# 检查子网掩码的取值范围是否在 0-32 之间
|
|
|
|
|
if not (0 <= subnet_mask <= 32):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
elif match_without_subnet: # 不带子网掩码的IP地址
|
|
|
|
|
for group in match_without_subnet.groups():
|
|
|
|
|
if not (0 <= int(group) <= 255):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
return False # IP地址格式不正确
|
|
|
|
|
|
|
|
|
|
def is_same_subnet(ip1, ip2): # 判断两个IP地址是否属于同一网段
|
|
|
|
|
# 将IP地址和子网掩码转换为整数列表
|
|
|
|
|
ip1_parts = [int(part) for part in ip1.split('.')]
|
|
|
|
|
ip2_parts = [int(part) for part in ip2.split('.')]
|
|
|
|
|
subnet_mask_parts = [int(part) for part in "255.255.255.0".split('.')]
|
|
|
|
|
# 计算网络地址
|
|
|
|
|
network1 = [ip1_parts[i] & subnet_mask_parts[i] for i in range(4)]
|
|
|
|
|
network2 = [ip2_parts[i] & subnet_mask_parts[i] for i in range(4)]
|
|
|
|
|
# 比较网络地址是否相同
|
|
|
|
|
return network1 == network2
|
|
|
|
|
|
|
|
|
|
class NetworkAnalog():
|
|
|
|
|
...
|
|
|
|
|
def check_network(self): # 判断各网络节点是否配置正确
|
|
|
|
|
flag = True
|
|
|
|
|
sim_host_list = []
|
|
|
|
|
for key, tag in self.AllSimObjs.items(): # 遍历所有组件
|
|
|
|
|
# todo: 判断主机的ip配置是否都合格
|
|
|
|
|
if tag.ObjType == 1: # 如果该组件为主机
|
|
|
|
|
for interface in tag.interface: # 遍历该主机下所有接口
|
|
|
|
|
index = tag.interface.index(interface) + 1 # 获取当前接口号
|
|
|
|
|
ip_ = interface["ip"] # 获取当前接口IP
|
|
|
|
|
if not validate_ip_address(ip_): # 判断主机配置的IP是否正确
|
|
|
|
|
self.message.show_message(f"主机{tag.ObjLabel}的接口{index} IP 配置错误")
|
|
|
|
|
flag = False
|
|
|
|
|
sim_host_list.append(tag) # 将所有主机组件添加到列表中
|
|
|
|
|
# todo: 判断路由器的四个接口是否都在同一网段下
|
|
|
|
|
if tag.ObjType == 2: # 如果当前组件为路由器
|
|
|
|
|
ip_ = tag.interface[0]["ip"] # 将第一个接口设为默认
|
|
|
|
|
for interface in tag.interface: # 遍历所有接口
|
|
|
|
|
index = tag.interface.index(interface) + 1 # 获取当前接口号
|
|
|
|
|
if not is_same_subnet(ip_, interface["ip"]): # 判断所有接口是否在同一网段下
|
|
|
|
|
self.message.show_message(f"路由器{tag.ObjLabel}中接口{index}不在同一网段下")
|
|
|
|
|
flag = False
|
|
|
|
|
# todo: 检查路由表是否能跳转到下一跳
|
|
|
|
|
if tag.ObjType == 2: # 如果当前组件为路由器
|
|
|
|
|
for conn in tag.connections: # 遍历该组件所有链接线
|
|
|
|
|
next_jump = conn.NobjE # 获取下一跳组件
|
|
|
|
|
if not is_same_subnet(tag.interface[conn.IfsS]["ip"], next_jump.interface[conn.IfsE]["ip"]): # 检查路由表是否能跳转到下一跳
|
|
|
|
|
self.message.show_message(f"路由器组件{tag.ObjLabel}路由表配置错误,请检查")
|
|
|
|
|
flag = False
|
|
|
|
|
# todo: 判断从所有主机发送数据包到其他所有主机能否成功
|
|
|
|
|
for sim_host in sim_host_list: # 遍历所有主机组件
|
|
|
|
|
sim_host: SimHost = sim_host
|
|
|
|
|
for destination_sim_host in sim_host_list: # 遍历所有主机组件
|
|
|
|
|
packet = SimPacket(sim_host.interface[0]['ip'],
|
|
|
|
|
sim_host.interface[0]['mac'],
|
|
|
|
|
destination_sim_host.interface[0]["ip"]) # 封装数据包,将发送主机信息和接收主机信息封装到数据包中
|
|
|
|
|
sim_host.send(packet) # 模拟发送
|
|
|
|
|
if not packet.flag: # 判断是否传输成功
|
|
|
|
|
self.message.show_message(f"主机{sim_host.ObjLabel}传输至{destination_sim_host.ObjLabel}失败,请检查配置!")
|
|
|
|
|
return flag
|
|
|
|
|
|
|
|
|
|
|