You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
5.5 KiB

#!/usr/bin/env python3
"""AudioSentinel — Obstacle detection and graded audio warning system."""
import os
import sys
import signal
import time
import threading
import logging
# ── Bootstrap: locate and configure cyber module ──────────────────────────
def _find_cyber():
    """Make the Apollo ``cyber`` Python package importable.

    The import is first attempted as-is; if it succeeds nothing else is done.
    Otherwise this function:

    1. calls ``_unshadow_cyber()``;
    2. searches ``$APOLLO_HOME`` (when set) and two fixed install roots for
       a ``python/cyber`` directory;
    3. sets ``APOLLO_DISTRIBUTION_HOME`` to the first root that has one;
    4. prepends that root's ``packages/3rd-fastdds-wrap/lib`` (or, failing
       that, ``packages/3rd-fastrtps/lib``) to ``LD_LIBRARY_PATH`` when the
       directory exists;
    5. prepends ``<root>/python`` and, when present,
       ``<root>/lib/cyber/python/internal`` to ``sys.path``;
    6. drops stale ``cyber`` entries from ``sys.modules`` so the next import
       is resolved against the updated ``sys.path``.

    If no root is found, a warning is printed to stderr and the function
    returns without changing the environment.
    """
    try:
        from cyber.python.cyber_py3 import cyber  # noqa: F401
        return
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so this single
        # clause covers both.
        pass

    _unshadow_cyber()

    candidates = [
        "/opt/apollo/neo",
        "/home/apollo/application-core/.aem/envroot/opt/apollo/neo",
    ]
    apollo_home = os.environ.get("APOLLO_HOME", "")
    if apollo_home:
        # An explicitly configured home takes priority over the defaults.
        candidates.insert(0, apollo_home)

    dist_home = None
    fastrtps_lib = None
    for root in candidates:
        if not os.path.isdir(os.path.join(root, "python", "cyber")):
            continue
        dist_home = root
        # Prefer the FastDDS wrapper libs; fall back to FastRTPS.
        for package in ("3rd-fastdds-wrap", "3rd-fastrtps"):
            lib_dir = os.path.join(root, "packages", package, "lib")
            if os.path.isdir(lib_dir):
                fastrtps_lib = lib_dir
                break
        break

    if dist_home is None:
        print("[WARN] Could not locate cyber module. Ensure you are running "
              "inside an Apollo Docker container.", file=sys.stderr)
        return

    os.environ["APOLLO_DISTRIBUTION_HOME"] = dist_home
    if fastrtps_lib:
        # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH at
        # process start, so this may only benefit child processes — confirm.
        existing = os.environ.get("LD_LIBRARY_PATH", "")
        os.environ["LD_LIBRARY_PATH"] = fastrtps_lib + (":" + existing if existing else "")

    parent = os.path.join(dist_home, "python")
    if parent not in sys.path:
        sys.path.insert(0, parent)
    internal = os.path.join(dist_home, "lib", "cyber", "python", "internal")
    if os.path.isdir(internal) and internal not in sys.path:
        sys.path.insert(0, internal)

    # The failed import above can leave a parent ``cyber`` package cached in
    # sys.modules. Python would reuse that cached package (and its old
    # __path__) for the next import, ignoring the sys.path changes made
    # here, so purge it and reset the finder caches.
    import importlib

    for name in list(sys.modules):
        if name == "cyber" or name.startswith("cyber."):
            del sys.modules[name]
    importlib.invalidate_caches()
def _unshadow_cyber():
    """Rename the first ``cyber/__init__.py`` found under a site-packages path.

    Walks a snapshot of ``sys.path``. For the first entry whose path contains
    ``"site-packages"`` and that holds ``cyber/__init__.py``, the file is
    renamed to ``__init__.py.cyber_bak`` and the search stops. Presumably
    this keeps a pip-installed ``cyber`` package from shadowing Apollo's own
    (inferred from the function name and its caller).

    The rename is best-effort: an ``OSError`` (e.g. read-only site-packages)
    is reported on stderr and never raised.
    """
    for entry in list(sys.path):
        if "site-packages" not in entry:
            continue
        init_py = os.path.join(entry, "cyber", "__init__.py")
        if not os.path.isfile(init_py):
            continue
        try:
            os.rename(init_py, init_py + ".cyber_bak")
        except OSError as err:
            # Surface the failure instead of hiding it; the caller carries on.
            print(f"[WARN] Could not rename {init_py}: {err}", file=sys.stderr)
        # Only the first matching package is handled.
        break
