功能实现1.0

pull/15/head
topfive 7 months ago
parent b32501dad6
commit 368b9eca39

@ -0,0 +1,3 @@
{
"cmake.ignoreCMakeListsMissing": true
}

@ -0,0 +1,271 @@
import mysql.connector
from mysql.connector import Error
import json
import os
from PyQt5.QtGui import QColor
def load_db_config():
# 获取当前文件的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建配置文件的完整路径
config_path = os.path.join(current_dir, 'db_config.json')
with open(config_path, 'r') as config_file:
config = json.load(config_file)
return config
def create_connection():
"""进行数据库连接"""
connection = None
try:
config = load_db_config()
connection = mysql.connector.connect(
host=config['host'], # 数据库主机地址
user=config['user'], # 数据库用户名
password=config['password'], # 数据库密码
database=config['database'] # 数据库名称
)
if connection.is_connected():
print("Connected to MySQL database")
except Error as e:
print(f"Error: '{e}' occurred")
return connection
def close_connection(connection):
"""关闭连接"""
if connection.is_connected():
connection.close()
print("MySQL connection is closed")
def query_user(username, password):
"""查询用户信息"""
connection = create_connection()
user_exists = False
try:
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
query = "SELECT * FROM users WHERE username = %s AND password = %s"
cursor.execute(query, (username, password))
result = cursor.fetchone()
if result:
user_exists = True
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return user_exists
def register_user(username, password):
"""注册新用户"""
connection = create_connection()
success = False
try:
if connection.is_connected():
cursor = connection.cursor()
query = "INSERT INTO users (username, password) VALUES (%s, %s)"
cursor.execute(query, (username, password))
connection.commit()
success = True
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return success
def save_danger_zone(area):
"""保存危险区域到数据库,避免重复"""
if danger_zone_exists(area):
print("危险区域已存在,跳过存储。")
return False
connection = create_connection()
success = False
try:
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
# 标准化坐标
if area['type'] == 'circle':
coordinates = json.dumps({
'center': [round(c, 6) for c in area.get('center', (0, 0))],
'radius': round(area.get('radius', 50), 2)
})
elif area['type'] == 'rectangle':
coordinates = json.dumps({
'rect': [round(r, 6) for r in area.get('rect', (0, 0, 100, 100))]
})
elif area['type'] == 'polygon':
coordinates = json.dumps({
'points': [[round(p[0], 6), round(p[1], 6)] for p in area.get('points', [])]
})
else:
return False
# 检查是否已存在
check_query = """
SELECT id FROM danger_zones WHERE type = %s AND coordinates = %s
"""
cursor.execute(check_query, (area['type'], coordinates))
result = cursor.fetchone()
if result:
print("Danger zone already exists in the database.")
return False
# 插入新记录
insert_query = """
INSERT INTO danger_zones (type, coordinates, color)
VALUES (%s, %s, %s)
"""
color = area['color'].name() # 假设QColor对象
cursor.execute(insert_query, (area['type'], coordinates, color))
connection.commit()
success = True
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return success
def load_danger_zones():
"""从数据库加载危险区域信息"""
connection = create_connection()
danger_zones = []
try:
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
query = "SELECT type, coordinates, color FROM danger_zones"
cursor.execute(query)
results = cursor.fetchall()
for row in results:
coordinates = json.loads(row['coordinates'])
if row['type'] == 'circle':
zone = {
'type': row['type'],
'center': coordinates['center'],
'radius': coordinates['radius'],
'color': QColor(row['color'])
}
elif row['type'] == 'rectangle':
zone = {
'type': row['type'],
'rect': coordinates['rect'],
'color': QColor(row['color'])
}
elif row['type'] == 'polygon':
zone = {
'type': row['type'],
'points': coordinates['points'],
'color': QColor(row['color'])
}
danger_zones.append(zone)
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return danger_zones
def clear_danger_zones():
"""删除数据库中的所有危险区域数据"""
connection = create_connection()
print("clear_danger_zones called")
try:
if connection.is_connected():
cursor = connection.cursor()
query = "DELETE FROM danger_zones"
cursor.execute(query)
connection.commit()
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
def save_path(drone_id, path):
"""保存路径信息到数据库"""
connection = create_connection()
success = False
print("save_path called")
try:
if connection.is_connected():
cursor = connection.cursor()
query = """
INSERT INTO paths (drone_id, path)
VALUES (%s, %s)
"""
path_str = json.dumps(path) # 将路径转换为JSON字符串
cursor.execute(query, (drone_id, path_str))
connection.commit()
success = True
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return success
def clear_paths(drone_id):
"""删除数据库中与特定无人机ID相关的路径数据"""
connection = create_connection()
try:
if connection.is_connected():
cursor = connection.cursor()
query = "DELETE FROM paths WHERE drone_id = %s"
cursor.execute(query, (drone_id,))
connection.commit()
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
def danger_zone_exists(area):
"""检查数据库中是否存在相同的危险区域"""
connection = create_connection()
exists = False
try:
if connection.is_connected():
cursor = connection.cursor(dictionary=True)
# 标准化坐标
if area['type'] == 'circle':
coordinates = json.dumps({
'center': [round(c, 6) for c in area.get('center', (0, 0))],
'radius': round(area.get('radius', 50), 2)
})
elif area['type'] == 'rectangle':
coordinates = json.dumps({
'rect': [round(r, 6) for r in area.get('rect', (0, 0, 100, 100))]
})
elif area['type'] == 'polygon':
coordinates = json.dumps({
'points': [[round(p[0], 6), round(p[1], 6)] for p in area.get('points', [])]
})
else:
return False
# 检查是否已存在
check_query = """
SELECT id FROM danger_zones WHERE type = %s AND coordinates = %s
"""
cursor.execute(check_query, (area['type'], coordinates))
result = cursor.fetchone()
print(result)
exists = result is not None
print(exists)
cursor.close()
except Error as e:
print(f"Error: '{e}' occurred")
finally:
if connection.is_connected():
close_connection(connection)
return exists

@ -0,0 +1,6 @@
{
"host": "localhost",
"user": "root",
"password": "227622",
"database": "unv"
}

@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL
);
CREATE TABLE IF NOT EXISTS danger_zones (
id INT AUTO_INCREMENT PRIMARY KEY,
type VARCHAR(50) NOT NULL,
coordinates JSON NOT NULL,
color VARCHAR(20) NOT NULL
);
CREATE TABLE IF NOT EXISTS paths (
id INT AUTO_INCREMENT PRIMARY KEY,
drone_id VARCHAR(50) NOT NULL,
path JSON NOT NULL
);

