Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(langgraph): submit spans from langgraph to APM and LLMObs #11730

Merged
merged 44 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7c5cdac
init
sabrenner Dec 12, 2024
d00517d
wip
sabrenner Dec 13, 2024
24a9a13
Merge branch 'main' of github.com:DataDog/dd-trace-py into sabrenner/…
sabrenner Dec 13, 2024
263ab81
wip
sabrenner Dec 13, 2024
529502b
move node invokes logic to integration
sabrenner Dec 16, 2024
0be2c82
just handle default span links in langgraph integration for now
sabrenner Dec 16, 2024
f195f6e
input to input linkage
sabrenner Dec 17, 2024
b94a84f
more possible but brittle linking logic
sabrenner Dec 17, 2024
3d545e2
addressing some feedback
sabrenner Dec 17, 2024
6b4b67b
Merge branch 'main' of github.com:DataDog/dd-trace-py into sabrenner/…
sabrenner Dec 17, 2024
e9b43bc
clean up graph, subgraph as a node, and subgraph invoked from node logic
sabrenner Dec 18, 2024
120651c
Refactor for readability, move _node_invokes to integration level
Yun-Kim Dec 20, 2024
757d1d8
Add node dict clearing logic after full graph execution
Yun-Kim Dec 23, 2024
cd5b393
Add async patching, basic patch tests
Yun-Kim Dec 23, 2024
82d879e
Add tests
Yun-Kim Dec 27, 2024
c5d04bf
Add lockfiles, fmt, add suitespec
Yun-Kim Dec 27, 2024
1701b91
Release note
Yun-Kim Dec 27, 2024
bb45036
Docs, remove unnecessary config
Yun-Kim Dec 27, 2024
6385b44
Docs
Yun-Kim Dec 27, 2024
c36ac96
Merge branch 'main' into sabrenner/langgraph
Yun-Kim Dec 27, 2024
280c033
fmt
Yun-Kim Dec 30, 2024
d9c8966
Make span sample rate optional config for init
Yun-Kim Dec 31, 2024
921c0cd
add simple stream patching
sabrenner Jan 3, 2025
81ad3a6
update apm tests to cover all functions
sabrenner Jan 3, 2025
faed9d6
fix rel note and add some llmobs output testing
sabrenner Jan 3, 2025
41d1a6c
fmt
sabrenner Jan 3, 2025
859c966
change to __anext__
sabrenner Jan 7, 2025
4e077ab
Merge branch 'main' into sabrenner/langgraph
sabrenner Jan 7, 2025
22c7d9d
Merge branch 'main' into sabrenner/langgraph
sabrenner Jan 9, 2025
4c63c72
Add lockfile for 3.13
Yun-Kim Jan 9, 2025
74a2e87
Override base patch test due to langgraph import structure
Yun-Kim Jan 9, 2025
ed37f93
Merge branch 'main' of github.com:DataDog/dd-trace-py into sabrenner/…
sabrenner Jan 9, 2025
ff45cef
add span event changes to _llmobs
sabrenner Jan 9, 2025
f1ac1ec
fix llmobs tests
sabrenner Jan 9, 2025
68abdf3
only enabled by explicit env var
sabrenner Jan 9, 2025
cc31619
fix rel note
sabrenner Jan 9, 2025
63f7255
try enabling testagent
sabrenner Jan 10, 2025
0370e96
some review comments
sabrenner Jan 13, 2025
ec996b0
add patching gate via private envvar
sabrenner Jan 14, 2025
d6e5e8e
remove rel note
sabrenner Jan 14, 2025
79e78f8
put env var setting in-code for tests
sabrenner Jan 14, 2025
2621dc1
remove more docs
sabrenner Jan 14, 2025
ebc7128
comments
sabrenner Jan 16, 2025
1a2dab7
Merge branch 'main' into sabrenner/langgraph
sabrenner Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
"unittest": True,
"coverage": False,
"selenium": True,
"langgraph": True,
}


Expand Down Expand Up @@ -146,6 +147,7 @@
"httplib": ("http.client",),
"kafka": ("confluent_kafka",),
"google_generativeai": ("google.generativeai",),
"langgraph": ("langgraph.graph",),
}


