Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions cobald_hep_plugins/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Time-based demand control decorators and helpers."""

from __future__ import annotations

import asyncio
from datetime import datetime, time as datetime_time
from typing import Mapping, Union, Iterable

from cobald.daemon import service
from cobald.interfaces import Pool, PoolDecorator

TimeInput = Union[str, datetime_time]
Schedule = Mapping[TimeInput, float]


def str_to_time(value: str) -> datetime_time:
"""Convert a HH:MM string into a :class:`datetime.time` object."""
try:
hours_str, minutes_str = value.strip().split(":", 1)
hours, minutes = int(hours_str), int(minutes_str)
except ValueError as exc:
raise ValueError(f"invalid time specification {value}")
return datetime_time(hour=hours, minute=minutes)


def latest_timestamp(
times: Iterable[datetime_time],
*,
reference: datetime_time | None = None, # easier to test with
) -> datetime_time:
"""Return the latest timestamp with respect to the current time."""
ordered_times = sorted(times) # ascending
current_time = datetime.now().time() if reference is None else reference
for candidate in reversed(ordered_times): # descending
if candidate <= current_time:
return candidate
# All configured times are in the future relative to ``current_time``.
# return latest timestamp
return ordered_times[-1]


@service(flavour=asyncio)
class Timer(PoolDecorator):
"""
Decorator that adjusts demand based on a daily schedule.

:param target: pool being decorated
:param schedule: mapping of ``HH:MM`` times
to the demand that should become active at that time
:param interval: interval in seconds between schedule checks
"""

def __init__(
self,
target: Pool,
schedule: Schedule,
interval: int = 300,
):
super().__init__(target)

assert interval > 0, "Interval must be a positive integer."
self.interval = interval

schedule = {str_to_time(key): value for key,value in schedule.items()}
self.schedule = schedule
self.latest_sched_demand = self._refresh_demand()

@property
def demand(self) -> float:
return self.target.demand

@demand.setter
def demand(self, value: float) -> None:
# Ignore user supplied demand and always enforce the scheduled value.
self.target.demand = self.latest_sched_demand

async def run(self) -> None:
"""Update the demand periodically according to the schedule."""
while True:
self._refresh_demand()
await asyncio.sleep(self.interval)

def _refresh_demand(self) -> float:
"""Look up the demand that should currently be active."""
latest_sched_time = latest_timestamp(self.schedule.keys())
self.latest_sched_demand = self.schedule[latest_sched_time]
self.target.demand = self.latest_sched_demand

assert self.latest_sched_demand >= 0.0
assert self.target.demand >= 0.0

return self.latest_sched_demand
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ HEP Plugins for COBalD/TARDIS
:caption: Available Plugins
:hidden:

plugins/timer
plugins/stopper
plugins/example

Expand Down
66 changes: 66 additions & 0 deletions docs/plugins/timer.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#############
Timer Plugin
#############

.. py:module:: cobald_hep_plugins.timer
:synopsis: Time-of-day demand controller

The :class:`~cobald_hep_plugins.timer.Timer` plugin enforces a recurring,
time-of-day schedule for the demand of a target pool.

At its core the Timer plugin stores an ordered mapping of ``HH:MM`` to demand values. Whenever the demand is updated,
``Timer`` looks for the latest schedule entry that is not later than the current
time and applies the demand from that entplugin pool. If the current
time is earlier than the first entry, the scheduled value from the previous day
becomes active. User supplied values to :attr:`demand` are ignored so that the
scheduled demand is always in effect.

Configuration
=============

The class accepts three arguments:

``target``
The pool whose demand should follow the schedule. This is the same target
you would normally use in a COBalD pipeline.

``schedule``
A mapping where the key is either ``HH:MM`` and the
value is a floating point demand. The schedule must at least contain one
entry and demand values must be non-negative.

``interval`` (optional)
Number of seconds between two schedule checks. Defaults to ``300`` seconds.

Schedule semantics
==================

Demands remain active until the next configured timestamp is reached. For
example with the fragment below.

.. code:: yaml

schedule:
'08:00': 50 # from 08:00 until 12:30
'12:30': 120 # from 12:30 until 20:15
'20:15': 10 # from 20:15 until 08:00 next day


Example configuration
=====================

An example snippet for the YAML interface in a COBalD configuration file:

.. code:: yaml

- !Timer
target: SomePool
schedule:
'06:00': 20
'09:00': 80
'17:30': 10
interval: 120

With the configuration above the decorator wakes up every two minutes, updates
``target`` with the most recent demand of the schedule and thus enforces a
repeatable daily pattern.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Source = "https://github.com/MatterMiners/cobald-hep-plugins"
# v name used in YAML v v --- package/module --- v v class/function
CobaldHepProjectExample = "cobald_hep_plugins.example:DemandScale"
Stopper = "cobald_hep_plugins.stopper:Stopper"
Timer = "cobald_hep_plugins.timer:Timer"

[tool.flit.module]
name = "cobald_hep_plugins"
80 changes: 80 additions & 0 deletions tests/test_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from cobald_hep_plugins.timer import Timer, latest_timestamp, str_to_time
import pytest
import datetime
from datetime import time as datetime_time
from .utility import MockPool

SCHEDULE = {
"18:03": 30.0,
"06:59": 10.0,
"00:30": 0.0,
"12:38": 20.0,
}
SCHEDULE_DATETIME = {
datetime_time(18, 3): 30.0,
datetime_time(6, 59): 10.0,
datetime_time(0, 30): 0.0,
datetime_time(12, 38): 20.0,
}


def test_str_to_time():
for str_time, expected_time in zip(SCHEDULE.keys(), SCHEDULE_DATETIME.keys()):
assert str_to_time(str_time) == expected_time
with pytest.raises(ValueError):
str_to_time("invalid")
with pytest.raises(ValueError):
str_to_time("25:00")
with pytest.raises(ValueError):
str_to_time("12:60")

def test_schedule():
timer = Timer(target=MockPool, schedule=SCHEDULE)
expected = {
datetime_time(18, 3): 30.0,
datetime_time(6, 59): 10.0,
datetime_time(0, 30): 0.0,
datetime_time(12, 38): 20.0,
}
assert timer.schedule == expected


@pytest.mark.parametrize(
"reference, expected_latest",
[
(datetime_time(18, 2), datetime_time(12, 38)),
(datetime_time(18, 3), datetime_time(18, 3)),
(datetime_time(18, 4), datetime_time(18, 3)),
(datetime_time(0, 0), datetime_time(18, 3)),
(datetime_time(23, 59), datetime_time(18, 3)),
(datetime_time(7, 0), datetime_time(6, 59)),
(datetime_time(12, 1), datetime_time(6, 59)),
],
)
def test_latest_timestamp(reference, expected_latest):
latest = latest_timestamp(times=SCHEDULE_DATETIME.keys(), reference=reference)
assert latest == expected_latest

def test_demand():
timer = Timer(target=MockPool, schedule=SCHEDULE)
for demand in [
15.0,
25.0,
5.0,
0.0,
30.0,
]:

# set demand directly
timer.latest_sched_demand = demand
# ignore demand setter
timer.demand = demand + 2.
assert timer.demand == demand

def test_non_negative_demand():
schedule = {
"10:00": -5.0,
}
with pytest.raises(AssertionError):
timer = Timer(target=MockPool, schedule=schedule)