3
3
import time
4
4
from typing import Any
5
5
from typing import Dict
6
+ from typing import List
6
7
from typing import Optional
7
8
from typing import Union
9
+ from typing import Tuple
8
10
9
11
import ddtrace
10
12
from ddtrace import Span
14
16
from ddtrace .ext import SpanTypes
15
17
from ddtrace .internal import atexit
16
18
from ddtrace .internal import forksafe
19
+ from ddtrace .internal import core
17
20
from ddtrace .internal ._rand import rand64bits
18
21
from ddtrace .internal .compat import ensure_text
19
22
from ddtrace .internal .logger import get_logger
45
48
from ddtrace .llmobs ._constants import SPAN_START_WHILE_DISABLED_WARNING
46
49
from ddtrace .llmobs ._constants import TAGS
47
50
from ddtrace .llmobs ._evaluators .runner import EvaluatorRunner
48
- from ddtrace .llmobs ._trace_processor import LLMObsTraceProcessor
49
51
from ddtrace .llmobs ._utils import AnnotationContext
50
52
from ddtrace .llmobs ._utils import _get_llmobs_parent_id
51
53
from ddtrace .llmobs ._utils import _get_ml_app
52
54
from ddtrace .llmobs ._utils import _get_session_id
55
+ from ddtrace .llmobs ._utils import _get_span_name
53
56
from ddtrace .llmobs ._utils import _inject_llmobs_parent_id
54
57
from ddtrace .llmobs ._utils import safe_json
55
58
from ddtrace .llmobs ._utils import validate_prompt
60
63
from ddtrace .llmobs .utils import Messages
61
64
from ddtrace .propagation .http import HTTPPropagator
62
65
66
+ from . import _constants as constants
67
+ from ..constants import ERROR_MSG , ERROR_STACK , ERROR_TYPE
63
68
64
69
log = get_logger (__name__ )
65
70
@@ -81,34 +86,155 @@ class LLMObs(Service):
81
86
def __init__ (self , tracer = None ):
82
87
super (LLMObs , self ).__init__ ()
83
88
self .tracer = tracer or ddtrace .tracer
84
- self ._llmobs_span_writer = None
85
-
86
89
self ._llmobs_span_writer = LLMObsSpanWriter (
87
90
is_agentless = config ._llmobs_agentless_enabled ,
88
91
interval = float (os .getenv ("_DD_LLMOBS_WRITER_INTERVAL" , 1.0 )),
89
92
timeout = float (os .getenv ("_DD_LLMOBS_WRITER_TIMEOUT" , 5.0 )),
90
93
)
91
-
92
94
self ._llmobs_eval_metric_writer = LLMObsEvalMetricWriter (
93
95
site = config ._dd_site ,
94
96
api_key = config ._dd_api_key ,
95
97
interval = float (os .getenv ("_DD_LLMOBS_WRITER_INTERVAL" , 1.0 )),
96
98
timeout = float (os .getenv ("_DD_LLMOBS_WRITER_TIMEOUT" , 5.0 )),
97
99
)
98
-
99
100
self ._evaluator_runner = EvaluatorRunner (
100
101
interval = float (os .getenv ("_DD_LLMOBS_EVALUATOR_INTERVAL" , 1.0 )),
101
102
llmobs_service = self ,
102
103
)
103
104
104
- self ._trace_processor = LLMObsTraceProcessor (self ._llmobs_span_writer , self ._evaluator_runner )
105
105
forksafe .register (self ._child_after_fork )
106
106
107
107
self ._annotations = []
108
108
self ._annotation_context_lock = forksafe .RLock ()
109
- self .tracer .on_start_span (self ._do_annotations )
110
109
111
- def _do_annotations (self , span ):
110
+ # Register hooks for span events
111
+ core .on ("trace.span_start" , self ._do_annotations )
112
+ core .on ("trace.span_finish" , self ._on_span_finish )
113
+
114
+ def _on_span_finish (self , span ):
115
+ if self .enabled and span .span_type == SpanTypes .LLM :
116
+ self ._submit_llmobs_span (span )
117
+
118
+ def _submit_llmobs_span (self , span : Span ) -> None :
119
+ """Generate and submit an LLMObs span event to be sent to LLMObs."""
120
+ span_event = None
121
+ is_llm_span = span ._get_ctx_item (SPAN_KIND ) == "llm"
122
+ is_ragas_integration_span = False
123
+ try :
124
+ span_event , is_ragas_integration_span = self ._llmobs_span_event (span )
125
+ self ._span_writer .enqueue (span_event )
126
+ except (KeyError , TypeError ):
127
+ log .error ("Error generating LLMObs span event for span %s, likely due to malformed span" , span , exc_info = True )
128
+ finally :
129
+ if not span_event or not is_llm_span or is_ragas_integration_span :
130
+ return
131
+ if self ._evaluator_runner :
132
+ self ._evaluator_runner .enqueue (span_event , span )
133
+
134
+ @classmethod
135
+ def _llmobs_span_event (cls , span : Span ) -> Tuple [Dict [str , Any ], bool ]:
136
+ """Span event object structure."""
137
+ span_kind = span ._get_ctx_item (SPAN_KIND )
138
+ if not span_kind :
139
+ raise KeyError ("Span kind not found in span context" )
140
+ meta : Dict [str , Any ] = {"span.kind" : span_kind , "input" : {}, "output" : {}}
141
+ if span_kind in ("llm" , "embedding" ) and span ._get_ctx_item (MODEL_NAME ) is not None :
142
+ meta ["model_name" ] = span ._get_ctx_item (MODEL_NAME )
143
+ meta ["model_provider" ] = (span ._get_ctx_item (MODEL_PROVIDER ) or "custom" ).lower ()
144
+ meta ["metadata" ] = span ._get_ctx_item (METADATA ) or {}
145
+ if span ._get_ctx_item (INPUT_PARAMETERS ):
146
+ meta ["input" ]["parameters" ] = span ._get_ctx_item (INPUT_PARAMETERS )
147
+ if span_kind == "llm" and span ._get_ctx_item (INPUT_MESSAGES ) is not None :
148
+ meta ["input" ]["messages" ] = span ._get_ctx_item (INPUT_MESSAGES )
149
+ if span ._get_ctx_item (INPUT_VALUE ) is not None :
150
+ meta ["input" ]["value" ] = safe_json (span ._get_ctx_item (INPUT_VALUE ))
151
+ if span_kind == "llm" and span ._get_ctx_item (OUTPUT_MESSAGES ) is not None :
152
+ meta ["output" ]["messages" ] = span ._get_ctx_item (OUTPUT_MESSAGES )
153
+ if span_kind == "embedding" and span ._get_ctx_item (INPUT_DOCUMENTS ) is not None :
154
+ meta ["input" ]["documents" ] = span ._get_ctx_item (INPUT_DOCUMENTS )
155
+ if span ._get_ctx_item (OUTPUT_VALUE ) is not None :
156
+ meta ["output" ]["value" ] = safe_json (span ._get_ctx_item (OUTPUT_VALUE ))
157
+ if span_kind == "retrieval" and span ._get_ctx_item (OUTPUT_DOCUMENTS ) is not None :
158
+ meta ["output" ]["documents" ] = span ._get_ctx_item (OUTPUT_DOCUMENTS )
159
+ if span ._get_ctx_item (INPUT_PROMPT ) is not None :
160
+ prompt_json_str = span ._get_ctx_item (INPUT_PROMPT )
161
+ if span_kind != "llm" :
162
+ log .warning (
163
+ "Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds."
164
+ )
165
+ else :
166
+ meta ["input" ]["prompt" ] = prompt_json_str
167
+ if span .error :
168
+ meta .update (
169
+ {
170
+ ERROR_MSG : span .get_tag (ERROR_MSG ),
171
+ ERROR_STACK : span .get_tag (ERROR_STACK ),
172
+ ERROR_TYPE : span .get_tag (ERROR_TYPE ),
173
+ }
174
+ )
175
+ if not meta ["input" ]:
176
+ meta .pop ("input" )
177
+ if not meta ["output" ]:
178
+ meta .pop ("output" )
179
+ metrics = span ._get_ctx_item (METRICS ) or {}
180
+ ml_app = _get_ml_app (span )
181
+
182
+ is_ragas_integration_span = False
183
+
184
+ if ml_app .startswith (constants .RAGAS_ML_APP_PREFIX ):
185
+ is_ragas_integration_span = True
186
+
187
+ span ._set_ctx_item (ML_APP , ml_app )
188
+ parent_id = str (_get_llmobs_parent_id (span ) or "undefined" )
189
+
190
+ llmobs_span_event = {
191
+ "trace_id" : "{:x}" .format (span .trace_id ),
192
+ "span_id" : str (span .span_id ),
193
+ "parent_id" : parent_id ,
194
+ "name" : _get_span_name (span ),
195
+ "start_ns" : span .start_ns ,
196
+ "duration" : span .duration_ns ,
197
+ "status" : "error" if span .error else "ok" ,
198
+ "meta" : meta ,
199
+ "metrics" : metrics ,
200
+ }
201
+ session_id = _get_session_id (span )
202
+ if session_id is not None :
203
+ span ._set_ctx_item (SESSION_ID , session_id )
204
+ llmobs_span_event ["session_id" ] = session_id
205
+
206
+ llmobs_span_event ["tags" ] = cls ._llmobs_tags (
207
+ span , ml_app , session_id , is_ragas_integration_span = is_ragas_integration_span
208
+ )
209
+ return llmobs_span_event , is_ragas_integration_span
210
+
211
+ @staticmethod
212
+ def _llmobs_tags (
213
+ span : Span , ml_app : str , session_id : Optional [str ] = None , is_ragas_integration_span : bool = False
214
+ ) -> List [str ]:
215
+ tags = {
216
+ "version" : config .version or "" ,
217
+ "env" : config .env or "" ,
218
+ "service" : span .service or "" ,
219
+ "source" : "integration" ,
220
+ "ml_app" : ml_app ,
221
+ "ddtrace.version" : ddtrace .__version__ ,
222
+ "language" : "python" ,
223
+ "error" : span .error ,
224
+ }
225
+ err_type = span .get_tag (ERROR_TYPE )
226
+ if err_type :
227
+ tags ["error_type" ] = err_type
228
+ if session_id :
229
+ tags ["session_id" ] = session_id
230
+ if is_ragas_integration_span :
231
+ tags [constants .RUNNER_IS_INTEGRATION_SPAN_TAG ] = "ragas"
232
+ existing_tags = span ._get_ctx_item (TAGS )
233
+ if existing_tags is not None :
234
+ tags .update (existing_tags )
235
+ return ["{}:{}" .format (k , v ) for k , v in tags .items ()]
236
+
237
+ def _do_annotations (self , span : Span ) -> None :
112
238
# get the current span context
113
239
# only do the annotations if it matches the context
114
240
if span .span_type != SpanTypes .LLM : # do this check to avoid the warning log in `annotate`
@@ -120,20 +246,14 @@ def _do_annotations(self, span):
120
246
if current_context_id == context_id :
121
247
self .annotate (span , ** annotation_kwargs )
122
248
123
- def _child_after_fork (self ):
249
+ def _child_after_fork (self ) -> None :
124
250
self ._llmobs_span_writer = self ._llmobs_span_writer .recreate ()
125
251
self ._llmobs_eval_metric_writer = self ._llmobs_eval_metric_writer .recreate ()
126
252
self ._evaluator_runner = self ._evaluator_runner .recreate ()
127
- self ._trace_processor ._span_writer = self ._llmobs_span_writer
128
- self ._trace_processor ._evaluator_runner = self ._evaluator_runner
129
253
if self .enabled :
130
254
self ._start_service ()
131
255
132
256
def _start_service (self ) -> None :
133
- tracer_filters = self .tracer ._filters
134
- if not any (isinstance (tracer_filter , LLMObsTraceProcessor ) for tracer_filter in tracer_filters ):
135
- tracer_filters += [self ._trace_processor ]
136
- self .tracer .configure (settings = {"FILTERS" : tracer_filters })
137
257
try :
138
258
self ._llmobs_span_writer .start ()
139
259
self ._llmobs_eval_metric_writer .start ()
@@ -160,11 +280,7 @@ def _stop_service(self) -> None:
160
280
except ServiceStatusError :
161
281
log .debug ("Error stopping LLMObs writers" )
162
282
163
- try :
164
- forksafe .unregister (self ._child_after_fork )
165
- self .tracer .shutdown ()
166
- except Exception :
167
- log .warning ("Failed to shutdown tracer" , exc_info = True )
283
+ forksafe .unregister (self ._child_after_fork )
168
284
169
285
@classmethod
170
286
def enable (
@@ -265,7 +381,6 @@ def disable(cls) -> None:
265
381
266
382
cls ._instance .stop ()
267
383
cls .enabled = False
268
- cls ._instance .tracer .deregister_on_start_span (cls ._instance ._do_annotations )
269
384
telemetry_writer .product_activated (TELEMETRY_APM_PRODUCT .LLMOBS , False )
270
385
271
386
log .debug ("%s disabled" , cls .__name__ )
0 commit comments