@ -4,24 +4,22 @@
# 负责初始化UI、处理登录、创建数据模型和协调各个功能模块。
import sys
import traceback
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QTabWidget, QPushButton, QLabel,
QGroupBox, QComboBox, QSpinBox, QDoubleSpinBox,
QProgressBar, QCheckBox, QMessageBox, QFileDialog)
QProgressBar, QCheckBox, QMessageBox, QFileDialog,
QStatusBar, QStackedWidget)
from PyQt5.QtCore import Qt, pyqtSignal, QThread, QObject
from PyQt5.QtGui import QFont, QPixmap, QPainter, QPen, QColor
from ui.login_view import LoginView
from ui.simple_map_view import SimpleMapView
from ui.threat_layer_view import ThreatLayerView
from ui.path_layer_view import PathLayerView
from ui.drone_list_view import DroneListView
from ui.drone_detail_view import DroneDetailView
from ui.status_dashboard import StatusDashboard
from ui.path_planning_view import PathPlanningView
from ui.algorithm_config_view import AlgorithmConfigView
from communication.communication_manager import CommunicationManager
from ui.base_map_view import BaseMapView
from ui.map_data_model import MapDataModel
from ui.integrated_map_view import IntegratedMapView
from ui.drone_management import DroneManagementWindow
class PathPlanningThread(QThread):
path_planned = pyqtSignal(list) # 规划完成后发出新路径
@ -76,31 +74,79 @@ class CommandCenterApp(QMainWindow):
# 创建主布局
main_layout = QVBoxLayout(central_widget)
main_layout.setContentsMargins(0, 0, 0, 0)
# 创建标签页
self.tab_widget = QTabWidget()
# 创建堆叠部件用于切换界面
self.stacked_widget = QStackedWidget()
main_layout.addWidget(self.stacked_widget)
# 创建地图数据模型
self.map_data_model = MapDataModel()
# 创建无人机管理界面
self.drone_management = DroneManagementWindow()
self.drone_management.switch_to_map_view.connect(self.switch_to_map_view)
self.stacked_widget.addWidget(self.drone_management)
# 添加各个功能标签页
self.tab_widget.addTab(SimpleMapView(self.map_data_model), "地图视图")
self.tab_widget.addTab(ThreatLayerView(self.map_data_model), "威胁层")
self.tab_widget.addTab(PathLayerView(self.map_data_model), "路径层")
self.tab_widget.addTab(DroneListView(), "无人机列表")
self.tab_widget.addTab(DroneDetailView(), "无人机详情")
self.tab_widget.addTab(StatusDashboard(), "状态仪表板")
self.tab_widget.addTab(PathPlanningView(), "路径规划")
self.tab_widget.addTab(AlgorithmConfigView(), "算法配置")
# 创建地图数据模型和集成地图视图
self.map_data_model = self.drone_management.map_data_model
main_layout.addWidget(self.tab_widget)
# 创建地图界面容器
map_container = QWidget()
map_layout = QVBoxLayout(map_container)
map_layout.setContentsMargins(0, 0, 0, 0)
# 创建顶部工具栏
top_toolbar = QHBoxLayout()
top_toolbar.setContentsMargins(10, 5, 10, 5)
back_btn = QPushButton("返回无人机管理")
back_btn.clicked.connect(self.switch_to_drone_management)
title_label = QLabel("无人机路径规划")
title_label.setAlignment(Qt.AlignCenter)
title_font = QFont()
title_font.setPointSize(14)
title_font.setBold(True)
title_label.setFont(title_font)
# 添加到顶部工具栏
top_toolbar.addWidget(back_btn)
top_toolbar.addStretch()
top_toolbar.addWidget(title_label)
top_toolbar.addStretch()
top_toolbar.addStretch() # 为了平衡布局
# 创建集成地图视图
self.integrated_map_view = IntegratedMapView(self.map_data_model, self.drone_management.drone_manager)
# 添加到地图容器
map_layout.addLayout(top_toolbar)
map_layout.addWidget(self.integrated_map_view)
# 添加到堆叠部件
self.stacked_widget.addWidget(map_container)
# 创建状态栏
self.statusBar = QStatusBar()
self.setStatusBar(self.statusBar)
# 连接登录成功信号
self.login_successful.connect(self.on_login_successful)
# 默认显示无人机管理界面
self.stacked_widget.setCurrentIndex(0)
# 显示登录窗口
self.show_login()
def switch_to_map_view(self):
"""切换到地图视图"""
# 切换到地图视图前刷新无人机列表
self.integrated_map_view.refresh_drone_list()
self.stacked_widget.setCurrentIndex(1)
def switch_to_drone_management(self):
"""切换到无人机管理视图"""
self.stacked_widget.setCurrentIndex(0)
def show_login(self):
self.login_view = LoginView()
self.login_view.login_successful.connect(self.on_login_successful)
@ -122,32 +168,50 @@ class CommandCenterApp(QMainWindow):
try:
# 暂时跳过数据库连接和通信管理器的初始化
print("通信管理器初始化已跳过")
self.update_drone_list()
except Exception as e:
QMessageBox.critical(self, "错误", f"初始化通信失败: {str(e)}")
def update_drone_list(self):
"""更新无人机列表"""
try:
# 暂时使用空列表
connected_drones = []
# 更新无人机列表视图
drone_list_view = self.tab_widget.widget(3) # DroneListView的索引
if isinstance(drone_list_view, DroneListView):
drone_list_view.update_drone_list(connected_drones)
except Exception as e:
print(f"更新无人机列表失败: {str(e)}")
def closeEvent(self, event):
"""关闭窗口时的处理"""
if self.comm_manager:
self.comm_manager.stop()
event.accept()
# 全局异常处理器
def exception_hook(exctype, value, tb):
"""全局异常捕获防止PyQt应用崩溃"""
error_msg = ''.join(traceback.format_exception(exctype, value, tb))
print(error_msg) # 打印到控制台
# 显示错误对话框
error_box = QMessageBox()
error_box.setIcon(QMessageBox.Critical)
error_box.setWindowTitle("应用程序错误")
error_box.setText("程序遇到了一个错误并无法继续。")
error_box.setInformativeText("详细错误信息:")
error_box.setDetailedText(error_msg)
error_box.setStandardButtons(QMessageBox.Ok)
error_box.exec_()
# 调用原始的异常处理器
sys.__excepthook__(exctype, value, tb)
if __name__ == '__main__':
# 设置全局异常钩子
sys.excepthook = exception_hook
app = QApplication(sys.argv)
window = CommandCenterApp()
sys.exit(app.exec_())
# 设置应用程序样式
app.setStyle('Fusion')
try:
window = CommandCenterApp()
sys.exit(app.exec_())
except Exception as e:
# 捕获主应用程序初始化过程中的任何错误
print(f"初始化失败: {str(e)}")
traceback.print_exc()
QMessageBox.critical(None, "启动错误", f"应用程序启动失败: {str(e)}")
sys.exit(1)

