From 0fa4bc2d3d4f5052faabd18a765e26cbcf382309 Mon Sep 17 00:00:00 2001 From: zhang_heng <3349123936@qq.com> Date: Sun, 4 May 2025 10:26:17 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E4=BF=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Src/command_center/communication/README.md | 550 ++++++++++++++++++ Src/command_center/communication/__init__.py | 5 + .../communication/communication_manager.py | 150 +++++ .../communication/database_handler.py | 129 ++++ .../communication/mavlink_handler.py | 81 +++ .../communication/message_queue.py | 65 +++ 6 files changed, 980 insertions(+) create mode 100644 Src/command_center/communication/README.md create mode 100644 Src/command_center/communication/__init__.py create mode 100644 Src/command_center/communication/communication_manager.py create mode 100644 Src/command_center/communication/database_handler.py create mode 100644 Src/command_center/communication/mavlink_handler.py create mode 100644 Src/command_center/communication/message_queue.py diff --git a/Src/command_center/communication/README.md b/Src/command_center/communication/README.md new file mode 100644 index 00000000..c66011ef --- /dev/null +++ b/Src/command_center/communication/README.md @@ -0,0 +1,550 @@ +我会帮你实现通信模块。首先,我们需要创建通信模块的目录结构和相关文件。让我们先看看是否已经有相关的目录: + + +Ran tool + +我们需要创建一个新的通信模块。我会创建以下文件: +1. `communication/` 目录 +2. `mavlink_handler.py` - MAVLink通信处理 +3. `database_handler.py` - 数据库操作 +4. `message_queue.py` - 消息队列管理 +5. `__init__.py` - 模块初始化文件 + + +我已经完成了通信模块的实现,这个模块包含以下主要组件: + +1. `MavlinkHandler` - MAVLink通信处理器: + - 负责建立和维护MAVLink连接 + - 处理MAVLink消息的发送和接收 + - 支持消息回调注册 + - 使用线程进行异步消息处理 + +2. `DatabaseHandler` - 数据库处理器: + - 负责MySQL数据库的连接和管理 + - 提供数据库表的创建和管理 + - 支持MAVLink消息的存储和查询 + - 管理无人机状态信息的存储和更新 + +3. `MessageQueue` - 消息队列: + - 提供线程安全的消息队列实现 + - 支持消息的异步处理 + - 提供回调机制 + - 包含消息时间戳 + +4. `CommunicationManager` - 通信管理器: + - 协调MAVLink和数据库组件 + - 处理消息的路由和分发 + - 管理无人机状态更新 + - 提供统一的接口访问通信功能 + +要使用这个通信模块,你需要: + +1. 安装必要的依赖: +```bash +pip install pymavlink mysql-connector-python +``` + +2. 配置数据库连接: +```python +db_config = { + 'host': 'localhost', + 'user': 'your_username', + 'password': 'your_password', + 'database': 'drone_control' +} +``` + +3. 初始化通信管理器: +```python +from command_center.communication import CommunicationManager + +# 创建通信管理器实例 +comm_manager = CommunicationManager( + mavlink_connection="udp:127.0.0.1:14550", # MAVLink连接字符串 + db_config=db_config +) + +# 启动通信管理器 +if comm_manager.start(): + print("通信管理器启动成功") + + # 获取无人机状态 + drone_status = comm_manager.get_drone_status("drone_1") + print(f"无人机状态: {drone_status}") + + # 获取消息队列 + message_queue = comm_manager.get_message_queue() + + # 注册消息回调 + def on_message(): + msg = message_queue.get() + if msg: + print(f"收到消息: {msg}") + + message_queue.register_callback(on_message) +``` + +这个通信模块提供了完整的MAVLink通信和数据库集成功能,可以: +1. 接收和处理MAVLink消息 +2. 将消息存储到MySQL数据库 +3. 实时更新无人机状态 +4. 提供消息队列和回调机制 +5. 支持多线程异步处理 + +你可以根据需要扩展这个模块,比如: +1. 添加更多的MAVLink消息处理 +2. 实现更复杂的数据库查询 +3. 添加消息过滤和转换功能 +4. 实现消息重传机制 +5. 添加错误处理和恢复机制 + +需要我详细解释某个部分或者添加其他功能吗? + +我来详细描述一下当前通信模块的工作流程: + +1. **初始化阶段**: + - 创建 `CommunicationManager` 实例,传入 MAVLink 连接字符串和数据库配置 + - 初始化三个核心组件: + - `MavlinkHandler`:负责 MAVLink 通信 + - `DatabaseHandler`:负责数据库操作 + - `MessageQueue`:负责消息队列管理 + +2. **启动阶段**(调用 `start()` 方法): + - 首先连接数据库 + - 创建必要的数据库表(`mavlink_messages` 和 `drone_status`) + - 建立 MAVLink 连接 + - 注册三个关键消息的回调函数: + - `HEARTBEAT`:心跳消息 + - `GLOBAL_POSITION_INT`:位置信息 + - `SYS_STATUS`:系统状态 + - 启动 MAVLink 消息接收线程 + - 启动消息处理线程 + +3. **消息接收流程**: + ``` + MAVLink设备 -> MavlinkHandler -> MessageQueue -> CommunicationManager -> DatabaseHandler + ``` + - `MavlinkHandler` 通过 `_receive_loop` 持续监听 MAVLink 消息 + - 收到消息后,调用 `_handle_message` 处理 + - 消息被放入 `MessageQueue` 队列 + - 同时触发注册的回调函数 + +4. **消息处理流程**: + ``` + MessageQueue -> CommunicationManager._process_messages -> 数据库存储/状态更新 + ``` + - `CommunicationManager` 的 `_process_messages` 线程从队列获取消息 + - 对消息进行分类处理: + - 保存到数据库的 `mavlink_messages` 表 + - 根据消息类型更新 `drone_status` 表 + - 位置消息:更新经纬度和高度 + - 状态消息:更新电池电量和系统状态 + +5. **数据库操作**: + - 所有 MAVLink 消息都被保存到 `mavlink_messages` 表 + - 无人机状态信息被保存到 `drone_status` 表 + - 支持按时间范围和消息类型查询历史消息 + - 支持获取最新的无人机状态 + +6. **消息回调机制**: + - 支持注册消息回调函数 + - 当收到特定类型的消息时,自动触发对应的回调 + - 回调函数可以处理消息并执行自定义逻辑 + +7. **错误处理**: + - 连接失败时的重试机制 + - 消息处理异常的错误捕获 + - 数据库操作的异常处理和回滚 + +8. **停止流程**(调用 `stop()` 方法): + - 停止消息处理线程 + - 停止 MAVLink 连接 + - 断开数据库连接 + - 清理资源 + +这个通信模块的设计特点: +1. **异步处理**:使用多线程实现消息的异步接收和处理 +2. **数据持久化**:所有消息都被保存到数据库,便于后续分析 +3. **状态管理**:实时维护无人机的最新状态 +4. **可扩展性**:通过回调机制支持自定义消息处理 +5. **错误恢复**:包含完整的错误处理机制 + +使用示例: +```python +# 初始化 +comm_manager = CommunicationManager( + mavlink_connection="udp:127.0.0.1:14550", + db_config={ + 'host': 'localhost', + 'user': 'root', + 'password': 'password', + 'database': 'drone_control' + } +) + +# 启动通信 +comm_manager.start() + +# 获取无人机状态 +drone_status = comm_manager.get_drone_status("drone_1") + +# 注册消息回调 +def on_message(): + msg = comm_manager.get_message_queue().get() + if msg: + print(f"收到消息: {msg}") + +comm_manager.get_message_queue().register_callback(on_message) +``` + +这个通信模块提供了一个完整的解决方案,可以: +1. 实时接收和处理 MAVLink 消息 +2. 将消息持久化到数据库 +3. 维护无人机的最新状态 +4. 支持自定义消息处理 +5. 提供错误恢复机制 + + + +让我详细解释一下与无人机建立连接的过程。在当前的实现中,连接建立主要发生在 `MavlinkHandler` 类中,通过 `connect()` 方法实现。让我们看看具体的连接流程: + +1. **连接初始化**: +```python +# 在 MavlinkHandler 类中 +def __init__(self, connection_string: str = "udp:127.0.0.1:14550"): + self.connection_string = connection_string + self.connection = None + # ... 其他初始化代码 +``` + +2. **建立连接**: +```python +def connect(self) -> bool: + """建立MAVLink连接""" + try: + # 创建MAVLink连接 + self.connection = mavutil.mavlink_connection(self.connection_string) + # 等待心跳包,确认连接成功 + self.connection.wait_heartbeat() + print(f"MAVLink连接成功: {self.connection_string}") + return True + except Exception as e: + print(f"MAVLink连接失败: {str(e)}") + return False +``` + +连接建立的关键点: + +1. **连接字符串格式**: + - 支持多种连接方式: + - UDP: `"udp:IP:PORT"` (例如: `"udp:127.0.0.1:14550"`) + - TCP: `"tcp:IP:PORT"` + - 串口: `"serial:PORT,BAUDRATE"` (例如: `"serial:/dev/ttyUSB0,57600"`) + - 蓝牙: `"bluetooth:ADDRESS"` + +2. **连接验证**: + - 通过 `wait_heartbeat()` 等待无人机发送心跳包 + - 心跳包是MAVLink协议中的基础消息,用于确认连接状态 + - 如果在一定时间内没有收到心跳包,连接会失败 + +3. **连接状态管理**: + - 连接成功后,`connection` 对象会被保存 + - 用于后续的消息发送和接收 + - 连接失败时会返回 `False`,上层可以处理错误 + +要使用这个连接功能,你需要: + +1. **确定连接方式**: + - 如果无人机使用地面站软件(如QGroundControl),通常使用UDP连接 + - 如果直接连接飞控,可能需要使用串口连接 + - 连接字符串需要根据实际情况配置 + +2. **配置连接参数**: +```python +# UDP连接示例 +mavlink_connection = "udp:127.0.0.1:14550" # 本地UDP连接 + +# 串口连接示例 +mavlink_connection = "serial:/dev/ttyUSB0,57600" # Linux系统 +# 或 +mavlink_connection = "serial:COM3,57600" # Windows系统 +``` + +3. **建立连接**: +```python +# 创建通信管理器 +comm_manager = CommunicationManager( + mavlink_connection="udp:127.0.0.1:14550", # 根据实际情况修改 + db_config={ + 'host': 'localhost', + 'user': 'root', + 'password': 'password', + 'database': 'drone_control' + } +) + +# 启动连接 +if comm_manager.start(): + print("连接成功") +else: + print("连接失败,请检查连接参数") +``` + +4. **连接后的操作**: + - 发送命令:使用 `send_message` 方法 + - 接收消息:通过消息队列或回调函数 + - 监控状态:通过数据库查询 + +5. **错误处理**: + - 连接失败时会有错误提示 + - 可以设置重试机制 + - 可以添加连接状态监控 + +建议的连接使用流程: + +1. **准备阶段**: +```python +def setup_connection(): + # 配置连接参数 + connection_config = { + 'mavlink': { + 'type': 'udp', # 或 'serial', 'tcp' 等 + 'address': '127.0.0.1', + 'port': 14550, + 'baudrate': 57600 # 串口连接时需要 + }, + 'database': { + 'host': 'localhost', + 'user': 'root', + 'password': 'password', + 'database': 'drone_control' + } + } + + # 构建连接字符串 + if connection_config['mavlink']['type'] == 'udp': + connection_string = f"udp:{connection_config['mavlink']['address']}:{connection_config['mavlink']['port']}" + elif connection_config['mavlink']['type'] == 'serial': + connection_string = f"serial:{connection_config['mavlink']['address']},{connection_config['mavlink']['baudrate']}" + + return connection_string, connection_config['database'] +``` + +2. **连接建立**: +```python +def establish_connection(): + connection_string, db_config = setup_connection() + + # 创建通信管理器 + comm_manager = CommunicationManager(connection_string, db_config) + + # 尝试连接 + max_retries = 3 + retry_count = 0 + + while retry_count < max_retries: + if comm_manager.start(): + print("连接成功") + return comm_manager + else: + retry_count += 1 + print(f"连接失败,第{retry_count}次重试...") + time.sleep(2) # 等待2秒后重试 + + print("连接失败,达到最大重试次数") + return None +``` + +3. **连接监控**: +```python +def monitor_connection(comm_manager): + def heartbeat_callback(msg): + print(f"收到心跳包: {msg.to_dict()}") + + # 注册心跳回调 + comm_manager.mavlink.register_callback('HEARTBEAT', heartbeat_callback) + + # 定期检查连接状态 + while True: + if not comm_manager.mavlink.connection: + print("连接断开,尝试重连...") + if not comm_manager.mavlink.connect(): + print("重连失败") + break + time.sleep(1) +``` + +这个连接模块提供了灵活的连接方式,可以根据不同的无人机和飞控配置选择合适的连接参数。同时,它也包含了错误处理和重试机制,确保连接的可靠性。 + +需要注意的是: +1. 确保无人机和地面站软件(如QGroundControl)使用相同的端口 +2. 串口连接时需要正确的端口号和波特率 +3. 防火墙设置可能会影响UDP/TCP连接 +4. 建议在连接前先测试基本的MAVLink通信 + +你可以根据实际需求调整连接参数和错误处理策略。需要我详细解释某个部分吗? + + + +MAVLink 消息的格式是标准化的二进制协议格式。让我详细解释一下 MAVLink 消息的格式和结构: + +1. **MAVLink 消息基本结构**: +``` ++-------------------+------------------+------------------+------------------+------------------+ +| 起始标记 (0xFE) | 载荷长度 (1字节) | 序列号 (1字节) | 系统ID (1字节) | 组件ID (1字节) | ++-------------------+------------------+------------------+------------------+------------------+ +| 消息ID (1字节) | 载荷数据 (N字节) | CRC校验 (2字节) | ++-------------------+------------------+------------------+ +``` + +2. **消息字段说明**: + - **起始标记 (0xFE)**:标识消息开始 + - **载荷长度**:消息载荷的长度(不包括头部和CRC) + - **序列号**:用于检测消息丢失 + - **系统ID**:发送系统的ID(如无人机ID) + - **组件ID**:发送组件的ID(如飞控、相机等) + - **消息ID**:标识消息类型 + - **载荷数据**:实际的消息内容 + - **CRC校验**:用于验证消息完整性 + +3. **常见消息类型**: +```python +# 心跳消息 (HEARTBEAT) +{ + 'type': 'HEARTBEAT', + 'system_status': 0, # 系统状态 + 'base_mode': 0, # 基础模式 + 'custom_mode': 0, # 自定义模式 + 'mavlink_version': 3 # MAVLink版本 +} + +# 位置消息 (GLOBAL_POSITION_INT) +{ + 'type': 'GLOBAL_POSITION_INT', + 'time_boot_ms': 0, # 启动时间 + 'lat': 0, # 纬度 (度 * 1e7) + 'lon': 0, # 经度 (度 * 1e7) + 'alt': 0, # 高度 (毫米) + 'relative_alt': 0, # 相对高度 (毫米) + 'vx': 0, # X方向速度 (厘米/秒) + 'vy': 0, # Y方向速度 (厘米/秒) + 'vz': 0, # Z方向速度 (厘米/秒) + 'hdg': 0 # 航向 (度 * 100) +} + +# 系统状态消息 (SYS_STATUS) +{ + 'type': 'SYS_STATUS', + 'onboard_control_sensors_present': 0, # 存在的传感器 + 'onboard_control_sensors_enabled': 0, # 启用的传感器 + 'onboard_control_sensors_health': 0, # 传感器健康状态 + 'load': 0, # 处理器负载 + 'voltage_battery': 0, # 电池电压 + 'current_battery': 0, # 电池电流 + 'battery_remaining': 0, # 剩余电量百分比 + 'drop_rate_comm': 0, # 通信丢包率 + 'errors_comm': 0, # 通信错误 + 'errors_count1': 0, # 错误计数1 + 'errors_count2': 0, # 错误计数2 + 'errors_count3': 0, # 错误计数3 + 'errors_count4': 0 # 错误计数4 +} +``` + +4. **消息处理示例**: +```python +# 在 MavlinkHandler 类中处理消息 +def _handle_message(self, msg): + """处理接收到的MAVLink消息""" + msg_type = msg.get_type() + msg_data = msg.to_dict() + + # 根据消息类型处理 + if msg_type == 'HEARTBEAT': + self._handle_heartbeat(msg_data) + elif msg_type == 'GLOBAL_POSITION_INT': + self._handle_position(msg_data) + elif msg_type == 'SYS_STATUS': + self._handle_system_status(msg_data) + + # 将消息放入队列 + self.message_queue.put({ + 'type': msg_type, + 'data': msg_data, + 'timestamp': datetime.now() + }) +``` + +5. **消息发送示例**: +```python +# 发送命令消息 +def send_command(self, command_type: str, **kwargs): + """发送MAVLink命令""" + try: + # 构建命令消息 + msg = self.connection.mav.command_long_send( + self.connection.target_system, # 目标系统ID + self.connection.target_component, # 目标组件ID + command_type, # 命令类型 + 0, # 确认 + *kwargs.values() # 命令参数 + ) + return True + except Exception as e: + print(f"发送命令失败: {str(e)}") + return False +``` + +6. **消息转换和存储**: +```python +# 在 DatabaseHandler 中存储消息 +def save_mavlink_message(self, msg_type: str, msg_data: Dict) -> bool: + """保存MAVLink消息到数据库""" + query = """ + INSERT INTO mavlink_messages + (message_type, message_data, timestamp) + VALUES (%s, %s, %s) + """ + params = ( + msg_type, + json.dumps(msg_data), # 将消息数据转换为JSON格式存储 + datetime.now() + ) + return self.execute_update(query, params) +``` + +7. **消息查询示例**: +```python +# 查询特定类型的消息 +def get_messages_by_type(self, msg_type: str, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None) -> List[Dict]: + """获取特定类型的MAVLink消息""" + query = """ + SELECT * FROM mavlink_messages + WHERE message_type = %s + """ + params = [msg_type] + + if start_time: + query += " AND timestamp >= %s" + params.append(start_time) + if end_time: + query += " AND timestamp <= %s" + params.append(end_time) + + query += " ORDER BY timestamp DESC" + return self.execute_query(query, tuple(params)) +``` + +MAVLink 消息的特点: +1. **二进制格式**:高效的数据传输 +2. **固定头部**:便于消息识别和解析 +3. **可变载荷**:支持不同类型的数据 +4. **CRC校验**:确保数据完整性 +5. **系统/组件ID**:支持多设备通信 +6. **序列号**:支持消息顺序和丢失检测 + +在实际使用中,我们不需要直接处理二进制格式,因为 pymavlink 库已经帮我们处理了消息的编码和解码。我们只需要关注消息的内容和类型即可。 + +需要我详细解释某个具体的消息类型或处理过程吗? diff --git a/Src/command_center/communication/__init__.py b/Src/command_center/communication/__init__.py new file mode 100644 index 00000000..2bf3b5d8 --- /dev/null +++ b/Src/command_center/communication/__init__.py @@ -0,0 +1,5 @@ +from .mavlink_handler import MavlinkHandler +from .database_handler import DatabaseHandler +from .message_queue import MessageQueue + +__all__ = ['MavlinkHandler', 'DatabaseHandler', 'MessageQueue'] \ No newline at end of file diff --git a/Src/command_center/communication/communication_manager.py b/Src/command_center/communication/communication_manager.py new file mode 100644 index 00000000..025dbc35 --- /dev/null +++ b/Src/command_center/communication/communication_manager.py @@ -0,0 +1,150 @@ +import threading +import time +from typing import Dict, Any, Optional +from .mavlink_handler import MavlinkHandler +from .database_handler import DatabaseHandler +from .message_queue import MessageQueue + +class CommunicationManager: + def __init__(self, mavlink_connection: str, db_config: Dict[str, str]): + self.mavlink = MavlinkHandler(mavlink_connection) + self.database = DatabaseHandler( + host=db_config['host'], + user=db_config['user'], + password=db_config['password'], + database=db_config['database'] + ) + self.running = False + self.thread = None + self.message_queue = MessageQueue() + + def start(self) -> bool: + """启动通信管理器""" + if not self.database.connect(): + return False + + # 创建数据库表 + self.database.create_tables() + + # 启动MAVLink连接 + if not self.mavlink.connect(): + return False + + # 注册MAVLink消息回调 + self.mavlink.register_callback('HEARTBEAT', self._handle_heartbeat) + self.mavlink.register_callback('GLOBAL_POSITION_INT', self._handle_position) + self.mavlink.register_callback('SYS_STATUS', self._handle_system_status) + + # 启动MAVLink消息接收 + self.mavlink.start() + + # 启动消息处理线程 + self.running = True + self.thread = threading.Thread(target=self._process_messages) + self.thread.daemon = True + self.thread.start() + + return True + + def stop(self): + """停止通信管理器""" + self.running = False + if self.thread: + self.thread.join() + self.mavlink.stop() + self.database.disconnect() + + def _process_messages(self): + """处理消息队列中的消息""" + while self.running: + try: + msg = self.mavlink.get_message_queue().get(timeout=1.0) + if msg: + self._handle_message(msg) + except Exception as e: + print(f"消息处理错误: {str(e)}") + time.sleep(1) + + def _handle_message(self, msg: Dict[str, Any]): + """处理接收到的消息""" + try: + msg_type = msg['message'].get_type() + msg_data = msg['message'].to_dict() + + # 保存消息到数据库 + self.database.save_mavlink_message(msg_type, msg_data) + + # 根据消息类型更新无人机状态 + if msg_type == 'GLOBAL_POSITION_INT': + self._update_drone_position(msg_data) + elif msg_type == 'SYS_STATUS': + self._update_drone_status(msg_data) + + except Exception as e: + print(f"消息处理失败: {str(e)}") + + def _handle_heartbeat(self, msg): + """处理心跳消息""" + self.message_queue.put({ + 'type': 'HEARTBEAT', + 'data': msg.to_dict() + }) + + def _handle_position(self, msg): + """处理位置消息""" + self.message_queue.put({ + 'type': 'POSITION', + 'data': msg.to_dict() + }) + + def _handle_system_status(self, msg): + """处理系统状态消息""" + self.message_queue.put({ + 'type': 'SYSTEM_STATUS', + 'data': msg.to_dict() + }) + + def _update_drone_position(self, position_data: Dict[str, Any]): + """更新无人机位置信息""" + query = """ + INSERT INTO drone_status + (drone_id, latitude, longitude, altitude, timestamp) + VALUES (%s, %s, %s, %s, %s) + """ + params = ( + position_data.get('sysid', 'unknown'), + position_data.get('lat', 0) / 1e7, # 转换为度 + position_data.get('lon', 0) / 1e7, # 转换为度 + position_data.get('alt', 0) / 1000, # 转换为米 + datetime.now() + ) + self.database.execute_update(query, params) + + def _update_drone_status(self, status_data: Dict[str, Any]): + """更新无人机状态信息""" + query = """ + UPDATE drone_status + SET battery_level = %s, status = %s + WHERE drone_id = %s + ORDER BY timestamp DESC LIMIT 1 + """ + params = ( + status_data.get('battery_remaining', 0), + status_data.get('system_status', 'UNKNOWN'), + status_data.get('sysid', 'unknown') + ) + self.database.execute_update(query, params) + + def get_drone_status(self, drone_id: str) -> Optional[Dict[str, Any]]: + """获取指定无人机的状态""" + query = """ + SELECT * FROM drone_status + WHERE drone_id = %s + ORDER BY timestamp DESC LIMIT 1 + """ + result = self.database.execute_query(query, (drone_id,)) + return result[0] if result else None + + def get_message_queue(self) -> MessageQueue: + """获取消息队列""" + return self.message_queue \ No newline at end of file diff --git a/Src/command_center/communication/database_handler.py b/Src/command_center/communication/database_handler.py new file mode 100644 index 00000000..159de9af --- /dev/null +++ b/Src/command_center/communication/database_handler.py @@ -0,0 +1,129 @@ +import mysql.connector +from mysql.connector import Error +from typing import Dict, List, Any, Optional +import json +from datetime import datetime + +class DatabaseHandler: + #替换数据库账号密码 + def __init__(self, host: str, user: str, password: str, database: str): + self.host = host + self.user = user + self.password = password + self.database = database + self.connection = None + self.cursor = None + + def connect(self) -> bool: + """连接到MySQL数据库""" + try: + self.connection = mysql.connector.connect( + host=self.host, + user=self.user, + password=self.password, + database=self.database + ) + self.cursor = self.connection.cursor(dictionary=True) + print(f"数据库连接成功: {self.database}") + return True + except Error as e: + print(f"数据库连接失败: {str(e)}") + return False + + def disconnect(self): + """断开数据库连接""" + if self.cursor: + self.cursor.close() + if self.connection: + self.connection.close() + + def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]: + """执行查询并返回结果""" + try: + self.cursor.execute(query, params) + return self.cursor.fetchall() + except Error as e: + print(f"查询执行失败: {str(e)}") + return [] + + def execute_update(self, query: str, params: Optional[tuple] = None) -> bool: + """执行更新操作""" + try: + self.cursor.execute(query, params) + self.connection.commit() + return True + except Error as e: + print(f"更新执行失败: {str(e)}") + self.connection.rollback() + return False + + def save_mavlink_message(self, msg_type: str, msg_data: Dict) -> bool: + """保存MAVLink消息到数据库""" + query = """ + INSERT INTO mavlink_messages + (message_type, message_data, timestamp) + VALUES (%s, %s, %s) + """ + params = ( + msg_type, + json.dumps(msg_data), + datetime.now() + ) + return self.execute_update(query, params) + + def get_mavlink_messages(self, msg_type: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None) -> List[Dict]: + """获取MAVLink消息""" + query = "SELECT * FROM mavlink_messages WHERE 1=1" + params = [] + + if msg_type: + query += " AND message_type = %s" + params.append(msg_type) + if start_time: + query += " AND timestamp >= %s" + params.append(start_time) + if end_time: + query += " AND timestamp <= %s" + params.append(end_time) + + query += " ORDER BY timestamp DESC" + return self.execute_query(query, tuple(params) if params else None) + + def create_tables(self): + """创建必要的数据库表""" + queries = [ + """ + CREATE TABLE IF NOT EXISTS mavlink_messages ( + id INT AUTO_INCREMENT PRIMARY KEY, + message_type VARCHAR(50) NOT NULL, + message_data JSON NOT NULL, + timestamp DATETIME NOT NULL, + INDEX idx_message_type (message_type), + INDEX idx_timestamp (timestamp) + ) + """, + """ + CREATE TABLE IF NOT EXISTS drone_status ( + id INT AUTO_INCREMENT PRIMARY KEY, + drone_id VARCHAR(50) NOT NULL, + latitude FLOAT, + longitude FLOAT, + altitude FLOAT, + battery_level FLOAT, + status VARCHAR(50), + timestamp DATETIME NOT NULL, + INDEX idx_drone_id (drone_id), + INDEX idx_timestamp (timestamp) + ) + """ + ] + + for query in queries: + try: + self.cursor.execute(query) + self.connection.commit() + except Error as e: + print(f"创建表失败: {str(e)}") + self.connection.rollback() \ No newline at end of file diff --git a/Src/command_center/communication/mavlink_handler.py b/Src/command_center/communication/mavlink_handler.py new file mode 100644 index 00000000..2674e095 --- /dev/null +++ b/Src/command_center/communication/mavlink_handler.py @@ -0,0 +1,81 @@ +import pymavlink.mavutil as mavutil +import threading +import time +from typing import Callable, Dict, Any +from .message_queue import MessageQueue + +class MavlinkHandler: + def __init__(self, connection_string: str = "udp:127.0.0.1:14550"): + self.connection_string = connection_string + self.connection = None + self.message_queue = MessageQueue() + self.running = False + self.thread = None + self.callbacks: Dict[str, Callable] = {} + + def connect(self) -> bool: + """建立MAVLink连接""" + try: + self.connection = mavutil.mavlink_connection(self.connection_string)#连接无人机的IP地址、蓝牙、串口等 + self.connection.wait_heartbeat() + print(f"MAVLink连接成功: {self.connection_string}") + return True + except Exception as e: + print(f"MAVLink连接失败: {str(e)}") + return False + + def start(self): + """启动MAVLink消息接收线程""" + if not self.connection: + if not self.connect(): + return + + self.running = True + self.thread = threading.Thread(target=self._receive_loop) + self.thread.daemon = True + self.thread.start() + + def stop(self): + """停止MAVLink消息接收""" + self.running = False + if self.thread: + self.thread.join() + if self.connection: + self.connection.close() + + def _receive_loop(self): + """MAVLink消息接收循环""" + while self.running: + try: + msg = self.connection.recv_match(blocking=True, timeout=1.0) + if msg: + self._handle_message(msg) + except Exception as e: + print(f"接收消息错误: {str(e)}") + time.sleep(1) + + def _handle_message(self, msg): + """处理接收到的MAVLink消息""" + msg_type = msg.get_type() + if msg_type in self.callbacks: + self.callbacks[msg_type](msg) + self.message_queue.put(msg) + + def register_callback(self, msg_type: str, callback: Callable): + """注册消息回调函数""" + self.callbacks[msg_type] = callback + + def send_message(self, msg_type: str, **kwargs): + """发送MAVLink消息""" + if not self.connection: + return False + try: + self.connection.mav.send(mavutil.mavlink.MAVLink_message_type_map[msg_type](**kwargs)) + return True + except Exception as e: + print(f"发送消息失败: {str(e)}") + return False + + def get_message_queue(self) -> MessageQueue: + """获取消息队列""" + return self.message_queue \ No newline at end of file diff --git a/Src/command_center/communication/message_queue.py b/Src/command_center/communication/message_queue.py new file mode 100644 index 00000000..3e3db6cc --- /dev/null +++ b/Src/command_center/communication/message_queue.py @@ -0,0 +1,65 @@ +import queue +import threading +from typing import Any, Optional +from datetime import datetime + +class MessageQueue: + def __init__(self, maxsize: int = 1000): + self.queue = queue.Queue(maxsize=maxsize) + self.lock = threading.Lock() + self.callbacks = [] + + def put(self, message: Any): + """将消息放入队列""" + with self.lock: + try: + self.queue.put_nowait({ + 'message': message, + 'timestamp': datetime.now() + }) + self._notify_callbacks() + except queue.Full: + print("消息队列已满,丢弃消息") + + def get(self, block: bool = True, timeout: Optional[float] = None) -> Optional[dict]: + """从队列中获取消息""" + try: + return self.queue.get(block=block, timeout=timeout) + except queue.Empty: + return None + + def register_callback(self, callback: callable): + """注册消息回调函数""" + with self.lock: + self.callbacks.append(callback) + + def unregister_callback(self, callback: callable): + """注销消息回调函数""" + with self.lock: + if callback in self.callbacks: + self.callbacks.remove(callback) + + def _notify_callbacks(self): + """通知所有注册的回调函数""" + for callback in self.callbacks: + try: + callback() + except Exception as e: + print(f"回调函数执行失败: {str(e)}") + + def clear(self): + """清空消息队列""" + with self.lock: + while not self.queue.empty(): + try: + self.queue.get_nowait() + except queue.Empty: + break + + def size(self) -> int: + """获取队列大小""" + return self.queue.qsize() + + def is_empty(self) -> bool: + """检查队列是否为空""" + return self.queue.empty() \ No newline at end of file