Skip to content

Commit 8fe8a20

Browse files
committed
chore(llmobs): refactor to use span events
The LLMObs service formerly depended on the TraceProcessor interface in the tracer. This was problematic due to sharing a dependency with the public API. As such, users could configure a trace filter (under the hood is a trace processor) and overwrite the LLMObs TraceProcessor. Instead, the tracer can emit span start and finish events which the LLMObs service listens to and acts on, as proposed here. The gotcha is that the LLMObs service no longer has a way to drop traces when run in agentless mode, which only LLMObs supports. Instead, we encourage users to explicitly turn off APM which carries the benefit of clarity since this was implicit before.
1 parent 3b4bd62 commit 8fe8a20

File tree

6 files changed

+144
-285
lines changed

6 files changed

+144
-285
lines changed

ddtrace/_trace/tracer.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from ddtrace.internal.atexit import register_on_exit_signal
4242
from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
4343
from ddtrace.internal.constants import SPAN_API_DATADOG
44+
from ddtrace.internal.core import dispatch
4445
from ddtrace.internal.dogstatsd import get_dogstatsd_client
4546
from ddtrace.internal.logger import get_logger
4647
from ddtrace.internal.peer_service.processor import PeerServiceProcessor
@@ -866,7 +867,7 @@ def _start_span(
866867
for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
867868
p.on_span_start(span)
868869
self._hooks.emit(self.__class__.start_span, span)
869-
870+
dispatch("trace.span_start", (span,))
870871
return span
871872

872873
start_span = _start_span
@@ -883,6 +884,8 @@ def _on_span_finish(self, span: Span) -> None:
883884
for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors):
884885
p.on_span_finish(span)
885886

887+
dispatch("trace.span_finish", (span,))
888+
886889
if log.isEnabledFor(logging.DEBUG):
887890
log.debug("finishing span %s (enabled:%s)", span._pprint(), self.enabled)
888891

ddtrace/llmobs/_llmobs.py

+136-21
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import time
44
from typing import Any
55
from typing import Dict
6+
from typing import List
67
from typing import Optional
78
from typing import Union
9+
from typing import Tuple
810

911
import ddtrace
1012
from ddtrace import Span
@@ -14,6 +16,7 @@
1416
from ddtrace.ext import SpanTypes
1517
from ddtrace.internal import atexit
1618
from ddtrace.internal import forksafe
19+
from ddtrace.internal import core
1720
from ddtrace.internal._rand import rand64bits
1821
from ddtrace.internal.compat import ensure_text
1922
from ddtrace.internal.logger import get_logger
@@ -45,11 +48,11 @@
4548
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
4649
from ddtrace.llmobs._constants import TAGS
4750
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
48-
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
4951
from ddtrace.llmobs._utils import AnnotationContext
5052
from ddtrace.llmobs._utils import _get_llmobs_parent_id
5153
from ddtrace.llmobs._utils import _get_ml_app
5254
from ddtrace.llmobs._utils import _get_session_id
55+
from ddtrace.llmobs._utils import _get_span_name
5356
from ddtrace.llmobs._utils import _inject_llmobs_parent_id
5457
from ddtrace.llmobs._utils import safe_json
5558
from ddtrace.llmobs._utils import validate_prompt
@@ -60,6 +63,8 @@
6063
from ddtrace.llmobs.utils import Messages
6164
from ddtrace.propagation.http import HTTPPropagator
6265

66+
from . import _constants as constants
67+
from ..constants import ERROR_MSG, ERROR_STACK, ERROR_TYPE
6368

6469
log = get_logger(__name__)
6570

@@ -81,34 +86,155 @@ class LLMObs(Service):
8186
def __init__(self, tracer=None):
8287
super(LLMObs, self).__init__()
8388
self.tracer = tracer or ddtrace.tracer
84-
self._llmobs_span_writer = None
85-
8689
self._llmobs_span_writer = LLMObsSpanWriter(
8790
is_agentless=config._llmobs_agentless_enabled,
8891
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
8992
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
9093
)
91-
9294
self._llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
9395
site=config._dd_site,
9496
api_key=config._dd_api_key,
9597
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
9698
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
9799
)
98-
99100
self._evaluator_runner = EvaluatorRunner(
100101
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
101102
llmobs_service=self,
102103
)
103104

