From e58f0123b0ff217c76c03f66f918b8d3b55dfebd Mon Sep 17 00:00:00 2001
From: "Mathias L. Baumann"
Date: Tue, 21 Jan 2025 17:07:40 +0100
Subject: [PATCH 1/5] Rewrite merging strategy design

Implement merge strategy based on TYPE+TARGET

Signed-off-by: Mathias L. Baumann
---
 RELEASE_NOTES.md                           |  2 +
 src/frequenz/dispatch/__init__.py          |  6 ++
 src/frequenz/dispatch/_bg_service.py       | 77 ++++++++++++++--------
 src/frequenz/dispatch/_dispatcher.py       | 50 +++++++++-----
 src/frequenz/dispatch/_merge_strategies.py | 67 +++++++++++++++++++
 tests/test_frequenz_dispatch.py            | 60 +++++++++++------
 6 files changed, 196 insertions(+), 66 deletions(-)
 create mode 100644 src/frequenz/dispatch/_merge_strategies.py

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index dd74f73..4fe06aa 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -18,5 +18,7 @@ This release introduces a more flexible and powerful mechanism for managing disp

 ## New Features

+* A new "merge strategy" feature (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. With it, consecutive and overlapping dispatch start/stop events of the same type are automatically merged and unified. E.g. if dispatch `A` runs from 10:10 to 10:30 and dispatch `B` runs from 10:30 to 11:00, enabling the feature results in a single start event at 10:10, one reconfigure event at 10:30 and one stop event at 11:00.
+
 * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py
index fe6a504..f239803 100644
--- a/src/frequenz/dispatch/__init__.py
+++ b/src/frequenz/dispatch/__init__.py
@@ -16,9 +16,11 @@
 """

 from ._actor_dispatcher import ActorDispatcher, DispatchInfo
+from ._bg_service import MergeStrategy
 from ._dispatch import Dispatch
 from ._dispatcher import Dispatcher
 from ._event import Created, Deleted, DispatchEvent, Updated
+from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget

 __all__ = [
     "Created",
@@ -29,4 +31,8 @@
     "Dispatch",
     "ActorDispatcher",
     "DispatchInfo",
+    "MergeStrategy",
+    "MergeByIdentity",
+    "MergeByType",
+    "MergeByTypeTarget",
 ]
diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index 9e466a6..b5ba072 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -3,8 +3,13 @@

 """The dispatch background service."""

+from __future__ import annotations
+
 import asyncio
+import functools
 import logging
+from abc import ABC, abstractmethod
+from collections.abc import Mapping
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta, timezone
 from heapq import heappop, heappush
@@ -23,6 +28,22 @@
 """The logger for this module."""


+class MergeStrategy(ABC):
+    """Base class for strategies to merge running intervals."""
+
+    @abstractmethod
+    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
+        """Filter dispatches based on the strategy.
+
+        Args:
+            dispatches: All dispatches, available as context.
+            dispatch: The dispatch to filter.
+
+        Returns:
+            True if the dispatch should be included, False otherwise.
+        """
+
+
 # pylint: disable=too-many-instance-attributes
 class DispatchScheduler(BackgroundService):
     """Dispatch background service.
@@ -119,19 +140,36 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
         )

     async def new_running_state_event_receiver(
-        self, type: str, *, unify_running_intervals: bool = True
+        self, type: str, *, merge_strategy: MergeStrategy | None = None
     ) -> Receiver[Dispatch]:
         """Create a new receiver for running state events of the specified type.

-        If `unify_running_intervals` is True, running intervals from multiple
-        dispatches of the same type are considered as one continuous running
-        period. In this mode, any stop events are ignored as long as at least
-        one dispatch remains active.
+        `merge_strategy` is an instance of a class derived from
+        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
+        are:
+
+        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
+          of the same type
+        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
+          dispatches of the same type and target
+        * `None` — no merging, just send all events
+
+        You can make your own strategy by subclassing:
+
+        * [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
+          dispatches based on a user-defined identity function
+        * [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
+          on a user-defined filter function
+
+        Running intervals from multiple dispatches will be merged according to
+        the chosen strategy.
+
+        While merging, stop events are ignored as long as at least one
+        merge-criteria-matching dispatch remains active.

         Args:
            type: The type of events to receive.
-            unify_running_intervals: Whether to unify running intervals.
-
+            merge_strategy: The merge strategy to use.
         Returns:
             A new receiver for running state status.
         """
