后端实现 #14

Merged
pg9nrcf7t merged 1 commits from main into a_branch 8 months ago

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (pythonProject2)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/project.iml" filepath="$PROJECT_DIR$/.idea/project.iml" />
</modules>
</component>
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

@ -1 +1,5 @@
# -*- coding: utf-8 -*-
# File: __init__.py (in command_center)
# Purpose: 将 command_center 目录标记为 Python 包。
# 指挥控制中心包

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: __init__.py (in communication)
# Purpose: 将 communication 目录标记为 Python 包,并可能导出其中的类。
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
from .message_queue import MessageQueue

@ -1,6 +1,12 @@
import threading
# -*- coding: utf-8 -*-
# File: communication_manager.py
# Purpose: 核心通信管理器,处理与后端、数据库和无人机的通信。
# 管理连接、消息分发和回调注册。
import json
import time
from typing import Dict, Any, Optional, List
from threading import Thread, Lock
from typing import Dict, List, Callable, Optional, Any
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
from .message_queue import MessageQueue
@ -16,8 +22,11 @@ class CommunicationManager:
database=db_config['database']
)
self.drone_manager = DroneConnectionManager(self.database)
self.drones: Dict[str, dict] = {} # 存储无人机信息
self.callbacks: Dict[str, List[Callable]] = {} # 存储回调函数
self.running = False
self.thread = None
self.lock = Lock()
self.update_thread = None
self.message_queue = MessageQueue()
def start(self) -> bool:
@ -45,74 +54,92 @@ class CommunicationManager:
# 启动消息处理线程
self.running = True
self.thread = threading.Thread(target=self._process_messages)
self.thread.daemon = True
self.thread.start()
self.update_thread = Thread(target=self._update_loop)
self.update_thread.daemon = True
self.update_thread.start()
return True
def stop(self):
"""停止通信管理器"""
self.running = False
if self.thread:
self.thread.join()
if self.update_thread:
self.update_thread.join()
self.mavlink.stop()
self.drone_manager.stop()
self.database.disconnect()
def add_drone(self, drone_id: str, ip_address: str, port: int,
connection_type: str = 'udp') -> bool:
"""添加新的无人机连接"""
return self.database.add_drone_connection(drone_id, ip_address, port, connection_type)
def connect_drone(self, drone_id: str) -> bool:
"""连接指定的无人机"""
return self.drone_manager.connect_drone(drone_id)
def disconnect_drone(self, drone_id: str):
"""断开指定无人机的连接"""
self.drone_manager.disconnect_drone(drone_id)
def get_connected_drones(self) -> List[str]:
"""获取所有已连接的无人机ID"""
return self.drone_manager.get_connected_drones()
def register_callback(self, event: str, callback: Callable):
"""注册回调函数"""
if event not in self.callbacks:
self.callbacks[event] = []
self.callbacks[event].append(callback)
def is_drone_connected(self, drone_id: str) -> bool:
"""检查指定无人机是否已连接"""
return self.drone_manager.is_drone_connected(drone_id)
def send_command(self, drone_id: str, command_type: str, **kwargs) -> bool:
"""向指定无人机发送命令"""
return self.drone_manager.send_command(drone_id, command_type, **kwargs)
def _process_messages(self):
"""处理消息队列中的消息"""
def _update_loop(self):
"""更新循环"""
while self.running:
try:
msg = self.mavlink.get_message_queue().get(timeout=1.0)
if msg:
self._handle_message(msg)
self._update_drone_status()
time.sleep(0.1) # 100ms更新间隔
except Exception as e:
print(f"消息处理错误: {str(e)}")
time.sleep(1)
def _handle_message(self, msg: Dict[str, Any]):
"""处理接收到的消息"""
try:
msg_type = msg['message'].get_type()
msg_data = msg['message'].to_dict()
# 保存消息到数据库
self.database.save_mavlink_message(msg_type, msg_data)
# 根据消息类型更新无人机状态
if msg_type == 'GLOBAL_POSITION_INT':
self._update_drone_position(msg_data)
elif msg_type == 'SYS_STATUS':
self._update_drone_status(msg_data)
except Exception as e:
print(f"消息处理失败: {str(e)}")
print(f"更新循环错误: {str(e)}")
def _update_drone_status(self):
"""更新无人机状态"""
# TODO: 实现实际的通信逻辑
# 这里模拟一些数据更新
with self.lock:
for drone_id in self.drones:
self.drones[drone_id].update({
"battery": max(0, self.drones[drone_id].get("battery", 100) - 0.1),
"signal_strength": max(0, self.drones[drone_id].get("signal_strength", 100) - 0.2),
"latency": self.drones[drone_id].get("latency", 0) + 1
})
def add_drone(self, drone_id: str, info: dict):
"""添加无人机"""
with self.lock:
self.drones[drone_id] = info
self._notify("drone_added", drone_id)
def remove_drone(self, drone_id: str):
"""移除无人机"""
with self.lock:
if drone_id in self.drones:
del self.drones[drone_id]
self._notify("drone_removed", drone_id)
def update_drone(self, drone_id: str, info: dict):
"""更新无人机信息"""
with self.lock:
if drone_id in self.drones:
self.drones[drone_id].update(info)
self._notify("drone_updated", drone_id)
def get_drone(self, drone_id: str) -> dict:
"""获取无人机信息"""
with self.lock:
return self.drones.get(drone_id, {})
def get_all_drones(self) -> List[dict]:
"""获取所有无人机信息"""
with self.lock:
return list(self.drones.values())
def send_command(self, drone_id: str, command: str, params: dict = None):
"""发送控制命令"""
# TODO: 实现实际的命令发送逻辑
print(f"发送命令到无人机 {drone_id}: {command} {params}")
self._notify("command_sent", drone_id, {"command": command, "params": params})
def _notify(self, event: str, drone_id: str, data: dict = None):
"""通知事件"""
if event in self.callbacks:
for callback in self.callbacks[event]:
try:
callback(drone_id, data)
except Exception as e:
print(f"回调函数执行错误: {str(e)}")
def _handle_heartbeat(self, msg):
"""处理心跳消息"""
@ -135,37 +162,6 @@ class CommunicationManager:
'data': msg.to_dict()
})
def _update_drone_position(self, position_data: Dict[str, Any]):
"""更新无人机位置信息"""
query = """
INSERT INTO drone_status
(drone_id, latitude, longitude, altitude, timestamp)
VALUES (%s, %s, %s, %s, %s)
"""
params = (
position_data.get('sysid', 'unknown'),
position_data.get('lat', 0) / 1e7, # 转换为度
position_data.get('lon', 0) / 1e7, # 转换为度
position_data.get('alt', 0) / 1000, # 转换为米
datetime.now()
)
self.database.execute_update(query, params)
def _update_drone_status(self, status_data: Dict[str, Any]):
"""更新无人机状态信息"""
query = """
UPDATE drone_status
SET battery_level = %s, status = %s
WHERE drone_id = %s
ORDER BY timestamp DESC LIMIT 1
"""
params = (
status_data.get('battery_remaining', 0),
status_data.get('system_status', 'UNKNOWN'),
status_data.get('sysid', 'unknown')
)
self.database.execute_update(query, params)
def get_drone_status(self, drone_id: str) -> Optional[Dict[str, Any]]:
"""获取指定无人机的状态"""
query = """

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: database_handler.py
# Purpose: 处理与数据库的交互,包括连接、数据读写(如用户信息、无人机数据、任务记录等)。
import mysql.connector
from mysql.connector import Error
from typing import Dict, List, Any, Optional

@ -1,9 +1,15 @@
# -*- coding: utf-8 -*-
# File: drone_connection_manager.py
# Purpose: 管理与单个或多个无人机的直接连接(可能通过 MAVLink 或其他协议)。
# 处理心跳、连接状态和底层数据收发。
from typing import Dict, List, Optional
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
import threading
import time
from datetime import datetime, timedelta
import socket
class DroneConnectionManager:
def __init__(self, db_handler: DatabaseHandler):

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: mavlink_handler.py
# Purpose: 处理 MAVLink 协议的编码、解码和消息处理。
import pymavlink.mavutil as mavutil
import threading
import time

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: message_queue.py
# Purpose: 提供线程安全的消息队列,用于模块间的异步通信。
import queue
import threading
from typing import Any, Optional

@ -1,13 +1,17 @@
# -*- coding: utf-8 -*-
# File: main.py
# Purpose: 主应用程序入口点,定义和启动指挥控制中心主窗口 (CommandCenterApp)。
# 负责初始化UI、处理登录、创建数据模型和协调各个功能模块。
import sys
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QTabWidget, QPushButton, QLabel,
QGroupBox, QComboBox, QSpinBox, QDoubleSpinBox,
QProgressBar, QCheckBox, QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QFont
QProgressBar, QCheckBox, QMessageBox, QFileDialog)
from PyQt5.QtCore import Qt, pyqtSignal, QThread, QObject
from PyQt5.QtGui import QFont, QPixmap, QPainter, QPen, QColor
from ui.login_view import LoginView
from ui.main_view import MainView
from ui.map_view import MapView
from ui.simple_map_view import SimpleMapView
from ui.threat_layer_view import ThreatLayerView
from ui.path_layer_view import PathLayerView
from ui.drone_list_view import DroneListView
@ -15,8 +19,42 @@ from ui.drone_detail_view import DroneDetailView
from ui.status_dashboard import StatusDashboard
from ui.path_planning_view import PathPlanningView
from ui.algorithm_config_view import AlgorithmConfigView
from ui.path_simulation_view import PathSimulationView
from communication.communication_manager import CommunicationManager
from ui.base_map_view import BaseMapView
from ui.map_data_model import MapDataModel
class PathPlanningThread(QThread):
path_planned = pyqtSignal(list) # 规划完成后发出新路径
def __init__(self, simulator, drone_id, goal):
super().__init__()
self.simulator = simulator
self.drone_id = drone_id
self.goal = goal
self._running = True
self._replan_requested = False
def run(self):
while self._running:
if self._replan_requested:
self._replan_requested = False
path = self.simulator.path_planner.plan_path(
self.drone_id,
self.simulator.drones[self.drone_id]['position'],
self.goal
)
if path:
self.path_planned.emit(path)
self.msleep(100) # 避免CPU占用过高
def request_replan(self, goal=None):
if goal is not None:
self.goal = goal
self._replan_requested = True
def stop(self):
self._running = False
self.wait()
class CommandCenterApp(QMainWindow):
# 定义信号
@ -42,16 +80,18 @@ class CommandCenterApp(QMainWindow):
# 创建标签页
self.tab_widget = QTabWidget()
# 创建地图数据模型
self.map_data_model = MapDataModel()
# 添加各个功能标签页
self.tab_widget.addTab(MapView(), "地图视图")
self.tab_widget.addTab(ThreatLayerView(), "威胁层")
self.tab_widget.addTab(PathLayerView(), "路径层")
self.tab_widget.addTab(SimpleMapView(self.map_data_model), "地图视图")
self.tab_widget.addTab(ThreatLayerView(self.map_data_model), "威胁层")
self.tab_widget.addTab(PathLayerView(self.map_data_model), "路径层")
self.tab_widget.addTab(DroneListView(), "无人机列表")
self.tab_widget.addTab(DroneDetailView(), "无人机详情")
self.tab_widget.addTab(StatusDashboard(), "状态仪表板")
self.tab_widget.addTab(PathPlanningView(), "路径规划")
self.tab_widget.addTab(AlgorithmConfigView(), "算法配置")
self.tab_widget.addTab(PathSimulationView(), "路径模拟")
main_layout.addWidget(self.tab_widget)

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# File: battlefield_simulator.py
# Purpose: 定义战场环境模拟器,可能包含地图、无人机状态、威胁等信息。
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict
import random
from .path_planner import PathPlanner
class BattlefieldSimulator:
def __init__(self, width: float = 100.0, height: float = 100.0):
self.width = width
self.height = height
self.obstacles: List[Tuple[float, float]] = []
self.drones: Dict[str, Dict] = {}
self.path_planner = PathPlanner()
def generate_battlefield(self,
num_buildings: int = 10,
num_barriers: int = 15,
num_vehicles: int = 5):
"""生成战场环境"""
# 清空现有障碍物
self.obstacles.clear()
# 生成建筑物(矩形障碍物)
for _ in range(num_buildings):
x = random.uniform(0, self.width)
y = random.uniform(0, self.height)
width = random.uniform(5, 15)
height = random.uniform(5, 15)
self._add_rectangular_obstacle(x, y, width, height)
# 生成路障(点状障碍物)
for _ in range(num_barriers):
x = random.uniform(0, self.width)
y = random.uniform(0, self.height)
self.obstacles.append((x, y))
# 生成废弃车辆(点状障碍物)
for _ in range(num_vehicles):
x = random.uniform(0, self.width)
y = random.uniform(0, self.height)
self.obstacles.append((x, y))
# 更新路径规划器的障碍物
self.path_planner.update_obstacles(self.obstacles)
def _add_rectangular_obstacle(self, x: float, y: float, width: float, height: float):
"""添加矩形障碍物"""
# 在矩形区域内生成多个点作为障碍物
for i in np.arange(x - width/2, x + width/2, 1.0):
for j in np.arange(y - height/2, y + height/2, 1.0):
if 0 <= i <= self.width and 0 <= j <= self.height:
self.obstacles.append((i, j))
def add_drone(self, drone_id: str, start_pos: Tuple[float, float, float]):
"""添加无人机"""
self.drones[drone_id] = {
'position': start_pos,
'path': None,
'status': 'standby'
}
def plan_path(self, drone_id: str, goal: Tuple[float, float, float]) -> bool:
"""为无人机规划路径"""
if drone_id not in self.drones:
return False
start = self.drones[drone_id]['position']
path = self.path_planner.plan_path(drone_id, start, goal)
if path:
self.drones[drone_id]['path'] = path
return True
return False
def visualize(self, show_paths: bool = True):
"""可视化战场环境"""
plt.figure(figsize=(10, 10))
# 绘制障碍物
x_obs = [obs[0] for obs in self.obstacles]
y_obs = [obs[1] for obs in self.obstacles]
plt.scatter(x_obs, y_obs, c='red', s=10, label='Obstacles')
# 绘制无人机和路径
for drone_id, drone in self.drones.items():
# 绘制无人机位置
pos = drone['position']
plt.scatter(pos[0], pos[1], c='blue', s=100, label=f'Drone {drone_id}')
# 绘制航向
dx = 2 * np.cos(pos[2])
dy = 2 * np.sin(pos[2])
plt.arrow(pos[0], pos[1], dx, dy, head_width=1, head_length=1, fc='blue', ec='blue')
# 绘制路径
if show_paths and drone['path']:
path = drone['path']
x_path = [p[0] for p in path]
y_path = [p[1] for p in path]
plt.plot(x_path, y_path, 'g--', label=f'Path {drone_id}')
plt.grid(True)
plt.legend()
plt.title('Battlefield Simulation')
plt.xlabel('X (m)')
plt.ylabel('Y (m)')
plt.axis('equal')
plt.show()
def simulate_movement(self, drone_id: str, step_size: float = 1.0):
"""模拟无人机沿路径移动"""
if drone_id not in self.drones or not self.drones[drone_id]['path']:
return False
drone = self.drones[drone_id]
path = drone['path']
current_pos = drone['position']
# 找到当前位置在路径上的最近点
min_dist = float('inf')
next_idx = 0
for i, point in enumerate(path):
dist = np.sqrt((point[0] - current_pos[0])**2 +
(point[1] - current_pos[1])**2)
if dist < min_dist:
min_dist = dist
next_idx = i
# 移动到下一个点
if next_idx < len(path) - 1:
next_point = path[next_idx + 1]
self.drones[drone_id]['position'] = next_point
return True
return False

@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
# File: hybrid_astar.py
# Purpose: 实现混合 A* 路径规划算法的具体逻辑。
import numpy as np
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
import math
import heapq
@dataclass
class Node:
x: float
y: float
theta: float # 航向角
g_cost: float # 从起点到当前节点的代价
h_cost: float # 从当前节点到终点的估计代价
parent: Optional['Node'] = None
@property
def f_cost(self) -> float:
return self.g_cost + self.h_cost
class HybridAStar:
def __init__(self,
grid_size: float = 0.5,
max_steering_angle: float = math.pi/4,
steering_step: float = math.pi/12,
speed_step: float = 1.0,
max_speed: float = 5.0,
min_speed: float = -2.0):
self.grid_size = grid_size
self.max_steering_angle = max_steering_angle
self.steering_step = steering_step
self.speed_step = speed_step
self.max_speed = max_speed
self.min_speed = min_speed
def plan(self,
start: Tuple[float, float, float],
goal: Tuple[float, float, float],
obstacles: List[Tuple[float, float]],
vehicle_length: float = 4.0,
vehicle_width: float = 2.0) -> Optional[List[Tuple[float, float, float]]]:
"""
使用混合A*算法规划路径
参数:
start: (x, y, theta) 起点位置和航向
goal: (x, y, theta) 终点位置和航向
obstacles: [(x, y), ...] 障碍物位置列表
vehicle_length: 车辆长度
vehicle_width: 车辆宽度
返回:
路径点列表 [(x, y, theta), ...] None如果找不到路径
"""
# 初始化开放列表和关闭列表
open_list: List[Node] = []
closed_set = set()
# 创建起点节点
start_node = Node(
x=start[0],
y=start[1],
theta=start[2],
g_cost=0,
h_cost=self._heuristic(start, goal)
)
open_list.append(start_node)
while open_list:
# 获取f值最小的节点
current = min(open_list, key=lambda n: n.f_cost)
open_list.remove(current)
# 检查是否到达目标
if self._is_goal_reached(current, goal):
return self._reconstruct_path(current)
# 将当前节点加入关闭列表
closed_set.add(self._get_grid_position(current))
# 扩展当前节点
for next_node in self._get_successors(current, goal, obstacles, vehicle_length, vehicle_width):
grid_pos = self._get_grid_position(next_node)
# 如果节点在关闭列表中,跳过
if grid_pos in closed_set:
continue
# 如果节点不在开放列表中,添加它
if not any(self._is_same_node(next_node, n) for n in open_list):
open_list.append(next_node)
return None
def _heuristic(self, current: Tuple[float, float, float], goal: Tuple[float, float, float]) -> float:
"""计算启发式函数值"""
dx = goal[0] - current[0]
dy = goal[1] - current[1]
dtheta = abs(goal[2] - current[2])
return math.sqrt(dx*dx + dy*dy) + 0.5 * dtheta
def _is_goal_reached(self, current: Node, goal: Tuple[float, float, float]) -> bool:
"""检查是否到达目标"""
dx = goal[0] - current.x
dy = goal[1] - current.y
dtheta = abs(goal[2] - current.theta)
return (math.sqrt(dx*dx + dy*dy) < self.grid_size and
dtheta < math.pi/6)
def _get_grid_position(self, node: Node) -> Tuple[int, int, int]:
"""获取节点在网格中的位置"""
x = int(node.x / self.grid_size)
y = int(node.y / self.grid_size)
theta = int(node.theta / self.steering_step)
return (x, y, theta)
def _is_same_node(self, node1: Node, node2: Node) -> bool:
"""检查两个节点是否相同"""
grid_pos1 = self._get_grid_position(node1)
grid_pos2 = self._get_grid_position(node2)
return grid_pos1 == grid_pos2
def _get_successors(self,
current: Node,
goal: Tuple[float, float, float],
obstacles: List[Tuple[float, float]],
vehicle_length: float,
vehicle_width: float) -> List[Node]:
"""获取当前节点的后继节点"""
successors = []
# 尝试不同的转向角和速度
for steering in np.arange(-self.max_steering_angle,
self.max_steering_angle + self.steering_step,
self.steering_step):
for speed in np.arange(self.min_speed,
self.max_speed + self.speed_step,
self.speed_step):
# 计算下一个位置
next_x = current.x + speed * math.cos(current.theta)
next_y = current.y + speed * math.sin(current.theta)
next_theta = current.theta + speed * math.tan(steering) / vehicle_length
# 检查是否与障碍物碰撞
if self._check_collision((next_x, next_y, next_theta),
obstacles,
vehicle_length,
vehicle_width):
continue
# 创建新节点
next_node = Node(
x=next_x,
y=next_y,
theta=next_theta,
g_cost=current.g_cost + self._calculate_cost(current, (next_x, next_y, next_theta)),
h_cost=self._heuristic((next_x, next_y, next_theta), goal),
parent=current
)
successors.append(next_node)
return successors
def _check_collision(self,
pose: Tuple[float, float, float],
obstacles: List[Tuple[float, float]],
vehicle_length: float,
vehicle_width: float) -> bool:
"""检查给定位置是否与障碍物碰撞"""
# 计算车辆四个角的位置
corners = self._get_vehicle_corners(pose, vehicle_length, vehicle_width)
# 检查每个角是否与障碍物碰撞
for corner in corners:
for obstacle in obstacles:
if math.sqrt((corner[0] - obstacle[0])**2 +
(corner[1] - obstacle[1])**2) < self.grid_size:
return True
return False
def _get_vehicle_corners(self,
pose: Tuple[float, float, float],
vehicle_length: float,
vehicle_width: float) -> List[Tuple[float, float]]:
"""计算车辆四个角的位置"""
x, y, theta = pose
half_length = vehicle_length / 2
half_width = vehicle_width / 2
corners = [
(x + half_length * math.cos(theta) - half_width * math.sin(theta),
y + half_length * math.sin(theta) + half_width * math.cos(theta)),
(x + half_length * math.cos(theta) + half_width * math.sin(theta),
y + half_length * math.sin(theta) - half_width * math.cos(theta)),
(x - half_length * math.cos(theta) - half_width * math.sin(theta),
y - half_length * math.sin(theta) + half_width * math.cos(theta)),
(x - half_length * math.cos(theta) + half_width * math.sin(theta),
y - half_length * math.sin(theta) - half_width * math.cos(theta))
]
return corners
def _calculate_cost(self,
current: Node,
next_pose: Tuple[float, float, float]) -> float:
"""计算从当前节点到下一个节点的代价"""
# 距离代价
dx = next_pose[0] - current.x
dy = next_pose[1] - current.y
distance = math.sqrt(dx*dx + dy*dy)
# 转向代价
dtheta = abs(next_pose[2] - current.theta)
# 总代价
return distance + 0.5 * dtheta
def _reconstruct_path(self, goal_node: Node) -> List[Tuple[float, float, float]]:
"""重建路径"""
path = []
current = goal_node
while current is not None:
path.append((current.x, current.y, current.theta))
current = current.parent
return list(reversed(path))

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
# File: path_planner.py
# Purpose: 路径规划的核心逻辑,可能包含调用不同路径规划算法的接口。
from typing import List, Tuple, Optional, Dict
from .hybrid_astar import HybridAStar
import numpy as np
import json
import time
class PathPlanner:
def __init__(self):
self.planner = HybridAStar()
self.current_paths: Dict[str, List[Tuple[float, float, float]]] = {}
self.obstacles: List[Tuple[float, float]] = []
def plan_path(self,
drone_id: str,
start: Tuple[float, float, float],
goal: Tuple[float, float, float],
vehicle_length: float = 4.0,
vehicle_width: float = 2.0) -> Optional[List[Tuple[float, float, float]]]:
"""
为指定无人机规划路径
参数:
drone_id: 无人机ID
start: (x, y, theta) 起点位置和航向
goal: (x, y, theta) 终点位置和航向
vehicle_length: 车辆长度
vehicle_width: 车辆宽度
返回:
路径点列表 [(x, y, theta), ...] None如果找不到路径
"""
# 规划路径
path = self.planner.plan(
start=start,
goal=goal,
obstacles=self.obstacles,
vehicle_length=vehicle_length,
vehicle_width=vehicle_width
)
if path:
self.current_paths[drone_id] = path
return path
return None
def update_obstacles(self, obstacles: List[Tuple[float, float]]):
"""更新障碍物列表"""
self.obstacles = obstacles
def get_current_path(self, drone_id: str) -> Optional[List[Tuple[float, float, float]]]:
"""获取指定无人机的当前路径"""
return self.current_paths.get(drone_id)
def clear_path(self, drone_id: str):
"""清除指定无人机的路径"""
if drone_id in self.current_paths:
del self.current_paths[drone_id]
def smooth_path(self, path: List[Tuple[float, float, float]], weight_data: float = 0.5,
weight_smooth: float = 0.3, tolerance: float = 0.000001) -> List[Tuple[float, float, float]]:
"""
使用平滑算法优化路径
参数:
path: 原始路径
weight_data: 数据项权重
weight_smooth: 平滑项权重
tolerance: 收敛阈值
返回:
平滑后的路径
"""
if len(path) <= 2:
return path
# 将路径转换为numpy数组
path_array = np.array(path)
# 初始化平滑后的路径
smooth_path = path_array.copy()
# 迭代优化
change = tolerance
while change >= tolerance:
change = 0.0
# 对每个点进行优化(除了起点和终点)
for i in range(1, len(path) - 1):
for j in range(3): # x, y, theta
aux = smooth_path[i][j]
smooth_path[i][j] += weight_data * (path_array[i][j] - smooth_path[i][j])
smooth_path[i][j] += weight_smooth * (smooth_path[i-1][j] + smooth_path[i+1][j] - 2.0 * smooth_path[i][j])
change += abs(aux - smooth_path[i][j])
return [(x, y, theta) for x, y, theta in smooth_path]
def save_path(self, drone_id: str, filename: str):
"""保存路径到文件"""
if drone_id in self.current_paths:
path_data = {
'drone_id': drone_id,
'timestamp': time.time(),
'path': self.current_paths[drone_id]
}
with open(filename, 'w') as f:
json.dump(path_data, f)
def load_path(self, filename: str) -> Optional[str]:
"""从文件加载路径"""
try:
with open(filename, 'r') as f:
path_data = json.load(f)
self.current_paths[path_data['drone_id']] = path_data['path']
return path_data['drone_id']
except:
return None

@ -0,0 +1,138 @@
import numpy as np
from battlefield_simulator import BattlefieldSimulator
import time
def test_single_drone():
"""测试单个无人机的路径规划"""
# 创建战场模拟器
simulator = BattlefieldSimulator(width=100, height=100)
# 生成战场环境
simulator.generate_battlefield(
num_buildings=8,
num_barriers=12,
num_vehicles=3
)
# 添加无人机
start_pos = (10, 10, 0) # (x, y, theta)
simulator.add_drone("drone1", start_pos)
# 设置目标点
goal = (80, 80, np.pi/2)
# 规划路径
success = simulator.plan_path("drone1", goal)
print(f"Path planning {'successful' if success else 'failed'}")
# 可视化结果
simulator.visualize()
# 模拟移动
if success:
print("Simulating movement...")
for _ in range(10): # 模拟10步移动
simulator.simulate_movement("drone1")
simulator.visualize()
time.sleep(0.5) # 暂停0.5秒以便观察
def test_multiple_drones():
"""测试多个无人机的协同路径规划"""
# 创建战场模拟器
simulator = BattlefieldSimulator(width=100, height=100)
# 生成战场环境
simulator.generate_battlefield(
num_buildings=10,
num_barriers=15,
num_vehicles=5
)
# 添加多个无人机
drones = {
"drone1": (10, 10, 0),
"drone2": (20, 10, np.pi/4),
"drone3": (10, 20, np.pi/2)
}
goals = {
"drone1": (80, 80, np.pi/2),
"drone2": (70, 70, np.pi/4),
"drone3": (90, 90, 0)
}
# 添加无人机并规划路径
for drone_id, start_pos in drones.items():
simulator.add_drone(drone_id, start_pos)
success = simulator.plan_path(drone_id, goals[drone_id])
print(f"Path planning for {drone_id}: {'successful' if success else 'failed'}")
# 可视化初始状态
simulator.visualize()
# 模拟移动
print("Simulating movement...")
for _ in range(15): # 模拟15步移动
for drone_id in drones.keys():
simulator.simulate_movement(drone_id)
simulator.visualize()
time.sleep(0.5) # 暂停0.5秒以便观察
def test_dynamic_obstacles():
"""测试动态障碍物情况下的路径规划"""
# 创建战场模拟器
simulator = BattlefieldSimulator(width=100, height=100)
# 生成初始战场环境
simulator.generate_battlefield(
num_buildings=5,
num_barriers=8,
num_vehicles=3
)
# 添加无人机
start_pos = (10, 10, 0)
simulator.add_drone("drone1", start_pos)
# 设置目标点
goal = (80, 80, np.pi/2)
# 规划初始路径
success = simulator.plan_path("drone1", goal)
print(f"Initial path planning: {'successful' if success else 'failed'}")
# 可视化初始状态
simulator.visualize()
# 模拟移动并动态添加障碍物
print("Simulating movement with dynamic obstacles...")
for i in range(10):
# 移动无人机
simulator.simulate_movement("drone1")
# 每3步添加新的障碍物
if i % 3 == 0:
# 在无人机当前位置附近添加障碍物
pos = simulator.drones["drone1"]["position"]
new_obstacle = (
pos[0] + np.random.uniform(-10, 10),
pos[1] + np.random.uniform(-10, 10)
)
simulator.obstacles.append(new_obstacle)
simulator.path_planner.update_obstacles(simulator.obstacles)
# 重新规划路径
simulator.plan_path("drone1", goal)
simulator.visualize()
time.sleep(0.5)
if __name__ == "__main__":
print("Testing single drone scenario...")
test_single_drone()
print("\nTesting multiple drones scenario...")
test_multiple_drones()
print("\nTesting dynamic obstacles scenario...")
test_dynamic_obstacles()

@ -1 +1,7 @@
# UI组件包
# -*- coding: utf-8 -*-
# File: __init__.py (in ui)
# Purpose: 将 ui 目录标记为 Python 包。
# UI组件包
# UI package initialization

@ -1,6 +1,6 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout, QComboBox,
QSpinBox, QDoubleSpinBox, QCheckBox)
QSpinBox, QDoubleSpinBox, QCheckBox, QMessageBox)
from PyQt5.QtCore import Qt
class AlgorithmConfigView(QWidget):
@ -95,9 +95,11 @@ class AlgorithmConfigView(QWidget):
self.reset_config_btn.clicked.connect(self.reset_config)
def save_config(self):
# TODO: 实现保存配置功能
QMessageBox.information(self, "功能确认", "保存配置 (功能待实现)")
# TODO: Implement save config logic
pass
def reset_config(self):
# TODO: 实现重置配置功能
QMessageBox.information(self, "功能确认", "重置配置 (功能待实现)")
# TODO: Implement reset config logic (reset UI fields to default)
pass

@ -0,0 +1,296 @@
# -*- coding: utf-8 -*-
# File: base_map_view.py
# Purpose: 定义基础地图视图类,提供地图加载、显示、缩放、平移和点击处理等通用功能。
# 作为 SimpleMapView, ThreatLayerView, PathLayerView 的父类。
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QToolBar, QAction, QFileDialog)
from PyQt5.QtCore import Qt, pyqtSignal, QPointF, QRect, QRectF
from PyQt5.QtGui import QPixmap, QPainter, QPen, QColor, QTransform, QCursor, QMouseEvent
class BaseMapView(QWidget):
# 定义信号
map_loaded = pyqtSignal()
threat_points_changed = pyqtSignal(list)
def __init__(self, map_data_model):
super().__init__()
self.map_data_model = map_data_model
self.scale_factor = 1.0
# Panning state
self.panning = False
self.last_pan_point = QPointF()
self.map_offset = QPointF(0, 0) # Top-left corner offset of the map relative to the widget
self.init_ui()
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0) # Remove margins
# 创建工具栏
toolbar = QToolBar()
toolbar.setMovable(False)
# 添加工具栏按钮
self.load_map_action = QAction("加载地图", self)
self.zoom_in_action = QAction("放大", self)
self.zoom_out_action = QAction("缩小", self)
self.pan_action = QAction("平移", self)
self.pan_action.setCheckable(True) # Make pan action checkable
toolbar.addAction(self.load_map_action)
toolbar.addAction(self.zoom_in_action)
toolbar.addAction(self.zoom_out_action)
toolbar.addAction(self.pan_action)
main_layout.addWidget(toolbar)
# 创建地图显示区域
self.map_label = QLabel("请加载地图图片")
self.map_label.setAlignment(Qt.AlignTop | Qt.AlignLeft) # Align top-left for panning
self.map_label.setStyleSheet("background-color: #333; border: none;") # Darker background, no border
main_layout.addWidget(self.map_label)
# 设置布局
self.setLayout(main_layout)
# 连接信号
print("BaseMapView: Connecting load_map_action to load_map_image") # Debug print
self.load_map_action.triggered.connect(self.load_map_image)
print("BaseMapView: Connecting zoom_in_action to zoom_in") # Debug print
self.zoom_in_action.triggered.connect(self.zoom_in)
print("BaseMapView: Connecting zoom_out_action to zoom_out") # Debug print
self.zoom_out_action.triggered.connect(self.zoom_out)
print("BaseMapView: Connecting pan_action to toggle_pan_mode") # Debug print
self.pan_action.triggered.connect(self.toggle_pan_mode) # Connect pan toggle
print("BaseMapView: Connecting map_data_model.data_changed to update_map") # Debug print
self.map_data_model.data_changed.connect(self.update_map)
def load_map_image(self):
print("BaseMapView: load_map_image called") # Debug print: function entry
file_name, _ = QFileDialog.getOpenFileName(self, "选择地图图片", "", "Images (*.png *.jpg *.bmp)")
if file_name:
pixmap = QPixmap(file_name)
if pixmap.isNull():
print("BaseMapView: Error - Loaded pixmap is null!")
self.map_data_model.set_map(None) # Ensure model knows it's invalid
else:
print(f"BaseMapView: Map loaded successfully, size: {pixmap.width()}x{pixmap.height()}")
self.map_data_model.set_map(pixmap)
self.reset_view() # Reset zoom/pan on new map load
self.map_loaded.emit()
def reset_view(self):
print("BaseMapView: reset_view called") # Debug print
if not self.map_data_model.map_pixmap or self.map_data_model.map_pixmap.isNull():
print("BaseMapView: reset_view - No valid map pixmap found.")
self.scale_factor = 1.0
self.map_offset = QPointF(0, 0)
self.update_map()
return
# Calculate scale factor to fit the map inside the label
label_size = self.map_label.size()
map_size = self.map_data_model.map_pixmap.size()
print(f"BaseMapView: reset_view - Label size: {label_size.width()}x{label_size.height()}, Map size: {map_size.width()}x{map_size.height()}")
if label_size.isEmpty() or map_size.isEmpty() or map_size.width() == 0 or map_size.height() == 0:
print("BaseMapView: reset_view - Warning: Invalid label or map size for scaling.")
self.scale_factor = 1.0 # Default if sizes are invalid
else:
scale_x = label_size.width() / map_size.width()
scale_y = label_size.height() / map_size.height()
self.scale_factor = min(scale_x, scale_y) # Use min to fit entirely (KeepAspectRatio)
print(f"BaseMapView: reset_view - Calculated initial scale factor: {self.scale_factor}")
# Calculate offset to center the scaled map
scaled_map_width = map_size.width() * self.scale_factor
scaled_map_height = map_size.height() * self.scale_factor
offset_x = (label_size.width() - scaled_map_width) / 2
offset_y = (label_size.height() - scaled_map_height) / 2
self.map_offset = QPointF(offset_x, offset_y)
print(f"BaseMapView: reset_view - Calculated initial offset: ({offset_x}, {offset_y})")
self.update_map()
def update_map(self):
# print("BaseMapView: update_map called") # Can be noisy, enable if needed
if self.map_data_model.map_pixmap and not self.map_data_model.map_pixmap.isNull():
original_pixmap = self.map_data_model.map_pixmap
label_size = self.map_label.size()
# Calculate scaled size
scaled_width = int(original_pixmap.width() * self.scale_factor)
scaled_height = int(original_pixmap.height() * self.scale_factor)
if scaled_width <= 0 or scaled_height <= 0:
print(f"BaseMapView: update_map - Warning: Invalid scaled size ({scaled_width}x{scaled_height}), skipping draw.")
return
# Create a new pixmap for drawing
# Ensure display_pixmap is at least the size of the label to avoid issues
display_pixmap_width = max(scaled_width + int(abs(self.map_offset.x())) + 1, label_size.width())
display_pixmap_height = max(scaled_height + int(abs(self.map_offset.y())) + 1, label_size.height())
display_pixmap = QPixmap(display_pixmap_width, display_pixmap_height)
display_pixmap.fill(Qt.transparent) # Use transparent background
# print(f"BaseMapView: update_map - Scale: {self.scale_factor:.2f}, Offset: ({self.map_offset.x():.1f}, {self.map_offset.y():.1f})")
# print(f"BaseMapView: update_map - Scaled map size: {scaled_width}x{scaled_height}")
# print(f"BaseMapView: update_map - Display pixmap size: {display_pixmap_width}x{display_pixmap_height}")
painter = QPainter(display_pixmap)
painter.setRenderHint(QPainter.SmoothPixmapTransform) # Improve scaling quality
# Define the target rectangle for the scaled map within the display_pixmap
# Ensure offset is integer for QRect constructor
target_rect = QRect(int(self.map_offset.x()), int(self.map_offset.y()),
scaled_width, scaled_height)
# Draw the scaled original map
painter.drawPixmap(target_rect, original_pixmap, original_pixmap.rect())
# --- Draw overlays ---
# (Code for drawing overlays remains the same)
# ... (threats, start, goal, paths) ...
# Apply scale and offset transform to the painter for drawing overlays
transform = QTransform()
transform.translate(self.map_offset.x(), self.map_offset.y())
transform.scale(self.scale_factor, self.scale_factor)
painter.setTransform(transform)
# Draw threats
if self.map_data_model.threat_points:
point_size = max(1.0, 5.0 / self.scale_factor) # Ensure point size is at least 1
pen = QPen(QColor(255, 0, 0), point_size) # Keep pen size visually constant
painter.setPen(pen)
for point in self.map_data_model.threat_points:
painter.drawPoint(QPointF(point[0], point[1])) # Draw at original coords (transformed)
# Draw start point
if self.map_data_model.start_point:
point_size = max(1.0, 5.0 / self.scale_factor)
pen = QPen(QColor(0, 255, 0), point_size)
painter.setPen(pen)
painter.drawPoint(QPointF(self.map_data_model.start_point[0], self.map_data_model.start_point[1]))
# Draw goal point
if self.map_data_model.goal_point:
point_size = max(1.0, 5.0 / self.scale_factor)
pen = QPen(QColor(0, 0, 255), point_size)
painter.setPen(pen)
painter.drawPoint(QPointF(self.map_data_model.goal_point[0], self.map_data_model.goal_point[1]))
# Draw paths
if self.map_data_model.paths:
line_width = max(0.5, 2.0 / self.scale_factor) # Use 0 for cosmetic pen if needed, ensure > 0
pen = QPen(QColor(0, 255, 255), line_width)
painter.setPen(pen)
for path in self.map_data_model.paths:
if len(path) > 1:
points = [QPointF(p[0], p[1]) for p in path]
painter.drawPolyline(*points) # Draw lines between original coords
painter.end()
# Set the potentially larger pixmap on the label
self.map_label.setPixmap(display_pixmap)
# print("BaseMapView: update_map - Pixmap set on label")
else:
print("BaseMapView: update_map - No map pixmap to draw.")
self.map_label.setText("请加载地图图片")
self.map_label.setPixmap(QPixmap()) # Clear pixmap if none loaded
def resizeEvent(self, event):
super().resizeEvent(event)
# We might need to call update_map if label size affects how map is displayed
# For now, assume update_map is called when needed by zoom/pan
self.update_map() # Update map on resize
def zoom(self, factor):
# Store previous scale and mouse position (relative to widget)
prev_scale = self.scale_factor
mouse_pos = self.map_label.mapFromGlobal(QCursor.pos())
# Calculate map point under mouse before zoom
map_point_before_zoom = (mouse_pos - self.map_offset) / prev_scale
# Update scale factor
self.scale_factor *= factor
# Add limits to scaling if needed
# self.scale_factor = max(0.1, min(self.scale_factor, 10.0))
# Calculate map point under mouse after zoom (if offset remained the same)
map_point_after_zoom_same_offset = map_point_before_zoom * self.scale_factor
# Adjust offset to keep the point under the mouse cursor stationary
self.map_offset = mouse_pos - map_point_after_zoom_same_offset
self.update_map()
def zoom_in(self):
print("BaseMapView: zoom_in called") # Debug print
self.zoom(1.2) # Zoom in by 20%
def zoom_out(self):
print("BaseMapView: zoom_out called") # Debug print
self.zoom(1 / 1.2) # Zoom out by 20%
def toggle_pan_mode(self, checked):
print(f"BaseMapView: toggle_pan_mode called, checked: {checked}") # Debug print
self.panning = checked
if self.panning:
self.map_label.setCursor(Qt.OpenHandCursor)
else:
self.map_label.setCursor(Qt.ArrowCursor)
def mousePressEvent(self, event):
if self.panning and event.button() == Qt.LeftButton:
self.last_pan_point = event.pos()
self.map_label.setCursor(Qt.ClosedHandCursor)
event.accept()
elif not self.panning and event.button() == Qt.LeftButton:
label_pos = self.map_label.mapFromParent(event.pos())
if self.map_label.pixmap() and not self.map_label.pixmap().isNull() and self.map_label.rect().contains(label_pos):
# Calculate map point under mouse in original map coordinates
map_point_f = (label_pos - self.map_offset) / self.scale_factor
# Check if click is within the actual map boundaries (optional but good practice)
original_map_rect = QRectF(0, 0, self.map_data_model.map_pixmap.width(), self.map_data_model.map_pixmap.height())
if original_map_rect.contains(map_point_f):
# Call the handler method (implemented by subclasses)
self.handle_map_click(map_point_f)
event.accept() # Indicate event was handled
return # Don't ignore if handled
# If click wasn't on map or not handled, ignore
event.ignore()
else:
# Ignore other buttons or if panning is off but not LeftButton
event.ignore()
def mouseMoveEvent(self, event):
if self.panning and event.buttons() & Qt.LeftButton:
delta = event.pos() - self.last_pan_point
self.map_offset += delta # Update offset
self.last_pan_point = event.pos()
self.update_map() # Redraw map at new offset
event.accept()
else:
event.ignore()
def mouseReleaseEvent(self, event):
if self.panning and event.button() == Qt.LeftButton:
self.map_label.setCursor(Qt.OpenHandCursor) # Change back to open hand
event.accept()
else:
event.ignore()
def handle_map_click(self, map_point):
"""Subclasses should override this to handle clicks on the map."""
# Default implementation does nothing
pass
# Need to import QRect, QPointF, QTransform, QCursor, QMouseEvent at the top
from PyQt5.QtCore import QRect, QRectF
from PyQt5.QtGui import QCursor, QMouseEvent

@ -1,12 +1,188 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout)
from PyQt5.QtCore import Qt
QPushButton, QGroupBox, QFormLayout, QProgressBar,
QComboBox, QSpinBox, QDoubleSpinBox, QDialog,
QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QColor, QPalette
class ParameterDialog(QDialog):
def __init__(self, title, min_value, max_value, unit, parent=None):
super().__init__(parent)
self.title = title
self.min_value = min_value
self.max_value = max_value
self.unit = unit
self.init_ui()
def init_ui(self):
self.setWindowTitle(self.title)
layout = QFormLayout()
self.value_spin = QDoubleSpinBox()
self.value_spin.setRange(self.min_value, self.max_value)
self.value_spin.setSuffix(f" {self.unit}")
self.value_spin.setDecimals(1)
layout.addRow("值:", self.value_spin)
buttons = QHBoxLayout()
ok_button = QPushButton("确定")
cancel_button = QPushButton("取消")
ok_button.clicked.connect(self.accept)
cancel_button.clicked.connect(self.reject)
buttons.addWidget(ok_button)
buttons.addWidget(cancel_button)
layout.addRow(buttons)
self.setLayout(layout)
def get_value(self):
return self.value_spin.value()
class PayloadDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
def init_ui(self):
self.setWindowTitle("设置负载")
layout = QFormLayout()
self.type_combo = QComboBox()
self.type_combo.addItems(["", "货物", "医疗物资", "救援设备", "监控设备"])
self.weight_spin = QDoubleSpinBox()
self.weight_spin.setRange(0, 100)
self.weight_spin.setSuffix(" kg")
self.weight_spin.setDecimals(1)
layout.addRow("负载类型:", self.type_combo)
layout.addRow("负载重量:", self.weight_spin)
buttons = QHBoxLayout()
ok_button = QPushButton("确定")
cancel_button = QPushButton("取消")
ok_button.clicked.connect(self.accept)
cancel_button.clicked.connect(self.reject)
buttons.addWidget(ok_button)
buttons.addWidget(cancel_button)
layout.addRow(buttons)
self.setLayout(layout)
def get_payload_info(self):
return {
"type": self.type_combo.currentText(),
"weight": self.weight_spin.value()
}
class DroneDetailView(QWidget):
def __init__(self):
# 定义信号
control_command = pyqtSignal(str, str) # 命令类型, 参数
def __init__(self, comm_manager=None):
super().__init__()
self.comm_manager = comm_manager
self.current_drone_id = None
self.init_ui()
self.setup_communication()
def setup_communication(self):
"""设置通信回调"""
if self.comm_manager:
self.comm_manager.register_callback("drone_updated", self.on_drone_updated)
self.comm_manager.register_callback("command_sent", self.on_command_sent)
def on_drone_updated(self, drone_id, data):
"""处理无人机更新事件"""
if drone_id == self.current_drone_id:
self.update_drone_info(drone_id, self.comm_manager.get_drone(drone_id))
def on_command_sent(self, drone_id, data):
"""处理命令发送事件"""
if drone_id == self.current_drone_id:
command = data.get("command", "")
params = data.get("params", {})
QMessageBox.information(self, "命令发送", f"命令 {command} 已发送到无人机 {drone_id}")
def set_drone(self, drone_id):
"""设置当前显示的无人机"""
self.current_drone_id = drone_id
if self.comm_manager:
info = self.comm_manager.get_drone(drone_id)
self.update_drone_info(drone_id, info)
def send_control_command(self, command_type, params=None):
"""发送控制命令"""
if self.current_drone_id and self.comm_manager:
self.comm_manager.send_command(self.current_drone_id, command_type, params)
def set_altitude(self):
"""设置高度"""
dialog = ParameterDialog("设置高度", 0, 1000, "m", self)
if dialog.exec_() == QDialog.Accepted:
value = dialog.get_value()
self.send_control_command("set_altitude", {"value": value})
def set_speed(self):
"""设置速度"""
dialog = ParameterDialog("设置速度", 0, 50, "m/s", self)
if dialog.exec_() == QDialog.Accepted:
value = dialog.get_value()
self.send_control_command("set_speed", {"value": value})
def set_heading(self):
"""设置航向"""
dialog = ParameterDialog("设置航向", 0, 360, "°", self)
if dialog.exec_() == QDialog.Accepted:
value = dialog.get_value()
self.send_control_command("set_heading", {"value": value})
def set_payload(self):
"""设置负载"""
dialog = PayloadDialog(self)
if dialog.exec_() == QDialog.Accepted:
info = dialog.get_payload_info()
self.send_control_command("set_payload", info)
def update_drone_info(self, drone_id, info):
"""更新无人机信息"""
self.current_drone_id = drone_id
# 更新基本信息
self.id_label.setText(info.get("id", ""))
self.model_label.setText(info.get("model", ""))
self.status_label.setText(info.get("status", ""))
self.battery_label.setText(f"{info.get('battery', 0)}%")
self.group_label.setText(info.get("group", ""))
self.color_label.setText(info.get("color", ""))
# 更新位置信息
self.latitude_label.setText(f"{info.get('latitude', 0):.6f}")
self.longitude_label.setText(f"{info.get('longitude', 0):.6f}")
self.altitude_label.setText(f"{info.get('altitude', 0):.1f}m")
self.speed_label.setText(f"{info.get('speed', 0):.1f}m/s")
self.heading_label.setText(f"{info.get('heading', 0):.1f}°")
# 更新负载信息
self.payload_type_label.setText(info.get("payload_type", ""))
self.payload_weight_label.setText(f"{info.get('payload_weight', 0):.1f}kg")
self.payload_status_label.setText(info.get("payload_status", ""))
# 更新通信状态
self.signal_strength.setValue(info.get("signal_strength", 0))
self.latency_label.setText(f"{info.get('latency', 0)}ms")
self.packet_loss_label.setText(f"{info.get('packet_loss', 0):.1f}%")
# 更新任务信息
self.task_type_label.setText(info.get("task_type", ""))
self.task_status_label.setText(info.get("task_status", ""))
self.task_progress.setValue(info.get("task_progress", 0))
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
@ -19,11 +195,15 @@ class DroneDetailView(QWidget):
self.model_label = QLabel("")
self.status_label = QLabel("")
self.battery_label = QLabel("")
self.group_label = QLabel("")
self.color_label = QLabel("")
basic_layout.addRow("ID:", self.id_label)
basic_layout.addRow("型号:", self.model_label)
basic_layout.addRow("状态:", self.status_label)
basic_layout.addRow("电量:", self.battery_label)
basic_layout.addRow("分组:", self.group_label)
basic_layout.addRow("颜色标识:", self.color_label)
basic_info_group.setLayout(basic_layout)
main_layout.addWidget(basic_info_group)
@ -36,49 +216,129 @@ class DroneDetailView(QWidget):
self.longitude_label = QLabel("")
self.altitude_label = QLabel("")
self.speed_label = QLabel("")
self.heading_label = QLabel("")
position_layout.addRow("纬度:", self.latitude_label)
position_layout.addRow("经度:", self.longitude_label)
position_layout.addRow("高度:", self.altitude_label)
position_layout.addRow("速度:", self.speed_label)
position_layout.addRow("航向:", self.heading_label)
position_group.setLayout(position_layout)
main_layout.addWidget(position_group)
# 创建负载信息组
payload_group = QGroupBox("负载信息")
payload_layout = QFormLayout()
self.payload_type_label = QLabel("")
self.payload_weight_label = QLabel("")
self.payload_status_label = QLabel("")
payload_layout.addRow("负载类型:", self.payload_type_label)
payload_layout.addRow("负载重量:", self.payload_weight_label)
payload_layout.addRow("负载状态:", self.payload_status_label)
payload_group.setLayout(payload_layout)
main_layout.addWidget(payload_group)
# 创建通信状态组
comm_group = QGroupBox("通信状态")
comm_layout = QFormLayout()
self.signal_strength = QProgressBar()
self.signal_strength.setRange(0, 100)
self.signal_strength.setValue(0)
self.latency_label = QLabel("")
self.packet_loss_label = QLabel("")
comm_layout.addRow("信号强度:", self.signal_strength)
comm_layout.addRow("通信延迟:", self.latency_label)
comm_layout.addRow("丢包率:", self.packet_loss_label)
comm_group.setLayout(comm_layout)
main_layout.addWidget(comm_group)
# 创建任务信息组
task_group = QGroupBox("任务信息")
task_layout = QFormLayout()
self.task_type_label = QLabel("")
self.task_status_label = QLabel("")
self.task_progress = QProgressBar()
self.task_progress.setRange(0, 100)
self.task_progress.setValue(0)
task_layout.addRow("任务类型:", self.task_type_label)
task_layout.addRow("任务状态:", self.task_status_label)
task_layout.addRow("任务进度:", self.task_progress)
task_group.setLayout(task_layout)
main_layout.addWidget(task_group)
# 创建控制按钮
button_layout = QHBoxLayout()
control_group = QGroupBox("控制面板")
control_layout = QVBoxLayout()
# 基本控制按钮
basic_control_layout = QHBoxLayout()
self.takeoff_btn = QPushButton("起飞")
self.land_btn = QPushButton("降落")
self.return_btn = QPushButton("返航")
self.emergency_btn = QPushButton("紧急停止")
button_layout.addWidget(self.takeoff_btn)
button_layout.addWidget(self.land_btn)
button_layout.addWidget(self.return_btn)
button_layout.addWidget(self.emergency_btn)
basic_control_layout.addWidget(self.takeoff_btn)
basic_control_layout.addWidget(self.land_btn)
basic_control_layout.addWidget(self.return_btn)
basic_control_layout.addWidget(self.emergency_btn)
# 高级控制
advanced_control_layout = QHBoxLayout()
self.set_altitude_btn = QPushButton("设置高度")
self.set_speed_btn = QPushButton("设置速度")
self.set_heading_btn = QPushButton("设置航向")
self.set_payload_btn = QPushButton("设置负载")
main_layout.addLayout(button_layout)
advanced_control_layout.addWidget(self.set_altitude_btn)
advanced_control_layout.addWidget(self.set_speed_btn)
advanced_control_layout.addWidget(self.set_heading_btn)
advanced_control_layout.addWidget(self.set_payload_btn)
control_layout.addLayout(basic_control_layout)
control_layout.addLayout(advanced_control_layout)
control_group.setLayout(control_layout)
main_layout.addWidget(control_group)
self.setLayout(main_layout)
# 连接信号
# Connect control buttons
self.set_altitude_btn.clicked.connect(self.set_altitude)
self.set_speed_btn.clicked.connect(self.set_speed)
self.set_heading_btn.clicked.connect(self.set_heading)
self.set_payload_btn.clicked.connect(self.set_payload)
self.takeoff_btn.clicked.connect(self.takeoff)
self.land_btn.clicked.connect(self.land)
self.return_btn.clicked.connect(self.return_home)
self.emergency_btn.clicked.connect(self.emergency_stop)
def takeoff(self):
# TODO: 实现起飞功能
pass
QMessageBox.information(self, "功能确认", "起飞命令 (功能待实现)")
# self.send_control_command("takeoff")
def land(self):
# TODO: 实现降落功能
pass
QMessageBox.information(self, "功能确认", "降落命令 (功能待实现)")
# self.send_control_command("land")
def return_home(self):
# TODO: 实现返航功能
pass
QMessageBox.information(self, "功能确认", "返航命令 (功能待实现)")
# self.send_control_command("return_home")
def emergency_stop(self):
# TODO: 实现紧急停止功能
pass
reply = QMessageBox.warning(self, "紧急停止确认",
"确定要发送紧急停止命令吗?这将停止所有电机!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.Yes:
QMessageBox.information(self, "功能确认", "紧急停止命令 (功能待实现)")
# self.send_control_command("emergency_stop")

@ -1,21 +1,177 @@
# -*- coding: utf-8 -*-
# File: drone_list_view.py
# Purpose: 定义无人机列表标签页的 UI 和逻辑,显示无人机信息并提供管理操作。
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QTableWidget, QTableWidgetItem)
from PyQt5.QtCore import Qt
QPushButton, QTableWidget, QTableWidgetItem,
QComboBox, QColorDialog, QMenu, QAction,
QDialog, QLineEdit, QFormLayout, QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QColor, QBrush
class AddDroneDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
def init_ui(self):
self.setWindowTitle("添加无人机")
layout = QFormLayout()
self.id_edit = QLineEdit()
self.model_edit = QLineEdit()
self.group_combo = QComboBox()
self.group_combo.addItems(["运输组", "监控组", "救援组"])
layout.addRow("ID:", self.id_edit)
layout.addRow("型号:", self.model_edit)
layout.addRow("分组:", self.group_combo)
buttons = QHBoxLayout()
ok_button = QPushButton("确定")
cancel_button = QPushButton("取消")
ok_button.clicked.connect(self.accept)
cancel_button.clicked.connect(self.reject)
buttons.addWidget(ok_button)
buttons.addWidget(cancel_button)
layout.addRow(buttons)
self.setLayout(layout)
def get_drone_info(self):
return {
"id": self.id_edit.text(),
"model": self.model_edit.text(),
"group": self.group_combo.currentText(),
"status": "待命",
"battery": 100,
"signal_strength": 100,
"latency": 0,
"packet_loss": 0,
"payload_type": "",
"payload_weight": 0,
"payload_status": "未装载",
"task_type": "",
"task_status": "空闲",
"task_progress": 0
}
class DroneListView(QWidget):
def __init__(self):
# 定义信号
drone_selected = pyqtSignal(str) # 无人机被选中时发出信号
def __init__(self, comm_manager=None):
super().__init__()
self.comm_manager = comm_manager
self.drone_colors = {} # 存储无人机颜色
self.drone_groups = {} # 存储无人机分组
self.init_ui()
self.setup_communication()
def setup_communication(self):
"""设置通信回调"""
if self.comm_manager:
self.comm_manager.register_callback("drone_added", self.on_drone_added)
self.comm_manager.register_callback("drone_removed", self.on_drone_removed)
self.comm_manager.register_callback("drone_updated", self.on_drone_updated)
def on_drone_added(self, drone_id, data):
"""处理无人机添加事件"""
self.add_drone_to_table(drone_id, self.comm_manager.get_drone(drone_id))
def on_drone_removed(self, drone_id, data):
"""处理无人机移除事件"""
self.remove_drone_from_table(drone_id)
def on_drone_updated(self, drone_id, data):
"""处理无人机更新事件"""
self.update_drone_in_table(drone_id, self.comm_manager.get_drone(drone_id))
def add_drone_to_table(self, drone_id, info):
"""添加无人机到表格"""
row = self.drone_table.rowCount()
self.drone_table.insertRow(row)
# 设置各列数据
self.drone_table.setItem(row, 0, QTableWidgetItem(info.get("id", "")))
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
# 应用颜色
if drone_id in self.drone_colors:
self.update_drone_color(drone_id)
def remove_drone_from_table(self, drone_id):
"""从表格中移除无人机"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.removeRow(row)
break
def update_drone_in_table(self, drone_id, info):
"""更新表格中的无人机信息"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
break
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
# 创建过滤和分组控制
filter_layout = QHBoxLayout()
# 状态过滤
self.status_filter = QComboBox()
self.status_filter.addItems(["全部状态", "飞行中", "待命", "充电中", "故障"])
self.status_filter.currentTextChanged.connect(self.apply_filters)
# 分组过滤
self.group_filter = QComboBox()
self.group_filter.addItems(["全部分组", "运输组", "监控组", "救援组"])
self.group_filter.currentTextChanged.connect(self.apply_filters)
filter_layout.addWidget(QLabel("状态过滤:"))
filter_layout.addWidget(self.status_filter)
filter_layout.addWidget(QLabel("分组过滤:"))
filter_layout.addWidget(self.group_filter)
filter_layout.addStretch()
main_layout.addLayout(filter_layout)
# 创建无人机列表
self.drone_table = QTableWidget()
self.drone_table.setColumnCount(5)
self.drone_table.setHorizontalHeaderLabels(["ID", "型号", "状态", "位置", "电量"])
self.drone_table.setStyleSheet("QTableWidget { border: 1px solid #ccc; }")
self.drone_table.setColumnCount(8) # 增加列数
self.drone_table.setHorizontalHeaderLabels([
"ID", "型号", "状态", "位置", "电量", "负载", "分组", "通信状态"
])
self.drone_table.setStyleSheet("""
QTableWidget {
border: 1px solid #ccc;
gridline-color: #ddd;
}
QTableWidget::item:selected {
background-color: #e0e0e0;
}
""")
self.drone_table.setContextMenuPolicy(Qt.CustomContextMenu)
self.drone_table.customContextMenuRequested.connect(self.show_context_menu)
self.drone_table.itemSelectionChanged.connect(self.on_selection_changed)
main_layout.addWidget(self.drone_table)
# 创建控制按钮
@ -24,11 +180,15 @@ class DroneListView(QWidget):
self.edit_drone_btn = QPushButton("编辑信息")
self.delete_drone_btn = QPushButton("删除无人机")
self.refresh_btn = QPushButton("刷新状态")
self.set_color_btn = QPushButton("设置颜色")
self.set_group_btn = QPushButton("设置分组")
button_layout.addWidget(self.add_drone_btn)
button_layout.addWidget(self.edit_drone_btn)
button_layout.addWidget(self.delete_drone_btn)
button_layout.addWidget(self.refresh_btn)
button_layout.addWidget(self.set_color_btn)
button_layout.addWidget(self.set_group_btn)
main_layout.addLayout(button_layout)
@ -39,19 +199,224 @@ class DroneListView(QWidget):
self.edit_drone_btn.clicked.connect(self.edit_drone)
self.delete_drone_btn.clicked.connect(self.delete_drone)
self.refresh_btn.clicked.connect(self.refresh_status)
self.set_color_btn.clicked.connect(self.set_drone_color)
self.set_group_btn.clicked.connect(self.set_drone_group)
def add_drone(self):
# TODO: 实现添加无人机功能
def show_context_menu(self, position):
menu = QMenu()
view_details = QAction("查看详情", self)
set_color = QAction("设置颜色", self)
set_group = QAction("设置分组", self)
show_trajectory = QAction("显示轨迹", self)
menu.addAction(view_details)
menu.addAction(set_color)
menu.addAction(set_group)
menu.addAction(show_trajectory)
action = menu.exec_(self.drone_table.mapToGlobal(position))
if action == view_details:
self.view_drone_details()
elif action == set_color:
self.set_drone_color()
elif action == set_group:
self.set_drone_group()
elif action == show_trajectory:
self.show_drone_trajectory()
def on_selection_changed(self):
selected_items = self.drone_table.selectedItems()
if selected_items:
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
self.drone_selected.emit(drone_id)
def apply_filters(self):
status_filter = self.status_filter.currentText()
group_filter = self.group_filter.currentText()
for row in range(self.drone_table.rowCount()):
show_row = True
if status_filter != "全部状态":
status = self.drone_table.item(row, 2).text()
if status != status_filter:
show_row = False
if group_filter != "全部分组":
group = self.drone_table.item(row, 6).text()
if group != group_filter:
show_row = False
self.drone_table.setRowHidden(row, not show_row)
def set_drone_color(self):
"""设置无人机颜色"""
selected_items = self.drone_table.selectedItems()
if not selected_items:
QMessageBox.warning(self, "警告", "请先选择要设置颜色的无人机")
return
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
color = QColorDialog.getColor()
if color.isValid():
self.drone_colors[drone_id] = color
self.update_drone_color(drone_id)
QMessageBox.information(self, "功能确认", f"无人机 {drone_id} 颜色已设置")
def set_drone_group(self):
"""设置无人机分组"""
selected_items = self.drone_table.selectedItems()
if not selected_items:
QMessageBox.warning(self, "警告", "请先选择要设置分组的无人机")
return
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
group = self.group_filter.currentText()
if group != "全部分组":
self.drone_groups[drone_id] = group
self.update_drone_group(drone_id)
if self.comm_manager:
self.comm_manager.update_drone(drone_id, {"group": group})
QMessageBox.information(self, "功能确认", f"设置无人机 {drone_id} 分组 (功能待实现)")
def update_drone_color(self, drone_id):
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
color = self.drone_colors.get(drone_id, QColor(255, 255, 255))
for col in range(self.drone_table.columnCount()):
item = self.drone_table.item(row, col)
if item:
item.setBackground(QBrush(color))
break
def update_drone_group(self, drone_id):
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
group = self.drone_groups.get(drone_id, "未分组")
self.drone_table.setItem(row, 6, QTableWidgetItem(group))
break
def view_drone_details(self):
QMessageBox.information(self, "功能确认", "查看详情按钮已连接")
# TODO: 实现查看无人机详情功能
pass
def show_drone_trajectory(self):
QMessageBox.information(self, "功能确认", "显示轨迹按钮已连接")
# TODO: 实现显示无人机轨迹功能
pass
def add_drone(self):
dialog = AddDroneDialog(self)
if dialog.exec_() == QDialog.Accepted:
drone_info = dialog.get_drone_info()
# TODO: Add drone to actual data source (e.g., comm_manager)
self.add_drone_to_table(drone_info['id'], drone_info) # Add to table for now
QMessageBox.information(self, "功能确认", f"添加无人机: {drone_info['id']}")
def edit_drone(self):
# TODO: 实现编辑无人机信息功能
pass
selected_items = self.drone_table.selectedItems()
if not selected_items:
QMessageBox.warning(self, "提示", "请先选择要编辑的无人机")
return
# For simplicity, we'll just show a message. Actual editing would require a dialog.
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
QMessageBox.information(self, "功能确认", f"编辑无人机 {drone_id} (功能待实现)")
# TODO: Implement edit drone functionality
def delete_drone(self):
# TODO: 实现删除无人机功能
selected_items = self.drone_table.selectedItems()
if not selected_items:
QMessageBox.warning(self, "提示", "请先选择要删除的无人机")
return
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
reply = QMessageBox.question(self, "确认删除", f"确定要删除无人机 {drone_id} 吗?",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.Yes:
# TODO: Remove drone from actual data source (e.g., comm_manager)
self.remove_drone_from_table(drone_id) # Remove from table for now
QMessageBox.information(self, "功能确认", f"删除无人机 {drone_id}")
def refresh_status(self):
# TODO: Implement actual status refresh logic from comm_manager or data source
QMessageBox.information(self, "功能确认", "刷新状态 (功能待实现)")
# Example: self.update_drone_list(self.comm_manager.get_all_drones() if self.comm_manager else [])
def update_drone_list(self, drone_list):
# 这里可以实现刷新表格的逻辑,当前先占位
pass
def add_drone_to_table(self, drone_id, info):
"""添加无人机到表格"""
row = self.drone_table.rowCount()
self.drone_table.insertRow(row)
def refresh_status(self):
# TODO: 实现刷新状态功能
pass
# 设置各列数据
self.drone_table.setItem(row, 0, QTableWidgetItem(info.get("id", "")))
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
# 应用颜色
if drone_id in self.drone_colors:
self.update_drone_color(drone_id)
def remove_drone_from_table(self, drone_id):
"""从表格中移除无人机"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.removeRow(row)
break
def update_drone_in_table(self, drone_id, info):
"""更新表格中的无人机信息"""
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
self.drone_table.setItem(row, 1, QTableWidgetItem(info.get("model", "")))
self.drone_table.setItem(row, 2, QTableWidgetItem(info.get("status", "")))
self.drone_table.setItem(row, 3, QTableWidgetItem(f"{info.get('latitude', 0):.6f}, {info.get('longitude', 0):.6f}"))
self.drone_table.setItem(row, 4, QTableWidgetItem(f"{info.get('battery', 0)}%"))
self.drone_table.setItem(row, 5, QTableWidgetItem(f"{info.get('payload_type', '')} ({info.get('payload_weight', 0)}kg)"))
self.drone_table.setItem(row, 6, QTableWidgetItem(info.get("group", "")))
self.drone_table.setItem(row, 7, QTableWidgetItem(f"{info.get('signal_strength', 0)}%"))
break
def update_drone_color(self, drone_id):
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
color = self.drone_colors.get(drone_id, QColor(255, 255, 255))
for col in range(self.drone_table.columnCount()):
item = self.drone_table.item(row, col)
if item:
item.setBackground(QBrush(color))
break
def update_drone_group(self, drone_id):
for row in range(self.drone_table.rowCount()):
if self.drone_table.item(row, 0).text() == drone_id:
group = self.drone_groups.get(drone_id, "未分组")
self.drone_table.setItem(row, 6, QTableWidgetItem(group))
break
def view_drone_details(self):
selected_items = self.drone_table.selectedItems()
if selected_items:
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
self.drone_selected.emit(drone_id)
def show_drone_trajectory(self):
"""显示无人机轨迹"""
selected_items = self.drone_table.selectedItems()
if not selected_items:
QMessageBox.warning(self, "警告", "请先选择要查看轨迹的无人机")
return
drone_id = self.drone_table.item(selected_items[0].row(), 0).text()
# TODO: 实现轨迹显示功能
QMessageBox.information(self, "提示", f"显示无人机 {drone_id} 的轨迹")

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: login_view.py
# Purpose: 定义登录界面的 UI 和逻辑。
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal

