You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
project/Src/command_center/genetic_algorithm.py

475 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# File: genetic_algorithm.py
# Purpose: 实现基于遗传算法的路径规划,支持避开危险区域
import numpy as np
import math
import random
from typing import List, Tuple, Dict, Optional
import time
class GeneticAlgorithm:
"""遗传算法路径规划器"""
def __init__(self,
grid_resolution: float = 5.0,
population_size: int = 100,
generations: int = 50,
mutation_rate: float = 0.1,
elite_size: int = 20,
crossover_rate: float = 0.8):
"""
初始化遗传算法
Args:
grid_resolution: 网格分辨率
population_size: 种群大小
generations: 最大迭代代数
mutation_rate: 变异率
elite_size: 精英个体数量
crossover_rate: 交叉率
"""
self.grid_resolution = grid_resolution
self.population_size = population_size
self.generations = generations
self.mutation_rate = mutation_rate
self.elite_size = elite_size
self.crossover_rate = crossover_rate
def plan(self, start: Tuple[float, float],
goal: Tuple[float, float],
map_width: int, map_height: int,
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]:
"""
使用遗传算法规划路径
Args:
start: 起点坐标 (x, y)
goal: 终点坐标 (x, y)
map_width: 地图宽度
map_height: 地图高度
threat_areas: 危险区域列表
obstacles: 障碍物列表
Returns:
路径点列表,如果找不到路径返回空列表
"""
# 初始化种群
population = self._initialize_population(start, goal, map_width, map_height)
# 进化迭代
best_route = None
best_fitness = -1
for generation in range(self.generations):
# 评估种群适应度
fitness_scores = self._evaluate_fitness(population, start, goal, threat_areas, obstacles)
# 检查最佳个体
max_fitness_idx = np.argmax(fitness_scores)
if fitness_scores[max_fitness_idx] > best_fitness:
best_fitness = fitness_scores[max_fitness_idx]
best_route = population[max_fitness_idx]
# 如果找到了有效路径且适应度足够高,可以提前结束
if best_fitness > 0.8:
break
# 选择精英个体
elite_indices = np.argsort(fitness_scores)[-self.elite_size:]
elites = [population[i] for i in elite_indices]
# 选择父代
parents = self._selection(population, fitness_scores)
# 交叉和变异产生新一代
new_population = elites.copy() # 精英保留
while len(new_population) < self.population_size:
# 随机选择两个父代
parent1, parent2 = random.sample(parents, 2)
# 交叉
if random.random() < self.crossover_rate:
child = self._crossover(parent1, parent2)
else:
child = parent1.copy()
# 变异
child = self._mutation(child, map_width, map_height)
# 添加到新种群
new_population.append(child)
# 更新种群
population = new_population[:self.population_size]
# 返回最佳路径(如果找到)
if best_route:
return self._smooth_path(best_route, start, goal)
return []
def _initialize_population(self, start: Tuple[float, float], goal: Tuple[float, float],
map_width: int, map_height: int) -> List[List[Tuple[float, float]]]:
"""初始化种群"""
population = []
# 直接连接起点和终点的个体
direct_route = [start, goal]
population.append(direct_route)
# 随机生成其他个体
for _ in range(self.population_size - 1):
# 随机决定路径点数量2-10个中间点
num_waypoints = random.randint(2, 10)
# 创建包含起点和终点的路径
route = [start]
# 添加随机中间点
for _ in range(num_waypoints):
x = random.uniform(0, map_width)
y = random.uniform(0, map_height)
route.append((x, y))
route.append(goal)
population.append(route)
return population
def _evaluate_fitness(self, population: List[List[Tuple[float, float]]],
start: Tuple[float, float], goal: Tuple[float, float],
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> np.ndarray:
"""评估种群适应度"""
fitness_scores = np.zeros(len(population))
for i, route in enumerate(population):
# 检查路径是否包含起点和终点
if route[0] != start or route[-1] != goal:
fitness_scores[i] = 0
continue
# 路径长度(越短越好)
path_length = self._calculate_path_length(route)
length_fitness = 1.0 / (1.0 + path_length / 1000.0) # 归一化,短路径得高分
# 穿过危险区域的惩罚
danger_penalty = 0
for j in range(len(route) - 1):
segment_danger = self._segment_danger(route[j], route[j+1], threat_areas, obstacles)
danger_penalty += segment_danger
danger_fitness = 1.0 / (1.0 + danger_penalty) # 归一化,无危险得高分
# 路径平滑度(转向次数和角度变化)
smoothness = self._calculate_smoothness(route)
smoothness_fitness = 1.0 / (1.0 + smoothness) # 归一化,平滑路径得高分
# 综合评分(权重可调整)
fitness_scores[i] = 0.4 * length_fitness + 0.5 * danger_fitness + 0.1 * smoothness_fitness
return fitness_scores
def _calculate_path_length(self, route: List[Tuple[float, float]]) -> float:
"""计算路径总长度"""
length = 0
for i in range(len(route) - 1):
dx = route[i+1][0] - route[i][0]
dy = route[i+1][1] - route[i][1]
length += math.sqrt(dx*dx + dy*dy)
return length
def _segment_danger(self, p1: Tuple[float, float], p2: Tuple[float, float],
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> float:
"""计算路径段穿过危险区域的程度"""
danger = 0
# 采样点检测
num_samples = max(int(self._distance(p1, p2) / self.grid_resolution), 5)
for i in range(num_samples + 1):
t = i / num_samples
x = p1[0] + t * (p2[0] - p1[0])
y = p1[1] + t * (p2[1] - p1[1])
# 检查是否在危险区域内
if self._is_in_threat_areas(x, y, threat_areas):
danger += 1
# 检查是否与障碍物碰撞
if obstacles and self._is_in_obstacles(x, y, obstacles):
danger += 10 # 障碍物惩罚更高
return danger / (num_samples + 1) # 归一化
def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool:
"""检查点是否在危险区域中"""
for area in threat_areas:
area_type = area.get('type', '')
if area_type == 'circle':
center = area.get('center', (0, 0))
radius = area.get('radius', 0)
distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
if distance <= radius:
return True
elif area_type == 'rectangle':
rect = area.get('rect', (0, 0, 0, 0))
x1, y1, width, height = rect
if x1 <= x <= x1 + width and y1 <= y <= y1 + height:
return True
elif area_type == 'polygon':
points = area.get('points', [])
if self._point_in_polygon(x, y, points):
return True
return False
def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool:
"""检查点是否在多边形内(射线法)"""
if len(polygon) < 3:
return False
inside = False
j = len(polygon) - 1
for i in range(len(polygon)):
xi, yi = polygon[i]
xj, yj = polygon[j]
# 检查点是否在多边形边上
if (yi == y and xi == x) or (yj == y and xj == x):
return True
# 射线法判断点是否在多边形内部
intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi)
if intersect:
inside = not inside
j = i
return inside
def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool:
"""检查点是否在障碍物中"""
for obstacle_x, obstacle_y in obstacles:
distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2)
if distance < self.grid_resolution:
return True
return False
def _calculate_smoothness(self, route: List[Tuple[float, float]]) -> float:
"""计算路径的平滑度(转向角度变化的总和)"""
if len(route) < 3:
return 0
total_angle_change = 0
for i in range(1, len(route) - 1):
v1 = (route[i][0] - route[i-1][0], route[i][1] - route[i-1][1])
v2 = (route[i+1][0] - route[i][0], route[i+1][1] - route[i][1])
# 归一化向量
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
if len1 > 0 and len2 > 0:
v1_norm = (v1[0]/len1, v1[1]/len1)
v2_norm = (v2[0]/len2, v2[1]/len2)
# 计算点积
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
dot_product = max(-1, min(1, dot_product)) # 限制范围
# 计算角度变化
angle_change = math.acos(dot_product)
total_angle_change += angle_change
return total_angle_change
def _selection(self, population: List[List[Tuple[float, float]]], fitness_scores: np.ndarray) -> List[List[Tuple[float, float]]]:
"""选择操作,使用轮盘赌(roulette wheel)方法"""
# 轮盘赌选择
selection_probs = fitness_scores / np.sum(fitness_scores)
selected_indices = np.random.choice(
len(population),
size=self.population_size,
p=selection_probs,
replace=True
)
return [population[i] for i in selected_indices]
def _crossover(self, route1: List[Tuple[float, float]], route2: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
"""交叉操作,综合两个父代路径"""
if len(route1) <= 2 or len(route2) <= 2:
return route1.copy() if len(route1) > len(route2) else route2.copy()
# 保证起点和终点
start = route1[0]
end = route1[-1]
# 从两条路径的中间点随机选择
mid_points1 = route1[1:-1]
mid_points2 = route2[1:-1]
# 选择交叉点
crossover_point1 = random.randint(0, len(mid_points1))
crossover_point2 = random.randint(0, len(mid_points2))
# 创建子代路径(保持起点和终点)
child = [start]
# 添加前半段和后半段
child.extend(mid_points1[:crossover_point1])
child.extend(mid_points2[crossover_point2:])
# 添加终点
child.append(end)
return child
def _mutation(self, route: List[Tuple[float, float]], map_width: int, map_height: int) -> List[Tuple[float, float]]:
"""变异操作,随机修改路径中的点"""
# 复制路径以避免修改原始路径
mutated_route = route.copy()
# 仅对中间点进行变异,保留起点和终点
for i in range(1, len(mutated_route) - 1):
# 对每个点以变异率进行变异
if random.random() < self.mutation_rate:
# 在原点附近随机偏移
delta_x = random.uniform(-50, 50)
delta_y = random.uniform(-50, 50)
new_x = max(0, min(map_width, mutated_route[i][0] + delta_x))
new_y = max(0, min(map_height, mutated_route[i][1] + delta_y))
mutated_route[i] = (new_x, new_y)
# 有一定概率添加或删除中间点
if random.random() < self.mutation_rate and len(mutated_route) > 3:
# 随机删除一个中间点
idx_to_remove = random.randint(1, len(mutated_route) - 2)
mutated_route.pop(idx_to_remove)
if random.random() < self.mutation_rate:
# 随机添加一个中间点
idx_to_add = random.randint(1, len(mutated_route) - 1)
prev_point = mutated_route[idx_to_add - 1]
next_point = mutated_route[idx_to_add]
# 在两点之间随机生成一个新点
new_x = (prev_point[0] + next_point[0]) / 2 + random.uniform(-20, 20)
new_y = (prev_point[1] + next_point[1]) / 2 + random.uniform(-20, 20)
new_x = max(0, min(map_width, new_x))
new_y = max(0, min(map_height, new_y))
mutated_route.insert(idx_to_add, (new_x, new_y))
return mutated_route
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""计算两点间距离"""
return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
def _smooth_path(self, route: List[Tuple[float, float]], start: Tuple[float, float], goal: Tuple[float, float]) -> List[Tuple[float, float]]:
"""平滑路径,去除冗余点"""
# 确保路径起点和终点正确
if route[0] != start:
route[0] = start
if route[-1] != goal:
route[-1] = goal
# 如果路径点太少,直接返回
if len(route) <= 2:
return route
# 移除共线的冗余点
smoothed_route = [route[0]] # 保留起点
for i in range(1, len(route) - 1):
prev = smoothed_route[-1]
curr = route[i]
next_point = route[i + 1]
# 检查三点是否共线(或接近共线)
v1 = (curr[0] - prev[0], curr[1] - prev[1])
v2 = (next_point[0] - curr[0], next_point[1] - curr[1])
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
# 避免除零错误
if len1 > 0.001 and len2 > 0.001:
# 归一化向量
v1_norm = (v1[0]/len1, v1[1]/len1)
v2_norm = (v2[0]/len2, v2[1]/len2)
# 计算点积
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
# 如果三点不共线,保留中间点
if abs(dot_product) < 0.98: # 角度差异大于约11度
smoothed_route.append(curr)
else:
# 如果点太近,可以考虑跳过
if len1 > self.grid_resolution or len2 > self.grid_resolution:
smoothed_route.append(curr)
smoothed_route.append(route[-1]) # 保留终点
return smoothed_route
def smooth_path(self, path: List[Tuple[float, float]],
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None,
weight_data: float = 0.5,
weight_smooth: float = 0.3,
tolerance: float = 0.000001) -> List[Tuple[float, float]]:
"""
使用平滑算法优化路径 (与A*接口兼容)
Args:
path: 原始路径
threat_areas: 危险区域
obstacles: 障碍物
weight_data: 数据权重
weight_smooth: 平滑权重
tolerance: 收敛阈值
Returns:
平滑后的路径
"""
# 如果路径点太少,不需要平滑
if len(path) <= 2:
return path
# 将路径转换为numpy数组方便操作
path_array = np.array(path)
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(2): # x, y
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
# 检查平滑后的路径是否经过危险区域或障碍物
for i in range(len(smooth_path)):
x, y = smooth_path[i]
if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)):
return path # 如果平滑后路径穿过危险区域,返回原路径
return [(x, y) for x, y in smooth_path]