Skip to content

Commit

Permalink
Enhancements from the last PR (#114)
Browse files Browse the repository at this point in the history
- **Use shorter variant of the same code**
- **Clean up retry-tasks on shutdown**
  • Loading branch information
Marenz authored Mar 4, 2025
2 parents f316eb2 + 2350b4c commit ecaca06
Showing 1 changed file with 15 additions and 26 deletions.
41 changes: 15 additions & 26 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,25 @@ async def main():
```
"""

class RetryFailedDispatches:
"""Manages the retry of failed dispatches."""
class FailedDispatchesRetrier(BackgroundService):
"""Manages the retring of failed dispatches."""

def __init__(self, retry_interval: timedelta) -> None:
"""Initialize the retry manager.
Args:
retry_interval: The interval between retries.
"""
super().__init__()
self._retry_interval = retry_interval
self._channel = Broadcast[Dispatch](name="retry_channel")
self._sender = self._channel.new_sender()
self._tasks: set[asyncio.Task[None]] = set()

def start(self) -> None:
"""Start the background service.
This is a no-op.
"""

def new_receiver(self) -> Receiver[Dispatch]:
"""Create a new receiver for dispatches to retry.
Expand Down Expand Up @@ -187,7 +193,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
retry_interval: timedelta | None = timedelta(seconds=60),
retry_interval: timedelta = timedelta(seconds=60),
) -> None:
"""Initialize the dispatch handler.
Expand All @@ -197,7 +203,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
retry_interval: The interval between retries. If `None`, retries are disabled.
retry_interval: The interval between retries.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
Expand All @@ -211,11 +217,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
name="dispatch_updates_channel", resend_latest=True
)
self._updates_sender = self._updates_channel.new_sender()
self._retrier = (
ActorDispatcher.RetryFailedDispatches(retry_interval)
if retry_interval
else None
)
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)

def start(self) -> None:
"""Start the background service."""
Expand Down Expand Up @@ -258,12 +260,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
dispatch.type,
exc_info=e,
)
if self._retrier:
self._retrier.retry(dispatch)
else:
_logger.error(
"No retry mechanism enabled, dispatch %r failed", dispatch
)
self._retrier.retry(dispatch)
else:
# No exception occurred, so we can add the actor to the list
self._actors[identity] = actor
Expand All @@ -275,26 +272,18 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
actor: Actor | None = None
identity = self._dispatch_identity(stopping_dispatch)

actor = self._actors.get(identity)

if actor:
if actor := self._actors.pop(identity, None):
await actor.stop(msg)

del self._actors[identity]
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)

async def _run(self) -> None:
"""Run the background service."""
if not self._retrier:
async for dispatch in self._dispatch_rx:
await self._handle_dispatch(dispatch)
else:
async with self._retrier:
retry_recv = self._retrier.new_receiver()

async for selected in select(retry_recv, self._dispatch_rx):
Expand Down

0 comments on commit ecaca06

Please sign in to comment.