Skip to content

Commit

Permalink
DispatchManagingActor: Support starting/stopping of multiple actors
Browse files Browse the repository at this point in the history
Signed-off-by: Mathias L. Baumann <[email protected]>
  • Loading branch information
Marenz committed Sep 24, 2024
1 parent 1088918 commit 74f20d4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
44 changes: 23 additions & 21 deletions src/frequenz/dispatch/_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,55 +93,57 @@ async def run():
status_receiver = dispatcher.running_status_change.new_receiver()
dispatch_runner = DispatchManagingActor(
actor=my_actor,
managing_actor = DispatchManagingActor(
actors=frozenset([my_actor]),
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
updates_sender=dispatch_updates_channel.new_sender(),
)
await asyncio.gather(dispatcher.start(), dispatch_runner.start())
await asyncio.gather(dispatcher.start(), managing_actor.start())
```
"""

def __init__(
self,
actor: Actor,
actors: frozenset[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
) -> None:
"""Initialize the dispatch handler.
Args:
actor: The actor to manage.
actors: The actors to manage.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
updates_sender: The sender for dispatch events
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actor = actor
self._actors = actors
self._dispatch_type = dispatch_type
self._updates_sender = updates_sender

def _start_actor(self) -> None:
"""Start the actor."""
if self._actor.is_running:
_logger.warning("Actor %s is already running", self._actor.name)
else:
self._actor.start()
def _start_actors(self) -> None:
"""Start all actors."""
for actor in self._actors:
if actor.is_running:
_logger.warning("Actor %s is already running", actor.name)
else:
actor.start()

async def _stop_actor(self, msg: str) -> None:
"""Stop the actor.
async def _stop_actors(self, msg: str) -> None:
"""Stop all actors.
Args:
msg: The message to be passed to the actor being stopped.
msg: The message to be passed to the actors being stopped.
"""
if self._actor.is_running:
await self._actor.stop(msg)
else:
_logger.warning("Actor %s is not running", self._actor.name)
for actor in self._actors:
if actor.is_running:
await actor.stop(msg)
else:
_logger.warning("Actor %s is not running", actor.name)

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
Expand All @@ -158,7 +160,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
match running:
case RunningState.STOPPED:
_logger.info("Stopped by dispatch %s", dispatch.id)
await self._stop_actor("Dispatch stopped")
await self._stop_actors("Dispatch stopped")
case RunningState.RUNNING:
if self._updates_sender is not None:
_logger.info("Updated by dispatch %s", dispatch.id)
Expand All @@ -171,7 +173,7 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
)

_logger.info("Started by dispatch %s", dispatch.id)
self._start_actor()
self._start_actors()
case RunningState.DIFFERENT_TYPE:
_logger.debug(
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
Expand Down
2 changes: 1 addition & 1 deletion tests/test_mananging_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def test_env() -> AsyncIterator[TestEnv]:
actor = MockActor()

runner_actor = DispatchManagingActor(
actor=actor,
actors=frozenset([actor]),
dispatch_type="UNIT_TEST",
running_status_receiver=channel.new_receiver(),
updates_sender=updates_channel.new_sender(),
Expand Down

0 comments on commit 74f20d4

Please sign in to comment.