feat: 增加rpbomaster

main
pfqgauxfb 2 months ago
parent 5fafbe2dfd
commit 3d0a8fb4c9

@ -0,0 +1,6 @@
flask
opencv-python
mammoth
ultralytics
numpy
av

@ -0,0 +1,11 @@
2025-07-05 08:21:39,500 INFO: 127.0.0.1 - - [05/Jul/2025 08:21:39] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:22:25,639 INFO: 127.0.0.1 - - [05/Jul/2025 08:22:25] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:26:42,611 INFO: 127.0.0.1 - - [05/Jul/2025 08:26:42] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:30:19,148 INFO: 127.0.0.1 - - [05/Jul/2025 08:30:19] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:30:37,602 INFO: 127.0.0.1 - - [05/Jul/2025 08:30:37] "GET /main_menu HTTP/1.1" 200 -
2025-07-05 08:30:37,606 INFO: 127.0.0.1 - - [05/Jul/2025 08:30:37] "GET /static/custom.css HTTP/1.1" 304 -
2025-07-05 08:30:37,610 INFO: 127.0.0.1 - - [05/Jul/2025 08:30:37] "GET /static/background/page.png HTTP/1.1" 304 -
2025-07-05 08:34:04,823 INFO: 127.0.0.1 - - [05/Jul/2025 08:34:04] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:37:38,118 INFO: 127.0.0.1 - - [05/Jul/2025 08:37:38] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:40:42,872 INFO: 127.0.0.1 - - [05/Jul/2025 08:40:42] "POST /run_path HTTP/1.1" 200 -
2025-07-05 08:42:31,091 INFO: 127.0.0.1 - - [05/Jul/2025 08:42:31] "POST /run_path HTTP/1.1" 200 -