@@ -145,28 +183,11 @@ async def new_running_state_event_receiver(
             limit=max(1, len(dispatches))
         ).filter(lambda dispatch: dispatch.type == type)

-        if unify_running_intervals:
-
-            def _is_type_still_running(new_dispatch: Dispatch) -> bool:
-                """Merge time windows of running dispatches.
-
-                Any event that would cause a stop is filtered if at least one
-                dispatch of the same type is running.
-                """
-                if new_dispatch.started:
-                    return True
-
-                other_dispatches_running = any(
-                    dispatch.started
-                    for dispatch in self._dispatches.values()
-                    if dispatch.type == type
-                )
-                # If no other dispatches are running, we can allow the stop event
-                return not other_dispatches_running
-
-            receiver = receiver.filter(_is_type_still_running)
+        if merge_strategy:
+            receiver = receiver.filter(
+                functools.partial(merge_strategy.filter, self._dispatches)
+            )

-        # Send all matching dispatches to the receiver
         for dispatch in dispatches:
             await self._send_running_state_change(dispatch)

diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py
index 512318d..f2eca19 100644
--- a/src/frequenz/dispatch/_dispatcher.py
+++ b/src/frequenz/dispatch/_dispatcher.py
@@ -7,7 +7,7 @@
 from frequenz.channels import Receiver
 from frequenz.client.dispatch import Client

-from ._bg_service import DispatchScheduler
+from ._bg_service import DispatchScheduler, MergeStrategy
 from ._dispatch import Dispatch
 from ._event import DispatchEvent

@@ -16,17 +16,19 @@ class Dispatcher:
     """A highlevel interface for the dispatch API.

     This class provides a highlevel interface to the dispatch API.
-    It provides two channels:
+    It provides two receiver functions:

-    - Lifecycle events:
-        A channel that sends a dispatch event message whenever a dispatch
-        is created, updated or deleted.
+    * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
+      Receives an event whenever a dispatch is created, updated or deleted.
+    * [Running status change
+      receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
+      Receives an event whenever the running status of a dispatch changes.
+      The running status of a dispatch can change due to a variety of reasons,
+      such as but not limited to the dispatch being started, stopped, modified
+      or deleted or reaching its scheduled start or end time.

-    - Running status change:
-        Sends a dispatch message whenever a dispatch is ready
-        to be executed according to the schedule or the running status of the
-        dispatch changed in a way that could potentially require the consumer to start,
-        stop or reconfigure itself.
+    Any change that could potentially require the consumer to start, stop or
+    reconfigure itself will cause a message to be sent.

     Example: Processing running state change dispatches
         ```python
@@ -200,7 +202,10 @@ def new_lifecycle_events_receiver(
         return self._bg_service.new_lifecycle_events_receiver(dispatch_type)

     async def new_running_state_event_receiver(
-        self, dispatch_type: str, *, unify_running_intervals: bool = True
+        self,
+        dispatch_type: str,
+        *,
+        merge_strategy: MergeStrategy | None = None,
     ) -> Receiver[Dispatch]:
         """Return running state event receiver.

