Skip to content
119 changes: 119 additions & 0 deletions packages/junior/src/chat/pi/traced-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
type Api,
type AssistantMessage,
type Context,
type Model,
streamSimple,
} from "@mariozechner/pi-ai";
import * as Sentry from "@/chat/sentry";
import {
extractGenAiUsageAttributes,
getLogContextAttributes,
serializeGenAiAttribute,
} from "@/chat/logging";

const PROVIDER_NAME = "vercel-ai-gateway";

// Compose only the OTel GenAI attributes that are knowable at span start
// (request-shape + system instructions). End-of-call attributes such as
// usage and finish reasons are set after the stream resolves.
function buildChatStartAttributes(
model: Model<Api>,
context: Context,
): Record<string, string> {
const attributes: Record<string, string> = {
"gen_ai.operation.name": "chat",
"gen_ai.provider.name": PROVIDER_NAME,
"gen_ai.request.model": model.id,
};

const inputMessages = serializeGenAiAttribute(context.messages);
if (inputMessages) {
attributes["gen_ai.input.messages"] = inputMessages;
}

if (context.systemPrompt) {
const systemInstructions = serializeGenAiAttribute([
{ type: "text", content: context.systemPrompt },
]);
if (systemInstructions) {
attributes["gen_ai.system_instructions"] = systemInstructions;
}
}

return attributes;
}

// Composes post-stream attributes for the chat span. Two deferrals worth
// flagging: `extractGenAiUsageAttributes` currently omits cache token fields,
// and `gen_ai.response.finish_reasons` is emitted with pi-ai's raw StopReason
// (Sentry ingest expects the OTel set). Both are tracked separately and out
// of scope here.
function buildChatEndAttributes(
message: AssistantMessage,
): Record<string, string | string[] | number> {
const attributes: Record<string, string | string[] | number> = {};

const outputMessages = serializeGenAiAttribute([message]);
if (outputMessages) {
attributes["gen_ai.output.messages"] = outputMessages;
}

Object.assign(attributes, extractGenAiUsageAttributes(message));

if (message.stopReason) {
attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
}

if (message.model) {
attributes["gen_ai.response.model"] = message.model;
}

return attributes;
}

