|
|
# -*- coding: utf-8 -*-
|
|
|
# File: genetic_algorithm.py
|
|
|
# Purpose: 实现基于遗传算法的路径规划,支持避开危险区域
|
|
|
|
|
|
import numpy as np
|
|
|
import math
|
|
|
import random
|
|
|
from typing import List, Tuple, Dict, Optional
|
|
|
import time
|
|
|
|
|
|
class GeneticAlgorithm:
|
|
|
"""遗传算法路径规划器"""
|
|
|
|
|
|
def __init__(self,
|
|
|
grid_resolution: float = 5.0,
|
|
|
population_size: int = 100,
|
|
|
generations: int = 50,
|
|
|
mutation_rate: float = 0.1,
|
|
|
elite_size: int = 20,
|
|
|
crossover_rate: float = 0.8):
|
|
|
"""
|
|
|
初始化遗传算法
|
|
|
|
|
|
Args:
|
|
|
grid_resolution: 网格分辨率
|
|
|
population_size: 种群大小
|
|
|
generations: 最大迭代代数
|
|
|
mutation_rate: 变异率
|
|
|
elite_size: 精英个体数量
|
|
|
crossover_rate: 交叉率
|
|
|
"""
|
|
|
self.grid_resolution = grid_resolution
|
|
|
self.population_size = population_size
|
|
|
self.generations = generations
|
|
|
self.mutation_rate = mutation_rate
|
|
|
self.elite_size = elite_size
|
|
|
self.crossover_rate = crossover_rate
|
|
|
|
|
|
def plan(self, start: Tuple[float, float],
|
|
|
goal: Tuple[float, float],
|
|
|
map_width: int, map_height: int,
|
|
|
threat_areas: List[Dict],
|
|
|
obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]:
|
|
|
"""
|
|
|
使用遗传算法规划路径
|
|
|
|
|
|
Args:
|
|
|
start: 起点坐标 (x, y)
|
|
|
goal: 终点坐标 (x, y)
|
|
|
map_width: 地图宽度
|
|
|
map_height: 地图高度
|
|
|
threat_areas: 危险区域列表
|
|
|
obstacles: 障碍物列表
|
|
|
|
|
|
Returns:
|
|
|
路径点列表,如果找不到路径返回空列表
|
|
|
"""
|
|
|
# 初始化种群
|
|
|
population = self._initialize_population(start, goal, map_width, map_height)
|
|
|
|
|
|
# 进化迭代
|
|
|
best_route = None
|
|
|
best_fitness = -1
|
|
|
|
|
|
for generation in range(self.generations):
|
|
|
# 评估种群适应度
|
|
|
fitness_scores = self._evaluate_fitness(population, start, goal, threat_areas, obstacles)
|
|
|
|
|
|
# 检查最佳个体
|
|
|
max_fitness_idx = np.argmax(fitness_scores)
|
|
|
if fitness_scores[max_fitness_idx] > best_fitness:
|
|
|
best_fitness = fitness_scores[max_fitness_idx]
|
|
|
best_route = population[max_fitness_idx]
|
|
|
|
|
|
# 如果找到了有效路径且适应度足够高,可以提前结束
|
|
|
if best_fitness > 0.8:
|
|
|
break
|
|
|
|
|
|
# 选择精英个体
|
|
|
elite_indices = np.argsort(fitness_scores)[-self.elite_size:]
|
|
|
elites = [population[i] for i in elite_indices]
|
|
|
|
|
|
# 选择父代
|
|
|
parents = self._selection(population, fitness_scores)
|
|
|
|
|
|
# 交叉和变异产生新一代
|
|
|
new_population = elites.copy() # 精英保留
|
|
|
|
|
|
while len(new_population) < self.population_size:
|
|
|
# 随机选择两个父代
|
|
|
parent1, parent2 = random.sample(parents, 2)
|
|
|
|
|
|
# 交叉
|
|
|
if random.random() < self.crossover_rate:
|
|
|
child = self._crossover(parent1, parent2)
|
|
|
else:
|
|
|
child = parent1.copy()
|
|
|
|
|
|
# 变异
|
|
|
child = self._mutation(child, map_width, map_height)
|
|
|
|
|
|
# 添加到新种群
|
|
|
new_population.append(child)
|
|
|
|
|
|
# 更新种群
|
|
|
population = new_population[:self.population_size]
|
|
|
|
|
|
# 返回最佳路径(如果找到)
|
|
|
if best_route:
|
|
|
return self._smooth_path(best_route, start, goal)
|
|
|
return []
|
|
|
|
|
|
def _initialize_population(self, start: Tuple[float, float], goal: Tuple[float, float],
|
|
|
map_width: int, map_height: int) -> List[List[Tuple[float, float]]]:
|
|
|
"""初始化种群"""
|
|
|
population = []
|
|
|
|
|
|
# 直接连接起点和终点的个体
|
|
|
direct_route = [start, goal]
|
|
|
population.append(direct_route)
|
|
|
|
|
|
# 随机生成其他个体
|
|
|
for _ in range(self.population_size - 1):
|
|
|
# 随机决定路径点数量(2-10个中间点)
|
|
|
num_waypoints = random.randint(2, 10)
|
|
|
|
|
|
# 创建包含起点和终点的路径
|
|
|
route = [start]
|
|
|
|
|
|
# 添加随机中间点
|
|
|
for _ in range(num_waypoints):
|
|
|
x = random.uniform(0, map_width)
|
|
|
y = random.uniform(0, map_height)
|
|
|
route.append((x, y))
|
|
|
|
|
|
route.append(goal)
|
|
|
population.append(route)
|
|
|
|
|
|
return population
|
|
|
|
|
|
def _evaluate_fitness(self, population: List[List[Tuple[float, float]]],
|
|
|
start: Tuple[float, float], goal: Tuple[float, float],
|
|
|
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> np.ndarray:
|
|
|
"""评估种群适应度"""
|
|
|
fitness_scores = np.zeros(len(population))
|
|
|
|
|
|
for i, route in enumerate(population):
|
|
|
# 检查路径是否包含起点和终点
|
|
|
if route[0] != start or route[-1] != goal:
|
|
|
fitness_scores[i] = 0
|
|
|
continue
|
|
|
|
|
|
# 路径长度(越短越好)
|
|
|
path_length = self._calculate_path_length(route)
|
|
|
length_fitness = 1.0 / (1.0 + path_length / 1000.0) # 归一化,短路径得高分
|
|
|
|
|
|
# 穿过危险区域的惩罚
|
|
|
danger_penalty = 0
|
|
|
for j in range(len(route) - 1):
|
|
|
segment_danger = self._segment_danger(route[j], route[j+1], threat_areas, obstacles)
|
|
|
danger_penalty += segment_danger
|
|
|
|
|
|
danger_fitness = 1.0 / (1.0 + danger_penalty) # 归一化,无危险得高分
|
|
|
|
|
|
# 路径平滑度(转向次数和角度变化)
|
|
|
smoothness = self._calculate_smoothness(route)
|
|
|
smoothness_fitness = 1.0 / (1.0 + smoothness) # 归一化,平滑路径得高分
|
|
|
|
|
|
# 综合评分(权重可调整)
|
|
|
fitness_scores[i] = 0.4 * length_fitness + 0.5 * danger_fitness + 0.1 * smoothness_fitness
|
|
|
|
|
|
return fitness_scores
|
|
|
|
|
|
def _calculate_path_length(self, route: List[Tuple[float, float]]) -> float:
|
|
|
"""计算路径总长度"""
|
|
|
length = 0
|
|
|
for i in range(len(route) - 1):
|
|
|
dx = route[i+1][0] - route[i][0]
|
|
|
dy = route[i+1][1] - route[i][1]
|
|
|
length += math.sqrt(dx*dx + dy*dy)
|
|
|
return length
|
|
|
|
|
|
def _segment_danger(self, p1: Tuple[float, float], p2: Tuple[float, float],
|
|
|
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> float:
|
|
|
"""计算路径段穿过危险区域的程度"""
|
|
|
danger = 0
|
|
|
|
|
|
# 采样点检测
|
|
|
num_samples = max(int(self._distance(p1, p2) / self.grid_resolution), 5)
|
|
|
|
|
|
for i in range(num_samples + 1):
|
|
|
t = i / num_samples
|
|
|
x = p1[0] + t * (p2[0] - p1[0])
|
|
|
y = p1[1] + t * (p2[1] - p1[1])
|
|
|
|
|
|
# 检查是否在危险区域内
|
|
|
if self._is_in_threat_areas(x, y, threat_areas):
|
|
|
danger += 1
|
|
|
|
|
|
# 检查是否与障碍物碰撞
|
|
|
if obstacles and self._is_in_obstacles(x, y, obstacles):
|
|
|
danger += 10 # 障碍物惩罚更高
|
|
|
|
|
|
return danger / (num_samples + 1) # 归一化
|
|
|
|
|
|
def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool:
|
|
|
"""检查点是否在危险区域中"""
|
|
|
for area in threat_areas:
|
|
|
area_type = area.get('type', '')
|
|
|
|
|
|
if area_type == 'circle':
|
|
|
center = area.get('center', (0, 0))
|
|
|
radius = area.get('radius', 0)
|
|
|
distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
|
|
|
if distance <= radius:
|
|
|
return True
|
|
|
|
|
|
elif area_type == 'rectangle':
|
|
|
rect = area.get('rect', (0, 0, 0, 0))
|
|
|
x1, y1, width, height = rect
|
|
|
if x1 <= x <= x1 + width and y1 <= y <= y1 + height:
|
|
|
return True
|
|
|
|
|
|
elif area_type == 'polygon':
|
|
|
points = area.get('points', [])
|
|
|
if self._point_in_polygon(x, y, points):
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool:
|
|
|
"""检查点是否在多边形内(射线法)"""
|
|
|
if len(polygon) < 3:
|
|
|
return False
|
|
|
|
|
|
inside = False
|
|
|
j = len(polygon) - 1
|
|
|
|
|
|
for i in range(len(polygon)):
|
|
|
xi, yi = polygon[i]
|
|
|
xj, yj = polygon[j]
|
|
|
|
|
|
# 检查点是否在多边形边上
|
|
|
if (yi == y and xi == x) or (yj == y and xj == x):
|
|
|
return True
|
|
|
|
|
|
# 射线法判断点是否在多边形内部
|
|
|
intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi)
|
|
|
if intersect:
|
|
|
inside = not inside
|
|
|
|
|
|
j = i
|
|
|
|
|
|
return inside
|
|
|
|
|
|
def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool:
|
|
|
"""检查点是否在障碍物中"""
|
|
|
for obstacle_x, obstacle_y in obstacles:
|
|
|
distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2)
|
|
|
if distance < self.grid_resolution:
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def _calculate_smoothness(self, route: List[Tuple[float, float]]) -> float:
|
|
|
"""计算路径的平滑度(转向角度变化的总和)"""
|
|
|
if len(route) < 3:
|
|
|
return 0
|
|
|
|
|
|
total_angle_change = 0
|
|
|
|
|
|
for i in range(1, len(route) - 1):
|
|
|
v1 = (route[i][0] - route[i-1][0], route[i][1] - route[i-1][1])
|
|
|
v2 = (route[i+1][0] - route[i][0], route[i+1][1] - route[i][1])
|
|
|
|
|
|
# 归一化向量
|
|
|
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
|
|
|
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
|
|
|
|
|
|
if len1 > 0 and len2 > 0:
|
|
|
v1_norm = (v1[0]/len1, v1[1]/len1)
|
|
|
v2_norm = (v2[0]/len2, v2[1]/len2)
|
|
|
|
|
|
# 计算点积
|
|
|
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
|
|
|
dot_product = max(-1, min(1, dot_product)) # 限制范围
|
|
|
|
|
|
# 计算角度变化
|
|
|
angle_change = math.acos(dot_product)
|
|
|
total_angle_change += angle_change
|
|
|
|
|
|
return total_angle_change
|
|
|
|
|
|
def _selection(self, population: List[List[Tuple[float, float]]], fitness_scores: np.ndarray) -> List[List[Tuple[float, float]]]:
|
|
|
"""选择操作,使用轮盘赌(roulette wheel)方法"""
|
|
|
# 轮盘赌选择
|
|
|
selection_probs = fitness_scores / np.sum(fitness_scores)
|
|
|
selected_indices = np.random.choice(
|
|
|
len(population),
|
|
|
size=self.population_size,
|
|
|
p=selection_probs,
|
|
|
replace=True
|
|
|
)
|
|
|
|
|
|
return [population[i] for i in selected_indices]
|
|
|
|
|
|
def _crossover(self, route1: List[Tuple[float, float]], route2: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
|
|
|
"""交叉操作,综合两个父代路径"""
|
|
|
if len(route1) <= 2 or len(route2) <= 2:
|
|
|
return route1.copy() if len(route1) > len(route2) else route2.copy()
|
|
|
|
|
|
# 保证起点和终点
|
|
|
start = route1[0]
|
|
|
end = route1[-1]
|
|
|
|
|
|
# 从两条路径的中间点随机选择
|
|
|
mid_points1 = route1[1:-1]
|
|
|
mid_points2 = route2[1:-1]
|
|
|
|
|
|
# 选择交叉点
|
|
|
crossover_point1 = random.randint(0, len(mid_points1))
|
|
|
crossover_point2 = random.randint(0, len(mid_points2))
|
|
|
|
|
|
# 创建子代路径(保持起点和终点)
|
|
|
child = [start]
|
|
|
|
|
|
# 添加前半段和后半段
|
|
|
child.extend(mid_points1[:crossover_point1])
|
|
|
child.extend(mid_points2[crossover_point2:])
|
|
|
|
|
|
# 添加终点
|
|
|
child.append(end)
|
|
|
|
|
|
return child
|
|
|
|
|
|
def _mutation(self, route: List[Tuple[float, float]], map_width: int, map_height: int) -> List[Tuple[float, float]]:
|
|
|
"""变异操作,随机修改路径中的点"""
|
|
|
# 复制路径以避免修改原始路径
|
|
|
mutated_route = route.copy()
|
|
|
|
|
|
# 仅对中间点进行变异,保留起点和终点
|
|
|
for i in range(1, len(mutated_route) - 1):
|
|
|
# 对每个点以变异率进行变异
|
|
|
if random.random() < self.mutation_rate:
|
|
|
# 在原点附近随机偏移
|
|
|
delta_x = random.uniform(-50, 50)
|
|
|
delta_y = random.uniform(-50, 50)
|
|
|
|
|
|
new_x = max(0, min(map_width, mutated_route[i][0] + delta_x))
|
|
|
new_y = max(0, min(map_height, mutated_route[i][1] + delta_y))
|
|
|
|
|
|
mutated_route[i] = (new_x, new_y)
|
|
|
|
|
|
# 有一定概率添加或删除中间点
|
|
|
if random.random() < self.mutation_rate and len(mutated_route) > 3:
|
|
|
# 随机删除一个中间点
|
|
|
idx_to_remove = random.randint(1, len(mutated_route) - 2)
|
|
|
mutated_route.pop(idx_to_remove)
|
|
|
|
|
|
if random.random() < self.mutation_rate:
|
|
|
# 随机添加一个中间点
|
|
|
idx_to_add = random.randint(1, len(mutated_route) - 1)
|
|
|
prev_point = mutated_route[idx_to_add - 1]
|
|
|
next_point = mutated_route[idx_to_add]
|
|
|
|
|
|
# 在两点之间随机生成一个新点
|
|
|
new_x = (prev_point[0] + next_point[0]) / 2 + random.uniform(-20, 20)
|
|
|
new_y = (prev_point[1] + next_point[1]) / 2 + random.uniform(-20, 20)
|
|
|
|
|
|
new_x = max(0, min(map_width, new_x))
|
|
|
new_y = max(0, min(map_height, new_y))
|
|
|
|
|
|
mutated_route.insert(idx_to_add, (new_x, new_y))
|
|
|
|
|
|
return mutated_route
|
|
|
|
|
|
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
|
|
|
"""计算两点间距离"""
|
|
|
return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
|
|
|
|
|
|
def _smooth_path(self, route: List[Tuple[float, float]], start: Tuple[float, float], goal: Tuple[float, float]) -> List[Tuple[float, float]]:
|
|
|
"""平滑路径,去除冗余点"""
|
|
|
# 确保路径起点和终点正确
|
|
|
if route[0] != start:
|
|
|
route[0] = start
|
|
|
if route[-1] != goal:
|
|
|
route[-1] = goal
|
|
|
|
|
|
# 如果路径点太少,直接返回
|
|
|
if len(route) <= 2:
|
|
|
return route
|
|
|
|
|
|
# 移除共线的冗余点
|
|
|
smoothed_route = [route[0]] # 保留起点
|
|
|
|
|
|
for i in range(1, len(route) - 1):
|
|
|
prev = smoothed_route[-1]
|
|
|
curr = route[i]
|
|
|
next_point = route[i + 1]
|
|
|
|
|
|
# 检查三点是否共线(或接近共线)
|
|
|
v1 = (curr[0] - prev[0], curr[1] - prev[1])
|
|
|
v2 = (next_point[0] - curr[0], next_point[1] - curr[1])
|
|
|
|
|
|
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
|
|
|
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
|
|
|
|
|
|
# 避免除零错误
|
|
|
if len1 > 0.001 and len2 > 0.001:
|
|
|
# 归一化向量
|
|
|
v1_norm = (v1[0]/len1, v1[1]/len1)
|
|
|
v2_norm = (v2[0]/len2, v2[1]/len2)
|
|
|
|
|
|
# 计算点积
|
|
|
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
|
|
|
|
|
|
# 如果三点不共线,保留中间点
|
|
|
if abs(dot_product) < 0.98: # 角度差异大于约11度
|
|
|
smoothed_route.append(curr)
|
|
|
else:
|
|
|
# 如果点太近,可以考虑跳过
|
|
|
if len1 > self.grid_resolution or len2 > self.grid_resolution:
|
|
|
smoothed_route.append(curr)
|
|
|
|
|
|
smoothed_route.append(route[-1]) # 保留终点
|
|
|
|
|
|
return smoothed_route
|
|
|
|
|
|
def smooth_path(self, path: List[Tuple[float, float]],
|
|
|
threat_areas: List[Dict],
|
|
|
obstacles: List[Tuple[float, float]] = None,
|
|
|
weight_data: float = 0.5,
|
|
|
weight_smooth: float = 0.3,
|
|
|
tolerance: float = 0.000001) -> List[Tuple[float, float]]:
|
|
|
"""
|
|
|
使用平滑算法优化路径 (与A*接口兼容)
|
|
|
|
|
|
Args:
|
|
|
path: 原始路径
|
|
|
threat_areas: 危险区域
|
|
|
obstacles: 障碍物
|
|
|
weight_data: 数据权重
|
|
|
weight_smooth: 平滑权重
|
|
|
tolerance: 收敛阈值
|
|
|
|
|
|
Returns:
|
|
|
平滑后的路径
|
|
|
"""
|
|
|
# 如果路径点太少,不需要平滑
|
|
|
if len(path) <= 2:
|
|
|
return path
|
|
|
|
|
|
# 将路径转换为numpy数组,方便操作
|
|
|
path_array = np.array(path)
|
|
|
smooth_path = path_array.copy()
|
|
|
|
|
|
# 迭代优化
|
|
|
change = tolerance
|
|
|
while change >= tolerance:
|
|
|
change = 0.0
|
|
|
|
|
|
# 对每个点进行优化(除了起点和终点)
|
|
|
for i in range(1, len(path) - 1):
|
|
|
for j in range(2): # x, y
|
|
|
aux = smooth_path[i][j]
|
|
|
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
|
|
|
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
|
|
|
change += abs(aux - smooth_path[i][j])
|
|
|
|
|
|
# 检查平滑后的路径是否经过危险区域或障碍物
|
|
|
for i in range(len(smooth_path)):
|
|
|
x, y = smooth_path[i]
|
|
|
if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)):
|
|
|
return path # 如果平滑后路径穿过危险区域,返回原路径
|
|
|
|
|
|
return [(x, y) for x, y in smooth_path] |