Expand Down
153 changes: 153 additions & 0 deletions ddtrace/contrib/internal/langgraph/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import os
import sys

import langgraph

from ddtrace import config
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.contrib.trace_utils import wrap
from ddtrace.internal.utils import get_argument_value
from ddtrace.llmobs._integrations.langgraph import LangGraphIntegration
from ddtrace.pin import Pin


def get_version():
    """Return the ``__version__`` of the imported langgraph module, or "" if it has none."""
    version = getattr(langgraph, "__version__", "")
    return version


# Register the default settings of the langgraph integration.
# Both values can be overridden through environment variables.
config._add(
    "langgraph",
    dict(
        span_prompt_completion_sample_rate=float(
            os.environ.get("DD_LANGGRAPH_SPAN_PROMPT_COMPLETION_SAMPLE_RATE", 1.0)
        ),
        span_char_limit=int(os.environ.get("DD_LANGGRAPH_SPAN_CHAR_LIMIT", 128)),
    ),
)


@with_traced_module
def traced_runnable_callable_invoke(langgraph, pin, func, instance, args, kwargs):
    """
    This function traces specific invocations of a RunnableCallable.
    Importantly, RunnableCallables can be any sort of operation.
    This includes nodes, routing functions, branching operations, and channel writes.
    We are only interested in tracing the initial user-defined node.

    To accomplish this, we mark the config associated with the sequence of nodes as "visited"
    once we trace a non-routing or writing function the first time.

    The "visited" marker is stored in ``config["metadata"]``; the metadata dict is created
    when the config does not carry a usable one, so the bookkeeping itself can never fail
    the user's node.
    """
    node_name = instance.name
    integration: LangGraphIntegration = langgraph._datadog_integration

    config = get_argument_value(args, kwargs, 1, "config")
    metadata = config.get("metadata") if isinstance(config, dict) else None
    if not isinstance(metadata, dict):
        # No config, or a config without a metadata dict: treat as "not visited yet".
        metadata = {}
    if node_name in ("_write", "_route", "_control_branch") or metadata.get("visited", False):
        return func(*args, **kwargs)

    span = integration.trace(
        pin,
        "%s.%s.%s" % (instance.__module__, instance.__class__.__name__, node_name),
        submit_to_llmobs=True,
        interface_type="agent",
    )

    result = None

    try:
        result = func(*args, **kwargs)
    except Exception:
        span.set_exc_info(*sys.exc_info())
        integration.metric(span, "incr", "request.error", 1)
        raise
    else:
        # Only mark the config after a successful run, and outside of the ``try`` body so that
        # a bookkeeping problem is never reported as an error of the user's node.
        # TODO: this needs to be better - we need another way to see if a RunnableCallable
        # is a node vs a routing function.
        if isinstance(config, dict):
            node_metadata = config.get("metadata")
            if not isinstance(node_metadata, dict):
                node_metadata = config["metadata"] = {}
            node_metadata["visited"] = True
    finally:
        # Runs on success and on failure; ``result`` stays None when the node raised.
        integration.llmobs_set_tags(
            span, args=args, kwargs={**kwargs, "name": node_name}, response=result, operation="node"
        )
        span.finish()

    return result


@with_traced_module
def traced_pregel_invoke(langgraph, pin, func, instance, args, kwargs):
    """
    Trace a single ``invoke`` call on a Pregel (CompiledGraph) instance.

    The resulting span is the parent of everything executed by that graph, whether the graph
    runs standalone or as a subgraph embedded in a node of a larger graph.
    Internally the graph keeps calling `tick` until every computed task has completed.
    """
    integration: LangGraphIntegration = langgraph._datadog_integration
    operation_name = "%s.%s.%s" % (instance.__module__, instance.__class__.__name__, instance.name)
    span = integration.trace(pin, operation_name, submit_to_llmobs=True, interface_type="agent")

    output = None

    try:
        output = func(*args, **kwargs)
    except Exception:
        # Record the failure on the span and let the caller see the original exception.
        span.set_exc_info(*sys.exc_info())
        integration.metric(span, "incr", "request.error", 1)
        raise
    finally:
        # Executed for both outcomes; ``output`` is still None if the graph raised.
        tag_kwargs = dict(kwargs, name=instance.name)
        integration.llmobs_set_tags(span, args=args, kwargs=tag_kwargs, response=output, operation="graph")
        span.finish()

    return output


