diff --git a/TELEMETRY.md b/TELEMETRY.md index c4148315..4e314541 100644 --- a/TELEMETRY.md +++ b/TELEMETRY.md @@ -134,7 +134,7 @@ Events: `agent_message_in`, `agent_message_out`, `agent_turn_timeout`, `assistant_reply_generation_failed` Spans: `ai.generate_assistant_reply`, `ai.chat_completion`, -`chat.route_thinking`, `ai.invoke_advisor` +`chat.route_thinking`, `ai.invoke_advisor`, `gen_ai.chat` Attributes: `gen_ai.operation.name`, `gen_ai.request.model`, `gen_ai.response.finish_reasons`, `app.ai.outcome`, @@ -153,7 +153,7 @@ Events: `agent_tool_call_failed`, `mcp_tool_call_failed`, Spans: `execute_tool `, `sandbox.acquire`, `sandbox.create`, `sandbox.snapshot.resolve`, `sandbox.sync_skills`, `bash` -Attributes: `gen_ai.tool.name`, `gen_ai.tool.call.id`, `mcp.method.name`, +Attributes: `gen_ai.tool.name`, `gen_ai.tool.call.id`, `gen_ai.tool.call.result`, `mcp.method.name`, `process.executable.name`, `process.exit.code`, `app.sandbox.source`, `app.sandbox.snapshot.resolve_outcome` diff --git a/packages/junior/src/chat/pi/traced-stream.ts b/packages/junior/src/chat/pi/traced-stream.ts new file mode 100644 index 00000000..e6ee7328 --- /dev/null +++ b/packages/junior/src/chat/pi/traced-stream.ts @@ -0,0 +1,128 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { + type Api, + type AssistantMessage, + type Context, + type Model, + streamSimple, +} from "@mariozechner/pi-ai"; +import * as Sentry from "@/chat/sentry"; +import { + extractGenAiUsageAttributes, + getLogContextAttributes, + serializeGenAiAttribute, +} from "@/chat/logging"; +import { GEN_AI_PROVIDER_NAME } from "@/chat/pi/client"; + +// Compose only the OTel GenAI attributes that are knowable at span start +// (request-shape + system instructions). End-of-call attributes such as +// usage and finish reasons are set after the stream resolves. 
+function buildChatStartAttributes(
+  model: Model<Api>,
+  context: Context,
+): Record<string, string> {
+  const attributes: Record<string, string> = {
+    "gen_ai.operation.name": "chat",
+    "gen_ai.provider.name": GEN_AI_PROVIDER_NAME,
+    "gen_ai.request.model": model.id,
+  };
+
+  // Optional attributes are attached only when serialization yields a value.
+  const inputMessages = serializeGenAiAttribute(context.messages);
+  if (inputMessages) {
+    attributes["gen_ai.input.messages"] = inputMessages;
+  }
+
+  if (context.systemPrompt) {
+    const systemInstructions = serializeGenAiAttribute([
+      { type: "text", content: context.systemPrompt },
+    ]);
+    if (systemInstructions) {
+      attributes["gen_ai.system_instructions"] = systemInstructions;
+    }
+  }
+
+  return attributes;
+}
+
+// Composes post-stream attributes for the chat span.
+// Known gap: `gen_ai.response.finish_reasons` emits pi-ai's raw StopReason
+// values (e.g. "toolUse", "aborted") instead of the OTel canonical set
+// ("tool_use", "max_tokens"). Tracked separately, out of scope here.
+function buildChatEndAttributes(
+  message: AssistantMessage,
+): Record<string, string | number | string[]> {
+  const attributes: Record<string, string | number | string[]> = {};
+
+  const outputMessages = serializeGenAiAttribute([message]);
+  if (outputMessages) {
+    attributes["gen_ai.output.messages"] = outputMessages;
+  }
+
+  Object.assign(attributes, extractGenAiUsageAttributes(message));
+
+  if (message.stopReason) {
+    attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
+  }
+
+  if (message.model) {
+    attributes["gen_ai.response.model"] = message.model;
+  }
+
+  return attributes;
+}
+
+// Status code passed to `span.setStatus` for failed LLM calls; the tracing
+// spec requires failed `gen_ai.chat` spans to carry error status (code 2).
+const SPAN_STATUS_ERROR = 2;
+
+/**
+ * Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
+ * loop produces its own `gen_ai.chat` Sentry span. The returned function is
+ * passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
+ *
+ * Span lifecycle:
+ * - started before `base` is called, with request-shape attributes;
+ * - ended exactly once: when the stream's `result()` settles, or immediately
+ *   if `base` throws (the error is rethrown to the caller);
+ * - marked with error status when `base` throws, when `result()` rejects, or
+ *   when `result()` resolves with a message whose `stopReason` is `"error"`.
+ *
+ * @param base - Stream function to wrap. Exists so tests can inject a stub
+ *   stream function; defaults to `streamSimple`.
+ * @returns A `StreamFn` that resolves to the same stream object `base`
+ *   produced.
+ */
+export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
+  return async (model, context, options) => {
+    const span = Sentry.startInactiveSpan({
+      name: `chat ${model.id}`,
+      op: "gen_ai.chat",
+      attributes: {
+        ...getLogContextAttributes(),
+        ...buildChatStartAttributes(model, context),
+      },
+    });
+
+    try {
+      const stream = await Sentry.withActiveSpan(span, () =>
+        Promise.resolve(base(model, context, options)),
+      );
+
+      // Deliberately not awaited: the caller consumes the stream while the
+      // span is finalized in the background once the result settles.
+      stream
+        .result()
+        .then(
+          (finalMessage) => {
+            try {
+              // pi-ai resolves `result()` with the carrier message for
+              // `error` events instead of rejecting, so a failed call lands
+              // in this arm. Flag it before writing attributes so the status
+              // survives an attribute-write failure.
+              if (finalMessage.stopReason === "error") {
+                span.setStatus({
+                  code: SPAN_STATUS_ERROR,
+                  message: "LLM stream failed",
+                });
+              }
+              for (const [key, value] of Object.entries(
+                buildChatEndAttributes(finalMessage),
+              )) {
+                span.setAttribute(key, value);
+              }
+            } finally {
+              span.end();
+            }
+          },
+          () => {
+            try {
+              span.setStatus({
+                code: SPAN_STATUS_ERROR,
+                message: "LLM stream failed",
+              });
+            } finally {
+              span.end();
+            }
+          },
+        )
+        .catch(() => {
+          // Span writes are best-effort; suppress unexpected attribute or
+          // status write errors so they don't surface as unhandled promise
+          // rejections.
+        });
+
+      return stream;
+    } catch (error) {
+      span.setStatus({ code: SPAN_STATUS_ERROR, message: "LLM call failed" });
+      span.end();
+      throw error;
+    }
+  };
+}
diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts
index ac2f44b8..74697df3 100644
--- a/packages/junior/src/chat/respond.ts
+++ b/packages/junior/src/chat/respond.ts
@@ -51,6 +51,7 @@ import {
   resolveGatewayModel,
 } from "@/chat/pi/client";
 import type { PiMessage } from "@/chat/pi/messages";
