Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.util.context.ContextView;

/**
* Middleware that adds OpenTelemetry tracing to the agent lifecycle.
Expand Down Expand Up @@ -99,11 +100,13 @@ public Flux<AgentEvent> onAgent(
RuntimeContext ctx,
AgentInput input,
Function<AgentInput, Flux<AgentEvent>> next) {
return Flux.defer(
() -> {
return Flux.deferContextual(
ctxView -> {
Context parentContext = resolveOtelContext(ctxView);
Span span =
getTracer()
.spanBuilder("invoke_agent " + agent.getName())
.setParent(parentContext)
.setAttribute("gen_ai.operation.name", "invoke_agent")
.setAttribute("gen_ai.agent.name", agent.getName())
.setAttribute(
Expand All @@ -114,7 +117,7 @@ public Flux<AgentEvent> onAgent(
(long) input.msgs().size())
.startSpan();

Context otelCtx = Context.current().with(span);
Context otelCtx = span.storeInContext(parentContext);
AtomicReference<Boolean> ended = new AtomicReference<>(false);

return ContextPropagationOperator.runWithContext(
Expand Down Expand Up @@ -165,13 +168,15 @@ public Flux<AgentEvent> onModelCall(
RuntimeContext ctx,
ModelCallInput input,
Function<ModelCallInput, Flux<AgentEvent>> next) {
return Flux.defer(
() -> {
return Flux.deferContextual(
ctxView -> {
Context parentContext = resolveOtelContext(ctxView);
Model model = input.model();
String modelName = model != null ? model.getModelName() : "unknown";
Span span =
getTracer()
.spanBuilder("chat " + modelName)
.setParent(parentContext)
.setAttribute("gen_ai.operation.name", "chat")
.setAttribute("gen_ai.request.model", modelName)
.setAttribute(
Expand All @@ -184,7 +189,7 @@ public Flux<AgentEvent> onModelCall(
: 0L)
.startSpan();

Context otelCtx = Context.current().with(span);
Context otelCtx = span.storeInContext(parentContext);
AtomicReference<Boolean> ended = new AtomicReference<>(false);

return ContextPropagationOperator.runWithContext(
Expand Down Expand Up @@ -232,18 +237,21 @@ public Flux<AgentEvent> onActing(
RuntimeContext ctx,
ActingInput input,
Function<ActingInput, Flux<AgentEvent>> next) {
return Flux.defer(
() -> {
return Flux.deferContextual(
ctxView -> {
Context parentContext = resolveOtelContext(ctxView);
String toolNames =
input.toolCalls() != null
? input.toolCalls().stream()
.map(ToolUseBlock::getName)
.collect(Collectors.joining(", "))
: "unknown";
String spanName = buildToolSpanName(input);

Span span =
getTracer()
.spanBuilder("execute_tool " + toolNames)
.spanBuilder("execute_tool " + spanName)
.setParent(parentContext)
.setAttribute("gen_ai.operation.name", "execute_tool")
.setAttribute("gen_ai.tool.name", toolNames)
.setAttribute(
Expand All @@ -253,9 +261,9 @@ public Flux<AgentEvent> onActing(
: 0L)
.startSpan();

Context otelCtx = Context.current().with(span);
Context otelCtx = span.storeInContext(parentContext);
AtomicReference<Boolean> ended = new AtomicReference<>(false);
Set<String> callIds = new LinkedHashSet<>();
Set<String> callIds = ConcurrentHashMap.newKeySet();

return ContextPropagationOperator.runWithContext(
next.apply(input)
Expand Down Expand Up @@ -300,6 +308,26 @@ public Flux<AgentEvent> onActing(
// helpers
// ------------------------------------------------------------------

// Reads OTel Context from Reactor ContextView first; falls back to ThreadLocal
// Context.current()
// so spans created inside a reactive pipeline can find their parent even after a thread hop.
private Context resolveOtelContext(ContextView ctxView) {
return ContextPropagationOperator.getOpenTelemetryContextFromContextView(
ctxView, Context.current());
}

// Uses the first tool name as the span name; appends "(+N more)" for batches to cap
// cardinality.
// Full tool name list is still available in the gen_ai.tool.name attribute.
private static String buildToolSpanName(ActingInput input) {
if (input.toolCalls() == null || input.toolCalls().isEmpty()) {
return "unknown";
}
String first = input.toolCalls().get(0).getName();
int rest = input.toolCalls().size() - 1;
return rest > 0 ? first + " (+" + rest + " more)" : first;
}

private void setToolCallIds(Span span, Set<String> callIds) {
if (!callIds.isEmpty()) {
span.setAttribute("gen_ai.tool.call.id", String.join(",", callIds));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,57 @@ void onActing_aggregatesAllToolCallIds() {
assertTrue(callIds.contains("call-b"));
}

@Test
void onActing_multiTool_spanNameTruncatedButAttributeContainsAll() {
Agent agent = stubAgent("multi-tool-agent2", "agent-104");
ToolUseBlock t1 = ToolUseBlock.builder().id("c1").name("search").input(Map.of()).build();
ToolUseBlock t2 = ToolUseBlock.builder().id("c2").name("calc").input(Map.of()).build();
ToolUseBlock t3 = ToolUseBlock.builder().id("c3").name("fetchUrl").input(Map.of()).build();
ActingInput input = new ActingInput(List.of(t1, t2, t3));

ToolResultEndEvent r1 =
new ToolResultEndEvent("reply-1", "c1", "search", ToolResultState.SUCCESS);
ToolResultEndEvent r2 =
new ToolResultEndEvent("reply-1", "c2", "calc", ToolResultState.SUCCESS);
ToolResultEndEvent r3 =
new ToolResultEndEvent("reply-1", "c3", "fetchUrl", ToolResultState.SUCCESS);

middleware.onActing(agent, null, input, in -> Flux.just(r1, r2, r3)).collectList().block();

List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertEquals(1, spans.size());

SpanData span = spans.get(0);
// span name uses first tool + count suffix to cap cardinality
assertEquals("execute_tool search (+2 more)", span.getName());
// gen_ai.tool.name attribute still has all names
String toolNames =
span.getAttributes()
.get(
io.opentelemetry.api.common.AttributeKey.stringKey(
"gen_ai.tool.name"));
assertNotNull(toolNames);
assertTrue(toolNames.contains("search"));
assertTrue(toolNames.contains("calc"));
assertTrue(toolNames.contains("fetchUrl"));
}

@Test
void onActing_singleTool_spanNameIsToolName() {
Agent agent = stubAgent("single-tool-agent", "agent-105");
ToolUseBlock t1 = ToolUseBlock.builder().id("c1").name("myTool").input(Map.of()).build();
ActingInput input = new ActingInput(List.of(t1));

ToolResultEndEvent r1 =
new ToolResultEndEvent("reply-1", "c1", "myTool", ToolResultState.SUCCESS);

middleware.onActing(agent, null, input, in -> Flux.just(r1)).collectList().block();

List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertEquals(1, spans.size());
assertEquals("execute_tool myTool", spans.get(0).getName());
}

// ------------------------------------------------------------------
// helpers
// ------------------------------------------------------------------
Expand Down
Loading