@ -0,0 +1,283 @@
# -*- coding: utf-8 -*-
# File: astar.py
# Purpose: 实现A*路径规划算法,支持避开危险区域
import heapq
import math
from typing import List, Tuple, Dict, Optional, Set
import numpy as np
from dataclasses import dataclass
@dataclass
class Node:
x: float
y: float
g_cost: float # 从起点到当前节点的实际代价
h_cost: float # 从当前节点到终点的估计代价
parent: Optional['Node'] = None
@property
def f_cost(self) -> float:
"""总代价"""
return self.g_cost + self.h_cost
def __lt__(self, other):
"""用于堆比较的方法"""
return self.f_cost < other.f_cost
def __eq__(self, other):
"""相等判断,用于检查是否是同一个位置"""
if not isinstance(other, Node):
return False
return abs(self.x - other.x) < 0.1 and abs(self.y - other.y) < 0.1
class AStar:
def __init__(self, grid_resolution: float = 5.0):
"""
初始化A*算法
Args:
grid_resolution: 网格分辨率值越小路径越精细但计算量越大
"""
self.grid_resolution = grid_resolution
self.directions = [
(1, 0), # 右
(0, 1), # 下
(-1, 0), # 左
(0, -1), # 上
(1, 1), # 右下
(-1, 1), # 左下
(-1, -1), # 左上
(1, -1) # 右上
]
def plan(self, start: Tuple[float, float],
goal: Tuple[float, float],
map_width: int, map_height: int,
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]:
"""
使用A*算法规划路径
Args:
start: 起点坐标 (x, y)
goal: 终点坐标 (x, y)
map_width: 地图宽度
map_height: 地图高度
threat_areas: 危险区域列表
obstacles: 障碍物列表
Returns:
路径点列表如果找不到路径返回空列表
"""
# 创建开放列表(优先队列)和关闭集合
open_list = []
closed_set: Set[Tuple[int, int]] = set()
# 创建起始节点
start_node = Node(
x=start[0],
y=start[1],
g_cost=0,
h_cost=self._heuristic(start, goal)
)
# 将起始节点加入开放列表
heapq.heappush(open_list, start_node)
# 主循环
while open_list:
# 从开放列表中取出f值最小的节点
current = heapq.heappop(open_list)
# 转换为网格坐标,用于检查是否在关闭列表中
grid_pos = self._to_grid_pos(current.x, current.y)
# 如果节点已在关闭列表中,跳过
if grid_pos in closed_set:
continue
# 将当前节点加入关闭列表
closed_set.add(grid_pos)
# 检查是否达到目标
if self._is_goal(current, goal):
return self._reconstruct_path(current)
# 扩展当前节点
for dx, dy in self.directions:
next_x = current.x + dx * self.grid_resolution
next_y = current.y + dy * self.grid_resolution
# 检查边界
if not (0 <= next_x < map_width and 0 <= next_y < map_height):
continue
# 检查是否在关闭列表中
next_grid_pos = self._to_grid_pos(next_x, next_y)
if next_grid_pos in closed_set:
continue
# 检查是否在障碍物中
if obstacles and self._is_in_obstacles(next_x, next_y, obstacles):
continue
# 检查是否在危险区域中
if self._is_in_threat_areas(next_x, next_y, threat_areas):
continue
# 计算新的g值
# 对角线移动距离为根号2直线移动为1
if dx != 0 and dy != 0:
move_cost = self.grid_resolution * 1.414 # 对角线移动成本更高
else:
move_cost = self.grid_resolution
new_g_cost = current.g_cost + move_cost
# 创建新节点
next_node = Node(
x=next_x,
y=next_y,
g_cost=new_g_cost,
h_cost=self._heuristic((next_x, next_y), goal),
parent=current
)
# 将新节点加入开放列表
heapq.heappush(open_list, next_node)
# 如果开放列表为空且没有找到路径,返回空列表
return []
def _to_grid_pos(self, x: float, y: float) -> Tuple[int, int]:
"""将浮点坐标转换为网格坐标"""
grid_x = int(x / self.grid_resolution)
grid_y = int(y / self.grid_resolution)
return (grid_x, grid_y)
def _heuristic(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""计算启发式函数值(欧几里得距离)"""
return math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)
def _is_goal(self, node: Node, goal: Tuple[float, float]) -> bool:
"""检查是否达到目标"""
return self._heuristic((node.x, node.y), goal) < self.grid_resolution * 1.5
def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool:
"""检查点是否在障碍物中"""
for obstacle_x, obstacle_y in obstacles:
distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2)
if distance < self.grid_resolution:
return True
return False
def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool:
"""检查点是否在危险区域中"""
for area in threat_areas:
area_type = area.get('type', '')
if area_type == 'circle':
center = area.get('center', (0, 0))
radius = area.get('radius', 0)
distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
if distance <= radius:
return True
elif area_type == 'rectangle':
rect = area.get('rect', (0, 0, 0, 0))
x1, y1, width, height = rect
if x1 <= x <= x1 + width and y1 <= y <= y1 + height:
return True
elif area_type == 'polygon':
points = area.get('points', [])
if self._point_in_polygon(x, y, points):
return True
return False
def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool:
"""检查点是否在多边形内(射线法)"""
if len(polygon) < 3:
return False
inside = False
j = len(polygon) - 1
for i in range(len(polygon)):
xi, yi = polygon[i]
xj, yj = polygon[j]
# 检查点是否在多边形边上
if (yi == y and xi == x) or (yj == y and xj == x):
return True
# 射线法判断点是否在多边形内部
intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi)
if intersect:
inside = not inside
j = i
return inside
def _reconstruct_path(self, node: Node) -> List[Tuple[float, float]]:
"""重建路径"""
path = []
current = node
while current:
path.append((current.x, current.y))
current = current.parent
return list(reversed(path))
def smooth_path(self, path: List[Tuple[float, float]],
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None,
weight_data: float = 0.5,
weight_smooth: float = 0.3,
tolerance: float = 0.000001) -> List[Tuple[float, float]]:
"""
使用平滑算法优化路径
Args:
path: 原始路径
threat_areas: 危险区域
obstacles: 障碍物
weight_data: 数据权重
weight_smooth: 平滑权重
tolerance: 收敛阈值
Returns:
平滑后的路径
"""
if len(path) <= 2:
return path
# 将路径转换为numpy数组方便操作
path_array = np.array(path)
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(2): # x, y
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
# 检查平滑后的路径是否经过危险区域或障碍物
# 如果是,则返回原始路径
for i in range(len(smooth_path)):
x, y = smooth_path[i]
if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)):
return path
return [(x, y) for x, y in smooth_path]

@ -0,0 +1,475 @@
# -*- coding: utf-8 -*-
# File: genetic_algorithm.py
# Purpose: 实现基于遗传算法的路径规划,支持避开危险区域
import numpy as np
import math
import random
from typing import List, Tuple, Dict, Optional
import time
class GeneticAlgorithm:
"""遗传算法路径规划器"""
def __init__(self,
grid_resolution: float = 5.0,
population_size: int = 100,
generations: int = 50,
mutation_rate: float = 0.1,
elite_size: int = 20,
crossover_rate: float = 0.8):
"""
初始化遗传算法
Args:
grid_resolution: 网格分辨率
population_size: 种群大小
generations: 最大迭代代数
mutation_rate: 变异率
elite_size: 精英个体数量
crossover_rate: 交叉率
"""
self.grid_resolution = grid_resolution
self.population_size = population_size
self.generations = generations
self.mutation_rate = mutation_rate
self.elite_size = elite_size
self.crossover_rate = crossover_rate
def plan(self, start: Tuple[float, float],
goal: Tuple[float, float],
map_width: int, map_height: int,
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]:
"""
使用遗传算法规划路径
Args:
start: 起点坐标 (x, y)
goal: 终点坐标 (x, y)
map_width: 地图宽度
map_height: 地图高度
threat_areas: 危险区域列表
obstacles: 障碍物列表
Returns:
路径点列表如果找不到路径返回空列表
"""
# 初始化种群
population = self._initialize_population(start, goal, map_width, map_height)
# 进化迭代
best_route = None
best_fitness = -1
for generation in range(self.generations):
# 评估种群适应度
fitness_scores = self._evaluate_fitness(population, start, goal, threat_areas, obstacles)
# 检查最佳个体
max_fitness_idx = np.argmax(fitness_scores)
if fitness_scores[max_fitness_idx] > best_fitness:
best_fitness = fitness_scores[max_fitness_idx]
best_route = population[max_fitness_idx]
# 如果找到了有效路径且适应度足够高,可以提前结束
if best_fitness > 0.8:
break
# 选择精英个体
elite_indices = np.argsort(fitness_scores)[-self.elite_size:]
elites = [population[i] for i in elite_indices]
# 选择父代
parents = self._selection(population, fitness_scores)
# 交叉和变异产生新一代
new_population = elites.copy() # 精英保留
while len(new_population) < self.population_size:
# 随机选择两个父代
parent1, parent2 = random.sample(parents, 2)
# 交叉
if random.random() < self.crossover_rate:
child = self._crossover(parent1, parent2)
else:
child = parent1.copy()
# 变异
child = self._mutation(child, map_width, map_height)
# 添加到新种群
new_population.append(child)
# 更新种群
population = new_population[:self.population_size]
# 返回最佳路径(如果找到)
if best_route:
return self._smooth_path(best_route, start, goal)
return []
def _initialize_population(self, start: Tuple[float, float], goal: Tuple[float, float],
map_width: int, map_height: int) -> List[List[Tuple[float, float]]]:
"""初始化种群"""
population = []
# 直接连接起点和终点的个体
direct_route = [start, goal]
population.append(direct_route)
# 随机生成其他个体
for _ in range(self.population_size - 1):
# 随机决定路径点数量2-10个中间点
num_waypoints = random.randint(2, 10)
# 创建包含起点和终点的路径
route = [start]
# 添加随机中间点
for _ in range(num_waypoints):
x = random.uniform(0, map_width)
y = random.uniform(0, map_height)
route.append((x, y))
route.append(goal)
population.append(route)
return population
def _evaluate_fitness(self, population: List[List[Tuple[float, float]]],
start: Tuple[float, float], goal: Tuple[float, float],
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> np.ndarray:
"""评估种群适应度"""
fitness_scores = np.zeros(len(population))
for i, route in enumerate(population):
# 检查路径是否包含起点和终点
if route[0] != start or route[-1] != goal:
fitness_scores[i] = 0
continue
# 路径长度(越短越好)
path_length = self._calculate_path_length(route)
length_fitness = 1.0 / (1.0 + path_length / 1000.0) # 归一化,短路径得高分
# 穿过危险区域的惩罚
danger_penalty = 0
for j in range(len(route) - 1):
segment_danger = self._segment_danger(route[j], route[j+1], threat_areas, obstacles)
danger_penalty += segment_danger
danger_fitness = 1.0 / (1.0 + danger_penalty) # 归一化,无危险得高分
# 路径平滑度(转向次数和角度变化)
smoothness = self._calculate_smoothness(route)
smoothness_fitness = 1.0 / (1.0 + smoothness) # 归一化,平滑路径得高分
# 综合评分(权重可调整)
fitness_scores[i] = 0.4 * length_fitness + 0.5 * danger_fitness + 0.1 * smoothness_fitness
return fitness_scores
def _calculate_path_length(self, route: List[Tuple[float, float]]) -> float:
"""计算路径总长度"""
length = 0
for i in range(len(route) - 1):
dx = route[i+1][0] - route[i][0]
dy = route[i+1][1] - route[i][1]
length += math.sqrt(dx*dx + dy*dy)
return length
def _segment_danger(self, p1: Tuple[float, float], p2: Tuple[float, float],
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> float:
"""计算路径段穿过危险区域的程度"""
danger = 0
# 采样点检测
num_samples = max(int(self._distance(p1, p2) / self.grid_resolution), 5)
for i in range(num_samples + 1):
t = i / num_samples
x = p1[0] + t * (p2[0] - p1[0])
y = p1[1] + t * (p2[1] - p1[1])
# 检查是否在危险区域内
if self._is_in_threat_areas(x, y, threat_areas):
danger += 1
# 检查是否与障碍物碰撞
if obstacles and self._is_in_obstacles(x, y, obstacles):
danger += 10 # 障碍物惩罚更高
return danger / (num_samples + 1) # 归一化
def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool:
"""检查点是否在危险区域中"""
for area in threat_areas:
area_type = area.get('type', '')
if area_type == 'circle':
center = area.get('center', (0, 0))
radius = area.get('radius', 0)
distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
if distance <= radius:
return True
elif area_type == 'rectangle':
rect = area.get('rect', (0, 0, 0, 0))
x1, y1, width, height = rect
if x1 <= x <= x1 + width and y1 <= y <= y1 + height:
return True
elif area_type == 'polygon':
points = area.get('points', [])
if self._point_in_polygon(x, y, points):
return True
return False
def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool:
"""检查点是否在多边形内(射线法)"""
if len(polygon) < 3:
return False
inside = False
j = len(polygon) - 1
for i in range(len(polygon)):
xi, yi = polygon[i]
xj, yj = polygon[j]
# 检查点是否在多边形边上
if (yi == y and xi == x) or (yj == y and xj == x):
return True
# 射线法判断点是否在多边形内部
intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi)
if intersect:
inside = not inside
j = i
return inside
def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool:
"""检查点是否在障碍物中"""
for obstacle_x, obstacle_y in obstacles:
distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2)
if distance < self.grid_resolution:
return True
return False
def _calculate_smoothness(self, route: List[Tuple[float, float]]) -> float:
"""计算路径的平滑度(转向角度变化的总和)"""
if len(route) < 3:
return 0
total_angle_change = 0
for i in range(1, len(route) - 1):
v1 = (route[i][0] - route[i-1][0], route[i][1] - route[i-1][1])
v2 = (route[i+1][0] - route[i][0], route[i+1][1] - route[i][1])
# 归一化向量
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
if len1 > 0 and len2 > 0:
v1_norm = (v1[0]/len1, v1[1]/len1)
v2_norm = (v2[0]/len2, v2[1]/len2)
# 计算点积
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
dot_product = max(-1, min(1, dot_product)) # 限制范围
# 计算角度变化
angle_change = math.acos(dot_product)
total_angle_change += angle_change
return total_angle_change
def _selection(self, population: List[List[Tuple[float, float]]], fitness_scores: np.ndarray) -> List[List[Tuple[float, float]]]:
"""选择操作,使用轮盘赌(roulette wheel)方法"""
# 轮盘赌选择
selection_probs = fitness_scores / np.sum(fitness_scores)
selected_indices = np.random.choice(
len(population),
size=self.population_size,
p=selection_probs,
replace=True
)
return [population[i] for i in selected_indices]
def _crossover(self, route1: List[Tuple[float, float]], route2: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
"""交叉操作,综合两个父代路径"""
if len(route1) <= 2 or len(route2) <= 2:
return route1.copy() if len(route1) > len(route2) else route2.copy()
# 保证起点和终点
start = route1[0]
end = route1[-1]
# 从两条路径的中间点随机选择
mid_points1 = route1[1:-1]
mid_points2 = route2[1:-1]
# 选择交叉点
crossover_point1 = random.randint(0, len(mid_points1))
crossover_point2 = random.randint(0, len(mid_points2))
# 创建子代路径(保持起点和终点)
child = [start]
# 添加前半段和后半段
child.extend(mid_points1[:crossover_point1])
child.extend(mid_points2[crossover_point2:])
# 添加终点
child.append(end)
return child
def _mutation(self, route: List[Tuple[float, float]], map_width: int, map_height: int) -> List[Tuple[float, float]]:
"""变异操作,随机修改路径中的点"""
# 复制路径以避免修改原始路径
mutated_route = route.copy()
# 仅对中间点进行变异,保留起点和终点
for i in range(1, len(mutated_route) - 1):
# 对每个点以变异率进行变异
if random.random() < self.mutation_rate:
# 在原点附近随机偏移
delta_x = random.uniform(-50, 50)
delta_y = random.uniform(-50, 50)
new_x = max(0, min(map_width, mutated_route[i][0] + delta_x))
new_y = max(0, min(map_height, mutated_route[i][1] + delta_y))
mutated_route[i] = (new_x, new_y)
# 有一定概率添加或删除中间点
if random.random() < self.mutation_rate and len(mutated_route) > 3:
# 随机删除一个中间点
idx_to_remove = random.randint(1, len(mutated_route) - 2)
mutated_route.pop(idx_to_remove)
if random.random() < self.mutation_rate:
# 随机添加一个中间点
idx_to_add = random.randint(1, len(mutated_route) - 1)
prev_point = mutated_route[idx_to_add - 1]
next_point = mutated_route[idx_to_add]
# 在两点之间随机生成一个新点
new_x = (prev_point[0] + next_point[0]) / 2 + random.uniform(-20, 20)
new_y = (prev_point[1] + next_point[1]) / 2 + random.uniform(-20, 20)
new_x = max(0, min(map_width, new_x))
new_y = max(0, min(map_height, new_y))
mutated_route.insert(idx_to_add, (new_x, new_y))
return mutated_route
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""计算两点间距离"""
return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
def _smooth_path(self, route: List[Tuple[float, float]], start: Tuple[float, float], goal: Tuple[float, float]) -> List[Tuple[float, float]]:
"""平滑路径,去除冗余点"""
# 确保路径起点和终点正确
if route[0] != start:
route[0] = start
if route[-1] != goal:
route[-1] = goal
# 如果路径点太少,直接返回
if len(route) <= 2:
return route
# 移除共线的冗余点
smoothed_route = [route[0]] # 保留起点
for i in range(1, len(route) - 1):
prev = smoothed_route[-1]
curr = route[i]
next_point = route[i + 1]
# 检查三点是否共线(或接近共线)
v1 = (curr[0] - prev[0], curr[1] - prev[1])
v2 = (next_point[0] - curr[0], next_point[1] - curr[1])
len1 = math.sqrt(v1[0]*v1[0] + v1[1]*v1[1])
len2 = math.sqrt(v2[0]*v2[0] + v2[1]*v2[1])
# 避免除零错误
if len1 > 0.001 and len2 > 0.001:
# 归一化向量
v1_norm = (v1[0]/len1, v1[1]/len1)
v2_norm = (v2[0]/len2, v2[1]/len2)
# 计算点积
dot_product = v1_norm[0]*v2_norm[0] + v1_norm[1]*v2_norm[1]
# 如果三点不共线,保留中间点
if abs(dot_product) < 0.98: # 角度差异大于约11度
smoothed_route.append(curr)
else:
# 如果点太近,可以考虑跳过
if len1 > self.grid_resolution or len2 > self.grid_resolution:
smoothed_route.append(curr)
smoothed_route.append(route[-1]) # 保留终点
return smoothed_route
def smooth_path(self, path: List[Tuple[float, float]],
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None,
weight_data: float = 0.5,
weight_smooth: float = 0.3,
tolerance: float = 0.000001) -> List[Tuple[float, float]]:
"""
使用平滑算法优化路径 (与A*接口兼容)
Args:
path: 原始路径
threat_areas: 危险区域
obstacles: 障碍物
weight_data: 数据权重
weight_smooth: 平滑权重
tolerance: 收敛阈值
Returns:
平滑后的路径
"""
# 如果路径点太少,不需要平滑
if len(path) <= 2:
return path
# 将路径转换为numpy数组方便操作
path_array = np.array(path)
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(2): # x, y
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
# 检查平滑后的路径是否经过危险区域或障碍物
for i in range(len(smooth_path)):
x, y = smooth_path[i]
if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)):
return path # 如果平滑后路径穿过危险区域,返回原路径
return [(x, y) for x, y in smooth_path]

