From 14dbf9269baa776a77fcee2ef346ea70b7759abf Mon Sep 17 00:00:00 2001 From: wangjing22e Date: Mon, 14 Jul 2025 11:26:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=A3=E7=A0=81=E7=BC=BA?= =?UTF-8?q?=E9=99=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../back-code/development_board.py | 514 ++++++++++++++++++ src/声源定位代码/back-code/pc_server.py | 263 ++++++++- .../front-code/sound-vue-frontend/src/App.vue | 75 +++ 3 files changed, 844 insertions(+), 8 deletions(-) diff --git a/src/声源定位代码/back-code/development_board.py b/src/声源定位代码/back-code/development_board.py index d4eba11..6a4cf58 100644 --- a/src/声源定位代码/back-code/development_board.py +++ b/src/声源定位代码/back-code/development_board.py @@ -24,6 +24,7 @@ import json # JSON数据格式处理 import os # 操作系统接口,文件系统操作 import sys # 系统相关参数和函数 import traceback # 异常追踪信息 +import struct # 二进制数据打包和解包 from machine import UART, Timer, PWM # K210硬件接口:串口、定时器、PWM from fpioa_manager import fm # K210引脚管理器 from Maix import GPIO, MIC_ARRAY as mic # K210 GPIO和麦克风阵列 @@ -1839,6 +1840,519 @@ class DevelopmentBoard: except Exception as e: self.logger.error(f"硬件关闭失败: {e}") + + def run(self): + """开发板主运行循环 - 实现完整的音频采集和传输流程""" + self.logger.info("=== 开发板主循环启动 ===") + self.logger.info(f"当前模式: {self.current_mode.value}") + + try: + # 确保网络连接正常 + if not self._ensure_network_connection(): + self.logger.error("网络连接失败,无法启动主循环") + return + + # 主循环 + while self.running: + try: + if self.current_mode == SystemMode.RECORDING: + self._audio_recording_loop() + elif self.current_mode == SystemMode.LOCATING: + self._location_processing_loop() + elif self.current_mode == SystemMode.ERROR: + self._error_recovery_loop() + elif self.current_mode == SystemMode.SHUTDOWN: + break + else: + time.sleep(0.1) + + except Exception as e: + self.logger.error(f"主循环异常: {e}") + self._switch_mode(SystemMode.ERROR) + + except KeyboardInterrupt: + self.logger.info("收到中断信号,准备关闭...") + except Exception as e: + self.logger.error(f"主循环严重异常: {e}") + finally: + self._cleanup_resources() + + def _ensure_network_connection(self) -> bool: + """确保网络连接正常""" + try: + # 检查WiFi连接 + if self.wifi_status != WiFiStatus.CONNECTED: + if not self._connect_wifi(): + return False + + # 检查Socket连接 + if self.connection_status != ConnectionStatus.CONNECTED: + if not self._setup_socket_connections(): + return False + + return True + + except Exception as e: + self.logger.error(f"网络连接检查失败: {e}") + return False + + def _connect_wifi(self) -> bool: + """连接WiFi网络""" + try: + self.logger.info(f"正在连接WiFi: {self.network_config.wifi_ssid}") + self.wifi_status = WiFiStatus.CONNECTING + + # 初始化WiFi + self.nic = network.WLAN(network.STA_IF) + self.nic.active(True) + + # 连接WiFi + self.nic.connect(self.network_config.wifi_ssid, self.network_config.wifi_password) + + # 等待连接 + start_time = time.time() + while not self.nic.isconnected(): + if time.time() - start_time > self.network_config.connection_timeout: + raise WiFiConnectionError("WiFi连接超时", self.network_config.wifi_ssid) + time.sleep(1) + + # 获取网络信息 + ip_info = self.nic.ifconfig() + self.logger.info(f"WiFi连接成功 - IP: {ip_info[0]}") + self.wifi_status = WiFiStatus.CONNECTED + self.wifi_connection_attempts = 0 + + return True + + except Exception as e: + self.wifi_connection_attempts += 1 + self.logger.error(f"WiFi连接失败 (第{self.wifi_connection_attempts}次): {e}") + self.wifi_status = WiFiStatus.ERROR + + if self.wifi_connection_attempts >= self.network_config.max_reconnect_attempts: + self.logger.error("WiFi连接尝试次数已达上限") + return False + + return False + + def _setup_socket_connections(self) -> bool: + """建立Socket连接""" + try: + self.logger.info("正在建立Socket连接...") + self.connection_status = ConnectionStatus.CONNECTING + + # 创建音频数据Socket + self.audio_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.audio_socket.settimeout(self.network_config.socket_timeout) + self.audio_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_audio)) + + # 创建指令Socket + self.cmd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.cmd_socket.settimeout(self.network_config.socket_timeout) + self.cmd_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_cmd)) + + # 创建定位数据Socket + self.location_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.location_socket.settimeout(self.network_config.socket_timeout) + self.location_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_location)) + + self.connection_status = ConnectionStatus.CONNECTED + self.socket_connection_attempts = 0 + self.logger.info("Socket连接建立成功") + + return True + + except Exception as e: + self.socket_connection_attempts += 1 + self.logger.error(f"Socket连接失败 (第{self.socket_connection_attempts}次): {e}") + self.connection_status = ConnectionStatus.ERROR + + # 关闭失败的Socket + self._close_sockets() + + if self.socket_connection_attempts >= self.network_config.max_reconnect_attempts: + self.logger.error("Socket连接尝试次数已达上限") + return False + + return False + + def _close_sockets(self): + """关闭所有Socket连接""" + try: + for socket_name, sock in [('audio', self.audio_socket), + ('cmd', self.cmd_socket), + ('location', self.location_socket)]: + if sock: + try: + sock.close() + self.logger.debug(f"关闭{socket_name} Socket") + except Exception as e: + self.logger.error(f"关闭{socket_name} Socket失败: {e}") + + self.audio_socket = None + self.cmd_socket = None + self.location_socket = None + + except Exception as e: + self.logger.error(f"关闭Socket失败: {e}") + + def _audio_recording_loop(self): + """音频录音循环 - 第一阶段的核心实现""" + try: + self.logger.debug("开始音频录音循环") + + # 1. 采集麦克风阵列音频数据 + audio_data = self._capture_audio_data() + if audio_data is None: + self.logger.warning("音频数据采集失败") + return + + # 2. 音频预处理 + processed_audio = self._preprocess_audio(audio_data) + if processed_audio is None: + self.logger.warning("音频预处理失败") + return + + # 3. 网络传输到PC服务器 + if not self._send_audio_data(processed_audio): + self.logger.error("音频数据发送失败") + return + + # 4. 监听PC端指令 + self._listen_for_commands() + + # 5. 发送心跳包 + self._send_heartbeat_if_needed() + + # 6. 更新性能统计 + self.performance_monitor.increment('audio_packets_sent') + + # 控制循环频率 + time.sleep(self.system_config.location_update_interval) + + except Exception as e: + self.logger.error(f"音频录音循环异常: {e}") + self.performance_monitor.increment('errors') + + def _capture_audio_data(self) -> Optional[bytes]: + """采集麦克风阵列音频数据""" + try: + # 使用麦克风阵列采集音频 + # 假设mic.record()返回原始音频数据 + audio_data = mic.record(self.audio_config.chunk_size) + + if audio_data is None or len(audio_data) == 0: + self.logger.warning("麦克风阵列返回空数据") + return None + + # 转换为字节格式 + if isinstance(audio_data, (list, tuple)): + # 如果是数值列表,转换为字节 + audio_bytes = b'' + for sample in audio_data: + audio_bytes += struct.pack(' Optional[bytes]: + """音频预处理""" + try: + # 1. 数据有效性检查 + if len(audio_data) < self.audio_config.chunk_size * 2: # 假设16位采样 + self.logger.warning("音频数据长度不足") + return None + + # 2. 音频格式转换(如果需要) + # 这里可以添加音频格式转换逻辑 + + # 3. 添加时间戳 + timestamp = time.time() + header = struct.pack(' bool: + """发送音频数据到PC服务器""" + try: + if self.audio_socket is None: + self.logger.error("音频Socket未连接") + return False + + # 发送数据 + self.audio_socket.send(audio_data) + + self.logger.debug(f"音频数据发送成功: {len(audio_data)} bytes") + return True + + except socket.error as e: + self.logger.error(f"音频数据发送失败: {e}") + self.connection_status = ConnectionStatus.ERROR + return False + except Exception as e: + self.logger.error(f"音频数据发送异常: {e}") + return False + + def _listen_for_commands(self): + """监听PC端指令""" + try: + if self.cmd_socket is None: + return + + # 设置非阻塞模式 + self.cmd_socket.settimeout(0.01) # 10ms超时 + + try: + data = self.cmd_socket.recv(1024) + if data: + command = data.decode('utf-8').strip() + self.logger.info(f"收到PC端指令: {command}") + + # 处理指令 + self._process_command(command) + + except socket.timeout: + # 超时是正常的,继续循环 + pass + except Exception as e: + self.logger.error(f"指令接收异常: {e}") + + except Exception as e: + self.logger.error(f"指令监听失败: {e}") + + def _process_command(self, command: str): + """处理PC端指令""" + try: + if command == "START_LOCATION": + self.logger.info("收到切换到定位模式指令") + self._switch_mode(SystemMode.LOCATING) + + elif command == "STOP_LOCATION": + self.logger.info("收到停止定位模式指令") + self._switch_mode(SystemMode.RECORDING) + + elif command == "SHUTDOWN": + self.logger.info("收到关闭系统指令") + self._switch_mode(SystemMode.SHUTDOWN) + + elif command == "HEARTBEAT": + self.logger.debug("收到心跳包") + self._send_heartbeat() + + else: + self.logger.warning(f"未知指令: {command}") + + except Exception as e: + self.logger.error(f"指令处理失败: {e}") + + def _send_heartbeat_if_needed(self): + """发送心跳包(如果需要)""" + try: + current_time = time.time() + if current_time - self.last_heartbeat > self.system_config.heartbeat_interval: + self._send_heartbeat() + self.last_heartbeat = current_time + + except Exception as e: + self.logger.error(f"心跳包发送失败: {e}") + + def _send_heartbeat(self): + """发送心跳包""" + try: + if self.cmd_socket is None: + return + + heartbeat_data = { + 'type': 'heartbeat', + 'timestamp': time.time(), + 'mode': self.current_mode.value, + 'status': self.system_status, + 'uptime': time.time() - self.start_time, + 'memory': gc.mem_free(), + 'error_count': self.error_count + } + + heartbeat_json = json.dumps(heartbeat_data) + self.cmd_socket.send(heartbeat_json.encode('utf-8')) + + self.logger.debug("心跳包发送成功") + + except Exception as e: + self.logger.error(f"心跳包发送失败: {e}") + + def _location_processing_loop(self): + """定位处理循环 - 第二阶段的核心实现""" + try: + self.logger.debug("开始定位处理循环") + + # 1. 采集麦克风阵列数据 + audio_data = self._capture_audio_data() + if audio_data is None: + return + + # 2. 声源定位计算 + location_data = self._calculate_source_location(audio_data) + if location_data is None: + return + + # 3. 发送定位数据到PC服务器 + if not self._send_location_data(location_data): + self.logger.error("定位数据发送失败") + return + + # 4. 监听PC端指令 + self._listen_for_commands() + + # 5. 更新性能统计 + self.performance_monitor.increment('location_packets_sent') + + # 控制循环频率 + time.sleep(self.system_config.location_update_interval) + + except Exception as e: + self.logger.error(f"定位处理循环异常: {e}") + self.performance_monitor.increment('errors') + + def _calculate_source_location(self, audio_data: bytes) -> Optional[LocationData]: + """计算声源位置""" + try: + # 创建声源定位器 + locator = SoundSourceLocator(self.hardware_config) + + # 将字节数据转换为数值列表 + audio_values = self._bytes_to_audio_values(audio_data) + + # 进行声源定位 + location = locator.process_audio_frame(audio_values) + + if location: + self.logger.debug(f"声源定位成功: X={location.x:.2f}, Y={location.y:.2f}") + return location + else: + self.logger.debug("声源定位失败") + return None + + except Exception as e: + self.logger.error(f"声源定位计算失败: {e}") + return None + + def _bytes_to_audio_values(self, audio_bytes: bytes) -> List[float]: + """将字节数据转换为音频数值列表""" + try: + audio_values = [] + + # 假设音频数据是16位整数格式 + for i in range(0, len(audio_bytes), 2): + if i + 1 < len(audio_bytes): + sample = struct.unpack(' bool: + """发送定位数据到PC服务器""" + try: + if self.location_socket is None: + self.logger.error("定位Socket未连接") + return False + + # 转换为字符串格式发送 + location_str = f"{location_data.x:.3f},{location_data.y:.3f},{location_data.strength:.3f},{location_data.angle:.3f}" + self.location_socket.send(location_str.encode('utf-8')) + + self.logger.debug(f"定位数据发送成功: {location_str}") + return True + + except socket.error as e: + self.logger.error(f"定位数据发送失败: {e}") + self.connection_status = ConnectionStatus.ERROR + return False + except Exception as e: + self.logger.error(f"定位数据发送异常: {e}") + return False + + def _error_recovery_loop(self): + """错误恢复循环""" + try: + self.logger.info("进入错误恢复模式") + + # 尝试重新连接网络 + if self._ensure_network_connection(): + self.logger.info("网络连接恢复成功") + self._switch_mode(SystemMode.RECORDING) + else: + self.logger.warning("网络连接恢复失败,继续尝试...") + time.sleep(5) # 等待5秒后重试 + + except Exception as e: + self.logger.error(f"错误恢复失败: {e}") + + def _cleanup_resources(self): + """清理资源""" + try: + self.logger.info("开始清理资源...") + + # 关闭Socket连接 + self._close_sockets() + + # 关闭WiFi + if self.nic: + self.nic.active(False) + + # 关闭硬件组件 + self._shutdown_hardware() + + # 停止定时器 + if self.heartbeat_timer: + self.heartbeat_timer.stop() + if self.health_check_timer: + self.health_check_timer.stop() + + self.logger.info("资源清理完成") + + except Exception as e: + self.logger.error(f"资源清理失败: {e}") + + def _get_mic_direction(self) -> Optional[LocationData]: + """获取麦克风方向 - 用于测试麦克风阵列功能""" + try: + # 采集一小段音频数据进行测试 + test_audio = self._capture_audio_data() + if test_audio is None: + return None + + # 转换为音频数值 + audio_values = self._bytes_to_audio_values(test_audio) + if not audio_values: + return None + + # 使用声源定位器进行测试 + locator = SoundSourceLocator(self.hardware_config) + test_location = locator.process_audio_frame(audio_values) + + return test_location + + except Exception as e: + self.logger.error(f"麦克风方向测试失败: {e}") + return None # ========== 声源定位核心算法 ========== class KalmanFilter: diff --git a/src/声源定位代码/back-code/pc_server.py b/src/声源定位代码/back-code/pc_server.py index 0e74745..b18930d 100644 --- a/src/声源定位代码/back-code/pc_server.py +++ b/src/声源定位代码/back-code/pc_server.py @@ -197,6 +197,164 @@ class NetworkConfig: raise ValueError("超时时间必须大于0") return True +@dataclass +class LocationRecord: + """声源定位记录数据结构""" + location_data: LocationData + audio_data: bytes + timestamp: float + gunshot_detected: bool = False + gunshot_confidence: float = 0.0 + gunshot_label: str = "" + is_saved: bool = False + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + 'location': { + 'x': self.location_data.x, + 'y': self.location_data.y, + 'strength': self.location_data.strength, + 'angle': self.location_data.angle, + 'confidence': self.location_data.confidence, + 'timestamp': self.location_data.timestamp + }, + 'audio_data_size': len(self.audio_data), + 'timestamp': self.timestamp, + 'gunshot_detected': self.gunshot_detected, + 'gunshot_confidence': self.gunshot_confidence, + 'gunshot_label': self.gunshot_label, + 'is_saved': self.is_saved + } + +class LocationDataRecorder: + """声源定位数据记录器 - 管理定位数据和音频数据的记录""" + + def __init__(self, save_directory: str = "location_records"): + self.save_directory = save_directory + self.logger = setup_logging("LocationDataRecorder") + self.records = [] # 内存中的记录 + self.max_memory_records = 1000 # 最大内存记录数 + self.gunshot_records = [] # 枪声记录 + self.non_gunshot_records = [] # 非枪声记录 + + # 确保保存目录存在 + os.makedirs(save_directory, exist_ok=True) + os.makedirs(os.path.join(save_directory, "gunshot"), exist_ok=True) + os.makedirs(os.path.join(save_directory, "non_gunshot"), exist_ok=True) + + self.logger.info(f"声源定位数据记录器初始化完成,保存目录: {save_directory}") + + def add_record(self, location_data: LocationData, audio_data: bytes) -> LocationRecord: + """添加新的定位记录""" + try: + record = LocationRecord( + location_data=location_data, + audio_data=audio_data, + timestamp=time.time() + ) + + # 添加到内存记录 + self.records.append(record) + + # 限制内存记录数量 + if len(self.records) > self.max_memory_records: + self.records = self.records[-self.max_memory_records:] + + self.logger.debug(f"添加定位记录: X={location_data.x:.2f}, Y={location_data.y:.2f}") + return record + + except Exception as e: + self.logger.error(f"添加定位记录失败: {e}") + return None + + def process_gunshot_detection(self, record: LocationRecord, audio_processor) -> bool: + """处理枪声检测""" + try: + # 使用音频处理器进行枪声识别 + if audio_processor.add_audio_data(record.audio_data): + if audio_processor.should_process_recognition(): + result = audio_processor.process_recognition() + + if result and result.get('is_gunshot', False): + # 检测到枪声 + record.gunshot_detected = True + record.gunshot_confidence = result.get('score', 0.0) + record.gunshot_label = result.get('label', 'gunshot') + + # 保存枪声记录 + self._save_gunshot_record(record) + self.gunshot_records.append(record) + + self.logger.warning(f"检测到枪声!置信度: {record.gunshot_confidence:.4f}, " + f"位置: X={record.location_data.x:.2f}, Y={record.location_data.y:.2f}") + return True + else: + # 不是枪声,丢弃记录 + record.gunshot_detected = False + record.gunshot_confidence = result.get('score', 0.0) if result else 0.0 + record.gunshot_label = result.get('label', 'unknown') if result else 'unknown' + + self.logger.debug(f"非枪声音频,丢弃定位数据: 置信度={record.gunshot_confidence:.4f}") + return False + + return False + + except Exception as e: + self.logger.error(f"枪声检测处理失败: {e}") + return False + + def _save_gunshot_record(self, record: LocationRecord): + """保存枪声记录""" + try: + timestamp_str = time.strftime("%Y%m%d_%H%M%S", time.localtime(record.timestamp)) + filename = f"gunshot_{timestamp_str}_{record.gunshot_confidence:.4f}.json" + filepath = os.path.join(self.save_directory, "gunshot", filename) + + # 保存记录信息 + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(record.to_dict(), f, indent=2, ensure_ascii=False) + + # 保存音频数据 + audio_filename = f"gunshot_{timestamp_str}_{record.gunshot_confidence:.4f}.wav" + audio_filepath = os.path.join(self.save_directory, "gunshot", audio_filename) + + # 这里可以添加音频文件保存逻辑 + # 例如使用scipy.io.wavfile.write保存为WAV文件 + + record.is_saved = True + self.logger.info(f"枪声记录已保存: {filepath}") + + except Exception as e: + self.logger.error(f"保存枪声记录失败: {e}") + + def get_gunshot_records(self) -> List[LocationRecord]: + """获取枪声记录""" + return self.gunshot_records.copy() + + def get_recent_records(self, count: int = 100) -> List[LocationRecord]: + """获取最近的记录""" + return self.records[-count:] if self.records else [] + + def get_statistics(self) -> Dict[str, Any]: + """获取记录统计信息""" + try: + total_records = len(self.records) + gunshot_count = len(self.gunshot_records) + non_gunshot_count = total_records - gunshot_count + + return { + 'total_records': total_records, + 'gunshot_records': gunshot_count, + 'non_gunshot_records': non_gunshot_count, + 'gunshot_ratio': gunshot_count / max(total_records, 1), + 'save_directory': self.save_directory, + 'max_memory_records': self.max_memory_records + } + except Exception as e: + self.logger.error(f"获取统计信息失败: {e}") + return {} + # ========== 日志配置 ========== def setup_logging(log_level: str = "INFO", log_file: str = "pc_server.log") -> logging.Logger: """设置日志系统 - 配置分级日志记录和文件轮转 @@ -1197,6 +1355,9 @@ class PCServer: self.logger.error(traceback.format_exc()) self.current_mode = SystemMode.ERROR raise + + # 添加定位数据记录器 + self.location_recorder = LocationDataRecorder() def _load_recognition_config(self) -> Dict[str, Any]: """加载识别配置""" @@ -1468,7 +1629,7 @@ class PCServer: raise def _audio_processing_worker(self): - """音频处理工作线程""" + """音频处理工作线程 - 修改版本,保存音频数据用于定位记录""" self.logger.info("音频处理线程开始运行") try: @@ -1482,6 +1643,9 @@ class PCServer: self.connection_status['audio'] = ConnectionStatus.DISCONNECTED break + # 保存最近的音频数据用于定位记录 + self.recent_audio_data = audio_data + # 更新性能统计 self.performance_stats['audio_packets_received'] += 1 @@ -1590,29 +1754,71 @@ class PCServer: self.logger.info("定位数据处理线程结束") def _process_location_data(self, data: bytes): - """处理定位数据""" + """处理定位数据 - 修改后的版本,包含枪声检测和记录管理""" try: data_str = data.decode('utf-8').strip() if not data_str: return - # 解析数据 + + # 解析定位数据 location_data = self._parse_location_data(data_str) - if location_data: + if not location_data: + return + + # 获取对应的音频数据(从开发板发送的音频数据) + audio_data = self._get_corresponding_audio_data(location_data.timestamp) + + # 创建定位记录 + record = self.location_recorder.add_record(location_data, audio_data) + if not record: + return + + # 进行枪声检测 + is_gunshot = self.location_recorder.process_gunshot_detection(record, self.audio_processor) + + if is_gunshot: + # 检测到枪声,保留定位数据 + self.logger.warning(f"检测到枪声!保留定位数据: X={location_data.x:.2f}, Y={location_data.y:.2f}") + # 应用卡尔曼滤波 filtered_data = self._apply_kalman_filter(location_data) + # 后处理(平滑、异常值剔除等) processed_data = location_post_processor.process(filtered_data) - # 将数据放入队列 + + # 将数据放入队列用于可视化 self.location_queue.put(processed_data) self.location_data_count += 1 + # 添加到历史记录 self._add_to_history(processed_data) - self.logger.debug(f"接收定位数据(后处理): X={processed_data.x:.2f}, Y={processed_data.y:.2f}, " - f"强度={processed_data.strength:.2f}, 角度={processed_data.angle:.2f}") + + self.logger.info(f"枪声定位数据已保存并显示: X={processed_data.x:.2f}, Y={processed_data.y:.2f}") + + else: + # 不是枪声,丢弃定位数据 + self.logger.debug(f"非枪声音频,丢弃定位数据: X={location_data.x:.2f}, Y={location_data.y:.2f}") + # 不添加到历史记录和可视化队列 + except Exception as e: self.logger.error(f"处理定位数据失败: {e}") self.performance_stats['errors'] += 1 + def _get_corresponding_audio_data(self, location_timestamp: float) -> bytes: + """获取对应的音频数据""" + try: + # 这里需要实现从音频缓冲区获取对应时间戳的音频数据 + # 简单实现:返回最近的音频数据 + if hasattr(self, 'recent_audio_data') and self.recent_audio_data: + return self.recent_audio_data + else: + # 如果没有音频数据,返回空字节 + return b'' + + except Exception as e: + self.logger.error(f"获取对应音频数据失败: {e}") + return b'' + def _parse_location_data(self, data_str: str) -> Optional[LocationData]: """解析定位数据""" try: @@ -1914,13 +2120,37 @@ class PCServer: stats = { "system_stats": self.get_system_status(), "audio_processor_stats": self.audio_processor.get_performance_stats(), - "communication_stats": self.communication_manager.get_connection_stats() + "communication_stats": self.communication_manager.get_connection_stats(), + "location_records_stats": self.get_location_records_stats() } return jsonify(stats) except Exception as e: self.logger.error(f"获取性能统计失败: {e}") return jsonify({"error": "获取统计失败"}), 500 + @self.flask_app.route("/gunshot_records") + def get_gunshot_records(): + """获取枪声记录列表""" + try: + records = self.get_gunshot_records() + return jsonify({ + "gunshot_records": records, + "count": len(records) + }) + except Exception as e: + self.logger.error(f"获取枪声记录失败: {e}") + return jsonify({"error": "获取枪声记录失败"}), 500 + + @self.flask_app.route("/location_records_stats") + def get_location_records_stats(): + """获取定位记录统计信息""" + try: + stats = self.get_location_records_stats() + return jsonify(stats) + except Exception as e: + self.logger.error(f"获取定位记录统计失败: {e}") + return jsonify({"error": "获取定位记录统计失败"}), 500 + self.logger.info("Flask路由设置完成") except Exception as e: @@ -1963,6 +2193,23 @@ class PCServer: self.logger.info("Flask服务器已停止") except Exception as e: self.logger.error(f"停止Flask服务器失败: {e}") + + def get_location_records_stats(self) -> Dict[str, Any]: + """获取定位记录统计信息""" + try: + return self.location_recorder.get_statistics() + except Exception as e: + self.logger.error(f"获取定位记录统计失败: {e}") + return {} + + def get_gunshot_records(self) -> List[Dict[str, Any]]: + """获取枪声记录列表""" + try: + records = self.location_recorder.get_gunshot_records() + return [record.to_dict() for record in records] + except Exception as e: + self.logger.error(f"获取枪声记录失败: {e}") + return [] def main(): """主函数""" diff --git a/src/声源定位代码/front-code/sound-vue-frontend/src/App.vue b/src/声源定位代码/front-code/sound-vue-frontend/src/App.vue index 2bbe379..eb61f13 100644 --- a/src/声源定位代码/front-code/sound-vue-frontend/src/App.vue +++ b/src/声源定位代码/front-code/sound-vue-frontend/src/App.vue @@ -30,6 +30,42 @@ {{ sourceData.angle.toFixed(2) }}° + +
+