@ -0,0 +1,913 @@
import socket
import time
import cv2
import os
import av
from ultralytics import YOLO
from flask import Flask, request, jsonify, render_template, Response, send_from_directory, redirect, url_for, session
import threading
from queue import Queue
import numpy as np
import webbrowser
from werkzeug.utils import secure_filename
import mammoth # 用于docx转html如未安装请pip install mammoth
import datetime
from collections import Counter
import shutil
import pymysql
import hashlib
import re
import sys
app = Flask(__name__)
app.secret_key = 'your-very-secret-key-123456' # 用于Session的密钥建议更换为随机字符串
# 全局变量
latest_image_path = None
container = None
frame_iter = None
sock = None
frame_queue = Queue(maxsize=30) # 用于存储视频帧的队列
detection_queue = Queue() # 用于存储待检测图片的队列
is_running = True # 控制线程运行状态
# 新增:无人机状态全局变量
global connected
connected = False
global battery_level
battery_level = 0
global video_thread_started
video_thread_started = False
# 1. 发送指令让Tello进入SDK模式并开启视频流
tello_address = ('192.168.10.1', 8889)
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads')
ALLOWED_EXTENSIONS = {'py'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# 拍照信息记录列表
detected_photos = [] # 每项: {'filename', 'time', 'action', 'direction', 'desc'}
# 拍照动作与方位映射(可根据实际调整)
DIRECTION_MAP = {
'forward': '正前方',
'rotation_0': '正前方',
'rotation_1': '东侧',
'rotation_2': '南侧',
'rotation_3': '西侧',
'rotation_4': '北侧',
# 可继续补充
}
# 英文标签到中文的映射(可根据实际模型类别补充)
LABEL_MAP = {
'person': '',
'car': '汽车',
'truck': '卡车',
'tank': '坦克',
'bus': '大巴',
'bicycle': '自行车',
'motorcycle': '摩托车',
# ... 其他类别 ...
}
# 数据库连接配置
DB_CONFIG = dict(host='localhost', user='root', password='123456', database='robomaster', charset='utf8mb4')
def get_db():
return pymysql.connect(**DB_CONFIG)
def hash_password(pw):
return hashlib.sha256(pw.encode('utf-8')).hexdigest()
def create_socket():
"""创建并返回一个新的socket连接"""
global sock
if sock:
try:
sock.close()
except:
pass
# 先尝试释放端口
try:
import socket
temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
temp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
temp_sock.bind(('', 9000))
temp_sock.close()
time.sleep(0.1) # 给系统一点时间释放端口
except:
pass
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 添加端口重用选项
sock.bind(('', 9000))
sock.settimeout(5.0) # 设置5秒超时
return sock
def send_with_retry(cmd, max_retries=3, delay=1):
"""发送命令并重试,直到成功或达到最大重试次数"""
global sock
if not sock:
sock = create_socket()
# 设置socket超时
sock.settimeout(3.0) # 减少超时时间到3秒
for i in range(max_retries):
try:
print(f"Sending command: {cmd}")
# 在发送命令前暂停视频流处理
if cmd in ['forward', 'back', 'left', 'right', 'up', 'down', 'cw', 'ccw', 'takeoff']:
print("Pausing video stream for movement command...")
time.sleep(0.2) # 增加暂停时间以确保命令能被处理
sock.sendto(cmd.encode('utf-8'), tello_address)
try:
response, _ = sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"Response: {response_str}")
if response_str.lower() == 'ok':
# 如果是移动命令,等待一小段时间确保命令执行
if cmd in ['forward', 'back', 'left', 'right', 'up', 'down', 'cw', 'ccw', 'takeoff']:
time.sleep(0.5) # 增加等待时间
return True
elif response_str.lower() == 'error':
if i < max_retries - 1:
print(f"Command failed, retrying... ({i+1}/{max_retries})")
time.sleep(delay)
continue
else:
# 对于查询命令,返回非空响应即可
if response_str and response_str != '0':
return True
elif i < max_retries - 1:
print(f"Invalid response, retrying... ({i+1}/{max_retries})")
time.sleep(delay)
continue
except socket.timeout:
print(f"Timeout waiting for response, retrying... ({i+1}/{max_retries})")
if i < max_retries - 1:
time.sleep(delay)
continue
except Exception as e:
print(f"Error sending command: {e}")
if i < max_retries - 1:
time.sleep(delay)
# 尝试重新创建socket
try:
sock = create_socket()
sock.settimeout(3.0)
except Exception as e:
print(f"Error recreating socket: {e}")
return False
def init_video_stream():
"""初始化视频流"""
global container, frame_iter
try:
# 尝试关闭可能存在的旧连接
try:
if container:
container.close()
container = None
frame_iter = None
except:
pass
# 尝试释放端口11111
try:
import socket
temp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
temp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
temp_sock.bind(('0.0.0.0', 11111))
temp_sock.close()
time.sleep(0.2) # 给系统更多时间释放端口
except:
pass
# 设置超时时间
container = av.open('udp://0.0.0.0:11111', format='h264', timeout=5.0)
frame_iter = container.decode(video=0)
# 尝试获取第一帧来验证视频流是否正常工作
try:
frame = next(frame_iter, None)
if frame is None:
return None, None
return container, frame_iter
except Exception as e:
return None, None
except Exception as e:
print(f"Error initializing video stream: {e}")
return None, None
def video_stream_thread():
"""视频流处理线程"""
global container, frame_iter, is_running
error_count = 0
max_errors = 3
frame_count = 0 # 添加帧计数器
while is_running:
try:
frame = next(frame_iter, None)
if frame is not None:
# 将帧转换为numpy数组
img = frame.to_ndarray(format='bgr24')
frame_count += 1
# 如果队列满了,移除最旧的帧
if frame_queue.full():
try:
frame_queue.get_nowait()
except:
pass
# 将新帧放入队列
frame_queue.put(img)
error_count = 0 # 重置错误计数
except Exception as e:
error_count += 1
if error_count >= max_errors:
try:
# 尝试重新初始化视频流
container, frame_iter = init_video_stream()
if container is None or frame_iter is None:
time.sleep(1)
else:
error_count = 0
except Exception as reconnect_error:
pass
time.sleep(0.01)
def detection_thread():
"""图像检测线程"""
global is_running
model = YOLO(r'D:\yolo\yolov8n.pt')
while is_running:
try:
# 从检测队列获取图片路径
img_path = detection_queue.get(timeout=1)
if img_path:
try:
results = model(img_path)
for r in results:
save_path = os.path.join(r'D:\yolo\ultralytics\assets', 'detect_results', os.path.basename(img_path))
os.makedirs(os.path.dirname(save_path), exist_ok=True)
r.save(save_path)
print(f"Detection completed for {img_path}")
except Exception as e:
print(f"Error in detection: {e}")
except:
time.sleep(0.01)
def set_latest_image_path(path):
"""设置最新图片路径"""
global latest_image_path
latest_image_path = path
def smooth_land():
"""平滑降落函数"""
print("Starting smooth landing...")
# 首先降低高度到安全高度
print("Lowering to safe height...")
if not send_with_retry('down 20', max_retries=3):
print("Warning: Failed to lower height")
time.sleep(2) # 等待高度调整完成
# 缓慢降低速度
print("Reducing speed...")
for i in range(3):
if not send_with_retry('rc 0 0 -10 0', max_retries=3):
print(f"Warning: Failed to reduce speed at step {i+1}")
time.sleep(0.5)
# 完全停止
print("Stopping all movement...")
if not send_with_retry('rc 0 0 0 0', max_retries=3):
print("Warning: Failed to stop movement")
time.sleep(1)
# 执行降落
print("Executing final landing...")
if not send_with_retry('land', max_retries=3):
print("Warning: Failed to execute landing command")
# 等待降落完成
print("Waiting for landing to complete...")
time.sleep(3)
def gen_frames():
while True:
frame = get_frame() # 你需要实现get_frame返回BGR np.ndarray
if frame is None:
# 返回黑色画面
frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(frame, "No Video", (180, 240), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0,0,255), 2)
ret, buffer = cv2.imencode('.jpg', frame)
frame = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
time.sleep(0.05)
@app.before_request
def require_login():
allowed = {'/login', '/register', '/forgot', '/reset_password', '/static/', '/doc', '/favicon.ico'}
if request.path.startswith('/static/') or request.path in allowed or request.path.startswith('/static/'):
return
if 'user_id' not in session:
return redirect('/login')
@app.route('/')
def index():
if 'user_id' not in session:
return redirect('/login')
return render_template('main_menu.html')
@app.route('/doc')
def doc_viewer():
# 使用绝对路径确保为docx文件
doc_path = r'D:\yolo\doc\软件设计-基于大模型的无人机定点打击软件.docx'
html = ''
try:
import mammoth
with open(doc_path, 'rb') as docx_file:
result = mammoth.convert_to_html(docx_file)
html = result.value
except Exception as e:
html = f'<p>无法加载文档: {e}</p>'
return render_template('doc_viewer.html', doc_html=html)
@app.route('/main_menu')
def main_menu():
return render_template('main_menu.html')
def parse_actions_from_py(filepath):
actions = []
with open(filepath, 'r', encoding='utf-8') as f:
code = f.read()
# 匹配 takeoff(), land()
for m in re.finditer(r'(takeoff|land)\s*\(\s*\)', code):
actions.append({'action': m.group(1), 'params': ''})
# 匹配 move('forward', 50)
for m in re.finditer(r"move\s*\(\s*['\"](\w+)['\"]\s*,\s*(\d+)\s*\)", code):
actions.append({'action': 'move', 'params': f"{m.group(1)}, {m.group(2)}"})
# 匹配 rotate('cw', 90)
for m in re.finditer(r"rotate\s*\(\s*['\"](\w+)['\"]\s*,\s*(\d+)\s*\)", code):
actions.append({'action': 'rotate', 'params': f"{m.group(1)}, {m.group(2)}"})
return actions
def smart_actions_to_natural_language(actions):
descs = []
state = {'flying': False}
for a in actions:
if a['action'] == 'takeoff':
if not state['flying']:
descs.append('起飞')
state['flying'] = True
elif a['action'] == 'land':
if state['flying']:
descs.append('降落')
state['flying'] = False
elif a['action'] == 'move':
try:
direction, dist = a['params'].split(',')
direction = direction.strip()
dist = int(dist.strip())
if direction == 'forward':
descs.append(f'向前飞行{dist/100:.1f}')
elif direction == 'back':
descs.append(f'向后飞行{dist/100:.1f}')
elif direction == 'left':
descs.append(f'向左飞行{dist/100:.1f}')
elif direction == 'right':
descs.append(f'向右飞行{dist/100:.1f}')
elif direction == 'up':
descs.append(f'上升{dist/100:.1f}')
elif direction == 'down':
descs.append(f'下降{dist/100:.1f}')
except Exception:
pass
elif a['action'] == 'rotate':
try:
dir, deg = a['params'].split(',')
dir = dir.strip()
deg = int(deg.strip())
if dir == 'cw':
descs.append(f'顺时针旋转{deg}°')
elif dir == 'ccw':
descs.append(f'逆时针旋转{deg}°')
except Exception:
pass
# 合并连续重复动作
merged = []
for d in descs:
if not merged or d != merged[-1]:
merged.append(d)
# 归纳意图
if not merged:
return '无有效动作'
if merged[0] == '起飞' and merged[-1] == '降落':
return '无人机' + ',然后'.join(merged) + ',完成侦查任务'
return ',然后'.join(merged) + ',搜索目标'
@app.route('/import_path', methods=['GET', 'POST'])
def import_path():
if request.method == 'POST':
file = request.files.get('file')
if file and file.filename.endswith('.py'):
filename = secure_filename(file.filename)
save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(save_path)
actions = parse_actions_from_py(save_path)
nl_desc = smart_actions_to_natural_language(actions)
return render_template('import_path.html', filename=filename, uploaded=True, actions=actions, nl_desc=nl_desc)
else:
return render_template('import_path.html', error='请选择.py文件', uploaded=False)
return render_template('import_path.html', uploaded=False)
@app.route('/run_path', methods=['POST'])
def run_path():
filename = request.form.get('filename')
if not filename:
return jsonify({'success': False, 'msg': '未指定文件'})
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if not os.path.exists(file_path):
return jsonify({'success': False, 'msg': '文件不存在'})
print(f"=== Starting to run {filename} ===")
# 检查无人机连接状态
if not connected:
print("✗ Drone not connected")
return jsonify({'success': False, 'msg': '无人机未连接,请先连接无人机'})
print(f"✓ Drone connected. Socket: {sock}, Address: {tello_address}")
# 运行.py文件但使用导入方式而不是subprocess
try:
import importlib.util
import sys
print(f"Loading module from: {file_path}")
# 动态导入Python文件
spec = importlib.util.spec_from_file_location("mission_module", file_path)
mission_module = importlib.util.module_from_spec(spec)
# 执行模块
spec.loader.exec_module(mission_module)
print("✓ Module loaded successfully")
# 检查是否有专门的运行函数
if hasattr(mission_module, 'run_mission_with_external_resources'):
print("✓ Found run_mission_with_external_resources function")
# 使用专门的函数
def run_mission():
print("=== Starting mission execution thread ===")
try:
result = mission_module.run_mission_with_external_resources(
sock, tello_address, frame_queue, container, frame_iter
)
print(f"Mission execution completed with result: {result}")
except Exception as e:
print(f"✗ Mission execution error: {e}")
import traceback
traceback.print_exc()
mission_thread = threading.Thread(target=run_mission)
mission_thread.daemon = True
mission_thread.start()
print("✓ Mission thread started")
return jsonify({'success': True, 'msg': '任务已开始执行,请查看无人机状态'})
elif hasattr(mission_module, 'main'):
print("✓ Found main function")
# 将主程序的socket和状态传递给模块
mission_module.sock = sock
mission_module.tello_address = tello_address
mission_module.connected = connected
mission_module.frame_queue = frame_queue
mission_module.container = container
mission_module.frame_iter = frame_iter
print(f"✓ Passed resources to module: sock={sock}, connected={connected}")
# 在新线程中运行避免阻塞Flask
def run_mission():
print("=== Starting main function execution thread ===")
try:
mission_module.main()
print("✓ Main function completed")
except Exception as e:
print(f"✗ Main function execution error: {e}")
import traceback
traceback.print_exc()
mission_thread = threading.Thread(target=run_mission)
mission_thread.daemon = True
mission_thread.start()
print("✓ Main function thread started")
return jsonify({'success': True, 'msg': '任务已开始执行,请查看无人机状态'})
else:
print("✗ No executable function found")
return jsonify({'success': False, 'msg': '文件中未找到可执行的函数'})
except Exception as e:
print(f"✗ Module loading/execution error: {e}")
import traceback
traceback.print_exc()
return jsonify({'success': False, 'msg': f'执行失败: {str(e)}'})
@app.route('/drone_control')
def drone_control():
global battery_level
return render_template('drone_control.html', battery=battery_level)
@app.route('/video_feed')
def video_feed():
return Response(gen_frames(),
mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/status')
def status():
global connected, battery_level, sock
# 实时获取电量
try:
if sock is None:
create_socket()
sock.sendto('battery?'.encode('utf-8'), tello_address)
response, _ = sock.recvfrom(1024)
battery_str = response.decode().strip()
battery_level = int(battery_str) if battery_str.isdigit() else 0
except Exception as e:
print(f'Error getting battery: {e}')
return jsonify({
'connected': connected,
'battery': battery_level
})
@app.route('/takeoff')
def takeoff_api():
success = takeoff()
return jsonify({'success': success})
@app.route('/land')
def land_api():
success = land()
return jsonify({'success': success})
@app.route('/move/<direction>/<int:distance>')
def move_api(direction, distance):
success = move(direction, distance)
return jsonify({'success': success})
@app.route('/rotate/<direction>/<int:degrees>')
def rotate_api(direction, degrees):
success = rotate(direction, degrees)
return jsonify({'success': success})
@app.route('/photo_analysis')
def photo_analysis():
# 对每张照片做YOLOv8检测并生成描述
model = YOLO(r'D:\yolo\yolov8n.pt') # 路径按实际调整
for info in detected_photos:
try:
results = model(info['filename'])
names = results[0].names
classes = results[0].boxes.cls.cpu().numpy().astype(int)
counts = Counter(classes)
desc = ''.join([f"{num}{names[cls_id]}" for cls_id, num in counts.items()])
info['desc'] = desc
except Exception as e:
info['desc'] = f"检测失败: {e}"
# 返回所有照片的分析结果
return {'photos': [
{'filename': i['filename'], 'time': i['time'], 'action': i['action'], 'direction': i['direction'], 'desc': i['desc']} for i in detected_photos
]}
@app.route('/analyze_results')
def analyze_results():
model = YOLO(r'D:\yolo\yolov8n.pt') # 路径按实际调整
detect_dir = os.path.join('runs', 'detect')
os.makedirs(detect_dir, exist_ok=True)
summary_lines = []
strike_idx = 1
for info in detected_photos:
try:
results = model(info['filename'])
names = results[0].names
boxes = results[0].boxes
classes = boxes.cls.cpu().numpy().astype(int)
# 统计敌方单位(如坦克、人等)
enemy_indices = [i for i, cls_id in enumerate(classes) if names[cls_id] in ['person', 'tank']]
if enemy_indices:
# 保存图片到detect目录命名为strike000X.jpg
strike_name = f"strike{strike_idx:04d}.jpg"
strike_path = os.path.join(detect_dir, strike_name)
# 画框并转中文标签
img = results[0].orig_img.copy()
for i in enemy_indices:
box = boxes.xyxy[i].cpu().numpy().astype(int)
cls_id = classes[i]
label_en = names[cls_id]
label_cn = LABEL_MAP.get(label_en, label_en)
# 画框和中文标签
cv2.rectangle(img, (box[0], box[1]), (box[2], box[3]), (0,0,255), 2)
cv2.putText(img, label_cn, (box[0], box[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2)
cv2.imwrite(strike_path, img)
strike_idx += 1
# 汇总描述
counts = {}
for i in enemy_indices:
cls_id = classes[i]
label_en = names[cls_id]
label_cn = LABEL_MAP.get(label_en, label_en)
counts[label_cn] = counts.get(label_cn, 0) + 1
if counts:
desc = ''.join([f"{num}{label}" for label, num in counts.items()])
summary_lines.append(f"图片 {os.path.basename(info['filename'])}{desc}")
except Exception as e:
summary_lines.append(f"图片 {os.path.basename(info['filename'])}:识别失败 {e}")
# 生成txt汇总
txt_path = os.path.join(detect_dir, 'summary.txt')
with open(txt_path, 'w', encoding='utf-8') as f:
for line in summary_lines:
f.write(line + '\n')
return {'result': '分析完成', 'summary_file': txt_path, 'summary': summary_lines}
def initialize_drone():
"""初始化无人机确保进入SDK模式"""
global connected, battery_level, sock
print("Initializing drone...")
# 重置连接
try:
create_socket()
except Exception as e:
print(f"Error creating socket: {e}")
connected = False
return
# 发送command命令直到成功
print("Entering SDK mode...")
if not send_with_retry('command', max_retries=5, delay=2):
connected = False
print("Failed to enter SDK mode")
return
# 等待SDK模式完全启动
print("Waiting for SDK mode to initialize...")
time.sleep(3)
# 检查电池电量
print("Checking battery level...")
try:
sock.sendto('battery?'.encode('utf-8'), tello_address)
response, _ = sock.recvfrom(1024)
battery_str = response.decode().strip()
battery_level = int(battery_str) if battery_str.isdigit() else 0
print(f"Battery level: {battery_level}%")
except Exception as e:
print(f"Warning: Could not check battery level: {e}")
battery_level = 0
# 检查WiFi信号强度
print("Checking WiFi signal...")
if not send_with_retry('wifi?', max_retries=3):
print("Warning: Could not check WiFi signal")
else:
print("WiFi check completed")
# 检查SDK模式状态
print("Checking SDK mode status...")
if not send_with_retry('sdk?', max_retries=3):
print("Warning: Could not verify SDK mode")
else:
print("SDK mode verified")
# 发送takeoff命令
print("Attempting to take off...")
if not send_with_retry('takeoff', max_retries=3, delay=2):
print("Failed to take off")
connected = False
return
# 等待起飞完成
print("Waiting for takeoff to complete...")
time.sleep(5)
# 开启视频流
print("Starting video stream...")
if not send_with_retry('streamon', max_retries=3, delay=2):
print("Failed to start video stream")
connected = False
return
# 等待视频流开启
time.sleep(2)
# 尝试初始化视频流,如果失败则重试
max_retries = 3
for i in range(max_retries):
try:
container, frame_iter = init_video_stream()
if container is not None and frame_iter is not None:
connected = True
return
time.sleep(2)
except Exception as e:
time.sleep(2)
connected = False
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
return render_template('register.html')
username = request.form.get('username')
password = request.form.get('password')
security_answer = request.form.get('security_answer')
if not username or not password or not security_answer:
return {'success': False, 'msg': '所有字段均不能为空'}
conn = get_db()
try:
with conn.cursor() as cur:
cur.execute('SELECT id FROM users WHERE username=%s', (username,))
if cur.fetchone():
return {'success': False, 'msg': '用户名已存在'}
cur.execute('INSERT INTO users (username, password, role, security_answer) VALUES (%s, %s, %s, %s)', (username, hash_password(password), 'user', security_answer))
conn.commit()
return {'success': True}
finally:
conn.close()
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
return render_template('login.html')
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
return {'success': False, 'msg': '用户名和密码不能为空'}
conn = get_db()
try:
with conn.cursor() as cur:
cur.execute('SELECT id, password, role FROM users WHERE username=%s', (username,))
row = cur.fetchone()
if not row or hash_password(password) != row[1]:
return {'success': False, 'msg': '用户名或密码错误'}
session['user_id'] = row[0]
session['username'] = username
session['role'] = row[2]
return {'success': True}
finally:
conn.close()
@app.route('/logout')
def logout():
session.clear()
return redirect('/login')
@app.route('/user_info')
def user_info():
if 'user_id' not in session:
return {'success': False}
return {'success': True, 'username': session.get('username'), 'role': session.get('role')}
@app.route('/user_center')
def user_center():
if 'user_id' not in session:
return redirect('/login')
return render_template('user_center.html')
@app.route('/forgot', methods=['GET', 'POST'])
def forgot():
if request.method == 'GET':
return render_template('forgot.html')
username = request.form.get('username')
answer = request.form.get('answer')
new_password = request.form.get('new_password')
if not username or not answer or not new_password:
return {'success': False, 'msg': '所有字段均不能为空'}
conn = get_db()
try:
with conn.cursor() as cur:
cur.execute('SELECT id, security_answer FROM users WHERE username=%s', (username,))
row = cur.fetchone()
if not row:
return {'success': False, 'msg': '用户不存在'}
if row[1] != answer:
return {'success': False, 'msg': '安全问题答案错误'}
cur.execute('UPDATE users SET password=%s WHERE id=%s', (hash_password(new_password), row[0]))
conn.commit()
return {'success': True}
finally:
conn.close()
# === 恢复无人机实时操控相关函数 ===
def takeoff():
"""无人机起飞"""
return send_with_retry('takeoff')
def land():
"""无人机降落"""
return send_with_retry('land')
def move(direction, distance):
"""无人机移动 direction: forward/back/left/right/up/down, distance: cm"""
if direction not in ['forward', 'back', 'left', 'right', 'up', 'down']:
return False
cmd = f"{direction} {distance}"
return send_with_retry(cmd)
def rotate(direction, degrees):
"""无人机旋转 direction: cw/ccw, degrees: 1-360"""
if direction not in ['cw', 'ccw']:
return False
cmd = f"{direction} {degrees}"
return send_with_retry(cmd)
@app.route('/reconnect_drone')
def reconnect_drone():
"""重新连接无人机"""
global connected, video_thread_started
try:
# 重置状态
connected = False
video_thread_started = False
# 重新初始化
start_drone_background()
if connected:
return jsonify({'success': True, 'msg': '无人机重新连接成功'})
else:
return jsonify({'success': False, 'msg': '无人机重新连接失败'})
except Exception as e:
return jsonify({'success': False, 'msg': f'重新连接失败: {str(e)}'})
# ========== 新增 get_frame 函数 ==========
def get_frame():
global frame_queue
try:
frame = frame_queue.get_nowait()
return frame
except:
return None
# ========== 启动时自动初始化无人机并启动视频流线程 ==========
def start_drone_background():
global video_thread_started, connected
print("=== Starting drone background initialization ===")
try:
print("Calling initialize_drone()...")
initialize_drone()
print(f"initialize_drone() completed. connected={connected}, video_thread_started={video_thread_started}")
if connected and not video_thread_started:
t = threading.Thread(target=video_stream_thread)
t.daemon = True
t.start()
video_thread_started = True
except Exception as e:
print(f"✗ Drone init error: {e}")
connected = False
import traceback
traceback.print_exc()
print(f"=== Drone background initialization completed. connected={connected}, video_thread_started={video_thread_started} ===")
# ========== 修改 main 函数 ==========
def main():
global container, frame_iter, sock, is_running
# 启动无人机初始化和视频流线程
start_drone_background()
# 启动 Flask 服务,禁用热重载避免多进程冲突
print("=== Starting Flask server ===")
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 292 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

@ -0,0 +1,39 @@
body, html {
font-family: 'Microsoft YaHei', 'Arial', 'KaiTi', '楷体', sans-serif;
margin: 0;
padding: 0;
box-sizing: border-box;
}
* {
box-sizing: border-box;
}
button {
transition: all 0.3s ease;
cursor: pointer;
border: none;
outline: none;
font-family: inherit;
}
button:hover {
transform: translateY(-2px);
}
button:active {
transform: translateY(0);
}
::-webkit-scrollbar {
width: 10px;
background: #eee;
}
::-webkit-scrollbar-thumb {
background: #bdbdbd;
border-radius: 5px;
}
::-webkit-scrollbar-thumb:hover {
background: #999;
}

@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>关于本系统的介绍</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
body {
margin: 0;
padding: 0;
width: 100vw;
min-height: 100vh;
background: #f5f5f5;
}
.doc-container {
max-width: 1000px;
margin: 40px auto 0 auto;
background: #fff;
border-radius: 12px;
box-shadow: 0 4px 32px rgba(0,0,0,0.10);
padding: 40px 48px;
}
.doc-title {
font-size: 36px;
font-family: KaiTi, 楷体, serif;
color: #1976d2;
margin-bottom: 32px;
text-align: center;
}
.doc-content {
font-size: 18px;
color: #222;
line-height: 1.8;
}
.back-btn {
display: block;
margin: 32px auto 0 auto;
font-size: 22px;
padding: 10px 40px;
border-radius: 8px;
border: none;
background: #4CAF50;
color: #fff;
cursor: pointer;
}
.back-btn:hover {
background: #388e3c;
}
</style>
</head>
<body>
<div class="doc-container">
<div class="doc-title">关于本系统的介绍</div>
<div class="doc-content">{{ doc_html|safe }}</div>
<button class="back-btn" onclick="window.location.href='/'">返回首页</button>
</div>
</body>
</html>

@ -0,0 +1,518 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>实时操控无人机</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
body {
margin: 0;
padding: 0;
width: 100vw;
height: 100vh;
background: url('/static/background/page.png') no-repeat center center fixed;
background-size: cover;
font-family: 'Microsoft YaHei', 'Arial', sans-serif;
overflow: hidden;
}
/* 顶部状态栏 */
.status-bar {
position: fixed;
top: 0;
left: 0;
right: 0;
height: 50px;
background: rgba(0,0,0,0.9);
backdrop-filter: blur(10px);
display: flex;
justify-content: space-between;
align-items: center;
padding: 0 30px;
z-index: 1000;
border-bottom: 2px solid rgba(255,255,255,0.1);
}
.status-item {
display: flex;
align-items: center;
color: #fff;
font-size: 14px;
}
.status-icon {
margin-right: 8px;
font-size: 16px;
}
.battery-indicator {
display: flex;
align-items: center;
background: rgba(255,255,255,0.1);
border-radius: 15px;
padding: 5px 12px;
}
.battery-bar {
width: 40px;
height: 8px;
background: rgba(255,255,255,0.3);
border-radius: 4px;
margin: 0 8px;
overflow: hidden;
}
.battery-fill {
height: 100%;
background: linear-gradient(90deg, #4CAF50, #388e3c);
transition: width 0.3s ease;
}
.battery-fill.low {
background: linear-gradient(90deg, #f44336, #d32f2f);
}
.battery-fill.medium {
background: linear-gradient(90deg, #ff9800, #f57c00);
}
.drone-container {
display: flex;
height: 100vh;
align-items: center;
justify-content: center;
padding-top: 50px;
}
.video-section {
background: rgba(0,0,0,0.9);
border-radius: 20px;
padding: 30px;
margin-right: 40px;
box-shadow: 0 12px 40px rgba(0,0,0,0.4);
border: 2px solid rgba(255,255,255,0.1);
backdrop-filter: blur(10px);
}
.video-title {
color: #fff;
font-size: 24px;
margin-bottom: 20px;
text-align: center;
font-weight: bold;
}
.video-feed {
width: 500px;
height: 375px;
border-radius: 12px;
border: 3px solid #4CAF50;
background: #222;
box-shadow: 0 8px 25px rgba(76,175,80,0.4);
transition: all 0.3s ease;
}
.video-feed:hover {
transform: scale(1.02);
box-shadow: 0 12px 35px rgba(76,175,80,0.5);
}
.control-section {
background: rgba(255,255,255,0.95);
border-radius: 20px;
padding: 30px;
min-width: 380px;
box-shadow: 0 12px 40px rgba(0,0,0,0.25);
border: 1px solid rgba(255,255,255,0.2);
backdrop-filter: blur(10px);
}
.control-title {
font-size: 26px;
color: #1976d2;
margin-bottom: 25px;
text-align: center;
font-weight: bold;
}
/* 3D控制面板 */
.control-grid {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 12px;
margin-bottom: 20px;
}
.control-btn {
font-size: 16px;
padding: 15px 10px;
border-radius: 12px;
border: none;
background: linear-gradient(135deg, #4CAF50, #388e3c);
color: #fff;
cursor: pointer;
transition: all 0.3s ease;
box-shadow: 0 6px 20px rgba(76,175,80,0.3);
font-weight: bold;
position: relative;
overflow: hidden;
}
.control-btn::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255,255,255,0.2), transparent);
transition: left 0.5s;
}
.control-btn:hover::before {
left: 100%;
}
.control-btn:hover {
background: linear-gradient(135deg, #388e3c, #2e7d32);
transform: translateY(-3px) scale(1.05);
box-shadow: 0 10px 25px rgba(76,175,80,0.4);
}
.control-btn:active {
transform: translateY(-1px) scale(1.02);
}
.control-btn:disabled {
background: linear-gradient(135deg, #9e9e9e, #757575);
cursor: not-allowed;
transform: none;
box-shadow: 0 6px 20px rgba(158,158,158,0.3);
}
/* 特殊按钮样式 */
.btn-takeoff {
background: linear-gradient(135deg, #4CAF50, #388e3c);
grid-column: span 3;
font-size: 18px;
padding: 18px;
}
.btn-land {
background: linear-gradient(135deg, #f44336, #d32f2f);
grid-column: span 3;
font-size: 18px;
padding: 18px;
}
.btn-up {
grid-column: 2;
grid-row: 2;
}
.btn-down {
grid-column: 2;
grid-row: 4;
}
.btn-left {
grid-column: 1;
grid-row: 3;
}
.btn-right {
grid-column: 3;
grid-row: 3;
}
.btn-forward {
grid-column: 2;
grid-row: 1;
}
.btn-back {
grid-column: 2;
grid-row: 5;
}
/* 旋转控制 */
.rotation-controls {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 12px;
margin: 20px 0;
}
.btn-rotate {
background: linear-gradient(135deg, #1976d2, #1565c0);
font-size: 14px;
padding: 12px;
}
.btn-rotate:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
}
/* 状态消息 */
.status-container {
margin-top: 20px;
padding: 15px;
border-radius: 10px;
background: rgba(25,118,210,0.1);
border-left: 4px solid #1976d2;
}
.status-msg {
font-size: 16px;
color: #d32f2f;
text-align: center;
margin-bottom: 8px;
}
.effect-msg {
font-size: 14px;
color: #1976d2;
text-align: center;
font-style: italic;
}
/* 键盘提示 */
.keyboard-hint {
margin-top: 15px;
padding: 10px;
background: rgba(76,175,80,0.1);
border-radius: 8px;
font-size: 12px;
color: #666;
text-align: center;
}
/* 返回按钮 */
.back-btn {
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
padding: 12px 25px;
border-radius: 10px;
border: none;
cursor: pointer;
font-size: 16px;
transition: all 0.3s ease;
box-shadow: 0 6px 20px rgba(25,118,210,0.3);
margin-top: 20px;
width: 100%;
}
.back-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-2px);
box-shadow: 0 8px 25px rgba(25,118,210,0.4);
}
/* 加载动画 */
.loading-spinner {
display: inline-block;
width: 16px;
height: 16px;
border: 2px solid rgba(255,255,255,0.3);
border-radius: 50%;
border-top-color: #fff;
animation: spin 1s linear infinite;
margin-right: 8px;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
/* 响应式设计 */
@media (max-width: 1200px) {
.drone-container {
flex-direction: column;
padding-top: 70px;
}
.video-section {
margin-right: 0;
margin-bottom: 20px;
}
.video-feed {
width: 400px;
height: 300px;
}
}
</style>
</head>
<body>
<!-- 顶部状态栏 -->
<div class="status-bar">
<div class="status-item">
<span class="status-icon">🚁</span>
<span>无人机控制系统</span>
</div>
<div class="status-item">
<span class="status-icon">📡</span>
<span>连接状态: 在线</span>
</div>
<div class="status-item">
<div class="battery-indicator">
<span class="status-icon">🔋</span>
<div class="battery-bar">
<div class="battery-fill" id="batteryFill"></div>
</div>
<span id="batteryLevel">{{ battery }}%</span>
</div>
</div>
</div>
<div class="drone-container">
<div class="video-section">
<div class="video-title">📹 实时视频流</div>
<img src="/video_feed" class="video-feed" id="videoFeed" alt="无人机视频流">
</div>
<div class="control-section">
<div class="control-title">🎮 无人机控制</div>
<!-- 3D控制面板 -->
<div class="control-grid">
<button class="control-btn btn-takeoff" onclick="sendCmd('/takeoff')">🚀 起飞</button>
<button class="control-btn btn-forward" onclick="sendCmd('/move/forward/50')">⬆️ 前进</button>
<button class="control-btn btn-up" onclick="sendCmd('/move/up/50')">⬆️ 上升</button>
<button class="control-btn btn-left" onclick="sendCmd('/move/left/50')">⬅️ 左移</button>
<button class="control-btn btn-right" onclick="sendCmd('/move/right/50')">➡️ 右移</button>
<button class="control-btn btn-down" onclick="sendCmd('/move/down/50')">⬇️ 下降</button>
<button class="control-btn btn-back" onclick="sendCmd('/move/back/50')">⬇️ 后退</button>
<button class="control-btn btn-land" onclick="sendCmd('/land')">🛬 降落</button>
</div>
<!-- 旋转控制 -->
<div class="rotation-controls">
<button class="control-btn btn-rotate" onclick="sendCmd('/rotate/cw/90')">🔄 顺时针90°</button>
<button class="control-btn btn-rotate" onclick="sendCmd('/rotate/ccw/90')">🔄 逆时针90°</button>
<button class="control-btn btn-rotate" onclick="sendCmd('/rotate/cw/180')">🔄 顺时针180°</button>
<button class="control-btn btn-rotate" onclick="sendCmd('/rotate/ccw/180')">🔄 逆时针180°</button>
</div>
<!-- 状态显示 -->
<div class="status-container">
<div class="status-msg" id="statusMsg"></div>
<div class="effect-msg" id="effectMsg"></div>
</div>
<!-- 键盘提示 -->
<div class="keyboard-hint">
💡 键盘控制: WASD移动, QE旋转, 空格起飞, L降落
</div>
<button class="back-btn" onclick="window.location.href='/main_menu'">← 返回主菜单</button>
</div>
</div>
<script>
// 指令到效果的映射
const effectMap = {
'/takeoff': '🚀 无人机起飞,准备执行任务',
'/land': '🛬 无人机降落,安全着陆',
'/move/forward/50': '⬆️ 无人机向前移动50厘米观察前方环境',
'/move/back/50': '⬇️ 无人机向后移动50厘米调整后方位置',
'/move/left/50': '⬅️ 无人机向左移动50厘米侧向侦查',
'/move/right/50': '➡️ 无人机向右移动50厘米侧向侦查',
'/move/up/50': '⬆️ 无人机上升50厘米提升高度',
'/move/down/50': '⬇️ 无人机下降50厘米降低高度',
'/rotate/cw/90': '🔄 无人机顺时针旋转90°调整朝向',
'/rotate/ccw/90': '🔄 无人机逆时针旋转90°调整朝向',
'/rotate/cw/180': '🔄 无人机顺时针旋转180°快速掉头',
'/rotate/ccw/180': '🔄 无人机逆时针旋转180°快速掉头',
};
// 键盘控制映射
const keyMap = {
'w': '/move/forward/50',
's': '/move/back/50',
'a': '/move/left/50',
'd': '/move/right/50',
'q': '/rotate/ccw/90',
'e': '/rotate/cw/90',
' ': '/takeoff',
'l': '/land'
};
function sendCmd(url) {
const btn = event.target;
const originalText = btn.innerHTML;
// 显示加载状态
btn.disabled = true;
btn.innerHTML = '<span class="loading-spinner"></span>执行中...';
fetch(url)
.then(r => r.json())
.then(data => {
let msg = data.success ? '✅ 命令执行成功' : ('❌ 命令执行失败: ' + (data.error || '未知错误'));
document.getElementById('statusMsg').textContent = msg;
document.getElementById('effectMsg').textContent = effectMap[url] || '';
// 恢复按钮状态
btn.disabled = false;
btn.innerHTML = originalText;
})
.catch(e => {
document.getElementById('statusMsg').textContent = '❌ 请求出错: ' + e;
document.getElementById('effectMsg').textContent = '';
// 恢复按钮状态
btn.disabled = false;
btn.innerHTML = originalText;
});
}
// 键盘控制
document.addEventListener('keydown', function(e) {
const key = e.key.toLowerCase();
if (keyMap[key]) {
e.preventDefault();
sendCmd(keyMap[key]);
}
});
// 更新电池电量
function updateBattery() {
fetch('/status').then(r => r.json()).then(data => {
if ('battery' in data) {
const batteryLevel = data.battery;
document.getElementById('batteryLevel').textContent = batteryLevel + '%';
const batteryFill = document.getElementById('batteryFill');
batteryFill.style.width = batteryLevel + '%';
// 根据电量设置颜色
batteryFill.className = 'battery-fill';
if (batteryLevel <= 20) {
batteryFill.classList.add('low');
} else if (batteryLevel <= 50) {
batteryFill.classList.add('medium');
}
}
}).catch(e => {
console.log('获取电池状态失败:', e);
});
}
// 初始化
setInterval(updateBattery, 3000); // 每3秒刷新一次
updateBattery(); // 页面加载时立即刷新
// 视频流错误处理
document.getElementById('videoFeed').onerror = function() {
this.style.display = 'none';
const errorDiv = document.createElement('div');
errorDiv.style.cssText = 'width:500px;height:375px;background:#333;color:#fff;display:flex;align-items:center;justify-content:center;border-radius:12px;border:3px solid #f44336;';
errorDiv.innerHTML = '❌ 视频流连接失败<br><small>请检查无人机连接状态</small>';
this.parentNode.appendChild(errorDiv);
};
</script>
</body>
</html>

@ -0,0 +1,92 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>找回密码</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
.forgot-container {
width: 400px;
margin: 100px auto;
background: #fff;
border-radius: 12px;
box-shadow: 0 4px 32px rgba(0,0,0,0.10);
padding: 40px 48px;
text-align: center;
}
.forgot-title {
font-size: 32px;
margin-bottom: 32px;
color: #1976d2;
}
.forgot-input {
width: 90%;
padding: 12px;
margin: 16px 0;
border-radius: 6px;
border: 1px solid #bdbdbd;
font-size: 18px;
}
.forgot-btn {
width: 100%;
padding: 12px;
font-size: 20px;
background: #388e3c;
color: #fff;
border: none;
border-radius: 6px;
margin-top: 20px;
cursor: pointer;
}
.forgot-btn:hover {
background: #1976d2;
}
.error-msg {
color: #d32f2f;
margin-top: 12px;
}
.success-msg {
color: #388e3c;
margin-top: 12px;
}
</style>
</head>
<body>
<div class="forgot-container">
<div class="forgot-title">找回密码</div>
<form id="forgotForm">
<input class="forgot-input" type="text" name="username" placeholder="用户名" required><br>
<input class="forgot-input" type="text" name="answer" placeholder="你的安全问题答案" required><br>
<input class="forgot-input" type="password" name="new_password" placeholder="新密码" required><br>
<button class="forgot-btn" type="submit">重置密码</button>
</form>
<div class="error-msg" id="forgotError"></div>
<div class="success-msg" id="forgotSuccess"></div>
</div>
<script>
document.getElementById('forgotForm').onsubmit = function(e) {
e.preventDefault();
var form = e.target;
var data = new URLSearchParams(new FormData(form));
fetch('/forgot', {
method: 'POST',
body: data
})
.then(r => r.json())
.then(res => {
if(res.success) {
document.getElementById('forgotSuccess').textContent = '密码重置成功,请返回登录';
document.getElementById('forgotError').textContent = '';
} else {
document.getElementById('forgotError').textContent = res.msg || '重置失败';
document.getElementById('forgotSuccess').textContent = '';
}
})
.catch(() => {
document.getElementById('forgotError').textContent = '网络错误,请重试';
document.getElementById('forgotSuccess').textContent = '';
});
};
</script>
</body>
</html>

@ -0,0 +1,483 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>导入已有路径进行分析</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
body {
margin: 0;
padding: 0;
width: 100vw;
height: 100vh;
background: url('/static/background/page.png') no-repeat center center fixed;
background-size: cover;
font-family: 'Microsoft YaHei', 'Arial', sans-serif;
}
/* 顶部导航栏 */
.top-nav {
position: fixed;
top: 0;
left: 0;
right: 0;
height: 60px;
background: rgba(0,0,0,0.8);
backdrop-filter: blur(10px);
display: flex;
justify-content: space-between;
align-items: center;
padding: 0 40px;
z-index: 1000;
border-bottom: 2px solid rgba(255,255,255,0.1);
}
.nav-title {
color: #fff;
font-size: 20px;
font-weight: bold;
}
.nav-btn {
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
padding: 8px 20px;
border-radius: 6px;
text-decoration: none;
font-size: 14px;
transition: all 0.3s ease;
}
.nav-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-2px);
}
.import-container {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
background: rgba(255,255,255,0.95);
padding: 40px 50px;
border-radius: 20px;
box-shadow: 0 8px 32px rgba(0,0,0,0.15);
text-align: center;
min-width: 500px;
backdrop-filter: blur(10px);
border: 1px solid rgba(255,255,255,0.2);
}
.import-title {
font-size: 32px;
font-family: KaiTi, 楷体, serif;
margin-bottom: 30px;
color: #1976d2;
font-weight: bold;
}
.upload-area {
border: 3px dashed #4CAF50;
border-radius: 12px;
padding: 40px 20px;
margin: 20px 0;
background: rgba(76,175,80,0.05);
transition: all 0.3s ease;
cursor: pointer;
}
.upload-area:hover {
border-color: #388e3c;
background: rgba(76,175,80,0.1);
}
.upload-area.dragover {
border-color: #388e3c;
background: rgba(76,175,80,0.15);
transform: scale(1.02);
}
.upload-icon {
font-size: 48px;
color: #4CAF50;
margin-bottom: 15px;
}
.file-label {
font-size: 18px;
margin-bottom: 15px;
display: block;
color: #333;
font-weight: bold;
}
.file-input {
font-size: 16px;
margin-bottom: 20px;
padding: 10px;
border: 2px solid #e0e0e0;
border-radius: 8px;
transition: all 0.3s ease;
}
.file-input:focus {
border-color: #4CAF50;
box-shadow: 0 0 0 3px rgba(76,175,80,0.1);
outline: none;
}
.run-btn {
font-size: 18px;
padding: 12px 30px;
border-radius: 8px;
border: none;
background: linear-gradient(135deg, #4CAF50, #388e3c);
color: #fff;
cursor: pointer;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(76,175,80,0.3);
font-weight: bold;
}
.run-btn:hover {
background: linear-gradient(135deg, #388e3c, #2e7d32);
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(76,175,80,0.4);
}
.run-btn:disabled {
background: linear-gradient(135deg, #9e9e9e, #757575);
cursor: not-allowed;
transform: none;
box-shadow: 0 4px 15px rgba(158,158,158,0.3);
}
.result-msg {
margin-top: 20px;
font-size: 16px;
color: #1976d2;
padding: 15px;
border-radius: 8px;
background: rgba(25,118,210,0.1);
}
.error-msg {
color: #d32f2f;
background: rgba(211,47,47,0.1);
}
.success-msg {
color: #388e3c;
background: rgba(76,175,80,0.1);
}
/* 文件信息卡片 */
.file-info {
background: rgba(76,175,80,0.1);
border-radius: 12px;
padding: 20px;
margin: 20px 0;
border-left: 4px solid #4CAF50;
}
.file-name {
font-size: 18px;
font-weight: bold;
color: #333;
margin-bottom: 10px;
}
/* 任务描述卡片 */
.task-desc {
background: rgba(25,118,210,0.1);
border-radius: 12px;
padding: 20px;
margin: 20px 0;
border-left: 4px solid #1976d2;
}
.task-title {
font-size: 16px;
font-weight: bold;
color: #1976d2;
margin-bottom: 10px;
}
.task-content {
font-size: 16px;
color: #333;
line-height: 1.6;
}
/* 动作序列表格 */
.actions-table {
background: rgba(255,255,255,0.9);
border-radius: 12px;
padding: 20px;
margin: 20px 0;
box-shadow: 0 4px 15px rgba(0,0,0,0.1);
}
.actions-table table {
width: 100%;
border-collapse: collapse;
margin: 0 auto;
}
.actions-table th {
background: linear-gradient(135deg, #4CAF50, #388e3c);
color: #fff;
padding: 12px;
text-align: left;
border-radius: 6px;
}
.actions-table td {
padding: 10px 12px;
border-bottom: 1px solid #e0e0e0;
}
.actions-table tr:hover {
background: rgba(76,175,80,0.05);
}
/* 进度指示器 */
.progress-container {
margin: 20px 0;
display: none;
}
.progress-bar {
width: 100%;
height: 8px;
background: rgba(255,255,255,0.3);
border-radius: 4px;
overflow: hidden;
}
.progress-fill {
height: 100%;
background: linear-gradient(90deg, #4CAF50, #388e3c);
width: 0%;
transition: width 0.3s ease;
}
.progress-text {
margin-top: 10px;
font-size: 14px;
color: #666;
}
/* 加载动画 */
.loading-spinner {
display: inline-block;
width: 20px;
height: 20px;
border: 3px solid rgba(255,255,255,0.3);
border-radius: 50%;
border-top-color: #fff;
animation: spin 1s linear infinite;
margin-right: 10px;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
/* 返回按钮 */
.back-btn {
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
padding: 10px 25px;
border-radius: 8px;
border: none;
cursor: pointer;
font-size: 16px;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(25,118,210,0.3);
margin-top: 20px;
}
.back-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(25,118,210,0.4);
}
</style>
</head>
<body>
<!-- 顶部导航栏 -->
<div class="top-nav">
<div class="nav-title">路径分析系统</div>
<a href="/main_menu" class="nav-btn">返回主菜单</a>
</div>
<div class="import-container">
<div class="import-title">📁 导入已有路径进行分析</div>
{% if not uploaded %}
<form method="post" enctype="multipart/form-data" id="uploadForm">
<div class="upload-area" id="uploadArea">
<div class="upload-icon">📄</div>
<label class="file-label">选择Python文件</label>
<input class="file-input" type="file" name="file" accept=".py" required id="fileInput">
<div style="font-size: 14px; color: #666; margin-top: 10px;">
支持的文件格式:.py
</div>
</div>
<button class="run-btn" type="submit">🚀 上传并分析</button>
</form>
{% endif %}
{% if uploaded %}
<div class="file-info">
<div class="file-name">📄 {{ filename }}</div>
<div style="font-size: 14px; color: #666;">文件已成功上传</div>
</div>
<button class="run-btn" id="runBtn">▶️ 开始运行任务</button>
{% if nl_desc %}
<div class="task-desc">
<div class="task-title">📋 任务描述</div>
<div class="task-content">{{ nl_desc }}</div>
</div>
{% endif %}
{% if actions %}
<div class="actions-table">
<div class="task-title">⚡ 动作序列</div>
<table>
<thead>
<tr>
<th>动作</th>
<th>参数</th>
</tr>
</thead>
<tbody>
{% for a in actions %}
<tr>
<td>{{ a.action }}</td>
<td>{{ a.params }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
{% endif %}
{% if error %}
<div class="result-msg error-msg">❌ {{ error }}</div>
{% endif %}
<div class="result-msg" id="resultMsg" style="display: none;"></div>
<!-- 进度指示器 -->
<div class="progress-container" id="progressContainer">
<div class="progress-bar">
<div class="progress-fill" id="progressFill"></div>
</div>
<div class="progress-text" id="progressText">准备中...</div>
</div>
<button class="back-btn" onclick="window.location.href='/main_menu'">← 返回主菜单</button>
</div>
{% if uploaded %}
<script>
document.getElementById('runBtn').onclick = function() {
var btn = this;
var progressContainer = document.getElementById('progressContainer');
var progressFill = document.getElementById('progressFill');
var progressText = document.getElementById('progressText');
var resultMsg = document.getElementById('resultMsg');
// 显示进度指示器
progressContainer.style.display = 'block';
resultMsg.style.display = 'none';
// 更新按钮状态
btn.disabled = true;
btn.innerHTML = '<span class="loading-spinner"></span>运行中...';
// 模拟进度更新
let progress = 0;
const progressInterval = setInterval(() => {
progress += Math.random() * 15;
if (progress > 90) progress = 90;
progressFill.style.width = progress + '%';
progressText.textContent = '任务执行中... ' + Math.round(progress) + '%';
}, 500);
fetch('/run_path', {
method: 'POST',
headers: {'Content-Type': 'application/x-www-form-urlencoded'},
body: 'filename={{ filename }}'
})
.then(r => r.json())
.then(data => {
clearInterval(progressInterval);
progressFill.style.width = '100%';
progressText.textContent = '任务完成!';
// 显示结果
resultMsg.style.display = 'block';
resultMsg.textContent = data.msg;
resultMsg.className = 'result-msg ' + (data.success ? 'success-msg' : 'error-msg');
// 恢复按钮状态
btn.disabled = false;
btn.innerHTML = '▶️ 开始运行任务';
// 隐藏进度条
setTimeout(() => {
progressContainer.style.display = 'none';
}, 2000);
})
.catch(e => {
clearInterval(progressInterval);
progressContainer.style.display = 'none';
resultMsg.style.display = 'block';
resultMsg.textContent = '❌ 运行出错:' + e;
resultMsg.className = 'result-msg error-msg';
btn.disabled = false;
btn.innerHTML = '▶️ 开始运行任务';
});
};
</script>
{% endif %}
<script>
// 文件拖拽上传功能
const uploadArea = document.getElementById('uploadArea');
const fileInput = document.getElementById('fileInput');
if (uploadArea && fileInput) {
uploadArea.addEventListener('click', () => fileInput.click());
uploadArea.addEventListener('dragover', (e) => {
e.preventDefault();
uploadArea.classList.add('dragover');
});
uploadArea.addEventListener('dragleave', () => {
uploadArea.classList.remove('dragover');
});
uploadArea.addEventListener('drop', (e) => {
e.preventDefault();
uploadArea.classList.remove('dragover');
const files = e.dataTransfer.files;
if (files.length > 0) {
fileInput.files = files;
// 自动提交表单
document.getElementById('uploadForm').submit();
}
});
}
</script>
</body>
</html>

@ -0,0 +1 @@
<!-- 页面已迁移至welcome.html保留此文件防止404 -->

@ -0,0 +1,144 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>用户登录</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
body {
background: url('/static/background/OIP-C.webp') center center/cover no-repeat fixed;
}
.login-container {
width: 400px;
margin: 100px auto;
background: rgba(255,255,255,0.95);
border-radius: 16px;
box-shadow: 0 8px 32px rgba(0,0,0,0.15);
padding: 40px 48px;
text-align: center;
backdrop-filter: blur(10px);
border: 1px solid rgba(255,255,255,0.2);
}
.login-title {
font-size: 32px;
margin-bottom: 32px;
color: #1976d2;
}
.login-input {
width: 90%;
padding: 12px;
margin: 16px 0;
border-radius: 8px;
border: 2px solid #e0e0e0;
font-size: 18px;
transition: all 0.3s ease;
background: rgba(255,255,255,0.9);
}
.login-input:focus {
border-color: #4CAF50;
box-shadow: 0 0 0 3px rgba(76,175,80,0.1);
outline: none;
}
.login-btn {
width: 100%;
padding: 12px;
font-size: 20px;
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
border: none;
border-radius: 8px;
margin-top: 20px;
cursor: pointer;
transition: all 0.3s ease;
box-shadow: 0 4px 15px rgba(25,118,210,0.3);
}
.login-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(25,118,210,0.4);
}
.register-link {
margin-top: 24px;
display: block;
color: #388e3c;
cursor: pointer;
}
.error-msg {
color: #d32f2f;
margin-top: 12px;
}
</style>
</head>
<body>
<div class="login-container">
<div class="login-title">用户登录</div>
<form id="loginForm">
<input class="login-input" type="text" name="username" placeholder="用户名" required><br>
<input class="login-input" type="password" name="password" placeholder="密码" required><br>
<button class="login-btn" type="submit">登录</button>
</form>
<div class="error-msg" id="loginError"></div>
<span class="register-link" onclick="window.location.href='/register'">没有账号?点击注册</span>
<span class="register-link" onclick="window.location.href='/forgot'">忘记密码?</span>
</div>
<script>
document.getElementById('loginForm').onsubmit = function(e) {
e.preventDefault();
var form = e.target;
var data = new URLSearchParams(new FormData(form));
fetch('/login', {
method: 'POST',
body: data
})
.then(r => r.json())
.then(res => {
if(res.success) {
showWelcomeAndRedirect();
} else {
document.getElementById('loginError').textContent = res.msg || '登录失败';
}
})
.catch(() => {
document.getElementById('loginError').textContent = '网络错误,请重试';
});
};
function showWelcomeAndRedirect() {
var welcomeDiv = document.createElement('div');
welcomeDiv.style.position = 'fixed';
welcomeDiv.style.left = 0;
welcomeDiv.style.top = 0;
welcomeDiv.style.width = '100vw';
welcomeDiv.style.height = '100vh';
welcomeDiv.style.zIndex = 9999;
welcomeDiv.style.background = "url('/static/background/welcome.jpg') center center/cover no-repeat";
welcomeDiv.style.display = 'flex';
welcomeDiv.style.alignItems = 'center';
welcomeDiv.style.justifyContent = 'center';
welcomeDiv.style.opacity = 0;
welcomeDiv.style.transition = 'opacity 0.8s';
var text = document.createElement('div');
text.textContent = '欢迎使用基于大模型的无人机定点打击系统';
text.style.fontSize = '2.5rem';
text.style.color = '#fff';
text.style.textShadow = '2px 2px 8px #000, 0 0 16px #f00';
text.style.fontWeight = 'bold';
text.style.background = 'rgba(0,0,0,0.25)';
text.style.padding = '32px 48px';
text.style.borderRadius = '18px';
welcomeDiv.appendChild(text);
document.body.appendChild(welcomeDiv);
setTimeout(function(){
welcomeDiv.style.opacity = 1;
}, 10);
setTimeout(function(){
welcomeDiv.style.opacity = 0;
setTimeout(function(){
document.body.removeChild(welcomeDiv);
window.location.href = '/';
}, 800);
}, 2000);
}
</script>
</body>
</html>

@ -0,0 +1,146 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>请选择你的操作类型</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
body {
margin: 0;
padding: 0;
width: 100vw;
height: 100vh;
background: url('/static/background/page.png') no-repeat center center fixed;
background-size: cover;
}
.main-title {
margin-top: 60px;
text-align: center;
font-family: KaiTi, 楷体, serif;
font-size: 54px;
color: #fff;
text-shadow: 2px 2px 8px #333;
}
.menu-btns {
display: flex;
flex-direction: column;
align-items: center;
margin-top: 80px;
gap: 40px;
}
.menu-btn {
font-size: 32px;
padding: 20px 80px;
border-radius: 12px;
border: none;
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
cursor: pointer;
margin: 0 0 0 0;
box-shadow: 0 6px 20px rgba(25,118,210,0.3);
font-family: inherit;
transition: all 0.3s ease;
position: relative;
overflow: hidden;
}
.menu-btn::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255,255,255,0.2), transparent);
transition: left 0.5s;
}
.menu-btn:hover::before {
left: 100%;
}
.menu-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-3px);
box-shadow: 0 8px 25px rgba(25,118,210,0.4);
}
.menu-btn:active {
transform: translateY(-1px);
}
#logout-btn {
position: fixed;
top: 28px;
right: 40px;
background: linear-gradient(135deg, #d32f2f, #c62828);
color: #fff;
padding: 12px 28px;
border-radius: 8px;
font-size: 18px;
font-weight: bold;
text-decoration: none;
box-shadow: 0 4px 15px rgba(211,47,47,0.3);
z-index: 1000;
transition: all 0.3s ease;
}
#logout-btn:hover {
background: linear-gradient(135deg, #c62828, #b71c1c);
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(211,47,47,0.4);
}
#doc-btn {
position: fixed;
top: 28px;
left: 40px;
background: linear-gradient(135deg, #1976d2, #1565c0);
color: #fff;
padding: 12px 28px;
border-radius: 8px;
font-size: 18px;
font-weight: bold;
text-decoration: none;
box-shadow: 0 4px 15px rgba(25,118,210,0.3);
z-index: 1000;
transition: all 0.3s ease;
}
#doc-btn:hover {
background: linear-gradient(135deg, #1565c0, #0d47a1);
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(25,118,210,0.4);
}
</style>
</head>
<body>
<a href="/doc" id="doc-btn">系统信息</a>
<a href="/logout" id="logout-btn">退出登录</a>
<div class="main-title">请选择你的操作类型</div>
<div class="menu-btns">
<button class="menu-btn" onclick="window.location.href='/import_path'">导入已有路径进行分析</button>
<button class="menu-btn" onclick="window.location.href='/drone_control'">实时操控无人机</button>
<button class="menu-btn" id="analyzeBtn">进行结果分析</button>
<button class="menu-btn" disabled>导出决策结果(待添加)</button>
</div>
<div id="analyzeResult" style="margin-top:40px;text-align:center;font-size:20px;color:#1976d2;"></div>
<script>
document.getElementById('analyzeBtn').onclick = function() {
var btn = this;
btn.disabled = true;
btn.textContent = '分析中...';
fetch('/analyze_results')
.then(r => r.json())
.then(data => {
if(data.result && data.result.indexOf('分析完成') !== -1) {
let addr = data.summary_file || '';
let summary = (data.summary || []).join('<br>');
document.getElementById('analyzeResult').innerHTML = `分析完成,结果已保存在:<span style='color:#388e3c'>${addr}</span><br><br>${summary}`;
} else {
alert('分析失败,请重新再试');
window.location.href = '/main_menu';
}
btn.disabled = false;
btn.textContent = '进行结果分析';
})
.catch(e => {
alert('分析失败,请重新再试');
window.location.href = '/main_menu';
});
};
</script>
</body>
</html>

@ -0,0 +1,97 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>用户注册</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
.register-container {
width: 400px;
margin: 100px auto;
background: #fff;
border-radius: 12px;
box-shadow: 0 4px 32px rgba(0,0,0,0.10);
padding: 40px 48px;
text-align: center;
}
.register-title {
font-size: 32px;
margin-bottom: 32px;
color: #1976d2;
}
.register-input {
width: 90%;
padding: 12px;
margin: 16px 0;
border-radius: 6px;
border: 1px solid #bdbdbd;
font-size: 18px;
}
.register-btn {
width: 100%;
padding: 12px;
font-size: 20px;
background: #388e3c;
color: #fff;
border: none;
border-radius: 6px;
margin-top: 20px;
cursor: pointer;
}
.register-btn:hover {
background: #1976d2;
}
.login-link {
margin-top: 24px;
display: block;
color: #1976d2;
cursor: pointer;
}
.error-msg {
color: #d32f2f;
margin-top: 12px;
}
</style>
</head>
<body>
<div class="register-container">
<div class="register-title">用户注册</div>
<form id="registerForm">
<input class="register-input" type="text" name="username" placeholder="用户名" required><br>
<input class="register-input" type="password" name="password" placeholder="密码" required><br>
<input class="register-input" type="password" name="confirm" placeholder="确认密码" required><br>
<input class="register-input" type="text" name="security_answer" placeholder="你的安全问题答案(如你母亲的姓名)" required><br>
<button class="register-btn" type="submit">注册</button>
</form>
<div class="error-msg" id="registerError"></div>
<span class="login-link" onclick="window.location.href='/login'">已有账号?点击登录</span>
<span class="login-link" onclick="window.location.href='/forgot'">忘记密码?</span>
</div>
<script>
document.getElementById('registerForm').onsubmit = function(e) {
e.preventDefault();
var form = e.target;
var data = new URLSearchParams(new FormData(form));
if(form.password.value !== form.confirm.value) {
document.getElementById('registerError').textContent = '两次密码输入不一致';
return;
}
fetch('/register', {
method: 'POST',
body: data
})
.then(r => r.json())
.then(res => {
if(res.success) {
window.location.href = '/login';
} else {
document.getElementById('registerError').textContent = res.msg || '注册失败';
}
})
.catch(() => {
document.getElementById('registerError').textContent = '网络错误,请重试';
});
};
</script>
</body>
</html>

@ -0,0 +1,65 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>用户中心</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
.user-center-container {
width: 400px;
margin: 100px auto;
background: #fff;
border-radius: 12px;
box-shadow: 0 4px 32px rgba(0,0,0,0.10);
padding: 40px 48px;
text-align: center;
}
.user-title {
font-size: 32px;
margin-bottom: 32px;
color: #1976d2;
}
.user-info {
font-size: 20px;
margin: 16px 0;
}
.logout-btn {
width: 100%;
padding: 12px;
font-size: 20px;
background: #d32f2f;
color: #fff;
border: none;
border-radius: 6px;
margin-top: 20px;
cursor: pointer;
}
.logout-btn:hover {
background: #b71c1c;
}
</style>
</head>
<body>
<div class="user-center-container">
<div class="user-title">用户中心</div>
<div class="user-info">用户名:<span id="username"></span></div>
<div class="user-info">权限:<span id="role"></span></div>
<button class="logout-btn" onclick="logout()">退出登录</button>
</div>
<script>
fetch('/user_info').then(r=>r.json()).then(data=>{
if(data.success) {
document.getElementById('username').textContent = data.username;
document.getElementById('role').textContent = data.role;
} else {
window.location.href = '/login';
}
});
function logout() {
fetch('/logout').then(()=>{
window.location.href = '/login';
});
}
</script>
</body>
</html>

@ -0,0 +1,87 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>欢迎使用基于大模型的无人机定点打击系统</title>
<link rel="stylesheet" href="/static/custom.css">
<style>
html, body {
margin: 0;
padding: 0;
width: 100vw;
height: 100vh;
}
body {
min-height: 100vh;
min-width: 100vw;
position: relative;
overflow: hidden;
}
.bg-img {
position: fixed;
top: 0;
left: 0;
width: 100vw;
height: 100vh;
object-fit: cover;
z-index: 0;
pointer-events: none;
user-select: none;
}
.welcome-title {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -60%);
font-family: KaiTi, 楷体, serif;
font-size: 96px;
color: #fff;
text-shadow: 2px 2px 8px #333;
text-align: center;
z-index: 2;
}
.about-btn {
position: absolute;
top: 40px;
left: 40px;
font-size: 24px;
font-family: inherit;
padding: 12px 32px;
border-radius: 8px;
border: none;
background: rgba(0,0,0,0.5);
color: #fff;
cursor: pointer;
z-index: 2;
}
.about-btn:hover {
background: rgba(0,0,0,0.7);
}
.enter-btn {
position: absolute;
top: calc(50% + 200px);
left: 50%;
transform: translate(-50%, 0);
font-size: 36px;
font-family: inherit;
padding: 18px 60px;
border-radius: 10px;
border: none;
background: #4CAF50;
color: #fff;
cursor: pointer;
box-shadow: 0 4px 16px rgba(0,0,0,0.2);
z-index: 2;
}
.enter-btn:hover {
background: #388e3c;
}
</style>
</head>
<body>
<img class="bg-img" src="/static/background/bg1.png">
<button class="about-btn" onclick="window.location.href='/doc'">关于本系统的介绍</button>
<div class="welcome-title">欢迎使用基于大模型的无人机定点打击系统</div>
<button class="enter-btn" onclick="window.location.href='/main_menu'">进入使用</button>
</body>
</html>

@ -0,0 +1,352 @@
import cv2
import os
import av
import time
import socket
from ultralytics import YOLO
import threading
from queue import Queue
import re
import sys
# 全局变量
frame_queue = Queue(maxsize=30) # 用于存储视频帧的队列
detection_queue = Queue() # 用于存储待检测图片的队列
is_running = True # 控制线程运行状态
photo_count = 0 # 已拍摄照片数量
latest_image_path = None # 最新图片路径
container = None # 视频流容器
frame_iter = None # 视频帧迭代器
sock = None # 无人机控制socket
drone_initialized = False # 无人机是否已初始化
tello_address = ('192.168.10.1', 8889) # 无人机地址
# =================== 新增类封装 ===================
class DroneMission:
def __init__(self, external_sock=None, external_tello_address=None,
external_frame_queue=None, external_container=None,
external_frame_iter=None):
# 使用外部传入的资源,如果没有则创建新的
self.sock = external_sock
self.tello_address = external_tello_address or ('192.168.10.1', 8889)
self.frame_queue = external_frame_queue or Queue(maxsize=30)
self.container = external_container
self.frame_iter = external_frame_iter
# 记录外部资源状态
self._external_sock = external_sock is not None
self._external_container = external_container is not None
self.is_running = True
self.photo_count = 0
self.result_dir = r'D:\yolo\ultralytics\assets\result'
os.makedirs(self.result_dir, exist_ok=True)
# 如果没有外部socket则初始化自己的
if not self.sock:
self._init_socket()
if not self.container or not self.frame_iter:
self._init_video_stream()
# 只有在没有外部资源时才启动自己的线程
if not external_sock and not external_container:
self._start_threads()
def _init_socket(self):
if self.sock:
try:
self.sock.close()
except:
pass
# 只有当没有外部socket时才创建新的socket
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 9000))
self.sock.settimeout(3.0)
def _parse_dm_response(self, resp):
if resp is None:
return None
m = re.match(r"(\d+)", resp)
if m:
return int(m.group(1))
return None
def _send_command(self, cmd):
try:
print(f"DroneMission sending command: {cmd}")
self.sock.sendto(cmd.encode('utf-8'), self.tello_address)
response, _ = self.sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"DroneMission received response: {response_str}")
return response_str
except socket.timeout:
print(f"DroneMission timeout sending command: {cmd}")
return None
except Exception as e:
print(f"Error sending command: {e}")
return None
def test_connection(self):
try:
response = self._send_command('command')
print(f"Connection test response: '{response}'")
# 兼容多种响应格式:'ok', 'Re0402 ok', 'Re0101 ok' 等
return response and ('ok' in response.lower())
except Exception as e:
print(f"Connection test failed: {e}")
return False
def check_flying_status(self):
try:
height_response = self._send_command('height?')
print(f"Height response: '{height_response}'")
height = self._parse_dm_response(height_response)
if height is not None:
is_flying = height > 20
print(f"Height: {height}cm, Flying: {is_flying}")
return is_flying
else:
print(f"Invalid height response: '{height_response}'")
return False
except Exception as e:
print(f"Error checking flying status: {e}")
return False
def _init_video_stream(self):
# 只有当没有外部视频流时才创建新的
if not self.container or not self.frame_iter:
try:
self.container = av.open('udp://0.0.0.0:11111', format='h264', timeout=3.0)
self.frame_iter = self.container.decode(video=0)
except Exception as e:
print(f"Error initializing video stream: {e}")
self.container = None
self.frame_iter = None
def _video_stream_thread(self):
while self.is_running:
try:
if self.frame_iter is None:
self._init_video_stream()
if self.frame_iter is None:
time.sleep(1)
continue
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(img)
except Exception as e:
print(f"Error in video stream thread: {e}")
time.sleep(0.1)
def _start_threads(self):
t1 = threading.Thread(target=self._video_stream_thread)
t1.daemon = True
t1.start()
def takeoff(self):
if self.check_flying_status():
print("DroneMission: 已在飞行,跳过起飞")
return True
resp = self._send_command('takeoff')
print(f"Takeoff response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(3)
return True
print("Takeoff failed, retrying...")
time.sleep(2)
retry = self._send_command('takeoff')
print(f"Takeoff retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def land(self):
if not self.check_flying_status():
print("DroneMission: 已在地面,跳过降落")
return True
resp = self._send_command('land')
print(f"Land response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(2)
return True
print("Land failed, retrying...")
time.sleep(2)
retry = self._send_command('land')
print(f"Land retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def move(self, direction, distance):
cmd = f'{direction} {distance}'
resp = self._send_command(cmd)
print(f"Move response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Move failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Move retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def rotate(self, degrees):
cmd = f'cw {degrees}'
resp = self._send_command(cmd)
print(f"Rotate response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Rotate failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Rotate retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def capture_photo(self):
img = None
while not self.frame_queue.empty():
img = self.frame_queue.get()
if img is not None:
self.photo_count += 1
filename = f"result{self.photo_count:04d}.jpg"
img_path = os.path.join(self.result_dir, filename)
cv2.imwrite(img_path, img)
print(f"保存图片: {img_path}")
return img_path
print("未获取到图片帧,拍照失败")
return None
def mission_with_photos(self):
print("任务开始,测试连接...")
if not self.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not self.test_connection():
print("连接仍然失败,但继续任务")
print("起飞...")
if not self.takeoff():
print("起飞失败,但继续任务")
time.sleep(3)
start_time = time.time()
total_time = 0
action_steps = [
("上升5米", lambda: self.move('up', 500), 3),
("前进20米", lambda: self.move('forward', 2000), 3),
("旋转360°", lambda: self.rotate(360), 3),
("前进20米", lambda: self.move('forward', 2000), 3),
("降落", self.land, 0)
]
step_idx = 0
next_photo_time = 0
print("开始动作与拍照...")
while step_idx < len(action_steps):
step_name, step_func, wait_time = action_steps[step_idx]
step_start = time.time()
step_done = False
while not step_done:
now = time.time()
# 拍照
if now - start_time >= next_photo_time:
self.capture_photo()
next_photo_time += 0.5
# 执行动作
if step_func():
print(f"{step_name} 完成")
time.sleep(wait_time)
step_done = True
else:
print(f"{step_name} 重试...")
time.sleep(1)
step_idx += 1
print("任务完成")
def close(self):
self.is_running = False
# 只有在没有外部资源时才关闭socket和container
if not hasattr(self, '_external_sock') or not self._external_sock:
if self.sock:
try:
self.sock.close()
except:
pass
if not hasattr(self, '_external_container') or not self._external_container:
if self.container:
try:
self.container.close()
except:
pass
# =================== 保留main函数兼容直接运行 ===================
def main():
print("=== road1.py main() called ===")
# 检查是否有外部传入的资源
if 'sock' in globals() and 'tello_address' in globals():
print("✓ Using external resources")
# 使用外部资源
mission = DroneMission(
external_sock=globals().get('sock'),
external_tello_address=globals().get('tello_address'),
external_frame_queue=globals().get('frame_queue'),
external_container=globals().get('container'),
external_frame_iter=globals().get('frame_iter')
)
else:
print("✓ Using independent resources")
# 独立运行模式
mission = DroneMission()
try:
mission.mission_with_photos()
mission.close()
print("=== road1.py main() completed ===")
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
# 添加一个函数用于外部调用
def run_mission_with_external_resources(sock, tello_address, frame_queue=None, container=None, frame_iter=None):
"""使用外部资源运行任务的函数"""
print("=== road1.py run_mission_with_external_resources() called ===")
print(f"Received resources: sock={sock}, tello_address={tello_address}")
mission = DroneMission(
external_sock=sock,
external_tello_address=tello_address,
external_frame_queue=frame_queue,
external_container=container,
external_frame_iter=frame_iter
)
try:
mission.mission_with_photos()
mission.close()
print("=== road1.py run_mission_with_external_resources() completed ===")
return True
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
return False
if __name__ == '__main__':
main()

@ -0,0 +1,500 @@
import cv2
import os
import av
import time
import socket
from ultralytics import YOLO
import threading
from queue import Queue
import logging
import sys
# 全局变量
frame_queue = Queue(maxsize=30) # 用于存储视频帧的队列
detection_queue = Queue() # 用于存储待检测图片的队列
is_running = True # 控制线程运行状态
latest_image_path = None # 最新图片路径
container = None # 视频流容器
frame_iter = None # 视频帧迭代器
sock = None # 无人机控制socket
tello_address = ('192.168.10.1', 8889) # 无人机地址
logging.basicConfig(filename='road2_run.log', level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
# =================== 新增类封装 ===================
class DroneMission:
def __init__(self, external_sock=None, external_tello_address=None,
external_frame_queue=None, external_container=None,
external_frame_iter=None):
# 使用外部传入的资源,如果没有则创建新的
self.sock = external_sock
self.tello_address = external_tello_address or ('192.168.10.1', 8889)
self.frame_queue = external_frame_queue or Queue(maxsize=30)
self.container = external_container
self.frame_iter = external_frame_iter
# 记录外部资源状态
self._external_sock = external_sock is not None
self._external_container = external_container is not None
self.detection_queue = Queue()
self.is_running = True
self.latest_image_path = None
# 如果没有外部socket则初始化自己的
if not self.sock:
self._init_socket()
if not self.container or not self.frame_iter:
self._init_video_stream()
# 只有在没有外部资源时才启动自己的线程
if not external_sock and not external_container:
self._start_threads()
self.model = YOLO(r'D:\yolo\yolov8n.pt')
def _init_socket(self):
if self.sock:
try:
self.sock.close()
except:
pass
# 只有当没有外部socket时才创建新的socket
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(('', 9000))
self.sock.settimeout(3.0)
def _send_command(self, cmd):
try:
print(f"DroneMission sending command: {cmd}")
self.sock.sendto(cmd.encode('utf-8'), self.tello_address)
response, _ = self.sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"DroneMission received response: {response_str}")
return response_str
except socket.timeout:
print(f"DroneMission timeout sending command: {cmd}")
return None
except Exception as e:
print(f"DroneMission error sending command: {e}")
return None
def check_flying_status(self):
"""检查无人机是否正在飞行"""
try:
# 尝试获取高度信息来判断是否在飞行
height_response = self._send_command('height?')
print(f"Height response: '{height_response}'")
# 检查响应是否有效
if height_response and height_response.isdigit():
height = int(height_response)
is_flying = height > 20 # 如果高度大于20cm认为在飞行
print(f"Height: {height}cm, Flying: {is_flying}")
return is_flying
else:
print(f"Invalid height response: '{height_response}'")
# 如果高度检查失败,尝试其他方法
return self._check_flying_alternative()
except Exception as e:
print(f"Error checking flying status: {e}")
return self._check_flying_alternative()
def _check_flying_alternative(self):
"""备用的飞行状态检查方法"""
try:
# 尝试获取电池信息,如果能获取到说明连接正常
battery_response = self._send_command('battery?')
print(f"Battery response: '{battery_response}'")
# 检查电池响应是否有效
if battery_response and battery_response.isdigit():
print(f"Battery check successful: {battery_response}%")
# 如果能获取电池信息,假设无人机在飞行状态
return True
else:
print(f"Invalid battery response: '{battery_response}'")
# 如果电池检查也失败,尝试其他方法
return self._check_flying_by_commands()
except Exception as e:
print(f"Error in battery check: {e}")
return self._check_flying_by_commands()
def _check_flying_by_commands(self):
"""通过尝试执行命令来检查飞行状态"""
try:
# 尝试发送一个简单的查询命令
response = self._send_command('speed?')
print(f"Speed response: '{response}'")
# 如果能得到任何响应,说明连接正常
if response and response != 'error':
print("Connection check successful, assuming flying")
return True
else:
print("Connection check failed")
return False
except Exception as e:
print(f"Error in connection check: {e}")
return False
def test_connection(self):
"""测试与无人机的连接"""
try:
# 尝试发送一个简单的命令
response = self._send_command('command')
print(f"Connection test response: '{response}'")
return response == 'ok'
except Exception as e:
print(f"Connection test failed: {e}")
return False
def _init_video_stream(self):
# 只有当没有外部视频流时才创建新的
if not self.container or not self.frame_iter:
try:
self.container = av.open('udp://0.0.0.0:11111', format='h264', timeout=3.0)
self.frame_iter = self.container.decode(video=0)
except Exception as e:
print(f"Error initializing video stream: {e}")
self.container = None
self.frame_iter = None
def _video_stream_thread(self):
while self.is_running:
try:
if self.frame_iter is None:
self._init_video_stream()
if self.frame_iter is None:
time.sleep(1)
continue
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(img)
except Exception as e:
time.sleep(0.1)
def _detection_thread(self):
while self.is_running:
try:
img_path = self.detection_queue.get(timeout=1)
if img_path:
try:
results = self.model(img_path)
for r in results:
save_path = os.path.join(r'D:\yolo\ultralytics\assets', 'detect_results', os.path.basename(img_path))
os.makedirs(os.path.dirname(save_path), exist_ok=True)
r.save(save_path)
print(f"Detection completed for {img_path}")
except Exception as e:
print(f"Error in detection: {e}")
except:
time.sleep(0.01)
def _start_threads(self):
t1 = threading.Thread(target=self._video_stream_thread)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=self._detection_thread)
t2.daemon = True
t2.start()
def takeoff(self):
print("DroneMission: takeoff() called")
# 首先检查是否已经在飞行
if self.check_flying_status():
print("DroneMission: 无人机已在飞行状态,跳过起飞")
return True
# 尝试起飞
takeoff_response = self._send_command('takeoff')
print(f"DroneMission: takeoff response = '{takeoff_response}'")
# 检查起飞响应
if takeoff_response == 'ok':
print("DroneMission: 起飞成功")
time.sleep(3) # 等待起飞完成
return True
else:
print(f"DroneMission: 起飞失败,响应: '{takeoff_response}'")
# 即使起飞失败,也假设无人机可能已经在飞行
time.sleep(2)
return True # 返回True以继续执行任务
return False
def land(self):
print("DroneMission: land() called")
# 检查是否在地面
if not self.check_flying_status():
print("DroneMission: 无人机已在地面,跳过降落")
return True
# 尝试降落
land_response = self._send_command('land')
print(f"DroneMission: land response = '{land_response}'")
# 检查降落响应
if land_response == 'ok':
print("DroneMission: 降落成功")
time.sleep(2)
return True
else:
print(f"DroneMission: 降落失败,响应: '{land_response}'")
# 尝试重试降落
print("DroneMission: 尝试重试降落...")
time.sleep(2)
retry_response = self._send_command('land')
print(f"DroneMission: 降落重试响应 = '{retry_response}'")
if retry_response == 'ok':
print("DroneMission: 降落重试成功")
return True
else:
print("DroneMission: 降落重试失败,但任务完成")
return True
def move(self, direction, distance):
print(f"DroneMission: move({direction}, {distance}) called")
# 尝试执行移动命令,不强制检查飞行状态
cmd = f'{direction} {distance}'
response = self._send_command(cmd)
print(f"DroneMission: move response = '{response}'")
# 检查响应
if response == 'ok':
print(f"DroneMission: 移动命令执行成功")
return True
else:
print(f"DroneMission: 移动命令执行失败,响应: '{response}'")
# 尝试重试一次
print(f"DroneMission: 尝试重试移动命令...")
time.sleep(1)
retry_response = self._send_command(cmd)
print(f"DroneMission: 重试响应 = '{retry_response}'")
if retry_response == 'ok':
print(f"DroneMission: 重试成功")
return True
else:
print(f"DroneMission: 重试失败,但继续执行任务")
return True
def rotate(self, direction, degrees):
print(f"DroneMission: rotate({direction}, {degrees}) called")
# 尝试执行旋转命令
cmd = f'{direction} {degrees}'
response = self._send_command(cmd)
print(f"DroneMission: rotate response = '{response}'")
# 检查响应
if response == 'ok':
print(f"DroneMission: 旋转命令执行成功")
return True
else:
print(f"DroneMission: 旋转命令执行失败,响应: '{response}'")
# 即使失败也返回True让任务继续
return True
def get_latest_image(self, filename=None):
# 获取最新一帧并保存
img = None
# 从frame_queue中获取最新帧
while not self.frame_queue.empty():
img = self.frame_queue.get()
# 如果没有从队列获取到图像,尝试从视频流获取
if img is None and self.frame_iter is not None:
try:
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
except Exception as e:
pass
if img is not None:
if filename is None:
filename = f"img_{int(time.time())}.jpg"
save_dir = r"D:\yolo\ultralytics\assets"
os.makedirs(save_dir, exist_ok=True)
img_path = os.path.join(save_dir, filename)
cv2.imwrite(img_path, img)
self.latest_image_path = img_path
return img_path
return None
def move_and_capture(self, direction, distance, filename=None):
# 移动并返回图片路径
print(f"DroneMission: move_and_capture({direction}, {distance}, {filename}) called")
# 尝试移动
move_success = self.move(direction, distance)
time.sleep(2)
# 无论移动是否成功,都尝试拍照
result = self.get_latest_image(filename)
print(f"DroneMission: move_and_capture result = {result}")
if result:
print(f"DroneMission: 成功保存图片: {result}")
else:
print("DroneMission: 拍照失败")
return result
def close(self):
self.is_running = False
# 只有在没有外部资源时才关闭socket和container
if not hasattr(self, '_external_sock') or not self._external_sock:
if self.sock:
try:
self.sock.close()
except:
pass
if not hasattr(self, '_external_container') or not self._external_container:
if self.container:
try:
self.container.close()
except:
pass
# =================== 保留main函数兼容直接运行 ===================
def main():
print("=== road2.py main() called ===")
# 检查是否有外部传入的资源
if 'sock' in globals() and 'tello_address' in globals():
print("✓ Using external resources")
# 使用外部资源
mission = DroneMission(
external_sock=globals().get('sock'),
external_tello_address=globals().get('tello_address'),
external_frame_queue=globals().get('frame_queue'),
external_container=globals().get('container'),
external_frame_iter=globals().get('frame_iter')
)
else:
print("✓ Using independent resources")
# 独立运行模式
mission = DroneMission()
try:
print("测试无人机连接...")
if not mission.test_connection():
print("无人机连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("无人机连接仍然失败,但继续执行任务")
print("检查无人机状态...")
is_flying = mission.check_flying_status()
print(f"无人机飞行状态: {'飞行中' if is_flying else '地面'}")
print("起飞...")
takeoff_success = mission.takeoff()
print(f"起飞结果: {takeoff_success}")
time.sleep(3)
print("上升1米...")
up_success = mission.move('up', 100)
print(f"上升结果: {up_success}")
if up_success:
time.sleep(2)
print("向前飞行1米并拍照...")
img1 = mission.move_and_capture('forward', 100, "forward_1m.jpg")
print(f"向前飞行图片保存: {img1}")
print("向右飞行1米并拍照...")
img2 = mission.move_and_capture('right', 100, "right_1m.jpg")
print(f"向右飞行图片保存: {img2}")
print("降落...")
land_success = mission.land()
print(f"降落结果: {land_success}")
mission.close()
print("=== road2.py main() completed ===")
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
# 添加一个函数用于外部调用
def run_mission_with_external_resources(sock, tello_address, frame_queue=None, container=None, frame_iter=None):
"""使用外部资源运行任务的函数"""
print("=== road2.py run_mission_with_external_resources() called ===")
print(f"Received resources: sock={sock}, tello_address={tello_address}")
mission = DroneMission(
external_sock=sock,
external_tello_address=tello_address,
external_frame_queue=frame_queue,
external_container=container,
external_frame_iter=frame_iter
)
try:
print("测试无人机连接...")
if not mission.test_connection():
print("无人机连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("无人机连接仍然失败,但继续执行任务")
print("检查无人机状态...")
is_flying = mission.check_flying_status()
print(f"无人机飞行状态: {'飞行中' if is_flying else '地面'}")
print("起飞...")
takeoff_success = mission.takeoff()
print(f"起飞结果: {takeoff_success}")
time.sleep(3)
print("上升1米...")
up_success = mission.move('up', 100)
print(f"上升结果: {up_success}")
if up_success:
time.sleep(2)
print("向前飞行1米并拍照...")
img1 = mission.move_and_capture('forward', 100, "forward_1m.jpg")
print(f"向前飞行图片保存: {img1}")
print("向右飞行1米并拍照...")
img2 = mission.move_and_capture('right', 100, "right_1m.jpg")
print(f"向右飞行图片保存: {img2}")
print("降落...")
land_success = mission.land()
print(f"降落结果: {land_success}")
mission.close()
print("=== road2.py run_mission_with_external_resources() completed ===")
return True
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
return False
if __name__ == '__main__':
main()

@ -0,0 +1,348 @@
import cv2
import os
import av
import time
import socket
import math
import threading
from queue import Queue
import re
import sys
class DroneMission:
def __init__(self, external_sock=None, external_tello_address=None,
external_frame_queue=None, external_container=None,
external_frame_iter=None):
# 使用外部传入的资源,如果没有则创建新的
self.sock = external_sock
self.tello_address = external_tello_address or ('192.168.10.1', 8889)
self.frame_queue = external_frame_queue or Queue(maxsize=30)
self.container = external_container
self.frame_iter = external_frame_iter
# 记录外部资源状态
self._external_sock = external_sock is not None
self._external_container = external_container is not None
self.is_running = True
self.photo_count = 0
self.result_dir = r'D:\yolo\ultralytics\assets\circle'
os.makedirs(self.result_dir, exist_ok=True)
# 如果没有外部socket则初始化自己的
if not self.sock:
self._init_socket()
if not self.container or not self.frame_iter:
self._init_video_stream()
# 只有在没有外部资源时才启动自己的线程
if not external_sock and not external_container:
self._start_threads()
def _init_socket(self):
if self.sock:
try:
self.sock.close()
except:
pass
# 只有当没有外部socket时才创建新的socket
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 9000))
self.sock.settimeout(3.0)
def _parse_dm_response(self, resp):
if resp is None:
return None
m = re.match(r"(\d+)", resp)
if m:
return int(m.group(1))
return None
def _send_command(self, cmd):
try:
print(f"DroneMission sending command: {cmd}")
self.sock.sendto(cmd.encode('utf-8'), self.tello_address)
response, _ = self.sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"DroneMission received response: {response_str}")
return response_str
except socket.timeout:
print(f"DroneMission timeout sending command: {cmd}")
return None
except Exception as e:
print(f"Error sending command: {e}")
return None
def test_connection(self):
try:
response = self._send_command('command')
print(f"Connection test response: '{response}'")
# 兼容多种响应格式:'ok', 'Re0402 ok', 'Re0101 ok' 等
return response and ('ok' in response.lower())
except Exception as e:
print(f"Connection test failed: {e}")
return False
def check_flying_status(self):
try:
height_response = self._send_command('height?')
print(f"Height response: '{height_response}'")
height = self._parse_dm_response(height_response)
if height is not None:
is_flying = height > 20
print(f"Height: {height}cm, Flying: {is_flying}")
return is_flying
else:
print(f"Invalid height response: '{height_response}'")
return False
except Exception as e:
print(f"Error checking flying status: {e}")
return False
def _init_video_stream(self):
# 只有当没有外部视频流时才创建新的
if not self.container or not self.frame_iter:
try:
self.container = av.open('udp://0.0.0.0:11111', format='h264', timeout=3.0)
self.frame_iter = self.container.decode(video=0)
except Exception as e:
print(f"Error initializing video stream: {e}")
self.container = None
self.frame_iter = None
def _video_stream_thread(self):
while self.is_running:
try:
if self.frame_iter is None:
self._init_video_stream()
if self.frame_iter is None:
time.sleep(1)
continue
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(img)
except Exception as e:
print(f"Error in video stream thread: {e}")
time.sleep(0.1)
def _start_threads(self):
t1 = threading.Thread(target=self._video_stream_thread)
t1.daemon = True
t1.start()
def takeoff(self):
if self.check_flying_status():
print("DroneMission: 已在飞行,跳过起飞")
return True
resp = self._send_command('takeoff')
print(f"Takeoff response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(3)
return True
print("Takeoff failed, retrying...")
time.sleep(2)
retry = self._send_command('takeoff')
print(f"Takeoff retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def land(self):
if not self.check_flying_status():
print("DroneMission: 已在地面,跳过降落")
return True
resp = self._send_command('land')
print(f"Land response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(2)
return True
print("Land failed, retrying...")
time.sleep(2)
retry = self._send_command('land')
print(f"Land retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def move(self, direction, distance):
cmd = f'{direction} {distance}'
resp = self._send_command(cmd)
print(f"Move response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Move failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Move retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def rotate(self, degrees):
cmd = f'cw {degrees}'
resp = self._send_command(cmd)
print(f"Rotate response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Rotate failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Rotate retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def capture_photo(self):
img = None
while not self.frame_queue.empty():
img = self.frame_queue.get()
if img is not None:
self.photo_count += 1
filename = f"circle{self.photo_count:04d}.jpg"
img_path = os.path.join(self.result_dir, filename)
cv2.imwrite(img_path, img)
print(f"保存图片: {img_path}")
return img_path
print("未获取到图片帧,拍照失败")
return None
def circle_flight(self, radius=2000, height=1000, steps=36):
print("任务开始,测试连接...")
if not self.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not self.test_connection():
print("连接仍然失败,但继续任务")
print("起飞...")
if not self.takeoff():
print("起飞失败,但继续任务")
time.sleep(3)
print(f"上升{height/100}米...")
if not self.move('up', height):
print("上升失败,但继续任务")
time.sleep(3)
print(f"{radius/100}米为半径环绕飞行...")
angle_step = 360 // steps
arc_len = int(2 * math.pi * (radius/100) / steps * 100)
for i in range(steps):
if not self.rotate(angle_step):
print(f"{i+1}步旋转失败,但继续任务");
time.sleep(1)
if not self.move('forward', arc_len):
print(f"{i+1}步前进失败,但继续任务");
time.sleep(1)
self.capture_photo()
print("降落...")
self.land()
print("任务完成")
def close(self):
self.is_running = False
# 只有在没有外部资源时才关闭socket和container
if not hasattr(self, '_external_sock') or not self._external_sock:
if self.sock:
try:
self.sock.close()
except:
pass
if not hasattr(self, '_external_container') or not self._external_container:
if self.container:
try:
self.container.close()
except:
pass
def main():
print("=== road3.py main() called ===")
# 检查是否有外部传入的资源
if 'sock' in globals() and 'tello_address' in globals():
print("✓ Using external resources")
# 使用外部资源
mission = DroneMission(
external_sock=globals().get('sock'),
external_tello_address=globals().get('tello_address'),
external_frame_queue=globals().get('frame_queue'),
external_container=globals().get('container'),
external_frame_iter=globals().get('frame_iter')
)
else:
print("✓ Using independent resources")
# 独立运行模式
mission = DroneMission()
try:
print("任务开始,测试连接...")
if not mission.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("连接仍然失败,但继续任务")
mission.circle_flight(radius=2000, height=1000, steps=36)
mission.close()
print("=== road3.py main() completed ===")
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
# 添加一个函数用于外部调用
def run_mission_with_external_resources(sock, tello_address, frame_queue=None, container=None, frame_iter=None):
"""使用外部资源运行任务的函数"""
print("=== road3.py run_mission_with_external_resources() called ===")
print(f"Received resources: sock={sock}, tello_address={tello_address}")
mission = DroneMission(
external_sock=sock,
external_tello_address=tello_address,
external_frame_queue=frame_queue,
external_container=container,
external_frame_iter=frame_iter
)
# 初始化安全系统
if SAFETY_AVAILABLE:
try:
init_safety_system(sock, tello_address, frame_queue)
print("🛡️ 安全系统已初始化")
except Exception as e:
print(f"⚠️ 安全系统初始化失败: {e}")
try:
print("任务开始,测试连接...")
if not mission.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("连接仍然失败,但继续任务")
mission.circle_flight(radius=2000, height=1000, steps=36)
mission.close()
# 停止安全系统
if SAFETY_AVAILABLE:
stop_safety_system()
print("=== road3.py run_mission_with_external_resources() completed ===")
return True
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
return False
if __name__ == '__main__':
main()

@ -0,0 +1,422 @@
import cv2
import os
import av
import time
import socket
import math
import threading
from queue import Queue
import re
import sys
# 导入安全系统
try:
from safy import init_safety_system, get_safety_system, safe_command, force_land, get_safety_status, stop_safety_system
SAFETY_AVAILABLE = True
except ImportError:
SAFETY_AVAILABLE = False
print("⚠️ 安全系统模块不可用")
class DroneMission:
def __init__(self, external_sock=None, external_tello_address=None,
external_frame_queue=None, external_container=None,
external_frame_iter=None):
# 使用外部传入的资源,如果没有则创建新的
self.sock = external_sock
self.tello_address = external_tello_address or ('192.168.10.1', 8889)
self.frame_queue = external_frame_queue or Queue(maxsize=30)
self.container = external_container
self.frame_iter = external_frame_iter
# 记录外部资源状态
self._external_sock = external_sock is not None
self._external_container = external_container is not None
self.is_running = True
self.photo_count = 0
self.result_dir = r'D:\yolo\ultralytics\assets\snake'
os.makedirs(self.result_dir, exist_ok=True)
# 如果没有外部socket则初始化自己的
if not self.sock:
self._init_socket()
if not self.container or not self.frame_iter:
self._init_video_stream()
# 只有在没有外部资源时才启动自己的线程
if not external_sock and not external_container:
self._start_threads()
def _init_socket(self):
if self.sock:
try:
self.sock.close()
except:
pass
# 只有当没有外部socket时才创建新的socket
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 9000))
self.sock.settimeout(3.0)
def _parse_dm_response(self, resp):
if resp is None:
return None
m = re.match(r"(\d+)", resp)
if m:
return int(m.group(1))
return None
def _send_command(self, cmd):
# 如果安全系统可用,使用安全命令
if SAFETY_AVAILABLE and get_safety_system():
response = safe_command(cmd)
if response:
print(f"DroneMission (safe) received response: {response}")
return response
# 否则使用原始命令
try:
print(f"DroneMission sending command: {cmd}")
self.sock.sendto(cmd.encode('utf-8'), self.tello_address)
response, _ = self.sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"DroneMission received response: {response_str}")
return response_str
except socket.timeout:
print(f"DroneMission timeout sending command: {cmd}")
return None
except Exception as e:
print(f"Error sending command: {e}")
return None
def test_connection(self):
try:
response = self._send_command('command')
print(f"Connection test response: '{response}'")
# 兼容多种响应格式:'ok', 'Re0402 ok', 'Re0101 ok' 等
return response and ('ok' in response.lower())
except Exception as e:
print(f"Connection test failed: {e}")
return False
def check_flying_status(self):
try:
height_response = self._send_command('height?')
print(f"Height response: '{height_response}'")
height = self._parse_dm_response(height_response)
if height is not None:
is_flying = height > 20
print(f"Height: {height}cm, Flying: {is_flying}")
return is_flying
else:
print(f"Invalid height response: '{height_response}'")
return False
except Exception as e:
print(f"Error checking flying status: {e}")
return False
def _init_video_stream(self):
# 只有当没有外部视频流时才创建新的
if not self.container or not self.frame_iter:
try:
self.container = av.open('udp://0.0.0.0:11111', format='h264', timeout=3.0)
self.frame_iter = self.container.decode(video=0)
except Exception as e:
print(f"Error initializing video stream: {e}")
self.container = None
self.frame_iter = None
def _video_stream_thread(self):
while self.is_running:
try:
if self.frame_iter is None:
self._init_video_stream()
if self.frame_iter is None:
time.sleep(1)
continue
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(img)
except Exception as e:
print(f"Error in video stream thread: {e}")
time.sleep(0.1)
def _start_threads(self):
t1 = threading.Thread(target=self._video_stream_thread)
t1.daemon = True
t1.start()
def takeoff(self):
if self.check_flying_status():
print("DroneMission: 已在飞行,跳过起飞")
return True
resp = self._send_command('takeoff')
print(f"Takeoff response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(3)
return True
print("Takeoff failed, retrying...")
time.sleep(2)
retry = self._send_command('takeoff')
print(f"Takeoff retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def land(self):
if not self.check_flying_status():
print("DroneMission: 已在地面,跳过降落")
return True
resp = self._send_command('land')
print(f"Land response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(2)
return True
print("Land failed, retrying...")
time.sleep(2)
retry = self._send_command('land')
print(f"Land retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def move(self, direction, distance):
cmd = f'{direction} {distance}'
resp = self._send_command(cmd)
print(f"Move response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Move failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Move retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def rotate(self, degrees):
cmd = f'cw {degrees}'
resp = self._send_command(cmd)
print(f"Rotate response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Rotate failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Rotate retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def rotate_ccw(self, degrees):
cmd = f'ccw {degrees}'
resp = self._send_command(cmd)
print(f"RotateCCW response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("RotateCCW failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"RotateCCW retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def capture_photo(self):
img = None
while not self.frame_queue.empty():
img = self.frame_queue.get()
if img is not None:
self.photo_count += 1
filename = f"snake{self.photo_count:04d}.jpg"
img_path = os.path.join(self.result_dir, filename)
cv2.imwrite(img_path, img)
print(f"保存图片: {img_path}")
return img_path
print("未获取到图片帧,拍照失败")
return None
def snake_flight(self, radius=500, total_length=5000, steps_per_half=12):
print("任务开始,测试连接...")
if not self.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not self.test_connection():
print("连接仍然失败,但继续任务")
print("起飞...")
if not self.takeoff():
print("起飞失败,但继续任务")
time.sleep(3)
print("上升5米...")
if not self.move('up', 500):
print("上升失败,但继续任务")
time.sleep(3)
half_circum = math.pi * radius
num_half = int(total_length // half_circum)
print(f"蛇形路径:{num_half}个半圆,每个半径{radius/100}m总长度约{total_length/100}m")
# 启动拍照线程
self._photo_thread_running = True
def photo_thread():
while self._photo_thread_running:
self.capture_photo()
time.sleep(0.1)
t_photo = threading.Thread(target=photo_thread)
t_photo.daemon = True
t_photo.start()
for i in range(num_half):
direction = 'cw' if i % 2 == 0 else 'ccw'
for j in range(steps_per_half):
angle = 180 / steps_per_half
arc_len = int(math.pi * radius / steps_per_half)
if direction == 'cw':
self.rotate(angle)
else:
self.rotate_ccw(angle)
time.sleep(0.5)
self.move('forward', arc_len)
time.sleep(0.5)
print("原路返回...")
for i in reversed(range(num_half)):
direction = 'ccw' if i % 2 == 0 else 'cw'
for j in range(steps_per_half):
angle = 180 / steps_per_half
arc_len = int(math.pi * radius / steps_per_half)
if direction == 'cw':
self.rotate(angle)
else:
self.rotate_ccw(angle)
time.sleep(0.5)
self.move('forward', arc_len)
time.sleep(0.5)
self._photo_thread_running = False
t_photo.join(timeout=2)
print("降落...")
self.land()
print("任务完成")
def close(self):
self.is_running = False
# 只有在没有外部资源时才关闭socket和container
if not hasattr(self, '_external_sock') or not self._external_sock:
if self.sock:
try:
self.sock.close()
except:
pass
if not hasattr(self, '_external_container') or not self._external_container:
if self.container:
try:
self.container.close()
except:
pass
def main():
print("=== road4.py main() called ===")
# 检查是否有外部传入的资源
if 'sock' in globals() and 'tello_address' in globals():
print("✓ Using external resources")
# 使用外部资源
mission = DroneMission(
external_sock=globals().get('sock'),
external_tello_address=globals().get('tello_address'),
external_frame_queue=globals().get('frame_queue'),
external_container=globals().get('container'),
external_frame_iter=globals().get('frame_iter')
)
# 初始化安全系统
if SAFETY_AVAILABLE:
try:
init_safety_system(globals().get('sock'), globals().get('tello_address'), globals().get('frame_queue'))
print("🛡️ 安全系统已初始化")
except Exception as e:
print(f"⚠️ 安全系统初始化失败: {e}")
else:
print("✓ Using independent resources")
# 独立运行模式
mission = DroneMission()
try:
print("任务开始,测试连接...")
if not mission.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("连接仍然失败,但继续任务")
mission.snake_flight(radius=500, total_length=5000, steps_per_half=12)
mission.close()
# 停止安全系统
if SAFETY_AVAILABLE:
stop_safety_system()
print("=== road4.py main() completed ===")
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
# 异常情况下也停止安全系统
if SAFETY_AVAILABLE:
stop_safety_system()
# 添加一个函数用于外部调用
def run_mission_with_external_resources(sock, tello_address, frame_queue=None, container=None, frame_iter=None):
"""使用外部资源运行任务的函数"""
print("=== road4.py run_mission_with_external_resources() called ===")
print(f"Received resources: sock={sock}, tello_address={tello_address}")
mission = DroneMission(
external_sock=sock,
external_tello_address=tello_address,
external_frame_queue=frame_queue,
external_container=container,
external_frame_iter=frame_iter
)
# 初始化安全系统
if SAFETY_AVAILABLE:
try:
init_safety_system(sock, tello_address, frame_queue)
print("🛡️ 安全系统已初始化")
except Exception as e:
print(f"⚠️ 安全系统初始化失败: {e}")
try:
print("任务开始,测试连接...")
if not mission.test_connection():
print("连接失败,尝试重新连接...")
time.sleep(2)
if not mission.test_connection():
print("连接仍然失败,但继续任务")
mission.snake_flight(radius=500, total_length=5000, steps_per_half=12)
mission.close()
# 停止安全系统
if SAFETY_AVAILABLE:
stop_safety_system()
print("=== road4.py run_mission_with_external_resources() completed ===")
return True
except Exception as e:
print(f"任务执行出错: {e}")
import traceback
traceback.print_exc()
mission.close()
# 异常情况下也停止安全系统
if SAFETY_AVAILABLE:
stop_safety_system()
return False
if __name__ == '__main__':
main()

@ -0,0 +1,405 @@
import cv2
import os
import av
import time
import socket
import math
import threading
from queue import Queue
import re
import sys
class DroneMission:
def __init__(self, external_sock=None, external_tello_address=None,
external_frame_queue=None, external_container=None,
external_frame_iter=None):
# 使用外部传入的资源,如果没有则创建新的
self.sock = external_sock
self.tello_address = external_tello_address or ('192.168.10.1', 8889)
self.frame_queue = external_frame_queue or Queue(maxsize=30)
self.container = external_container
self.frame_iter = external_frame_iter
# 记录外部资源状态
self._external_sock = external_sock is not None
self._external_container = external_container is not None
self.is_running = True
self.photo_count = 0
self.result_dir = r'D:\yolo\ultralytics\assets\road5'
os.makedirs(self.result_dir, exist_ok=True)
# 如果没有外部socket则初始化自己的
if not self.sock:
self._init_socket()
if not self.container or not self.frame_iter:
self._init_video_stream()
# 只有在没有外部资源时才启动自己的线程
if not external_sock and not external_container:
self._start_threads()
def _init_socket(self):
if self.sock:
try:
self.sock.close()
except:
pass
# 只有当没有外部socket时才创建新的socket
if not self.sock:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('', 9000))
self.sock.settimeout(3.0)
def _parse_dm_response(self, resp):
if resp is None:
return None
m = re.match(r"(\d+)", resp)
if m:
return int(m.group(1))
return None
def _send_command(self, cmd):
try:
print(f"DroneMission sending command: {cmd}")
self.sock.sendto(cmd.encode('utf-8'), self.tello_address)
response, _ = self.sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"DroneMission received response: {response_str}")
return response_str
except socket.timeout:
print(f"DroneMission timeout sending command: {cmd}")
return None
except Exception as e:
print(f"Error sending command: {e}")
return None
def test_connection(self):
try:
response = self._send_command('command')
print(f"Connection test response: '{response}'")
# 兼容多种响应格式:'ok', 'Re0402 ok', 'Re0101 ok' 等
return response and ('ok' in response.lower())
except Exception as e:
print(f"Connection test failed: {e}")
return False
def check_flying_status(self):
try:
height_response = self._send_command('height?')
print(f"Height response: '{height_response}'")
height = self._parse_dm_response(height_response)
if height is not None:
is_flying = height > 20
print(f"Height: {height}cm, Flying: {is_flying}")
return is_flying
else:
print(f"Invalid height response: '{height_response}'")
return False
except Exception as e:
print(f"Error checking flying status: {e}")
return False
def _init_video_stream(self):
# 只有当没有外部视频流时才创建新的
if not self.container or not self.frame_iter:
try:
self.container = av.open('udp://0.0.0.0:11111', format='h264', timeout=3.0)
self.frame_iter = self.container.decode(video=0)
except Exception as e:
print(f"Error initializing video stream: {e}")
self.container = None
self.frame_iter = None
def _video_stream_thread(self):
while self.is_running:
try:
if self.frame_iter is None:
self._init_video_stream()
if self.frame_iter is None:
time.sleep(1)
continue
frame = next(self.frame_iter, None)
if frame is not None:
img = frame.to_ndarray(format='bgr24')
if self.frame_queue.full():
try:
self.frame_queue.get_nowait()
except:
pass
self.frame_queue.put(img)
except Exception as e:
print(f"Error in video stream thread: {e}")
time.sleep(0.1)
def _start_threads(self):
t1 = threading.Thread(target=self._video_stream_thread)
t1.daemon = True
t1.start()
def takeoff(self):
if self.check_flying_status():
print("DroneMission: 已在飞行,跳过起飞")
return True
resp = self._send_command('takeoff')
print(f"Takeoff response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(3)
return True
print("Takeoff failed, retrying...")
time.sleep(2)
retry = self._send_command('takeoff')
print(f"Takeoff retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def land(self):
if not self.check_flying_status():
print("DroneMission: 已在地面,跳过降落")
return True
resp = self._send_command('land')
print(f"Land response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok' 等
if resp and resp != 'error' and ('ok' in resp.lower()):
time.sleep(2)
return True
print("Land failed, retrying...")
time.sleep(2)
retry = self._send_command('land')
print(f"Land retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower())
def move(self, direction, distance):
cmd = f'{direction} {distance}'
resp = self._send_command(cmd)
print(f"Move response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Move failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Move retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def rotate(self, direction, degrees):
cmd = f'{direction} {degrees}'
resp = self._send_command(cmd)
print(f"Rotate response: '{resp}'")
# 兼容多种响应格式:'ok', 'Re0101 ok', 高度值等
if resp and resp != 'error' and ('ok' in resp.lower() or resp.isdigit()):
return True
print("Rotate failed, retrying...")
time.sleep(1)
retry = self._send_command(cmd)
print(f"Rotate retry response: '{retry}'")
return retry and retry != 'error' and ('ok' in retry.lower() or retry.isdigit())
def capture_photo(self):
"""拍摄照片并保存"""
try:
if not self.frame_queue.empty():
img = self.frame_queue.get()
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"road5_photo_{self.photo_count:03d}_{timestamp}.jpg"
filepath = os.path.join(self.result_dir, filename)
cv2.imwrite(filepath, img)
self.latest_image_path = filepath
self.photo_count += 1
print(f"Photo captured: {filepath}")
return filepath
else:
print("No frame available for photo capture")
return None
except Exception as e:
print(f"Error capturing photo: {e}")
return None
def mission_road5(self):
"""执行road5飞行任务"""
print("🚁 开始执行 Road5 飞行任务...")
# 步骤1: 起飞
print("📈 步骤1: 起飞")
if not self.takeoff():
print("❌ 起飞失败")
return False
print("✅ 起飞成功")
# 步骤2: 上升5米 (500cm)
print("⬆️ 步骤2: 上升5米")
if not self.move('up', 500):
print("❌ 上升失败")
return False
print("✅ 上升5米成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤3: 向前飞行10米 (1000cm)
print("➡️ 步骤3: 向前飞行10米")
if not self.move('forward', 1000):
print("❌ 向前飞行失败")
return False
print("✅ 向前飞行10米成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤4: 旋转270度
print("🔄 步骤4: 旋转270度")
if not self.rotate('cw', 270):
print("❌ 旋转270度失败")
return False
print("✅ 旋转270度成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤5: 向前飞行12米 (1200cm)
print("➡️ 步骤5: 向前飞行12米")
if not self.move('forward', 1200):
print("❌ 向前飞行失败")
return False
print("✅ 向前飞行12米成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤6: 旋转90度
print("🔄 步骤6: 旋转90度")
if not self.rotate('cw', 90):
print("❌ 旋转90度失败")
return False
print("✅ 旋转90度成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤7: 向后飞行6米 (600cm)
print("⬅️ 步骤7: 向后飞行6米")
if not self.move('back', 600):
print("❌ 向后飞行失败")
return False
print("✅ 向后飞行6米成功")
# 拍摄照片
self.capture_photo()
time.sleep(1)
# 步骤8: 返航 (返回起飞点)
print("🏠 步骤8: 返航")
# 计算返航路径:先向左飞行,再向后飞行
if not self.move('left', 1200): # 向左飞行12米
print("❌ 向左飞行失败")
return False
print("✅ 向左飞行成功")
if not self.move('back', 1000): # 向后飞行10米
print("❌ 向后飞行失败")
return False
print("✅ 向后飞行成功")
# 拍摄最终照片
self.capture_photo()
time.sleep(1)
# 步骤9: 降落
print("🛬 步骤9: 降落")
if not self.land():
print("❌ 降落失败")
return False
print("✅ 降落成功")
print("🎉 Road5 飞行任务完成!")
return True
def close(self):
"""关闭资源"""
self.is_running = False
if not self._external_sock and self.sock:
try:
self.sock.close()
except:
pass
if not self._external_container and self.container:
try:
self.container.close()
except:
pass
def main():
"""主函数"""
print("🚁 Road5 无人机任务启动")
# 创建无人机任务实例
mission = DroneMission()
try:
# 测试连接
if not mission.test_connection():
print("❌ 无法连接到无人机")
return
print("✅ 无人机连接成功")
# 执行road5任务
success = mission.mission_road5()
if success:
print("🎉 Road5 任务执行成功!")
else:
print("❌ Road5 任务执行失败")
except Exception as e:
print(f"❌ 任务执行出错: {e}")
finally:
mission.close()
def run_mission_with_external_resources(sock, tello_address, frame_queue=None, container=None, frame_iter=None):
"""使用外部资源运行任务"""
print("🚁 Road5 无人机任务启动 (使用外部资源)")
# 创建无人机任务实例,使用外部资源
mission = DroneMission(
external_sock=sock,
external_tello_address=tello_address,
external_frame_queue=frame_queue,
external_container=container,
external_frame_iter=frame_iter
)
try:
# 测试连接
if not mission.test_connection():
print("❌ 无法连接到无人机")
return False
print("✅ 无人机连接成功")
# 执行road5任务
success = mission.mission_road5()
if success:
print("🎉 Road5 任务执行成功!")
else:
print("❌ Road5 任务执行失败")
return success
except Exception as e:
print(f"❌ 任务执行出错: {e}")
return False
finally:
# 注意使用外部资源时不要关闭socket和container
mission.is_running = False
if __name__ == "__main__":
main()

@ -0,0 +1,442 @@
import socket
import time
import cv2
import os
import av
from ultralytics import YOLO
from flask import Flask, request, jsonify, render_template
import threading
from queue import Queue
import numpy as np
import sys
app = Flask(__name__)
# 全局变量
latest_image_path = None
container = None
frame_iter = None
sock = None
frame_queue = Queue(maxsize=30) # 用于存储视频帧的队列
detection_queue = Queue() # 用于存储待检测图片的队列
is_running = True # 控制线程运行状态
# 1. 发送指令让Tello进入SDK模式并开启0视频流
tello_address = ('192.168.10.1', 8889)
def create_socket():
"""创建并返回一个新的socket连接"""
global sock
if sock:
try:
sock.close()
except:
pass
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', 9000))
sock.settimeout(5.0) # 设置5秒超时
return sock
def send_with_retry(cmd, max_retries=3, delay=1):
"""发送命令并重试,直到成功或达到最大重试次数"""
global sock
if not sock:
sock = create_socket()
# 针对 rc 命令,减少重试次数并增加延时
if cmd.startswith('rc'):
max_retries = min(max_retries, 2)
delay = max(delay, 0.5)
# 设置socket超时
sock.settimeout(3.0) # 减少超时时间到3秒
for i in range(max_retries):
try:
print(f"Sending command: {cmd}")
# 在发送命令前暂停视频流处理
if cmd in ['forward', 'back', 'left', 'right', 'up', 'down', 'cw', 'ccw', 'takeoff']:
print("Pausing video stream for movement command...")
time.sleep(0.2) # 增加暂停时间以确保命令能被处理
sock.sendto(cmd.encode('utf-8'), tello_address)
try:
response, _ = sock.recvfrom(1024)
response_str = response.decode().strip()
print(f"Response: {response_str}")
if response_str.lower() == 'ok':
# 如果是移动命令,等待一小段时间确保命令执行
if cmd in ['forward', 'back', 'left', 'right', 'up', 'down', 'cw', 'ccw', 'takeoff']:
time.sleep(0.5) # 增加等待时间
return True
elif response_str.lower() == 'error':
if i < max_retries - 1:
print(f"Command failed, retrying... ({i+1}/{max_retries})")
time.sleep(delay)
continue
else:
# 对于查询命令,返回非空响应即可
if response_str and response_str != '0':
return True
elif i < max_retries - 1:
print(f"Invalid response, retrying... ({i+1}/{max_retries})")
time.sleep(delay)
continue
except socket.timeout:
print(f"Timeout waiting for response, retrying... ({i+1}/{max_retries})")
if i < max_retries - 1:
time.sleep(delay)
continue
except Exception as e:
print(f"Error sending command: {e}")
if i < max_retries - 1:
time.sleep(delay)
# 尝试重新创建socket
try:
sock = create_socket()
sock.settimeout(3.0)
except Exception as e:
print(f"Error recreating socket: {e}")
return False
def init_video_stream():
"""初始化视频流"""
global container, frame_iter
try:
print("Initializing video stream...")
# 尝试关闭可能存在的旧连接
try:
if container:
container.close()
container = None
frame_iter = None
except:
pass
# 尝试释放端口
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 11111))
sock.close()
except:
pass
# 设置超时时间
container = av.open('udp://0.0.0.0:11111', format='h264', timeout=5.0)
frame_iter = container.decode(video=0)
# 尝试获取第一帧来验证视频流是否正常工作
print("Testing video stream...")
try:
frame = next(frame_iter, None)
if frame is None:
print("Failed to get first frame")
return None, None
print("Successfully received first frame")
return container, frame_iter
except Exception as e:
print(f"Error getting first frame: {e}")
return None, None
except Exception as e:
print(f"Error initializing video stream: {e}")
return None, None
def video_stream_thread():
"""视频流处理线程"""
global container, frame_iter, is_running
error_count = 0
max_errors = 3
while is_running:
try:
frame = next(frame_iter, None)
if frame is not None:
# 将帧转换为numpy数组
img = frame.to_ndarray(format='bgr24')
# 如果队列满了,移除最旧的帧
if frame_queue.full():
try:
frame_queue.get_nowait()
except:
pass
# 将新帧放入队列
frame_queue.put(img)
error_count = 0 # 重置错误计数
except Exception as e:
error_count += 1
print(f"Error in video stream thread: {e}")
if error_count >= max_errors:
print("Too many video stream errors, attempting to reconnect...")
try:
# 尝试重新初始化视频流
container, frame_iter = init_video_stream()
if container is None or frame_iter is None:
print("Failed to reconnect video stream")
time.sleep(1)
else:
print("Successfully reconnected video stream")
error_count = 0
except Exception as reconnect_error:
print(f"Error reconnecting video stream: {reconnect_error}")
time.sleep(0.01)
def detection_thread():
"""图像检测线程"""
global is_running
model = YOLO(r'D:\yolo\yolov8n.pt')
while is_running:
try:
# 从检测队列获取图片路径
img_path = detection_queue.get(timeout=1)
if img_path:
try:
results = model(img_path)
for r in results:
save_path = os.path.join(r'D:\yolo\ultralytics\assets', 'detect_results', os.path.basename(img_path))
os.makedirs(os.path.dirname(save_path), exist_ok=True)
r.save(save_path)
print(f"Detection completed for {img_path}")
except Exception as e:
print(f"Error in detection: {e}")
except:
time.sleep(0.01)
def set_latest_image_path(path):
"""设置最新图片路径"""
global latest_image_path
latest_image_path = path
def smooth_land():
"""平滑降落函数"""
print("Starting smooth landing...")
# 首先降低高度到安全高度
print("Lowering to safe height...")
if not send_with_retry('down 20', max_retries=3):
print("Warning: Failed to lower height")
time.sleep(2) # 等待高度调整完成
# 缓慢降低速度
print("Reducing speed...")
for i in range(3):
if not send_with_retry('rc 0 0 -10 0', max_retries=2):
print(f"Warning: Failed to reduce speed at step {i+1}")
time.sleep(0.5)
# 完全停止
print("Stopping all movement...")
if not send_with_retry('rc 0 0 0 0', max_retries=2):
print("Warning: Failed to stop movement")
time.sleep(0.5)
# 执行降落
print("Executing final landing...")
if not send_with_retry('land', max_retries=3):
print("Warning: Failed to execute landing command")
# 等待降落完成
print("Waiting for landing to complete...")
time.sleep(3)
# 全局异常钩子,确保任何未捕获异常都能安全降落
def global_exception_handler(exc_type, exc_value, exc_traceback):
print("Uncaught exception:", exc_type, exc_value)
try:
smooth_land()
except Exception as e:
print("Error during emergency landing:", e)
finally:
sys.exit(1)
sys.excepthook = global_exception_handler
def main():
global container, frame_iter, sock, is_running
# 确保存储图片的文件夹存在
save_dir = r"D:\yolo\ultralytics\assets"
os.makedirs(save_dir, exist_ok=True)
try:
# 初始化无人机
initialize_drone()
# 先上升5米
# print("Ascending 5 meters before other actions...")
# if not send_with_retry('up 500', max_retries=3):
# raise Exception("Failed to ascend 5 meters")
# time.sleep(3) # 等待上升完成
# 向前飞行1米
print("Moving forward 1 meter...")
if not send_with_retry('forward 100', max_retries=3):
raise Exception("Failed to move forward 1 meter")
time.sleep(3) # 等待前进完成
# 初始化视频流
container, frame_iter = init_video_stream()
if container is None or frame_iter is None:
raise Exception("Failed to initialize video stream")
# 启动视频流处理线程
video_thread = threading.Thread(target=video_stream_thread)
video_thread.daemon = True
video_thread.start()
# 启动检测线程
detect_thread = threading.Thread(target=detection_thread)
detect_thread.daemon = True
detect_thread.start()
img_count = 0
rotation_images = 12 # 360°/30° = 12张
# 自转360度并拍照
print("\nPhase: Rotating in place and capturing images...")
rotation_speed = 30 # 自转速度(度/秒)
rotation_interval = 30 # 每次拍照间隔的角度
rotation_time = rotation_interval / rotation_speed # 每次旋转所需时间
# 开始自转
print("Starting rotation...")
if not send_with_retry('rc 0 0 0 30', max_retries=3): # 设置自转速度
raise Exception("Failed to start rotation")
# 等待一小段时间让旋转稳定
time.sleep(1)
for i in range(rotation_images):
# 等待旋转到指定角度
time.sleep(rotation_time)
# 拍摄照片
try:
img = None
while not frame_queue.empty():
img = frame_queue.get()
if img is not None:
img_path = os.path.join(save_dir, f"rotation_{img_count:04d}.jpg")
cv2.imwrite(img_path, img)
print(f"Saved rotation image: {img_path}")
set_latest_image_path(img_path)
detection_queue.put(img_path)
img_count += 1
print(f"Captured rotation image {i+1}/{rotation_images}")
except Exception as e:
print(f"Error capturing rotation frame: {e}")
# 停止自转
print("Stopping rotation...")
if not send_with_retry('rc 0 0 0 0', max_retries=2):
print("Warning: Failed to stop rotation with rc command")
time.sleep(0.5) # 增加等待时间
except KeyboardInterrupt:
print("Stopped by user.")
smooth_land()
except Exception as e:
print(f"Error occurred: {e}")
smooth_land()
finally:
# 停止所有线程
is_running = False
# 确保停止所有运动
if not send_with_retry('rc 0 0 0 0', max_retries=2):
print("Warning: Failed to send rc stop command in cleanup")
time.sleep(0.5)
# 执行平滑降落
# smooth_land() # 已在except中调用这里可注释避免重复
if container:
container.close()
cv2.destroyAllWindows()
print("Stopping video stream...")
send_with_retry('streamoff', max_retries=3)
if sock:
sock.close()
def initialize_drone():
"""初始化无人机确保进入SDK模式"""
print("Initializing drone...")
# 重置连接
create_socket()
# 发送command命令直到成功
print("Entering SDK mode...")
if not send_with_retry('command', max_retries=5, delay=2):
raise Exception("Failed to enter SDK mode")
# 等待SDK模式完全启动
print("Waiting for SDK mode to initialize...")
time.sleep(3)
# 检查电池电量
print("Checking battery level...")
if not send_with_retry('battery?', max_retries=3):
print("Warning: Could not check battery level")
else:
print("Battery check completed")
# 检查WiFi信号强度
print("Checking WiFi signal...")
if not send_with_retry('wifi?', max_retries=3):
print("Warning: Could not check WiFi signal")
else:
print("WiFi check completed")
# 检查SDK模式状态
print("Checking SDK mode status...")
if not send_with_retry('sdk?', max_retries=3):
print("Warning: Could not verify SDK mode")
else:
print("SDK mode verified")
# 发送takeoff命令
print("Attempting to take off...")
if not send_with_retry('takeoff', max_retries=3, delay=2):
raise Exception("Failed to take off")
# 等待起飞完成
print("Waiting for takeoff to complete...")
time.sleep(5)
# 开启视频流
print("Starting video stream...")
if not send_with_retry('streamon', max_retries=3, delay=2):
raise Exception("Failed to start video stream")
# 等待视频流开启
print("Waiting for video stream to initialize...")
time.sleep(2)
# 尝试初始化视频流,如果失败则重试
max_retries = 3
for i in range(max_retries):
try:
container, frame_iter = init_video_stream()
if container is not None and frame_iter is not None:
print("Video stream initialized successfully")
return
print(f"Video stream initialization failed, retrying... ({i+1}/{max_retries})")
time.sleep(2)
except Exception as e:
print(f"Error during video stream initialization attempt {i+1}: {e}")
time.sleep(2)
raise Exception("Failed to initialize video stream after multiple attempts")
if __name__ == '__main__':
# 启动主程序
main()
# 启动Flask服务
threading.Thread(target=app.run, kwargs={'host': '0.0.0.0', 'port': 5000}).start()

@ -0,0 +1,53 @@
# Ultralytics YOLO 🚀, GPL-3.0 license
import re
from pathlib import Path
import pkg_resources as pkg
from setuptools import find_packages, setup
# Settings
FILE = Path(__file__).resolve()
ROOT = FILE.parent # root directory
README = (ROOT / "README.md").read_text(encoding="utf-8")
REQUIREMENTS = [f'{x.name}{x.specifier}' for x in pkg.parse_requirements((ROOT / 'requirements.txt').read_text())]
def get_version():
file = ROOT / 'ultralytics/__init__.py'
return re.search(r'^__version__ = [\'"]([^\'"]*)[\'"]', file.read_text(), re.M)[1]
setup(
name="ultralytics", # name of pypi package
version=get_version(), # version of pypi package
python_requires=">=3.7.0",
license='GPL-3.0',
description='Ultralytics YOLOv8 and HUB',
long_description=README,
long_description_content_type="text/markdown",
url="https://github.com/ultralytics/ultralytics",
project_urls={
'Bug Reports': 'https://github.com/ultralytics/ultralytics/issues',
'Funding': 'https://ultralytics.com',
'Source': 'https://github.com/ultralytics/ultralytics',},
author="Ultralytics",
author_email='hello@ultralytics.com',
packages=find_packages(), # required
include_package_data=True,
install_requires=REQUIREMENTS,
extras_require={
'dev':
['check-manifest', 'pytest', 'pytest-cov', 'coverage', 'mkdocs', 'mkdocstrings[python]', 'mkdocs-material'],},
classifiers=[
"Intended Audience :: Developers", "Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10",
"Topic :: Software Development", "Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Scientific/Engineering :: Image Recognition", "Operating System :: POSIX :: Linux",
"Operating System :: MacOS", "Operating System :: Microsoft :: Windows"],
keywords="machine-learning, deep-learning, vision, ML, DL, AI, YOLO, YOLOv3, YOLOv5, YOLOv8, HUB, Ultralytics",
entry_points={
'console_scripts': ['yolo = ultralytics.yolo.cli:cli', 'ultralytics = ultralytics.yolo.cli:cli'],})
Loading…
Cancel
Save