Skip to content

Commit

Permalink
Make ready_to_execute channel more general (#22)
Browse files Browse the repository at this point in the history
* Rename `ready_to_execute` channel to `running_status_change`
reflecting it's broader purpose.
* Small re-designs and changes about the interfaces in general
* Add own `Dispatch` type with useful functions on top of the original
base class
  • Loading branch information
Marenz authored May 15, 2024
2 parents 64dea91 + addb81d commit 69a72ec
Show file tree
Hide file tree
Showing 8 changed files with 604 additions and 184 deletions.
81 changes: 52 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,61 @@
## Introduction

A highlevel interface for the dispatch API.
The interface is made of the dispatch actor which should run once per microgrid.
It provides two channels for clients:
- "new_dispatches" for newly created dispatches
- "ready_dispatches" for dispatches that are ready to be executed

## Example Usage
See [the documentation](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch) for more information.

## Usage

The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher), the main entry point for the API, provides two channels:

* [Lifecycle events](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.lifecycle_events): A channel that sends a message whenever a [Dispatch][frequenz.dispatch.Dispatch] is created, updated or deleted.
* [Running status change](https://frequenz-floss.github.io/frequenz-dispatch-python/v0.1/reference/frequenz/dispatch/#frequenz.dispatch.Dispatcher.running_status_change): Sends a dispatch message whenever a dispatch is ready to be executed according to the schedule or the running status of the dispatch changed in a way that could potentially require the actor to start, stop or reconfigure itself.

### Example using the running status change channel

```python
async def run():
# dispatch helper sends out dispatches when they are due
dispatch_arrived = dispatch_helper.updated_dispatches().new_receiver()
dispatch_ready = dispatch_helper.ready_dispatches().new_receiver()

async for selected in select(dispatch_ready, dispatch_arrived):
if selected_from(selected, dispatch_ready):
dispatch = selected.value
match dispatch.type:
case DISPATCH_TYPE_BATTERY_CHARGE:
battery_pool = microgrid.battery_pool(dispatch.battery_set, task_id)
battery_pool.set_charge(dispatch.power)
...
if selected_from(selected, dispatch_arrived):
match selected.value:
case Created(dispatch):
log.info("New dispatch arrived %s", dispatch)
...
case Updated(dispatch):
log.info("Dispatch updated %s", dispatch)
...
case Deleted(dispatch):
log.info("Dispatch deleted %s", dispatch)
...
import os
import grpc.aio
from unittest.mock import MagicMock

async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")

service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
await dispatcher.start()

actor = MagicMock() # replace with your actor

changed_running_status_rx = dispatcher.running_status_change.new_receiver()

async for dispatch in changed_running_status_rx:
match dispatch.running("DEMO_TYPE"):
case RunningState.RUNNING:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
if actor.is_running:
actor.reconfigure(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
) # this will reconfigure the actor
else:
# this will start a new actor with the given components
# and run it for the duration of the dispatch
actor.start(
components=dispatch.selector,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
)
case RunningState.STOPPED:
actor.stop() # this will stop the actor
case RunningState.DIFFERENT_TYPE:
pass # dispatch not for this type
```

## Supported Platforms
Expand Down
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
* `Dispatcher.ready_to_execute()` was renamed to `Dispatcher.running_status_change()`

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Introduced new class `Dispatch` (based on the client class) that contains useful functions and extended information about the received dispatch.
* `Dispatcher.client` was added to provide an easy access to the client for updating, deleting and creating dispatches

## Bug Fixes

Expand Down
19 changes: 16 additions & 3 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A highlevel interface for the dispatch API."""
"""A highlevel interface for the dispatch API.
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
A small overview of the most important classes in this module:
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
"""

from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated

__all__ = [
"Created",
Expand All @@ -13,4 +24,6 @@
"Dispatcher",
"ReceiverFetcher",
"Updated",
"Dispatch",
"RunningState",
]
251 changes: 251 additions & 0 deletions src/frequenz/dispatch/_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Dispatch type with support for next_run calculation."""


import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Iterator, cast

from dateutil import rrule
from frequenz.client.dispatch.types import Dispatch as BaseDispatch
from frequenz.client.dispatch.types import Frequency, Weekday

_logger = logging.getLogger(__name__)
"""The logger for this module."""

_RRULE_FREQ_MAP = {
Frequency.MINUTELY: rrule.MINUTELY,
Frequency.HOURLY: rrule.HOURLY,
Frequency.DAILY: rrule.DAILY,
Frequency.WEEKLY: rrule.WEEKLY,
Frequency.MONTHLY: rrule.MONTHLY,
}
"""To map from our Frequency enum to the dateutil library enum."""

_RRULE_WEEKDAY_MAP = {
Weekday.MONDAY: rrule.MO,
Weekday.TUESDAY: rrule.TU,
Weekday.WEDNESDAY: rrule.WE,
Weekday.THURSDAY: rrule.TH,
Weekday.FRIDAY: rrule.FR,
Weekday.SATURDAY: rrule.SA,
Weekday.SUNDAY: rrule.SU,
}
"""To map from our Weekday enum to the dateutil library enum."""


class RunningState(Enum):
"""The running state of a dispatch."""

RUNNING = "RUNNING"
"""The dispatch is running."""

STOPPED = "STOPPED"
"""The dispatch is stopped."""

DIFFERENT_TYPE = "DIFFERENT_TYPE"
"""The dispatch is for a different type."""


@dataclass(frozen=True)
class Dispatch(BaseDispatch):
"""Dispatch type with extra functionality."""

deleted: bool = False
"""Whether the dispatch is deleted."""

running_state_change_synced: datetime | None = None
"""The last time a message was sent about the running state change."""

def __init__(
self,
client_dispatch: BaseDispatch,
deleted: bool = False,
running_state_change_synced: datetime | None = None,
):
"""Initialize the dispatch.
Args:
client_dispatch: The client dispatch.
deleted: Whether the dispatch is deleted.
running_state_change_synced: Timestamp of the last running state change message.
"""
super().__init__(**client_dispatch.__dict__)
# Work around frozen to set deleted
object.__setattr__(self, "deleted", deleted)
object.__setattr__(
self,
"running_state_change_synced",
running_state_change_synced,
)

def _set_deleted(self) -> None:
"""Mark the dispatch as deleted."""
object.__setattr__(self, "deleted", True)

@property
def _running_status_notified(self) -> bool:
"""Check that the latest running state change notification was sent.
Returns:
True if the latest running state change notification was sent, False otherwise.
"""
return self.running_state_change_synced == self.update_time

def _set_running_status_notified(self) -> None:
"""Mark the latest running state change notification as sent."""
object.__setattr__(self, "running_state_change_synced", self.update_time)

def running(self, type_: str) -> RunningState:
"""Check if the dispatch is currently supposed to be running.
Args:
type_: The type of the dispatch that should be running.
Returns:
RUNNING if the dispatch is running,
STOPPED if it is stopped,
DIFFERENT_TYPE if it is for a different type.
"""
if self.type != type_:
return RunningState.DIFFERENT_TYPE

if not self.active or self.deleted:
return RunningState.STOPPED

now = datetime.now(tz=timezone.utc)
if until := self._until(now):
return RunningState.RUNNING if now < until else RunningState.STOPPED

return RunningState.STOPPED

@property
def until(self) -> datetime | None:
"""Time when the dispatch should end.
Returns the time that a running dispatch should end.
If the dispatch is not running, None is returned.
Returns:
The time when the dispatch should end or None if the dispatch is not running.
"""
if not self.active or self.deleted:
return None

now = datetime.now(tz=timezone.utc)
return self._until(now)

@property
# noqa is needed because of a bug in pydoclint that makes it think a `return` without a return
# value needs documenting
def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405
"""Yield all missed runs of a dispatch.
Yields all missed runs of a dispatch.
If a running state change notification was not sent in time
due to connection issues, this method will yield all missed runs
since the last sent notification.
Returns:
A generator that yields all missed runs of a dispatch.
"""
if self.update_time == self.running_state_change_synced:
return

from_time = self.update_time
now = datetime.now(tz=timezone.utc)

while (next_run := self.next_run_after(from_time)) and next_run < now:
yield next_run
from_time = next_run

@property
def next_run(self) -> datetime | None:
"""Calculate the next run of a dispatch.
Returns:
The next run of the dispatch or None if the dispatch is finished.
"""
return self.next_run_after(datetime.now(tz=timezone.utc))

def next_run_after(self, after: datetime) -> datetime | None:
"""Calculate the next run of a dispatch.
Args:
after: The time to calculate the next run from.
Returns:
The next run of the dispatch or None if the dispatch is finished.
"""
if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
):
if after > self.start_time:
return None
return self.start_time

