# -*- coding: utf-8 -*- # File: rrt_algorithm.py # Purpose: 实现RRT(快速随机树)路径规划算法,支持避开危险区域 import numpy as np import math import random from typing import List, Tuple, Dict, Optional class Node: """RRT树节点""" def __init__(self, x: float, y: float): self.x = x self.y = y self.parent = None self.cost = 0.0 # 从起点到当前节点的代价 class RRTAlgorithm: """RRT路径规划算法""" def __init__(self, grid_resolution: float = 5.0, max_iterations: int = 2000, step_size: float = 20.0, goal_sample_rate: float = 0.1, search_radius: float = 50.0): """ 初始化RRT算法 Args: grid_resolution: 网格分辨率 max_iterations: 最大迭代次数 step_size: 步长 goal_sample_rate: 采样目标点的概率 search_radius: 搜索半径 """ self.grid_resolution = grid_resolution self.max_iterations = max_iterations self.step_size = step_size self.goal_sample_rate = goal_sample_rate self.search_radius = search_radius def plan(self, start: Tuple[float, float], goal: Tuple[float, float], map_width: int, map_height: int, threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]: """ 使用RRT算法规划路径 Args: start: 起点坐标 (x, y) goal: 终点坐标 (x, y) map_width: 地图宽度 map_height: 地图高度 threat_areas: 危险区域列表 obstacles: 障碍物列表 Returns: 路径点列表,如果找不到路径返回空列表 """ # 创建起点和终点节点 start_node = Node(start[0], start[1]) goal_node = Node(goal[0], goal[1]) # 初始化树 tree = [start_node] # 迭代构建树 for i in range(self.max_iterations): # 以一定概率直接取目标点,提高搜索效率 if random.random() < self.goal_sample_rate: random_point = (goal_node.x, goal_node.y) else: # 随机采样点 random_point = ( random.uniform(0, map_width), random.uniform(0, map_height) ) # 找到树中离随机点最近的节点 nearest_node = self._find_nearest_node(tree, random_point) # 朝随机点方向扩展固定步长 new_node = self._steer(nearest_node, random_point, self.step_size) # 检查路径是否有效 if new_node and self._is_path_valid(nearest_node, new_node, threat_areas, obstacles): # 将新节点添加到树中 new_node.parent = nearest_node new_node.cost = nearest_node.cost + self._distance((nearest_node.x, nearest_node.y), (new_node.x, new_node.y)) tree.append(new_node) # 检查是否可以连接到目标点 dist_to_goal = self._distance((new_node.x, new_node.y), (goal_node.x, goal_node.y)) if dist_to_goal < self.step_size: # 尝试直接连接到目标点 if self._is_path_valid(new_node, goal_node, threat_areas, obstacles): goal_node.parent = new_node goal_node.cost = new_node.cost + dist_to_goal # 找到路径,提前结束 return self._extract_path(goal_node) # RRT* 优化 (可选,提高路径质量) # 尝试重新连接附近节点以获得更优路径 self._rewire(tree, new_node, threat_areas, obstacles) # 如果达到最大迭代次数但仍未找到到达目标的路径 # 尝试找到离目标最近的节点,并返回到该节点的路径 closest_node = self._find_nearest_node(tree, (goal_node.x, goal_node.y)) path = self._extract_path(closest_node) # 如果最近节点离目标足够近,则认为找到了路径 if self._distance((closest_node.x, closest_node.y), (goal_node.x, goal_node.y)) < self.step_size * 2: return path # 否则找不到有效路径,返回空列表 return [] def _find_nearest_node(self, tree: List[Node], point: Tuple[float, float]) -> Node: """找到树中离指定点最近的节点""" min_dist = float('inf') nearest_node = None for node in tree: dist = self._distance((node.x, node.y), point) if dist < min_dist: min_dist = dist nearest_node = node return nearest_node def _steer(self, from_node: Node, to_point: Tuple[float, float], step_size: float) -> Optional[Node]: """从起始节点向目标点方向移动固定步长""" dx = to_point[0] - from_node.x dy = to_point[1] - from_node.y dist = math.sqrt(dx*dx + dy*dy) if dist < 0.0001: # 距离近乎为0 return None # 按照指定的步长移动 ratio = min(step_size / dist, 1.0) new_x = from_node.x + dx * ratio new_y = from_node.y + dy * ratio new_node = Node(new_x, new_y) return new_node def _is_path_valid(self, from_node: Node, to_node: Node, threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> bool: """检查两个节点之间的路径是否有效(不经过障碍物和危险区域)""" # 采样点检测 dist = self._distance((from_node.x, from_node.y), (to_node.x, to_node.y)) num_samples = max(int(dist / self.grid_resolution), 5) for i in range(num_samples + 1): t = i / num_samples x = from_node.x + t * (to_node.x - from_node.x) y = from_node.y + t * (to_node.y - from_node.y) # 检查是否在危险区域内 if self._is_in_threat_areas(x, y, threat_areas): return False # 检查是否与障碍物碰撞 if obstacles and self._is_in_obstacles(x, y, obstacles): return False return True def _rewire(self, tree: List[Node], new_node: Node, threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> None: """重新连接树中节点以优化路径代价 (RRT* 算法的核心步骤)""" # 找到搜索半径内的所有节点 near_nodes = [] for node in tree: if node is not new_node: # 排除新节点本身 dist = self._distance((node.x, node.y), (new_node.x, new_node.y)) if dist < self.search_radius: near_nodes.append(node) # 对于搜索半径内的每个节点,检查是否通过新节点创建的路径更好 for node in near_nodes: # 计算通过新节点到达当前节点的潜在代价 potential_cost = new_node.cost + self._distance((new_node.x, new_node.y), (node.x, node.y)) # 如果代价更低且路径有效,则重新连接 if potential_cost < node.cost and self._is_path_valid(new_node, node, threat_areas, obstacles): node.parent = new_node node.cost = potential_cost def _extract_path(self, goal_node: Node) -> List[Tuple[float, float]]: """从目标节点回溯生成路径""" path = [] current = goal_node # 回溯到起点 while current: path.append((current.x, current.y)) current = current.parent # 反转路径顺序(从起点到终点) return list(reversed(path)) def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float: """计算两点间欧几里得距离""" return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2) def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool: """检查点是否在危险区域中""" for area in threat_areas: area_type = area.get('type', '') if area_type == 'circle': center = area.get('center', (0, 0)) radius = area.get('radius', 0) distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) if distance <= radius: return True elif area_type == 'rectangle': rect = area.get('rect', (0, 0, 0, 0)) x1, y1, width, height = rect if x1 <= x <= x1 + width and y1 <= y <= y1 + height: return True elif area_type == 'polygon': points = area.get('points', []) if self._point_in_polygon(x, y, points): return True return False def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool: """检查点是否在多边形内(射线法)""" if len(polygon) < 3: return False inside = False j = len(polygon) - 1 for i in range(len(polygon)): xi, yi = polygon[i] xj, yj = polygon[j] # 检查点是否在多边形边上 if (yi == y and xi == x) or (yj == y and xj == x): return True # 射线法判断点是否在多边形内部 intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi) if intersect: inside = not inside j = i return inside def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool: """检查点是否在障碍物中""" for obstacle_x, obstacle_y in obstacles: distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2) if distance < self.grid_resolution: return True return False def smooth_path(self, path: List[Tuple[float, float]], threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None, weight_data: float = 0.5, weight_smooth: float = 0.3, tolerance: float = 0.000001) -> List[Tuple[float, float]]: """ 使用平滑算法优化路径 (与A*接口兼容) Args: path: 原始路径 threat_areas: 危险区域 obstacles: 障碍物 weight_data: 数据权重 weight_smooth: 平滑权重 tolerance: 收敛阈值 Returns: 平滑后的路径 """ # 如果路径点太少,不需要平滑 if len(path) <= 2: return path # 将路径转换为numpy数组,方便操作 path_array = np.array(path) smooth_path = path_array.copy() # 迭代优化 change = tolerance while change >= tolerance: change = 0.0 # 对每个点进行优化(除了起点和终点) for i in range(1, len(path) - 1): for j in range(2): # x, y aux = smooth_path[i][j] smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j]) smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j]) change += abs(aux - smooth_path[i][j]) # 检查平滑后的路径是否经过危险区域或障碍物 for i in range(len(smooth_path)): x, y = smooth_path[i] if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)): return path # 如果平滑后路径穿过危险区域,返回原路径 return [(x, y) for x, y in smooth_path]