注释 #15

Merged
psf4lx3ga merged 1 commits from wangjing into main 11 months ago

@ -1,421 +1,162 @@
# 声源定位系统 - 开发板与PC端配合工作流程
# 声源定位系统 - 整合版
## 📋 项目概述
## 系统概述
本项目实现了一个基于麦克风阵列的声源定位系统,采用开发板(K210)与PC端服务器协同工作的架构。系统能够实时检测枪声并进行精确的声源定位适用于战场感知、安防监控等场景
这是一个完整的声源定位系统包含开发板端和PC端服务器支持多种可视化界面
### 🎯 系统特点
## 文件结构
- **高精度定位**基于TDOA算法的麦克风阵列定位
- **智能识别**PC端强大的枪声识别算法
- **实时响应**:定位与识别并行处理
- **可靠通信**WiFi网络下的稳定数据传输
- **可视化界面**:实时显示定位结果和系统状态
```
back-code/
├── development_board.py # 开发板端主程序 (K210)
├── pc_server.py # PC端服务器主程序 (整合版)
└── README.md # 本文件
front-code/
└── sound-vue-frontend/ # Vue Web前端 (可选)
```
## 功能特性
### PC端服务器 (pc_server.py)
- ✅ **TCP通信**: 接收开发板音频和定位数据
- ✅ **音频识别**: 集成audio-classification进行枪声检测
- ✅ **实时可视化**: 使用matplotlib显示声源位置
- ✅ **HTTP API**: 提供RESTful API接口
- ✅ **性能监控**: 实时统计和性能分析
- ✅ **卡尔曼滤波**: 定位数据平滑处理
### 开发板端 (development_board.py)
- ✅ **音频采集**: 麦克风阵列录音
- ✅ **声源定位**: 实时计算声源位置
- ✅ **WiFi通信**: 与PC端数据交换
- ✅ **模式切换**: 录音模式 ↔ 定位模式
- ✅ **异常处理**: 完善的错误恢复机制
### Web前端 (可选)
- ✅ **实时显示**: 基于ECharts的可视化
- ✅ **响应式设计**: 支持移动端访问
- ✅ **数据监控**: 实时状态显示
## 快速开始
### 1. 安装依赖
## 🏗️ 系统架构
```bash
# PC端依赖
pip install numpy matplotlib flask flask-cors configparser
# 可选Web前端依赖
cd front-code/sound-vue-frontend
npm install
```
┌─────────────────────────────────────────────────────────────┐
│ 开发板端 (K210) │
├─────────────────────────────────────────────────────────────┤
│ 硬件层: │
│ • 麦克风阵列 (4通道) │
│ • WiFi模块 (ESP8285) │
│ • LED指示灯、蜂鸣器 │
├─────────────────────────────────────────────────────────────┤
│ 软件层: │
│ • 音频采集与预处理 │
│ • 声源定位算法 (TDOA) │
│ • 网络通信管理 │
│ • 系统状态监控 │
└─────────────────────────────────────────────────────────────┘
┌─────────┴─────────┐
│ WiFi网络通信 │
└─────────┬─────────┘
┌─────────────────────────────────────────────────────────────┐
│ PC端服务器 │
├─────────────────────────────────────────────────────────────┤
│ 功能模块: │
│ • 音频识别引擎 (枪声检测) │
│ • 数据可视化界面 (matplotlib) │
│ • Web API服务 (Flask) │
│ • 数据存储与分析 │
└─────────────────────────────────────────────────────────────┘
### 2. 启动PC端服务器
```bash
cd back-code
python pc_server.py
```
## 🔧 硬件配置
### 开发板端硬件
- **主控芯片**: K210 (双核64位RISC-V)
- **麦克风阵列**: 4通道I2S接口
- **网络模块**: ESP8285 WiFi模块
- **存储**: 16MB Flash + 8MB PSRAM
- **接口**: UART、I2S、GPIO、PWM
### 引脚配置
```python
# 麦克风阵列引脚
mic_i2s_d0 = 23 # 数据通道0
mic_i2s_d1 = 22 # 数据通道1
mic_i2s_d2 = 21 # 数据通道2
mic_i2s_d3 = 20 # 数据通道3
mic_i2s_ws = 19 # 字选择
mic_i2s_sclk = 18 # 时钟
# 其他硬件
led_pin = 12 # LED指示灯
buzzer_pin = 13 # 蜂鸣器
wifi_en_pin = 8 # WiFi使能控制
服务器启动后会显示:
- matplotlib可视化界面
- HTTP API服务 (http://localhost:5000)
### 3. 启动开发板端
将`development_board.py`上传到K210开发板并运行。
### 4. 可选启动Web前端
```bash
cd front-code/sound-vue-frontend
npm run serve
```
## 🌐 网络配置
访问 http://localhost:8080 查看Web界面。
### 开发板端网络设置
```python
# WiFi连接配置
wifi_ssid = "junzekeki"
wifi_password = "234567890l"
## HTTP API接口
# PC端服务器地址
pc_ip = "192.168.1.100"
pc_port_audio = 12346 # 音频数据传输端口
pc_port_cmd = 12347 # 指令控制端口
pc_port_location = 12348 # 定位数据传输端口
### 获取定位数据
```
GET http://localhost:5000/data
```
### PC端服务器配置
```python
# 服务器监听配置
host = "0.0.0.0" # 监听所有网络接口
port_audio = 12346 # 音频数据接收端口
port_cmd = 12347 # 指令发送端口
port_location = 12348 # 定位数据接收端口
响应示例:
```json
{
"X": 1.23,
"Y": -0.45,
"strength": 2.5,
"angle": 90.0,
"timestamp": 1640995200.123,
"confidence": 0.85
}
```
## 🔄 详细工作流程
### 阶段1: 系统初始化
#### 开发板端初始化流程
1. **硬件初始化**
- 初始化麦克风阵列I2S接口
- 配置GPIO引脚LED、蜂鸣器、WiFi使能
- 初始化定时器和中断
2. **网络连接**
- 启用WiFi模块
- 连接到指定WiFi网络
- 建立3个Socket连接
* `audio_socket`: 发送音频数据
* `cmd_socket`: 接收PC端指令
* `location_socket`: 发送定位数据
3. **系统启动**
- 启动性能监控模块
- 启动心跳机制30秒间隔
- 初始化音频缓冲区和映射队列
- 切换到录音模式
#### PC端初始化流程
1. **服务器启动**
- 创建3个Socket服务器
- 初始化音频识别模块
- 启动matplotlib可视化界面
- 初始化Flask Web API
2. **等待连接**
- 监听开发板连接请求
- 建立数据通信通道
- 启动数据处理线程
### 阶段2: 录音监听模式
#### 开发板端工作流程
1. **音频采集**
- 从麦克风阵列获取音频数据
- 应用增益控制和噪声抑制
- 将音频数据转换为标准格式
2. **数据传输**
- 将音频数据通过`audio_socket`发送给PC端
- 更新性能统计(发送包数、数据量等)
3. **指令监听**
- 非阻塞检查`cmd_socket`是否有PC端指令
- 处理模式切换指令START_LOCATION、STOP_LOCATION等
#### PC端工作流程
1. **音频接收**
- 从`audio_socket`接收音频数据
- 将数据添加到音频处理器缓冲区
2. **枪声识别**
- 当缓冲区达到处理阈值时进行识别
- 使用预训练的音频分类模型
- 计算枪声检测置信度
3. **模式切换**
- 当检测到枪声时,发送"START_LOCATION"指令
- 切换到定位模式进行精确定位
### 阶段3: 定位识别模式(核心流程)
#### 开发板端定位流程
1. **音频缓冲**
- 持续录音并添加到定位音频缓冲区
- 缓冲区大小0.5秒音频数据
- 当缓冲区满时触发处理
2. **声源定位**
- 对缓冲的音频数据进行预处理
- 计算各麦克风间的时延差TDOA
- 使用最小二乘法求解声源位置
- 应用卡尔曼滤波平滑定位结果
3. **映射存储**
- 将定位结果和对应音频存储为映射关系
- 映射结构:
```python
{
'location_data': LocationData对象,
'audio_data': 音频数据列表,
'timestamp': 时间戳,
'processed': False
}
```
4. **识别请求**
- 构建识别请求:`RECOGNITION_REQUEST:timestamp:data_size`
- 通过`audio_socket`发送音频数据给PC端
- 清空音频缓冲区,准备下一轮
#### PC端识别流程
1. **请求处理**
- 检测识别请求标识
- 解析请求头获取时间戳和数据大小
- 接收指定大小的音频数据
2. **枪声识别**
- 将音频数据转换为numpy数组
- 检查音频质量(信噪比、能量等)
- 使用音频分类模型进行识别
- 计算识别置信度
3. **结果返回**
- 构建识别结果:`RECOGNITION_RESULT:timestamp:is_gunshot:confidence`
- 通过`cmd_socket`发送给开发板
### 阶段4: 结果处理与输出
#### 开发板端结果处理
1. **结果接收**
- 从`cmd_socket`接收识别结果
- 解析时间戳、枪声标识、置信度
2. **时间戳匹配**
- 在映射队列中查找时间戳最接近的定位数据
- 匹配条件时间差小于1秒
- 标记匹配的映射为已处理
3. **条件输出**
- 如果识别结果为枪声:
* 提取对应的定位坐标
* 通过`location_socket`发送给PC端
* 记录日志和性能统计
- 如果识别结果不是枪声:
* 忽略该定位数据
* 继续监听下一轮
4. **资源清理**
- 移除已处理的映射关系
- 清理过期的识别结果超过5秒
- 维护映射队列大小最大20个
#### PC端数据处理
1. **定位数据接收**
- 从`location_socket`接收定位数据
- 解析坐标、强度、角度等信息
2. **数据后处理**
- 应用卡尔曼滤波平滑轨迹
- 异常值检测和剔除
- 数据平滑和插值
3. **可视化更新**
- 更新matplotlib实时图表
- 显示枪声位置、轨迹、统计信息
- 更新Web API数据接口
4. **数据存储**
- 将定位数据添加到历史记录
- 更新性能统计和系统状态
- 生成分析报告
### 阶段5: 模式切换与维护
#### 动态模式切换
1. **录音→定位模式**
- 触发条件PC端检测到枪声
- 切换指令:`START_LOCATION`
- 开发板响应:重置定位缓冲区,开始定位流程
2. **定位→录音模式**
- 触发条件PC端发送停止指令或超时
- 切换指令:`STOP_LOCATION`
- 开发板响应:清理定位资源,回到录音模式
#### 系统维护
1. **心跳机制**
- 开发板每30秒发送心跳包
- 包含系统状态、内存使用、错误统计
- PC端监控连接状态和系统健康
2. **错误恢复**
- 网络断开自动重连
- 硬件故障检测和恢复
- 异常状态处理和日志记录
3. **性能监控**
- 实时监控CPU、内存使用率
- 统计数据传输量和延迟
- 生成性能报告和告警
## 📊 数据格式规范
### 音频数据格式
```python
# 音频参数
sample_rate = 16000 # 采样率 16kHz
channels = 1 # 单声道
format = "int16" # 16位整数格式
chunk_size = 1024 # 数据块大小
### 获取系统状态
```
GET http://localhost:5000/status
```
### 定位数据格式
```python
# 定位数据结构
LocationData {
x: float, # X坐标 (米)
y: float, # Y坐标 (米)
strength: float, # 信号强度 (0-1)
angle: float, # 方位角 (度)
timestamp: float, # 时间戳
confidence: float, # 置信度 (0-1)
quality: float, # 定位质量 (0-1)
noise_level: float # 噪声水平 (0-1)
}
### 获取性能统计
```
GET http://localhost:5000/stats
```
## 配置说明
系统使用`config.ini`文件进行配置,主要配置项:
```ini
[NETWORK]
host = 0.0.0.0
port_audio = 12346
port_cmd = 12347
port_location = 12348
[HTTP_API]
enabled = True
host = 0.0.0.0
port = 5000
cors_enabled = True
[RECOGNITION]
gunshot_threshold = 0.7
recognition_interval = 3.0
```
### 通信协议格式
```python
# 识别请求
"RECOGNITION_REQUEST:timestamp:data_size"
## 使用场景
### 场景1仅使用matplotlib界面
直接运行`pc_server.py`系统会自动显示matplotlib可视化界面。
# 识别结果
"RECOGNITION_RESULT:timestamp:is_gunshot:confidence"
### 场景2仅使用Web界面
1. 启动`pc_server.py` (提供HTTP API)
2. 启动Vue前端
3. 通过浏览器访问Web界面
# 定位数据
"x,y,strength,angle"
### 场景3同时使用两种界面
两个界面可以同时运行,数据实时同步。
# 心跳数据
"HEARTBEAT:timestamp:mode:status:memory"
## 故障排除
### 1. Flask模块未找到
```bash
pip install flask flask-cors
```
## 🎯 系统优势
### 1. 准确性优势
- **分离式处理**开发板专注定位PC端专注识别
- **时间戳匹配**:精确关联定位数据和识别结果
- **多重验证**:音频质量检查、置信度阈值、异常值检测
### 2. 实时性优势
- **并行处理**:定位和识别同时进行
- **非阻塞通信**Socket超时机制避免阻塞
- **缓冲优化**:合理的缓冲区大小和清理策略
### 3. 可靠性优势
- **自动重连**:网络断开自动恢复
- **错误处理**:完善的异常捕获和恢复机制
- **状态监控**:实时监控系统健康状态
### 4. 扩展性优势
- **模块化设计**:各功能模块独立,易于升级
- **配置灵活**:支持动态配置参数
- **接口标准化**:标准化的数据格式和通信协议
## 🔍 应用场景
### 1. 战场感知
- 实时检测枪声位置
- 威胁源定位和追踪
- 战场态势分析
### 2. 安防监控
- 枪声检测和报警
- 安全区域监控
- 事件记录和分析
### 3. 训练模拟
- 射击训练评估
- 战术演练分析
- 性能数据统计
### 4. 城市安全
- 公共安全监控
- 应急响应支持
- 犯罪预防分析
## 📈 性能指标
### 定位精度
- **角度精度**: ±2° (在10米距离)
- **距离精度**: ±0.5米 (在10米距离)
- **响应时间**: <100ms
### 识别性能
- **检测准确率**: >95%
- **误报率**: <2%
- **漏报率**: <3%
### 系统性能
- **最大检测距离**: 50米
- **工作温度**: -20°C ~ +70°C
- **连续工作时间**: >24小时
- **网络延迟**: <50ms
## 🛠️ 部署说明
### 开发板端部署
1. 将代码烧录到K210开发板
2. 配置WiFi网络参数
3. 连接麦克风阵列硬件
4. 启动系统并检查连接状态
### PC端部署
1. 安装Python依赖包
2. 配置服务器网络参数
3. 启动音频识别服务
4. 运行可视化界面
### 网络配置
1. 确保开发板和PC在同一WiFi网络
2. 检查防火墙设置
3. 验证端口连通性
4. 测试数据传输
## 📝 注意事项
1. **硬件连接**确保麦克风阵列正确连接检查I2S信号质量
2. **网络稳定**使用稳定的WiFi网络避免频繁断开
3. **电源供应**开发板需要稳定的5V电源供应
4. **环境噪声**:避免强电磁干扰和机械振动
5. **定期维护**:定期检查系统状态和清理日志文件
---
**版本**: 3.0.0
**作者**: 声源定位系统开发团队
**日期**: 2025年
**许可证**: MIT License
### 2. 端口被占用
修改`config.ini`中的端口配置。
### 3. 开发板连接失败
检查网络配置和开发板IP地址。
## 版本历史
- **v2.0.0**: 整合Flask API支持多种可视化界面
- **v1.0.0**: 基础声源定位功能
## 技术支持
如有问题,请检查日志文件或联系开发团队。

