|
|
#!/usr/bin/env python3
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
"""
|
|
|
系统综合测试脚本
|
|
|
用于验证各个模块是否正常工作
|
|
|
"""
|
|
|
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
import sys
|
|
|
import traceback
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
|
|
|
def test_opencv():
|
|
|
"""测试OpenCV是否正常工作"""
|
|
|
print("=" * 50)
|
|
|
print("测试 OpenCV...")
|
|
|
try:
|
|
|
print(f"OpenCV 版本: {cv2.__version__}")
|
|
|
|
|
|
# 测试摄像头
|
|
|
cap = cv2.VideoCapture(0)
|
|
|
if cap.isOpened():
|
|
|
print("✓ 摄像头可以正常打开")
|
|
|
ret, frame = cap.read()
|
|
|
if ret:
|
|
|
print(f"✓ 摄像头可以正常读取画面,分辨率: {frame.shape[1]}x{frame.shape[0]}")
|
|
|
else:
|
|
|
print("✗ 无法从摄像头读取画面")
|
|
|
cap.release()
|
|
|
else:
|
|
|
print("✗ 无法打开摄像头")
|
|
|
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"✗ OpenCV 测试失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def test_yolo_model():
|
|
|
"""测试YOLO模型是否正常工作"""
|
|
|
print("=" * 50)
|
|
|
print("测试 YOLO 模型...")
|
|
|
try:
|
|
|
from ultralytics import YOLO
|
|
|
print("✓ ultralytics 库导入成功")
|
|
|
|
|
|
# 尝试加载模型
|
|
|
model = YOLO('yolov8n.pt')
|
|
|
print("✓ YOLOv8n 模型加载成功")
|
|
|
|
|
|
# 创建一个测试图像
|
|
|
test_image = np.zeros((640, 480, 3), dtype=np.uint8)
|
|
|
results = model(test_image, verbose=False)
|
|
|
print("✓ YOLO 推理测试成功")
|
|
|
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"✗ YOLO 模型测试失败: {e}")
|
|
|
traceback.print_exc()
|
|
|
return False
|
|
|
|
|
|
def test_modules():
|
|
|
"""测试自定义模块"""
|
|
|
print("=" * 50)
|
|
|
print("测试自定义模块...")
|
|
|
try:
|
|
|
# 测试配置模块
|
|
|
from src import config
|
|
|
print("✓ config 模块导入成功")
|
|
|
|
|
|
# 测试距离计算模块
|
|
|
from src import DistanceCalculator
|
|
|
calculator = DistanceCalculator()
|
|
|
test_bbox = [100, 100, 200, 400] # 测试边界框
|
|
|
distance = calculator.get_distance(test_bbox)
|
|
|
print(f"✓ distance_calculator 模块测试成功,测试距离: {calculator.format_distance(distance)}")
|
|
|
|
|
|
# 测试人体检测模块
|
|
|
from src import PersonDetector
|
|
|
detector = PersonDetector()
|
|
|
print("✓ person_detector 模块导入成功")
|
|
|
|
|
|
# 测试地图管理器
|
|
|
from src import MapManager
|
|
|
map_manager = MapManager(
|
|
|
api_key=config.GAODE_API_KEY,
|
|
|
camera_lat=config.CAMERA_LATITUDE,
|
|
|
camera_lng=config.CAMERA_LONGITUDE
|
|
|
)
|
|
|
print("✓ map_manager 模块导入成功")
|
|
|
|
|
|
# 测试手机连接器
|
|
|
from src import MobileConnector
|
|
|
mobile_connector = MobileConnector(port=8081) # 使用不同端口避免冲突
|
|
|
print("✓ mobile_connector 模块导入成功")
|
|
|
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"✗ 自定义模块测试失败: {e}")
|
|
|
traceback.print_exc()
|
|
|
return False
|
|
|
|
|
|
def test_flask():
|
|
|
"""测试Flask是否安装"""
|
|
|
print("=" * 50)
|
|
|
print("测试 Flask 环境...")
|
|
|
try:
|
|
|
import flask
|
|
|
print(f"✓ Flask 导入成功,版本: {flask.__version__}")
|
|
|
|
|
|
# 测试Web服务器模块
|
|
|
from src import WebServer
|
|
|
print("✓ WebServer 模块导入成功")
|
|
|
|
|
|
return True
|
|
|
except ImportError:
|
|
|
print("✗ Flask 未安装,Web功能不可用")
|
|
|
return False
|
|
|
except Exception as e:
|
|
|
print(f"✗ Flask 测试失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def test_web_apis(base_url="http://127.0.0.1:5000"):
|
|
|
"""测试Web API接口(仅在服务器运行时测试)"""
|
|
|
print("=" * 50)
|
|
|
print("测试Web API接口...")
|
|
|
|
|
|
try:
|
|
|
# 测试主页面
|
|
|
response = requests.get(f"{base_url}/", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
print("✓ 主页面访问正常")
|
|
|
|
|
|
# 测试人员数据API
|
|
|
response = requests.get(f"{base_url}/api/get_persons_data", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
print(f"✓ 人员数据API正常: {len(data)} 个人员")
|
|
|
|
|
|
# 测试调试信息API
|
|
|
response = requests.get(f"{base_url}/api/debug_info", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
data = response.json()
|
|
|
print(f"✓ 调试信息API正常")
|
|
|
print(f" 摄像头状态: {'在线' if data.get('camera_active') else '离线'}")
|
|
|
|
|
|
# 测试手机相关API
|
|
|
response = requests.get(f"{base_url}/api/mobile/devices", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
devices = response.json()
|
|
|
print(f"✓ 手机设备API正常: {len(devices)} 个设备")
|
|
|
|
|
|
# 测试手机端页面
|
|
|
response = requests.get(f"{base_url}/mobile/mobile_client.html", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
print(f"✓ 手机端页面可访问")
|
|
|
|
|
|
# 🛤️ 测试轨迹模拟API
|
|
|
test_trajectory_simulation_apis(base_url)
|
|
|
|
|
|
return True
|
|
|
else:
|
|
|
print("✗ Web服务器未响应")
|
|
|
return False
|
|
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
|
print("⚠️ Web服务器未运行,跳过API测试")
|
|
|
return True # 不算失败,因为服务器可能没启动
|
|
|
except Exception as e:
|
|
|
print(f"✗ Web API测试失败: {e}")
|
|
|
return False
|
|
|
|
|
|
def test_trajectory_simulation_apis(base_url):
|
|
|
"""测试轨迹模拟相关API"""
|
|
|
print("\n🛤️ 测试轨迹模拟API...")
|
|
|
|
|
|
try:
|
|
|
# 测试模拟状态API
|
|
|
response = requests.get(f"{base_url}/api/test/simulation_status", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
status_data = response.json()
|
|
|
print(f"✓ 模拟状态API正常: 状态={status_data.get('simulation_active', False)}")
|
|
|
|
|
|
# 测试启动模拟API
|
|
|
simulation_data = {
|
|
|
"type": "circle",
|
|
|
"drone_count": 1,
|
|
|
"speed": 2
|
|
|
}
|
|
|
response = requests.post(
|
|
|
f"{base_url}/api/test/start_simulation",
|
|
|
json=simulation_data,
|
|
|
timeout=5
|
|
|
)
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
print(f"✓ 启动模拟API正常: {result.get('message', '无消息')}")
|
|
|
|
|
|
# 等待一下让模拟运行
|
|
|
time.sleep(3)
|
|
|
|
|
|
# 再次检查状态
|
|
|
response = requests.get(f"{base_url}/api/test/simulation_status", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
status_data = response.json()
|
|
|
if status_data.get('simulation_active'):
|
|
|
print(f"✓ 模拟正在运行: {len(status_data.get('simulated_drones', []))} 架无人机")
|
|
|
else:
|
|
|
print("⚠️ 模拟未成功启动")
|
|
|
|
|
|
# 测试停止模拟API
|
|
|
response = requests.post(f"{base_url}/api/test/stop_simulation", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
print(f"✓ 停止模拟API正常: {result.get('message', '无消息')}")
|
|
|
|
|
|
# 测试轨迹模拟测试页面
|
|
|
response = requests.get(f"{base_url}/trajectory_simulation_test.html", timeout=5)
|
|
|
if response.status_code == 200:
|
|
|
print("✓ 轨迹模拟测试页面可访问")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"⚠️ 轨迹模拟API测试失败: {e}")
|
|
|
# 不抛出异常,因为这只是测试功能
|
|
|
|
|
|
def main():
|
|
|
"""主测试函数"""
|
|
|
print("🔧 开始系统综合测试...")
|
|
|
print("测试将验证所有必要的组件是否正常工作")
|
|
|
|
|
|
tests = [
|
|
|
("OpenCV", test_opencv),
|
|
|
("YOLO模型", test_yolo_model),
|
|
|
("自定义模块", test_modules),
|
|
|
("Flask环境", test_flask),
|
|
|
("Web API", test_web_apis),
|
|
|
]
|
|
|
|
|
|
results = {}
|
|
|
|
|
|
for test_name, test_func in tests:
|
|
|
print(f"\n🧪 开始测试: {test_name}")
|
|
|
results[test_name] = test_func()
|
|
|
|
|
|
# 显示测试结果摘要
|
|
|
print("\n" + "=" * 50)
|
|
|
print("📊 测试结果摘要:")
|
|
|
print("=" * 50)
|
|
|
|
|
|
passed = 0
|
|
|
total = len(tests)
|
|
|
|
|
|
for test_name, result in results.items():
|
|
|
status = "✓ 通过" if result else "✗ 失败"
|
|
|
print(f"{test_name:<15}: {status}")
|
|
|
if result:
|
|
|
passed += 1
|
|
|
|
|
|
print(f"\n总体结果: {passed}/{total} 测试通过")
|
|
|
|
|
|
if passed >= total - 1: # 允许Web API测试失败(因为可能没启动服务器)
|
|
|
print("🎉 系统基本功能正常!")
|
|
|
print("\n📖 使用建议:")
|
|
|
print(" 1. 运行 'python run.py' 选择运行模式")
|
|
|
print(" 2. 使用Web模式获得最佳体验")
|
|
|
print(" 3. 配置摄像头位置获得准确的地图显示")
|
|
|
else:
|
|
|
print("⚠️ 系统存在问题,请检查相关组件")
|
|
|
print("\n🔧 建议操作:")
|
|
|
print(" 1. 重新运行 'python tools/install.py' 安装依赖")
|
|
|
print(" 2. 检查requirements.txt中的依赖版本")
|
|
|
print(" 3. 确认摄像头设备连接正常")
|
|
|
|
|
|
print("\n测试完成!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() |