Skip to content

Commit cfed57d

Browse files
authored
fix: Set a timeout for Actor cleanup (#206)
- closes #200 I could not find the actual reason why the linked run got stuck, but this should make the Actor cleanup more robust as a whole.
1 parent b7e4622 commit cfed57d

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
## [1.7.1](../../releases/tag/v1.7.1) - Unreleased
44

5-
...
5+
### Fixed
6+
7+
- Set a timeout for Actor cleanup
68

79
## [1.7.0](../../releases/tag/v1.7.0) - 2024-03-12
810

src/apify/actor.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import inspect
66
import os
77
import sys
8-
from datetime import datetime, timezone
8+
from datetime import datetime, timedelta, timezone
99
from typing import TYPE_CHECKING, Any, Awaitable, Callable, TypeVar, cast
1010

1111
from apify_client import ApifyClientAsync
@@ -301,6 +301,7 @@ async def exit(
301301
exit_code: int = 0,
302302
event_listeners_timeout_secs: float | None = EVENT_LISTENERS_TIMEOUT_SECS,
303303
status_message: str | None = None,
304+
cleanup_timeout: timedelta = timedelta(seconds=30),
304305
) -> None:
305306
"""Exit the actor instance.
306307
@@ -314,11 +315,13 @@ async def exit(
314315
exit_code (int, optional): The exit code with which the actor should fail (defaults to `0`).
315316
event_listeners_timeout_secs (float, optional): How long should the actor wait for actor event listeners to finish before exiting.
316317
status_message (str, optional): The final status message that the actor should display.
318+
cleanup_timeout (timedelta, optional): How long we should wait for event listeners.
317319
"""
318320
return await cls._get_default_instance().exit(
319321
exit_code=exit_code,
320322
event_listeners_timeout_secs=event_listeners_timeout_secs,
321323
status_message=status_message,
324+
cleanup_timeout=cleanup_timeout,
322325
)
323326

324327
async def _exit_internal(
@@ -327,6 +330,7 @@ async def _exit_internal(
327330
exit_code: int = 0,
328331
event_listeners_timeout_secs: float | None = EVENT_LISTENERS_TIMEOUT_SECS,
329332
status_message: str | None = None,
333+
cleanup_timeout: timedelta = timedelta(seconds=30),
330334
) -> None:
331335
self._raise_if_not_initialized()
332336

@@ -336,21 +340,23 @@ async def _exit_internal(
336340

337341
self.log.info('Exiting actor', extra={'exit_code': exit_code})
338342

339-
await self._cancel_event_emitting_intervals()
343+
async def finalize() -> None:
344+
await self._cancel_event_emitting_intervals()
340345

341-
# Send final persist state event
342-
if not self._was_final_persist_state_emitted:
343-
self._event_manager.emit(ActorEventTypes.PERSIST_STATE, {'isMigrating': False})
344-
self._was_final_persist_state_emitted = True
346+
# Send final persist state event
347+
if not self._was_final_persist_state_emitted:
348+
self._event_manager.emit(ActorEventTypes.PERSIST_STATE, {'isMigrating': False})
349+
self._was_final_persist_state_emitted = True
345350

346-
if status_message is not None:
347-
await self.set_status_message(status_message, is_terminal=True)
351+
if status_message is not None:
352+
await self.set_status_message(status_message, is_terminal=True)
348353

349-
# Sleep for a bit so that the listeners have a chance to trigger
350-
await asyncio.sleep(0.1)
354+
# Sleep for a bit so that the listeners have a chance to trigger
355+
await asyncio.sleep(0.1)
351356

352-
await self._event_manager.close(event_listeners_timeout_secs=event_listeners_timeout_secs)
357+
await self._event_manager.close(event_listeners_timeout_secs=event_listeners_timeout_secs)
353358

359+
await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds())
354360
self._is_initialized = False
355361

356362
if is_running_in_ipython():

0 commit comments

Comments
 (0)