fix(mlx-grpc): per-step admission + own-thread BatchGenerator (eliminates chat-c4 TTFT regression and Stream(gpu,1) crash)#1427
Conversation
(mlx-lm.server-style) Three coupled fixes that together eliminate the chat-c4 TTFT regression introduced by PR #1414's drain-and-batch and the agent-c4 cross-thread "no Stream(gpu, 1) in current thread" crash.

A. BatchGenerator on the gen thread. mlx-lm's `generation_stream` is allocated by `mx.new_thread_local_stream(...)` and the `mx.stream(s)` context is per-thread. The previous design constructed BatchGenerator on the asyncio main thread and then called `next()` from the gen thread; the stream object's per-thread binding didn't follow it, which made `mx.async_eval` continuations later raise `RuntimeError: There is no Stream(gpu, 1) in current thread.` mid-run (seen in CI run 25195762280, mlx-grpc.log line 15+). The same threading issue is what made concurrent insert-during-decode unsafe in our setup — mlx-lm.server doesn't hit either failure because it runs all mlx state on a single dispatch thread. This change defers BatchGenerator construction into `_generation_loop` itself. The thread-local stream now binds to the gen thread for the lifetime of the process, and every `insert`/`next`/`remove` call runs on that same thread. server.py drops the up-front BatchGenerator construction and the standalone `_warmup`; both move to the servicer, where they run on the gen thread before signalling readiness via a new `_ready_event` that `serve_grpc` waits on (in an executor) before flipping the health probe to SERVING.

B. Per-step admission instead of drain-and-batch. PR #1414's "wait for `_active_uids` to be empty before allowing inserts" gate was a workaround for the threading bug above. Once A is in place, the rope-offset corruption stops being possible (insert and next are serialized by being on the same thread, plus `_gen_lock`), and the drain wait becomes pure latency cost. The latest CI run measured the cost on chat c=4: p50=7s, p90=23s, p99=31s — a bimodal distribution where requests that drift out of phase with the bench's 4-user lockstep wait a full ~3s decode for the previous batch to drain. The new loop mirrors mlx-lm.server's main scheduler: every iteration drains `_pending` (regardless of `_active_uids`), runs `insert(...)`, then advances by exactly one `next()` step. `BatchGenerator.next()` internally pulls from `_unprocessed_sequences` into the prefill batch on each call, so a request that arrived mid-decode gets its prefill started in the very next iteration. Worst-case admission delay is one decode step (~50 ms on M-series) instead of a full batch drain.

Local staggered burst test (16 requests at concurrency 4 against gemma-3-4b-it-qat-4bit, max_tokens=64, simulating the genai-bench chat c=4 burst pattern):

| metric | drain-and-batch (CI 25195762280) | per-step (this branch) |
|--------|----------------------------------|------------------------|
| p50    | 7,219 ms                         | 354 ms                 |
| p90    | 23,265 ms                        | 1,010 ms               |
| max    | 31,243 ms                        | 1,010 ms               |

Tail/median ratio drops from 4.3× (bimodal) to 2.9× (smooth); absolute numbers also fall sharply because admission no longer waits for a full decode cycle.

C. Drop the redundant `with mx.stream(generation_stream):` wrapper. `BatchGenerator.next()` already wraps itself in `with mx.stream(self._stream):` internally (mlx_lm/generate.py:1847). Our outer wrap was dead code under the old threading model and would have re-entered the same thread-local stream under the new one — drop it. The only `mx.stream` reference left in the file is in the docstring explanation.
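For part A, a hedged sketch of the ownership pattern: construct the generator on the gen thread, signal readiness, then keep every generator call on that thread. `FakeBatchGenerator` is a stand-in; the real BatchGenerator constructor and mlx stream binding live in mlx-lm and are not reproduced here.

```python
import threading

class FakeBatchGenerator:
    """Stand-in for mlx-lm's BatchGenerator; only the threading shape matters."""
    def next(self):
        return [], []  # one decode step; nothing in flight in this sketch

class Servicer:
    def __init__(self):
        self._ready_event = threading.Event()
        self._stop = threading.Event()
        self.batch_generator = None

    def _generation_loop(self):
        # Constructed HERE: thread-local state (e.g. mlx's generation stream)
        # binds to the gen thread for the lifetime of the process.
        self.batch_generator = FakeBatchGenerator()
        self._ready_event.set()  # serve_grpc waits on this before SERVING
        while not self._stop.is_set():
            self.batch_generator.next()  # insert/next/remove: this thread only

    def start_generation_loop(self):
        threading.Thread(target=self._generation_loop, daemon=True).start()

    def wait_ready(self, timeout=None) -> bool:
        return self._ready_event.wait(timeout)
```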
Validation locally against gemma-3-4b-it-qat-4bit:
- c=1: 4.2 s, 66 chunks, no errors
- c=4: 9.6 s, 4×66 chunks, no errors
- c=8: 24.4 s, 8×66 chunks, no errors
- Cancel-mid-stream: cleanup runs, follow-up request finishes in 0.4 s
- Staggered burst (above): see numbers in B
- Zero `rope`, `Stream(gpu, 1)`, `InvalidStateError` in mlx-grpc.log
- Pre-commit (ruff, ruff format, codespell) clean

Behavioral notes:
- Public API of `MlxEngineServicer.__init__` changed from accepting `batch_generator=...` to accepting `model=`, `completion_batch_size=`, `prefill_batch_size=`. server.py is the only caller. Marked all args keyword-only via `*` to keep call sites unambiguous.
- The `_pending_lock`-nested-inside-`_gen_lock` invariant from PR #1414 is preserved; the per-step loop just enters that nested lock every iteration instead of only when `_active_uids` is empty.
- HealthCheck still returns NOT_SERVING when the gen thread hasn't started; the new `_ready_event` is only used as the construction-complete signal, not as a serving gate (server.serve_grpc does that via `wait_ready` before `health_servicer.set_serving()`).

Signed-off-by: key4ng <[email protected]>
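To make the keyword-only note concrete, a sketch of the new constructor shape; the parameter names come from the notes above, while the type hints are assumptions.

```python
class MlxEngineServicer:
    def __init__(
        self,
        *,  # keyword-only: call sites must name every argument
        model,
        completion_batch_size: int,
        prefill_batch_size: int,
    ) -> None:
        ...
```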
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
📝 Walkthrough

Startup no longer constructs BatchGenerator or performs standalone warmup in the gRPC entrypoint. MlxEngineServicer now lazily constructs and warms the BatchGenerator on its generation thread; the server starts the gen loop, awaits servicer.wait_ready(), then starts gRPC and sets health SERVING. Shutdown stops the generation loop; generator lifecycle is owned by the servicer.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Server as "Server\n(startup thread)"
participant Servicer as "MlxEngineServicer\n(generation thread)"
participant BatchGen as "BatchGenerator"
participant GRPC as "gRPC Server"
participant Health as "Health Monitor"
Server->>Servicer: start_generation_loop(model, batch_sizes)
Servicer->>BatchGen: construct lazily on gen thread
Servicer->>Servicer: perform warmup (_warmup)
Servicer-->>Server: signal ready (wait_ready)
Server->>GRPC: start gRPC server
Server->>Health: set SERVING
Note right of Servicer: loop: drain _pending -> insert() -> handle _pending_remove -> next()
Server->>Servicer: shutdown
Servicer->>Servicer: stop generation loop / close generator
Server->>Health: set NOT_SERVING
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 87fda6c57f
```python
except Exception:
    logger.exception("BatchGenerator construction failed")
    self._ready_event.set()  # Unblock waiters; they'll see no batch_generator.
    return
```
Fail startup when BatchGenerator construction fails
If BatchGenerator(...) raises here, the exception is swallowed and _ready_event is still set, so serve_grpc proceeds to start gRPC and mark health SERVING even though the generation thread has already exited. In that state, Generate keeps queuing requests and waiting on uid_future with no thread left to drain _pending, so requests hang instead of failing fast. This regresses startup safety because a fatal init error (e.g. OOM/invalid model state) now becomes a running-but-nonfunctional server.
Resolved in b614d7b: gen thread now sets a _construction_failed flag before _ready_event.set() on the BatchGenerator construction except path; wait_ready() returns False when the flag is set; server.serve_grpc now checks the return value and raises RuntimeError (after stop_generation_loop) instead of proceeding to server.start() + set_serving(). Startup now fails loudly on construction error rather than running-but-nonfunctional.
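A sketch of the failure-path contract described in this reply; `make_generator` is a hypothetical factory standing in for the real `BatchGenerator(...)` call, and the exact code lives in b614d7b.

```python
import logging
import threading

logger = logging.getLogger(__name__)

class Servicer:
    def __init__(self, make_generator):
        self._make_generator = make_generator  # hypothetical factory
        self._ready_event = threading.Event()
        self._construction_failed = False
        self.batch_generator = None

    def _construct_on_gen_thread(self):
        try:
            self.batch_generator = self._make_generator()
        except Exception:
            logger.exception("BatchGenerator construction failed")
            self._construction_failed = True  # recorded before waking waiters
        finally:
            self._ready_event.set()

    def wait_ready(self, timeout=None) -> bool:
        self._ready_event.wait(timeout)
        return not self._construction_failed

# serve_grpc side (sketch): fail loudly instead of serving a dead backend.
# if not await loop.run_in_executor(None, servicer.wait_ready):
#     raise RuntimeError("BatchGenerator failed to initialize")
```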
```python
# _pending.
servicer.start_generation_loop()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, servicer.wait_ready)
```
🔴 Important: wait_ready() return value is silently discarded. If BatchGenerator construction fails (line 516–524 in servicer.py), the gen thread logs the exception, sets _ready_event (unblocking this call with True), and exits. The server then proceeds to server.start() + set_serving(), advertising itself as healthy to the router while no gen thread is running. Every Generate RPC that arrives will append to _pending and hang forever — the gen thread that would drain it is dead.
Two fixes needed:

- Here — check the return value (or check `servicer.batch_generator is not None`) and abort startup on failure:

  ```python
  await loop.run_in_executor(None, servicer.wait_ready)
  if servicer.batch_generator is None:
      raise RuntimeError("BatchGenerator failed to initialize — see gen thread logs")
  ```

- In `wait_ready` — consider returning `False` when `_ready_event` is set but `batch_generator is None`, so callers that do check the bool get the right answer.
Resolved in b614d7b: both fixes you suggested are now in. (1) server.serve_grpc checks the return value of wait_ready and raises if false. (2) wait_ready itself returns False when _construction_failed is set, so callers that check the bool get the right answer.
Code Review
This pull request refactors the MLX gRPC servicer to adopt a per-step admission model and moves the BatchGenerator construction to the generation thread. This architectural shift ensures that MLX state and thread-local streams remain on a single thread, addressing runtime errors and performance issues. The feedback identifies a redundant boolean check in the generation loop and recommends using a lock during the generator's shutdown to prevent race conditions.
```python
# Phase 2: advance one step. Skip when there's
# nothing in flight AND nothing was just inserted —
# next() on an empty BatchGenerator is wasted work.
if self._active_uids or inserted_this_iter:
```
The inserted_this_iter check is redundant here. In Phase 1, if batch is not empty and insert() succeeds, the new UIDs are added to self._active_uids (line 597) before Phase 2 begins. Therefore, self._active_uids will already be truthy if something was inserted.
```diff
-if self._active_uids or inserted_this_iter:
+if self._active_uids:
```
Resolved in b614d7b: inserted_this_iter is gone. The flag was provably redundant — _active_uids.add(uid) runs for every successful insert before the phase-2 guard, so _active_uids is non-empty whenever inserted_this_iter would have been True. The guard is now just if self._active_uids:.
```python
if self.batch_generator is not None:
    try:
        self.batch_generator.close()
    except Exception:
        logger.warning("BatchGenerator.close raised", exc_info=True)
```
To ensure thread safety during shutdown, it is recommended to hold self._gen_lock while closing the BatchGenerator. This prevents potential race conditions if an in-flight RPC's finally block or an Abort call attempts to call remove() on the generator while it is being closed, even though those calls are wrapped in try-except blocks.
```python
# Shutdown — release wired-limit etc.
with self._gen_lock:
    if self.batch_generator is not None:
        try:
            self.batch_generator.close()
        except Exception:
            logger.warning("BatchGenerator.close raised", exc_info=True)
        finally:
            self.batch_generator = None
```

References
- When using mlx-lm.generate.BatchGenerator with a background generation thread and an async event loop, synchronize access with a threading.Lock to prevent race conditions during operations like insert, remove, or close.
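One hedged way to satisfy this reference: funnel every cross-thread call through a lock-holding wrapper. The class below is illustrative, not part of this PR or of mlx-lm's API.

```python
import threading

class GuardedGenerator:
    """Serialize remove/close between the asyncio event loop and the gen thread."""
    def __init__(self, gen):
        self._gen = gen
        self._lock = threading.Lock()

    def remove(self, uids):
        with self._lock:
            if self._gen is not None:
                self._gen.remove(uids)

    def close(self):
        with self._lock:
            if self._gen is not None:
                try:
                    self._gen.close()
                finally:
                    self._gen = None  # later callers observe None, not a half-closed generator
```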
Resolved in b614d7b: BatchGenerator.close() is now wrapped in with self._gen_lock:, and self.batch_generator is set to None under the lock so any RPC finally / Abort cleanup that races past server.stop(5.0)'s grace period will see None (the existing try/except on .remove() swallows the resulting AttributeError, leaving accounting consistent).
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
grpc_servicer/smg_grpc_servicer/mlx/server.py (1)
150-150: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Redundant `loop` variable. `loop` is already obtained on line 143; this re-assignment is unnecessary.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc_servicer/smg_grpc_servicer/mlx/server.py` at line 150, Remove the redundant reassignment of the asyncio event loop: the variable loop is already obtained earlier, so delete the duplicate statement "loop = asyncio.get_running_loop()" in server.py (the redundant assignment of the variable loop) to avoid unnecessary re-binding; ensure all subsequent uses reference the originally obtained loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/mlx/server.py`:
- Around line 137-147: The code currently ignores servicer.wait_ready()'s
boolean result; update the sequence around servicer.start_generation_loop() /
servicer.wait_ready() so you check the return value (call servicer.wait_ready()
via loop.run_in_executor as now), and if it returns False, fail fast by logging
the failure and not calling await server.start() or
health_servicer.set_serving() (e.g., raise or exit); only proceed to await
server.start() and health_servicer.set_serving() when wait_ready() returns True.
Ensure references to servicer.start_generation_loop(), servicer.wait_ready(),
await server.start(), and health_servicer.set_serving() are the points you
change.
In `@grpc_servicer/smg_grpc_servicer/mlx/servicer.py`:
- Around line 515-534: When BatchGenerator construction fails in the servicer
(the try/except around BatchGenerator in the gen thread), set a new boolean flag
like self._construction_failed = True in the except block before
self._ready_event.set(), and update the wait_ready() method to check and return
False if self._construction_failed is True (otherwise behave as today). Ensure
any other code that reads readiness (e.g., health/status checks) uses
wait_ready() so callers can distinguish a failed construction from a successful
ready state.
---
Outside diff comments:
In `@grpc_servicer/smg_grpc_servicer/mlx/server.py`:
- Line 150: Remove the redundant reassignment of the asyncio event loop: the
variable loop is already obtained earlier, so delete the duplicate statement
"loop = asyncio.get_running_loop()" in server.py (the redundant assignment of
the variable loop) to avoid unnecessary re-binding; ensure all subsequent uses
reference the originally obtained loop.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2cba5dca-4cd1-42ef-878b-48913a40c59a
📒 Files selected for processing (2)
- grpc_servicer/smg_grpc_servicer/mlx/server.py
- grpc_servicer/smg_grpc_servicer/mlx/servicer.py
- Track BatchGenerator construction failure and surface it via wait_ready(): previously a construction exception was swallowed, _ready_event was set, and serve_grpc proceeded to mark the server SERVING with no gen thread, causing every Generate RPC to hang on _pending. wait_ready() now returns False when _construction_failed is set, and serve_grpc fails startup loudly. Addresses comments from chatgpt-codex (P1), claude (Important), coderabbitai (server.py:147 + servicer.py:534).

- Drop the redundant `inserted_this_iter` flag from the per-step loop. A successful insert always populates _active_uids before the phase-2 guard, so `_active_uids or inserted_this_iter` simplifies to `_active_uids`. Addresses gemini-code-assist (servicer.py:612).

- Hold _gen_lock while closing BatchGenerator at shutdown, and clear self.batch_generator under the lock, so an RPC finally / Abort cleanup that races past server.stop()'s grace period can't call remove() on a half-closed generator. Addresses gemini-code-assist (servicer.py:655).

Local validation against mlx-community/gemma-3-4b-it-qat-4bit: c=1, c=4, c=8, cancel-mid-stream all clean (zero rope / Stream(gpu, 1) / InvalidStateError lines in grpc.log).

Signed-off-by: key4ng <[email protected]>
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
grpc_servicer/smg_grpc_servicer/mlx/servicer.py (1)
855-867: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Consider explicit null check for batch_generator during shutdown cleanup.

The `except Exception: pass` handler correctly prevents crashes when `batch_generator` is `None` after shutdown, but this relies on catching `AttributeError` from `None.remove()`. An explicit guard would make the shutdown race handling more visible.

♻️ Optional: Explicit null check

```diff
 with self._gen_lock:
+    if self.batch_generator is None:
+        # Shutdown already closed the generator
+        return
     try:
         self.batch_generator.remove([uid])
         self._active_uids.discard(uid)
     except Exception:
-        # Already removed by the gen thread or Abort —
+        # Already removed by the gen thread or Abort —
         # in those paths, _active_uids was already
         # discarded after the successful remove. If the
         # remove failed for a real backend reason on
         # *every* path, the uid stays in _active_uids
         # so drain-and-fill won't insert into a still-
         # live batch.
         pass
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@grpc_servicer/smg_grpc_servicer/mlx/servicer.py` around lines 855 - 867, The shutdown cleanup currently relies on catching an exception if self.batch_generator is None; change this to an explicit null check before calling self.batch_generator.remove([uid]) inside the self._gen_lock block: first verify "if self.batch_generator is not None" then call remove and discard uid, and retain a narrow except block only for expected backend/remove errors (or log them) so that the race with shutdown is explicit and easier to reason about; reference the self._gen_lock, self.batch_generator.remove and self._active_uids.discard calls to locate the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@grpc_servicer/smg_grpc_servicer/mlx/servicer.py`:
- Around line 855-867: The shutdown cleanup currently relies on catching an
exception if self.batch_generator is None; change this to an explicit null check
before calling self.batch_generator.remove([uid]) inside the self._gen_lock
block: first verify "if self.batch_generator is not None" then call remove and
discard uid, and retain a narrow except block only for expected backend/remove
errors (or log them) so that the race with shutdown is explicit and easier to
reason about; reference the self._gen_lock, self.batch_generator.remove and
self._active_uids.discard calls to locate the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f060454c-dddc-443a-bc60-47cd9fd04580
📒 Files selected for processing (2)
- grpc_servicer/smg_grpc_servicer/mlx/server.py
- grpc_servicer/smg_grpc_servicer/mlx/servicer.py
Tightens the single-thread mlx-state invariant introduced earlier in
this PR. The previous commits moved BatchGenerator construction and
`insert/next` calls onto the gen thread, but three cleanup paths
still called `BatchGenerator.remove(...)` directly from the asyncio
main thread:
- Generate's `finally` block (always runs at request exit)
- Generate's `CancelledError` handler (client disconnect / timeout)
- Abort case B (in-flight cancel from the router)
`remove()` does mlx array indexing on internal batch state
(`PromptProcessingBatch.filter`, `GenerationBatch.filter`) which
runs against the calling thread's `mx.stream` context. From the
asyncio main thread that context isn't bound to mlx-lm's
`generation_stream` (a `mx.new_thread_local_stream` that's lazy per
thread), so under burst-with-cancel traffic the gen thread observed
batch state corrupted by these foreign-thread mutations and crashed
with either
RuntimeError: There is no Stream(gpu, 1) in current thread.
(when `mx.async_eval` continuations couldn't find the stream) or
ValueError: [rope] offset must be a scalar or vector with N
elements but has shape (N-k).
(the original PR #1414 rope-shape mismatch, which was a downstream
effect of the same threading bug, not insert-during-decode).
Both modes are exposed by a 16-request c=4 staggered burst (the same
shape genai-bench's chat c=4 cell uses). Pre-fix, the gen log
accumulated hundreds-to-hundreds-of-thousands of errors after the
burst even though the per-request TTFT measurement looked clean
(the test breaks on first chunk, never sees downstream errors).
This commit converts the three event-loop call sites to enqueue
the uid into a new `_pending_remove: list[int]` and lets the gen
thread drain it between phase 1 (insert) and phase 2 (next):
```
with self._pending_lock:
self._pending_remove.append(uid) # event-loop side
# gen thread, every iteration:
with self._pending_lock:
to_remove = self._pending_remove[:]
self._pending_remove.clear()
to_remove = [u for u in to_remove if u in self._active_uids]
if to_remove:
self.batch_generator.remove(to_remove) # only mlx call
for uid in to_remove:
self._active_uids.discard(uid)
```
The `uid in self._active_uids` filter handles the natural-vs-cleanup
race: gen thread removes inline on `finish_reason`, then
Generate.finally queues the same uid; the queued remove finds the
uid already gone and skips silently instead of raising "uid not
found" from `BatchGenerator.remove()`.
Local validation against gemma-3-4b-it-qat-4bit, comparing the same
staggered-burst test reported in the PR description (16 reqs at
c=4, max_tokens=64):
| metric | per-step head (b614d7b) | this commit (queued remove) |
|--------|-------------------------|-----------------------------|
| p50 | 354 ms | 296 ms |
| p90 | 1,010 ms | 304 ms |
| max | 1,010 ms | 304 ms |
| rope / Stream(gpu,1) errors in gen log | hundreds | 0 |
TTFT spread collapses from 2.9× (354→1010) to ~3% (296→304). All
existing tests still pass: c=1 (3.6s), c=4 (11.7s), c=8 (18.5s),
cancel-mid-stream (follow-up at 0.3s). Pre-commit clean.
Class docstring rewritten to spell out the new contract: every mlx
state mutation runs on the gen thread; the event-loop callers
contribute via `_pending` (inserts) and `_pending_remove`
(removes), drained at the top of each iteration.
Signed-off-by: key4ng <[email protected]>
```python
if to_remove:
    try:
        self.batch_generator.remove(to_remove)
        for uid in to_remove:
            self._active_uids.discard(uid)
    except Exception:
        logger.exception("Failed to drain queued removes: %s", to_remove)
```
🟡 Nit: If batch_generator.remove(to_remove) throws after internally removing some uids, none of them get discarded from _active_uids. Those successfully-removed uids become phantoms — the gen thread keeps calling next() on their behalf (wasting batch capacity), results are silently dropped (queues already popped), and they persist until they naturally finish via finish_reason. Under burst-with-cancel traffic to_remove can contain several uids, so one bad uid poisons cleanup for the rest.
Per-uid removal is more resilient and matches the inline finish-reason pattern at line 693:
```diff
 if to_remove:
-    try:
-        self.batch_generator.remove(to_remove)
-        for uid in to_remove:
-            self._active_uids.discard(uid)
-    except Exception:
-        logger.exception("Failed to drain queued removes: %s", to_remove)
+    for uid in to_remove:
+        try:
+            self.batch_generator.remove([uid])
+            self._active_uids.discard(uid)
+        except Exception:
+            logger.exception("Failed to remove uid %d during queued-remove drain", uid)
```
Resolved in 19f3b6d. Replaced batched batch_generator.remove(to_remove) with a per-uid loop matching the inline finish_reason path at line 693 — partial failure now skips just the bad uid, the rest get correctly removed and discarded from _active_uids. Also added a seen: set[int] for explicit dedup since Abort and Generate.finally both queue the same uid on cancel.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/mlx/servicer.py`:
- Around line 651-663: _pending_remove can contain duplicate uids (enqueued by
Abort() and Generate.finally), so before draining and calling
batch_generator.remove you must deduplicate the list to prevent double-remove
exceptions and ensure _active_uids is updated correctly; inside the block that
copies and clears _pending_remove (the code that currently does to_remove =
self._pending_remove[:] and self._pending_remove.clear()), replace the copy step
with a deduplicated sequence (e.g., preserve order while removing duplicates)
and then filter against _active_uids before calling
self.batch_generator.remove(to_remove); apply the same change to the similar
blocks around the other noted ranges so all uses of _pending_remove are
deduplicated prior to batch_generator.remove.
- Around line 594-643: The race occurs between clearing self._pending and later
publishing self._request_uid_map so Abort() can miss a request; fix by acquiring
self._gen_lock around the entire pending→insert→publish sequence (and take
self._pending_lock inside it) so a request is visible in exactly one place.
Concretely: around the block that currently does "with self._pending_lock: batch
= self._pending[:]; self._pending.clear()" and the later insert() success path
that pops self._pending_by_request_id and sets self._request_uid_map and
uid_future results, acquire self._gen_lock first, then acquire
self._pending_lock to snapshot/clear _pending, perform batch_generator.insert,
and keep both locks held until after you set self._request_uid_map and call
_set_future_result_safe; ensure error handling still pops _pending_by_request_id
under the same locks and uses _loop.call_soon_threadsafe to set exceptions as
before.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 736b0e2a-9623-49af-b7b4-d45962676b9a
📒 Files selected for processing (1)
grpc_servicer/smg_grpc_servicer/mlx/servicer.py
Three new bot findings on commit 9f1390a, all valid.

1. Per-uid + dedup queued-remove drain (Claude nit + CodeRabbit Major). The previous batched call

   ```
   to_remove = [u for u in to_remove if u in self._active_uids]
   try:
       self.batch_generator.remove(to_remove)
       for uid in to_remove:
           self._active_uids.discard(uid)
   except Exception:
       logger.exception("Failed to drain queued removes: %s", to_remove)
   ```

   had two issues:

   - Partial-failure poison: if `batch_generator.remove(...)` raised after internally removing some uids, the discard loop never ran; the successfully-removed uids became phantoms (`_active_uids` held them, the gen thread kept calling `next()` on their behalf, queues already popped → tokens silently dropped).
   - No dedup: `Abort` and `Generate.finally` both queue the same uid on cancel, so `to_remove` could be `[X, X]`. The "uid in _active_uids" filter is a list comprehension and keeps both. mlx-lm's `BatchGenerator.remove(...)` happens to dedup internally via `_find_uids` returning a dict, so the duplicate wasn't producing a crash today, but the comment claiming "duplicate is a no-op" was load-bearing on an undocumented internal contract.

   Replaced with a per-uid loop that mirrors the inline finish_reason path at line 693, with a `seen: set[int]` to dedup explicitly:

   ```
   seen_remove: set[int] = set()
   for uid in queued_removes:
       if uid in seen_remove or uid not in self._active_uids:
           continue
       seen_remove.add(uid)
       try:
           self.batch_generator.remove([uid])
           self._active_uids.discard(uid)
       except Exception:
           logger.exception(
               "Failed to remove uid %d during queued drain", uid
           )
   ```

   This diverges from mlx-lm.server's batched pattern (`mlx_lm/server.py:916`), which collects `uids_to_remove` during the iteration and calls `batch_generator.remove(uids_to_remove)` once at the end. mlx-lm's pattern works because it builds the list from a single thread (the gen scheduler) where duplicates aren't possible. Our queue is fed by RPC handlers from a different thread, so dedup is mandatory regardless of mlx-lm's choice.

2. Abort under _gen_lock (CodeRabbit Major). The gen thread's pending→inserted transition releases `_pending_lock` between

   ```
   with self._pending_lock:  # line 595
       for p in batch:
           self._pending_by_request_id.pop(...)
   ```

   and

   ```
   self._request_uid_map[p.request_id] = uid  # line 643
   ```

   leaving a window in which neither `_pending_by_request_id` nor `_request_uid_map` has the entry. An `Abort` arriving in that window took only `_pending_lock`, found nothing in pending, fell through to `_request_uid_map.pop(...)` (also empty), and silently no-op'd while the request kept decoding to natural completion.

   `Abort` now wraps both lookups in `_gen_lock`. Holding the gen lock forces Abort to wait for the gen thread's iteration to complete; the pending→inserted transition is then atomic from outside. Cost: Abort blocks for up to one gen-loop iteration (~50 ms on M-series, less for small batches). Abort is rare (router-initiated cancel), so the latency hit is acceptable in exchange for the correctness guarantee.

   This is conceptually equivalent to mlx-lm.server's `ctx.stop()` pattern (`mlx_lm/server.py:1552` and `_should_stop` at line 219) — the gen thread observes the cancel signal on the next iteration. mlx-lm achieves it via a per-request flag on a shared object because everything runs on one thread; we use lock acquisition because we have an asyncio→gen-thread boundary mlx-lm doesn't.

Class docstring updated to list Abort alongside Generate's CancelledError handler as a `_gen_lock`-acquiring site from the event loop, with the "why" inline.
Local validation against gemma-3-4b-it-qat-4bit on Apple Silicon:
- c=1 (3.8 s) / c=4 (11.5 s) / c=8 (21.6 s): clean, all 66 chunks per request.
- Cancel-mid-stream: cancelled request's slot reclaimed immediately; follow-up request finishes in 0.3 s.
- Stagger-burst (16 reqs at c=4, max_tokens=64): TTFT distribution flat, all 16 complete, no errors.
- Zero `rope` / `Stream(gpu, 1)` / `InvalidStateError` / "Failed to ..." log lines across all of the above.
- Pre-commit (ruff, ruff format, codespell) clean.

Signed-off-by: key4ng <[email protected]>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/mlx/servicer.py`:
- Around line 297-315: The _warmup method can leave a warmup UID registered if
batch_generator.insert(...) succeeds but batch_generator.next() raises; update
_warmup to always remove any inserted uids by tracking the uids from
batch_generator.insert in a local variable and calling
batch_generator.remove(uids) in a finally block (or in the except before
re-raising/returning) so that regardless of success or exception the warmup
probe is cleaned up; reference the _warmup function, the uids variable,
batch_generator.insert, batch_generator.next, and batch_generator.remove when
applying the change.
- Around line 724-737: When shutting down under self._gen_lock you must also
wake any blocked Generate RPCs: iterate over self._pending (the mapping of uid
-> uid_future) and for each future call
future.set_exception(RuntimeError("generator closed")) or future.cancel() so
callers don't hang, then wake the generator's queue by putting a sentinel into
the queue used by the gen loop (the same queue that uses queue.get(), e.g.
self._request_queue or self._queue) via put_nowait(sentinel) or put_nowait(None)
so blocked queue.get() calls resume and the generator exits; perform these steps
before or inside the finally that sets self.batch_generator = None.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3a6f2d0c-b35c-4785-ad34-d6d31d27b6d0
📒 Files selected for processing (1)
grpc_servicer/smg_grpc_servicer/mlx/servicer.py
Two CodeRabbit Major findings on commit 19f3b6d.

1. Warmup probe cleanup (line 297-315). `_warmup` did

   ```
   try:
       uids = self.batch_generator.insert(...)
       for _ in range(10):
           _, gen_responses = self.batch_generator.next()
           if any(r.finish_reason ...):
               break
       self.batch_generator.remove(uids)  # only on happy path
       logger.info("Warmup complete")
   except Exception:
       logger.warning("Warmup failed (non-fatal)", exc_info=True)
   ```

   If `insert()` succeeded but `next()` later threw, the warmup uid leaked inside BatchGenerator before the first real request. Moved `remove(uids)` into a `finally` block guarded by `if uids is not None`.

2. Wake blocked Generate calls on shutdown (line 734-746). The gen loop's exit path closed BatchGenerator under `_gen_lock` but never resolved `uid_future`s sitting in `_pending` or woke per-uid queues that streams were `await queue.get()`-blocked on. Any RPC caught in that window hung until the client deadline or transport cancellation (gRPC default is "no deadline" — it would hang forever).

   Shutdown now (still under `_gen_lock`):
   - Drains `_pending` + clears the index, then wakes each pending `uid_future` with a `RuntimeError("MlxEngineServicer is shutting down")` exception via `_set_future_exception_safe`. Generate's CancelledError handler (or the bare exception path if the future raised) runs cleanly without touching the backend, since `batch_generator` will be `None` after this block.
   - Pulls the per-uid queues, clears `_uid_queues` / `_request_uid_map` / `_active_uids`, then puts `None` (the same sentinel `Abort` uses) on each queue. Generate's stream and non-stream loops both treat `None` as "stop emitting".
   - Then closes BatchGenerator.

   This matches mlx-lm.server's shutdown intent — its scheduler loop just exits when `_stop` is set and the per-request queues close because the server tears down — but is more explicit because we have an asyncio→gen-thread boundary mlx-lm doesn't.

Local validation against gemma-3-4b-it-qat-4bit on Apple Silicon:
- c=1 (3.5 s) / c=4 (11.1 s): clean.
- Cancel-mid-stream: cleanup runs, follow-up at 0.3 s.
- Shutdown wake: long-running stream + SIGTERM mid-flight; the request unblocks within `server.stop(5.0)`'s grace window rather than hanging until client deadline.
- Zero `rope` / `Stream(gpu, 1)` / `InvalidStateError` / "Failed to ..." log lines.
- Pre-commit (ruff, ruff format, codespell) clean.

Signed-off-by: key4ng <[email protected]>
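For item 1, a fragment-level sketch of the finally-guarded warmup shape the fix describes; `warmup_prompt` and the response fields are placeholders, not the repo's exact code.

```python
def _warmup(self) -> None:
    uids = None
    try:
        uids = self.batch_generator.insert([warmup_prompt])  # placeholder prompt
        for _ in range(10):
            _, gen_responses = self.batch_generator.next()
            if any(r.finish_reason is not None for r in gen_responses):
                break
        logger.info("Warmup complete")
    except Exception:
        logger.warning("Warmup failed (non-fatal)", exc_info=True)
    finally:
        if uids is not None:  # cleanup on success AND on a mid-probe failure
            try:
                self.batch_generator.remove(uids)
            except Exception:
                pass
```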
Prior commits in this PR reduced cross-thread mlx-state mutations to a single channel: a queue of uids that event-loop callers (Generate's finally / CancelledError, Abort) appended to and the gen thread drained. Coupled with the lock-around-Abort fix from 19f3b6d, that closed every known race — but at the cost of two locks (`_gen_lock` + `_pending_lock`) and an Abort RPC that blocked for one decode iteration (~50 ms) waiting on `_gen_lock`.

This commit replaces both with the single mechanism `mlx_lm/server.py` uses. mlx-lm flips a `_should_stop` bool on a per-request `GenerationContext`; its scheduler observes it on the next iteration and adds the uid to `uids_to_remove`. We adapt that to our two-thread design with a single shared set:

```
self._aborted_request_ids: set[str]  # _pending_lock-guarded
```

Event-loop callers all collapse to one line:

```
Generate.CancelledError → self._aborted_request_ids.add(rid)
Generate.finally        → self._aborted_request_ids.add(rid)
Abort                   → self._aborted_request_ids.add(rid)
```

The gen thread drains the set at the start of each iteration (between phase 1 insert and phase 2 next) and does ALL the cleanup itself: pending lookup, uid lookup via `_request_uid_map.pop`, queue wake via `None` sentinel, `BatchGenerator.remove([uid])` if still in `_active_uids`. Everything mlx-state runs on one thread.

Net effect:
* `_gen_lock` is gone. Every mutation of `_active_uids`, `_uid_queues`, `_request_uid_map`, and `BatchGenerator` runs on the gen thread; the only lock is `_pending_lock`, which guards the cross-thread channel (`_pending`, `_pending_by_request_id`, `_aborted_request_ids`).
* Abort is non-blocking — one set.add and return. Cleanup lags by at most one decode step (~50 ms), same as mlx-lm.server.
* The Abort race CodeRabbit flagged on 19f3b6d (the lookup window between the gen thread's pop and set) goes away naturally: Abort records the rid, the gen thread observes it ≥1 iteration later, by which time the prior iteration's transition is fully committed and `_request_uid_map[rid]` is reliably populated.
* The `_pending_remove` queue (for storing uids) is gone; one set of rids replaces it. Generate's `finally` and `CancelledError` blocks shrink from ~20 lines each to two; `Abort` from ~50 lines to ~10.

The class docstring is rewritten to describe the new contract by analogy to mlx-lm.server.

Local validation against gemma-3-4b-it-qat-4bit on Apple Silicon:
- c=1 (3.7 s) / c=4 (11.6 s) / c=8 (22.8 s): clean.
- Cancel-mid-stream: cleanup runs, follow-up at 0.3 s.
- Stagger-burst (16 reqs, c=4, max_tokens=64): p50=298 ms, p90=315 ms, max=315 ms — TTFT distribution stays flat (matches the lock-based result from 9f1390a within measurement noise).
- Cancel-then-immediately-reuse: cancelled stream closes at 0.18 s; follow-up Generate completes in 3.2 s with 66 chunks — slot freed correctly.
- Zero `rope` / `Stream(gpu, 1)` / `InvalidStateError` / "Failed to ..." log lines across all of the above.
- Pre-commit (ruff, ruff format, codespell) clean.

Signed-off-by: key4ng <[email protected]>
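A sketch of the single-channel contract this commit describes. The attribute names come from the message above; the method bodies are illustrative, and the queue wake glosses over the event-loop marshalling (`call_soon_threadsafe`) the real code needs.

```python
# Event-loop side: every cancel path collapses to one line.
def _mark_aborted(self, rid: str) -> None:
    with self._pending_lock:
        self._aborted_request_ids.add(rid)

# Gen thread, each iteration, between phase 1 (insert) and phase 2 (next):
def _drain_aborts(self) -> None:
    with self._pending_lock:
        aborted = self._aborted_request_ids
        self._aborted_request_ids = set()
    for rid in aborted:
        self._pending_by_request_id.pop(rid, None)   # still pending: just drop it
        uid = self._request_uid_map.pop(rid, None)
        if uid is None:
            continue
        queue = self._uid_queues.pop(uid, None)
        if queue is not None:
            queue.put_nowait(None)  # sentinel; real code marshals onto the event loop
        if uid in self._active_uids:
            self.batch_generator.remove([uid])       # the only mlx call, on this thread
            self._active_uids.discard(uid)
```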
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: bbdb8680ae
Description
Problem
PR #1399's nightly bench measured a TTFT regression at chat concurrency=4 against the drain-and-batch design from PR #1414, plus a separate cross-thread mlx-state crash mid-agent_c4. From CI run 25195762280:
- `404 No available workers` after `RuntimeError: There is no Stream(gpu, 1) in current thread` in `mlx_lm/generate.py:_step`

Two distinct failures, both rooted in the same threading mistake: the previous design constructed `BatchGenerator` on the asyncio main thread but called `next()` on a separate gen thread. mlx-lm's `generation_stream` is allocated by `mx.new_thread_local_stream(...)` and the `mx.stream(s)` context is per-thread — the stream object's per-thread binding doesn't follow it across threads, so `mx.async_eval` continuations later fail to find the stream in the gen-thread context. mlx-lm.server doesn't hit either failure because it runs all mlx state on a single scheduler thread.

PR #1414's drain-and-batch (wait for `_active_uids` to be empty before inserting) was a workaround for the rope-offset corruption that surfaced because of the cross-thread mlx state. With the threading mistake fixed, the drain wait becomes pure latency cost — exactly the bimodal pattern visible in CI's chat_c4 column.

Solution
Mirror mlx-lm.server's design: keep the gen thread, but make sure all mlx state lives on it.
A. Construct `BatchGenerator` on the gen thread. Defer construction from `server.serve_grpc` (asyncio main thread) into the first lines of `_generation_loop`. `mlx_lm.generate.generation_stream` then lazily binds to the gen thread for the lifetime of the process, and `insert`/`next`/`remove` all run on the same thread. A new `_ready_event` lets `serve_grpc` wait for construction + warmup to finish before flipping the health probe to SERVING.

B. Per-step admission instead of drain-and-batch. Once A is in place, mid-decode insert is safe (insert and next are serialized by being on the same thread, plus `_gen_lock`). The loop now mirrors `mlx_lm/server.py:_generate`: every iteration drains `_pending`, calls `insert(...)`, then advances by exactly one `next()` step. Worst-case admission delay collapses from a full batch drain (~3 s) to one decode step (~50 ms); a sketch of the loop follows at the end of this section.

C. Drop the redundant outer `with mx.stream(generation_stream):` wrapper. `BatchGenerator.next()` already wraps itself in `with mx.stream(self._stream):` internally (mlx_lm/generate.py:1847). Our outer wrap was dead code under the old threading model and would have re-entered the same thread-local stream under the new one.
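A minimal sketch of the per-step loop from part B. The attribute names (`_pending`, `_pending_lock`, `_active_uids`) come from this PR; the insert/next signatures are assumptions standing in for mlx-lm's BatchGenerator API, and `_dispatch` is a hypothetical helper.

```python
def _run_one_iteration(self) -> None:
    # Phase 1: admit every queued request, even mid-decode.
    with self._pending_lock:
        batch, self._pending = self._pending[:], []
    if batch:
        uids = self.batch_generator.insert(batch)   # assumed signature
        self._active_uids.update(uids)
    # Phase 2: advance exactly one decode step. A request that arrived
    # mid-decode gets its prefill started on the very next call, so the
    # worst-case admission delay is one step, not a full batch drain.
    if self._active_uids:
        completions, generations = self.batch_generator.next()  # assumed 2-tuple
        self._dispatch(completions, generations)    # hypothetical helper
```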
Changes

`grpc_servicer/smg_grpc_servicer/mlx/server.py` (+/- ~30 lines)
- `BatchGenerator` import and up-front construction; pass `model` + batch sizes to the servicer instead.
- `_warmup` helper (moved into the servicer, runs on the gen thread).
- `serve_grpc` now awaits `servicer.wait_ready` (in an executor) before `health_servicer.set_serving()`.

`grpc_servicer/smg_grpc_servicer/mlx/servicer.py` (+/- ~250 lines, mostly comments + docstring rewrite)
- `__init__` signature: `model=`, `completion_batch_size=`, `prefill_batch_size=` (replaces `batch_generator=`).
- `_ready_event` and public `wait_ready(timeout=None)`.
- `_generation_loop`: phase 0 constructs `BatchGenerator` + warms up + signals ready. Main loop: drain pending → insert → `next()` once → dispatch responses per iteration, no `_active_uids` admission gate.

No proto, gateway, or client changes. Marker for the `Generate.finally` and `Abort` cleanup paths is unchanged.

Test Plan
Local validation on Apple Silicon against `mlx-community/gemma-3-4b-it-qat-4bit` with `target/ci/smg`:

- c=1: 4.2 s, 66 chunks, no errors.
- c=4 (4 concurrent identical agent prompts): 9.6 s, 4 × 66 chunks, no errors.
- c=8: 24.4 s, 8 × 66 chunks, no errors. c=4 → c=8 scales sub-linearly, confirming batched prefill still works.
- Cancel-mid-stream: cancelled request's uid is reclaimed; follow-up request completes in 0.4 s instead of waiting for the cancelled one to finish naturally.
- Staggered burst test (16 requests at concurrency 4, max_tokens=64, simulating genai-bench's chat c=4 burst pattern): tail/median ratio 4.3× (bimodal) → 2.9× (smooth). The full-bench numbers will be tighter on the slower CI runner; this is the apples-to-apples shape change.
- Zero `rope`, `Stream(gpu, 1)`, `InvalidStateError` errors in `mlx-grpc.log` across all of the above.
- `pre-commit run --files` (ruff, ruff format, codespell): clean.

End-to-end CI validation will arrive via the next nightly mlx-bench run on PR #1399 (which uses this servicer); the c=4 chat_c4 cell should drop into the same range as mlx-lm.server and the agent_c4 cell should produce real numbers instead of crashing.

Checklist

- `cargo +nightly fmt` passes (no Rust files changed)
- `cargo clippy --all-targets --all-features -- -D warnings` passes (no Rust files changed)
- `pre-commit run --files grpc_servicer/...` passes (ruff, ruff format, codespell)

Summary by CodeRabbit
Refactor
Bug Fixes