|
|
# -*- coding: utf-8 -*-
|
|
|
# File: path_planner.py
|
|
|
# Purpose: 路径规划的核心逻辑,包含调用不同路径规划算法的接口。
|
|
|
|
|
|
from typing import List, Tuple, Optional, Dict, Union
|
|
|
from .astar import AStar
|
|
|
from .genetic_algorithm import GeneticAlgorithm
|
|
|
from .rrt_algorithm import RRTAlgorithm
|
|
|
import numpy as np
|
|
|
import json
|
|
|
import time
|
|
|
|
|
|
class PathPlanner:
|
|
|
def __init__(self, algorithm="astar", grid_resolution=5.0):
|
|
|
"""
|
|
|
初始化路径规划器
|
|
|
|
|
|
Args:
|
|
|
algorithm: 使用的算法,可选 "astar"、"genetic" 或 "rrt"
|
|
|
grid_resolution: 网格分辨率
|
|
|
"""
|
|
|
self.algorithm = algorithm
|
|
|
self.grid_resolution = grid_resolution
|
|
|
|
|
|
# 根据指定算法创建规划器
|
|
|
self.planner = self._create_planner(algorithm, grid_resolution)
|
|
|
|
|
|
# 初始化路径、障碍物和危险区域
|
|
|
self.current_paths: Dict[str, List[Tuple[float, float]]] = {}
|
|
|
self.obstacles: List[Tuple[float, float]] = []
|
|
|
self.threat_areas: List[Dict] = []
|
|
|
|
|
|
# 算法特定参数
|
|
|
self.ga_population_size = 100
|
|
|
self.ga_generations = 100
|
|
|
self.rrt_step_size = 20
|
|
|
self.rrt_max_iterations = 1000
|
|
|
|
|
|
def _create_planner(self, algorithm, grid_resolution):
|
|
|
"""创建对应算法的规划器实例"""
|
|
|
if algorithm == "genetic":
|
|
|
return GeneticAlgorithm(grid_resolution=grid_resolution)
|
|
|
elif algorithm == "rrt":
|
|
|
return RRTAlgorithm(grid_resolution=grid_resolution)
|
|
|
else: # 默认使用 A*
|
|
|
return AStar(grid_resolution=grid_resolution)
|
|
|
|
|
|
def set_algorithm(self, algorithm: str):
|
|
|
"""设置使用的算法"""
|
|
|
if algorithm != self.algorithm:
|
|
|
self.algorithm = algorithm
|
|
|
self.planner = self._create_planner(algorithm, self.grid_resolution)
|
|
|
|
|
|
# 同时设置算法特定参数
|
|
|
if algorithm == "genetic":
|
|
|
self.planner.population_size = self.ga_population_size
|
|
|
self.planner.generations = self.ga_generations
|
|
|
elif algorithm == "rrt":
|
|
|
self.planner.step_size = self.rrt_step_size
|
|
|
self.planner.max_iterations = self.rrt_max_iterations
|
|
|
|
|
|
def plan_path(self,
|
|
|
drone_id: str,
|
|
|
start: Union[Tuple[float, float, float], Tuple[float, float]],
|
|
|
goal: Union[Tuple[float, float, float], Tuple[float, float]],
|
|
|
map_width: int = 1000,
|
|
|
map_height: int = 1000,
|
|
|
vehicle_length: float = 4.0,
|
|
|
vehicle_width: float = 2.0) -> Optional[List[Union[Tuple[float, float, float], Tuple[float, float]]]]:
|
|
|
"""
|
|
|
为指定无人机规划路径
|
|
|
|
|
|
参数:
|
|
|
drone_id: 无人机ID
|
|
|
start: 起点位置(x, y) 或 (x, y, theta)
|
|
|
goal: 终点位置(x, y) 或 (x, y, theta)
|
|
|
map_width: 地图宽度
|
|
|
map_height: 地图高度
|
|
|
vehicle_length: 车辆长度
|
|
|
vehicle_width: 车辆宽度
|
|
|
|
|
|
返回:
|
|
|
路径点列表 [(x, y), ...],如果找不到路径返回None
|
|
|
"""
|
|
|
# 确保坐标不包含航向信息
|
|
|
start_xy = start[:2] if len(start) >= 2 else start
|
|
|
goal_xy = goal[:2] if len(goal) >= 2 else goal
|
|
|
|
|
|
# 设置算法特定参数
|
|
|
if self.algorithm == "genetic":
|
|
|
self.planner.population_size = self.ga_population_size
|
|
|
self.planner.generations = self.ga_generations
|
|
|
elif self.algorithm == "rrt":
|
|
|
self.planner.step_size = self.rrt_step_size
|
|
|
self.planner.max_iterations = self.rrt_max_iterations
|
|
|
|
|
|
# 规划路径
|
|
|
path = self.planner.plan(
|
|
|
start=start_xy,
|
|
|
goal=goal_xy,
|
|
|
map_width=map_width,
|
|
|
map_height=map_height,
|
|
|
threat_areas=self.threat_areas,
|
|
|
obstacles=self.obstacles
|
|
|
)
|
|
|
|
|
|
if path:
|
|
|
# 平滑路径
|
|
|
smoothed_path = self.planner.smooth_path(
|
|
|
path=path,
|
|
|
threat_areas=self.threat_areas,
|
|
|
obstacles=self.obstacles
|
|
|
)
|
|
|
self.current_paths[drone_id] = smoothed_path
|
|
|
return smoothed_path
|
|
|
|
|
|
return None
|
|
|
|
|
|
def add_obstacle_point(self, x: float, y: float, radius: float = 30.0):
|
|
|
"""
|
|
|
添加一个点状障碍物
|
|
|
|
|
|
Args:
|
|
|
x: 点的x坐标
|
|
|
y: 点的y坐标
|
|
|
radius: 点的影响半径
|
|
|
"""
|
|
|
# 将点状障碍物转换为圆形危险区域
|
|
|
area = {
|
|
|
'type': 'circle',
|
|
|
'center': (x, y),
|
|
|
'radius': radius
|
|
|
}
|
|
|
self.threat_areas.append(area)
|
|
|
# 同时添加到障碍物列表
|
|
|
self.obstacles.append((x, y))
|
|
|
|
|
|
def add_obstacle_circle(self, x: float, y: float, radius: float):
|
|
|
"""
|
|
|
添加一个圆形障碍物
|
|
|
|
|
|
Args:
|
|
|
x: 圆心的x坐标
|
|
|
y: 圆心的y坐标
|
|
|
radius: 圆的半径
|
|
|
"""
|
|
|
area = {
|
|
|
'type': 'circle',
|
|
|
'center': (x, y),
|
|
|
'radius': radius
|
|
|
}
|
|
|
self.threat_areas.append(area)
|
|
|
|
|
|
def add_obstacle_rectangle(self, x1: float, y1: float, x2: float, y2: float):
|
|
|
"""
|
|
|
添加一个矩形障碍物
|
|
|
|
|
|
Args:
|
|
|
x1: 左上角x坐标
|
|
|
y1: 左上角y坐标
|
|
|
x2: 右下角x坐标
|
|
|
y2: 右下角y坐标
|
|
|
"""
|
|
|
# 确保(x1,y1)是左上角,(x2,y2)是右下角
|
|
|
top_left_x = min(x1, x2)
|
|
|
top_left_y = min(y1, y2)
|
|
|
width = abs(x2 - x1)
|
|
|
height = abs(y2 - y1)
|
|
|
|
|
|
area = {
|
|
|
'type': 'rectangle',
|
|
|
'rect': (top_left_x, top_left_y, width, height)
|
|
|
}
|
|
|
self.threat_areas.append(area)
|
|
|
|
|
|
def add_obstacle_polygon(self, points: List[Tuple[float, float]]):
|
|
|
"""
|
|
|
添加一个多边形障碍物
|
|
|
|
|
|
Args:
|
|
|
points: 多边形顶点列表 [(x1,y1), (x2,y2), ...]
|
|
|
"""
|
|
|
if len(points) >= 3: # 确保至少有3个点
|
|
|
area = {
|
|
|
'type': 'polygon',
|
|
|
'points': points
|
|
|
}
|
|
|
self.threat_areas.append(area)
|
|
|
|
|
|
def clear_obstacles(self):
|
|
|
"""清除所有障碍物和危险区域"""
|
|
|
self.obstacles.clear()
|
|
|
self.threat_areas.clear()
|
|
|
|
|
|
def update_obstacles(self, obstacles: List[Tuple[float, float]]):
|
|
|
"""更新障碍物列表"""
|
|
|
self.obstacles = obstacles
|
|
|
|
|
|
def update_threat_areas(self, threat_areas: List[Dict]):
|
|
|
"""更新危险区域列表"""
|
|
|
self.threat_areas = threat_areas
|
|
|
|
|
|
def get_current_path(self, drone_id: str) -> Optional[List[Tuple[float, float]]]:
|
|
|
"""获取指定无人机的当前路径"""
|
|
|
return self.current_paths.get(drone_id)
|
|
|
|
|
|
def clear_path(self, drone_id: str):
|
|
|
"""清除指定无人机的路径"""
|
|
|
if drone_id in self.current_paths:
|
|
|
del self.current_paths[drone_id]
|
|
|
|
|
|
def save_path(self, drone_id: str, filename: str):
|
|
|
"""保存路径到文件"""
|
|
|
if drone_id in self.current_paths:
|
|
|
path_data = {
|
|
|
'drone_id': drone_id,
|
|
|
'timestamp': time.time(),
|
|
|
'path': self.current_paths[drone_id]
|
|
|
}
|
|
|
with open(filename, 'w') as f:
|
|
|
json.dump(path_data, f)
|
|
|
|
|
|
def load_path(self, filename: str) -> Optional[str]:
|
|
|
"""从文件加载路径"""
|
|
|
try:
|
|
|
with open(filename, 'r') as f:
|
|
|
path_data = json.load(f)
|
|
|
self.current_paths[path_data['drone_id']] = path_data['path']
|
|
|
return path_data['drone_id']
|
|
|
except:
|
|
|
return None |