Skip to content

Enhance Gemini usage tracking to collect comprehensive token data #1752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
75 changes: 75 additions & 0 deletions pydantic_ai_slim/pydantic_ai/models/_google_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import Annotated, Literal

import pydantic
from typing_extensions import NotRequired, TypedDict


@pydantic.with_config(pydantic.ConfigDict(populate_by_name=True))
class GeminiModalityTokenCount(TypedDict):
    """A token count for a single modality.

    See <https://ai.google.dev/api/generate-content#modalitytokencount>.

    `populate_by_name=True` matches `GeminiUsageMetaData` below: it lets validation accept the
    snake_case field name (`token_count`, as produced by `model_dump()` on SDK objects) as well as
    the camelCase wire alias (`tokenCount`). Without it, snake_case input would silently fall back
    to the default of 0.
    """

    # Which input/output modality this count applies to.
    modality: Literal['MODALITY_UNSPECIFIED', 'TEXT', 'IMAGE', 'VIDEO', 'AUDIO', 'DOCUMENT']
    # Number of tokens attributed to this modality; the API may omit it, hence the default of 0.
    token_count: Annotated[int, pydantic.Field(alias='tokenCount', default=0)]


@pydantic.with_config(pydantic.ConfigDict(populate_by_name=True))
class GeminiUsageMetaData(TypedDict, total=False):
    """See <https://ai.google.dev/api/generate-content#UsageMetadata>.

    The docs suggest all fields are required, but some are actually not required, so we assume they are all optional.

    `populate_by_name=True` lets validation accept either the snake_case field names or the
    camelCase wire aliases.
    """

    # Tokens in the request prompt.
    prompt_token_count: Annotated[int, pydantic.Field(alias='promptTokenCount')]
    # Tokens across all generated candidates.
    candidates_token_count: NotRequired[Annotated[int, pydantic.Field(alias='candidatesTokenCount')]]
    # Total tokens for the request (prompt + candidates + any other billed tokens).
    total_token_count: Annotated[int, pydantic.Field(alias='totalTokenCount')]
    # Tokens served from cached content, when context caching is in use.
    cached_content_token_count: NotRequired[Annotated[int, pydantic.Field(alias='cachedContentTokenCount')]]
    # Tokens spent on internal "thinking" by reasoning models.
    thoughts_token_count: NotRequired[Annotated[int, pydantic.Field(alias='thoughtsTokenCount')]]
    # Prompt tokens attributable to tool use.
    tool_use_prompt_token_count: NotRequired[Annotated[int, pydantic.Field(alias='toolUsePromptTokenCount')]]
    # Per-modality breakdowns of the aggregate counts above.
    prompt_tokens_details: NotRequired[
        Annotated[list[GeminiModalityTokenCount], pydantic.Field(alias='promptTokensDetails')]
    ]
    cache_tokens_details: NotRequired[
        Annotated[list[GeminiModalityTokenCount], pydantic.Field(alias='cacheTokensDetails')]
    ]
    candidates_tokens_details: NotRequired[
        Annotated[list[GeminiModalityTokenCount], pydantic.Field(alias='candidatesTokensDetails')]
    ]
    tool_use_prompt_tokens_details: NotRequired[
        Annotated[list[GeminiModalityTokenCount], pydantic.Field(alias='toolUsePromptTokensDetails')]
    ]


# Module-level TypeAdapter so the validation schema is built once and shared by all callers.
gemini_usage_metadata_ta = pydantic.TypeAdapter(GeminiUsageMetaData)


