You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
project/Src/command_center/communication/database_handler.py

129 lines
4.3 KiB

import mysql.connector
from mysql.connector import Error
from typing import Dict, List, Any, Optional
import json
from datetime import datetime
class DatabaseHandler:
    """Wrapper around a single MySQL connection used to store MAVLink data.

    The handler owns one connection and one dictionary cursor. Query helpers
    report failures by printing a message and returning an empty/falsy value
    instead of raising, so callers can treat the database as best-effort.

    The class can also be used as a context manager::

        with DatabaseHandler(host, user, password, database) as db:
            db.create_tables()
    """

    # Supply your own database account and password here.
    def __init__(self, host: str, user: str, password: str, database: str):
        """Store the connection settings; no connection is opened yet.

        Args:
            host: MySQL server host name or address.
            user: Account name used to log in.
            password: Password for ``user``.
            database: Name of the schema to use.
        """
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        # Both stay None until connect() succeeds and after disconnect().
        self.connection = None
        self.cursor = None

    def __enter__(self) -> "DatabaseHandler":
        """Open the connection (if not already open) and return the handler.

        Raises:
            ConnectionError: If connect() reports failure.
        """
        if not self._is_connected() and not self.connect():
            raise ConnectionError(f"could not connect to database {self.database}")
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection; exceptions from the ``with`` body propagate."""
        self.disconnect()

    def _is_connected(self) -> bool:
        """Return True when both the connection and the cursor objects exist."""
        return self.connection is not None and self.cursor is not None

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction without letting a failure escape.

        A rollback can itself fail (e.g. the connection was lost); the callers
        are already on an error path and promise to return a status, not raise.
        """
        try:
            self.connection.rollback()
        except Error as e:
            print(f"回滚失败: {str(e)}")

    def connect(self) -> bool:
        """Connect to the MySQL database.

        Any connection already held by this handler is closed first so it is
        not leaked.

        Returns:
            True on success, False if the driver raised an error.
        """
        try:
            self.disconnect()
        except Error:
            # The old handle is being replaced anyway; a failure while
            # closing it must not prevent the new connection attempt.
            pass
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            # dictionary=True makes every fetched row a dict keyed by column.
            self.cursor = self.connection.cursor(dictionary=True)
        except Error as e:
            print(f"数据库连接失败: {str(e)}")
            # Do not keep a half-initialised connection around.
            try:
                self.disconnect()
            except Error:
                pass
            return False
        print(f"数据库连接成功: {self.database}")
        return True

    def disconnect(self) -> None:
        """Close the cursor and the connection, if any.

        Safe to call repeatedly: the attributes are reset to None first, and
        the connection is closed even if closing the cursor raises.
        """
        cursor, connection = self.cursor, self.connection
        self.cursor = None
        self.connection = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()

    def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict]:
        """Execute a query and return all resulting rows.

        Args:
            query: SQL text, using ``%s`` placeholders for values.
            params: Values bound to the placeholders, or None.

        Returns:
            The fetched rows; an empty list if the handler is not connected
            or the driver raised an error.
        """
        if not self._is_connected():
            print("查询执行失败: 数据库未连接")
            return []
        try:
            self.cursor.execute(query, params)
            return self.cursor.fetchall()
        except Error as e:
            print(f"查询执行失败: {str(e)}")
            return []

    def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
        """Execute a data-modifying statement and commit it.

        Args:
            query: SQL text, using ``%s`` placeholders for values.
            params: Values bound to the placeholders, or None.

        Returns:
            True if the statement was executed and committed; False if the
            handler is not connected or the driver raised an error (in which
            case the transaction is rolled back).
        """
        if not self._is_connected():
            print("更新执行失败: 数据库未连接")
            return False
        try:
            self.cursor.execute(query, params)
            self.connection.commit()
            return True
        except Error as e:
            print(f"更新执行失败: {str(e)}")
            self._rollback_quietly()
            return False

    def save_mavlink_message(self, msg_type: str, msg_data: Dict[str, Any]) -> bool:
        """Save one MAVLink message to the ``mavlink_messages`` table.

        Args:
            msg_type: Message type name stored in ``message_type``.
            msg_data: Message payload; serialised with ``json.dumps``, so it
                must be JSON-serialisable.

        Returns:
            The result of execute_update(): True on success, False otherwise.
        """
        query = """
            INSERT INTO mavlink_messages
            (message_type, message_data, timestamp)
            VALUES (%s, %s, %s)
        """
        params = (
            msg_type,
            json.dumps(msg_data),
            datetime.now(),  # local wall-clock time at insertion
        )
        return self.execute_update(query, params)

    def get_mavlink_messages(self, msg_type: Optional[str] = None,
                             start_time: Optional[datetime] = None,
                             end_time: Optional[datetime] = None) -> List[Dict]:
        """Fetch stored MAVLink messages, newest first.

        Args:
            msg_type: If given (non-empty), only rows with this message type.
            start_time: If given, only rows with ``timestamp >= start_time``.
            end_time: If given, only rows with ``timestamp <= end_time``.

        Returns:
            Matching rows ordered by timestamp descending; an empty list on
            error (see execute_query()).
        """
        # "WHERE 1=1" lets every optional filter be appended as " AND ...".
        query = "SELECT * FROM mavlink_messages WHERE 1=1"
        params: List[Any] = []
        if msg_type:
            query += " AND message_type = %s"
            params.append(msg_type)
        if start_time is not None:
            query += " AND timestamp >= %s"
            params.append(start_time)
        if end_time is not None:
            query += " AND timestamp <= %s"
            params.append(end_time)
        query += " ORDER BY timestamp DESC"
        return self.execute_query(query, tuple(params) if params else None)

    def create_tables(self) -> None:
        """Create the required tables if they do not exist yet.

        Each statement is executed and committed on its own, so a failure
        creating one table is reported and rolled back without preventing
        the attempt to create the others.
        """
        if not self._is_connected():
            print("创建表失败: 数据库未连接")
            return
        queries = [
            """
            CREATE TABLE IF NOT EXISTS mavlink_messages (
                id INT AUTO_INCREMENT PRIMARY KEY,
                message_type VARCHAR(50) NOT NULL,
                message_data JSON NOT NULL,
                timestamp DATETIME NOT NULL,
                INDEX idx_message_type (message_type),
                INDEX idx_timestamp (timestamp)
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS drone_status (
                id INT AUTO_INCREMENT PRIMARY KEY,
                drone_id VARCHAR(50) NOT NULL,
                latitude FLOAT,
                longitude FLOAT,
                altitude FLOAT,
                battery_level FLOAT,
                status VARCHAR(50),
                timestamp DATETIME NOT NULL,
                INDEX idx_drone_id (drone_id),
                INDEX idx_timestamp (timestamp)
            )
            """,
        ]
        for query in queries:
            try:
                self.cursor.execute(query)
                self.connection.commit()
            except Error as e:
                print(f"创建表失败: {str(e)}")
                self._rollback_quietly()