class DroneΒΆ

This class allows the usage of the MAVSDK library to control a drone. It defines a Drone class with various methods for connecting to, controlling, and monitoring the drone.

Attributes:
  • sys_addr (int): System address (drone)

  • sys_port (int): System port (drone)

  • neighbours (list): A list of neighbouring drones.

Methods:
  • connect(): Connects to the Drone instance.

  • add_neighbour(): Adds a new neighbour drone.

  • test_neighbour_altitude(): Prints the altitude of a neighbour drone.

  • altitude(): Gets the altitude of the drone.

  • arm(): Arms the drone.

  • takeoff(): Takes off the drone.

  • land(): Lands the drone.

  • takeoff_and_land(): Takes off and lands the drone.

Example:

>>> from utils.drone import Drone
>>>
>>> sys_addr = 192.168.1.1
>>>
>>> drone = Drone(sys_addr)
>>>
>>> await drone.connect()
>>>
>>> # Add a neighbour drone
>>> neigh_addr = 192.168.1.2
>>> await drone.add_neighbour(neigh_addr)
>>>
>>> # Test the neighbour drone's altitude
>>> await drone.test_neighbour_altitude()
>>>
>>> # Get the altitude of the drone
>>> print(drone.altitude())
>>>
>>> # Arm the drone
>>> await drone.arm()
>>>
>>> # Take off the drone
>>> await drone.takeoff()
>>>
>>> # Land the drone
>>> await drone.land()
>>>
>>> # Take off and land the drone
>>> await drone.takeoff_and_land()
from loguru import logger
from mavsdk import System
import asyncio
import random

class Drone:
    def __init__(self,  sys_addr) -> None:
        self.sys_addr = sys_addr
        self.sys_port = random.randint(1000, 65535)
        self.__drone = System(port= self.sys_port)
        self.neighbours = []
        self.__drone.component_information.float_param

    async def connect(self) -> None:
        """
        Connect to the Drone instance
        """
        await self.__drone.connect(system_address=f"udp://:{self.sys_addr}")

        status_text_task = asyncio.ensure_future(print_status_text(self.__drone))

        logger.debug("Waiting for drone to connect...")
        async for state in self.__drone.core.connection_state():
            if state.is_connected:
                logger.debug(f"Connected sys_addr{self.sys_addr}, sys_port{self.sys_port}!")
                break

        logger.debug("Waiting for drone to have a global position estimate...")
        async for health in self.__drone.telemetry.health():
            if health.is_global_position_ok and health.is_home_position_ok:
                logger.debug("Global position estimate OK")
                break

        status_text_task.cancel()

    def altitude(self) -> int:
        """
        Drone altitude
        """
        pos = self.__drone.telemetry.position()
        return pos.absolute_altitude_m

    async def arm(self):
        """
        Drone Arming
        """
        logger.debug("Arming")
        await self.__drone.action.arm()

    async def takeoff(self):
        """
        Drone Takeoff
        """
        logger.debug("Taking off")
        await self.__drone.action.takeoff()

    async def land(self):
        """
        Drone Land
        """
        logger.debug("Landing")
        await self.__drone.action.land()

    async def takeoff_and_land(self):
        """
        Drone Takeoff and Land
        """
        await self.arm()
        await self.takeoff()
        await asyncio.sleep(10)
        await self.land()