104-
self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner)
105105
forksafe.register(self._child_after_fork)
106106

107107
self._annotations = []
108108
self._annotation_context_lock = forksafe.RLock()
109-
self.tracer.on_start_span(self._do_annotations)
110109

111-
def _do_annotations(self, span):
110+
# Register hooks for span events
111+
core.on("trace.span_start", self._do_annotations)
112+
core.on("trace.span_finish", self._on_span_finish)
113+
114+
def _on_span_finish(self, span):
115+
if self.enabled and span.span_type == SpanTypes.LLM:
116+
self._submit_llmobs_span(span)
117+
118+
def _submit_llmobs_span(self, span: Span) -> None:
119+
"""Generate and submit an LLMObs span event to be sent to LLMObs."""
120+
span_event = None
121+
is_llm_span = span._get_ctx_item(SPAN_KIND) == "llm"
122+
is_ragas_integration_span = False
123+
try:
124+
span_event, is_ragas_integration_span = self._llmobs_span_event(span)
125+
self._span_writer.enqueue(span_event)
126+
except (KeyError, TypeError):
127+
log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span, exc_info=True)
128+
finally:
129+
if not span_event or not is_llm_span or is_ragas_integration_span:
130+
return
131+
if self._evaluator_runner:
132+
self._evaluator_runner.enqueue(span_event, span)
133+
134+
@classmethod
135+
def _llmobs_span_event(cls, span: Span) -> Tuple[Dict[str, Any], bool]:
136+
"""Span event object structure."""
137+
span_kind = span._get_ctx_item(SPAN_KIND)
138+
if not span_kind:
139+
raise KeyError("Span kind not found in span context")
140+
meta: Dict[str, Any] = {"span.kind": span_kind, "input": {}, "output": {}}
141+
if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None:
142+
meta["model_name"] = span._get_ctx_item(MODEL_NAME)
143+
meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower()
144+
meta["metadata"] = span._get_ctx_item(METADATA) or {}
145+
if span._get_ctx_item(INPUT_PARAMETERS):
146+
meta["input"]["parameters"] = span._get_ctx_item(INPUT_PARAMETERS)
147+
if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None:
148+
meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES)
149+
if span._get_ctx_item(INPUT_VALUE) is not None:
150+
meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE))
151+
if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None:
152+
meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES)
153+
if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None:
154+
meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS)
155+
if span._get_ctx_item(OUTPUT_VALUE) is not None:
156+
meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE))
157+
if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None:
158+
meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS)
159+
if span._get_ctx_item(INPUT_PROMPT) is not None:
160+
prompt_json_str = span._get_ctx_item(INPUT_PROMPT)
161+
if span_kind != "llm":
162+
log.warning(
163+
"Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds."
164+
)
165+
else:
166+
meta["input"]["prompt"] = prompt_json_str
167+
if span.error:
168+
meta.update(
169+
{
170+
ERROR_MSG: span.get_tag(ERROR_MSG),
171+
ERROR_STACK: span.get_tag(ERROR_STACK),
172+
ERROR_TYPE: span.get_tag(ERROR_TYPE),
173+
}
174+
)
175+
if not meta["input"]:
176+
meta.pop("input")
177+
if not meta["output"]:
178+
meta.pop("output")
179+
metrics = span._get_ctx_item(METRICS) or {}
180+
ml_app = _get_ml_app(span)
181+
182+
is_ragas_integration_span = False
183+
184+
if ml_app.startswith(constants.RAGAS_ML_APP_PREFIX):
185+
is_ragas_integration_span = True
186+
187+
span._set_ctx_item(ML_APP, ml_app)
188+
parent_id = str(_get_llmobs_parent_id(span) or "undefined")
189+
190+
llmobs_span_event = {
191+
"trace_id": "{:x}".format(span.trace_id),
192+
"span_id": str(span.span_id),
193+
"parent_id": parent_id,
194+
"name": _get_span_name(span),
195+
"start_ns": span.start_ns,
196+
"duration": span.duration_ns,
197+
"status": "error" if span.error else "ok",
198+
"meta": meta,
199+
"metrics": metrics,
200+
}
201+
session_id = _get_session_id(span)
202+
if session_id is not None:
203+
span._set_ctx_item(SESSION_ID, session_id)
204+
llmobs_span_event["session_id"] = session_id
205+
206+
llmobs_span_event["tags"] = cls._llmobs_tags(
207+
span, ml_app, session_id, is_ragas_integration_span=is_ragas_integration_span
208+
)
209+
return llmobs_span_event, is_ragas_integration_span
210+
211+
@staticmethod
212+
def _llmobs_tags(
213+
span: Span, ml_app: str, session_id: Optional[str] = None, is_ragas_integration_span: bool = False
214+
) -> List[str]:
215+
tags = {
216+
"version": config.version or "",
217+
"env": config.env or "",
218+
"service": span.service or "",
219+
"source": "integration",
220+
"ml_app": ml_app,
221+
"ddtrace.version": ddtrace.__version__,
222+
"language": "python",
223+
"error": span.error,
224+
}
225+
err_type = span.get_tag(ERROR_TYPE)
226+
if err_type:
227+
tags["error_type"] = err_type
228+
if session_id:
229+
tags["session_id"] = session_id
230+
if is_ragas_integration_span:
231+
tags[constants.RUNNER_IS_INTEGRATION_SPAN_TAG] = "ragas"
232+
existing_tags = span._get_ctx_item(TAGS)
233+
if existing_tags is not None:
234+
tags.update(existing_tags)
235+
return ["{}:{}".format(k, v) for k, v in tags.items()]
236+
237+
def _do_annotations(self, span: Span) -> None:
112238
# get the current span context
113239
# only do the annotations if it matches the context
114240
if span.span_type != SpanTypes.LLM: # do this check to avoid the warning log in `annotate`
@@ -120,20 +246,14 @@ def _do_annotations(self, span):
120246
if current_context_id == context_id:
121247
self.annotate(span, **annotation_kwargs)
122248

