Skip to content

Commit

Permalink
Add dispatch runner
Browse files Browse the repository at this point in the history
Signed-off-by: Mathias L. Baumann <[email protected]>
  • Loading branch information
Marenz committed Sep 11, 2024
1 parent 46d5b17 commit b3bdfdc
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 1 deletion.
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches.

## Bug Fixes

Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to
manage other actors based on incoming dispatches.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
"""

from ._actor_runner import DispatchConfigurationEvent, DispatchRunnerActor
from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated
Expand All @@ -26,4 +29,6 @@
"Updated",
"Dispatch",
"RunningState",
"DispatchRunnerActor",
"DispatchConfigurationEvent",
]
164 changes: 164 additions & 0 deletions src/frequenz/dispatch/_actor_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# License: All rights reserved
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Helper class to manage actors based on dispatches."""

import logging
from dataclasses import dataclass
from typing import Any

from frequenz.channels import Receiver, Sender
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.sdk.actor import Actor

from ._dispatch import Dispatch, RunningState

_logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class DispatchConfigurationEvent:
"""Event emitted when the dispatch configuration changes."""

components: ComponentSelector
"""Components to be used."""

dry_run: bool
"""Whether this is a dry run."""

payload: dict[str, Any]
"""Additional payload."""


class DispatchRunnerActor(Actor):
"""Helper class to manage actors based on dispatches.
Example usage:
```python
import os
import asyncio
from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchConfigurationEvent
from frequenz.channels import Receiver, Broadcast
from unittest.mock import MagicMock
class MyActor(Actor):
def __init__(self, config_channel: Receiver[DispatchConfigurationEvent]):
super().__init__()
self._config_channel = config_channel
async def _run(self) -> None:
while True:
config = await self._config_channel.receive()
print("Received config:", config)
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
# Create config channel to receive (re-)configuration events pre-start and mid-run
config_channel = Broadcast[DispatchConfigurationEvent](name="config_channel")
# Start actor and supporting actor, give each a config channel receiver
my_actor = MyActor(config_channel.new_receiver())
supporting_actor = MagicMock(config_channel.new_receiver())
status_receiver = dispatcher.running_status_change.new_receiver()
dispatch_handler = DispatchRunnerActor(
actors=[my_actor, supporting_actor],
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
configuration_sender=config_channel.new_sender(),
)
await asyncio.gather(dispatcher.start(), dispatch_handler.start())
```
"""

def __init__(
self,
actors: frozenset[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
configuration_sender: Sender[DispatchConfigurationEvent] | None = None,
) -> None:
"""Initialize the dispatch handler.
Args:
actors: The actors to handle.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
configuration_sender: The sender for dispatch configuration events
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actors = actors
self._dispatch_type = dispatch_type
self._configuration_sender = configuration_sender

def _start_actors(self) -> None:
"""Start all actors."""
for actor in self._actors:
if actor.is_running:
_logger.warning("Actor %s is already running", actor.name)
else:
actor.start()

async def _stop_actors(self, msg: str) -> None:
"""Stop all actors.
Args:
msg: The message to be passed to the actors being stopped.
"""
for actor in self._actors:
if actor.is_running:
await actor.stop(msg)
else:
_logger.warning("Actor %s is not running", actor.name)

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
while True:
_logger.info("Waiting for dispatch...")
dispatch = await self._dispatch_rx.receive()
await self._handle_dispatch(dispatch=dispatch)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.
Args:
dispatch: The dispatch to handle.
Returns:
The running state.
"""
running = dispatch.running(self._dispatch_type)
match running:
case RunningState.STOPPED:
_logger.info("Stopping dispatch...")
await self._stop_actors("Dispatch stopped")
case RunningState.RUNNING:
if self._configuration_sender is not None:
_logger.info("Updating configuration...")
await self._configuration_sender.send(
DispatchConfigurationEvent(
components=dispatch.selector,
dry_run=dispatch.dry_run,
payload=dispatch.payload,
)
)

_logger.info("Running dispatch...")
self._start_actors()
case RunningState.DIFFERENT_TYPE:
_logger.debug(
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
)

0 comments on commit b3bdfdc

Please sign in to comment.