tello控制

tello控制
master
mvbayc8qt 2 years ago
commit afb3b38039

@ -0,0 +1,2 @@
from .tello import Tello, TelloException, BackgroundFrameRead
from .swarm import TelloSwarm

@ -0,0 +1,65 @@
"""
This file is based on a StackOverflow post by @301_Moved_Permanently.
See https://stackoverflow.com/a/50622643
The code was adapted to be able to wrap all methods of a class by simply
adding the decorator to the class itself.
"""
import inspect
import typing
from contextlib import suppress
from functools import wraps
def _is_unparameterized_special_typing(type_hint):
# Check for typing.Any, typing.Union, typing.ClassVar (without parameters)
if hasattr(typing, "_SpecialForm"):
return isinstance(type_hint, typing._SpecialForm)
elif hasattr(type_hint, "__origin__"):
return type_hint.__origin__ is None
else:
return False
def enforce_types(target):
"""Class decorator adding type checks to all member functions
"""
def check_types(spec, *args, **kwargs):
parameters = dict(zip(spec.args, args))
parameters.update(kwargs)
for name, value in parameters.items():
with suppress(KeyError): # Assume un-annotated parameters can be any type
type_hint = spec.annotations[name]
if _is_unparameterized_special_typing(type_hint):
continue
if hasattr(type_hint, "__origin__") and type_hint.__origin__ is not None:
actual_type = type_hint.__origin__
elif hasattr(type_hint, "__args__") and type_hint.__args__ is not None:
actual_type = type_hint.__args__
else:
actual_type = type_hint
if not isinstance(value, actual_type):
raise TypeError("Unexpected type for '{}' (expected {} but found {})"
.format(name, type_hint, type(value)))
def decorate(func):
spec = inspect.getfullargspec(func)
@wraps(func)
def wrapper(*args, **kwargs):
check_types(spec, *args, **kwargs)
return func(*args, **kwargs)
return wrapper
if inspect.isclass(target):
members = inspect.getmembers(target, predicate=inspect.isfunction)
for name, func in members:
setattr(target, name, decorate(func))
return target
else:
return decorate(target)

@ -0,0 +1,159 @@
"""Library for controlling multiple DJI Ryze Tello drones.
"""
from threading import Thread, Barrier
from queue import Queue
from typing import List, Callable
from .tello import Tello, TelloException
from .enforce_types import enforce_types
@enforce_types
class TelloSwarm:
"""Swarm library for controlling multiple Tellos simultaneously
"""
tellos: List[Tello]
barrier: Barrier
funcBarier: Barrier
funcQueues: List[Queue]
threads: List[Thread]
@staticmethod
def fromFile(path: str):
"""Create TelloSwarm from file. The file should contain one IP address per line.
Arguments:
path: path to the file
"""
with open(path, 'r') as fd:
ips = fd.readlines()
return TelloSwarm.fromIps(ips)
@staticmethod
def fromIps(ips: list):
"""Create TelloSwarm from a list of IP addresses.
Arguments:
ips: list of IP Addresses
"""
if not ips:
raise TelloException("No ips provided")
tellos = []
for ip in ips:
tellos.append(Tello(ip.strip()))
return TelloSwarm(tellos)
def __init__(self, tellos: List[Tello]):
"""Initialize a TelloSwarm instance
Arguments:
tellos: list of [Tello][tello] instances
"""
self.tellos = tellos
self.barrier = Barrier(len(tellos))
self.funcBarrier = Barrier(len(tellos) + 1)
self.funcQueues = [Queue() for tello in tellos]
def worker(i):
queue = self.funcQueues[i]
tello = self.tellos[i]
while True:
func = queue.get()
self.funcBarrier.wait()
func(i, tello)
self.funcBarrier.wait()
self.threads = []
for i, _ in enumerate(tellos):
thread = Thread(target=worker, daemon=True, args=(i,))
thread.start()
self.threads.append(thread)
def sequential(self, func: Callable[[int, Tello], None]):
"""Call `func` for each tello sequentially. The function retrieves
two arguments: The index `i` of the current drone and `tello` the
current [Tello][tello] instance.
```python
swarm.parallel(lambda i, tello: tello.land())
```
"""
for i, tello in enumerate(self.tellos):
func(i, tello)
def parallel(self, func: Callable[[int, Tello], None]):
"""Call `func` for each tello in parallel. The function retrieves
two arguments: The index `i` of the current drone and `tello` the
current [Tello][tello] instance.
You can use `swarm.sync()` for syncing between threads.
```python
swarm.parallel(lambda i, tello: tello.move_up(50 + i * 10))
```
"""
for queue in self.funcQueues:
queue.put(func)
self.funcBarrier.wait()
self.funcBarrier.wait()
def sync(self, timeout: float = None):
"""Sync parallel tello threads. The code continues when all threads
have called `swarm.sync`.
```python
def doStuff(i, tello):
tello.move_up(50 + i * 10)
swarm.sync()
if i == 2:
tello.flip_back()
# make all other drones wait for one to complete its flip
swarm.sync()
swarm.parallel(doStuff)
```
"""
return self.barrier.wait(timeout)
def __getattr__(self, attr):
"""Call a standard tello function in parallel on all tellos.
```python
swarm.command()
swarm.takeoff()
swarm.move_up(50)
```
"""
def callAll(*args, **kwargs):
self.parallel(lambda i, tello: getattr(tello, attr)(*args, **kwargs))
return callAll
def __iter__(self):
"""Iterate over all drones in the swarm.
```python
for tello in swarm:
print(tello.get_battery())
```
"""
return iter(self.tellos)
def __len__(self):
"""Return the amount of tellos in the swarm
```python
print("Tello count: {}".format(len(swarm)))
```
"""
return len(self.tellos)

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save