@ -1,57 +1,206 @@
# -*- coding: utf-8 -*-
# File: path_planner.py
# Purpose: 路径规划的核心逻辑,可能包含调用不同路径规划算法的接口。
# Purpose: 路径规划的核心逻辑,包含调用不同路径规划算法的接口。
from typing import List, Tuple, Optional, Dict
from .hybrid_astar import HybridAStar
from typing import List, Tuple, Optional, Dict, Union
from .astar import AStar
from .genetic_algorithm import GeneticAlgorithm
from .rrt_algorithm import RRTAlgorithm
import numpy as np
import json
import time
class PathPlanner:
def __init__(self):
self.planner = HybridAStar()
self.current_paths: Dict[str, List[Tuple[float, float, float]]] = {}
def __init__(self, algorithm="astar", grid_resolution=5.0):
"""
初始化路径规划器
Args:
algorithm: 使用的算法可选 "astar""genetic" "rrt"
grid_resolution: 网格分辨率
"""
self.algorithm = algorithm
self.grid_resolution = grid_resolution
# 根据指定算法创建规划器
self.planner = self._create_planner(algorithm, grid_resolution)
# 初始化路径、障碍物和危险区域
self.current_paths: Dict[str, List[Tuple[float, float]]] = {}
self.obstacles: List[Tuple[float, float]] = []
self.threat_areas: List[Dict] = []
# 算法特定参数
self.ga_population_size = 100
self.ga_generations = 100
self.rrt_step_size = 20
self.rrt_max_iterations = 1000
def _create_planner(self, algorithm, grid_resolution):
"""创建对应算法的规划器实例"""
if algorithm == "genetic":
return GeneticAlgorithm(grid_resolution=grid_resolution)
elif algorithm == "rrt":
return RRTAlgorithm(grid_resolution=grid_resolution)
else: # 默认使用 A*
return AStar(grid_resolution=grid_resolution)
def set_algorithm(self, algorithm: str):
"""设置使用的算法"""
if algorithm != self.algorithm:
self.algorithm = algorithm
self.planner = self._create_planner(algorithm, self.grid_resolution)
# 同时设置算法特定参数
if algorithm == "genetic":
self.planner.population_size = self.ga_population_size
self.planner.generations = self.ga_generations
elif algorithm == "rrt":
self.planner.step_size = self.rrt_step_size
self.planner.max_iterations = self.rrt_max_iterations
def plan_path(self,
drone_id: str,
start: Tuple[float, float, float],
goal: Tuple[float, float, float],
start: Union[Tuple[float, float, float], Tuple[float, float]],
goal: Union[Tuple[float, float, float], Tuple[float, float]],
map_width: int = 1000,
map_height: int = 1000,
vehicle_length: float = 4.0,
vehicle_width: float = 2.0) -> Optional[List[Tuple[float, float, float]]]:
vehicle_width: float = 2.0) -> Optional[List[Union[Tuple[float, float, float], Tuple[float, float]]]]:
"""
为指定无人机规划路径
参数:
drone_id: 无人机ID
start: (x, y, theta) 起点位置和航向
goal: (x, y, theta) 终点位置和航向
start: 起点位置(x, y) (x, y, theta)
goal: 终点位置(x, y) (x, y, theta)
map_width: 地图宽度
map_height: 地图高度
vehicle_length: 车辆长度
vehicle_width: 车辆宽度
返回:
路径点列表 [(x, y, theta), ...] None如果找不到路径
路径点列表 [(x, y), ...]如果找不到路径返回None
"""
# 确保坐标不包含航向信息
start_xy = start[:2] if len(start) >= 2 else start
goal_xy = goal[:2] if len(goal) >= 2 else goal
# 设置算法特定参数
if self.algorithm == "genetic":
self.planner.population_size = self.ga_population_size
self.planner.generations = self.ga_generations
elif self.algorithm == "rrt":
self.planner.step_size = self.rrt_step_size
self.planner.max_iterations = self.rrt_max_iterations
# 规划路径
path = self.planner.plan(
start=start,
goal=goal,
obstacles=self.obstacles,
vehicle_length=vehicle_length,
vehicle_width=vehicle_width
start=start_xy,
goal=goal_xy,
map_width=map_width,
map_height=map_height,
threat_areas=self.threat_areas,
obstacles=self.obstacles
)
if path:
self.current_paths[drone_id] = path
return path
# 平滑路径
smoothed_path = self.planner.smooth_path(
path=path,
threat_areas=self.threat_areas,
obstacles=self.obstacles
)
self.current_paths[drone_id] = smoothed_path
return smoothed_path
return None
def add_obstacle_point(self, x: float, y: float, radius: float = 30.0):
"""
添加一个点状障碍物
Args:
x: 点的x坐标
y: 点的y坐标
radius: 点的影响半径
"""
# 将点状障碍物转换为圆形危险区域
area = {
'type': 'circle',
'center': (x, y),
'radius': radius
}
self.threat_areas.append(area)
# 同时添加到障碍物列表
self.obstacles.append((x, y))
def add_obstacle_circle(self, x: float, y: float, radius: float):
"""
添加一个圆形障碍物
Args:
x: 圆心的x坐标
y: 圆心的y坐标
radius: 圆的半径
"""
area = {
'type': 'circle',
'center': (x, y),
'radius': radius
}
self.threat_areas.append(area)
def add_obstacle_rectangle(self, x1: float, y1: float, x2: float, y2: float):
"""
添加一个矩形障碍物
Args:
x1: 左上角x坐标
y1: 左上角y坐标
x2: 右下角x坐标
y2: 右下角y坐标
"""
# 确保(x1,y1)是左上角,(x2,y2)是右下角
top_left_x = min(x1, x2)
top_left_y = min(y1, y2)
width = abs(x2 - x1)
height = abs(y2 - y1)
area = {
'type': 'rectangle',
'rect': (top_left_x, top_left_y, width, height)
}
self.threat_areas.append(area)
def add_obstacle_polygon(self, points: List[Tuple[float, float]]):
"""
添加一个多边形障碍物
Args:
points: 多边形顶点列表 [(x1,y1), (x2,y2), ...]
"""
if len(points) >= 3: # 确保至少有3个点
area = {
'type': 'polygon',
'points': points
}
self.threat_areas.append(area)
def clear_obstacles(self):
"""清除所有障碍物和危险区域"""
self.obstacles.clear()
self.threat_areas.clear()
def update_obstacles(self, obstacles: List[Tuple[float, float]]):
"""更新障碍物列表"""
self.obstacles = obstacles
def get_current_path(self, drone_id: str) -> Optional[List[Tuple[float, float, float]]]:
def update_threat_areas(self, threat_areas: List[Dict]):
"""更新危险区域列表"""
self.threat_areas = threat_areas
def get_current_path(self, drone_id: str) -> Optional[List[Tuple[float, float]]]:
"""获取指定无人机的当前路径"""
return self.current_paths.get(drone_id)
@ -59,44 +208,6 @@ class PathPlanner:
"""清除指定无人机的路径"""
if drone_id in self.current_paths:
del self.current_paths[drone_id]
def smooth_path(self, path: List[Tuple[float, float, float]], weight_data: float = 0.5,
weight_smooth: float = 0.3, tolerance: float = 0.000001) -> List[Tuple[float, float, float]]:
"""
使用平滑算法优化路径
参数:
path: 原始路径
weight_data: 数据项权重
weight_smooth: 平滑项权重
tolerance: 收敛阈值
返回:
平滑后的路径
"""
if len(path) <= 2:
return path
# 将路径转换为numpy数组
path_array = np.array(path)
# 初始化平滑后的路径
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(3): # x, y, theta
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
return [(x, y, theta) for x, y, theta in smooth_path]
def save_path(self, drone_id: str, filename: str):
"""保存路径到文件"""

