fix: use web-digital-human as submodule

main
fred 2 weeks ago
parent a96095e481
commit b6cab3c218

@ -0,0 +1,246 @@
# RoboMaster无人机控制系统
一个集成了无人机控制和YOLO实时目标检测的智能控制系统。
## 🚀 功能特性
### 无人机控制
- ✅ 连接/断开无人机
- ✅ 起飞/降落/紧急降落
- ✅ 六自由度移动控制(前进、后退、左移、右移、上升、下降)
- ✅ 旋转控制(左转、右转)
- ✅ 悬停功能
- ✅ 侦察模式(视角控制)
- ✅ 自动巡航任务
- ✅ 任务停止功能
- ✅ 实时状态监控
### YOLO实时检测
- ✅ 无人机视频流支持
- ✅ 实时目标检测
- ✅ 检测结果可视化
- ✅ 检测统计信息
- ✅ 动态切换视频源和模型
- ✅ FPS监控
- ✅ WebSocket实时推送
### 前端界面
- ✅ 现代化响应式设计
- ✅ 实时设备状态显示
- ✅ 直观的控制按钮
- ✅ 无人机视频流显示
- ✅ 检测结果实时展示
- ✅ 连接状态指示器
## 📁 项目结构
```
robomaster_control/
├── __init__.py
├── api.py # FastAPI主接口
├── drone_controller.py # 无人机控制逻辑
├── yolo_stream.py # YOLO视频流处理
├── requirements.txt # 项目依赖
├── README.md # 项目说明
└── static/
└── index.html # 前端界面
```
## 🛠️ 安装和运行
### 1. 环境准备
```bash
# 创建虚拟环境
python -m venv robomaster_env
# 激活虚拟环境
# Windows:
robomaster_env\Scripts\activate
# Linux/Mac:
source robomaster_env/bin/activate
```
### 2. 安装依赖
```bash
pip install -r requirements.txt
```
### 3. 下载YOLO模型
```bash
# 自动下载YOLOv5s模型首次运行时会自动下载
python -c "import torch; torch.hub.load('ultralytics/yolov5', 'yolov5s')"
```
### 4. 启动服务
```bash
# 启动后端服务
uvicorn robomaster_control.api:app --reload --host 0.0.0.0 --port 8000
```
### 5. 访问界面
打开浏览器访问:`http://localhost:8000/static/index.html`
## 🔧 配置说明
### 视频源配置
- **本地摄像头**: 使用设备号(如 `0`, `1`, `2`
- **RTSP流**: 使用RTSP地址`rtsp://192.168.1.100:554/stream`
- **UDP流**: 使用UDP地址`udp://0.0.0.0:11111`
### YOLO模型配置
- **预训练模型**: `yolov5s.pt`, `yolov5m.pt`, `yolov5l.pt`, `yolov5x.pt`
- **自定义模型**: 使用自己训练的模型文件路径
## 🔌 API接口
### 无人机控制接口
| 接口 | 方法 | 说明 |
|------|------|------|
| `/drone/connect` | POST | 连接无人机 |
| `/drone/disconnect` | POST | 断开无人机 |
| `/drone/takeoff` | POST | 起飞 |
| `/drone/land` | POST | 降落 |
| `/drone/emergency_land` | POST | 紧急降落 |
| `/drone/move_forward` | POST | 前进 |
| `/drone/move_backward` | POST | 后退 |
| `/drone/move_left` | POST | 左移 |
| `/drone/move_right` | POST | 右移 |
| `/drone/move_up` | POST | 上升 |
| `/drone/move_down` | POST | 下降 |
| `/drone/rotate_left` | POST | 左转 |
| `/drone/rotate_right` | POST | 右转 |
| `/drone/hover` | POST | 悬停 |
| `/drone/cruise` | POST | 巡航 |
| `/drone/stop_mission` | POST | 停止任务 |
| `/drone/status` | GET | 获取状态 |
### YOLO视频流接口
| 接口 | 方法 | 说明 |
|------|------|------|
| `/ws/yolo/{stream_name}` | WebSocket | 视频流推送 |
| `/yolo/set_source/{stream_name}` | POST | 设置视频源 |
| `/yolo/streams` | GET | 获取流信息 |
| `/yolo/detection_summary/{stream_name}` | GET | 获取检测摘要 |
## 🎯 使用示例
### 1. 基础控制流程
```python
import requests
# 连接无人机
response = requests.post("http://localhost:8000/drone/connect")
print(response.json())
# 起飞
response = requests.post("http://localhost:8000/drone/takeoff")
print(response.json())
# 前进1米
response = requests.post("http://localhost:8000/drone/move_forward?distance=1.0")
print(response.json())
# 降落
response = requests.post("http://localhost:8000/drone/land")
print(response.json())
```
### 2. 切换YOLO配置
```python
import requests
# 切换无人机视频源
data = {
"source": "rtsp://192.168.1.100:554/stream",
"model_path": "yolov5s.pt"
}
response = requests.post("http://localhost:8000/yolo/set_source/drone", json=data)
print(response.json())
```
## 🔧 扩展开发
### 添加新的控制功能
1. 在对应的控制器类中添加新方法
2. 在 `api.py` 中添加新的API接口
3. 在前端界面中添加控制按钮
### 集成真实设备
1. 安装对应的SDK如RoboMaster SDK、Tello SDK
2. 在控制器中替换模拟代码为实际SDK调用
3. 根据设备特性调整控制参数
### 自定义YOLO模型
1. 训练自己的YOLO模型
2. 将模型文件放在项目目录中
3. 在前端界面中切换模型路径
## 🐛 故障排除
### 常见问题
1. **摄像头无法打开**
- 检查设备号是否正确
- 确认摄像头未被其他程序占用
- 尝试不同的设备号
2. **YOLO模型加载失败**
- 检查模型文件路径
- 确认模型文件完整性
- 检查PyTorch版本兼容性
3. **WebSocket连接失败**
- 检查后端服务是否正常运行
- 确认端口8000未被占用
- 检查防火墙设置
4. **设备连接失败**
- 检查网络连接
- 确认设备电源状态
- 验证SDK配置
## 📝 更新日志
### v1.0.0 (2024-01-01)
- ✅ 初始版本发布
- ✅ 基础无人机控制功能
- ✅ YOLO实时检测
- ✅ 现代化前端界面
## 🤝 贡献指南
欢迎提交Issue和Pull Request来改进这个项目
## 📄 许可证
MIT License
## 📞 联系方式
如有问题或建议,请通过以下方式联系:
- 提交GitHub Issue
- 发送邮件至:[your-email@example.com]
---
**注意**: 这是一个演示项目,实际使用时需要根据具体的硬件设备调整代码实现。

@ -0,0 +1,17 @@
"""
RoboMaster无人机控制系统
一个集成了无人机控制和YOLO实时目标检测的智能控制系统
"""
__version__ = "1.0.0"
__author__ = "AI Assistant"
__description__ = "RoboMaster无人机和无人车智能控制系统"
# 导入主要模块
from .drone_controller import RoboMasterController
from .yolo_stream import YOLOStream
__all__ = [
"RoboMasterController",
"YOLOStream"
]

@ -0,0 +1,248 @@
# robomaster_control/api.py
"""
RoboMaster无人机和无人车控制API接口
"""
from fastapi import FastAPI, WebSocket, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from robomaster_control.drone_controller import RoboMasterController
from robomaster_control.yolo_stream import YOLOStream
import json
import time
from typing import Dict, Any
app = FastAPI(title="RoboMaster控制系统", version="1.0.0")
# 允许跨域,方便前端本地调试
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 挂载静态文件
app.mount("/static", StaticFiles(directory="robomaster_control/static"), name="static")
# 控制器实例
drone = RoboMasterController()
# 多路YOLO流实例
yolo_streams: Dict[str, YOLOStream] = {
"drone": YOLOStream(model_path='yolov5s.pt', source=0, stream_id="drone"),
}
# ==================== 无人机控制接口 ====================
@app.post("/drone/connect")
def drone_connect(ip_address: str = "192.168.10.1", port: int = 8889):
"""连接无人机"""
return {"msg": drone.connect(ip_address, port)}
@app.post("/drone/disconnect")
def drone_disconnect():
"""断开无人机"""
return {"msg": drone.disconnect()}
@app.post("/drone/takeoff")
def drone_takeoff():
"""无人机起飞"""
return {"msg": drone.takeoff()}
@app.post("/drone/land")
def drone_land():
"""无人机降落"""
return {"msg": drone.land()}
@app.post("/drone/emergency_land")
def drone_emergency_land():
"""无人机紧急降落"""
return {"msg": drone.emergency_land()}
@app.post("/drone/move_forward")
def drone_move_forward(distance: float = 1.0):
"""无人机前进"""
return {"msg": drone.move_forward(distance)}
@app.post("/drone/move_backward")
def drone_move_backward(distance: float = 1.0):
"""无人机后退"""
return {"msg": drone.move_backward(distance)}
@app.post("/drone/move_left")
def drone_move_left(distance: float = 1.0):
"""无人机左移"""
return {"msg": drone.move_left(distance)}
@app.post("/drone/move_right")
def drone_move_right(distance: float = 1.0):
"""无人机右移"""
return {"msg": drone.move_right(distance)}
@app.post("/drone/move_up")
def drone_move_up(distance: float = 1.0):
"""无人机上升"""
return {"msg": drone.move_up(distance)}
@app.post("/drone/move_down")
def drone_move_down(distance: float = 1.0):
"""无人机下降"""
return {"msg": drone.move_down(distance)}
@app.post("/drone/rotate_left")
def drone_rotate_left(angle: float = 90.0):
"""无人机左转"""
return {"msg": drone.rotate_left(angle)}
@app.post("/drone/rotate_right")
def drone_rotate_right(angle: float = 90.0):
"""无人机右转"""
return {"msg": drone.rotate_right(angle)}
@app.post("/drone/reconnaissance/right")
def drone_reconnaissance_right():
"""无人机侦察-右转"""
return {"msg": drone.reconnaissance_right()}
@app.post("/drone/reconnaissance/stop")
def drone_reconnaissance_stop():
"""无人机侦察-停止"""
return {"msg": drone.reconnaissance_stop()}
@app.post("/drone/hover")
def drone_hover():
"""无人机悬停"""
return {"msg": drone.hover()}
@app.post("/drone/cruise")
def drone_cruise():
"""无人机巡航"""
return {"msg": drone.cruise()}
@app.post("/drone/stop_mission")
def drone_stop_mission():
"""停止无人机任务"""
return {"msg": drone.stop_mission()}
@app.post("/drone/set_flight_mode")
def drone_set_flight_mode(mode: str):
"""设置无人机飞行模式"""
return {"msg": drone.set_flight_mode(mode)}
@app.get("/drone/battery")
def drone_battery():
"""获取无人机电池状态"""
battery = drone.get_battery_status()
return {"battery": battery}
@app.get("/drone/attitude")
def drone_attitude():
"""获取无人机姿态信息"""
attitude = drone.get_attitude()
return {"attitude": attitude}
@app.get("/drone/status")
def drone_status():
"""获取无人机状态"""
return drone.get_status()
# ==================== YOLO视频流接口 ====================
@app.websocket("/ws/yolo/{stream_name}")
async def websocket_yolo(websocket: WebSocket, stream_name: str):
"""
YOLO识别视频流WebSocket接口
Args:
stream_name: 流名称 (drone/car)
"""
if stream_name not in yolo_streams:
await websocket.close(code=4004, reason="Invalid stream name")
return
await yolo_streams[stream_name].stream(websocket)
@app.post("/yolo/set_source/{stream_name}")
async def set_yolo_source(stream_name: str, request: Request):
"""
设置YOLO视频流源和模型权重
Args:
stream_name: 流名称 (drone/car)
"""
if stream_name not in yolo_streams:
raise HTTPException(status_code=400, detail=f"Invalid stream name: {stream_name}")
try:
data = await request.json()
source = data.get("source", 0)
model_path = data.get("model_path", "yolov5s.pt")
# 释放旧流
yolo_streams[stream_name].release()
# 创建新流
yolo_streams[stream_name] = YOLOStream(
model_path=model_path,
source=source,
stream_id=stream_name
)
return {"msg": f"{stream_name}流已切换视频源为{source},模型为{model_path}"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"设置失败: {str(e)}")
@app.get("/yolo/streams")
def get_yolo_streams():
"""获取所有YOLO流信息"""
streams_info = {}
for name, stream in yolo_streams.items():
streams_info[name] = stream.get_stream_info()
return streams_info
@app.get("/yolo/detection_summary/{stream_name}")
def get_detection_summary(stream_name: str):
"""获取检测结果摘要"""
if stream_name not in yolo_streams:
raise HTTPException(status_code=400, detail=f"Invalid stream name: {stream_name}")
return yolo_streams[stream_name].get_detection_summary()
# ==================== 系统状态接口 ====================
@app.get("/system/status")
def system_status():
"""获取系统整体状态"""
return {
"timestamp": time.time(),
"drone": drone.get_status(),
"yolo_streams": {name: stream.get_stream_info() for name, stream in yolo_streams.items()}
}
@app.get("/system/health")
def system_health():
"""系统健康检查"""
return {
"status": "healthy",
"timestamp": time.time(),
"drone_connected": drone.connected,
"yolo_streams_count": len(yolo_streams)
}
@app.get("/")
def root():
"""根路径,返回系统信息"""
return {
"message": "RoboMaster无人机控制系统",
"version": "1.0.0",
"endpoints": {
"drone": "/drone/*",
"yolo": "/yolo/*",
"websocket": "/ws/yolo/{stream_name}",
"static": "/static/index.html",
"docs": "/docs"
}
}

@ -0,0 +1,139 @@
# robomaster_control/config.py
"""
RoboMaster控制系统配置文件
"""
import os
from typing import Dict, Any
class Config:
"""配置管理类"""
# 无人机配置
DRONE_CONFIG = {
"default_ip": "192.168.10.1",
"default_port": 8889,
"connection_timeout": 10,
"sn": "0TQZJ6B0050H87", # 无人机序列号
"conn_type": "sta", # 连接类型: sta/ap
"max_altitude": 120, # 最大飞行高度(米)
"max_speed": 15, # 最大飞行速度(m/s)
"hover_height": 1.5, # 悬停高度(米)
"landing_speed": 0.5, # 降落速度(m/s)
"takeoff_speed": 1.0 # 起飞速度(m/s)
}
# YOLO配置
YOLO_CONFIG = {
"default_model": "yolov5s.pt",
"confidence_threshold": 0.5,
"nms_threshold": 0.4,
"max_detections": 100,
"device": "auto", # auto/cpu/cuda
"stream_fps": 10,
"detection_fps": 30
}
# 视频流配置
STREAM_CONFIG = {
"drone_source": 0, # 无人机视频源
"car_source": 1, # 无人车视频源
"frame_width": 640,
"frame_height": 480,
"jpeg_quality": 85,
"buffer_size": 100
}
# 系统配置
SYSTEM_CONFIG = {
"host": "0.0.0.0",
"port": 8000,
"debug": True,
"log_level": "INFO",
"cors_origins": ["*"],
"static_dir": "robomaster_control/static"
}
# 日志配置
LOG_CONFIG = {
"level": "INFO",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": "robomaster_control.log",
"max_size": 10 * 1024 * 1024, # 10MB
"backup_count": 5
}
@classmethod
def get_drone_config(cls) -> Dict[str, Any]:
"""获取无人机配置"""
return cls.DRONE_CONFIG.copy()
@classmethod
def get_yolo_config(cls) -> Dict[str, Any]:
"""获取YOLO配置"""
return cls.YOLO_CONFIG.copy()
@classmethod
def get_stream_config(cls) -> Dict[str, Any]:
"""获取视频流配置"""
return cls.STREAM_CONFIG.copy()
@classmethod
def get_system_config(cls) -> Dict[str, Any]:
"""获取系统配置"""
return cls.SYSTEM_CONFIG.copy()
@classmethod
def get_log_config(cls) -> Dict[str, Any]:
"""获取日志配置"""
return cls.LOG_CONFIG.copy()
@classmethod
def load_from_env(cls):
"""从环境变量加载配置"""
# 无人机配置
if os.getenv("DRONE_IP"):
cls.DRONE_CONFIG["default_ip"] = os.getenv("DRONE_IP")
if os.getenv("DRONE_PORT"):
cls.DRONE_CONFIG["default_port"] = int(os.getenv("DRONE_PORT"))
if os.getenv("DRONE_SN"):
cls.DRONE_CONFIG["sn"] = os.getenv("DRONE_SN")
# YOLO配置
if os.getenv("YOLO_MODEL"):
cls.YOLO_CONFIG["default_model"] = os.getenv("YOLO_MODEL")
if os.getenv("YOLO_CONFIDENCE"):
cls.YOLO_CONFIG["confidence_threshold"] = float(os.getenv("YOLO_CONFIDENCE"))
# 系统配置
if os.getenv("HOST"):
cls.SYSTEM_CONFIG["host"] = os.getenv("HOST")
if os.getenv("PORT"):
cls.SYSTEM_CONFIG["port"] = int(os.getenv("PORT"))
if os.getenv("DEBUG"):
cls.SYSTEM_CONFIG["debug"] = os.getenv("DEBUG").lower() == "true"
if os.getenv("LOG_LEVEL"):
cls.SYSTEM_CONFIG["log_level"] = os.getenv("LOG_LEVEL")
@classmethod
def get_connection_info(cls, device_type: str = "drone") -> Dict[str, Any]:
"""获取连接信息"""
if device_type.lower() == "drone":
return {
"ip": cls.DRONE_CONFIG["default_ip"],
"port": cls.DRONE_CONFIG["default_port"],
"sn": cls.DRONE_CONFIG["sn"],
"conn_type": cls.DRONE_CONFIG["conn_type"]
}
else:
raise ValueError(f"Unknown device type: {device_type}")
# 加载环境变量配置
Config.load_from_env()

@ -0,0 +1,609 @@
# robomaster_control/drone_controller.py
"""
无人机控制核心逻辑模块 - 集成RoboMaster SDK
"""
import time
import threading
from typing import Dict, Any, List, Tuple
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RoboMasterController:
def __init__(self):
"""
初始化无人机控制器
"""
self.connected = False
self.flying = False
self.drone = None # RoboMaster SDK实例
self.status = {
"battery": 100,
"altitude": 0,
"speed": {"x": 0, "y": 0, "z": 0},
"position": {"x": 0, "y": 0, "z": 0},
"camera_angle": {"pitch": 0, "yaw": 0, "roll": 0},
"flight_mode": "unknown",
"gps": {"lat": 0, "lng": 0},
"attitude": {"pitch": 0, "roll": 0, "yaw": 0}
}
self.mission_running = False
self.status_monitor_thread = None
# 飞行参数配置
self.flight_config = {
"max_altitude": 120, # 最大飞行高度(米)
"max_speed": 15, # 最大飞行速度(m/s)
"hover_height": 1.5, # 悬停高度(米)
"landing_speed": 0.5, # 降落速度(m/s)
"takeoff_speed": 1.0 # 起飞速度(m/s)
}
def connect(self, ip_address="192.168.10.1", port=8889):
"""
连接到RoboMaster无人机
Args:
ip_address: 无人机IP地址
port: 连接端口
"""
try:
# 尝试导入RoboMaster SDK
try:
from robomaster import robot
logger.info("RoboMaster SDK导入成功")
except ImportError:
logger.warning("RoboMaster SDK未安装使用模拟模式")
return self._simulate_connect()
# 创建机器人实例
self.drone = robot.Robot()
# 连接到无人机
logger.info(f"正在连接到无人机 {ip_address}:{port}")
self.drone.initialize(conn_type="sta", sn="0TQZJ6B0050H87")
# 获取无人机信息
self._get_drone_info()
self.connected = True
self._start_status_monitor()
logger.info("无人机连接成功")
return "已连接到RoboMaster无人机"
except Exception as e:
logger.error(f"连接无人机失败: {e}")
return f"连接失败: {str(e)}"
def _simulate_connect(self):
"""
模拟连接当SDK不可用时
"""
logger.info("使用模拟模式连接无人机")
self.connected = True
self._start_status_monitor()
return "已连接到无人机(模拟模式)"
def _get_drone_info(self):
"""
获取无人机基本信息
"""
try:
if self.drone:
# 获取SN号
sn = self.drone.get_sn()
logger.info(f"无人机SN: {sn}")
# 获取版本信息
version = self.drone.get_version()
logger.info(f"固件版本: {version}")
except Exception as e:
logger.warning(f"获取无人机信息失败: {e}")
def disconnect(self):
"""
断开无人机连接
"""
try:
if self.drone:
# 如果正在飞行,先降落
if self.flying:
logger.info("正在降落无人机...")
self._land_drone()
# 关闭连接
self.drone.close()
logger.info("无人机连接已关闭")
self.connected = False
self.flying = False
self.mission_running = False
return "已断开无人机连接"
except Exception as e:
logger.error(f"断开连接失败: {e}")
return f"断开连接失败: {str(e)}"
def takeoff(self):
"""
无人机起飞
"""
if not self.connected:
return "请先连接无人机"
try:
if self.drone:
# 使用RoboMaster SDK起飞
logger.info("开始起飞...")
self.drone.flight.takeoff().wait_for_completed()
self.flying = True
self.status["altitude"] = self.flight_config["hover_height"]
logger.info("起飞完成")
else:
# 模拟起飞
self.flying = True
self.status["altitude"] = self.flight_config["hover_height"]
logger.info("模拟起飞完成")
return "无人机已起飞"
except Exception as e:
logger.error(f"起飞失败: {e}")
return f"起飞失败: {str(e)}"
def land(self):
"""
无人机降落
"""
if not self.connected or not self.flying:
return "无人机未起飞"
try:
if self.drone:
# 使用RoboMaster SDK降落
logger.info("开始降落...")
self.drone.flight.land().wait_for_completed()
self.flying = False
self.status["altitude"] = 0
logger.info("降落完成")
else:
# 模拟降落
self.flying = False
self.status["altitude"] = 0
logger.info("模拟降落完成")
return "无人机已降落"
except Exception as e:
logger.error(f"降落失败: {e}")
return f"降落失败: {str(e)}"
def _land_drone(self):
"""
内部降落方法
"""
try:
if self.drone and self.flying:
self.drone.flight.land().wait_for_completed()
self.flying = False
self.status["altitude"] = 0
except Exception as e:
logger.error(f"内部降落失败: {e}")
def emergency_land(self):
"""
紧急降落
"""
if not self.connected:
return "请先连接无人机"
try:
if self.drone:
# 紧急降落
logger.warning("执行紧急降落!")
self.drone.flight.emergency_land()
self.flying = False
self.mission_running = False
self.status["altitude"] = 0
else:
# 模拟紧急降落
self.flying = False
self.mission_running = False
self.status["altitude"] = 0
return "紧急降落已执行"
except Exception as e:
logger.error(f"紧急降落失败: {e}")
return f"紧急降落失败: {str(e)}"
def move_forward(self, distance: float = 1.0):
"""
前进指定距离
Args:
distance: 前进距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
# 使用RoboMaster SDK前进
logger.info(f"前进 {distance}")
self.drone.flight.forward(distance).wait_for_completed()
self.status["position"]["x"] += distance
else:
# 模拟前进
time.sleep(distance / 2) # 模拟飞行时间
self.status["position"]["x"] += distance
return f"前进{distance}米完成"
except Exception as e:
logger.error(f"前进失败: {e}")
return f"前进失败: {str(e)}"
def move_backward(self, distance: float = 1.0):
"""
后退指定距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"后退 {distance}")
self.drone.flight.backward(distance).wait_for_completed()
self.status["position"]["x"] -= distance
else:
time.sleep(distance / 2)
self.status["position"]["x"] -= distance
return f"后退{distance}米完成"
except Exception as e:
logger.error(f"后退失败: {e}")
return f"后退失败: {str(e)}"
def move_left(self, distance: float = 1.0):
"""
左移指定距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"左移 {distance}")
self.drone.flight.left(distance).wait_for_completed()
self.status["position"]["y"] += distance
else:
time.sleep(distance / 2)
self.status["position"]["y"] += distance
return f"左移{distance}米完成"
except Exception as e:
logger.error(f"左移失败: {e}")
return f"左移失败: {str(e)}"
def move_right(self, distance: float = 1.0):
"""
右移指定距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"右移 {distance}")
self.drone.flight.right(distance).wait_for_completed()
self.status["position"]["y"] -= distance
else:
time.sleep(distance / 2)
self.status["position"]["y"] -= distance
return f"右移{distance}米完成"
except Exception as e:
logger.error(f"右移失败: {e}")
return f"右移失败: {str(e)}"
def move_up(self, distance: float = 1.0):
"""
上升指定距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"上升 {distance}")
self.drone.flight.up(distance).wait_for_completed()
self.status["altitude"] += distance
self.status["position"]["z"] += distance
else:
time.sleep(distance / 2)
self.status["altitude"] += distance
self.status["position"]["z"] += distance
return f"上升{distance}米完成"
except Exception as e:
logger.error(f"上升失败: {e}")
return f"上升失败: {str(e)}"
def move_down(self, distance: float = 1.0):
"""
下降指定距离
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"下降 {distance}")
self.drone.flight.down(distance).wait_for_completed()
self.status["altitude"] = max(0, self.status["altitude"] - distance)
self.status["position"]["z"] = max(0, self.status["position"]["z"] - distance)
else:
time.sleep(distance / 2)
self.status["altitude"] = max(0, self.status["altitude"] - distance)
self.status["position"]["z"] = max(0, self.status["position"]["z"] - distance)
return f"下降{distance}米完成"
except Exception as e:
logger.error(f"下降失败: {e}")
return f"下降失败: {str(e)}"
def rotate_left(self, angle: float = 90.0):
"""
左转指定角度
Args:
angle: 旋转角度
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"左转 {angle}")
self.drone.flight.rotate_left(angle).wait_for_completed()
self.status["camera_angle"]["yaw"] += angle
else:
time.sleep(abs(angle) / 90) # 模拟旋转时间
self.status["camera_angle"]["yaw"] += angle
return f"左转{angle}度完成"
except Exception as e:
logger.error(f"左转失败: {e}")
return f"左转失败: {str(e)}"
def rotate_right(self, angle: float = 90.0):
"""
右转指定角度
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info(f"右转 {angle}")
self.drone.flight.rotate_right(angle).wait_for_completed()
self.status["camera_angle"]["yaw"] -= angle
else:
time.sleep(abs(angle) / 90)
self.status["camera_angle"]["yaw"] -= angle
return f"右转{angle}度完成"
except Exception as e:
logger.error(f"右转失败: {e}")
return f"右转失败: {str(e)}"
def reconnaissance_right(self):
"""
侦察向右转动视角
"""
if not self.connected:
return "请先连接无人机"
try:
if self.drone:
logger.info("开始侦察模式 - 右转视角")
self.drone.flight.rotate_right(30).wait_for_completed()
self.status["camera_angle"]["yaw"] -= 30
else:
time.sleep(0.5)
self.status["camera_angle"]["yaw"] -= 30
return "无人机视角向右转动30度"
except Exception as e:
logger.error(f"侦察失败: {e}")
return f"侦察失败: {str(e)}"
def reconnaissance_stop(self):
"""
停止侦察停止视角转动
"""
if not self.connected:
return "请先连接无人机"
try:
if self.drone:
logger.info("停止侦察模式")
self.drone.flight.stop()
else:
logger.info("模拟停止侦察")
return "无人机停止视角转动"
except Exception as e:
logger.error(f"停止侦察失败: {e}")
return f"停止侦察失败: {str(e)}"
def hover(self):
"""
悬停
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
try:
if self.drone:
logger.info("进入悬停模式")
self.drone.flight.stop()
else:
logger.info("模拟悬停")
return "无人机已悬停"
except Exception as e:
logger.error(f"悬停失败: {e}")
return f"悬停失败: {str(e)}"
def set_flight_mode(self, mode: str):
"""
设置飞行模式
Args:
mode: 飞行模式 ('manual', 'auto', 'mission')
"""
if not self.connected:
return "请先连接无人机"
try:
if self.drone:
logger.info(f"设置飞行模式: {mode}")
# 这里需要根据实际SDK接口调整
self.status["flight_mode"] = mode
else:
self.status["flight_mode"] = mode
return f"飞行模式已设置为: {mode}"
except Exception as e:
logger.error(f"设置飞行模式失败: {e}")
return f"设置飞行模式失败: {str(e)}"
def get_battery_status(self):
"""
获取电池状态
"""
try:
if self.drone:
battery = self.drone.battery.get_battery()
self.status["battery"] = battery
return battery
else:
return self.status["battery"]
except Exception as e:
logger.error(f"获取电池状态失败: {e}")
return self.status["battery"]
def get_attitude(self):
"""
获取姿态信息
"""
try:
if self.drone:
attitude = self.drone.flight.get_attitude()
self.status["attitude"] = attitude
return attitude
else:
return self.status["attitude"]
except Exception as e:
logger.error(f"获取姿态信息失败: {e}")
return self.status["attitude"]
def cruise(self):
"""
巡航按指定路线移动
"""
if not self.connected or not self.flying:
return "请先连接并起飞无人机"
if self.mission_running:
return "巡航任务正在执行中"
self.mission_running = True
try:
# 定义巡航路线
route = [
("右转", 90),
("前进", 2),
("右转", 90),
("前进", 3),
("右转", 90),
("前进", 4),
("右转", 90),
("前进", 2)
]
for action, value in route:
if not self.mission_running:
break
logger.info(f"巡航任务: {action} {value}")
if action == "前进":
self.move_forward(value)
elif action == "右转":
self.rotate_right(value)
time.sleep(1) # 等待动作完成
self.mission_running = False
return "巡航完成"
except Exception as e:
self.mission_running = False
logger.error(f"巡航失败: {e}")
return f"巡航失败: {str(e)}"
def stop_mission(self):
"""
停止当前任务
"""
self.mission_running = False
logger.info("任务已停止")
return "任务已停止"
def get_status(self) -> Dict[str, Any]:
"""
获取无人机状态
"""
# 更新实时状态
if self.connected:
self.get_battery_status()
self.get_attitude()
return {
"connected": self.connected,
"flying": self.flying,
"mission_running": self.mission_running,
"status": self.status,
"flight_config": self.flight_config
}
def _start_status_monitor(self):
"""
启动状态监控线程
"""
if self.status_monitor_thread and self.status_monitor_thread.is_alive():
return
def monitor():
while self.connected:
try:
# 更新电池状态
self.get_battery_status()
# 更新姿态信息
self.get_attitude()
# 模拟电池消耗
if not self.drone and self.status["battery"] > 0:
self.status["battery"] -= 0.05
time.sleep(2) # 每2秒更新一次
except Exception as e:
logger.error(f"状态监控异常: {e}")
time.sleep(5)
self.status_monitor_thread = threading.Thread(target=monitor, daemon=True)
self.status_monitor_thread.start()
logger.info("状态监控线程已启动")

@ -0,0 +1,38 @@
# RoboMaster控制系统依赖包
# Web框架
fastapi==0.104.1
uvicorn[standard]==0.24.0
# 计算机视觉
opencv-python==4.8.1.78
numpy==1.24.3
# 机器学习
torch==2.1.0
torchvision==0.16.0
ultralytics==8.0.196
# 图像处理
Pillow==10.0.1
# 异步支持
websockets==12.0
# RoboMaster SDK大疆官方SDK
robomaster==1.0.0
# 可选Tello无人机SDK如果需要支持Tello
# djitellopy==1.5
# 开发工具
python-multipart==0.0.6
# 日志和配置
python-dotenv==1.0.0
# 网络请求
requests==2.31.0
# 时间处理
python-dateutil==2.8.2

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RoboMaster控制系统启动脚本
"""
import uvicorn
import sys
import os
from pathlib import Path
def main():
"""
主启动函数
"""
print("🚁 RoboMaster控制系统启动中...")
print("=" * 50)
# 检查依赖
check_dependencies()
# 启动服务
start_server()
def check_dependencies():
"""
检查项目依赖
"""
print("📦 检查项目依赖...")
required_packages = [
'fastapi',
'uvicorn',
'opencv-python',
'torch',
'ultralytics'
]
missing_packages = []
for package in required_packages:
try:
__import__(package.replace('-', '_'))
print(f"{package}")
except ImportError:
missing_packages.append(package)
print(f"{package}")
if missing_packages:
print(f"\n⚠️ 缺少依赖包: {', '.join(missing_packages)}")
print("请运行: pip install -r requirements.txt")
sys.exit(1)
print("✅ 所有依赖检查完成\n")
def start_server():
"""
启动FastAPI服务器
"""
print("🚀 启动服务器...")
print("📍 访问地址: http://localhost:8000")
print("🎮 控制界面: http://localhost:8000/static/index.html")
print("📚 API文档: http://localhost:8000/docs")
print("=" * 50)
try:
uvicorn.run(
"robomaster_control.api:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
except KeyboardInterrupt:
print("\n🛑 服务器已停止")
except Exception as e:
print(f"❌ 启动失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

@ -0,0 +1,547 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RoboMaster控制系统</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Microsoft YaHei', Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 1400px;
margin: 0 auto;
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
box-shadow: 0 20px 40px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
text-align: center;
}
.header h1 {
font-size: 2.5em;
margin-bottom: 10px;
}
.header p {
font-size: 1.1em;
opacity: 0.9;
}
.main-content {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
padding: 20px;
}
.control-panel {
background: #f8f9fa;
border-radius: 10px;
padding: 20px;
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.1);
}
.control-panel h2 {
color: #333;
margin-bottom: 20px;
padding-bottom: 10px;
border-bottom: 2px solid #667eea;
}
.button-group {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(120px, 1fr));
gap: 10px;
margin-bottom: 20px;
}
.btn {
padding: 12px 20px;
border: none;
border-radius: 8px;
cursor: pointer;
font-size: 14px;
font-weight: 600;
transition: all 0.3s ease;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.btn-primary {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
.btn-primary:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4);
}
.btn-success {
background: linear-gradient(135deg, #56ab2f 0%, #a8e6cf 100%);
color: white;
}
.btn-success:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(86, 171, 47, 0.4);
}
.btn-danger {
background: linear-gradient(135deg, #ff416c 0%, #ff4b2b 100%);
color: white;
}
.btn-danger:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(255, 65, 108, 0.4);
}
.btn-warning {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
}
.btn-warning:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(240, 147, 251, 0.4);
}
.status-display {
background: white;
border-radius: 8px;
padding: 15px;
margin-top: 20px;
border-left: 4px solid #667eea;
}
.status-item {
display: flex;
justify-content: space-between;
margin-bottom: 8px;
padding: 5px 0;
}
.status-label {
font-weight: 600;
color: #555;
}
.status-value {
color: #667eea;
font-weight: 600;
}
.video-section {
background: #f8f9fa;
border-radius: 10px;
padding: 20px;
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.1);
}
.video-container {
position: relative;
margin-bottom: 20px;
}
.video-container h3 {
color: #333;
margin-bottom: 15px;
padding-bottom: 10px;
border-bottom: 2px solid #667eea;
}
.video-stream {
width: 100%;
max-width: 640px;
height: 480px;
border-radius: 10px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.2);
background: #000;
}
.video-controls {
margin-top: 15px;
display: flex;
gap: 10px;
flex-wrap: wrap;
}
.form-group {
margin-bottom: 15px;
}
.form-group label {
display: block;
margin-bottom: 5px;
font-weight: 600;
color: #555;
}
.form-group input {
width: 100%;
padding: 10px;
border: 2px solid #ddd;
border-radius: 5px;
font-size: 14px;
}
.form-group input:focus {
outline: none;
border-color: #667eea;
}
.detection-info {
background: white;
border-radius: 8px;
padding: 15px;
margin-top: 15px;
border-left: 4px solid #56ab2f;
}
.detection-item {
display: flex;
justify-content: space-between;
margin-bottom: 5px;
padding: 3px 0;
}
.connection-status {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
}
.status-connected {
background: #56ab2f;
}
.status-disconnected {
background: #ff416c;
}
.status-connecting {
background: #f093fb;
animation: pulse 1.5s infinite;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
.grid-2 {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
}
@media (max-width: 1200px) {
.main-content {
grid-template-columns: 1fr;
}
.grid-2 {
grid-template-columns: 1fr;
}
}
@media (max-width: 768px) {
.button-group {
grid-template-columns: repeat(2, 1fr);
}
.video-stream {
height: 300px;
}
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🚁 RoboMaster无人机控制系统</h1>
<p>无人机智能控制平台</p>
</div>
<div class="main-content">
<!-- 控制面板 -->
<div class="control-panel">
<h2>🎮 设备控制</h2>
<!-- 无人机控制 -->
<div class="device-section">
<h3>🚁 无人机控制</h3>
<div class="button-group">
<button class="btn btn-primary" onclick="send('/drone/connect')">连接</button>
<button class="btn btn-danger" onclick="send('/drone/disconnect')">断开</button>
<button class="btn btn-success" onclick="send('/drone/takeoff')">起飞</button>
<button class="btn btn-warning" onclick="send('/drone/land')">降落</button>
<button class="btn btn-danger" onclick="send('/drone/emergency_land')">紧急降落</button>
</div>
<h4>移动控制</h4>
<div class="button-group">
<button class="btn btn-primary" onclick="send('/drone/move_forward')">前进</button>
<button class="btn btn-primary" onclick="send('/drone/move_backward')">后退</button>
<button class="btn btn-primary" onclick="send('/drone/move_left')">左移</button>
<button class="btn btn-primary" onclick="send('/drone/move_right')">右移</button>
<button class="btn btn-primary" onclick="send('/drone/move_up')">上升</button>
<button class="btn btn-primary" onclick="send('/drone/move_down')">下降</button>
</div>
<h4>旋转控制</h4>
<div class="button-group">
<button class="btn btn-primary" onclick="send('/drone/rotate_left')">左转</button>
<button class="btn btn-primary" onclick="send('/drone/rotate_right')">右转</button>
<button class="btn btn-warning" onclick="send('/drone/hover')">悬停</button>
</div>
<h4>任务控制</h4>
<div class="button-group">
<button class="btn btn-success" onclick="send('/drone/reconnaissance/right')">侦察</button>
<button class="btn btn-warning" onclick="send('/drone/reconnaissance/stop')">停止侦察</button>
<button class="btn btn-success" onclick="send('/drone/cruise')">巡航</button>
<button class="btn btn-danger" onclick="send('/drone/stop_mission')">停止任务</button>
</div>
</div>
<!-- 状态显示 -->
<div class="status-display">
<h4>📊 设备状态</h4>
<div id="device-status">
<div class="status-item">
<span class="status-label">无人机:</span>
<span class="status-value" id="drone-status">未连接</span>
</div>
</div>
</div>
</div>
<!-- 视频流面板 -->
<div class="video-section">
<h2>📹 YOLO识别视频流</h2>
<!-- 无人机视频流 -->
<div class="video-container">
<h3>🚁 无人机识别流</h3>
<img id="drone-video" class="video-stream" alt="无人机视频流">
<div class="video-controls">
<button class="btn btn-primary" onclick="startYoloStream('drone')">连接</button>
<button class="btn btn-danger" onclick="stopYoloStream('drone')">断开</button>
<span class="connection-status" id="drone-stream-status"></span>
<span id="drone-stream-text">未连接</span>
</div>
<div class="form-group">
<label>视频源:</label>
<input type="text" id="drone-source" value="0" placeholder="摄像头设备号或RTSP地址">
</div>
<div class="form-group">
<label>模型权重:</label>
<input type="text" id="drone-model" value="yolov5s.pt" placeholder="模型文件路径">
</div>
<button class="btn btn-primary" onclick="setYoloConfig('drone')">切换配置</button>
<div class="detection-info" id="drone-detection-info">
<h4>检测信息</h4>
<div id="drone-detection-details">等待连接...</div>
</div>
</div>
</div>
</div>
</div>
<script>
// WebSocket连接管理
const wsConnections = {};
// API基础URL
const API_BASE = 'http://localhost:8000';
// 发送API请求
async function send(endpoint, method = 'POST', data = null) {
try {
const options = {
method: method,
headers: {
'Content-Type': 'application/json'
}
};
if (data) {
options.body = JSON.stringify(data);
}
const response = await fetch(API_BASE + endpoint, options);
const result = await response.json();
// 显示结果
showMessage(result.msg || result.message || '操作完成');
// 更新状态
updateDeviceStatus();
return result;
} catch (error) {
console.error('API请求失败:', error);
showMessage('请求失败: ' + error.message, 'error');
}
}
// 显示消息
function showMessage(message, type = 'info') {
// 这里可以添加更好的消息提示UI
console.log(`[${type.toUpperCase()}] ${message}`);
alert(message);
}
// 更新设备状态
async function updateDeviceStatus() {
try {
const droneStatus = await fetch(API_BASE + '/drone/status').then(r => r.json());
document.getElementById('drone-status').textContent =
droneStatus.connected ? '已连接' : '未连接';
} catch (error) {
console.error('获取状态失败:', error);
}
}
// YOLO流控制
function startYoloStream(streamName) {
if (wsConnections[streamName]) {
return;
}
const ws = new WebSocket(`ws://localhost:8000/ws/yolo/${streamName}`);
ws.onopen = () => {
updateStreamStatus(streamName, 'connected', '已连接');
};
ws.onclose = () => {
updateStreamStatus(streamName, 'disconnected', '已断开');
wsConnections[streamName] = null;
};
ws.onerror = () => {
updateStreamStatus(streamName, 'disconnected', '连接出错');
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'frame') {
// 更新视频流
const videoElement = document.getElementById(`${streamName}-video`);
videoElement.src = 'data:image/jpeg;base64,' + data.data;
} else if (data.type === 'detections') {
// 更新检测信息
updateDetectionInfo(streamName, data.data, data.summary);
}
} catch (error) {
console.error('处理WebSocket消息失败:', error);
}
};
wsConnections[streamName] = ws;
}
function stopYoloStream(streamName) {
if (wsConnections[streamName]) {
wsConnections[streamName].close();
wsConnections[streamName] = null;
}
}
function updateStreamStatus(streamName, status, text) {
const statusElement = document.getElementById(`${streamName}-stream-status`);
const textElement = document.getElementById(`${streamName}-stream-text`);
statusElement.className = `connection-status status-${status}`;
textElement.textContent = text;
}
function updateDetectionInfo(streamName, detections, summary) {
const detailsElement = document.getElementById(`${streamName}-detection-details`);
if (!detections || detections.length === 0) {
detailsElement.innerHTML = '<p>未检测到目标</p>';
return;
}
let html = `<p>检测到 ${detections.length} 个目标</p>`;
// 按类别统计
const classCounts = {};
detections.forEach(det => {
classCounts[det.class_name] = (classCounts[det.class_name] || 0) + 1;
});
html += '<ul>';
Object.entries(classCounts).forEach(([className, count]) => {
html += `<li>${className}: ${count}个</li>`;
});
html += '</ul>';
if (summary) {
html += `<p>平均置信度: ${(summary.average_confidence * 100).toFixed(1)}%</p>`;
html += `<p>FPS: ${summary.fps}</p>`;
}
detailsElement.innerHTML = html;
}
async function setYoloConfig(streamName) {
const source = document.getElementById(`${streamName}-source`).value;
const model = document.getElementById(`${streamName}-model`).value;
try {
await send(`/yolo/set_source/${streamName}`, 'POST', {
source: source,
model_path: model
});
// 重新连接流
stopYoloStream(streamName);
setTimeout(() => startYoloStream(streamName), 500);
} catch (error) {
console.error('设置配置失败:', error);
}
}
// 页面加载完成后初始化
document.addEventListener('DOMContentLoaded', function() {
// 更新设备状态
updateDeviceStatus();
// 定期更新状态
setInterval(updateDeviceStatus, 5000);
});
</script>
</body>
</html>

