Skip to content

Commit 28dc04d

Browse files
committed
Rewrite merging strategy design
Implement merge strategy based on TYPE+TARGET Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e7fd295 commit 28dc04d

File tree

5 files changed

+138
-52
lines changed

5 files changed

+138
-52
lines changed

Diff for: RELEASE_NOTES.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
## New Features
1414

15-
* A new feature "unify running intervals" has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
15+
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
1616

1717
## Bug Fixes
1818

Diff for: src/frequenz/dispatch/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
1616
"""
1717

18+
from ._bg_service import MergeByType, MergeByTypeTarget, _MergeStrategy
1819
from ._dispatch import Dispatch
1920
from ._dispatcher import Dispatcher
2021
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -29,4 +30,7 @@
2930
"Dispatch",
3031
"DispatchManagingActor",
3132
"DispatchUpdate",
33+
"_MergeStrategy", # To allow for user strategies
34+
"MergeByType",
35+
"MergeByTypeTarget",
3236
]

Diff for: src/frequenz/dispatch/_bg_service.py

+86-30
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33

44
"""The dispatch background service."""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
import logging
10+
from abc import ABC, abstractmethod
811
from dataclasses import dataclass, field
912
from datetime import datetime, timedelta, timezone
1013
from heapq import heappop, heappush
14+
from typing import Callable
1115

1216
import grpc.aio
1317
from frequenz.channels import Broadcast, Receiver, select, selected_from
@@ -23,6 +27,77 @@
2327
"""The logger for this module."""
2428

2529

30+
class _MergeStrategy(ABC):
31+
"""Base class for strategies to merge running intervals."""
32+
33+
@abstractmethod
34+
def _get_filter_function(
35+
self,
36+
scheduler: DispatchScheduler,
37+
) -> Callable[[Dispatch], bool]:
38+
"""Get a filter function for dispatches.
39+
40+
Args:
41+
scheduler: The dispatch scheduler.
42+
43+
Returns:
44+
A filter function.
45+
"""
46+
47+
48+
class MergeByType(_MergeStrategy):
49+
"""Merge running intervals based on the dispatch type."""
50+
51+
def __init__(self) -> None:
52+
"""Initialize the strategy."""
53+
self._scheduler: DispatchScheduler
54+
self._new_dispatch: Dispatch
55+
56+
def _get_filter_function(
57+
self, scheduler: DispatchScheduler
58+
) -> Callable[[Dispatch], bool]:
59+
"""Get a filter function for dispatches."""
60+
self._scheduler = scheduler
61+
return self._filter_func
62+
63+
def _criteria(self, dispatch: Dispatch) -> bool:
64+
"""Define the criteria for checking other running dispatches."""
65+
return dispatch.type == self._new_dispatch.type
66+
67+
def _filter_func(self, new_dispatch: Dispatch) -> bool:
68+
"""Filter dispatches based on the merge strategy.
69+
70+
Keeps start events.
71+
Keeps stop events only if no other dispatches matching the
72+
strategy's criteria are running.
73+
"""
74+
if new_dispatch.started:
75+
return True
76+
77+
self._new_dispatch = new_dispatch
78+
79+
# pylint: disable=protected-access
80+
other_dispatches_running = any(
81+
dispatch.started
82+
for dispatch in self._scheduler._dispatches.values()
83+
if self._criteria(dispatch)
84+
)
85+
# pylint: enable=protected-access
86+
87+
return not other_dispatches_running
88+
89+
90+
class MergeByTypeTarget(MergeByType):
91+
"""Merge running intervals based on the dispatch type and target."""
92+
93+
def _criteria(self, dispatch: Dispatch) -> bool:
94+
"""Define the criteria for checking other running dispatches."""
95+
return (
96+
dispatch.type == self._new_dispatch.type
97+
and dispatch.target == self._new_dispatch.target
98+
)
99+
100+
26101
# pylint: disable=too-many-instance-attributes
27102
class DispatchScheduler(BackgroundService):
28103
"""Dispatch background service.
@@ -119,54 +194,35 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
119194
)
120195

