feat(grpc_servicer): add TokenSpeed servicer (Part 2/3) #1464
Conversation
Clean, thorough implementation. Reviewed all 11 files: servicer (Generate streaming/non-streaming/n>1, HealthCheck, Abort, GetModelInfo, GetServerInfo, GetLoads), health servicer, server lifecycle, scheduler launcher, CLI entrypoint, and 57 unit tests.
Key things verified:
- n>1 cancel sweep: CancelledError handler and Abort RPC both correctly walk the expanded `{rid}-n{i}` children, preventing orphaned GPU work
- Chat-template prefix strip: `_generated_output_ids` correctly slices to the last `completion_tokens` tokens, removing the Llama-3 assistant header that broke tool-call parsing
- Stop-token trim + no_stop_trim: Trailing matched stop is stripped from `output_ids` unless `no_stop_trim` is set; `matched_token_id` still rides in the proto field
- Logprob alignment: Cumulative-to-delta slicing in `_convert_output_logprobs_to_proto` correctly handles streaming chunks and stop-token-stripped frames
- HasField for optional scalars: `temperature=0` (greedy) is correctly forwarded via presence tracking rather than truthy checks
- Warmup lifecycle: Synchronous gRPC client on a daemon thread with proper channel cleanup; health stays NOT_SERVING until a complete frame is received
- Graceful shutdown: Drain loop with timeout, then `kill_process_tree(include_parent=False)` to reap scheduler children without self-terminating
No bugs, no security concerns, no silent fallbacks. LGTM.
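To illustrate the presence-tracking point in the list above, a minimal sketch, assuming a proto3 message with `optional` scalar fields (the `sampling_kwargs` helper and the field set here are hypothetical, not the PR's actual code):

```python
# Sketch: proto3 `optional` scalars expose HasField, so presence is checked
# explicitly and temperature=0.0 (greedy) is forwarded rather than dropped
# by a truthiness test like `if params.temperature:`.
def sampling_kwargs(params) -> dict:
    out = {}
    if params.HasField("temperature"):
        out["temperature"] = params.temperature  # 0.0 survives
    if params.HasField("top_p"):
        out["top_p"] = params.top_p
    return out
```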
Code Review
This pull request implements a gRPC servicer for the TokenSpeed inference engine, including health monitoring, subprocess management, and request handling for generation and metadata. Feedback identifies a need to handle zero completion tokens to prevent chat template prefix leakage and recommends updating the health servicer's Watch method to support server-streaming for full gRPC protocol compliance.
```python
if isinstance(completion, int) and 0 < completion <= len(raw):
    token_ids = raw[-completion:]
else:
    token_ids = raw
```
When completion_tokens is 0, the current logic falls back to returning the entire raw list of token IDs. Since raw often contains chat template prefix tokens, this fallback will leak those prefix tokens into the response. If completion_tokens is 0, an empty list should be returned. Additionally, ensure that in streaming token generation, the completion_tokens count is reported cumulatively for the entire request to ensure accurate progress reporting.
```diff
-if isinstance(completion, int) and 0 < completion <= len(raw):
-    token_ids = raw[-completion:]
+if isinstance(completion, int):
+    token_ids = raw[-completion:] if completion > 0 else []
 else:
     token_ids = raw
```
References
- In a streaming token generation API, response chunks should report a cumulative count of completion_tokens for the entire request, not just the tokens in the current chunk, to ensure accurate progress reporting.
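A sketch of what cumulative reporting could look like in a streaming loop (every name below is hypothetical, for illustration only):

```python
# Hypothetical streaming loop: completion_tokens is cumulative per request,
# while token_ids carries only the current chunk's delta.
total_completion = 0
async for chunk in engine_stream:
    total_completion += len(chunk.new_token_ids)
    yield GenerateResponse(
        token_ids=chunk.new_token_ids,
        completion_tokens=total_completion,  # running total, not chunk size
    )
```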
```python
async def Watch(
    self,
    request: health_pb2.HealthCheckRequest,
    context: grpc.aio.ServicerContext,
) -> AsyncIterator[health_pb2.HealthCheckResponse]:
    # K8s probes use Check, not Watch — we emit the current status once.
    yield await self.Check(request, context)
```
The Watch method implementation does not comply with the gRPC Health Checking Protocol (v1). The protocol requires Watch to be a server-streaming RPC that stays open and yields the current status whenever it changes. The current implementation yields once and then terminates the stream, which may cause issues with clients (like service meshes or load balancers) that rely on the streaming behavior of Watch to track backend health in real-time.
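For reference, grpcio's grpc-health-checking package ships a `HealthServicer` (including an asyncio variant) with a compliant `Watch`. If keeping a hand-rolled servicer, a minimal sketch of the streaming behavior could look like this, assuming the servicer keeps a `_statuses` dict and notifies an `asyncio.Condition` named `_status_changed` on every health transition (both names are hypothetical):

```python
# Sketch only: send the current status, then push an update on every change,
# keeping the stream open as the protocol requires.
async def Watch(
    self,
    request: health_pb2.HealthCheckRequest,
    context: grpc.aio.ServicerContext,
) -> AsyncIterator[health_pb2.HealthCheckResponse]:
    last_sent = None
    while not context.cancelled():
        status = self._statuses.get(
            request.service, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
        )
        if status != last_sent:
            yield health_pb2.HealthCheckResponse(status=status)
            last_sent = status
        async with self._status_changed:       # hypothetical asyncio.Condition
            await self._status_changed.wait()  # block until the next transition
```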
Force-pushed 52cda9d to c3c3c02
```python
finish_reason = "stop"
matched_kwargs: dict[str, Any] = {}
if reason_dict:
    kind = reason_dict.get("type")
    if kind == "length":
        finish_reason = "length"
    elif kind == "abort":
        finish_reason = "abort"
```
🟡 Nit: The finish_reason mapping is an incomplete allowlist — only "length" and "abort" are recognized; every other type (including any future TokenSpeed additions like "cancelled") silently falls back to "stop". This means the gRPC path would silently misreport a new finish reason while the HTTP path handles it correctly, creating a subtle divergence between the two serving paths.
Consider logging a warning for unrecognized types so this doesn't fail silently:
```diff
 finish_reason = "stop"
 matched_kwargs: dict[str, Any] = {}
 if reason_dict:
     kind = reason_dict.get("type")
     if kind == "length":
         finish_reason = "length"
     elif kind == "abort":
         finish_reason = "abort"
+    elif kind and kind != "stop":
+        logger.warning("Unrecognized finish_reason type %r; defaulting to 'stop'", kind)
```
```python
    return reason
to_json = getattr(reason, "to_json", None)
if callable(to_json):
    result = to_json()
```
🟡 Nit: Removing the try/except wrapper around to_json() changes error-routing semantics. The caller at line 191 catches ValueError and maps it to StatusCode.INVALID_ARGUMENT (user input error). If to_json() internally raises a ValueError, it will now be misclassified as bad user input rather than an internal server error. The previous code deliberately wrapped all to_json() failures in TypeError to guarantee they'd fall through to except Exception → StatusCode.INTERNAL.
The risk is low (a well-behaved to_json() shouldn't raise ValueError), but the original wrapper existed specifically to defend against this mismatch.
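A sketch of the wrapper being described, assuming the goal is to keep internal `to_json()` failures out of the caller's `except ValueError` / INVALID_ARGUMENT path:

```python
# Re-wrap any to_json() failure as TypeError so it can never be mistaken
# for a user-input ValueError (which the caller maps to INVALID_ARGUMENT);
# TypeError falls through to `except Exception` -> StatusCode.INTERNAL.
to_json = getattr(reason, "to_json", None)
if callable(to_json):
    try:
        result = to_json()
    except Exception as exc:
        raise TypeError(f"to_json() failed on {type(reason).__name__}") from exc
```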
Force-pushed 788933a to d16d38f
Force-pushed 8057d10 to 656f1c2
Force-pushed d16d38f to 3f8983a
```python
Mirrors smg_grpc_servicer.vllm / smg_grpc_servicer.sglang. Wraps TokenSpeed's
AsyncLLM (main-process async frontend) behind the SGLang gRPC service so the
existing Rust router (which auto-detects the SGLang proto) can route traffic
to TokenSpeed without needing a new client.
"""
```
🟡 Nit: This docstring is stale — it describes the opposite of what the implementation does. The servicer does NOT wrap behind "the SGLang gRPC service"; it uses its own tokenspeed.grpc.scheduler.TokenSpeedScheduler proto. The Rust router does NOT "auto-detect the SGLang proto"; DetectBackendStep identifies TokenSpeed natively from the service name. And there IS a new Rust client (TokenSpeedSchedulerClient).
```diff
-Mirrors smg_grpc_servicer.vllm / smg_grpc_servicer.sglang. Wraps TokenSpeed's
-AsyncLLM (main-process async frontend) behind the SGLang gRPC service so the
-existing Rust router (which auto-detects the SGLang proto) can route traffic
-to TokenSpeed without needing a new client.
-"""
+"""TokenSpeed gRPC servicer implementation.
+
+Exposes TokenSpeed's AsyncLLM over the dedicated
+``tokenspeed.grpc.scheduler.TokenSpeedScheduler`` gRPC service.
+The Rust gateway's ``DetectBackendStep`` identifies TokenSpeed workers
+natively from the service name.
+"""
```
Force-pushed 3f8983a to 2ecbbb9
Adds the Python servicer that runs alongside a TokenSpeed scheduler
process and serves the gRPC protocol PR1 introduced. Includes:
- the async scheduler servicer (Generate/HealthCheck/Abort/
GetModelInfo/GetServerInfo/GetLoads), with cancellation handling
for streaming, non-streaming, channel-close, and n>1 paths
- a health-service bridge that flips SERVING/NOT_SERVING based on
scheduler liveness (deep probe with bounded staleness)
- a scheduler launcher that boots TokenSpeed's AsyncLLM in-process
- the ``python -m smg_grpc_servicer.tokenspeed`` entrypoint
- real ``GetLoads`` plumbing backed by ``AsyncLLM.get_load()`` so
router-side load balancing reflects scheduler-side metrics
- 57 unit tests covering the servicer, health service, proto
conversion, finish reasons, sampling params, streaming/non-
streaming generation, abort/cancel (incl. n>1), model/server
info, and load metrics
This is part 2 of 3 splitting #1351:
- PR1: Rust gRPC + protocol (merged first)
- PR2 (this): Python servicer + unit tests
- PR3: CI workflows + e2e tests
Stacked on PR1 — the servicer imports the proto stubs PR1 generates
from ``crates/grpc_client/proto/tokenspeed_scheduler.proto``.
Fixes a 🔴 critical from review on #1351:
- FakeAsyncLLM.generate_request crashed with
``TypeError: unhashable type: 'list'`` when n>1, because
``_build_generate_req`` rewrites ``rid`` to a list of per-choice
ids. The fake engine now registers state for each child rid, so
``test_cancel_aborts_all_n_children`` exercises the cancel sweep
instead of dying at setup.
Signed-off-by: key4ng <[email protected]>
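A sketch of the fake-engine fix described above (the `_states` dict and the method body are hypothetical illustrations of the approach, not the test's literal code):

```python
# FakeAsyncLLM.generate_request: rid arrives as a list of per-choice ids
# when n>1, so register state under each child rid instead of using the
# list (unhashable) as a dict key.
def generate_request(self, req):
    rids = req.rid if isinstance(req.rid, list) else [req.rid]
    for child_rid in rids:            # one {rid}-n{i} entry per choice
        self._states[child_rid] = "RUNNING"
```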
Trim ~13 lines: collapse the early-returns into a single conditional, drop
the inner ``try/except`` around ``to_json()`` (propagating the original
exception is more useful than wrapping it), and shorten the docstring.
Behavior is unchanged — the same shapes accepted, the same TypeError raised
on unknown shapes.

Signed-off-by: key4ng <[email protected]>
Force-pushed 2ecbbb9 to 6bb18d2
```python
    load_outputs = await asyncio.wait_for(
        self.async_llm.get_load(), timeout=HEALTH_CHECK_TIMEOUT
    )
except TimeoutError:
```
🔴 Important: except TimeoutError catches builtins.TimeoutError (subclass of OSError), but asyncio.wait_for raises asyncio.TimeoutError which on Python 3.10 is a separate class inheriting from Exception, not from builtins.TimeoutError. Since pyproject.toml declares requires-python = ">=3.10", this handler is dead code on 3.10 — the timeout falls through to the except Exception block below and reports StatusCode.INTERNAL instead of DEADLINE_EXCEEDED.
asyncio.TimeoutError became an alias of builtins.TimeoutError only in Python 3.11 (bpo-45098).
```diff
-except TimeoutError:
+except (TimeoutError, asyncio.TimeoutError):
```
This catches both the builtin and the asyncio variant, working correctly on 3.10+. On 3.11+ it's redundant but harmless.
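A quick self-contained check of the 3.10 vs 3.11 behavior described above:

```python
import asyncio
import sys

# asyncio.TimeoutError became an alias of builtins.TimeoutError in 3.11 (bpo-45098).
if sys.version_info >= (3, 11):
    assert asyncio.TimeoutError is TimeoutError
else:
    assert asyncio.TimeoutError is not TimeoutError  # distinct class on 3.10
```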
Upstream tokenspeed renamed the launch flag from ``--model-path`` to
``--model``. Update the docstring example so copy-paste still works.

Signed-off-by: key4ng <[email protected]>
Upstream lightseekorg/tokenspeed renamed the model + tokenizer
``ServerArgs`` fields alongside the matching CLI flag renames:
- ``ServerArgs.model_path`` → ``ServerArgs.model``
- ``ServerArgs.tokenizer_path`` → ``ServerArgs.tokenizer``
Both are sources of fields in ``GetModelInfo``, so post-bump that RPC
fails with:
AttributeError: 'ServerArgs' object has no attribute 'model_path'
AttributeError: 'ServerArgs' object has no attribute 'tokenizer_path'
Pick whichever attribute is populated so the servicer works against
both old and new tokenspeed pins:
model_path = getattr(self.server_args, "model", None) or getattr(
self.server_args, "model_path", ""
)
tokenizer_path = getattr(self.server_args, "tokenizer", None) or getattr(
self.server_args, "tokenizer_path", ""
)
The proto fields stay named ``model_path`` / ``tokenizer_path`` because
those are the on-wire contracts the router consumes. 57/57 unit tests
still pass.
Signed-off-by: key4ng <[email protected]>
Force-pushed 8583d04 to a812f5c
… models

When tokenspeed runs with a reasoning parser that has an xgrammar template
(e.g. ``gpt-oss`` → ``harmony``), forwarding a raw JSON-schema constraint
causes xgrammar to fight the Harmony channel preamble
(``<|channel|>analysis<|message|>…``): the model either generates garbage or
stalls until ``max_tokens``, leaving ``content`` empty.

Mirror tokenspeed's HTTP entrypoint (``serving_chat.py``): when a
``reasoning_parser`` is configured, wrap the user's JSON schema via
``structural_tag_for_reasoning_json_schema()`` so the grammar only activates
inside the response channel. Parsers without an xgrammar mapping fall back
to the raw json_schema unchanged.

Plumbs ``reasoning_parser`` into ``_sampling_params_from_proto`` as a
keyword-only argument so the helper stays a static method and existing tests
keep passing without modification. The new import of
``tokenspeed.runtime.grammar.reasoning_structural_tag`` is wrapped in
``try/except ImportError`` so stale tokenspeed pins fall back to raw
json_schema rather than crashing.

Signed-off-by: key4ng <[email protected]>
```python
    wrapped = structural_tag_for_reasoning_json_schema(
        reasoning_parser, json.loads(params.json_schema)
    )
except ImportError:
    wrapped = None
```
🟡 Nit: json.loads(params.json_schema) can raise json.JSONDecodeError (a ValueError subclass) if the client sends a malformed schema string, but the except only catches ImportError. This means malformed JSON blows up here when a reasoning parser is configured, while without a parser the same bad string silently passes through as out["json_schema"].
The inconsistency is minor (the caller's except ValueError handler would produce a reasonable INVALID_ARGUMENT gRPC status), but catching JSONDecodeError alongside ImportError would make the fallback path uniform:
```diff
     wrapped = structural_tag_for_reasoning_json_schema(
         reasoning_parser, json.loads(params.json_schema)
     )
-except ImportError:
+except (ImportError, json.JSONDecodeError):
     wrapped = None
```
Description
Problem
PR #1351's Rust router (Part 1, on `feat/grpc-servicer-tokenspeed`) can dial a TokenSpeed worker over the gRPC protocol it defines, but no worker speaks that protocol. We need a Python servicer that runs alongside a TokenSpeed scheduler process and serves the wire types defined in Part 1.

Solution
A self-contained TokenSpeed servicer module under `grpc_servicer/smg_grpc_servicer/tokenspeed/`, with cancellation handling for streaming/non-streaming, channel-close, and `n>1` paths, plus 57 unit tests.

3-PR Stack
This is part 2 of 3 splitting the original #1351:

`main` → `feat/grpc-servicer-tokenspeed` (= PR1) → `feat/grpc-tokenspeed-servicer`

Changes
- `grpc_servicer/smg_grpc_servicer/tokenspeed/servicer.py` — async scheduler servicer (Generate / HealthCheck / Abort / GetModelInfo / GetServerInfo / GetLoads), with cancellation that sweeps every `{rid}-n{i}` child rid expanded by `n>1`
- `grpc_servicer/smg_grpc_servicer/tokenspeed/health_servicer.py` — health-service bridge that flips SERVING / NOT_SERVING based on bounded-staleness scheduler liveness probes
- `grpc_servicer/smg_grpc_servicer/tokenspeed/scheduler_launcher.py` — boots TokenSpeed `AsyncLLM` in-process
- `grpc_servicer/smg_grpc_servicer/tokenspeed/server.py` and `__main__.py` — `python -m smg_grpc_servicer.tokenspeed` entrypoint
- `GetLoads` returns real `AsyncLLM.get_load()` metrics (was a stub returning zeros)
- `grpc_servicer/tests/test_tokenspeed_*.py` — 57 unit tests covering proto conversion, finish reasons, sampling params, streaming/non-streaming, abort/cancel (incl. `n>1`), model/server info, and load metrics

Test Plan
- `pytest grpc_servicer/tests/ -v` → 57 passed in 1.47s, including:
  - `test_cancel_calls_abort_request` (n=1 cancel path)
  - `test_cancel_aborts_all_n_children` (n>1 cancel sweep)
  - `test_abort_sweeps_n_children` (Abort RPC anchored regex)

Review Threads from #1351
Addressed in this PR:
- `grpc_servicer/tests/conftest.py` — kept tests with the code under test rather than moving to a follow-up; if you'd still prefer the tests in a separate PR, happy to peel them out.
- `tests/test_tokenspeed_servicer.py` — `FakeAsyncLLM.generate_request` previously crashed with `TypeError: unhashable type: 'list'` for `n>1` because `_build_generate_req` rewrites `rid` to a list of per-choice ids. The fake engine now registers state for each child rid, so `test_cancel_aborts_all_n_children` actually exercises the cancel sweep.

Checklist
- `cargo +nightly fmt` passes (no Rust changes)
- `cargo clippy --all-targets --all-features -- -D warnings` passes (no Rust changes)