diff --git a/crates/mcp/src/core/session.rs b/crates/mcp/src/core/session.rs index 8e72544e5..5d2baa995 100644 --- a/crates/mcp/src/core/session.rs +++ b/crates/mcp/src/core/session.rs @@ -591,7 +591,7 @@ impl<'a> McpToolSession<'a> { server_label, name, .. } => !self.should_hide_mcp_call_like_by_label(name, server_label), ResponseOutputItem::FunctionToolCall { name, .. } => { - !self.should_hide_function_call_like(name, user_function_names) + !self.should_hide_internal_non_builtin_function_like(name, user_function_names) } ResponseOutputItem::WebSearchCall { .. } | ResponseOutputItem::CodeInterpreterCall { .. } @@ -621,8 +621,9 @@ impl<'a> McpToolSession<'a> { user_function_names: &HashSet<String>, ) -> bool { match tool.get("type").and_then(|value| value.as_str()) { - Some("function") => Self::function_tool_name_json(tool) - .is_some_and(|name| self.should_hide_function_call_like(name, user_function_names)), + Some("function") => Self::function_tool_name_json(tool).is_some_and(|name| { + self.should_hide_internal_non_builtin_function_like(name, user_function_names) + }), // MCP tool entries are keyed by server metadata, so function-name collision // handling does not apply to this arm. 
Some("mcp") => tool @@ -671,7 +672,9 @@ impl<'a> McpToolSession<'a> { Some("function_call") | Some("function_tool_call") => item .get("name") .and_then(|value| value.as_str()) - .is_some_and(|name| self.should_hide_function_call_like(name, user_function_names)), + .is_some_and(|name| { + self.should_hide_internal_non_builtin_function_like(name, user_function_names) + }), _ => false, } } @@ -695,12 +698,12 @@ impl<'a> McpToolSession<'a> { } } - fn should_hide_function_call_like( + fn should_hide_internal_non_builtin_function_like( &self, name: &str, user_function_names: &HashSet<String>, ) -> bool { - self.is_internal_tool(name) && !user_function_names.contains(name) + self.is_internal_non_builtin_tool(name) && !user_function_names.contains(name) } fn function_tool_name_json(tool: &serde_json::Value) -> Option<&str> { diff --git a/docs/reference/mcp-internal-servers.md b/docs/reference/mcp-internal-servers.md index f91d24fae..52e228042 100644 --- a/docs/reference/mcp-internal-servers.md +++ b/docs/reference/mcp-internal-servers.md @@ -17,14 +17,15 @@ servers: ``` In the current implementation, `internal: true` applies only to self-provided -MCP servers declared under `servers:`. It affects final assembled, -non-streaming MCP responses by allowing higher layers to strip internal server -tool lists and tool-call trace items before the response is returned to the -client. +MCP servers declared under `servers:`. The model may still see and call these +tools during gateway-managed tool loops, but OpenAI Responses client-facing +output hides internal non-builtin tool details before returning data to the +client. That includes final non-streaming responses, final streaming +`response.completed` events, live streaming tool-call events, live +`mcp_list_tools` events, and response envelope `tools` / `tool_choice` fields. 
-This flag does not currently hide streaming output, and it does not apply to -builtin-routed MCP results such as `web_search_call`, `code_interpreter_call`, -or `file_search_call`. +This flag does not apply to builtin-routed MCP results such as +`web_search_call`, `code_interpreter_call`, or `file_search_call`. This flag is generic. It does not imply any vendor-specific behavior and does not change transport setup or tool execution on its own. diff --git a/model_gateway/src/routers/openai/mcp/tool_loop.rs b/model_gateway/src/routers/openai/mcp/tool_loop.rs index 730cdd430..4af6f2e64 100644 --- a/model_gateway/src/routers/openai/mcp/tool_loop.rs +++ b/model_gateway/src/routers/openai/mcp/tool_loop.rs @@ -200,6 +200,10 @@ pub(crate) async fn execute_streaming_tool_calls( let response_format = session.tool_response_format(&call.name); let server_label = session.resolve_tool_server_label(&call.name); + let emit_tool_events = !session.is_internal_non_builtin_tool(&call.name); + if !emit_tool_events && tx.is_closed() { + return false; + } let mut arguments: Value = match serde_json::from_str(args_str) { Ok(v) => v, @@ -221,13 +225,15 @@ pub(crate) async fn execute_streaming_tool_calls( Value::String(stable_streaming_tool_item_id(&call, &response_format)), ); } - if !send_tool_call_completion_events( - tx, - &call, - &mcp_call_item, - &response_format, - sequence_number, - ) { + if emit_tool_events + && !send_tool_call_completion_events( + tx, + &call, + &mcp_call_item, + &response_format, + sequence_number, + ) + { return false; } state.record_call( @@ -242,7 +248,9 @@ pub(crate) async fn execute_streaming_tool_calls( } }; - if !send_tool_call_intermediate_event(tx, &call, &response_format, sequence_number) { + if emit_tool_events + && !send_tool_call_intermediate_event(tx, &call, &response_format, sequence_number) + { return false; } @@ -266,6 +274,9 @@ pub(crate) async fn execute_streaming_tool_calls( // Log the effective (post-merge) args so the log reflects what the 
// MCP server actually receives, not the pre-merge string from the model. debug!("Calling MCP tool '{}' with args: {}", call.name, arguments); + if !emit_tool_events && tx.is_closed() { + return false; + } let tool_output = session .execute_tool(ToolExecutionInput { call_id: call.call_id.clone(), @@ -284,6 +295,9 @@ pub(crate) async fn execute_streaming_tool_calls( metrics_labels::RESULT_SUCCESS }, ); + if !emit_tool_events && tx.is_closed() { + return false; + } let output_str = tool_output.output.to_string(); let mut mcp_call_item = to_value(tool_output.to_response_item()).unwrap_or_else(|e| { @@ -297,13 +311,15 @@ pub(crate) async fn execute_streaming_tool_calls( ); } - if !send_tool_call_completion_events( - tx, - &call, - &mcp_call_item, - &response_format, - sequence_number, - ) { + if emit_tool_events + && !send_tool_call_completion_events( + tx, + &call, + &mcp_call_item, + &response_format, + sequence_number, + ) + { return false; } @@ -1440,6 +1456,80 @@ mod tests { )); } + #[tokio::test] + async fn streaming_tool_execution_suppresses_events_for_internal_non_builtin_tools() { + let orchestrator = McpOrchestrator::new(McpConfig { + servers: vec![McpServerConfig { + name: "internal-server".to_string(), + transport: McpTransport::Sse { + url: "http://localhost:3000/sse".to_string(), + token: None, + headers: Default::default(), + }, + proxy: None, + required: false, + tools: None, + builtin_type: None, + builtin_tool_name: None, + internal: true, + }], + ..Default::default() + }) + .await + .expect("orchestrator"); + orchestrator + .tool_inventory() + .insert_entry(ToolEntry::from_server_tool( + "internal-server", + test_tool("internal_search"), + )); + let session = McpToolSession::new( + &orchestrator, + vec![McpServerBinding { + label: "internal-label".to_string(), + server_key: "internal-server".to_string(), + allowed_tools: None, + }], + "test-request", + ); + let pending_call = super::FunctionCallInProgress { + call_id: "call_internal".to_string(), + 
name: "internal_search".to_string(), + arguments_buffer: "{not-json".to_string(), + item_id: Some("fc_internal".to_string()), + output_index: 0, + last_obfuscation: None, + assigned_output_index: Some(0), + }; + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut state = ToolLoopState::new(ResponseInput::Text("hello".to_string()), Vec::new()); + let mut sequence_number = 0; + + let ok = super::execute_streaming_tool_calls( + vec![pending_call], + &session, + &tx, + &mut state, + &mut sequence_number, + "gpt-5.4", + &[], + None, + ) + .await; + drop(tx); + + assert!(ok); + assert_eq!( + drain_channel(&mut rx), + Vec::<String>::new(), + "internal tool execution must not emit streaming tool events" + ); + assert_eq!(state.mcp_call_items.len(), 1); + assert!(state.mcp_call_items[0] + .to_string() + .contains("internal_search")); + } + #[test] fn emits_only_new_binding_when_resume_adds_second_tool_block() { let existing_labels = HashSet::from(["deepwiki_ask".to_string()]); diff --git a/model_gateway/src/routers/openai/responses/streaming.rs b/model_gateway/src/routers/openai/responses/streaming.rs index e22e36cf8..ca1d620ce 100644 --- a/model_gateway/src/routers/openai/responses/streaming.rs +++ b/model_gateway/src/routers/openai/responses/streaming.rs @@ -7,7 +7,7 @@ //! - MCP tool execution loops within streaming responses //! 
- Event transformation and output index remapping -use std::{borrow::Cow, io, sync::Arc}; +use std::{borrow::Cow, collections::HashSet, io, sync::Arc}; use axum::{ body::Body, @@ -37,7 +37,7 @@ use super::{ common::{extract_output_index, get_event_type, parse_sse_block, ChunkProcessor}, utils::{ patch_response_with_request_metadata, response_tool_to_value, restore_original_tools, - rewrite_streaming_block, + rewrite_streaming_block, strip_internal_mcp_artifacts, }, }; const SSE_DONE: &str = "data: [DONE]\n\n"; @@ -49,7 +49,7 @@ use crate::{ header_utils::{ extract_forwardable_request_headers, preserve_response_headers, ApiProvider, }, - mcp_utils::DEFAULT_MAX_ITERATIONS, + mcp_utils::{collect_user_function_names, DEFAULT_MAX_ITERATIONS}, persistence_utils::persist_conversation_items, }, error, @@ -128,6 +128,20 @@ pub(super) fn apply_event_transformations_inplace( } } } + + // Live response envelopes can echo the effective model tool payload. + // Redact leaked internal artifacts in-place without synthesizing final + // response-only fields that upstream omitted from the live envelope. + if let Some(response) = parsed_data.get_mut("response") { + let has_client_visible_tool_fields = response.get("tools").is_some() + || response.get("tool_choice").is_some() + || response.get("output").is_some(); + + if has_client_visible_tool_fields { + strip_internal_mcp_artifacts(response, ctx.original_request, ctx.session); + changed = true; + } + } } // 2. 
Apply transform_streaming_event logic (function_call → mcp_call/web_search_call) @@ -352,6 +366,10 @@ pub(super) fn forward_streaming_event( return true; } + if should_suppress_internal_streaming_event(&parsed_data, handler, ctx) { + return true; + } + // Handle function_call_arguments.done - send buffered args first let mut mapped_output_index: Option<u64> = None; if event_name == Some(FunctionCallEvent::ARGUMENTS_DONE) @@ -417,6 +435,58 @@ pub(super) fn forward_streaming_event( true } +fn should_suppress_internal_streaming_event( + parsed_data: &Value, + handler: &StreamingToolHandler, + ctx: &StreamingEventContext<'_>, +) -> bool { + let Some(session) = ctx.session else { + return false; + }; + let user_function_names = collect_user_function_names(ctx.original_request); + + if let Some(item) = parsed_data.get("item") { + if session.should_hide_output_item_json(item, &user_function_names) { + return true; + } + } + + let Some(tool_name) = streaming_event_tool_name(parsed_data, handler) else { + return false; + }; + + session.is_internal_non_builtin_tool(tool_name.as_ref()) + && !user_function_names.contains(tool_name.as_ref()) +} + +fn streaming_event_tool_name<'a>( + parsed_data: &'a Value, + handler: &'a StreamingToolHandler, +) -> Option<Cow<'a, str>> { + if let Some(name) = parsed_data + .get("item") + .and_then(|item| item.get("name")) + .and_then(|value| value.as_str()) + { + return Some(Cow::Borrowed(name)); + } + + if let Some(name) = parsed_data + .get("delta") + .and_then(|delta| delta.get("name")) + .and_then(|value| value.as_str()) + { + return Some(Cow::Borrowed(name)); + } + + let output_index = extract_output_index(parsed_data)?; + handler + .pending_calls + .iter() + .find(|call| call.output_index == output_index) + .and_then(|call| (!call.name.is_empty()).then_some(Cow::Borrowed(call.name.as_str()))) +} + /// Inject in_progress event after a tool call item is added. 
/// Handles mcp_call, web_search_call, code_interpreter_call, file_search_call, /// and image_generation_call items. @@ -487,7 +557,7 @@ pub(super) fn send_final_response_event( inject_mcp_metadata_streaming(&mut final_response, state, session); } - restore_original_tools(&mut final_response, ctx.original_request, None); + restore_original_tools(&mut final_response, ctx.original_request, ctx.session); patch_response_with_request_metadata( &mut final_response, ctx.original_request, @@ -721,10 +791,8 @@ pub(super) fn handle_streaming_with_tool_interception( let mut sequence_number: u64 = 0; let mut next_output_index: usize = 0; let mut preserved_response_id: Option<String> = None; - let list_tools_bindings = mcp_list_tools_bindings_to_emit( - &state.existing_mcp_list_tools_labels, - session.mcp_servers(), - ); + let list_tools_bindings = + client_visible_mcp_list_tools_bindings(&state.existing_mcp_list_tools_labels, &session); let streaming_ctx = StreamingEventContext { original_request: &original_request, @@ -1062,6 +1130,16 @@ pub(super) fn handle_streaming_with_tool_interception( response } +fn client_visible_mcp_list_tools_bindings( + existing_labels: &HashSet<String>, + session: &McpToolSession<'_>, +) -> Vec<(String, String)> { + mcp_list_tools_bindings_to_emit(existing_labels, session.mcp_servers()) + .into_iter() + .filter(|(server_label, _)| !session.is_internal_non_builtin_server_label(server_label)) + .collect() +} + /// Main entry point for streaming responses pub async fn handle_streaming_response(ctx: RequestContext) -> Response { use crate::routers::common::mcp_utils::ensure_request_mcp_client; @@ -1114,3 +1192,142 @@ pub async fn handle_streaming_response(ctx: RequestContext) -> Response { mcp_servers, ) } + +#[cfg(test)] +mod tests { + use openai_protocol::responses::{ResponseInput, ResponsesRequest}; + use serde_json::json; + use smg_mcp::{ + McpConfig, McpOrchestrator, McpServerBinding, McpServerConfig, McpToolSession, + McpTransport, Tool, ToolEntry, + }; + use 
tokio::sync::mpsc; + + use super::*; + + fn test_tool(name: &str) -> Tool { + let mut schema = serde_json::Map::new(); + schema.insert("type".to_string(), json!("object")); + schema.insert("properties".to_string(), json!({})); + + Tool { + name: name.to_string().into(), + title: None, + description: Some("internal".into()), + input_schema: schema.into(), + output_schema: None, + icons: None, + annotations: None, + } + } + + async fn internal_orchestrator_with_tool(tool_name: &str) -> McpOrchestrator { + let orchestrator = McpOrchestrator::new(McpConfig { + servers: vec![McpServerConfig { + name: "internal-server".to_string(), + transport: McpTransport::Sse { + url: "http://localhost:3000/sse".to_string(), + token: None, + headers: Default::default(), + }, + proxy: None, + required: false, + tools: None, + builtin_type: None, + builtin_tool_name: None, + internal: true, + }], + ..Default::default() + }) + .await + .expect("orchestrator"); + orchestrator + .tool_inventory() + .insert_entry(ToolEntry::from_server_tool( + "internal-server", + test_tool(tool_name), + )); + orchestrator + } + + fn internal_binding() -> McpServerBinding { + McpServerBinding { + label: "internal-label".to_string(), + server_key: "internal-server".to_string(), + allowed_tools: None, + } + } + + fn drain_channel(rx: &mut mpsc::UnboundedReceiver<Result<Bytes, io::Error>>) -> Vec<String> { + let mut events = Vec::new(); + while let Ok(chunk) = rx.try_recv() { + let bytes = chunk.expect("no io errors in unit-test channel"); + events.push(String::from_utf8(bytes.to_vec()).expect("utf-8 sse block")); + } + events + } + + #[tokio::test] + async fn forward_streaming_event_strips_internal_tools_from_response_envelope() { + let orchestrator = internal_orchestrator_with_tool("internal_search").await; + let session = McpToolSession::new( + &orchestrator, + vec![internal_binding()], + "test-internal-response-envelope", + ); + let original_request = ResponsesRequest { + model: "gpt-5.4".to_string(), + input: 
ResponseInput::Text("hello".to_string()), + ..Default::default() + }; + let ctx = StreamingEventContext { + original_request: &original_request, + previous_response_id: None, + session: Some(&session), + }; + let mut handler = StreamingToolHandler::with_starting_index(0); + let mut sequence_number = 0; + let (tx, mut rx) = mpsc::unbounded_channel(); + let payload = json!({ + "type": ResponseEvent::IN_PROGRESS, + "sequence_number": 0, + "response": { + "id": "resp_123", + "model": "gpt-5.4", + "tools": [{ + "type": "function", + "name": "internal_search", + "parameters": {"type": "object"} + }], + "tool_choice": { + "type": "function", + "name": "internal_search" + } + } + }); + let data = payload.to_string(); + let raw_block = format!("event: {}\ndata: {}", ResponseEvent::IN_PROGRESS, data); + + assert!(forward_streaming_event( + SseEventData { + raw_block: &raw_block, + event_name: Some(ResponseEvent::IN_PROGRESS), + data: &data, + pre_parsed: Some(payload), + }, + &mut handler, + &tx, + &ctx, + &mut sequence_number, + )); + drop(tx); + + let events = drain_channel(&mut rx); + assert_eq!(events.len(), 1, "expected response envelope event"); + assert!( + !events[0].contains("internal_search"), + "response envelope leaked internal tool definition: {}", + events[0] + ); + } +} diff --git a/model_gateway/src/routers/openai/responses/utils.rs b/model_gateway/src/routers/openai/responses/utils.rs index 03a2b24a2..aef86e960 100644 --- a/model_gateway/src/routers/openai/responses/utils.rs +++ b/model_gateway/src/routers/openai/responses/utils.rs @@ -279,11 +279,33 @@ pub(super) fn restore_original_tools( session: Option<&McpToolSession<'_>>, ) { let user_function_names = collect_user_function_names(original_body); - strip_internal_mcp_output_items(resp, session, &user_function_names); - strip_internal_mcp_tools(resp, session, &user_function_names); + strip_internal_mcp_artifacts_with_names(resp, session, &user_function_names); restore_client_tool_view(resp, original_body, 
session, &user_function_names); } +/// Strip internal MCP artifacts without restoring request-level client tool view. +/// +/// Live streaming envelopes should redact leaked internals in-place, but they +/// should not synthesize `tools` / `tool_choice` fields that upstream omitted. +pub(super) fn strip_internal_mcp_artifacts( + resp: &mut Value, + original_body: &ResponsesRequest, + session: Option<&McpToolSession<'_>>, +) { + let user_function_names = collect_user_function_names(original_body); + strip_internal_mcp_artifacts_with_names(resp, session, &user_function_names); +} + +fn strip_internal_mcp_artifacts_with_names( + resp: &mut Value, + session: Option<&McpToolSession<'_>>, + user_function_names: &HashSet<String>, +) { + strip_internal_mcp_output_items(resp, session, user_function_names); + strip_internal_mcp_tools(resp, session, user_function_names); + strip_internal_mcp_tool_choice(resp, session, user_function_names); +} + fn restore_client_tool_view( resp: &mut Value, original_body: &ResponsesRequest, @@ -380,6 +402,26 @@ fn strip_internal_mcp_tools( }); } +fn strip_internal_mcp_tool_choice( + resp: &mut Value, + session: Option<&McpToolSession<'_>>, + user_function_names: &HashSet<String>, +) { + let Some(session) = session else { + return; + }; + let Some(obj) = resp.as_object_mut() else { + return; + }; + let should_hide_tool_choice = obj + .get("tool_choice") + .is_some_and(|tool_choice| session.should_hide_tool_json(tool_choice, user_function_names)); + + if should_hide_tool_choice { + obj.insert("tool_choice".to_string(), json!("auto")); + } +} + fn strip_internal_mcp_output_items( resp: &mut Value, session: Option<&McpToolSession<'_>>, @@ -501,7 +543,10 @@ mod tests { "required": ["query"] } }], - "tool_choice": "auto" + "tool_choice": { + "type": "function", + "name": "internal_search" + } }); restore_original_tools(&mut response, &original_body, Some(&session));