+import { createTracedStreamFn } from "@/chat/pi/traced-stream";
 import {
   createSandboxExecutor,
   type SandboxAcquiredState,
@@ -819,6 +820,7 @@ export async function generateAssistantReply(
           conversationId: sessionConversationId,
           logContext: spanContext,
           getTools: () => advisorTools,
+          streamFn: createTracedStreamFn(),
         },
       },
     );
@@ -927,6 +929,7 @@ export async function generateAssistantReply(
     // ── Agent execution ──────────────────────────────────────────────
     agent = new Agent({
       getApiKey: () => getPiGatewayApiKeyOverride(),
+      streamFn: createTracedStreamFn(),
       initialState: {
         systemPrompt:
baseInstructions, model: resolveGatewayModel(botConfig.modelId), diff --git a/packages/junior/src/chat/tools/slack/channel-list-messages.ts b/packages/junior/src/chat/tools/slack/channel-list-messages.ts index 09f9e902..7c780b61 100644 --- a/packages/junior/src/chat/tools/slack/channel-list-messages.ts +++ b/packages/junior/src/chat/tools/slack/channel-list-messages.ts @@ -94,13 +94,23 @@ export function createSlackChannelListMessagesTool( throw error; } - return { + const summary = { ok: true, channel_id: targetChannelId, count: result.messages.length, next_cursor: result.nextCursor, messages: result.messages, }; + + return { + content: [{ type: "text" as const, text: JSON.stringify(summary) }], + details: { + ok: true, + channel_id: targetChannelId, + count: result.messages.length, + ...(result.nextCursor ? { next_cursor: result.nextCursor } : {}), + }, + }; }, }); } diff --git a/packages/junior/tests/integration/slack-channel-tools.test.ts b/packages/junior/tests/integration/slack-channel-tools.test.ts index 308482cc..a72a7dbc 100644 --- a/packages/junior/tests/integration/slack-channel-tools.test.ts +++ b/packages/junior/tests/integration/slack-channel-tools.test.ts @@ -143,15 +143,16 @@ describe("slack channel tools", () => { max_pages: 3, }); - expect(result).toMatchObject({ + expect(result.details).toMatchObject({ ok: true, channel_id: "C123", count: 1, - next_cursor: undefined, - }); - expect(result).toMatchObject({ - messages: [{ ts: "1700000000.300", text: "hello", user: "U1" }], }); + expect(result.details).not.toHaveProperty("next_cursor"); + const body = JSON.parse(result.content[0].text); + expect(body.messages).toMatchObject([ + { ts: "1700000000.300", text: "hello", user: "U1" }, + ]); const historyCalls = getCapturedSlackApiCalls("conversations.history"); expect(historyCalls).toHaveLength(1); @@ -213,18 +214,17 @@ describe("slack channel tools", () => { max_pages: 3, }); - expect(result).toMatchObject({ + expect(result.details).toMatchObject({ ok: 
true,
       channel_id: "C123",
       count: 2,
-      next_cursor: undefined,
-    });
-    expect(result).toMatchObject({
-      messages: [
-        { ts: "1700000000.500", text: "page-1", user: "U1" },
-        { ts: "1700000000.501", text: "page-2", user: "U2" },
-      ],
     });
+    expect(result.details).not.toHaveProperty("next_cursor");
+    const body = JSON.parse(result.content[0].text);
+    expect(body.messages).toMatchObject([
+      { ts: "1700000000.500", text: "page-1", user: "U1" },
+      { ts: "1700000000.501", text: "page-2", user: "U2" },
+    ]);
 
     const historyCalls = getCapturedSlackApiCalls("conversations.history");
     expect(historyCalls).toHaveLength(2);
diff --git a/packages/junior/tests/unit/chat/pi/traced-stream.test.ts b/packages/junior/tests/unit/chat/pi/traced-stream.test.ts
new file mode 100644
index 00000000..c4636d5f
--- /dev/null
+++ b/packages/junior/tests/unit/chat/pi/traced-stream.test.ts
@@ -0,0 +1,280 @@
+import type { StreamFn } from "@mariozechner/pi-agent-core";
+import { afterEach, describe, expect, it, vi } from "vitest";
+import {
+  createAssistantMessageEventStream,
+  type AssistantMessage,
+  type Model,
+} from "@mariozechner/pi-ai";
+
+const { startInactiveSpan, withActiveSpan } = vi.hoisted(() => {
+  const span = {
+    setAttribute: vi.fn(),
+    setAttributes: vi.fn(),
+    setStatus: vi.fn(),
+    end: vi.fn(),
+  };
+  return {
+    startInactiveSpan: vi.fn((_options: unknown) => span),
+    withActiveSpan: vi.fn(<T>(_s: unknown, cb: () => T) => cb()),
+  };
+});
+
+vi.mock("@/chat/sentry", () => ({
+  startInactiveSpan,
+  withActiveSpan,
+}));
+
+function fakeModel(id: string): Model<"anthropic-messages"> {
+  return { id } as unknown as Model<"anthropic-messages">;
+}
+
+function fakeMessage(): AssistantMessage {
+  return {
+    role: "assistant",
+    content: [{ type: "text", text: "hi" }],
+    api: "anthropic-messages",
+    provider: "vercel-ai-gateway",
+    model: "openai/gpt-5.4",
+    usage: {
+      input: 100,
+      output: 5,
+      cacheRead: 0,
+      cacheWrite: 0,
+      totalTokens: 105,
+      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+    },
+    stopReason: "stop",
+    timestamp: Date.now(),
+  };
+}
+
+type SpanMock = {
+  setAttribute: ReturnType<typeof vi.fn>;
+  setAttributes: ReturnType<typeof vi.fn>;
+  setStatus: ReturnType<typeof vi.fn>;
+  end: ReturnType<typeof vi.fn>;
+};
+
+function getSpan(): SpanMock {
+  return startInactiveSpan.mock.results[0]!.value as SpanMock;
+}
+
+describe("createTracedStreamFn", () => {
+  afterEach(() => {
+    vi.clearAllMocks();
+    vi.resetModules();
+  });
+
+  it("opens a gen_ai.chat span when invoked", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    const returned = await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    expect(returned).toBe(stream);
+    expect(startInactiveSpan).toHaveBeenCalledTimes(1);
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
+      name: string;
+      op: string;
+    };
+    expect(opts.op).toBe("gen_ai.chat");
+    expect(opts.name).toBe("chat openai/gpt-5.4");
+  });
+
+  it("sets gen_ai.input.messages and gen_ai.system_instructions on the chat span", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      {
+        systemPrompt: "you are junior",
+        messages: [{ role: "user", content: "hello", timestamp: 0 }],
+      },
+      undefined,
+    );
+
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
+      attributes: Record<string, unknown>;
+    };
+    expect(opts.attributes["gen_ai.provider.name"]).toBe("vercel-ai-gateway");
+    expect(typeof opts.attributes["gen_ai.input.messages"]).toBe("string");
+    expect(opts.attributes["gen_ai.input.messages"]).toContain("hello");
+    expect(typeof opts.attributes["gen_ai.system_instructions"]).toBe("string");
+    expect(opts.attributes["gen_ai.system_instructions"]).toContain(
+      "you are junior",
+    );
+    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
+    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
+  });
+
+  it("sets output.messages, usage tokens, finish_reasons, response.model after stream completion", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    const returned = await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    expect(returned).toBe(stream);
+
+    // Resolve the stream's terminal Promise to trigger end-attribute population.
+    const finalMessage = fakeMessage();
+    stream.end(finalMessage);
+    await stream.result();
+    // Allow the .then callback to flush.
+    await new Promise((r) => setImmediate(r));
+
+    const span = getSpan();
+    const endAttributes = Object.fromEntries(
+      span.setAttribute.mock.calls.map((c) => [c[0], c[1]]),
+    );
+    expect(typeof endAttributes["gen_ai.output.messages"]).toBe("string");
+    expect(endAttributes["gen_ai.usage.input_tokens"]).toBe(100);
+    expect(endAttributes["gen_ai.usage.output_tokens"]).toBe(5);
+    expect(endAttributes["gen_ai.response.finish_reasons"]).toEqual(["stop"]);
+    expect(endAttributes["gen_ai.response.model"]).toBe("openai/gpt-5.4");
+    expect(span.end).toHaveBeenCalledTimes(1);
+  });
+
+  it("inherits LogContext attributes (e.g. gen_ai.conversation.id) onto the chat span", async () => {
+    const { withLogContext } = await import("@/chat/logging");
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+
+    await withLogContext(
+      { conversationId: "conv_123", runId: "run_456" },
+      async () => {
+        await traced(
+          fakeModel("openai/gpt-5.4"),
+          { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+          undefined,
+        );
+      },
+    );
+
+    const opts = startInactiveSpan.mock.calls[0]?.[0] as {
+      attributes: Record<string, unknown>;
+    };
+    expect(opts.attributes["gen_ai.conversation.id"]).toBe("conv_123");
+    expect(opts.attributes["app.run.id"]).toBe("run_456");
+    // wrapper-supplied attributes still present
+    expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
+    expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
+  });
+
+  it("ends the span when the stream errors", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    // pi-ai's AssistantMessageEventStream resolves `result()` with the carrier
+    // AssistantMessage on `error` events instead of rejecting, so the wrapper's
+    // `.then` success arm runs on the error path. The load-bearing invariant
+    // is that the span ends exactly once.
+    const errorMessage = { ...fakeMessage(), stopReason: "error" as const };
+    stream.push({ type: "error", reason: "error", error: errorMessage });
+    await stream.result();
+    await new Promise((r) => setImmediate(r));
+
+    const span = getSpan();
+    expect(span.end).toHaveBeenCalledTimes(1);
+    // End attributes are still populated because the success arm runs.
+    const endAttributeKeys = span.setAttribute.mock.calls.map((c) => c[0]);
+    expect(endAttributeKeys).toContain("gen_ai.output.messages");
+  });
+
+  it("sets error status and ends the span when base() throws", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const base = vi.fn(() => {
+      throw new Error("gateway down");
+    });
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await expect(
+      traced(
+        fakeModel("openai/gpt-5.4"),
+        { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+        undefined,
+      ),
+    ).rejects.toThrow("gateway down");
+
+    const span = getSpan();
+    expect(span.setStatus).toHaveBeenCalledWith({
+      code: 2,
+      message: "LLM call failed",
+    });
+    expect(span.end).toHaveBeenCalledTimes(1);
+  });
+
+  it("sets error status and ends the span when stream.result() rejects", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const fakeStream = {
+      result: () => Promise.reject(new Error("stream failure")),
+    };
+    const base = vi.fn(() => fakeStream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    await new Promise((r) => setImmediate(r));
+
+    const span = getSpan();
+    expect(span.setStatus).toHaveBeenCalledWith({
+      code: 2,
+      message: "LLM stream failed",
+    });
+    expect(span.end).toHaveBeenCalledTimes(1);
+  });
+
+  it("ends the span even when setAttribute throws in the success callback", async () => {
+    const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
+    const stream = createAssistantMessageEventStream();
+    const base = vi.fn(() => stream);
+
+    const traced = createTracedStreamFn(base as unknown as StreamFn);
+    await traced(
+      fakeModel("openai/gpt-5.4"),
+      { messages: [{ role: "user", content: "hi", timestamp: 0 }] },
+      undefined,
+    );
+
+    const span = getSpan();
+    // One-shot: the span mock is shared across tests and `vi.clearAllMocks()`
+    // keeps implementations, so a persistent throw would leak into later tests.
+    span.setAttribute.mockImplementationOnce(() => {
+      throw new Error("setAttribute exploded");
+    });
+
+    stream.end(fakeMessage());
+    await stream.result();
+    await new Promise((r) => setImmediate(r));
+
+    expect(span.end).toHaveBeenCalledTimes(1);
+  });
+});
diff --git a/specs/logging/tracing-spec.md b/specs/logging/tracing-spec.md
index 34c018b4..93430aef 100644
--- a/specs/logging/tracing-spec.md
+++ b/specs/logging/tracing-spec.md
@@ -3,7 +3,7 @@
 ## Metadata
 
 - Created: 2026-02-25
