diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index c722346..6ae32d7 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -93,20 +93,20 @@ async def run(): status_receiver = dispatcher.running_status_change.new_receiver() - dispatch_runner = DispatchManagingActor( - actor=my_actor, + managing_actor = DispatchManagingActor( + actors=frozenset([my_actor]), dispatch_type="EXAMPLE", running_status_receiver=status_receiver, updates_sender=dispatch_updates_channel.new_sender(), ) - await asyncio.gather(dispatcher.start(), dispatch_runner.start()) + await asyncio.gather(dispatcher.start(), managing_actor.start()) ``` """ def __init__( self, - actor: Actor, + actors: frozenset[Actor], dispatch_type: str, running_status_receiver: Receiver[Dispatch], updates_sender: Sender[DispatchUpdate] | None = None, @@ -114,34 +114,36 @@ def __init__( """Initialize the dispatch handler. Args: - actor: The actor to manage. + actors: The actors to manage. dispatch_type: The type of dispatches to handle. running_status_receiver: The receiver for dispatch running status changes. updates_sender: The sender for dispatch events """ super().__init__() self._dispatch_rx = running_status_receiver - self._actor = actor + self._actors = actors self._dispatch_type = dispatch_type self._updates_sender = updates_sender - def _start_actor(self) -> None: - """Start the actor.""" - if self._actor.is_running: - _logger.warning("Actor %s is already running", self._actor.name) - else: - self._actor.start() + def _start_actors(self) -> None: + """Start all actors.""" + for actor in self._actors: + if actor.is_running: + _logger.warning("Actor %s is already running", actor.name) + else: + actor.start() - async def _stop_actor(self, msg: str) -> None: - """Stop the actor. + async def _stop_actors(self, msg: str) -> None: + """Stop all actors. Args: - msg: The message to be passed to the actor being stopped. + msg: The message to be passed to the actors being stopped. """ - if self._actor.is_running: - await self._actor.stop(msg) - else: - _logger.warning("Actor %s is not running", self._actor.name) + for actor in self._actors: + if actor.is_running: + await actor.stop(msg) + else: + _logger.warning("Actor %s is not running", actor.name) async def _run(self) -> None: """Wait for dispatches and handle them.""" @@ -158,7 +160,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: match running: case RunningState.STOPPED: _logger.info("Stopped by dispatch %s", dispatch.id) - await self._stop_actor("Dispatch stopped") + await self._stop_actors("Dispatch stopped") case RunningState.RUNNING: if self._updates_sender is not None: _logger.info("Updated by dispatch %s", dispatch.id) @@ -171,7 +173,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: ) _logger.info("Started by dispatch %s", dispatch.id) - self._start_actor() + self._start_actors() case RunningState.DIFFERENT_TYPE: _logger.debug( "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index bab8361..23dfd84 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -69,7 +69,7 @@ async def test_env() -> AsyncIterator[TestEnv]: actor = MockActor() runner_actor = DispatchManagingActor( - actor=actor, + actors=frozenset([actor]), dispatch_type="UNIT_TEST", running_status_receiver=channel.new_receiver(), updates_sender=updates_channel.new_sender(),