@ -0,0 +1,316 @@
# -*- coding: utf-8 -*-
# File: rrt_algorithm.py
# Purpose: 实现RRT(快速随机树)路径规划算法,支持避开危险区域
import numpy as np
import math
import random
from typing import List, Tuple, Dict, Optional
class Node:
"""RRT树节点"""
def __init__(self, x: float, y: float):
self.x = x
self.y = y
self.parent = None
self.cost = 0.0 # 从起点到当前节点的代价
class RRTAlgorithm:
"""RRT路径规划算法"""
def __init__(self,
grid_resolution: float = 5.0,
max_iterations: int = 2000,
step_size: float = 20.0,
goal_sample_rate: float = 0.1,
search_radius: float = 50.0):
"""
初始化RRT算法
Args:
grid_resolution: 网格分辨率
max_iterations: 最大迭代次数
step_size: 步长
goal_sample_rate: 采样目标点的概率
search_radius: 搜索半径
"""
self.grid_resolution = grid_resolution
self.max_iterations = max_iterations
self.step_size = step_size
self.goal_sample_rate = goal_sample_rate
self.search_radius = search_radius
def plan(self, start: Tuple[float, float],
goal: Tuple[float, float],
map_width: int, map_height: int,
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None) -> List[Tuple[float, float]]:
"""
使用RRT算法规划路径
Args:
start: 起点坐标 (x, y)
goal: 终点坐标 (x, y)
map_width: 地图宽度
map_height: 地图高度
threat_areas: 危险区域列表
obstacles: 障碍物列表
Returns:
路径点列表如果找不到路径返回空列表
"""
# 创建起点和终点节点
start_node = Node(start[0], start[1])
goal_node = Node(goal[0], goal[1])
# 初始化树
tree = [start_node]
# 迭代构建树
for i in range(self.max_iterations):
# 以一定概率直接取目标点,提高搜索效率
if random.random() < self.goal_sample_rate:
random_point = (goal_node.x, goal_node.y)
else:
# 随机采样点
random_point = (
random.uniform(0, map_width),
random.uniform(0, map_height)
)
# 找到树中离随机点最近的节点
nearest_node = self._find_nearest_node(tree, random_point)
# 朝随机点方向扩展固定步长
new_node = self._steer(nearest_node, random_point, self.step_size)
# 检查路径是否有效
if new_node and self._is_path_valid(nearest_node, new_node, threat_areas, obstacles):
# 将新节点添加到树中
new_node.parent = nearest_node
new_node.cost = nearest_node.cost + self._distance((nearest_node.x, nearest_node.y),
(new_node.x, new_node.y))
tree.append(new_node)
# 检查是否可以连接到目标点
dist_to_goal = self._distance((new_node.x, new_node.y), (goal_node.x, goal_node.y))
if dist_to_goal < self.step_size:
# 尝试直接连接到目标点
if self._is_path_valid(new_node, goal_node, threat_areas, obstacles):
goal_node.parent = new_node
goal_node.cost = new_node.cost + dist_to_goal
# 找到路径,提前结束
return self._extract_path(goal_node)
# RRT* 优化 (可选,提高路径质量)
# 尝试重新连接附近节点以获得更优路径
self._rewire(tree, new_node, threat_areas, obstacles)
# 如果达到最大迭代次数但仍未找到到达目标的路径
# 尝试找到离目标最近的节点,并返回到该节点的路径
closest_node = self._find_nearest_node(tree, (goal_node.x, goal_node.y))
path = self._extract_path(closest_node)
# 如果最近节点离目标足够近,则认为找到了路径
if self._distance((closest_node.x, closest_node.y), (goal_node.x, goal_node.y)) < self.step_size * 2:
return path
# 否则找不到有效路径,返回空列表
return []
def _find_nearest_node(self, tree: List[Node], point: Tuple[float, float]) -> Node:
"""找到树中离指定点最近的节点"""
min_dist = float('inf')
nearest_node = None
for node in tree:
dist = self._distance((node.x, node.y), point)
if dist < min_dist:
min_dist = dist
nearest_node = node
return nearest_node
def _steer(self, from_node: Node, to_point: Tuple[float, float], step_size: float) -> Optional[Node]:
"""从起始节点向目标点方向移动固定步长"""
dx = to_point[0] - from_node.x
dy = to_point[1] - from_node.y
dist = math.sqrt(dx*dx + dy*dy)
if dist < 0.0001: # 距离近乎为0
return None
# 按照指定的步长移动
ratio = min(step_size / dist, 1.0)
new_x = from_node.x + dx * ratio
new_y = from_node.y + dy * ratio
new_node = Node(new_x, new_y)
return new_node
def _is_path_valid(self, from_node: Node, to_node: Node,
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> bool:
"""检查两个节点之间的路径是否有效(不经过障碍物和危险区域)"""
# 采样点检测
dist = self._distance((from_node.x, from_node.y), (to_node.x, to_node.y))
num_samples = max(int(dist / self.grid_resolution), 5)
for i in range(num_samples + 1):
t = i / num_samples
x = from_node.x + t * (to_node.x - from_node.x)
y = from_node.y + t * (to_node.y - from_node.y)
# 检查是否在危险区域内
if self._is_in_threat_areas(x, y, threat_areas):
return False
# 检查是否与障碍物碰撞
if obstacles and self._is_in_obstacles(x, y, obstacles):
return False
return True
def _rewire(self, tree: List[Node], new_node: Node,
threat_areas: List[Dict], obstacles: List[Tuple[float, float]] = None) -> None:
"""重新连接树中节点以优化路径代价 (RRT* 算法的核心步骤)"""
# 找到搜索半径内的所有节点
near_nodes = []
for node in tree:
if node is not new_node: # 排除新节点本身
dist = self._distance((node.x, node.y), (new_node.x, new_node.y))
if dist < self.search_radius:
near_nodes.append(node)
# 对于搜索半径内的每个节点,检查是否通过新节点创建的路径更好
for node in near_nodes:
# 计算通过新节点到达当前节点的潜在代价
potential_cost = new_node.cost + self._distance((new_node.x, new_node.y), (node.x, node.y))
# 如果代价更低且路径有效,则重新连接
if potential_cost < node.cost and self._is_path_valid(new_node, node, threat_areas, obstacles):
node.parent = new_node
node.cost = potential_cost
def _extract_path(self, goal_node: Node) -> List[Tuple[float, float]]:
"""从目标节点回溯生成路径"""
path = []
current = goal_node
# 回溯到起点
while current:
path.append((current.x, current.y))
current = current.parent
# 反转路径顺序(从起点到终点)
return list(reversed(path))
def _distance(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
"""计算两点间欧几里得距离"""
return math.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
def _is_in_threat_areas(self, x: float, y: float, threat_areas: List[Dict]) -> bool:
"""检查点是否在危险区域中"""
for area in threat_areas:
area_type = area.get('type', '')
if area_type == 'circle':
center = area.get('center', (0, 0))
radius = area.get('radius', 0)
distance = math.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
if distance <= radius:
return True
elif area_type == 'rectangle':
rect = area.get('rect', (0, 0, 0, 0))
x1, y1, width, height = rect
if x1 <= x <= x1 + width and y1 <= y <= y1 + height:
return True
elif area_type == 'polygon':
points = area.get('points', [])
if self._point_in_polygon(x, y, points):
return True
return False
def _point_in_polygon(self, x: float, y: float, polygon: List[Tuple[float, float]]) -> bool:
"""检查点是否在多边形内(射线法)"""
if len(polygon) < 3:
return False
inside = False
j = len(polygon) - 1
for i in range(len(polygon)):
xi, yi = polygon[i]
xj, yj = polygon[j]
# 检查点是否在多边形边上
if (yi == y and xi == x) or (yj == y and xj == x):
return True
# 射线法判断点是否在多边形内部
intersect = ((yi > y) != (yj > y)) and (x < (xj - xi) * (y - yi) / (yj - yi) + xi)
if intersect:
inside = not inside
j = i
return inside
def _is_in_obstacles(self, x: float, y: float, obstacles: List[Tuple[float, float]]) -> bool:
"""检查点是否在障碍物中"""
for obstacle_x, obstacle_y in obstacles:
distance = math.sqrt((x - obstacle_x) ** 2 + (y - obstacle_y) ** 2)
if distance < self.grid_resolution:
return True
return False
def smooth_path(self, path: List[Tuple[float, float]],
threat_areas: List[Dict],
obstacles: List[Tuple[float, float]] = None,
weight_data: float = 0.5,
weight_smooth: float = 0.3,
tolerance: float = 0.000001) -> List[Tuple[float, float]]:
"""
使用平滑算法优化路径 (与A*接口兼容)
Args:
path: 原始路径
threat_areas: 危险区域
obstacles: 障碍物
weight_data: 数据权重
weight_smooth: 平滑权重
tolerance: 收敛阈值
Returns:
平滑后的路径
"""
# 如果路径点太少,不需要平滑
if len(path) <= 2:
return path
# 将路径转换为numpy数组方便操作
path_array = np.array(path)
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(2): # x, y
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
# 检查平滑后的路径是否经过危险区域或障碍物
for i in range(len(smooth_path)):
x, y = smooth_path[i]
if self._is_in_threat_areas(x, y, threat_areas) or (obstacles and self._is_in_obstacles(x, y, obstacles)):
return path # 如果平滑后路径穿过危险区域,返回原路径
return [(x, y) for x, y in smooth_path]

@ -7,6 +7,7 @@ from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QToolBar, QAction, QFileDialog)
from PyQt5.QtCore import Qt, pyqtSignal, QPointF, QRect, QRectF
from PyQt5.QtGui import QPixmap, QPainter, QPen, QColor, QTransform, QCursor, QMouseEvent
from database.database import load_danger_zones
class BaseMapView(QWidget):
# 定义信号
@ -80,6 +81,17 @@ class BaseMapView(QWidget):
self.map_data_model.set_map(pixmap)
self.reset_view() # Reset zoom/pan on new map load
self.map_loaded.emit()
# 清除旧的危险区域
self.map_data_model.threat_areas.clear()
# 从数据库加载危险区域
danger_zones = load_danger_zones()
for zone in danger_zones:
self.map_data_model.add_threat_area(zone)
self.update_map()
def reset_view(self):
print("BaseMapView: reset_view called") # Debug print
@ -167,6 +179,39 @@ class BaseMapView(QWidget):
for point in self.map_data_model.threat_points:
painter.drawPoint(QPointF(point[0], point[1])) # Draw at original coords (transformed)
# 绘制危险区域
if hasattr(self.map_data_model, 'threat_areas') and self.map_data_model.threat_areas:
for area in self.map_data_model.threat_areas:
area_type = area.get('type', '')
color = area.get('color', QColor(255, 0, 0, 128)) # 默认半透明红色
# 设置画笔和画刷
pen = QPen(color.darker(150), max(0.5, 2.0 / self.scale_factor))
painter.setPen(pen)
painter.setBrush(color)
if area_type == 'circle':
center = area.get('center', (0, 0))
radius = area.get('radius', 50)
painter.drawEllipse(QPointF(center[0], center[1]), radius, radius)
elif area_type == 'rectangle':
rect = area.get('rect', (0, 0, 100, 100))
painter.drawRect(QRectF(rect[0], rect[1], rect[2], rect[3]))
elif area_type == 'polygon':
points = area.get('points', [])
if len(points) >= 3:
# 创建QPointF列表
qpoints = [QPointF(p[0], p[1]) for p in points]
# 绘制多边形
painter.drawPolygon(*qpoints)
# Draw start point
if self.map_data_model.start_point:
point_size = max(1.0, 5.0 / self.scale_factor)

@ -5,7 +5,8 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QTableWidget, QTableWidgetItem,
QComboBox, QColorDialog, QMenu, QAction,
QDialog, QLineEdit, QFormLayout, QMessageBox)
QDialog, QLineEdit, QFormLayout, QMessageBox,
QListWidget, QListWidgetItem)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QColor, QBrush
@ -59,74 +60,114 @@ class AddDroneDialog(QDialog):
}
class DroneListView(QWidget):
# 定义信号
drone_selected = pyqtSignal(str) # 无人机被选中时发出信号
"""无人机列表视图"""
def __init__(self, comm_manager=None):
super().__init__()
self.comm_manager = comm_manager
self.drone_colors = {} # 存储无人机颜色
self.drone_groups = {} # 存储无人机分组
self.init_ui()
self.setup_communication()
def setup_communication(self):
"""设置通信回调"""
if self.comm_manager:
self.comm_manager.register_callback("drone_added", self.on_drone_added)
self.comm_manager.register_callback("drone_removed", self.on_drone_removed)
self.comm_manager.register_callback("drone_updated", self.on_drone_updated)
def on_drone_added(self, drone_id, data):
"""处理无人机添加事件"""
self.add_drone_to_table(drone_id, self.comm_manager.get_drone(drone_id))
drone_selected = pyqtSignal(str) # 无人机选择信号drone_id
def __init__(self, drone_manager, parent=None):
super().__init__(parent)
self.drone_manager = drone_manager
self.selected_drone_id = None
def on_drone_removed(self, drone_id, data):
"""处理无人机移除事件"""
self.remove_drone_from_table(drone_id)
# 连接信号
self.drone_manager.drone_added.connect(self._on_drone_added)
self.drone_manager.drone_removed.connect(self._on_drone_removed)
def on_drone_updated(self, drone_id, data):
"""处理无人机更新事件"""
self.update_drone_in_table(drone_id, self.comm_manager.get_drone(drone_id))
self._init_ui()
def add_drone_to_table(self, drone_id, info):
"""添加无人机到表格"""
row = self.drone_table.rowCount()
self.drone_table.insertRow(row)
# 默认添加一架无人机
self._add_default_drone()
# 设置各列数据
self.drone_table.setItem(row, 0, QTableWidgetItem(info.get("id", "")))
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
def _init_ui(self):
"""初始化UI"""
layout = QVBoxLayout(self)
# 应用颜色
if drone_id in self.drone_colors:
self.update_drone_color(drone_id)
# 标题
title_label = QLabel("无人机列表")
layout.addWidget(title_label)
# 无人机列表
self.drone_list = QListWidget()
self.drone_list.currentItemChanged.connect(self._on_drone_selection_changed)
layout.addWidget(self.drone_list)
# 按钮区域
btn_layout = QHBoxLayout()
# 添加无人机按钮
self.add_drone_btn = QPushButton("添加无人机")
self.add_drone_btn.clicked.connect(self._on_add_drone_clicked)
btn_layout.addWidget(self.add_drone_btn)
# 删除无人机按钮
self.remove_drone_btn = QPushButton("删除无人机")
self.remove_drone_btn.clicked.connect(self._on_remove_drone_clicked)
self.remove_drone_btn.setEnabled(False) # 初始禁用
btn_layout.addWidget(self.remove_drone_btn)
layout.addLayout(btn_layout)
def _add_default_drone(self):
"""添加默认无人机"""
self.drone_manager.add_drone(position=(50, 50), name="默认无人机")
def _on_add_drone_clicked(self):
"""添加无人机按钮点击事件"""
# 使用中心点位置添加新无人机
self.drone_manager.add_drone(position=(100, 100))
def _on_remove_drone_clicked(self):
"""删除无人机按钮点击事件"""
if self.selected_drone_id:
# 确认删除
reply = QMessageBox.question(
self,
"确认删除",
f"确定要删除所选无人机吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
def remove_drone_from_table(self, drone_id):
"""从表格中移除无人机"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.removeRow(row)
break
if reply == QMessageBox.Yes:
self.drone_manager.remove_drone(self.selected_drone_id)
self.selected_drone_id = None
self.remove_drone_btn.setEnabled(False)
def _on_drone_added(self, drone_id):
"""无人机添加事件处理"""
drone_info = self.drone_manager.get_drone_info(drone_id)
if drone_info:
# 创建列表项
item = QListWidgetItem(drone_info['name'])
item.setData(Qt.UserRole, drone_id) # 存储drone_id
self.drone_list.addItem(item)
# 如果是第一个无人机,选中它
if self.drone_list.count() == 1:
self.drone_list.setCurrentItem(item)
def update_drone_in_table(self, drone_id, info):
"""更新表格中的无人机信息"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
def _on_drone_removed(self, drone_id):
"""无人机移除事件处理"""
# 从列表中找到对应项并移除
for i in range(self.drone_list.count()):
item = self.drone_list.item(i)
if item.data(Qt.UserRole) == drone_id:
self.drone_list.takeItem(i)
break
def _on_drone_selection_changed(self, current, previous):
"""无人机选择变更事件处理"""
if current:
drone_id = current.data(Qt.UserRole)
self.selected_drone_id = drone_id
self.remove_drone_btn.setEnabled(True)
# 发出选择信号
self.drone_selected.emit(drone_id)
# 显示调试消息
print(f"已选择无人机: {drone_id}")
else:
self.selected_drone_id = None
self.remove_drone_btn.setEnabled(False)
def init_ui(self):
# 创建主布局

@ -0,0 +1,251 @@
from PyQt5.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QTableWidget, QTableWidgetItem, QStatusBar,
QGroupBox, QSplitter, QFrame, QListWidget, QListWidgetItem,
QFormLayout, QDoubleSpinBox, QMessageBox, QSlider)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QColor, QPixmap, QFont
from .map_data_model import MapDataModel
from .drone_manager import DroneManager
from .drone_list_view import DroneListView
from .integrated_map_view import IntegratedMapView
class DroneManagementWindow(QMainWindow):
"""无人机管理主窗口"""
# 定义信号
switch_to_map_view = pyqtSignal()
def __init__(self):
super().__init__()
# 创建地图数据模型
self.map_data_model = MapDataModel()
# 创建无人机管理器
self.drone_manager = DroneManager(self.map_data_model)
# 初始化UI
self.init_ui()
# 窗口设置
self.setWindowTitle("无人机后勤输送系统 - 无人机管理")
self.setMinimumSize(1200, 800)
# 添加默认无人机
self.add_default_drones()
def init_ui(self):
"""初始化UI"""
# 创建中心部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 创建主布局
main_layout = QVBoxLayout(central_widget)
# 创建标题
title_label = QLabel("无人机管理中心")
title_label.setAlignment(Qt.AlignCenter)
title_font = QFont()
title_font.setPointSize(16)
title_font.setBold(True)
title_label.setFont(title_font)
main_layout.addWidget(title_label)
# 创建分割器,左侧是无人机列表,右侧是无人机详情
splitter = QSplitter(Qt.Horizontal)
# 左侧无人机列表
left_panel = QFrame()
left_layout = QVBoxLayout(left_panel)
# 无人机列表视图
self.drone_list_view = DroneListView(self.drone_manager)
left_layout.addWidget(self.drone_list_view)
# 操作按钮区域
button_group = QGroupBox("操作")
button_layout = QVBoxLayout()
self.add_drone_btn = QPushButton("添加无人机")
self.add_drone_btn.clicked.connect(self.add_drone)
self.switch_to_map_btn = QPushButton("打开地图视图")
self.switch_to_map_btn.clicked.connect(self.open_map_view)
button_layout.addWidget(self.add_drone_btn)
button_layout.addWidget(self.switch_to_map_btn)
button_group.setLayout(button_layout)
left_layout.addWidget(button_group)
# 右侧无人机详情
right_panel = QFrame()
right_layout = QVBoxLayout(right_panel)
# 无人机详情区域
details_group = QGroupBox("无人机详情")
details_layout = QFormLayout()
# 无人机位置信息
self.drone_id_label = QLabel("未选择")
self.drone_type_label = QLabel("未选择")
self.x_pos_spin = QDoubleSpinBox()
self.x_pos_spin.setRange(0, 10000)
self.x_pos_spin.setDecimals(2)
self.y_pos_spin = QDoubleSpinBox()
self.y_pos_spin.setRange(0, 10000)
self.y_pos_spin.setDecimals(2)
self.update_position_btn = QPushButton("更新位置")
self.update_position_btn.clicked.connect(self.update_drone_position)
details_layout.addRow("无人机ID:", self.drone_id_label)
details_layout.addRow("无人机类型:", self.drone_type_label)
details_layout.addRow("X坐标:", self.x_pos_spin)
details_layout.addRow("Y坐标:", self.y_pos_spin)
details_layout.addRow(self.update_position_btn)
# 添加速度控制
speed_layout = QVBoxLayout()
speed_layout.setContentsMargins(0, 10, 0, 0)
speed_title = QLabel("移动速度调整")
speed_title.setStyleSheet("font-weight: bold")
speed_layout.addWidget(speed_title)
speed_info_layout = QHBoxLayout()
self.speed_label = QLabel("50 ms/帧")
speed_info_layout.addWidget(QLabel(""))
speed_info_layout.addStretch()
speed_info_layout.addWidget(self.speed_label)
speed_info_layout.addStretch()
speed_info_layout.addWidget(QLabel(""))
speed_layout.addLayout(speed_info_layout)
self.speed_slider = QSlider(Qt.Horizontal)
self.speed_slider.setRange(10, 200) # 10ms到200ms
self.speed_slider.setValue(50) # 默认50ms
self.speed_slider.setTickPosition(QSlider.TicksBelow)
self.speed_slider.setTickInterval(20)
self.speed_slider.valueChanged.connect(self.on_speed_changed)
speed_layout.addWidget(self.speed_slider)
details_layout.addRow(speed_layout)
details_group.setLayout(details_layout)
right_layout.addWidget(details_group)
# 无人机状态信息
status_group = QGroupBox("状态信息")
status_layout = QVBoxLayout()
self.status_table = QTableWidget()
self.status_table.setColumnCount(2)
self.status_table.setHorizontalHeaderLabels(["参数", ""])
self.status_table.setRowCount(4)
self.status_table.setItem(0, 0, QTableWidgetItem("当前状态"))
self.status_table.setItem(0, 1, QTableWidgetItem("待命"))
self.status_table.setItem(1, 0, QTableWidgetItem("电池电量"))
self.status_table.setItem(1, 1, QTableWidgetItem("100%"))
self.status_table.setItem(2, 0, QTableWidgetItem("负载情况"))
self.status_table.setItem(2, 1, QTableWidgetItem(""))
self.status_table.setItem(3, 0, QTableWidgetItem("当前任务"))
self.status_table.setItem(3, 1, QTableWidgetItem(""))
status_layout.addWidget(self.status_table)
status_group.setLayout(status_layout)
right_layout.addWidget(status_group)
# 添加到分割器
splitter.addWidget(left_panel)
splitter.addWidget(right_panel)
# 设置分割器的初始大小比例
splitter.setSizes([400, 800])
main_layout.addWidget(splitter)
# 状态栏
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
self.status_bar.showMessage("就绪")
# 连接无人机选择信号
self.drone_list_view.drone_selected.connect(self.on_drone_selected)
def add_default_drones(self):
"""添加默认无人机"""
self.drone_manager.add_drone(position=(100, 100), name="运输无人机1", drone_type="运输型")
self.drone_manager.add_drone(position=(200, 200), name="侦察无人机1", drone_type="侦察型")
self.drone_manager.add_drone(position=(300, 300), name="战斗无人机1", drone_type="战斗型")
def add_drone(self):
"""添加新无人机"""
# 简单实现,后续可以添加对话框让用户输入无人机信息
pos_x = 100 + (self.drone_manager.drone_count % 5) * 50
pos_y = 100 + (self.drone_manager.drone_count % 5) * 50
drone_id = self.drone_manager.add_drone(position=(pos_x, pos_y))
self.status_bar.showMessage(f"已添加新无人机: {drone_id}")
def on_drone_selected(self, drone_id):
"""处理无人机选择事件"""
drone_info = self.drone_manager.get_drone_info(drone_id)
position = self.drone_manager.get_drone_position(drone_id)
if drone_info and position:
self.drone_id_label.setText(drone_id)
self.drone_type_label.setText(drone_info.get('type', '未知'))
self.x_pos_spin.setValue(position[0])
self.y_pos_spin.setValue(position[1])
# 更新速度滑块
speed = self.drone_manager.get_drone_speed(drone_id)
self.speed_slider.setValue(speed)
self.speed_label.setText(f"{speed} ms/帧")
# 更新状态表格
self.status_table.setItem(0, 1, QTableWidgetItem("待命"))
self.status_table.setItem(1, 1, QTableWidgetItem("100%"))
self.status_bar.showMessage(f"已选择无人机: {drone_id}")
def on_speed_changed(self, value):
"""处理速度滑块变更事件"""
drone_id = self.drone_id_label.text()
if drone_id != "未选择":
# 更新标签显示
self.speed_label.setText(f"{value} ms/帧")
# 更新无人机速度
self.drone_manager.set_drone_speed(drone_id, value)
self.status_bar.showMessage(f"无人机 {drone_id} 速度已调整为 {value} ms/帧")
def update_drone_position(self):
"""更新无人机位置"""
drone_id = self.drone_id_label.text()
if drone_id == "未选择":
QMessageBox.warning(self, "警告", "请先选择一架无人机")
return
x = self.x_pos_spin.value()
y = self.y_pos_spin.value()
# 获取当前位置中的航向角
current_position = self.drone_manager.get_drone_position(drone_id)
heading = current_position[2] if current_position else 0
# 更新位置
self.drone_manager.map_data_model.update_drone_position(drone_id, (x, y, heading))
self.status_bar.showMessage(f"无人机 {drone_id} 位置已更新")
def open_map_view(self):
"""打开地图视图"""
# 获取当前无人机列表,用于调试
drone_ids = self.drone_manager.get_all_drones()
print(f"打开地图视图,当前有 {len(drone_ids)} 架无人机: {drone_ids}")
# 发送切换信号
self.switch_to_map_view.emit()

