diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bf30540..dd74f73 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,7 @@ ## Summary - +This release introduces a more flexible and powerful mechanism for managing dispatch events with new strategies for merging intervals, enhanced customization options, and better overall alignment with evolving SDK dependencies. It also simplifies actor initialization while maintaining robust support for diverse dispatch scenarios. ## Upgrading @@ -10,13 +10,13 @@ * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. * The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. +* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`. + * It's interface has been simplified and now only requires an actor factory and a running status receiver. + * It only supports a single actor at a time now. + * Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information. +* `DispatchUpdate` was renamed to `DispatchInfo`. ## New Features -* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. - * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600. -## Bug Fixes - - diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index b2d25d4..fe6a504 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -7,18 +7,18 @@ * [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. * [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. -* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to - manage other actors based on incoming dispatches. +* [ActorDispatcher][frequenz.dispatch.ActorDispatcher]: A service to manage other actors based on + incoming dispatches. * [Created][frequenz.dispatch.Created], [Updated][frequenz.dispatch.Updated], [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. """ +from ._actor_dispatcher import ActorDispatcher, DispatchInfo from ._dispatch import Dispatch from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated -from ._managing_actor import DispatchManagingActor, DispatchUpdate __all__ = [ "Created", @@ -27,6 +27,6 @@ "Dispatcher", "Updated", "Dispatch", - "DispatchManagingActor", - "DispatchUpdate", + "ActorDispatcher", + "DispatchInfo", ] diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py new file mode 100644 index 0000000..6262f3a --- /dev/null +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -0,0 +1,216 @@ +# License: All rights reserved +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Helper class to manage actors based on dispatches.""" + +import asyncio +import logging +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any + +from frequenz.channels import Broadcast, Receiver +from frequenz.client.dispatch.types import TargetComponents +from frequenz.sdk.actor import Actor, BackgroundService + +from ._dispatch import Dispatch + +_logger = logging.getLogger(__name__) + + +@dataclass(frozen=True, kw_only=True) +class DispatchInfo: + """Event emitted when the dispatch changes.""" + + components: TargetComponents + """Components to be used.""" + + dry_run: bool + """Whether this is a dry run.""" + + options: dict[str, Any] + """Additional options.""" + + +class ActorDispatcher(BackgroundService): + """Helper class to manage actors based on dispatches. + + Example usage: + + ```python + import os + import asyncio + from typing import override + from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo + from frequenz.client.dispatch.types import TargetComponents + from frequenz.client.common.microgrid.components import ComponentCategory + from frequenz.channels import Receiver, Broadcast, select, selected_from + from frequenz.sdk.actor import Actor, run + + class MyActor(Actor): + def __init__( + self, + *, + name: str | None = None, + ) -> None: + super().__init__(name=name) + self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None + self._dry_run: bool = False + self._options: dict[str, Any] = {} + + @classmethod + def new_with_dispatch( + cls, + initial_dispatch: DispatchInfo, + dispatch_updates_receiver: Receiver[DispatchInfo], + *, + name: str | None = None, + ) -> "Self": + self = cls(name=name) + self._dispatch_updates_receiver = dispatch_updates_receiver + self._update_dispatch_information(initial_dispatch) + return self + + @override + async def _run(self) -> None: + other_recv: Receiver[Any] = ... + + if self._dispatch_updates_receiver is None: + async for msg in other_recv: + # do stuff + ... + else: + await self._run_with_dispatch(other_recv) + + async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None: + async for selected in select(self._dispatch_updates_receiver, other_recv): + if selected_from(selected, self._dispatch_updates_receiver): + self._update_dispatch_information(selected.message) + elif selected_from(selected, other_recv): + # do stuff + ... + else: + assert False, f"Unexpected selected receiver: {selected}" + + def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None: + print("Received update:", dispatch_update) + self._dry_run = dispatch_update.dry_run + self._options = dispatch_update.options + match dispatch_update.components: + case []: + print("Dispatch: Using all components") + case list() as ids if isinstance(ids[0], int): + component_ids = ids + case [ComponentCategory.BATTERY, *_]: + component_category = ComponentCategory.BATTERY + case unsupported: + print( + "Dispatch: Requested an unsupported selector %r, " + "but only component IDs or category BATTERY are supported.", + unsupported, + ) + + async def main(): + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") + key = os.getenv("DISPATCH_API_KEY", "some-key") + + microgrid_id = 1 + + dispatcher = Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key + ) + dispatcher.start() + + status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") + + managing_actor = DispatchManagingActor( + actor_factory=MyActor.new_with_dispatch, + running_status_receiver=status_receiver, + ) + + await run(managing_actor) + ``` + """ + + def __init__( + self, + actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], + running_status_receiver: Receiver[Dispatch], + ) -> None: + """Initialize the dispatch handler. + + Args: + actor_factory: A callable that creates an actor with some initial dispatch + information. + running_status_receiver: The receiver for dispatch running status changes. + """ + super().__init__() + self._dispatch_rx = running_status_receiver + self._actor_factory = actor_factory + self._actor: Actor | None = None + self._updates_channel = Broadcast[DispatchInfo]( + name="dispatch_updates_channel", resend_latest=True + ) + self._updates_sender = self._updates_channel.new_sender() + + def start(self) -> None: + """Start the background service.""" + self._tasks.add(asyncio.create_task(self._run())) + + async def _start_actor(self, dispatch: Dispatch) -> None: + """Start all actors.""" + dispatch_update = DispatchInfo( + components=dispatch.target, + dry_run=dispatch.dry_run, + options=dispatch.payload, + ) + + if self._actor: + sent_str = "" + if self._updates_sender is not None: + sent_str = ", sent a dispatch update instead of creating a new actor" + await self._updates_sender.send(dispatch_update) + _logger.warning( + "Actor for dispatch type %r is already running%s", + dispatch.type, + sent_str, + ) + else: + _logger.info("Starting actor for dispatch type %r", dispatch.type) + self._actor = self._actor_factory( + dispatch_update, self._updates_channel.new_receiver() + ) + self._actor.start() + + async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: + """Stop all actors. + + Args: + stopping_dispatch: The dispatch that is stopping the actor. + msg: The message to be passed to the actors being stopped. + """ + if self._actor is None: + _logger.warning( + "Actor for dispatch type %r is not running", stopping_dispatch.type + ) + else: + await self._actor.stop(msg) + self._actor = None + + async def _run(self) -> None: + """Wait for dispatches and handle them.""" + async for dispatch in self._dispatch_rx: + await self._handle_dispatch(dispatch=dispatch) + + async def _handle_dispatch(self, dispatch: Dispatch) -> None: + """Handle a dispatch. + + Args: + dispatch: The dispatch to handle. + """ + if dispatch.started: + await self._start_actor(dispatch) + else: + await self._stop_actor(dispatch, "Dispatch stopped") diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py deleted file mode 100644 index d158b12..0000000 --- a/src/frequenz/dispatch/_managing_actor.py +++ /dev/null @@ -1,174 +0,0 @@ -# License: All rights reserved -# Copyright © 2024 Frequenz Energy-as-a-Service GmbH - -"""Helper class to manage actors based on dispatches.""" - -import logging -from dataclasses import dataclass -from typing import Any, Set - -from frequenz.channels import Receiver, Sender -from frequenz.client.dispatch.types import TargetComponents -from frequenz.sdk.actor import Actor - -from ._dispatch import Dispatch - -_logger = logging.getLogger(__name__) - - -@dataclass(frozen=True, kw_only=True) -class DispatchUpdate: - """Event emitted when the dispatch changes.""" - - components: TargetComponents - """Components to be used.""" - - dry_run: bool - """Whether this is a dry run.""" - - options: dict[str, Any] - """Additional options.""" - - -class DispatchManagingActor(Actor): - """Helper class to manage actors based on dispatches. - - Example usage: - - ```python - import os - import asyncio - from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate - from frequenz.client.dispatch.types import TargetComponents - from frequenz.client.common.microgrid.components import ComponentCategory - - from frequenz.channels import Receiver, Broadcast - - class MyActor(Actor): - def __init__(self, updates_channel: Receiver[DispatchUpdate]): - super().__init__() - self._updates_channel = updates_channel - self._dry_run: bool - self._options : dict[str, Any] - - async def _run(self) -> None: - while True: - update = await self._updates_channel.receive() - print("Received update:", update) - - self.set_components(update.components) - self._dry_run = update.dry_run - self._options = update.options - - def set_components(self, components: TargetComponents) -> None: - match components: - case []: - print("Dispatch: Using all components") - case list() as ids if isinstance(ids[0], int): - component_ids = ids - case [ComponentCategory.BATTERY, *_]: - component_category = ComponentCategory.BATTERY - case unsupported: - print( - "Dispatch: Requested an unsupported selector %r, " - "but only component IDs or category BATTERY are supported.", - unsupported, - ) - - async def run(): - url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") - key = os.getenv("DISPATCH_API_KEY", "some-key") - - microgrid_id = 1 - - dispatcher = Dispatcher( - microgrid_id=microgrid_id, - server_url=url, - key=key - ) - - # Create update channel to receive dispatch update events pre-start and mid-run - dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel") - - # Start actor and give it an dispatch updates channel receiver - my_actor = MyActor(dispatch_updates_channel.new_receiver()) - - status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") - - managing_actor = DispatchManagingActor( - actor=my_actor, - running_status_receiver=status_receiver, - updates_sender=dispatch_updates_channel.new_sender(), - ) - - await asyncio.gather(dispatcher.start(), managing_actor.start()) - ``` - """ - - def __init__( - self, - actor: Actor | Set[Actor], - running_status_receiver: Receiver[Dispatch], - updates_sender: Sender[DispatchUpdate] | None = None, - ) -> None: - """Initialize the dispatch handler. - - Args: - actor: A set of actors or a single actor to manage. - running_status_receiver: The receiver for dispatch running status changes. - updates_sender: The sender for dispatch events - """ - super().__init__() - self._dispatch_rx = running_status_receiver - self._actors: frozenset[Actor] = frozenset( - [actor] if isinstance(actor, Actor) else actor - ) - self._updates_sender = updates_sender - - def _start_actors(self) -> None: - """Start all actors.""" - for actor in self._actors: - if actor.is_running: - _logger.warning("Actor %s is already running", actor.name) - else: - actor.start() - - async def _stop_actors(self, msg: str) -> None: - """Stop all actors. - - Args: - msg: The message to be passed to the actors being stopped. - """ - for actor in self._actors: - if actor.is_running: - await actor.stop(msg) - else: - _logger.warning("Actor %s is not running", actor.name) - - async def _run(self) -> None: - """Wait for dispatches and handle them.""" - async for dispatch in self._dispatch_rx: - await self._handle_dispatch(dispatch=dispatch) - - async def _handle_dispatch(self, dispatch: Dispatch) -> None: - """Handle a dispatch. - - Args: - dispatch: The dispatch to handle. - """ - if dispatch.started: - if self._updates_sender is not None: - _logger.info("Updated by dispatch %s", dispatch.id) - await self._updates_sender.send( - DispatchUpdate( - components=dispatch.target, - dry_run=dispatch.dry_run, - options=dispatch.payload, - ) - ) - - _logger.info("Started by dispatch %s", dispatch.id) - self._start_actors() - else: - _logger.info("Stopped by dispatch %s", dispatch.id) - await self._stop_actors("Dispatch stopped") diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 662f4ff..6463558 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -5,9 +5,10 @@ import asyncio import heapq +import logging from dataclasses import dataclass, replace from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterator +from typing import AsyncIterator, Iterator, cast import async_solipsism import time_machine @@ -17,7 +18,7 @@ from frequenz.sdk.actor import Actor from pytest import fixture -from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate +from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo from frequenz.dispatch._bg_service import DispatchScheduler @@ -46,6 +47,15 @@ def _now() -> datetime: class MockActor(Actor): """Mock actor for testing.""" + def __init__( + self, initial_dispatch: DispatchInfo, receiver: Receiver[DispatchInfo] + ) -> None: + """Initialize the actor.""" + super().__init__(name="MockActor") + logging.info("MockActor created") + self.initial_dispatch = initial_dispatch + self.receiver = receiver + async def _run(self) -> None: while True: await asyncio.sleep(1) @@ -55,39 +65,45 @@ async def _run(self) -> None: class TestEnv: """Test environment.""" - actor: Actor - runner_actor: DispatchManagingActor + actors_service: ActorDispatcher running_status_sender: Sender[Dispatch] - updates_receiver: Receiver[DispatchUpdate] generator: DispatchGenerator = DispatchGenerator() + @property + def actor(self) -> MockActor | None: + """Return the actor.""" + # pylint: disable=protected-access + if self.actors_service._actor is None: + return None + return cast(MockActor, self.actors_service._actor) + # pylint: enable=protected-access + + @property + def updates_receiver(self) -> Receiver[DispatchInfo]: + """Return the updates receiver.""" + assert self.actor is not None + return self.actor.receiver + @fixture async def test_env() -> AsyncIterator[TestEnv]: """Create a test environment.""" channel = Broadcast[Dispatch](name="dispatch ready test channel") - updates_channel = Broadcast[DispatchUpdate](name="dispatch update test channel") - actor = MockActor() - - runner_actor = DispatchManagingActor( - actor=actor, + actors_service = ActorDispatcher( + actor_factory=MockActor, running_status_receiver=channel.new_receiver(), - updates_sender=updates_channel.new_sender(), ) - # pylint: disable=protected-access - runner_actor._restart_limit = 0 - runner_actor.start() + actors_service.start() + await asyncio.sleep(1) yield TestEnv( - actor=actor, - runner_actor=runner_actor, + actors_service=actors_service, running_status_sender=channel.new_sender(), - updates_receiver=updates_channel.new_receiver(), ) - await runner_actor.stop() + await actors_service.stop() async def test_simple_start_stop( @@ -112,23 +128,31 @@ async def test_simple_start_stop( ), ) + # Send status update to start actor, expect no DispatchInfo for the start await test_env.running_status_sender.send(Dispatch(dispatch)) fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) + await asyncio.sleep(1) + logging.info("Sent dispatch") - event = await test_env.updates_receiver.receive() + assert test_env.actor is not None + event = test_env.actor.initial_dispatch assert event.options == {"test": True} assert event.components == dispatch.target assert event.dry_run is False + logging.info("Received dispatch") + + assert test_env.actor is not None assert test_env.actor.is_running is True fake_time.shift(duration) await test_env.running_status_sender.send(Dispatch(dispatch)) - # Give await actor.stop a chance to run in DispatchManagingActor - await asyncio.sleep(0.1) + # Give await actor.stop a chance to run + await asyncio.sleep(1) - assert test_env.actor.is_running is False + assert test_env.actor is None def test_heapq_dispatch_compare(test_env: TestEnv) -> None: @@ -198,19 +222,22 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - await test_env.running_status_sender.send(Dispatch(dispatch)) fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) - event = await test_env.updates_receiver.receive() + assert test_env.actor is not None + event = test_env.actor.initial_dispatch assert event.dry_run is dispatch.dry_run assert event.components == dispatch.target assert event.options == dispatch.payload + assert test_env.actor is not None assert test_env.actor.is_running is True assert dispatch.duration is not None fake_time.shift(dispatch.duration) await test_env.running_status_sender.send(Dispatch(dispatch)) - # Give await actor.stop a chance to run in DispatchManagingActor - await asyncio.sleep(0.1) + # Give await actor.stop a chance to run + await asyncio.sleep(1) - assert test_env.actor.is_running is False + assert test_env.actor is None