File diff suppressed because it is too large Load Diff

@ -11,32 +11,34 @@ PC端服务器程序 - 声源定位系统
作者: 声源定位系统开发团队
版本: 2.0.0
日期: 2025
日期: 2024
"""
import socket
import threading
import time
import wave
import tempfile
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import queue
import json
import logging
import configparser
from datetime import datetime
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from enum import Enum
import traceback
import signal
import atexit
# ========== 标准库导入 ==========
import socket # 网络通信套接字
import threading # 多线程支持,用于并发处理
import time # 时间相关函数
import wave # WAV音频文件处理
import tempfile # 临时文件操作
import os # 操作系统接口
import sys # 系统相关参数
import numpy as np # 数值计算库,用于信号处理
import matplotlib.pyplot as plt # 绘图库,用于可视化
from matplotlib.animation import FuncAnimation # 动画支持,用于实时绘图
import queue # 队列数据结构,用于线程间通信
import json # JSON数据格式处理
import logging # 日志系统
import configparser # 配置文件解析
from datetime import datetime # 日期时间处理
from typing import Optional, Dict, Any, Tuple, List # 类型提示
from dataclasses import dataclass # 数据类定义
from enum import Enum # 枚举类型
import traceback # 异常追踪
import signal # 信号处理
import atexit # 程序退出处理
# Flask相关导入
# ========== 第三方库导入 ==========
# Flask Web框架用于HTTP API服务
try:
from flask import Flask, jsonify
from flask_cors import CORS
@ -45,9 +47,11 @@ except ImportError:
print("警告: Flask模块未找到HTTP API功能将不可用")
FLASK_AVAILABLE = False
# 添加audio-classification项目路径
# ========== 项目模块导入 ==========
# 添加audio-classification项目路径到系统路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'audio-classification'))
# 导入音频分类模块
try:
from macls.predict import MAClsPredictor
AUDIO_CLASSIFICATION_AVAILABLE = True
@ -55,25 +59,72 @@ except ImportError:
print("警告: audio-classification模块未找到将使用模拟识别")
AUDIO_CLASSIFICATION_AVAILABLE = False
# ========== 系统架构说明 ==========
"""
PC端系统架构设计
1. 网络通信层
- 多端口Socket服务器分别处理音频指令和定位数据
- 异步连接管理支持多客户端连接
- 心跳机制维护连接状态
2. 音频处理层
- 音频数据接收和缓冲
- 音频质量检测和预处理
- 与audio-classification模块集成
3. 枪声识别层
- 基于深度学习的音频分类
- 置信度阈值控制
- 识别结果过滤和验证
4. 定位数据处理层
- 定位数据解析和验证
- 卡尔曼滤波平滑处理
- 异常值检测和过滤
5. 可视化层
- 实时声源位置显示
- 轨迹跟踪和历史记录
- 性能指标监控
6. HTTP API层
- RESTful API接口
- 实时数据查询
- 系统状态监控
7. 配置管理层
- 配置文件加载和验证
- 运行时参数调整
- 系统配置持久化
"""
# ========== 枚举定义 ==========
class SystemMode(Enum):
"""系统运行模式枚举"""
LISTENING = "LISTENING" # 监听模式
LOCATING = "LOCATING" # 定位模式
ERROR = "ERROR" # 错误模式
SHUTDOWN = "SHUTDOWN" # 关闭模式
"""系统运行模式枚举 - 定义PC端的主要工作状态"""
LISTENING = "LISTENING" # 监听模式:等待音频数据,进行枪声识别
LOCATING = "LOCATING" # 定位模式:接收和处理声源定位数据
ERROR = "ERROR" # 错误模式:系统异常处理
SHUTDOWN = "SHUTDOWN" # 关闭模式:系统关闭和资源清理
class ConnectionStatus(Enum):
"""连接状态枚举"""
DISCONNECTED = "DISCONNECTED"
CONNECTING = "CONNECTING"
CONNECTED = "CONNECTED"
ERROR = "ERROR"
"""连接状态枚举 - 描述与开发板的连接状态"""
DISCONNECTED = "DISCONNECTED" # 断开:与开发板无连接
CONNECTING = "CONNECTING" # 连接中:正在建立连接
CONNECTED = "CONNECTED" # 已连接:连接正常
# ========== 数据类定义 ==========
@dataclass
class LocationData:
"""定位数据结构"""
"""定位数据结构 - 与开发板端保持一致的数据格式
属性说明
- x, y: 声源在二维平面中的坐标位置
- strength: 声源信号强度dB用于判断声源可靠性
- angle: 声源相对于参考方向的夹角0-360
- timestamp: 定位时间戳用于数据同步
- confidence: 定位置信度0-1表示定位结果的可信程度
"""
x: float
y: float
strength: float
@ -82,7 +133,7 @@ class LocationData:
confidence: float = 1.0
def __post_init__(self):
"""数据验证"""
"""数据验证 - 确保接收的定位数据有效性"""
if not isinstance(self.x, (int, float)) or not isinstance(self.y, (int, float)):
raise ValueError("坐标必须是数值类型")
if not isinstance(self.strength, (int, float)) or self.strength < 0:
@ -92,14 +143,21 @@ class LocationData:
@dataclass
class AudioConfig:
"""音频配置"""
"""音频配置 - 定义音频处理参数
属性说明
- sample_rate: 音频采样率Hz影响音频质量和处理性能
- channels: 声道数单声道或立体声
- chunk_size: 音频块大小采样点数影响实时性
- format: 音频格式"int16""float32"
"""
sample_rate: int = 16000
channels: int = 1
chunk_size: int = 1024
format: str = "int16"
def validate(self) -> bool:
"""验证配置有效性"""
"""验证配置有效性 - 确保音频参数在合理范围内"""
if self.sample_rate <= 0:
raise ValueError("采样率必须大于0")
if self.channels not in [1, 2]:
@ -110,7 +168,16 @@ class AudioConfig:
@dataclass
class NetworkConfig:
"""网络配置"""
"""网络配置 - 定义网络通信参数
属性说明
- host: 服务器监听地址通常为"0.0.0.0"表示所有接口
- port_audio: 音频数据接收端口
- port_cmd: 控制指令接收端口
- port_location: 定位数据接收端口
- timeout: 连接超时时间
- buffer_size: 网络缓冲区大小字节
"""
host: str = "0.0.0.0"
port_audio: int = 12346
port_cmd: int = 12347
@ -119,7 +186,7 @@ class NetworkConfig:
buffer_size: int = 4096
def validate(self) -> bool:
"""验证配置有效性"""
"""验证配置有效性 - 确保网络参数在合理范围内"""
if not (1024 <= self.port_audio <= 65535):
raise ValueError("音频端口必须在1024-65535范围内")
if not (1024 <= self.port_cmd <= 65535):
@ -132,35 +199,49 @@ class NetworkConfig:
# ========== 日志配置 ==========
def setup_logging(log_level: str = "INFO", log_file: str = "pc_server.log") -> logging.Logger:
"""设置日志系统"""
# 创建日志目录
"""设置日志系统 - 配置分级日志记录和文件轮转
参数说明
- log_level: 日志级别可选DEBUGINFOWARNINGERRORCRITICAL
- log_file: 日志文件名存储在logs目录下
功能特性
- 同时输出到文件和控制台
- 支持日志文件轮转防止文件过大
- 详细的日志格式包含时间戳函数名行号
- 分级过滤不同级别输出到不同目标
"""
# ========== 创建日志目录 ==========
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置日志格式
# ========== 配置日志格式 ==========
# 详细格式:时间戳 - 日志器名称 - 级别 - 函数名:行号 - 消息
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"
# 创建日志记录器
# ========== 创建日志记录器 ==========
logger = logging.getLogger("PCServer")
logger.setLevel(getattr(logging, log_level.upper()))
# 清除现有的处理器
# 清除现有的处理器,避免重复添加
logger.handlers.clear()
# 文件处理器
# ========== 文件处理器配置 ==========
# 文件处理器:记录所有级别的日志到文件
file_handler = logging.FileHandler(
os.path.join(log_dir, log_file),
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
file_handler.setLevel(logging.DEBUG) # 文件记录所有级别
file_formatter = logging.Formatter(log_format, date_format)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 控制台处理器
# ========== 控制台处理器配置 ==========
# 控制台处理器只显示INFO及以上级别
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setLevel(logging.INFO) # 控制台只显示重要信息
console_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(message)s",
date_format
@ -443,54 +524,65 @@ class LocationPostProcessor:
# ========== 音频处理优化 ==========
class AudioProcessor:
"""优化的音频处理器,支持批量处理、性能监控和智能缓冲"""
"""优化的音频处理器 - 支持批量处理、性能监控和智能缓冲
功能特性
- 智能缓冲管理动态调整缓冲区大小
- 音频质量检测过滤低质量音频数据
- 批量识别处理提高识别效率
- 性能监控实时统计处理性能
- 异常处理完善的错误恢复机制
"""
def __init__(self, config: AudioConfig, recognition_config: Dict[str, Any]):
self.config = config
self.recognition_config = recognition_config
self.config = config # 音频配置参数
self.recognition_config = recognition_config # 识别配置参数
self.logger = logging.getLogger("AudioProcessor")
# 音频缓冲管理
self.audio_buffer = []
self.buffer_lock = threading.Lock()
# ========== 音频缓冲管理 ==========
self.audio_buffer = [] # 音频数据缓冲区
self.buffer_lock = threading.Lock() # 缓冲区线程锁
# 缓冲区大小计算:基于识别间隔和采样率
self.max_buffer_size = int(config.sample_rate * recognition_config['recognition_interval'] * 2)
self.min_buffer_size = int(config.sample_rate * recognition_config['recognition_interval'] * 0.5)
# 性能监控
# ========== 性能监控统计 ==========
self.processing_stats = {
'total_audio_bytes': 0,
'total_audio_frames': 0,
'recognition_attempts': 0,
'gunshot_detections': 0,
'processing_times': [],
'buffer_overflow_count': 0,
'last_recognition_time': 0
'total_audio_bytes': 0, # 总处理音频字节数
'total_audio_frames': 0, # 总处理音频帧数
'recognition_attempts': 0, # 识别尝试次数
'gunshot_detections': 0, # 枪声检测次数
'processing_times': [], # 处理时间记录
'buffer_overflow_count': 0, # 缓冲区溢出次数
'last_recognition_time': 0 # 上次识别时间
}
# 音频质量检测
# ========== 音频质量检测阈值 ==========
self.quality_thresholds = {
'min_rms': 50, # 最小RMS值
'max_rms': 30000, # 最大RMS值
'min_nonzero_ratio': 0.1, # 最小非零比例
'max_silence_ratio': 0.8 # 最大静音比例
'min_rms': 50, # 最小RMS值(避免静音)
'max_rms': 30000, # 最大RMS值(避免过载)
'min_nonzero_ratio': 0.1, # 最小非零比例(避免全静音)
'max_silence_ratio': 0.8 # 最大静音比例(避免静音过多)
}
# 预测器
self.predictor = None
self._init_predictor()
# ========== 音频分类预测器 ==========
self.predictor = None # 深度学习预测器
self._init_predictor() # 初始化预测器
self.logger.info("音频处理器初始化完成")
def _init_predictor(self):
"""初始化音频分类预测器"""
"""初始化音频分类预测器 - 加载深度学习模型"""
if not AUDIO_CLASSIFICATION_AVAILABLE:
self.logger.warning("audio-classification模块未找到将使用模拟识别")
return
try:
# 获取配置文件和模型路径
configs_path = os.path.abspath(self.recognition_config['configs_path'])
model_path = os.path.abspath(self.recognition_config['model_path'])
# 验证文件存在性
if not os.path.exists(configs_path):
self.logger.error(f"配置文件不存在: {configs_path}")
return
@ -499,6 +591,7 @@ class AudioProcessor:
self.logger.error(f"模型路径不存在: {model_path}")
return
# 创建预测器实例
self.predictor = MAClsPredictor(
configs=configs_path,
model_path=model_path,
@ -512,22 +605,29 @@ class AudioProcessor:
self.predictor = None
def add_audio_data(self, audio_data: bytes) -> bool:
"""添加音频数据到缓冲区"""
"""添加音频数据到缓冲区 - 支持线程安全的数据添加
参数
- audio_data: 原始音频字节数据
返回
- bool: 添加是否成功
"""
try:
with self.buffer_lock:
# 转换为numpy数组
# 转换为numpy数组int16格式
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 更新统计
# 更新性能统计
self.processing_stats['total_audio_bytes'] += len(audio_data)
self.processing_stats['total_audio_frames'] += len(audio_array)
# 添加到缓冲区
self.audio_buffer.extend(audio_array)
# 检查缓冲区大小
# 检查缓冲区大小,防止内存溢出
if len(self.audio_buffer) > self.max_buffer_size:
# 保留最新的数据
# 保留最新的数据,丢弃旧数据
self.audio_buffer = self.audio_buffer[-self.max_buffer_size:]
self.processing_stats['buffer_overflow_count'] += 1
self.logger.warning("音频缓冲区溢出,丢弃旧数据")
@ -539,16 +639,21 @@ class AudioProcessor:
return False
def should_process_recognition(self) -> bool:
"""判断是否应该进行识别"""
"""判断是否应该进行识别 - 基于时间和缓冲区条件
判断条件
- 时间间隔距离上次识别的时间间隔
- 缓冲区大小确保有足够的数据进行识别
"""
try:
current_time = time.time()
# 检查时间间隔
# 检查时间间隔(避免过于频繁的识别)
if (current_time - self.processing_stats['last_recognition_time'] <
self.recognition_config['recognition_interval']):
return False
# 检查缓冲区大小
# 检查缓冲区大小(确保有足够的数据)
with self.buffer_lock:
return len(self.audio_buffer) >= self.min_buffer_size
@ -557,48 +662,51 @@ class AudioProcessor:
return False
def process_recognition(self) -> Optional[Dict[str, Any]]:
"""执行音频识别"""
"""执行音频识别 - 核心识别处理函数
处理流程
1. 提取音频片段
2. 音频质量检测
3. 执行识别真实或模拟
4. 结果验证和统计
"""
try:
start_time = time.time()
# 提取音频片段
with self.buffer_lock:
# 获取音频数据
if len(self.audio_buffer) < self.min_buffer_size:
return None
# 提取识别所需的数据
recognition_samples = int(self.config.sample_rate * self.recognition_config['recognition_interval'])
audio_segment = np.array(self.audio_buffer[-recognition_samples:])
# 提取足够长度的音频数据
audio_length = int(self.config.sample_rate * self.recognition_config['recognition_interval'])
audio_segment = np.array(self.audio_buffer[:audio_length], dtype=np.float32)
# 清空缓冲区
self.audio_buffer = []
# 移除已处理的数据
self.audio_buffer = self.audio_buffer[audio_length:]
# 检查音频质量
# 音频质量检测
if not self._check_audio_quality(audio_segment):
self.logger.debug("音频质量不满足识别要求")
self.logger.warning("音频质量不满足要求,跳过识别")
return None
# 执行识别
result = self._perform_recognition(audio_segment)
recognition_result = self._perform_recognition(audio_segment)
# 更新统计
# 更新统计信息
processing_time = time.time() - start_time
self.processing_stats['processing_times'].append(processing_time)
self.processing_stats['recognition_attempts'] += 1
self.processing_stats['last_recognition_time'] = time.time()
# 保持处理时间历史记录
# 限制处理时间记录数量
if len(self.processing_stats['processing_times']) > 100:
self.processing_stats['processing_times'] = self.processing_stats['processing_times'][-100:]
if result and result.get('is_gunshot', False):
self.processing_stats['gunshot_detections'] += 1
self.logger.info(f"音频识别完成: {result}, 耗时: {processing_time:.3f}")
return result
return recognition_result
except Exception as e:
self.logger.error(f"音频识别失败: {e}")
self.logger.error(f"音频识别处理失败: {e}")
return None
def _check_audio_quality(self, audio_segment: np.ndarray) -> bool:
@ -1054,15 +1162,6 @@ class PCServer:
self.current_location = None
self.location_data_count = 0
# 新增:开发板枪声检测统计
self.board_gunshot_stats = {
'total_detections': 0,
'gunshot_detections': 0,
'detection_rate': 0.0,
'avg_confidence': 0.0,
'last_detection_time': 0
}
# 线程管理
self.threads = []
self.thread_lock = threading.Lock()
@ -1073,8 +1172,7 @@ class PCServer:
'audio_packets_received': 0,
'location_packets_received': 0,
'recognition_attempts': 0,
'errors': 0,
'board_gunshot_detections': 0 # 新增:开发板枪声检测次数
'errors': 0
}
# HTTP API相关
@ -1305,8 +1403,7 @@ class PCServer:
'audio_packets_received': 0,
'location_packets_received': 0,
'recognition_attempts': 0,
'errors': 0,
'board_gunshot_detections': 0 # 新增:开发板枪声检测次数
'errors': 0
}
self.logger.info("数据结构初始化完成")
@ -1385,11 +1482,6 @@ class PCServer:
self.connection_status['audio'] = ConnectionStatus.DISCONNECTED
break
# 检查是否是识别请求
if self._is_recognition_request(audio_data):
self._handle_recognition_request(audio_data)
continue
# 更新性能统计
self.performance_stats['audio_packets_received'] += 1
@ -1421,114 +1513,6 @@ class PCServer:
finally:
self.logger.info("音频处理线程结束")
def _is_recognition_request(self, audio_data: bytes) -> bool:
"""检查是否是识别请求"""
try:
# 检查数据开头是否包含识别请求标识
data_str = audio_data.decode('utf-8', errors='ignore')
return data_str.startswith("RECOGNITION_REQUEST:")
except:
return False
def _handle_recognition_request(self, audio_data: bytes):
"""处理识别请求"""
try:
# 解析请求头
data_str = audio_data.decode('utf-8', errors='ignore')
if not data_str.startswith("RECOGNITION_REQUEST:"):
return
# 格式: RECOGNITION_REQUEST:timestamp:data_size
parts = data_str.split(':')
if len(parts) >= 3:
timestamp = float(parts[1])
data_size = int(parts[2])
self.logger.info(f"收到识别请求: 时间戳={timestamp:.3f}, 数据大小={data_size}")
# 等待接收音频数据
audio_segment = self._receive_audio_segment(data_size)
if audio_segment is not None:
# 进行枪声识别
recognition_result = self._perform_audio_recognition(audio_segment)
# 发送识别结果给开发板
self._send_recognition_result(timestamp, recognition_result)
except Exception as e:
self.logger.error(f"处理识别请求失败: {e}")
def _receive_audio_segment(self, data_size: int) -> Optional[bytes]:
"""接收音频数据段"""
try:
# 从音频socket接收指定大小的数据
received_data = b''
remaining_size = data_size
while remaining_size > 0:
chunk = self.communication_manager.audio_socket.recv(min(remaining_size, 4096))
if not chunk:
break
received_data += chunk
remaining_size -= len(chunk)
if len(received_data) == data_size:
return received_data
else:
self.logger.warning(f"音频数据接收不完整: 期望{data_size}, 实际{len(received_data)}")
return None
except Exception as e:
self.logger.error(f"接收音频数据段失败: {e}")
return None
def _perform_audio_recognition(self, audio_data: bytes) -> Dict[str, Any]:
"""执行音频识别"""
try:
# 将字节数据转换为numpy数组
audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
# 检查音频质量
if not self.audio_processor._check_audio_quality(audio_array):
return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'poor_quality'}
# 执行识别
result = self.audio_processor._perform_recognition(audio_array)
if result:
return {
'is_gunshot': result.get('is_gunshot', False),
'confidence': result.get('score', 0.0),
'label': result.get('label', 'unknown')
}
else:
return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'recognition_failed'}
except Exception as e:
self.logger.error(f"音频识别失败: {e}")
return {'is_gunshot': False, 'confidence': 0.0, 'reason': 'error'}
def _send_recognition_result(self, timestamp: float, result: Dict[str, Any]):
"""发送识别结果给开发板"""
try:
if not self.communication_manager.cmd_socket:
return
# 构建识别结果字符串
# 格式: RECOGNITION_RESULT:timestamp:is_gunshot:confidence
is_gunshot = result.get('is_gunshot', False)
confidence = result.get('confidence', 0.0)
result_str = f"RECOGNITION_RESULT:{timestamp:.3f}:{str(is_gunshot).lower()}:{confidence:.3f}"
# 发送结果
self.communication_manager.cmd_socket.send(result_str.encode('utf-8'))
self.logger.info(f"发送识别结果: 时间戳={timestamp:.3f}, 枪声={is_gunshot}, 置信度={confidence:.3f}")
except Exception as e:
self.logger.error(f"发送识别结果失败: {e}")
def _switch_to_location_mode(self):
"""切换到定位模式"""
try:
@ -1611,19 +1595,13 @@ class PCServer:
data_str = data.decode('utf-8').strip()
if not data_str:
return
# 检查是否是枪声检测结果数据(保留兼容性)
if data_str.startswith("GUNSHOT_DETECTION:"):
self._process_gunshot_detection_data(data_str)
return
# 解析定位数据
# 解析数据
location_data = self._parse_location_data(data_str)
if location_data:
# 应用卡尔曼滤波
filtered_data = self._apply_kalman_filter(location_data)
# 后处理(平滑、异常值剔除等)
processed_data = self.location_post_processor.process(filtered_data)
processed_data = location_post_processor.process(filtered_data)
# 将数据放入队列
self.location_queue.put(processed_data)
self.location_data_count += 1
@ -1635,49 +1613,6 @@ class PCServer:
self.logger.error(f"处理定位数据失败: {e}")
self.performance_stats['errors'] += 1
def _process_gunshot_detection_data(self, data_str: str):
"""处理开发板枪声检测数据"""
try:
# 解析格式: GUNSHOT_DETECTION:is_gunshot:confidence:timestamp
parts = data_str.split(':')
if len(parts) >= 4:
is_gunshot = parts[1].lower() == 'true'
confidence = float(parts[2])
timestamp = float(parts[3])
# 更新统计
self.board_gunshot_stats['total_detections'] += 1
if is_gunshot:
self.board_gunshot_stats['gunshot_detections'] += 1
self.performance_stats['board_gunshot_detections'] += 1
self.board_gunshot_stats['last_detection_time'] = timestamp
self.logger.info(f"开发板检测到枪声!置信度: {confidence:.3f}")
# 更新检测率
total = self.board_gunshot_stats['total_detections']
gunshots = self.board_gunshot_stats['gunshot_detections']
self.board_gunshot_stats['detection_rate'] = gunshots / total if total > 0 else 0.0
# 更新平均置信度
if 'avg_confidence' not in self.board_gunshot_stats:
self.board_gunshot_stats['avg_confidence'] = confidence
else:
# 指数移动平均
alpha = 0.1
self.board_gunshot_stats['avg_confidence'] = (
alpha * confidence +
(1 - alpha) * self.board_gunshot_stats['avg_confidence']
)
self.logger.debug(f"开发板枪声检测统计: 总数={total}, 枪声={gunshots}, "
f"检测率={self.board_gunshot_stats['detection_rate']:.3f}, "
f"平均置信度={self.board_gunshot_stats['avg_confidence']:.3f}")
except Exception as e:
self.logger.error(f"处理开发板枪声检测数据失败: {e}")
self.performance_stats['errors'] += 1
def _parse_location_data(self, data_str: str) -> Optional[LocationData]:
"""解析定位数据"""
try:
@ -1786,12 +1721,6 @@ class PCServer:
f"错误: {self.performance_stats['errors']} | "
f"模式: {self.current_mode.value}")
# 添加开发板枪声检测统计
if self.current_mode == SystemMode.LOCATING:
board_stats = self.board_gunshot_stats
status_text += f" | 开发板检测: {board_stats['gunshot_detections']}/{board_stats['total_detections']} "
status_text += f"({board_stats['detection_rate']:.1%})"
# 在图形上显示状态信息
if hasattr(self, 'status_text_obj'):
self.status_text_obj.remove()
@ -1820,8 +1749,7 @@ class PCServer:
'total_packets': self.location_data_count,
'history_size': len(self.location_history),
'queue_size': self.location_queue.qsize()
},
'board_gunshot_stats': self.board_gunshot_stats.copy() # 新增:开发板枪声检测统计
}
}
def run(self):
@ -1939,8 +1867,7 @@ class PCServer:
"endpoints": {
"/data": "获取最新定位数据",
"/status": "获取系统状态",
"/stats": "获取性能统计",
"/board_stats": "获取开发板枪声检测统计" # 新增
"/stats": "获取性能统计"
}
})
@ -1994,15 +1921,6 @@ class PCServer:
self.logger.error(f"获取性能统计失败: {e}")
return jsonify({"error": "获取统计失败"}), 500
@self.flask_app.route("/board_stats")
def get_board_stats():
"""获取开发板枪声检测统计"""
try:
return jsonify(self.board_gunshot_stats)
except Exception as e:
self.logger.error(f"获取开发板统计失败: {e}")
return jsonify({"error": "获取统计失败"}), 500
self.logger.info("Flask路由设置完成")
except Exception as e:

Loading…
Cancel
Save