Qwen3 prefix-cache usage in OpenAI responses to enhance observability #299
jackyYang6 wants to merge 1 commit
Code Review
This pull request introduces tracking and reporting of cached tokens across multiple engines (including DeepSeek, Kimi, Qwen, and Simulated engines) and integrates this with the vLLM frontend. Specifically, a new patch_usage module is added to the frontend to intercept completion endpoints and patch the OpenAI usage response with the engine-provided cached_tokens count. The review feedback highlights two critical memory leaks in this new module: one occurring on non-streaming error responses where failed requests are not cleaned up from the global tracking map, and another during streaming cancellations where early client disconnects bypass the cleanup logic after the generator loop.
    let status = response.status();
    if !status.is_success() {
        return response;
    }
    let (parts, body) = response.into_parts();
    let bytes = match to_bytes(body, COMPLETION_USAGE_PATCH_BODY_LIMIT).await {
        Ok(bytes) => bytes,
        Err(error) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorBody {
                    error: format!("failed to read completion response body: {error}"),
                }),
            )
                .into_response();
        }
    };
Memory Leak on Non-Streaming Error Responses
If the response status is not successful (e.g., 4xx or 5xx), the function currently returns early without removing the request_id from the global cached_tokens_by_request_id map. This causes a memory leak for every failed request.
We can fix this by reading the response body first, attempting to parse the id to clean up the map, and then returning early if the status is not successful.
    let status = response.status();
    let (parts, body) = response.into_parts();
    let bytes = match to_bytes(body, COMPLETION_USAGE_PATCH_BODY_LIMIT).await {
        Ok(bytes) => bytes,
        Err(error) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorBody {
                    error: format!("failed to read completion response body: {error}"),
                }),
            )
                .into_response();
        }
    };
    if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
        if let Some(request_id) = value.get("id").and_then(serde_json::Value::as_str) {
            state.cached_tokens_by_request_id.write().await.remove(request_id);
        }
    }
    if !status.is_success() {
        return Response::from_parts(parts, Body::from(bytes));
    }

    let status = response.status();
    if !status.is_success() {
        return response;
    }
    let cached_tokens_by_request_id = state.cached_tokens_by_request_id;
    let (mut parts, body) = response.into_parts();
    parts.headers.remove(CONTENT_LENGTH);
    let stream = body.into_data_stream();
    let body = Body::from_stream(async_stream::stream! {
        let mut stream = stream;
        let mut request_id_for_cleanup = None;
        while let Some(next) = stream.next().await {
            let Ok(bytes) = next else {
                continue;
            };
            yield Ok::<Bytes, std::convert::Infallible>(patch_sse_usage_chunk(
                bytes,
                &cached_tokens_by_request_id,
                include_usage,
                &mut request_id_for_cleanup,
            ).await);
        }
        if let Some(request_id) = request_id_for_cleanup {
            cached_tokens_by_request_id.write().await.remove(&request_id);
        }
    });
Memory Leak on Streaming Cancellation / Client Disconnect
In patch_streaming_completion_usage, if the client disconnects mid-stream, the stream is dropped. In Rust, dropping an async_stream::stream! generator drops its local variables but does not resume execution to run any code after the loop (lines 180-182). Thus, the cleanup .remove(&request_id) is bypassed, causing a memory leak.
We can fix this robustly by implementing a Drop guard. Since local variables are guaranteed to be dropped when the generator is dropped, the Drop implementation of the guard will always run even on early stream cancellation.
    let status = response.status();
    if !status.is_success() {
        return response;
    }
    let cached_tokens_by_request_id = state.cached_tokens_by_request_id;
    let (mut parts, body) = response.into_parts();
    parts.headers.remove(CONTENT_LENGTH);
    let stream = body.into_data_stream();
    struct StreamCleanupGuard {
        request_id: Option<String>,
        map: CachedTokenUsageMap,
    }
    impl Drop for StreamCleanupGuard {
        fn drop(&mut self) {
            if let Some(ref id) = self.request_id {
                let map = self.map.clone();
                let id = id.clone();
                tokio::spawn(async move {
                    map.write().await.remove(&id);
                });
            }
        }
    }
    let body = Body::from_stream(async_stream::stream! {
        let mut stream = stream;
        let mut guard = StreamCleanupGuard {
            request_id: None,
            map: cached_tokens_by_request_id.clone(),
        };
        while let Some(next) = stream.next().await {
            let Ok(bytes) = next else {
                continue;
            };
            yield Ok::<Bytes, std::convert::Infallible>(patch_sse_usage_chunk(
                bytes,
                &guard.map,
                include_usage,
                &mut guard.request_id,
            ).await);
        }
    });
c968f1f to 9d84879
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9d8487980c
    if request_id_for_cleanup.is_none() {
        if let Some(request_id) = value.get("id").and_then(serde_json::Value::as_str) {
            *request_id_for_cleanup = Some(request_id.to_string());
Update cleanup id after seeing stream id
For streaming requests that include X-Request-Id or a body request_id, request_id_for_cleanup starts as the fallback chatcmpl-<client-id>, but vLLM still emits and the bridge stores cached-token usage under the actual generated response id. Because this block only records the stream's id when the cleanup id is None, a client disconnect before the usage chunk (or a stream without included usage) drops the cleanup guard while still removing only the fallback key, leaving the actual cached-token entry in CACHED_TOKENS_BY_REQUEST_ID indefinitely for those requests.
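One way to address this is to let the cleanup guard remember every id it sees rather than a single id. The sketch below is only an illustration under stated assumptions: `CleanupGuard`, `CachedTokenMap`, `observe`, and `run_stream` are hypothetical names, and a `std::sync::Mutex` replaces the PR's async lock so that the example needs no runtime and `Drop` can remove entries synchronously.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the global request-id -> cached_tokens map.
type CachedTokenMap = Arc<Mutex<HashMap<String, u64>>>;

/// Removes every id it has recorded when dropped, including on early cancellation.
struct CleanupGuard {
    ids: HashSet<String>,
    map: CachedTokenMap,
}

impl CleanupGuard {
    /// Starts with the optional client-derived fallback id.
    fn new(map: CachedTokenMap, fallback_id: Option<&str>) -> Self {
        let ids: HashSet<String> = fallback_id.into_iter().map(str::to_string).collect();
        Self { ids, map }
    }

    /// Records the id carried by a stream chunk even if a fallback id is already set.
    fn observe(&mut self, stream_id: &str) {
        self.ids.insert(stream_id.to_string());
    }
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        // A std Mutex can be locked synchronously, so no task spawn is needed here.
        if let Ok(mut map) = self.map.lock() {
            for id in &self.ids {
                map.remove(id);
            }
        }
    }
}

/// Simulates a stream that the client abandons after `chunks_seen` chunks.
fn run_stream(map: CachedTokenMap, fallback_id: Option<&str>, stream_id: &str, chunks_seen: usize) {
    let mut guard = CleanupGuard::new(map, fallback_id);
    for _ in 0..chunks_seen {
        guard.observe(stream_id);
    }
    // The guard is dropped here whether or not the stream ran to completion.
}

fn main() {
    let map: CachedTokenMap = Arc::new(Mutex::new(HashMap::new()));
    map.lock().unwrap().insert("chatcmpl-generated".to_string(), 6512);
    run_stream(map.clone(), Some("chatcmpl-client"), "chatcmpl-generated", 1);
    println!("entries left: {}", map.lock().unwrap().len());
}
```

With this shape, both the fallback key and the generated key are removed once at least one chunk has been seen. An entry whose id never reaches the guard still cannot be cleaned up this way, so a time-based eviction might be a reasonable complement.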
Heads up, the root cause is upstream: the vLLM Rust frontend never fills
Totally agreed. I initially resorted to this response-patch layer precisely because of this upstream limitation, and as a result, the patching code has grown increasingly bloated and ugly. Once the upstream fix lands and we bump our pinned rev, I'll gladly drop the patch layer, rewrite it with clean code, and resubmit.
Description
Fixes issue #246 by propagating Qwen3 prefix-cache hit information from the model scheduler to OpenAI-compatible API usage fields, so users and benchmark tooling can observe whether a request reused cached prompt tokens.
What changed
- TokenEvent::Scheduled with cached_tokens.
- PrefillRequestResult.cached_tokens through scheduler planning/effects.
- Scheduled events before prompt echo, first token, or terminal finish events so frontend usage accounting has the cache metadata in time.
- cached_tokens: 0, preserving existing behavior for engines without prefix-cache accounting.
- usage.prompt_tokens_details.cached_tokens
- PrefillStats.num_cached_tokens
- PrefillStats.num_local_cached_tokens
- num_computed_tokens = prompt_tokens.saturating_sub(cached_tokens)
- PrefillStats into the final OpenAI usage object.
Why
Prefix caching is enabled by default for Qwen3-4B and can significantly reduce warm-request TTFT, but before this change the cache hit count was dropped at the scheduler/frontend boundary. API users, benchmarks, and operators could not tell whether a repeated prompt actually hit the prefix cache because usage always reported zero cached tokens and there was no engine-level cache hit/miss visibility.
This change makes per-request cache reuse visible in OpenAI-compatible usage while keeping cache accounting owned by the Qwen3 executor/scheduler rather than guessing in the frontend.
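The accounting named under "What changed" can be sketched roughly as follows. The struct layouts and the helper names `prefill_stats` and `usage_from` are assumptions made for illustration, as is setting `num_local_cached_tokens` equal to `cached_tokens`; only the field names and the `saturating_sub` expression come from the description above. The prompt length of 7000 is made up, while 6752 is the warm-request value reported under Validation.

```rust
/// Hypothetical mirror of the prefill statistics named in this PR.
#[derive(Debug, PartialEq)]
struct PrefillStats {
    num_cached_tokens: u32,
    num_local_cached_tokens: u32,
    num_computed_tokens: u32,
}

/// Hypothetical, flattened view of the OpenAI-style usage fields.
#[derive(Debug, PartialEq)]
struct Usage {
    prompt_tokens: u32,
    /// Stands in for usage.prompt_tokens_details.cached_tokens.
    cached_tokens: u32,
}

/// Derives prefill stats from the engine-reported cache hit count.
fn prefill_stats(prompt_tokens: u32, cached_tokens: u32) -> PrefillStats {
    PrefillStats {
        num_cached_tokens: cached_tokens,
        // Assumption: all cached tokens are local cache hits.
        num_local_cached_tokens: cached_tokens,
        // saturating_sub avoids underflow if cached_tokens exceeds prompt_tokens.
        num_computed_tokens: prompt_tokens.saturating_sub(cached_tokens),
    }
}

/// Builds usage, defaulting to 0 when the engine reported no cache metadata.
fn usage_from(prompt_tokens: u32, stats: Option<&PrefillStats>) -> Usage {
    Usage {
        prompt_tokens,
        cached_tokens: stats.map_or(0, |s| s.num_cached_tokens),
    }
}

fn main() {
    let stats = prefill_stats(7000, 6752);
    let usage = usage_from(7000, Some(&stats));
    println!(
        "prompt={} cached={} local_cached={} computed={}",
        usage.prompt_tokens,
        usage.cached_tokens,
        stats.num_local_cached_tokens,
        stats.num_computed_tokens
    );
}
```

Passing `None` to `usage_from` models an engine without prefix-cache accounting, which keeps reporting zero cached tokens.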
Usage patch design
The Rust vLLM frontend receives PrefillStats from TokenEvent::Scheduled, but OpenAI response usage did not reliably preserve the cached-token fields in the final response object. To avoid recomputing cache hits in the frontend, this PR adds a small response wrapper in pegainfer-vllm-frontend/src/patch_usage.rs.
The wrapper:
- TokenEvent::Scheduled is processed;
- /v1/completions and /v1/chat/completions;
- usage.prompt_tokens_details.cached_tokens field;
- stream_options.include_usage=true;
- 0 if an engine did not report cache metadata.
This keeps the frontend as a pass-through/patch layer: it does not infer prefix-cache behavior from prompt length, token ids, or repeated requests. The Qwen3 executor remains the source of truth for cached_tokens.
Engine-level cache info design
Issue #246 also asks for engine-level hit/miss counters logged or exposed for operators. This PR adds a minimal Qwen3-local debug log rather than a new metrics subsystem.
Qwen3 scheduler effects now maintain cumulative prefix-cache stats for the lifetime of the scheduler loop:
- total_requests
- hit_requests
- miss_requests
- hit_rate
- total_prompt_tokens
- total_cached_tokens
- token_hit_rate
The stats are logged at debug! after each scheduled observation:
The log is debug-level by design: operators can enable it when investigating prefix-cache effectiveness, while default info logs remain quiet under normal serving load. No prompt text, token ids, generated text, or request payloads are logged.
Example startup filter:
Type of Change
Validation
GPU-machine validation performed:
- cargo test --release -p pegainfer-qwen3-4b --lib scheduler
  - 17 passed; 0 failed
- /v1/completions cold/warm validation:
  - cached_tokens=0
  - cached_tokens=6752
- /v1/chat/completions non-streaming cold/warm validation after chat route patch:
  - cached_tokens=0
  - cached_tokens=6512
- stream_options: {"include_usage": true}:
  - /v1/completions usage chunk includes prompt_tokens_details.cached_tokens
  - /v1/chat/completions usage chunk includes prompt_tokens_details.cached_tokens
Local/sub-agent validation performed:
- cargo test --release -p pegainfer-vllm-frontend --lib
- cargo test --release -p pegainfer-sim --test frontend_e2e cached_tokens
- cargo clippy --release -p pegainfer-vllm-frontend -p pegainfer-sim --all-targets -- -D warnings
- cargo fmt --check
- git diff --check
Suggested final checks before merge, if not already run on the final clean branch:
Checklist
- TokenEvent::Scheduled carries cached_tokens.
- Scheduled before prompt echo, token, or finish events.
- cached_tokens: 0.
- /v1/completions non-streaming and streaming usage paths are covered.
- /v1/chat/completions non-streaming and streaming usage paths are covered.