Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resiliency of long-running async tasks #1081

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- Many long running async tasks including metric streamers in the BatteryPool now have automatic recovery in case of exceptions.
25 changes: 24 additions & 1 deletion src/frequenz/sdk/_internal/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@


import asyncio
import logging
from abc import ABC
from typing import Any
from datetime import timedelta
from typing import Any, Callable, Coroutine

_logger = logging.getLogger(__name__)


async def cancel_and_await(task: asyncio.Task[Any]) -> None:
Expand All @@ -28,6 +32,25 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None:
pass


async def run_forever(
async_callable: Callable[[], Coroutine[Any, Any, None]],
interval: timedelta = timedelta(seconds=1),
Copy link
Contributor

@llucax llucax Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future it might be nice to be able to pass a retry strategy. A good reason to move the retry module from client-base to core (frequenz-floss/frequenz-core-python#34).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExponentialBackoff is hard to do with generic functions, because it needs to know if a previous attempt succeeded, so that it can reset its backoff interval.

This is easy to do with streams, we just reset it after every incoming message. But for functions, we'd need something like a timer to reset the backoff interval, if the function hasn't failed for a certain interval, then we say it succeeded, for example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe we can have a strategy that actually takes into account for how long the task ran, so if last time it ran for a long time it waits very little to restart and if it failed immediately it waits quite a long time to restart 🤔

) -> None:
"""Run a given function forever, restarting it after any exception.

Args:
async_callable: The async callable to run.
interval: The interval between restarts.
"""
interval_s = interval.total_seconds()
while True:
try:
await async_callable()
except Exception: # pylint: disable=broad-except
_logger.exception("Restarting after exception")
await asyncio.sleep(interval_s)


class NotSyncConstructible(AssertionError):
"""Raised when object with async constructor is created in sync way."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
MeterData,
)

from ..._internal._asyncio import run_forever
from ..._internal._channels import ChannelRegistry
from ...microgrid import connection_manager
from ...timeseries import Sample
Expand Down Expand Up @@ -460,7 +461,7 @@ async def _update_streams(
self.comp_data_tasks[comp_id].cancel()

self.comp_data_tasks[comp_id] = asyncio.create_task(
self._handle_data_stream(comp_id, category)
run_forever(lambda: self._handle_data_stream(comp_id, category))
)

async def add_metric(self, request: ComponentMetricRequest) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from typing_extensions import override

from ....._internal._asyncio import run_forever
from ....._internal._math import is_close_to_zero
from .....timeseries import Power, Sample3Phase, Voltage
from .... import _data_pipeline, connection_manager
Expand Down Expand Up @@ -89,7 +90,7 @@ async def start(self) -> None:
"""Start the ev charger data manager."""
# Need to start a task only if there are EV chargers in the component graph.
if self._ev_charger_ids:
self._task = asyncio.create_task(self._run_forever())
self._task = asyncio.create_task(run_forever(self._run))

@override
async def distribute_power(self, request: Request) -> None:
Expand Down Expand Up @@ -217,15 +218,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]:
)
return {component_id: target_power}

async def _run_forever(self) -> None:
"""Run the EV charger manager forever."""
while True:
try:
await self._run()
except Exception: # pylint: disable=broad-except
_logger.exception("Recovering from an error in EV charger manager.")
await asyncio.sleep(1.0)

async def _run(self) -> None: # pylint: disable=too-many-locals
"""Run the main event loop of the EV charger manager."""
api = connection_manager.get().api_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from typing_extensions import override

from ...._internal._asyncio import run_forever
from ....actor._background_service import BackgroundService
from ... import connection_manager
from ._blocking_status import BlockingStatus
Expand Down Expand Up @@ -80,7 +81,7 @@ def __init__( # pylint: disable=too-many-arguments
@override
def start(self) -> None:
"""Start the status tracker."""
self._tasks.add(asyncio.create_task(self._run_forever()))
self._tasks.add(asyncio.create_task(run_forever(self._run)))

def _is_working(self, ev_data: EVChargerData) -> bool:
"""Return whether the given data indicates that the component is working."""
Expand All @@ -99,17 +100,6 @@ def _is_stale(self, ev_data: EVChargerData) -> bool:
stale = now - ev_data.timestamp > self._max_data_age
return stale

async def _run_forever(self) -> None:
"""Run the status tracker forever."""
while True:
try:
await self._run()
except Exception: # pylint: disable=broad-except
_logger.exception(
"Restarting after exception in EVChargerStatusTracker.run()"
)
await asyncio.sleep(1.0)

def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum:
"""Handle new EV charger data."""
if self._is_stale(ev_data):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from frequenz.client.microgrid import InverterComponentState, InverterData
from typing_extensions import override

from ...._internal._asyncio import run_forever
from ....actor._background_service import BackgroundService
from ... import connection_manager
from ._blocking_status import BlockingStatus
Expand Down Expand Up @@ -76,7 +77,7 @@ def __init__( # pylint: disable=too-many-arguments
@override
def start(self) -> None:
"""Start the status tracker."""
self._tasks.add(asyncio.create_task(self._run_forever()))
self._tasks.add(asyncio.create_task(run_forever(self._run)))

def _is_working(self, pv_data: InverterData) -> bool:
"""Return whether the given data indicates that the PV inverter is working."""
Expand All @@ -87,16 +88,6 @@ def _is_working(self, pv_data: InverterData) -> bool:
InverterComponentState.STANDBY,
)

async def _run_forever(self) -> None:
while True:
try:
await self._run()
except Exception: # pylint: disable=broad-except
_logger.exception(
"Restarting after exception in PVInverterStatusTracker.run()"
)
await asyncio.sleep(1.0)

def _is_stale(self, pv_data: InverterData) -> bool:
"""Return whether the given data is stale."""
now = datetime.now(tz=timezone.utc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
from typing_extensions import override

from ..._internal._asyncio import run_forever
from ..._internal._channels import ChannelRegistry
from ...actor import Actor
from ...timeseries import Power
Expand Down Expand Up @@ -194,7 +195,7 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None:

# Start the bounds tracker, for ongoing updates.
self._bound_tracker_tasks[component_ids] = asyncio.create_task(
self._bounds_tracker(component_ids, bounds_receiver)
run_forever(lambda: self._bounds_tracker(component_ids, bounds_receiver))
)

def _calculate_shifted_bounds(
Expand Down
8 changes: 5 additions & 3 deletions src/frequenz/sdk/timeseries/battery_pool/_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from frequenz.channels import Broadcast, Receiver

from ..._internal._asyncio import cancel_and_await
from ..._internal._asyncio import cancel_and_await, run_forever
from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC
from ...microgrid._power_distributing._component_managers._battery_manager import (
_get_battery_inverter_mappings,
Expand Down Expand Up @@ -104,8 +104,10 @@ def __init__(
self._update_event = asyncio.Event()
self._cached_metrics: dict[int, ComponentMetricsData] = {}

self._update_task = asyncio.create_task(self._update_and_notify())
self._send_task = asyncio.create_task(self._send_on_update(min_update_interval))
self._update_task = asyncio.create_task(run_forever(self._update_and_notify))
self._send_task = asyncio.create_task(
run_forever(lambda: self._send_on_update(min_update_interval))
)
self._pending_data_fetchers: set[asyncio.Task[ComponentMetricsData | None]] = (
set()
)
Expand Down
14 changes: 11 additions & 3 deletions src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,17 @@ def calculate(
#
# Therefore, the variables are named with a `_x100` suffix.
usable_capacity_x100 = capacity * (soc_upper_bound - soc_lower_bound)
soc_scaled = (
(soc - soc_lower_bound) / (soc_upper_bound - soc_lower_bound) * 100.0
)
if math.isclose(soc_upper_bound, soc_lower_bound):
if soc < soc_lower_bound:
soc_scaled = 0.0
else:
soc_scaled = 100.0
else:
soc_scaled = (
(soc - soc_lower_bound)
/ (soc_upper_bound - soc_lower_bound)
* 100.0
)
# we are clamping here because the SoC might be out of bounds
soc_scaled = min(max(soc_scaled, 0.0), 100.0)
timestamp = max(timestamp, metrics.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@


import asyncio
import logging
from collections import abc

from frequenz.channels import Receiver, Sender, merge, select, selected_from
from frequenz.client.microgrid import EVChargerData

from ..._internal._asyncio import run_forever
from ...actor import BackgroundService
from ...microgrid import connection_manager
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
from .. import Power
from .._base_types import Bounds, SystemBounds

_logger = logging.getLogger(__name__)


class EVCSystemBoundsTracker(BackgroundService):
"""Track the system bounds for the EV chargers.
Expand Down Expand Up @@ -55,7 +53,7 @@ def __init__(

def start(self) -> None:
"""Start the EV charger system bounds tracker."""
self._tasks.add(asyncio.create_task(self._run_forever()))
self._tasks.add(asyncio.create_task(run_forever(self._run)))

async def _send_bounds(self) -> None:
"""Calculate and send the aggregate system bounds if they have changed."""
Expand Down Expand Up @@ -104,17 +102,6 @@ async def _send_bounds(self) -> None:
)
await self._bounds_sender.send(self._last_sent_bounds)

async def _run_forever(self) -> None:
"""Run the status tracker forever."""
while True:
try:
await self._run()
except Exception: # pylint: disable=broad-except
_logger.exception(
"Restarting after exception in EVChargerSystemBoundsTracker.run()"
)
await asyncio.sleep(1.0)

async def _run(self) -> None:
"""Run the system bounds tracker."""
api_client = connection_manager.get().api_client
Expand Down
17 changes: 2 additions & 15 deletions src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
"""System bounds tracker for PV inverters."""

import asyncio
import logging
from collections import abc

from frequenz.channels import Receiver, Sender, merge, select, selected_from
from frequenz.client.microgrid import InverterData

from ..._internal._asyncio import run_forever
from ...actor import BackgroundService
from ...microgrid import connection_manager
from ...microgrid._power_distributing._component_status import ComponentPoolStatus
from .._base_types import Bounds, SystemBounds
from .._quantities import Power

_logger = logging.getLogger(__name__)


class PVSystemBoundsTracker(BackgroundService):
"""Track the system bounds for PV inverters.
Expand Down Expand Up @@ -54,7 +52,7 @@ def __init__(

def start(self) -> None:
"""Start the PV inverter system bounds tracker."""
self._tasks.add(asyncio.create_task(self._run_forever()))
self._tasks.add(asyncio.create_task(run_forever(self._run)))

async def _send_bounds(self) -> None:
"""Calculate and send the aggregate system bounds if they have changed."""
Expand Down Expand Up @@ -103,17 +101,6 @@ async def _send_bounds(self) -> None:
)
await self._bounds_sender.send(self._last_sent_bounds)

async def _run_forever(self) -> None:
"""Run the system bounds tracker."""
while True:
try:
await self._run()
except Exception: # pylint: disable=broad-except
_logger.exception(
"Restarting after exception in PVSystemBoundsTracker.run()"
)
await asyncio.sleep(1.0)

async def _run(self) -> None:
"""Run the system bounds tracker."""
api_client = connection_manager.get().api_client
Expand Down
Loading