@ -0,0 +1,278 @@
from PyQt5.QtCore import QObject, QTimer, pyqtSignal
import math
class DronePathAnimation(QObject):
"""无人机路径动画控制器"""
animation_step = pyqtSignal(str, tuple) # 每步动画发出信号drone_id, position(x, y, heading)
animation_finished = pyqtSignal(str) # 动画结束信号drone_id
def __init__(self, map_data_model):
super().__init__()
self.map_data_model = map_data_model
self.drone_paths = {} # {drone_id: [(x1,y1), (x2,y2), ...]}
self.animation_timers = {} # {drone_id: QTimer}
self.current_path_index = {} # {drone_id: current_index}
self.animation_speed = 50 # ms控制动画速度
def set_drone_path(self, drone_id, path):
"""设置无人机路径
Args:
drone_id: 无人机ID
path: 路径点列表 [(x1,y1), (x2,y2), ...]
"""
if len(path) >= 2: # 至少需要起点和终点
self.drone_paths[drone_id] = path
self.current_path_index[drone_id] = 0
def start_animation(self, drone_id):
"""开始无人机沿路径移动的动画
Args:
drone_id: 无人机ID
Returns:
成功开始返回True否则返回False
"""
if drone_id not in self.drone_paths or not self.drone_paths[drone_id]:
return False
# 如果已有动画,先停止
if drone_id in self.animation_timers and self.animation_timers[drone_id].isActive():
self.animation_timers[drone_id].stop()
# 设置无人机初始位置为路径起点
start_point = self.drone_paths[drone_id][0]
self.map_data_model.add_drone(drone_id, (start_point[0], start_point[1], 0))
# 创建定时器
timer = QTimer(self)
timer.timeout.connect(lambda: self._animation_step(drone_id))
self.animation_timers[drone_id] = timer
# 重置路径索引并开始
self.current_path_index[drone_id] = 0
timer.start(self.animation_speed)
return True
def stop_animation(self, drone_id):
"""停止无人机动画
Args:
drone_id: 无人机ID
"""
if drone_id in self.animation_timers and self.animation_timers[drone_id].isActive():
self.animation_timers[drone_id].stop()
def _animation_step(self, drone_id):
"""单步更新无人机位置
Args:
drone_id: 无人机ID
"""
if drone_id not in self.drone_paths:
return
path = self.drone_paths[drone_id]
current_index = self.current_path_index[drone_id]
# 如果到达终点,停止动画
if current_index >= len(path) - 1:
self.animation_timers[drone_id].stop()
self.animation_finished.emit(drone_id)
return
# 计算当前位置和下一个位置
current_point = path[current_index]
next_point = path[current_index + 1]
# 计算航向角度(弧度)
dx = next_point[0] - current_point[0]
dy = next_point[1] - current_point[1]
heading = math.atan2(dy, dx)
# 更新无人机位置
self.map_data_model.update_drone_position(drone_id, (next_point[0], next_point[1], heading))
# 发出位置更新信号
self.animation_step.emit(drone_id, (next_point[0], next_point[1], heading))
# 更新路径索引
self.current_path_index[drone_id] = current_index + 1
def set_animation_speed(self, speed_ms):
"""设置动画速度
Args:
speed_ms: 动画帧间隔单位毫秒值越大动画越慢
"""
if speed_ms >= 10 and speed_ms <= 1000: # 限制合理范围
self.animation_speed = speed_ms
# 更新所有正在运行的动画定时器
for drone_id, timer in self.animation_timers.items():
if timer.isActive():
timer.setInterval(speed_ms)
def get_animation_speed(self):
"""获取当前动画速度"""
return self.animation_speed
class DroneManager(QObject):
"""无人机管理器"""
drone_added = pyqtSignal(str) # 无人机添加信号drone_id
drone_removed = pyqtSignal(str) # 无人机移除信号drone_id
drone_updated = pyqtSignal(str, tuple) # 无人机更新信号drone_id, position
def __init__(self, map_data_model):
super().__init__()
self.map_data_model = map_data_model
self.drones = {} # {drone_id: {'name': 名称, 'type': 类型, 'speed': 移动速度}}
self.drone_count = 0
self.path_animation = DronePathAnimation(map_data_model)
def add_drone(self, position=(0, 0), name=None, drone_type="标准无人机", speed=50):
"""添加新无人机
Args:
position: 位置元组 (x, y)
name: 无人机名称如不提供则自动生成
drone_type: 无人机类型
speed: 无人机移动速度单位毫秒/
Returns:
drone_id: 新添加的无人机ID
"""
self.drone_count += 1
drone_id = f"drone_{self.drone_count}"
if name is None:
name = f"无人机-{self.drone_count}"
self.drones[drone_id] = {
'name': name,
'type': drone_type,
'speed': speed
}
# 添加到地图数据模型
self.map_data_model.add_drone(drone_id, position)
# 发出无人机添加信号
self.drone_added.emit(drone_id)
return drone_id
def remove_drone(self, drone_id):
"""移除无人机
Args:
drone_id: 无人机ID
"""
if drone_id in self.drones:
# 停止可能的动画
self.path_animation.stop_animation(drone_id)
# 从数据结构中移除
del self.drones[drone_id]
self.map_data_model.remove_drone(drone_id)
# 发出无人机移除信号
self.drone_removed.emit(drone_id)
def set_drone_path(self, drone_id, path):
"""设置无人机路径
Args:
drone_id: 无人机ID
path: 路径点列表 [(x1,y1), (x2,y2), ...]
"""
if drone_id in self.drones:
self.path_animation.set_drone_path(drone_id, path)
def start_drone_movement(self, drone_id):
"""开始无人机移动
Args:
drone_id: 无人机ID
Returns:
成功开始返回True否则返回False
"""
if drone_id not in self.drones:
return False
# 获取无人机速度设置
speed = self.get_drone_speed(drone_id)
# 设置动画控制器速度
self.path_animation.set_animation_speed(speed)
# 开始动画
return self.path_animation.start_animation(drone_id)
def stop_drone_movement(self, drone_id):
"""停止无人机移动
Args:
drone_id: 无人机ID
"""
self.path_animation.stop_animation(drone_id)
def get_drone_info(self, drone_id):
"""获取无人机信息
Args:
drone_id: 无人机ID
Returns:
无人机信息字典或None如果无人机不存在
"""
return self.drones.get(drone_id)
def get_drone_position(self, drone_id):
"""获取无人机位置
Args:
drone_id: 无人机ID
Returns:
位置元组 (x, y, heading) None如果无人机不存在
"""
return self.map_data_model.get_drone_position(drone_id)
def get_all_drones(self):
"""获取所有无人机ID
Returns:
无人机ID列表
"""
return list(self.drones.keys())
def set_drone_speed(self, drone_id, speed):
"""设置无人机移动速度
Args:
drone_id: 无人机ID
speed: 速度值单位毫秒/
"""
if drone_id in self.drones:
self.drones[drone_id]['speed'] = speed
# 更新动画控制器的速度
self.path_animation.set_animation_speed(speed)
def get_drone_speed(self, drone_id):
"""获取无人机速度
Args:
drone_id: 无人机ID
Returns:
速度值或默认值50
"""
if drone_id in self.drones and 'speed' in self.drones[drone_id]:
return self.drones[drone_id]['speed']
return 50

