From 7e3419bf5d4842434e9ebadd2ce27e81f1be431c Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 11 Sep 2024 17:28:18 +0200 Subject: [PATCH] Add dispatch runner Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 2 +- src/frequenz/dispatch/__init__.py | 5 + src/frequenz/dispatch/_actor_runner.py | 187 +++++++++++++++++++++++++ tests/test_runner.py | 171 ++++++++++++++++++++++ 4 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 src/frequenz/dispatch/_actor_runner.py create mode 100644 tests/test_runner.py diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 56c7e98..b1085c4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches. ## Bug Fixes diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 16df1b5..23d765e 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -7,12 +7,15 @@ * [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. * [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. +* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to + manage other actors based on incoming dispatches. * [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. """ +from ._actor_runner import DispatchConfigurationEvent, DispatchRunnerActor from ._dispatch import Dispatch, RunningState from ._dispatcher import Dispatcher, ReceiverFetcher from ._event import Created, Deleted, DispatchEvent, Updated @@ -26,4 +29,6 @@ "Updated", "Dispatch", "RunningState", + "DispatchRunnerActor", + "DispatchConfigurationEvent", ] diff --git a/src/frequenz/dispatch/_actor_runner.py b/src/frequenz/dispatch/_actor_runner.py new file mode 100644 index 0000000..ae8bf2b --- /dev/null +++ b/src/frequenz/dispatch/_actor_runner.py @@ -0,0 +1,187 @@ +# License: All rights reserved +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Helper class to manage actors based on dispatches.""" + +import logging +from dataclasses import dataclass +from typing import Any + +from frequenz.channels import Receiver, Sender +from frequenz.client.dispatch.types import ComponentSelector +from frequenz.sdk.actor import Actor + +from ._dispatch import Dispatch, RunningState + +_logger = logging.getLogger(__name__) + + +@dataclass(frozen=True, kw_only=True) +class DispatchConfigurationEvent: + """Event emitted when the dispatch configuration changes.""" + + components: ComponentSelector + """Components to be used.""" + + dry_run: bool + """Whether this is a dry run.""" + + payload: dict[str, Any] + """Additional payload.""" + + +class DispatchRunnerActor(Actor): + """Helper class to manage actors based on dispatches. + + Example usage: + + ```python + import os + import asyncio + from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchConfigurationEvent + from frequenz.client.dispatch.types import ComponentSelector + from frequenz.client.common.microgrid.components import ComponentCategory + + from frequenz.channels import Receiver, Broadcast + from unittest.mock import MagicMock + + class MyActor(Actor): + def __init__(self, config_channel: Receiver[DispatchConfigurationEvent]): + super().__init__() + self._config_channel = config_channel + self._dry_run: bool + self._payload: dict[str, Any] + + async def _run(self) -> None: + while True: + config = await self._config_channel.receive() + print("Received config:", config) + + self.set_components(config.components) + self._dry_run = config.dry_run + self._payload = config.payload + + def set_components(self, components: ComponentSelector) -> None: + match components: + case [int(), *_] as component_ids: + print("Dispatch: Setting components to %s", components) + case [ComponentCategory.BATTERY, *_]: + print("Dispatch: Using all battery components") + case _ as unsupported: + print( + "Dispatch: Requested an unsupported selector %r, " + "but only component IDs or category BATTERY are supported.", + unsupported, + ) + + async def run(): + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") + key = os.getenv("DISPATCH_API_KEY", "some-key") + + microgrid_id = 1 + + dispatcher = Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key + ) + + # Create config channel to receive (re-)configuration events pre-start and mid-run + config_channel = Broadcast[DispatchConfigurationEvent](name="config_channel") + + # Start actor and supporting actor, give each a config channel receiver + my_actor = MyActor(config_channel.new_receiver()) + supporting_actor = MagicMock(config_channel.new_receiver()) + + status_receiver = dispatcher.running_status_change.new_receiver() + + dispatch_handler = DispatchRunnerActor( + actors=frozenset([my_actor, supporting_actor]), + dispatch_type="EXAMPLE", + running_status_receiver=status_receiver, + configuration_sender=config_channel.new_sender(), + ) + + await asyncio.gather(dispatcher.start(), dispatch_handler.start()) + ``` + """ + + def __init__( + self, + actors: frozenset[Actor], + dispatch_type: str, + running_status_receiver: Receiver[Dispatch], + configuration_sender: Sender[DispatchConfigurationEvent] | None = None, + ) -> None: + """Initialize the dispatch handler. + + Args: + actors: The actors to handle. + dispatch_type: The type of dispatches to handle. + running_status_receiver: The receiver for dispatch running status changes. + configuration_sender: The sender for dispatch configuration events + """ + super().__init__() + self._dispatch_rx = running_status_receiver + self._actors = actors + self._dispatch_type = dispatch_type + self._configuration_sender = configuration_sender + + def _start_actors(self) -> None: + """Start all actors.""" + for actor in self._actors: + if actor.is_running: + _logger.warning("Actor %s is already running", actor.name) + else: + actor.start() + + async def _stop_actors(self, msg: str) -> None: + """Stop all actors. + + Args: + msg: The message to be passed to the actors being stopped. + """ + for actor in self._actors: + if actor.is_running: + await actor.stop(msg) + else: + _logger.warning("Actor %s is not running", actor.name) + + async def _run(self) -> None: + """Wait for dispatches and handle them.""" + while True: + _logger.info("Waiting for dispatch...") + dispatch = await self._dispatch_rx.receive() + await self._handle_dispatch(dispatch=dispatch) + + async def _handle_dispatch(self, dispatch: Dispatch) -> None: + """Handle a dispatch. + + Args: + dispatch: The dispatch to handle. + + Returns: + The running state. + """ + running = dispatch.running(self._dispatch_type) + match running: + case RunningState.STOPPED: + _logger.info("Stopping dispatch...") + await self._stop_actors("Dispatch stopped") + case RunningState.RUNNING: + if self._configuration_sender is not None: + _logger.info("Updating configuration...") + await self._configuration_sender.send( + DispatchConfigurationEvent( + components=dispatch.selector, + dry_run=dispatch.dry_run, + payload=dispatch.payload, + ) + ) + + _logger.info("Running dispatch...") + self._start_actors() + case RunningState.DIFFERENT_TYPE: + _logger.debug( + "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type + ) diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..5cf1be5 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,171 @@ +# LICENSE: ALL RIGHTS RESERVED +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Test the dispatch runner.""" + +import asyncio +from dataclasses import dataclass, replace +from datetime import datetime, timedelta, timezone +from typing import AsyncIterator, Iterator + +import async_solipsism +import pytest +import time_machine +from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.dispatch.test.generator import DispatchGenerator +from frequenz.client.dispatch.types import Frequency +from frequenz.sdk.actor import Actor +from pytest import fixture + +from frequenz.dispatch import Dispatch, DispatchConfigurationEvent, DispatchRunnerActor + +# pylint: disable=protected-access + + +# This method replaces the event loop for all tests in the file. +@pytest.fixture +def event_loop_policy() -> async_solipsism.EventLoopPolicy: + """Return an event loop policy that uses the async solipsism event loop.""" + return async_solipsism.EventLoopPolicy() + + +@fixture +def fake_time() -> Iterator[time_machine.Coordinates]: + """Replace real time with a time machine that doesn't automatically tick.""" + # destination can be a datetime or a timestamp (int), so are moving to the + # epoch (in UTC!) + with time_machine.travel(destination=0, tick=False) as traveller: + yield traveller + + +def _now() -> datetime: + """Return the current time in UTC.""" + return datetime.now(tz=timezone.utc) + + +class MockActor(Actor): + """Mock actor for testing.""" + + async def _run(self) -> None: + while True: + await asyncio.sleep(1) + + +@dataclass +class TestEnv: + """Test environment.""" + + actor: Actor + runner_actor: DispatchRunnerActor + running_status_sender: Sender[Dispatch] + configuration_receiver: Receiver[DispatchConfigurationEvent] + generator: DispatchGenerator = DispatchGenerator() + + +@fixture +async def test_env() -> AsyncIterator[TestEnv]: + """Create a test environment.""" + channel = Broadcast[Dispatch](name="dispatch ready test channel") + config_channel = Broadcast[DispatchConfigurationEvent]( + name="dispatch config test channel" + ) + + actor = MockActor() + + runner_actor = DispatchRunnerActor( + actors=frozenset([actor]), + dispatch_type="UNIT_TEST", + running_status_receiver=channel.new_receiver(), + configuration_sender=config_channel.new_sender(), + ) + + runner_actor.start() + + yield TestEnv( + actor=actor, + runner_actor=runner_actor, + running_status_sender=channel.new_sender(), + configuration_receiver=config_channel.new_receiver(), + ) + + await runner_actor.stop() + + +# Disable for _handle_dispatch access +# pylint: disable=protected-access + + +async def test_simple_start_stop( + test_env: TestEnv, + fake_time: time_machine.Coordinates, +) -> None: + """Test behavior when receiving start/stop messages.""" + now = _now() + duration = timedelta(minutes=10) + dispatch = test_env.generator.generate_dispatch() + dispatch = replace( + dispatch, + active=True, + dry_run=False, + duration=duration, + start_time=now, + payload={"test": True}, + type="UNIT_TEST", + recurrence=replace( + dispatch.recurrence, + frequency=Frequency.UNSPECIFIED, + ), + ) + + await test_env.running_status_sender.send(Dispatch(dispatch)) + fake_time.shift(timedelta(seconds=1)) + + event = await test_env.configuration_receiver.receive() + assert event.payload == {"test": True} + assert event.components == dispatch.selector + assert event.dry_run is False + + assert test_env.actor.is_running is True + + fake_time.shift(duration) + await test_env.running_status_sender.send(Dispatch(dispatch)) + + # Give await actor.stop a chance to run in DispatchRunnerActor + await asyncio.sleep(0.1) + + assert test_env.actor.is_running is False + + +async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None: + """Test the dry run mode.""" + dispatch = test_env.generator.generate_dispatch() + dispatch = replace( + dispatch, + dry_run=True, + active=True, + start_time=_now(), + duration=timedelta(minutes=10), + type="UNIT_TEST", + recurrence=replace( + dispatch.recurrence, + frequency=Frequency.UNSPECIFIED, + ), + ) + + await test_env.running_status_sender.send(Dispatch(dispatch)) + fake_time.shift(timedelta(seconds=1)) + + event = await test_env.configuration_receiver.receive() + + assert event.dry_run is dispatch.dry_run + assert event.components == dispatch.selector + assert event.payload == dispatch.payload + assert test_env.actor.is_running is True + + fake_time.shift(dispatch.duration) + await test_env.running_status_sender.send(Dispatch(dispatch)) + + # Give await actor.stop a chance to run in DispatchRunnerActor + await asyncio.sleep(0.1) + + assert test_env.actor.is_running is False