123-
def _child_after_fork(self):
249+
def _child_after_fork(self) -> None:
124250
self._llmobs_span_writer = self._llmobs_span_writer.recreate()
125251
self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
126252
self._evaluator_runner = self._evaluator_runner.recreate()
127-
self._trace_processor._span_writer = self._llmobs_span_writer
128-
self._trace_processor._evaluator_runner = self._evaluator_runner
129253
if self.enabled:
130254
self._start_service()
131255

132256
def _start_service(self) -> None:
133-
tracer_filters = self.tracer._filters
134-
if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
135-
tracer_filters += [self._trace_processor]
136-
self.tracer.configure(settings={"FILTERS": tracer_filters})
137257
try:
138258
self._llmobs_span_writer.start()
139259
self._llmobs_eval_metric_writer.start()
@@ -160,11 +280,7 @@ def _stop_service(self) -> None:
160280
except ServiceStatusError:
161281
log.debug("Error stopping LLMObs writers")
162282

163-
try:
164-
forksafe.unregister(self._child_after_fork)
165-
self.tracer.shutdown()
166-
except Exception:
167-
log.warning("Failed to shutdown tracer", exc_info=True)
283+
forksafe.unregister(self._child_after_fork)
168284

169285
@classmethod
170286
def enable(
@@ -265,7 +381,6 @@ def disable(cls) -> None:
265381

266382
cls._instance.stop()
267383
cls.enabled = False
268-
cls._instance.tracer.deregister_on_start_span(cls._instance._do_annotations)
269384
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, False)
270385

271386
log.debug("%s disabled", cls.__name__)

0 commit comments

Comments
 (0)