File diff suppressed because it is too large Load Diff

@ -6,7 +6,7 @@ from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QFont
from database.database import register_user, query_user
class LoginView(QWidget):
# 定义登录成功信号
login_successful = pyqtSignal()
@ -67,13 +67,33 @@ class LoginView(QWidget):
if not username or not password:
QMessageBox.warning(self, "错误", "请输入用户名和密码")
return
# 添加默认测试账号
# 使用数据库查询验证用户
if query_user(username, password):
self.login_successful.emit()
else:
QMessageBox.warning(self, "错误", "用户名或密码错误")
"""# 添加默认测试账号
if username == "admin" and password == "admin123":
self.login_successful.emit()
else:
QMessageBox.warning(self, "错误", "用户名或密码错误")
QMessageBox.warning(self, "错误", "用户名或密码错误")"""
def handle_register(self):
# TODO: 实现注册功能
QMessageBox.information(self, "提示", "注册功能待实现")
"""# TODO: 实现注册功能
QMessageBox.information(self, "提示", "注册功能待实现") """
username = self.username_input.text()
password = self.password_input.text()
if not username or not password:
QMessageBox.warning(self, "错误", "请输入用户名和密码")
return
# 检查用户是否已存在
if query_user(username, password):
QMessageBox.warning(self, "错误", "用户已存在")
return
# 注册新用户
if register_user(username, password):
QMessageBox.information(self, "提示", "注册成功")
else:
QMessageBox.warning(self, "错误", "注册失败,请重试")