/**
* Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
* loop produces its own `gen_ai.chat` Sentry span. The returned function is
* passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
*
* The base argument exists so tests can inject a stub stream function.
*/
export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
return async (model, context, options) => {
const span = Sentry.startInactiveSpan({
name: `chat ${model.id}`,
op: "gen_ai.chat",
attributes: {
...getLogContextAttributes(),
...buildChatStartAttributes(model, context),
},
});

try {
const stream = await Sentry.withActiveSpan(span, () =>
Promise.resolve(base(model, context, options)),
);

stream.result().then(
(finalMessage) => {
for (const [key, value] of Object.entries(
buildChatEndAttributes(finalMessage),
)) {
span.setAttribute(key, value);
}
span.end();
},
() => {
span.end();
},
);
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

return stream;
} catch (error) {
span.end();
throw error;
}
Comment thread
cursor[bot] marked this conversation as resolved.
};
}
2 changes: 2 additions & 0 deletions packages/junior/src/chat/respond.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
resolveGatewayModel,
} from "@/chat/pi/client";
import type { PiMessage } from "@/chat/pi/messages";
import { createTracedStreamFn } from "@/chat/pi/traced-stream";
import {
createSandboxExecutor,
type SandboxAcquiredState,
Expand Down Expand Up @@ -927,6 +928,7 @@ export async function generateAssistantReply(
// ── Agent execution ──────────────────────────────────────────────
agent = new Agent({
getApiKey: () => getPiGatewayApiKeyOverride(),
streamFn: createTracedStreamFn(),
Comment thread
cursor[bot] marked this conversation as resolved.
initialState: {
systemPrompt: baseInstructions,
model: resolveGatewayModel(botConfig.modelId),
Expand Down
12 changes: 11 additions & 1 deletion packages/junior/src/chat/tools/slack/channel-list-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,23 @@ export function createSlackChannelListMessagesTool(
throw error;
}

return {
const summary = {
ok: true,
channel_id: targetChannelId,
count: result.messages.length,
next_cursor: result.nextCursor,
messages: result.messages,
};

return {
content: [{ type: "text" as const, text: JSON.stringify(summary) }],
details: {
ok: true,
channel_id: targetChannelId,
count: result.messages.length,
...(result.nextCursor ? { next_cursor: result.nextCursor } : {}),
},
};
Comment thread
cursor[bot] marked this conversation as resolved.
},
});
}
207 changes: 207 additions & 0 deletions packages/junior/tests/unit/chat/pi/traced-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
createAssistantMessageEventStream,
type AssistantMessage,
type Model,
} from "@mariozechner/pi-ai";

const { startInactiveSpan, withActiveSpan } = vi.hoisted(() => {
const span = {
setAttribute: vi.fn(),
setAttributes: vi.fn(),
setStatus: vi.fn(),
end: vi.fn(),
};
return {
startInactiveSpan: vi.fn((_options: unknown) => span),
withActiveSpan: vi.fn(<T>(_s: unknown, cb: () => T) => cb()),
};
});

vi.mock("@/chat/sentry", () => ({
startInactiveSpan,
withActiveSpan,
}));

function fakeModel(id: string): Model<"anthropic-messages"> {
return { id } as unknown as Model<"anthropic-messages">;
}

function fakeMessage(): AssistantMessage {
return {
role: "assistant",
content: [{ type: "text", text: "hi" }],
api: "anthropic-messages",
provider: "vercel-ai-gateway",
model: "openai/gpt-5.4",
usage: {
input: 100,
output: 5,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 105,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
}

type SpanMock = {
setAttribute: ReturnType<typeof vi.fn>;
setAttributes: ReturnType<typeof vi.fn>;
setStatus: ReturnType<typeof vi.fn>;
end: ReturnType<typeof vi.fn>;
};

function getSpan(): SpanMock {
return startInactiveSpan.mock.results[0]!.value as SpanMock;
}

describe("createTracedStreamFn", () => {
afterEach(() => {
vi.clearAllMocks();
vi.resetModules();
});

it("opens a gen_ai.chat span when invoked", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
const returned = await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

expect(returned).toBe(stream);
expect(startInactiveSpan).toHaveBeenCalledTimes(1);
const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
name: string;
op: string;
};
expect(opts.op).toBe("gen_ai.chat");
expect(opts.name).toBe("chat openai/gpt-5.4");
});

it("sets gen_ai.input.messages and gen_ai.system_instructions on the chat span", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
await traced(
fakeModel("openai/gpt-5.4"),
{
systemPrompt: "you are junior",
messages: [{ role: "user", content: "hello", timestamp: 0 }],
},
undefined,
);

const opts = startInactiveSpan.mock.calls[0]?.[0] as unknown as {
attributes: Record<string, unknown>;
};
expect(opts.attributes["gen_ai.provider.name"]).toBe("vercel-ai-gateway");
expect(typeof opts.attributes["gen_ai.input.messages"]).toBe("string");
expect(opts.attributes["gen_ai.input.messages"]).toContain("hello");
expect(typeof opts.attributes["gen_ai.system_instructions"]).toBe("string");
expect(opts.attributes["gen_ai.system_instructions"]).toContain(
"you are junior",
);
expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
});

it("sets output.messages, usage tokens, finish_reasons, response.model after stream completion", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
const returned = await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

expect(returned).toBe(stream);

// Resolve the stream's terminal Promise to trigger end-attribute population.
const finalMessage = fakeMessage();
stream.end(finalMessage);
await stream.result();
// Allow the .then callback to flush.
await new Promise((r) => setImmediate(r));

const span = getSpan();
const endAttributes = Object.fromEntries(
span.setAttribute.mock.calls.map((c) => [c[0], c[1]]),
);
expect(typeof endAttributes["gen_ai.output.messages"]).toBe("string");
expect(endAttributes["gen_ai.usage.input_tokens"]).toBe(100);
expect(endAttributes["gen_ai.usage.output_tokens"]).toBe(5);
expect(endAttributes["gen_ai.response.finish_reasons"]).toEqual(["stop"]);
expect(endAttributes["gen_ai.response.model"]).toBe("openai/gpt-5.4");
expect(span.end).toHaveBeenCalledTimes(1);
});

it("inherits LogContext attributes (e.g. gen_ai.conversation.id) onto the chat span", async () => {
const { withLogContext } = await import("@/chat/logging");
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);
const traced = createTracedStreamFn(base as unknown as StreamFn);

await withLogContext(
{ conversationId: "conv_123", runId: "run_456" },
async () => {
await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);
},
);

const opts = startInactiveSpan.mock.calls[0]?.[0] as {
attributes: Record<string, unknown>;
};
expect(opts.attributes["gen_ai.conversation.id"]).toBe("conv_123");
expect(opts.attributes["app.run.id"]).toBe("run_456");
// wrapper-supplied attributes still present
expect(opts.attributes["gen_ai.operation.name"]).toBe("chat");
expect(opts.attributes["gen_ai.request.model"]).toBe("openai/gpt-5.4");
});

