通信协议 #12

Merged
pg9nrcf7t merged 6 commits from yyb_branch into main 8 months ago

@ -0,0 +1,57 @@
## MAVLINK的数据库数据连接与处理
在上次的技术博客中我们谈到了mavlink的基本传输格式与基础应用然而真正想在软件中使用mavlink进行通信还有许多的困难需要解决其中一大问题就是如何连接调用数据库并在对其中数据内容进行转化处理。
### 一、mavlink与无人机的连接
该功能需要我们设计一个mavlink_handler模块负责建立和维护MAVLink连接处理MAVLink消息的发送和接收。
```
class MavlinkHandler:
def __init__(self, connection_string: str = "udp:127.0.0.1:14550"):
self.connection_string = connection_string
self.connection = None
def connect(self) -> bool:
"""建立MAVLink连接"""
try:
self.connection = mavutil.mavlink_connection(self.connection_string)
self.connection.wait_heartbeat()
print(f"MAVLink连接成功: {self.connection_string}")
return True
except Exception as e:
print(f"MAVLink连接失败: {str(e)}")
return False
```
可以看到在我们定义的类的初始化函数中我们声明了一串叫connection_string的字符串而该串就是主通信端与无人机端相连的关键它可以是无人机的IP地址也可以是一个蓝牙、串口具体连接方式需要根据无人机的类型和底层通信形式而决定在样例中通过该IP地址我们就能与无人机相连接以实现进一步的功能。
### 二、mavlink与数据库的连接
```
class DatabaseHandler:
#替换数据库账号密码
def __init__(self, host: str, user: str, password: str, database: str):
self.host = host
self.user = user
self.password = password
self.database = database
self.connection = None
def connect(self) -> bool:
"""连接到MySQL数据库"""
try:
self.connection = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database
)
print(f"数据库连接成功: {self.database}")
return True
except Error as e:
print(f"数据库连接失败: {str(e)}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.connection:
self.connection.close()
```
如上代码所示我们连接数据库的方式和一般的程序并无不同只需要通过mysql.connector库在初始化函数中正确输入自己的mysql账号密码与数据库建立连接即可。
而在数据库数据的处理问题上我们则是需要编写一个函数对从数据库中提取出的数据进行分解处理并将处理好的数据存放在我们设置的一个数据队列中随着每一次的队列处理循环将一条条数据通过mavlink发送出去。

