Skip to content

Commit d990e0a

Browse files
authored
Redesign actor service (#103)
- **Re-create actors on dispatch updates** - **Rename DispatchManagingService to DispatchActorsService** - **Update release notes**
2 parents a600e3c + d8d3c5a commit d990e0a

File tree

5 files changed

+280
-211
lines changed

5 files changed

+280
-211
lines changed

RELEASE_NOTES.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
## Summary
44

5-
<!-- Here goes a general summary of what this release is about -->
5+
This release introduces a more flexible and powerful mechanism for managing dispatch events with new strategies for merging intervals, enhanced customization options, and better overall alignment with evolving SDK dependencies. It also simplifies actor initialization while maintaining robust support for diverse dispatch scenarios.
66

77
## Upgrading
88

99
* Two properties have been replaced by methods that require a type as parameter.
1010
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
1111
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
1212
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
13+
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
14+
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
15+
* It only supports a single actor at a time now.
16+
* Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information.
17+
* `DispatchUpdate` was renamed to `DispatchInfo`.
1318

1419
## New Features
1520

16-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
17-
1821
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
1922

20-
## Bug Fixes
21-
22-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/dispatch/__init__.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10-
* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to
11-
manage other actors based on incoming dispatches.
10+
* [ActorDispatcher][frequenz.dispatch.ActorDispatcher]: A service to manage other actors based on
11+
incoming dispatches.
1212
* [Created][frequenz.dispatch.Created],
1313
[Updated][frequenz.dispatch.Updated],
1414
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
1515
1616
"""
1717

18+
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
21-
from ._managing_actor import DispatchManagingActor, DispatchUpdate
2222

2323
__all__ = [
2424
"Created",
@@ -27,6 +27,6 @@
2727
"Dispatcher",
2828
"Updated",
2929
"Dispatch",
30-
"DispatchManagingActor",
31-
"DispatchUpdate",
30+
"ActorDispatcher",
31+
"DispatchInfo",
3232
]
+216
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import asyncio
7+
import logging
8+
from collections.abc import Callable
9+
from dataclasses import dataclass
10+
from typing import Any
11+
12+
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.client.dispatch.types import TargetComponents
14+
from frequenz.sdk.actor import Actor, BackgroundService
15+
16+
from ._dispatch import Dispatch
17+
18+
_logger = logging.getLogger(__name__)
19+
20+
21+
@dataclass(frozen=True, kw_only=True)
22+
class DispatchInfo:
23+
"""Event emitted when the dispatch changes."""
24+
25+
components: TargetComponents
26+
"""Components to be used."""
27+
28+
dry_run: bool
29+
"""Whether this is a dry run."""
30+
31+
options: dict[str, Any]
32+
"""Additional options."""
33+
34+
35+
class ActorDispatcher(BackgroundService):
36+
"""Helper class to manage actors based on dispatches.
37+
38+
Example usage:
39+
40+
```python
41+
import os
42+
import asyncio
43+
from typing import override
44+
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
45+
from frequenz.client.dispatch.types import TargetComponents
46+
from frequenz.client.common.microgrid.components import ComponentCategory
47+
from frequenz.channels import Receiver, Broadcast, select, selected_from
48+
from frequenz.sdk.actor import Actor, run
49+
50+
class MyActor(Actor):
51+
def __init__(
52+
self,
53+
*,
54+
name: str | None = None,
55+
) -> None:
56+
super().__init__(name=name)
57+
self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
58+
self._dry_run: bool = False
59+
self._options: dict[str, Any] = {}
60+
61+
@classmethod
62+
def new_with_dispatch(
63+
cls,
64+
initial_dispatch: DispatchInfo,
65+
dispatch_updates_receiver: Receiver[DispatchInfo],
66+
*,
67+
name: str | None = None,
68+
) -> "Self":
69+
self = cls(name=name)
70+
self._dispatch_updates_receiver = dispatch_updates_receiver
71+
self._update_dispatch_information(initial_dispatch)
72+
return self
73+
74+
@override
75+
async def _run(self) -> None:
76+
other_recv: Receiver[Any] = ...
77+
78+
if self._dispatch_updates_receiver is None:
79+
async for msg in other_recv:
80+
# do stuff
81+
...
82+
else:
83+
await self._run_with_dispatch(other_recv)
84+
85+
async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
86+
async for selected in select(self._dispatch_updates_receiver, other_recv):
87+
if selected_from(selected, self._dispatch_updates_receiver):
88+
self._update_dispatch_information(selected.message)
89+
elif selected_from(selected, other_recv):
90+
# do stuff
91+
...
92+
else:
93+
assert False, f"Unexpected selected receiver: {selected}"
94+
95+
def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
96+
print("Received update:", dispatch_update)
97+
self._dry_run = dispatch_update.dry_run
98+
self._options = dispatch_update.options
99+
match dispatch_update.components:
100+
case []:
101+
print("Dispatch: Using all components")
102+
case list() as ids if isinstance(ids[0], int):
103+
component_ids = ids
104+
case [ComponentCategory.BATTERY, *_]:
105+
component_category = ComponentCategory.BATTERY
106+
case unsupported:
107+
print(
108+
"Dispatch: Requested an unsupported selector %r, "
109+
"but only component IDs or category BATTERY are supported.",
110+
unsupported,
111+
)
112+
113+
async def main():
114+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
115+
key = os.getenv("DISPATCH_API_KEY", "some-key")
116+
117+
microgrid_id = 1
118+
119+
dispatcher = Dispatcher(
120+
microgrid_id=microgrid_id,
121+
server_url=url,
122+
key=key
123+
)
124+
dispatcher.start()
125+
126+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
127+
128+
managing_actor = DispatchManagingActor(
129+
actor_factory=MyActor.new_with_dispatch,
130+
running_status_receiver=status_receiver,
131+
)
132+
133+
await run(managing_actor)
134+
```
135+
"""
136+
137+
def __init__(
138+
self,
139+
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
140+
running_status_receiver: Receiver[Dispatch],
141+
) -> None:
142+
"""Initialize the dispatch handler.
143+
144+
Args:
145+
actor_factory: A callable that creates an actor with some initial dispatch
146+
information.
147+
running_status_receiver: The receiver for dispatch running status changes.
148+
"""
149+
super().__init__()
150+
self._dispatch_rx = running_status_receiver
151+
self._actor_factory = actor_factory
152+
self._actor: Actor | None = None
153+
self._updates_channel = Broadcast[DispatchInfo](
154+
name="dispatch_updates_channel", resend_latest=True
155+
)
156+
self._updates_sender = self._updates_channel.new_sender()
157+
158+
def start(self) -> None:
159+
"""Start the background service."""
160+
self._tasks.add(asyncio.create_task(self._run()))
161+
162+
async def _start_actor(self, dispatch: Dispatch) -> None:
163+
"""Start all actors."""
164+
dispatch_update = DispatchInfo(
165+
components=dispatch.target,
166+
dry_run=dispatch.dry_run,
167+
options=dispatch.payload,
168+
)
169+
170+
if self._actor:
171+
sent_str = ""
172+
if self._updates_sender is not None:
173+
sent_str = ", sent a dispatch update instead of creating a new actor"
174+
await self._updates_sender.send(dispatch_update)
175+
_logger.warning(
176+
"Actor for dispatch type %r is already running%s",
177+
dispatch.type,
178+
sent_str,
179+
)
180+
else:
181+
_logger.info("Starting actor for dispatch type %r", dispatch.type)
182+
self._actor = self._actor_factory(
183+
dispatch_update, self._updates_channel.new_receiver()
184+
)
185+
self._actor.start()
186+
187+
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
188+
"""Stop all actors.
189+
190+
Args:
191+
stopping_dispatch: The dispatch that is stopping the actor.
192+
msg: The message to be passed to the actors being stopped.
193+
"""
194+
if self._actor is None:
195+
_logger.warning(
196+
"Actor for dispatch type %r is not running", stopping_dispatch.type
197+
)
198+
else:
199+
await self._actor.stop(msg)
200+
self._actor = None
201+
202+
async def _run(self) -> None:
203+
"""Wait for dispatches and handle them."""
204+
async for dispatch in self._dispatch_rx:
205+
await self._handle_dispatch(dispatch=dispatch)
206+
207+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
208+
"""Handle a dispatch.
209+
210+
Args:
211+
dispatch: The dispatch to handle.
212+
"""
213+
if dispatch.started:
214+
await self._start_actor(dispatch)
215+
else:
216+
await self._stop_actor(dispatch, "Dispatch stopped")

0 commit comments

Comments
 (0)