def parse_usage_details(metadata: 'GeminiUsageMetaData') -> dict[str, int]:
    """Flatten a Gemini usage-metadata payload into a ``{detail_name: token_count}`` dict.

    Args:
        metadata: Usage metadata as returned by the Gemini API (snake_case keys).

    Returns:
        A flat mapping with scalar counts (``thoughts_tokens``, ``tool_use_prompt_tokens``,
        ``cached_content_tokens``) plus one ``<modality>_<category>`` entry per per-modality
        breakdown item, e.g. ``text_prompt_tokens``.
    """
    details: dict[str, int] = {}

    if cached_content_token_count := metadata.get('cached_content_token_count'):
        # 'cached_content_token_count' left for backwards compatibility
        details['cached_content_token_count'] = cached_content_token_count  # pragma: no cover
        details['cached_content_tokens'] = cached_content_token_count  # pragma: no cover

    if thoughts_token_count := metadata.get('thoughts_token_count'):
        details['thoughts_tokens'] = thoughts_token_count

    if tool_use_prompt_token_count := metadata.get('tool_use_prompt_token_count'):
        details['tool_use_prompt_tokens'] = tool_use_prompt_token_count  # pragma: no cover

    # Map each per-modality breakdown list to the suffix used for its flattened keys,
    # e.g. prompt_tokens_details [{'modality': 'TEXT', 'token_count': 7}] -> {'text_prompt_tokens': 7}.
    detailed_keys_map: dict[str, str] = {
        'prompt_tokens_details': 'prompt_tokens',
        'cache_tokens_details': 'cache_tokens',
        'candidates_tokens_details': 'candidates_tokens',
        'tool_use_prompt_tokens_details': 'tool_use_prompt_tokens',
    }
    for key, suffix in detailed_keys_map.items():
        for detail in metadata.get(key, []):
            # `token_count` may be absent on unvalidated payloads (the schema gives it a
            # default of 0), so don't let a missing key raise KeyError.
            details[f'{detail["modality"].lower()}_{suffix}'] = detail.get('token_count', 0)

    return details
22 changes: 4 additions & 18 deletions pydantic_ai_slim/pydantic_ai/models/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
check_allow_model_requests,
get_user_agent,
)
from ._google_common import GeminiUsageMetaData as _GeminiUsageMetaData, parse_usage_details
from ._json_schema import JsonSchema, WalkJsonSchema

LatestGeminiModelNames = Literal[
Expand Down Expand Up @@ -438,13 +439,12 @@ async def _get_gemini_responses(self) -> AsyncIterator[_GeminiResponse]:
responses_to_yield = gemini_responses[:-1]
for r in responses_to_yield[current_gemini_response_index:]:
current_gemini_response_index += 1
self._usage += _metadata_as_usage(r)
yield r

# Now yield the final response, which should be complete
if gemini_responses: # pragma: no branch
r = gemini_responses[-1]
self._usage += _metadata_as_usage(r)
self._usage = _metadata_as_usage(r)
yield r

@property
Expand Down Expand Up @@ -737,30 +737,16 @@ class _GeminiCandidates(TypedDict):
safety_ratings: NotRequired[Annotated[list[_GeminiSafetyRating], pydantic.Field(alias='safetyRatings')]]


class _GeminiUsageMetaData(TypedDict, total=False):
"""See <https://ai.google.dev/api/generate-content#FinishReason>.

The docs suggest all fields are required, but some are actually not required, so we assume they are all optional.
"""

prompt_token_count: Annotated[int, pydantic.Field(alias='promptTokenCount')]
candidates_token_count: NotRequired[Annotated[int, pydantic.Field(alias='candidatesTokenCount')]]
total_token_count: Annotated[int, pydantic.Field(alias='totalTokenCount')]
cached_content_token_count: NotRequired[Annotated[int, pydantic.Field(alias='cachedContentTokenCount')]]


def _metadata_as_usage(response: _GeminiResponse) -> usage.Usage:
metadata = response.get('usage_metadata')
if metadata is None:
return usage.Usage() # pragma: no cover
details: dict[str, int] = {}
if cached_content_token_count := metadata.get('cached_content_token_count'):
details['cached_content_token_count'] = cached_content_token_count # pragma: no cover

return usage.Usage(
request_tokens=metadata.get('prompt_token_count', 0),
response_tokens=metadata.get('candidates_token_count', 0),
total_tokens=metadata.get('total_token_count', 0),
details=details,
details=parse_usage_details(metadata),
)


Expand Down
14 changes: 5 additions & 9 deletions pydantic_ai_slim/pydantic_ai/models/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
check_allow_model_requests,
get_user_agent,
)
from ._google_common import gemini_usage_metadata_ta, parse_usage_details
from ._json_schema import JsonSchema, WalkJsonSchema

