diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 56c7e98..b89a475 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +* We now provide the `DispatchManagingActor` class, a class to manage actors based on incoming dispatches. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 4db9107..eeb56dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ dependencies = [ # plugins.mkdocstrings.handlers.python.import) "frequenz-sdk == 1.0.0-rc900", # "frequenz-channels >= 1.1.0, < 2.0.0", - "frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@refs/pull/323/head", + "frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@v1.x.x", # "frequenz-client-dispatch >= 0.6.0, < 0.7.0", "frequenz-client-dispatch @ git+https://github.com/frequenz-floss/frequenz-client-dispatch-python.git@refs/pull/87/head", ] diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 16df1b5..037665c 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -7,6 +7,8 @@ * [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. * [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. +* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to + manage other actors based on incoming dispatches. * [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. @@ -16,6 +18,7 @@ from ._dispatch import Dispatch, RunningState from ._dispatcher import Dispatcher, ReceiverFetcher from ._event import Created, Deleted, DispatchEvent, Updated +from ._managing_actor import DispatchManagingActor, DispatchUpdate __all__ = [ "Created", @@ -26,4 +29,6 @@ "Updated", "Dispatch", "RunningState", + "DispatchManagingActor", + "DispatchUpdate", ] diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py new file mode 100644 index 0000000..aadd3bb --- /dev/null +++ b/src/frequenz/dispatch/_managing_actor.py @@ -0,0 +1,178 @@ +# License: All rights reserved +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Helper class to manage actors based on dispatches.""" + +import logging +from dataclasses import dataclass +from typing import Any + +from frequenz.channels import Receiver, Sender +from frequenz.client.dispatch.types import ComponentSelector +from frequenz.sdk.actor import Actor + +from ._dispatch import Dispatch, RunningState + +_logger = logging.getLogger(__name__) + + +@dataclass(frozen=True, kw_only=True) +class DispatchUpdate: + """Event emitted when the dispatch changes.""" + + components: ComponentSelector + """Components to be used.""" + + dry_run: bool + """Whether this is a dry run.""" + + options: dict[str, Any] + """Additional options.""" + + +class DispatchManagingActor(Actor): + """Helper class to manage actors based on dispatches. + + Example usage: + + ```python + import os + import asyncio + from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate + from frequenz.client.dispatch.types import ComponentSelector + from frequenz.client.common.microgrid.components import ComponentCategory + + from frequenz.channels import Receiver, Broadcast + + class MyActor(Actor): + def __init__(self, updates_channel: Receiver[DispatchUpdate]): + super().__init__() + self._updates_channel = updates_channel + self._dry_run: bool + self._options : dict[str, Any] + + async def _run(self) -> None: + while True: + update = await self._updates_channel.receive() + print("Received update:", update) + + self.set_components(update.components) + self._dry_run = update.dry_run + self._options = update.options + + def set_components(self, components: ComponentSelector) -> None: + match components: + case [int(), *_] as component_ids: + print("Dispatch: Setting components to %s", components) + case [ComponentCategory.BATTERY, *_]: + print("Dispatch: Using all battery components") + case unsupported: + print( + "Dispatch: Requested an unsupported selector %r, " + "but only component IDs or category BATTERY are supported.", + unsupported, + ) + + async def run(): + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") + key = os.getenv("DISPATCH_API_KEY", "some-key") + + microgrid_id = 1 + + dispatcher = Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key + ) + + # Create update channel to receive dispatch update events pre-start and mid-run + dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel") + + # Start actor and supporting actor, give each a config channel receiver + my_actor = MyActor(dispatch_updates_channel.new_receiver()) + + status_receiver = dispatcher.running_status_change.new_receiver() + + dispatch_runner = DispatchManagingActor( + actor=my_actor, + dispatch_type="EXAMPLE", + running_status_receiver=status_receiver, + updates_sender=dispatch_updates_channel.new_sender(), + ) + + await asyncio.gather(dispatcher.start(), dispatch_runner.start()) + ``` + """ + + def __init__( + self, + actor: Actor, + dispatch_type: str, + running_status_receiver: Receiver[Dispatch], + updates_sender: Sender[DispatchUpdate] | None = None, + ) -> None: + """Initialize the dispatch handler. + + Args: + actor: The actor to manage. + dispatch_type: The type of dispatches to handle. + running_status_receiver: The receiver for dispatch running status changes. + updates_sender: The sender for dispatch events + """ + super().__init__() + self._dispatch_rx = running_status_receiver + self._actor = actor + self._dispatch_type = dispatch_type + self._updates_sender = updates_sender + + def _start_actor(self) -> None: + """Start the actor.""" + if self._actor.is_running: + _logger.warning("Actor %s is already running", self._actor.name) + else: + self._actor.start() + + async def _stop_actor(self, msg: str) -> None: + """Stop the actor. + + Args: + msg: The message to be passed to the actor being stopped. + """ + if self._actor.is_running: + await self._actor.stop(msg) + else: + _logger.warning("Actor %s is not running", self._actor.name) + + async def _run(self) -> None: + """Wait for dispatches and handle them.""" + async for dispatch in self._dispatch_rx: + await self._handle_dispatch(dispatch=dispatch) + + async def _handle_dispatch(self, dispatch: Dispatch) -> None: + """Handle a dispatch. + + Args: + dispatch: The dispatch to handle. + """ + running = dispatch.running(self._dispatch_type) + match running: + case RunningState.STOPPED: + _logger.info("Stopped by dispatch %s", dispatch.id) + await self._stop_actor("Dispatch stopped") + case RunningState.RUNNING: + if self._updates_sender is not None: + _logger.info("Updated by dispatch %s", dispatch.id) + await self._updates_sender.send( + DispatchUpdate( + components=dispatch.selector, + dry_run=dispatch.dry_run, + options=dispatch.payload, + ) + ) + + _logger.info("Started by dispatch %s", dispatch.id) + self._start_actor() + case RunningState.DIFFERENT_TYPE: + _logger.debug( + "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type + ) diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py new file mode 100644 index 0000000..3702a29 --- /dev/null +++ b/tests/test_mananging_actor.py @@ -0,0 +1,164 @@ +# LICENSE: ALL RIGHTS RESERVED +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Test the dispatch runner.""" + +import asyncio +from dataclasses import dataclass, replace +from datetime import datetime, timedelta, timezone +from typing import AsyncIterator, Iterator + +import async_solipsism +import pytest +import time_machine +from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.dispatch.test.generator import DispatchGenerator +from frequenz.client.dispatch.types import Frequency +from frequenz.sdk.actor import Actor +from pytest import fixture + +from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate + + +# This method replaces the event loop for all tests in the file. +@pytest.fixture +def event_loop_policy() -> async_solipsism.EventLoopPolicy: + """Return an event loop policy that uses the async solipsism event loop.""" + return async_solipsism.EventLoopPolicy() + + +@fixture +def fake_time() -> Iterator[time_machine.Coordinates]: + """Replace real time with a time machine that doesn't automatically tick.""" + # destination can be a datetime or a timestamp (int), so are moving to the + # epoch (in UTC!) + with time_machine.travel(destination=0, tick=False) as traveller: + yield traveller + + +def _now() -> datetime: + """Return the current time in UTC.""" + return datetime.now(tz=timezone.utc) + + +class MockActor(Actor): + """Mock actor for testing.""" + + async def _run(self) -> None: + while True: + await asyncio.sleep(1) + + +@dataclass +class TestEnv: + """Test environment.""" + + actor: Actor + runner_actor: DispatchManagingActor + running_status_sender: Sender[Dispatch] + configuration_receiver: Receiver[DispatchUpdate] + generator: DispatchGenerator = DispatchGenerator() + + +@fixture +async def test_env() -> AsyncIterator[TestEnv]: + """Create a test environment.""" + channel = Broadcast[Dispatch](name="dispatch ready test channel") + config_channel = Broadcast[DispatchUpdate](name="dispatch config test channel") + + actor = MockActor() + + runner_actor = DispatchManagingActor( + actor=actor, + dispatch_type="UNIT_TEST", + running_status_receiver=channel.new_receiver(), + updates_sender=config_channel.new_sender(), + ) + + runner_actor.start() + + yield TestEnv( + actor=actor, + runner_actor=runner_actor, + running_status_sender=channel.new_sender(), + configuration_receiver=config_channel.new_receiver(), + ) + + await runner_actor.stop() + + +async def test_simple_start_stop( + test_env: TestEnv, + fake_time: time_machine.Coordinates, +) -> None: + """Test behavior when receiving start/stop messages.""" + now = _now() + duration = timedelta(minutes=10) + dispatch = test_env.generator.generate_dispatch() + dispatch = replace( + dispatch, + active=True, + dry_run=False, + duration=duration, + start_time=now, + payload={"test": True}, + type="UNIT_TEST", + recurrence=replace( + dispatch.recurrence, + frequency=Frequency.UNSPECIFIED, + ), + ) + + await test_env.running_status_sender.send(Dispatch(dispatch)) + fake_time.shift(timedelta(seconds=1)) + + event = await test_env.configuration_receiver.receive() + assert event.options == {"test": True} + assert event.components == dispatch.selector + assert event.dry_run is False + + assert test_env.actor.is_running is True + + fake_time.shift(duration) + await test_env.running_status_sender.send(Dispatch(dispatch)) + + # Give await actor.stop a chance to run in DispatchManagingActor + await asyncio.sleep(0.1) + + assert test_env.actor.is_running is False + + +async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None: + """Test the dry run mode.""" + dispatch = test_env.generator.generate_dispatch() + dispatch = replace( + dispatch, + dry_run=True, + active=True, + start_time=_now(), + duration=timedelta(minutes=10), + type="UNIT_TEST", + recurrence=replace( + dispatch.recurrence, + frequency=Frequency.UNSPECIFIED, + ), + ) + + await test_env.running_status_sender.send(Dispatch(dispatch)) + fake_time.shift(timedelta(seconds=1)) + + event = await test_env.configuration_receiver.receive() + + assert event.dry_run is dispatch.dry_run + assert event.components == dispatch.selector + assert event.options == dispatch.payload + assert test_env.actor.is_running is True + + assert dispatch.duration is not None + fake_time.shift(dispatch.duration) + await test_env.running_status_sender.send(Dispatch(dispatch)) + + # Give await actor.stop a chance to run in DispatchManagingActor + await asyncio.sleep(0.1) + + assert test_env.actor.is_running is False