From ed61541059072c9e8a4d5bc14ebef9e8f8bd5889 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian <sahas.subramanian@proton.me> Date: Fri, 27 Sep 2024 12:38:12 +0200 Subject: [PATCH 1/5] Improve resiliency of SoC calculation Without this commit, the SoC calculation task would crash and not recover if the upper and lower SoC bounds are the same. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me> --- .../timeseries/battery_pool/_metric_calculator.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py index a5ee0eb05..a4691173c 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py @@ -397,9 +397,17 @@ def calculate( # # Therefore, the variables are named with a `_x100` suffix. usable_capacity_x100 = capacity * (soc_upper_bound - soc_lower_bound) - soc_scaled = ( - (soc - soc_lower_bound) / (soc_upper_bound - soc_lower_bound) * 100.0 - ) + if math.isclose(soc_upper_bound, soc_lower_bound): + if soc < soc_lower_bound: + soc_scaled = 0.0 + else: + soc_scaled = 100.0 + else: + soc_scaled = ( + (soc - soc_lower_bound) + / (soc_upper_bound - soc_lower_bound) + * 100.0 + ) # we are clamping here because the SoC might be out of bounds soc_scaled = min(max(soc_scaled, 0.0), 100.0) timestamp = max(timestamp, metrics.timestamp) From bc358fbf7d1576885bbb52ae4977a493ca79dbe9 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian <sahas.subramanian@proton.me> Date: Fri, 27 Sep 2024 16:01:01 +0200 Subject: [PATCH 2/5] Add a generic `_internal._asyncio.run_forever` function The function takes a callable that returns a coroutine, and keeps running it forever. This commit also replaces the custom `_run_forever` implementations with the generic `run_forever`. 
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me> --- src/frequenz/sdk/_internal/_asyncio.py | 25 ++++++++++++++++++- .../_ev_charger_manager.py | 12 ++------- .../_ev_charger_status_tracker.py | 14 ++--------- .../_pv_inverter_status_tracker.py | 13 ++-------- .../ev_charger_pool/_system_bounds_tracker.py | 17 ++----------- .../pv_pool/_system_bounds_tracker.py | 17 ++----------- 6 files changed, 34 insertions(+), 64 deletions(-) diff --git a/src/frequenz/sdk/_internal/_asyncio.py b/src/frequenz/sdk/_internal/_asyncio.py index 1d82f239f..fc71e56f8 100644 --- a/src/frequenz/sdk/_internal/_asyncio.py +++ b/src/frequenz/sdk/_internal/_asyncio.py @@ -5,8 +5,12 @@ import asyncio +import logging from abc import ABC -from typing import Any +from datetime import timedelta +from typing import Any, Callable, Coroutine + +_logger = logging.getLogger(__name__) async def cancel_and_await(task: asyncio.Task[Any]) -> None: @@ -28,6 +32,25 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None: pass +async def run_forever( + async_callable: Callable[[], Coroutine[Any, Any, None]], + interval: timedelta = timedelta(seconds=1), +) -> None: + """Run a given function forever, restarting it after any exception. + + Args: + async_callable: The async callable to run. + interval: The interval between restarts. 
+ """ + interval_s = interval.total_seconds() + while True: + try: + await async_callable() + except Exception: # pylint: disable=broad-except + _logger.exception("Restarting after exception") + await asyncio.sleep(interval_s) + + class NotSyncConstructible(AssertionError): """Raised when object with async constructor is created in sync way.""" diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 80817016d..62a33c37a 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -24,6 +24,7 @@ ) from typing_extensions import override +from ....._internal._asyncio import run_forever from ....._internal._math import is_close_to_zero from .....timeseries import Power, Sample3Phase, Voltage from .... import _data_pipeline, connection_manager @@ -89,7 +90,7 @@ async def start(self) -> None: """Start the ev charger data manager.""" # Need to start a task only if there are EV chargers in the component graph. 
if self._ev_charger_ids: - self._task = asyncio.create_task(self._run_forever()) + self._task = asyncio.create_task(run_forever(self._run)) @override async def distribute_power(self, request: Request) -> None: @@ -217,15 +218,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]: ) return {component_id: target_power} - async def _run_forever(self) -> None: - """Run the EV charger manager forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception("Recovering from an error in EV charger manager.") - await asyncio.sleep(1.0) - async def _run(self) -> None: # pylint: disable=too-many-locals """Run the main event loop of the EV charger manager.""" api = connection_manager.get().api_client diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py index 5b7e454c8..5285239dd 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py @@ -17,6 +17,7 @@ ) from typing_extensions import override +from ...._internal._asyncio import run_forever from ....actor._background_service import BackgroundService from ... 
import connection_manager from ._blocking_status import BlockingStatus @@ -80,7 +81,7 @@ def __init__( # pylint: disable=too-many-arguments @override def start(self) -> None: """Start the status tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) def _is_working(self, ev_data: EVChargerData) -> bool: """Return whether the given data indicates that the component is working.""" @@ -99,17 +100,6 @@ def _is_stale(self, ev_data: EVChargerData) -> bool: stale = now - ev_data.timestamp > self._max_data_age return stale - async def _run_forever(self) -> None: - """Run the status tracker forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in EVChargerStatusTracker.run()" - ) - await asyncio.sleep(1.0) - def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum: """Handle new EV charger data.""" if self._is_stale(ev_data): diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py index 658926973..f3ad7ebe5 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py @@ -12,6 +12,7 @@ from frequenz.client.microgrid import InverterComponentState, InverterData from typing_extensions import override +from ...._internal._asyncio import run_forever from ....actor._background_service import BackgroundService from ... 
import connection_manager from ._blocking_status import BlockingStatus @@ -76,7 +77,7 @@ def __init__( # pylint: disable=too-many-arguments @override def start(self) -> None: """Start the status tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) def _is_working(self, pv_data: InverterData) -> bool: """Return whether the given data indicates that the PV inverter is working.""" @@ -87,16 +88,6 @@ def _is_working(self, pv_data: InverterData) -> bool: InverterComponentState.STANDBY, ) - async def _run_forever(self) -> None: - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in PVInverterStatusTracker.run()" - ) - await asyncio.sleep(1.0) - def _is_stale(self, pv_data: InverterData) -> bool: """Return whether the given data is stale.""" now = datetime.now(tz=timezone.utc) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py index 12d5799bd..a4e7cb53c 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py @@ -5,20 +5,18 @@ import asyncio -import logging from collections import abc from frequenz.channels import Receiver, Sender, merge, select, selected_from from frequenz.client.microgrid import EVChargerData +from ..._internal._asyncio import run_forever from ...actor import BackgroundService from ...microgrid import connection_manager from ...microgrid._power_distributing._component_status import ComponentPoolStatus from .. import Power from .._base_types import Bounds, SystemBounds -_logger = logging.getLogger(__name__) - class EVCSystemBoundsTracker(BackgroundService): """Track the system bounds for the EV chargers. 
@@ -55,7 +53,7 @@ def __init__( def start(self) -> None: """Start the EV charger system bounds tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) async def _send_bounds(self) -> None: """Calculate and send the aggregate system bounds if they have changed.""" @@ -104,17 +102,6 @@ async def _send_bounds(self) -> None: ) await self._bounds_sender.send(self._last_sent_bounds) - async def _run_forever(self) -> None: - """Run the status tracker forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in EVChargerSystemBoundsTracker.run()" - ) - await asyncio.sleep(1.0) - async def _run(self) -> None: """Run the system bounds tracker.""" api_client = connection_manager.get().api_client diff --git a/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py index 0c6f803ff..8a8a5c7f5 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py @@ -4,20 +4,18 @@ """System bounds tracker for PV inverters.""" import asyncio -import logging from collections import abc from frequenz.channels import Receiver, Sender, merge, select, selected_from from frequenz.client.microgrid import InverterData +from ..._internal._asyncio import run_forever from ...actor import BackgroundService from ...microgrid import connection_manager from ...microgrid._power_distributing._component_status import ComponentPoolStatus from .._base_types import Bounds, SystemBounds from .._quantities import Power -_logger = logging.getLogger(__name__) - class PVSystemBoundsTracker(BackgroundService): """Track the system bounds for PV inverters. 
@@ -54,7 +52,7 @@ def __init__( def start(self) -> None: """Start the PV inverter system bounds tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) async def _send_bounds(self) -> None: """Calculate and send the aggregate system bounds if they have changed.""" @@ -103,17 +101,6 @@ async def _send_bounds(self) -> None: ) await self._bounds_sender.send(self._last_sent_bounds) - async def _run_forever(self) -> None: - """Run the system bounds tracker.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in PVSystemBoundsTracker.run()" - ) - await asyncio.sleep(1.0) - async def _run(self) -> None: """Run the system bounds tracker.""" api_client = connection_manager.get().api_client From f3445125a4a97d59203a962f594605cb372ec4d3 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian <sahas.subramanian@proton.me> Date: Fri, 27 Sep 2024 16:04:04 +0200 Subject: [PATCH 3/5] Update `battery_pool` metric streamers to run forever. This makes sure that when a streaming method for any of the battery pool metrics raises an exception, the method will be restarted. 
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me> --- src/frequenz/sdk/timeseries/battery_pool/_methods.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_methods.py b/src/frequenz/sdk/timeseries/battery_pool/_methods.py index 8eea713d8..9ad44670b 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_methods.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_methods.py @@ -12,7 +12,7 @@ from frequenz.channels import Broadcast, Receiver -from ..._internal._asyncio import cancel_and_await +from ..._internal._asyncio import cancel_and_await, run_forever from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC from ...microgrid._power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, @@ -104,8 +104,10 @@ def __init__( self._update_event = asyncio.Event() self._cached_metrics: dict[int, ComponentMetricsData] = {} - self._update_task = asyncio.create_task(self._update_and_notify()) - self._send_task = asyncio.create_task(self._send_on_update(min_update_interval)) + self._update_task = asyncio.create_task(run_forever(self._update_and_notify)) + self._send_task = asyncio.create_task( + run_forever(lambda: self._send_on_update(min_update_interval)) + ) self._pending_data_fetchers: set[asyncio.Task[ComponentMetricsData | None]] = ( set() ) From 9bad8c94591876890a0af7964942a0d742bed5be Mon Sep 17 00:00:00 2001 From: Sahas Subramanian <sahas.subramanian@proton.me> Date: Fri, 27 Sep 2024 16:49:14 +0200 Subject: [PATCH 4/5] Add exception-recovery for more long-running functions These appear to be the only remaining long-running functions that don't have their own exception handling. 
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me> --- .../sdk/microgrid/_data_sourcing/microgrid_api_source.py | 3 ++- .../sdk/microgrid/_power_managing/_power_managing_actor.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 2b3701397..34486cfcc 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -18,6 +18,7 @@ MeterData, ) +from ..._internal._asyncio import run_forever from ..._internal._channels import ChannelRegistry from ...microgrid import connection_manager from ...timeseries import Sample @@ -460,7 +461,7 @@ async def _update_streams( self.comp_data_tasks[comp_id].cancel() self.comp_data_tasks[comp_id] = asyncio.create_task( - self._handle_data_stream(comp_id, category) + run_forever(lambda: self._handle_data_stream(comp_id, category)) ) async def add_metric(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 8360632d1..ed7f01c45 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -15,6 +15,7 @@ from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType from typing_extensions import override +from ..._internal._asyncio import run_forever from ..._internal._channels import ChannelRegistry from ...actor import Actor from ...timeseries import Power @@ -194,7 +195,7 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None: # Start the bounds tracker, for ongoing updates. 
self._bound_tracker_tasks[component_ids] = asyncio.create_task( - self._bounds_tracker(component_ids, bounds_receiver) + run_forever(lambda: self._bounds_tracker(component_ids, bounds_receiver)) ) def _calculate_shifted_bounds( From cd0c5325a8bea6623a70714aba041f0a01600c97 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian <sahas.subramanian@proton.me> Date: Fri, 27 Sep 2024 17:02:13 +0200 Subject: [PATCH 5/5] Update release notes Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me> --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1b7bfb787..91e1059f1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,4 +16,4 @@ ## Bug Fixes -<!-- Here goes notable bug fixes that are worth a special mention or explanation --> +- Many long-running async tasks, including metric streamers in the BatteryPool, now have automatic recovery in case of exceptions.