diff --git a/model_gateway/src/routers/common/mcp_utils.rs b/model_gateway/src/routers/common/mcp_utils.rs index e22271fc5..827477730 100644 --- a/model_gateway/src/routers/common/mcp_utils.rs +++ b/model_gateway/src/routers/common/mcp_utils.rs @@ -11,7 +11,8 @@ use smg_mcp::{BuiltinToolType, McpOrchestrator, McpServerBinding, McpServerConfi use tracing::{debug, warn}; use crate::routers::common::openai_bridge::{ - apply_hosted_tool_overrides, extract_hosted_tool_overrides, FormatRegistry, ResponseFormat, + self, apply_hosted_tool_overrides, extract_hosted_tool_overrides, FormatRegistry, + ResponseFormat, }; /// Default maximum tool loop iterations (safety limit). @@ -194,11 +195,8 @@ pub fn collect_builtin_routing( let mut routing = Vec::new(); for tool in tools { - let builtin_type = match tool { - ResponseTool::WebSearchPreview(_) => BuiltinToolType::WebSearchPreview, - ResponseTool::CodeInterpreter(_) => BuiltinToolType::CodeInterpreter, - ResponseTool::ImageGeneration(_) => BuiltinToolType::ImageGeneration, - _ => continue, + let Some(builtin_type) = openai_bridge::builtin_type_for_response_tool(tool) else { + continue; }; if let Some((server_name, tool_name)) = mcp_orchestrator.find_builtin_server(builtin_type) { @@ -238,12 +236,7 @@ pub fn collect_builtin_routing( pub fn extract_builtin_types(tools: &[ResponseTool]) -> Vec { tools .iter() - .filter_map(|t| match t { - ResponseTool::WebSearchPreview(_) => Some(BuiltinToolType::WebSearchPreview), - ResponseTool::CodeInterpreter(_) => Some(BuiltinToolType::CodeInterpreter), - ResponseTool::ImageGeneration(_) => Some(BuiltinToolType::ImageGeneration), - _ => None, - }) + .filter_map(openai_bridge::builtin_type_for_response_tool) .collect() } diff --git a/model_gateway/src/routers/common/openai_bridge/format_descriptor.rs b/model_gateway/src/routers/common/openai_bridge/format_descriptor.rs index e073a69d9..9e2e14d3d 100644 --- a/model_gateway/src/routers/common/openai_bridge/format_descriptor.rs +++ b/model_gateway/src/routers/common/openai_bridge/format_descriptor.rs @@ -14,6 +14,10 @@ use super::ResponseFormat; #[derive(Debug, Clone, Copy)] pub struct FormatDescriptor { pub type_str: &'static str, + /// Item-id kind without trailing `_` (e.g. `"ws"`, `"mcp"`). Joined to a + /// suffix at construction sites: `format!("{kind}_{rest}")`. Stored + /// without the underscore so callers that just need the discriminator + /// (allocator prefix, log labels) don't have to trim. pub id_prefix: &'static str, pub in_progress_event: &'static str, /// `None` for formats with no intermediate phase (e.g. Passthrough). @@ -24,11 +28,27 @@ pub struct FormatDescriptor { pub partial_image_event: Option<&'static str>, } +/// Reverse lookup for routers that have already received an item-type +/// string (`mcp_call`, `web_search_call`, …) from the upstream wire and +/// need to recover the matching `ResponseFormat` to consult the +/// descriptor. Returns `None` for non-format item types like +/// `function_call` or `message`. +pub fn format_from_type_str(type_str: &str) -> Option { + match type_str { + ItemType::MCP_CALL => Some(ResponseFormat::Passthrough), + ItemType::WEB_SEARCH_CALL => Some(ResponseFormat::WebSearchCall), + ItemType::CODE_INTERPRETER_CALL => Some(ResponseFormat::CodeInterpreterCall), + ItemType::FILE_SEARCH_CALL => Some(ResponseFormat::FileSearchCall), + ItemType::IMAGE_GENERATION_CALL => Some(ResponseFormat::ImageGenerationCall), + _ => None, + } +} + pub const fn descriptor(format: ResponseFormat) -> FormatDescriptor { match format { ResponseFormat::WebSearchCall => FormatDescriptor { type_str: ItemType::WEB_SEARCH_CALL, - id_prefix: "ws_", + id_prefix: "ws", in_progress_event: WebSearchCallEvent::IN_PROGRESS, searching_event: Some(WebSearchCallEvent::SEARCHING), completed_event: WebSearchCallEvent::COMPLETED, @@ -37,7 +57,7 @@ pub const fn descriptor(format: ResponseFormat) -> FormatDescriptor { }, ResponseFormat::CodeInterpreterCall => FormatDescriptor { type_str: ItemType::CODE_INTERPRETER_CALL, - id_prefix: "ci_", + id_prefix: "ci", in_progress_event: CodeInterpreterCallEvent::IN_PROGRESS, searching_event: Some(CodeInterpreterCallEvent::INTERPRETING), completed_event: CodeInterpreterCallEvent::COMPLETED, @@ -46,7 +66,7 @@ pub const fn descriptor(format: ResponseFormat) -> FormatDescriptor { }, ResponseFormat::FileSearchCall => FormatDescriptor { type_str: ItemType::FILE_SEARCH_CALL, - id_prefix: "fs_", + id_prefix: "fs", in_progress_event: FileSearchCallEvent::IN_PROGRESS, searching_event: Some(FileSearchCallEvent::SEARCHING), completed_event: FileSearchCallEvent::COMPLETED, @@ -55,7 +75,7 @@ pub const fn descriptor(format: ResponseFormat) -> FormatDescriptor { }, ResponseFormat::ImageGenerationCall => FormatDescriptor { type_str: ItemType::IMAGE_GENERATION_CALL, - id_prefix: "ig_", + id_prefix: "ig", in_progress_event: ImageGenerationCallEvent::IN_PROGRESS, searching_event: Some(ImageGenerationCallEvent::GENERATING), completed_event: ImageGenerationCallEvent::COMPLETED, @@ -64,7 +84,7 @@ pub const fn descriptor(format: ResponseFormat) -> FormatDescriptor { }, ResponseFormat::Passthrough => FormatDescriptor { type_str: ItemType::MCP_CALL, - id_prefix: "mcp_", + id_prefix: "mcp", in_progress_event: McpEvent::CALL_IN_PROGRESS, searching_event: None, completed_event: McpEvent::CALL_COMPLETED, diff --git a/model_gateway/src/routers/common/openai_bridge/format_registry.rs b/model_gateway/src/routers/common/openai_bridge/format_registry.rs index 14c5652ac..0892d1ac2 100644 --- a/model_gateway/src/routers/common/openai_bridge/format_registry.rs +++ b/model_gateway/src/routers/common/openai_bridge/format_registry.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use dashmap::DashMap; use smg_mcp::{inventory::ALIAS_SERVER_KEY, McpServerConfig, QualifiedToolName}; +use tracing::debug; use super::ResponseFormat; @@ -16,6 +17,12 @@ use super::ResponseFormat; /// `QualifiedToolName` returned by `qualified_name_for_exposed` rather than /// rebuilding one, so we pay the two `Arc` allocations once instead of /// twice per call. +/// +/// Telemetry: when the session knows the tool but the registry doesn't, we +/// log a `debug` event with the qualified name. That asymmetric miss is the +/// fingerprint of a registration-path bug (the orchestrator added the tool +/// but `populate_from_server_config` was never called for the same server), +/// and would otherwise dispatch silently as `mcp_call`. pub fn lookup_tool_format( session: &smg_mcp::McpToolSession<'_>, registry: &FormatRegistry, @@ -24,7 +31,17 @@ pub fn lookup_tool_format( let Some(qn) = session.qualified_name_for_exposed(exposed_name) else { return ResponseFormat::Passthrough; }; - registry.lookup(&qn) + let format = registry.lookup(&qn); + if format == ResponseFormat::Passthrough && !registry.contains(&qn) { + debug!( + exposed_name = %exposed_name, + server_key = %qn.server_key(), + tool_name = %qn.tool_name(), + "FormatRegistry miss for session-exposed tool — dispatching as Passthrough; \ + check that populate_from_server_config ran for this server" + ); + } + format } #[derive(Default, Debug, Clone)] @@ -48,6 +65,13 @@ impl FormatRegistry { self.lookup(&QualifiedToolName::new(server_key, tool_name)) } + /// True iff the registry has an explicit entry for `qualified`. Used by + /// [`lookup_tool_format`] to distinguish a registered Passthrough entry + /// from a missing entry that defaulted to Passthrough. + pub fn contains(&self, qualified: &QualifiedToolName) -> bool { + self.formats.contains_key(qualified) + } + fn insert(&self, qualified: QualifiedToolName, format: ResponseFormat) { self.formats.insert(qualified, format); } @@ -111,8 +135,10 @@ impl FormatRegistry { mod tests { use std::collections::HashMap; + use serde_json::json; use smg_mcp::{ - BuiltinToolType, McpServerConfig, McpTransport, ResponseFormatConfig, ToolConfig, + BuiltinToolType, McpConfig, McpOrchestrator, McpServerBinding, McpServerConfig, + McpToolSession, McpTransport, ResponseFormatConfig, Tool, ToolConfig, ToolEntry, }; use super::*; @@ -134,6 +160,21 @@ mod tests { } } + fn test_tool(name: &str) -> Tool { + let mut schema = serde_json::Map::new(); + schema.insert("type".to_string(), json!("object")); + schema.insert("properties".to_string(), json!({})); + Tool { + name: name.to_string().into(), + title: None, + description: Some("test".into()), + input_schema: schema.into(), + output_schema: None, + icons: None, + annotations: None, + } + } + #[test] fn lookup_unknown_returns_passthrough() { let r = FormatRegistry::new(); @@ -311,4 +352,82 @@ mod tests { ResponseFormat::Passthrough, ); } + + async fn orchestrator_with_tool(server_name: &str, tool_name: &str) -> McpOrchestrator { + let orchestrator = McpOrchestrator::new(McpConfig { + servers: vec![server(server_name)], + ..Default::default() + }) + .await + .expect("orchestrator"); + orchestrator + .tool_inventory() + .insert_entry(ToolEntry::from_server_tool( + server_name, + test_tool(tool_name), + )); + orchestrator + } + + fn binding(server_name: &str) -> Vec { + vec![McpServerBinding { + label: server_name.to_string(), + server_key: server_name.to_string(), + allowed_tools: None, + }] + } + + #[tokio::test] + async fn lookup_tool_format_returns_passthrough_when_session_unknown() { + let orchestrator = orchestrator_with_tool("brave", "brave_web_search").await; + let session = McpToolSession::new(&orchestrator, binding("brave"), "test-request"); + let registry = FormatRegistry::new(); + // `not_a_tool` was never registered with the session, so the lookup + // short-circuits at `qualified_name_for_exposed`. + assert_eq!( + lookup_tool_format(&session, ®istry, "not_a_tool"), + ResponseFormat::Passthrough + ); + } + + #[tokio::test] + async fn lookup_tool_format_returns_passthrough_for_session_known_but_registry_missing() { + // Asymmetric-miss branch: the session exposes the tool but the + // registry has no entry — production fingerprint of a server whose + // `populate_from_server_config` was skipped. Must dispatch as + // Passthrough rather than panicking or making up a hosted format. + let orchestrator = orchestrator_with_tool("brave", "brave_web_search").await; + let session = McpToolSession::new(&orchestrator, binding("brave"), "test-request"); + let registry = FormatRegistry::new(); + assert_eq!( + lookup_tool_format(&session, ®istry, "brave_web_search"), + ResponseFormat::Passthrough + ); + } + + #[tokio::test] + async fn lookup_tool_format_returns_registry_value_for_session_known_and_registered() { + // Happy path: session knows the tool AND registry has a hosted entry — + // composed lookup returns the hosted format. + let orchestrator = orchestrator_with_tool("brave", "brave_web_search").await; + let session = McpToolSession::new(&orchestrator, binding("brave"), "test-request"); + let mut tools = HashMap::new(); + tools.insert( + "brave_web_search".to_string(), + ToolConfig { + alias: None, + response_format: Some(ResponseFormatConfig::WebSearchCall), + arg_mapping: None, + }, + ); + let mut cfg = server("brave"); + cfg.tools = Some(tools); + let registry = FormatRegistry::new(); + registry.populate_from_server_config(&cfg); + + assert_eq!( + lookup_tool_format(&session, ®istry, "brave_web_search"), + ResponseFormat::WebSearchCall + ); + } } diff --git a/model_gateway/src/routers/common/openai_bridge/mod.rs b/model_gateway/src/routers/common/openai_bridge/mod.rs index 7ec13c1b2..7de2d4ea3 100644 --- a/model_gateway/src/routers/common/openai_bridge/mod.rs +++ b/model_gateway/src/routers/common/openai_bridge/mod.rs @@ -10,14 +10,15 @@ pub mod response_format; pub mod tool_descriptors; pub mod transformer; -pub use format_descriptor::{descriptor, FormatDescriptor}; +pub use format_descriptor::{descriptor, format_from_type_str, FormatDescriptor}; pub use format_registry::{lookup_tool_format, FormatRegistry}; pub use overrides::{apply_hosted_tool_overrides, extract_hosted_tool_overrides}; pub use response_format::ResponseFormat; pub use tool_descriptors::{ - build_mcp_tool_infos, chat_function_tools, configure_response_tools_approval, - function_tools_json, inject_client_visible_mcp_output_items, mcp_list_tools_item, - mcp_list_tools_json, response_tools, should_hide_output_item_json, should_hide_tool_json, + build_mcp_tool_infos, builtin_type_for_response_tool, chat_function_tools, + configure_response_tools_approval, function_tools_json, inject_client_visible_mcp_output_items, + mcp_list_tools_item, mcp_list_tools_json, response_tools, should_hide_output_item_json, + should_hide_tool_json, }; pub use transformer::{ compact_image_generation_output, compact_image_generation_outputs_json, diff --git a/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs b/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs index 76561e82a..7a499fc71 100644 --- a/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs +++ b/model_gateway/src/routers/common/openai_bridge/tool_descriptors.rs @@ -14,7 +14,24 @@ use openai_protocol::{ }, }; use serde_json::{json, Value}; -use smg_mcp::{ApprovalMode, McpToolSession}; +use smg_mcp::{ApprovalMode, BuiltinToolType, McpToolSession}; + +/// Map a single `ResponseTool` variant to the `BuiltinToolType` the MCP +/// router uses for resolution. `None` for non-builtin tool kinds (e.g. +/// `Function`, `Mcp`, `Computer`). Centralizing here keeps the four +/// hosted families (web_search_preview / code_interpreter / file_search / +/// image_generation) in lockstep — every caller that previously matched +/// on `ResponseTool::*` independently was at risk of forgetting a variant +/// when a new builtin lands. +pub fn builtin_type_for_response_tool(tool: &ResponseTool) -> Option { + match tool { + ResponseTool::WebSearchPreview(_) => Some(BuiltinToolType::WebSearchPreview), + ResponseTool::CodeInterpreter(_) => Some(BuiltinToolType::CodeInterpreter), + ResponseTool::FileSearch(_) => Some(BuiltinToolType::FileSearch), + ResponseTool::ImageGeneration(_) => Some(BuiltinToolType::ImageGeneration), + _ => None, + } +} #[inline] fn schema_to_value(schema: &serde_json::Map) -> Value { diff --git a/model_gateway/src/routers/grpc/common/responses/streaming.rs b/model_gateway/src/routers/grpc/common/responses/streaming.rs index 00793e1d5..37763e6c0 100644 --- a/model_gateway/src/routers/grpc/common/responses/streaming.rs +++ b/model_gateway/src/routers/grpc/common/responses/streaming.rs @@ -26,16 +26,25 @@ use crate::routers::{ grpc::harmony::responses::ToolResult, }; -pub(crate) enum OutputItemType { +/// Item-id-prefix discriminator for non-format kinds. Format-driven items +/// derive their prefix from `descriptor(format).id_prefix` instead — keeping +/// the wire-shape mapping in one place. +pub(crate) enum OutputItemKind { Message, McpListTools, - McpCall, FunctionCall, Reasoning, - WebSearchCall, - CodeInterpreterCall, - FileSearchCall, - ImageGenerationCall, +} + +impl OutputItemKind { + fn id_prefix(&self) -> &'static str { + match self { + Self::Message => "msg", + Self::McpListTools => "mcpl", + Self::FunctionCall => "fc", + Self::Reasoning => "rs", + } + } } /// Status of an output item @@ -531,18 +540,6 @@ impl ResponseStreamEventEmitter { } } - /// Map a `ResponseFormat` to the grpc-router-private `OutputItemType` enum. - pub fn output_item_type_for_format(response_format: Option<&ResponseFormat>) -> OutputItemType { - match response_format { - Some(ResponseFormat::WebSearchCall) => OutputItemType::WebSearchCall, - Some(ResponseFormat::CodeInterpreterCall) => OutputItemType::CodeInterpreterCall, - Some(ResponseFormat::FileSearchCall) => OutputItemType::FileSearchCall, - Some(ResponseFormat::ImageGenerationCall) => OutputItemType::ImageGenerationCall, - Some(ResponseFormat::Passthrough) => OutputItemType::McpCall, - None => OutputItemType::FunctionCall, - } - } - // ======================================================================== // Function Call Event Emission Methods // ======================================================================== @@ -617,23 +614,12 @@ impl ResponseStreamEventEmitter { format!("{}_{}", prefix, Uuid::now_v7().simple()) } - /// Allocate next output index and track item - pub fn allocate_output_index(&mut self, item_type: OutputItemType) -> (usize, String) { + /// Allocate next output index and track item, deriving the item-id from + /// `id_prefix` (e.g. `"msg"`, `"mcp"`, `"ws"` — without trailing `_`). + pub fn allocate_output_index_with_prefix(&mut self, id_prefix: &str) -> (usize, String) { let index = self.next_output_index; self.next_output_index += 1; - let id_prefix = match &item_type { - OutputItemType::McpListTools => "mcpl", - OutputItemType::McpCall => "mcp", - OutputItemType::FunctionCall => "fc", - OutputItemType::Message => "msg", - OutputItemType::Reasoning => "rs", - OutputItemType::WebSearchCall => "ws", - OutputItemType::CodeInterpreterCall => "ci", - OutputItemType::FileSearchCall => "fs", - OutputItemType::ImageGenerationCall => "ig", - }; - let id = Self::generate_item_id(id_prefix); self.output_items.push(OutputItemState { @@ -645,6 +631,25 @@ impl ResponseStreamEventEmitter { (index, id) } + /// Convenience: allocate for a non-format kind. + pub fn allocate_output_index(&mut self, kind: OutputItemKind) -> (usize, String) { + self.allocate_output_index_with_prefix(kind.id_prefix()) + } + + /// Convenience: allocate for a `ResponseFormat`-driven item, falling back + /// to `function_call` (`"fc"`) when no format applies. Mirrors the prior + /// `output_item_type_for_format` mapping but reads the prefix straight + /// off the format descriptor. + pub fn allocate_output_index_for_format( + &mut self, + response_format: Option, + ) -> (usize, String) { + let prefix = response_format + .map(|f| descriptor(f).id_prefix) + .unwrap_or("fc"); + self.allocate_output_index_with_prefix(prefix) + } + /// Mark output item as completed and store its data pub fn complete_output_item(&mut self, output_index: usize) { if let Some(item) = self @@ -719,7 +724,7 @@ impl ResponseStreamEventEmitter { reasoning_content: Option, ) -> Result<(), String> { // Allocate output index and generate ID - let (output_index, item_id) = self.allocate_output_index(OutputItemType::Reasoning); + let (output_index, item_id) = self.allocate_output_index(OutputItemKind::Reasoning); // Build reasoning item structure let item = json!({ @@ -758,7 +763,7 @@ impl ResponseStreamEventEmitter { // Allocate output_index and item_id for this message item (once per message) if self.current_item_id.is_none() { let (output_index, item_id) = - self.allocate_output_index(OutputItemType::Message); + self.allocate_output_index(OutputItemKind::Message); // Build message item structure let item = json!({ @@ -931,7 +936,7 @@ impl ResponseStreamEventEmitter { tools: &[mcp::ToolEntry], tx: &mpsc::UnboundedSender>, ) -> Result<(), String> { - let (output_index, item_id) = self.allocate_output_index(OutputItemType::McpListTools); + let (output_index, item_id) = self.allocate_output_index(OutputItemKind::McpListTools); // Build per-tool JSON items let tool_items = Self::tool_entries_to_json(tools).unwrap_or_else(|e| { diff --git a/model_gateway/src/routers/grpc/common/responses/utils.rs b/model_gateway/src/routers/grpc/common/responses/utils.rs index 336865050..05ce82844 100644 --- a/model_gateway/src/routers/grpc/common/responses/utils.rs +++ b/model_gateway/src/routers/grpc/common/responses/utils.rs @@ -55,14 +55,8 @@ pub(crate) async fn ensure_mcp_connection( // dispatches. let has_builtin_tools = tools .map(|t| { - t.iter().any(|tool| { - matches!( - tool, - ResponseTool::WebSearchPreview(_) - | ResponseTool::CodeInterpreter(_) - | ResponseTool::ImageGeneration(_) - ) - }) + t.iter() + .any(|tool| openai_bridge::builtin_type_for_response_tool(tool).is_some()) }) .unwrap_or(false); diff --git a/model_gateway/src/routers/grpc/harmony/responses/execution.rs b/model_gateway/src/routers/grpc/harmony/responses/execution.rs index d4a6b0853..904c3e262 100644 --- a/model_gateway/src/routers/grpc/harmony/responses/execution.rs +++ b/model_gateway/src/routers/grpc/harmony/responses/execution.rs @@ -14,7 +14,7 @@ use crate::{ observability::metrics::{metrics_labels, Metrics}, routers::common::{ mcp_utils::prepare_hosted_dispatch_args, - openai_bridge::{self, FormatRegistry}, + openai_bridge::{self, FormatRegistry, ResponseFormat}, }, }; @@ -33,12 +33,11 @@ pub(crate) struct ToolResult { pub is_error: bool, /// Correctly-typed output item for the Responses API, produced by - /// `ResponseTransformer::transform` (via - /// `ToolExecutionOutput::to_response_item`). Carries the per-tool - /// shape — e.g. `McpCall { output }`, `WebSearchCall { .. }`, - /// `ImageGenerationCall { result }` — so downstream code can - /// serialize the authoritative item rather than re-deriving fields - /// from `output`. + /// `openai_bridge::transform_tool_output(&output, response_format)`. + /// Carries the per-tool shape — e.g. `McpCall { output }`, + /// `WebSearchCall { .. }`, `ImageGenerationCall { result }` — so + /// downstream code can serialize the authoritative item rather than + /// re-deriving fields from `output`. pub output_item: ResponseOutputItem, } @@ -68,7 +67,12 @@ pub(super) async fn execute_mcp_tools( // For non-hosted-tool calls (Passthrough format), no override lookup runs. // Non-object model payloads coerce to `{}` so the override merge actually // applies rather than silently dropping the caller's declared config. - let inputs: Vec = tool_calls + // + // Resolve `response_format` ONCE per tool call here and zip it through to + // the output-handling pass below — the previous code looked it up twice + // per call (each lookup allocates two `Arc`s for the qualified + // name). + let prepared: Vec<(ToolExecutionInput, ResponseFormat)> = tool_calls .iter() .map(|tc| { let args_str = tc.function.arguments.as_deref().unwrap_or("{}"); @@ -97,28 +101,40 @@ pub(super) async fn execute_mcp_tools( let response_format = openai_bridge::lookup_tool_format(session, format_registry, &tc.function.name); prepare_hosted_dispatch_args(&mut args, response_format, request_tools, request_user); - ToolExecutionInput { + let input = ToolExecutionInput { call_id: tc.id.clone(), tool_name: tc.function.name.clone(), arguments: args, - } + }; + (input, response_format) }) .collect(); + let (inputs, formats): (Vec<_>, Vec<_>) = prepared.into_iter().unzip(); + debug!( tool_count = inputs.len(), "Executing MCP tools via unified API" ); - // Execute all tools via unified batch API + // `session.execute_tools` is `buffered()` and preserves input order, so + // `formats[i]` matches `outputs[i]`. The assert below upgrades that + // contract from a comment to a runtime check so a future change to the + // execution path can't silently truncate via `zip`. let outputs = session.execute_tools(inputs).await; + assert_eq!( + outputs.len(), + formats.len(), + "session.execute_tools returned {} outputs for {} inputs; \ + per-call format zip would silently drop entries", + outputs.len(), + formats.len(), + ); - // Convert outputs to ToolResults and record metrics/tracking let results: Vec = outputs .into_iter() - .map(|output| { - let response_format = - openai_bridge::lookup_tool_format(session, format_registry, &output.tool_name); + .zip(formats) + .map(|(output, response_format)| { let output_item = openai_bridge::transform_tool_output(&output, response_format); // Record this call in tracking diff --git a/model_gateway/src/routers/grpc/harmony/streaming.rs b/model_gateway/src/routers/grpc/harmony/streaming.rs index fef746643..5a5c9b908 100644 --- a/model_gateway/src/routers/grpc/harmony/streaming.rs +++ b/model_gateway/src/routers/grpc/harmony/streaming.rs @@ -38,7 +38,7 @@ use crate::{ responses::{ build_sse_response, streaming::{ - attach_mcp_server_label, OutputItemType, ResponseStreamEventEmitter, + attach_mcp_server_label, OutputItemKind, ResponseStreamEventEmitter, }, }, }, @@ -633,7 +633,7 @@ impl HarmonyStreamingProcessor { // Allocate message item if needed if message_output_index.is_none() { let (output_index, item_id) = - emitter.allocate_output_index(OutputItemType::Message); + emitter.allocate_output_index(OutputItemKind::Message); message_output_index = Some(output_index); message_item_id = Some(item_id.clone()); @@ -706,17 +706,12 @@ impl HarmonyStreamingProcessor { } }); - // Determine output item type and JSON type string - let output_item_type = - ResponseStreamEventEmitter::output_item_type_for_format( - response_format.as_ref(), - ); let type_str = ResponseStreamEventEmitter::type_str_for_format( response_format.as_ref(), ); let (output_index, item_id) = - emitter.allocate_output_index(output_item_type); + emitter.allocate_output_index_for_format(response_format); tool_call_tracking.insert( call_index, diff --git a/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs b/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs index 1918307a5..2d15d6b83 100644 --- a/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs +++ b/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs @@ -26,7 +26,7 @@ use crate::{ routers::{ common::{ mcp_utils::{prepare_hosted_dispatch_args, DEFAULT_MAX_ITERATIONS}, - openai_bridge, + openai_bridge::{self, ResponseFormat}, }, error, grpc::common::responses::{ @@ -350,7 +350,11 @@ pub(super) async fn execute_tool_loop( // request-level `user` is also forwarded into hosted-tool args. let request_tools = original_request.tools.as_deref().unwrap_or(&[]); let request_user = original_request.user.as_deref(); - let inputs: Vec = mcp_tool_calls + // Resolve `response_format` once per call here and zip it through + // to the output processing pass — looking it up twice (once for + // arg prep, once for transform) allocates two extra `Arc`s + // per call. `session.execute_tools` preserves input ordering. + let prepared: Vec<(ToolExecutionInput, ResponseFormat)> = mcp_tool_calls .into_iter() .map(|tc| { let mut arguments = @@ -369,19 +373,29 @@ pub(super) async fn execute_tool_loop( request_tools, request_user, ); - ToolExecutionInput { + let input = ToolExecutionInput { call_id: tc.call_id, tool_name: tc.name, arguments, - } + }; + (input, response_format) }) .collect(); + let (inputs, formats): (Vec<_>, Vec<_>) = prepared.into_iter().unzip(); - // Execute all MCP tools via session let results = session.execute_tools(inputs).await; + // `session.execute_tools` preserves input order and length; assert + // it so a regression there can't silently truncate via `zip`. + assert_eq!( + results.len(), + formats.len(), + "session.execute_tools returned {} outputs for {} inputs; \ + per-call format zip would silently drop entries", + results.len(), + formats.len(), + ); - // Process results: record metrics and state - for result in results { + for (result, response_format) in results.into_iter().zip(formats) { trace!( "Tool '{}' (call_id: {}) completed in {:?}, success={}", result.tool_name, @@ -390,7 +404,6 @@ pub(super) async fn execute_tool_loop( !result.is_error ); - // Record MCP tool metrics Metrics::record_mcp_tool_duration( ¤t_request.model, &result.tool_name, @@ -406,12 +419,6 @@ pub(super) async fn execute_tool_loop( }, ); - // Record the call in state with transformed output item - let response_format = openai_bridge::lookup_tool_format( - &session, - &ctx.mcp_format_registry, - &result.tool_name, - ); let output_item = openai_bridge::transform_tool_output(&result, response_format); let output_str = result.output.to_string(); state.record_call( diff --git a/model_gateway/src/routers/grpc/regular/responses/streaming.rs b/model_gateway/src/routers/grpc/regular/responses/streaming.rs index 19bd6624b..b252a0d52 100644 --- a/model_gateway/src/routers/grpc/regular/responses/streaming.rs +++ b/model_gateway/src/routers/grpc/regular/responses/streaming.rs @@ -57,7 +57,7 @@ use crate::{ grpc::{ common::responses::{ build_sse_response, persist_response_if_needed, - streaming::{attach_mcp_server_label, OutputItemType, ResponseStreamEventEmitter}, + streaming::{attach_mcp_server_label, OutputItemKind, ResponseStreamEventEmitter}, ResponsesContext, }, utils, @@ -649,12 +649,12 @@ async fn execute_tool_loop_streaming_internal( // Use emitter helpers to determine correct type and allocate index let item_type = ResponseStreamEventEmitter::type_str_for_format(Some(&response_format)); - let output_item_type = - ResponseStreamEventEmitter::output_item_type_for_format(Some(&response_format)); let resolved_label = session.resolve_tool_server_label(&tool_call.name); - // Allocate output_index with correct type (generates appropriate item_id prefix) - let (output_index, item_id) = emitter.allocate_output_index(output_item_type); + // Allocate output_index with the format's id-prefix discriminator + // (e.g. `ws_…` for web_search_call); see FormatDescriptor. + let (output_index, item_id) = + emitter.allocate_output_index_for_format(Some(response_format)); // Build initial tool call item let mut item = json!({ @@ -833,7 +833,7 @@ async fn execute_tool_loop_streaming_internal( for tool_call in function_tool_calls { // Allocate output_index for this function_tool_call item let (output_index, item_id) = - emitter.allocate_output_index(OutputItemType::FunctionCall); + emitter.allocate_output_index(OutputItemKind::FunctionCall); // Build initial function_call item let item = json!({ diff --git a/model_gateway/src/routers/openai/mcp/tool_loop.rs b/model_gateway/src/routers/openai/mcp/tool_loop.rs index 9016536cd..e5f16eec3 100644 --- a/model_gateway/src/routers/openai/mcp/tool_loop.rs +++ b/model_gateway/src/routers/openai/mcp/tool_loop.rs @@ -13,10 +13,7 @@ use std::{collections::HashSet, io}; use axum::http::HeaderMap; use bytes::Bytes; use openai_protocol::{ - event_types::{ - is_function_call_type, CodeInterpreterCallEvent, FileSearchCallEvent, - ImageGenerationCallEvent, ItemType, McpEvent, OutputItemEvent, WebSearchCallEvent, - }, + event_types::{is_function_call_type, ItemType, McpEvent, OutputItemEvent}, responses::{generate_id, ResponseInput, ResponseTool, ResponsesRequest}, }; use serde_json::{json, to_value, Value}; @@ -502,18 +499,9 @@ fn send_tool_call_intermediate_event( response_format: ResponseFormat, sequence_number: &mut u64, ) -> bool { - // Determine event type and ID prefix based on response format - let event_type = match response_format { - ResponseFormat::WebSearchCall => WebSearchCallEvent::SEARCHING, - ResponseFormat::CodeInterpreterCall => CodeInterpreterCallEvent::INTERPRETING, - ResponseFormat::FileSearchCall => FileSearchCallEvent::SEARCHING, - // `generating` is the intermediate event for image_generation_call, on - // par with `searching` for web/file search and `interpreting` for code. - // `partial_image` events are emitted inline by the underlying tool when - // it streams preview chunks; the tool_loop path only emits the coarse - // in_progress → generating → completed sequence. - ResponseFormat::ImageGenerationCall => ImageGenerationCallEvent::GENERATING, - ResponseFormat::Passthrough => return true, // mcp_call has no intermediate event + // mcp_call has no intermediate event (descriptor.searching_event = None). + let Some(event_type) = openai_bridge::descriptor(response_format).searching_event else { + return true; }; let effective_output_index = call.effective_output_index(); @@ -546,19 +534,16 @@ fn send_tool_call_completion_events( let effective_output_index = call.effective_output_index(); let item_id = stable_streaming_tool_item_id(call, response_format); - // Determine the completion event type based on item type + // Resolve the completion event from the item's `type` (the typed + // `tool_call_item` may have been re-tagged after dispatch); fall back + // to the format-derived event for unknown types. let item_type = tool_call_item .get("type") .and_then(|v| v.as_str()) .unwrap_or(""); - - let completed_event_type: &str = match item_type { - ItemType::WEB_SEARCH_CALL => WebSearchCallEvent::COMPLETED, - ItemType::CODE_INTERPRETER_CALL => CodeInterpreterCallEvent::COMPLETED, - ItemType::FILE_SEARCH_CALL => FileSearchCallEvent::COMPLETED, - ItemType::IMAGE_GENERATION_CALL => ImageGenerationCallEvent::COMPLETED, - _ => McpEvent::CALL_COMPLETED, // Default to mcp_call for mcp_call and unknown types - }; + let completed_event_type: &str = openai_bridge::format_from_type_str(item_type) + .map(|f| openai_bridge::descriptor(f).completed_event) + .unwrap_or_else(|| openai_bridge::descriptor(response_format).completed_event); // Event 1: response..completed let completed_payload = json!({ @@ -597,46 +582,45 @@ fn stable_streaming_tool_item_id( ) -> String { let source_id = call.item_id.as_deref().unwrap_or(call.call_id.as_str()); - match response_format { - ResponseFormat::Passthrough => mcp_response_item_id(source_id), - ResponseFormat::WebSearchCall => normalize_tool_item_id_with_prefix(source_id, "ws_"), - ResponseFormat::CodeInterpreterCall => normalize_tool_item_id_with_prefix(source_id, "ci_"), - ResponseFormat::FileSearchCall => normalize_tool_item_id_with_prefix(source_id, "fs_"), - // `ig_` prefix mirrors the shared transformer's output item id - // (`to_image_generation_call`) and the 2-letter convention used by - // the other hosted tool formats. - ResponseFormat::ImageGenerationCall => normalize_tool_item_id_with_prefix(source_id, "ig_"), + if response_format == ResponseFormat::Passthrough { + // mcp_response_item_id encodes the `mcp_*` rewrite rules + // (preserving an existing `mcp_` prefix instead of double-prefixing). + mcp_response_item_id(source_id) + } else { + let prefix = openai_bridge::descriptor(response_format).id_prefix; + normalize_tool_item_id_with_prefix(source_id, prefix) } } -fn normalize_tool_item_id_with_prefix(source_id: &str, target_prefix: &str) -> String { - if source_id.starts_with(target_prefix) { +fn normalize_tool_item_id_with_prefix(source_id: &str, prefix: &str) -> String { + let prefix_with_underscore = format!("{prefix}_"); + if source_id.starts_with(&prefix_with_underscore) { return source_id.to_string(); } source_id .strip_prefix("fc_") .or_else(|| source_id.strip_prefix("call_")) - .map(|stripped| format!("{target_prefix}{stripped}")) - .unwrap_or_else(|| format!("{target_prefix}{source_id}")) + .map(|stripped| format!("{prefix_with_underscore}{stripped}")) + .unwrap_or_else(|| format!("{prefix_with_underscore}{source_id}")) } fn non_streaming_tool_item_id_source(item_id: &str, response_format: ResponseFormat) -> String { - match response_format { - ResponseFormat::Passthrough => item_id.to_string(), - ResponseFormat::WebSearchCall - | ResponseFormat::CodeInterpreterCall - | ResponseFormat::FileSearchCall - | ResponseFormat::ImageGenerationCall => item_id + if response_format == ResponseFormat::Passthrough { + item_id.to_string() + } else { + // Hosted-builtin formats strip the upstream function-call prefix; the + // bridge's success builders re-add the format-specific prefix. + item_id .strip_prefix("fc_") .or_else(|| item_id.strip_prefix("call_")) .unwrap_or(item_id) - .to_string(), + .to_string() } } fn approval_request_item_id_source(item_id: &str) -> String { - normalize_tool_item_id_with_prefix(item_id, "mcpr_") + normalize_tool_item_id_with_prefix(item_id, "mcpr") } pub(crate) fn mcp_list_tools_bindings_to_emit( @@ -1902,4 +1886,25 @@ mod tests { "exactly one `output_item.done` expected, got {done_count}: {types:?}" ); } + + #[test] + fn approval_request_item_id_source_uses_single_underscore() { + // Regression: `normalize_tool_item_id_with_prefix` appends `_` + // internally, so the prefix argument must be the bare token. A + // prior version passed `"mcpr_"` here and emitted `mcpr__...` ids, + // breaking the approval wire format. + assert_eq!(super::approval_request_item_id_source("fc_abc"), "mcpr_abc"); + assert_eq!( + super::approval_request_item_id_source("call_xyz"), + "mcpr_xyz" + ); + assert_eq!( + super::approval_request_item_id_source("mcpr_already"), + "mcpr_already" + ); + assert_eq!( + super::approval_request_item_id_source("raw_id"), + "mcpr_raw_id" + ); + } } diff --git a/model_gateway/src/routers/openai/responses/streaming.rs b/model_gateway/src/routers/openai/responses/streaming.rs index 2c8b27288..420ce6052 100644 --- a/model_gateway/src/routers/openai/responses/streaming.rs +++ b/model_gateway/src/routers/openai/responses/streaming.rs @@ -18,9 +18,8 @@ use bytes::Bytes; use futures_util::StreamExt; use openai_protocol::{ event_types::{ - is_function_call_type, is_response_event, CodeInterpreterCallEvent, FileSearchCallEvent, - FunctionCallEvent, ImageGenerationCallEvent, ItemType, McpEvent, OutputItemEvent, - ResponseEvent, WebSearchCallEvent, + is_function_call_type, is_response_event, FunctionCallEvent, ItemType, McpEvent, + OutputItemEvent, ResponseEvent, }, responses::{ResponseTool, ResponsesRequest}, }; @@ -167,7 +166,7 @@ pub(super) fn apply_event_transformations_inplace( if new_type == ItemType::MCP_CALL { item["id"] = json!(mcp_response_item_id(id)); } else if let Some(stripped) = id.strip_prefix("fc_") { - let new_id = format!("{id_prefix}{stripped}"); + let new_id = format!("{id_prefix}_{stripped}"); item["id"] = json!(new_id); } } @@ -430,15 +429,10 @@ fn maybe_inject_tool_in_progress( let item_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); - // Determine the in_progress event type based on item type - let event_type = match item_type { - ItemType::MCP_CALL => McpEvent::CALL_IN_PROGRESS, - ItemType::WEB_SEARCH_CALL => WebSearchCallEvent::IN_PROGRESS, - ItemType::CODE_INTERPRETER_CALL => CodeInterpreterCallEvent::IN_PROGRESS, - ItemType::FILE_SEARCH_CALL => FileSearchCallEvent::IN_PROGRESS, - ItemType::IMAGE_GENERATION_CALL => ImageGenerationCallEvent::IN_PROGRESS, - _ => return true, // Not a tool call item, nothing to inject + let Some(format) = openai_bridge::format_from_type_str(item_type) else { + return true; // Not a tool call item, nothing to inject }; + let event_type = openai_bridge::descriptor(format).in_progress_event; let Some(item_id) = item.get("id").and_then(|v| v.as_str()) else { return true;