@ -1,87 +0,0 @@
from PyQt5.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QTabWidget, QMessageBox)
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QFont
class MainView(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("无人机后勤输送系统 - 指挥控制中心")
self.setMinimumSize(800, 600)
self.init_ui()
def init_ui(self):
# 创建主窗口部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# 顶部工具栏
toolbar = QHBoxLayout()
self.status_label = QLabel("系统状态: 正常")
self.status_label.setFont(QFont('Arial', 10))
logout_button = QPushButton("退出登录")
logout_button.clicked.connect(self.handle_logout)
toolbar.addWidget(self.status_label)
toolbar.addStretch()
toolbar.addWidget(logout_button)
main_layout.addLayout(toolbar)
# 主内容区域
self.tab_widget = QTabWidget()
# 创建各个功能标签页
self.tab_widget.addTab(self.create_drone_management_tab(), "无人机管理")
self.tab_widget.addTab(self.create_task_management_tab(), "任务管理")
self.tab_widget.addTab(self.create_monitoring_tab(), "实时监控")
self.tab_widget.addTab(self.create_system_settings_tab(), "系统设置")
main_layout.addWidget(self.tab_widget)
def create_drone_management_tab(self):
tab = QWidget()
layout = QVBoxLayout(tab)
# 添加无人机管理相关的控件
layout.addWidget(QLabel("无人机管理功能待实现"))
return tab
def create_task_management_tab(self):
tab = QWidget()
layout = QVBoxLayout(tab)
# 添加任务管理相关的控件
layout.addWidget(QLabel("任务管理功能待实现"))
return tab
def create_monitoring_tab(self):
tab = QWidget()
layout = QVBoxLayout(tab)
# 添加实时监控相关的控件
layout.addWidget(QLabel("实时监控功能待实现"))
return tab
def create_system_settings_tab(self):
tab = QWidget()
layout = QVBoxLayout(tab)
# 添加系统设置相关的控件
layout.addWidget(QLabel("系统设置功能待实现"))
return tab
def handle_logout(self):
reply = QMessageBox.question(self, '确认退出',
'确定要退出登录吗?',
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No)
if reply == QMessageBox.Yes:
# TODO: 实现实际的登出逻辑
self.close()
# 这里应该触发登录窗口的显示

@ -0,0 +1,39 @@
from PyQt5.QtCore import QObject, pyqtSignal
class MapDataModel(QObject):
data_changed = pyqtSignal()
def __init__(self):
super().__init__()
self.map_pixmap = None
self.threat_points = []
self.start_point = None
self.goal_point = None
self.paths = []
def set_map(self, pixmap):
self.map_pixmap = pixmap
self.data_changed.emit()
def add_threat_point(self, point):
self.threat_points.append(point)
self.data_changed.emit()
def set_start_point(self, point):
self.start_point = point
self.data_changed.emit()
def set_goal_point(self, point):
self.goal_point = point
self.data_changed.emit()
def clear_all(self):
self.threat_points.clear()
self.start_point = None
self.goal_point = None
self.paths = []
self.data_changed.emit()
def set_paths(self, paths):
self.paths = paths
self.data_changed.emit()

@ -1,63 +1,80 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QTableWidget, QTableWidgetItem)
from PyQt5.QtCore import Qt
from PyQt5.QtCore import Qt, QPointF
from .base_map_view import BaseMapView
class PathLayerView(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
class PathLayerView(BaseMapView):
def __init__(self, map_data_model):
super().__init__(map_data_model)
self.add_path_mode = False
self.current_path = []
self.init_additional_ui()
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
def init_additional_ui(self):
# 创建路径控制按钮
path_control_layout = QHBoxLayout()
# 创建路径列表
self.path_table = QTableWidget()
self.path_table.setColumnCount(4)
self.path_table.setHorizontalHeaderLabels(["ID", "起点", "终点", "状态"])
self.path_table.setStyleSheet("QTableWidget { border: 1px solid #ccc; }")
main_layout.addWidget(self.path_table)
self.add_path_btn = QPushButton("添加路径点")
self.add_path_btn.setCheckable(True)
self.add_path_btn.clicked.connect(self.toggle_add_path_mode)
# 创建控制按钮
button_layout = QHBoxLayout()
self.add_path_btn = QPushButton("添加路径")
self.edit_path_btn = QPushButton("编辑路径")
self.delete_path_btn = QPushButton("删除路径")
self.simulate_path_btn = QPushButton("模拟路径")
self.complete_path_btn = QPushButton("完成路径")
self.complete_path_btn.clicked.connect(self.complete_path)
self.complete_path_btn.setEnabled(False)
button_layout.addWidget(self.add_path_btn)
button_layout.addWidget(self.edit_path_btn)
button_layout.addWidget(self.delete_path_btn)
button_layout.addWidget(self.simulate_path_btn)
self.clear_paths_btn = QPushButton("清除所有路径")
self.clear_paths_btn.clicked.connect(self.clear_all_paths)
main_layout.addLayout(button_layout)
path_control_layout.addWidget(self.add_path_btn)
path_control_layout.addWidget(self.complete_path_btn)
path_control_layout.addWidget(self.clear_paths_btn)
# 创建路径详情区域
self.path_detail = QLabel("路径详情")
self.path_detail.setAlignment(Qt.AlignCenter)
self.path_detail.setStyleSheet("background-color: #f0f0f0; border: 1px solid #ccc;")
main_layout.addWidget(self.path_detail)
# 将按钮添加到主布局
layout = self.layout()
if layout:
layout.addLayout(path_control_layout)
else:
print("Error: Layout not found in PathLayerView")
self.setLayout(main_layout)
def toggle_add_path_mode(self):
self.add_path_mode = self.add_path_btn.isChecked()
if self.add_path_mode:
self.add_path_btn.setText("取消添加点")
self.complete_path_btn.setEnabled(bool(self.current_path))
else:
self.add_path_btn.setText("添加路径点")
self.complete_path_btn.setEnabled(False)
def complete_path(self):
if len(self.current_path) > 1:
# Add a copy to the main data model
self.map_data_model.paths.append(list(self.current_path))
self.map_data_model.data_changed.emit()
self.current_path = []
self.add_path_mode = False
self.add_path_btn.setChecked(False)
self.add_path_btn.setText("添加路径点")
self.complete_path_btn.setEnabled(False)
self.update_map()
def clear_all_paths(self):
self.current_path = []
self.map_data_model.paths = []
self.map_data_model.data_changed.emit()
self.update_map()
# 连接信号
self.add_path_btn.clicked.connect(self.add_path)
self.edit_path_btn.clicked.connect(self.edit_path)
self.delete_path_btn.clicked.connect(self.delete_path)
self.simulate_path_btn.clicked.connect(self.simulate_path)
def add_path(self):
# TODO: 实现添加路径功能
pass
def edit_path(self):
# TODO: 实现编辑路径功能
pass
def delete_path(self):
# TODO: 实现删除路径功能
pass
def simulate_path(self):
# TODO: 实现路径模拟功能
pass
def handle_map_click(self, map_point: QPointF):
"""Handles clicks forwarded from BaseMapView."""
if self.add_path_mode:
img_x = int(map_point.x())
img_y = int(map_point.y())
# 添加路径点到当前路径
self.current_path.append((img_x, img_y))
self.complete_path_btn.setEnabled(True)
print(f"Added path point: {img_x}, {img_y}")
# self.update_map()
# Remove the old mousePressEvent
# def mousePressEvent(self, event):
# pass

@ -1,6 +1,10 @@
# -*- coding: utf-8 -*-
# File: path_planning_view.py
# Purpose: 定义路径规划标签页的 UI提供设置路径规划参数和触发规划操作的界面。
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout, QComboBox,
QSpinBox, QDoubleSpinBox)
QSpinBox, QDoubleSpinBox, QMessageBox)
from PyQt5.QtCore import Qt
class PathPlanningView(QWidget):
@ -76,13 +80,16 @@ class PathPlanningView(QWidget):
self.export_path_btn.clicked.connect(self.export_path)
def plan_path(self):
# TODO: 实现路径规划功能
QMessageBox.information(self, "功能确认", "规划路径 (功能待实现)")
# TODO: Implement path planning logic
pass
def clear_path(self):
# TODO: 实现清除路径功能
QMessageBox.information(self, "功能确认", "清除路径 (功能待实现)")
# TODO: Clear path from MapDataModel or internal state
pass
def export_path(self):
# TODO: 实现导出路径功能
QMessageBox.information(self, "功能确认", "导出路径 (功能待实现)")
# TODO: Implement path export functionality
pass