枪声记录统计

+ + {{ recordsStats.total_records || 0 }} + {{ recordsStats.gunshot_records || 0 }} + {{ recordsStats.non_gunshot_records || 0 }} + {{ ((recordsStats.gunshot_ratio || 0) * 100).toFixed(2) }}% + +
+ +
+

最近枪声记录

+ + + + + + + + + + + + + + +
@@ -63,11 +99,20 @@ export default { isMonitoring: false, pollingInterval: null, API_BASE_URL: 'http://127.0.0.1:5000', // 默认后端地址,如需修改请在此处更改 + recordsStats: { + total_records: 0, + gunshot_records: 0, + non_gunshot_records: 0, + gunshot_ratio: 0 + }, + gunshotRecords: [] }; }, mounted() { this.initChart(); this.checkConnection(); + this.fetchRecordsStats(); + this.fetchGunshotRecords(); }, beforeUnmount() { if (this.pollingInterval) { @@ -230,6 +275,8 @@ export default { // 每 500ms 获取一次数据 this.pollingInterval = setInterval(() => { this.fetchSourceData(); + this.fetchRecordsStats(); + this.fetchGunshotRecords(); }, 500); }, stopMonitoring() { @@ -251,6 +298,28 @@ export default { console.error('获取声源数据失败:', error); this.connectionStatus = false; }); + }, + fetchRecordsStats() { + axios.get(`${this.API_BASE_URL}/location_records_stats`) + .then(response => { + this.recordsStats = response.data; + }) + .catch(error => { + console.error('获取记录统计失败:', error); + }); + }, + fetchGunshotRecords() { + axios.get(`${this.API_BASE_URL}/gunshot_records`) + .then(response => { + this.gunshotRecords = response.data.gunshot_records || []; + }) + .catch(error => { + console.error('获取枪声记录失败:', error); + }); + }, + formatTimestamp(timestamp) { + const date = new Date(timestamp * 1000); + return date.toLocaleTimeString(); } } } @@ -296,6 +365,8 @@ export default { display: flex; flex-direction: column; gap: 1.5rem; + max-height: 80vh; + overflow-y: auto; } .visualization-panel { @@ -317,6 +388,10 @@ export default { margin: 1rem 0; } +.gunshot-records, .gunshot-list { + margin-top: 1rem; +} + .app-footer { background-color: #304156; color: #a7b0bc; -- 2.34.1