class Swarm

This script defines a Swarm class that manages a swarm of drones. It provides methods for connecting to drones, taking off, landing, retrieving positions, setting positions, and managing a leader drone.

Args:
  • drones_number (int): number of drones composing the swarm

  • drones_addrs (List[int], optional): drone addresses. Defaults to None.

from loguru import logger
from mavsdk import System
from typing import List
from utils.systemwrapper import SystemWrapper
from models.droneposition import DronePosition

class Swarm:
    """
    Manages a group of drones as a single swarm.

    Each drone is reached through its own address. When no addresses are
    supplied, consecutive ones are reserved from the class-wide counter
    `next_drone_address`, so swarms created one after the other in the same
    process receive distinct addresses.

    Args:
        drones_number (int): number of drones composing the swarm
        drones_addrs (List[int], optional): drone addresses. Defaults to None.

    Raises:
        ValueError: if `drones_addrs` is given and its length differs from
            `drones_number`
    """

    # Next address handed out to a swarm created without explicit addresses.
    # Shared by every instance and advanced on each allocation.
    next_drone_address = 14540

    def __init__(self,
                drones_number:int,
                drones_addrs:List[int]=None) -> None:
        self.__drones_number = drones_number
        # Positions gathered by the last successful read of `positions`
        self.__positions = []
        # Filled by `connect`, in the same order as `drones_addrs`
        self.__drones:List[System] = []

        if drones_addrs is None:
            # No explicit addresses: reserve a consecutive range from the shared counter
            first_addr = Swarm.next_drone_address
            self.drones_addrs = list(range(first_addr, first_addr + drones_number))
            Swarm.next_drone_address = first_addr + len(self.drones_addrs)
        elif drones_number != len(drones_addrs):
            raise ValueError(
                f"drones_number ({drones_number}) does not match the number "
                f"of addresses provided ({len(drones_addrs)})"
            )
        else:
            # Copy so that later changes to the caller's list do not affect the swarm
            self.drones_addrs = list(drones_addrs)

        logger.info(f"Creating swarm with {self.__drones_number} drones at {self.drones_addrs}")

    async def connect(self):
        """
        Connects to every drone of the swarm, one after the other.

        The drones are contacted in the order of `drones_addrs` and each
        connection is awaited before the next one is started, so the internal
        drone list follows the same ordering as the addresses.
        """
        logger.info("Connecting to drones...")
        for addr in self.drones_addrs:
            logger.info(f"Connecting to drone@{addr}...")
            drone = await SystemWrapper(addr).connect()
            logger.info(f"Connection to drone@{addr} completed")
            self.__drones.append(drone)

    async def check_system_connections(self) -> bool:
        """
        Check if all the drones [System] are connected to the base station

        Returns:
            bool: False as soon as one drone reports that it is not connected,
            True otherwise (including when the swarm holds no drone yet)
        """
        for system in self.__drones:
            # Only the first state delivered by the stream is inspected
            async for state in system.core.connection_state():
                if not state.is_connected:
                    logger.debug("Drone is not connected.")
                    return False
                logger.debug("Drone is connected.")
                break
        return True

    async def takeoff(self):
        """
        Sends `takeoff` command to each drone of the swarm.

        Waits until `check_system_connections` succeeds, then arms each drone,
        makes it take off and sets its maximum speed to 10.
        """
        # Do not send any command before every drone reports a connection
        ready_for_takeoff = False
        while not ready_for_takeoff:
            ready_for_takeoff = await self.check_system_connections()

        logger.info("Taking off...")
        for drone in self.__drones:
            await drone.action.arm()
            await drone.action.takeoff()
            await drone.action.set_maximum_speed(10)
        logger.info("Takeoff completed")

    async def land(self):
        """
        Sends `land` command to each drone of the swarm.
        """
        logger.info("Landing...")
        for drone in self.__drones:
            await drone.action.land()
        logger.info("Landing completed")

    @property
    async def positions(self) -> List[DronePosition]:
        """
        Retrieves drones positions

        This is an awaitable property: use `await swarm.positions`.

        Returns:
            List[DronePosition]: Current position of each drone
        """
        # Collect into a local list so that the cached positions are replaced
        # only once every drone has been read successfully
        current = []
        for drone in self.__drones:
            raw_position = await anext(drone.telemetry.position())
            current.append(DronePosition.from_mavsdk_position(raw_position))

        self.__positions = current
        return self.__positions

    async def set_position(self, index, target_position:DronePosition):
        """
        Sets a new position (`target_position`) for the drone identified by its index

        The drone's current position is read from telemetry right before the
        move, so the method does not depend on `positions` having been
        awaited beforehand.

        Args:
            index (int): index of the drone inside the swarm
            target_position (DronePosition): position the drone must reach
        """
        try:
            drone = self.__drones[index]
        except IndexError:
            # Unknown drone: nothing is moved, but the failure is made visible
            logger.warning(f"No drone at index {index}, position not set")
            return

        raw_position = await anext(drone.telemetry.position())
        prev_pos = DronePosition.from_mavsdk_position(raw_position)

        logger.info(f"Moving drone@{self.drones_addrs[index]}")
        await drone.action.goto_location(*target_position.to_goto_location(prev_pos))

    async def set_positions(self, target_positions:List[DronePosition]):
        """
        Sets new positions (`target_position`) for each drone

        Args:
            target_positions (List[DronePosition]): List of target positions

        Raises:
            IndexError: if fewer target positions than drones are provided;
                raised before any drone is moved
        """
        # Validate first: failing half-way would leave the swarm partially moved
        if len(target_positions) < len(self.__drones):
            raise IndexError(
                f"{len(target_positions)} target positions provided "
                f"for {len(self.__drones)} drones"
            )

        prev_pos = await self.positions
        logger.debug(f"Previous positions: {prev_pos}")
        for n, drone in enumerate(self.__drones):
            pos = target_positions[n]
            logger.info(f"Moving drone@{self.drones_addrs[n]} to {pos}")
            await drone.action.goto_location(*pos.to_goto_location(prev_pos[n]))

    def get_leader(self) -> System:
        """
        Get the first drone of the swarm, which will be called "Leader"

        Raises:
            IndexError: if the swarm holds no drone yet
        """
        return self.__drones[0]

    def get_drones(self) -> List[System]:
        """
        Get the list of all the drones in the swarm
        """
        return self.__drones