You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
9.7 KiB

import io
import math
import os
import shutil
import struct
import subprocess
import threading
import time
import wave
import logging
from pathlib import Path
from typing import Optional
from models import RiskLevel, SpeedAlert
logger = logging.getLogger("AudioSentinel.Audio")
# Fallback audio players to try if the configured one is not available.
_FALLBACK_PLAYERS = ["aplay", "paplay", "ffplay", "gst-play-1.0"]
# Common install paths checked when PATH-based lookup fails.
_FALLBACK_PATHS = [
"/usr/bin/aplay",
"/bin/aplay",
"/usr/bin/paplay",
"/usr/bin/ffplay",
"/usr/bin/gst-play-1.0",
]
class AudioController:
def __init__(self, config):
audio_cfg = config.get("audio", {})
self._player = audio_cfg.get("player", "aplay")
self._sample_rate = audio_cfg.get("sample_rate", 22050)
self._sample_width = audio_cfg.get("sample_width", 2)
self._channels = audio_cfg.get("channels", 1)
self._volumes = audio_cfg.get("volume", {})
self._cooldown_sec = audio_cfg.get("cooldown_sec", 2.0)
self._ok_interval = audio_cfg.get("ok_periodic_interval_sec", 30.0)
self._beep_cfg = audio_cfg.get("beep", {})
self._speech_cfg = audio_cfg.get("speech", {})
self._cache_dir = Path(__file__).resolve().parent / "sounds"
self._cache_dir.mkdir(exist_ok=True)
# Cooldown tracking per level
self._last_play: dict = {}
self._last_ok_play: float = 0.0
self._repeat_thread: Optional[threading.Thread] = None
self._repeat_stop = threading.Event()
self._current_level = RiskLevel.OK
# Auto-detect a working audio player
self._player = self._detect_player(self._player)
if self._player:
logger.info(f"Audio player detected: {self._player}")
else:
path_hint = os.environ.get("PATH", "")
logger.warning(f"No audio player found (PATH={path_hint})")
self._init_wav_cache()
# ── Public API ──────────────────────────────────────────────────────
def play(self, alert: SpeedAlert):
level = alert.level
now = time.time()
# Cooldown check
last = self._last_play.get(level.value, 0.0)
if now - last < self._cooldown_sec:
return
# OK level: periodic reminder only
if level == RiskLevel.OK:
if self._ok_interval > 0 and now - self._last_ok_play >= self._ok_interval:
self._last_ok_play = now
self._do_play("ok_reminder.wav", self._volumes.get("OK", 0.0))
return
self._last_play[level.value] = now
self._current_level = level
filename = f"{level.value.lower()}.wav"
vol = self._volumes.get(level.value, 1.0)
self._do_play(filename, vol)
# Repeated playback for ERROR/FATAL
if level in (RiskLevel.ERROR, RiskLevel.FATAL):
self._start_repeat(level)
def stop(self):
self._repeat_stop.set()
if self._repeat_thread and self._repeat_thread.is_alive():
self._repeat_thread.join(timeout=1.0)
# Kill any lingering aplay processes
try:
subprocess.run(
["pkill", "-f", self._player],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
except Exception:
pass
# ── WAV generation ──────────────────────────────────────────────────
def _init_wav_cache(self):
self._generate("ok_reminder.wav", self._gen_ok_reminder)
self._generate("warn.wav", lambda: self._gen_level_sequence(RiskLevel.WARN))
self._generate("error.wav", lambda: self._gen_level_sequence(RiskLevel.ERROR))
self._generate("fatal.wav", lambda: self._gen_level_sequence(RiskLevel.FATAL))
def _generate(self, name: str, gen_fn):
path = self._cache_dir / name
if path.exists():
return
try:
pcm = gen_fn()
self._write_wav(path, pcm)
logger.info(f"Generated WAV: {path}")
except Exception as e:
logger.error(f"Failed to generate {name}: {e}")
def _write_wav(self, path: Path, pcm_data: bytes):
with wave.open(str(path), "wb") as wf:
wf.setnchannels(self._channels)
wf.setsampwidth(self._sample_width)
wf.setframerate(self._sample_rate)
wf.writeframes(pcm_data)
def _gen_sine(self, freq_hz: float, duration_ms: float,
volume: float = 1.0, ramp_ms: int = 10) -> bytes:
n_samples = int(self._sample_rate * duration_ms / 1000)
ramp_samples = int(self._sample_rate * ramp_ms / 1000)
samples = []
for i in range(n_samples):
t = i / self._sample_rate
val = math.sin(2 * math.pi * freq_hz * t) * volume
# Apply linear ramp
if i < ramp_samples:
val *= i / ramp_samples
elif i > n_samples - ramp_samples:
val *= (n_samples - i) / ramp_samples
# Clamp and convert to 16-bit
val = max(-1.0, min(1.0, val))
samples.append(struct.pack("<h", int(val * 32767)))
return b"".join(samples)
def _gen_silence(self, duration_ms: float) -> bytes:
n_samples = int(self._sample_rate * duration_ms / 1000)
return b"\x00\x00" * n_samples
def _gen_tone_pair(self, freq1: float, dur1_ms: float,
freq2: float, dur2_ms: float,
volume: float = 1.0) -> bytes:
return (self._gen_sine(freq1, dur1_ms, volume) +
self._gen_sine(freq2, dur2_ms, volume))
def _gen_ok_reminder(self) -> bytes:
vol = self._volumes.get("OK", 0.0) or 0.1
beep = self._gen_sine(440, 60, vol)
gap = self._gen_silence(50)
return beep + gap + beep
def _gen_level_sequence(self, level: RiskLevel) -> bytes:
cfg = self._speech_cfg.get(level.value, {})
beeps_before = cfg.get("beeps_before", 0)
beep_freq = self._beep_cfg.get("frequency_hz", 880)
beep_dur = self._beep_cfg.get("duration_ms", 150)
vol = self._volumes.get(level.value, 1.0)
speech_vol = max(0.3, vol * 0.7)
data = b""
# Beeps
for i in range(beeps_before):
data += self._gen_sine(beep_freq, beep_dur, vol)
if i < beeps_before - 1:
data += self._gen_silence(50)
if beeps_before > 0:
data += self._gen_silence(100)
# Speech-like tone pattern
speech_pattern = self._speech_pattern(cfg.get("text", ""), speech_vol)
data += speech_pattern
return data
def _speech_pattern(self, text: str, volume: float) -> bytes:
patterns = {
"注意超速": self._gen_tone_pair(660, 150, 880, 200, volume),
"超速危险": self._gen_tone_pair(880, 200, 660, 250, volume),
"严重超速": (
self._gen_sine(1047, 100, volume) +
self._gen_sine(784, 80, volume) +
self._gen_sine(1047, 100, volume) +
self._gen_sine(784, 80, volume) +
self._gen_sine(1047, 200, volume)
),
}
return patterns.get(text, self._gen_sine(880, 300, volume))
# ── Playback ────────────────────────────────────────────────────────
def _detect_player(self, preferred: str) -> Optional[str]:
"""Find a working audio player and return its full path."""
candidates = [preferred] + [p for p in _FALLBACK_PLAYERS if p != preferred]
for exe in candidates:
path = shutil.which(exe)
if path:
return path
# Fallback: check absolute paths directly (catches restricted PATH
# environments e.g. inside Apollo Docker containers).
for fp in _FALLBACK_PATHS:
if os.path.isfile(fp) and os.access(fp, os.X_OK):
return fp
return None
def _do_play(self, filename: str, volume: float):
if not self._player:
return
path = self._cache_dir / filename
if not path.exists():
logger.warning(f"WAV not found: {path}")
return
try:
subprocess.Popen(
[self._player, str(path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except Exception as e:
logger.warning(f"Playback failed: {e}")
def _start_repeat(self, level: RiskLevel):
if (self._repeat_thread and self._repeat_thread.is_alive()
and self._current_level == level):
return
self._repeat_stop.set()
if self._repeat_thread and self._repeat_thread.is_alive():
self._repeat_thread.join(timeout=1.0)
self._repeat_stop.clear()
self._repeat_thread = threading.Thread(
target=self._repeat_loop,
args=(level,),
daemon=True,
)
self._repeat_thread.start()
def _repeat_loop(self, level: RiskLevel):
cfg = self._speech_cfg.get(level.value, {})
interval = cfg.get("repeat_interval_sec", 5.0)
filename = f"{level.value.lower()}.wav"
vol = self._volumes.get(level.value, 1.0)
while not self._repeat_stop.is_set():
if self._current_level != level:
break
time.sleep(interval)
if self._current_level == level and not self._repeat_stop.is_set():
self._do_play(filename, vol)