@ -1,109 +0,0 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QGroupBox, QFormLayout, QProgressBar,
QSpinBox, QDoubleSpinBox)
from PyQt5.QtCore import Qt, QTimer
class PathSimulationView(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
self.simulation_timer = QTimer()
self.simulation_timer.timeout.connect(self.update_simulation)
self.simulation_progress = 0
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
# 创建模拟控制组
control_group = QGroupBox("模拟控制")
control_layout = QFormLayout()
self.speed_spinbox = QDoubleSpinBox()
self.speed_spinbox.setRange(0.1, 10.0)
self.speed_spinbox.setValue(1.0)
self.speed_spinbox.setSingleStep(0.1)
self.interval_spinbox = QSpinBox()
self.interval_spinbox.setRange(10, 1000)
self.interval_spinbox.setValue(100)
self.interval_spinbox.setSingleStep(10)
control_layout.addRow("模拟速度:", self.speed_spinbox)
control_layout.addRow("更新间隔(ms):", self.interval_spinbox)
control_group.setLayout(control_layout)
main_layout.addWidget(control_group)
# 创建模拟进度条
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
main_layout.addWidget(self.progress_bar)
# 创建控制按钮
button_layout = QHBoxLayout()
self.start_btn = QPushButton("开始模拟")
self.pause_btn = QPushButton("暂停")
self.stop_btn = QPushButton("停止")
self.reset_btn = QPushButton("重置")
button_layout.addWidget(self.start_btn)
button_layout.addWidget(self.pause_btn)
button_layout.addWidget(self.stop_btn)
button_layout.addWidget(self.reset_btn)
main_layout.addLayout(button_layout)
# 创建状态显示组
status_group = QGroupBox("模拟状态")
status_layout = QFormLayout()
self.time_label = QLabel("0.0s")
self.distance_label = QLabel("0.0m")
self.altitude_label = QLabel("0.0m")
self.speed_label = QLabel("0.0m/s")
status_layout.addRow("已用时间:", self.time_label)
status_layout.addRow("飞行距离:", self.distance_label)
status_layout.addRow("当前高度:", self.altitude_label)
status_layout.addRow("当前速度:", self.speed_label)
status_group.setLayout(status_layout)
main_layout.addWidget(status_group)
self.setLayout(main_layout)
# 连接信号
self.start_btn.clicked.connect(self.start_simulation)
self.pause_btn.clicked.connect(self.pause_simulation)
self.stop_btn.clicked.connect(self.stop_simulation)
self.reset_btn.clicked.connect(self.reset_simulation)
def start_simulation(self):
self.simulation_timer.start(self.interval_spinbox.value())
def pause_simulation(self):
self.simulation_timer.stop()
def stop_simulation(self):
self.simulation_timer.stop()
self.simulation_progress = 0
self.progress_bar.setValue(0)
def reset_simulation(self):
self.simulation_progress = 0
self.progress_bar.setValue(0)
self.time_label.setText("0.0s")
self.distance_label.setText("0.0m")
self.altitude_label.setText("0.0m")
self.speed_label.setText("0.0m/s")
def update_simulation(self):
# TODO: 实现模拟更新逻辑
self.simulation_progress += self.speed_spinbox.value()
if self.simulation_progress > 100:
self.simulation_progress = 100
self.simulation_timer.stop()
self.progress_bar.setValue(int(self.simulation_progress))

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# File: simple_map_view.py
# Purpose: 定义地图视图标签页,继承自 BaseMapView主要负责威胁点的交互。
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QPushButton, QFileDialog
from PyQt5.QtGui import QPixmap, QPainter, QPen, QColor
from PyQt5.QtCore import Qt, pyqtSignal, QPointF
from .base_map_view import BaseMapView
class SimpleMapView(BaseMapView):
# 当威胁点变化时发出信号,便于主程序联动路径规划
threat_points_changed = pyqtSignal(list)
def __init__(self, map_data_model):
super().__init__(map_data_model)
self.add_threat_point_mode = False
self.init_additional_ui()
# Force re-connection of load map action in case it was overwritten
if hasattr(self, 'load_map_action'):
print("SimpleMapView: Forcing re-connection of load_map_action")
try:
self.load_map_action.triggered.disconnect()
except TypeError: # No connections exist
pass
self.load_map_action.triggered.connect(self.load_map_image)
else:
print("SimpleMapView: Warning - load_map_action not found in base class?")
def init_additional_ui(self):
# 添加威胁点标记按钮
self.threat_point_btn = QPushButton("标记威胁点")
self.threat_point_btn.setCheckable(True)
self.threat_point_btn.clicked.connect(self.toggle_threat_point_mode)
# 将按钮添加到布局中
layout = self.layout()
if layout:
layout.addWidget(self.threat_point_btn)
else:
print("Error: Layout not found in SimpleMapView") # Error handling
def toggle_threat_point_mode(self):
print("SimpleMapView: toggle_threat_point_mode called") # Debug print
is_checked = self.threat_point_btn.isChecked()
print(f"SimpleMapView: threat_point_btn isChecked: {is_checked}")
self.add_threat_point_mode = is_checked
print(f"SimpleMapView: self.add_threat_point_mode set to: {self.add_threat_point_mode}")
if self.add_threat_point_mode:
self.threat_point_btn.setText("取消标记")
else:
self.threat_point_btn.setText("标记威胁点")
def handle_map_click(self, map_point: QPointF):
"""Handles clicks forwarded from BaseMapView."""
print(f"SimpleMapView: handle_map_click called, mode: {self.add_threat_point_mode}") # Debug print
if self.add_threat_point_mode:
img_x = int(map_point.x())
img_y = int(map_point.y())
print(f"SimpleMapView: Adding threat point at ({img_x}, {img_y})") # Debug print
# 添加威胁点到数据模型
self.map_data_model.add_threat_point((img_x, img_y))
# Emit signal (optional, BaseMapView already emits on data change)
# self.threat_points_changed.emit(self.map_data_model.threat_points)
# Remove the old mousePressEvent if it exists
# def mousePressEvent(self, event): # <--- REMOVE THIS METHOD
# pass
# load_map_image and update_map are now handled by BaseMapView
# We might need specific update logic here in the future if SimpleMapView
# needs drawing beyond what BaseMapView provides.

@ -1,3 +1,7 @@
# -*- coding: utf-8 -*-
# File: status_dashboard.py
# Purpose: 定义状态仪表板标签页的 UI显示系统整体状态和性能指标。
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QProgressBar, QGroupBox)
from PyQt5.QtCore import Qt

