From d7c10986c712357997bf7c64afca656850296b1e Mon Sep 17 00:00:00 2001 From: qiuwb <389791945@qq.com> Date: Tue, 27 Jun 2023 23:54:13 +0800 Subject: [PATCH] =?UTF-8?q?tello=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/PC_Transfer/tello_control.py | 325 +++++++++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 src/PC_Transfer/tello_control.py diff --git a/src/PC_Transfer/tello_control.py b/src/PC_Transfer/tello_control.py new file mode 100644 index 0000000..33b17af --- /dev/null +++ b/src/PC_Transfer/tello_control.py @@ -0,0 +1,325 @@ +from djitellopy import tello +import datetime +import time +import threading +import socket +import json + +class TelloControl: + ''' + tello控制类 + ''' + def __init__(self): + self.drone = tello.Tello(host="192.168.39.58") + self.speed = 50 + self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + log_filename = f"log/log_{self.timestamp}.txt" + self.log_file = open(file=log_filename, mode="a+", encoding="utf-8",buffering=1) + self.log_file.write(f"{self.timestamp}\n") + + self.socket_recv_host = '' + self.socket_recv_port = 8888 + + self.recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.recv_sock.bind((self.socket_recv_host, self.socket_recv_port)) + self.recv_thread = threading.Thread(target=self.recive) + self.recv_thread.setDaemon(True) + self.recv_thread.start() + + self.auto_fly_flag = False + self.auto_home_flag = False + + self.photo_quality = 50 + + self.send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.send_address = ('192.168.39.145', 6666) + + self.start_time = None + + self.distance=100 + self.angle=20 + + self.send_info_thread = threading.Thread(target=self.send_info) + self.send_info_thread.setDaemon(True) + self.send_info_thread.start() + + + def recive(self): + while True: + data, addr = self.recv_sock.recvfrom(1024) + print('Received: ', data.decode('utf-8')) + if data: + self.control(data.decode('utf-8')) + + def control(self, data): + """ + tello控制 + :param data: + :return: + """ + try: + if data == "TAKEOFF": + self.drone.takeoff() + self.drone.set_speed(self.speed) + self.log_file.write(str(datetime.datetime.now()) + "\tTAKEOFF" + "\n") + elif data == "LAND": + self.drone.land() + self.log_file.write(str(datetime.datetime.now()) + "\tLAND" + "\n") + elif data == "FORWARD": + self.drone.move_forward(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tFORWARD {self.distance}" + "\n") + elif data == "BACKWORD": + self.drone.move_back(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tBACK {self.distance}" + "\n") + elif data == "LEFT": + self.drone.move_left(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tLEFT {self.distance}" + "\n") + elif data == "RIGHT": + self.drone.move_right(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tRIGHT {self.distance}" + "\n") + elif data == "UP": + self.drone.move_up(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tUP {self.distance}" + "\n") + elif data == "DOWN": + self.drone.move_down(self.distance) +# time.sleep(self.distance / self.speed) + + self.log_file.write(str(datetime.datetime.now()) + f"\tDOWN {self.distance}" + "\n") + elif data == "TURN_LEFT": + self.drone.rotate_counter_clockwise(self.angle) + self.log_file.write(str(datetime.datetime.now()) + f"\tTURN_LEFT {self.angle}" + "\n") + elif data == "TURN_RIGHT": + self.drone.rotate_clockwise(self.angle) + self.log_file.write(str(datetime.datetime.now()) + f"\tTURN_RIGHT {self.angle}" + "\n") + elif "SPEED" in data: + speed = int(data.split(" ")[1]) + self.drone.set_speed(speed) + self.speed = speed + self.log_file.write(str(datetime.datetime.now()) +"\t"+ data + "\n") + # elif "AUTOF" in data: + # distance = int(data.split(" ")[1]) + # angle = int(data.split(" ")[2]) + # self.autofly(distance, angle) + # self.log_file.write(str(datetime.datetime.now()) + data + "\n") + + elif data == "STOP": + self.log_file.write(str(datetime.datetime.now()) + "\tSTOP" + "\n") + self.auto_fly_flag = True + self.auto_home_flag = True + self.drone.stopmove() + + elif data == "PHOTO_LOW": + self.photo_quality = 50 + + elif data == "PHOTO_HIGH": + self.photo_quality = 100 + + elif "WARNING" in data: + self.auto_fly_flag = False + self.drone.connect() + self.drone.streamon() + self.drone.takeoff() + self.log_file.write( str(datetime.datetime.now()) +"\t"+ data + "\n") + + distance = int(data.split(" ")[1]) + angle = int(data.split(" ")[2]) + self.log_file.write(f"TURN_RIGHT {angle}\n") + self.log_file.write(f"FORWARD {distance}\n") + self.autofly(distance, angle) + # elif data == "INFO": + # data_thread = threading.Thread(target=self.send_info) + # data_thread.setDaemon(True) + # data_thread.start() + elif data == "DIRECTHOME": + self.auto_home_flag = False + self.return_home_line() + self.log_file.write(str(datetime.datetime.now()) + "\tDIRECTHOME" + "\n") + + except Exception as e: + print(e) + self.log_file.write(str(datetime.datetime.now()) + "\t" + str(e) + "\n") + if "Command" not in str(e): + self.drone.land() + self.drone.streamoff() + self.drone.end() + self.log_file.close() + + def autofly(self, distance, angle): + ''' + 自动飞行 + :param distance: + :param angle: + :return: + ''' + self.start_time = datetime.datetime.now() + self.drone.rotate_clockwise(angle) + while distance > 500: + if self.auto_fly_flag: + self.auto_fly_flag = False + return + self.drone.move_forward(500) + distance -= 500 + # time.sleep(100 / self.speed) + + self.drone.move_forward(distance) + time.sleep(distance / self.speed) + + def return_home(self): + # 实现将无人机返回到出发点的逻辑 + # 根据日志中的操作,执行相反的操作即可 + # 例如,如果日志中有"FORWARD self.distance"的操作,则执行"BACK self.distance"来反向操作 + # 注意控制时间间隔,可以使用time.sleep()方法 + + self.log_file.seek(0) # 将文件指针移动到文件开头 + for line in self.log_file: + if self.auto_home_flag: + self.drone.emergency() + return + if "FORWARD" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_back(distance) + time.sleep(distance / self.speed) + elif "BACK" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_forward(distance) + time.sleep(distance / self.speed) + elif "LEFT" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_right(distance) + time.sleep(distance / self.speed) + elif "RIGHT" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_left(distance) + time.sleep(distance / self.speed) + elif "UP" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_down(distance) + time.sleep(distance / self.speed) + elif "DOWN" in line: + distance = int(line.split(" ")[-1]) + self.drone.move_up(distance) + time.sleep(distance / self.speed) + elif "TURN_LEFT" in line: + angle = int(line.split(" ")[-1]) + self.drone.rotate_clockwise(angle) + time.sleep(angle / self.speed) + elif "TURN_RIGHT" in line: + angle = int(line.split(" ")[-1]) + self.drone.rotate_counter_clockwise(angle) + time.sleep(angle / self.speed) + + self.drone.land() + # 文件指针移动到文件末尾 + self.log_file.seek(0, 2) + + def return_home_line(self): + ''' + 通过直线距离返回 + :return: + ''' + self.log_file.seek(0) # 将文件指针移动到文件开头 + total_distance_forward = 0 + total_distance_right = 0 + total_distance_up = 0 + + for line in self.log_file: + if self.auto_home_flag: # 如果标志变量为True,中断自动返回 + self.auto_home_flag = False + break + + if "FORWARD" in line: + distance = int(line.split(" ")[-1]) + total_distance_forward += distance + elif "BACK" in line: + distance = int(line.split(" ")[-1]) + total_distance_forward -= distance + elif "LEFT" in line: + distance = int(line.split(" ")[-1]) + total_distance_right -= distance + elif "RIGHT" in line: + distance = int(line.split(" ")[-1]) + total_distance_right += distance + elif "UP" in line: + distance = int(line.split(" ")[-1]) + total_distance_up += distance + elif "DOWN" in line: + distance = int(line.split(" ")[-1]) + total_distance_up -= distance + print(total_distance_forward, total_distance_right, total_distance_up) + self.log_file.seek(0, 2) + while total_distance_forward != 0: + distance = min(500, abs(total_distance_forward)) + if self.auto_home_flag: # 如果标志变量为True,中断自动返回 + self.auto_home_flag = False + return + if total_distance_forward > 0: + self.drone.move_back(distance) + total_distance_forward -= distance + else: + self.drone.move_forward(distance) + total_distance_forward += distance + time.sleep(distance / self.speed) + + while total_distance_right != 0: + distance = min(500, abs(total_distance_right)) + if self.auto_home_flag: # 如果标志变量为True,中断自动返回 + self.auto_home_flag = False + return + if total_distance_right > 0: + self.drone.move_left(distance) + total_distance_right -= distance + else: + self.drone.move_right(distance) + total_distance_right += distance + time.sleep(distance / self.speed) + + while total_distance_up != 0: + distance = min(500, abs(total_distance_up)) + if self.auto_home_flag: # 如果标志变量为True,中断自动返回 + self.auto_home_flag = False + return + if total_distance_up > 0: + self.drone.move_down(distance) + total_distance_up -= distance + else: + self.drone.move_up(distance) + total_distance_up += distance + time.sleep(distance / self.speed) + + + + def send_info(self): + ''' + 发送无人机的信息 + :return: + ''' + while True: + battery = self.drone.query_battery() + data = { + "battery": battery, + "speed": self.speed, + "distance": self.calculate_distance(self.start_time)} + self.send_sock.send(json.dumps(data).encode()) + time.sleep(2) + + def calculate_distance(self, start_time ): + ''' + 计算无人机飞行的距离 + :param start_time: + :return distance_diff: + ''' + current_time = datetime.datetime.now() + time_diff = current_time - start_time + distance_diff = time_diff.total_seconds() * self.speed + return distance_diff +