@with_traced_module
def patched_pregel_loop_tick(langgraph, pin, func, instance, args, kwargs):
    """
    Wrap one tick of the pregel loop without creating a span.

    The tasks attached to the loop are captured before and after the wrapped call and handed,
    together with the tick's return value, to `handle_pregel_loop_tick`, which computes span
    links between node invocations (a no-op unless LLM Observability is enabled).
    """
    integration: LangGraphIntegration = langgraph._datadog_integration

    # Tasks present before the tick are the ones that just finished running.
    tasks_before_tick = getattr(instance, "tasks", {})
    tick_result = func(*args, **kwargs)
    # After the tick the loop is expected to expose the next set of tasks.
    tasks_after_tick = getattr(instance, "tasks", {})

    integration.handle_pregel_loop_tick(tasks_before_tick, tasks_after_tick, tick_result)
    return tick_result


def patch():
    """Instrument langgraph; calling it again once patched is a no-op."""
    already_patched = getattr(langgraph, "_datadog_patch", False)
    if already_patched:
        return

    langgraph._datadog_patch = True

    Pin().onto(langgraph)
    langgraph._datadog_integration = LangGraphIntegration(integration_config=config.langgraph)

    # (dotted path inside the langgraph package, wrapper factory) pairs.
    # RunnableSeq.invoke is deliberately not wrapped for now.
    wrap_targets = (
        ("utils.runnable.RunnableCallable.invoke", traced_runnable_callable_invoke),
        ("pregel.Pregel.invoke", traced_pregel_invoke),
        ("pregel.loop.PregelLoop.tick", patched_pregel_loop_tick),
    )
    for target_path, wrapper_factory in wrap_targets:
        wrap("langgraph", target_path, wrapper_factory(langgraph))


def unpatch():
    """Remove the langgraph instrumentation installed by ``patch``; a no-op when not patched."""
    is_patched = getattr(langgraph, "_datadog_patch", False)
    if not is_patched:
        return

    langgraph._datadog_patch = False

    # Restore the original methods in the same order they were wrapped.
    # RunnableSeq.invoke is never wrapped, so there is nothing to restore for it.
    runnable_module = langgraph.utils.runnable
    unwrap(runnable_module.RunnableCallable, "invoke")
    pregel_module = langgraph.pregel
    unwrap(pregel_module.Pregel, "invoke")
    unwrap(pregel_module.loop.PregelLoop, "tick")

    del langgraph._datadog_integration
12 changes: 12 additions & 0 deletions ddtrace/contrib/langgraph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from ddtrace.internal.utils.importlib import require_modules


# Third-party modules that must be importable for this integration to be exposed.
required_modules = ["langgraph"]

# Re-export the patching API only when nothing from `required_modules` is missing.
# NOTE(review): presumably `require_modules` yields the names that could not be found - confirm.
with require_modules(required_modules) as missing_modules:
if not missing_modules:
from ddtrace.contrib.internal.langgraph.patch import get_version
from ddtrace.contrib.internal.langgraph.patch import patch
from ddtrace.contrib.internal.langgraph.patch import unpatch

# Public names of this package.
__all__ = ["patch", "unpatch", "get_version"]
3 changes: 3 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,6 @@
FAITHFULNESS_DISAGREEMENTS_METADATA = "_dd.faithfulness_disagreements"
EVALUATION_KIND_METADATA = "_dd.evaluation_kind"
EVALUATION_SPAN_METADATA = "_dd.evaluation_span"