@@ -228,18 +233,29 @@ async def new_running_state_event_receiver(
             - The payload changed
             - The dispatch was deleted

-        If `unify_running_intervals` is True, running intervals from multiple
-        dispatches of the same type are considered as one continuous running
-        period. In this mode, any stop events are ignored as long as at least
-        one dispatch remains active.
+        `merge_strategy` is an instance of a class derived from
+        [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
+        are:
+
+        * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
+          of the same type
+        * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
+          dispatches of the same type and target
+        * `None` — no merging, just send all events (default)
+
+        Running intervals from multiple dispatches will be merged according to
+        the chosen strategy.
+
+        While merging, stop events are ignored as long as at least one
+        merge-criteria-matching dispatch remains active.

         Args:
             dispatch_type: The type of the dispatch to listen for.
-            unify_running_intervals: Whether to unify running intervals.
+            merge_strategy: The strategy to use for merging running intervals.

         Returns:
             A new receiver for dispatches whose running status changed.
""" return await self._bg_service.new_running_state_event_receiver( - dispatch_type, unify_running_intervals=unify_running_intervals + dispatch_type, merge_strategy=merge_strategy ) diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py new file mode 100644 index 0000000..3753546 --- /dev/null +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -0,0 +1,67 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Different merge strategies for dispatch running state events.""" + +import logging +from abc import abstractmethod +from collections.abc import Mapping + +from typing_extensions import override + +from ._bg_service import MergeStrategy +from ._dispatch import Dispatch + + +class MergeByIdentity(MergeStrategy): + """Merge running intervals based on a dispatch configuration.""" + + @abstractmethod + def identity(self, dispatch: Dispatch) -> int: + """Identity function for the merge criteria.""" + + @override + def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: + """Filter dispatches based on the merge strategy. + + Keeps start events. + Keeps stop events only if no other dispatches matching the + strategy's criteria are running. + """ + if dispatch.started: + logging.debug("Keeping start event %s", dispatch.id) + return True + + other_dispatches_running = any( + existing_dispatch.started + for existing_dispatch in dispatches.values() + if ( + self.identity(existing_dispatch) == self.identity(dispatch) + and existing_dispatch.id != dispatch.id + ) + ) + + logging.debug( + "stop event %s because other_dispatches_running=%s", + dispatch.id, + other_dispatches_running, + ) + return not other_dispatches_running + + +class MergeByType(MergeByIdentity): + """Merge running intervals based on the dispatch type.""" + + @override + def identity(self, dispatch: Dispatch) -> int: + """Identity function for the merge criteria.""" + return hash(dispatch.type) + + +class MergeByTypeTarget(MergeByType): + """Merge running intervals based on the dispatch type and target.""" + + @override + def identity(self, dispatch: Dispatch) -> int: + """Identity function for the merge criteria.""" + return hash((dispatch.type, dispatch.target)) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 82802c1..0145bee 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -10,6 +10,7 @@ from typing import AsyncIterator, Iterator import async_solipsism +import pytest import time_machine from frequenz.channels import Receiver from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule @@ -18,7 +19,16 @@ from frequenz.client.dispatch.types import Dispatch as BaseDispatch from pytest import fixture -from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated +from frequenz.dispatch import ( + Created, + Deleted, + Dispatch, + DispatchEvent, + MergeByType, + MergeByTypeTarget, + MergeStrategy, + Updated, +) from frequenz.dispatch._bg_service import DispatchScheduler @@ -77,7 +87,7 @@ async def test_env() -> AsyncIterator[TestEnv]: service=service, lifecycle_events=service.new_lifecycle_events_receiver("TEST_TYPE"), running_state_change=await service.new_running_state_event_receiver( - "TEST_TYPE", unify_running_intervals=False + "TEST_TYPE", merge_strategy=MergeByType() ), client=client, microgrid_id=microgrid_id, @@ -446,7 +456,7 @@ async def test_dispatch_new_but_finished( 
         lifecycle_events=test_env.service.new_lifecycle_events_receiver("TEST_TYPE"),
         running_state_change=(
             await test_env.service.new_running_state_event_receiver(
-                "TEST_TYPE", unify_running_intervals=False
+                "TEST_TYPE", merge_strategy=MergeByType()
             )
         ),
     )
@@ -520,9 +530,11 @@ async def test_notification_on_actor_start(
     assert ready_dispatch.started


-async def test_multiple_dispatches_unify_running_intervals(
+@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
+async def test_multiple_dispatches_merge_running_intervals(
     fake_time: time_machine.Coordinates,
     generator: DispatchGenerator,
+    merge_strategy: MergeStrategy,
 ) -> None:
     """Test that multiple dispatches are merged into a single running interval."""
     microgrid_id = randint(1, 100)
@@ -534,7 +546,7 @@
     service.start()

     receiver = await service.new_running_state_event_receiver(
-        "TEST_TYPE", unify_running_intervals=True
+        "TEST_TYPE", merge_strategy=merge_strategy
     )

     # Create two overlapping dispatches
@@ -542,6 +554,7 @@
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=30),
+        target=[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4],
         start_time=_now() + timedelta(seconds=5),
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -550,6 +563,7 @@
         generator.generate_dispatch(),
         active=True,
         duration=timedelta(seconds=10),
+        target=[3, 4],
         start_time=_now() + timedelta(seconds=10),  # starts after dispatch1
         recurrence=RecurrenceRule(),
         type="TEST_TYPE",
@@ -573,7 +587,7 @@
     assert started1.started
     assert started2.started

-    # Stop dispatch2 first, but unify_running_intervals=True means as long as dispatch1 runs,
+    # Stop dispatch2 first; with a merge strategy set, as long as dispatch1 runs
     # we do not send a stop event
     await client.update(
         microgrid_id=microgrid_id, dispatch_id=started2.id, new_fields={"active": False}
@@ -592,14 +606,16 @@
     await service.stop()


-async def test_multiple_dispatches_sequential_intervals_unify(
+@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()])
+async def test_multiple_dispatches_sequential_intervals_merge(
     fake_time: time_machine.Coordinates,
     generator: DispatchGenerator,
+    merge_strategy: MergeStrategy,
 ) -> None:
     """Test that multiple dispatches are merged into a single running interval.

     Even if dispatches don't overlap but are consecutive,
-    unify_running_intervals=True should treat them as continuous if any event tries to stop.
+    a merge strategy should treat them as continuous if any event tries to stop.
""" microgrid_id = randint(1, 100) client = FakeClient() @@ -607,13 +623,15 @@ async def test_multiple_dispatches_sequential_intervals_unify( service.start() receiver = await service.new_running_state_event_receiver( - "TEST_TYPE", unify_running_intervals=True + "TEST_TYPE", merge_strategy=merge_strategy ) dispatch1 = replace( generator.generate_dispatch(), active=True, duration=timedelta(seconds=5), + # If merging by type, we want to test having different targets in dispatch 1 and 2 + target=[3, 4] if isinstance(merge_strategy, MergeByType) else [1, 2], start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -623,6 +641,7 @@ async def test_multiple_dispatches_sequential_intervals_unify( generator.generate_dispatch(), active=True, duration=timedelta(seconds=5), + target=[1, 2], start_time=dispatch1.start_time + dispatch1.duration, recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -636,30 +655,30 @@ async def test_multiple_dispatches_sequential_intervals_unify( await lifecycle.receive() await lifecycle.receive() - fake_time.shift(timedelta(seconds=11)) + fake_time.move_to(dispatch1.start_time + timedelta(seconds=1)) await asyncio.sleep(1) started1 = await receiver.receive() assert started1.started # Wait for the second dispatch to start - fake_time.shift(timedelta(seconds=3)) + fake_time.move_to(dispatch2.start_time + timedelta(seconds=1)) await asyncio.sleep(1) started2 = await receiver.receive() assert started2.started + assert started2.target == dispatch2.target # Now stop the second dispatch - fake_time.shift(timedelta(seconds=5)) - await asyncio.sleep(1) + assert dispatch2.duration is not None + fake_time.move_to(dispatch2.start_time + dispatch2.duration + timedelta(seconds=1)) stopped = await receiver.receive() assert not stopped.started - await service.stop() - await asyncio.sleep(1) - +@pytest.mark.parametrize("merge_strategy", [MergeByType(), MergeByTypeTarget()]) async def test_at_least_one_running_filter( fake_time: time_machine.Coordinates, generator: DispatchGenerator, + merge_strategy: MergeStrategy, ) -> None: """Test scenarios directly tied to the _at_least_one_running logic.""" microgrid_id = randint(1, 100) @@ -667,9 +686,9 @@ async def test_at_least_one_running_filter( service = DispatchScheduler(microgrid_id=microgrid_id, client=client) service.start() - # unify_running_intervals is True, so we use merged intervals + # merge_running_intervals is TYPE, so we use merged intervals receiver = await service.new_running_state_event_receiver( - "TEST_TYPE", unify_running_intervals=True + "TEST_TYPE", merge_strategy=merge_strategy ) # Single dispatch that starts and stops normally @@ -677,6 +696,7 @@ async def test_at_least_one_running_filter( generator.generate_dispatch(), active=True, duration=timedelta(seconds=10), + target=[1, 2] if isinstance(merge_strategy, MergeByType) else [3, 4], start_time=_now() + timedelta(seconds=5), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -705,6 +725,7 @@ async def test_at_least_one_running_filter( generator.generate_dispatch(), active=False, duration=timedelta(seconds=10), + target=[3, 4], start_time=_now() + timedelta(seconds=50), recurrence=RecurrenceRule(), type="TEST_TYPE", @@ -738,6 +759,3 @@ async def test_at_least_one_running_filter( await asyncio.sleep(1) stopped_b = await receiver.receive() assert not stopped_b.started - - # Since dispatch_a never started, no merging logic needed here. 
-    await service.stop()

From 4d229f13e5bd1c9a6de8e59054a85e251f6bfa88 Mon Sep 17 00:00:00 2001
From: "Mathias L. Baumann"
Date: Tue, 4 Feb 2025 12:47:29 +0100
Subject: [PATCH 2/5] Set minimum receiver queue size to 30

Signed-off-by: Mathias L. Baumann
---
 src/frequenz/dispatch/_bg_service.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index b5ba072..6142394 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -178,9 +178,14 @@ async def new_running_state_event_receiver(
             dispatch for dispatch in self._dispatches.values() if dispatch.type == type
         ]

-        # Create receiver with enough capacity to hold all matching dispatches
+        # Create a new receiver with at least 30 slots, but more if there are
+        # more dispatches.
+        # That way we can send all dispatches initially and don't have to worry
+        # about the receiver being full.
+        # If there are no initial dispatches, we still want to have some slots
+        # available for future dispatches, so we set the limit to 30.
         receiver = self._running_state_status_channel.new_receiver(
-            limit=max(1, len(dispatches))
+            limit=max(30, len(dispatches))
         ).filter(lambda dispatch: dispatch.type == type)

         if merge_strategy:

From 163acc54107ccb2856b76061a7a686926a631697 Mon Sep 17 00:00:00 2001
From: "Mathias L. Baumann"
Date: Tue, 4 Feb 2025 12:51:19 +0100
Subject: [PATCH 3/5] Move log output to relevant function

Signed-off-by: Mathias L. Baumann
---
 src/frequenz/dispatch/_bg_service.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index 6142394..7e300a7 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -221,9 +221,6 @@ async def _run(self) -> None:
             if selected_from(selected, self._next_event_timer):
                 if not self._scheduled_events:
                     continue
-                _logger.debug(
-                    "Executing scheduled event: %s", self._scheduled_events[0].dispatch
-                )
                 await self._execute_scheduled_event(
                     heappop(self._scheduled_events).dispatch
                 )
@@ -253,6 +250,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
         Args:
             dispatch: The dispatch to execute.
         """
+        _logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
         await self._send_running_state_change(dispatch)

         # The timer is always a tiny bit delayed, so we need to check if the

From 4f02bca6796ce7bc58741ba13f2be67b4b5c96ce Mon Sep 17 00:00:00 2001
From: "Mathias L. Baumann"
Date: Tue, 4 Feb 2025 12:51:35 +0100
Subject: [PATCH 4/5] Avoid reconstructing the `Dispatch` object

Signed-off-by: Mathias L. Baumann
---
 src/frequenz/dispatch/_bg_service.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index 7e300a7..700ed6a 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -280,7 +280,7 @@ async def _fetch(self) -> None:
             for client_dispatch in page:
                 dispatch = Dispatch(client_dispatch)

-                self._dispatches[dispatch.id] = Dispatch(client_dispatch)
+                self._dispatches[dispatch.id] = dispatch
                 old_dispatch = old_dispatches.pop(dispatch.id, None)
                 if not old_dispatch:
                     _logger.debug("New dispatch: %s", dispatch)

From 93683e2ac1c4d1a56ff356894af31a9f6ffb604f Mon Sep 17 00:00:00 2001
From: "Mathias L. Baumann"
Baumann" Date: Tue, 4 Feb 2025 12:51:48 +0100 Subject: [PATCH 5/5] Fix stray `)` in comment Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_bg_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 700ed6a..c54c204 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -334,7 +334,7 @@ async def _update_dispatch_schedule_and_notify( self._remove_scheduled(old_dispatch) was_running = old_dispatch.started - old_dispatch._set_deleted() # pylint: disable=protected-access) + old_dispatch._set_deleted() # pylint: disable=protected-access # If the dispatch was running, we need to notify if was_running: