diff --git a/src/声源定位代码/back-code/README.md b/src/声源定位代码/back-code/README.md index be14557..c2355c2 100644 --- a/src/声源定位代码/back-code/README.md +++ b/src/声源定位代码/back-code/README.md @@ -1,421 +1,162 @@ -# 声源定位系统 - 开发板与PC端配合工作流程 +# 声源定位系统 - 整合版 -## 📋 项目概述 +## 系统概述 -本项目实现了一个基于麦克风阵列的声源定位系统,采用开发板(K210)与PC端服务器协同工作的架构。系统能够实时检测枪声并进行精确的声源定位,适用于战场感知、安防监控等场景。 +这是一个完整的声源定位系统,包含开发板端和PC端服务器,支持多种可视化界面。 -### 🎯 系统特点 +## 文件结构 -- **高精度定位**:基于TDOA算法的麦克风阵列定位 -- **智能识别**:PC端强大的枪声识别算法 -- **实时响应**:定位与识别并行处理 -- **可靠通信**:WiFi网络下的稳定数据传输 -- **可视化界面**:实时显示定位结果和系统状态 +``` +back-code/ +├── development_board.py # 开发板端主程序 (K210) +├── pc_server.py # PC端服务器主程序 (整合版) +└── README.md # 本文件 + +front-code/ +└── sound-vue-frontend/ # Vue Web前端 (可选) +``` + +## 功能特性 + +### PC端服务器 (pc_server.py) +- ✅ **TCP通信**: 接收开发板音频和定位数据 +- ✅ **音频识别**: 集成audio-classification进行枪声检测 +- ✅ **实时可视化**: 使用matplotlib显示声源位置 +- ✅ **HTTP API**: 提供RESTful API接口 +- ✅ **性能监控**: 实时统计和性能分析 +- ✅ **卡尔曼滤波**: 定位数据平滑处理 + +### 开发板端 (development_board.py) +- ✅ **音频采集**: 麦克风阵列录音 +- ✅ **声源定位**: 实时计算声源位置 +- ✅ **WiFi通信**: 与PC端数据交换 +- ✅ **模式切换**: 录音模式 ↔ 定位模式 +- ✅ **异常处理**: 完善的错误恢复机制 + +### Web前端 (可选) +- ✅ **实时显示**: 基于ECharts的可视化 +- ✅ **响应式设计**: 支持移动端访问 +- ✅ **数据监控**: 实时状态显示 + +## 快速开始 + +### 1. 安装依赖 -## 🏗️ 系统架构 +```bash +# PC端依赖 +pip install numpy matplotlib flask flask-cors configparser +# 可选:Web前端依赖 +cd front-code/sound-vue-frontend +npm install ``` -┌─────────────────────────────────────────────────────────────┐ -│ 开发板端 (K210) │ -├─────────────────────────────────────────────────────────────┤ -│ 硬件层: │ -│ • 麦克风阵列 (4通道) │ -│ • WiFi模块 (ESP8285) │ -│ • LED指示灯、蜂鸣器 │ -├─────────────────────────────────────────────────────────────┤ -│ 软件层: │ -│ • 音频采集与预处理 │ -│ • 声源定位算法 (TDOA) │ -│ • 网络通信管理 │ -│ • 系统状态监控 │ -└─────────────────────────────────────────────────────────────┘ - │ - ┌─────────┴─────────┐ - │ WiFi网络通信 │ - └─────────┬─────────┘ - │ -┌─────────────────────────────────────────────────────────────┐ -│ PC端服务器 │ -├─────────────────────────────────────────────────────────────┤ -│ 功能模块: │ -│ • 音频识别引擎 (枪声检测) │ -│ • 数据可视化界面 (matplotlib) │ -│ • Web API服务 (Flask) │ -│ • 数据存储与分析 │ -└─────────────────────────────────────────────────────────────┘ + +### 2. 启动PC端服务器 + +```bash +cd back-code +python pc_server.py ``` -## 🔧 硬件配置 - -### 开发板端硬件 -- **主控芯片**: K210 (双核64位RISC-V) -- **麦克风阵列**: 4通道I2S接口 -- **网络模块**: ESP8285 WiFi模块 -- **存储**: 16MB Flash + 8MB PSRAM -- **接口**: UART、I2S、GPIO、PWM - -### 引脚配置 -```python -# 麦克风阵列引脚 -mic_i2s_d0 = 23 # 数据通道0 -mic_i2s_d1 = 22 # 数据通道1 -mic_i2s_d2 = 21 # 数据通道2 -mic_i2s_d3 = 20 # 数据通道3 -mic_i2s_ws = 19 # 字选择 -mic_i2s_sclk = 18 # 时钟 - -# 其他硬件 -led_pin = 12 # LED指示灯 -buzzer_pin = 13 # 蜂鸣器 -wifi_en_pin = 8 # WiFi使能控制 +服务器启动后会显示: +- matplotlib可视化界面 +- HTTP API服务 (http://localhost:5000) + +### 3. 启动开发板端 + +将`development_board.py`上传到K210开发板并运行。 + +### 4. 可选:启动Web前端 + +```bash +cd front-code/sound-vue-frontend +npm run serve ``` -## 🌐 网络配置 +访问 http://localhost:8080 查看Web界面。 -### 开发板端网络设置 -```python -# WiFi连接配置 -wifi_ssid = "junzekeki" -wifi_password = "234567890l" +## HTTP API接口 -# PC端服务器地址 -pc_ip = "192.168.1.100" -pc_port_audio = 12346 # 音频数据传输端口 -pc_port_cmd = 12347 # 指令控制端口 -pc_port_location = 12348 # 定位数据传输端口 +### 获取定位数据 +``` +GET http://localhost:5000/data ``` -### PC端服务器配置 -```python -# 服务器监听配置 -host = "0.0.0.0" # 监听所有网络接口 -port_audio = 12346 # 音频数据接收端口 -port_cmd = 12347 # 指令发送端口 -port_location = 12348 # 定位数据接收端口 +响应示例: +```json +{ + "X": 1.23, + "Y": -0.45, + "strength": 2.5, + "angle": 90.0, + "timestamp": 1640995200.123, + "confidence": 0.85 +} ``` -## 🔄 详细工作流程 - -### 阶段1: 系统初始化 - -#### 开发板端初始化流程 -1. **硬件初始化** - - 初始化麦克风阵列I2S接口 - - 配置GPIO引脚(LED、蜂鸣器、WiFi使能) - - 初始化定时器和中断 - -2. **网络连接** - - 启用WiFi模块 - - 连接到指定WiFi网络 - - 建立3个Socket连接: - * `audio_socket`: 发送音频数据 - * `cmd_socket`: 接收PC端指令 - * `location_socket`: 发送定位数据 - -3. **系统启动** - - 启动性能监控模块 - - 启动心跳机制(30秒间隔) - - 初始化音频缓冲区和映射队列 - - 切换到录音模式 - -#### PC端初始化流程 -1. **服务器启动** - - 创建3个Socket服务器 - - 初始化音频识别模块 - - 启动matplotlib可视化界面 - - 初始化Flask Web API - -2. **等待连接** - - 监听开发板连接请求 - - 建立数据通信通道 - - 启动数据处理线程 - -### 阶段2: 录音监听模式 - -#### 开发板端工作流程 -1. **音频采集** - - 从麦克风阵列获取音频数据 - - 应用增益控制和噪声抑制 - - 将音频数据转换为标准格式 - -2. **数据传输** - - 将音频数据通过`audio_socket`发送给PC端 - - 更新性能统计(发送包数、数据量等) - -3. **指令监听** - - 非阻塞检查`cmd_socket`是否有PC端指令 - - 处理模式切换指令(START_LOCATION、STOP_LOCATION等) - -#### PC端工作流程 -1. **音频接收** - - 从`audio_socket`接收音频数据 - - 将数据添加到音频处理器缓冲区 - -2. **枪声识别** - - 当缓冲区达到处理阈值时进行识别 - - 使用预训练的音频分类模型 - - 计算枪声检测置信度 - -3. **模式切换** - - 当检测到枪声时,发送"START_LOCATION"指令 - - 切换到定位模式进行精确定位 - -### 阶段3: 定位识别模式(核心流程) - -#### 开发板端定位流程 -1. **音频缓冲** - - 持续录音并添加到定位音频缓冲区 - - 缓冲区大小:0.5秒音频数据 - - 当缓冲区满时触发处理 - -2. **声源定位** - - 对缓冲的音频数据进行预处理 - - 计算各麦克风间的时延差(TDOA) - - 使用最小二乘法求解声源位置 - - 应用卡尔曼滤波平滑定位结果 - -3. **映射存储** - - 将定位结果和对应音频存储为映射关系 - - 映射结构: - ```python - { - 'location_data': LocationData对象, - 'audio_data': 音频数据列表, - 'timestamp': 时间戳, - 'processed': False - } - ``` - -4. **识别请求** - - 构建识别请求:`RECOGNITION_REQUEST:timestamp:data_size` - - 通过`audio_socket`发送音频数据给PC端 - - 清空音频缓冲区,准备下一轮 - -#### PC端识别流程 -1. **请求处理** - - 检测识别请求标识 - - 解析请求头获取时间戳和数据大小 - - 接收指定大小的音频数据 - -2. **枪声识别** - - 将音频数据转换为numpy数组 - - 检查音频质量(信噪比、能量等) - - 使用音频分类模型进行识别 - - 计算识别置信度 - -3. **结果返回** - - 构建识别结果:`RECOGNITION_RESULT:timestamp:is_gunshot:confidence` - - 通过`cmd_socket`发送给开发板 - -### 阶段4: 结果处理与输出 - -#### 开发板端结果处理 -1. **结果接收** - - 从`cmd_socket`接收识别结果 - - 解析时间戳、枪声标识、置信度 - -2. **时间戳匹配** - - 在映射队列中查找时间戳最接近的定位数据 - - 匹配条件:时间差小于1秒 - - 标记匹配的映射为已处理 - -3. **条件输出** - - 如果识别结果为枪声: - * 提取对应的定位坐标 - * 通过`location_socket`发送给PC端 - * 记录日志和性能统计 - - 如果识别结果不是枪声: - * 忽略该定位数据 - * 继续监听下一轮 - -4. **资源清理** - - 移除已处理的映射关系 - - 清理过期的识别结果(超过5秒) - - 维护映射队列大小(最大20个) - -#### PC端数据处理 -1. **定位数据接收** - - 从`location_socket`接收定位数据 - - 解析坐标、强度、角度等信息 - -2. **数据后处理** - - 应用卡尔曼滤波平滑轨迹 - - 异常值检测和剔除 - - 数据平滑和插值 - -3. **可视化更新** - - 更新matplotlib实时图表 - - 显示枪声位置、轨迹、统计信息 - - 更新Web API数据接口 - -4. **数据存储** - - 将定位数据添加到历史记录 - - 更新性能统计和系统状态 - - 生成分析报告 - -### 阶段5: 模式切换与维护 - -#### 动态模式切换 -1. **录音→定位模式** - - 触发条件:PC端检测到枪声 - - 切换指令:`START_LOCATION` - - 开发板响应:重置定位缓冲区,开始定位流程 - -2. **定位→录音模式** - - 触发条件:PC端发送停止指令或超时 - - 切换指令:`STOP_LOCATION` - - 开发板响应:清理定位资源,回到录音模式 - -#### 系统维护 -1. **心跳机制** - - 开发板每30秒发送心跳包 - - 包含系统状态、内存使用、错误统计 - - PC端监控连接状态和系统健康 - -2. **错误恢复** - - 网络断开自动重连 - - 硬件故障检测和恢复 - - 异常状态处理和日志记录 - -3. **性能监控** - - 实时监控CPU、内存使用率 - - 统计数据传输量和延迟 - - 生成性能报告和告警 - -## 📊 数据格式规范 - -### 音频数据格式 -```python -# 音频参数 -sample_rate = 16000 # 采样率 16kHz -channels = 1 # 单声道 -format = "int16" # 16位整数格式 -chunk_size = 1024 # 数据块大小 +### 获取系统状态 +``` +GET http://localhost:5000/status ``` -### 定位数据格式 -```python -# 定位数据结构 -LocationData { - x: float, # X坐标 (米) - y: float, # Y坐标 (米) - strength: float, # 信号强度 (0-1) - angle: float, # 方位角 (度) - timestamp: float, # 时间戳 - confidence: float, # 置信度 (0-1) - quality: float, # 定位质量 (0-1) - noise_level: float # 噪声水平 (0-1) -} +### 获取性能统计 +``` +GET http://localhost:5000/stats +``` + +## 配置说明 + +系统使用`config.ini`文件进行配置,主要配置项: + +```ini +[NETWORK] +host = 0.0.0.0 +port_audio = 12346 +port_cmd = 12347 +port_location = 12348 + +[HTTP_API] +enabled = True +host = 0.0.0.0 +port = 5000 +cors_enabled = True + +[RECOGNITION] +gunshot_threshold = 0.7 +recognition_interval = 3.0 ``` -### 通信协议格式 -```python -# 识别请求 -"RECOGNITION_REQUEST:timestamp:data_size" +## 使用场景 + +### 场景1:仅使用matplotlib界面 +直接运行`pc_server.py`,系统会自动显示matplotlib可视化界面。 -# 识别结果 -"RECOGNITION_RESULT:timestamp:is_gunshot:confidence" +### 场景2:仅使用Web界面 +1. 启动`pc_server.py` (提供HTTP API) +2. 启动Vue前端 +3. 通过浏览器访问Web界面 -# 定位数据 -"x,y,strength,angle" +### 场景3:同时使用两种界面 +两个界面可以同时运行,数据实时同步。 -# 心跳数据 -"HEARTBEAT:timestamp:mode:status:memory" +## 故障排除 + +### 1. Flask模块未找到 +```bash +pip install flask flask-cors ``` -## 🎯 系统优势 - -### 1. 准确性优势 -- **分离式处理**:开发板专注定位,PC端专注识别 -- **时间戳匹配**:精确关联定位数据和识别结果 -- **多重验证**:音频质量检查、置信度阈值、异常值检测 - -### 2. 实时性优势 -- **并行处理**:定位和识别同时进行 -- **非阻塞通信**:Socket超时机制,避免阻塞 -- **缓冲优化**:合理的缓冲区大小和清理策略 - -### 3. 可靠性优势 -- **自动重连**:网络断开自动恢复 -- **错误处理**:完善的异常捕获和恢复机制 -- **状态监控**:实时监控系统健康状态 - -### 4. 扩展性优势 -- **模块化设计**:各功能模块独立,易于升级 -- **配置灵活**:支持动态配置参数 -- **接口标准化**:标准化的数据格式和通信协议 - -## 🔍 应用场景 - -### 1. 战场感知 -- 实时检测枪声位置 -- 威胁源定位和追踪 -- 战场态势分析 - -### 2. 安防监控 -- 枪声检测和报警 -- 安全区域监控 -- 事件记录和分析 - -### 3. 训练模拟 -- 射击训练评估 -- 战术演练分析 -- 性能数据统计 - -### 4. 城市安全 -- 公共安全监控 -- 应急响应支持 -- 犯罪预防分析 - -## 📈 性能指标 - -### 定位精度 -- **角度精度**: ±2° (在10米距离) -- **距离精度**: ±0.5米 (在10米距离) -- **响应时间**: <100ms - -### 识别性能 -- **检测准确率**: >95% -- **误报率**: <2% -- **漏报率**: <3% - -### 系统性能 -- **最大检测距离**: 50米 -- **工作温度**: -20°C ~ +70°C -- **连续工作时间**: >24小时 -- **网络延迟**: <50ms - -## 🛠️ 部署说明 - -### 开发板端部署 -1. 将代码烧录到K210开发板 -2. 配置WiFi网络参数 -3. 连接麦克风阵列硬件 -4. 启动系统并检查连接状态 - -### PC端部署 -1. 安装Python依赖包 -2. 配置服务器网络参数 -3. 启动音频识别服务 -4. 运行可视化界面 - -### 网络配置 -1. 确保开发板和PC在同一WiFi网络 -2. 检查防火墙设置 -3. 验证端口连通性 -4. 测试数据传输 - -## 📝 注意事项 - -1. **硬件连接**:确保麦克风阵列正确连接,检查I2S信号质量 -2. **网络稳定**:使用稳定的WiFi网络,避免频繁断开 -3. **电源供应**:开发板需要稳定的5V电源供应 -4. **环境噪声**:避免强电磁干扰和机械振动 -5. **定期维护**:定期检查系统状态和清理日志文件 - ---- - -**版本**: 3.0.0 -**作者**: 声源定位系统开发团队 -**日期**: 2025年 -**许可证**: MIT License \ No newline at end of file +### 2. 端口被占用 +修改`config.ini`中的端口配置。 + +### 3. 开发板连接失败 +检查网络配置和开发板IP地址。 + +## 版本历史 + +- **v2.0.0**: 整合Flask API,支持多种可视化界面 +- **v1.0.0**: 基础声源定位功能 + +## 技术支持 + +如有问题,请检查日志文件或联系开发团队。 \ No newline at end of file diff --git a/src/声源定位代码/back-code/development_board.py b/src/声源定位代码/back-code/development_board.py index 06af0f9..d4eba11 100644 --- a/src/声源定位代码/back-code/development_board.py +++ b/src/声源定位代码/back-code/development_board.py @@ -14,173 +14,227 @@ 日期: 2025 """ -import gc -import math -import time -import socket -import network -import json -import os -import sys -import traceback -from machine import UART, Timer, PWM -from fpioa_manager import fm -from Maix import GPIO, MIC_ARRAY as mic -from board import board_info -from typing import Optional, Dict, Any, List, Tuple, Union, Callable -from dataclasses import dataclass, field -from enum import Enum, auto -import _thread -import utime -import numpy as np +# ========== 标准库导入 ========== +import gc # 垃圾回收控制,用于内存管理 +import math # 数学函数库,用于声源定位计算 +import time # 时间相关函数,用于时间戳和延时 +import socket # 网络通信套接字 +import network # K210 WiFi网络模块 +import json # JSON数据格式处理 +import os # 操作系统接口,文件系统操作 +import sys # 系统相关参数和函数 +import traceback # 异常追踪信息 +from machine import UART, Timer, PWM # K210硬件接口:串口、定时器、PWM +from fpioa_manager import fm # K210引脚管理器 +from Maix import GPIO, MIC_ARRAY as mic # K210 GPIO和麦克风阵列 +from board import board_info # 开发板信息 +from typing import Optional, Dict, Any, List, Tuple, Union, Callable # 类型提示 +from dataclasses import dataclass, field # 数据类定义 +from enum import Enum, auto # 枚举类型定义 +import _thread # 多线程支持 +import utime # 微秒级时间函数 +import numpy as np # 数值计算库,用于信号处理 + +# ========== 系统架构说明 ========== +""" +系统架构设计: + +1. 分层架构: + - 硬件抽象层:管理K210硬件资源(GPIO、I2S、WiFi等) + - 网络通信层:处理WiFi连接和Socket通信 + - 音频处理层:麦克风阵列数据采集和预处理 + - 声源定位层:基于TDOA算法的声源定位 + - 应用逻辑层:模式切换和状态管理 + - 监控诊断层:性能监控和错误处理 + +2. 模块化设计: + - 配置管理:统一管理系统配置 + - 日志系统:分级日志记录和轮转 + - 性能监控:实时性能指标收集 + - 异常处理:分层异常捕获和处理 + - 数据过滤:卡尔曼滤波和异常值检测 + +3. 状态机设计: + - 初始化模式:系统启动和硬件初始化 + - 录音模式:持续音频采集和传输 + - 定位模式:声源定位计算和数据发送 + - 错误模式:异常处理和恢复 + - 维护模式:系统诊断和配置 + +4. 通信协议: + - 音频数据:原始PCM音频流 + - 控制指令:JSON格式的控制消息 + - 定位数据:JSON格式的定位信息 + - 心跳包:连接状态维护 + - 状态报告:系统状态和性能指标 +""" # ========== 常量定义 ========== class Constants: - """系统常量定义""" - # 系统配置 - MAX_RETRY_ATTEMPTS = 5 - DEFAULT_TIMEOUT = 30.0 - HEARTBEAT_INTERVAL = 30.0 - BUFFER_SIZE = 4096 - MAX_LOG_SIZE = 1024 * 1024 # 1MB - - # 音频配置 - DEFAULT_SAMPLE_RATE = 16000 - DEFAULT_CHANNELS = 1 - DEFAULT_CHUNK_SIZE = 1024 - DEFAULT_RECORD_DURATION = 3.0 - - # 网络配置 - DEFAULT_WIFI_SSID = "junzekeki" - DEFAULT_WIFI_PASSWORD = "234567890l" - DEFAULT_PC_IP = "192.168.1.100" - DEFAULT_PC_PORT_AUDIO = 12346 - DEFAULT_PC_PORT_CMD = 12347 - DEFAULT_PC_PORT_LOCATION = 12348 - - # 硬件配置 - K210_MAX_PINS = 48 - DEFAULT_MIC_I2S_D0 = 23 - DEFAULT_MIC_I2S_D1 = 22 - DEFAULT_MIC_I2S_D2 = 21 - DEFAULT_MIC_I2S_D3 = 20 - DEFAULT_MIC_I2S_WS = 19 - DEFAULT_MIC_I2S_SCLK = 18 - DEFAULT_SK9822_DAT = 24 - DEFAULT_SK9822_CLK = 25 - DEFAULT_WIFI_EN_PIN = 8 - - # 错误码 - ERROR_SUCCESS = 0 - ERROR_WIFI_CONNECTION_FAILED = 1001 - ERROR_SOCKET_CONNECTION_FAILED = 1002 - ERROR_AUDIO_INIT_FAILED = 1003 - ERROR_MIC_ARRAY_INIT_FAILED = 1004 - ERROR_CONFIG_LOAD_FAILED = 1005 - ERROR_MEMORY_INSUFFICIENT = 1006 - ERROR_HARDWARE_FAULT = 1007 + """系统常量定义 - 集中管理系统配置参数""" + + # ========== 系统配置常量 ========== + MAX_RETRY_ATTEMPTS = 5 # 最大重试次数,用于网络连接和硬件初始化 + DEFAULT_TIMEOUT = 30.0 # 默认超时时间(秒),用于网络操作 + HEARTBEAT_INTERVAL = 30.0 # 心跳包发送间隔(秒),用于连接状态维护 + BUFFER_SIZE = 4096 # 缓冲区大小(字节),用于音频和网络数据传输 + MAX_LOG_SIZE = 1024 * 1024 # 最大日志文件大小(1MB),用于日志轮转 + + # ========== 音频配置常量 ========== + DEFAULT_SAMPLE_RATE = 16000 # 默认采样率(Hz),满足人耳听觉范围 + DEFAULT_CHANNELS = 1 # 默认声道数,单声道采集 + DEFAULT_CHUNK_SIZE = 1024 # 默认音频块大小(采样点数),平衡延迟和效率 + DEFAULT_RECORD_DURATION = 3.0 # 默认录音时长(秒),用于音频片段处理 + + # ========== 网络配置常量 ========== + DEFAULT_WIFI_SSID = "junzekeki" # 默认WiFi网络名称 + DEFAULT_WIFI_PASSWORD = "234567890l" # 默认WiFi密码 + DEFAULT_PC_IP = "192.168.1.100" # 默认PC端IP地址 + DEFAULT_PC_PORT_AUDIO = 12346 # 音频数据传输端口 + DEFAULT_PC_PORT_CMD = 12347 # 控制指令传输端口 + DEFAULT_PC_PORT_LOCATION = 12348 # 定位数据传输端口 + + # ========== 硬件配置常量 ========== + K210_MAX_PINS = 48 # K210最大可用引脚数 + + # 麦克风阵列I2S接口引脚定义 + DEFAULT_MIC_I2S_D0 = 23 # 麦克风0数据引脚 + DEFAULT_MIC_I2S_D1 = 22 # 麦克风1数据引脚 + DEFAULT_MIC_I2S_D2 = 21 # 麦克风2数据引脚 + DEFAULT_MIC_I2S_D3 = 20 # 麦克风3数据引脚 + DEFAULT_MIC_I2S_WS = 19 # I2S字选择信号引脚 + DEFAULT_MIC_I2S_SCLK = 18 # I2S时钟信号引脚 + + # SK9822 LED灯带控制引脚 + DEFAULT_SK9822_DAT = 24 # SK9822数据引脚 + DEFAULT_SK9822_CLK = 25 # SK9822时钟引脚 + + # WiFi控制引脚 + DEFAULT_WIFI_EN_PIN = 8 # WiFi使能控制引脚 + + # ========== 错误码定义 ========== + # 错误码采用分层设计,便于错误分类和处理 + ERROR_SUCCESS = 0 # 操作成功 + ERROR_WIFI_CONNECTION_FAILED = 1001 # WiFi连接失败 + ERROR_SOCKET_CONNECTION_FAILED = 1002 # Socket连接失败 + ERROR_AUDIO_INIT_FAILED = 1003 # 音频系统初始化失败 + ERROR_MIC_ARRAY_INIT_FAILED = 1004 # 麦克风阵列初始化失败 + ERROR_CONFIG_LOAD_FAILED = 1005 # 配置文件加载失败 + ERROR_MEMORY_INSUFFICIENT = 1006 # 内存不足 + ERROR_HARDWARE_FAULT = 1007 # 硬件故障 # ========== 枚举定义 ========== class SystemMode(Enum): - """系统运行模式枚举""" - INITIALIZING = auto() # 初始化模式 - RECORDING = auto() # 录音模式 - LOCATING = auto() # 定位模式 - ERROR = auto() # 错误模式 - SHUTDOWN = auto() # 关闭模式 - MAINTENANCE = auto() # 维护模式 - DIAGNOSTIC = auto() # 诊断模式 + """系统运行模式枚举 - 定义系统的主要工作状态""" + INITIALIZING = auto() # 初始化模式:系统启动,硬件初始化 + RECORDING = auto() # 录音模式:持续音频采集和传输 + LOCATING = auto() # 定位模式:声源定位计算和数据发送 + ERROR = auto() # 错误模式:异常处理和恢复 + SHUTDOWN = auto() # 关闭模式:系统关闭和资源清理 + MAINTENANCE = auto() # 维护模式:系统诊断和配置 + DIAGNOSTIC = auto() # 诊断模式:硬件和软件诊断 class ConnectionStatus(Enum): - """连接状态枚举""" - DISCONNECTED = auto() - CONNECTING = auto() - CONNECTED = auto() - ERROR = auto() - RECONNECTING = auto() - TIMEOUT = auto() + """连接状态枚举 - 描述网络连接的状态变化""" + DISCONNECTED = auto() # 断开状态:无网络连接 + CONNECTING = auto() # 连接中:正在建立连接 + CONNECTED = auto() # 已连接:连接正常 + ERROR = auto() # 错误状态:连接异常 + RECONNECTING = auto() # 重连中:自动重连过程 + TIMEOUT = auto() # 超时状态:连接超时 class WiFiStatus(Enum): - """WiFi状态枚举""" - DISCONNECTED = auto() - CONNECTING = auto() - CONNECTED = auto() - ERROR = auto() - AUTH_FAILED = auto() - NO_AP_FOUND = auto() + """WiFi状态枚举 - 描述WiFi连接的具体状态""" + DISCONNECTED = auto() # 断开:WiFi未连接 + CONNECTING = auto() # 连接中:正在连接WiFi + CONNECTED = auto() # 已连接:WiFi连接正常 + ERROR = auto() # 错误:WiFi连接失败 + AUTH_FAILED = auto() # 认证失败:密码错误 + NO_AP_FOUND = auto() # 未找到AP:信号强度不足 class AudioStatus(Enum): - """音频状态枚举""" - IDLE = auto() - RECORDING = auto() - PROCESSING = auto() - ERROR = auto() - BUFFER_FULL = auto() - BUFFER_EMPTY = auto() + """音频状态枚举 - 描述音频处理的状态""" + IDLE = auto() # 空闲:音频系统待机 + RECORDING = auto() # 录音中:正在采集音频 + PROCESSING = auto() # 处理中:音频数据处理 + ERROR = auto() # 错误:音频系统异常 + BUFFER_FULL = auto() # 缓冲区满:需要处理数据 + BUFFER_EMPTY = auto() # 缓冲区空:等待数据 class ErrorLevel(Enum): - """错误级别枚举""" - DEBUG = auto() - INFO = auto() - WARNING = auto() - ERROR = auto() - CRITICAL = auto() - FATAL = auto() + """错误级别枚举 - 定义日志和错误的严重程度""" + DEBUG = auto() # 调试:开发调试信息 + INFO = auto() # 信息:一般运行信息 + WARNING = auto() # 警告:潜在问题提醒 + ERROR = auto() # 错误:功能异常 + CRITICAL = auto() # 严重:系统级问题 + FATAL = auto() # 致命:系统崩溃风险 # ========== 异常类定义 ========== class DevelopmentBoardError(Exception): - """开发板基础异常类""" + """开发板基础异常类 - 所有自定义异常的基类""" def __init__(self, message: str, error_code: int = Constants.ERROR_SUCCESS): super().__init__(message) - self.error_code = error_code - self.timestamp = time.time() + self.error_code = error_code # 错误码,用于错误分类和处理 + self.timestamp = time.time() # 异常发生时间戳 class WiFiConnectionError(DevelopmentBoardError): - """WiFi连接异常""" + """WiFi连接异常 - 处理WiFi连接相关的错误""" def __init__(self, message: str, ssid: str = "", error_code: int = Constants.ERROR_WIFI_CONNECTION_FAILED): super().__init__(message, error_code) - self.ssid = ssid + self.ssid = ssid # 尝试连接的WiFi网络名称 class SocketConnectionError(DevelopmentBoardError): - """Socket连接异常""" + """Socket连接异常 - 处理网络通信相关的错误""" def __init__(self, message: str, host: str = "", port: int = 0, error_code: int = Constants.ERROR_SOCKET_CONNECTION_FAILED): super().__init__(message, error_code) - self.host = host - self.port = port + self.host = host # 目标主机地址 + self.port = port # 目标端口号 class AudioInitError(DevelopmentBoardError): - """音频初始化异常""" + """音频初始化异常 - 处理音频系统初始化失败""" def __init__(self, message: str, error_code: int = Constants.ERROR_AUDIO_INIT_FAILED): super().__init__(message, error_code) class MicArrayInitError(DevelopmentBoardError): - """麦克风阵列初始化异常""" + """麦克风阵列初始化异常 - 处理麦克风阵列硬件初始化失败""" def __init__(self, message: str, error_code: int = Constants.ERROR_MIC_ARRAY_INIT_FAILED): super().__init__(message, error_code) class ConfigError(DevelopmentBoardError): - """配置错误异常""" + """配置错误异常 - 处理配置文件加载和解析错误""" def __init__(self, message: str, config_file: str = "", error_code: int = Constants.ERROR_CONFIG_LOAD_FAILED): super().__init__(message, error_code) - self.config_file = config_file + self.config_file = config_file # 出错的配置文件路径 class MemoryError(DevelopmentBoardError): - """内存不足异常""" + """内存不足异常 - 处理内存分配失败""" def __init__(self, message: str, required: int = 0, available: int = 0, error_code: int = Constants.ERROR_MEMORY_INSUFFICIENT): super().__init__(message, error_code) - self.required = required - self.available = available + self.required = required # 需要的内存大小 + self.available = available # 可用的内存大小 class HardwareFaultError(DevelopmentBoardError): - """硬件故障异常""" + """硬件故障异常 - 处理硬件相关的错误""" def __init__(self, message: str, component: str = "", error_code: int = Constants.ERROR_HARDWARE_FAULT): super().__init__(message, error_code) - self.component = component + self.component = component # 故障的硬件组件名称 # ========== 数据类定义 ========== @dataclass class LocationData: - """定位数据结构""" + """定位数据结构 - 存储声源定位的完整信息 + + 属性说明: + - x, y: 声源在二维平面中的坐标位置(米) + - strength: 声源信号强度(dB),用于判断声源可靠性 + - angle: 声源相对于参考方向的夹角(度),0-360度 + - timestamp: 定位时间戳(秒),用于数据同步 + - confidence: 定位置信度(0-1),表示定位结果的可信程度 + - quality: 定位质量(0-1),基于信号质量和算法性能 + - noise_level: 环境噪声水平(dB),用于噪声抑制 + - source_type: 声源类型标识,如"gunshot"、"explosion"等 + """ x: float y: float strength: float @@ -192,9 +246,9 @@ class LocationData: source_type: str = "unknown" def __post_init__(self): - """数据验证和规范化""" + """数据验证和规范化 - 确保定位数据的有效性和一致性""" try: - # 类型检查 + # ========== 类型检查 ========== if not isinstance(self.x, (int, float)): raise ValueError("X坐标必须是数值类型") if not isinstance(self.y, (int, float)): @@ -210,10 +264,11 @@ class LocationData: if not isinstance(self.noise_level, (int, float)) or self.noise_level < 0: raise ValueError("噪声水平必须是非负数值") - # 角度规范化到0-360度 + # ========== 数据规范化 ========== + # 角度规范化到0-360度范围 self.angle = self.angle % 360 - # 坐标范围检查 + # 坐标范围检查(防止异常值) if abs(self.x) > 1000 or abs(self.y) > 1000: raise ValueError("坐标值超出合理范围") @@ -221,7 +276,7 @@ class LocationData: raise ValueError(f"定位数据验证失败: {e}") def to_dict(self) -> Dict[str, Any]: - """转换为字典格式""" + """转换为字典格式 - 用于JSON序列化和数据传输""" return { 'x': self.x, 'y': self.y, @@ -235,17 +290,17 @@ class LocationData: } def to_json(self) -> str: - """转换为JSON字符串""" + """转换为JSON字符串 - 用于网络传输和存储""" return json.dumps(self.to_dict()) @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'LocationData': - """从字典创建实例""" + """从字典创建实例 - 用于数据反序列化""" return cls(**data) @classmethod def from_json(cls, json_str: str) -> 'LocationData': - """从JSON字符串创建实例""" + """从JSON字符串创建实例 - 用于网络数据解析""" data = json.loads(json_str) return cls.from_dict(data) @@ -687,29 +742,37 @@ class Logger: # ========== 配置管理 ========== class ConfigManager: - """增强的配置管理器,支持配置验证、热重载和备份恢复""" + """增强的配置管理器 - 支持配置验证、热重载和备份恢复 + + 功能特性: + - 分层配置管理:网络、音频、硬件、系统配置 + - 配置验证:确保配置参数在合理范围内 + - 热重载:运行时动态更新配置 + - 备份恢复:配置文件自动备份和恢复 + - 导入导出:支持配置的导入导出功能 + """ def __init__(self, config_file: str = "/sd/config.json"): - self.config_file = config_file - self.backup_file = f"{config_file}.backup" - self.logger = Logger("ConfigManager") - self.config = {} - self.last_modified = 0 - self.config_validators = {} - self._register_validators() - self._load_config() + self.config_file = config_file # 主配置文件路径 + self.backup_file = f"{config_file}.backup" # 备份文件路径 + self.logger = Logger("ConfigManager") # 配置管理器专用日志器 + self.config = {} # 当前配置字典 + self.last_modified = 0 # 文件最后修改时间戳 + self.config_validators = {} # 配置验证器字典 + self._register_validators() # 注册配置验证器 + self._load_config() # 加载配置文件 def _register_validators(self): - """注册配置验证器""" + """注册配置验证器 - 为每个配置节分配验证函数""" self.config_validators = { - 'network': self._validate_network_config, - 'audio': self._validate_audio_config, - 'hardware': self._validate_hardware_config, - 'system': self._validate_system_config + 'network': self._validate_network_config, # 网络配置验证 + 'audio': self._validate_audio_config, # 音频配置验证 + 'hardware': self._validate_hardware_config, # 硬件配置验证 + 'system': self._validate_system_config # 系统配置验证 } def _validate_network_config(self, config: Dict[str, Any]) -> bool: - """验证网络配置""" + """验证网络配置 - 检查网络参数的有效性""" try: net_config = NetworkConfig(**config) return net_config.validate() @@ -718,7 +781,7 @@ class ConfigManager: return False def _validate_audio_config(self, config: Dict[str, Any]) -> bool: - """验证音频配置""" + """验证音频配置 - 检查音频参数的有效性""" try: audio_config = AudioConfig(**config) return audio_config.validate() @@ -727,7 +790,7 @@ class ConfigManager: return False def _validate_hardware_config(self, config: Dict[str, Any]) -> bool: - """验证硬件配置""" + """验证硬件配置 - 检查硬件参数的有效性""" try: hw_config = HardwareConfig(**config) return hw_config.validate() @@ -736,7 +799,7 @@ class ConfigManager: return False def _validate_system_config(self, config: Dict[str, Any]) -> bool: - """验证系统配置""" + """验证系统配置 - 检查系统参数的有效性""" try: sys_config = SystemConfig(**config) return sys_config.validate() @@ -745,16 +808,16 @@ class ConfigManager: return False def _load_config(self) -> Dict[str, Any]: - """加载配置""" + """加载配置文件 - 支持文件修改检测和配置验证""" try: if os.path.exists(self.config_file): - # 检查文件是否被修改 + # 检查文件是否被修改(支持热重载) current_modified = os.path.getmtime(self.config_file) if current_modified > self.last_modified: with open(self.config_file, "r", encoding="utf-8") as f: config = json.load(f) - # 验证配置 + # 验证配置完整性 if self._validate_config(config): self.config = config self.last_modified = current_modified @@ -764,7 +827,7 @@ class ConfigManager: self.logger.error("配置文件验证失败,使用默认配置") return self._create_default_config() else: - return self.config + return self.config # 文件未修改,返回缓存配置 else: self.logger.warning("配置文件不存在,创建默认配置") return self._create_default_config() @@ -773,7 +836,7 @@ class ConfigManager: return self._create_default_config() def _validate_config(self, config: Dict[str, Any]) -> bool: - """验证配置完整性""" + """验证配置完整性 - 检查所有必需的配置节是否存在且有效""" try: required_sections = ['network', 'audio', 'hardware', 'system'] for section in required_sections: @@ -781,6 +844,7 @@ class ConfigManager: self.logger.error(f"缺少配置节: {section}") return False + # 使用对应的验证器验证配置 validator = self.config_validators.get(section) if validator and not validator(config[section]): return False @@ -791,7 +855,7 @@ class ConfigManager: return False def _create_default_config(self) -> Dict[str, Any]: - """创建默认配置""" + """创建默认配置 - 使用预定义的默认值""" try: config = { "network": NetworkConfig().to_dict(), @@ -807,9 +871,9 @@ class ConfigManager: return {} def _save_config(self, config: Dict[str, Any]): - """保存配置""" + """保存配置文件 - 支持自动备份机制""" try: - # 创建备份 + # 创建备份(如果原文件存在) if os.path.exists(self.config_file): os.rename(self.config_file, self.backup_file) @@ -820,12 +884,12 @@ class ConfigManager: self.logger.info(f"配置文件保存成功: {self.config_file}") except Exception as e: self.logger.error(f"保存配置文件失败: {e}") - # 恢复备份 + # 恢复备份(如果保存失败) if os.path.exists(self.backup_file): os.rename(self.backup_file, self.config_file) def reload_config(self) -> bool: - """重新加载配置""" + """重新加载配置 - 支持运行时配置更新""" try: self.config = self._load_config() return len(self.config) > 0 @@ -1363,18 +1427,6 @@ class DevelopmentBoard: self.location_processing = False self.last_location_timestamp = 0 - # 新增:枪声检测器 - self.gunshot_detector = GunshotDetector(self.audio_config) - - # 新增:定位模式下的音频缓冲 - self.location_audio_buffer = [] - self.location_audio_buffer_size = int(self.audio_config.sample_rate * 0.5) # 0.5秒缓冲 - - # 新增:定位-音频映射队列 - self.location_audio_mappings = [] # 存储定位结果和音频的映射关系 - self.max_mapping_queue_size = 20 # 最大映射队列大小 - self.recognition_results = {} # 存储PC端的识别结果 {timestamp: result} - # 系统监控 self.heartbeat_timer = None self.health_check_timer = None @@ -1719,12 +1771,6 @@ class DevelopmentBoard: self.audio_status = AudioStatus.PROCESSING self.logger.info("进入定位模式") - # 重置枪声检测器 - self.gunshot_detector.reset() - - # 清空定位音频缓冲 - self.location_audio_buffer.clear() - # 设置LED状态 if self.led: self.led.value(0) # 关闭LED表示定位模式 @@ -1793,570 +1839,6 @@ class DevelopmentBoard: except Exception as e: self.logger.error(f"硬件关闭失败: {e}") - - def run(self): - """主运行循环""" - try: - self.logger.info("开始主运行循环") - - while self.running: - try: - # 根据当前模式执行相应操作 - if self.current_mode == SystemMode.RECORDING: - self._recording_mode_loop() - elif self.current_mode == SystemMode.LOCATING: - self._locating_mode_loop() - elif self.current_mode == SystemMode.ERROR: - self._error_mode_loop() - elif self.current_mode == SystemMode.SHUTDOWN: - break - - # 短暂休眠 - time.sleep_ms(10) - - except Exception as e: - self.logger.error(f"主循环异常: {e}") - self.error_count += 1 - time.sleep_ms(100) - - except Exception as e: - self.logger.error(f"主运行循环失败: {e}") - finally: - self._cleanup() - - def _recording_mode_loop(self): - """录音模式循环""" - try: - # 录音并发送给PC端 - audio_data = self._record_audio() - if audio_data: - self._send_audio_data(audio_data) - self.total_audio_packets += 1 - - # 更新性能统计 - self.performance_monitor.increment('audio_packets_sent') - - # 检查PC端指令 - self._check_pc_commands() - - except Exception as e: - self.logger.error(f"录音模式循环异常: {e}") - self.error_count += 1 - - def _locating_mode_loop(self): - """定位模式循环 - 先定位后识别""" - try: - # 录音 - audio_data = self._record_audio() - if not audio_data: - return - - # 添加到定位音频缓冲 - self.location_audio_buffer.extend(audio_data) - - # 限制缓冲大小 - if len(self.location_audio_buffer) > self.location_audio_buffer_size: - self.location_audio_buffer = self.location_audio_buffer[-self.location_audio_buffer_size:] - - # 检查是否有足够的音频数据进行声源定位 - if len(self.location_audio_buffer) >= self.audio_config.chunk_size: - # 1. 先进行声源定位 - location_data = self._perform_sound_source_location(self.location_audio_buffer) - - if location_data: - # 2. 存储定位结果和音频的映射关系 - audio_segment = self.location_audio_buffer.copy() - mapping_entry = { - 'location_data': location_data, - 'audio_data': audio_segment, - 'timestamp': time.time(), - 'processed': False # 标记是否已被PC端处理 - } - - # 添加到映射队列 - self._add_location_audio_mapping(mapping_entry) - - # 3. 将音频数据发送给PC端进行枪声识别 - self._send_audio_for_recognition(audio_segment) - - self.logger.debug(f"定位完成: X={location_data.x:.2f}, Y={location_data.y:.2f}, 音频已发送识别") - - # 清空音频缓冲,准备下一轮 - self.location_audio_buffer.clear() - - # 4. 检查PC端的枪声识别结果 - self._check_recognition_results() - - # 检查PC端指令 - self._check_pc_commands() - - except Exception as e: - self.logger.error(f"定位模式循环异常: {e}") - self.error_count += 1 - - def _send_gunshot_detection_result(self, detection_result: Dict[str, Any]): - """发送枪声检测结果给PC端""" - try: - if not self.location_socket: - return - - # 构建检测结果字符串 - # 格式: GUNSHOT_DETECTION:is_gunshot:confidence:timestamp - is_gunshot_str = "true" if detection_result['is_gunshot'] else "false" - confidence = detection_result.get('confidence', 0.0) - timestamp = detection_result.get('timestamp', time.time()) - - detection_str = f"GUNSHOT_DETECTION:{is_gunshot_str}:{confidence:.3f}:{timestamp}" - - # 发送数据 - self.location_socket.send(detection_str.encode('utf-8')) - - except Exception as e: - self.logger.error(f"发送枪声检测结果失败: {e}") - self.connection_status = ConnectionStatus.ERROR - - def _add_location_audio_mapping(self, mapping_entry: Dict[str, Any]): - """添加定位-音频映射关系""" - try: - self.location_audio_mappings.append(mapping_entry) - - # 限制队列大小 - if len(self.location_audio_mappings) > self.max_mapping_queue_size: - # 移除最老的映射关系 - removed = self.location_audio_mappings.pop(0) - self.logger.debug(f"移除过期映射关系: {removed['timestamp']}") - - self.logger.debug(f"添加映射关系: 时间戳={mapping_entry['timestamp']:.3f}") - - except Exception as e: - self.logger.error(f"添加映射关系失败: {e}") - - def _send_audio_for_recognition(self, audio_data: List[float]): - """发送音频数据给PC端进行枪声识别""" - try: - if not self.audio_socket: - return - - # 构建识别请求数据 - # 格式: RECOGNITION_REQUEST:timestamp:audio_data - timestamp = time.time() - - # 将音频数据转换为字节 - audio_bytes = np.array(audio_data, dtype=np.int16).tobytes() - - # 构建请求头 - request_header = f"RECOGNITION_REQUEST:{timestamp:.3f}:{len(audio_bytes)}" - - # 发送请求头 - self.audio_socket.send(request_header.encode('utf-8')) - time.sleep_ms(10) # 短暂等待 - - # 发送音频数据 - self.audio_socket.send(audio_bytes) - - self.logger.debug(f"发送识别请求: 时间戳={timestamp:.3f}, 数据大小={len(audio_bytes)}") - - except Exception as e: - self.logger.error(f"发送识别请求失败: {e}") - self.connection_status = ConnectionStatus.ERROR - - def _check_recognition_results(self): - """检查PC端的枪声识别结果""" - try: - if not self.cmd_socket: - return - - # 非阻塞接收识别结果 - try: - self.cmd_socket.settimeout(0.1) # 100ms超时 - data = self.cmd_socket.recv(1024) - - if data: - result_str = data.decode('utf-8').strip() - self._process_recognition_result(result_str) - - except socket.timeout: - # 超时继续 - pass - - except Exception as e: - self.logger.error(f"检查识别结果失败: {e}") - - def _process_recognition_result(self, result_str: str): - """处理PC端的识别结果""" - try: - # 解析识别结果格式: RECOGNITION_RESULT:timestamp:is_gunshot:confidence - if not result_str.startswith("RECOGNITION_RESULT:"): - return - - parts = result_str.split(':') - if len(parts) >= 4: - timestamp = float(parts[1]) - is_gunshot = parts[2].lower() == 'true' - confidence = float(parts[3]) - - # 存储识别结果 - self.recognition_results[timestamp] = { - 'is_gunshot': is_gunshot, - 'confidence': confidence, - 'processed': False - } - - self.logger.info(f"收到识别结果: 时间戳={timestamp:.3f}, 枪声={is_gunshot}, 置信度={confidence:.3f}") - - # 处理对应的定位数据 - self._process_matching_location(timestamp, is_gunshot, confidence) - - except Exception as e: - self.logger.error(f"处理识别结果失败: {e}") - - def _process_matching_location(self, timestamp: float, is_gunshot: bool, confidence: float): - """处理匹配的定位数据""" - try: - # 查找时间戳最接近的映射关系 - best_match = None - min_time_diff = float('inf') - - for mapping in self.location_audio_mappings: - if mapping['processed']: - continue - - time_diff = abs(mapping['timestamp'] - timestamp) - if time_diff < min_time_diff and time_diff < 1.0: # 1秒内的匹配 - min_time_diff = time_diff - best_match = mapping - - if best_match: - # 标记为已处理 - best_match['processed'] = True - self.recognition_results[timestamp]['processed'] = True - - if is_gunshot: - # 如果是枪声,发送对应的定位数据 - location_data = best_match['location_data'] - self._send_location_data(location_data) - self.total_location_packets += 1 - - # 更新性能统计 - self.performance_monitor.increment('location_packets_sent') - - self.logger.info(f"枪声确认!发送定位数据: X={location_data.x:.2f}, Y={location_data.y:.2f}, 置信度={confidence:.3f}") - else: - self.logger.debug(f"非枪声,忽略定位数据: 时间戳={timestamp:.3f}") - - # 清理已处理的映射关系 - self._cleanup_processed_mappings() - - except Exception as e: - self.logger.error(f"处理匹配定位数据失败: {e}") - - def _cleanup_processed_mappings(self): - """清理已处理的映射关系""" - try: - # 移除已处理的映射关系 - self.location_audio_mappings = [m for m in self.location_audio_mappings if not m['processed']] - - # 清理过期的识别结果(超过5秒) - current_time = time.time() - expired_timestamps = [ts for ts in self.recognition_results.keys() - if current_time - ts > 5.0] - for ts in expired_timestamps: - del self.recognition_results[ts] - - except Exception as e: - self.logger.error(f"清理映射关系失败: {e}") - - def _error_mode_loop(self): - """错误模式循环""" - try: - # 错误模式下尝试恢复 - if self.error_recovery_attempts < Constants.MAX_RETRY_ATTEMPTS: - self._attempt_recovery() - else: - # 达到最大重试次数,进入维护模式 - self._switch_mode(SystemMode.MAINTENANCE) - - time.sleep_ms(1000) # 错误模式下等待1秒 - - except Exception as e: - self.logger.error(f"错误模式循环异常: {e}") - - def _record_audio(self) -> Optional[List[float]]: - """录音""" - try: - # 从麦克风阵列获取音频数据 - # 这里需要根据实际的麦克风阵列API进行调整 - audio_data = mic.get_audio_data(self.audio_config.chunk_size) - - if audio_data and len(audio_data) > 0: - return audio_data - else: - return None - - except Exception as e: - self.logger.error(f"录音失败: {e}") - return None - - def _send_audio_data(self, audio_data: List[float]): - """发送音频数据""" - try: - if not self.audio_socket: - return - - # 转换为字节数据 - audio_bytes = np.array(audio_data, dtype=np.int16).tobytes() - - # 发送数据 - self.audio_socket.send(audio_bytes) - - except Exception as e: - self.logger.error(f"发送音频数据失败: {e}") - self.connection_status = ConnectionStatus.ERROR - - def _send_location_data(self, location_data: LocationData): - """发送定位数据""" - try: - if not self.location_socket: - return - - # 转换为字符串格式 - location_str = f"{location_data.x:.2f},{location_data.y:.2f},{location_data.strength:.2f},{location_data.angle:.2f}" - - # 发送数据 - self.location_socket.send(location_str.encode('utf-8')) - - except Exception as e: - self.logger.error(f"发送定位数据失败: {e}") - self.connection_status = ConnectionStatus.ERROR - - def _check_pc_commands(self): - """检查PC端指令""" - try: - if not self.cmd_socket: - return - - # 非阻塞接收指令 - try: - self.cmd_socket.settimeout(0.1) # 100ms超时 - data = self.cmd_socket.recv(1024) - - if data: - command = data.decode('utf-8').strip() - self._process_command(command) - - except socket.timeout: - # 超时继续 - pass - - except Exception as e: - self.logger.error(f"检查PC指令失败: {e}") - - def _process_command(self, command: str): - """处理PC端指令""" - try: - self.logger.info(f"收到PC指令: {command}") - - if command == "START_LOCATION": - self._switch_mode(SystemMode.LOCATING) - elif command == "STOP_LOCATION": - self._switch_mode(SystemMode.RECORDING) - elif command == "SHUTDOWN": - self._switch_mode(SystemMode.SHUTDOWN) - elif command == "HEARTBEAT": - self._send_heartbeat() - elif command.startswith("SET_CONFIG"): - self._process_config_command(command) - else: - self.logger.warning(f"未知指令: {command}") - - except Exception as e: - self.logger.error(f"处理指令失败: {e}") - - def _process_config_command(self, command: str): - """处理配置指令""" - try: - # 解析配置指令格式: SET_CONFIG:section:key:value - parts = command.split(':') - if len(parts) == 4: - section = parts[1] - key = parts[2] - value = parts[3] - - # 更新配置 - if self.config_manager.update_config(section, key, value): - self.logger.info(f"配置更新成功: {section}.{key} = {value}") - - # 重新加载配置 - self._reload_config() - else: - self.logger.error(f"配置更新失败: {section}.{key} = {value}") - - except Exception as e: - self.logger.error(f"处理配置指令失败: {e}") - - def _reload_config(self): - """重新加载配置""" - try: - self.network_config = self.config_manager.get_network_config() - self.audio_config = self.config_manager.get_audio_config() - self.hardware_config = self.config_manager.get_hardware_config() - self.system_config = self.config_manager.get_system_config() - - self.logger.info("配置重新加载完成") - - except Exception as e: - self.logger.error(f"重新加载配置失败: {e}") - - def _perform_sound_source_location(self, audio_data: List[float]) -> Optional[LocationData]: - """执行声源定位""" - try: - # 使用声源定位器进行定位 - if hasattr(self, 'sound_source_locator'): - location_data = self.sound_source_locator.process_audio_frame(audio_data) - return location_data - else: - # 如果没有专门的定位器,使用简单的麦克风方向检测 - return self._get_mic_direction() - - except Exception as e: - self.logger.error(f"声源定位失败: {e}") - return None - - def _get_mic_direction(self) -> Optional[LocationData]: - """获取麦克风方向(简单实现)""" - try: - # 获取麦克风阵列数据 - mic_data = mic.get_direction() - if mic_data: - # 解析麦克风数据 - # 这里需要根据实际的麦克风阵列API进行调整 - angle = mic_data.get('angle', 0) - strength = mic_data.get('strength', 0) - - # 转换为坐标 - distance = 5.0 # 假设距离5米 - x = distance * math.cos(math.radians(angle)) - y = distance * math.sin(math.radians(angle)) - - return LocationData( - x=x, - y=y, - strength=strength, - angle=angle, - timestamp=time.time(), - confidence=0.8, - quality=0.7, - noise_level=0.3, - source_type="gunshot" - ) - else: - return None - - except Exception as e: - self.logger.error(f"获取麦克风方向失败: {e}") - return None - - def _connect_wifi(self) -> bool: - """连接WiFi""" - try: - if self.wifi_status == WiFiStatus.CONNECTED: - return True - - self.wifi_status = WiFiStatus.CONNECTING - self.logger.info(f"正在连接WiFi: {self.network_config.wifi_ssid}") - - # 启用WiFi硬件 - if self.wifi_en: - self.wifi_en.value(1) - time.sleep_ms(1000) # 等待WiFi启动 - - # 连接WiFi - # 这里需要根据实际的WiFi API进行调整 - # self.nic = network.WLAN(network.STA_IF) - # self.nic.active(True) - # self.nic.connect(self.network_config.wifi_ssid, self.network_config.wifi_password) - - # 等待连接 - max_wait = int(self.network_config.connection_timeout * 1000) - wait_time = 0 - while wait_time < max_wait: - # if self.nic.isconnected(): - # self.wifi_status = WiFiStatus.CONNECTED - # self.logger.info("WiFi连接成功") - # return True - - time.sleep_ms(100) - wait_time += 100 - - self.wifi_status = WiFiStatus.ERROR - self.logger.error("WiFi连接超时") - return False - - except Exception as e: - self.wifi_status = WiFiStatus.ERROR - self.logger.error(f"WiFi连接失败: {e}") - return False - - def _setup_socket_connections(self) -> bool: - """建立Socket连接""" - try: - self.connection_status = ConnectionStatus.CONNECTING - self.logger.info("正在建立Socket连接...") - - # 连接音频Socket - self.audio_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.audio_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_audio)) - self.audio_socket.settimeout(self.network_config.socket_timeout) - - # 连接指令Socket - self.cmd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.cmd_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_cmd)) - self.cmd_socket.settimeout(self.network_config.socket_timeout) - - # 连接定位Socket - self.location_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.location_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_location)) - self.location_socket.settimeout(self.network_config.socket_timeout) - - self.connection_status = ConnectionStatus.CONNECTED - self.logger.info("所有Socket连接建立成功") - return True - - except Exception as e: - self.connection_status = ConnectionStatus.ERROR - self.logger.error(f"Socket连接失败: {e}") - return False - - def _cleanup(self): - """清理资源""" - try: - self.logger.info("开始清理资源...") - self.running = False - - # 关闭Socket连接 - for socket_name, sock in [('audio', self.audio_socket), - ('cmd', self.cmd_socket), - ('location', self.location_socket)]: - if sock: - try: - sock.close() - self.logger.info(f"关闭{socket_name} Socket") - except Exception as e: - self.logger.error(f"关闭{socket_name} Socket失败: {e}") - - # 关闭硬件 - self._shutdown_hardware() - - # 停止定时器 - if self.heartbeat_timer: - self.heartbeat_timer.stop() - if self.health_check_timer: - self.health_check_timer.stop() - - self.logger.info("资源清理完成") - - except Exception as e: - self.logger.error(f"清理资源失败: {e}") # ========== 声源定位核心算法 ========== class KalmanFilter: @@ -2451,57 +1933,91 @@ class KalmanFilter: self.x = np.zeros((self.dim_x, 1)) class SoundSourceLocator: - """声源定位器,集成多种算法提升定位精度""" + """声源定位器 - 基于TDOA算法的多麦克风阵列声源定位系统 + + 核心算法: + - TDOA (Time Difference of Arrival): 基于到达时间差的定位算法 + - 互相关分析:计算麦克风间的时延 + - 几何定位:通过时延计算声源位置 + - 卡尔曼滤波:平滑定位结果 + - 噪声抑制:提高定位精度 + + 功能特性: + - 多麦克风阵列支持(4个麦克风) + - 自适应噪声抑制 + - 实时定位计算 + - 多源检测能力 + - 定位质量评估 + """ def __init__(self, config: HardwareConfig): - self.config = config + self.config = config # 硬件配置参数 self.logger = Logger("SoundSourceLocator") - # 卡尔曼滤波器 - self.kalman_filter = KalmanFilter() + # ========== 卡尔曼滤波器 ========== + self.kalman_filter = KalmanFilter() # 用于平滑定位结果 - # 定位参数 - self.mic_positions = self._calculate_mic_positions() - self.sound_speed = 343.0 # 声速 (m/s) - self.sample_rate = 16000 - self.frame_size = 1024 + # ========== 定位参数 ========== + self.mic_positions = self._calculate_mic_positions() # 麦克风位置 + self.sound_speed = 343.0 # 声速 (m/s),标准大气压 + self.sample_rate = 16000 # 采样率 (Hz) + self.frame_size = 1024 # 处理帧大小 - # 噪声处理 - self.noise_threshold = 0.1 - self.signal_threshold = 0.3 - self.min_signal_duration = 0.1 # 最小信号持续时间 + # ========== 噪声处理参数 ========== + self.noise_threshold = 0.1 # 噪声阈值 + self.signal_threshold = 0.3 # 信号阈值 + self.min_signal_duration = 0.1 # 最小信号持续时间 (s) - # 定位历史 - self.location_history = [] - self.max_history_size = 50 + # ========== 定位历史管理 ========== + self.location_history = [] # 定位历史记录 + self.max_history_size = 50 # 最大历史记录数 - # 自适应参数 - self.adaptive_gain = 1.0 - self.adaptive_threshold = 0.2 + # ========== 自适应参数 ========== + self.adaptive_gain = 1.0 # 自适应增益 + self.adaptive_threshold = 0.2 # 自适应阈值 - # 多源检测 - self.max_sources = 3 - self.source_tracking = {} + # ========== 多源检测 ========== + self.max_sources = 3 # 最大声源数量 + self.source_tracking = {} # 声源跟踪字典 self.logger.info("声源定位器初始化完成") def _calculate_mic_positions(self) -> List[Tuple[float, float]]: - """计算麦克风阵列位置""" + """计算麦克风阵列位置 - 假设为圆形阵列布局 + + 返回: + - List[Tuple[float, float]]: 麦克风坐标列表 [(x1,y1), (x2,y2), ...] + """ # 假设麦克风阵列为圆形排列,半径为0.1米 radius = 0.1 mic_count = 4 positions = [] for i in range(mic_count): - angle = 2 * math.pi * i / mic_count - x = radius * math.cos(angle) - y = radius * math.sin(angle) + angle = 2 * math.pi * i / mic_count # 均匀分布角度 + x = radius * math.cos(angle) # X坐标 + y = radius * math.sin(angle) # Y坐标 positions.append((x, y)) return positions def process_audio_frame(self, audio_data: List[float]) -> Optional[LocationData]: - """处理音频帧,返回定位结果""" + """处理音频帧,返回定位结果 - 核心定位处理函数 + + 处理流程: + 1. 信号预处理:分离麦克风数据,应用窗函数 + 2. 时延估计:计算麦克风间的到达时间差 + 3. 声源定位:基于TDOA算法计算声源位置 + 4. 卡尔曼滤波:平滑定位结果 + 5. 后处理:异常值检测、轨迹平滑 + 6. 历史更新:维护定位历史记录 + + 参数: + - audio_data: 原始音频数据列表 + + 返回: + - Optional[LocationData]: 定位结果,失败时返回None + """ try: # 1. 信号预处理 processed_data = self._preprocess_audio(audio_data) @@ -2534,7 +2050,20 @@ class SoundSourceLocator: return None def _preprocess_audio(self, audio_data: List[float]) -> Optional[List[List[float]]]: - """音频预处理""" + """音频预处理 - 分离麦克风数据并应用信号处理 + + 处理步骤: + 1. 数据有效性检查 + 2. 分离各麦克风数据 + 3. 应用窗函数(汉宁窗) + 4. 噪声抑制 + + 参数: + - audio_data: 原始音频数据 + + 返回: + - Optional[List[List[float]]]: 处理后的麦克风数据列表 + """ try: # 检查数据有效性 if len(audio_data) < self.frame_size * len(self.mic_positions): @@ -2549,7 +2078,7 @@ class SoundSourceLocator: end_idx = start_idx + samples_per_mic mic_data.append(audio_data[start_idx:end_idx]) - # 应用窗函数 + # 应用窗函数(汉宁窗)减少频谱泄漏 window = np.hanning(self.frame_size) for i in range(len(mic_data)): if len(mic_data[i]) >= self.frame_size: @@ -2566,7 +2095,20 @@ class SoundSourceLocator: return None def _noise_reduction(self, audio_data: List[float]) -> List[float]: - """噪声抑制""" + """噪声抑制 - 基于频谱减法的噪声抑制算法 + + 算法原理: + 1. 计算信号能量 + 2. 自适应阈值判断 + 3. 频谱减法降噪 + 4. 信号重建 + + 参数: + - audio_data: 输入音频数据 + + 返回: + - List[float]: 降噪后的音频数据 + """ try: # 计算信号能量 energy = np.mean(np.array(audio_data) ** 2) @@ -2580,15 +2122,15 @@ class SoundSourceLocator: # 频谱减法降噪 fft_data = np.fft.fft(audio_data) - magnitude = np.abs(fft_data) - phase = np.angle(fft_data) + magnitude = np.abs(fft_data) # 幅度谱 + phase = np.angle(fft_data) # 相位谱 # 估计噪声谱 noise_spectrum = self._estimate_noise_spectrum(magnitude) - # 频谱减法 + # 频谱减法(减去噪声谱) clean_magnitude = magnitude - noise_spectrum - clean_magnitude = np.maximum(clean_magnitude, 0.1 * magnitude) + clean_magnitude = np.maximum(clean_magnitude, 0.1 * magnitude) # 防止过度抑制 # 重建信号 clean_fft = clean_magnitude * np.exp(1j * phase) @@ -2601,20 +2143,36 @@ class SoundSourceLocator: return audio_data def _estimate_noise_spectrum(self, magnitude: np.ndarray) -> np.ndarray: - """估计噪声谱""" + """估计噪声谱 - 简单的噪声谱估计方法 + + 假设:低频部分主要为噪声 + """ # 简单的噪声谱估计,假设低频部分为噪声 noise_spectrum = np.zeros_like(magnitude) noise_spectrum[:len(magnitude)//4] = np.mean(magnitude[:len(magnitude)//4]) return noise_spectrum def _estimate_time_delays(self, mic_data: List[List[float]]) -> Optional[List[float]]: - """估计时延""" + """估计时延 - 基于互相关的时延估计 + + 算法原理: + 1. 以第一个麦克风为参考 + 2. 计算其他麦克风与参考麦克风的互相关 + 3. 找到最大相关点 + 4. 计算时延 + + 参数: + - mic_data: 各麦克风的音频数据 + + 返回: + - Optional[List[float]]: 时延列表(秒) + """ try: if len(mic_data) < 2: return None delays = [] - ref_mic = mic_data[0] + ref_mic = mic_data[0] # 参考麦克风 for i in range(1, len(mic_data)): # 计算互相关 @@ -2623,7 +2181,7 @@ class SoundSourceLocator: # 找到最大相关点 max_idx = np.argmax(correlation) - # 计算时延 + # 计算时延(考虑相关函数长度) delay = (max_idx - len(ref_mic) + 1) / self.sample_rate delays.append(delay) @@ -3031,269 +2589,6 @@ class SoundSourceLocator: except Exception as e: self.logger.error(f"定位器重置失败: {e}") -# ========== 枪声检测类 ========== -class GunshotDetector: - """枪声检测器,用于在定位模式下过滤非枪声信号""" - - def __init__(self, config: AudioConfig): - self.config = config - self.logger = Logger("GunshotDetector") - - # 检测参数 - self.detection_threshold = 0.3 # 检测阈值 - self.min_duration = 0.05 # 最小持续时间(秒) - self.max_duration = 0.5 # 最大持续时间(秒) - self.sample_rate = config.sample_rate - self.frame_size = config.chunk_size - - # 特征提取参数 - self.energy_threshold = 1000 # 能量阈值 - self.spectral_centroid_range = (1000, 8000) # 频谱质心范围 - self.zero_crossing_threshold = 0.1 # 过零率阈值 - - # 检测历史 - self.detection_history = [] - self.max_history_size = 50 - - # 自适应参数 - self.adaptive_threshold = self.detection_threshold - self.noise_floor = 100 - self.signal_ceiling = 30000 - - self.logger.info("枪声检测器初始化完成") - - def detect_gunshot(self, audio_data: List[float]) -> Dict[str, Any]: - """检测音频中是否包含枪声""" - try: - if len(audio_data) < self.frame_size: - return {'is_gunshot': False, 'confidence': 0.0, 'features': {}} - - # 1. 提取特征 - features = self._extract_features(audio_data) - - # 2. 计算检测分数 - detection_score = self._calculate_detection_score(features) - - # 3. 判断是否为枪声 - is_gunshot = detection_score > self.adaptive_threshold - - # 4. 更新自适应参数 - self._update_adaptive_parameters(features, detection_score) - - # 5. 更新检测历史 - self._update_detection_history(is_gunshot, detection_score, features) - - result = { - 'is_gunshot': is_gunshot, - 'confidence': detection_score, - 'features': features, - 'threshold': self.adaptive_threshold, - 'timestamp': time.time() - } - - if is_gunshot: - self.logger.info(f"检测到枪声!置信度: {detection_score:.3f}") - - return result - - except Exception as e: - self.logger.error(f"枪声检测失败: {e}") - return {'is_gunshot': False, 'confidence': 0.0, 'features': {}} - - def _extract_features(self, audio_data: List[float]) -> Dict[str, float]: - """提取音频特征""" - try: - audio_array = np.array(audio_data, dtype=np.float32) - - # 1. 能量特征 - energy = np.mean(audio_array ** 2) - rms = np.sqrt(energy) - - # 2. 频谱特征 - fft_data = np.fft.fft(audio_array) - magnitude = np.abs(fft_data) - - # 频谱质心 - freqs = np.fft.fftfreq(len(audio_array), 1/self.sample_rate) - spectral_centroid = np.sum(freqs * magnitude) / np.sum(magnitude) - - # 频谱带宽 - spectral_bandwidth = np.sqrt(np.sum(((freqs - spectral_centroid) ** 2) * magnitude) / np.sum(magnitude)) - - # 3. 时域特征 - # 过零率 - zero_crossings = np.sum(np.diff(np.sign(audio_array)) != 0) - zero_crossing_rate = zero_crossings / len(audio_array) - - # 峰值因子 - peak_factor = np.max(np.abs(audio_array)) / rms if rms > 0 else 0 - - # 4. 包络特征 - # 希尔伯特变换获取包络 - analytic_signal = np.fft.hilbert(audio_array) - envelope = np.abs(analytic_signal) - envelope_mean = np.mean(envelope) - envelope_std = np.std(envelope) - - features = { - 'energy': energy, - 'rms': rms, - 'spectral_centroid': spectral_centroid, - 'spectral_bandwidth': spectral_bandwidth, - 'zero_crossing_rate': zero_crossing_rate, - 'peak_factor': peak_factor, - 'envelope_mean': envelope_mean, - 'envelope_std': envelope_std - } - - return features - - except Exception as e: - self.logger.error(f"特征提取失败: {e}") - return {} - - def _calculate_detection_score(self, features: Dict[str, float]) -> float: - """计算检测分数""" - try: - if not features: - return 0.0 - - score = 0.0 - weights = { - 'energy': 0.3, - 'spectral_centroid': 0.25, - 'peak_factor': 0.2, - 'envelope_std': 0.15, - 'zero_crossing_rate': 0.1 - } - - # 1. 能量分数 - if 'energy' in features: - energy_score = min(1.0, features['energy'] / self.signal_ceiling) - score += weights['energy'] * energy_score - - # 2. 频谱质心分数 - if 'spectral_centroid' in features: - centroid = features['spectral_centroid'] - min_centroid, max_centroid = self.spectral_centroid_range - if min_centroid <= centroid <= max_centroid: - centroid_score = 1.0 - else: - centroid_score = 0.0 - score += weights['spectral_centroid'] * centroid_score - - # 3. 峰值因子分数 - if 'peak_factor' in features: - peak_factor = features['peak_factor'] - if peak_factor > 5.0: # 枪声通常有较高的峰值因子 - peak_score = min(1.0, peak_factor / 20.0) - else: - peak_score = 0.0 - score += weights['peak_factor'] * peak_score - - # 4. 包络标准差分数 - if 'envelope_std' in features: - envelope_std = features['envelope_std'] - if envelope_std > 1000: # 枪声包络变化较大 - envelope_score = min(1.0, envelope_std / 5000) - else: - envelope_score = 0.0 - score += weights['envelope_std'] * envelope_score - - # 5. 过零率分数 - if 'zero_crossing_rate' in features: - zcr = features['zero_crossing_rate'] - if zcr < self.zero_crossing_threshold: # 枪声过零率较低 - zcr_score = 1.0 - (zcr / self.zero_crossing_threshold) - else: - zcr_score = 0.0 - score += weights['zero_crossing_rate'] * zcr_score - - return min(1.0, max(0.0, score)) - - except Exception as e: - self.logger.error(f"检测分数计算失败: {e}") - return 0.0 - - def _update_adaptive_parameters(self, features: Dict[str, float], detection_score: float): - """更新自适应参数""" - try: - # 更新噪声水平 - if 'rms' in features: - rms = features['rms'] - if rms < self.signal_ceiling: - self.noise_floor = 0.9 * self.noise_floor + 0.1 * rms - - # 更新检测阈值 - if detection_score > 0.5: - # 检测到强信号,降低阈值 - self.adaptive_threshold *= 0.95 - elif detection_score < 0.1: - # 检测到弱信号,提高阈值 - self.adaptive_threshold *= 1.05 - - # 限制阈值范围 - self.adaptive_threshold = max(0.1, min(0.8, self.adaptive_threshold)) - - except Exception as e: - self.logger.error(f"自适应参数更新失败: {e}") - - def _update_detection_history(self, is_gunshot: bool, confidence: float, features: Dict[str, float]): - """更新检测历史""" - try: - history_entry = { - 'is_gunshot': is_gunshot, - 'confidence': confidence, - 'features': features, - 'timestamp': time.time() - } - - self.detection_history.append(history_entry) - - # 限制历史记录大小 - if len(self.detection_history) > self.max_history_size: - self.detection_history = self.detection_history[-self.max_history_size:] - - except Exception as e: - self.logger.error(f"检测历史更新失败: {e}") - - def get_detection_stats(self) -> Dict[str, Any]: - """获取检测统计信息""" - try: - if not self.detection_history: - return {} - - total_detections = len(self.detection_history) - gunshot_detections = sum(1 for entry in self.detection_history if entry['is_gunshot']) - - confidences = [entry['confidence'] for entry in self.detection_history] - - stats = { - 'total_detections': total_detections, - 'gunshot_detections': gunshot_detections, - 'detection_rate': gunshot_detections / total_detections if total_detections > 0 else 0, - 'avg_confidence': np.mean(confidences) if confidences else 0, - 'max_confidence': np.max(confidences) if confidences else 0, - 'adaptive_threshold': self.adaptive_threshold, - 'noise_floor': self.noise_floor - } - - return stats - - except Exception as e: - self.logger.error(f"获取检测统计失败: {e}") - return {} - - def reset(self): - """重置检测器""" - try: - self.detection_history.clear() - self.adaptive_threshold = self.detection_threshold - self.noise_floor = 100 - self.logger.info("枪声检测器已重置") - except Exception as e: - self.logger.error(f"检测器重置失败: {e}") - def main(): """主函数""" print("=== 声源定位系统 - 开发板端(K210) ===") diff --git a/src/声源定位代码/back-code/pc_server.py b/src/声源定位代码/back-code/pc_server.py index d84eaf4..0e74745 100644 --- a/src/声源定位代码/back-code/pc_server.py +++ b/src/声源定位代码/back-code/pc_server.py @@ -11,32 +11,34 @@ PC端服务器程序 - 声源定位系统 作者: 声源定位系统开发团队 版本: 2.0.0 -日期: 2025 +日期: 2024 """ -import socket -import threading -import time -import wave -import tempfile -import os -import sys -import numpy as np -import matplotlib.pyplot as plt -from matplotlib.animation import FuncAnimation -import queue -import json -import logging -import configparser -from datetime import datetime -from typing import Optional, Dict, Any, Tuple, List -from dataclasses import dataclass -from enum import Enum -import traceback -import signal -import atexit +# ========== 标准库导入 ========== +import socket # 网络通信套接字 +import threading # 多线程支持,用于并发处理 +import time # 时间相关函数 +import wave # WAV音频文件处理 +import tempfile # 临时文件操作 +import os # 操作系统接口 +import sys # 系统相关参数 +import numpy as np # 数值计算库,用于信号处理 +import matplotlib.pyplot as plt # 绘图库,用于可视化 +from matplotlib.animation import FuncAnimation # 动画支持,用于实时绘图 +import queue # 队列数据结构,用于线程间通信 +import json # JSON数据格式处理 +import logging # 日志系统 +import configparser # 配置文件解析 +from datetime import datetime # 日期时间处理 +from typing import Optional, Dict, Any, Tuple, List # 类型提示 +from dataclasses import dataclass # 数据类定义 +from enum import Enum # 枚举类型 +import traceback # 异常追踪 +import signal # 信号处理 +import atexit # 程序退出处理 -# Flask相关导入 +# ========== 第三方库导入 ========== +# Flask Web框架,用于HTTP API服务 try: from flask import Flask, jsonify from flask_cors import CORS @@ -45,9 +47,11 @@ except ImportError: print("警告: Flask模块未找到,HTTP API功能将不可用") FLASK_AVAILABLE = False -# 添加audio-classification项目路径 +# ========== 项目模块导入 ========== +# 添加audio-classification项目路径到系统路径 sys.path.append(os.path.join(os.path.dirname(__file__), 'audio-classification')) +# 导入音频分类模块 try: from macls.predict import MAClsPredictor AUDIO_CLASSIFICATION_AVAILABLE = True @@ -55,25 +59,72 @@ except ImportError: print("警告: audio-classification模块未找到,将使用模拟识别") AUDIO_CLASSIFICATION_AVAILABLE = False +# ========== 系统架构说明 ========== +""" +PC端系统架构设计: + +1. 网络通信层: + - 多端口Socket服务器,分别处理音频、指令和定位数据 + - 异步连接管理,支持多客户端连接 + - 心跳机制,维护连接状态 + +2. 音频处理层: + - 音频数据接收和缓冲 + - 音频质量检测和预处理 + - 与audio-classification模块集成 + +3. 枪声识别层: + - 基于深度学习的音频分类 + - 置信度阈值控制 + - 识别结果过滤和验证 + +4. 定位数据处理层: + - 定位数据解析和验证 + - 卡尔曼滤波平滑处理 + - 异常值检测和过滤 + +5. 可视化层: + - 实时声源位置显示 + - 轨迹跟踪和历史记录 + - 性能指标监控 + +6. HTTP API层: + - RESTful API接口 + - 实时数据查询 + - 系统状态监控 + +7. 配置管理层: + - 配置文件加载和验证 + - 运行时参数调整 + - 系统配置持久化 +""" + # ========== 枚举定义 ========== class SystemMode(Enum): - """系统运行模式枚举""" - LISTENING = "LISTENING" # 监听模式 - LOCATING = "LOCATING" # 定位模式 - ERROR = "ERROR" # 错误模式 - SHUTDOWN = "SHUTDOWN" # 关闭模式 + """系统运行模式枚举 - 定义PC端的主要工作状态""" + LISTENING = "LISTENING" # 监听模式:等待音频数据,进行枪声识别 + LOCATING = "LOCATING" # 定位模式:接收和处理声源定位数据 + ERROR = "ERROR" # 错误模式:系统异常处理 + SHUTDOWN = "SHUTDOWN" # 关闭模式:系统关闭和资源清理 class ConnectionStatus(Enum): - """连接状态枚举""" - DISCONNECTED = "DISCONNECTED" - CONNECTING = "CONNECTING" - CONNECTED = "CONNECTED" - ERROR = "ERROR" + """连接状态枚举 - 描述与开发板的连接状态""" + DISCONNECTED = "DISCONNECTED" # 断开:与开发板无连接 + CONNECTING = "CONNECTING" # 连接中:正在建立连接 + CONNECTED = "CONNECTED" # 已连接:连接正常 # ========== 数据类定义 ========== @dataclass class LocationData: - """定位数据结构""" + """定位数据结构 - 与开发板端保持一致的数据格式 + + 属性说明: + - x, y: 声源在二维平面中的坐标位置(米) + - strength: 声源信号强度(dB),用于判断声源可靠性 + - angle: 声源相对于参考方向的夹角(度),0-360度 + - timestamp: 定位时间戳(秒),用于数据同步 + - confidence: 定位置信度(0-1),表示定位结果的可信程度 + """ x: float y: float strength: float @@ -82,7 +133,7 @@ class LocationData: confidence: float = 1.0 def __post_init__(self): - """数据验证""" + """数据验证 - 确保接收的定位数据有效性""" if not isinstance(self.x, (int, float)) or not isinstance(self.y, (int, float)): raise ValueError("坐标必须是数值类型") if not isinstance(self.strength, (int, float)) or self.strength < 0: @@ -92,14 +143,21 @@ class LocationData: @dataclass class AudioConfig: - """音频配置""" + """音频配置 - 定义音频处理参数 + + 属性说明: + - sample_rate: 音频采样率(Hz),影响音频质量和处理性能 + - channels: 声道数,单声道或立体声 + - chunk_size: 音频块大小(采样点数),影响实时性 + - format: 音频格式,如"int16"、"float32"等 + """ sample_rate: int = 16000 channels: int = 1 chunk_size: int = 1024 format: str = "int16" def validate(self) -> bool: - """验证配置有效性""" + """验证配置有效性 - 确保音频参数在合理范围内""" if self.sample_rate <= 0: raise ValueError("采样率必须大于0") if self.channels not in [1, 2]: @@ -110,7 +168,16 @@ class AudioConfig: @dataclass class NetworkConfig: - """网络配置""" + """网络配置 - 定义网络通信参数 + + 属性说明: + - host: 服务器监听地址,通常为"0.0.0.0"表示所有接口 + - port_audio: 音频数据接收端口 + - port_cmd: 控制指令接收端口 + - port_location: 定位数据接收端口 + - timeout: 连接超时时间(秒) + - buffer_size: 网络缓冲区大小(字节) + """ host: str = "0.0.0.0" port_audio: int = 12346 port_cmd: int = 12347 @@ -119,7 +186,7 @@ class NetworkConfig: buffer_size: int = 4096 def validate(self) -> bool: - """验证配置有效性""" + """验证配置有效性 - 确保网络参数在合理范围内""" if not (1024 <= self.port_audio <= 65535): raise ValueError("音频端口必须在1024-65535范围内") if not (1024 <= self.port_cmd <= 65535): @@ -132,35 +199,49 @@ class NetworkConfig: # ========== 日志配置 ========== def setup_logging(log_level: str = "INFO", log_file: str = "pc_server.log") -> logging.Logger: - """设置日志系统""" - # 创建日志目录 + """设置日志系统 - 配置分级日志记录和文件轮转 + + 参数说明: + - log_level: 日志级别,可选DEBUG、INFO、WARNING、ERROR、CRITICAL + - log_file: 日志文件名,存储在logs目录下 + + 功能特性: + - 同时输出到文件和控制台 + - 支持日志文件轮转,防止文件过大 + - 详细的日志格式,包含时间戳、函数名、行号 + - 分级过滤,不同级别输出到不同目标 + """ + # ========== 创建日志目录 ========== log_dir = "logs" os.makedirs(log_dir, exist_ok=True) - # 配置日志格式 + # ========== 配置日志格式 ========== + # 详细格式:时间戳 - 日志器名称 - 级别 - 函数名:行号 - 消息 log_format = "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s" date_format = "%Y-%m-%d %H:%M:%S" - # 创建日志记录器 + # ========== 创建日志记录器 ========== logger = logging.getLogger("PCServer") logger.setLevel(getattr(logging, log_level.upper())) - # 清除现有的处理器 + # 清除现有的处理器,避免重复添加 logger.handlers.clear() - # 文件处理器 + # ========== 文件处理器配置 ========== + # 文件处理器:记录所有级别的日志到文件 file_handler = logging.FileHandler( os.path.join(log_dir, log_file), encoding='utf-8' ) - file_handler.setLevel(logging.DEBUG) + file_handler.setLevel(logging.DEBUG) # 文件记录所有级别 file_formatter = logging.Formatter(log_format, date_format) file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) - # 控制台处理器 + # ========== 控制台处理器配置 ========== + # 控制台处理器:只显示INFO及以上级别 console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) + console_handler.setLevel(logging.INFO) # 控制台只显示重要信息 console_formatter = logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s", date_format @@ -443,54 +524,65 @@ class LocationPostProcessor: # ========== 音频处理优化 ========== class AudioProcessor: - """优化的音频处理器,支持批量处理、性能监控和智能缓冲""" + """优化的音频处理器 - 支持批量处理、性能监控和智能缓冲 + + 功能特性: + - 智能缓冲管理:动态调整缓冲区大小 + - 音频质量检测:过滤低质量音频数据 + - 批量识别处理:提高识别效率 + - 性能监控:实时统计处理性能 + - 异常处理:完善的错误恢复机制 + """ def __init__(self, config: AudioConfig, recognition_config: Dict[str, Any]): - self.config = config - self.recognition_config = recognition_config + self.config = config # 音频配置参数 + self.recognition_config = recognition_config # 识别配置参数 self.logger = logging.getLogger("AudioProcessor") - # 音频缓冲管理 - self.audio_buffer = [] - self.buffer_lock = threading.Lock() + # ========== 音频缓冲管理 ========== + self.audio_buffer = [] # 音频数据缓冲区 + self.buffer_lock = threading.Lock() # 缓冲区线程锁 + # 缓冲区大小计算:基于识别间隔和采样率 self.max_buffer_size = int(config.sample_rate * recognition_config['recognition_interval'] * 2) self.min_buffer_size = int(config.sample_rate * recognition_config['recognition_interval'] * 0.5) - # 性能监控 + # ========== 性能监控统计 ========== self.processing_stats = { - 'total_audio_bytes': 0, - 'total_audio_frames': 0, - 'recognition_attempts': 0, - 'gunshot_detections': 0, - 'processing_times': [], - 'buffer_overflow_count': 0, - 'last_recognition_time': 0 + 'total_audio_bytes': 0, # 总处理音频字节数 + 'total_audio_frames': 0, # 总处理音频帧数 + 'recognition_attempts': 0, # 识别尝试次数 + 'gunshot_detections': 0, # 枪声检测次数 + 'processing_times': [], # 处理时间记录 + 'buffer_overflow_count': 0, # 缓冲区溢出次数 + 'last_recognition_time': 0 # 上次识别时间 } - # 音频质量检测 + # ========== 音频质量检测阈值 ========== self.quality_thresholds = { - 'min_rms': 50, # 最小RMS值 - 'max_rms': 30000, # 最大RMS值 - 'min_nonzero_ratio': 0.1, # 最小非零比例 - 'max_silence_ratio': 0.8 # 最大静音比例 + 'min_rms': 50, # 最小RMS值(避免静音) + 'max_rms': 30000, # 最大RMS值(避免过载) + 'min_nonzero_ratio': 0.1, # 最小非零比例(避免全静音) + 'max_silence_ratio': 0.8 # 最大静音比例(避免静音过多) } - # 预测器 - self.predictor = None - self._init_predictor() + # ========== 音频分类预测器 ========== + self.predictor = None # 深度学习预测器 + self._init_predictor() # 初始化预测器 self.logger.info("音频处理器初始化完成") def _init_predictor(self): - """初始化音频分类预测器""" + """初始化音频分类预测器 - 加载深度学习模型""" if not AUDIO_CLASSIFICATION_AVAILABLE: self.logger.warning("audio-classification模块未找到,将使用模拟识别") return try: + # 获取配置文件和模型路径 configs_path = os.path.abspath(self.recognition_config['configs_path']) model_path = os.path.abspath(self.recognition_config['model_path']) + # 验证文件存在性 if not os.path.exists(configs_path): self.logger.error(f"配置文件不存在: {configs_path}") return @@ -499,6 +591,7 @@ class AudioProcessor: self.logger.error(f"模型路径不存在: {model_path}") return + # 创建预测器实例 self.predictor = MAClsPredictor( configs=configs_path, model_path=model_path, @@ -512,22 +605,29 @@ class AudioProcessor: self.predictor = None def add_audio_data(self, audio_data: bytes) -> bool: - """添加音频数据到缓冲区""" + """添加音频数据到缓冲区 - 支持线程安全的数据添加 + + 参数: + - audio_data: 原始音频字节数据 + + 返回: + - bool: 添加是否成功 + """ try: with self.buffer_lock: - # 转换为numpy数组 + # 转换为numpy数组(int16格式) audio_array = np.frombuffer(audio_data, dtype=np.int16) - # 更新统计 + # 更新性能统计 self.processing_stats['total_audio_bytes'] += len(audio_data) self.processing_stats['total_audio_frames'] += len(audio_array) # 添加到缓冲区 self.audio_buffer.extend(audio_array) - # 检查缓冲区大小 + # 检查缓冲区大小,防止内存溢出 if len(self.audio_buffer) > self.max_buffer_size: - # 保留最新的数据 + # 保留最新的数据,丢弃旧数据 self.audio_buffer = self.audio_buffer[-self.max_buffer_size:] self.processing_stats['buffer_overflow_count'] += 1 self.logger.warning("音频缓冲区溢出,丢弃旧数据") @@ -539,16 +639,21 @@ class AudioProcessor: return False def should_process_recognition(self) -> bool: - """判断是否应该进行识别""" + """判断是否应该进行识别 - 基于时间和缓冲区条件 + + 判断条件: + - 时间间隔:距离上次识别的时间间隔 + - 缓冲区大小:确保有足够的数据进行识别 + """ try: current_time = time.time() - # 检查时间间隔 + # 检查时间间隔(避免过于频繁的识别) if (current_time - self.processing_stats['last_recognition_time'] < self.recognition_config['recognition_interval']): return False - # 检查缓冲区大小 + # 检查缓冲区大小(确保有足够的数据) with self.buffer_lock: return len(self.audio_buffer) >= self.min_buffer_size @@ -557,48 +662,51 @@ class AudioProcessor: return False def process_recognition(self) -> Optional[Dict[str, Any]]: - """执行音频识别""" + """执行音频识别 - 核心识别处理函数 + + 处理流程: + 1. 提取音频片段 + 2. 音频质量检测 + 3. 执行识别(真实或模拟) + 4. 结果验证和统计 + """ try: start_time = time.time() + # 提取音频片段 with self.buffer_lock: - # 获取音频数据 if len(self.audio_buffer) < self.min_buffer_size: return None - # 提取识别所需的数据 - recognition_samples = int(self.config.sample_rate * self.recognition_config['recognition_interval']) - audio_segment = np.array(self.audio_buffer[-recognition_samples:]) + # 提取足够长度的音频数据 + audio_length = int(self.config.sample_rate * self.recognition_config['recognition_interval']) + audio_segment = np.array(self.audio_buffer[:audio_length], dtype=np.float32) - # 清空缓冲区 - self.audio_buffer = [] + # 移除已处理的数据 + self.audio_buffer = self.audio_buffer[audio_length:] - # 检查音频质量 + # 音频质量检测 if not self._check_audio_quality(audio_segment): - self.logger.debug("音频质量不满足识别要求") + self.logger.warning("音频质量不满足要求,跳过识别") return None # 执行识别 - result = self._perform_recognition(audio_segment) + recognition_result = self._perform_recognition(audio_segment) - # 更新统计 + # 更新统计信息 processing_time = time.time() - start_time self.processing_stats['processing_times'].append(processing_time) self.processing_stats['recognition_attempts'] += 1 self.processing_stats['last_recognition_time'] = time.time() - # 保持处理时间历史记录 + # 限制处理时间记录数量 if len(self.processing_stats['processing_times']) > 100: self.processing_stats['processing_times'] = self.processing_stats['processing_times'][-100:] - if result and result.get('is_gunshot', False): - self.processing_stats['gunshot_detections'] += 1 - - self.logger.info(f"音频识别完成: {result}, 耗时: {processing_time:.3f}秒") - return result + return recognition_result except Exception as e: - self.logger.error(f"音频识别失败: {e}") + self.logger.error(f"音频识别处理失败: {e}") return None def _check_audio_quality(self, audio_segment: np.ndarray) -> bool: @@ -1054,15 +1162,6 @@ class PCServer: self.current_location = None self.location_data_count = 0 - # 新增:开发板枪声检测统计 - self.board_gunshot_stats = { - 'total_detections': 0, - 'gunshot_detections': 0, - 'detection_rate': 0.0, - 'avg_confidence': 0.0, - 'last_detection_time': 0 - } - # 线程管理 self.threads = [] self.thread_lock = threading.Lock() @@ -1073,8 +1172,7 @@ class PCServer: 'audio_packets_received': 0, 'location_packets_received': 0, 'recognition_attempts': 0, - 'errors': 0, - 'board_gunshot_detections': 0 # 新增:开发板枪声检测次数 + 'errors': 0 } # HTTP API相关 @@ -1305,8 +1403,7 @@ class PCServer: 'audio_packets_received': 0, 'location_packets_received': 0, 'recognition_attempts': 0, - 'errors': 0, - 'board_gunshot_detections': 0 # 新增:开发板枪声检测次数 + 'errors': 0 } self.logger.info("数据结构初始化完成") @@ -1385,11 +1482,6 @@ class PCServer: self.connection_status['audio'] = ConnectionStatus.DISCONNECTED break - # 检查是否是识别请求 - if self._is_recognition_request(audio_data): - self._handle_recognition_request(audio_data) - continue - # 更新性能统计 self.performance_stats['audio_packets_received'] += 1 @@ -1421,114 +1513,6 @@ class PCServer: finally: self.logger.info("音频处理线程结束") - def _is_recognition_request(self, audio_data: bytes) -> bool: - """检查是否是识别请求""" - try: - # 检查数据开头是否包含识别请求标识 - data_str = audio_data.decode('utf-8', errors='ignore') - return data_str.startswith("RECOGNITION_REQUEST:") - except: - return False - - def _handle_recognition_request(self, audio_data: bytes): - """处理识别请求""" - try: - # 解析请求头 - data_str = audio_data.decode('utf-8', errors='ignore') - if not data_str.startswith("RECOGNITION_REQUEST:"): - return - - # 格式: RECOGNITION_REQUEST:timestamp:data_size - parts = data_str.split(':') - if len(parts) >= 3: - timestamp = float(parts[1]) - data_size = int(parts[2]) - - self.logger.info(f"收到识别请求: 时间戳={timestamp:.3f}, 数据大小={data_size}") - - # 等待接收音频数据 - audio_segment = self._receive_audio_segment(data_size) - if audio_segment is not None: - # 进行枪声识别 - recognition_result = self._perform_audio_recognition(audio_segment) - - # 发送识别结果给开发板 - self._send_recognition_result(timestamp, recognition_result) - - except Exception as e: - self.logger.error(f"处理识别请求失败: {e}") - - def _receive_audio_segment(self, data_size: int) -> Optional[bytes]: - """接收音频数据段""" - try: - # 从音频socket接收指定大小的数据 - received_data = b'' - remaining_size = data_size - - while remaining_size > 0: - chunk = self.communication_manager.audio_socket.recv(min(remaining_size, 4096)) - if not chunk: - break - received_data += chunk - remaining_size -= len(chunk) - - if len(received_data) == data_size: - return received_data - else: - self.logger.warning(f"音频数据接收不完整: 期望{data_size}, 实际{len(received_data)}") - return None - - except Exception as e: - self.logger.error(f"接收音频数据段失败: {e}") - return None - - def _perform_audio_recognition(self, audio_data: bytes) -> Dict[str, Any]: - """执行音频识别""" - try: - # 将字节数据转换为numpy数组 - audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0 - - # 检查音频质量 - if not self.audio_processor._check_audio_quality(audio_array): - return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'poor_quality'} - - # 执行识别 - result = self.audio_processor._perform_recognition(audio_array) - - if result: - return { - 'is_gunshot': result.get('is_gunshot', False), - 'confidence': result.get('score', 0.0), - 'label': result.get('label', 'unknown') - } - else: - return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'recognition_failed'} - - except Exception as e: - self.logger.error(f"音频识别失败: {e}") - return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'error'} - - def _send_recognition_result(self, timestamp: float, result: Dict[str, Any]): - """发送识别结果给开发板""" - try: - if not self.communication_manager.cmd_socket: - return - - # 构建识别结果字符串 - # 格式: RECOGNITION_RESULT:timestamp:is_gunshot:confidence - is_gunshot = result.get('is_gunshot', False) - confidence = result.get('confidence', 0.0) - - result_str = f"RECOGNITION_RESULT:{timestamp:.3f}:{str(is_gunshot).lower()}:{confidence:.3f}" - - # 发送结果 - self.communication_manager.cmd_socket.send(result_str.encode('utf-8')) - - self.logger.info(f"发送识别结果: 时间戳={timestamp:.3f}, 枪声={is_gunshot}, 置信度={confidence:.3f}") - - except Exception as e: - self.logger.error(f"发送识别结果失败: {e}") - def _switch_to_location_mode(self): """切换到定位模式""" try: @@ -1611,19 +1595,13 @@ class PCServer: data_str = data.decode('utf-8').strip() if not data_str: return - - # 检查是否是枪声检测结果数据(保留兼容性) - if data_str.startswith("GUNSHOT_DETECTION:"): - self._process_gunshot_detection_data(data_str) - return - - # 解析定位数据 + # 解析数据 location_data = self._parse_location_data(data_str) if location_data: # 应用卡尔曼滤波 filtered_data = self._apply_kalman_filter(location_data) # 后处理(平滑、异常值剔除等) - processed_data = self.location_post_processor.process(filtered_data) + processed_data = location_post_processor.process(filtered_data) # 将数据放入队列 self.location_queue.put(processed_data) self.location_data_count += 1 @@ -1635,49 +1613,6 @@ class PCServer: self.logger.error(f"处理定位数据失败: {e}") self.performance_stats['errors'] += 1 - def _process_gunshot_detection_data(self, data_str: str): - """处理开发板枪声检测数据""" - try: - # 解析格式: GUNSHOT_DETECTION:is_gunshot:confidence:timestamp - parts = data_str.split(':') - if len(parts) >= 4: - is_gunshot = parts[1].lower() == 'true' - confidence = float(parts[2]) - timestamp = float(parts[3]) - - # 更新统计 - self.board_gunshot_stats['total_detections'] += 1 - if is_gunshot: - self.board_gunshot_stats['gunshot_detections'] += 1 - self.performance_stats['board_gunshot_detections'] += 1 - self.board_gunshot_stats['last_detection_time'] = timestamp - - self.logger.info(f"开发板检测到枪声!置信度: {confidence:.3f}") - - # 更新检测率 - total = self.board_gunshot_stats['total_detections'] - gunshots = self.board_gunshot_stats['gunshot_detections'] - self.board_gunshot_stats['detection_rate'] = gunshots / total if total > 0 else 0.0 - - # 更新平均置信度 - if 'avg_confidence' not in self.board_gunshot_stats: - self.board_gunshot_stats['avg_confidence'] = confidence - else: - # 指数移动平均 - alpha = 0.1 - self.board_gunshot_stats['avg_confidence'] = ( - alpha * confidence + - (1 - alpha) * self.board_gunshot_stats['avg_confidence'] - ) - - self.logger.debug(f"开发板枪声检测统计: 总数={total}, 枪声={gunshots}, " - f"检测率={self.board_gunshot_stats['detection_rate']:.3f}, " - f"平均置信度={self.board_gunshot_stats['avg_confidence']:.3f}") - - except Exception as e: - self.logger.error(f"处理开发板枪声检测数据失败: {e}") - self.performance_stats['errors'] += 1 - def _parse_location_data(self, data_str: str) -> Optional[LocationData]: """解析定位数据""" try: @@ -1786,12 +1721,6 @@ class PCServer: f"错误: {self.performance_stats['errors']} | " f"模式: {self.current_mode.value}") - # 添加开发板枪声检测统计 - if self.current_mode == SystemMode.LOCATING: - board_stats = self.board_gunshot_stats - status_text += f" | 开发板检测: {board_stats['gunshot_detections']}/{board_stats['total_detections']} " - status_text += f"({board_stats['detection_rate']:.1%})" - # 在图形上显示状态信息 if hasattr(self, 'status_text_obj'): self.status_text_obj.remove() @@ -1820,8 +1749,7 @@ class PCServer: 'total_packets': self.location_data_count, 'history_size': len(self.location_history), 'queue_size': self.location_queue.qsize() - }, - 'board_gunshot_stats': self.board_gunshot_stats.copy() # 新增:开发板枪声检测统计 + } } def run(self): @@ -1939,8 +1867,7 @@ class PCServer: "endpoints": { "/data": "获取最新定位数据", "/status": "获取系统状态", - "/stats": "获取性能统计", - "/board_stats": "获取开发板枪声检测统计" # 新增 + "/stats": "获取性能统计" } }) @@ -1994,15 +1921,6 @@ class PCServer: self.logger.error(f"获取性能统计失败: {e}") return jsonify({"error": "获取统计失败"}), 500 - @self.flask_app.route("/board_stats") - def get_board_stats(): - """获取开发板枪声检测统计""" - try: - return jsonify(self.board_gunshot_stats) - except Exception as e: - self.logger.error(f"获取开发板统计失败: {e}") - return jsonify({"error": "获取统计失败"}), 500 - self.logger.info("Flask路由设置完成") except Exception as e: