parent
928d78086f
commit
f7703b8dad
@ -0,0 +1,160 @@
|
||||
import logging
|
||||
import time
|
||||
import cv2
|
||||
import numpy as np
|
||||
from djitellopy import tello
|
||||
import KeyPressModule as kp
|
||||
from time import sleep
|
||||
import socket
|
||||
|
||||
class DroneController:
|
||||
def __init__(self, camera_width=720, camera_height=480, detect_range=[6000, 11000], pid_parameter=[0.5, 0.0004, 0.4], font=cv2.FONT_HERSHEY_SIMPLEX, font_scale=0.5, font_color=(255, 0, 0), line_thickness=1, speed=70):
|
||||
self.camera_width = camera_width
|
||||
self.camera_height = camera_height
|
||||
self.detect_range = detect_range
|
||||
self.pid_parameter = pid_parameter
|
||||
self.font = font
|
||||
self.font_scale = font_scale
|
||||
self.font_color = font_color
|
||||
self.line_thickness = line_thickness
|
||||
self.speed = speed
|
||||
self.drone = tello.Tello()
|
||||
self.drone.connect()
|
||||
self.drone.streamon()
|
||||
self.drone.LOGGER.setLevel(logging.ERROR)
|
||||
sleep(5)
|
||||
kp.init()
|
||||
self.socketInit()
|
||||
|
||||
def socketInit(self):
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.socket.bind(('', 9000))
|
||||
self.server_address=('192.168.10.3',9999)
|
||||
def find_face(self, img):
|
||||
face_cascade = cv2.CascadeClassifier("Resources/haarcascade_frontalface_default.xml")
|
||||
face_cascade.load("Resources/haarcascade_frontalface_default.xml")
|
||||
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
|
||||
faces = face_cascade.detectMultiScale(img_gray, 1.3, 8)
|
||||
my_face_list_c = []
|
||||
my_face_list_area = []
|
||||
|
||||
for (x, y, w, h) in faces:
|
||||
cv2.rectangle(img,(x,y),(x+w, y+h),(0, 0, 255),2)
|
||||
cx = x + w//2
|
||||
cy = y + h//2
|
||||
area = w*h
|
||||
cv2.circle(img, (cx, cy), 5, (255, 0, 0), cv2.FILLED)
|
||||
my_face_list_c.append([cx,cy])
|
||||
my_face_list_area.append(area)
|
||||
if len(my_face_list_area) != 0:
|
||||
i = my_face_list_area.index(max(my_face_list_area))
|
||||
info_text = "Area:{0} X:{1} Y:{2}".format(area , cx, cy)
|
||||
cv2.putText(img, info_text, (cx+20, cy), self.font, self.font_scale, self.font_color, self.line_thickness)
|
||||
return img, [my_face_list_c[i], my_face_list_area[i]]
|
||||
else:
|
||||
return img, [[0, 0], 0]
|
||||
|
||||
def track_face(self, img, info, p_error_rotate, p_error_up):
|
||||
area = info[1]
|
||||
x, y = info[0][0], info[0][1]
|
||||
fb = 0
|
||||
error_rotate = x - self.camera_width/2
|
||||
error_up = self.camera_height/2 - y
|
||||
cv2.circle(img, (int(self.camera_width/2), int(self.camera_height/2)), 5, (0, 255, 0), cv2.FILLED)
|
||||
if x >10 or y >10:
|
||||
cv2.line(img, (int(self.camera_width/2), int(self.camera_height/2)), (x,y), (255, 0, 0), self.line_thickness)
|
||||
rotate_speed = self.pid_parameter[0]*error_rotate + self.pid_parameter[1]*(error_rotate - p_error_rotate)
|
||||
updown_speed = self.pid_parameter[0]*error_up + self.pid_parameter[1]*(error_up - p_error_up)
|
||||
rotate_speed = int(np.clip(rotate_speed, -40, 40))
|
||||
updown_speed = int(np.clip(updown_speed, -60, 60))
|
||||
|
||||
area = info[1]
|
||||
if area > self.detect_range[0] and area < self.detect_range[1]:
|
||||
fb = 0
|
||||
info_text = "Hold Speed:{0} Rotate:{1} Up:{2}".format(fb, rotate_speed, updown_speed)
|
||||
cv2.putText(img, info_text, (10, 60), self.font, self.font_scale, self.font_color, self.line_thickness)
|
||||
self.drone.send_rc_control(0, fb, updown_speed, rotate_speed)
|
||||
elif area > self.detect_range[1]:
|
||||
fb = -20
|
||||
info_text = "Backward Speed:{0} Rotate:{1} Up:{2}".format(fb, rotate_speed, updown_speed)
|
||||
cv2.putText(img, info_text, (10, 60), self.font, self.font_scale, self.font_color, self.line_thickness)
|
||||
self.drone.send_rc_control(0, fb, updown_speed, rotate_speed)
|
||||
elif area < self.detect_range[0] and area > 1000:
|
||||
fb = 20
|
||||
info_text = "Forward Speed:{0} Rotate:{1} Up:{2}".format(fb, rotate_speed, updown_speed)
|
||||
cv2.putText(img, info_text, (10, 60), self.font, self.font_scale, self.font_color, self.line_thickness)
|
||||
self.drone.send_rc_control(0, fb, updown_speed, rotate_speed)
|
||||
else:
|
||||
self.drone.send_rc_control(0, 0, 0, 0)
|
||||
|
||||
if x == 0:
|
||||
speed = 0
|
||||
error = 0
|
||||
return error_rotate, error_up
|
||||
|
||||
def get_keyboard_input(self, image):
|
||||
lr, fb, ud, yv = 0, 0, 0, 0
|
||||
key_pressed = 0
|
||||
if kp.getKey("e"):
|
||||
cv2.imwrite('D:/snap-{}.jpg'.format(time.strftime("%H%M%S", time.localtime())), image)
|
||||
if kp.getKey("UP"):
|
||||
self.drone.takeoff()
|
||||
elif kp.getKey("DOWN"):
|
||||
self.drone.land()
|
||||
|
||||
if kp.getKey("j"):
|
||||
key_pressed = 1
|
||||
lr = -self.speed
|
||||
elif kp.getKey("l"):
|
||||
key_pressed = 1
|
||||
lr = self.speed
|
||||
|
||||
if kp.getKey("i"):
|
||||
key_pressed = 1
|
||||
fb = self.speed
|
||||
elif kp.getKey("k"):
|
||||
key_pressed = 1
|
||||
fb = -self.speed
|
||||
|
||||
if kp.getKey("w"):
|
||||
key_pressed = 1
|
||||
ud = self.speed
|
||||
elif kp.getKey("s"):
|
||||
key_pressed = 1
|
||||
ud = -self.speed
|
||||
|
||||
if kp.getKey("a"):
|
||||
key_pressed = 1
|
||||
yv = -self.speed
|
||||
elif kp.getKey("d"):
|
||||
key_pressed = 1
|
||||
yv = self.speed
|
||||
info_text = "battery : {0}% height: {1}cm time: {2}".format(self.drone.get_battery(), self.drone.get_height(), time.strftime("%H:%M:%S",time.localtime()))
|
||||
cv2.putText(image, info_text, (10, 20), self.font, self.font_scale, (0, 0, 255), self.line_thickness)
|
||||
if key_pressed == 1:
|
||||
info_text = "Command : lr:{0}% fb:{1} ud:{2} yv:{3}".format(lr, fb, ud, yv)
|
||||
cv2.putText(image, info_text, (10, 40), self.font, self.font_scale, (0, 0, 255), self.line_thickness)
|
||||
|
||||
self.drone.send_rc_control(lr, fb, ud, yv)
|
||||
|
||||
def run(self):
|
||||
p_error_rotate, p_error_up = 0, 0
|
||||
while True:
|
||||
original_image = self.drone.get_frame_read().frame
|
||||
image = cv2.resize(original_image, (self.camera_width, self.camera_height))
|
||||
self.get_keyboard_input(image)
|
||||
img, info = self.find_face(image)
|
||||
p_error_rotate, p_error_up = self.track_face(img, info, p_error_rotate, p_error_up)
|
||||
cv2.imshow("Drone Control Centre", image)
|
||||
cv2.waitKey(1)
|
||||
# if self.drone.get_frame_read().grabbed:
|
||||
# # 转换帧格式
|
||||
# encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
|
||||
# result, frame_encode = cv2.imencode('.jpg', img, encode_param)
|
||||
# # 发送数据包
|
||||
# self.socket.sendto(frame_encode.tobytes(), self.server_address)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
tello=DroneController()
|
||||
tello.run()
|
@ -0,0 +1,18 @@
|
||||
import pygame
|
||||
|
||||
def init():
|
||||
pygame.init()
|
||||
win = pygame.display.set_mode((400, 400))
|
||||
|
||||
def getKey(keyName):
|
||||
ans = False
|
||||
for eve in pygame.event.get(): pass
|
||||
keyInput = pygame.key.get_pressed()
|
||||
myKey = getattr(pygame,'K_{}'.format(keyName))
|
||||
if keyInput[myKey]:
|
||||
ans = True
|
||||
pygame.display.update()
|
||||
return ans
|
||||
|
||||
if __name__ == '__main__':
|
||||
init()
|
@ -0,0 +1,12 @@
|
||||
import requests
|
||||
def Map(origin, destination):
|
||||
params = {'key': 'f3f31ecbd5286f1d375707a2c8186dba',
|
||||
'origin': origin,
|
||||
'destination': destination,
|
||||
}
|
||||
r = requests.get('https://restapi.amap.com/v3/direction/walking', params=params)
|
||||
steps = r.json()['route']['paths'][0]['steps']
|
||||
print(steps)
|
||||
return steps
|
||||
|
||||
# Map('116.481028,39.989643', '116.434446,39.90816')
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue