Skip to content

Commit de35812

Browse files
committed
Rewrite merging strategy design
Implement merge strategy based on TYPE+TARGET Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 96c8bc8 commit de35812

File tree

6 files changed

+214
-70
lines changed

6 files changed

+214
-70
lines changed

RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
## New Features
1515

16-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
16+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1717

1818
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
1919

src/frequenz/dispatch/__init__.py

+7
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
1616
"""
1717

18+
from ._bg_service import MergeStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
2122
from ._managing_actor import DispatchManagingActor, DispatchUpdate
23+
from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget, NoMerge
2224

2325
__all__ = [
2426
"Created",
@@ -29,4 +31,9 @@
2931
"Dispatch",
3032
"DispatchManagingActor",
3133
"DispatchUpdate",
34+
"MergeStrategy",
35+
"MergeByIdentity",
36+
"MergeByType",
37+
"MergeByTypeTarget",
38+
"NoMerge",
3239
]

src/frequenz/dispatch/_bg_service.py

+56-31
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
9+
import functools
710
import logging
11+
from abc import ABC, abstractmethod
12+
from collections.abc import ValuesView
813
from dataclasses import dataclass, field
914
from datetime import datetime, timedelta, timezone
1015
from heapq import heappop, heappush
@@ -23,6 +28,22 @@
2328
"""The logger for this module."""
2429

2530

31+
class MergeStrategy(ABC):
32+
"""Base class for strategies to merge running intervals."""
33+
34+
@abstractmethod
35+
def filter(self, dispatches: dict[int, Dispatch], dispatch: Dispatch) -> bool:
36+
"""Filter dispatches based on the strategy.
37+
38+
Args:
39+
dispatches: All dispatches, available as context.
40+
dispatch: The dispatch to filter.
41+
42+
Returns:
43+
True if the dispatch should be included, False otherwise.
44+
"""
45+
46+
2647
# pylint: disable=too-many-instance-attributes
2748
class DispatchScheduler(BackgroundService):
2849
"""Dispatch background service.
@@ -104,6 +125,11 @@ def __init__(
104125
always at index 0.
105126
"""
106127

128+
@property
129+
def dispatches(self) -> ValuesView[Dispatch]:
130+
"""All currently cached dispatches."""
131+
return self._dispatches.values()
132+
107133
# pylint: disable=redefined-builtin
108134
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
109135
"""Create a new receiver for lifecycle events.
@@ -119,54 +145,53 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119145
)
120146

121147
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
148+
self, type: str, *, merge_strategy: MergeStrategy
123149
) -> Receiver[Dispatch]:
124150
"""Create a new receiver for running state events of the specified type.
125151
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
152+
`merge_strategy` is an instance of a class derived from
153+
[`MergeStrategy`][frequenz.dispatch.MergeStrategy] Available strategies
154+
are:
155+
156+
* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
157+
of the same type
158+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
159+
dispatches of the same type and target
160+
* [`NoMerge`][frequenz.dispatch.NoMerge] — no merging, just send all
161+
events
162+
163+
You can make your own strategy by subclassing
164+
165+
* [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
166+
dispatches based on a user defined identity function
167+
* [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
168+
on a user defined filter function
169+
170+
Running intervals from multiple dispatches will be merged, according to
171+
the chosen strategy.
172+
173+
While merging, stop events are ignored as long as at least one
174+
merge-criteria-matching dispatch remains active.
130175
131176
Args:
132177
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
178+
merge_strategy: The merge strategy to use.
135179
Returns:
136180
A new receiver for running state status.
137181
"""
138-
# Find all matching dispatches based on the type and collect them
139182
dispatches = [
140183
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141184
]
142185

143-
# Create receiver with enough capacity to hold all matching dispatches
144186
receiver = self._running_state_status_channel.new_receiver(
145-
limit=max(1, len(dispatches))
187+
limit=max(30, len(dispatches))
146188
).filter(lambda dispatch: dispatch.type == type)
147189

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
190+
if merge_strategy:
191+
receiver = receiver.filter(
192+
functools.partial(merge_strategy.filter, self._dispatches)
193+
)
168194

169-
# Send all matching dispatches to the receiver
170195
for dispatch in dispatches:
171196
await self._send_running_state_change(dispatch)
172197

src/frequenz/dispatch/_dispatcher.py

+36-17
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,29 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
13+
from ._merge_strategies import NoMerge
1314

1415

1516
class Dispatcher:
1617
"""A highlevel interface for the dispatch API.
1718
1819
This class provides a highlevel interface to the dispatch API.
19-
It provides two channels:
20+
It provides two receiver functions:
2021
21-
Lifecycle events:
22-
A channel that sends a dispatch event message whenever a dispatch
23-
is created, updated or deleted.
22+
* [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
23+
Receives an event whenever a dispatch is created, updated or deleted.
24+
* [Running status change
25+
receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
26+
Receives an event whenever the running status of a dispatch changes.
27+
The running status of a dispatch can change due to a variety of reasons,
28+
such as but not limited to the dispatch being started, stopped, modified
29+
or deleted or reaching its scheduled start or end time.
2430
25-
Running status change:
26-
Sends a dispatch message whenever a dispatch is ready
27-
to be executed according to the schedule or the running status of the
28-
dispatch changed in a way that could potentially require the consumer to start,
29-
stop or reconfigure itself.
31+
Any change that could potentially require the consumer to start, stop or
32+
reconfigure itself will cause a message to be sent.
3033
3134
Example: Processing running state change dispatches
3235
```python
@@ -200,7 +203,10 @@ def new_lifecycle_events_receiver(
200203
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201204

202205
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
206+
self,
207+
dispatch_type: str,
208+
*,
209+
merge_strategy: MergeStrategy = NoMerge(),
204210
) -> Receiver[Dispatch]:
205211
"""Return running state event receiver.
206212
@@ -228,18 +234,31 @@ async def new_running_state_event_receiver(
228234
- The payload changed
229235
- The dispatch was deleted
230236
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
237+
`merge_strategy` is an instance of a class derived from
238+
[`MergeStrategy`][frequenz.dispatch.MergeStrategy] Available strategies
239+
are:
240+
241+
* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
242+
of the same type
243+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
244+
dispatches of the same type and target
245+
* [`NoMerge`][frequenz.dispatch.NoMerge] — no merging, just send all
246+
events (default)
247+
248+
Running intervals from multiple dispatches will be merged, according to
249+
the chosen strategy.
250+
251+
While merging, stop events are ignored as long as at least one
252+
merge-criteria-matching dispatch remains active.
235253
236254
Args:
237255
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
256+
merge_strategy: The type of the strategy to merge running intervals.
257+
One of `MergeByType` or `MergeByTypeTarget`.
239258
240259
Returns:
241260
A new receiver for dispatches whose running status changed.
242261
"""
243262
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
263+
dispatch_type, merge_strategy=merge_strategy
245264
)
+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Different merge strategies for dispatch running state events."""
5+
6+
import logging
7+
from abc import abstractmethod
8+
9+
from typing_extensions import override
10+
11+
from ._bg_service import MergeStrategy
12+
from ._dispatch import Dispatch
13+
14+
15+
class MergeByIdentity(MergeStrategy):
16+
"""Merge running intervals based on a dispatch configuration."""
17+
18+
@abstractmethod
19+
def identity(self, dispatch: Dispatch) -> int:
20+
"""Identity function for the merge criteria."""
21+
22+
@override
23+
def filter(self, dispatches: dict[int, Dispatch], dispatch: Dispatch) -> bool:
24+
"""Filter dispatches based on the merge strategy.
25+
26+
Keeps start events.
27+
Keeps stop events only if no other dispatches matching the
28+
strategy's criteria are running.
29+
"""
30+
if dispatch.started:
31+
logging.debug("Keeping start event %s", dispatch.id)
32+
return True
33+
34+
other_dispatches_running = any(
35+
existing_dispatch.started
36+
for existing_dispatch in dispatches.values()
37+
if (
38+
self.identity(existing_dispatch) == self.identity(dispatch)
39+
and existing_dispatch.id != dispatch.id
40+
)
41+
)
42+
43+
logging.debug(
44+
"stop event %s because other_dispatches_running=%s",
45+
dispatch.id,
46+
other_dispatches_running,
47+
)
48+
return not other_dispatches_running
49+
50+
51+
class MergeByType(MergeByIdentity):
52+
"""Merge running intervals based on the dispatch type."""
53+
54+
@override
55+
def identity(self, dispatch: Dispatch) -> int:
56+
"""Identity function for the merge criteria."""
57+
return hash(dispatch.type)
58+
59+
60+
class MergeByTypeTarget(MergeByType):
61+
"""Merge running intervals based on the dispatch type and target."""
62+
63+
@override
64+
def identity(self, dispatch: Dispatch) -> int:
65+
"""Identity function for the merge criteria."""
66+
return hash((dispatch.type, dispatch.target))
67+
68+
69+
class NoMerge(MergeByIdentity):
70+
"""Treat each dispatch individually. No merging."""
71+
72+
@override
73+
def identity(self, dispatch: Dispatch) -> int:
74+
"""Identity function for the merge criteria."""
75+
return dispatch.id

0 commit comments

Comments
 (0)