diff --git a/crates/mcp/src/core/orchestrator.rs b/crates/mcp/src/core/orchestrator.rs index e0197cdcc..545350f67 100644 --- a/crates/mcp/src/core/orchestrator.rs +++ b/crates/mcp/src/core/orchestrator.rs @@ -158,7 +158,8 @@ pub struct ToolExecutionInput { /// Output from batch tool execution. /// /// `#[non_exhaustive]` so additive fields don't break consumers; only -/// `smg-mcp` constructs this. +/// `smg-mcp` constructs this in production. External tests use +/// [`ToolExecutionOutput::new_for_test`]. #[derive(Debug, Clone)] #[non_exhaustive] pub struct ToolExecutionOutput { @@ -184,6 +185,39 @@ pub struct ToolExecutionOutput { pub duration: Duration, } +impl ToolExecutionOutput { + /// Test-only constructor for downstream crates blocked by + /// `#[non_exhaustive]`. One positional arg per public field is + /// intentional so adding a field forces every test to consider it. + #[expect( + clippy::too_many_arguments, + reason = "Test fixture constructor; one arg per field is intentional." + )] + pub fn new_for_test( + call_id: impl Into, + tool_name: impl Into, + server_key: impl Into, + server_label: impl Into, + arguments_str: impl Into, + output: Value, + is_error: bool, + error_message: Option, + duration: Duration, + ) -> Self { + Self { + call_id: call_id.into(), + tool_name: tool_name.into(), + server_key: server_key.into(), + server_label: server_label.into(), + arguments_str: arguments_str.into(), + output, + is_error, + error_message, + duration, + } + } +} + /// Result from resolved tool execution that preserves interactive approval state. #[derive(Debug, Clone)] pub enum ToolExecutionResult { diff --git a/model_gateway/src/routers/common/mcp_utils.rs b/model_gateway/src/routers/common/mcp_utils.rs index d9ba42ced..e22271fc5 100644 --- a/model_gateway/src/routers/common/mcp_utils.rs +++ b/model_gateway/src/routers/common/mcp_utils.rs @@ -247,6 +247,17 @@ pub fn extract_builtin_types(tools: &[ResponseTool]) -> Vec { .collect() } +/// True if `tools` carries any MCP-routed entry (declared MCP server or a +/// builtin family that the gateway intercepts via MCP). +/// +/// Derived from [`extract_builtin_types`] so the predicate and the actual +/// routing path can't drift — adding a new builtin to the routing path +/// (e.g. `file_search`) makes this predicate cover it too. +pub fn request_uses_mcp_routing(tools: &[ResponseTool]) -> bool { + tools.iter().any(|t| matches!(t, ResponseTool::Mcp(_))) + || !extract_builtin_types(tools).is_empty() +} + /// Collect user-declared function tool names from a Responses request. pub(crate) fn collect_user_function_names(request: &ResponsesRequest) -> HashSet { request diff --git a/model_gateway/src/routers/common/openai_bridge/format_registry.rs b/model_gateway/src/routers/common/openai_bridge/format_registry.rs index 7170e15fa..14c5652ac 100644 --- a/model_gateway/src/routers/common/openai_bridge/format_registry.rs +++ b/model_gateway/src/routers/common/openai_bridge/format_registry.rs @@ -52,39 +52,41 @@ impl FormatRegistry { self.formats.insert(qualified, format); } - /// Populate from a server config: per-tool overrides + builtin defaults. - /// Safe to call repeatedly — entries for non-Passthrough formats are - /// overwritten. Downgrading a format back to `Passthrough` requires a - /// separate registry rebuild (no production caller mutates configs in - /// place today). + fn remove(&self, qualified: &QualifiedToolName) { + self.formats.remove(qualified); + } + + /// Populate from a server config. Safe to call repeatedly. /// - /// Mirrors `McpOrchestrator::apply_tool_configs`: - /// - When a tool has an `alias`, the format is attached **only** to the - /// alias entry (under `("alias", alias_name)`), matching the orchestrator's - /// `register_alias` qualified-name shape. The underlying `(server, tool)` - /// stays at the `Passthrough` default so direct calls aren't transformed. - /// - When a tool has no alias but a non-default format, attach to - /// `(server, tool)` directly. - /// - When the per-tool stanza omits `response_format` entirely - /// (`None`), the builtin default still applies. This lets users add an - /// `alias` or `arg_mapping` to a builtin tool without disabling its - /// hosted-format wire shape. An explicit `Some(Passthrough)` *does* - /// block the builtin default — that is the documented escape hatch - /// for opting out of the hosted shape. + /// `McpToolSession::collect_visible_mcp_tools` replaces a direct tool + /// entry with its alias entry, so production session lookup of an + /// aliased tool resolves through `("alias", alias_name)`. Direct + /// dispatch still uses `(server_key, tool_name)`. Both keys must carry + /// the same format, so non-Passthrough formats are mirrored on both. pub fn populate_from_server_config(&self, config: &McpServerConfig) { if let Some(tools) = &config.tools { for (tool_name, tool_config) in tools { + let direct_key = QualifiedToolName::new(&config.name, tool_name); + let alias_key = tool_config + .alias + .as_deref() + .map(|alias| QualifiedToolName::new(ALIAS_SERVER_KEY, alias)); + let Some(format_config) = tool_config.response_format else { continue; }; let format: ResponseFormat = format_config.into(); if format == ResponseFormat::Passthrough { + self.remove(&direct_key); + if let Some(alias_key) = &alias_key { + self.remove(alias_key); + } continue; } - if let Some(alias) = &tool_config.alias { - self.insert(QualifiedToolName::new(ALIAS_SERVER_KEY, alias), format); - } else { - self.insert(QualifiedToolName::new(&config.name, tool_name), format); + + self.insert(direct_key, format); + if let Some(alias_key) = alias_key { + self.insert(alias_key, format); } } } @@ -92,14 +94,14 @@ impl FormatRegistry { if let (Some(builtin_type), Some(tool_name)) = (&config.builtin_type, &config.builtin_tool_name) { - let has_explicit_format = config - .tools - .as_ref() - .and_then(|tools| tools.get(tool_name)) - .is_some_and(|cfg| cfg.response_format.is_some()); + let stanza = config.tools.as_ref().and_then(|tools| tools.get(tool_name)); + let has_explicit_format = stanza.is_some_and(|cfg| cfg.response_format.is_some()); if !has_explicit_format { let format: ResponseFormat = builtin_type.response_format().into(); self.insert(QualifiedToolName::new(&config.name, tool_name), format); + if let Some(alias) = stanza.and_then(|cfg| cfg.alias.as_deref()) { + self.insert(QualifiedToolName::new(ALIAS_SERVER_KEY, alias), format); + } } } } @@ -142,9 +144,7 @@ mod tests { } #[test] - fn alias_format_stored_under_alias_server_key() { - // Mirrors orchestrator::register_alias which uses - // QualifiedToolName::new("alias", alias_name). + fn alias_format_mirrored_on_both_keys() { let mut tools = HashMap::new(); tools.insert( "brave_web_search".to_string(), @@ -163,12 +163,10 @@ mod tests { assert_eq!( r.lookup_by_names("alias", "web_search"), ResponseFormat::WebSearchCall, - "alias entry must use the literal `alias` server_key prefix" ); assert_eq!( r.lookup_by_names("brave", "brave_web_search"), - ResponseFormat::Passthrough, - "underlying tool entry must NOT receive the format when an alias exists" + ResponseFormat::WebSearchCall, ); } @@ -217,7 +215,6 @@ mod tests { "do_search".to_string(), ToolConfig { alias: None, - // Explicit override differs from the builtin default. response_format: Some(ResponseFormatConfig::Passthrough), arg_mapping: None, }, @@ -230,8 +227,6 @@ mod tests { let r = FormatRegistry::new(); r.populate_from_server_config(&cfg); - // Explicit Some(Passthrough) override means "no entry inserted" AND - // the builtin default is NOT applied on top. assert_eq!( r.lookup_by_names("search", "do_search"), ResponseFormat::Passthrough @@ -239,12 +234,7 @@ mod tests { } #[test] - fn alias_only_stanza_preserves_builtin_default() { - // Regression: a per-tool stanza that only aliases a builtin tool - // (or only sets arg_mapping) used to suppress the builtin default, - // collapsing the hosted format to plain mcp_call. With - // `response_format: None` meaning "inherit context", the builtin - // default must still apply. + fn alias_only_stanza_preserves_builtin_default_on_both_keys() { let mut tools = HashMap::new(); tools.insert( "do_search".to_string(), @@ -262,10 +252,63 @@ mod tests { let r = FormatRegistry::new(); r.populate_from_server_config(&cfg); + // Alias key — what production session lookup hits. + assert_eq!( + r.lookup_by_names("alias", "web_search"), + ResponseFormat::WebSearchCall, + ); + // Direct key — what direct dispatch hits. assert_eq!( r.lookup_by_names("search", "do_search"), ResponseFormat::WebSearchCall, - "alias-only stanza must not disable the builtin's hosted format" + ); + } + + #[test] + fn explicit_passthrough_downgrade_clears_prior_hosted_entry() { + let r = FormatRegistry::new(); + + let mut hosted = HashMap::new(); + hosted.insert( + "brave_web_search".to_string(), + ToolConfig { + alias: Some("web_search".to_string()), + response_format: Some(ResponseFormatConfig::WebSearchCall), + arg_mapping: None, + }, + ); + let mut hosted_cfg = server("brave"); + hosted_cfg.tools = Some(hosted); + r.populate_from_server_config(&hosted_cfg); + assert_eq!( + r.lookup_by_names("alias", "web_search"), + ResponseFormat::WebSearchCall, + ); + assert_eq!( + r.lookup_by_names("brave", "brave_web_search"), + ResponseFormat::WebSearchCall, + ); + + let mut downgraded = HashMap::new(); + downgraded.insert( + "brave_web_search".to_string(), + ToolConfig { + alias: Some("web_search".to_string()), + response_format: Some(ResponseFormatConfig::Passthrough), + arg_mapping: None, + }, + ); + let mut downgraded_cfg = server("brave"); + downgraded_cfg.tools = Some(downgraded); + r.populate_from_server_config(&downgraded_cfg); + + assert_eq!( + r.lookup_by_names("alias", "web_search"), + ResponseFormat::Passthrough, + ); + assert_eq!( + r.lookup_by_names("brave", "brave_web_search"), + ResponseFormat::Passthrough, ); } } diff --git a/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs b/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs index 7d513bb87..76561e82a 100644 --- a/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs +++ b/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs @@ -85,18 +85,30 @@ pub fn response_tools(session: &McpToolSession<'_>) -> Vec { } /// `McpToolInfo` records used inside `mcp_list_tools` output items. +/// +/// `annotations` is narrowed to `{"read_only": …}` to match OpenAI's +/// Responses API shape. The hint is read straight off the rmcp tool +/// (`tool.annotations.read_only_hint`) rather than the SMG +/// `ToolAnnotations` wrapper — the wrapper applies conservative policy +/// defaults (destructive=true on absent hint) that are intended for the +/// approval pipeline, not the wire surface, and reading them here would +/// surface the wrong `read_only` for tools the server didn't annotate. pub fn build_mcp_tool_infos(entries: &[smg_mcp::ToolEntry]) -> Vec { entries .iter() - .map(|entry| McpToolInfo { - name: entry.tool_name().to_string(), - description: entry.tool.description.as_ref().map(|d| d.to_string()), - input_schema: schema_to_value(&entry.tool.input_schema), - annotations: entry + .map(|entry| { + let read_only = entry .tool .annotations .as_ref() - .and_then(|a| serde_json::to_value(a).ok()), + .and_then(|a| a.read_only_hint) + .unwrap_or(false); + McpToolInfo { + name: entry.tool_name().to_string(), + description: entry.tool.description.as_ref().map(|d| d.to_string()), + input_schema: schema_to_value(&entry.tool.input_schema), + annotations: Some(json!({ "read_only": read_only })), + } }) .collect() } @@ -311,3 +323,60 @@ fn is_client_visible_output_item( | ResponseOutputItem::LocalShellCallOutput { .. } => true, } } + +#[cfg(test)] +mod tests { + use std::{borrow::Cow, sync::Arc}; + + use rmcp::model::{Tool, ToolAnnotations as RmcpToolAnnotations}; + use smg_mcp::ToolEntry; + + use super::*; + + fn entry_with_rmcp_annotations(annotations: Option) -> ToolEntry { + let tool = Tool { + name: Cow::Owned("widget".to_string()), + title: None, + description: Some(Cow::Owned("widget description".to_string())), + input_schema: Arc::new(serde_json::Map::new()), + output_schema: None, + annotations, + icons: None, + }; + ToolEntry::from_server_tool("srv", tool) + } + + fn read_only_hint(value: bool) -> RmcpToolAnnotations { + RmcpToolAnnotations { + title: None, + read_only_hint: Some(value), + destructive_hint: None, + idempotent_hint: None, + open_world_hint: None, + } + } + + #[test] + fn build_mcp_tool_infos_surfaces_rmcp_read_only_hint() { + let entries = vec![entry_with_rmcp_annotations(Some(read_only_hint(true)))]; + let infos = build_mcp_tool_infos(&entries); + let serialized = serde_json::to_value(&infos[0]) + .expect("McpToolInfo must serialize") + .get("annotations") + .cloned() + .expect("annotations must serialize"); + assert_eq!(serialized, json!({ "read_only": true })); + } + + #[test] + fn build_mcp_tool_infos_defaults_to_false_when_hint_absent() { + let entries = vec![entry_with_rmcp_annotations(None)]; + let infos = build_mcp_tool_infos(&entries); + let serialized = serde_json::to_value(&infos[0]) + .expect("McpToolInfo must serialize") + .get("annotations") + .cloned() + .expect("annotations must serialize"); + assert_eq!(serialized, json!({ "read_only": false })); + } +} diff --git a/model_gateway/src/routers/common/openai_bridge/transformer.rs b/model_gateway/src/routers/common/openai_bridge/transformer.rs index 3ec3fe745..9a4075946 100644 --- a/model_gateway/src/routers/common/openai_bridge/transformer.rs +++ b/model_gateway/src/routers/common/openai_bridge/transformer.rs @@ -9,19 +9,29 @@ use tracing::warn; use super::ResponseFormat; -/// Transform a `ToolExecutionOutput` to a `ResponseOutputItem` using a +/// Transform a `ToolExecutionOutput` into a `ResponseOutputItem` using a /// pre-resolved `ResponseFormat`. /// -/// The format MUST be resolved via the session's exposed-name map (e.g. -/// [`super::lookup_tool_format`]). `output.tool_name` is the *invoked/exposed* -/// name after `McpToolSession::execute_tool_result` rewrites it, so a registry -/// lookup against `(output.server_key, output.tool_name)` would miss for -/// disambiguated names like `mcp__` and silently degrade to -/// `Passthrough`. +/// `response_format` must be resolved via the session's exposed-name map +/// (e.g. [`super::lookup_tool_format`]) — `output.tool_name` carries the +/// *invoked/exposed* name after session-side rewriting, so a fresh +/// registry lookup against `(output.server_key, output.tool_name)` would +/// miss for disambiguated names like `mcp__`. +/// +/// On `is_error`: only `mcp_call` surfaces the failure (`status: "failed"`, +/// `error: Some(msg)`). The four hosted-builtin variants always emit +/// `status: "completed"` — that matches OpenAI cloud's de-facto behavior +/// (the cloud's native `web_search_call`/`code_interpreter_call`/etc. +/// don't emit `Failed` for soft failures like empty results or rate +/// limits, and many MCP servers set `isError: true` for those very cases). +/// The error context lives in the item's content, not in `status`. pub fn transform_tool_output( output: &smg_mcp::ToolExecutionOutput, response_format: ResponseFormat, ) -> ResponseOutputItem { + if output.is_error && response_format == ResponseFormat::Passthrough { + return failed_mcp_call(output); + } ResponseTransformer::transform( &output.output, response_format, @@ -32,6 +42,32 @@ pub fn transform_tool_output( ) } +fn failed_mcp_call(output: &smg_mcp::ToolExecutionOutput) -> ResponseOutputItem { + // The MCP `CallToolResult { is_error: true, .. }` path can leave + // `error_message` unset while carrying the real failure text inside + // `output.output` (typed text blocks). Fall back to that before the + // generic placeholder so client-visible mcp_call.error stays useful. + let err_msg = output + .error_message + .clone() + .filter(|msg| !msg.is_empty()) + .or_else(|| { + let text = ResponseTransformer::flatten_mcp_output(&output.output); + (!text.is_empty()).then_some(text) + }) + .unwrap_or_else(|| "Tool execution failed".to_string()); + ResponseOutputItem::McpCall { + id: mcp_response_item_id(&output.call_id), + status: "failed".to_string(), + approval_request_id: None, + arguments: output.arguments_str.clone(), + error: Some(err_msg), + name: output.tool_name.clone(), + output: String::new(), + server_label: output.server_label.clone(), + } +} + /// Normalize an MCP response item id source into an external `mcp_call.id`. /// /// The input may be an upstream output item id (`fc_*`), an internal call id @@ -1547,6 +1583,88 @@ mod tests { } } + fn failed_tool_output(tool_name: &str) -> smg_mcp::ToolExecutionOutput { + smg_mcp::ToolExecutionOutput::new_for_test( + "call_xyz", + tool_name, + "srv", + "srv-label", + "{\"q\":\"v\"}", + serde_json::json!({}), + /* is_error */ true, + Some("upstream broke".to_string()), + std::time::Duration::default(), + ) + } + + #[test] + fn transform_tool_output_emits_mcp_call_failed_with_error_message() { + let item = transform_tool_output( + &failed_tool_output("brave_web_search"), + ResponseFormat::Passthrough, + ); + match item { + ResponseOutputItem::McpCall { + status, + error, + output, + arguments, + name, + .. + } => { + assert_eq!(status, "failed"); + assert_eq!(error.as_deref(), Some("upstream broke")); + assert_eq!(output, ""); + assert_eq!(arguments, "{\"q\":\"v\"}"); + assert_eq!(name, "brave_web_search"); + } + _ => panic!("Expected McpCall"), + } + } + + #[test] + fn transform_tool_output_falls_back_to_output_text_when_error_message_missing() { + // MCP `CallToolResult { is_error: true }` can leave error_message + // unset and put the failure text inside the result blocks. + let mut output = failed_tool_output("brave_web_search"); + output.error_message = None; + output.output = serde_json::json!([ + {"type": "text", "text": "rate limited"} + ]); + + let item = transform_tool_output(&output, ResponseFormat::Passthrough); + match item { + ResponseOutputItem::McpCall { error, .. } => { + assert_eq!(error.as_deref(), Some("rate limited")); + } + _ => panic!("Expected McpCall"), + } + } + + #[test] + fn transform_tool_output_emits_hosted_completed_even_on_is_error() { + // Hosted-builtin variants always emit status=completed regardless of + // upstream is_error. Real OpenAI cloud doesn't emit Failed for soft + // failures (rate-limited search, no results, etc.) and many MCP + // servers set isError=true for those exact cases — propagating it + // would break clients that expect cloud-parity wire shape. The + // failure context lives in the item content, not in `status`. + for fmt in [ + ResponseFormat::WebSearchCall, + ResponseFormat::CodeInterpreterCall, + ResponseFormat::FileSearchCall, + ResponseFormat::ImageGenerationCall, + ] { + let item = transform_tool_output(&failed_tool_output("search"), fmt); + let serialized = serde_json::to_value(&item).expect("serialize failed item"); + assert_eq!( + serialized["status"], + serde_json::json!("completed"), + "{fmt:?}" + ); + } + } + #[test] fn test_compact_image_generation_outputs_json_strips_base64() { let mut outputs = vec![ diff --git a/model_gateway/src/routers/common/persistence_utils.rs b/model_gateway/src/routers/common/persistence_utils.rs index 98010bd8e..13fb66b42 100644 --- a/model_gateway/src/routers/common/persistence_utils.rs +++ b/model_gateway/src/routers/common/persistence_utils.rs @@ -85,8 +85,23 @@ pub fn item_to_json(item: &ConversationItem) -> Value { obj.insert("phase".to_string(), phase_value); } } + } else if let Some(content_obj) = item.content.as_object() { + // Whole-item store path (`store_whole_item` in + // `item_to_new_conversation_item`): `content` is the original item + // value with its fields at the top level. Hoist them back rather + // than wrapping them under `content`, otherwise replayed structural + // items come back as `{"type":"…","content":{"type":"…",…}}` and + // lose `revised_prompt`/`status`/etc. on the wire. + for (k, v) in content_obj { + if matches!(k.as_str(), "id" | "type" | "role" | "status") { + continue; + } + obj.insert(k.clone(), v.clone()); + } } else { - // Default: include content as-is + // Pre-fix rows for non-listed types stored `[]` instead of the + // whole item; preserve the legacy `{"content": …}` wrapping so we + // don't drop data that came in under the old write path. obj.insert("content".to_string(), item.content.clone()); } @@ -320,19 +335,18 @@ fn extract_input_items(input: &ResponseInput) -> Result, String> { fn item_to_new_conversation_item( item_value: &Value, response_id: Option, - is_input: bool, + _is_input: bool, ) -> NewConversationItem { let item_type = item_value .get("type") .and_then(|v| v.as_str()) .unwrap_or("message"); - // Determine if we should store the whole item or just the content field - let store_whole_item = if is_input { - item_type == "function_call" || item_type == "function_call_output" - } else { - item_type != "message" - }; + // Non-message items carry their fields at the top level (no `content` + // field on the wire), so reading `content` on the input side would + // collapse replayed structural items (`image_generation_call`, + // `web_search_call`, …) to `[]`. + let store_whole_item = item_type != "message"; let content = if store_whole_item { item_value.clone() @@ -515,3 +529,95 @@ async fn persist_conversation_items_inner( Ok(()) } + +#[cfg(test)] +mod tests { + use chrono::Utc; + + use super::*; + + fn stored_item(item_type: &str, content: Value) -> ConversationItem { + ConversationItem { + id: ConversationItemId::from("ig_replay_1"), + response_id: None, + item_type: item_type.to_string(), + role: None, + content, + status: Some("completed".to_string()), + created_at: Utc::now(), + } + } + + #[test] + fn item_to_json_restores_top_level_fields_for_whole_item_store() { + let stored = stored_item( + "image_generation_call", + json!({ + "id": "ig_replay_1", + "type": "image_generation_call", + "status": "completed", + "revised_prompt": "cat", + }), + ); + + let serialized = item_to_json(&stored); + assert_eq!(serialized["type"], json!("image_generation_call")); + assert_eq!(serialized["revised_prompt"], json!("cat")); + assert!( + serialized.get("content").is_none(), + "non-message whole-item store must hoist top-level fields, not \ + nest the original item under `content`: {serialized}" + ); + } + + #[test] + fn item_to_json_falls_back_to_legacy_content_wrap_for_array_content() { + // Pre-fix rows for non-listed types may have stored `[]`. Preserve + // the legacy wrapping so old data still round-trips. + let stored = stored_item("image_generation_call", json!([])); + let serialized = item_to_json(&stored); + assert_eq!(serialized["content"], json!([])); + } + + #[test] + fn replayed_image_generation_input_item_is_stored_whole() { + let input_item = json!({ + "id": "ig_replay_1", + "type": "image_generation_call", + "status": "completed", + "revised_prompt": "cat", + }); + + let new_item = item_to_new_conversation_item(&input_item, None, /* is_input */ true); + assert_eq!(new_item.item_type, "image_generation_call"); + assert_eq!( + new_item + .content + .get("revised_prompt") + .and_then(|v| v.as_str()), + Some("cat"), + ); + assert_eq!( + new_item.content.get("type").and_then(|v| v.as_str()), + Some("image_generation_call"), + ); + } + + #[test] + fn replayed_message_input_item_still_extracts_content() { + let input_item = json!({ + "id": "msg_1", + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "hi"}], + "status": "completed", + }); + + let new_item = item_to_new_conversation_item(&input_item, None, /* is_input */ true); + assert_eq!(new_item.item_type, "message"); + assert_eq!( + new_item.content, + json!([{"type": "input_text", "text": "hi"}]) + ); + } +} diff --git a/model_gateway/src/routers/grpc/regular/responses/streaming.rs b/model_gateway/src/routers/grpc/regular/responses/streaming.rs index ad00ae4d6..19bd6624b 100644 --- a/model_gateway/src/routers/grpc/regular/responses/streaming.rs +++ b/model_gateway/src/routers/grpc/regular/responses/streaming.rs @@ -739,31 +739,35 @@ async fn execute_tool_loop_streaming_internal( let success = !tool_output.is_error; let output_str = tool_output.output.to_string(); - if success { - // Emit tool_call.completed - let event = - emitter.emit_tool_call_completed(output_index, &item_id, response_format); - emitter.send_event(&event, &tx)?; - - // Build complete item with output - let mut item_done = json!({ + let output_item = + openai_bridge::transform_tool_output(&tool_output, response_format); + let mut item_done = serde_json::to_value(&output_item).unwrap_or_else(|e| { + warn!( + tool = %tool_output.tool_name, + error = %e, + "Failed to serialize transformed output item; falling back to a minimal stub", + ); + json!({ "id": item_id, "type": item_type, - "name": tool_output.tool_name, - "status": "completed", - "arguments": tool_output.arguments_str, - "output": output_str - }); - attach_mcp_server_label( - &mut item_done, - Some(tool_output.server_label.as_str()), - Some(&response_format), - ); + "status": if success { "completed" } else { "failed" }, + }) + }); + // Override the typed item's id so output_item.done matches the + // streaming-allocated id used by the earlier output_item.added. + if let Some(obj) = item_done.as_object_mut() { + obj.insert("id".to_string(), json!(&item_id)); + } + attach_mcp_server_label( + &mut item_done, + Some(tool_output.server_label.as_str()), + Some(&response_format), + ); - // Emit output_item.done - let event = emitter.emit_output_item_done(output_index, &item_done); + if success { + let event = + emitter.emit_tool_call_completed(output_index, &item_id, response_format); emitter.send_event(&event, &tx)?; - emitter.complete_output_item(output_index); } else { let err_text = tool_output .error_message @@ -771,32 +775,28 @@ async fn execute_tool_loop_streaming_internal( .unwrap_or_else(|| output_str.clone()); warn!("Tool execution returned error: {}", err_text); - // Emit mcp_call.failed (no web_search_call.failed event exists) - let event = emitter.emit_mcp_call_failed(output_index, &item_id, &err_text); - emitter.send_event(&event, &tx)?; - - // Build failed item - let mut item_done = json!({ - "id": item_id, - "type": item_type, - "name": tool_output.tool_name, - "status": "failed", - "arguments": tool_output.arguments_str, - "error": err_text - }); - attach_mcp_server_label( - &mut item_done, - Some(tool_output.server_label.as_str()), - Some(&response_format), - ); - - // Emit output_item.done - let event = emitter.emit_output_item_done(output_index, &item_done); - emitter.send_event(&event, &tx)?; - emitter.complete_output_item(output_index); + // `response.mcp_call.failed` is the only `*.failed` event + // in the Responses API. Hosted-builtin families close via + // `*.completed` to mirror OpenAI cloud's wire shape; + // the failure context (when present) lives in the item + // content. + if matches!(response_format, ResponseFormat::Passthrough) { + let event = emitter.emit_mcp_call_failed(output_index, &item_id, &err_text); + emitter.send_event(&event, &tx)?; + } else { + let event = emitter.emit_tool_call_completed( + output_index, + &item_id, + response_format, + ); + emitter.send_event(&event, &tx)?; + } } - // Record MCP tool metrics + let event = emitter.emit_output_item_done(output_index, &item_done); + emitter.send_event(&event, &tx)?; + emitter.complete_output_item(output_index); + Metrics::record_mcp_tool_duration( ¤t_request.model, &tool_output.tool_name, @@ -812,10 +812,6 @@ async fn execute_tool_loop_streaming_internal( }, ); - let output_item = - openai_bridge::transform_tool_output(&tool_output, response_format); - - // Record the call in state with transformed output item state.record_call( tool_output.call_id, tool_output.tool_name, diff --git a/model_gateway/src/routers/openai/responses/non_streaming.rs b/model_gateway/src/routers/openai/responses/non_streaming.rs index 44ae40224..08df7a1d7 100644 --- a/model_gateway/src/routers/openai/responses/non_streaming.rs +++ b/model_gateway/src/routers/openai/responses/non_streaming.rs @@ -15,7 +15,7 @@ use super::utils::{patch_response_with_request_metadata, restore_original_tools} use crate::routers::{ common::{ header_utils::{extract_forwardable_request_headers, ApiProvider}, - mcp_utils::ensure_request_mcp_client, + mcp_utils::{ensure_request_mcp_client, request_uses_mcp_routing}, openai_bridge, persistence_utils::persist_conversation_items, }, @@ -56,40 +56,42 @@ pub async fn handle_non_streaming_response(mut ctx: RequestContext) -> Response return error::internal_error("internal_error", "Worker not selected"); } }; - let mcp_orchestrator = match ctx.components.mcp_orchestrator() { - Some(m) => m, - None => { - return error::internal_error("internal_error", "MCP orchestrator required"); - } - }; - - // The format registry is the router-side source of truth for MCP - // builtin/alias format resolution; falling back to a default would - // silently mis-route hosted tools instead of failing fast. - let mcp_format_registry = match ctx.components.mcp_format_registry() { - Some(r) => r.clone(), - None => { - return error::internal_error("internal_error", "MCP format registry required"); + // Only MCP-laden requests need the orchestrator and format registry; + // without this narrowing, plain non-MCP requests would 500 in + // deployments that run the gateway without MCP wiring. A registry-less + // MCP request still hard-fails — silent fallback would mis-route + // hosted tools. + let mcp_routing = match original_body.tools.as_deref() { + Some(tools) if request_uses_mcp_routing(tools) => { + let Some(mcp_orchestrator) = ctx.components.mcp_orchestrator() else { + return error::internal_error( + "internal_error", + "MCP orchestrator required for requests carrying MCP/builtin tools", + ); + }; + let Some(registry) = ctx.components.mcp_format_registry() else { + return error::internal_error( + "internal_error", + "MCP format registry required for requests carrying MCP/builtin tools", + ); + }; + ensure_request_mcp_client(mcp_orchestrator, registry, tools) + .await + .map(|servers| (servers, mcp_orchestrator.clone(), registry.clone())) } - }; - - // Check for MCP tools and create session if needed - let mcp_servers = if let Some(tools) = original_body.tools.as_deref() { - ensure_request_mcp_client(mcp_orchestrator, &mcp_format_registry, tools).await - } else { - None + _ => None, }; let mut response_json: Value; - if let Some(mcp_servers) = mcp_servers { + if let Some((mcp_servers, mcp_orchestrator, mcp_format_registry)) = mcp_routing { let session_request_id = original_body .request_id .clone() .unwrap_or_else(|| format!("req_{}", uuid::Uuid::now_v7())); let forwarded_headers = extract_forwardable_request_headers(ctx.headers()); let mut session = McpToolSession::new_with_headers( - mcp_orchestrator, + &mcp_orchestrator, mcp_servers, &session_request_id, forwarded_headers, diff --git a/model_gateway/src/routers/openai/responses/streaming.rs b/model_gateway/src/routers/openai/responses/streaming.rs index 2b6a204a3..2c8b27288 100644 --- a/model_gateway/src/routers/openai/responses/streaming.rs +++ b/model_gateway/src/routers/openai/responses/streaming.rs @@ -1065,7 +1065,7 @@ pub(super) fn handle_streaming_with_tool_interception( /// Main entry point for streaming responses pub async fn handle_streaming_response(ctx: RequestContext) -> Response { - use crate::routers::common::mcp_utils::ensure_request_mcp_client; + use crate::routers::common::mcp_utils::{ensure_request_mcp_client, request_uses_mcp_routing}; let worker = match ctx.worker() { Some(w) => w.clone(), @@ -1080,26 +1080,29 @@ pub async fn handle_streaming_response(ctx: RequestContext) -> Response { return error::internal_error("internal_error", "Expected responses request"); } }; - let mcp_orchestrator = match ctx.components.mcp_orchestrator() { - Some(m) => m.clone(), - None => { - return error::internal_error("internal_error", "MCP orchestrator required"); - } - }; - // Same fail-fast contract as the non-streaming path: a missing format - // registry means MCP routing decisions would be silently wrong. - let mcp_format_registry = match ctx.components.mcp_format_registry() { - Some(r) => r.clone(), - None => { - return error::internal_error("internal_error", "MCP format registry required"); + // Only MCP-laden requests need the orchestrator and format registry; + // plain streaming requests must still pass through deployments without + // MCP wiring. + let mcp_routing = match original_body.tools.as_deref() { + Some(tools) if request_uses_mcp_routing(tools) => { + let Some(mcp_orchestrator) = ctx.components.mcp_orchestrator().cloned() else { + return error::internal_error( + "internal_error", + "MCP orchestrator required for requests carrying MCP/builtin tools", + ); + }; + let Some(registry) = ctx.components.mcp_format_registry() else { + return error::internal_error( + "internal_error", + "MCP format registry required for requests carrying MCP/builtin tools", + ); + }; + let registry = registry.clone(); + ensure_request_mcp_client(&mcp_orchestrator, ®istry, tools) + .await + .map(|servers| (servers, mcp_orchestrator, registry)) } - }; - - // Check for MCP tools and create request context if needed - let mcp_servers = if let Some(tools) = original_body.tools.as_deref() { - ensure_request_mcp_client(&mcp_orchestrator, &mcp_format_registry, tools).await - } else { - None + _ => None, }; let client = ctx.components.client().clone(); @@ -1110,7 +1113,7 @@ pub async fn handle_streaming_response(ctx: RequestContext) -> Response { } }; - let Some(mcp_servers) = mcp_servers else { + let Some((mcp_servers, mcp_orchestrator, mcp_format_registry)) = mcp_routing else { return handle_simple_streaming_passthrough(&client, &worker, headers.as_ref(), req).await; }; diff --git a/model_gateway/tests/mcp_test.rs b/model_gateway/tests/mcp_test.rs index 0aa84eb81..d8b2920fd 100644 --- a/model_gateway/tests/mcp_test.rs +++ b/model_gateway/tests/mcp_test.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use common::mock_mcp_server::{MockMCPServer, MockSearchResponseMCPServer, MockSearchResponseMode}; use openai_protocol::responses::{ResponseOutputItem, WebSearchAction}; use serde_json::json; -use smg::routers::common::openai_bridge::{ResponseFormat, ResponseTransformer}; +use smg::routers::common::openai_bridge::{transform_tool_output, ResponseFormat}; use smg_mcp::{ core::config::{ResponseFormatConfig, ToolConfig}, McpConfig, McpOrchestrator, McpServerBinding, McpServerConfig, McpToolSession, McpTransport, @@ -334,17 +334,7 @@ async fn test_web_search_transform_handles_openai_search_response_with_mock() { assert!(!output.is_error, "Tool execution should succeed"); - // The session returns the raw `output` Value from the MCP call. Re-transform - // with WebSearchCall format to verify serialization (end-to-end source - // extraction is covered by the gateway bridge's own tests). - let transformed = ResponseTransformer::transform( - &output.output, - ResponseFormat::WebSearchCall, - "test-request-openai-search", - "openai_search_server", - "brave_web_search", - "{\"query\":\"rust openai search\"}", - ); + let transformed = transform_tool_output(&output, ResponseFormat::WebSearchCall); match transformed { ResponseOutputItem::WebSearchCall { action, .. } => match action { WebSearchAction::Search { @@ -419,14 +409,7 @@ async fn test_web_search_transform_sets_action_query_for_brave_search_with_mock( assert!(!output.is_error, "Tool execution should succeed"); - let transformed = ResponseTransformer::transform( - &output.output, - ResponseFormat::WebSearchCall, - "test-request-brave", - "brave_response_server", - "brave_web_search", - "{\"query\":\"rust brave query\"}", - ); + let transformed = transform_tool_output(&output, ResponseFormat::WebSearchCall); match transformed { ResponseOutputItem::WebSearchCall { action, .. } => match action { WebSearchAction::Search {