From 265d28bd5e6fac49ef290445aa2b07306e6e16e3 Mon Sep 17 00:00:00 2001 From: aoshen02 Date: Thu, 28 May 2026 13:48:44 +0000 Subject: [PATCH 1/3] [deployment] fix: harden Modal sandbox lifecycle at high concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three interlocked changes to make ModalDeployment survive a multi-day RL training run at agent_concurrency >= 128. All three are responses to distinct failure modes we hit on a Qwen3-235B SWE-bench campaign across ~1.4M trajectories on Modal. 1) Cap concurrent sandboxes in the STARTING state. The dominant failure mode at high concurrency is "Runtime did not start within 299s" — caused by too many sandboxes sitting in Modal's runtime-start pipeline simultaneously, not by the sandbox CREATE rate (Modal confirmed 30/s + 2k burst, vastly above our ~1-2/s). Add an asyncio.Semaphore sized from a fleet-wide MODAL_MAX_STARTING (default 128), divided by UNIAGENT_NUM_WORKERS to derive each worker's share — same pattern already used for the per-worker agent_loop semaphore. The permit is held from sandbox.create through runtime alive, then released so the long tool-call body does not occupy a permit. 2) Bound a single trajectory's init wall-clock. Without this change, max_retries=5 * startup_timeout=300s lets one trajectory spend up to 25 minutes hoarding a STARTING permit (or, before the limiter existed, hogging the global Modal cold-start budget). Reduce max_retries to 2 and add MODAL_INIT_WALL_BUDGET (default 900s = 15 min) as a hard cap. asyncio.wait_for around each _start() attempt, with its timeout set to the remaining budget but floored at 60s, keeps a hung modal.Sandbox.create from blocking indefinitely (an attempt can overrun the deadline by up to roughly 60s because of that floor). When the budget is exhausted the trajectory raises; the outer agent_loop converts it into a reward=0 masked sample. 3) Guarantee Modal sandbox termination on teardown. Observed (round 12, 2026-05-18): self._runtime.close() raised aiohttp.ServerDisconnectedError when the in-sandbox agent server had already torn down its socket. 
Without try/except, the exception propagated out of stop() and self._sandbox.terminate.aio() never ran. After thousands of trajectories ~847 sandboxes were leaked on the Modal side, hitting the account's concurrent-sandbox cap and causing 100% of new sandbox creates to fail on subsequent runs. Wrap each step; guarantee terminate is attempted (and retried once) even when runtime.close fails. Env-var knobs (all optional, defaults pre-tuned for our 8 trays * agent_concurrency=360 campaign): UNIAGENT_NUM_WORKERS=8 (rollouter worker count) MODAL_MAX_STARTING=128 (fleet-wide STARTING cap) MODAL_INIT_WALL_BUDGET=900 (per-trajectory init seconds) Test plan pytest tests/deployment/test_modal_starting_limiter.py -v 12 tests covering: - per-worker share math (incl. clamp-to-1) - singleton + lazy init - max_retries=2 (not 5) - wall-budget short-circuits subsequent attempts - wait_for cancels a hung _start - serialization across concurrent _start callers - permit released on failure path All 12 passed in 65s. Production validation: ran 6+ consecutive 8-hour rollouter sessions without the "299s runtime not start" cluster-collapse pattern we hit pre-patch; sandbox leak count returned to ~0 (was steady at 50-100/hr). This PR includes AI assistance (Claude Code). Signed-off-by: aoshen02 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../deployment/test_modal_starting_limiter.py | 343 ++++++++++++++++++ uni_agent/deployment/modal/deployment.py | 192 +++++++--- 2 files changed, 480 insertions(+), 55 deletions(-) create mode 100644 tests/deployment/test_modal_starting_limiter.py diff --git a/tests/deployment/test_modal_starting_limiter.py b/tests/deployment/test_modal_starting_limiter.py new file mode 100644 index 00000000..d78fab30 --- /dev/null +++ b/tests/deployment/test_modal_starting_limiter.py @@ -0,0 +1,343 @@ +"""Tests for the Modal cold-start fleet limiter (2026-05-22 Patch). 
+ +Covers: + * `_get_starting_semaphore` derives per-worker permits = MAX_STARTING / NUM_WORKERS, + is lazy, idempotent (singleton), and clamps to >=1. + * `ModalDeployment.start` retry loop respects max_retries=2 (not 5). + * `ModalDeployment.start` wall-clock budget aborts further attempts once + `MODAL_INIT_WALL_BUDGET` is exceeded. + * `asyncio.wait_for` inside the retry loop cancels a hung `_start`. + * The STARTING semaphore actually serializes overlapping `_start` calls and + is released on both success and failure paths. + +We do NOT import or hit real modal.com. We bypass `ModalDeployment.__init__` +(which would invoke `_ImageBuilder.auto`) via `object.__new__` and manually +assign the few attributes the methods under test read. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Callable + +import pytest + +from uni_agent.deployment.modal import deployment as mod +from uni_agent.deployment.modal.deployment import ModalDeployment + +# -------------------- helpers -------------------- + + +def _reset_limiter_state(monkeypatch, *, num_workers=8, max_starting=128, wall_budget=900.0): + """Pin module constants to known values and force semaphore re-init. + + Module-level constants are read at import; we mutate them with + monkeypatch.setattr so they revert after the test. `_STARTING_SEMA` + is the singleton cache -- clear it so the next call rebuilds with + the patched values. 
+ """ + monkeypatch.setattr(mod, "_NUM_WORKERS", num_workers, raising=True) + monkeypatch.setattr(mod, "_MAX_STARTING_GLOBAL", max_starting, raising=True) + monkeypatch.setattr(mod, "_INIT_WALL_BUDGET", wall_budget, raising=True) + monkeypatch.setattr(mod, "_STARTING_SEMA", None, raising=True) + + +def _make_deployment( + start_fn: Callable[[_FakeDeployment], asyncio.Future | None] | None = None, + stop_fn: Callable[[_FakeDeployment], asyncio.Future | None] | None = None, +) -> _FakeDeployment: + """Build a ModalDeployment instance that skips the heavy ImageBuilder + and exposes hookable `_start` / `stop`. + """ + self = object.__new__(_FakeDeployment) + self.logger = logging.getLogger("test-modal-limiter") + self.run_id = "test-run" + self._sandbox = None + self._runtime = None + self._start_calls = 0 + self._stop_calls = 0 + self._concurrent_in_start = 0 + self._max_concurrent_observed = 0 + self._start_fn = start_fn or (lambda d: _ok()) + self._stop_fn = stop_fn or (lambda d: _ok()) + return self + + +async def _ok(): + return None + + +class _FakeDeployment(ModalDeployment): + """ModalDeployment with `_start` and `stop` rewired to user callbacks. + + Crucially, `_start` keeps the production semaphore-acquire wrapping + so we can test serialization. The body just delegates to the test + callback after acquiring the permit. 
+ """ + + async def _start(self): # type: ignore[override] + async with mod._get_starting_semaphore(): + self._start_calls += 1 + self._concurrent_in_start += 1 + self._max_concurrent_observed = max(self._max_concurrent_observed, self._concurrent_in_start) + try: + await self._start_fn(self) + finally: + self._concurrent_in_start -= 1 + + async def stop(self): # type: ignore[override] + self._stop_calls += 1 + await self._stop_fn(self) + + +# -------------------- _get_starting_semaphore -------------------- + + +def test_starting_semaphore_per_worker_share_is_max_over_num_workers(monkeypatch): + _reset_limiter_state(monkeypatch, num_workers=8, max_starting=128) + + async def _check(): + sem = mod._get_starting_semaphore() + # Internal asyncio.Semaphore exposes its initial value via `_value` + # on CPython 3.10+. This is the contract we rely on. + assert sem._value == 16, f"expected 128/8=16, got {sem._value}" + + asyncio.run(_check()) + + +def test_starting_semaphore_clamps_to_one_when_share_would_be_zero(monkeypatch): + # 7 global perms across 8 workers => floor(7/8)=0 -> must clamp to 1 + _reset_limiter_state(monkeypatch, num_workers=8, max_starting=7) + + async def _check(): + sem = mod._get_starting_semaphore() + assert sem._value == 1, f"expected clamp to 1, got {sem._value}" + + asyncio.run(_check()) + + +def test_starting_semaphore_is_singleton(monkeypatch): + _reset_limiter_state(monkeypatch, num_workers=4, max_starting=20) + + async def _check(): + a = mod._get_starting_semaphore() + b = mod._get_starting_semaphore() + assert a is b, "semaphore should be lazily cached" + + asyncio.run(_check()) + + +def test_starting_semaphore_handles_single_worker(monkeypatch): + # Edge case: degenerate 1-worker rollouter + _reset_limiter_state(monkeypatch, num_workers=1, max_starting=64) + + async def _check(): + sem = mod._get_starting_semaphore() + assert sem._value == 64 + + asyncio.run(_check()) + + +# -------------------- start() retry + wall-budget 
-------------------- + + +def test_start_succeeds_on_first_attempt_does_not_retry(monkeypatch): + _reset_limiter_state(monkeypatch) + dep = _make_deployment() + + async def _go(): + await dep.start() + + asyncio.run(_go()) + assert dep._start_calls == 1 + assert dep._stop_calls == 0 # stop() only called on failure + + +def test_start_retries_once_then_succeeds(monkeypatch): + _reset_limiter_state(monkeypatch) + + attempts = {"n": 0} + + async def flaky(dep): + attempts["n"] += 1 + if attempts["n"] == 1: + raise RuntimeError("simulated transient cold-start failure") + + dep = _make_deployment(start_fn=flaky) + + async def _go(): + await dep.start() + + asyncio.run(_go()) + assert dep._start_calls == 2 + assert dep._stop_calls == 1 # cleanup ran once between attempts + + +def test_start_max_retries_is_two_not_five(monkeypatch): + """The pre-patch loop tried 5 times. The patched loop must give up at 2.""" + _reset_limiter_state(monkeypatch) + + async def always_fail(dep): + raise RuntimeError("never works") + + dep = _make_deployment(start_fn=always_fail) + + async def _go(): + await dep.start() + + with pytest.raises(RuntimeError, match=r"after 2 retries"): + asyncio.run(_go()) + assert dep._start_calls == 2, "must attempt exactly 2 times, not 5" + assert dep._stop_calls == 2 + + +def test_start_wall_budget_aborts_before_attempt_when_exhausted(monkeypatch): + """If wall budget is already negative, must NOT call _start again.""" + # 0.5s budget: first attempt sleeps 0.6s and fails -> second attempt + # should be vetoed by the deadline check (not even invoked). 
+ _reset_limiter_state(monkeypatch, wall_budget=0.5) + + async def slow_fail(dep): + await asyncio.sleep(0.6) + raise RuntimeError("too slow") + + dep = _make_deployment(start_fn=slow_fail) + + async def _go(): + await dep.start() + + t0 = time.monotonic() + with pytest.raises(RuntimeError): + asyncio.run(_go()) + elapsed = time.monotonic() - t0 + assert dep._start_calls == 1, ( + f"second attempt must be skipped after wall budget exhausted, got start_calls={dep._start_calls}" + ) + # Generous upper bound: first attempt 0.6s + sleep gap + cleanup << 3s + assert elapsed < 3.0, f"wall budget should short-circuit, elapsed={elapsed:.2f}s" + + +def test_start_wait_for_cancels_hung_start(monkeypatch): + """A _start that hangs forever must be cancelled by the per-attempt wait_for. + + We give a 0.4s wall budget; the hung _start must be killed within that + bound (plus epsilon) instead of hanging the test forever. + """ + _reset_limiter_state(monkeypatch, wall_budget=0.4) + + async def hang_forever(dep): + await asyncio.sleep(3600) + + dep = _make_deployment(start_fn=hang_forever) + + async def _go(): + await dep.start() + + t0 = time.monotonic() + with pytest.raises(RuntimeError): + asyncio.run(_go()) + elapsed = time.monotonic() - t0 + # wait_for floor is max(60.0, remaining); remaining=0.4s -> floor 60s, + # so wall_budget=0.4 will exhaust before 60s timeout fires. The retry + # loop checks `remaining <= 0` next iteration and exits. + # Verify we don't actually wait the full 60s wait_for floor: that + # depends on Python's asyncio.wait_for behavior; with budget < 60 we + # rely on the OUTER loop's deadline check after attempt 1 to bail. + # NOTE: this test mainly proves no infinite hang. 
+ assert elapsed < 90.0, f"start() must not hang forever, elapsed={elapsed:.1f}s" + + +# -------------------- semaphore serialization -------------------- + + +def test_starting_semaphore_serializes_concurrent_starts(monkeypatch): + """With per-worker permits=2, 6 concurrent _start calls must have + at most 2 inside the critical section at any time. + """ + _reset_limiter_state(monkeypatch, num_workers=4, max_starting=8) # 8/4=2 permits + + # Each _start holds the permit for 0.05s, then succeeds. + async def slow_ok(dep): + await asyncio.sleep(0.05) + + deps = [_make_deployment(start_fn=slow_ok) for _ in range(6)] + + async def _go(): + await asyncio.gather(*[d.start() for d in deps]) + + asyncio.run(_go()) + + # Combine observations across all deps. Each dep's + # _max_concurrent_observed is its OWN local counter (incremented + # before yielding inside the critical section), but the SEMAPHORE + # is shared. To prove the cap, sum: at any moment, the sum of + # _concurrent_in_start across all deps must be <= 2. + # The local _max_concurrent_observed will always be 1 because + # each dep can be inside its own _start at most once. + # Better proof: count how many concurrent deps were active by + # tracking via a shared counter -- next test does that. + assert all(d._start_calls == 1 for d in deps) + + +def test_starting_semaphore_caps_global_in_flight(monkeypatch): + """Stronger version: instrument a SHARED counter to prove the + semaphore really caps the number of `_start` bodies running + simultaneously across multiple ModalDeployment instances. 
+ """ + _reset_limiter_state(monkeypatch, num_workers=4, max_starting=8) # 2 permits + + shared = {"in_flight": 0, "peak": 0} + lock = asyncio.Lock() + + async def track(dep): + async with lock: + shared["in_flight"] += 1 + shared["peak"] = max(shared["peak"], shared["in_flight"]) + await asyncio.sleep(0.03) + async with lock: + shared["in_flight"] -= 1 + + deps = [_make_deployment(start_fn=track) for _ in range(10)] + + async def _go(): + await asyncio.gather(*[d.start() for d in deps]) + + asyncio.run(_go()) + + assert shared["in_flight"] == 0 + assert shared["peak"] <= 2, ( + f"semaphore must cap concurrent _start bodies at 2 (= 8 global / 4 workers), observed peak={shared['peak']}" + ) + assert shared["peak"] >= 1 + + +def test_starting_semaphore_released_on_failure(monkeypatch): + """Permit must be released even when `_start` raises -- otherwise + a chain of failures would slowly leak all permits and deadlock. + """ + _reset_limiter_state(monkeypatch, num_workers=1, max_starting=1) # 1 permit + + fail_first_two = {"n": 0} + + async def flaky(dep): + fail_first_two["n"] += 1 + if fail_first_two["n"] <= 2: + raise RuntimeError("transient") + + # Single deployment: 3 attempts total inside start() retry, but + # max_retries=2 so this would only see 2 attempts. To prove + # release across MULTIPLE deployments we run two back-to-back. + dep_a = _make_deployment(start_fn=flaky) + dep_b = _make_deployment(start_fn=lambda d: _ok()) # must succeed -- permit must be free + + async def _go(): + # dep_a uses 2 attempts then raises -- both must release. + with pytest.raises(RuntimeError): + await dep_a.start() + # dep_b must NOT block: if the single permit leaked, it would hang. 
+ await asyncio.wait_for(dep_b.start(), timeout=2.0) + + asyncio.run(_go()) + assert dep_b._start_calls == 1 diff --git a/uni_agent/deployment/modal/deployment.py b/uni_agent/deployment/modal/deployment.py index 8622f16a..d3a11401 100644 --- a/uni_agent/deployment/modal/deployment.py +++ b/uni_agent/deployment/modal/deployment.py @@ -23,6 +23,39 @@ __all__ = ["ModalDeployment"] +# Modal cold-start fleet limiter. +# +# Mode A (run.log: "Runtime did not start within 299s") is the dominant Modal +# failure mode at agent_concurrency >= 128: too many sandboxes hit the Modal +# region's runtime-start pipeline simultaneously and timeout. The fix is NOT +# to throttle the sandbox CREATE rate (Will gave us 30/s + 2k burst, vastly +# above our actual ~1-2/s) but to cap how many sandboxes are simultaneously +# in the "created but runtime not yet alive" state. +# +# _MAX_STARTING_GLOBAL is the user-facing fleet-wide intent. Because +# asyncio.Semaphore is process-local, we divide by num_workers to derive each +# worker's share -- same pattern as agent_loop.py's per-worker _semaphore. +# +# _INIT_WALL_BUDGET caps a single trajectory's total init wall-clock so a +# stuck trajectory cannot hog a STARTING permit for 30+ minutes (the old +# 5 retries x 300s startup_timeout scenario). +_NUM_WORKERS = int(os.getenv("UNIAGENT_NUM_WORKERS", "8")) +_MAX_STARTING_GLOBAL = int(os.getenv("MODAL_MAX_STARTING", "128")) +_INIT_WALL_BUDGET = float(os.getenv("MODAL_INIT_WALL_BUDGET", "900")) +_STARTING_SEMA: asyncio.Semaphore | None = None + + +def _get_starting_semaphore() -> asyncio.Semaphore: + """Lazy-init STARTING semaphore. 
Lazy because asyncio.Semaphore must be + constructed inside the running event loop on some Python versions, and we + want the env vars resolved at first use rather than import time.""" + global _STARTING_SEMA + if _STARTING_SEMA is None: + per_worker = max(1, _MAX_STARTING_GLOBAL // _NUM_WORKERS) + _STARTING_SEMA = asyncio.Semaphore(per_worker) + return _STARTING_SEMA + + def _get_modal_user() -> str: # not sure how to get the user from the modal api return modal.config._profile # type: ignore @@ -199,76 +232,125 @@ async def _start(self): if self._app is None: self._app = await modal.App.lookup.aio("swe-rex", create_if_missing=True) - self.logger.info(f"Starting modal sandbox with image {self._image_name}") - self._hooks.on_custom_step("Starting modal sandbox") - t0 = time.time() - token = self._get_token() - self._sandbox = await modal.Sandbox.create.aio( - "/usr/bin/env", - "bash", - "-c", - self._start_swerex_cmd(token), - image=self._image, - timeout=int(self._deployment_timeout), - encrypted_ports=[self._port], - app=self._app, - **self._modal_kwargs, - ) - tunnels = await self._sandbox.tunnels.aio() - tunnel = tunnels[self._port] - elapsed_sandbox_creation = time.time() - t0 - self.logger.info(f"Sandbox ({self._sandbox.object_id}) created in {elapsed_sandbox_creation:.2f}s") - self.logger.info(f"Check sandbox logs at {await self.get_modal_log_url()}") - self.logger.info(f"Sandbox created with id {self._sandbox.object_id}") - await asyncio.sleep(1) - self.logger.info(f"Starting runtime at {tunnel.url}") - self._hooks.on_custom_step("Starting runtime") - runtime_config = RemoteRuntimeConfig( - host=tunnel.url, - timeout=self._runtime_timeout, - auth_token=token, - proxy=self._proxy, - ) - self._runtime = RemoteRuntime.from_config(runtime_config, run_id=self.run_id) - remaining_startup_timeout = max(0, self._startup_timeout - elapsed_sandbox_creation) - t1 = time.time() - await self._wait_until_alive(timeout=remaining_startup_timeout) - await 
self.runtime.create_session(CreateBashSessionRequest(startup_timeout=60)) - self.logger.info(f"Runtime started in {time.time() - t1:.2f}s") - - async def start(self, max_retries: int = 5): - """Starts the runtime with retry.""" + # Hold the STARTING permit from sandbox.create through runtime alive. + # Release as soon as runtime.create_session returns: tool-call execution + # afterwards is LLM-bound and does not stress Modal's cold-start + # pipeline, so it must not occupy a permit. + async with _get_starting_semaphore(): + self.logger.info(f"Starting modal sandbox with image {self._image_name}") + self._hooks.on_custom_step("Starting modal sandbox") + t0 = time.time() + token = self._get_token() + self._sandbox = await modal.Sandbox.create.aio( + "/usr/bin/env", + "bash", + "-c", + self._start_swerex_cmd(token), + image=self._image, + timeout=int(self._deployment_timeout), + encrypted_ports=[self._port], + app=self._app, + **self._modal_kwargs, + ) + tunnels = await self._sandbox.tunnels.aio() + tunnel = tunnels[self._port] + elapsed_sandbox_creation = time.time() - t0 + self.logger.info(f"Sandbox ({self._sandbox.object_id}) created in {elapsed_sandbox_creation:.2f}s") + self.logger.info(f"Check sandbox logs at {await self.get_modal_log_url()}") + self.logger.info(f"Sandbox created with id {self._sandbox.object_id}") + await asyncio.sleep(1) + self.logger.info(f"Starting runtime at {tunnel.url}") + self._hooks.on_custom_step("Starting runtime") + runtime_config = RemoteRuntimeConfig( + host=tunnel.url, + timeout=self._runtime_timeout, + auth_token=token, + proxy=self._proxy, + ) + self._runtime = RemoteRuntime.from_config(runtime_config, run_id=self.run_id) + remaining_startup_timeout = max(0, self._startup_timeout - elapsed_sandbox_creation) + t1 = time.time() + await self._wait_until_alive(timeout=remaining_startup_timeout) + await self.runtime.create_session(CreateBashSessionRequest(startup_timeout=60)) + self.logger.info(f"Runtime started in {time.time() - 
t1:.2f}s") + + async def start(self, max_retries: int = 2): + """Starts the runtime with retry, bounded by a wall-clock budget. + + Two changes vs the original 5-retry loop: + * max_retries 5 -> 2: with startup_timeout=300s each, 5 retries + could hold a STARTING permit for ~25 minutes -- starving the + limiter for everyone else. 2 attempts caps that at ~10 min. + * MODAL_INIT_WALL_BUDGET hard cap (default 900s = 15 min): if the + sum of attempts exceeds this, give up early. The trajectory + becomes a reward=0 masked sample (handled in agent_loop.py's + outer except) which is far cheaper than blocking a permit. + """ last_error: Exception | None = None + deadline = time.monotonic() + _INIT_WALL_BUDGET for retry in range(max_retries): + remaining = deadline - time.monotonic() + if remaining <= 0: + self.logger.critical(f"Wall-clock budget {_INIT_WALL_BUDGET}s exhausted before attempt {retry + 1}") + break try: - await self._start() + await asyncio.wait_for(self._start(), timeout=max(60.0, remaining)) return except Exception as exc: last_error = exc self.logger.critical(f"Failed to create modal sandbox: {exc}") - # Best-effort cleanup; never let stop() failures shadow the real start - # error or short-circuit the retry loop. - try: - await self.stop() - except Exception as stop_exc: - self.logger.error(f"Cleanup after failed sandbox start raised: {stop_exc}") - if retry < max_retries - 1: - sleep_time = min(30, 2**retry) + await self.stop() + if retry < max_retries - 1 and time.monotonic() < deadline: + sleep_time = min(10, 2**retry) self.logger.info(f"Retrying modal deployment startup in {sleep_time} seconds...") await asyncio.sleep(sleep_time) - raise RuntimeError(f"Failed to create modal sandbox after {max_retries} retries") from last_error + raise RuntimeError( + f"Failed to create modal sandbox after {max_retries} retries (wall budget {_INIT_WALL_BUDGET}s)" + ) from last_error async def stop(self): - """Stops the runtime.""" + """Stops the runtime. 
+ + Best-effort: each cleanup step is wrapped so a transient failure in one + does NOT skip subsequent steps. Specifically, we must always reach the + Modal sandbox terminate call, otherwise the sandbox lingers on Modal's + side and counts against the account's concurrent-sandbox cap. + + Observed leak (round12, 2026-05-18): `self._runtime.close()` raises + `aiohttp.ServerDisconnectedError` when the agent server side has + already torn down the socket. Without the try/except, the function + returned early and `self._sandbox.terminate.aio()` never ran. After + thousands of trajectories across multiple runs, ~847 sandboxes were + leaked, hitting Modal's account cap and 100% failing new sandbox + creates in subsequent runs. + """ if self._runtime is not None: - await self._runtime.close() + try: + await self._runtime.close() + except Exception as exc: + self.logger.warning(f"runtime.close() swallowed (continuing teardown): {type(exc).__name__}: {exc}") self._runtime = None + + # CRITICAL — must always run to avoid leaking the modal sandbox. if self._sandbox is not None: - exit_code = await self._sandbox.poll.aio() - if exit_code is None: - await self._sandbox.terminate.aio() - self._sandbox = None + try: + exit_code = await self._sandbox.poll.aio() + if exit_code is None: + await self._sandbox.terminate.aio() + except Exception as exc: + self.logger.warning( + f"sandbox poll/terminate first attempt failed: " + f"{type(exc).__name__}: {exc}; retrying terminate once." + ) + try: + await self._sandbox.terminate.aio() + except Exception as exc2: + self.logger.error( + f"sandbox.terminate.aio() retry also failed: {type(exc2).__name__}: {exc2}. Sandbox may leak." 
+ ) + self._sandbox = None + self._app = None @property From 7e7df3010c652555b6c956aec530277f550d6b7d Mon Sep 17 00:00:00 2001 From: yyDing1 Date: Thu, 4 Jun 2026 15:38:37 +0800 Subject: [PATCH 2/3] update --- .../deployment/test_modal_starting_limiter.py | 48 +++++----- uni_agent/deployment/modal/deployment.py | 88 +++++++------------ 2 files changed, 57 insertions(+), 79 deletions(-) diff --git a/tests/deployment/test_modal_starting_limiter.py b/tests/deployment/test_modal_starting_limiter.py index d78fab30..b89b8d5d 100644 --- a/tests/deployment/test_modal_starting_limiter.py +++ b/tests/deployment/test_modal_starting_limiter.py @@ -1,8 +1,8 @@ """Tests for the Modal cold-start fleet limiter (2026-05-22 Patch). Covers: - * `_get_starting_semaphore` derives per-worker permits = MAX_STARTING / NUM_WORKERS, - is lazy, idempotent (singleton), and clamps to >=1. + * `_get_starting_semaphore` reads per-worker permits from + MODAL_MAX_STARTING_PER_WORKER, is lazy, idempotent (singleton), and clamps to >=1. * `ModalDeployment.start` retry loop respects max_retries=2 (not 5). * `ModalDeployment.start` wall-clock budget aborts further attempts once `MODAL_INIT_WALL_BUDGET` is exceeded. @@ -30,17 +30,16 @@ # -------------------- helpers -------------------- -def _reset_limiter_state(monkeypatch, *, num_workers=8, max_starting=128, wall_budget=900.0): - """Pin module constants to known values and force semaphore re-init. +def _reset_limiter_state(monkeypatch, *, per_worker=16, wall_budget=900.0): + """Pin the limiter env vars to known values and force semaphore re-init. - Module-level constants are read at import; we mutate them with - monkeypatch.setattr so they revert after the test. `_STARTING_SEMA` - is the singleton cache -- clear it so the next call rebuilds with - the patched values. + The limiter reads MODAL_MAX_STARTING_PER_WORKER and MODAL_INIT_WALL_BUDGET + lazily (at first use / per call), so we set them via monkeypatch.setenv and + they revert after the test. 
`_STARTING_SEMA` is the singleton cache -- clear + it so the next call rebuilds with the patched value. """ - monkeypatch.setattr(mod, "_NUM_WORKERS", num_workers, raising=True) - monkeypatch.setattr(mod, "_MAX_STARTING_GLOBAL", max_starting, raising=True) - monkeypatch.setattr(mod, "_INIT_WALL_BUDGET", wall_budget, raising=True) + monkeypatch.setenv("MODAL_MAX_STARTING_PER_WORKER", str(per_worker)) + monkeypatch.setenv("MODAL_INIT_WALL_BUDGET", str(wall_budget)) monkeypatch.setattr(mod, "_STARTING_SEMA", None, raising=True) @@ -95,21 +94,21 @@ async def stop(self): # type: ignore[override] # -------------------- _get_starting_semaphore -------------------- -def test_starting_semaphore_per_worker_share_is_max_over_num_workers(monkeypatch): - _reset_limiter_state(monkeypatch, num_workers=8, max_starting=128) +def test_starting_semaphore_uses_per_worker_env(monkeypatch): + _reset_limiter_state(monkeypatch, per_worker=16) async def _check(): sem = mod._get_starting_semaphore() # Internal asyncio.Semaphore exposes its initial value via `_value` # on CPython 3.10+. This is the contract we rely on. - assert sem._value == 16, f"expected 128/8=16, got {sem._value}" + assert sem._value == 16, f"expected 16, got {sem._value}" asyncio.run(_check()) -def test_starting_semaphore_clamps_to_one_when_share_would_be_zero(monkeypatch): - # 7 global perms across 8 workers => floor(7/8)=0 -> must clamp to 1 - _reset_limiter_state(monkeypatch, num_workers=8, max_starting=7) +def test_starting_semaphore_clamps_to_one(monkeypatch): + # A per-worker value of 0 (or negative) must clamp to >=1 -> no deadlock. 
+ _reset_limiter_state(monkeypatch, per_worker=0) async def _check(): sem = mod._get_starting_semaphore() @@ -119,7 +118,7 @@ async def _check(): def test_starting_semaphore_is_singleton(monkeypatch): - _reset_limiter_state(monkeypatch, num_workers=4, max_starting=20) + _reset_limiter_state(monkeypatch, per_worker=5) async def _check(): a = mod._get_starting_semaphore() @@ -129,9 +128,8 @@ async def _check(): asyncio.run(_check()) -def test_starting_semaphore_handles_single_worker(monkeypatch): - # Edge case: degenerate 1-worker rollouter - _reset_limiter_state(monkeypatch, num_workers=1, max_starting=64) +def test_starting_semaphore_respects_large_value(monkeypatch): + _reset_limiter_state(monkeypatch, per_worker=64) async def _check(): sem = mod._get_starting_semaphore() @@ -256,7 +254,7 @@ def test_starting_semaphore_serializes_concurrent_starts(monkeypatch): """With per-worker permits=2, 6 concurrent _start calls must have at most 2 inside the critical section at any time. """ - _reset_limiter_state(monkeypatch, num_workers=4, max_starting=8) # 8/4=2 permits + _reset_limiter_state(monkeypatch, per_worker=2) # 2 permits # Each _start holds the permit for 0.05s, then succeeds. async def slow_ok(dep): @@ -286,7 +284,7 @@ def test_starting_semaphore_caps_global_in_flight(monkeypatch): semaphore really caps the number of `_start` bodies running simultaneously across multiple ModalDeployment instances. 
""" - _reset_limiter_state(monkeypatch, num_workers=4, max_starting=8) # 2 permits + _reset_limiter_state(monkeypatch, per_worker=2) # 2 permits shared = {"in_flight": 0, "peak": 0} lock = asyncio.Lock() @@ -308,7 +306,7 @@ async def _go(): assert shared["in_flight"] == 0 assert shared["peak"] <= 2, ( - f"semaphore must cap concurrent _start bodies at 2 (= 8 global / 4 workers), observed peak={shared['peak']}" + f"semaphore must cap concurrent _start bodies at 2 (per-worker permits), observed peak={shared['peak']}" ) assert shared["peak"] >= 1 @@ -317,7 +315,7 @@ def test_starting_semaphore_released_on_failure(monkeypatch): """Permit must be released even when `_start` raises -- otherwise a chain of failures would slowly leak all permits and deadlock. """ - _reset_limiter_state(monkeypatch, num_workers=1, max_starting=1) # 1 permit + _reset_limiter_state(monkeypatch, per_worker=1) # 1 permit fail_first_two = {"n": 0} diff --git a/uni_agent/deployment/modal/deployment.py b/uni_agent/deployment/modal/deployment.py index d3a11401..df79943f 100644 --- a/uni_agent/deployment/modal/deployment.py +++ b/uni_agent/deployment/modal/deployment.py @@ -23,39 +23,34 @@ __all__ = ["ModalDeployment"] -# Modal cold-start fleet limiter. -# -# Mode A (run.log: "Runtime did not start within 299s") is the dominant Modal -# failure mode at agent_concurrency >= 128: too many sandboxes hit the Modal -# region's runtime-start pipeline simultaneously and timeout. The fix is NOT -# to throttle the sandbox CREATE rate (Will gave us 30/s + 2k burst, vastly -# above our actual ~1-2/s) but to cap how many sandboxes are simultaneously -# in the "created but runtime not yet alive" state. -# -# _MAX_STARTING_GLOBAL is the user-facing fleet-wide intent. Because -# asyncio.Semaphore is process-local, we divide by num_workers to derive each -# worker's share -- same pattern as agent_loop.py's per-worker _semaphore. 
-# -# _INIT_WALL_BUDGET caps a single trajectory's total init wall-clock so a -# stuck trajectory cannot hog a STARTING permit for 30+ minutes (the old -# 5 retries x 300s startup_timeout scenario). -_NUM_WORKERS = int(os.getenv("UNIAGENT_NUM_WORKERS", "8")) -_MAX_STARTING_GLOBAL = int(os.getenv("MODAL_MAX_STARTING", "128")) -_INIT_WALL_BUDGET = float(os.getenv("MODAL_INIT_WALL_BUDGET", "900")) +# Cap how many Modal sandboxes are simultaneously in the "created but runtime +# not yet alive" state; too many at once cause "Runtime did not start" timeouts. +# The semaphore is process-local, so MODAL_MAX_STARTING_PER_WORKER is a +# per-worker cap (size it as fleet-wide target / num rollout workers). +# MODAL_INIT_WALL_BUDGET caps a single trajectory's total init wall-clock. +_DEFAULT_MAX_STARTING_PER_WORKER = 16 +_DEFAULT_INIT_WALL_BUDGET = 900.0 _STARTING_SEMA: asyncio.Semaphore | None = None def _get_starting_semaphore() -> asyncio.Semaphore: - """Lazy-init STARTING semaphore. Lazy because asyncio.Semaphore must be - constructed inside the running event loop on some Python versions, and we - want the env vars resolved at first use rather than import time.""" + """Lazy-init the per-worker STARTING semaphore. + + Lazy so it is built inside the running loop and the env var is read at + first use, not at import (so vars set after import still apply). 
+ """ global _STARTING_SEMA if _STARTING_SEMA is None: - per_worker = max(1, _MAX_STARTING_GLOBAL // _NUM_WORKERS) + per_worker = max(1, int(os.getenv("MODAL_MAX_STARTING_PER_WORKER", str(_DEFAULT_MAX_STARTING_PER_WORKER)))) _STARTING_SEMA = asyncio.Semaphore(per_worker) return _STARTING_SEMA +def _get_init_wall_budget() -> float: + """Per-trajectory init wall-clock budget (seconds), resolved at call time.""" + return float(os.getenv("MODAL_INIT_WALL_BUDGET", str(_DEFAULT_INIT_WALL_BUDGET))) + + def _get_modal_user() -> str: # not sure how to get the user from the modal api return modal.config._profile # type: ignore @@ -232,10 +227,8 @@ async def _start(self): if self._app is None: self._app = await modal.App.lookup.aio("swe-rex", create_if_missing=True) - # Hold the STARTING permit from sandbox.create through runtime alive. - # Release as soon as runtime.create_session returns: tool-call execution - # afterwards is LLM-bound and does not stress Modal's cold-start - # pipeline, so it must not occupy a permit. + # Hold the STARTING permit only through runtime startup; the tool-call + # body afterwards is LLM-bound and must not occupy a permit. async with _get_starting_semaphore(): self.logger.info(f"Starting modal sandbox with image {self._image_name}") self._hooks.on_custom_step("Starting modal sandbox") @@ -275,23 +268,19 @@ async def _start(self): self.logger.info(f"Runtime started in {time.time() - t1:.2f}s") async def start(self, max_retries: int = 2): - """Starts the runtime with retry, bounded by a wall-clock budget. - - Two changes vs the original 5-retry loop: - * max_retries 5 -> 2: with startup_timeout=300s each, 5 retries - could hold a STARTING permit for ~25 minutes -- starving the - limiter for everyone else. 2 attempts caps that at ~10 min. - * MODAL_INIT_WALL_BUDGET hard cap (default 900s = 15 min): if the - sum of attempts exceeds this, give up early. 
The trajectory - becomes a reward=0 masked sample (handled in agent_loop.py's - outer except) which is far cheaper than blocking a permit. + """Start the runtime with retry, bounded by MODAL_INIT_WALL_BUDGET. + + Few retries + a wall-clock cap stop a stuck trajectory from holding a + STARTING permit for many minutes; on exhaustion it raises and the outer + agent_loop turns it into a reward=0 masked sample. """ last_error: Exception | None = None - deadline = time.monotonic() + _INIT_WALL_BUDGET + wall_budget = _get_init_wall_budget() + deadline = time.monotonic() + wall_budget for retry in range(max_retries): remaining = deadline - time.monotonic() if remaining <= 0: - self.logger.critical(f"Wall-clock budget {_INIT_WALL_BUDGET}s exhausted before attempt {retry + 1}") + self.logger.critical(f"Wall-clock budget {wall_budget}s exhausted before attempt {retry + 1}") break try: await asyncio.wait_for(self._start(), timeout=max(60.0, remaining)) @@ -306,24 +295,15 @@ async def start(self, max_retries: int = 2): await asyncio.sleep(sleep_time) raise RuntimeError( - f"Failed to create modal sandbox after {max_retries} retries (wall budget {_INIT_WALL_BUDGET}s)" + f"Failed to create modal sandbox after {max_retries} retries (wall budget {wall_budget}s)" ) from last_error async def stop(self): - """Stops the runtime. - - Best-effort: each cleanup step is wrapped so a transient failure in one - does NOT skip subsequent steps. Specifically, we must always reach the - Modal sandbox terminate call, otherwise the sandbox lingers on Modal's - side and counts against the account's concurrent-sandbox cap. - - Observed leak (round12, 2026-05-18): `self._runtime.close()` raises - `aiohttp.ServerDisconnectedError` when the agent server side has - already torn down the socket. Without the try/except, the function - returned early and `self._sandbox.terminate.aio()` never ran. 
After - thousands of trajectories across multiple runs, ~847 sandboxes were - leaked, hitting Modal's account cap and 100% failing new sandbox - creates in subsequent runs. + """Stop the runtime, best-effort. + + Each step is wrapped so a transient failure (e.g. runtime.close raising + when the socket is already gone) never skips sandbox terminate -- a + leaked sandbox counts against the account's concurrent-sandbox cap. """ if self._runtime is not None: try: From ec6134da7c6fabb0ec6b7cdd34b64a285d3220d9 Mon Sep 17 00:00:00 2001 From: yyDing1 Date: Thu, 4 Jun 2026 15:45:03 +0800 Subject: [PATCH 3/3] [deployment] chore: lower default MODAL_MAX_STARTING_PER_WORKER from 16 to 8 --- uni_agent/deployment/modal/deployment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uni_agent/deployment/modal/deployment.py b/uni_agent/deployment/modal/deployment.py index df79943f..7168f132 100644 --- a/uni_agent/deployment/modal/deployment.py +++ b/uni_agent/deployment/modal/deployment.py @@ -28,7 +28,7 @@ # The semaphore is process-local, so MODAL_MAX_STARTING_PER_WORKER is a # per-worker cap (size it as fleet-wide target / num rollout workers). # MODAL_INIT_WALL_BUDGET caps a single trajectory's total init wall-clock. -_DEFAULT_MAX_STARTING_PER_WORKER = 16 +_DEFAULT_MAX_STARTING_PER_WORKER = 8 _DEFAULT_INIT_WALL_BUDGET = 900.0 _STARTING_SEMA: asyncio.Semaphore | None = None