@ -1,56 +1,71 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QTableWidget, QTableWidgetItem)
from PyQt5.QtCore import Qt
QPushButton, QTableWidget, QTableWidgetItem, QSpinBox)
from PyQt5.QtCore import Qt, QPointF
from .base_map_view import BaseMapView
class ThreatLayerView(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
class ThreatLayerView(BaseMapView):
def __init__(self, map_data_model):
super().__init__(map_data_model)
self.add_threat_mode = False
self.threat_radius = 50 # 默认威胁半径
self.init_additional_ui()
def init_ui(self):
# 创建主布局
main_layout = QVBoxLayout()
def init_additional_ui(self):
# 创建威胁控制面板
threat_control_layout = QHBoxLayout()
# 创建威胁列表
self.threat_table = QTableWidget()
self.threat_table.setColumnCount(4)
self.threat_table.setHorizontalHeaderLabels(["ID", "类型", "位置", "威胁等级"])
self.threat_table.setStyleSheet("QTableWidget { border: 1px solid #ccc; }")
main_layout.addWidget(self.threat_table)
# 威胁点标记按钮
self.add_threat_btn = QPushButton("添加威胁区域")
self.add_threat_btn.setCheckable(True)
self.add_threat_btn.clicked.connect(self.toggle_add_threat_mode)
# 创建控制按钮
button_layout = QHBoxLayout()
self.add_threat_btn = QPushButton("添加威胁")
self.edit_threat_btn = QPushButton("编辑威胁")
self.delete_threat_btn = QPushButton("删除威胁")
# 威胁半径控制
radius_label = QLabel("威胁半径:")
self.radius_spinbox = QSpinBox()
self.radius_spinbox.setRange(10, 200)
self.radius_spinbox.setValue(self.threat_radius)
self.radius_spinbox.valueChanged.connect(self.set_threat_radius)
button_layout.addWidget(self.add_threat_btn)
button_layout.addWidget(self.edit_threat_btn)
button_layout.addWidget(self.delete_threat_btn)
# 清除威胁按钮
self.clear_threat_btn = QPushButton("清除威胁")
self.clear_threat_btn.clicked.connect(self.clear_threats)
main_layout.addLayout(button_layout)
# 添加控件到布局
threat_control_layout.addWidget(self.add_threat_btn)
threat_control_layout.addWidget(self.clear_threat_btn)
# 创建威胁详情区域
self.threat_detail = QLabel("威胁详情")
self.threat_detail.setAlignment(Qt.AlignCenter)
self.threat_detail.setStyleSheet("background-color: #f0f0f0; border: 1px solid #ccc;")
main_layout.addWidget(self.threat_detail)
self.setLayout(main_layout)
# 连接信号
self.add_threat_btn.clicked.connect(self.add_threat)
self.edit_threat_btn.clicked.connect(self.edit_threat)
self.delete_threat_btn.clicked.connect(self.delete_threat)
def add_threat(self):
# TODO: 实现添加威胁功能
pass
# 将控制面板添加到主布局
layout = self.layout()
if layout:
layout.addLayout(threat_control_layout)
else:
print("Error: Layout not found in ThreatLayerView")
def toggle_add_threat_mode(self):
self.add_threat_mode = self.add_threat_btn.isChecked()
if self.add_threat_mode:
self.add_threat_btn.setText("取消添加")
else:
self.add_threat_btn.setText("添加威胁区域")
def set_threat_radius(self, value):
self.threat_radius = value
def edit_threat(self):
# TODO: 实现编辑威胁功能
pass
def clear_threats(self):
self.map_data_model.threat_points = []
self.map_data_model.data_changed.emit()
self.update_map() # Force redraw
def delete_threat(self):
# TODO: 实现删除威胁功能
pass
def handle_map_click(self, map_point: QPointF):
"""Handles clicks forwarded from BaseMapView."""
if self.add_threat_mode:
img_x = int(map_point.x())
img_y = int(map_point.y())
# 添加威胁点到数据模型
# Note: Threat radius is stored but not visualized yet
self.map_data_model.add_threat_point((img_x, img_y))
# BaseMapView will redraw automatically due to data_changed signal
# Remove the old mousePressEvent
# def mousePressEvent(self, event):
# pass
Loading…
Cancel
Save