@ -0,0 +1,550 @@
我会帮你实现通信模块。首先,我们需要创建通信模块的目录结构和相关文件。让我们先看看是否已经有相关的目录:
Ran tool
我们需要创建一个新的通信模块。我会创建以下文件:
1. `communication/` 目录
2. `mavlink_handler.py` - MAVLink通信处理
3. `database_handler.py` - 数据库操作
4. `message_queue.py` - 消息队列管理
5. `__init__.py` - 模块初始化文件
我已经完成了通信模块的实现,这个模块包含以下主要组件:
1. `MavlinkHandler` - MAVLink通信处理器
- 负责建立和维护MAVLink连接
- 处理MAVLink消息的发送和接收
- 支持消息回调注册
- 使用线程进行异步消息处理
2. `DatabaseHandler` - 数据库处理器:
- 负责MySQL数据库的连接和管理
- 提供数据库表的创建和管理
- 支持MAVLink消息的存储和查询
- 管理无人机状态信息的存储和更新
3. `MessageQueue` - 消息队列:
- 提供线程安全的消息队列实现
- 支持消息的异步处理
- 提供回调机制
- 包含消息时间戳
4. `CommunicationManager` - 通信管理器:
- 协调MAVLink和数据库组件
- 处理消息的路由和分发
- 管理无人机状态更新
- 提供统一的接口访问通信功能
要使用这个通信模块,你需要:
1. 安装必要的依赖:
```bash
pip install pymavlink mysql-connector-python
```
2. 配置数据库连接:
```python
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'drone_control'
}
```
3. 初始化通信管理器:
```python
from command_center.communication import CommunicationManager
# 创建通信管理器实例
comm_manager = CommunicationManager(
mavlink_connection="udp:127.0.0.1:14550", # MAVLink连接字符串
db_config=db_config
)
# 启动通信管理器
if comm_manager.start():
print("通信管理器启动成功")
# 获取无人机状态
drone_status = comm_manager.get_drone_status("drone_1")
print(f"无人机状态: {drone_status}")
# 获取消息队列
message_queue = comm_manager.get_message_queue()
# 注册消息回调
def on_message():
msg = message_queue.get()
if msg:
print(f"收到消息: {msg}")
message_queue.register_callback(on_message)
```
这个通信模块提供了完整的MAVLink通信和数据库集成功能可以
1. 接收和处理MAVLink消息
2. 将消息存储到MySQL数据库
3. 实时更新无人机状态
4. 提供消息队列和回调机制
5. 支持多线程异步处理
你可以根据需要扩展这个模块,比如:
1. 添加更多的MAVLink消息处理
2. 实现更复杂的数据库查询
3. 添加消息过滤和转换功能
4. 实现消息重传机制
5. 添加错误处理和恢复机制
需要我详细解释某个部分或者添加其他功能吗?
我来详细描述一下当前通信模块的工作流程:
1. **初始化阶段**
- 创建 `CommunicationManager` 实例,传入 MAVLink 连接字符串和数据库配置
- 初始化三个核心组件:
- `MavlinkHandler`:负责 MAVLink 通信
- `DatabaseHandler`:负责数据库操作
- `MessageQueue`:负责消息队列管理
2. **启动阶段**(调用 `start()` 方法):
- 首先连接数据库
- 创建必要的数据库表(`mavlink_messages` 和 `drone_status`
- 建立 MAVLink 连接
- 注册三个关键消息的回调函数:
- `HEARTBEAT`:心跳消息
- `GLOBAL_POSITION_INT`:位置信息
- `SYS_STATUS`:系统状态
- 启动 MAVLink 消息接收线程
- 启动消息处理线程
3. **消息接收流程**
```
MAVLink设备 -> MavlinkHandler -> MessageQueue -> CommunicationManager -> DatabaseHandler
```
- `MavlinkHandler` 通过 `_receive_loop` 持续监听 MAVLink 消息
- 收到消息后,调用 `_handle_message` 处理
- 消息被放入 `MessageQueue` 队列
- 同时触发注册的回调函数
4. **消息处理流程**
```
MessageQueue -> CommunicationManager._process_messages -> 数据库存储/状态更新
```
- `CommunicationManager``_process_messages` 线程从队列获取消息
- 对消息进行分类处理:
- 保存到数据库的 `mavlink_messages`
- 根据消息类型更新 `drone_status`
- 位置消息:更新经纬度和高度
- 状态消息:更新电池电量和系统状态
5. **数据库操作**
- 所有 MAVLink 消息都被保存到 `mavlink_messages`
- 无人机状态信息被保存到 `drone_status`
- 支持按时间范围和消息类型查询历史消息
- 支持获取最新的无人机状态
6. **消息回调机制**
- 支持注册消息回调函数
- 当收到特定类型的消息时,自动触发对应的回调
- 回调函数可以处理消息并执行自定义逻辑
7. **错误处理**
- 连接失败时的重试机制
- 消息处理异常的错误捕获
- 数据库操作的异常处理和回滚
8. **停止流程**(调用 `stop()` 方法):
- 停止消息处理线程
- 停止 MAVLink 连接
- 断开数据库连接
- 清理资源
这个通信模块的设计特点:
1. **异步处理**:使用多线程实现消息的异步接收和处理
2. **数据持久化**:所有消息都被保存到数据库,便于后续分析
3. **状态管理**:实时维护无人机的最新状态
4. **可扩展性**:通过回调机制支持自定义消息处理
5. **错误恢复**:包含完整的错误处理机制
使用示例:
```python
# 初始化
comm_manager = CommunicationManager(
mavlink_connection="udp:127.0.0.1:14550",
db_config={
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'drone_control'
}
)
# 启动通信
comm_manager.start()
# 获取无人机状态
drone_status = comm_manager.get_drone_status("drone_1")
# 注册消息回调
def on_message():
msg = comm_manager.get_message_queue().get()
if msg:
print(f"收到消息: {msg}")
comm_manager.get_message_queue().register_callback(on_message)
```
这个通信模块提供了一个完整的解决方案,可以:
1. 实时接收和处理 MAVLink 消息
2. 将消息持久化到数据库
3. 维护无人机的最新状态
4. 支持自定义消息处理
5. 提供错误恢复机制
让我详细解释一下与无人机建立连接的过程。在当前的实现中,连接建立主要发生在 `MavlinkHandler` 类中,通过 `connect()` 方法实现。让我们看看具体的连接流程:
1. **连接初始化**
```python
# 在 MavlinkHandler 类中
def __init__(self, connection_string: str = "udp:127.0.0.1:14550"):
self.connection_string = connection_string
self.connection = None
# ... 其他初始化代码
```
2. **建立连接**
```python
def connect(self) -> bool:
"""建立MAVLink连接"""
try:
# 创建MAVLink连接
self.connection = mavutil.mavlink_connection(self.connection_string)
# 等待心跳包,确认连接成功
self.connection.wait_heartbeat()
print(f"MAVLink连接成功: {self.connection_string}")
return True
except Exception as e:
print(f"MAVLink连接失败: {str(e)}")
return False
```
连接建立的关键点:
1. **连接字符串格式**
- 支持多种连接方式:
- UDP: `"udp:IP:PORT"` (例如: `"udp:127.0.0.1:14550"`)
- TCP: `"tcp:IP:PORT"`
- 串口: `"serial:PORT,BAUDRATE"` (例如: `"serial:/dev/ttyUSB0,57600"`)
- 蓝牙: `"bluetooth:ADDRESS"`
2. **连接验证**
- 通过 `wait_heartbeat()` 等待无人机发送心跳包
- 心跳包是MAVLink协议中的基础消息用于确认连接状态
- 如果在一定时间内没有收到心跳包,连接会失败
3. **连接状态管理**
- 连接成功后,`connection` 对象会被保存
- 用于后续的消息发送和接收
- 连接失败时会返回 `False`,上层可以处理错误
要使用这个连接功能,你需要:
1. **确定连接方式**
- 如果无人机使用地面站软件如QGroundControl通常使用UDP连接
- 如果直接连接飞控,可能需要使用串口连接
- 连接字符串需要根据实际情况配置
2. **配置连接参数**
```python
# UDP连接示例
mavlink_connection = "udp:127.0.0.1:14550" # 本地UDP连接
# 串口连接示例
mavlink_connection = "serial:/dev/ttyUSB0,57600" # Linux系统
# 或
mavlink_connection = "serial:COM3,57600" # Windows系统
```
3. **建立连接**
```python
# 创建通信管理器
comm_manager = CommunicationManager(
mavlink_connection="udp:127.0.0.1:14550", # 根据实际情况修改
db_config={
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'drone_control'
}
)
# 启动连接
if comm_manager.start():
print("连接成功")
else:
print("连接失败,请检查连接参数")
```
4. **连接后的操作**
- 发送命令:使用 `send_message` 方法
- 接收消息:通过消息队列或回调函数
- 监控状态:通过数据库查询
5. **错误处理**
- 连接失败时会有错误提示
- 可以设置重试机制
- 可以添加连接状态监控
建议的连接使用流程:
1. **准备阶段**
```python
def setup_connection():
# 配置连接参数
connection_config = {
'mavlink': {
'type': 'udp', # 或 'serial', 'tcp' 等
'address': '127.0.0.1',
'port': 14550,
'baudrate': 57600 # 串口连接时需要
},
'database': {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'drone_control'
}
}
# 构建连接字符串
if connection_config['mavlink']['type'] == 'udp':
connection_string = f"udp:{connection_config['mavlink']['address']}:{connection_config['mavlink']['port']}"
elif connection_config['mavlink']['type'] == 'serial':
connection_string = f"serial:{connection_config['mavlink']['address']},{connection_config['mavlink']['baudrate']}"
return connection_string, connection_config['database']
```
2. **连接建立**
```python
def establish_connection():
connection_string, db_config = setup_connection()
# 创建通信管理器
comm_manager = CommunicationManager(connection_string, db_config)
# 尝试连接
max_retries = 3
retry_count = 0
while retry_count < max_retries:
if comm_manager.start():
print("连接成功")
return comm_manager
else:
retry_count += 1
print(f"连接失败,第{retry_count}次重试...")
time.sleep(2) # 等待2秒后重试
print("连接失败,达到最大重试次数")
return None
```
3. **连接监控**
```python
def monitor_connection(comm_manager):
def heartbeat_callback(msg):
print(f"收到心跳包: {msg.to_dict()}")
# 注册心跳回调
comm_manager.mavlink.register_callback('HEARTBEAT', heartbeat_callback)
# 定期检查连接状态
while True:
if not comm_manager.mavlink.connection:
print("连接断开,尝试重连...")
if not comm_manager.mavlink.connect():
print("重连失败")
break
time.sleep(1)
```
这个连接模块提供了灵活的连接方式,可以根据不同的无人机和飞控配置选择合适的连接参数。同时,它也包含了错误处理和重试机制,确保连接的可靠性。
需要注意的是:
1. 确保无人机和地面站软件如QGroundControl使用相同的端口
2. 串口连接时需要正确的端口号和波特率
3. 防火墙设置可能会影响UDP/TCP连接
4. 建议在连接前先测试基本的MAVLink通信
你可以根据实际需求调整连接参数和错误处理策略。需要我详细解释某个部分吗?
MAVLink 消息的格式是标准化的二进制协议格式。让我详细解释一下 MAVLink 消息的格式和结构:
1. **MAVLink 消息基本结构**
```
+-------------------+------------------+------------------+------------------+------------------+
| 起始标记 (0xFE) | 载荷长度 (1字节) | 序列号 (1字节) | 系统ID (1字节) | 组件ID (1字节) |
+-------------------+------------------+------------------+------------------+------------------+
| 消息ID (1字节) | 载荷数据 (N字节) | CRC校验 (2字节) |
+-------------------+------------------+------------------+
```
2. **消息字段说明**
- **起始标记 (0xFE)**:标识消息开始
- **载荷长度**消息载荷的长度不包括头部和CRC
- **序列号**:用于检测消息丢失
- **系统ID**发送系统的ID如无人机ID
- **组件ID**发送组件的ID如飞控、相机等
- **消息ID**:标识消息类型
- **载荷数据**:实际的消息内容
- **CRC校验**:用于验证消息完整性
3. **常见消息类型**
```python
# 心跳消息 (HEARTBEAT)
{
'type': 'HEARTBEAT',
'system_status': 0, # 系统状态
'base_mode': 0, # 基础模式
'custom_mode': 0, # 自定义模式
'mavlink_version': 3 # MAVLink版本
}
# 位置消息 (GLOBAL_POSITION_INT)
{
'type': 'GLOBAL_POSITION_INT',
'time_boot_ms': 0, # 启动时间
'lat': 0, # 纬度 (度 * 1e7)
'lon': 0, # 经度 (度 * 1e7)
'alt': 0, # 高度 (毫米)
'relative_alt': 0, # 相对高度 (毫米)
'vx': 0, # X方向速度 (厘米/秒)
'vy': 0, # Y方向速度 (厘米/秒)
'vz': 0, # Z方向速度 (厘米/秒)
'hdg': 0 # 航向 (度 * 100)
}
# 系统状态消息 (SYS_STATUS)
{
'type': 'SYS_STATUS',
'onboard_control_sensors_present': 0, # 存在的传感器
'onboard_control_sensors_enabled': 0, # 启用的传感器
'onboard_control_sensors_health': 0, # 传感器健康状态
'load': 0, # 处理器负载
'voltage_battery': 0, # 电池电压
'current_battery': 0, # 电池电流
'battery_remaining': 0, # 剩余电量百分比
'drop_rate_comm': 0, # 通信丢包率
'errors_comm': 0, # 通信错误
'errors_count1': 0, # 错误计数1
'errors_count2': 0, # 错误计数2
'errors_count3': 0, # 错误计数3
'errors_count4': 0 # 错误计数4
}
```
4. **消息处理示例**
```python
# 在 MavlinkHandler 类中处理消息
def _handle_message(self, msg):
"""处理接收到的MAVLink消息"""
msg_type = msg.get_type()
msg_data = msg.to_dict()
# 根据消息类型处理
if msg_type == 'HEARTBEAT':
self._handle_heartbeat(msg_data)
elif msg_type == 'GLOBAL_POSITION_INT':
self._handle_position(msg_data)
elif msg_type == 'SYS_STATUS':
self._handle_system_status(msg_data)
# 将消息放入队列
self.message_queue.put({
'type': msg_type,
'data': msg_data,
'timestamp': datetime.now()
})
```
5. **消息发送示例**
```python
# 发送命令消息
def send_command(self, command_type: str, **kwargs):
"""发送MAVLink命令"""
try:
# 构建命令消息
msg = self.connection.mav.command_long_send(
self.connection.target_system, # 目标系统ID
self.connection.target_component, # 目标组件ID
command_type, # 命令类型
0, # 确认
*kwargs.values() # 命令参数
)
return True
except Exception as e:
print(f"发送命令失败: {str(e)}")
return False
```
6. **消息转换和存储**
```python
# 在 DatabaseHandler 中存储消息
def save_mavlink_message(self, msg_type: str, msg_data: Dict) -> bool:
"""保存MAVLink消息到数据库"""
query = """
INSERT INTO mavlink_messages
(message_type, message_data, timestamp)
VALUES (%s, %s, %s)
"""
params = (
msg_type,
json.dumps(msg_data), # 将消息数据转换为JSON格式存储
datetime.now()
)
return self.execute_update(query, params)
```
7. **消息查询示例**
```python
# 查询特定类型的消息
def get_messages_by_type(self, msg_type: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> List[Dict]:
"""获取特定类型的MAVLink消息"""
query = """
SELECT * FROM mavlink_messages
WHERE message_type = %s
"""
params = [msg_type]
if start_time:
query += " AND timestamp >= %s"
params.append(start_time)
if end_time:
query += " AND timestamp <= %s"
params.append(end_time)
query += " ORDER BY timestamp DESC"
return self.execute_query(query, tuple(params))
```
MAVLink 消息的特点:
1. **二进制格式**:高效的数据传输
2. **固定头部**:便于消息识别和解析
3. **可变载荷**:支持不同类型的数据
4. **CRC校验**:确保数据完整性
5. **系统/组件ID**:支持多设备通信
6. **序列号**:支持消息顺序和丢失检测
在实际使用中,我们不需要直接处理二进制格式,因为 pymavlink 库已经帮我们处理了消息的编码和解码。我们只需要关注消息的内容和类型即可。
需要我详细解释某个具体的消息类型或处理过程吗?

@ -0,0 +1,13 @@
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
from .message_queue import MessageQueue
from .drone_connection_manager import DroneConnectionManager
from .communication_manager import CommunicationManager
__all__ = [
'MavlinkHandler',
'DatabaseHandler',
'MessageQueue',
'DroneConnectionManager',
'CommunicationManager'
]

@ -0,0 +1,181 @@
import threading
import time
from typing import Dict, Any, Optional, List
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
from .message_queue import MessageQueue
from .drone_connection_manager import DroneConnectionManager
class CommunicationManager:
def __init__(self, mavlink_connection: str, db_config: Dict[str, str]):
self.mavlink = MavlinkHandler(mavlink_connection)
self.database = DatabaseHandler(
host=db_config['host'],
user=db_config['user'],
password=db_config['password'],
database=db_config['database']
)
self.drone_manager = DroneConnectionManager(self.database)
self.running = False
self.thread = None
self.message_queue = MessageQueue()
def start(self) -> bool:
"""启动通信管理器"""
if not self.database.connect():
return False
# 创建数据库表
self.database.create_tables()
# 启动MAVLink连接
if not self.mavlink.connect():
return False
# 注册MAVLink消息回调
self.mavlink.register_callback('HEARTBEAT', self._handle_heartbeat)
self.mavlink.register_callback('GLOBAL_POSITION_INT', self._handle_position)
self.mavlink.register_callback('SYS_STATUS', self._handle_system_status)
# 启动MAVLink消息接收
self.mavlink.start()
# 启动无人机连接管理器
self.drone_manager.start()
# 启动消息处理线程
self.running = True
self.thread = threading.Thread(target=self._process_messages)
self.thread.daemon = True
self.thread.start()
return True
def stop(self):
"""停止通信管理器"""
self.running = False
if self.thread:
self.thread.join()
self.mavlink.stop()
self.drone_manager.stop()
self.database.disconnect()
def add_drone(self, drone_id: str, ip_address: str, port: int,
connection_type: str = 'udp') -> bool:
"""添加新的无人机连接"""
return self.database.add_drone_connection(drone_id, ip_address, port, connection_type)
def connect_drone(self, drone_id: str) -> bool:
"""连接指定的无人机"""
return self.drone_manager.connect_drone(drone_id)
def disconnect_drone(self, drone_id: str):
"""断开指定无人机的连接"""
self.drone_manager.disconnect_drone(drone_id)
def get_connected_drones(self) -> List[str]:
"""获取所有已连接的无人机ID"""
return self.drone_manager.get_connected_drones()
def is_drone_connected(self, drone_id: str) -> bool:
"""检查指定无人机是否已连接"""
return self.drone_manager.is_drone_connected(drone_id)
def send_command(self, drone_id: str, command_type: str, **kwargs) -> bool:
"""向指定无人机发送命令"""
return self.drone_manager.send_command(drone_id, command_type, **kwargs)
def _process_messages(self):
"""处理消息队列中的消息"""
while self.running:
try:
msg = self.mavlink.get_message_queue().get(timeout=1.0)
if msg:
self._handle_message(msg)
except Exception as e:
print(f"消息处理错误: {str(e)}")
time.sleep(1)
def _handle_message(self, msg: Dict[str, Any]):
"""处理接收到的消息"""
try:
msg_type = msg['message'].get_type()
msg_data = msg['message'].to_dict()
# 保存消息到数据库
self.database.save_mavlink_message(msg_type, msg_data)
# 根据消息类型更新无人机状态
if msg_type == 'GLOBAL_POSITION_INT':
self._update_drone_position(msg_data)
elif msg_type == 'SYS_STATUS':
self._update_drone_status(msg_data)
except Exception as e:
print(f"消息处理失败: {str(e)}")
def _handle_heartbeat(self, msg):
"""处理心跳消息"""
self.message_queue.put({
'type': 'HEARTBEAT',
'data': msg.to_dict()
})
def _handle_position(self, msg):
"""处理位置消息"""
self.message_queue.put({
'type': 'POSITION',
'data': msg.to_dict()
})
def _handle_system_status(self, msg):
"""处理系统状态消息"""
self.message_queue.put({
'type': 'SYSTEM_STATUS',
'data': msg.to_dict()
})
def _update_drone_position(self, position_data: Dict[str, Any]):
"""更新无人机位置信息"""
query = """
INSERT INTO drone_status
(drone_id, latitude, longitude, altitude, timestamp)
VALUES (%s, %s, %s, %s, %s)
"""
params = (
position_data.get('sysid', 'unknown'),
position_data.get('lat', 0) / 1e7, # 转换为度
position_data.get('lon', 0) / 1e7, # 转换为度
position_data.get('alt', 0) / 1000, # 转换为米
datetime.now()
)
self.database.execute_update(query, params)
def _update_drone_status(self, status_data: Dict[str, Any]):
"""更新无人机状态信息"""
query = """
UPDATE drone_status
SET battery_level = %s, status = %s
WHERE drone_id = %s
ORDER BY timestamp DESC LIMIT 1
"""
params = (
status_data.get('battery_remaining', 0),
status_data.get('system_status', 'UNKNOWN'),
status_data.get('sysid', 'unknown')
)
self.database.execute_update(query, params)
def get_drone_status(self, drone_id: str) -> Optional[Dict[str, Any]]:
"""获取指定无人机的状态"""
query = """
SELECT * FROM drone_status
WHERE drone_id = %s
ORDER BY timestamp DESC LIMIT 1
"""
result = self.database.execute_query(query, (drone_id,))
return result[0] if result else None
def get_message_queue(self) -> MessageQueue:
"""获取消息队列"""
return self.message_queue

@ -0,0 +1,188 @@
import mysql.connector
from mysql.connector import Error
from typing import Dict, List, Any, Optional
import json
from datetime import datetime
class DatabaseHandler:
#替换数据库账号密码
def __init__(self, host: str, user: str, password: str, database: str):
self.host = host
self.user = user
self.password = password
self.database = database
self.connection = None
self.cursor = None
def connect(self) -> bool:
"""连接到MySQL数据库"""
try:
self.connection = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database
)
self.cursor = self.connection.cursor(dictionary=True)
print(f"数据库连接成功: {self.database}")
return True
except Error as e:
print(f"数据库连接失败: {str(e)}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]:
"""执行查询并返回结果"""
try:
self.cursor.execute(query, params)
return self.cursor.fetchall()
except Error as e:
print(f"查询执行失败: {str(e)}")
return []
def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
"""执行更新操作"""
try:
self.cursor.execute(query, params)
self.connection.commit()
return True
except Error as e:
print(f"更新执行失败: {str(e)}")
self.connection.rollback()
return False
def save_mavlink_message(self, msg_type: str, msg_data: Dict) -> bool:
"""保存MAVLink消息到数据库"""
query = """
INSERT INTO mavlink_messages
(message_type, message_data, timestamp)
VALUES (%s, %s, %s)
"""
params = (
msg_type,
json.dumps(msg_data),
datetime.now()
)
return self.execute_update(query, params)
def get_mavlink_messages(self, msg_type: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None) -> List[Dict]:
"""获取MAVLink消息"""
query = "SELECT * FROM mavlink_messages WHERE 1=1"
params = []
if msg_type:
query += " AND message_type = %s"
params.append(msg_type)
if start_time:
query += " AND timestamp >= %s"
params.append(start_time)
if end_time:
query += " AND timestamp <= %s"
params.append(end_time)
query += " ORDER BY timestamp DESC"
return self.execute_query(query, tuple(params) if params else None)
def create_tables(self):
"""创建必要的数据库表"""
queries = [
"""
CREATE TABLE IF NOT EXISTS mavlink_messages (
id INT AUTO_INCREMENT PRIMARY KEY,
message_type VARCHAR(50) NOT NULL,
message_data JSON NOT NULL,
timestamp DATETIME NOT NULL,
INDEX idx_message_type (message_type),
INDEX idx_timestamp (timestamp)
)
""",
"""
CREATE TABLE IF NOT EXISTS drone_status (
id INT AUTO_INCREMENT PRIMARY KEY,
drone_id VARCHAR(50) NOT NULL,
latitude FLOAT,
longitude FLOAT,
altitude FLOAT,
battery_level FLOAT,
status VARCHAR(50),
timestamp DATETIME NOT NULL,
INDEX idx_drone_id (drone_id),
INDEX idx_timestamp (timestamp)
)
""",
"""
CREATE TABLE IF NOT EXISTS drone_connections (
id INT AUTO_INCREMENT PRIMARY KEY,
drone_id VARCHAR(50) NOT NULL UNIQUE,
ip_address VARCHAR(50) NOT NULL,
port INT NOT NULL,
connection_type VARCHAR(20) NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
last_connected DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_drone_id (drone_id),
INDEX idx_is_active (is_active)
)
"""
]
for query in queries:
try:
self.cursor.execute(query)
self.connection.commit()
except Error as e:
print(f"创建表失败: {str(e)}")
self.connection.rollback()
def get_all_active_drones(self) -> List[Dict[str, Any]]:
"""获取所有活跃的无人机连接信息"""
query = """
SELECT * FROM drone_connections
WHERE is_active = TRUE
"""
return self.execute_query(query)
def get_drone_connection(self, drone_id: str) -> Optional[Dict[str, Any]]:
"""获取指定无人机的连接信息"""
query = """
SELECT * FROM drone_connections
WHERE drone_id = %s AND is_active = TRUE
"""
result = self.execute_query(query, (drone_id,))
return result[0] if result else None
def update_drone_connection_status(self, drone_id: str, is_active: bool) -> bool:
"""更新无人机连接状态"""
query = """
UPDATE drone_connections
SET is_active = %s, last_connected = %s
WHERE drone_id = %s
"""
params = (is_active, datetime.now(), drone_id)
return self.execute_update(query, params)
def add_drone_connection(self, drone_id: str, ip_address: str, port: int,
connection_type: str = 'udp') -> bool:
"""添加新的无人机连接信息"""
query = """
INSERT INTO drone_connections
(drone_id, ip_address, port, connection_type)
VALUES (%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
ip_address = VALUES(ip_address),
port = VALUES(port),
connection_type = VALUES(connection_type),
is_active = TRUE,
updated_at = CURRENT_TIMESTAMP
"""
params = (drone_id, ip_address, port, connection_type)
return self.execute_update(query, params)

@ -0,0 +1,144 @@
from typing import Dict, List, Optional
from .mavlink_handler import MavlinkHandler
from .database_handler import DatabaseHandler
import threading
import time
from datetime import datetime, timedelta
class DroneConnectionManager:
def __init__(self, db_handler: DatabaseHandler):
self.db_handler = db_handler
self.drone_handlers: Dict[str, MavlinkHandler] = {}
self.last_heartbeat: Dict[str, datetime] = {}
self.is_running = False
self.monitor_thread = None
self.TIMEOUT_SECONDS = 60 # 1分钟超时
def start(self):
"""启动连接管理器"""
if not self.is_running:
self.is_running = True
self.monitor_thread = threading.Thread(target=self._monitor_connections)
self.monitor_thread.daemon = True
self.monitor_thread.start()
# 连接所有活跃的无人机
self._connect_all_active_drones()
return True
return False
def stop(self):
"""停止连接管理器"""
self.is_running = False
if self.monitor_thread:
self.monitor_thread.join()
self._disconnect_all_drones()
def _connect_all_active_drones(self):
"""连接所有活跃的无人机"""
active_drones = self.db_handler.get_all_active_drones()
for drone in active_drones:
self.connect_drone(drone['drone_id'], drone['ip_address'], drone['port'])
def _disconnect_all_drones(self):
"""断开所有无人机的连接"""
for drone_id in list(self.drone_handlers.keys()):
self.disconnect_drone(drone_id)
def connect_drone(self, drone_id: str, ip: str, port: int) -> bool:
"""连接到指定无人机"""
try:
if drone_id in self.drone_handlers:
return True
handler = MavlinkHandler(f"udp:{ip}:{port}")
if handler.connect():
self.drone_handlers[drone_id] = handler
self.last_heartbeat[drone_id] = datetime.now()
self.db_handler.update_drone_connection_status(drone_id, "connected")
# 启动消息接收线程
handler.start()
print(f"成功连接到无人机 {drone_id}")
return True
return False
except Exception as e:
print(f"连接无人机 {drone_id} 失败: {str(e)}")
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
return False
def disconnect_drone(self, drone_id: str):
"""断开与指定无人机的连接"""
if drone_id in self.drone_handlers:
try:
handler = self.drone_handlers[drone_id]
handler.stop() # 停止消息接收线程
handler.disconnect()
del self.drone_handlers[drone_id]
if drone_id in self.last_heartbeat:
del self.last_heartbeat[drone_id]
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
print(f"已断开与无人机 {drone_id} 的连接")
except Exception as e:
print(f"断开无人机 {drone_id} 连接失败: {str(e)}")
def _monitor_connections(self):
"""监控所有无人机连接状态"""
while self.is_running:
try:
current_time = datetime.now()
for drone_id, handler in list(self.drone_handlers.items()):
# 检查最后心跳时间
if drone_id in self.last_heartbeat:
time_since_last_heartbeat = (current_time - self.last_heartbeat[drone_id]).total_seconds()
if time_since_last_heartbeat > self.TIMEOUT_SECONDS:
print(f"无人机 {drone_id} 通信超时")
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
self.disconnect_drone(drone_id)
continue
# 检查是否有新的心跳消息
try:
msg = handler.receive_message()
if msg and msg.get_type() == 'HEARTBEAT':
self.last_heartbeat[drone_id] = current_time
self.db_handler.update_drone_connection_status(drone_id, "connected")
except Exception as e:
print(f"接收无人机 {drone_id} 消息失败: {str(e)}")
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
self.disconnect_drone(drone_id)
# 检查是否有新的活跃无人机需要连接
active_drones = self.db_handler.get_all_active_drones()
for drone in active_drones:
if drone['drone_id'] not in self.drone_handlers:
print(f"发现新的活跃无人机 {drone['drone_id']},尝试连接")
self.connect_drone(drone['drone_id'], drone['ip_address'], drone['port'])
except Exception as e:
print(f"监控连接状态时发生错误: {str(e)}")
time.sleep(1) # 每秒检查一次
def get_drone_handler(self, drone_id: str) -> Optional[MavlinkHandler]:
"""获取指定无人机的处理器"""
return self.drone_handlers.get(drone_id)
def get_connected_drones(self) -> List[str]:
"""获取所有已连接的无人机ID"""
return list(self.drone_handlers.keys())
def is_drone_connected(self, drone_id: str) -> bool:
"""检查指定无人机是否已连接"""
return drone_id in self.drone_handlers
def send_command(self, drone_id: str, command: str, params: dict = None) -> bool:
"""向指定无人机发送命令"""
handler = self.get_drone_handler(drone_id)
if handler:
try:
handler.send_command(command, params)
return True
except Exception as e:
print(f"发送命令到无人机 {drone_id} 失败: {str(e)}")
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
self.disconnect_drone(drone_id)
return False

@ -0,0 +1,81 @@
import pymavlink.mavutil as mavutil
import threading
import time
from typing import Callable, Dict, Any
from .message_queue import MessageQueue
class MavlinkHandler:
def __init__(self, connection_string: str = "udp:127.0.0.1:14550"):
self.connection_string = connection_string
self.connection = None
self.message_queue = MessageQueue()
self.running = False
self.thread = None
self.callbacks: Dict[str, Callable] = {}
def connect(self) -> bool:
"""建立MAVLink连接"""
try:
self.connection = mavutil.mavlink_connection(self.connection_string)#连接无人机的IP地址、蓝牙、串口等
self.connection.wait_heartbeat()
print(f"MAVLink连接成功: {self.connection_string}")
return True
except Exception as e:
print(f"MAVLink连接失败: {str(e)}")
return False
def start(self):
"""启动MAVLink消息接收线程"""
if not self.connection:
if not self.connect():
return
self.running = True
self.thread = threading.Thread(target=self._receive_loop)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""停止MAVLink消息接收"""
self.running = False
if self.thread:
self.thread.join()
if self.connection:
self.connection.close()
def _receive_loop(self):
"""MAVLink消息接收循环"""
while self.running:
try:
msg = self.connection.recv_match(blocking=True, timeout=1.0)
if msg:
self._handle_message(msg)
except Exception as e:
print(f"接收消息错误: {str(e)}")
time.sleep(1)
def _handle_message(self, msg):
"""处理接收到的MAVLink消息"""
msg_type = msg.get_type()
if msg_type in self.callbacks:
self.callbacks[msg_type](msg)
self.message_queue.put(msg)
def register_callback(self, msg_type: str, callback: Callable):
"""注册消息回调函数"""
self.callbacks[msg_type] = callback
def send_message(self, msg_type: str, **kwargs):
"""发送MAVLink消息"""
if not self.connection:
return False
try:
self.connection.mav.send(mavutil.mavlink.MAVLink_message_type_map[msg_type](**kwargs))
return True
except Exception as e:
print(f"发送消息失败: {str(e)}")
return False
def get_message_queue(self) -> MessageQueue:
"""获取消息队列"""
return self.message_queue

@ -0,0 +1,65 @@
import queue
import threading
from typing import Any, Optional
from datetime import datetime
class MessageQueue:
def __init__(self, maxsize: int = 1000):
self.queue = queue.Queue(maxsize=maxsize)
self.lock = threading.Lock()
self.callbacks = []
def put(self, message: Any):
"""将消息放入队列"""
with self.lock:
try:
self.queue.put_nowait({
'message': message,
'timestamp': datetime.now()
})
self._notify_callbacks()
except queue.Full:
print("消息队列已满,丢弃消息")
def get(self, block: bool = True, timeout: Optional[float] = None) -> Optional[dict]:
"""从队列中获取消息"""
try:
return self.queue.get(block=block, timeout=timeout)
except queue.Empty:
return None
def register_callback(self, callback: callable):
"""注册消息回调函数"""
with self.lock:
self.callbacks.append(callback)
def unregister_callback(self, callback: callable):
"""注销消息回调函数"""
with self.lock:
if callback in self.callbacks:
self.callbacks.remove(callback)
def _notify_callbacks(self):
"""通知所有注册的回调函数"""
for callback in self.callbacks:
try:
callback()
except Exception as e:
print(f"回调函数执行失败: {str(e)}")
def clear(self):
"""清空消息队列"""
with self.lock:
while not self.queue.empty():
try:
self.queue.get_nowait()
except queue.Empty:
break
def size(self) -> int:
"""获取队列大小"""
return self.queue.qsize()
def is_empty(self) -> bool:
"""检查队列是否为空"""
return self.queue.empty()

@ -2,8 +2,8 @@ import sys
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QTabWidget, QPushButton, QLabel,
QGroupBox, QComboBox, QSpinBox, QDoubleSpinBox,
QProgressBar, QCheckBox)
from PyQt5.QtCore import Qt
QProgressBar, QCheckBox, QMessageBox)
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QFont
from ui.login_view import LoginView
from ui.main_view import MainView
@ -16,10 +16,15 @@ from ui.status_dashboard import StatusDashboard
from ui.path_planning_view import PathPlanningView
from ui.algorithm_config_view import AlgorithmConfigView
from ui.path_simulation_view import PathSimulationView
from communication.communication_manager import CommunicationManager
class CommandCenterApp(QMainWindow):
# 定义信号
login_successful = pyqtSignal()
def __init__(self):
super().__init__()
self.comm_manager = None
self.init_ui()
def init_ui(self):
@ -50,18 +55,59 @@ class CommandCenterApp(QMainWindow):
main_layout.addWidget(self.tab_widget)
# 连接登录成功信号
self.login_successful.connect(self.on_login_successful)
# 显示登录窗口
self.show_login()
def show_login(self):
self.login_view = LoginView()
self.login_view.login_successful.connect(self.on_login_successful)
self.login_view.show()
# 连接登录成功信号
# TODO: 实现登录成功后的处理逻辑
def on_login_successful(self):
"""登录成功后的处理"""
# 初始化通信管理器
self.init_communication()
# 显示主窗口
self.show()
# 关闭登录窗口
self.login_view.close()
def init_communication(self):
"""初始化通信管理器"""
try:
# 暂时跳过数据库连接和通信管理器的初始化
print("通信管理器初始化已跳过")
self.update_drone_list()
except Exception as e:
QMessageBox.critical(self, "错误", f"初始化通信失败: {str(e)}")
def update_drone_list(self):
"""更新无人机列表"""
try:
# 暂时使用空列表
connected_drones = []
# 更新无人机列表视图
drone_list_view = self.tab_widget.widget(3) # DroneListView的索引
if isinstance(drone_list_view, DroneListView):
drone_list_view.update_drone_list(connected_drones)
except Exception as e:
print(f"更新无人机列表失败: {str(e)}")
def closeEvent(self, event):
"""关闭窗口时的处理"""
if self.comm_manager:
self.comm_manager.stop()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = CommandCenterApp()
window.show()
sys.exit(app.exec_())

@ -1,9 +1,12 @@
from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QMessageBox)
from PyQt5.QtCore import Qt
from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtGui import QFont
class LoginView(QWidget):
# 定义登录成功信号
login_successful = pyqtSignal()
def __init__(self):
super().__init__()
self.setWindowTitle("无人机后勤输送系统 - 登录")
@ -61,8 +64,11 @@ class LoginView(QWidget):
QMessageBox.warning(self, "错误", "请输入用户名和密码")
return
# TODO: 实现实际的登录逻辑
print(f"尝试登录 - 用户名: {username}, 密码: {password}")
# 添加默认测试账号
if username == "admin" and password == "admin123":
self.login_successful.emit()
else:
QMessageBox.warning(self, "错误", "用户名或密码错误")
def handle_register(self):
# TODO: 实现注册功能

Loading…
Cancel
Save