diff --git a/litellm/litellm_core_utils/streaming_handler.py b/litellm/litellm_core_utils/streaming_handler.py
index b2f6c621407d..6864def3d8ae 100644
--- a/litellm/litellm_core_utils/streaming_handler.py
+++ b/litellm/litellm_core_utils/streaming_handler.py
@@ -94,9 +94,9 @@ def __init__(
 
         self.system_fingerprint: Optional[str] = None
         self.received_finish_reason: Optional[str] = None
-        self.intermittent_finish_reason: Optional[str] = (
-            None  # finish reasons that show up mid-stream
-        )
+        self.intermittent_finish_reason: Optional[
+            str
+        ] = None  # finish reasons that show up mid-stream
         self.special_tokens = [
             "<|assistant|>",
             "<|system|>",
@@ -662,6 +662,15 @@ def model_response_creator(
             pass
         else:
             model_response.choices = [StreamingChoices(finish_reason=None)]
+        # Add usage attribute when send_stream_usage=True
+        if (
+            hasattr(self, "send_stream_usage")
+            and self.send_stream_usage
+            and hasattr(self, "chunks")
+        ):
+            # Calculate usage from accumulated chunks
+            usage = calculate_total_usage(chunks=self.chunks)
+            model_response.usage = usage
         return model_response
 
     def is_delta_empty(self, delta: Delta) -> bool:
@@ -879,7 +888,6 @@ def return_processed_chunk_logic(  # noqa
         ## check if openai/azure chunk
         original_chunk = response_obj.get("original_chunk", None)
         if original_chunk:
-
             if len(original_chunk.choices) > 0:
                 choices = []
                 for choice in original_chunk.choices:
@@ -896,7 +904,6 @@
                 print_verbose(f"choices in streaming: {choices}")
                 setattr(model_response, "choices", choices)
             else:
-
                 return
             model_response.system_fingerprint = (
                 original_chunk.system_fingerprint
@@ -1024,7 +1031,7 @@ def _optional_combine_thinking_block_in_choices(
         return
 
     def chunk_creator(self, chunk: Any):  # type: ignore  # noqa: PLR0915
-        if hasattr(chunk, 'id'):
+        if hasattr(chunk, "id"):
            self.response_id = chunk.id
         model_response = self.model_response_creator()
         response_obj: Dict[str, Any] = {}
@@ -1424,9 +1431,9 @@ def chunk_creator(self, chunk: Any):  # type: ignore  # noqa: PLR0915
                 _json_delta = delta.model_dump()
                 print_verbose(f"_json_delta: {_json_delta}")
                 if "role" not in _json_delta or _json_delta["role"] is None:
-                    _json_delta["role"] = (
-                        "assistant"  # mistral's api returns role as None
-                    )
+                    _json_delta[
+                        "role"
+                    ] = "assistant"  # mistral's api returns role as None
                 if "tool_calls" in _json_delta and isinstance(
                     _json_delta["tool_calls"], list
                 ):
@@ -1837,9 +1844,9 @@ async def __anext__(self):  # noqa: PLR0915
                 chunk = next(self.completion_stream)
                 if chunk is not None and chunk != b"":
                     print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
-                    processed_chunk: Optional[ModelResponseStream] = (
-                        self.chunk_creator(chunk=chunk)
-                    )
+                    processed_chunk: Optional[
+                        ModelResponseStream
+                    ] = self.chunk_creator(chunk=chunk)
                     print_verbose(
                         f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}"
                     )
diff --git a/litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py b/litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py
index dc3a6cf15e50..f59be1f3f33c 100644
--- a/litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py
+++ b/litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py
@@ -313,9 +313,9 @@ def get_tool_value(tool: dict, tool_name: str) -> Optional[dict]:
             return None
 
         for tool in value:
-            openai_function_object: Optional[ChatCompletionToolParamFunctionChunk] = (
-                None
-            )
+            openai_function_object: Optional[
+                ChatCompletionToolParamFunctionChunk
+            ] = None
             if "function" in tool:  # tools list
                 _openai_function_object = ChatCompletionToolParamFunctionChunk(  # type: ignore
                     **tool["function"]
@@ -437,7 +437,9 @@ def _map_reasoning_effort_to_thinking_budget(
             elif model and "gemini-2.5-pro" in model.lower():
                 budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_PRO
             elif model and "gemini-2.5-flash" in model.lower():
-                budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_FLASH
+                budget = (
+                    DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_FLASH
+                )
             else:
                 budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET
 
@@ -621,16 +623,16 @@ def map_openai_params(  # noqa: PLR0915
             elif param == "seed":
                 optional_params["seed"] = value
             elif param == "reasoning_effort" and isinstance(value, str):
-                optional_params["thinkingConfig"] = (
-                    VertexGeminiConfig._map_reasoning_effort_to_thinking_budget(
-                        value, model
-                    )
+                optional_params[
+                    "thinkingConfig"
+                ] = VertexGeminiConfig._map_reasoning_effort_to_thinking_budget(
+                    value, model
                 )
             elif param == "thinking":
-                optional_params["thinkingConfig"] = (
-                    VertexGeminiConfig._map_thinking_param(
-                        cast(AnthropicThinkingParam, value)
-                    )
+                optional_params[
+                    "thinkingConfig"
+                ] = VertexGeminiConfig._map_thinking_param(
+                    cast(AnthropicThinkingParam, value)
                 )
             elif param == "modalities" and isinstance(value, list):
                 response_modalities = self.map_response_modalities(value)
@@ -1066,7 +1068,6 @@ def _calculate_usage(
             GenerateContentResponseBody, BidiGenerateContentServerMessage
         ],
     ) -> Usage:
-
         if (
             completion_response is not None
             and "usageMetadata" not in completion_response
@@ -1502,28 +1503,28 @@ def _transform_google_generate_content_to_openai_model_response(
 
         ## ADD METADATA TO RESPONSE ##
         setattr(model_response, "vertex_ai_grounding_metadata", grounding_metadata)
-        model_response._hidden_params["vertex_ai_grounding_metadata"] = (
-            grounding_metadata
-        )
+        model_response._hidden_params[
+            "vertex_ai_grounding_metadata"
+        ] = grounding_metadata
 
         setattr(
             model_response, "vertex_ai_url_context_metadata", url_context_metadata
         )
-        model_response._hidden_params["vertex_ai_url_context_metadata"] = (
-            url_context_metadata
-        )
+        model_response._hidden_params[
+            "vertex_ai_url_context_metadata"
+        ] = url_context_metadata
 
         setattr(model_response, "vertex_ai_safety_results", safety_ratings)
-        model_response._hidden_params["vertex_ai_safety_results"] = (
-            safety_ratings  # older approach - maintaining to prevent regressions
-        )
+        model_response._hidden_params[
+            "vertex_ai_safety_results"
+        ] = safety_ratings  # older approach - maintaining to prevent regressions
 
         ## ADD CITATION METADATA ##
         setattr(model_response, "vertex_ai_citation_metadata", citation_metadata)
-        model_response._hidden_params["vertex_ai_citation_metadata"] = (
-            citation_metadata  # older approach - maintaining to prevent regressions
-        )
+        model_response._hidden_params[
+            "vertex_ai_citation_metadata"
+        ] = citation_metadata  # older approach - maintaining to prevent regressions
 
     except Exception as e:
         raise VertexAIError(
@@ -1744,6 +1745,8 @@ async def async_streaming(
         )
 
         request_body_str = json.dumps(request_body)
+        # Extract stream_options from optional_params to enable usage tracking
+        stream_options = optional_params.get("stream_options")
         streaming_response = CustomStreamWrapper(
             completion_stream=None,
             make_call=partial(
@@ -1757,6 +1760,7 @@
                 logging_obj=logging_obj,
             ),
             model=model,
+            stream_options=stream_options,
             custom_llm_provider="vertex_ai_beta",
             logging_obj=logging_obj,
         )
@@ -2010,6 +2014,8 @@ def completion(
         ## SYNC STREAMING CALL ##
         if stream is True:
             request_data_str = json.dumps(data)
+            # Extract stream_options from optional_params to enable usage tracking
+            stream_options = optional_params.get("stream_options")
             streaming_response = CustomStreamWrapper(
                 completion_stream=None,
                 make_call=partial(
@@ -2029,6 +2035,7 @@
                 model=model,
                 custom_llm_provider="vertex_ai_beta",
                 logging_obj=logging_obj,
+                stream_options=stream_options,
             )
             return streaming_response
 
diff --git a/tests/test_litellm/llms/vertex_ai/gemini/test_vertex_and_google_ai_studio_gemini.py b/tests/test_litellm/llms/vertex_ai/gemini/test_vertex_and_google_ai_studio_gemini.py
index 62a11bf67651..94e848010417 100644
--- a/tests/test_litellm/llms/vertex_ai/gemini/test_vertex_and_google_ai_studio_gemini.py
+++ b/tests/test_litellm/llms/vertex_ai/gemini/test_vertex_and_google_ai_studio_gemini.py
@@ -1029,7 +1029,7 @@ def test_vertex_ai_tool_call_id_format():
 def test_vertex_ai_code_line_length():
     """
     Test that the specific code line generating tool call IDs is within character limit.
-    
+
     This is a meta-test to ensure the code change meets the 40-character requirement.
     """
     import inspect
@@ -1040,19 +1040,117 @@ def test_vertex_ai_code_line_length():
 
     # Get the source code of the _transform_parts method
     source_lines = inspect.getsource(VertexGeminiConfig._transform_parts).split('\n')
-    
+
     # Find the line that generates the ID
     id_line = None
     for line in source_lines:
         if 'id=f"call_{uuid.uuid4().hex' in line:
             id_line = line.strip()  # Remove indentation for length check
             break
-    
+
     assert id_line is not None, "Could not find the ID generation line in source code"
-    
+
     # Check that the line is 40 characters or less (excluding indentation)
     line_length = len(id_line)
     assert line_length <= 40, f"ID generation line is {line_length} characters, should be ≤40: {id_line}"
-    
+
     # Verify it contains the expected UUID format
     assert 'uuid.uuid4().hex[:28]' in id_line, f"Line should contain shortened UUID format: {id_line}"
+
+
+def test_vertex_ai_streaming_with_stream_options():
+    """
+    Test that CustomStreamWrapper includes usage in final chunk when stream_options is enabled.
+
+    """
+    from unittest.mock import MagicMock, patch
+    from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper, calculate_total_usage
+    from litellm.types.utils import ModelResponseStream, StreamingChoices, Delta, Usage
+
+    # Create mock chunks with usage data
+    mock_chunks = [
+        ModelResponseStream(
+            id="chunk1",
+            created=1742056047,
+            model="vertex_ai/gemini-pro",
+            object="chat.completion.chunk",
+            choices=[
+                StreamingChoices(
+                    finish_reason=None,
+                    index=0,
+                    delta=Delta(content="Hello", role="assistant"),
+                )
+            ],
+            usage=Usage(
+                prompt_tokens=10,
+                completion_tokens=1,
+                total_tokens=11,
+            ),
+        ),
+        ModelResponseStream(
+            id="chunk2",
+            created=1742056047,
+            model="vertex_ai/gemini-pro",
+            object="chat.completion.chunk",
+            choices=[
+                StreamingChoices(
+                    finish_reason=None,
+                    index=0,
+                    delta=Delta(content=" world", role="assistant"),
+                )
+            ],
+            usage=Usage(
+                prompt_tokens=10,
+                completion_tokens=3,
+                total_tokens=13,
+            ),
+        ),
+    ]
+
+    # Test CustomStreamWrapper with stream_options enabled
+    stream_wrapper = CustomStreamWrapper(
+        completion_stream=None,
+        model="vertex_ai/gemini-pro",
+        custom_llm_provider="vertex_ai",
+        logging_obj=MagicMock(),
+        stream_options={"include_usage": True},
+    )
+
+    # Simulate chunks being added
+    stream_wrapper.chunks = mock_chunks
+
+    # Verify send_stream_usage is enabled
+    assert stream_wrapper.send_stream_usage is True
+
+    # Test model_response_creator includes usage when chunks are present
+    with patch('litellm.litellm_core_utils.streaming_handler.calculate_total_usage') as mock_calc_usage:
+        expected_usage = Usage(
+            prompt_tokens=10,
+            completion_tokens=3,
+            total_tokens=13,
+        )
+        mock_calc_usage.return_value = expected_usage
+
+        model_response = stream_wrapper.model_response_creator()
+
+        # Verify calculate_total_usage was called with the chunks
+        mock_calc_usage.assert_called_once_with(chunks=mock_chunks)
+
+        # Verify usage was added to the response
+        assert hasattr(model_response, 'usage')
+        assert model_response.usage == expected_usage
+
+    # Test without stream_options - usage should not be included
+    stream_wrapper_no_usage = CustomStreamWrapper(
+        completion_stream=None,
+        model="vertex_ai/gemini-pro",
+        custom_llm_provider="vertex_ai",
+        logging_obj=MagicMock(),
+        stream_options=None,
+    )
+
+    stream_wrapper_no_usage.chunks = mock_chunks
+    assert stream_wrapper_no_usage.send_stream_usage is False
+
+    model_response_no_usage = stream_wrapper_no_usage.model_response_creator()
+    assert not hasattr(model_response_no_usage, 'usage') or model_response_no_usage.usage is None
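Usage sketch (reviewer note, not part of the patch above): a minimal example of how the forwarded stream_options would surface aggregated usage on the final streamed chunk. The model name and prompt are placeholders, and valid Vertex AI credentials are assumed.

import litellm

# Request a streamed completion and ask litellm to attach usage to the final chunk.
response = litellm.completion(
    model="vertex_ai/gemini-2.5-flash",  # placeholder model name, not taken from the patch
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
    stream_options={"include_usage": True},
)

last_usage = None
for chunk in response:
    # With include_usage set, the last chunk emitted by CustomStreamWrapper
    # should carry a usage block aggregated from the accumulated chunks.
    if getattr(chunk, "usage", None) is not None:
        last_usage = chunk.usage
print(last_usage)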