try:
Expand Down Expand Up @@ -388,7 +389,7 @@ class GeminiStreamedResponse(StreamedResponse):

async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
async for chunk in self._response:
self._usage += _metadata_as_usage(chunk)
self._usage = _metadata_as_usage(chunk)

assert chunk.candidates is not None
candidate = chunk.candidates[0]
Expand Down Expand Up @@ -471,18 +472,13 @@ def _metadata_as_usage(response: GenerateContentResponse) -> usage.Usage:
metadata = response.usage_metadata
if metadata is None:
return usage.Usage() # pragma: no cover
# TODO(Marcelo): We exclude the `prompt_tokens_details` and `candidate_token_details` fields because on
# `usage.Usage.incr``, it will try to sum non-integer values with integers, which will fail. We should probably
# handle this in the `Usage` class.
details = metadata.model_dump(
exclude={'prompt_tokens_details', 'candidates_tokens_details', 'traffic_type'},
exclude_defaults=True,
)
details = metadata.model_dump(exclude_defaults=True)

return usage.Usage(
request_tokens=details.pop('prompt_token_count', 0),
response_tokens=details.pop('candidates_token_count', 0),
total_tokens=details.pop('total_token_count', 0),
details=details,
details=parse_usage_details(gemini_usage_metadata_ta.validate_python(details)),
)


Expand Down
36 changes: 27 additions & 9 deletions tests/models/test_gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,12 +732,12 @@ async def test_stream_text(get_gemini_client: GetGeminiClient):
'Hello world',
]
)
assert result.usage() == snapshot(Usage(requests=1, request_tokens=2, response_tokens=4, total_tokens=6))
assert result.usage() == snapshot(Usage(requests=1, request_tokens=1, response_tokens=2, total_tokens=3))

async with agent.run_stream('Hello') as result:
chunks = [chunk async for chunk in result.stream_text(delta=True, debounce_by=None)]
assert chunks == snapshot(['Hello ', 'world'])
assert result.usage() == snapshot(Usage(requests=1, request_tokens=2, response_tokens=4, total_tokens=6))
assert result.usage() == snapshot(Usage(requests=1, request_tokens=1, response_tokens=2, total_tokens=3))


async def test_stream_invalid_unicode_text(get_gemini_client: GetGeminiClient):
Expand Down Expand Up @@ -769,7 +769,7 @@ async def test_stream_invalid_unicode_text(get_gemini_client: GetGeminiClient):
async with agent.run_stream('Hello') as result:
chunks = [chunk async for chunk in result.stream(debounce_by=None)]
assert chunks == snapshot(['abc', 'abc€def', 'abc€def'])
assert result.usage() == snapshot(Usage(requests=1, request_tokens=2, response_tokens=4, total_tokens=6))
assert result.usage() == snapshot(Usage(requests=1, request_tokens=1, response_tokens=2, total_tokens=3))


