Skip to content

Commit

Permalink
Lift limit on Runner used by ProcessWorker (#17325)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Feb 28, 2025
1 parent cd24d54 commit 27eb408
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"pytz>=2021.1,<2026",
"readchar>=4.0.0,<5.0.0",
"sqlalchemy[asyncio]>=2.0,<3.0.0",
"starlette<0.46.0; python_version == '3.9'",
"typer>=0.12.0,!=0.12.2,<0.16.0",
# Client dependencies
# If you modify this list, make the same modification in client/pyproject.toml
Expand Down
16 changes: 11 additions & 5 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def fast_flow():
)
from prefect.types._datetime import DateTime
from prefect.types.entrypoint import EntrypointType
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import (
asyncnullcontext,
is_async_fn,
Expand Down Expand Up @@ -153,7 +154,7 @@ def __init__(
query_seconds: Optional[float] = None,
prefetch_seconds: float = 10,
heartbeat_seconds: Optional[float] = None,
limit: Optional[int] = None,
limit: int | type[NotSet] | None = NotSet,
pause_on_shutdown: bool = True,
webserver: bool = False,
):
Expand All @@ -169,7 +170,8 @@ def __init__(
heartbeat_seconds: The number of seconds to wait between emitting
flow run heartbeats. The runner will not emit heartbeats if the value is None.
Defaults to `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`.
limit: The maximum number of flow runs this runner should be running at
limit: The maximum number of flow runs this runner should be running at. Provide `None` for no limit.
If not provided, the runner will use the value of `PREFECT_RUNNER_PROCESS_LIMIT`.
pause_on_shutdown: A boolean for whether or not to automatically pause
deployment schedules on shutdown; defaults to `True`
webserver: a boolean flag for whether to start a webserver for this runner
Expand Down Expand Up @@ -210,7 +212,11 @@ def goodbye_flow(name):
self.started: bool = False
self.stopping: bool = False
self.pause_on_shutdown: bool = pause_on_shutdown
self.limit: int | None = limit or settings.runner.process_limit
self.limit: int | None = (
settings.runner.process_limit
if limit is NotSet or isinstance(limit, type)
else limit
)
self.webserver: bool = webserver

self.query_seconds: float = query_seconds or settings.runner.poll_frequency
Expand Down Expand Up @@ -1280,8 +1286,8 @@ def _acquire_limit_slot(self, flow_run_id: UUID) -> bool:
assert self._limiter is not None
self._logger.info(
f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs"
" in progress. You can control this limit by passing a `limit` value"
" to `serve` or adjusting the PREFECT_RUNNER_PROCESS_LIMIT setting."
" in progress. You can control this limit by adjusting the "
"PREFECT_RUNNER_PROCESS_LIMIT setting."
)
return False

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/workers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async def run(
async def __aenter__(self) -> ProcessWorker:
await super().__aenter__()
self._runner = await self._exit_stack.enter_async_context(
Runner(pause_on_shutdown=False)
Runner(pause_on_shutdown=False, limit=None)
)
return self

Expand Down
10 changes: 9 additions & 1 deletion tests/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ async def test_runner_respects_limit_setting(self):
runner = Runner()
assert runner.limit == 100

async def test_runner_limit_can_be_none(self):
runner = Runner(limit=None)
assert runner.limit is None

# Be extra sure that the limiter is not initialized
assert runner._limiter is None
assert runner._acquire_limit_slot("foobar") is True

async def test_runner_respects_poll_setting(self):
runner = Runner()
assert runner.query_seconds == PREFECT_RUNNER_POLL_FREQUENCY.value()
Expand Down Expand Up @@ -939,7 +947,7 @@ async def test_runner_does_not_emit_heartbeats_for_single_flow_run_if_not_set(
async def test_runner_can_execute_a_single_flow_run(
self, prefect_client: PrefectClient, asserting_events_worker: EventsWorker
):
runner = Runner(heartbeat_seconds=30)
runner = Runner(heartbeat_seconds=30, limit=None)

deployment_id = await (await dummy_flow_1.to_deployment(__file__)).apply()

Expand Down

0 comments on commit 27eb408

Please sign in to comment.