diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 8bb76a8..6ce9594 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -133,8 +133,8 @@ async def main(): ``` """ - class RetryFailedDispatches: - """Manages the retry of failed dispatches.""" + class FailedDispatchesRetrier(BackgroundService): + """Manages the retring of failed dispatches.""" def __init__(self, retry_interval: timedelta) -> None: """Initialize the retry manager. @@ -142,10 +142,16 @@ def __init__(self, retry_interval: timedelta) -> None: Args: retry_interval: The interval between retries. """ + super().__init__() self._retry_interval = retry_interval self._channel = Broadcast[Dispatch](name="retry_channel") self._sender = self._channel.new_sender() - self._tasks: set[asyncio.Task[None]] = set() + + def start(self) -> None: + """Start the background service. + + This is a no-op. + """ def new_receiver(self) -> Receiver[Dispatch]: """Create a new receiver for dispatches to retry. @@ -187,7 +193,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen ], running_status_receiver: Receiver[Dispatch], dispatch_identity: Callable[[Dispatch], int] | None = None, - retry_interval: timedelta | None = timedelta(seconds=60), + retry_interval: timedelta = timedelta(seconds=60), ) -> None: """Initialize the dispatch handler. @@ -197,7 +203,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen running_status_receiver: The receiver for dispatch running status changes. dispatch_identity: A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID. - retry_interval: The interval between retries. If `None`, retries are disabled. + retry_interval: The interval between retries. """ super().__init__() self._dispatch_identity: Callable[[Dispatch], int] = ( @@ -211,11 +217,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen name="dispatch_updates_channel", resend_latest=True ) self._updates_sender = self._updates_channel.new_sender() - self._retrier = ( - ActorDispatcher.RetryFailedDispatches(retry_interval) - if retry_interval - else None - ) + self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval) def start(self) -> None: """Start the background service.""" @@ -258,12 +260,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: dispatch.type, exc_info=e, ) - if self._retrier: - self._retrier.retry(dispatch) - else: - _logger.error( - "No retry mechanism enabled, dispatch %r failed", dispatch - ) + self._retrier.retry(dispatch) else: # No exception occurred, so we can add the actor to the list self._actors[identity] = actor @@ -275,15 +272,10 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - actor: Actor | None = None identity = self._dispatch_identity(stopping_dispatch) - actor = self._actors.get(identity) - - if actor: + if actor := self._actors.pop(identity, None): await actor.stop(msg) - - del self._actors[identity] else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type @@ -291,10 +283,7 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: async def _run(self) -> None: """Run the background service.""" - if not self._retrier: - async for dispatch in self._dispatch_rx: - await self._handle_dispatch(dispatch) - else: + async with self._retrier: retry_recv = self._retrier.new_receiver() async for selected in select(retry_recv, self._dispatch_rx):