# Must run before the `cyber` import below: on its fallback path
# _find_cyber() extends sys.path so that import can be resolved.
_find_cyber()
# ── Imports ──────────────────────────────────────────────────────────────
from cyber.python.cyber_py3 import cyber
from config_manager import ConfigManager
from speed_evaluator import SpeedEvaluator
from audio_controller import AudioController
from reporter import Reporter
from models import RiskLevel, SpeedAlert
# Module-level logger used by main(); handlers are configured in the
# __main__ guard at the bottom of the file.
logger = logging.getLogger("AudioSentinel")
def main():
    """Run the AudioSentinel loop until Cyber RT reports shutdown.

    Initialises Cyber RT, builds the evaluator / audio / reporter modules
    from the ``configs`` configuration, subscribes to the obstacle and
    localization channels as raw-data readers, and then, every 0.5 s, feeds
    the most recent raw messages to the speed evaluator and forwards the
    resulting alert to the audio controller and the reporter.

    SIGINT and SIGTERM request a Cyber RT shutdown; on exit the audio
    controller is stopped and Cyber RT is shut down.
    """
    cyber.init("audio_sentinel")
    config = ConfigManager("configs")

    # ── Modules ──────────────────────────────────────────────────────
    speed_evaluator = SpeedEvaluator(config)
    audio_controller = AudioController(config)
    reporter = Reporter()

    # ── Cyber RT Node + Subscriptions ────────────────────────────────
    cyber_node = cyber.Node("audio_sentinel_node")

    # Latest raw message per channel; written by the reader callbacks and
    # read by the main loop below.
    latest = {"obstacles": None, "localization": None}

    def on_obstacles(data):
        latest["obstacles"] = data

    def on_localization(data):
        latest["localization"] = data

    def subscribe(topic, callback):
        # A failed subscription is logged but does not abort start-up.
        try:
            cyber_node.create_rawdata_reader(topic, callback)
            logger.info(f"Subscribed to {topic}")
        except Exception as e:
            logger.error(f"Failed to subscribe to {topic}: {e}")

    channels = config.get("channels", {})
    obs_topic = channels.get("obstacles", "/apollo/perception/obstacles")
    loc_topic = channels.get("localization", "/apollo/localization/pose")
    subscribe(obs_topic, on_obstacles)
    subscribe(loc_topic, on_localization)

    # Spin the node in the background so the main thread can run the loop.
    threading.Thread(target=cyber_node.spin, daemon=True).start()
    logger.info("AudioSentinel STARTED")

    # ── Shutdown handler ─────────────────────────────────────────────
    def shutdown_handler(signum, frame):
        logger.info("Received signal, shutting down...")
        cyber.shutdown()

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown_handler)

    # ── Main loop ────────────────────────────────────────────────────
    try:
        while cyber.ok():
            alert = speed_evaluator.evaluate(
                latest["obstacles"], latest["localization"]
            )
            # Anything other than OK is logged on every tick.
            if alert.level != RiskLevel.OK:
                logger.warning(f"SPEED ALERT: {alert.message}")
            audio_controller.play(alert)
            reporter.update(alert)
            time.sleep(0.5)
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt")
    finally:
        logger.info("Shutting down...")
        audio_controller.stop()
        cyber.shutdown()
        logger.info("AudioSentinel STOPPED")
if __name__ == "__main__":
    # Script entry point: set up root logging (INFO, pipe-separated columns)
    # and then hand control to main().
    logging.basicConfig(
        format="%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s",
        level=logging.INFO,
    )
    main()