it("ends the span when the stream errors", async () => {
const { createTracedStreamFn } = await import("@/chat/pi/traced-stream");
const stream = createAssistantMessageEventStream();
const base = vi.fn(() => stream);

const traced = createTracedStreamFn(base as unknown as StreamFn);
await traced(
fakeModel("openai/gpt-5.4"),
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
undefined,
);

// pi-ai's AssistantMessageEventStream resolves `result()` with the carrier
// AssistantMessage on `error` events instead of rejecting, so the wrapper's
// `.then` success arm runs on the error path. The load-bearing invariant
// is that the span ends exactly once.
const errorMessage = { ...fakeMessage(), stopReason: "error" as const };
stream.push({ type: "error", reason: "error", error: errorMessage });
await stream.result();
await new Promise((r) => setImmediate(r));

const span = getSpan();
expect(span.end).toHaveBeenCalledTimes(1);
// End attributes are still populated because the success arm runs.
const endAttributeKeys = span.setAttribute.mock.calls.map((c) => c[0]);
expect(endAttributeKeys).toContain("gen_ai.output.messages");
});
});
9 changes: 9 additions & 0 deletions specs/logging/tracing-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- 2026-04-06: Added official GenAI finish-reasons, system-instructions, and tool-description tracing attributes.
- 2026-04-28: Added MCP tool-call span attributes from the OpenTelemetry MCP semantic conventions.
- 2026-05-01: Added `gen_ai.conversation.id` as required correlation context for GenAI spans when available.
- 2026-05-06: Documented the `gen_ai.invoke_agent` / `gen_ai.chat` span hierarchy rule.
- 2026-05-11: Aligned exception details and GenAI cache token counters with OpenTelemetry 1.41.0.

## Status
Expand Down Expand Up @@ -184,6 +185,14 @@ semantic conventions:
- Sandbox spans should be nested under `ai.generate_assistant_reply` when invoked during reply generation.
- Sandbox execution spans should be nested under the active tool-call/request span context.

### GenAI Span Hierarchy

- A `gen_ai.invoke_agent` span MUST have at least one `gen_ai.chat` child covering the LLM call(s) issued during its agent loop.
- A `gen_ai.chat` span MAY appear at the top level (as a sibling of `gen_ai.invoke_agent`, or under a non-`gen_ai.*` parent such as `chat.route_thinking`) only when it represents an LLM call that is independent of an agent loop, for example a routing or classification pre-flight.
- Every `gen_ai.chat` span MUST carry `gen_ai.input.messages` and `gen_ai.output.messages`.
- The parent `gen_ai.invoke_agent` MAY also carry `gen_ai.input.messages` / `gen_ai.output.messages` as a high-level rollup; this is optional.
- The per-iteration `gen_ai.chat` child span is created in `packages/junior/src/chat/pi/traced-stream.ts` via the `streamFn` injected into `pi-agent-core`'s `Agent`.

## Rollout Guidance

- Start with lifecycle + I/O spans.
Expand Down
Loading