diff --git a/model_gateway/src/routers/grpc/common/responses/utils.rs b/model_gateway/src/routers/grpc/common/responses/utils.rs index 66b09b63d..54eb9c48e 100644 --- a/model_gateway/src/routers/grpc/common/responses/utils.rs +++ b/model_gateway/src/routers/grpc/common/responses/utils.rs @@ -1,24 +1,26 @@ //! Utility functions for /v1/responses endpoint -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use axum::response::Response; use openai_protocol::{ common::Tool, responses::{ResponseTool, ResponsesRequest, ResponsesResponse}, }; -use serde_json::to_value; +use serde::Serialize; +use serde_json::{to_value, Value}; use smg_data_connector::{ ConversationItemStorage, ConversationStorage, RequestContext as StorageRequestContext, ResponseStorage, }; -use smg_mcp::{McpOrchestrator, McpServerBinding}; +use smg_mcp::{McpOrchestrator, McpServerBinding, ResponseFormat}; use tracing::{debug, error, warn}; use crate::{ routers::{ common::{ - mcp_utils::ensure_request_mcp_client, persistence_utils::persist_conversation_items, + mcp_utils::{collect_user_function_names, ensure_request_mcp_client}, + persistence_utils::persist_conversation_items, }, error, }, @@ -146,6 +148,116 @@ pub(crate) fn extract_tools_from_response_tools( .collect() } +pub(crate) fn redact_response_for_client( + response: &mut ResponsesResponse, + original_request: &ResponsesRequest, + session: Option<&smg_mcp::McpToolSession<'_>>, +) { + if let Some(redaction) = ClientRedaction::new(original_request, session) { + redaction.redact_response(response); + } +} + +pub(crate) fn redact_response_completed_event( + event: &mut Value, + original_request: &ResponsesRequest, + session: Option<&smg_mcp::McpToolSession<'_>>, +) { + if let (Some(response), Some(redaction)) = ( + event.get_mut("response"), + ClientRedaction::new(original_request, session), + ) { + redaction.redact_response_json(response); + } +} + +pub(crate) fn should_hide_mcp_streaming_tool( + tool_name: &str, + response_format: Option<&ResponseFormat>, + session: &smg_mcp::McpToolSession<'_>, + user_function_names: &HashSet, +) -> bool { + matches!(response_format, None | Some(ResponseFormat::Passthrough)) + && session.is_internal_non_builtin_tool(tool_name) + && !user_function_names.contains(tool_name) +} + +struct ClientRedaction<'a, 'session> { + session: &'a smg_mcp::McpToolSession<'session>, + user_function_names: HashSet, +} + +impl<'a, 'session> ClientRedaction<'a, 'session> { + fn new( + original_request: &ResponsesRequest, + session: Option<&'a smg_mcp::McpToolSession<'session>>, + ) -> Option { + session.map(|session| Self { + session, + user_function_names: collect_user_function_names(original_request), + }) + } + + fn redact_response(&self, response: &mut ResponsesResponse) { + response + .output + .retain(|item| !self.should_hide_serialized_output_item(item)); + response + .tools + .retain(|tool| !self.should_hide_serialized_tool(tool)); + + let should_hide_tool_choice = serde_json::from_str::(&response.tool_choice) + .ok() + .is_some_and(|tool_choice| self.should_hide_tool_json(&tool_choice)); + if should_hide_tool_choice { + response.tool_choice = "auto".to_string(); + } + } + + fn redact_response_json(&self, response: &mut Value) { + let Some(obj) = response.as_object_mut() else { + return; + }; + + if let Some(output) = obj.get_mut("output").and_then(Value::as_array_mut) { + output.retain(|item| !self.should_hide_output_item_json(item)); + } + + if let Some(tools) = obj.get_mut("tools").and_then(Value::as_array_mut) { + tools.retain(|tool| !self.should_hide_tool_json(tool)); + } + + let should_hide_tool_choice = obj + .get("tool_choice") + .is_some_and(|tool_choice| self.should_hide_tool_json(tool_choice)); + if should_hide_tool_choice { + obj.insert("tool_choice".to_string(), Value::String("auto".to_string())); + } + } + + fn should_hide_serialized_output_item(&self, item: &impl Serialize) -> bool { + to_value(item) + .ok() + .is_some_and(|item| self.should_hide_output_item_json(&item)) + } + + fn should_hide_serialized_tool(&self, tool: &impl Serialize) -> bool { + to_value(tool) + .ok() + .is_some_and(|tool| self.should_hide_tool_json(&tool)) + } + + fn should_hide_output_item_json(&self, item: &Value) -> bool { + self.session + .should_hide_output_item_json(item, &self.user_function_names) + } + + fn should_hide_tool_json(&self, tool: &Value) -> bool { + self.session + .should_hide_tool_json(tool, &self.user_function_names) + } +} + /// Persist response to storage if store=true /// /// Common helper function to avoid duplication across sync and streaming paths @@ -179,3 +291,256 @@ pub(crate) async fn persist_response_if_needed( } } } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use openai_protocol::responses::{ + McpTool, ResponseInput, ResponseOutputItem, ResponseTool, ResponsesRequest, + ResponsesResponse, + }; + use serde_json::{json, Value}; + use smg_mcp::{ + McpConfig, McpOrchestrator, McpServerBinding, McpServerConfig, McpToolSession, + McpTransport, ResponseFormat, Tool, ToolEntry, + }; + + use super::{ + redact_response_completed_event, redact_response_for_client, should_hide_mcp_streaming_tool, + }; + + fn test_tool(name: &str) -> Tool { + Tool { + name: name.to_string().into(), + title: None, + description: Some("internal".into()), + input_schema: json!({"type": "object"}) + .as_object() + .expect("schema object") + .clone() + .into(), + output_schema: None, + icons: None, + annotations: None, + } + } + + async fn session_with_server( + server_name: &str, + server_label: &str, + internal: bool, + tool_entry: ToolEntry, + ) -> (McpOrchestrator, ResponsesRequest) { + let orchestrator = McpOrchestrator::new(McpConfig { + servers: vec![McpServerConfig { + name: server_name.to_string(), + transport: McpTransport::Sse { + url: "http://localhost:3000/sse".to_string(), + token: None, + headers: Default::default(), + }, + proxy: None, + required: false, + tools: None, + builtin_type: None, + builtin_tool_name: None, + internal, + }], + ..Default::default() + }) + .await + .expect("orchestrator"); + + orchestrator.tool_inventory().insert_entry(tool_entry); + + let request = ResponsesRequest { + model: "gpt-5.4".to_string(), + input: ResponseInput::Text("hello".to_string()), + tools: Some(vec![ResponseTool::Mcp(McpTool { + server_url: Some("http://localhost:3000/sse".to_string()), + authorization: None, + headers: None, + server_label: server_label.to_string(), + server_description: None, + require_approval: None, + allowed_tools: None, + connector_id: None, + defer_loading: None, + })]), + ..Default::default() + }; + + (orchestrator, request) + } + + async fn session_with_internal_server() -> (McpOrchestrator, ResponsesRequest) { + session_with_server( + "internal-server", + "internal-label", + true, + ToolEntry::from_server_tool("internal-server", test_tool("internal_search")), + ) + .await + } + + fn binding(server_key: &str, server_label: &str) -> Vec { + vec![McpServerBinding { + label: server_label.to_string(), + server_key: server_key.to_string(), + allowed_tools: None, + }] + } + + #[tokio::test] + async fn grpc_response_redaction_strips_internal_mcp_artifacts() { + let (orchestrator, request) = session_with_internal_server().await; + let session = McpToolSession::new( + &orchestrator, + binding("internal-server", "internal-label"), + "resp_test", + ); + let mut response = ResponsesResponse::builder("resp_test", &request.model) + .copy_from_request(&request) + .output(vec![ + ResponseOutputItem::McpCall { + id: "mcp_internal".to_string(), + status: "completed".to_string(), + approval_request_id: None, + arguments: "{\"query\":\"secret args\"}".to_string(), + error: None, + name: "internal_search".to_string(), + output: "{\"trace\":\"secret output\"}".to_string(), + server_label: "internal-label".to_string(), + }, + ResponseOutputItem::FunctionToolCall { + id: "fc_public".to_string(), + call_id: "call_public".to_string(), + name: "public_function".to_string(), + arguments: "{\"city\":\"Paris\"}".to_string(), + output: None, + status: "completed".to_string(), + }, + ]) + .build(); + + redact_response_for_client(&mut response, &request, Some(&session)); + + let redacted = serde_json::to_value(&response).expect("response json"); + let serialized = redacted.to_string(); + assert!(!serialized.contains("internal_search")); + assert!(!serialized.contains("internal-label")); + assert!(!serialized.contains("secret args")); + assert!(!serialized.contains("secret output")); + assert!(serialized.contains("public_function")); + + let mut event = json!({ + "type": "response.completed", + "response": { + "id": "resp_test", + "object": "response", + "status": "completed", + "model": request.model.clone(), + "tools": request.tools, + "tool_choice": { + "type": "mcp", + "server_label": "internal-label", + "name": "internal_search" + }, + "output": [ + { + "type": "function_call", + "id": "fc_internal", + "call_id": "call_internal", + "name": "internal_search", + "arguments": "{\"query\":\"secret args\"}", + "output": "{\"trace\":\"secret output\"}", + "status": "completed" + }, + { + "type": "function_call", + "id": "fc_public", + "call_id": "call_public", + "name": "public_function", + "arguments": "{\"city\":\"Paris\"}", + "status": "completed" + } + ] + } + }); + + redact_response_completed_event(&mut event, &request, Some(&session)); + + let serialized = event.to_string(); + assert!(!serialized.contains("internal_search")); + assert!(!serialized.contains("internal-label")); + assert!(!serialized.contains("secret args")); + assert!(!serialized.contains("secret output")); + assert!(!serialized.contains("localhost:3000")); + assert!(serialized.contains("public_function")); + assert_eq!( + event["response"]["tool_choice"], + Value::String("auto".to_string()) + ); + } + + #[tokio::test] + async fn grpc_streaming_visibility_keeps_public_and_hosted_formats_visible() { + let user_function_names = HashSet::new(); + let (internal_orchestrator, _internal_request) = session_with_internal_server().await; + let internal_session = McpToolSession::new( + &internal_orchestrator, + binding("internal-server", "internal-label"), + "resp_test", + ); + assert!(should_hide_mcp_streaming_tool( + "internal_search", + Some(&internal_session.tool_response_format("internal_search")), + &internal_session, + &user_function_names + )); + + let (public_orchestrator, _public_request) = session_with_server( + "public-server", + "public-label", + false, + ToolEntry::from_server_tool("public-server", test_tool("public_search")), + ) + .await; + let public_session = McpToolSession::new( + &public_orchestrator, + binding("public-server", "public-label"), + "resp_test", + ); + assert!(!should_hide_mcp_streaming_tool( + "public_search", + Some(&public_session.tool_response_format("public_search")), + &public_session, + &user_function_names + )); + + let (image_orchestrator, _image_request) = session_with_server( + "image-server", + "image-label", + true, + ToolEntry::from_server_tool("image-server", test_tool("image_generation")) + .with_response_format(ResponseFormat::ImageGenerationCall), + ) + .await; + let image_session = McpToolSession::new( + &image_orchestrator, + binding("image-server", "image-label"), + "resp_test", + ); + assert_eq!( + image_session.tool_response_format("image_generation"), + ResponseFormat::ImageGenerationCall + ); + assert!(!should_hide_mcp_streaming_tool( + "image_generation", + Some(&image_session.tool_response_format("image_generation")), + &image_session, + &user_function_names + )); + } +} diff --git a/model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs b/model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs index 3fd641141..11e1b190e 100644 --- a/model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs +++ b/model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs @@ -34,7 +34,7 @@ use crate::{ grpc::{ common::responses::{ collect_user_function_names, ensure_mcp_connection, persist_response_if_needed, - ResponsesContext, + utils::redact_response_for_client, ResponsesContext, }, harmony::processor::ResponsesIterationResult, }, @@ -109,10 +109,11 @@ async fn execute_with_mcp_loop( // Extract user's max_tool_calls limit (if set) let max_tool_calls = current_request.max_tool_calls.map(|n| n as usize); - // Preserve original tools for response (before merging MCP tools) + // Preserve original request view for response redaction before merging MCP tools. // The response should show the user's original request tools, not internal MCP tools - let mut original_tools = current_request.tools.clone(); - let user_function_names = collect_user_function_names(¤t_request); + let client_request = current_request.clone(); + let mut original_tools = client_request.tools.clone(); + let user_function_names = collect_user_function_names(&client_request); // Create session once — bundles orchestrator, request_ctx, server_keys, mcp_tools let session_request_id = format!("resp_{}", uuid::Uuid::now_v7()); @@ -253,6 +254,7 @@ async fn execute_with_mcp_loop( &user_function_names, ); } + redact_response_for_client(&mut response, &client_request, Some(&session)); return Ok(response); } @@ -309,6 +311,7 @@ async fn execute_with_mcp_loop( &user_function_names, ); } + redact_response_for_client(&mut response, &client_request, Some(&session)); return Ok(response); } @@ -343,6 +346,7 @@ async fn execute_with_mcp_loop( // Restore original tools (hide internal MCP tools from response) response.tools = original_tools.take().unwrap_or_default(); + redact_response_for_client(&mut response, &client_request, Some(&session)); debug!( mcp_calls = mcp_tracking.total_calls(), diff --git a/model_gateway/src/routers/grpc/harmony/responses/streaming.rs b/model_gateway/src/routers/grpc/harmony/responses/streaming.rs index 496a348af..c6f684fe7 100644 --- a/model_gateway/src/routers/grpc/harmony/responses/streaming.rs +++ b/model_gateway/src/routers/grpc/harmony/responses/streaming.rs @@ -22,11 +22,13 @@ use crate::{ middleware::TenantRequestMeta, observability::metrics::Metrics, routers::{ - common::mcp_utils::DEFAULT_MAX_ITERATIONS, + common::mcp_utils::{collect_user_function_names, DEFAULT_MAX_ITERATIONS}, grpc::{ common::responses::{ build_sse_response, ensure_mcp_connection, persist_response_if_needed, - streaming::ResponseStreamEventEmitter, ResponsesContext, + streaming::ResponseStreamEventEmitter, + utils::{redact_response_completed_event, redact_response_for_client}, + ResponsesContext, }, harmony::{processor::ResponsesIterationResult, streaming::HarmonyStreamingProcessor}, }, @@ -173,9 +175,13 @@ async fn execute_mcp_tool_loop_streaming( strip_image_generation_from_request_tools(&mut current_request, &session); let mut mcp_tracking = McpCallTracking::new(); + let user_function_names = collect_user_function_names(original_request); // Emit mcp_list_tools on first iteration for binding in session.mcp_servers() { + if session.is_internal_server_label(&binding.label) { + continue; + } let tools_for_server = session.list_tools_for_server(&binding.server_key); if emitter @@ -241,6 +247,7 @@ async fn execute_mcp_tool_loop_streaming( emitter, tx, Some(&session), + &user_function_names, ) .await { @@ -304,7 +311,8 @@ async fn execute_mcp_tool_loop_streaming( "total_tokens": usage.total_tokens, "incomplete_details": incomplete_details, }); - let event = emitter.emit_completed(Some(&usage_json)); + let mut event = emitter.emit_completed(Some(&usage_json)); + redact_response_completed_event(&mut event, original_request, Some(&session)); emitter.send_event_best_effort(&event, tx); return; } @@ -357,7 +365,8 @@ async fn execute_mcp_tool_loop_streaming( "output_tokens": usage.completion_tokens, "total_tokens": usage.total_tokens, }); - let event = emitter.emit_completed(Some(&usage_json)); + let mut event = emitter.emit_completed(Some(&usage_json)); + redact_response_completed_event(&mut event, original_request, Some(&session)); emitter.send_event_best_effort(&event, tx); return; } @@ -385,7 +394,8 @@ async fn execute_mcp_tool_loop_streaming( ); // Finalize response from emitter's accumulated data - let final_response = emitter.finalize(Some(usage.clone())); + let mut final_response = emitter.finalize(Some(usage.clone())); + redact_response_for_client(&mut final_response, original_request, Some(&session)); // Persist response to storage if store=true persist_response_if_needed( @@ -410,7 +420,8 @@ async fn execute_mcp_tool_loop_streaming( json!({ "cached_tokens": details.cached_tokens }); } } - let event = emitter.emit_completed(Some(&usage_json)); + let mut event = emitter.emit_completed(Some(&usage_json)); + redact_response_completed_event(&mut event, original_request, Some(&session)); emitter.send_event_best_effort(&event, tx); return; } @@ -450,11 +461,13 @@ async fn execute_without_mcp_streaming( }; // Process stream (no MCP context, all tools treated as function tools) + let user_function_names = std::collections::HashSet::new(); let iteration_result = match HarmonyStreamingProcessor::process_responses_iteration_stream( execution_result, emitter, tx, None, + &user_function_names, ) .await { diff --git a/model_gateway/src/routers/grpc/harmony/streaming.rs b/model_gateway/src/routers/grpc/harmony/streaming.rs index 0515a7185..3c153a613 100644 --- a/model_gateway/src/routers/grpc/harmony/streaming.rs +++ b/model_gateway/src/routers/grpc/harmony/streaming.rs @@ -1,7 +1,7 @@ //! Harmony streaming response processor use std::{ - collections::{hash_map::Entry::Vacant, HashMap}, + collections::{hash_map::Entry::Vacant, HashMap, HashSet}, io, sync::Arc, time::Instant, @@ -36,6 +36,7 @@ use crate::{ responses::{ build_sse_response, streaming::{attach_mcp_server_label, OutputItemType, ResponseStreamEventEmitter}, + utils::should_hide_mcp_streaming_tool, }, }, context, @@ -499,15 +500,25 @@ impl HarmonyStreamingProcessor { emitter: &mut ResponseStreamEventEmitter, tx: &mpsc::UnboundedSender>, session: Option<&McpToolSession<'_>>, + user_function_names: &HashSet, ) -> Result { match execution_result { context::ExecutionResult::Single { stream } => { debug!("Processing Responses API single stream mode"); - Self::process_decode_stream(stream, emitter, tx, session, 0).await + Self::process_decode_stream(stream, emitter, tx, session, user_function_names, 0) + .await } context::ExecutionResult::Dual { prefill, decode } => { debug!("Processing Responses API dual stream mode"); - Self::process_responses_dual_stream(prefill, *decode, emitter, tx, session).await + Self::process_responses_dual_stream( + prefill, + *decode, + emitter, + tx, + session, + user_function_names, + ) + .await } context::ExecutionResult::Embedding { .. } => { Err("Embeddings not supported in Responses API streaming".to_string()) @@ -521,6 +532,7 @@ impl HarmonyStreamingProcessor { emitter: &mut ResponseStreamEventEmitter, tx: &mpsc::UnboundedSender>, session: Option<&McpToolSession<'_>>, + user_function_names: &HashSet, ) -> Result { // Phase 1: Drain prefill stream, collecting cached_tokens from Complete messages let mut prefill_cached_tokens_by_index: HashMap = HashMap::new(); @@ -534,9 +546,15 @@ impl HarmonyStreamingProcessor { let prefill_cached_tokens: u32 = prefill_cached_tokens_by_index.values().sum(); // Phase 2: Process decode stream - let result = - Self::process_decode_stream(decode_stream, emitter, tx, session, prefill_cached_tokens) - .await; + let result = Self::process_decode_stream( + decode_stream, + emitter, + tx, + session, + user_function_names, + prefill_cached_tokens, + ) + .await; prefill_stream.mark_completed(); result @@ -548,6 +566,7 @@ impl HarmonyStreamingProcessor { emitter: &mut ResponseStreamEventEmitter, tx: &mpsc::UnboundedSender>, session: Option<&McpToolSession<'_>>, + user_function_names: &HashSet, prefill_cached_tokens: u32, ) -> Result { let mut parser = @@ -685,86 +704,96 @@ impl HarmonyStreamingProcessor { None } }); + let hide_from_client = session.is_some_and(|s| { + should_hide_mcp_streaming_tool( + tool_name, + response_format.as_ref(), + s, + user_function_names, + ) + }); - // Determine output item type and JSON type string - let output_item_type = - ResponseStreamEventEmitter::output_item_type_for_format( + if !hide_from_client { + // Determine output item type and JSON type string + let output_item_type = + ResponseStreamEventEmitter::output_item_type_for_format( + response_format.as_ref(), + ); + let type_str = ResponseStreamEventEmitter::type_str_for_format( response_format.as_ref(), ); - let type_str = ResponseStreamEventEmitter::type_str_for_format( - response_format.as_ref(), - ); - - let (output_index, item_id) = - emitter.allocate_output_index(output_item_type); - - tool_call_tracking.insert( - call_index, - (output_index, item_id.clone(), response_format.clone()), - ); - // Build output_item.added event - let mut item = json!({ - "id": item_id, - "type": type_str, - "name": tool_name, - "call_id": call_id, - "arguments": "", - "status": "in_progress" - }); + let (output_index, item_id) = + emitter.allocate_output_index(output_item_type); - let label = session - .map(|s| s.resolve_tool_server_label(tool_name)) - .unwrap_or_else(|| DEFAULT_SERVER_LABEL.to_string()); - attach_mcp_server_label( - &mut item, - Some(label.as_str()), - response_format.as_ref(), - ); + tool_call_tracking.insert( + call_index, + (output_index, item_id.clone(), response_format.clone()), + ); - let event = emitter.emit_output_item_added(output_index, &item); - emitter.send_event_best_effort(&event, tx); + // Build output_item.added event + let mut item = json!({ + "id": item_id, + "type": type_str, + "name": tool_name, + "call_id": call_id, + "arguments": "", + "status": "in_progress" + }); - // Emit in_progress event for MCP tools - if let Some(ref fmt) = response_format { - let event = emitter.emit_tool_call_in_progress( - output_index, - &item_id, - fmt, + let label = session + .map(|s| s.resolve_tool_server_label(tool_name)) + .unwrap_or_else(|| DEFAULT_SERVER_LABEL.to_string()); + attach_mcp_server_label( + &mut item, + Some(label.as_str()), + response_format.as_ref(), ); - emitter.send_event_best_effort(&event, tx); - // Emit searching/interpreting event for builtin tools - if let Some(event) = emitter.emit_tool_call_searching( - output_index, - &item_id, - fmt, - ) { - emitter.send_event_best_effort(&event, tx); - } - } + let event = emitter.emit_output_item_added(output_index, &item); + emitter.send_event_best_effort(&event, tx); - // Emit initial arguments delta for mcp_call / function_call - // only. Hosted built-in tools (web_search_call, - // code_interpreter_call, file_search_call, - // image_generation_call) surface progress via - // the structured `*.in_progress` / - // `*.searching` / `*.generating` events emitted - // above instead of streaming their arguments. - if streams_arguments(response_format.as_ref()) { - let event = match &response_format { - Some(_) => emitter.emit_mcp_call_arguments_delta( + // Emit in_progress event for MCP tools + if let Some(ref fmt) = response_format { + let event = emitter.emit_tool_call_in_progress( output_index, &item_id, - "", - ), - None => emitter.emit_function_call_arguments_delta( + fmt, + ); + emitter.send_event_best_effort(&event, tx); + + // Emit searching/interpreting event for builtin tools + if let Some(event) = emitter.emit_tool_call_searching( output_index, &item_id, - "", - ), - }; - emitter.send_event_best_effort(&event, tx); + fmt, + ) { + emitter.send_event_best_effort(&event, tx); + } + } + + // Emit initial arguments delta for mcp_call / function_call + // only. Hosted built-in tools (web_search_call, + // code_interpreter_call, file_search_call, + // image_generation_call) surface progress via + // the structured `*.in_progress` / + // `*.searching` / `*.generating` events emitted + // above instead of streaming their arguments. + if streams_arguments(response_format.as_ref()) { + let event = match &response_format { + Some(_) => emitter.emit_mcp_call_arguments_delta( + output_index, + &item_id, + "", + ), + None => emitter.emit_function_call_arguments_delta( + output_index, + &item_id, + "", + ), + }; + emitter.send_event_best_effort(&event, tx); + } } } else { // Continuing tool call: emit arguments delta diff --git a/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs b/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs index 483092800..bf462e97a 100644 --- a/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs +++ b/model_gateway/src/routers/grpc/regular/responses/non_streaming.rs @@ -28,7 +28,7 @@ use crate::{ error, grpc::common::responses::{ collect_user_function_names, ensure_mcp_connection, persist_response_if_needed, - ResponsesContext, + utils::redact_response_for_client, ResponsesContext, }, }, }; @@ -241,6 +241,7 @@ pub(super) async fn execute_tool_loop( ); } + redact_response_for_client(&mut responses_response, original_request, Some(&session)); return Ok(responses_response); } else { state.iteration += 1; @@ -269,7 +270,7 @@ pub(super) async fn execute_tool_loop( // If ANY tool call is a function tool, return to caller immediately if !function_tool_calls.is_empty() { // Convert chat response to responses format (includes all tool calls) - let responses_response = conversions::chat_to_responses( + let mut responses_response = conversions::chat_to_responses( &chat_response, original_request, params.response_id.clone(), @@ -289,6 +290,11 @@ pub(super) async fn execute_tool_loop( })?; // Return response with function tool calls to caller + redact_response_for_client( + &mut responses_response, + original_request, + Some(&session), + ); return Ok(responses_response); } @@ -332,6 +338,11 @@ pub(super) async fn execute_tool_loop( responses_response.status = ResponseStatus::Completed; responses_response.incomplete_details = Some(json!({ "reason": "max_tool_calls" })); + redact_response_for_client( + &mut responses_response, + original_request, + Some(&session), + ); return Ok(responses_response); } diff --git a/model_gateway/src/routers/grpc/regular/responses/streaming.rs b/model_gateway/src/routers/grpc/regular/responses/streaming.rs index d269808d5..85a3f59c0 100644 --- a/model_gateway/src/routers/grpc/regular/responses/streaming.rs +++ b/model_gateway/src/routers/grpc/regular/responses/streaming.rs @@ -50,11 +50,14 @@ use super::{ use crate::{ observability::metrics::{metrics_labels, Metrics}, routers::{ - common::mcp_utils::{prepare_hosted_dispatch_args, DEFAULT_MAX_ITERATIONS}, + common::mcp_utils::{ + collect_user_function_names, prepare_hosted_dispatch_args, DEFAULT_MAX_ITERATIONS, + }, grpc::{ common::responses::{ build_sse_response, persist_response_if_needed, streaming::{attach_mcp_server_label, OutputItemType, ResponseStreamEventEmitter}, + utils::{redact_response_completed_event, should_hide_mcp_streaming_tool}, ResponsesContext, }, utils, @@ -505,6 +508,7 @@ async fn execute_tool_loop_streaming_internal( ) -> Result<(), String> { let mut state = ToolLoopState::new(original_request.input.clone()); let max_tool_calls = original_request.max_tool_calls.map(|n| n as usize); + let user_function_names = collect_user_function_names(original_request); // Generate response ID first so we can use it for both emitter and session let response_id = format!("resp_{}", Uuid::now_v7()); @@ -554,6 +558,9 @@ async fn execute_tool_loop_streaming_internal( // Emit mcp_list_tools as first output item (only once, on first iteration) if !mcp_list_tools_emitted { for binding in session.mcp_servers() { + if session.is_internal_server_label(&binding.label) { + continue; + } let tools_for_server = session.list_tools_for_server(&binding.server_key); emitter.emit_mcp_list_tools_sequence(&binding.label, &tools_for_server, &tx)?; @@ -639,65 +646,81 @@ async fn execute_tool_loop_streaming_internal( // Look up response_format for this tool let response_format = session.tool_response_format(&tool_call.name); - - // Use emitter helpers to determine correct type and allocate index - let item_type = - ResponseStreamEventEmitter::type_str_for_format(Some(&response_format)); - let output_item_type = - ResponseStreamEventEmitter::output_item_type_for_format(Some(&response_format)); - let resolved_label = session.resolve_tool_server_label(&tool_call.name); - - // Allocate output_index with correct type (generates appropriate item_id prefix) - let (output_index, item_id) = emitter.allocate_output_index(output_item_type); - - // Build initial tool call item - let mut item = json!({ - "id": item_id, - "type": item_type, - "name": tool_call.name, - "status": "in_progress", - "arguments": "" - }); - attach_mcp_server_label( - &mut item, - Some(resolved_label.as_str()), + let hide_from_client = should_hide_mcp_streaming_tool( + &tool_call.name, Some(&response_format), + &session, + &user_function_names, ); - // Emit output_item.added - let event = emitter.emit_output_item_added(output_index, &item); - emitter.send_event(&event, &tx)?; + let visible_output = if hide_from_client { + None + } else { + // Use emitter helpers to determine correct type and allocate index + let item_type = + ResponseStreamEventEmitter::type_str_for_format(Some(&response_format)); + let output_item_type = ResponseStreamEventEmitter::output_item_type_for_format( + Some(&response_format), + ); + let resolved_label = session.resolve_tool_server_label(&tool_call.name); - // Emit tool_call.in_progress - let event = - emitter.emit_tool_call_in_progress(output_index, &item_id, &response_format); - emitter.send_event(&event, &tx)?; + // Allocate output_index with correct type (generates appropriate item_id prefix) + let (output_index, item_id) = emitter.allocate_output_index(output_item_type); - // Emit arguments events for mcp_call only (skip for builtin tools) - if matches!(response_format, ResponseFormat::Passthrough) { - // Emit mcp_call_arguments.delta (simulate streaming by sending full arguments) - let event = emitter.emit_mcp_call_arguments_delta( - output_index, - &item_id, - &tool_call.arguments, + // Build initial tool call item + let mut item = json!({ + "id": item_id, + "type": item_type, + "name": tool_call.name, + "status": "in_progress", + "arguments": "" + }); + attach_mcp_server_label( + &mut item, + Some(resolved_label.as_str()), + Some(&response_format), ); + + // Emit output_item.added + let event = emitter.emit_output_item_added(output_index, &item); emitter.send_event(&event, &tx)?; - // Emit mcp_call_arguments.done - let event = emitter.emit_mcp_call_arguments_done( + // Emit tool_call.in_progress + let event = emitter.emit_tool_call_in_progress( output_index, &item_id, - &tool_call.arguments, + &response_format, ); emitter.send_event(&event, &tx)?; - } - // Emit searching/interpreting event for builtin tools - if let Some(event) = - emitter.emit_tool_call_searching(output_index, &item_id, &response_format) - { - emitter.send_event(&event, &tx)?; - } + // Emit arguments events for mcp_call only (skip for builtin tools) + if matches!(response_format, ResponseFormat::Passthrough) { + // Emit mcp_call_arguments.delta (simulate streaming by sending full arguments) + let event = emitter.emit_mcp_call_arguments_delta( + output_index, + &item_id, + &tool_call.arguments, + ); + emitter.send_event(&event, &tx)?; + + // Emit mcp_call_arguments.done + let event = emitter.emit_mcp_call_arguments_done( + output_index, + &item_id, + &tool_call.arguments, + ); + emitter.send_event(&event, &tx)?; + } + + // Emit searching/interpreting event for builtin tools + if let Some(event) = + emitter.emit_tool_call_searching(output_index, &item_id, &response_format) + { + emitter.send_event(&event, &tx)?; + } + + Some((output_index, item_id, item_type)) + }; // Execute the MCP tool trace!( @@ -734,30 +757,35 @@ async fn execute_tool_loop_streaming_internal( let output_str = tool_output.output.to_string(); if success { - // Emit tool_call.completed - let event = - emitter.emit_tool_call_completed(output_index, &item_id, &response_format); - emitter.send_event(&event, &tx)?; - - // Build complete item with output - let mut item_done = json!({ - "id": item_id, - "type": item_type, - "name": tool_output.tool_name, - "status": "completed", - "arguments": tool_output.arguments_str, - "output": output_str - }); - attach_mcp_server_label( - &mut item_done, - Some(tool_output.server_label.as_str()), - Some(&response_format), - ); - - // Emit output_item.done - let event = emitter.emit_output_item_done(output_index, &item_done); - emitter.send_event(&event, &tx)?; - emitter.complete_output_item(output_index); + if let Some((output_index, item_id, item_type)) = &visible_output { + // Emit tool_call.completed + let event = emitter.emit_tool_call_completed( + *output_index, + item_id, + &response_format, + ); + emitter.send_event(&event, &tx)?; + + // Build complete item with output + let mut item_done = json!({ + "id": item_id, + "type": item_type, + "name": tool_output.tool_name, + "status": "completed", + "arguments": tool_output.arguments_str, + "output": output_str + }); + attach_mcp_server_label( + &mut item_done, + Some(tool_output.server_label.as_str()), + Some(&response_format), + ); + + // Emit output_item.done + let event = emitter.emit_output_item_done(*output_index, &item_done); + emitter.send_event(&event, &tx)?; + emitter.complete_output_item(*output_index); + } } else { let err_text = tool_output .error_message @@ -765,29 +793,31 @@ async fn execute_tool_loop_streaming_internal( .unwrap_or_else(|| output_str.clone()); warn!("Tool execution returned error: {}", err_text); - // Emit mcp_call.failed (no web_search_call.failed event exists) - let event = emitter.emit_mcp_call_failed(output_index, &item_id, &err_text); - emitter.send_event(&event, &tx)?; - - // Build failed item - let mut item_done = json!({ - "id": item_id, - "type": item_type, - "name": tool_output.tool_name, - "status": "failed", - "arguments": tool_output.arguments_str, - "error": err_text - }); - attach_mcp_server_label( - &mut item_done, - Some(tool_output.server_label.as_str()), - Some(&response_format), - ); - - // Emit output_item.done - let event = emitter.emit_output_item_done(output_index, &item_done); - emitter.send_event(&event, &tx)?; - emitter.complete_output_item(output_index); + if let Some((output_index, item_id, item_type)) = &visible_output { + // Emit mcp_call.failed (no web_search_call.failed event exists) + let event = emitter.emit_mcp_call_failed(*output_index, item_id, &err_text); + emitter.send_event(&event, &tx)?; + + // Build failed item + let mut item_done = json!({ + "id": item_id, + "type": item_type, + "name": tool_output.tool_name, + "status": "failed", + "arguments": tool_output.arguments_str, + "error": err_text + }); + attach_mcp_server_label( + &mut item_done, + Some(tool_output.server_label.as_str()), + Some(&response_format), + ); + + // Emit output_item.done + let event = emitter.emit_output_item_done(*output_index, &item_done); + emitter.send_event(&event, &tx)?; + emitter.complete_output_item(*output_index); + } } // Record MCP tool metrics @@ -917,7 +947,8 @@ async fn execute_tool_loop_streaming_internal( "total_tokens": u.total_tokens }) }); - let event = emitter.emit_completed(usage_json.as_ref()); + let mut event = emitter.emit_completed(usage_json.as_ref()); + redact_response_completed_event(&mut event, original_request, Some(&session)); emitter.send_event(&event, &tx)?; break;