async def test_stream_text_no_data(get_gemini_client: GetGeminiClient):
Expand Down Expand Up @@ -840,7 +840,7 @@ async def bar(y: str) -> str:
async with agent.run_stream('Hello') as result:
response = await result.get_output()
assert response == snapshot((1, 2))
assert result.usage() == snapshot(Usage(requests=2, request_tokens=3, response_tokens=6, total_tokens=9))
assert result.usage() == snapshot(Usage(requests=2, request_tokens=2, response_tokens=4, total_tokens=6))
assert result.all_messages() == snapshot(
[
ModelRequest(parts=[UserPromptPart(content='Hello', timestamp=IsNow(tz=timezone.utc))]),
Expand All @@ -849,7 +849,7 @@ async def bar(y: str) -> str:
ToolCallPart(tool_name='foo', args={'x': 'a'}, tool_call_id=IsStr()),
ToolCallPart(tool_name='bar', args={'y': 'b'}, tool_call_id=IsStr()),
],
usage=Usage(request_tokens=2, response_tokens=4, total_tokens=6),
usage=Usage(request_tokens=1, response_tokens=2, total_tokens=3, details={}),
model_name='gemini-1.5-flash',
timestamp=IsNow(tz=timezone.utc),
),
Expand All @@ -865,7 +865,7 @@ async def bar(y: str) -> str:
),
ModelResponse(
parts=[ToolCallPart(tool_name='final_result', args={'response': [1, 2]}, tool_call_id=IsStr())],
usage=Usage(request_tokens=1, response_tokens=2, total_tokens=3),
usage=Usage(request_tokens=1, response_tokens=2, total_tokens=3, details={}),
model_name='gemini-1.5-flash',
timestamp=IsNow(tz=timezone.utc),
),
Expand Down Expand Up @@ -1096,7 +1096,13 @@ async def get_image() -> BinaryContent:
),
ToolCallPart(tool_name='get_image', args={}, tool_call_id=IsStr()),
],
usage=Usage(requests=1, request_tokens=38, response_tokens=28, total_tokens=427, details={}),
usage=Usage(
requests=1,
request_tokens=38,
response_tokens=28,
total_tokens=427,
details={'thoughts_tokens': 361, 'text_prompt_tokens': 38},
),
model_name='gemini-2.5-pro-preview-03-25',
timestamp=IsDatetime(),
),
Expand All @@ -1119,7 +1125,13 @@ async def get_image() -> BinaryContent:
),
ModelResponse(
parts=[TextPart(content='The image shows a kiwi fruit, sliced in half.')],
usage=Usage(requests=1, request_tokens=360, response_tokens=11, total_tokens=572, details={}),
usage=Usage(
requests=1,
request_tokens=360,
response_tokens=11,
total_tokens=572,
details={'thoughts_tokens': 201, 'text_prompt_tokens': 102, 'image_prompt_tokens': 258},
),
model_name='gemini-2.5-pro-preview-03-25',
timestamp=IsDatetime(),
),
Expand Down Expand Up @@ -1229,7 +1241,13 @@ async def test_gemini_model_instructions(allow_model_requests: None, gemini_api_
),
ModelResponse(
parts=[TextPart(content='The capital of France is Paris.\n')],
usage=Usage(requests=1, request_tokens=13, response_tokens=8, total_tokens=21, details={}),
usage=Usage(
requests=1,
request_tokens=13,
response_tokens=8,
total_tokens=21,
details={'text_prompt_tokens': 13, 'text_candidates_tokens': 8},
),
model_name='gemini-1.5-flash',
timestamp=IsDatetime(),
),
Expand Down
62 changes: 54 additions & 8 deletions tests/models/test_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ async def test_google_model(allow_model_requests: None, google_provider: GoogleP

result = await agent.run('Hello!')
assert result.output == snapshot('Hello there! How can I help you today?\n')
assert result.usage() == snapshot(Usage(requests=1, request_tokens=7, response_tokens=11, total_tokens=18))
assert result.usage() == snapshot(
Usage(
requests=1,
request_tokens=7,
response_tokens=11,
total_tokens=18,
details={'text_prompt_tokens': 7, 'text_candidates_tokens': 11},
)
)
assert result.all_messages() == snapshot(
[
ModelRequest(
Expand All @@ -82,7 +90,13 @@ async def test_google_model(allow_model_requests: None, google_provider: GoogleP
),
ModelResponse(
parts=[TextPart(content='Hello there! How can I help you today?\n')],
usage=Usage(requests=1, request_tokens=7, response_tokens=11, total_tokens=18, details={}),
usage=Usage(
requests=1,
request_tokens=7,
response_tokens=11,
total_tokens=18,
details={'text_prompt_tokens': 7, 'text_candidates_tokens': 11},
),
model_name='gemini-1.5-flash',
timestamp=IsDatetime(),
),
Expand Down Expand Up @@ -114,7 +128,15 @@ async def temperature(city: str, date: datetime.date) -> str:

result = await agent.run('What was the temperature in London 1st January 2022?', output_type=Response)
assert result.output == snapshot({'temperature': '30°C', 'date': datetime.date(2022, 1, 1), 'city': 'London'})
assert result.usage() == snapshot(Usage(requests=2, request_tokens=224, response_tokens=35, total_tokens=259))
assert result.usage() == snapshot(
Usage(
requests=2,
request_tokens=224,
response_tokens=35,
total_tokens=259,
details={'text_prompt_tokens': 224, 'text_candidates_tokens': 35},
)
)
assert result.all_messages() == snapshot(
[
ModelRequest(
Expand All @@ -135,7 +157,13 @@ async def temperature(city: str, date: datetime.date) -> str:
tool_name='temperature', args={'date': '2022-01-01', 'city': 'London'}, tool_call_id=IsStr()
)
],
usage=Usage(requests=1, request_tokens=101, response_tokens=14, total_tokens=115, details={}),
usage=Usage(
requests=1,
request_tokens=101,
response_tokens=14,
total_tokens=115,
details={'text_prompt_tokens': 101, 'text_candidates_tokens': 14},
),
model_name='gemini-1.5-flash',
timestamp=IsDatetime(),
),
Expand All @@ -154,7 +182,13 @@ async def temperature(city: str, date: datetime.date) -> str:
tool_call_id=IsStr(),
)
],
usage=Usage(requests=1, request_tokens=123, response_tokens=21, total_tokens=144, details={}),
usage=Usage(
requests=1,
request_tokens=123,
response_tokens=21,
total_tokens=144,
details={'text_prompt_tokens': 123, 'text_candidates_tokens': 21},
),
model_name='gemini-1.5-flash',
timestamp=IsDatetime(),
),
Expand Down Expand Up @@ -211,7 +245,7 @@ async def get_capital(country: str) -> str:
request_tokens=57,
response_tokens=15,
total_tokens=173,
details={'thoughts_token_count': 101},
details={'thoughts_tokens': 101, 'text_prompt_tokens': 57},
),
model_name='models/gemini-2.5-pro-preview-05-06',
timestamp=IsDatetime(),
Expand All @@ -232,7 +266,13 @@ async def get_capital(country: str) -> str:
content='I am sorry, I cannot fulfill this request. The country you provided is not supported.'
)
],
usage=Usage(requests=1, request_tokens=104, response_tokens=18, total_tokens=122, details={}),
usage=Usage(
requests=1,
request_tokens=104,
response_tokens=18,
total_tokens=122,
details={'text_prompt_tokens': 104},
),
model_name='models/gemini-2.5-pro-preview-05-06',
timestamp=IsDatetime(),
),
Expand Down Expand Up @@ -466,7 +506,13 @@ def instructions() -> str:
),
ModelResponse(
parts=[TextPart(content='The capital of France is Paris.\n')],
usage=Usage(requests=1, request_tokens=13, response_tokens=8, total_tokens=21, details={}),
usage=Usage(
requests=1,
request_tokens=13,
response_tokens=8,
total_tokens=21,
details={'text_prompt_tokens': 13, 'text_candidates_tokens': 8},
),
model_name='gemini-2.0-flash',
timestamp=IsDatetime(),
),
Expand Down