tello控制

master
qiuwb 2 years ago
parent ba61ab0545
commit d7c10986c7

@ -0,0 +1,325 @@
from djitellopy import tello
import datetime
import time
import threading
import socket
import json
class TelloControl:
'''
tello控制类
'''
def __init__(self):
self.drone = tello.Tello(host="192.168.39.58")
self.speed = 50
self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"log/log_{self.timestamp}.txt"
self.log_file = open(file=log_filename, mode="a+", encoding="utf-8",buffering=1)
self.log_file.write(f"{self.timestamp}\n")
self.socket_recv_host = ''
self.socket_recv_port = 8888
self.recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.recv_sock.bind((self.socket_recv_host, self.socket_recv_port))
self.recv_thread = threading.Thread(target=self.recive)
self.recv_thread.setDaemon(True)
self.recv_thread.start()
self.auto_fly_flag = False
self.auto_home_flag = False
self.photo_quality = 50
self.send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.send_address = ('192.168.39.145', 6666)
self.start_time = None
self.distance=100
self.angle=20
self.send_info_thread = threading.Thread(target=self.send_info)
self.send_info_thread.setDaemon(True)
self.send_info_thread.start()
def recive(self):
while True:
data, addr = self.recv_sock.recvfrom(1024)
print('Received: ', data.decode('utf-8'))
if data:
self.control(data.decode('utf-8'))
def control(self, data):
"""
tello控制
:param data:
:return:
"""
try:
if data == "TAKEOFF":
self.drone.takeoff()
self.drone.set_speed(self.speed)
self.log_file.write(str(datetime.datetime.now()) + "\tTAKEOFF" + "\n")
elif data == "LAND":
self.drone.land()
self.log_file.write(str(datetime.datetime.now()) + "\tLAND" + "\n")
elif data == "FORWARD":
self.drone.move_forward(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tFORWARD {self.distance}" + "\n")
elif data == "BACKWORD":
self.drone.move_back(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tBACK {self.distance}" + "\n")
elif data == "LEFT":
self.drone.move_left(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tLEFT {self.distance}" + "\n")
elif data == "RIGHT":
self.drone.move_right(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tRIGHT {self.distance}" + "\n")
elif data == "UP":
self.drone.move_up(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tUP {self.distance}" + "\n")
elif data == "DOWN":
self.drone.move_down(self.distance)
# time.sleep(self.distance / self.speed)
self.log_file.write(str(datetime.datetime.now()) + f"\tDOWN {self.distance}" + "\n")
elif data == "TURN_LEFT":
self.drone.rotate_counter_clockwise(self.angle)
self.log_file.write(str(datetime.datetime.now()) + f"\tTURN_LEFT {self.angle}" + "\n")
elif data == "TURN_RIGHT":
self.drone.rotate_clockwise(self.angle)
self.log_file.write(str(datetime.datetime.now()) + f"\tTURN_RIGHT {self.angle}" + "\n")
elif "SPEED" in data:
speed = int(data.split(" ")[1])
self.drone.set_speed(speed)
self.speed = speed
self.log_file.write(str(datetime.datetime.now()) +"\t"+ data + "\n")
# elif "AUTOF" in data:
# distance = int(data.split(" ")[1])
# angle = int(data.split(" ")[2])
# self.autofly(distance, angle)
# self.log_file.write(str(datetime.datetime.now()) + data + "\n")
elif data == "STOP":
self.log_file.write(str(datetime.datetime.now()) + "\tSTOP" + "\n")
self.auto_fly_flag = True
self.auto_home_flag = True
self.drone.stopmove()
elif data == "PHOTO_LOW":
self.photo_quality = 50
elif data == "PHOTO_HIGH":
self.photo_quality = 100
elif "WARNING" in data:
self.auto_fly_flag = False
self.drone.connect()
self.drone.streamon()
self.drone.takeoff()
self.log_file.write( str(datetime.datetime.now()) +"\t"+ data + "\n")
distance = int(data.split(" ")[1])
angle = int(data.split(" ")[2])
self.log_file.write(f"TURN_RIGHT {angle}\n")
self.log_file.write(f"FORWARD {distance}\n")
self.autofly(distance, angle)
# elif data == "INFO":
# data_thread = threading.Thread(target=self.send_info)
# data_thread.setDaemon(True)
# data_thread.start()
elif data == "DIRECTHOME":
self.auto_home_flag = False
self.return_home_line()
self.log_file.write(str(datetime.datetime.now()) + "\tDIRECTHOME" + "\n")
except Exception as e:
print(e)
self.log_file.write(str(datetime.datetime.now()) + "\t" + str(e) + "\n")
if "Command" not in str(e):
self.drone.land()
self.drone.streamoff()
self.drone.end()
self.log_file.close()
def autofly(self, distance, angle):
'''
自动飞行
:param distance:
:param angle:
:return:
'''
self.start_time = datetime.datetime.now()
self.drone.rotate_clockwise(angle)
while distance > 500:
if self.auto_fly_flag:
self.auto_fly_flag = False
return
self.drone.move_forward(500)
distance -= 500
# time.sleep(100 / self.speed)
self.drone.move_forward(distance)
time.sleep(distance / self.speed)
def return_home(self):
# 实现将无人机返回到出发点的逻辑
# 根据日志中的操作,执行相反的操作即可
# 例如,如果日志中有"FORWARD self.distance"的操作,则执行"BACK self.distance"来反向操作
# 注意控制时间间隔可以使用time.sleep()方法
self.log_file.seek(0) # 将文件指针移动到文件开头
for line in self.log_file:
if self.auto_home_flag:
self.drone.emergency()
return
if "FORWARD" in line:
distance = int(line.split(" ")[-1])
self.drone.move_back(distance)
time.sleep(distance / self.speed)
elif "BACK" in line:
distance = int(line.split(" ")[-1])
self.drone.move_forward(distance)
time.sleep(distance / self.speed)
elif "LEFT" in line:
distance = int(line.split(" ")[-1])
self.drone.move_right(distance)
time.sleep(distance / self.speed)
elif "RIGHT" in line:
distance = int(line.split(" ")[-1])
self.drone.move_left(distance)
time.sleep(distance / self.speed)
elif "UP" in line:
distance = int(line.split(" ")[-1])
self.drone.move_down(distance)
time.sleep(distance / self.speed)
elif "DOWN" in line:
distance = int(line.split(" ")[-1])
self.drone.move_up(distance)
time.sleep(distance / self.speed)
elif "TURN_LEFT" in line:
angle = int(line.split(" ")[-1])
self.drone.rotate_clockwise(angle)
time.sleep(angle / self.speed)
elif "TURN_RIGHT" in line:
angle = int(line.split(" ")[-1])
self.drone.rotate_counter_clockwise(angle)
time.sleep(angle / self.speed)
self.drone.land()
# 文件指针移动到文件末尾
self.log_file.seek(0, 2)
def return_home_line(self):
'''
通过直线距离返回
:return:
'''
self.log_file.seek(0) # 将文件指针移动到文件开头
total_distance_forward = 0
total_distance_right = 0
total_distance_up = 0
for line in self.log_file:
if self.auto_home_flag: # 如果标志变量为True中断自动返回
self.auto_home_flag = False
break
if "FORWARD" in line:
distance = int(line.split(" ")[-1])
total_distance_forward += distance
elif "BACK" in line:
distance = int(line.split(" ")[-1])
total_distance_forward -= distance
elif "LEFT" in line:
distance = int(line.split(" ")[-1])
total_distance_right -= distance
elif "RIGHT" in line:
distance = int(line.split(" ")[-1])
total_distance_right += distance
elif "UP" in line:
distance = int(line.split(" ")[-1])
total_distance_up += distance
elif "DOWN" in line:
distance = int(line.split(" ")[-1])
total_distance_up -= distance
print(total_distance_forward, total_distance_right, total_distance_up)
self.log_file.seek(0, 2)
while total_distance_forward != 0:
distance = min(500, abs(total_distance_forward))
if self.auto_home_flag: # 如果标志变量为True中断自动返回
self.auto_home_flag = False
return
if total_distance_forward > 0:
self.drone.move_back(distance)
total_distance_forward -= distance
else:
self.drone.move_forward(distance)
total_distance_forward += distance
time.sleep(distance / self.speed)
while total_distance_right != 0:
distance = min(500, abs(total_distance_right))
if self.auto_home_flag: # 如果标志变量为True中断自动返回
self.auto_home_flag = False
return
if total_distance_right > 0:
self.drone.move_left(distance)
total_distance_right -= distance
else:
self.drone.move_right(distance)
total_distance_right += distance
time.sleep(distance / self.speed)
while total_distance_up != 0:
distance = min(500, abs(total_distance_up))
if self.auto_home_flag: # 如果标志变量为True中断自动返回
self.auto_home_flag = False
return
if total_distance_up > 0:
self.drone.move_down(distance)
total_distance_up -= distance
else:
self.drone.move_up(distance)
total_distance_up += distance
time.sleep(distance / self.speed)
def send_info(self):
'''
发送无人机的信息
:return:
'''
while True:
battery = self.drone.query_battery()
data = {
"battery": battery,
"speed": self.speed,
"distance": self.calculate_distance(self.start_time)}
self.send_sock.send(json.dumps(data).encode())
time.sleep(2)
def calculate_distance(self, start_time ):
'''
计算无人机飞行的距离
:param start_time:
:return distance_diff:
'''
current_time = datetime.datetime.now()
time_diff = current_time - start_time
distance_diff = time_diff.total_seconds() * self.speed
return distance_diff
Loading…
Cancel
Save