# -*- coding: utf-8 -*- # File: genetic_algorithm.py # Purpose: 实现基于遗传算法的路径规划,支持避开危险区域 import numpy as np import math import random from typing import List, Tuple, Dict, Optional import time class GeneticAlgorithm: """遗传算法路径规划器""" def __init__(self, grid_resolution: float = 5.0, population_size: int = 100, generations: int = 50, mutation_rate: float = 0.1, elite_size: int = 20, crossover_rate: float = 0.8): """ 初始化遗传算法 Args: grid_resolution: 网格分辨率 population_size: 种群大小 generations: 最大迭代代数 mutation_rate: 变异率 elite_size: 精英个体数量 crossover_rate: 交叉率 """ self.grid_resolution = grid_resolution self.population_size = population_size self.generations = generations self.mutation_rate = mutation_rate self.elite_size = elite_size self.crossover_rate = crossover_rate def plan(self, start: Tuple[float, float], goal: Tuple[float, float], map_width: int, map_height: int, threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]: """ 使用遗传算法规划路径 Args: start: 起点坐标 (x, y) goal: 终点坐标 (x, y) map_width: 地图宽度 map_height: 地图高度 threat_areas: 危险区域列表 obstacles: 障碍物列表 Returns: 路径点列表,如果找不到路径返回空列表 """ # 初始化种群 population = self._initialize_population(start, goal, map_width, map_height) # 进化迭代 best_route = None best_fitness = -1 for generation in range(self.generations): # 评估种群适应度 fitness_scores = self._evaluate_fitness(population, start, goal, threat_areas, obstacles) # 检查最佳个体 max_fitness_idx = np.argmax(fitness_scores) if fitness_scores[max_fitness_idx] > best_fitness: best_fitness = fitness_scores[max_fitness_idx] best_route = population[max_fitness_idx] # 如果找到了有效路径且适应度足够高,可以提前结束 if best_fitness > 0.8: break # 选择精英个体 elite_indices = np.argsort(fitness_scores)[-self.elite_size:] elites = [population[i] for i in elite_indices] # 选择父代 parents = self._selection(population, fitness_scores) # 交叉和变异产生新一代 new_population = elites.copy() # 精英保留 while len(new_population) < self.population_size: # 随机选择两个父代 parent1, parent2 = random.sample(parents, 2) # 交叉 if random.random() < self.crossover_rate: child = self._crossover(parent1, parent2) else: child = parent1.copy() # 变异 child = self._mutation(child, map_width, map_height) # 添加到新种群 new_population.append(child) # 更新种群 population = new_population[:self.population_size] # 返回最佳路径(如果找到) if best_route: return self._smooth_path(best_route, start, goal) return [] def _initialize_population(self, start: Tuple[float, float], goal: Tuple[float, float], map_width: int, map_height: int) -> List[List[Tuple[float, float]]]: """初始化种群""" population = [] # 直接连接起点和终点的个体 direct_route = [start, goal] population.append(direct_route) # 随机生成其他个体 for _ in range(self.population_size - 1): # 随机决定路径点数量(2-10个中间点) num_waypoints = random.randint(2, 10) # 创建包含起点和终点的路径 route = [start] # 添加随机中间点 for _ in range(num_waypoints): x = random.uniform(0, map_width) y = random.uniform(0, map_height) route.append((x, y)) route.append(goal) population.append(route) return population def _evaluate_fitness(self, population: List[List[Tuple[float, float]]], start: Tuple[float, float], goal: Tuple[float, float], threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> np.ndarray: """评估种群适应度""" fitness_scores = np.zeros(len(population)) for i, route in enumerate(population): # 检查路径是否包含起点和终点 if route[0] != start or route[-1] != goal: fitness_scores[i] = 0 continue # 路径长度(越短越好) path_length = self._calculate_path_length(route) length_fitness = 1.0 / (1.0 + path_length / 1000.0) # 归一化,短路径得高分 # 穿过危险区域的惩罚 danger_penalty = 0 for j in range(len(route) - 1): segment_danger = self._segment_danger(route[j], route[j+1], threat_areas, obstacles) danger_penalty += segment_danger danger_fitness = 1.0 / (1.0 + danger_penalty) # 归一化,无危险得高分 # 路径平滑度(转向次数和角度变化) smoothness = self._calculate_smoothness(route) smoothness_fitness = 1.0 / (1.0 + smoothness) # 归一化,平滑路径得高分 # 综合评分(权重可调整) fitness_scores[i] = 0.4 * length_fitness + 0.5 * danger_fitness + 0.1 * smoothness_fitness return fitness_scores def _calculate_path_length(self, route: List[Tuple[float, float]]) -> float: """计算路径总长度""" length = 0 for i in range(len(route) - 1): dx = route[i+1][0] - route[i][0] dy = route[i+1][1] - route[i][1] length += math.sqrt(dx*dx + dy*dy) return length def _segment_danger(self, p1: Tuple[float, float], p2: Tuple[float, float], threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> float: """计算路径段穿过危险区域的程度""" danger = 0 # 采样点检测 num_samples = max(int(self._distance(p1, p2) / self.grid_resolution), 5) for i in range(num_samples + 1): t = i / num_samples x = p1[0] + t * (p2[0] - p1[0]) y = p1[1] + t * (p2[1] - p1[1]) # 检查是否在危险区域内 if self._is_in_threat_areas(x, y, threat_areas): danger += 1 # 检查是否与障碍物碰撞 if obstacles and self._is_in_obstacles(x, y, obstacles): danger += 10 # 障碍物惩罚更高 return danger / (num_samples + 1) # 归一化 def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool: """检查点是否在危险区域中""" for area in threat_areas: area_type = area.get('type', '') if area_type == 'circle': center = area.get('center', (0, 0)) radius = area.get('radius', 0) distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) if distance <= radius: return True elif area_type == 'rectangle': rect = area.get('rect', (0, 0, 0, 0)) x1, y1, width, height = rect if x1 <= x <= x1 + width and y1 <= y <= y1 + height: return True elif area_type == 'polygon': points = area.get('points', []) if self._point_in_polygon(x, y, points): return True return False def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool: """检查点是否在多边形内(射线法)""" if len(polygon) < 3: return False inside = False j = len(polygon) - 1 for i in range(len(polygon)): xi, yi = polygon[i] xj, yj = polygon[j] # 检查点是否在多边形边上 if (yi == y and xi == x) or (yj == y and xj == x): return True # 射线法判断点是否在多边形内部 intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi) if intersect: inside = not inside j = i return inside def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool: """检查点是否在障碍物中""" for obstacle_x, obstacle_y in obstacles: distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2) if distance < self.grid_resolution: return True return False def _calculate_smoothness(self, route: List[Tuple[float, float]]) -> float: """计算路径的平滑度(转向角度变化的总和)""" if len(route) < 3: return 0 total_angle_change = 0 for i in range(1, len(route) - 1): v1 = (route[i][0] - route[i-1][0], route[i][1] - route[i-1][1]) v2 = (route[i+1][0] - route[i][0], route[i+1][1] - route[i][1]) # 归一化向量 len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1]) len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1]) if len1 > 0 and len2 > 0: v1_norm = (v1[0]/len1, v1[1]/len1) v2_norm = (v2[0]/len2, v2[1]/len2) # 计算点积 dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1] dot_product = max(-1, min(1, dot_product)) # 限制范围 # 计算角度变化 angle_change = math.acos(dot_product) total_angle_change += angle_change return total_angle_change def _selection(self, population: List[List[Tuple[float, float]]], fitness_scores: np.ndarray) -> List[List[Tuple[float, float]]]: """选择操作,使用轮盘赌(roulette wheel)方法""" # 轮盘赌选择 selection_probs = fitness_scores / np.sum(fitness_scores) selected_indices = np.random.choice( len(population), size=self.population_size, p=selection_probs, replace=True ) return [population[i] for i in selected_indices] def _crossover(self, route1: List[Tuple[float, float]], route2: List[Tuple[float, float]]) -> List[Tuple[float, float]]: """交叉操作,综合两个父代路径""" if len(route1) <= 2 or len(route2) <= 2: return route1.copy() if len(route1) > len(route2) else route2.copy() # 保证起点和终点 start = route1[0] end = route1[-1] # 从两条路径的中间点随机选择 mid_points1 = route1[1:-1] mid_points2 = route2[1:-1] # 选择交叉点 crossover_point1 = random.randint(0, len(mid_points1)) crossover_point2 = random.randint(0, len(mid_points2)) # 创建子代路径(保持起点和终点) child = [start] # 添加前半段和后半段 child.extend(mid_points1[:crossover_point1]) child.extend(mid_points2[crossover_point2:]) # 添加终点 child.append(end) return child def _mutation(self, route: List[Tuple[float, float]], map_width: int, map_height: int) -> List[Tuple[float, float]]: """变异操作,随机修改路径中的点""" # 复制路径以避免修改原始路径 mutated_route = route.copy() # 仅对中间点进行变异,保留起点和终点 for i in range(1, len(mutated_route) - 1): # 对每个点以变异率进行变异 if random.random() < self.mutation_rate: # 在原点附近随机偏移 delta_x = random.uniform(-50, 50) delta_y = random.uniform(-50, 50) new_x = max(0, min(map_width, mutated_route[i][0] + delta_x)) new_y = max(0, min(map_height, mutated_route[i][1] + delta_y)) mutated_route[i] = (new_x, new_y) # 有一定概率添加或删除中间点 if random.random() < self.mutation_rate and len(mutated_route) > 3: # 随机删除一个中间点 idx_to_remove = random.randint(1, len(mutated_route) - 2) mutated_route.pop(idx_to_remove) if random.random() < self.mutation_rate: # 随机添加一个中间点 idx_to_add = random.randint(1, len(mutated_route) - 1) prev_point = mutated_route[idx_to_add - 1] next_point = mutated_route[idx_to_add] # 在两点之间随机生成一个新点 new_x = (prev_point[0] + next_point[0]) / 2 + random.uniform(-20, 20) new_y = (prev_point[1] + next_point[1]) / 2 + random.uniform(-20, 20) new_x = max(0, min(map_width, new_x)) new_y = max(0, min(map_height, new_y)) mutated_route.insert(idx_to_add, (new_x, new_y)) return mutated_route def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float: """计算两点间距离""" return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2) def _smooth_path(self, route: List[Tuple[float, float]], start: Tuple[float, float], goal: Tuple[float, float]) -> List[Tuple[float, float]]: """平滑路径,去除冗余点""" # 确保路径起点和终点正确 if route[0] != start: route[0] = start if route[-1] != goal: route[-1] = goal # 如果路径点太少,直接返回 if len(route) <= 2: return route # 移除共线的冗余点 smoothed_route = [route[0]] # 保留起点 for i in range(1, len(route) - 1): prev = smoothed_route[-1] curr = route[i] next_point = route[i + 1] # 检查三点是否共线(或接近共线) v1 = (curr[0] - prev[0], curr[1] - prev[1]) v2 = (next_point[0] - curr[0], next_point[1] - curr[1]) len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1]) len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1]) # 避免除零错误 if len1 > 0.001 and len2 > 0.001: # 归一化向量 v1_norm = (v1[0]/len1, v1[1]/len1) v2_norm = (v2[0]/len2, v2[1]/len2) # 计算点积 dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1] # 如果三点不共线,保留中间点 if abs(dot_product) < 0.98: # 角度差异大于约11度 smoothed_route.append(curr) else: # 如果点太近,可以考虑跳过 if len1 > self.grid_resolution or len2 > self.grid_resolution: smoothed_route.append(curr) smoothed_route.append(route[-1]) # 保留终点 return smoothed_route def smooth_path(self, path: List[Tuple[float, float]], threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None, weight_data: float = 0.5, weight_smooth: float = 0.3, tolerance: float = 0.000001) -> List[Tuple[float, float]]: """ 使用平滑算法优化路径 (与A*接口兼容) Args: path: 原始路径 threat_areas: 危险区域 obstacles: 障碍物 weight_data: 数据权重 weight_smooth: 平滑权重 tolerance: 收敛阈值 Returns: 平滑后的路径 """ # 如果路径点太少,不需要平滑 if len(path) <= 2: return path # 将路径转换为numpy数组,方便操作 path_array = np.array(path) smooth_path = path_array.copy() # 迭代优化 change = tolerance while change >= tolerance: change = 0.0 # 对每个点进行优化(除了起点和终点) for i in range(1, len(path) - 1): for j in range(2): # x, y aux = smooth_path[i][j] smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j]) smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j]) change += abs(aux - smooth_path[i][j]) # 检查平滑后的路径是否经过危险区域或障碍物 for i in range(len(smooth_path)): x, y = smooth_path[i] if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)): return path # 如果平滑后路径穿过危险区域,返回原路径 return [(x, y) for x, y in smooth_path]