@@ -231,28 +231,6 @@ def _warn(msg: str):
231231 _warn ._LOGGER .warning (msg ) # pyright: ignore[reportFunctionMemberAccess]
232232
233233
234- def _force_flush_traces ():
235- try :
236- import opentelemetry .trace
237- except (ImportError , AttributeError ):
238- _warn (
239- "Could not force flush traces. opentelemetry-api is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
240- )
241- return None
242-
243- try :
244- import opentelemetry .sdk .trace
245- except (ImportError , AttributeError ):
246- _warn (
247- "Could not force flush traces. opentelemetry-sdk is not installed. Please call 'pip install google-cloud-aiplatform[agent_engines]'."
248- )
249- return None
250-
251- provider = opentelemetry .trace .get_tracer_provider ()
252- if isinstance (provider , opentelemetry .sdk .trace .TracerProvider ):
253- _ = provider .force_flush ()
254-
255-
256234def _default_instrumentor_builder (
257235 project_id : str ,
258236 * ,
@@ -333,23 +311,28 @@ def _detect_cloud_resource_id(project_id: str) -> Optional[str]:
333311
334312 if enable_tracing :
335313 try :
336- import opentelemetry .exporter .otlp .proto .http .trace_exporter
337- import google .auth .transport .requests
314+ import opentelemetry .exporter .cloud_trace
338315 except (ImportError , AttributeError ):
339316 return _warn_missing_dependency (
340- "opentelemetry-exporter-otlp-proto-http" , needed_for_tracing = True
317+ "opentelemetry-exporter-gcp-trace" , needed_for_tracing = True
318+ )
319+
320+ try :
321+ import google .cloud .trace_v2
322+ except (ImportError , AttributeError ):
323+ return _warn_missing_dependency (
324+ "google-cloud-trace" , needed_for_tracing = True
341325 )
342326
343327 import google .auth
344328
345329 credentials , _ = google .auth .default ()
346- span_exporter = (
347- opentelemetry .exporter .otlp .proto .http .trace_exporter .OTLPSpanExporter (
348- session = google .auth .transport .requests .AuthorizedSession (
349- credentials = credentials
350- ),
351- endpoint = "https://telemetry.googleapis.com/v1/traces" ,
352- )
330+ span_exporter = opentelemetry .exporter .cloud_trace .CloudTraceSpanExporter (
331+ project_id = project_id ,
332+ client = google .cloud .trace_v2 .TraceServiceClient (
333+ credentials = credentials .with_quota_project (project_id ),
334+ ),
335+ resource_regex = "|" .join (resource .attributes .keys ()),
353336 )
354337 span_processor = opentelemetry .sdk .trace .export .BatchSpanProcessor (
355338 span_exporter = span_exporter ,
@@ -712,17 +695,54 @@ def set_up(self):
712695 else :
713696 os .environ ["ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS" ] = "false"
714697
715- enable_logging = bool (self ._telemetry_enabled ())
698+ GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
699+ "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
700+ )
701+
702+ def telemetry_enabled () -> Optional [bool ]:
703+ return (
704+ os .getenv (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY , "0" ).lower ()
705+ in ("true" , "1" )
706+ if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in os .environ
707+ else None
708+ )
709+
710+ # Tracing enablement follows truth table:
711+ def tracing_enabled () -> bool :
712+ """Tracing enablement follows true table:
713+
714+ | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
715+ |----------------|-----------------------|--------------------------|
716+ | false | false | false |
717+ | false | true | false |
718+ | false | None | false |
719+ | true | false | false |
720+ | true | true | true |
721+ | true | None | true |
722+ | None(default) | false | false |
723+ | None(default) | true | adk_version >= 1.17 |
724+ | None(default) | None | false |
725+ """
726+ enable_tracing : Optional [bool ] = self ._tmpl_attrs .get ("enable_tracing" )
727+ enable_telemetry : Optional [bool ] = telemetry_enabled ()
728+
729+ return (enable_tracing is True and enable_telemetry is not False ) or (
730+ enable_tracing is None
731+ and enable_telemetry is True
732+ and is_version_sufficient ("1.17.0" )
733+ )
734+
735+ enable_logging = bool (telemetry_enabled ())
716736
717737 custom_instrumentor = self ._tmpl_attrs .get ("instrumentor_builder" )
718738
719- if custom_instrumentor and self . _tracing_enabled ():
739+ if custom_instrumentor and tracing_enabled ():
720740 self ._tmpl_attrs ["instrumentor" ] = custom_instrumentor (project )
721741
722742 if not custom_instrumentor :
723743 self ._tmpl_attrs ["instrumentor" ] = _default_instrumentor_builder (
724744 project ,
725- enable_tracing = self . _tracing_enabled (),
745+ enable_tracing = tracing_enabled (),
726746 enable_logging = enable_logging ,
727747 )
728748
@@ -894,14 +914,9 @@ async def async_stream_query(
894914 ** kwargs ,
895915 )
896916
897- try :
898- async for event in events_async :
899- # Yield the event data as a dictionary
900- yield _utils .dump_event_for_json (event )
901- finally :
902- # Avoid trace data loss having to do with CPU throttling on instance turndown
903- if self ._tracing_enabled ():
904- _ = await asyncio .to_thread (_force_flush_traces )
917+ async for event in events_async :
918+ # Yield the event data as a dictionary
919+ yield _utils .dump_event_for_json (event )
905920
906921 def stream_query (
907922 self ,
@@ -1053,9 +1068,6 @@ async def streaming_agent_run_with_events(self, request_json: str):
10531068 user_id = request .user_id ,
10541069 session_id = session .id ,
10551070 )
1056- # Avoid trace data loss having to do with CPU throttling on instance turndown
1057- if self ._tracing_enabled ():
1058- _ = await asyncio .to_thread (_force_flush_traces )
10591071
10601072 async def async_get_session (
10611073 self ,
@@ -1438,52 +1450,3 @@ def register_operations(self) -> Dict[str, List[str]]:
14381450 "streaming_agent_run_with_events" ,
14391451 ],
14401452 }
1441-
1442- def _telemetry_enabled (self ) -> Optional [bool ]:
1443- """Return status of telemetry enablement depending on enablement env variable.
1444-
1445- In detail:
1446- - Logging is always enabled when telemetry is enabled.
1447- - Tracing is enabled depending on the truth table seen in `_tracing_enabled` method, in order to not break existing user enablement.
1448-
1449- Returns:
1450- True if telemetry is enabled, False if telemetry is disabled, or None
1451- if telemetry enablement is not set (i.e. old deployments which don't support this env variable).
1452- """
1453- import os
1454-
1455- GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
1456- "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
1457- )
1458-
1459- return (
1460- os .getenv (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY , "0" ).lower ()
1461- in ("true" , "1" )
1462- if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in os .environ
1463- else None
1464- )
1465-
1466- # Tracing enablement follows truth table:
1467- def _tracing_enabled (self ) -> bool :
1468- """Tracing enablement follows true table:
1469-
1470- | enable_tracing | enable_telemetry(env) | tracing_actually_enabled |
1471- |----------------|-----------------------|--------------------------|
1472- | false | false | false |
1473- | false | true | false |
1474- | false | None | false |
1475- | true | false | false |
1476- | true | true | true |
1477- | true | None | true |
1478- | None(default) | false | false |
1479- | None(default) | true | adk_version >= 1.17 |
1480- | None(default) | None | false |
1481- """
1482- enable_tracing : Optional [bool ] = self ._tmpl_attrs .get ("enable_tracing" )
1483- enable_telemetry : Optional [bool ] = self ._telemetry_enabled ()
1484-
1485- return (enable_tracing is True and enable_telemetry is not False ) or (
1486- enable_tracing is None
1487- and enable_telemetry is True
1488- and is_version_sufficient ("1.17.0" )
1489- )
0 commit comments