@ -0,0 +1,268 @@
# robomaster_control/yolo_stream.py
import cv2
import torch
import base64
import asyncio
import json
import time
import threading
from fastapi import WebSocket
from typing import Dict, List, Any, Optional
from collections import deque
class YOLOStream:
def __init__(self, model_path='yolov5s.pt', source=0, stream_id="default"):
"""
初始化YOLO模型和视频流
Args:
model_path: YOLO模型权重路径
source: 视频流源0为本地摄像头或RTSP/UDP地址
stream_id: 流标识符
"""
self.model_path = model_path
self.source = source
self.stream_id = stream_id
self.model = None
self.cap = None
self.is_running = False
self.detection_thread = None
self.latest_results = []
self.latest_frame = None
self.frame_count = 0
self.fps = 0
self.last_fps_time = time.time()
# 检测结果历史记录
self.detection_history = deque(maxlen=100)
self.load_model_and_stream()
def load_model_and_stream(self):
"""加载模型和视频流"""
try:
# 加载YOLO模型
self.model = torch.hub.load('ultralytics/yolov5', 'custom', path=self.model_path, force_reload=True)
print(f"YOLO模型 {self.model_path} 加载成功")
# 打开视频流
self.cap = cv2.VideoCapture(self.source)
if not self.cap.isOpened():
raise Exception(f"无法打开视频源: {self.source}")
print(f"视频流 {self.source} 打开成功")
except Exception as e:
print(f"加载模型或视频流失败: {e}")
raise
def release(self):
"""释放资源"""
self.is_running = False
if self.detection_thread and self.detection_thread.is_alive():
self.detection_thread.join(timeout=2)
if self.cap is not None:
self.cap.release()
self.cap = None
def start_detection_loop(self):
"""启动检测循环线程"""
if self.is_running:
return
self.is_running = True
self.detection_thread = threading.Thread(target=self._detection_loop, daemon=True)
self.detection_thread.start()
def _detection_loop(self):
"""检测循环(在独立线程中运行)"""
while self.is_running:
try:
frame = self.get_frame()
if frame is not None:
self.latest_frame = frame
self.frame_count += 1
# 计算FPS
current_time = time.time()
if current_time - self.last_fps_time >= 1.0:
self.fps = self.frame_count
self.frame_count = 0
self.last_fps_time = current_time
time.sleep(0.033) # 约30FPS
except Exception as e:
print(f"检测循环异常: {e}")
time.sleep(1)
def get_frame(self):
"""
获取一帧视频并进行YOLO识别返回带框图片
"""
if self.cap is None:
return None
ret, frame = self.cap.read()
if not ret:
return None
try:
# BGR转RGB
img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# YOLO检测
results = self.model(img)
# 处理检测结果
detections = self._process_detections(results, frame.shape)
self.latest_results = detections
# 添加到历史记录
self.detection_history.append({
'timestamp': time.time(),
'detections': detections
})
# 在图片上绘制检测框
results.render()
img = results.ims[0]
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 添加FPS信息
cv2.putText(img, f"FPS: {self.fps}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.putText(img, f"Detections: {len(detections)}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
return img
except Exception as e:
print(f"处理帧异常: {e}")
return frame
def _process_detections(self, results, frame_shape):
"""
处理检测结果
Args:
results: YOLO检测结果
frame_shape: 图像尺寸
Returns:
处理后的检测结果列表
"""
detections = []
height, width = frame_shape[:2]
for *xyxy, conf, cls in results.xyxy[0]:
if conf >= 0.5: # 置信度阈值
x1, y1, x2, y2 = [int(x) for x in xyxy]
detection = {
'class_id': int(cls),
'class_name': results.names[int(cls)],
'confidence': float(conf),
'bbox': [x1, y1, x2, y2],
'center': [int((x1 + x2) / 2), int((y1 + y2) / 2)],
'area': (x2 - x1) * (y2 - y1),
'relative_position': {
'x': (x1 + x2) / (2 * width),
'y': (y1 + y2) / (2 * height)
}
}
detections.append(detection)
return detections
def frame_to_base64(self, frame):
"""
将图片帧转为base64字符串
"""
if frame is None:
return None
try:
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
return jpg_as_text
except Exception as e:
print(f"图像编码失败: {e}")
return None
def get_detection_summary(self):
"""
获取检测结果摘要
"""
if not self.latest_results:
return {}
class_counts = {}
total_confidence = 0
for detection in self.latest_results:
class_name = detection['class_name']
class_counts[class_name] = class_counts.get(class_name, 0) + 1
total_confidence += detection['confidence']
return {
'total_detections': len(self.latest_results),
'class_counts': class_counts,
'average_confidence': total_confidence / len(self.latest_results) if self.latest_results else 0,
'fps': self.fps
}
async def stream(self, websocket: WebSocket, fps=10):
"""
不断推送识别后的视频帧到WebSocket
Args:
websocket: WebSocket连接
fps: 推送帧率默认10帧每秒
"""
await websocket.accept()
# 启动检测循环
self.start_detection_loop()
try:
while True:
# 检查WebSocket连接状态
if websocket.client_state.value > 2: # 连接已关闭
break
# 获取最新帧
if self.latest_frame is not None:
img_str = self.frame_to_base64(self.latest_frame)
if img_str:
# 发送图像数据
await websocket.send_text(json.dumps({
'type': 'frame',
'data': img_str,
'timestamp': time.time()
}))
# 发送检测结果
if self.latest_results:
await websocket.send_text(json.dumps({
'type': 'detections',
'data': self.latest_results,
'summary': self.get_detection_summary(),
'timestamp': time.time()
}))
await asyncio.sleep(1 / fps)
except Exception as e:
print(f"WebSocket流异常: {e}")
finally:
self.release()
def get_stream_info(self):
"""
获取流信息
"""
return {
'stream_id': self.stream_id,
'source': self.source,
'model_path': self.model_path,
'is_running': self.is_running,
'fps': self.fps,
'frame_count': self.frame_count,
'detection_count': len(self.latest_results)
}

@ -0,0 +1,63 @@
import socket
import time
import threading
import av
import cv2
import base64
from flask import Flask
from flask_socketio import SocketIO
# Tello配置
TELLO_IP = '192.168.10.1'
TELLO_PORT = 8889
LOCAL_PORT = 9000
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', LOCAL_PORT))
def send_command(cmd):
sock.sendto(cmd.encode('utf-8'), (TELLO_IP, TELLO_PORT))
try:
sock.settimeout(2)
response, _ = sock.recvfrom(1024)
print(f"Command: {cmd}, Response: {response.decode()}")
return True
except socket.timeout:
print(f"Timeout for command: {cmd}")
return False
# 初始化Tello
send_command('command')
time.sleep(1)
send_command('streamon')
time.sleep(1)
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('drone_command')
def handle_drone_command(data):
command = data.get('command')
if command:
success = send_command(command)
socketio.emit('command_response', {'success': success, 'command': command})
def video_stream():
container = av.open('udp://0.0.0.0:11111', format='h264')
try:
for frame in container.decode(video=0):
img = frame.to_ndarray(format='bgr24')
img = cv2.resize(img, (640, 480))
_, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 70])
img_base64 = base64.b64encode(buffer).decode('utf-8')
socketio.emit('video_frame', {'image': img_base64})
except Exception as e:
print(f"视频流错误: {e}")
finally:
container.close()
if __name__ == '__main__':
video_thread = threading.Thread(target=video_stream)
video_thread.daemon = True
video_thread.start()
socketio.run(app, host='0.0.0.0', port=5000, debug=True)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save