@ -1,15 +1,18 @@
from PyQt5.QtCore import QObject, pyqtSignal
from database.database import save_danger_zone
class MapDataModel(QObject):
data_changed = pyqtSignal()
def __init__(self):
super().__init__()
self.map_pixmap = None
self.threat_points = []
self.threat_points = [] # 保留向后兼容
self.threat_areas = [] # 新增:危险区域列表
self.start_point = None
self.goal_point = None
self.paths = []
self.drones = {} # 存储无人机位置信息 {drone_id: (x, y, heading)}
self.selected_drone = None # 当前选中的无人机ID
def set_map(self, pixmap):
self.map_pixmap = pixmap
@ -18,6 +21,17 @@ class MapDataModel(QObject):
def add_threat_point(self, point):
self.threat_points.append(point)
self.data_changed.emit()
def add_threat_area(self, area):
"""添加危险区域
area: 字典包含类型(type)和相应的参数
{'type': 'circle', 'center': (x, y), 'radius': r, 'color': color}
{'type': 'rectangle', 'rect': (x, y, width, height), 'color': color}
{'type': 'polygon', 'points': [(x1,y1), (x2,y2), ...], 'color': color}
"""
self.threat_areas.append(area)
self.data_changed.emit()
def set_start_point(self, point):
self.start_point = point
@ -29,6 +43,7 @@ class MapDataModel(QObject):
def clear_all(self):
self.threat_points.clear()
self.threat_areas.clear()
self.start_point = None
self.goal_point = None
self.paths = []
@ -36,4 +51,68 @@ class MapDataModel(QObject):
def set_paths(self, paths):
self.paths = paths
self.data_changed.emit()
self.data_changed.emit()
def add_drone(self, drone_id, position):
"""添加或更新无人机位置
Args:
drone_id: 无人机ID
position: 位置元组 (x, y, heading)heading可选
"""
if len(position) == 2:
# 如果只提供了x,y坐标添加默认航向角0
self.drones[drone_id] = (position[0], position[1], 0)
else:
self.drones[drone_id] = position
self.data_changed.emit()
def update_drone_position(self, drone_id, position):
"""更新无人机位置
Args:
drone_id: 无人机ID
position: 新位置元组 (x, y, heading)
"""
if drone_id in self.drones:
self.drones[drone_id] = position
self.data_changed.emit()
def get_drone_position(self, drone_id):
"""获取无人机位置
Args:
drone_id: 无人机ID
Returns:
位置元组 (x, y, heading) None如果无人机不存在
"""
return self.drones.get(drone_id)
def remove_drone(self, drone_id):
"""移除无人机
Args:
drone_id: 无人机ID
"""
if drone_id in self.drones:
del self.drones[drone_id]
self.data_changed.emit()
def set_selected_drone(self, drone_id):
"""设置当前选中的无人机
Args:
drone_id: 无人机ID或None表示取消选择
"""
if drone_id is None or drone_id in self.drones:
self.selected_drone = drone_id
self.data_changed.emit()
def get_selected_drone(self):
"""获取当前选中的无人机ID
Returns:
当前选中的无人机ID或None
"""
return self.selected_drone
Loading…
Cancel
Save