diff --git a/cobald_hep_plugins/timer.py b/cobald_hep_plugins/timer.py new file mode 100644 index 0000000..25bd37c --- /dev/null +++ b/cobald_hep_plugins/timer.py @@ -0,0 +1,92 @@ +"""Time-based demand control decorators and helpers.""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, time as datetime_time +from typing import Mapping, Union, Iterable + +from cobald.daemon import service +from cobald.interfaces import Pool, PoolDecorator + +TimeInput = Union[str, datetime_time] +Schedule = Mapping[TimeInput, float] + + +def str_to_time(value: str) -> datetime_time: + """Convert a HH:MM string into a :class:`datetime.time` object.""" + try: + hours_str, minutes_str = value.strip().split(":", 1) + hours, minutes = int(hours_str), int(minutes_str) + except ValueError as exc: + raise ValueError(f"invalid time specification {value}") + return datetime_time(hour=hours, minute=minutes) + + +def latest_timestamp( + times: Iterable[datetime_time], + *, + reference: datetime_time | None = None, # easier to test with +) -> datetime_time: + """Return the latest timestamp with respect to the current time.""" + ordered_times = sorted(times) # ascending + current_time = datetime.now().time() if reference is None else reference + for candidate in reversed(ordered_times): # descending + if candidate <= current_time: + return candidate + # All configured times are in the future relative to ``current_time``. + # return latest timestamp + return ordered_times[-1] + + +@service(flavour=asyncio) +class Timer(PoolDecorator): + """ + Decorator that adjusts demand based on a daily schedule. + + :param target: pool being decorated + :param schedule: mapping of ``HH:MM`` times + to the demand that should become active at that time + :param interval: interval in seconds between schedule checks + """ + + def __init__( + self, + target: Pool, + schedule: Schedule, + interval: int = 300, + ): + super().__init__(target) + + assert interval > 0, "Interval must be a positive integer." + self.interval = interval + + schedule = {str_to_time(key): value for key,value in schedule.items()} + self.schedule = schedule + self.latest_sched_demand = self._refresh_demand() + + @property + def demand(self) -> float: + return self.target.demand + + @demand.setter + def demand(self, value: float) -> None: + # Ignore user supplied demand and always enforce the scheduled value. + self.target.demand = self.latest_sched_demand + + async def run(self) -> None: + """Update the demand periodically according to the schedule.""" + while True: + self._refresh_demand() + await asyncio.sleep(self.interval) + + def _refresh_demand(self) -> float: + """Look up the demand that should currently be active.""" + latest_sched_time = latest_timestamp(self.schedule.keys()) + self.latest_sched_demand = self.schedule[latest_sched_time] + self.target.demand = self.latest_sched_demand + + assert self.latest_sched_demand >= 0.0 + assert self.target.demand >= 0.0 + + return self.latest_sched_demand diff --git a/docs/index.rst b/docs/index.rst index f243cc8..7a55197 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -27,6 +27,7 @@ HEP Plugins for COBalD/TARDIS :caption: Available Plugins :hidden: + plugins/timer plugins/stopper plugins/example diff --git a/docs/plugins/timer.rst b/docs/plugins/timer.rst new file mode 100644 index 0000000..f126a0c --- /dev/null +++ b/docs/plugins/timer.rst @@ -0,0 +1,66 @@ +############# +Timer Plugin +############# + +.. py:module:: cobald_hep_plugins.timer + :synopsis: Time-of-day demand controller + +The :class:`~cobald_hep_plugins.timer.Timer` plugin enforces a recurring, +time-of-day schedule for the demand of a target pool. + +At its core the Timer plugin stores an ordered mapping of ``HH:MM`` to demand values. Whenever the demand is updated, +``Timer`` looks for the latest schedule entry that is not later than the current +time and applies the demand from that entplugin pool. If the current +time is earlier than the first entry, the scheduled value from the previous day +becomes active. User supplied values to :attr:`demand` are ignored so that the +scheduled demand is always in effect. + +Configuration +============= + +The class accepts three arguments: + +``target`` + The pool whose demand should follow the schedule. This is the same target + you would normally use in a COBalD pipeline. + +``schedule`` + A mapping where the key is either ``HH:MM`` and the + value is a floating point demand. The schedule must at least contain one + entry and demand values must be non-negative. + +``interval`` (optional) + Number of seconds between two schedule checks. Defaults to ``300`` seconds. + +Schedule semantics +================== + +Demands remain active until the next configured timestamp is reached. For +example with the fragment below. + +.. code:: yaml + + schedule: + '08:00': 50 # from 08:00 until 12:30 + '12:30': 120 # from 12:30 until 20:15 + '20:15': 10 # from 20:15 until 08:00 next day + + +Example configuration +===================== + +An example snippet for the YAML interface in a COBalD configuration file: + +.. code:: yaml + + - !Timer + target: SomePool + schedule: + '06:00': 20 + '09:00': 80 + '17:30': 10 + interval: 120 + +With the configuration above the decorator wakes up every two minutes, updates +``target`` with the most recent demand of the schedule and thus enforces a +repeatable daily pattern. diff --git a/pyproject.toml b/pyproject.toml index 085edff..c928796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ Source = "https://github.com/MatterMiners/cobald-hep-plugins" # v name used in YAML v v --- package/module --- v v class/function CobaldHepProjectExample = "cobald_hep_plugins.example:DemandScale" Stopper = "cobald_hep_plugins.stopper:Stopper" +Timer = "cobald_hep_plugins.timer:Timer" [tool.flit.module] name = "cobald_hep_plugins" diff --git a/tests/test_timer.py b/tests/test_timer.py new file mode 100644 index 0000000..042a509 --- /dev/null +++ b/tests/test_timer.py @@ -0,0 +1,80 @@ +from cobald_hep_plugins.timer import Timer, latest_timestamp, str_to_time +import pytest +import datetime +from datetime import time as datetime_time +from .utility import MockPool + +SCHEDULE = { + "18:03": 30.0, + "06:59": 10.0, + "00:30": 0.0, + "12:38": 20.0, +} +SCHEDULE_DATETIME = { + datetime_time(18, 3): 30.0, + datetime_time(6, 59): 10.0, + datetime_time(0, 30): 0.0, + datetime_time(12, 38): 20.0, +} + + +def test_str_to_time(): + for str_time, expected_time in zip(SCHEDULE.keys(), SCHEDULE_DATETIME.keys()): + assert str_to_time(str_time) == expected_time + with pytest.raises(ValueError): + str_to_time("invalid") + with pytest.raises(ValueError): + str_to_time("25:00") + with pytest.raises(ValueError): + str_to_time("12:60") + +def test_schedule(): + timer = Timer(target=MockPool, schedule=SCHEDULE) + expected = { + datetime_time(18, 3): 30.0, + datetime_time(6, 59): 10.0, + datetime_time(0, 30): 0.0, + datetime_time(12, 38): 20.0, + } + assert timer.schedule == expected + + +@pytest.mark.parametrize( + "reference, expected_latest", + [ + (datetime_time(18, 2), datetime_time(12, 38)), + (datetime_time(18, 3), datetime_time(18, 3)), + (datetime_time(18, 4), datetime_time(18, 3)), + (datetime_time(0, 0), datetime_time(18, 3)), + (datetime_time(23, 59), datetime_time(18, 3)), + (datetime_time(7, 0), datetime_time(6, 59)), + (datetime_time(12, 1), datetime_time(6, 59)), + ], +) +def test_latest_timestamp(reference, expected_latest): + latest = latest_timestamp(times=SCHEDULE_DATETIME.keys(), reference=reference) + assert latest == expected_latest + +def test_demand(): + timer = Timer(target=MockPool, schedule=SCHEDULE) + for demand in [ + 15.0, + 25.0, + 5.0, + 0.0, + 30.0, + ]: + + # set demand directly + timer.latest_sched_demand = demand + # ignore demand setter + timer.demand = demand + 2. + assert timer.demand == demand + +def test_non_negative_demand(): + schedule = { + "10:00": -5.0, + } + with pytest.raises(AssertionError): + timer = Timer(target=MockPool, schedule=schedule) +