# Make sure no weekday is UNSPECIFIED
if Weekday.UNSPECIFIED in self.recurrence.byweekdays:
_logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id)
return None

# No type information for rrule, so we need to cast
return cast(datetime | None, self._prepare_rrule().after(after, inc=True))

def _prepare_rrule(self) -> rrule.rrule:
"""Prepare the rrule object.
Returns:
The rrule object.
"""
count, until = (None, None)
if end := self.recurrence.end_criteria:
count = end.count
until = end.until

rrule_obj = rrule.rrule(
freq=_RRULE_FREQ_MAP[self.recurrence.frequency],
dtstart=self.start_time,
count=count,
until=until,
byminute=self.recurrence.byminutes,
byhour=self.recurrence.byhours,
byweekday=[
_RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays
],
bymonthday=self.recurrence.bymonthdays,
bymonth=self.recurrence.bymonths,
interval=self.recurrence.interval,
)

return rrule_obj

def _until(self, now: datetime) -> datetime | None:
"""Calculate the time when the dispatch should end.
If no previous run is found, None is returned.
Args:
now: The current time.
Returns:
The time when the dispatch should end or None if the dispatch is not running.
"""
if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
):
return self.start_time + self.duration

latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True)

if not latest_past_start:
return None

return latest_past_start + self.duration
Loading

0 comments on commit 69a72ec

Please sign in to comment.