更新缺陷 #16

Merged
psf4lx3ga merged 1 commits from wangjing into main 11 months ago

@ -24,6 +24,7 @@ import json # JSON数据格式处理
import os # 操作系统接口,文件系统操作
import sys # 系统相关参数和函数
import traceback # 异常追踪信息
import struct # 二进制数据打包和解包
from machine import UART, Timer, PWM # K210硬件接口串口、定时器、PWM
from fpioa_manager import fm # K210引脚管理器
from Maix import GPIO, MIC_ARRAY as mic # K210 GPIO和麦克风阵列
@ -1839,6 +1840,519 @@ class DevelopmentBoard:
except Exception as e:
self.logger.error(f"硬件关闭失败: {e}")
def run(self):
"""开发板主运行循环 - 实现完整的音频采集和传输流程"""
self.logger.info("=== 开发板主循环启动 ===")
self.logger.info(f"当前模式: {self.current_mode.value}")
try:
# 确保网络连接正常
if not self._ensure_network_connection():
self.logger.error("网络连接失败,无法启动主循环")
return
# 主循环
while self.running:
try:
if self.current_mode == SystemMode.RECORDING:
self._audio_recording_loop()
elif self.current_mode == SystemMode.LOCATING:
self._location_processing_loop()
elif self.current_mode == SystemMode.ERROR:
self._error_recovery_loop()
elif self.current_mode == SystemMode.SHUTDOWN:
break
else:
time.sleep(0.1)
except Exception as e:
self.logger.error(f"主循环异常: {e}")
self._switch_mode(SystemMode.ERROR)
except KeyboardInterrupt:
self.logger.info("收到中断信号,准备关闭...")
except Exception as e:
self.logger.error(f"主循环严重异常: {e}")
finally:
self._cleanup_resources()
def _ensure_network_connection(self) -> bool:
"""确保网络连接正常"""
try:
# 检查WiFi连接
if self.wifi_status != WiFiStatus.CONNECTED:
if not self._connect_wifi():
return False
# 检查Socket连接
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._setup_socket_connections():
return False
return True
except Exception as e:
self.logger.error(f"网络连接检查失败: {e}")
return False
def _connect_wifi(self) -> bool:
"""连接WiFi网络"""
try:
self.logger.info(f"正在连接WiFi: {self.network_config.wifi_ssid}")
self.wifi_status = WiFiStatus.CONNECTING
# 初始化WiFi
self.nic = network.WLAN(network.STA_IF)
self.nic.active(True)
# 连接WiFi
self.nic.connect(self.network_config.wifi_ssid, self.network_config.wifi_password)
# 等待连接
start_time = time.time()
while not self.nic.isconnected():
if time.time() - start_time > self.network_config.connection_timeout:
raise WiFiConnectionError("WiFi连接超时", self.network_config.wifi_ssid)
time.sleep(1)
# 获取网络信息
ip_info = self.nic.ifconfig()
self.logger.info(f"WiFi连接成功 - IP: {ip_info[0]}")
self.wifi_status = WiFiStatus.CONNECTED
self.wifi_connection_attempts = 0
return True
except Exception as e:
self.wifi_connection_attempts += 1
self.logger.error(f"WiFi连接失败 (第{self.wifi_connection_attempts}次): {e}")
self.wifi_status = WiFiStatus.ERROR
if self.wifi_connection_attempts >= self.network_config.max_reconnect_attempts:
self.logger.error("WiFi连接尝试次数已达上限")
return False
return False
def _setup_socket_connections(self) -> bool:
"""建立Socket连接"""
try:
self.logger.info("正在建立Socket连接...")
self.connection_status = ConnectionStatus.CONNECTING
# 创建音频数据Socket
self.audio_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.audio_socket.settimeout(self.network_config.socket_timeout)
self.audio_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_audio))
# 创建指令Socket
self.cmd_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.cmd_socket.settimeout(self.network_config.socket_timeout)
self.cmd_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_cmd))
# 创建定位数据Socket
self.location_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.location_socket.settimeout(self.network_config.socket_timeout)
self.location_socket.connect((self.network_config.pc_ip, self.network_config.pc_port_location))
self.connection_status = ConnectionStatus.CONNECTED
self.socket_connection_attempts = 0
self.logger.info("Socket连接建立成功")
return True
except Exception as e:
self.socket_connection_attempts += 1
self.logger.error(f"Socket连接失败 (第{self.socket_connection_attempts}次): {e}")
self.connection_status = ConnectionStatus.ERROR
# 关闭失败的Socket
self._close_sockets()
if self.socket_connection_attempts >= self.network_config.max_reconnect_attempts:
self.logger.error("Socket连接尝试次数已达上限")
return False
return False
def _close_sockets(self):
"""关闭所有Socket连接"""
try:
for socket_name, sock in [('audio', self.audio_socket),
('cmd', self.cmd_socket),
('location', self.location_socket)]:
if sock:
try:
sock.close()
self.logger.debug(f"关闭{socket_name} Socket")
except Exception as e:
self.logger.error(f"关闭{socket_name} Socket失败: {e}")
self.audio_socket = None
self.cmd_socket = None
self.location_socket = None
except Exception as e:
self.logger.error(f"关闭Socket失败: {e}")
def _audio_recording_loop(self):
"""音频录音循环 - 第一阶段的核心实现"""
try:
self.logger.debug("开始音频录音循环")
# 1. 采集麦克风阵列音频数据
audio_data = self._capture_audio_data()
if audio_data is None:
self.logger.warning("音频数据采集失败")
return
# 2. 音频预处理
processed_audio = self._preprocess_audio(audio_data)
if processed_audio is None:
self.logger.warning("音频预处理失败")
return
# 3. 网络传输到PC服务器
if not self._send_audio_data(processed_audio):
self.logger.error("音频数据发送失败")
return
# 4. 监听PC端指令
self._listen_for_commands()
# 5. 发送心跳包
self._send_heartbeat_if_needed()
# 6. 更新性能统计
self.performance_monitor.increment('audio_packets_sent')
# 控制循环频率
time.sleep(self.system_config.location_update_interval)
except Exception as e:
self.logger.error(f"音频录音循环异常: {e}")
self.performance_monitor.increment('errors')
def _capture_audio_data(self) -> Optional[bytes]:
"""采集麦克风阵列音频数据"""
try:
# 使用麦克风阵列采集音频
# 假设mic.record()返回原始音频数据
audio_data = mic.record(self.audio_config.chunk_size)
if audio_data is None or len(audio_data) == 0:
self.logger.warning("麦克风阵列返回空数据")
return None
# 转换为字节格式
if isinstance(audio_data, (list, tuple)):
# 如果是数值列表,转换为字节
audio_bytes = b''
for sample in audio_data:
audio_bytes += struct.pack('<h', int(sample))
return audio_bytes
elif isinstance(audio_data, bytes):
return audio_data
else:
self.logger.warning(f"未知的音频数据格式: {type(audio_data)}")
return None
except Exception as e:
self.logger.error(f"音频数据采集失败: {e}")
return None
def _preprocess_audio(self, audio_data: bytes) -> Optional[bytes]:
"""音频预处理"""
try:
# 1. 数据有效性检查
if len(audio_data) < self.audio_config.chunk_size * 2: # 假设16位采样
self.logger.warning("音频数据长度不足")
return None
# 2. 音频格式转换(如果需要)
# 这里可以添加音频格式转换逻辑
# 3. 添加时间戳
timestamp = time.time()
header = struct.pack('<d', timestamp) # 8字节时间戳
# 4. 组合数据
processed_data = header + audio_data
return processed_data
except Exception as e:
self.logger.error(f"音频预处理失败: {e}")
return None
def _send_audio_data(self, audio_data: bytes) -> bool:
"""发送音频数据到PC服务器"""
try:
if self.audio_socket is None:
self.logger.error("音频Socket未连接")
return False
# 发送数据
self.audio_socket.send(audio_data)
self.logger.debug(f"音频数据发送成功: {len(audio_data)} bytes")
return True
except socket.error as e:
self.logger.error(f"音频数据发送失败: {e}")
self.connection_status = ConnectionStatus.ERROR
return False
except Exception as e:
self.logger.error(f"音频数据发送异常: {e}")
return False
def _listen_for_commands(self):
"""监听PC端指令"""
try:
if self.cmd_socket is None:
return
# 设置非阻塞模式
self.cmd_socket.settimeout(0.01) # 10ms超时
try:
data = self.cmd_socket.recv(1024)
if data:
command = data.decode('utf-8').strip()
self.logger.info(f"收到PC端指令: {command}")
# 处理指令
self._process_command(command)
except socket.timeout:
# 超时是正常的,继续循环
pass
except Exception as e:
self.logger.error(f"指令接收异常: {e}")
except Exception as e:
self.logger.error(f"指令监听失败: {e}")
def _process_command(self, command: str):
"""处理PC端指令"""
try:
if command == "START_LOCATION":
self.logger.info("收到切换到定位模式指令")
self._switch_mode(SystemMode.LOCATING)
elif command == "STOP_LOCATION":
self.logger.info("收到停止定位模式指令")
self._switch_mode(SystemMode.RECORDING)
elif command == "SHUTDOWN":
self.logger.info("收到关闭系统指令")
self._switch_mode(SystemMode.SHUTDOWN)
elif command == "HEARTBEAT":
self.logger.debug("收到心跳包")
self._send_heartbeat()
else:
self.logger.warning(f"未知指令: {command}")
except Exception as e:
self.logger.error(f"指令处理失败: {e}")
def _send_heartbeat_if_needed(self):
"""发送心跳包(如果需要)"""
try:
current_time = time.time()
if current_time - self.last_heartbeat > self.system_config.heartbeat_interval:
self._send_heartbeat()
self.last_heartbeat = current_time
except Exception as e:
self.logger.error(f"心跳包发送失败: {e}")
def _send_heartbeat(self):
"""发送心跳包"""
try:
if self.cmd_socket is None:
return
heartbeat_data = {
'type': 'heartbeat',
'timestamp': time.time(),
'mode': self.current_mode.value,
'status': self.system_status,
'uptime': time.time() - self.start_time,
'memory': gc.mem_free(),
'error_count': self.error_count
}
heartbeat_json = json.dumps(heartbeat_data)
self.cmd_socket.send(heartbeat_json.encode('utf-8'))
self.logger.debug("心跳包发送成功")
except Exception as e:
self.logger.error(f"心跳包发送失败: {e}")
def _location_processing_loop(self):
"""定位处理循环 - 第二阶段的核心实现"""
try:
self.logger.debug("开始定位处理循环")
# 1. 采集麦克风阵列数据
audio_data = self._capture_audio_data()
if audio_data is None:
return
# 2. 声源定位计算
location_data = self._calculate_source_location(audio_data)
if location_data is None:
return
# 3. 发送定位数据到PC服务器
if not self._send_location_data(location_data):
self.logger.error("定位数据发送失败")
return
# 4. 监听PC端指令
self._listen_for_commands()
# 5. 更新性能统计
self.performance_monitor.increment('location_packets_sent')
# 控制循环频率
time.sleep(self.system_config.location_update_interval)
except Exception as e:
self.logger.error(f"定位处理循环异常: {e}")
self.performance_monitor.increment('errors')
def _calculate_source_location(self, audio_data: bytes) -> Optional[LocationData]:
"""计算声源位置"""
try:
# 创建声源定位器
locator = SoundSourceLocator(self.hardware_config)
# 将字节数据转换为数值列表
audio_values = self._bytes_to_audio_values(audio_data)
# 进行声源定位
location = locator.process_audio_frame(audio_values)
if location:
self.logger.debug(f"声源定位成功: X={location.x:.2f}, Y={location.y:.2f}")
return location
else:
self.logger.debug("声源定位失败")
return None
except Exception as e:
self.logger.error(f"声源定位计算失败: {e}")
return None
def _bytes_to_audio_values(self, audio_bytes: bytes) -> List[float]:
"""将字节数据转换为音频数值列表"""
try:
audio_values = []
# 假设音频数据是16位整数格式
for i in range(0, len(audio_bytes), 2):
if i + 1 < len(audio_bytes):
sample = struct.unpack('<h', audio_bytes[i:i+2])[0]
audio_values.append(float(sample) / 32768.0) # 归一化到[-1, 1]
return audio_values
except Exception as e:
self.logger.error(f"音频数据转换失败: {e}")
return []
def _send_location_data(self, location_data: LocationData) -> bool:
"""发送定位数据到PC服务器"""
try:
if self.location_socket is None:
self.logger.error("定位Socket未连接")
return False
# 转换为字符串格式发送
location_str = f"{location_data.x:.3f},{location_data.y:.3f},{location_data.strength:.3f},{location_data.angle:.3f}"
self.location_socket.send(location_str.encode('utf-8'))
self.logger.debug(f"定位数据发送成功: {location_str}")
return True
except socket.error as e:
self.logger.error(f"定位数据发送失败: {e}")
self.connection_status = ConnectionStatus.ERROR
return False
except Exception as e:
self.logger.error(f"定位数据发送异常: {e}")
return False
def _error_recovery_loop(self):
"""错误恢复循环"""
try:
self.logger.info("进入错误恢复模式")
# 尝试重新连接网络
if self._ensure_network_connection():
self.logger.info("网络连接恢复成功")
self._switch_mode(SystemMode.RECORDING)
else:
self.logger.warning("网络连接恢复失败,继续尝试...")
time.sleep(5) # 等待5秒后重试
except Exception as e:
self.logger.error(f"错误恢复失败: {e}")
def _cleanup_resources(self):
"""清理资源"""
try:
self.logger.info("开始清理资源...")
# 关闭Socket连接
self._close_sockets()
# 关闭WiFi
if self.nic:
self.nic.active(False)
# 关闭硬件组件
self._shutdown_hardware()
# 停止定时器
if self.heartbeat_timer:
self.heartbeat_timer.stop()
if self.health_check_timer:
self.health_check_timer.stop()
self.logger.info("资源清理完成")
except Exception as e:
self.logger.error(f"资源清理失败: {e}")
def _get_mic_direction(self) -> Optional[LocationData]:
"""获取麦克风方向 - 用于测试麦克风阵列功能"""
try:
# 采集一小段音频数据进行测试
test_audio = self._capture_audio_data()
if test_audio is None:
return None
# 转换为音频数值
audio_values = self._bytes_to_audio_values(test_audio)
if not audio_values:
return None
# 使用声源定位器进行测试
locator = SoundSourceLocator(self.hardware_config)
test_location = locator.process_audio_frame(audio_values)
return test_location
except Exception as e:
self.logger.error(f"麦克风方向测试失败: {e}")
return None
# ========== 声源定位核心算法 ==========
class KalmanFilter:

