refactor(responses): unify OpenAI + gRPC Responses surfaces on a shared agent-loop driver + sink #1402
zhaowenzi wants to merge 17 commits into
Conversation
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Code Review
This pull request introduces a unified agent loop for the Responses API, centralizing control flow and tool execution into a shared driver across different surfaces. It also adds a transcript lowering pass to normalize high-level item types for backend processing. Feedback highlights the need for more robust streaming chunk accumulation to handle partial messages and a safety fix for string truncation to prevent panics on UTF-8 boundaries. Further improvements were suggested regarding the removal of non-standard serialization attributes and narrowing the scope of dead code suppression.
Force-pushed 3b3f2f6 to 1f36438 (Compare)
Actionable comments posted: 15
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/mcp/src/core/orchestrator.rs`:
- Around line 309-320: The legacy render path sometimes skips
apply_error_overrides because callers like call_tool and continue_tool_execution
call ResponseTransformer::transform(...) directly; please centralize rendering
so all code paths use a single helper (e.g., transform_result or
ToolExecutionOutput::to_response_item) that runs apply_error_overrides(&mut
item, self.error_message.as_deref()) for error cases, or alternatively update
ResponseTransformer::transform to call apply_error_overrides when
is_error/error_message is present; ensure special-case behavior from
to_image_generation_call (which intentionally sets Completed for error payloads)
is preserved when refactoring so its semantics remain unchanged.
In `@crates/mcp/src/responses_bridge.rs`:
- Around line 148-150: The current serialization for annotations in
responses_bridge (`annotations: Some(serde_json::json!({ "read_only":
entry.annotations.read_only, }))`) drops destructive, idempotent, and open_world
which breaks consumers and approval logic; update the serialization to emit the
full ToolAnnotations set (include entry.annotations.destructive,
entry.annotations.idempotent, entry.annotations.open_world along with read_only)
so downstream code and should_require_approval() retain the original hints and
behavior.
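A minimal sketch of the suggested serialization, with a stand-in `ToolAnnotations` struct whose field names follow the comment above rather than the actual crate types:

```rust
// Hypothetical shape of the fix: emit every annotation hint, not only read_only.
use serde_json::json;

struct ToolAnnotations {
    read_only: bool,
    destructive: bool,
    idempotent: bool,
    open_world: bool,
}

fn annotations_json(a: &ToolAnnotations) -> serde_json::Value {
    json!({
        "read_only": a.read_only,
        "destructive": a.destructive,
        "idempotent": a.idempotent,
        "open_world": a.open_world,
    })
}

fn main() {
    let a = ToolAnnotations { read_only: true, destructive: false, idempotent: true, open_world: false };
    println!("{}", annotations_json(&a));
}
```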
In `@crates/protocols/src/builders/responses/response.rs`:
- Around line 188-192: The setter conversation_ref currently stores
ConversationRef values verbatim which can reintroduce the old bare-string wire
form; change conversation_ref (in the Response builder) to canonicalize the
incoming ConversationRef into the normalized object form (the same
canonicalization used by copy_from_request) — e.g., if given
ConversationRef::Id(id) convert to the object variant with that id before
assigning to self.conversation, so all responses always use the `{ "id": ... }`
object shape.
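A small sketch of that canonicalization, assuming `ConversationRef` has a bare-id variant and an object variant (names here are illustrative, not the crate's actual definitions):

```rust
// Canonicalize any incoming ConversationRef into the object form before storing it.
#[derive(Clone, Debug, PartialEq)]
enum ConversationRef {
    Id(String),
    Object { id: String },
}

fn canonicalize(conversation: ConversationRef) -> ConversationRef {
    match conversation {
        // the legacy bare-string form becomes the `{ "id": ... }` object shape
        ConversationRef::Id(id) => ConversationRef::Object { id },
        other => other,
    }
}

fn main() {
    let canon = canonicalize(ConversationRef::Id("conv_123".into()));
    assert_eq!(canon, ConversationRef::Object { id: "conv_123".into() });
}
```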
In `@model_gateway/src/routers/common/agent_loop/driver.rs`:
- Around line 257-385: The code currently replays every McpApprovalResponse
found in control_items which can re-execute already-handled approvals; change
the logic so you only act on the latest unresolved approval continuation(s):
while building responses overwrite entries keyed by approval_request_id so the
last occurrence wins (instead of pushing all responses), then before creating
PlannedToolExecution check state.transcript for an existing
ResponseInputOutputItem::FunctionToolCall or
ResponseInputOutputItem::FunctionCallOutput whose call_id (derived from
approval_request_id by stripping "mcpr_") matches — if such an item exists treat
the approval as already resolved and skip it; continue to use the same symbols
(control_items, ResponseInputOutputItem::McpApprovalResponse, requests HashMap,
call_id derivation, state.transcript,
ResponseInputOutputItem::FunctionToolCall/FunctionCallOutput,
PlannedToolExecution, state.set_next_action) to locate and modify the code.
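A rough, self-contained sketch of the dedup described above (last occurrence per `approval_request_id` wins, already-resolved call ids are skipped); the types are simplified stand-ins for the driver's real state:

```rust
use std::collections::{HashMap, HashSet};

struct ApprovalResponse {
    approval_request_id: String,
    approve: bool,
}

// Returns the call ids that still need a PlannedToolExecution.
fn plan_approvals(
    responses: Vec<ApprovalResponse>,
    resolved_call_ids: &HashSet<String>,
) -> Vec<String> {
    // Keyed map: the last response for a given approval_request_id wins.
    let mut latest: HashMap<String, ApprovalResponse> = HashMap::new();
    for resp in responses {
        latest.insert(resp.approval_request_id.clone(), resp);
    }
    let mut planned = Vec::new();
    for (request_id, resp) in latest {
        // Derive the call id by stripping the "mcpr_" prefix.
        let call_id = request_id.strip_prefix("mcpr_").unwrap_or(request_id.as_str()).to_string();
        if resolved_call_ids.contains(&call_id) || !resp.approve {
            continue; // already executed in the transcript, or declined
        }
        planned.push(call_id);
    }
    planned
}

fn main() {
    let resolved: HashSet<String> = ["call_a".to_string()].into_iter().collect();
    let planned = plan_approvals(
        vec![
            ApprovalResponse { approval_request_id: "mcpr_call_a".into(), approve: true },
            ApprovalResponse { approval_request_id: "mcpr_call_b".into(), approve: true },
        ],
        &resolved,
    );
    assert_eq!(planned, vec!["call_b".to_string()]);
}
```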
In `@model_gateway/src/routers/common/agent_loop/error.rs`:
- Around line 32-36: The match arm in AgentLoopError::into_response currently
maps AgentLoopError::Upstream to error::internal_error (500); change it to
return a 502 Bad Gateway response instead (e.g., call the project’s 502 helper
such as error::bad_gateway("upstream_error", msg)) so upstream failures are
classified as Bad Gateway; keep AgentLoopError::Response as-is for callers that
already provide exact responses.
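Illustrative only, with stand-in types for `AgentLoopError` and the response helpers; the point is the status mapping:

```rust
enum AgentLoopError {
    Upstream(String),
    Response(u16), // stand-in for an already-built response
}

fn into_status(err: AgentLoopError) -> u16 {
    match err {
        // upstream failures become 502 Bad Gateway rather than 500
        AgentLoopError::Upstream(_) => 502,
        // callers that already provide exact responses keep their status
        AgentLoopError::Response(status) => status,
    }
}

fn main() {
    assert_eq!(into_status(AgentLoopError::Upstream("boom".into())), 502);
    assert_eq!(into_status(AgentLoopError::Response(400)), 400);
}
```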
In `@model_gateway/src/routers/common/agent_loop/presentation.rs`:
- Around line 321-332: The fallback JSON for OutputFamily::McpCall currently
drops the real MCP server identity by hardcoding "server_label": ""—update the
code to preserve the actual server label on errors by retrieving it from the
executed item (e.g., add/use a server_label field on ExecutedCall or accept
server_label as a separate parameter) and put that value into the JSON (replace
the literal "" with executed.server_label or the passed server_label) so the
synthesized mcp_call preserves server identity on the error/serialization
fallback path.
- Around line 292-305: The error branch in presentation.rs (inside the
executed.is_error handling for obj) only nulls "output" but must also clear any
family-specific success fields from transformed_item to avoid stale payloads;
update the executed.is_error block (the obj handling where you set "status" to
"failed" and insert "error") to also remove or set to null keys like "results",
"outputs", "code", "result" (and any other success-specific slots) when present
on obj so that web_search_call / file_search_call / code_interpreter_call /
image_generation_call items cannot leak success data; reference the obj variable
and the executed.is_error branch in your change.
In `@model_gateway/src/routers/common/agent_loop/tooling.rs`:
- Around line 210-216: The truncation in truncate_for_message currently slices
bytes (using &value[..MAX]) which can panic on invalid UTF-8 boundaries; change
it to a non-panicking character-aware truncation: build the truncated string by
taking up to MAX Unicode scalar values (e.g., using
value.chars().take(MAX).collect()) and append the ellipsis when the original
string is longer, ensuring truncate_for_message never panics on malformed UTF-8
or multi-byte characters.
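One possible non-panicking version, using `is_char_boundary` as the later fix notes describe rather than `chars().take(MAX)`; `MAX` is an assumed byte budget:

```rust
const MAX: usize = 8; // illustrative limit; the real constant lives in tooling.rs

fn truncate_for_message(value: &str) -> String {
    if value.len() <= MAX {
        return value.to_string();
    }
    // Walk back from the byte budget to the nearest char boundary so the
    // slice never splits a multi-byte character.
    let mut end = MAX;
    while !value.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &value[..end])
}

fn main() {
    // Contains multi-byte characters; must truncate without panicking.
    println!("{}", truncate_for_message("héllo wörld"));
}
```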
In `@model_gateway/src/routers/grpc/common/responses/agent_loop_sink.rs`:
- Around line 842-857: The streamed mcp_approval_request is using a newly
allocated item id (from allocate_output_index / item_id) which breaks
continuation matching; change the created item's "id" to the canonical
"mcpr_<call_id>" derived from pending.call.id (or pending.call.call_id) instead
of the generated item_id, while still using
allocate_output_index(OutputItemType::McpApprovalRequest) and calling
emit_output_item_added, emit_output_item_done, send, and complete_output_item as
before so the emitted events reference the canonical mcpr_<call_id> identifier
that the driver expects.
In `@model_gateway/src/routers/grpc/common/responses/history.rs`:
- Around line 291-297: The warn! call inside the match that uses
from_value::<ResponseInputOutputItem> logs the full `item`, which may contain
user PII/secrets; change the error logging in the Err(e) branch to avoid dumping
payloads by logging only the `kind`, the deserialization error `e`, and a
non-sensitive identifier (e.g. an item id extracted from `item` or a short
hash/truncated fingerprint) instead of the full `item`, leaving the rest of the
logic (out.push(parsed)) intact; update the warn! invocation in that match arm
to use the safe identifier and error only.
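A hedged sketch of the safer logging with `tracing`, assuming the item is a `serde_json::Value` carrying an optional `id`; the payload itself is never written to the log:

```rust
use serde_json::{json, Value};
use tracing::warn;

fn log_parse_failure(kind: &str, item: &Value, err: &str) {
    // Only a non-sensitive identifier and the error reach the log output.
    let item_id = item.get("id").and_then(Value::as_str).unwrap_or("<unknown>");
    warn!(kind, item_id, error = err, "failed to deserialize stored response item");
}

fn main() {
    log_parse_failure(
        "message",
        &json!({"id": "msg_1", "content": "user supplied text stays out of logs"}),
        "missing field `role`",
    );
}
```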
In `@model_gateway/src/routers/grpc/common/responses/streaming.rs`:
- Around line 840-906: The SSE emits reasoning summary part events but
finish_reasoning_item currently writes the accumulated text into content and
leaves summary empty, causing clients to see a referenced summary_index that
doesn't exist; in finish_reasoning_item (and the earlier delta emitter that sets
has_emitted_reasoning_summary_part_added) either remove summary-part events
or—preferably—populate the final item’s "summary" field with the same summary
part text and ensure the "response.reasoning_summary_part.added" / "done" events
carry the actual text (use accumulated_reasoning_text or text variable) so the
JSON in finish_reasoning_item (function finish_reasoning_item, variables
has_emitted_reasoning_summary_part_added, accumulated_reasoning_text, and
send_event) is consistent with the emitted SSE events.
In `@model_gateway/src/routers/grpc/harmony/responses/agent_loop_adapter.rs`:
- Around line 223-229: The adapter is turning missing tool arguments into the
string "{}" which masks truncated/malformed payloads; in the mapping that
constructs LoopToolCall (inside the pending_tool_calls extend over tool_calls)
stop calling unwrap_or_else(|| "{}".to_string()) and instead propagate the
Option by using the original tc.function.arguments (or
tc.function.arguments.clone()) so LoopToolCall.arguments remains None when
arguments were absent; ensure the LoopToolCall.arguments field type matches
Option<String> and update the map to assign tc.function.arguments directly.
In `@model_gateway/src/routers/grpc/harmony/responses/agent_streaming_adapter.rs`:
- Around line 171-223: Remove the direct pushes of assistant text/reasoning into
state.transcript (the ResponseInputOutputItem::new_reasoning and
ResponseInputOutputItem::Message blocks) so that assistant output is not
duplicated; instead only populate state.latest_turn (LoopModelTurn.message_text
and .reasoning_text), update state.pending_tool_calls (LoopToolCall entries) and
self.last_usage as you already do, letting the driver rebuild transcript from
latest_turn and executed calls.
In `@model_gateway/src/routers/grpc/harmony/streaming.rs`:
- Around line 509-513: The code currently calls prefill_stream.mark_completed()
unconditionally after awaiting Self::process_decode_stream(...), which marks the
prefill stream completed even when process_decode_stream returns Err and
suppresses abort-on-drop cleanup; change the logic so you await let result =
Self::process_decode_stream(...).await and only call
prefill_stream.mark_completed() if result.is_ok() (i.e., after a successful
decode), returning result afterwards so errors still trigger the prefill RPC
cleanup/abort behavior; ensure you reference the existing symbols
process_decode_stream and prefill_stream.mark_completed when making the change.
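The control-flow change in miniature, with synchronous stand-ins for the prefill/decode stream types:

```rust
struct PrefillStream {
    completed: bool,
}

impl PrefillStream {
    fn mark_completed(&mut self) {
        self.completed = true;
    }
}

// Stand-in for the awaited decode; here it fails to show the error path.
fn process_decode_stream() -> Result<(), String> {
    Err("decode failed".into())
}

fn main() {
    let mut prefill_stream = PrefillStream { completed: false };
    let result = process_decode_stream();
    // Only mark the prefill stream completed after a successful decode.
    if result.is_ok() {
        prefill_stream.mark_completed();
    }
    // On Err, `completed` stays false so drop-based abort cleanup still fires.
    assert!(!prefill_stream.completed);
}
```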
In `@model_gateway/src/routers/grpc/regular/responses/agent_streaming_adapter.rs`:
- Around line 212-223: state.latest_turn is being set from accumulated but drops
the rebuilt assistant text and reasoning (message.content and reasoning_content)
produced by ChatStreamAccumulator; update the LoopModelTurn construction in the
block that sets state.latest_turn (and where self.cached_response =
Some(accumulated) is set) to copy the accumulated.message.content and
accumulated.reasoning_content into the LoopModelTurn's corresponding fields so
streamed assistant preambles/thinking are preserved across iterations (ensure
you use accumulated.message.content and accumulated.reasoning_content when
populating the LoopModelTurn instance).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 18ff948a-7f70-4f6b-a709-856d9e74c93d
📒 Files selected for processing (42)
crates/mcp/src/core/orchestrator.rs
crates/mcp/src/core/session.rs
crates/mcp/src/responses_bridge.rs
crates/mcp/src/transform/types.rs
crates/protocols/src/builders/responses/response.rs
crates/protocols/src/responses.rs
crates/protocols/tests/background_mode_protocol.rs
e2e_test/responses/test_tools_call.py
model_gateway/src/routers/common/agent_loop/build_response.rs
model_gateway/src/routers/common/agent_loop/driver.rs
model_gateway/src/routers/common/agent_loop/error.rs
model_gateway/src/routers/common/agent_loop/events.rs
model_gateway/src/routers/common/agent_loop/mod.rs
model_gateway/src/routers/common/agent_loop/prepared.rs
model_gateway/src/routers/common/agent_loop/presentation.rs
model_gateway/src/routers/common/agent_loop/state.rs
model_gateway/src/routers/common/agent_loop/tooling.rs
model_gateway/src/routers/common/mcp_utils.rs
model_gateway/src/routers/common/mod.rs
model_gateway/src/routers/common/transcript_lower.rs
model_gateway/src/routers/grpc/common/responses/agent_loop_sink.rs
model_gateway/src/routers/grpc/common/responses/history.rs
model_gateway/src/routers/grpc/common/responses/mod.rs
model_gateway/src/routers/grpc/common/responses/streaming.rs
model_gateway/src/routers/grpc/common/responses/utils.rs
model_gateway/src/routers/grpc/harmony/responses/agent_loop_adapter.rs
model_gateway/src/routers/grpc/harmony/responses/agent_streaming_adapter.rs
model_gateway/src/routers/grpc/harmony/responses/common.rs
model_gateway/src/routers/grpc/harmony/responses/execution.rs
model_gateway/src/routers/grpc/harmony/responses/mod.rs
model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
model_gateway/src/routers/grpc/harmony/responses/streaming.rs
model_gateway/src/routers/grpc/harmony/streaming.rs
model_gateway/src/routers/grpc/regular/responses/agent_loop_adapter.rs
model_gateway/src/routers/grpc/regular/responses/agent_streaming_adapter.rs
model_gateway/src/routers/grpc/regular/responses/common.rs
model_gateway/src/routers/grpc/regular/responses/conversions.rs
model_gateway/src/routers/grpc/regular/responses/handlers.rs
model_gateway/src/routers/grpc/regular/responses/mod.rs
model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
model_gateway/src/routers/grpc/regular/responses/streaming.rs
model_gateway/src/routers/openai/mcp/tool_loop.rs
Force-pushed 1f36438 to c742ebf (Compare)
Batched fixes from coderabbit + claude bot review on the unified
Responses agent-loop refactor. No behavior changes outside what each
note called out.
- orchestrator: add `ToolExecutionOutput::synthetic_error` so the
malformed-args path renders through the unified to_response_item
pipeline; clear `WebSearchCall.results` on failure to mirror other
family clears in `apply_error_overrides`.
- agent_loop/driver: dedupe `McpApprovalResponse` entries by
`approval_request_id` (first occurrence wins) and skip priming a
call_id whose `FunctionCallOutput` is already present in the
lowered transcript; collapse the seen-set match into a guard arm.
- agent_loop/error: map `Upstream` to 502 bad_gateway instead of 500.
- agent_loop/tooling: route hosted-tool dispatch args through
`prepare_hosted_dispatch_args` at the single executor chokepoint
(lost in the harmony→common move); UTF-8-safe truncate via
`is_char_boundary`; add test_request/test_prepared/test_ctx
helpers for the unit suite.
- grpc/common/agent_loop_sink: emit `mcp_approval_request.id` as the
canonical `mcpr_<call_id>` form aligned with the post-approval
continuation namespace.
- grpc/common/history: stop dumping full item JSON on deserialize
failure (PII in stored input/output); log only kind, id, type, err.
- grpc/common/streaming: add the missing canonical request-echo
fields and always-null response fields to `emit_completed`; rename
reasoning streaming events to the cloud's
`reasoning_summary_text.{delta,done}` keyed on `summary_index`,
and reshape the final reasoning item to `{id, type, summary:[…]}`
(verified against gpt-5-mini × effort × summary matrix).
- grpc/harmony/{adapter,streaming_adapter}: drop `unwrap_or_else(||
\"{}\")` on parsed tool arguments — let the executor's malformed-
args path surface the real failure rather than silently dispatching
with empty args.
- openai/mcp/tool_loop: drop the bespoke
`build_transformed_mcp_call_item` helper; render mcp_call items
through `ToolExecutionOutput::synthetic_error` /
`to_response_item` so the openai passthrough router shares the
same render shape as the gRPC surfaces.
- protocols/builders/responses: normalize `conversation_ref` to
`ConversationRef::Object { id }` matching `copy_from_request` so
the persisted record's conversation echo round-trips through the
`previous_response_id` lookup chain.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Actionable comments posted: 14
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
model_gateway/src/routers/openai/mcp/tool_loop.rs (1)
209-250: ⚠️ Potential issue | 🟠 Major
Keep the raw parse-error text in the resume transcript.
This block correctly routes the client-visible item through `synthetic_error(...).to_response_item()`, but `state.record_call(...)` now stores `synthetic.output.to_string()` as the `function_call_output`. That diverges from the non-streaming branch, which records the raw error string, and can feed quoted JSON text back to the next model turn instead of the actual parse failure message.
💡 Suggested fix
```diff
-    let error_output = synthetic.output.clone();
     let mut mcp_call_item = to_value(synthetic.to_response_item()).unwrap_or_else(|e| {
         warn!(tool = %call.name, error = %e, "Failed to convert item to Value");
         json!({})
     });
@@
     state.record_call(
         session.is_builtin_tool(&call.name),
         call.call_id,
         call.name,
         call.arguments_buffer,
-        error_output.to_string(),
+        err_str,
         mcp_call_item,
     );
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/routers/openai/mcp/tool_loop.rs` around lines 209 - 250, The recorded function_call_output is taking the synthetic response JSON string (error_output.to_string()) which can wrap/quote the parse error; change the call to state.record_call(...) to pass the raw parse error text (err_str or its clone) instead of error_output.to_string() so the resume transcript contains the original parse-error text; locate the synthetic_error(...) / to_response_item() block and replace the function_call_output argument to use err_str (or err_str.clone()) when calling state.record_call.
model_gateway/src/routers/grpc/harmony/streaming.rs (1)
873-929: ⚠️ Potential issue | 🟠 Major
Finalize the assistant message in the EOF recovery path.
This recovery branch can still return `ToolCallsFound` after Lines 588-635 already opened and streamed a message item, but the close sequence (`text.done` / `content_part.done` / `output_item.done`) only exists in the `Complete` arm at Lines 823-859. If the backend ends without a `Complete` frame, the client sees a half-open assistant message before the tool lifecycle continues.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/routers/grpc/harmony/streaming.rs` around lines 873 - 929, The EOF recovery path currently emits ToolCallEmissionDone for recovered tool calls but does not finalize the in-progress assistant message like the Complete arm does; update the block that handles accumulated_tool_calls (the code using parser.get_messages, HarmonyParserAdapter::parse_messages, accumulated_tool_calls, final_text_extracted, and sink.emit) to also emit the same finalization events as the Complete branch: emit the text/content part/output completion events (the LoopEvent variants used to signal `text.done`, `content_part.done`, and `output_item.done`) using final_text_extracted and the same call ids so the assistant message is closed before tool lifecycle events continue. Ensure you mirror the ordering and data used in the Complete arm so clients do not see a half-open assistant message.
model_gateway/src/routers/grpc/common/responses/streaming.rs (1)
766-774: ⚠️ Potential issue | 🟠 Major
Persist only completed items in `finalize()`.
Line 766 currently includes any entry with `item_data`, while Line 254 gates on `ItemStatus::Completed`. This can persist partially completed items and diverge from the emitted terminal shape.
🔧 Proposed fix
```diff
-        let output: Vec<ResponseOutputItem> = self
-            .output_items
-            .iter()
-            .filter_map(|item| {
-                item.item_data
-                    .as_ref()
-                    .and_then(|data| serde_json::from_value(data.clone()).ok())
-            })
-            .collect();
+        let output: Vec<ResponseOutputItem> = self
+            .output_items
+            .iter()
+            .filter(|item| item.status == ItemStatus::Completed)
+            .filter_map(|item| {
+                item.item_data
+                    .as_ref()
+                    .and_then(|data| serde_json::from_value(data.clone()).ok())
+            })
+            .collect();
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/routers/grpc/common/responses/streaming.rs` around lines 766 - 774, The finalize() logic is persisting any output_items that have item_data set; change it to only include items whose status is ItemStatus::Completed so the persisted final shape matches the emitted terminal shape. In the block building output: Vec<ResponseOutputItem> from self.output_items, add a check on each item's status (e.g., only proceed when item.status == ItemStatus::Completed or matches!(item.status, ItemStatus::Completed)) before deserializing item.item_data and collecting, referencing self.output_items, ResponseOutputItem, and ItemStatus::Completed to locate and update the code in finalize().
♻️ Duplicate comments (1)
crates/mcp/src/core/orchestrator.rs (1)
1510-1530: ⚠️ Potential issue | 🟠 Major
Apply `apply_error_overrides` in `transform_result` as well.
`execute_tool_with_approval` and `continue_tool_execution` still render through `transform_result`, which currently bypasses the new failed-shape override. This can still leak success-shaped builtin tool items on error in these legacy paths.
💡 Proposed fix
```diff
 fn transform_result(
     result: &CallToolResult,
     format: &ResponseFormat,
     tool_call_id: &str,
     server_label: &str,
     tool_name: &str,
     arguments: &str,
 ) -> ResponseOutputItem {
     // Convert CallToolResult content to JSON for transformation
     let result_json = Self::call_result_to_json(result);
-
-    ResponseTransformer::transform(
+    let mut item = ResponseTransformer::transform(
         &result_json,
         format,
         tool_call_id,
         server_label,
         tool_name,
         arguments,
-    )
+    );
+    if result.is_error.unwrap_or(false) {
+        apply_error_overrides(&mut item, None);
+    }
+    item
 }
```
Based on learnings:
`apply_error_overrides` in `crates/mcp/src/core/orchestrator.rs` is the canonical source of failed output shaping across streaming and non-streaming renderers.
Also applies to: 1256-1270, 1954-1965
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/mcp/src/core/orchestrator.rs` around lines 1510 - 1530, transform_result currently returns ResponseTransformer::transform(...) directly and thus bypasses the failed-shape logic in apply_error_overrides; update transform_result to run the transformer, then call Self::apply_error_overrides (or Orchestrator::apply_error_overrides) to mutate/replace the transformed ResponseOutputItem using the original CallToolResult (result) so error-shaped outputs are enforced for legacy paths used by execute_tool_with_approval and continue_tool_execution; apply the same change to the other listed occurrences (around the blocks at the other locations) so all non-streaming renderers use apply_error_overrides after ResponseTransformer::transform.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/protocols/src/builders/responses/response.rs`:
- Around line 280-283: The setter tool_choice currently forces every value into
Value::String; change its signature to accept a serde_json::Value (or
generically impl Into<serde_json::Value>) and assign that directly to
self.tool_choice so structured JSON choices are preserved. Update the method
named tool_choice to take tool_choice: impl Into<serde_json::Value> (or
serde_json::Value) and set self.tool_choice = tool_choice.into(); return self as
before. Ensure callers that pass &str or String still work via Into<Value>
conversions.
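A sketch of the suggested setter on a stripped-down builder (the follow-up commit ultimately removed the setter instead); `impl Into<serde_json::Value>` keeps both string and structured callers working:

```rust
use serde_json::{json, Value};

#[derive(Default)]
struct ResponseBuilder {
    tool_choice: Option<Value>,
}

impl ResponseBuilder {
    // Accepts "auto", String, or structured JSON without coercing to Value::String.
    fn tool_choice(mut self, tool_choice: impl Into<Value>) -> Self {
        self.tool_choice = Some(tool_choice.into());
        self
    }
}

fn main() {
    let structured = ResponseBuilder::default()
        .tool_choice(json!({"type": "function", "name": "get_weather"}));
    println!("{:?}", structured.tool_choice);

    let simple = ResponseBuilder::default().tool_choice("auto");
    println!("{:?}", simple.tool_choice);
}
```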
In `@crates/protocols/src/responses.rs`:
- Around line 3658-3666: The response struct's conversation field must always
serialize to the object form regardless of how ConversationRef was constructed:
add a response-only serializer on the conversation field (e.g.,
#[serde(serialize_with = "serialize_conversation_as_object")]) while keeping the
existing deserialization (or deserialize_with pointing to ConversationRef's
existing deserializer) so bare-string input still parses; implement
serialize_conversation_as_object to accept &ConversationRef and always emit
{"id": "<...>"} (mapping legacy string variant to the Object shape) and update
any tests for responses.rs to assert the canonical object wire form.
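One way the response-only serializer could look; the enum here is a stand-in for the crate's `ConversationRef`, and the actual fix in this PR went through a `serde_with::SerializeAs` adapter instead:

```rust
use serde::{Deserialize, Serialize, Serializer};

// Bare-string input still parses (untagged), but serialization is forced to
// the object shape by the field-level serializer below.
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum ConversationRef {
    Object { id: String },
    Id(String),
}

fn serialize_conversation_as_object<S: Serializer>(
    conv: &ConversationRef,
    s: S,
) -> Result<S::Ok, S::Error> {
    #[derive(Serialize)]
    struct Obj<'a> {
        id: &'a str,
    }
    let id = match conv {
        ConversationRef::Object { id } | ConversationRef::Id(id) => id.as_str(),
    };
    Obj { id }.serialize(s)
}

#[derive(Serialize)]
struct ResponsesResponse {
    #[serde(serialize_with = "serialize_conversation_as_object")]
    conversation: ConversationRef,
}

fn main() {
    let legacy = ResponsesResponse { conversation: ConversationRef::Id("conv_1".into()) };
    // Always emits {"conversation":{"id":"conv_1"}} regardless of the input variant.
    println!("{}", serde_json::to_string(&legacy).unwrap());

    let object = ResponsesResponse { conversation: ConversationRef::Object { id: "conv_2".into() } };
    println!("{}", serde_json::to_string(&object).unwrap());
}
```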
In `@model_gateway/src/routers/common/agent_loop/build_response.rs`:
- Around line 168-176: The reasoning output is being built with the legacy shape
using ResponseOutputItem::new_reasoning and
ResponseReasoningContent::ReasoningText (including content and status); change
it to emit the canonical summary shape by constructing the final
ResponseOutputItem with a summary field containing a single summary object
(e.g., a SummaryText variant holding analysis_text) and omit the content/status
fields — locate the code that checks analysis (the block using analysis_text and
request_id and calling ResponseOutputItem::new_reasoning) and replace that
construction with the summary-based form so the terminal reasoning item matches
the Responses wire shape.
In `@model_gateway/src/routers/common/agent_loop/driver.rs`:
- Around line 306-323: The dedup logic builds a snapshot HashSet
(resolved_call_ids) from state.transcript but never updates it while iterating
responses, so multiple approval responses that normalize to the same derived
call id (derived_call_id) can still schedule duplicate PlannedToolExecution
entries; update the check inside the loop (the block handling responses leading
to prime_pending_from_approval) to also insert the derived_call_id into
resolved_call_ids immediately after you accept/plan a call (or skip it),
ensuring you skip any subsequent responses that normalize to the same id;
specifically modify the loop that iterates responses (uses derived_call_id,
resolved_call_ids, and populates planned/prime_pending_from_approval) to
atomically mark a derived_call_id as resolved as soon as it’s used so duplicate
approvals in the same pass are ignored.
In `@model_gateway/src/routers/common/agent_loop/presentation.rs`:
- Around line 292-305: The code currently inserts an "error" field for any
failed executed item, but the protocol's ResponseOutputItem only defines error
for the mcp_call variant; modify the renderer to only emit the "error" field for
families that support it (e.g., when the item's family/variant equals
"mcp_call") and avoid adding "error" for web_search_call, code_interpreter_call,
file_search_call, image_generation_call; update the failure branch around
executed/is_error (and the similar block at 333-362) to conditionally insert
"error" based on the item's family/variant and still clear "output" as before.
In `@model_gateway/src/routers/common/mcp_utils.rs`:
- Around line 840-886: Update the test
test_inject_mcp_output_items_prepends_pre_rendered_list_tools to assert the
prepend order explicitly: after calling inject_mcp_output_items (and collecting
labels as you already do), add an assertion that the first output item is an
McpListTools entry (i.e., labels[0] == "deepwiki" or equivalent) to lock the
contract that inject_mcp_output_items prepends pre-rendered mcp_list_tools;
reference the test function name and the inject_mcp_output_items helper when
making the change.
In `@model_gateway/src/routers/common/transcript_lower.rs`:
- Around line 90-92: The current construction of output_text uses
output.or_else(...) which treats Some("") as a valid output and hides error;
update the logic in transcript_lower.rs where output_text is built (around the
variables output, error and the ResponseInputOutputItem::FunctionCallOutput
creation) to prefer a non-empty output string — e.g., treat empty output as None
(filter out empty strings) and only fall back to error.map(|e| format!("Tool
call failed: {e}")) when output is missing or empty so failure context is
preserved.
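A minimal sketch of the preferred precedence, treating `Some("")` as missing before falling back to the error text:

```rust
fn output_text(output: Option<String>, error: Option<String>) -> Option<String> {
    output
        .filter(|s| !s.is_empty()) // an empty output should not mask the error
        .or_else(|| error.map(|e| format!("Tool call failed: {e}")))
}

fn main() {
    assert_eq!(
        output_text(Some(String::new()), Some("timeout".into())),
        Some("Tool call failed: timeout".into())
    );
}
```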
In `@model_gateway/src/routers/grpc/common/responses/agent_loop_sink.rs`:
- Around line 199-205: process_chat_chunk currently always calls
self.emitter.process_chunk(...) and ignores its Result, which bypasses the
sink’s disconnect latch; update process_chat_chunk to first check the sink’s
disconnect latch (e.g. self.disconnect_latch or self.disconnected) and only
attempt emitter.process_chunk when not disconnected, and then handle the Result
from self.emitter.process_chunk instead of dropping it—on Err set/mark the
disconnect latch (prevent further sends) and return early so retries stop and
the no-op disconnect contract is honored.
In `@model_gateway/src/routers/grpc/regular/responses/agent_streaming_adapter.rs`:
- Around line 309-320: The loop currently treats each transport chunk from
into_data_stream()/stream.next() as a full SSE frame, causing partial or
combined JSON to be misparsed; change it to accumulate chunks into a persistent
buffer, split the buffer on SSE frame boundaries (double newline "\n\n" or
"\r\n\r\n"), iterate over all complete frames found in the buffer and parse each
"data: " line (handling multiple data lines per frame), remove processed bytes
from the buffer, and only break on a frame whose data equals "[DONE]"; keep
using AgentLoopError::Upstream for stream read errors and
serde_json::from_str::<ChatCompletionStreamResponse> for parsing each complete
JSON payload so partial JSON across chunks is correctly reassembled before
parsing (see the buffering sketch after this file's comments).
- Around line 238-245: The persisted Usage drops completion_tokens_details
causing mismatch with the final streamed payload; update the construction of
usage_for_persist (the map over self.last_usage that builds a Usage) to also
include u.completion_tokens_details.clone() (i.e., preserve
completion_tokens_details) so the Usage passed to sink.emitter.finalize(...)
matches the usage_json/streamed reasoning_tokens details and subsequent
previous_response_id reads.
- Around line 318-334: The loop currently forwards any non-chunk SSE payload to
the client and then treats the request as a success by returning
Ok(accumulator.finalize()); change this so non-chunk terminal/error payloads
become an upstream error: when
serde_json::from_str::<ChatCompletionStreamResponse>(json_str) fails, inspect
the original event/json_str for terminal/error indicators (e.g., contains
"event: error", contains an "error" field or other upstream terminal marker) and
if found return Err(AgentLoopError::Upstream(format!("{event}"))) instead of
calling accumulator.finalize(); otherwise keep the existing passthrough to
sink.tx for benign non-chunk messages. Update the branch where deserialization
fails (around ChatCompletionStreamResponse handling, accumulator.process_chunk,
sink.process_chat_chunk, and the sink.tx send) to implement this logic.
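A self-contained sketch of the SSE reassembly suggested in the first comment for this file: chunks are appended to a buffer, only complete frames (delimited by a blank line) are parsed, and anything partial stays buffered. Real code would also handle `\r\n\r\n` delimiters and stop on `[DONE]`; the JSON parsing itself is left to the caller:

```rust
// Drain every complete SSE frame from the buffer, returning the "data:" payloads
// and leaving any trailing partial frame in place for the next transport chunk.
fn drain_complete_frames(buffer: &mut String) -> Vec<String> {
    let mut frames = Vec::new();
    while let Some(pos) = buffer.find("\n\n") {
        let frame: String = buffer.drain(..pos + 2).collect();
        for line in frame.lines() {
            if let Some(data) = line.strip_prefix("data: ") {
                frames.push(data.to_string());
            }
        }
    }
    frames
}

fn main() {
    let mut buffer = String::new();
    // Two transport chunks that split one frame across the boundary.
    buffer.push_str("data: {\"id\":");
    buffer.push_str("\"chunk-1\"}\n\ndata: [DONE]\n\n");
    let frames = drain_complete_frames(&mut buffer);
    assert_eq!(frames, vec!["{\"id\":\"chunk-1\"}", "[DONE]"]);
}
```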
In `@model_gateway/src/routers/grpc/regular/responses/handlers.rs`:
- Around line 171-175: The SSE error payload sent in the Err branch currently
only emits {"status":...} via the tx.send(Ok(Bytes::from(format!(...)))) call;
update that format string in the Err(e) handler so it matches the Harmony
streaming shape by emitting {"error":"agent_loop_failed","status":<status>}
(preserve the "event: error\n" wrapper and continue using
e.into_response().status().as_u16() for the status value) so /v1/responses
clients receive the same adapter-agnostic error body as
model_gateway/src/routers/grpc/harmony/responses/streaming.rs.
---
Outside diff comments:
In `@model_gateway/src/routers/grpc/common/responses/streaming.rs`:
- Around line 766-774: The finalize() logic is persisting any output_items that
have item_data set; change it to only include items whose status is
ItemStatus::Completed so the persisted final shape matches the emitted terminal
shape. In the block building output: Vec<ResponseOutputItem> from
self.output_items, add a check on each item's status (e.g., only proceed when
item.status == ItemStatus::Completed or matches!(item.status,
ItemStatus::Completed)) before deserializing item.item_data and collecting,
referencing self.output_items, ResponseOutputItem, and ItemStatus::Completed to
locate and update the code in finalize().
In `@model_gateway/src/routers/grpc/harmony/streaming.rs`:
- Around line 873-929: The EOF recovery path currently emits
ToolCallEmissionDone for recovered tool calls but does not finalize the
in-progress assistant message like the Complete arm does; update the block that
handles accumulated_tool_calls (the code using parser.get_messages,
HarmonyParserAdapter::parse_messages, accumulated_tool_calls,
final_text_extracted, and sink.emit) to also emit the same finalization events
as the Complete branch: emit the text/content part/output completion events (the
LoopEvent variants used to signal `text.done`, `content_part.done`, and
`output_item.done`) using final_text_extracted and the same call ids so the
assistant message is closed before tool lifecycle events continue. Ensure you
mirror the ordering and data used in the Complete arm so clients do not see a
half-open assistant message.
In `@model_gateway/src/routers/openai/mcp/tool_loop.rs`:
- Around line 209-250: The recorded function_call_output is taking the synthetic
response JSON string (error_output.to_string()) which can wrap/quote the parse
error; change the call to state.record_call(...) to pass the raw parse error
text (err_str or its clone) instead of error_output.to_string() so the resume
transcript contains the original parse-error text; locate the
synthetic_error(...) / to_response_item() block and replace the
function_call_output argument to use err_str (or err_str.clone()) when calling
state.record_call.
---
Duplicate comments:
In `@crates/mcp/src/core/orchestrator.rs`:
- Around line 1510-1530: transform_result currently returns
ResponseTransformer::transform(...) directly and thus bypasses the failed-shape
logic in apply_error_overrides; update transform_result to run the transformer,
then call Self::apply_error_overrides (or Orchestrator::apply_error_overrides)
to mutate/replace the transformed ResponseOutputItem using the original
CallToolResult (result) so error-shaped outputs are enforced for legacy paths
used by execute_tool_with_approval and continue_tool_execution; apply the same
change to the other listed occurrences (around the blocks at the other
locations) so all non-streaming renderers use apply_error_overrides after
ResponseTransformer::transform.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ae7f1f25-95dc-4845-99e6-41b9ef0bbab3
📒 Files selected for processing (42)
crates/mcp/src/core/orchestrator.rs
crates/mcp/src/core/session.rs
crates/mcp/src/responses_bridge.rs
crates/mcp/src/transform/types.rs
crates/protocols/src/builders/responses/response.rs
crates/protocols/src/responses.rs
crates/protocols/tests/background_mode_protocol.rs
e2e_test/responses/test_tools_call.py
model_gateway/src/routers/common/agent_loop/build_response.rs
model_gateway/src/routers/common/agent_loop/driver.rs
model_gateway/src/routers/common/agent_loop/error.rs
model_gateway/src/routers/common/agent_loop/events.rs
model_gateway/src/routers/common/agent_loop/mod.rs
model_gateway/src/routers/common/agent_loop/prepared.rs
model_gateway/src/routers/common/agent_loop/presentation.rs
model_gateway/src/routers/common/agent_loop/state.rs
model_gateway/src/routers/common/agent_loop/tooling.rs
model_gateway/src/routers/common/mcp_utils.rs
model_gateway/src/routers/common/mod.rs
model_gateway/src/routers/common/transcript_lower.rs
model_gateway/src/routers/grpc/common/responses/agent_loop_sink.rs
model_gateway/src/routers/grpc/common/responses/history.rs
model_gateway/src/routers/grpc/common/responses/mod.rs
model_gateway/src/routers/grpc/common/responses/streaming.rs
model_gateway/src/routers/grpc/common/responses/utils.rs
model_gateway/src/routers/grpc/harmony/responses/agent_loop_adapter.rs
model_gateway/src/routers/grpc/harmony/responses/agent_streaming_adapter.rs
model_gateway/src/routers/grpc/harmony/responses/common.rs
model_gateway/src/routers/grpc/harmony/responses/execution.rs
model_gateway/src/routers/grpc/harmony/responses/mod.rs
model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
model_gateway/src/routers/grpc/harmony/responses/streaming.rs
model_gateway/src/routers/grpc/harmony/streaming.rs
model_gateway/src/routers/grpc/regular/responses/agent_loop_adapter.rs
model_gateway/src/routers/grpc/regular/responses/agent_streaming_adapter.rs
model_gateway/src/routers/grpc/regular/responses/common.rs
model_gateway/src/routers/grpc/regular/responses/conversions.rs
model_gateway/src/routers/grpc/regular/responses/handlers.rs
model_gateway/src/routers/grpc/regular/responses/mod.rs
model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
model_gateway/src/routers/grpc/regular/responses/streaming.rs
model_gateway/src/routers/openai/mcp/tool_loop.rs
Second batch of review fixes on the unified Responses agent-loop
PR, focused on the OpenAI router migration regressions and the
remaining coderabbit / claude bot threads.
Production fixes from review feedback:
- presentation::render_final_item only stamps `error` for
`OutputFamily::McpCall`; hosted-builtin variants
(web_search_call / code_interpreter_call / file_search_call /
image_generation_call) carry no `error` slot in the protocol layer
and now convey failure through `status: "failed"` alone, matching
the typed `ResponseOutputItem::*` shapes
- transcript_lower: `output.filter(|s| !s.is_empty())` before falling
through to error projection so `Some("")` no longer suppresses the
populated `error` context on a failed mcp_call
- agent_loop_sink::process_chat_chunk now honors the disconnected
latch and flips it on the first failed `process_chunk` send,
aligning with the pattern every other emit_* path uses
- regular streaming adapter: `usage_for_persist` carries
`completion_tokens_details: { reasoning_tokens: ... }` instead of
None, so persisted records line up with the SSE-side `usage_json`
- regular handlers: error SSE payload now matches the harmony shape
(`{"error":"agent_loop_failed","status":N}`); single uniform error
shape for `/v1/responses` clients across surfaces
- openai streaming: extract human-readable `AgentLoopError::message()`
before stamping into the SSE `error` payload — clients now see the
diagnostic text, not just the status reason phrase
- openai non-streaming: `worker.record_outcome` fires on the error
path before returning so per-worker error metrics no longer
under-count upstream failures
- openai upstream: preserve upstream HTTP status (400/429/5xx) instead
of collapsing every non-2xx to `AgentLoopError::Upstream` → 502;
matches the pre-refactor direct-passthrough contract
- driver::prime_pending_from_approval: per-pass `processed_call_ids`
HashSet alongside the transcript-snapshot `resolved_call_ids` so a
malformed payload carrying both `mcpr_<X>` and bare `<X>` (which
derive to the same call_id) cannot drive two PlannedToolExecutions
- agent_loop_sink streaming: drop dead `add_optional_field` helper
(no callers in the workspace)
Wire-shape parity (request-side leaks into responses):
- `tool_choice` builder setter that force-coerced `Value::String`
removed (zero workspace callers; structured tool_choice values
flow through `copy_from_request` unchanged)
- `ResponsesResponse.conversation` now serializes through a
`serde_with::SerializeAs` adapter (`ConversationAsObject`) so the
response wire shape always emits `{ "id": ... }` regardless of
whether the request used the bare-string or `Object` variant
- terminal reasoning items use the canonical
`summary: [{ type: "summary_text", text }]` shape; `content` /
`status` are skip-serialized when empty/None so the wire matches
the streaming `finish_reasoning_item` payload (verified earlier
against gpt-5-mini cloud)
- `ResponsesResponse.tools` skip-serialized when empty so a request
whose tools were entirely filtered out (internal MCP servers, etc.)
renders without a `"tools": []` artifact
- `build_response_from_state` filters `original_tools` through
`session.should_hide_tool_json()` — internal MCP server entries
configured with `internal: true` no longer leak back to the caller,
even when the request explicitly named them
- `ResponsesResponse.metadata` accepts `null` from upstream via
`serde_with::DefaultOnNull`; OpenAI cloud emits `"metadata": null`
for empty maps and the prior strict deserializer 500'd on it
- `build_response_from_state` overlays request-level metadata
(`model` / `instructions` / `metadata` / `safety_identifier`) when
the response left them blank — restores the
`patch_response_with_request_metadata` echo semantics the legacy
OpenAI passthrough router maintained pre-unification
Test fixture realignment:
- mock_worker `Message` items now carry the spec-required `id` and
`status` fields (verified against OpenAI's OpenAPI YAML, openai-
python, openai-node — all three mark both as required)
- mock_worker `has_prior_tool_context` heuristic now keys off the
*last* input item being `function_call_output` rather than any
occurrence in the array, so `previous_response_id` continuation
turns drive a fresh function_call instead of immediately closing
with a final message
- mcp_utils prepend test asserts the prepended `mcp_list_tools` item
is *first* in the output, locking the contract the docstring
promises
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Force-pushed 6cbb46f to 41fd32d (Compare)
```rust
        .as_ref()
        .and_then(|a| serde_json::to_value(a).ok()),
    annotations: Some(serde_json::json!({
        "read_only": entry.annotations.read_only,
```
🟡 Nit: This narrows the MCP tool annotations from the full set (destructive, idempotent, open_world, read_only) down to only read_only. If this is intentional (e.g. the Responses API schema only supports read_only), a brief comment explaining why would help prevent someone from "fixing" this back to the full annotations later. If unintentional, the other fields may be useful for clients making approval/safety decisions.
```rust
    }
}

fn build_iteration_request(
```
🟡 Nit: build_iteration_request here is nearly identical to the one in agent_loop_adapter.rs:94. Consider extracting into a shared helper to avoid the two copies drifting apart over time.
```rust
        combined.extend(items.iter().map(responses::normalize_input_item));
    }
    ResponseInput::Text(text) => combined.push(current_text_item(text)),
```
🟡 Nit: Behavioral change for the harmony path — the old harmony-specific load_previous_messages did not normalize current-input items (it called history_items.extend(items) directly) and wrapped Text input as SimpleInputMessage. The shared code now normalizes Items via normalize_input_item (converting SimpleInputMessage → Message) and wraps Text as a structured Message.
This means the harmony adapter's IterationInputOptions::preserved_simple_message() (with normalize_items: false) is partially defeated — by the time the iteration builder sees state.upstream_input, items are already normalized Message items. The SimpleInputMessage text-input shape configured for harmony is also unreachable since upstream_input is always Items after this preparation step.
If the harmony backend is tolerant of Message where it previously received SimpleInputMessage, this is fine. But it's worth verifying, since the harmony adapter was explicitly configured to preserve the simple-message form in iteration_request.rs.
```diff
 let prepared_history = match super::history::prepare_request_history(
     deps.responses_components.as_ref(),
     &request_body,
 )
 .await
 {
-    Ok(id) => id,
+    Ok(history) => history,
     Err(response) => return response,
```
🟡 Nit: The mutual-exclusivity validation between conversation and previous_response_id moved from here into the shared load_request_history → request_history_source path. The old validation at this callsite recorded Metrics::record_router_error(ROUTER_OPENAI, ..., ERROR_VALIDATION) before returning the 400. The shared layer returns the same bad_request error but without the router-specific metrics, so this validation failure is no longer counted in OpenAI router error metrics.
…r + sink
Replace the two parallel per-surface agent loops (regular = chat
completion adapter, harmony = structured-generation pipeline) with a
single shared agent loop in `routers/common/agent_loop/`, plus a
shared streaming sink that owns the full mcp_call lifecycle and a
shared transcript-lower layer that projects high-level Responses
items down to the core ChatMessage shape every backend speaks. The
control plane is a state machine
`NextAction::{CallLlm, ExecuteTools, InterruptForApproval, Finish}`;
all loop-control concerns live inside `run_agent_loop`. Each adapter
is now a thin plug for one upstream wire format and slots into the
trait `AgentLoopAdapter`, so a future OpenAI Responses passthrough
router can reuse the entire loop, sink, transcript-lower,
presentation, and `build_response_from_state` pipeline unchanged.
- new `routers/common/agent_loop/` module: state machine
(`driver.rs`, `state.rs`, `events.rs`), presentation/partition
step (`presentation.rs`), terminal renderer (`build_response.rs`),
prepared-input snapshot (`prepared.rs`), tool execution helpers
(`tooling.rs`).
- new `routers/common/transcript_lower.rs`: `Lowering` trait,
exhaustive over `ResponseInputOutputItem` so a new variant forces a
compile error rather than a silent drop.
- `load_request_history` (gRPC common) resolves both
`previous_response_id` and `conversation` and applies the lowering
pass once on the merged transcript. Surface wrappers union
`mcp_list_tools.server_label` from hand-stitched current-turn input
items so a client replaying a prior `mcp_list_tools` does not see
it re-emitted.
- `GrpcResponseStreamSink` consumes `LoopEvent`s and emits the full
mcp_call lifecycle (`output_item.added` → `mcp_call.in_progress` →
`mcp_call_arguments.delta`/`.done` → `mcp_call.completed`/`.failed`
→ `output_item.done`). The harmony stream processor's inline
gateway-tool emission is removed; only caller-declared
function_call announcements stay inline. `output_item.added` is
emitted at `ExecutionStarted` rather than `EmissionStarted`, so a
gated/skipped/over-budget call no longer allocates a phantom
`output_index` slot. Also fixes the pre-existing harmony bug where
`mcp_call.completed` fired before the gateway actually executed
the tool.
- approval continuation: `prime_pending_from_approval` re-hydrates a
resumed turn's tool call so it carries the original
`approval_request_id`; new `RenderMode::ApprovalInterrupt` and
`LoopEvent::ApprovedToolReplay`.
- `max_tool_calls` exhaustion soft-stops: clear pending calls, set
`state.tool_budget_exhausted`, run one final `CallLlm` with no
tools so the response is `status=completed` +
`incomplete_details=null` matching cloud.
- wire parity: `tool_choice` is `Value` (no double JSON-encode);
`ResponsesResponse` adds `billing`, `max_tool_calls`,
`frequency_penalty`, `moderation`, `presence_penalty`,
`prompt_cache_*`, `service_tier`, `top_logprobs`;
`McpCall.approval_request_id` is `#[serialize_always]`; reasoning
streams `reasoning_summary_part.added/done` +
`reasoning_text.delta/done`; delta events carry `obfuscation`.
- regular NS `usage` reshaped to `input_tokens / output_tokens /
total_tokens` (legacy `Classic` removed).
OpenAI Responses passthrough router intentionally not migrated yet;
the `AgentLoopAdapter` trait is the integration point.
Verified against OpenAI's `gpt-5-mini` Responses API on T1-T4 +
M1-M4 + A1-A6 + `max_tool_calls=1`, NS and ST, both `gpt-oss-120b`
(harmony) and `Qwen2.5-7B-Instruct` (regular) lanes.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Batched fixes from coderabbit + claude bot review on the unified
Responses agent-loop refactor. No behavior changes outside what each
note called out.
- orchestrator: add `ToolExecutionOutput::synthetic_error` so the
malformed-args path renders through the unified to_response_item
pipeline; clear `WebSearchCall.results` on failure to mirror other
family clears in `apply_error_overrides`.
- agent_loop/driver: dedupe `McpApprovalResponse` entries by
`approval_request_id` (first occurrence wins) and skip priming a
call_id whose `FunctionCallOutput` is already present in the
lowered transcript; collapse the seen-set match into a guard arm.
- agent_loop/error: map `Upstream` to 502 bad_gateway instead of 500.
- agent_loop/tooling: route hosted-tool dispatch args through
`prepare_hosted_dispatch_args` at the single executor chokepoint
(lost in the harmony→common move); UTF-8-safe truncate via
`is_char_boundary`; add test_request/test_prepared/test_ctx
helpers for the unit suite.
- grpc/common/agent_loop_sink: emit `mcp_approval_request.id` as the
canonical `mcpr_<call_id>` form aligned with the post-approval
continuation namespace.
- grpc/common/history: stop dumping full item JSON on deserialize
failure (PII in stored input/output); log only kind, id, type, err.
- grpc/common/streaming: add the missing canonical request-echo
fields and always-null response fields to `emit_completed`; rename
reasoning streaming events to the cloud's
`reasoning_summary_text.{delta,done}` keyed on `summary_index`,
and reshape the final reasoning item to `{id, type, summary:[…]}`
(verified against gpt-5-mini × effort × summary matrix).
- grpc/harmony/{adapter,streaming_adapter}: drop `unwrap_or_else(||
\"{}\")` on parsed tool arguments — let the executor's malformed-
args path surface the real failure rather than silently dispatching
with empty args.
- openai/mcp/tool_loop: drop the bespoke
`build_transformed_mcp_call_item` helper; render mcp_call items
through `ToolExecutionOutput::synthetic_error` /
`to_response_item` so the openai passthrough router shares the
same render shape as the gRPC surfaces.
- protocols/builders/responses: normalize `conversation_ref` to
`ConversationRef::Object { id }` matching `copy_from_request` so
the persisted record's conversation echo round-trips through the
`previous_response_id` lookup chain.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Complete the Responses unification by moving the OpenAI router onto the same prepared-history, agent-loop, and tool-presentation pipeline the gRPC surfaces already use. This removes the legacy OpenAI MCP loop, keeps streaming and non-streaming on one control plane, and closes the remaining ID, approval, and hosted-tool divergence that was still surfacing in CI.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Second batch of review fixes on the unified Responses agent-loop
PR, focused on the OpenAI router migration regressions and the
remaining coderabbit / claude bot threads.
Production fixes from review feedback:
- presentation::render_final_item only stamps `error` for
`OutputFamily::McpCall`; hosted-builtin variants
(web_search_call / code_interpreter_call / file_search_call /
image_generation_call) carry no `error` slot in the protocol layer
and now convey failure through `status: "failed"` alone, matching
the typed `ResponseOutputItem::*` shapes
- transcript_lower: `output.filter(|s| !s.is_empty())` before falling
through to error projection so `Some("")` no longer suppresses the
populated `error` context on a failed mcp_call
- agent_loop_sink::process_chat_chunk now honors the disconnected
latch and flips it on the first failed `process_chunk` send,
aligning with the pattern every other emit_* path uses
- regular streaming adapter: `usage_for_persist` carries
`completion_tokens_details: { reasoning_tokens: ... }` instead of
None, so persisted records line up with the SSE-side `usage_json`
- regular handlers: error SSE payload now matches the harmony shape
(`{"error":"agent_loop_failed","status":N}`); single uniform error
shape for `/v1/responses` clients across surfaces
- openai streaming: extract human-readable `AgentLoopError::message()`
before stamping into the SSE `error` payload — clients now see the
diagnostic text, not just the status reason phrase
- openai non-streaming: `worker.record_outcome` fires on the error
path before returning so per-worker error metrics no longer
under-count upstream failures
- openai upstream: preserve upstream HTTP status (400/429/5xx) instead
of collapsing every non-2xx to `AgentLoopError::Upstream` → 502;
matches the pre-refactor direct-passthrough contract
- driver::prime_pending_from_approval: per-pass `processed_call_ids`
HashSet alongside the transcript-snapshot `resolved_call_ids` so a
malformed payload carrying both `mcpr_<X>` and bare `<X>` (which
derive to the same call_id) cannot drive two PlannedToolExecutions
- agent_loop_sink streaming: drop dead `add_optional_field` helper
(no callers in the workspace)
Wire-shape parity (request-side leaks into responses):
- `tool_choice` builder setter that force-coerced `Value::String`
removed (zero workspace callers; structured tool_choice values
flow through `copy_from_request` unchanged)
- `ResponsesResponse.conversation` now serializes through a
`serde_with::SerializeAs` adapter (`ConversationAsObject`) so the
response wire shape always emits `{ "id": ... }` regardless of
whether the request used the bare-string or `Object` variant
- terminal reasoning items use the canonical
`summary: [{ type: "summary_text", text }]` shape; `content` /
`status` are skip-serialized when empty/None so the wire matches
the streaming `finish_reasoning_item` payload (verified earlier
against gpt-5-mini cloud)
- `ResponsesResponse.tools` skip-serialized when empty so a request
whose tools were entirely filtered out (internal MCP servers, etc.)
renders without a `"tools": []` artifact
- `build_response_from_state` filters `original_tools` through
`session.should_hide_tool_json()` — internal MCP server entries
configured with `internal: true` no longer leak back to the caller,
even when the request explicitly named them
- `ResponsesResponse.metadata` accepts `null` from upstream via
`serde_with::DefaultOnNull`; OpenAI cloud emits `"metadata": null`
for empty maps and the prior strict deserializer 500'd on it
- `build_response_from_state` overlays request-level metadata
(`model` / `instructions` / `metadata` / `safety_identifier`) when
the response left them blank — restores the
`patch_response_with_request_metadata` echo semantics the legacy
OpenAI passthrough router maintained pre-unification
Test fixture realignment:
- mock_worker `Message` items now carry the spec-required `id` and
`status` fields (verified against OpenAI's OpenAPI YAML, openai-
python, openai-node — all three mark both as required)
- mock_worker `has_prior_tool_context` heuristic now keys off the
*last* input item being `function_call_output` rather than any
occurrence in the array, so `previous_response_id` continuation
turns drive a fresh function_call instead of immediately closing
with a final message
- mcp_utils prepend test asserts the prepended `mcp_list_tools` item
is *first* in the output, locking the contract the docstring
promises
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Trim verbose end-state-only comments across the recent review
feedback rounds — drop process narration ("pre-refactor this lived
in...", "verified earlier against...", "would otherwise..."),
keep load-bearing why-comments where the rationale is non-obvious.
No code-behavior changes.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Restore client-facing tools and move caller/gateway done forwarding into the parser so streaming responses do not leak internal tool shapes.
Signed-off-by: Ziwen Zhao <zzw.mose@gmail.com>
Description
Problem
`/v1/responses` had three parallel agent loops, one per surface:

- `routers/grpc/regular/responses/` — chat-completion adapter (driving any chat-completion-style model)
- `routers/grpc/harmony/responses/` — structured-generation pipeline (gpt-oss / harmony)
- `routers/openai/mcp/tool_loop.rs` — bespoke OpenAI Responses passthrough loop (1,878 lines)

Each surface re-implemented loop control, MCP-call lifecycle SSE emission, history hydration, transcript lowering, approval continuation, `max_tool_calls` exhaustion, hosted-tool wire shape, and id-prefix construction. Whenever the OpenAI Responses spec moved (`mcp_call.approval_request_id` null-vs-omit, reasoning summary event names, `output_item.added` ordering, web_search results clearing, etc.) the fix had to be applied in three places — and inevitably wasn't, so the three surfaces drifted. The bespoke OpenAI loop in particular short-circuited the family-aware render path (kept success-shaped items even when `is_error=true`, used asymmetric id-stripping, did not run `apply_error_overrides`, etc.).

Solution
Replace the three parallel loops with one shared agent loop under `routers/common/agent_loop/`, plus a shared streaming sink, plus a shared transcript-lower / history-load layer. Each surface is now a thin `AgentLoopAdapter` that owns only the upstream wire format it speaks; every loop-control concern lives inside `run_agent_loop`. The control plane is a state machine `NextAction::{CallLlm, ExecuteTools, InterruptForApproval, Finish}` driving a single transcript and sink contract.

State machine
- `max_tool_calls` exhaustion soft-stops: clear pending calls, set `state.tool_budget_exhausted`, run one final `CallLlm` with no tools so the response is `status=completed` + `incomplete_details=null`, matching cloud.
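A compact, self-contained sketch of the `NextAction` cycle described above; the state fields and the decision helper below are stand-ins, not the actual `driver.rs` API:

```rust
// Placeholder state; the real LoopState carries transcript, usage, budgets, etc.
#[derive(Default)]
struct LoopState {
    pending_tool_calls: usize,
    needs_approval: bool,
    tool_budget_exhausted: bool,
}

enum NextAction {
    CallLlm,
    ExecuteTools,
    InterruptForApproval,
    Finish,
}

// Simplified partition of the post-LLM decision; the real helper inspects the
// rendered turn, approval policy, and budget state.
fn decide_after_call_llm(state: &LoopState) -> NextAction {
    if state.needs_approval {
        NextAction::InterruptForApproval
    } else if state.pending_tool_calls > 0 && !state.tool_budget_exhausted {
        NextAction::ExecuteTools
    } else {
        NextAction::Finish
    }
}

fn run_agent_loop(mut state: LoopState) -> LoopState {
    let mut next = NextAction::CallLlm;
    loop {
        next = match next {
            // The adapter calls upstream here and fills pending tool calls.
            NextAction::CallLlm => decide_after_call_llm(&state),
            // Execute every planned tool, then hand the outputs back to the model.
            NextAction::ExecuteTools => {
                state.pending_tool_calls = 0;
                NextAction::CallLlm
            }
            // Stop and render an approval-interrupt response.
            NextAction::InterruptForApproval => break,
            // Terminal render (build_response_from_state in the real driver).
            NextAction::Finish => break,
        };
    }
    state
}

fn main() {
    let final_state = run_agent_loop(LoopState {
        pending_tool_calls: 1,
        ..Default::default()
    });
    assert_eq!(final_state.pending_tool_calls, 0);
}
```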
Layer responsibilities

- `routers/{grpc/regular,grpc/harmony,openai/responses}/.../agent_loop_adapter*.rs` — speak the surface's upstream wire format (`call_upstream`), drain its output into `state.latest_turn` + `pending_*_tool_calls`, render the final response (`render_final`). Never decides what comes next (a reduced sketch of this contract follows the list).
- `agent_loop/driver.rs` + `state.rs` — `run_agent_loop`. Owns the `NextAction` machine, transcript building, approval continuation, `max_tool_calls` budget, `RenderMode` selection.
- `agent_loop/presentation.rs` — `OutputFamily` partition, `normalize_output_item_id` (strip `fc_`/`call_` → prepend family prefix, symmetric across all families), `render_initial_item`/`render_final_item` (re-stamp id, backfill `approval_request_id`, apply `is_error → status=failed` overrides).
- `agent_loop/tooling.rs` — `execute_planned_tool`, the single executor chokepoint. Routes hosted-tool args through `prepare_hosted_dispatch_args`, calls `session.execute_tool`, materializes `ExecutedCall.transformed_item` via `tool_output.to_response_item()`.
- `agent_loop/build_response.rs` — `build_response_from_state`. Walks transcript + latest turn into `ResponsesResponse`, applies `RenderMode`, splices approval items in turn order.
- `routers/grpc/common/responses/agent_loop_sink.rs`, `routers/openai/responses/agent_loop_adapter.rs::OpenAiResponseStreamSink`, `NoopSink` — consume `LoopEvent`s, emit the wire-correct SSE / protobuf event sequence. The `output_item.added → mcp_call.in_progress → mcp_call_arguments.{delta,done} → mcp_call.{completed,failed} → output_item.done` lifecycle (and analogues for hosted families) lives inside the sink, not the adapters.
- `routers/common/responses_history.rs` + `transcript_lower.rs` — `load_request_history` resolves both `previous_response_id` and `conversation`, lowers high-level items (`mcp_call`, `mcp_list_tools`, image-gen, …) onto the core ChatMessage shape every backend speaks, and extracts control items (approval pairs, list-tools labels) for the driver's continuation logic.
- `crates/mcp/src/core/orchestrator.rs` + `transform/transformer.rs` — `ToolExecutionOutput::to_response_item()` is the single render entry. `apply_error_overrides` is the single failure-shape entry (`status=failed`, output cleared per family, `web_search_call.results=None` on failure). New `synthetic_error` constructor lets pre-execution failures (malformed args) flow through the same render pipeline.
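A hypothetical, much-reduced shape of the adapter contract from the first row above; the signatures are synchronous placeholders, while the real trait is async and threads a streaming sink through every call:

```rust
// Stand-ins for the shared driver types referenced in the table above.
struct LoopState;
struct ResponsesResponse;

trait AgentLoopAdapter {
    // Speak the surface's upstream wire format and drain the turn into shared state.
    fn call_upstream(&mut self, state: &mut LoopState) -> Result<(), String>;
    // Render the terminal response from accumulated state; never decides control flow.
    fn render_final(&self, state: &LoopState) -> ResponsesResponse;
}
```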
Where the OpenAI Responses passthrough plugs in
The OpenAI passthrough router (`routers/openai/responses/`) is now a regular `AgentLoopAdapter` impl just like the gRPC adapters. Its `call_upstream` parses OpenAI's SSE chunks via `openai/mcp/tool_handler.rs` (a protocol-specific state machine that classifies upstream events into Forward / Buffer / Drop / ExecuteTools), forwards non-tool events through `forward_streaming_event`, and pushes `LoopToolCall`s into `state.pending_gateway_tool_calls`. Everything else — id normalization, family-aware rendering, approval continuation, history hydration, mcp_call lifecycle SSE — runs inside the shared loop.
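To make the Forward / Buffer / Drop / ExecuteTools split concrete, here is an illustrative classifier; the enum name and the specific event-type rules are assumptions, not the actual `tool_handler.rs` logic:

```rust
// Illustrative only: the real handler keys off richer parsed events.
enum ChunkAction {
    Forward,      // relay the SSE event to the client unchanged
    Buffer,       // accumulate (e.g. partial tool-call argument deltas)
    Drop,         // swallow internal bookkeeping events
    ExecuteTools, // hand the buffered tool calls to the shared loop
}

fn classify(event_type: &str, turn_ended_with_tool_calls: bool) -> ChunkAction {
    match event_type {
        // Argument deltas are buffered until the call is complete (assumed rule).
        t if t.starts_with("response.function_call_arguments") => ChunkAction::Buffer,
        // A completed upstream turn with pending calls hands control to the loop.
        "response.completed" if turn_ended_with_tool_calls => ChunkAction::ExecuteTools,
        _ => ChunkAction::Forward,
    }
}
```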
The legacy `routers/openai/mcp/tool_loop.rs` (1,878 lines) is deleted. So is the bespoke `build_transformed_mcp_call_item` / `stable_streaming_tool_item_id` / `non_streaming_tool_item_id_source` / `approval_request_item_id_source` ladder; all id construction goes through the shared symmetric `normalize_output_item_id(family, src)`.

Changes
- `routers/common/agent_loop/` module: state machine (`driver.rs`, `state.rs`, `events.rs`), presentation/partition (`presentation.rs`), terminal renderer (`build_response.rs`), prepared-input snapshot (`prepared.rs`), tool execution helpers (`tooling.rs`), error type (`error.rs`)
- `routers/common/responses_history.rs`: shared `load_request_history` for both `previous_response_id` and `conversation`
- `routers/common/transcript_lower.rs`: `Lowering` trait, exhaustive over `ResponseInputOutputItem` so a new variant forces a compile error rather than a silent drop (see the sketch after this list)
- `routers/grpc/common/responses/agent_loop_sink.rs`: shared `GrpcResponseStreamSink` consumes `LoopEvent`s and emits the full mcp_call lifecycle (`output_item.added` → `mcp_call.in_progress` → `mcp_call_arguments.delta/.done` → `mcp_call.completed/.failed` → `output_item.done`); `output_item.added` is emitted at `ExecutionStarted` rather than `EmissionStarted`, so a gated/skipped/over-budget call no longer allocates a phantom `output_index` slot
- `routers/grpc/{regular,harmony}/responses/agent_{loop,streaming}_adapter.rs`: thin `AgentLoopAdapter` impls per surface
- `routers/openai/responses/agent_loop_adapter.rs` + `openai/mcp/tool_prep.rs`: OpenAI passthrough plugged into the shared loop
- `routers/openai/mcp/tool_loop.rs` (1,878 lines) — bespoke OpenAI agent loop replaced
- `prime_pending_from_approval` re-hydrates a resumed turn's tool call so it carries the original `approval_request_id`; new `RenderMode::ApprovalInterrupt` and `LoopEvent::ApprovedToolReplay`; dedupe by `approval_request_id` (first occurrence wins) and skip priming a call_id whose `FunctionCallOutput` is already present in the lowered transcript
- `max_tool_calls` exhaustion soft-stop: clear pending calls, set `state.tool_budget_exhausted`, run one final `CallLlm` with no tools so the response is `status=completed` + `incomplete_details=null`, matching cloud
- `tool_choice` is `Value` (no double JSON-encode); reasoning streaming events renamed to cloud's `reasoning_summary_text.{delta,done}` keyed on `summary_index`; reasoning final item reshaped to `{id, type, summary:[…]}`; `obfuscation` field added on delta events; `mcp_approval_request.id` emitted as canonical `mcpr_<call_id>`
- `ToolExecutionOutput::synthetic_error` constructor for the malformed-args path renders through the unified pipeline; `apply_error_overrides` clears `WebSearchCall.results` on failure (mirroring other family clears); `AgentLoopError::Upstream` → 502 (bad_gateway)
- `conversation_ref` setter normalizes to `ConversationRef::Object { id }`, matching `copy_from_request`
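A minimal sketch of the exhaustive-lowering idea referenced in the `transcript_lower.rs` bullet above, with stand-in variants and a stand-in `ChatMessage`:

```rust
// Simplified stand-ins; the real item enum and message type are far richer.
enum ResponseInputOutputItem {
    Message { text: String },
    McpCall { name: String, output: String },
    McpListTools { server_label: String },
}

struct ChatMessage {
    role: &'static str,
    content: String,
}

fn lower(item: ResponseInputOutputItem) -> ChatMessage {
    match item {
        ResponseInputOutputItem::Message { text } => ChatMessage {
            role: "assistant",
            content: text,
        },
        ResponseInputOutputItem::McpCall { name, output } => ChatMessage {
            role: "tool",
            content: format!("{name}: {output}"),
        },
        ResponseInputOutputItem::McpListTools { server_label } => ChatMessage {
            role: "tool",
            content: format!("listed tools for {server_label}"),
        },
        // No `_ =>` arm on purpose: adding a variant is a compile error, not a silent drop.
    }
}
```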
gpt-5-mini12-fixture matrix)response.incompleteSSE event +ResponseStatus::Incomplete(was incorrectly emittingresponse.completedwithincomplete_details);event_typesaddsResponseEvent::Incompletemcp_list_toolsoutput items omit thestatusfield (OpenAI emits{id, type, server_label, tools}only); same shape across emitter, gRPC sink, and OpenAI sinkobfuscationstring instead ofnull(text / MCP args / function args / reasoning summary deltas)reasoning_summary_part.done.part.textcarries the accumulated summary text instead of an empty stringResponsesResponse.toolsalways serializes (was skipping when empty)completed_atfilled with the current Unix timestamp on terminal states (NS + ST), was alwaysnullconversationecho conditional on the request having one (was forcingnullwhen absent)billingdifferentiated by router: OpenAI passthrough relays upstream's{"payer":"developer"}unchanged; gRPC self-hosted omits the field entirelyresponse_tool_echo_valuehelper centralizes MCP tool wire-echo: always-insert non-secret fields withnullonNone, stripauthorization(security), null-placeholderheaderssanitize_openai_replay_inputstrips OpenAI-rejected reasoning replay fields (id,status,content) per iterationIteration request / loop setup unification
Iteration request / loop setup unification

- `ResponsesLoopSetup::session{,_with_headers}` consolidates 6 inline `McpToolSession::new + configure_approval_policy` rituals across all 6 entry points (NS / ST × {OpenAI passthrough, gRPC regular, gRPC harmony})
- `IterationRequestFlavor::Responses { stream, tools }` is now shared by gRPC harmony AND OpenAI passthrough; OpenAI's per-iteration JSON munging is reduced to the OpenAI-only post-process (sanitize + provider transform)
- `upstream_tools` at constructor (mirrors the harmony pattern); `prepare_mcp_tools_as_functions` Value-side helper removed
- `OpenAiUpstreamHandle.base_payload` field removed; `route.rs`'s redundant `provider.transform_request` call removed (the iteration-level transform was overriding it anyway)
- `last_usage` field on streaming adapters removed; `render_final` reads from `state.latest_turn.usage`
Streaming sink lifecycle consolidation

- `ResponseStreamEventEmitter` gains four lifecycle helpers (`emit_approval_request_lifecycle`, `emit_approved_tool_replay_lifecycle`, `emit_gateway_tool_completed_lifecycle`, `emit_mcp_list_tools_item_lifecycle`); both `OpenAiResponseStreamSink` and `GrpcResponseStreamSink` consume them, tracking-state stays per-sink
- `emit_mcp_list_tools_sequence` removed (no production callers)
- `fn send` helpers replaced with the shared `send_event_best_effort`
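A rough sketch of the sink-delegates-to-emitter pattern described above; the `LoopEvent` variant names and helper signatures are approximations of the real types, kept deliberately small:

```rust
// Approximate variants; the real LoopEvent carries full item payloads.
enum LoopEvent {
    ApprovalRequested { call_id: String },
    ApprovedToolReplay { call_id: String },
    GatewayToolCompleted { call_id: String },
}

struct ResponseStreamEventEmitter;

impl ResponseStreamEventEmitter {
    // Placeholder bodies: the real helpers emit the full SSE event sequences.
    fn emit_approval_request_lifecycle(&self, call_id: &str) {
        println!("approval lifecycle for {call_id}");
    }
    fn emit_approved_tool_replay_lifecycle(&self, call_id: &str) {
        println!("replay lifecycle for {call_id}");
    }
    fn emit_gateway_tool_completed_lifecycle(&self, call_id: &str) {
        println!("completed lifecycle for {call_id}");
    }
}

struct Sink {
    emitter: ResponseStreamEventEmitter,
}

impl Sink {
    // The per-surface sink only routes events; lifecycle ordering lives in the emitter.
    fn handle(&self, event: &LoopEvent) {
        match event {
            LoopEvent::ApprovalRequested { call_id } => {
                self.emitter.emit_approval_request_lifecycle(call_id)
            }
            LoopEvent::ApprovedToolReplay { call_id } => {
                self.emitter.emit_approved_tool_replay_lifecycle(call_id)
            }
            LoopEvent::GatewayToolCompleted { call_id } => {
                self.emitter.emit_gateway_tool_completed_lifecycle(call_id)
            }
        }
    }
}
```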
Driver / state-machine polish

- `decide_after_call_llm` unreachable branch now logs via `tracing::error!` then falls through to `Finish` gracefully (was silently returning `Finish` and masking partition bugs)
- `RenderMode::Incomplete.reason` constrained to OpenAI-spec values; iteration-cap exhaustion (the only producer of `"max_iterations"`) now returns `AgentLoopError::Internal` → 500 instead of fabricating a non-spec `incomplete` terminal

Test Plan
- `pre-commit run --all-files` — clean (rustfmt, clippy, codespell, ruff)
- `cargo check -p smg` + `cargo clippy -p smg --tests` — clean
- e2e validated against fresh sglang VM (gpt-oss-120b harmony, Qwen2.5-7B-Instruct regular) plus OpenAI cloud (gpt-5-mini matrix, gpt-5-nano openai-responses CI job)
- 25 review threads from coderabbit + claude bot resolved (id namespace, mcp_approval_request canonical id, history PII redaction, reasoning streaming alignment, tool argument unwrap_or_default, error overrides, etc.)
- new wire-parity fixture tests under `model_gateway/tests/fixtures/openai_responses/`: cached OpenAI cloud responses for 12 scenarios (NS/ST × {minimum, MCP never, MCP always, function tool, web_search_preview, code_interpreter, max_output_tokens, conversation, …}) compared field-set-wise against SMG output
- regression tests `iteration_payload_normalizes_reasoning_items_for_openai_replay`, `iteration_payload_reapplies_provider_transform_to_replayed_items`, `incomplete_finish_emits_response_incomplete`, `mcp_list_tools_added_event_omits_status_field`, `streaming_delta_carries_obfuscation_string`, `reasoning_summary_part_done_carries_accumulated_text`
- xAI provider transform idempotency confirmed via the iteration-payload regression, so `route.rs` can stop running provider transform on the seed payload

Checklist
- `cargo +nightly fmt` passes
- `cargo clippy --all-targets --all-features -- -D warnings` passes