121196
async def new_running_state_event_receiver(
122-
self, type: str, *, unify_running_intervals: bool = True
197+
self, type: str, *, merge_strategy: _MergeStrategy | None = None
123198
) -> Receiver[Dispatch]:
124199
"""Create a new receiver for running state events of the specified type.
125200
126-
If `unify_running_intervals` is True, running intervals from multiple
127-
dispatches of the same type are considered as one continuous running
128-
period. In this mode, any stop events are ignored as long as at least
129-
one dispatch remains active.
201+
`merge_strategy` can be one of `MergeByType` or `MergeByTypeTarget`.
202+
If set, running intervals from multiple dispatches will be merged,
203+
depending on the chosen strategy.
204+
When merging, stop events are ignored as long as at least one
205+
merge-criteria-matching dispatch remains active.
130206
131207
Args:
132208
type: The type of events to receive.
133-
unify_running_intervals: Whether to unify running intervals.
134-
209+
merge_strategy: The merge strategy to use.
135210
Returns:
136211
A new receiver for running state status.
137212
"""
138-
# Find all matching dispatches based on the type and collect them
139213
dispatches = [
140214
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
141215
]
142216

143-
# Create receiver with enough capacity to hold all matching dispatches
144217
receiver = self._running_state_status_channel.new_receiver(
145218
limit=max(1, len(dispatches))
146219
).filter(lambda dispatch: dispatch.type == type)
147220

148-
if unify_running_intervals:
149-
150-
def _is_type_still_running(new_dispatch: Dispatch) -> bool:
151-
"""Merge time windows of running dispatches.
152-
153-
Any event that would cause a stop is filtered if at least one
154-
dispatch of the same type is running.
155-
"""
156-
if new_dispatch.started:
157-
return True
158-
159-
other_dispatches_running = any(
160-
dispatch.started
161-
for dispatch in self._dispatches.values()
162-
if dispatch.type == type
163-
)
164-
# If no other dispatches are running, we can allow the stop event
165-
return not other_dispatches_running
166-
167-
receiver = receiver.filter(_is_type_still_running)
221+
if merge_strategy:
222+
# pylint: disable=protected-access
223+
receiver = receiver.filter(merge_strategy._get_filter_function(self))
224+
# pylint: enable=protected-access
168225

169-
# Send all matching dispatches to the receiver
170226
for dispatch in dispatches:
171227
await self._send_running_state_change(dispatch)
172228

Diff for: src/frequenz/dispatch/_dispatcher.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10-
from ._bg_service import DispatchScheduler
10+
from ._bg_service import DispatchScheduler, _MergeStrategy
1111
from ._dispatch import Dispatch
1212
from ._event import DispatchEvent
1313

@@ -200,7 +200,10 @@ def new_lifecycle_events_receiver(
200200
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203-
self, dispatch_type: str, *, unify_running_intervals: bool = True
203+
self,
204+
dispatch_type: str,
205+
*,
206+
merge_strategy: _MergeStrategy | None = None,
204207
) -> Receiver[Dispatch]:
205208
"""Return running state event receiver.
206209
@@ -228,18 +231,21 @@ async def new_running_state_event_receiver(
228231
- The payload changed
229232
- The dispatch was deleted
230233
231-
If `unify_running_intervals` is True, running intervals from multiple
232-
dispatches of the same type are considered as one continuous running
233-
period. In this mode, any stop events are ignored as long as at least
234-
one dispatch remains active.
234+
If `unify_running_intervals` is set, running intervals from multiple
235+
dispatches of the same type/type&target (depending on the chosen
236+
strategy) are considered as one continuous running
237+
period.
238+
In this mode, stop events are ignored as long as at least one (criteria
239+
matching) dispatch remains active.
235240
236241
Args:
237242
dispatch_type: The type of the dispatch to listen for.
238-
unify_running_intervals: Whether to unify running intervals.
243+
merge_strategy: The strategy to merge running intervals. One of
244+
`MergeByType` or `MergeByTypeTarget`.
239245
240246
Returns:
241247
A new receiver for dispatches whose running status changed.
242248
"""
243249
return await self._bg_service.new_running_state_event_receiver(
244-
dispatch_type, unify_running_intervals=unify_running_intervals
250+
dispatch_type, merge_strategy=merge_strategy
245251
)

0 commit comments

Comments
 (0)