Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement merge strategy based on TYPE+TARGET #98

Merged
merged 5 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ This release introduces a more flexible and powerful mechanism for managing disp

## New Features

* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.

* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.

6 changes: 6 additions & 0 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
"""

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._bg_service import MergeStrategy
from ._dispatch import Dispatch
from ._dispatcher import Dispatcher
from ._event import Created, Deleted, DispatchEvent, Updated
from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget

__all__ = [
"Created",
Expand All @@ -29,4 +31,8 @@
"Dispatch",
"ActorDispatcher",
"DispatchInfo",
"MergeStrategy",
"MergeByIdentity",
"MergeByType",
"MergeByTypeTarget",
]
94 changes: 59 additions & 35 deletions src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@

"""The dispatch background service."""

from __future__ import annotations

import asyncio
import functools
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from heapq import heappop, heappush
Expand All @@ -23,6 +28,22 @@
"""The logger for this module."""


class MergeStrategy(ABC):
"""Base class for strategies to merge running intervals."""

@abstractmethod
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
"""Filter dispatches based on the strategy.

Args:
dispatches: All dispatches, available as context.
dispatch: The dispatch to filter.

Returns:
True if the dispatch should be included, False otherwise.
"""


# pylint: disable=too-many-instance-attributes
class DispatchScheduler(BackgroundService):
"""Dispatch background service.
Expand Down Expand Up @@ -119,19 +140,36 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
)

async def new_running_state_event_receiver(
self, type: str, *, unify_running_intervals: bool = True
self, type: str, *, merge_strategy: MergeStrategy | None = None
) -> Receiver[Dispatch]:
"""Create a new receiver for running state events of the specified type.

If `unify_running_intervals` is True, running intervals from multiple
dispatches of the same type are considered as one continuous running
period. In this mode, any stop events are ignored as long as at least
one dispatch remains active.
`merge_strategy` is an instance of a class derived from
[`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies
are:

* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
of the same type
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
dispatches of the same type and target
* `None` — no merging, just send all events

You can make your own strategy by subclassing:

* [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges
dispatches based on a user defined identity function
* [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based
on a user defined filter function

Running intervals from multiple dispatches will be merged, according to
the chosen strategy.

While merging, stop events are ignored as long as at least one
merge-criteria-matching dispatch remains active.

Args:
type: The type of events to receive.
unify_running_intervals: Whether to unify running intervals.

merge_strategy: The merge strategy to use.
Returns:
A new receiver for running state status.
"""
Expand All @@ -140,33 +178,21 @@ async def new_running_state_event_receiver(
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
]

# Create receiver with enough capacity to hold all matching dispatches
# Create a new receiver with at least 30 slots, but more if there are
# more dispatches.
# That way we can send all dispatches initially and don't have to worry
# about the receiver being full.
# If there are no initial dispatches, we still want to have some slots
# available for future dispatches, so we set the limit to 30.
receiver = self._running_state_status_channel.new_receiver(
limit=max(1, len(dispatches))
limit=max(30, len(dispatches))
).filter(lambda dispatch: dispatch.type == type)

if unify_running_intervals:

def _is_type_still_running(new_dispatch: Dispatch) -> bool:
"""Merge time windows of running dispatches.

Any event that would cause a stop is filtered if at least one
dispatch of the same type is running.
"""
if new_dispatch.started:
return True

other_dispatches_running = any(
dispatch.started
for dispatch in self._dispatches.values()
if dispatch.type == type
)
# If no other dispatches are running, we can allow the stop event
return not other_dispatches_running

receiver = receiver.filter(_is_type_still_running)
if merge_strategy:
receiver = receiver.filter(
functools.partial(merge_strategy.filter, self._dispatches)
)

# Send all matching dispatches to the receiver
for dispatch in dispatches:
await self._send_running_state_change(dispatch)

