31 changes: 19 additions & 12 deletions litellm/litellm_core_utils/streaming_handler.py
@@ -94,9 +94,9 @@ def __init__(

self.system_fingerprint: Optional[str] = None
self.received_finish_reason: Optional[str] = None
self.intermittent_finish_reason: Optional[str] = (
None # finish reasons that show up mid-stream
)
self.intermittent_finish_reason: Optional[
str
] = None # finish reasons that show up mid-stream
self.special_tokens = [
"<|assistant|>",
"<|system|>",
@@ -662,6 +662,15 @@ def model_response_creator(
pass
else:
model_response.choices = [StreamingChoices(finish_reason=None)]
# Add usage attribute when send_stream_usage=True
if (
hasattr(self, "send_stream_usage")
and self.send_stream_usage
and hasattr(self, "chunks")
):
# Calculate usage from accumulated chunks
usage = calculate_total_usage(chunks=self.chunks)
Contributor: @krrishdholakia

this would do it every time the model response object is created.

i can see us doing a usage calculation for gemini on streaming already @mhdawson

usage = VertexGeminiConfig._calculate_usage(

is there a minimal script you can share for me to reproduce the issue? Curious what's happening

Author: @mhdawson, Sep 29, 2025

@krrishdholakia thanks for following up. My recreate unfortunately is with llama stack and the responses API, where the usage field was not being populated. Does the test being added in the PR potentially show the issue? I think it confirms usage is not populated when it is not requested and the custom wrapper is in use.
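
For context, a minimal illustrative sketch of the kind of streaming call involved; the model and prompt are placeholders, not the llama stack / responses API reproduction mentioned above:

# Illustrative sketch only; placeholder model and prompt, not the actual reproduction.
# The point: with stream_options={"include_usage": True}, the final streamed chunk is
# expected to carry a usage object, which is what was missing before this change.
import litellm

response = litellm.completion(
    model="vertex_ai/gemini-pro",  # placeholder
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
    stream_options={"include_usage": True},
)

last_usage = None
for chunk in response:
    usage = getattr(chunk, "usage", None)
    if usage is not None:
        last_usage = usage

print(last_usage)  # previously None even though usage was requested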

Author: @mhdawson, Sep 30, 2025

Since I don't know the code base well, I asked Claude to explain the issue. This is what it said:

_calculate_usage was being called during streaming - specifically in ModelResponseIterator.chunk_parser() at vertex_and_google_ai_studio_gemini.py:2130, where it calculates usage for each individual chunk.

The problem was that CustomStreamWrapper wasn't aggregating this usage information from the chunks.

Here's the flow:

  1. Per-chunk calculation (already happening): ModelResponseIterator.chunk_parser() calls _calculate_usage() on each streaming chunk and sets model_response.usage (line 2142)
  2. Missing aggregation (the bug): CustomStreamWrapper collects these chunks in self.chunks but wasn't extracting/aggregating the usage data when stream_options was enabled
  3. The fix:
    - Passes stream_options to CustomStreamWrapper so it knows to enable usage tracking (lines 1749, 1763 in the vertex file)
    - Adds code in CustomStreamWrapper.model_response_creator() (streaming_handler.py:665-672) that calls calculate_total_usage(chunks=self.chunks) to aggregate usage from all chunks

So _calculate_usage was running, but its results were being discarded. The fix enabled CustomStreamWrapper to collect and report the aggregated usage when send_stream_usage=True.
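
To make the aggregation step concrete, here is a minimal self-contained sketch of the idea. It uses simplified stand-in types, not LiteLLM's Usage/ModelResponseStream, and it assumes each chunk reports cumulative usage (as the mock chunks in the new test do); LiteLLM's actual calculate_total_usage may differ in detail.

# Minimal sketch of aggregating per-chunk usage into a final total.
# Stand-in types only; LiteLLM's calculate_total_usage works on its own chunk objects.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchUsage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass
class SketchChunk:
    content: str
    usage: Optional[SketchUsage] = None


def aggregate_usage(chunks: List[SketchChunk]) -> SketchUsage:
    # Assumes cumulative per-chunk reporting, so the running maximum is the final value.
    prompt = completion = 0
    for c in chunks:
        if c.usage is None:
            continue
        prompt = max(prompt, c.usage.prompt_tokens)
        completion = max(completion, c.usage.completion_tokens)
    return SketchUsage(prompt, completion, prompt + completion)


chunks = [
    SketchChunk("Hello", SketchUsage(10, 1, 11)),
    SketchChunk(" world", SketchUsage(10, 3, 13)),
]
print(aggregate_usage(chunks))  # SketchUsage(prompt_tokens=10, completion_tokens=3, total_tokens=13)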

Author: @mhdawson

@krrishdholakia not sure if there is anything I need to do so this gets untagged as waiting on a response.

model_response.usage = usage
return model_response

def is_delta_empty(self, delta: Delta) -> bool:
@@ -879,7 +888,6 @@ def return_processed_chunk_logic( # noqa
## check if openai/azure chunk
original_chunk = response_obj.get("original_chunk", None)
if original_chunk:

if len(original_chunk.choices) > 0:
choices = []
for choice in original_chunk.choices:
@@ -896,7 +904,6 @@ def return_processed_chunk_logic( # noqa
print_verbose(f"choices in streaming: {choices}")
setattr(model_response, "choices", choices)
else:

return
model_response.system_fingerprint = (
original_chunk.system_fingerprint
@@ -1024,7 +1031,7 @@ def _optional_combine_thinking_block_in_choices(
return

def chunk_creator(self, chunk: Any): # type: ignore # noqa: PLR0915
if hasattr(chunk, 'id'):
if hasattr(chunk, "id"):
self.response_id = chunk.id
model_response = self.model_response_creator()
response_obj: Dict[str, Any] = {}
@@ -1424,9 +1431,9 @@ def chunk_creator(self, chunk: Any):  # type: ignore # noqa: PLR0915
_json_delta = delta.model_dump()
print_verbose(f"_json_delta: {_json_delta}")
if "role" not in _json_delta or _json_delta["role"] is None:
_json_delta["role"] = (
"assistant" # mistral's api returns role as None
)
_json_delta[
"role"
] = "assistant" # mistral's api returns role as None
if "tool_calls" in _json_delta and isinstance(
_json_delta["tool_calls"], list
):
@@ -1837,9 +1844,9 @@ async def __anext__(self):  # noqa: PLR0915
chunk = next(self.completion_stream)
if chunk is not None and chunk != b"":
print_verbose(f"PROCESSED CHUNK PRE CHUNK CREATOR: {chunk}")
processed_chunk: Optional[ModelResponseStream] = (
self.chunk_creator(chunk=chunk)
)
processed_chunk: Optional[
ModelResponseStream
] = self.chunk_creator(chunk=chunk)
print_verbose(
f"PROCESSED CHUNK POST CHUNK CREATOR: {processed_chunk}"
)
57 changes: 32 additions & 25 deletions litellm/llms/vertex_ai/gemini/vertex_and_google_ai_studio_gemini.py
@@ -313,9 +313,9 @@ def get_tool_value(tool: dict, tool_name: str) -> Optional[dict]:
return None

for tool in value:
openai_function_object: Optional[ChatCompletionToolParamFunctionChunk] = (
None
)
openai_function_object: Optional[
ChatCompletionToolParamFunctionChunk
] = None
if "function" in tool: # tools list
_openai_function_object = ChatCompletionToolParamFunctionChunk( # type: ignore
**tool["function"]
@@ -437,7 +437,9 @@ def _map_reasoning_effort_to_thinking_budget(
elif model and "gemini-2.5-pro" in model.lower():
budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_PRO
elif model and "gemini-2.5-flash" in model.lower():
budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_FLASH
budget = (
DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET_GEMINI_2_5_FLASH
)
else:
budget = DEFAULT_REASONING_EFFORT_MINIMAL_THINKING_BUDGET

@@ -621,16 +623,16 @@ def map_openai_params(  # noqa: PLR0915
elif param == "seed":
optional_params["seed"] = value
elif param == "reasoning_effort" and isinstance(value, str):
optional_params["thinkingConfig"] = (
VertexGeminiConfig._map_reasoning_effort_to_thinking_budget(
value, model
)
optional_params[
"thinkingConfig"
] = VertexGeminiConfig._map_reasoning_effort_to_thinking_budget(
value, model
)
elif param == "thinking":
optional_params["thinkingConfig"] = (
VertexGeminiConfig._map_thinking_param(
cast(AnthropicThinkingParam, value)
)
optional_params[
"thinkingConfig"
] = VertexGeminiConfig._map_thinking_param(
cast(AnthropicThinkingParam, value)
)
elif param == "modalities" and isinstance(value, list):
response_modalities = self.map_response_modalities(value)
@@ -1066,7 +1068,6 @@ def _calculate_usage(
GenerateContentResponseBody, BidiGenerateContentServerMessage
],
) -> Usage:

if (
completion_response is not None
and "usageMetadata" not in completion_response
@@ -1502,28 +1503,28 @@ def _transform_google_generate_content_to_openai_model_response(
## ADD METADATA TO RESPONSE ##

setattr(model_response, "vertex_ai_grounding_metadata", grounding_metadata)
model_response._hidden_params["vertex_ai_grounding_metadata"] = (
grounding_metadata
)
model_response._hidden_params[
"vertex_ai_grounding_metadata"
] = grounding_metadata

setattr(
model_response, "vertex_ai_url_context_metadata", url_context_metadata
)

model_response._hidden_params["vertex_ai_url_context_metadata"] = (
url_context_metadata
)
model_response._hidden_params[
"vertex_ai_url_context_metadata"
] = url_context_metadata

setattr(model_response, "vertex_ai_safety_results", safety_ratings)
model_response._hidden_params["vertex_ai_safety_results"] = (
safety_ratings # older approach - maintaining to prevent regressions
)
model_response._hidden_params[
"vertex_ai_safety_results"
] = safety_ratings # older approach - maintaining to prevent regressions

## ADD CITATION METADATA ##
setattr(model_response, "vertex_ai_citation_metadata", citation_metadata)
model_response._hidden_params["vertex_ai_citation_metadata"] = (
citation_metadata # older approach - maintaining to prevent regressions
)
model_response._hidden_params[
"vertex_ai_citation_metadata"
] = citation_metadata # older approach - maintaining to prevent regressions

except Exception as e:
raise VertexAIError(
@@ -1744,6 +1745,8 @@ async def async_streaming(
)

request_body_str = json.dumps(request_body)
# Extract stream_options from optional_params to enable usage tracking
stream_options = optional_params.get("stream_options")
streaming_response = CustomStreamWrapper(
completion_stream=None,
make_call=partial(
@@ -1757,6 +1760,7 @@
logging_obj=logging_obj,
),
model=model,
stream_options=stream_options,
custom_llm_provider="vertex_ai_beta",
logging_obj=logging_obj,
)
@@ -2010,6 +2014,8 @@ def completion(
## SYNC STREAMING CALL ##
if stream is True:
request_data_str = json.dumps(data)
# Extract stream_options from optional_params to enable usage tracking
stream_options = optional_params.get("stream_options")
streaming_response = CustomStreamWrapper(
completion_stream=None,
make_call=partial(
@@ -2029,6 +2035,7 @@
model=model,
custom_llm_provider="vertex_ai_beta",
logging_obj=logging_obj,
stream_options=stream_options,
)

return streaming_response
@@ -1029,7 +1029,7 @@ def test_vertex_ai_tool_call_id_format():
def test_vertex_ai_code_line_length():
"""
Test that the specific code line generating tool call IDs is within character limit.

This is a meta-test to ensure the code change meets the 40-character requirement.
"""
import inspect
@@ -1040,19 +1040,117 @@ def test_vertex_ai_code_line_length():

# Get the source code of the _transform_parts method
source_lines = inspect.getsource(VertexGeminiConfig._transform_parts).split('\n')

# Find the line that generates the ID
id_line = None
for line in source_lines:
if 'id=f"call_{uuid.uuid4().hex' in line:
id_line = line.strip() # Remove indentation for length check
break

assert id_line is not None, "Could not find the ID generation line in source code"

# Check that the line is 40 characters or less (excluding indentation)
line_length = len(id_line)
assert line_length <= 40, f"ID generation line is {line_length} characters, should be ≤40: {id_line}"

# Verify it contains the expected UUID format
assert 'uuid.uuid4().hex[:28]' in id_line, f"Line should contain shortened UUID format: {id_line}"


def test_vertex_ai_streaming_with_stream_options():
"""
Test that CustomStreamWrapper includes usage in final chunk when stream_options is enabled.

"""
from unittest.mock import MagicMock, patch
from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper, calculate_total_usage
from litellm.types.utils import ModelResponseStream, StreamingChoices, Delta, Usage

# Create mock chunks with usage data
mock_chunks = [
ModelResponseStream(
id="chunk1",
created=1742056047,
model="vertex_ai/gemini-pro",
object="chat.completion.chunk",
choices=[
StreamingChoices(
finish_reason=None,
index=0,
delta=Delta(content="Hello", role="assistant"),
)
],
usage=Usage(
prompt_tokens=10,
completion_tokens=1,
total_tokens=11,
),
),
ModelResponseStream(
id="chunk2",
created=1742056047,
model="vertex_ai/gemini-pro",
object="chat.completion.chunk",
choices=[
StreamingChoices(
finish_reason=None,
index=0,
delta=Delta(content=" world", role="assistant"),
)
],
usage=Usage(
prompt_tokens=10,
completion_tokens=3,
total_tokens=13,
),
),
]

# Test CustomStreamWrapper with stream_options enabled
stream_wrapper = CustomStreamWrapper(
completion_stream=None,
model="vertex_ai/gemini-pro",
custom_llm_provider="vertex_ai",
logging_obj=MagicMock(),
stream_options={"include_usage": True},
)

# Simulate chunks being added
stream_wrapper.chunks = mock_chunks

# Verify send_stream_usage is enabled
assert stream_wrapper.send_stream_usage is True

# Test model_response_creator includes usage when chunks are present
with patch('litellm.litellm_core_utils.streaming_handler.calculate_total_usage') as mock_calc_usage:
expected_usage = Usage(
prompt_tokens=10,
completion_tokens=3,
total_tokens=13,
)
mock_calc_usage.return_value = expected_usage

model_response = stream_wrapper.model_response_creator()

# Verify calculate_total_usage was called with the chunks
mock_calc_usage.assert_called_once_with(chunks=mock_chunks)

# Verify usage was added to the response
assert hasattr(model_response, 'usage')
assert model_response.usage == expected_usage

# Test without stream_options - usage should not be included
stream_wrapper_no_usage = CustomStreamWrapper(
completion_stream=None,
model="vertex_ai/gemini-pro",
custom_llm_provider="vertex_ai",
logging_obj=MagicMock(),
stream_options=None,
)

stream_wrapper_no_usage.chunks = mock_chunks
assert stream_wrapper_no_usage.send_stream_usage is False

model_response_no_usage = stream_wrapper_no_usage.model_response_creator()
assert not hasattr(model_response_no_usage, 'usage') or model_response_no_usage.usage is None