-- Last Edited: 2026-05-11
+- Last Edited: 2026-05-21
 
 ## Changelog
 
@@ -14,7 +14,9 @@
 - 2026-04-06: Added official GenAI finish-reasons, system-instructions, and tool-description tracing attributes.
 - 2026-04-28: Added MCP tool-call span attributes from the OpenTelemetry MCP semantic conventions.
 - 2026-05-01: Added `gen_ai.conversation.id` as required correlation context for GenAI spans when available.
+- 2026-05-06: Documented the `gen_ai.invoke_agent` / `gen_ai.chat` span hierarchy rule.
 - 2026-05-11: Aligned exception details and GenAI cache token counters with OpenTelemetry 1.41.0.
+- 2026-05-21: Added error status rule for failed gen_ai.chat spans; extended traced streamFn coverage to the advisor agent loop.
 
 ## Status
 
@@ -184,6 +186,15 @@ semantic conventions:
 - Sandbox spans should be nested under `ai.generate_assistant_reply` when invoked during reply generation.
 - Sandbox execution spans should be nested under the active tool-call/request span context.
 
+### GenAI Span Hierarchy
+
+- A `gen_ai.invoke_agent` span MUST have at least one `gen_ai.chat` child covering the LLM call(s) issued during its agent loop.
+- A `gen_ai.chat` span MAY appear at the top level (as a sibling of `gen_ai.invoke_agent`, or under a non-`gen_ai.*` parent such as `chat.route_thinking`) only when it represents an LLM call that is independent of an agent loop, for example a routing or classification pre-flight. +- Every `gen_ai.chat` span MUST carry `gen_ai.input.messages` and `gen_ai.output.messages`. +- The parent `gen_ai.invoke_agent` MAY also carry `gen_ai.input.messages` / `gen_ai.output.messages` as a high-level rollup; this is optional. +- A `gen_ai.chat` span MUST have its status set to error (code 2) when the underlying LLM call fails — either because `streamFn` itself throws or because the returned stream rejects. +- The per-iteration `gen_ai.chat` child span is created in `packages/junior/src/chat/pi/traced-stream.ts` via the `streamFn` injected into `pi-agent-core`'s `Agent`. This applies to both the main agent and the advisor agent. + ## Rollout Guidance - Start with lifecycle + I/O spans.