@ -0,0 +1,13 @@
|
||||
from .mavlink_handler import MavlinkHandler
|
||||
from .database_handler import DatabaseHandler
|
||||
from .message_queue import MessageQueue
|
||||
from .drone_connection_manager import DroneConnectionManager
|
||||
from .communication_manager import CommunicationManager
|
||||
|
||||
__all__ = [
|
||||
'MavlinkHandler',
|
||||
'DatabaseHandler',
|
||||
'MessageQueue',
|
||||
'DroneConnectionManager',
|
||||
'CommunicationManager'
|
||||
]
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,181 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, Any, Optional, List
|
||||
from .mavlink_handler import MavlinkHandler
|
||||
from .database_handler import DatabaseHandler
|
||||
from .message_queue import MessageQueue
|
||||
from .drone_connection_manager import DroneConnectionManager
|
||||
|
||||
class CommunicationManager:
|
||||
def __init__(self, mavlink_connection: str, db_config: Dict[str, str]):
|
||||
self.mavlink = MavlinkHandler(mavlink_connection)
|
||||
self.database = DatabaseHandler(
|
||||
host=db_config['host'],
|
||||
user=db_config['user'],
|
||||
password=db_config['password'],
|
||||
database=db_config['database']
|
||||
)
|
||||
self.drone_manager = DroneConnectionManager(self.database)
|
||||
self.running = False
|
||||
self.thread = None
|
||||
self.message_queue = MessageQueue()
|
||||
|
||||
def start(self) -> bool:
|
||||
"""启动通信管理器"""
|
||||
if not self.database.connect():
|
||||
return False
|
||||
|
||||
# 创建数据库表
|
||||
self.database.create_tables()
|
||||
|
||||
# 启动MAVLink连接
|
||||
if not self.mavlink.connect():
|
||||
return False
|
||||
|
||||
# 注册MAVLink消息回调
|
||||
self.mavlink.register_callback('HEARTBEAT', self._handle_heartbeat)
|
||||
self.mavlink.register_callback('GLOBAL_POSITION_INT', self._handle_position)
|
||||
self.mavlink.register_callback('SYS_STATUS', self._handle_system_status)
|
||||
|
||||
# 启动MAVLink消息接收
|
||||
self.mavlink.start()
|
||||
|
||||
# 启动无人机连接管理器
|
||||
self.drone_manager.start()
|
||||
|
||||
# 启动消息处理线程
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self._process_messages)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
"""停止通信管理器"""
|
||||
self.running = False
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
self.mavlink.stop()
|
||||
self.drone_manager.stop()
|
||||
self.database.disconnect()
|
||||
|
||||
def add_drone(self, drone_id: str, ip_address: str, port: int,
|
||||
connection_type: str = 'udp') -> bool:
|
||||
"""添加新的无人机连接"""
|
||||
return self.database.add_drone_connection(drone_id, ip_address, port, connection_type)
|
||||
|
||||
def connect_drone(self, drone_id: str) -> bool:
|
||||
"""连接指定的无人机"""
|
||||
return self.drone_manager.connect_drone(drone_id)
|
||||
|
||||
def disconnect_drone(self, drone_id: str):
|
||||
"""断开指定无人机的连接"""
|
||||
self.drone_manager.disconnect_drone(drone_id)
|
||||
|
||||
def get_connected_drones(self) -> List[str]:
|
||||
"""获取所有已连接的无人机ID"""
|
||||
return self.drone_manager.get_connected_drones()
|
||||
|
||||
def is_drone_connected(self, drone_id: str) -> bool:
|
||||
"""检查指定无人机是否已连接"""
|
||||
return self.drone_manager.is_drone_connected(drone_id)
|
||||
|
||||
def send_command(self, drone_id: str, command_type: str, **kwargs) -> bool:
|
||||
"""向指定无人机发送命令"""
|
||||
return self.drone_manager.send_command(drone_id, command_type, **kwargs)
|
||||
|
||||
def _process_messages(self):
|
||||
"""处理消息队列中的消息"""
|
||||
while self.running:
|
||||
try:
|
||||
msg = self.mavlink.get_message_queue().get(timeout=1.0)
|
||||
if msg:
|
||||
self._handle_message(msg)
|
||||
except Exception as e:
|
||||
print(f"消息处理错误: {str(e)}")
|
||||
time.sleep(1)
|
||||
|
||||
def _handle_message(self, msg: Dict[str, Any]):
|
||||
"""处理接收到的消息"""
|
||||
try:
|
||||
msg_type = msg['message'].get_type()
|
||||
msg_data = msg['message'].to_dict()
|
||||
|
||||
# 保存消息到数据库
|
||||
self.database.save_mavlink_message(msg_type, msg_data)
|
||||
|
||||
# 根据消息类型更新无人机状态
|
||||
if msg_type == 'GLOBAL_POSITION_INT':
|
||||
self._update_drone_position(msg_data)
|
||||
elif msg_type == 'SYS_STATUS':
|
||||
self._update_drone_status(msg_data)
|
||||
|
||||
except Exception as e:
|
||||
print(f"消息处理失败: {str(e)}")
|
||||
|
||||
def _handle_heartbeat(self, msg):
|
||||
"""处理心跳消息"""
|
||||
self.message_queue.put({
|
||||
'type': 'HEARTBEAT',
|
||||
'data': msg.to_dict()
|
||||
})
|
||||
|
||||
def _handle_position(self, msg):
|
||||
"""处理位置消息"""
|
||||
self.message_queue.put({
|
||||
'type': 'POSITION',
|
||||
'data': msg.to_dict()
|
||||
})
|
||||
|
||||
def _handle_system_status(self, msg):
|
||||
"""处理系统状态消息"""
|
||||
self.message_queue.put({
|
||||
'type': 'SYSTEM_STATUS',
|
||||
'data': msg.to_dict()
|
||||
})
|
||||
|
||||
def _update_drone_position(self, position_data: Dict[str, Any]):
|
||||
"""更新无人机位置信息"""
|
||||
query = """
|
||||
INSERT INTO drone_status
|
||||
(drone_id, latitude, longitude, altitude, timestamp)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
"""
|
||||
params = (
|
||||
position_data.get('sysid', 'unknown'),
|
||||
position_data.get('lat', 0) / 1e7, # 转换为度
|
||||
position_data.get('lon', 0) / 1e7, # 转换为度
|
||||
position_data.get('alt', 0) / 1000, # 转换为米
|
||||
datetime.now()
|
||||
)
|
||||
self.database.execute_update(query, params)
|
||||
|
||||
def _update_drone_status(self, status_data: Dict[str, Any]):
|
||||
"""更新无人机状态信息"""
|
||||
query = """
|
||||
UPDATE drone_status
|
||||
SET battery_level = %s, status = %s
|
||||
WHERE drone_id = %s
|
||||
ORDER BY timestamp DESC LIMIT 1
|
||||
"""
|
||||
params = (
|
||||
status_data.get('battery_remaining', 0),
|
||||
status_data.get('system_status', 'UNKNOWN'),
|
||||
status_data.get('sysid', 'unknown')
|
||||
)
|
||||
self.database.execute_update(query, params)
|
||||
|
||||
def get_drone_status(self, drone_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""获取指定无人机的状态"""
|
||||
query = """
|
||||
SELECT * FROM drone_status
|
||||
WHERE drone_id = %s
|
||||
ORDER BY timestamp DESC LIMIT 1
|
||||
"""
|
||||
result = self.database.execute_query(query, (drone_id,))
|
||||
return result[0] if result else None
|
||||
|
||||
def get_message_queue(self) -> MessageQueue:
|
||||
"""获取消息队列"""
|
||||
return self.message_queue
|
||||
@ -0,0 +1,188 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from typing import Dict, List, Any, Optional
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
class DatabaseHandler:
|
||||
#替换数据库账号密码
|
||||
def __init__(self, host: str, user: str, password: str, database: str):
|
||||
self.host = host
|
||||
self.user = user
|
||||
self.password = password
|
||||
self.database = database
|
||||
self.connection = None
|
||||
self.cursor = None
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""连接到MySQL数据库"""
|
||||
try:
|
||||
self.connection = mysql.connector.connect(
|
||||
host=self.host,
|
||||
user=self.user,
|
||||
password=self.password,
|
||||
database=self.database
|
||||
)
|
||||
self.cursor = self.connection.cursor(dictionary=True)
|
||||
print(f"数据库连接成功: {self.database}")
|
||||
return True
|
||||
except Error as e:
|
||||
print(f"数据库连接失败: {str(e)}")
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""断开数据库连接"""
|
||||
if self.cursor:
|
||||
self.cursor.close()
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
|
||||
def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]:
|
||||
"""执行查询并返回结果"""
|
||||
try:
|
||||
self.cursor.execute(query, params)
|
||||
return self.cursor.fetchall()
|
||||
except Error as e:
|
||||
print(f"查询执行失败: {str(e)}")
|
||||
return []
|
||||
|
||||
def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
|
||||
"""执行更新操作"""
|
||||
try:
|
||||
self.cursor.execute(query, params)
|
||||
self.connection.commit()
|
||||
return True
|
||||
except Error as e:
|
||||
print(f"更新执行失败: {str(e)}")
|
||||
self.connection.rollback()
|
||||
return False
|
||||
|
||||
def save_mavlink_message(self, msg_type: str, msg_data: Dict) -> bool:
|
||||
"""保存MAVLink消息到数据库"""
|
||||
query = """
|
||||
INSERT INTO mavlink_messages
|
||||
(message_type, message_data, timestamp)
|
||||
VALUES (%s, %s, %s)
|
||||
"""
|
||||
params = (
|
||||
msg_type,
|
||||
json.dumps(msg_data),
|
||||
datetime.now()
|
||||
)
|
||||
return self.execute_update(query, params)
|
||||
|
||||
def get_mavlink_messages(self, msg_type: Optional[str] = None,
|
||||
start_time: Optional[datetime] = None,
|
||||
end_time: Optional[datetime] = None) -> List[Dict]:
|
||||
"""获取MAVLink消息"""
|
||||
query = "SELECT * FROM mavlink_messages WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if msg_type:
|
||||
query += " AND message_type = %s"
|
||||
params.append(msg_type)
|
||||
if start_time:
|
||||
query += " AND timestamp >= %s"
|
||||
params.append(start_time)
|
||||
if end_time:
|
||||
query += " AND timestamp <= %s"
|
||||
params.append(end_time)
|
||||
|
||||
query += " ORDER BY timestamp DESC"
|
||||
return self.execute_query(query, tuple(params) if params else None)
|
||||
|
||||
def create_tables(self):
|
||||
"""创建必要的数据库表"""
|
||||
queries = [
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS mavlink_messages (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
message_type VARCHAR(50) NOT NULL,
|
||||
message_data JSON NOT NULL,
|
||||
timestamp DATETIME NOT NULL,
|
||||
INDEX idx_message_type (message_type),
|
||||
INDEX idx_timestamp (timestamp)
|
||||
)
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS drone_status (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
drone_id VARCHAR(50) NOT NULL,
|
||||
latitude FLOAT,
|
||||
longitude FLOAT,
|
||||
altitude FLOAT,
|
||||
battery_level FLOAT,
|
||||
status VARCHAR(50),
|
||||
timestamp DATETIME NOT NULL,
|
||||
INDEX idx_drone_id (drone_id),
|
||||
INDEX idx_timestamp (timestamp)
|
||||
)
|
||||
""",
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS drone_connections (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
drone_id VARCHAR(50) NOT NULL UNIQUE,
|
||||
ip_address VARCHAR(50) NOT NULL,
|
||||
port INT NOT NULL,
|
||||
connection_type VARCHAR(20) NOT NULL,
|
||||
is_active BOOLEAN DEFAULT TRUE,
|
||||
last_connected DATETIME,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
INDEX idx_drone_id (drone_id),
|
||||
INDEX idx_is_active (is_active)
|
||||
)
|
||||
"""
|
||||
]
|
||||
|
||||
for query in queries:
|
||||
try:
|
||||
self.cursor.execute(query)
|
||||
self.connection.commit()
|
||||
except Error as e:
|
||||
print(f"创建表失败: {str(e)}")
|
||||
self.connection.rollback()
|
||||
|
||||
def get_all_active_drones(self) -> List[Dict[str, Any]]:
|
||||
"""获取所有活跃的无人机连接信息"""
|
||||
query = """
|
||||
SELECT * FROM drone_connections
|
||||
WHERE is_active = TRUE
|
||||
"""
|
||||
return self.execute_query(query)
|
||||
|
||||
def get_drone_connection(self, drone_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""获取指定无人机的连接信息"""
|
||||
query = """
|
||||
SELECT * FROM drone_connections
|
||||
WHERE drone_id = %s AND is_active = TRUE
|
||||
"""
|
||||
result = self.execute_query(query, (drone_id,))
|
||||
return result[0] if result else None
|
||||
|
||||
def update_drone_connection_status(self, drone_id: str, is_active: bool) -> bool:
|
||||
"""更新无人机连接状态"""
|
||||
query = """
|
||||
UPDATE drone_connections
|
||||
SET is_active = %s, last_connected = %s
|
||||
WHERE drone_id = %s
|
||||
"""
|
||||
params = (is_active, datetime.now(), drone_id)
|
||||
return self.execute_update(query, params)
|
||||
|
||||
def add_drone_connection(self, drone_id: str, ip_address: str, port: int,
|
||||
connection_type: str = 'udp') -> bool:
|
||||
"""添加新的无人机连接信息"""
|
||||
query = """
|
||||
INSERT INTO drone_connections
|
||||
(drone_id, ip_address, port, connection_type)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
ip_address = VALUES(ip_address),
|
||||
port = VALUES(port),
|
||||
connection_type = VALUES(connection_type),
|
||||
is_active = TRUE,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
"""
|
||||
params = (drone_id, ip_address, port, connection_type)
|
||||
return self.execute_update(query, params)
|
||||
@ -0,0 +1,144 @@
|
||||
from typing import Dict, List, Optional
|
||||
from .mavlink_handler import MavlinkHandler
|
||||
from .database_handler import DatabaseHandler
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
class DroneConnectionManager:
|
||||
def __init__(self, db_handler: DatabaseHandler):
|
||||
self.db_handler = db_handler
|
||||
self.drone_handlers: Dict[str, MavlinkHandler] = {}
|
||||
self.last_heartbeat: Dict[str, datetime] = {}
|
||||
self.is_running = False
|
||||
self.monitor_thread = None
|
||||
self.TIMEOUT_SECONDS = 60 # 1分钟超时
|
||||
|
||||
def start(self):
|
||||
"""启动连接管理器"""
|
||||
if not self.is_running:
|
||||
self.is_running = True
|
||||
self.monitor_thread = threading.Thread(target=self._monitor_connections)
|
||||
self.monitor_thread.daemon = True
|
||||
self.monitor_thread.start()
|
||||
# 连接所有活跃的无人机
|
||||
self._connect_all_active_drones()
|
||||
return True
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
"""停止连接管理器"""
|
||||
self.is_running = False
|
||||
if self.monitor_thread:
|
||||
self.monitor_thread.join()
|
||||
self._disconnect_all_drones()
|
||||
|
||||
def _connect_all_active_drones(self):
|
||||
"""连接所有活跃的无人机"""
|
||||
active_drones = self.db_handler.get_all_active_drones()
|
||||
for drone in active_drones:
|
||||
self.connect_drone(drone['drone_id'], drone['ip_address'], drone['port'])
|
||||
|
||||
def _disconnect_all_drones(self):
|
||||
"""断开所有无人机的连接"""
|
||||
for drone_id in list(self.drone_handlers.keys()):
|
||||
self.disconnect_drone(drone_id)
|
||||
|
||||
def connect_drone(self, drone_id: str, ip: str, port: int) -> bool:
|
||||
"""连接到指定无人机"""
|
||||
try:
|
||||
if drone_id in self.drone_handlers:
|
||||
return True
|
||||
|
||||
handler = MavlinkHandler(f"udp:{ip}:{port}")
|
||||
if handler.connect():
|
||||
self.drone_handlers[drone_id] = handler
|
||||
self.last_heartbeat[drone_id] = datetime.now()
|
||||
self.db_handler.update_drone_connection_status(drone_id, "connected")
|
||||
# 启动消息接收线程
|
||||
handler.start()
|
||||
print(f"成功连接到无人机 {drone_id}")
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"连接无人机 {drone_id} 失败: {str(e)}")
|
||||
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
|
||||
return False
|
||||
|
||||
def disconnect_drone(self, drone_id: str):
|
||||
"""断开与指定无人机的连接"""
|
||||
if drone_id in self.drone_handlers:
|
||||
try:
|
||||
handler = self.drone_handlers[drone_id]
|
||||
handler.stop() # 停止消息接收线程
|
||||
handler.disconnect()
|
||||
del self.drone_handlers[drone_id]
|
||||
if drone_id in self.last_heartbeat:
|
||||
del self.last_heartbeat[drone_id]
|
||||
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
|
||||
print(f"已断开与无人机 {drone_id} 的连接")
|
||||
except Exception as e:
|
||||
print(f"断开无人机 {drone_id} 连接失败: {str(e)}")
|
||||
|
||||
def _monitor_connections(self):
|
||||
"""监控所有无人机连接状态"""
|
||||
while self.is_running:
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
for drone_id, handler in list(self.drone_handlers.items()):
|
||||
# 检查最后心跳时间
|
||||
if drone_id in self.last_heartbeat:
|
||||
time_since_last_heartbeat = (current_time - self.last_heartbeat[drone_id]).total_seconds()
|
||||
if time_since_last_heartbeat > self.TIMEOUT_SECONDS:
|
||||
print(f"无人机 {drone_id} 通信超时")
|
||||
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
|
||||
self.disconnect_drone(drone_id)
|
||||
continue
|
||||
|
||||
# 检查是否有新的心跳消息
|
||||
try:
|
||||
msg = handler.receive_message()
|
||||
if msg and msg.get_type() == 'HEARTBEAT':
|
||||
self.last_heartbeat[drone_id] = current_time
|
||||
self.db_handler.update_drone_connection_status(drone_id, "connected")
|
||||
except Exception as e:
|
||||
print(f"接收无人机 {drone_id} 消息失败: {str(e)}")
|
||||
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
|
||||
self.disconnect_drone(drone_id)
|
||||
|
||||
# 检查是否有新的活跃无人机需要连接
|
||||
active_drones = self.db_handler.get_all_active_drones()
|
||||
for drone in active_drones:
|
||||
if drone['drone_id'] not in self.drone_handlers:
|
||||
print(f"发现新的活跃无人机 {drone['drone_id']},尝试连接")
|
||||
self.connect_drone(drone['drone_id'], drone['ip_address'], drone['port'])
|
||||
|
||||
except Exception as e:
|
||||
print(f"监控连接状态时发生错误: {str(e)}")
|
||||
|
||||
time.sleep(1) # 每秒检查一次
|
||||
|
||||
def get_drone_handler(self, drone_id: str) -> Optional[MavlinkHandler]:
|
||||
"""获取指定无人机的处理器"""
|
||||
return self.drone_handlers.get(drone_id)
|
||||
|
||||
def get_connected_drones(self) -> List[str]:
|
||||
"""获取所有已连接的无人机ID"""
|
||||
return list(self.drone_handlers.keys())
|
||||
|
||||
def is_drone_connected(self, drone_id: str) -> bool:
|
||||
"""检查指定无人机是否已连接"""
|
||||
return drone_id in self.drone_handlers
|
||||
|
||||
def send_command(self, drone_id: str, command: str, params: dict = None) -> bool:
|
||||
"""向指定无人机发送命令"""
|
||||
handler = self.get_drone_handler(drone_id)
|
||||
if handler:
|
||||
try:
|
||||
handler.send_command(command, params)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"发送命令到无人机 {drone_id} 失败: {str(e)}")
|
||||
self.db_handler.update_drone_connection_status(drone_id, "disconnected")
|
||||
self.disconnect_drone(drone_id)
|
||||
return False
|
||||
@ -0,0 +1,81 @@
|
||||
import pymavlink.mavutil as mavutil
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable, Dict, Any
|
||||
from .message_queue import MessageQueue
|
||||
|
||||
class MavlinkHandler:
|
||||
def __init__(self, connection_string: str = "udp:127.0.0.1:14550"):
|
||||
self.connection_string = connection_string
|
||||
self.connection = None
|
||||
self.message_queue = MessageQueue()
|
||||
self.running = False
|
||||
self.thread = None
|
||||
self.callbacks: Dict[str, Callable] = {}
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""建立MAVLink连接"""
|
||||
try:
|
||||
self.connection = mavutil.mavlink_connection(self.connection_string)#连接无人机的IP地址、蓝牙、串口等
|
||||
self.connection.wait_heartbeat()
|
||||
print(f"MAVLink连接成功: {self.connection_string}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"MAVLink连接失败: {str(e)}")
|
||||
return False
|
||||
|
||||
def start(self):
|
||||
"""启动MAVLink消息接收线程"""
|
||||
if not self.connection:
|
||||
if not self.connect():
|
||||
return
|
||||
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self._receive_loop)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""停止MAVLink消息接收"""
|
||||
self.running = False
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
|
||||
def _receive_loop(self):
|
||||
"""MAVLink消息接收循环"""
|
||||
while self.running:
|
||||
try:
|
||||
msg = self.connection.recv_match(blocking=True, timeout=1.0)
|
||||
if msg:
|
||||
self._handle_message(msg)
|
||||
except Exception as e:
|
||||
print(f"接收消息错误: {str(e)}")
|
||||
time.sleep(1)
|
||||
|
||||
def _handle_message(self, msg):
|
||||
"""处理接收到的MAVLink消息"""
|
||||
msg_type = msg.get_type()
|
||||
if msg_type in self.callbacks:
|
||||
self.callbacks[msg_type](msg)
|
||||
self.message_queue.put(msg)
|
||||
|
||||
def register_callback(self, msg_type: str, callback: Callable):
|
||||
"""注册消息回调函数"""
|
||||
self.callbacks[msg_type] = callback
|
||||
|
||||
def send_message(self, msg_type: str, **kwargs):
|
||||
"""发送MAVLink消息"""
|
||||
if not self.connection:
|
||||
return False
|
||||
try:
|
||||
self.connection.mav.send(mavutil.mavlink.MAVLink_message_type_map[msg_type](**kwargs))
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"发送消息失败: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_message_queue(self) -> MessageQueue:
|
||||
"""获取消息队列"""
|
||||
return self.message_queue
|
||||
@ -0,0 +1,65 @@
|
||||
import queue
|
||||
import threading
|
||||
from typing import Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
class MessageQueue:
|
||||
def __init__(self, maxsize: int = 1000):
|
||||
self.queue = queue.Queue(maxsize=maxsize)
|
||||
self.lock = threading.Lock()
|
||||
self.callbacks = []
|
||||
|
||||
def put(self, message: Any):
|
||||
"""将消息放入队列"""
|
||||
with self.lock:
|
||||
try:
|
||||
self.queue.put_nowait({
|
||||
'message': message,
|
||||
'timestamp': datetime.now()
|
||||
})
|
||||
self._notify_callbacks()
|
||||
except queue.Full:
|
||||
print("消息队列已满,丢弃消息")
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None) -> Optional[dict]:
|
||||
"""从队列中获取消息"""
|
||||
try:
|
||||
return self.queue.get(block=block, timeout=timeout)
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
def register_callback(self, callback: callable):
|
||||
"""注册消息回调函数"""
|
||||
with self.lock:
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def unregister_callback(self, callback: callable):
|
||||
"""注销消息回调函数"""
|
||||
with self.lock:
|
||||
if callback in self.callbacks:
|
||||
self.callbacks.remove(callback)
|
||||
|
||||
def _notify_callbacks(self):
|
||||
"""通知所有注册的回调函数"""
|
||||
for callback in self.callbacks:
|
||||
try:
|
||||
callback()
|
||||
except Exception as e:
|
||||
print(f"回调函数执行失败: {str(e)}")
|
||||
|
||||
def clear(self):
|
||||
"""清空消息队列"""
|
||||
with self.lock:
|
||||
while not self.queue.empty():
|
||||
try:
|
||||
self.queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
def size(self) -> int:
|
||||
"""获取队列大小"""
|
||||
return self.queue.qsize()
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
"""检查队列是否为空"""
|
||||
return self.queue.empty()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in new issue