# Span context key under which the langgraph integration stores a span's list of span links.
SPAN_LINKS = "_ml_obs.span_links"
# Span context key under which the langgraph integration stores the name reported for a span.
NAME = "_ml_obs.name"
185 changes: 185 additions & 0 deletions ddtrace/llmobs/_integrations/langgraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from ddtrace import tracer
from ddtrace.internal.utils import get_argument_value
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import NAME
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_LINKS
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import format_langchain_io
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
from ddtrace.span import Span


# Module-level registry shared by every LangGraphIntegration call, keyed by task id
# (the last ":"-separated segment of a node's `langgraph_checkpoint_ns`, or the key of a queued task).
# Each value is a dict that may hold:
#   "span": {"trace_id": ..., "span_id": ...} of the span that ran the node
#   "from": list of span link dicts pointing at the nodes that triggered it
# NOTE(review): nothing visible in this module removes entries - confirm this cannot grow unbounded.
node_invokes: Dict[str, Any] = {}


class LangGraphIntegration(BaseLLMIntegration):
    """
    LLM Observability integration for LangGraph.

    Annotates graph and node spans with their input, output and name, and computes the span
    links that connect node invocations to each other and to the enclosing graph span.
    Link bookkeeping is shared through the module-level ``node_invokes`` registry.
    """

    _integration_name = "langgraph"

    @staticmethod
    def _parent_id_or_undefined(span: Span) -> str:
        """Return the LLMObs parent id of ``span`` as a string, or "undefined" when there is none."""
        parent_id = _get_llmobs_parent_id(span)
        # ``str(None)`` is the truthy string "None", so the fallback must be decided before converting.
        return str(parent_id) if parent_id else "undefined"

    def _llmobs_set_tags(
        self,
        span: Span,
        args: List[Any],
        kwargs: Dict[str, Any],
        response: Optional[Any] = None,
        operation: str = "",  # oneof graph, node
    ):
        """
        Set the LLM Observability tags of a graph or node span.

        ``args``/``kwargs`` are the arguments of the traced ``invoke`` call (input first, config
        second); a ``name`` entry in ``kwargs`` overrides the span name. ``response`` is the value
        returned by the call, or None if it raised.

        For ``operation == "node"`` the span is additionally registered in ``node_invokes`` and
        receives its incoming span links: the links queued for it by ``handle_pregel_loop_tick``,
        or a default link to its LLMObs parent when none were queued.
        """
        if not self.llmobs_enabled:
            return

        inputs = get_argument_value(args, kwargs, 0, "input")
        span_name = kwargs.get("name", span.name)

        span._set_ctx_items(
            {
                SPAN_KIND: "agent",  # should nodes be workflows? should it be dynamic to if a subgraph is included?
                INPUT_VALUE: format_langchain_io(inputs),
                OUTPUT_VALUE: format_langchain_io(response),
                NAME: span_name,
            }
        )

        if operation != "node":
            return  # we set the graph span links in handle_pregel_loop_tick

        config = get_argument_value(args, kwargs, 1, "config")

        metadata = config.get("metadata") if isinstance(config, dict) else None
        checkpoint_ns = metadata.get("langgraph_checkpoint_ns") if isinstance(metadata, dict) else None
        if not isinstance(checkpoint_ns, str) or not checkpoint_ns:
            # Without a checkpoint namespace the node cannot be identified, so no links can be
            # attributed to it; keep the basic tags set above and skip the link bookkeeping.
            return
        node_instance_id = checkpoint_ns.split(":")[-1]

        node_invoke = node_invokes.setdefault(node_instance_id, {})
        node_invoke["span"] = {
            "trace_id": "{:x}".format(span.trace_id),
            "span_id": str(span.span_id),
        }

        span_links = node_invoke.get("from")
        if span_links is None:
            # we assume no span link means it is the first node of a graph
            span_links = [
                {
                    "span_id": self._parent_id_or_undefined(span),
                    "trace_id": "{:x}".format(span.trace_id),
                    "attributes": {
                        "from": "input",
                        "to": "input",
                    },
                }
            ]

        current_span_links = span._get_ctx_item(SPAN_LINKS) or []
        span._set_ctx_item(SPAN_LINKS, current_span_links + span_links)

    def handle_pregel_loop_tick(self, finished_tasks: dict, next_tasks: dict, more_tasks: bool):
        """
        Handle a specific tick of the pregel loop.
        Specifically, this function computes incoming and outgoing span links between finished tasks
        and queued tasks in the graph.

        Additionally, it sets the span links at the outer ends of the graph, between the span that invokes
        the graph and the last set of nodes before the graph ends.

        ``finished_tasks`` and ``next_tasks`` map task ids to task objects; ``more_tasks`` is the
        return value of the tick and is falsy once the graph has nothing left to run.
        Tasks that were never registered in ``node_invokes`` are skipped instead of raising, because
        this runs inside the user's pregel loop.
        """
        if not self.llmobs_enabled:
            return

        # since we're running the pregel loop, and not in a node, the graph span should be the current span
        graph_span = tracer.current_span()
        graph_caller = _get_nearest_llmobs_ancestor(graph_span) if graph_span else None

        if not more_tasks and graph_span is not None:
            # Last tick: link the output of the final nodes to the output of the graph span.
            span_links = []
            for task_id in finished_tasks:
                finished_span = node_invokes.get(task_id, {}).get("span")
                if not finished_span:
                    continue  # the task was never traced, there is nothing to link to
                span_links.append({**finished_span, "attributes": {"from": "output", "to": "output"}})

            current_span_links = graph_span._get_ctx_item(SPAN_LINKS) or []
            graph_span._set_ctx_item(SPAN_LINKS, current_span_links + span_links)

            if graph_caller is not None:
                # ...and the output of the graph span to the output of whatever invoked the graph.
                current_graph_caller_span_links = graph_caller._get_ctx_item(SPAN_LINKS) or []
                graph_caller_span_links = [
                    {
                        "span_id": str(graph_span.span_id),
                        "trace_id": "{:x}".format(graph_caller.trace_id),
                        "attributes": {
                            "from": "output",
                            "to": "output",
                        },
                    }
                ]
                graph_caller._set_ctx_item(SPAN_LINKS, current_graph_caller_span_links + graph_caller_span_links)

            return

        # first tick of a graph, possibly very brittle logic; this is for subgraph logic
        if not finished_tasks and graph_caller is not None and graph_span is not None:
            current_span_links = graph_span._get_ctx_item(SPAN_LINKS) or []
            graph_span._set_ctx_item(
                SPAN_LINKS,
                current_span_links
                + [
                    {
                        "span_id": self._parent_id_or_undefined(graph_span),
                        "trace_id": "{:x}".format(graph_caller.trace_id),
                        "attributes": {
                            "from": "input",
                            "to": "input",
                        },
                    }
                ],
            )

        parent_node_names_to_ids = {task.name: task_id for task_id, task in finished_tasks.items()}

        # Queue an output -> input link on every upcoming task for each finished node that triggered it.
        for task_id, task in next_tasks.items():
            task_config = getattr(task, "config", None) or {}
            task_metadata = task_config.get("metadata") or {}
            task_triggers = task_metadata.get("langgraph_triggers") or []

            for trigger in task_triggers:
                parent_id = parent_node_names_to_ids.get(extract_parent(trigger), "")
                parent_span = node_invokes.get(parent_id, {}).get("span")
                if not parent_span:
                    continue
                parent_span_link = {
                    **parent_span,
                    "attributes": {
                        "from": "output",
                        "to": "input",
                    },
                }
                node_invokes.setdefault(task_id, {}).setdefault("from", []).append(parent_span_link)


def extract_parent(trigger: str) -> str:
    """
    Return the parent node name encoded in a ":"-separated trigger string.

    Triggers with fewer than three segments (e.g. `parent:child`) yield the first segment;
    triggers with three or more segments (e.g. `branch:parent:routing_logic:child`) yield
    the second segment.

    NOTE(review): a three-segment trigger such as `parent:routing_logic:child` therefore
    yields its second segment - confirm that this is the intended parent.
    """
    segments = trigger.split(":")
    parent_index = 1 if len(segments) >= 3 else 0
    return segments[parent_index]
Loading
Loading