You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
CSAR/src/software/simulation.py

194 lines
5.6 KiB

#!/usr/bin/env python3
import os
import sys
import time
import logging
import random
import threading
import signal
import argparse
from config.config import current_config as config
# 确保环境变量设置为启用模拟
os.environ['ENABLE_SIMULATION'] = 'True'
# 设置日志
config.setup_logging()
logger = logging.getLogger(__name__)
class DroneSimulator:
"""无人机模拟器"""
def __init__(self, port=14550):
"""初始化模拟器"""
self.port = port
self.running = False
self.thread = None
self.latitude = config.BASE_LATITUDE
self.longitude = config.BASE_LONGITUDE
self.altitude = 0.0
self.battery = 100.0
self.armed = False
self.mode = "GUIDED"
def start(self):
"""启动模拟器"""
if self.running:
return
self.running = True
self.thread = threading.Thread(target=self._simulation_loop, daemon=True)
self.thread.start()
logger.info(f"模拟器已启动在端口 {self.port}")
def stop(self):
"""停止模拟器"""
if not self.running:
return
self.running = False
if self.thread:
self.thread.join(timeout=2)
logger.info("模拟器已停止")
def _simulation_loop(self):
"""模拟循环"""
while self.running:
# 模拟电池消耗
if self.armed and self.battery > 0:
self.battery -= 0.01
if self.battery < 0:
self.battery = 0
# 模拟位置变化
if self.armed and self.mode == "GUIDED" and self.altitude > 0:
# 随机漂移
self.latitude += random.uniform(-0.00001, 0.00001)
self.longitude += random.uniform(-0.00001, 0.00001)
time.sleep(0.1)
def arm(self):
"""解锁无人机"""
self.armed = True
logger.info("模拟无人机已解锁")
def disarm(self):
"""锁定无人机"""
self.armed = False
logger.info("模拟无人机已锁定")
def set_mode(self, mode):
"""设置模式"""
self.mode = mode
logger.info(f"模拟无人机模式已设置为 {mode}")
def takeoff(self, altitude):
"""起飞"""
self.arm()
self.altitude = altitude
logger.info(f"模拟无人机起飞到高度 {altitude}m")
def land(self):
"""降落"""
self.altitude = 0
self.disarm()
logger.info("模拟无人机已降落")
def goto(self, lat, lon, alt):
"""飞往指定位置"""
logger.info(f"模拟无人机飞往 ({lat}, {lon}, {alt})")
# 模拟飞行时间
distance = ((lat - self.latitude) ** 2 + (lon - self.longitude) ** 2) ** 0.5 * 111000
speed = 5.0 # 假设速度为5m/s
flight_time = distance / speed
# 模拟飞行过程
start_lat = self.latitude
start_lon = self.longitude
start_alt = self.altitude
for i in range(10):
progress = (i + 1) / 10
self.latitude = start_lat + (lat - start_lat) * progress
self.longitude = start_lon + (lon - start_lon) * progress
self.altitude = start_alt + (alt - start_alt) * progress
time.sleep(flight_time / 10)
self.latitude = lat
self.longitude = lon
self.altitude = alt
logger.info(f"模拟无人机已到达 ({lat}, {lon}, {alt})")
def setup_simulation():
"""设置模拟环境"""
# 启动模拟器
simulator = DroneSimulator()
simulator.start()
# 创建SITL连接字符串和覆盖环境变量
os.environ['DRONE_CONNECTION_STRING'] = f'udpin:localhost:{simulator.port}'
return simulator
def run_simulation():
"""运行模拟"""
# 设置参数解析
parser = argparse.ArgumentParser(description='智能战场医疗后送系统模拟器')
parser.add_argument('--web-port', type=int, default=8080, help='Web服务端口')
parser.add_argument('--comm-port', type=int, default=5000, help='通信服务端口')
parser.add_argument('--drone-port', type=int, default=14550, help='无人机模拟器端口')
args = parser.parse_args()
# 设置环境变量
os.environ['WEB_PORT'] = str(args.web_port)
os.environ['COMM_PORT'] = str(args.comm_port)
# 启动模拟器
simulator = DroneSimulator(port=args.drone_port)
simulator.start()
# 导入主程序模块
from main import MedicalEvacuationSystem
# 创建系统实例
system = MedicalEvacuationSystem()
# 注册信号处理
def signal_handler(sig, frame):
logger.info("接收到停止信号,正在关闭模拟器...")
system.stop()
simulator.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 启动系统
try:
if system.start():
logger.info("系统启动成功,运行在模拟模式")
# 保持程序运行
while True:
time.sleep(1)
else:
logger.error("系统启动失败")
simulator.stop()
return 1
except Exception as e:
logger.error(f"模拟运行错误: {str(e)}")
system.stop()
simulator.stop()
return 1
return 0
if __name__ == "__main__":
exit_code = run_simulation()
sys.exit(exit_code)