You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

261 lines
9.7 KiB

import cv2
import time
import numpy as np
from src import PersonDetector, DistanceCalculator, MapManager, config
class RealTimePersonDistanceDetector:
def __init__(self):
self.detector = PersonDetector()
self.distance_calculator = DistanceCalculator()
self.cap = None
self.fps_counter = 0
self.fps_time = time.time()
self.current_fps = 0
# 初始化地图管理器
if config.ENABLE_MAP_DISPLAY:
self.map_manager = MapManager(
api_key=config.GAODE_API_KEY,
camera_lat=config.CAMERA_LATITUDE,
camera_lng=config.CAMERA_LONGITUDE
)
self.map_manager.set_camera_position(
config.CAMERA_LATITUDE,
config.CAMERA_LONGITUDE,
config.CAMERA_HEADING
)
print("🗺️ 地图管理器已初始化")
else:
self.map_manager = None
def initialize_camera(self):
"""初始化摄像头"""
self.cap = cv2.VideoCapture(config.CAMERA_INDEX)
if not self.cap.isOpened():
raise Exception(f"无法开启摄像头 {config.CAMERA_INDEX}")
# 设置摄像头参数
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, config.FRAME_WIDTH)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, config.FRAME_HEIGHT)
self.cap.set(cv2.CAP_PROP_FPS, config.FPS)
# 获取实际设置的参数
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
print(f"摄像头初始化成功:")
print(f" 分辨率: {actual_width}x{actual_height}")
print(f" 帧率: {actual_fps} FPS")
def calculate_fps(self):
"""计算实际帧率"""
self.fps_counter += 1
current_time = time.time()
if current_time - self.fps_time >= 1.0:
self.current_fps = self.fps_counter
self.fps_counter = 0
self.fps_time = current_time
def draw_info_panel(self, frame, person_count=0):
"""绘制信息面板"""
height, width = frame.shape[:2]
# 绘制顶部信息栏
info_height = 60
cv2.rectangle(frame, (0, 0), (width, info_height), (0, 0, 0), -1)
# 显示FPS
fps_text = f"FPS: {self.current_fps}"
cv2.putText(frame, fps_text, (10, 25), config.FONT, 0.6, (0, 255, 0), 2)
# 显示人员计数
person_text = f"Persons: {person_count}"
cv2.putText(frame, person_text, (150, 25), config.FONT, 0.6, (0, 255, 255), 2)
# 显示模型信息
model_text = self.detector.get_model_info()
cv2.putText(frame, model_text, (10, 45), config.FONT, 0.5, (255, 255, 255), 1)
# 显示操作提示
help_text = "Press 'q' to quit | 'c' to calibrate | 'r' to reset | 'm' to open map"
text_size = cv2.getTextSize(help_text, config.FONT, 0.5, 1)[0]
cv2.putText(frame, help_text, (width - text_size[0] - 10, 25),
config.FONT, 0.5, (255, 255, 0), 1)
# 显示地图状态
if self.map_manager:
map_status = "Map: ON"
cv2.putText(frame, map_status, (10, height - 10),
config.FONT, 0.5, (0, 255, 255), 1)
return frame
def calibrate_distance(self, detections):
"""距离校准模式"""
if len(detections) == 0:
print("未检测到人体,无法校准")
return
print("\n=== 距离校准模式 ===")
print("请确保画面中有一个人,并输入该人距离摄像头的真实距离")
try:
real_distance = float(input("请输入真实距离(厘米): "))
# 使用第一个检测到的人进行校准
detection = detections[0]
x1, y1, x2, y2, conf = detection
bbox_height = y2 - y1
# 更新参考参数
config.REFERENCE_DISTANCE = real_distance
config.REFERENCE_HEIGHT_PIXELS = bbox_height
# 重新初始化距离计算器
self.distance_calculator = DistanceCalculator()
print(f"校准完成!")
print(f"参考距离: {real_distance}cm")
print(f"参考像素高度: {bbox_height}px")
except ValueError:
print("输入无效,校准取消")
except Exception as e:
print(f"校准失败: {e}")
def process_frame(self, frame):
"""处理单帧图像"""
# 检测人体
detections = self.detector.detect_persons(frame)
# 计算距离并更新地图位置
distances = []
if self.map_manager:
self.map_manager.clear_persons()
for i, detection in enumerate(detections):
bbox = detection[:4] # [x1, y1, x2, y2]
x1, y1, x2, y2 = bbox
distance = self.distance_calculator.get_distance(bbox)
distance_str = self.distance_calculator.format_distance(distance)
distances.append(distance_str)
# 更新地图上的人员位置
if self.map_manager:
# 计算人体中心点
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
# 将距离从厘米转换为米
distance_meters = distance / 100.0
# 添加到地图
self.map_manager.add_person_position(
center_x, center_y, distance_meters,
frame.shape[1], frame.shape[0], # width, height
f"P{i+1}"
)
# 绘制检测结果
frame = self.detector.draw_detections(frame, detections, distances)
# 绘制信息面板
frame = self.draw_info_panel(frame, len(detections))
# 计算FPS
self.calculate_fps()
return frame, detections
def run(self):
"""运行主程序"""
try:
print("正在初始化...")
self.initialize_camera()
print("系统启动成功!")
print("操作说明:")
print(" - 按 'q' 键退出程序")
print(" - 按 'c' 键进入距离校准模式")
print(" - 按 'r' 键重置为默认参数")
print(" - 按 's' 键保存当前帧")
if self.map_manager:
print(" - 按 'm' 键打开地图显示")
print(" - 按 'h' 键设置摄像头朝向")
print("\n开始实时检测...")
frame_count = 0
while True:
ret, frame = self.cap.read()
if not ret:
print("无法读取摄像头画面")
break
# 处理帧
processed_frame, detections = self.process_frame(frame)
# 显示结果
cv2.imshow('Real-time Person Distance Detection', processed_frame)
# 处理按键
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
print("用户退出程序")
break
elif key == ord('c'):
# 校准模式
self.calibrate_distance(detections)
elif key == ord('r'):
# 重置参数
print("重置为默认参数")
self.distance_calculator = DistanceCalculator()
elif key == ord('s'):
# 保存当前帧
filename = f"capture_{int(time.time())}.jpg"
cv2.imwrite(filename, processed_frame)
print(f"已保存截图: {filename}")
elif key == ord('m') and self.map_manager:
# 打开地图显示
print("正在打开地图...")
self.map_manager.open_map()
elif key == ord('h') and self.map_manager:
# 设置摄像头朝向
try:
heading = float(input("请输入摄像头朝向角度 (0-360°, 0为正北): "))
if 0 <= heading <= 360:
self.map_manager.update_camera_heading(heading)
else:
print("角度必须在0-360度之间")
except ValueError:
print("输入无效")
frame_count += 1
except KeyboardInterrupt:
print("\n程序被用户中断")
except Exception as e:
print(f"程序运行出错: {e}")
finally:
self.cleanup()
def cleanup(self):
"""清理资源"""
if self.cap:
self.cap.release()
cv2.destroyAllWindows()
print("资源已清理,程序结束")
def main():
"""主函数"""
print("=" * 50)
print("实时人体距离检测系统")
print("=" * 50)
detector = RealTimePersonDistanceDetector()
detector.run()
if __name__ == "__main__":
main()