- class DroneΒΆ
This class allows the usage of the MAVSDK library to control a drone. It defines a Drone class with various methods for connecting to, controlling, and monitoring the drone.
- Attributes:
sys_addr (int): System address (drone)
sys_port (int): System port (drone)
neighbours (list): A list of neighbouring drones.
- Methods:
connect(): Connects to the Drone instance.
add_neighbour(): Adds a new neighbour drone.
test_neighbour_altitude(): Prints the altitude of a neighbour drone.
altitude(): Gets the altitude of the drone.
arm(): Arms the drone.
takeoff(): Takes off the drone.
land(): Lands the drone.
takeoff_and_land(): Takes off and lands the drone.
Example:
>>> from utils.drone import Drone
>>>
>>> sys_addr = 192.168.1.1
>>>
>>> drone = Drone(sys_addr)
>>>
>>> await drone.connect()
>>>
>>> # Add a neighbour drone
>>> neigh_addr = 192.168.1.2
>>> await drone.add_neighbour(neigh_addr)
>>>
>>> # Test the neighbour drone's altitude
>>> await drone.test_neighbour_altitude()
>>>
>>> # Get the altitude of the drone
>>> print(drone.altitude())
>>>
>>> # Arm the drone
>>> await drone.arm()
>>>
>>> # Take off the drone
>>> await drone.takeoff()
>>>
>>> # Land the drone
>>> await drone.land()
>>>
>>> # Take off and land the drone
>>> await drone.takeoff_and_land()
from loguru import logger
from mavsdk import System
import asyncio
import random
class Drone:
def __init__(self, sys_addr) -> None:
self.sys_addr = sys_addr
self.sys_port = random.randint(1000, 65535)
self.__drone = System(port= self.sys_port)
self.neighbours = []
self.__drone.component_information.float_param
async def connect(self) -> None:
"""
Connect to the Drone instance
"""
await self.__drone.connect(system_address=f"udp://:{self.sys_addr}")
status_text_task = asyncio.ensure_future(print_status_text(self.__drone))
logger.debug("Waiting for drone to connect...")
async for state in self.__drone.core.connection_state():
if state.is_connected:
logger.debug(f"Connected sys_addr{self.sys_addr}, sys_port{self.sys_port}!")
break
logger.debug("Waiting for drone to have a global position estimate...")
async for health in self.__drone.telemetry.health():
if health.is_global_position_ok and health.is_home_position_ok:
logger.debug("Global position estimate OK")
break
status_text_task.cancel()
def altitude(self) -> int:
"""
Drone altitude
"""
pos = self.__drone.telemetry.position()
return pos.absolute_altitude_m
async def arm(self):
"""
Drone Arming
"""
logger.debug("Arming")
await self.__drone.action.arm()
async def takeoff(self):
"""
Drone Takeoff
"""
logger.debug("Taking off")
await self.__drone.action.takeoff()
async def land(self):
"""
Drone Land
"""
logger.debug("Landing")
await self.__drone.action.land()
async def takeoff_and_land(self):
"""
Drone Takeoff and Land
"""
await self.arm()
await self.takeoff()
await asyncio.sleep(10)
await self.land()