Expand Down Expand Up @@ -195,9 +221,6 @@ async def _run(self) -> None:
if selected_from(selected, self._next_event_timer):
if not self._scheduled_events:
continue
_logger.debug(
"Executing scheduled event: %s", self._scheduled_events[0].dispatch
)
await self._execute_scheduled_event(
heappop(self._scheduled_events).dispatch
)
Expand Down Expand Up @@ -227,6 +250,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
Args:
dispatch: The dispatch to execute.
"""
_logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
await self._send_running_state_change(dispatch)

# The timer is always a tiny bit delayed, so we need to check if the
Expand Down Expand Up @@ -256,7 +280,7 @@ async def _fetch(self) -> None:
for client_dispatch in page:
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = Dispatch(client_dispatch)
self._dispatches[dispatch.id] = dispatch
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
_logger.debug("New dispatch: %s", dispatch)
Expand Down Expand Up @@ -310,7 +334,7 @@ async def _update_dispatch_schedule_and_notify(
self._remove_scheduled(old_dispatch)

was_running = old_dispatch.started
old_dispatch._set_deleted() # pylint: disable=protected-access)
old_dispatch._set_deleted() # pylint: disable=protected-access

# If the dispatch was running, we need to notify
if was_running:
Expand Down
50 changes: 33 additions & 17 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from frequenz.channels import Receiver
from frequenz.client.dispatch import Client

from ._bg_service import DispatchScheduler
from ._bg_service import DispatchScheduler, MergeStrategy
from ._dispatch import Dispatch
from ._event import DispatchEvent

Expand All @@ -16,17 +16,19 @@ class Dispatcher:
"""A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API.
It provides two channels:
It provides two receiver functions:

Lifecycle events:
A channel that sends a dispatch event message whenever a dispatch
is created, updated or deleted.
* [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
Receives an event whenever a dispatch is created, updated or deleted.
* [Running status change
receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]:
Receives an event whenever the running status of a dispatch changes.
The running status of a dispatch can change due to a variety of reasons,
such as but not limited to the dispatch being started, stopped, modified
or deleted or reaching its scheduled start or end time.

Running status change:
Sends a dispatch message whenever a dispatch is ready
to be executed according to the schedule or the running status of the
dispatch changed in a way that could potentially require the consumer to start,
stop or reconfigure itself.
Any change that could potentially require the consumer to start, stop or
reconfigure itself will cause a message to be sent.

Example: Processing running state change dispatches
```python
Expand Down Expand Up @@ -200,7 +202,10 @@ def new_lifecycle_events_receiver(
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)

async def new_running_state_event_receiver(
self, dispatch_type: str, *, unify_running_intervals: bool = True
self,
dispatch_type: str,
*,
merge_strategy: MergeStrategy | None = None,
) -> Receiver[Dispatch]:
"""Return running state event receiver.

Expand Down Expand Up @@ -228,18 +233,29 @@ async def new_running_state_event_receiver(
- The payload changed
- The dispatch was deleted

If `unify_running_intervals` is True, running intervals from multiple
dispatches of the same type are considered as one continuous running
period. In this mode, any stop events are ignored as long as at least
one dispatch remains active.
`merge_strategy` is an instance of a class derived from
[`MergeStrategy`][frequenz.dispatch.MergeStrategy] Available strategies
are:

* [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches
of the same type
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all
dispatches of the same type and target
* `None` — no merging, just send all events (default)

Running intervals from multiple dispatches will be merged, according to
the chosen strategy.

While merging, stop events are ignored as long as at least one
merge-criteria-matching dispatch remains active.

Args:
dispatch_type: The type of the dispatch to listen for.
unify_running_intervals: Whether to unify running intervals.
merge_strategy: The type of the strategy to merge running intervals.

Returns:
A new receiver for dispatches whose running status changed.
"""
return await self._bg_service.new_running_state_event_receiver(
dispatch_type, unify_running_intervals=unify_running_intervals
dispatch_type, merge_strategy=merge_strategy
)
67 changes: 67 additions & 0 deletions src/frequenz/dispatch/_merge_strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Different merge strategies for dispatch running state events."""

import logging
from abc import abstractmethod
from collections.abc import Mapping

from typing_extensions import override

from ._bg_service import MergeStrategy
from ._dispatch import Dispatch


class MergeByIdentity(MergeStrategy):
"""Merge running intervals based on a dispatch configuration."""

@abstractmethod
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""

@override
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
"""Filter dispatches based on the merge strategy.

Keeps start events.
Keeps stop events only if no other dispatches matching the
strategy's criteria are running.
"""
if dispatch.started:
logging.debug("Keeping start event %s", dispatch.id)
return True

other_dispatches_running = any(
existing_dispatch.started
for existing_dispatch in dispatches.values()
if (
self.identity(existing_dispatch) == self.identity(dispatch)
and existing_dispatch.id != dispatch.id
)
)

logging.debug(
"stop event %s because other_dispatches_running=%s",
dispatch.id,
other_dispatches_running,
)
return not other_dispatches_running


class MergeByType(MergeByIdentity):
"""Merge running intervals based on the dispatch type."""

@override
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""
return hash(dispatch.type)


class MergeByTypeTarget(MergeByType):
"""Merge running intervals based on the dispatch type and target."""

@override
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""
return hash((dispatch.type, dispatch.target))
Loading
Loading