@ -197,6 +197,164 @@ class NetworkConfig:
raise ValueError("超时时间必须大于0")
return True
@dataclass
class LocationRecord:
"""声源定位记录数据结构"""
location_data: LocationData
audio_data: bytes
timestamp: float
gunshot_detected: bool = False
gunshot_confidence: float = 0.0
gunshot_label: str = ""
is_saved: bool = False
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'location': {
'x': self.location_data.x,
'y': self.location_data.y,
'strength': self.location_data.strength,
'angle': self.location_data.angle,
'confidence': self.location_data.confidence,
'timestamp': self.location_data.timestamp
},
'audio_data_size': len(self.audio_data),
'timestamp': self.timestamp,
'gunshot_detected': self.gunshot_detected,
'gunshot_confidence': self.gunshot_confidence,
'gunshot_label': self.gunshot_label,
'is_saved': self.is_saved
}
class LocationDataRecorder:
"""声源定位数据记录器 - 管理定位数据和音频数据的记录"""
def __init__(self, save_directory: str = "location_records"):
self.save_directory = save_directory
self.logger = setup_logging("LocationDataRecorder")
self.records = [] # 内存中的记录
self.max_memory_records = 1000 # 最大内存记录数
self.gunshot_records = [] # 枪声记录
self.non_gunshot_records = [] # 非枪声记录
# 确保保存目录存在
os.makedirs(save_directory, exist_ok=True)
os.makedirs(os.path.join(save_directory, "gunshot"), exist_ok=True)
os.makedirs(os.path.join(save_directory, "non_gunshot"), exist_ok=True)
self.logger.info(f"声源定位数据记录器初始化完成,保存目录: {save_directory}")
def add_record(self, location_data: LocationData, audio_data: bytes) -> LocationRecord:
"""添加新的定位记录"""
try:
record = LocationRecord(
location_data=location_data,
audio_data=audio_data,
timestamp=time.time()
)
# 添加到内存记录
self.records.append(record)
# 限制内存记录数量
if len(self.records) > self.max_memory_records:
self.records = self.records[-self.max_memory_records:]
self.logger.debug(f"添加定位记录: X={location_data.x:.2f}, Y={location_data.y:.2f}")
return record
except Exception as e:
self.logger.error(f"添加定位记录失败: {e}")
return None
def process_gunshot_detection(self, record: LocationRecord, audio_processor) -> bool:
"""处理枪声检测"""
try:
# 使用音频处理器进行枪声识别
if audio_processor.add_audio_data(record.audio_data):
if audio_processor.should_process_recognition():
result = audio_processor.process_recognition()
if result and result.get('is_gunshot', False):
# 检测到枪声
record.gunshot_detected = True
record.gunshot_confidence = result.get('score', 0.0)
record.gunshot_label = result.get('label', 'gunshot')
# 保存枪声记录
self._save_gunshot_record(record)
self.gunshot_records.append(record)
self.logger.warning(f"检测到枪声!置信度: {record.gunshot_confidence:.4f}, "
f"位置: X={record.location_data.x:.2f}, Y={record.location_data.y:.2f}")
return True
else:
# 不是枪声,丢弃记录
record.gunshot_detected = False
record.gunshot_confidence = result.get('score', 0.0) if result else 0.0
record.gunshot_label = result.get('label', 'unknown') if result else 'unknown'
self.logger.debug(f"非枪声音频,丢弃定位数据: 置信度={record.gunshot_confidence:.4f}")
return False
return False
except Exception as e:
self.logger.error(f"枪声检测处理失败: {e}")
return False
def _save_gunshot_record(self, record: LocationRecord):
"""保存枪声记录"""
try:
timestamp_str = time.strftime("%Y%m%d_%H%M%S", time.localtime(record.timestamp))
filename = f"gunshot_{timestamp_str}_{record.gunshot_confidence:.4f}.json"
filepath = os.path.join(self.save_directory, "gunshot", filename)
# 保存记录信息
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(record.to_dict(), f, indent=2, ensure_ascii=False)
# 保存音频数据
audio_filename = f"gunshot_{timestamp_str}_{record.gunshot_confidence:.4f}.wav"
audio_filepath = os.path.join(self.save_directory, "gunshot", audio_filename)
# 这里可以添加音频文件保存逻辑
# 例如使用scipy.io.wavfile.write保存为WAV文件
record.is_saved = True
self.logger.info(f"枪声记录已保存: {filepath}")
except Exception as e:
self.logger.error(f"保存枪声记录失败: {e}")
def get_gunshot_records(self) -> List[LocationRecord]:
"""获取枪声记录"""
return self.gunshot_records.copy()
def get_recent_records(self, count: int = 100) -> List[LocationRecord]:
"""获取最近的记录"""
return self.records[-count:] if self.records else []
def get_statistics(self) -> Dict[str, Any]:
"""获取记录统计信息"""
try:
total_records = len(self.records)
gunshot_count = len(self.gunshot_records)
non_gunshot_count = total_records - gunshot_count
return {
'total_records': total_records,
'gunshot_records': gunshot_count,
'non_gunshot_records': non_gunshot_count,
'gunshot_ratio': gunshot_count / max(total_records, 1),
'save_directory': self.save_directory,
'max_memory_records': self.max_memory_records
}
except Exception as e:
self.logger.error(f"获取统计信息失败: {e}")
return {}
# ========== 日志配置 ==========
def setup_logging(log_level: str = "INFO", log_file: str = "pc_server.log") -> logging.Logger:
"""设置日志系统 - 配置分级日志记录和文件轮转
@ -1197,6 +1355,9 @@ class PCServer:
self.logger.error(traceback.format_exc())
self.current_mode = SystemMode.ERROR
raise
# 添加定位数据记录器
self.location_recorder = LocationDataRecorder()
def _load_recognition_config(self) -> Dict[str, Any]:
"""加载识别配置"""
@ -1468,7 +1629,7 @@ class PCServer:
raise
def _audio_processing_worker(self):
"""音频处理工作线程"""
"""音频处理工作线程 - 修改版本,保存音频数据用于定位记录"""
self.logger.info("音频处理线程开始运行")
try:
@ -1482,6 +1643,9 @@ class PCServer:
self.connection_status['audio'] = ConnectionStatus.DISCONNECTED
break
# 保存最近的音频数据用于定位记录
self.recent_audio_data = audio_data
# 更新性能统计
self.performance_stats['audio_packets_received'] += 1
@ -1590,29 +1754,71 @@ class PCServer:
self.logger.info("定位数据处理线程结束")
def _process_location_data(self, data: bytes):
"""处理定位数据"""
"""处理定位数据 - 修改后的版本,包含枪声检测和记录管理"""
try:
data_str = data.decode('utf-8').strip()
if not data_str:
return
# 解析数据
# 解析定位数据
location_data = self._parse_location_data(data_str)
if location_data:
if not location_data:
return
# 获取对应的音频数据(从开发板发送的音频数据)
audio_data = self._get_corresponding_audio_data(location_data.timestamp)
# 创建定位记录
record = self.location_recorder.add_record(location_data, audio_data)
if not record:
return
# 进行枪声检测
is_gunshot = self.location_recorder.process_gunshot_detection(record, self.audio_processor)
if is_gunshot:
# 检测到枪声,保留定位数据
self.logger.warning(f"检测到枪声!保留定位数据: X={location_data.x:.2f}, Y={location_data.y:.2f}")
# 应用卡尔曼滤波
filtered_data = self._apply_kalman_filter(location_data)
# 后处理(平滑、异常值剔除等)
processed_data = location_post_processor.process(filtered_data)
# 将数据放入队列
# 将数据放入队列用于可视化
self.location_queue.put(processed_data)
self.location_data_count += 1
# 添加到历史记录
self._add_to_history(processed_data)
self.logger.debug(f"接收定位数据(后处理): X={processed_data.x:.2f}, Y={processed_data.y:.2f}, "
f"强度={processed_data.strength:.2f}, 角度={processed_data.angle:.2f}")
self.logger.info(f"枪声定位数据已保存并显示: X={processed_data.x:.2f}, Y={processed_data.y:.2f}")
else:
# 不是枪声,丢弃定位数据
self.logger.debug(f"非枪声音频,丢弃定位数据: X={location_data.x:.2f}, Y={location_data.y:.2f}")
# 不添加到历史记录和可视化队列
except Exception as e:
self.logger.error(f"处理定位数据失败: {e}")
self.performance_stats['errors'] += 1
def _get_corresponding_audio_data(self, location_timestamp: float) -> bytes:
"""获取对应的音频数据"""
try:
# 这里需要实现从音频缓冲区获取对应时间戳的音频数据
# 简单实现:返回最近的音频数据
if hasattr(self, 'recent_audio_data') and self.recent_audio_data:
return self.recent_audio_data
else:
# 如果没有音频数据,返回空字节
return b''
except Exception as e:
self.logger.error(f"获取对应音频数据失败: {e}")
return b''
def _parse_location_data(self, data_str: str) -> Optional[LocationData]:
"""解析定位数据"""
try:
@ -1914,13 +2120,37 @@ class PCServer:
stats = {
"system_stats": self.get_system_status(),
"audio_processor_stats": self.audio_processor.get_performance_stats(),
"communication_stats": self.communication_manager.get_connection_stats()
"communication_stats": self.communication_manager.get_connection_stats(),
"location_records_stats": self.get_location_records_stats()
}
return jsonify(stats)
except Exception as e:
self.logger.error(f"获取性能统计失败: {e}")
return jsonify({"error": "获取统计失败"}), 500
@self.flask_app.route("/gunshot_records")
def get_gunshot_records():
"""获取枪声记录列表"""
try:
records = self.get_gunshot_records()
return jsonify({
"gunshot_records": records,
"count": len(records)
})
except Exception as e:
self.logger.error(f"获取枪声记录失败: {e}")
return jsonify({"error": "获取枪声记录失败"}), 500
@self.flask_app.route("/location_records_stats")
def get_location_records_stats():
"""获取定位记录统计信息"""
try:
stats = self.get_location_records_stats()
return jsonify(stats)
except Exception as e:
self.logger.error(f"获取定位记录统计失败: {e}")
return jsonify({"error": "获取定位记录统计失败"}), 500
self.logger.info("Flask路由设置完成")
except Exception as e:
@ -1963,6 +2193,23 @@ class PCServer:
self.logger.info("Flask服务器已停止")
except Exception as e:
self.logger.error(f"停止Flask服务器失败: {e}")
def get_location_records_stats(self) -> Dict[str, Any]:
"""获取定位记录统计信息"""
try:
return self.location_recorder.get_statistics()
except Exception as e:
self.logger.error(f"获取定位记录统计失败: {e}")
return {}
def get_gunshot_records(self) -> List[Dict[str, Any]]:
"""获取枪声记录列表"""
try:
records = self.location_recorder.get_gunshot_records()
return [record.to_dict() for record in records]
except Exception as e:
self.logger.error(f"获取枪声记录失败: {e}")
return []
def main():
"""主函数"""

@ -30,6 +30,42 @@
<el-descriptions-item label="角度">{{ sourceData.angle.toFixed(2) }}°</el-descriptions-item>
</el-descriptions>
</div>
<div class="gunshot-records">
<h3>枪声记录统计</h3>
<el-descriptions :column="1" border>
<el-descriptions-item label="总记录数">{{ recordsStats.total_records || 0 }}</el-descriptions-item>
<el-descriptions-item label="枪声记录数">{{ recordsStats.gunshot_records || 0 }}</el-descriptions-item>
<el-descriptions-item label="非枪声记录数">{{ recordsStats.non_gunshot_records || 0 }}</el-descriptions-item>
<el-descriptions-item label="枪声比例">{{ ((recordsStats.gunshot_ratio || 0) * 100).toFixed(2) }}%</el-descriptions-item>
</el-descriptions>
</div>
<div class="gunshot-list">
<h3>最近枪声记录</h3>
<el-table :data="gunshotRecords" style="width: 100%" max-height="200">
<el-table-column prop="timestamp" label="时间" width="120">
<template #default="scope">
{{ formatTimestamp(scope.row.timestamp) }}
</template>
</el-table-column>
<el-table-column prop="location.x" label="X坐标" width="80">
<template #default="scope">
{{ scope.row.location.x.toFixed(2) }}
</template>
</el-table-column>
<el-table-column prop="location.y" label="Y坐标" width="80">
<template #default="scope">
{{ scope.row.location.y.toFixed(2) }}
</template>
</el-table-column>
<el-table-column prop="gunshot_confidence" label="置信度" width="80">
<template #default="scope">
{{ (scope.row.gunshot_confidence * 100).toFixed(1) }}%
</template>
</el-table-column>
</el-table>
</div>
</div>
<div class="visualization-panel">
@ -63,11 +99,20 @@ export default {
isMonitoring: false,
pollingInterval: null,
API_BASE_URL: 'http://127.0.0.1:5000', //
recordsStats: {
total_records: 0,
gunshot_records: 0,
non_gunshot_records: 0,
gunshot_ratio: 0
},
gunshotRecords: []
};
},
mounted() {
this.initChart();
this.checkConnection();
this.fetchRecordsStats();
this.fetchGunshotRecords();
},
beforeUnmount() {
if (this.pollingInterval) {
@ -230,6 +275,8 @@ export default {
// 500ms
this.pollingInterval = setInterval(() => {
this.fetchSourceData();
this.fetchRecordsStats();
this.fetchGunshotRecords();
}, 500);
},
stopMonitoring() {
@ -251,6 +298,28 @@ export default {
console.error('获取声源数据失败:', error);
this.connectionStatus = false;
});
},
fetchRecordsStats() {
axios.get(`${this.API_BASE_URL}/location_records_stats`)
.then(response => {
this.recordsStats = response.data;
})
.catch(error => {
console.error('获取记录统计失败:', error);
});
},
fetchGunshotRecords() {
axios.get(`${this.API_BASE_URL}/gunshot_records`)
.then(response => {
this.gunshotRecords = response.data.gunshot_records || [];
})
.catch(error => {
console.error('获取枪声记录失败:', error);
});
},
formatTimestamp(timestamp) {
const date = new Date(timestamp * 1000);
return date.toLocaleTimeString();
}
}
}
@ -296,6 +365,8 @@ export default {
display: flex;
flex-direction: column;
gap: 1.5rem;
max-height: 80vh;
overflow-y: auto;
}
.visualization-panel {
@ -317,6 +388,10 @@ export default {
margin: 1rem 0;
}
.gunshot-records, .gunshot-list {
margin-top: 1rem;
}
.app-footer {
background-color: #304156;
color: #a7b0bc;

Loading…
Cancel
Save