You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
project/Src/command_center/path_planner.py

231 lines
7.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
# File: path_planner.py
# Purpose: Core path-planning logic, including the interface for invoking the different path-planning algorithms.
from typing import List, Tuple, Optional, Dict, Union
from .astar import AStar
from .genetic_algorithm import GeneticAlgorithm
from .rrt_algorithm import RRTAlgorithm
import numpy as np
import json
import time
class PathPlanner:
    """Plan, store and persist per-drone paths using a selectable algorithm.

    A planner object for the chosen algorithm is created up front. Obstacles
    and threat areas registered on this object are forwarded to that planner
    on every ``plan_path`` call.

    Threat areas are plain dicts in one of three shapes:
        {'type': 'circle', 'center': (x, y), 'radius': r}
        {'type': 'rectangle', 'rect': (min_x, min_y, width, height)}
        {'type': 'polygon', 'points': [(x, y), ...]}
    """

    def __init__(self, algorithm: str = "astar", grid_resolution: float = 5.0) -> None:
        """Initialise the path planner.

        Args:
            algorithm: Algorithm to use: "astar", "genetic" or "rrt". Any
                other value falls back to A*.
            grid_resolution: Grid resolution handed to the planner.
        """
        self.algorithm = algorithm
        self.grid_resolution = grid_resolution

        # Planner instance for the requested algorithm.
        self.planner = self._create_planner(algorithm, grid_resolution)

        # Last planned path per drone id, plus the obstacle / threat-area
        # state that is passed to the planner on each planning call.
        self.current_paths: Dict[str, List[Tuple[float, float]]] = {}
        self.obstacles: List[Tuple[float, float]] = []
        self.threat_areas: List[Dict] = []

        # Algorithm-specific tuning values; copied onto the planner by
        # _apply_algorithm_params().
        self.ga_population_size = 100
        self.ga_generations = 100
        self.rrt_step_size = 20
        self.rrt_max_iterations = 1000

        # Push the tuning values right away so a freshly built planner is
        # configured the same way as one selected through set_algorithm().
        self._apply_algorithm_params()

    def _create_planner(self, algorithm, grid_resolution):
        """Return a planner instance for ``algorithm`` (A* when unrecognised)."""
        if algorithm == "genetic":
            return GeneticAlgorithm(grid_resolution=grid_resolution)
        if algorithm == "rrt":
            return RRTAlgorithm(grid_resolution=grid_resolution)
        # Default: A*
        return AStar(grid_resolution=grid_resolution)

    def _apply_algorithm_params(self) -> None:
        """Copy the tuning values for the active algorithm onto the planner."""
        if self.algorithm == "genetic":
            self.planner.population_size = self.ga_population_size
            self.planner.generations = self.ga_generations
        elif self.algorithm == "rrt":
            self.planner.step_size = self.rrt_step_size
            self.planner.max_iterations = self.rrt_max_iterations

    def set_algorithm(self, algorithm: str) -> None:
        """Select the planning algorithm, rebuilding the planner if it changed.

        Args:
            algorithm: "astar", "genetic" or "rrt"; other values select A*.
        """
        if algorithm != self.algorithm:
            self.algorithm = algorithm
            self.planner = self._create_planner(algorithm, self.grid_resolution)
        # Re-applied unconditionally so tuning values changed on this object
        # reach the planner even when the algorithm itself is unchanged.
        self._apply_algorithm_params()

    def plan_path(self,
                  drone_id: str,
                  start: Union[Tuple[float, float, float], Tuple[float, float]],
                  goal: Union[Tuple[float, float, float], Tuple[float, float]],
                  map_width: int = 1000,
                  map_height: int = 1000,
                  vehicle_length: float = 4.0,
                  vehicle_width: float = 2.0) -> Optional[List[Union[Tuple[float, float, float], Tuple[float, float]]]]:
        """Plan a path for the given drone and remember it.

        Args:
            drone_id: Drone identifier used as the key in ``current_paths``.
            start: Start position, (x, y) or (x, y, theta).
            goal: Goal position, (x, y) or (x, y, theta).
            map_width: Map width passed to the planner.
            map_height: Map height passed to the planner.
            vehicle_length: Vehicle length (accepted but not used here).
            vehicle_width: Vehicle width (accepted but not used here).

        Returns:
            The smoothed path returned by the planner, or None when the
            planner finds no path. On failure any path previously stored for
            ``drone_id`` is left untouched.
        """
        # Drop the heading component, if any: the planner receives (x, y) only.
        start_xy = start[:2]
        goal_xy = goal[:2]

        # Make sure the planner sees the current tuning values.
        self._apply_algorithm_params()

        path = self.planner.plan(
            start=start_xy,
            goal=goal_xy,
            map_width=map_width,
            map_height=map_height,
            threat_areas=self.threat_areas,
            obstacles=self.obstacles,
        )
        if not path:
            return None

        smoothed_path = self.planner.smooth_path(
            path=path,
            threat_areas=self.threat_areas,
            obstacles=self.obstacles,
        )
        self.current_paths[drone_id] = smoothed_path
        return smoothed_path

    def add_obstacle_point(self, x: float, y: float, radius: float = 30.0) -> None:
        """Add a point obstacle.

        The point is registered both as a circular threat area of the given
        radius and as an entry in the obstacle list.

        Args:
            x: X coordinate of the point.
            y: Y coordinate of the point.
            radius: Influence radius around the point.
        """
        self.add_obstacle_circle(x, y, radius)
        self.obstacles.append((x, y))

    def add_obstacle_circle(self, x: float, y: float, radius: float) -> None:
        """Add a circular obstacle as a threat area.

        Args:
            x: X coordinate of the centre.
            y: Y coordinate of the centre.
            radius: Radius of the circle.
        """
        self.threat_areas.append({
            'type': 'circle',
            'center': (x, y),
            'radius': radius,
        })

    def add_obstacle_rectangle(self, x1: float, y1: float, x2: float, y2: float) -> None:
        """Add a rectangular obstacle given two opposite corners.

        The corners may be supplied in any order; the rectangle is stored as
        (min_x, min_y, width, height).

        Args:
            x1: X coordinate of the first corner.
            y1: Y coordinate of the first corner.
            x2: X coordinate of the opposite corner.
            y2: Y coordinate of the opposite corner.
        """
        origin_x = min(x1, x2)
        origin_y = min(y1, y2)
        self.threat_areas.append({
            'type': 'rectangle',
            'rect': (origin_x, origin_y, abs(x2 - x1), abs(y2 - y1)),
        })

    def add_obstacle_polygon(self, points: List[Tuple[float, float]]) -> None:
        """Add a polygonal obstacle.

        Args:
            points: Polygon vertices [(x1, y1), (x2, y2), ...]. Inputs with
                fewer than three vertices are ignored.
        """
        if len(points) < 3:
            return
        self.threat_areas.append({
            'type': 'polygon',
            'points': points,
        })

    def clear_obstacles(self) -> None:
        """Remove all obstacles and threat areas.

        Both lists are emptied in place, so a list previously handed over via
        ``update_obstacles`` / ``update_threat_areas`` is emptied as well.
        """
        self.obstacles.clear()
        self.threat_areas.clear()

    def update_obstacles(self, obstacles: List[Tuple[float, float]]) -> None:
        """Replace the obstacle list (the given list is stored, not copied)."""
        self.obstacles = obstacles

    def update_threat_areas(self, threat_areas: List[Dict]) -> None:
        """Replace the threat-area list (the given list is stored, not copied)."""
        self.threat_areas = threat_areas

    def get_current_path(self, drone_id: str) -> Optional[List[Tuple[float, float]]]:
        """Return the stored path for ``drone_id``, or None if there is none."""
        return self.current_paths.get(drone_id)

    def clear_path(self, drone_id: str) -> None:
        """Forget the stored path for ``drone_id``; unknown ids are ignored."""
        self.current_paths.pop(drone_id, None)

    def save_path(self, drone_id: str, filename: str) -> None:
        """Write the stored path for ``drone_id`` to ``filename`` as JSON.

        Nothing is written when no path is stored for the drone. The file
        holds the drone id, the current ``time.time()`` timestamp and the
        path itself.
        """
        if drone_id not in self.current_paths:
            return
        path_data = {
            'drone_id': drone_id,
            'timestamp': time.time(),
            'path': self.current_paths[drone_id],
        }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(path_data, f)

    def load_path(self, filename: str) -> Optional[str]:
        """Load a path previously written by ``save_path``.

        Args:
            filename: Path of the JSON file to read.

        Returns:
            The drone id the path was stored under, or None when the file
            cannot be read, is not valid JSON, or lacks the expected fields.
        """
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                path_data = json.load(f)
            drone_id = path_data['drone_id']
            # JSON has no tuple type, so points come back as lists; rebuild
            # tuples to match the shape stored by plan_path. The path is built
            # before current_paths is touched, so a malformed file leaves no
            # partial state behind.
            path = [tuple(point) for point in path_data['path']]
        except (OSError, ValueError, KeyError, TypeError):
            # OSError: unreadable file; ValueError: invalid JSON or encoding;
            # KeyError / TypeError: unexpected document structure.
            return None
        self.current_paths[drone_id] = path
        return drone_id