Skip to content
130 changes: 130 additions & 0 deletions packages/junior/src/chat/pi/traced-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
type Api,
type AssistantMessage,
type Context,
type Model,
streamSimple,
} from "@mariozechner/pi-ai";
import * as Sentry from "@/chat/sentry";
import {
extractGenAiUsageAttributes,
getLogContextAttributes,
serializeGenAiAttribute,
} from "@/chat/logging";

const PROVIDER_NAME = "vercel-ai-gateway";

// Compose only the OTel GenAI attributes that are knowable at span start
// (request-shape + system instructions). End-of-call attributes such as
// usage and finish reasons are set after the stream resolves.
function buildChatStartAttributes(
model: Model<Api>,
context: Context,
): Record<string, string> {
const attributes: Record<string, string> = {
"gen_ai.operation.name": "chat",
"gen_ai.provider.name": PROVIDER_NAME,
"gen_ai.request.model": model.id,
};

const inputMessages = serializeGenAiAttribute(context.messages);
if (inputMessages) {
attributes["gen_ai.input.messages"] = inputMessages;
}

if (context.systemPrompt) {
const systemInstructions = serializeGenAiAttribute([
{ type: "text", content: context.systemPrompt },
]);
if (systemInstructions) {
attributes["gen_ai.system_instructions"] = systemInstructions;
}
}

return attributes;
}

// Composes post-stream attributes for the chat span. Two deferrals worth
// flagging: `extractGenAiUsageAttributes` currently omits cache token fields,
// and `gen_ai.response.finish_reasons` is emitted with pi-ai's raw StopReason
// (Sentry ingest expects the OTel set). Both are tracked separately and out
// of scope here.
function buildChatEndAttributes(
message: AssistantMessage,
): Record<string, string | string[] | number> {
const attributes: Record<string, string | string[] | number> = {};

const outputMessages = serializeGenAiAttribute([message]);
if (outputMessages) {
attributes["gen_ai.output.messages"] = outputMessages;
}

Object.assign(attributes, extractGenAiUsageAttributes(message));

if (message.stopReason) {
attributes["gen_ai.response.finish_reasons"] = [message.stopReason];
}

if (message.model) {
attributes["gen_ai.response.model"] = message.model;
}

return attributes;
}

/**
* Wraps pi-ai's `streamSimple` so each LLM call inside a pi-agent-core agent
* loop produces its own `gen_ai.chat` Sentry span. The returned function is
* passed to `new Agent({ streamFn: ... })` and runs once per loop iteration.
*
* The base argument exists so tests can inject a stub stream function.
*/
export function createTracedStreamFn(base: StreamFn = streamSimple): StreamFn {
return async (model, context, options) => {
const span = Sentry.startInactiveSpan({
name: `chat ${model.id}`,
op: "gen_ai.chat",
attributes: {
...getLogContextAttributes(),
...buildChatStartAttributes(model, context),
},
});

try {
const stream = await Sentry.withActiveSpan(span, () =>
Promise.resolve(base(model, context, options)),
);

stream
.result()
.then(
(finalMessage) => {
try {
for (const [key, value] of Object.entries(
buildChatEndAttributes(finalMessage),
)) {
span.setAttribute(key, value);
}
} finally {
span.end();
}
},
() => {
span.setStatus({ code: 2, message: "LLM stream failed" });
span.end();
},
Comment thread
obostjancic marked this conversation as resolved.
)
.catch(() => {
// setAttribute is best-effort; suppress unexpected attribute-write
// errors so they don't surface as unhandled promise rejections.
});

return stream;
} catch (error) {
span.setStatus({ code: 2, message: "LLM call failed" });
span.end();
throw error;
}
Comment thread
cursor[bot] marked this conversation as resolved.
};
}
3 changes: 3 additions & 0 deletions packages/junior/src/chat/respond.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
resolveGatewayModel,
} from "@/chat/pi/client";
import type { PiMessage } from "@/chat/pi/messages";
import { createTracedStreamFn } from "@/chat/pi/traced-stream";
import {
createSandboxExecutor,
type SandboxAcquiredState,
Expand Down Expand Up @@ -819,6 +820,7 @@ export async function generateAssistantReply(
conversationId: sessionConversationId,
logContext: spanContext,
getTools: () => advisorTools,
streamFn: createTracedStreamFn(),
},
},
);
Expand Down Expand Up @@ -927,6 +929,7 @@ export async function generateAssistantReply(
// ── Agent execution ──────────────────────────────────────────────
agent = new Agent({
getApiKey: () => getPiGatewayApiKeyOverride(),
streamFn: createTracedStreamFn(),
Comment thread
cursor[bot] marked this conversation as resolved.
initialState: {
systemPrompt: baseInstructions,
model: resolveGatewayModel(botConfig.modelId),
Expand Down
12 changes: 11 additions & 1 deletion packages/junior/src/chat/tools/slack/channel-list-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,23 @@ export function createSlackChannelListMessagesTool(
throw error;
}

return {
const summary = {
ok: true,
channel_id: targetChannelId,
count: result.messages.length,
next_cursor: result.nextCursor,
messages: result.messages,
};

return {
content: [{ type: "text" as const, text: JSON.stringify(summary) }],
details: {
ok: true,
channel_id: targetChannelId,
count: result.messages.length,
...(result.nextCursor ? { next_cursor: result.nextCursor } : {}),
},
};
Comment thread
cursor[bot] marked this conversation as resolved.
},
});
}
Loading
Loading