From 3537d2a19304cfe31f0c464ae3f1f8009898d594 Mon Sep 17 00:00:00 2001 From: Omri SirComp Date: Wed, 20 May 2026 22:10:56 +0300 Subject: [PATCH 1/4] fix(junior): keep resume diagnostics cumulative Track cumulative duration and token usage across paused turn checkpoints, then use those totals when rendering timeout-resume Slack footers. Timeout continuation notices also now use the normal Slack footer shape so the conversation ID stays traceable. Co-Authored-By: GPT-5 Codex --- packages/junior/src/chat/respond.ts | 41 ++++---- .../junior/src/chat/runtime/reply-executor.ts | 29 +++++- .../junior/src/chat/runtime/slack-resume.ts | 46 ++++++++- .../src/chat/services/turn-checkpoint.ts | 48 ++++++++++ .../src/chat/state/turn-session-store.ts | 51 ++++++++++ packages/junior/src/chat/usage.ts | 28 ++++++ .../tests/unit/handlers/oauth-resume.test.ts | 96 +++++++++++++++++++ .../unit/services/turn-checkpoint.test.ts | 51 ++++++++++ 8 files changed, 364 insertions(+), 26 deletions(-) diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index e40f9a54..42acf466 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -81,7 +81,7 @@ import { toAgentThinkingLevel, type TurnThinkingSelection, } from "@/chat/services/turn-thinking-level"; -import type { AgentTurnUsage } from "@/chat/usage"; +import { hasAgentTurnUsage, type AgentTurnUsage } from "@/chat/usage"; import { loadTurnCheckpoint, persistCompletedCheckpoint, @@ -185,6 +185,16 @@ function trimRouterAttachmentText(text: string): string { : `${normalized.slice(0, MAX_ROUTER_ATTACHMENT_PREVIEW_CHARS)}...`; } +function extractSliceUsage( + messages: PiMessage[], + beforeMessageCount: number, +): AgentTurnUsage | undefined { + const usage = extractGenAiUsageSummary( + ...messages.slice(beforeMessageCount).filter(isAssistantMessage), + ); + return hasAgentTurnUsage(usage) ? usage : undefined; +} + function supportsRouterTextPreview(mediaType: string): boolean { const baseMediaType = mediaType.split(";", 1)[0]?.trim().toLowerCase(); if (!baseMediaType) { @@ -1044,9 +1054,7 @@ export async function generateAssistantReply( agent.state, ...outputMessages, ); - turnUsage = Object.values(usageSummary).some( - (value) => value !== undefined, - ) + turnUsage = hasAgentTurnUsage(usageSummary) ? usageSummary : undefined; setSpanAttributes({ @@ -1082,6 +1090,8 @@ export async function generateAssistantReply( ) { await persistCompletedCheckpoint({ conversationId: sessionConversationId, + currentDurationMs: Date.now() - replyStartedAtMs, + currentUsage: turnUsage, sessionId, sliceId: currentSliceId, allMessages: agent.state.messages, @@ -1114,10 +1124,15 @@ export async function generateAssistantReply( }); } catch (error) { if (timedOut && timeoutResumeConversationId && timeoutResumeSessionId) { + turnUsage = + turnUsage ?? + extractSliceUsage(timeoutResumeMessages, beforeMessageCount); const checkpoint = await persistTimeoutCheckpoint({ conversationId: timeoutResumeConversationId, sessionId: timeoutResumeSessionId, currentSliceId: timeoutResumeSliceId, + currentDurationMs: Date.now() - replyStartedAtMs, + currentUsage: turnUsage, messages: timeoutResumeMessages, loadedSkillNames: loadedSkillNamesForResume, errorMessage: error instanceof Error ? error.message : String(error), @@ -1151,25 +1166,17 @@ export async function generateAssistantReply( timeoutResumeSessionId ) { if (!turnUsage && timeoutResumeMessages.length > 0) { - // Match the canonical slice-scoped extraction: sum usage from new - // assistant messages produced during this slice, not the full - // message history (which may include prior slices whose usage was - // already reported in earlier footers). - const fallbackUsage = extractGenAiUsageSummary( - ...timeoutResumeMessages - .slice(beforeMessageCount) - .filter(isAssistantMessage), + turnUsage = extractSliceUsage( + timeoutResumeMessages, + beforeMessageCount, ); - turnUsage = Object.values(fallbackUsage).some( - (value) => value !== undefined, - ) - ? fallbackUsage - : undefined; } const nextSliceId = await persistAuthPauseCheckpoint({ conversationId: timeoutResumeConversationId, sessionId: timeoutResumeSessionId, currentSliceId: timeoutResumeSliceId, + currentDurationMs: Date.now() - replyStartedAtMs, + currentUsage: turnUsage, messages: timeoutResumeMessages, loadedSkillNames: loadedSkillNamesForResume, errorMessage: error.message, diff --git a/packages/junior/src/chat/runtime/reply-executor.ts b/packages/junior/src/chat/runtime/reply-executor.ts index da874a80..eb27ec42 100644 --- a/packages/junior/src/chat/runtime/reply-executor.ts +++ b/packages/junior/src/chat/runtime/reply-executor.ts @@ -49,7 +49,10 @@ import { isVisionEnabled, } from "@/chat/services/vision-context"; import { createSlackAdapterAssistantStatusSession } from "@/chat/slack/assistant-thread/status"; -import { buildSlackReplyFooter } from "@/chat/slack/footer"; +import { + buildSlackReplyBlocks, + buildSlackReplyFooter, +} from "@/chat/slack/footer"; import { maybeUpdateAssistantTitle } from "@/chat/slack/assistant-thread/title"; import { appendSlackLegacyAttachmentText } from "@/chat/slack/legacy-attachments"; import { type ThreadArtifactsState } from "@/chat/state/artifacts"; @@ -61,7 +64,7 @@ import { buildDeterministicTurnId } from "@/chat/runtime/turn"; import { markTurnCompleted, markTurnFailed } from "@/chat/runtime/turn"; import { startActiveTurn } from "@/chat/runtime/turn"; import { isRedundantReactionAckText } from "@/chat/services/reply-delivery-plan"; -import { deleteSlackMessage } from "@/chat/slack/outbound"; +import { deleteSlackMessage, postSlackMessage } from "@/chat/slack/outbound"; import { finalizeFailedTurnReply, getAgentTurnDiagnosticsAttributes, @@ -232,9 +235,25 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { const postTurnContinuationNotice = async (): Promise => { try { await beforeFirstResponsePost(); - await thread.post( - buildSlackOutputMessage(buildTurnContinuationResponse()), - ); + const text = buildTurnContinuationResponse(); + const footer = buildSlackReplyFooter({ conversationId }); + const shouldUseSlackFooter = + Boolean(footer) && + Boolean(channelId && threadTs) && + (thread.adapter as { name?: string } | undefined)?.name === + "slack"; + if (shouldUseSlackFooter && channelId && threadTs) { + const blocks = buildSlackReplyBlocks(text, footer); + await postSlackMessage({ + channelId, + threadTs, + text, + ...(blocks ? { blocks } : {}), + }); + return; + } + + await thread.post(buildSlackOutputMessage(text)); } catch (error) { logException( error, diff --git a/packages/junior/src/chat/runtime/slack-resume.ts b/packages/junior/src/chat/runtime/slack-resume.ts index 27c29731..b7d5ba28 100644 --- a/packages/junior/src/chat/runtime/slack-resume.ts +++ b/packages/junior/src/chat/runtime/slack-resume.ts @@ -21,13 +21,18 @@ import { createSlackWebApiAssistantStatusSession, type AssistantStatusSession, } from "@/chat/slack/assistant-thread/status"; -import { buildSlackReplyFooter } from "@/chat/slack/footer"; +import { + buildSlackReplyBlocks, + buildSlackReplyFooter, +} from "@/chat/slack/footer"; import { planSlackReplyPosts, postSlackApiReplyPosts, } from "@/chat/slack/reply"; import { postSlackMessage as postSlackApiMessage } from "@/chat/slack/outbound"; import { getStateAdapter } from "@/chat/state/adapter"; +import { getAgentTurnSessionCheckpoint } from "@/chat/state/turn-session-store"; +import { addAgentTurnUsage } from "@/chat/usage"; import { startSlackProcessingReactionForMessage, type ProcessingReactionSession, @@ -172,11 +177,18 @@ async function postTurnContinuationNoticeBestEffort(args: { lockKey: string; resumeArgs: ResumeSlackTurnArgs; }): Promise { + const text = buildTurnContinuationResponse(); + const footer = buildSlackReplyFooter({ + conversationId: + args.resumeArgs.replyContext?.correlation?.conversationId ?? args.lockKey, + }); + const blocks = buildSlackReplyBlocks(text, footer); try { await postSlackApiMessage({ channelId: args.resumeArgs.channelId, threadTs: args.resumeArgs.threadTs, - text: buildTurnContinuationResponse(), + text, + ...(blocks ? { blocks } : {}), }); } catch (error) { logException( @@ -258,6 +270,19 @@ function createResumeReplyContext( }; } +async function getResumeCheckpoint(args: { + conversationId?: string; + sessionId?: string; +}) { + if (!args.conversationId || !args.sessionId) { + return undefined; + } + return await getAgentTurnSessionCheckpoint( + args.conversationId, + args.sessionId, + ); +} + /** * Resume a paused Slack turn under the normal thread lock. * @@ -311,6 +336,10 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) { const generateReply = args.generateReply ?? generateAssistantReply; const replyContext = createResumeReplyContext(args, status); + const priorCheckpoint = await getResumeCheckpoint({ + conversationId: replyContext.correlation?.conversationId, + sessionId: replyContext.correlation?.turnId, + }); const replyPromise = generateReply(args.messageText, replyContext); const replyTimeoutMs = resolveReplyTimeoutMs(args.replyTimeoutMs); let reply = @@ -339,9 +368,18 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) { await status.stop(); const footer = buildSlackReplyFooter({ conversationId: args.replyContext?.correlation?.conversationId ?? lockKey, - durationMs: reply.diagnostics.durationMs, + durationMs: + typeof priorCheckpoint?.cumulativeDurationMs === "number" || + typeof reply.diagnostics.durationMs === "number" + ? (priorCheckpoint?.cumulativeDurationMs ?? 0) + + (reply.diagnostics.durationMs ?? 0) + : undefined, thinkingLevel: reply.diagnostics.thinkingLevel, - usage: reply.diagnostics.usage, + usage: + addAgentTurnUsage( + priorCheckpoint?.cumulativeUsage, + reply.diagnostics.usage, + ) ?? reply.diagnostics.usage, }); await postSlackApiReplyPosts({ channelId: args.channelId, diff --git a/packages/junior/src/chat/services/turn-checkpoint.ts b/packages/junior/src/chat/services/turn-checkpoint.ts index 6b3f42d2..21e02581 100644 --- a/packages/junior/src/chat/services/turn-checkpoint.ts +++ b/packages/junior/src/chat/services/turn-checkpoint.ts @@ -6,6 +6,7 @@ import { import { logException } from "@/chat/logging"; import type { PiMessage } from "@/chat/pi/messages"; import { trimTrailingAssistantMessages } from "@/chat/respond-helpers"; +import { addAgentTurnUsage, type AgentTurnUsage } from "@/chat/usage"; export interface TurnCheckpointContext { conversationId?: string; @@ -19,6 +20,19 @@ export interface TurnCheckpointState { existingCheckpoint?: AgentTurnSessionCheckpoint; } +function addDurationMs( + prior: number | undefined, + current: number | undefined, +): number | undefined { + const total = [prior, current].reduce((sum, value) => { + if (typeof value !== "number" || !Number.isFinite(value)) { + return sum; + } + return (sum ?? 0) + Math.max(0, Math.floor(value)); + }, undefined); + return total; +} + /** Load turn checkpoint state for a conversation/session pair. */ export async function loadTurnCheckpoint( ctx: TurnCheckpointContext, @@ -46,13 +60,27 @@ export async function loadTurnCheckpoint( /** Persist a completed turn checkpoint. */ export async function persistCompletedCheckpoint(args: { conversationId: string; + currentDurationMs?: number; + currentUsage?: AgentTurnUsage; sessionId: string; sliceId: number; allMessages: PiMessage[]; loadedSkillNames: string[]; }): Promise { + const latestCheckpoint = await getAgentTurnSessionCheckpoint( + args.conversationId, + args.sessionId, + ); await upsertAgentTurnSessionCheckpoint({ conversationId: args.conversationId, + cumulativeDurationMs: addDurationMs( + latestCheckpoint?.cumulativeDurationMs, + args.currentDurationMs, + ), + cumulativeUsage: addAgentTurnUsage( + latestCheckpoint?.cumulativeUsage, + args.currentUsage, + ), sessionId: args.sessionId, sliceId: args.sliceId, state: "completed", @@ -69,6 +97,8 @@ export async function persistAuthPauseCheckpoint(args: { conversationId: string; sessionId: string; currentSliceId: number; + currentDurationMs?: number; + currentUsage?: AgentTurnUsage; messages: PiMessage[]; loadedSkillNames: string[]; errorMessage: string; @@ -94,6 +124,14 @@ export async function persistAuthPauseCheckpoint(args: { ); await upsertAgentTurnSessionCheckpoint({ conversationId: args.conversationId, + cumulativeDurationMs: addDurationMs( + latestCheckpoint?.cumulativeDurationMs, + args.currentDurationMs, + ), + cumulativeUsage: addAgentTurnUsage( + latestCheckpoint?.cumulativeUsage, + args.currentUsage, + ), sessionId: args.sessionId, sliceId: nextSliceId, state: "awaiting_resume", @@ -135,6 +173,8 @@ export async function persistTimeoutCheckpoint(args: { conversationId: string; sessionId: string; currentSliceId: number; + currentDurationMs?: number; + currentUsage?: AgentTurnUsage; messages: PiMessage[]; loadedSkillNames: string[]; errorMessage: string; @@ -161,6 +201,14 @@ export async function persistTimeoutCheckpoint(args: { ); return await upsertAgentTurnSessionCheckpoint({ conversationId: args.conversationId, + cumulativeDurationMs: addDurationMs( + latestCheckpoint?.cumulativeDurationMs, + args.currentDurationMs, + ), + cumulativeUsage: addAgentTurnUsage( + latestCheckpoint?.cumulativeUsage, + args.currentUsage, + ), sessionId: args.sessionId, sliceId: nextSliceId, state: "awaiting_resume", diff --git a/packages/junior/src/chat/state/turn-session-store.ts b/packages/junior/src/chat/state/turn-session-store.ts index 11f87eec..e936fd8d 100644 --- a/packages/junior/src/chat/state/turn-session-store.ts +++ b/packages/junior/src/chat/state/turn-session-store.ts @@ -1,5 +1,6 @@ import { isRecord } from "@/chat/coerce"; import type { PiMessage } from "@/chat/pi/messages"; +import type { AgentTurnUsage } from "@/chat/usage"; import { getStateAdapter } from "./adapter"; const AGENT_TURN_SESSION_PREFIX = "junior:agent_turn_session"; @@ -17,6 +18,8 @@ export type AgentTurnResumeReason = "timeout" | "auth"; export interface AgentTurnSessionCheckpoint { checkpointVersion: number; conversationId: string; + cumulativeDurationMs?: number; + cumulativeUsage?: AgentTurnUsage; errorMessage?: string; loadedSkillNames?: string[]; piMessages: PiMessage[]; @@ -35,6 +38,34 @@ function agentTurnSessionKey( return `${AGENT_TURN_SESSION_PREFIX}:${conversationId}:${sessionId}`; } +function toFiniteNonNegativeNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) + ? Math.max(0, Math.floor(value)) + : undefined; +} + +function parseAgentTurnUsage(value: unknown): AgentTurnUsage | undefined { + if (!isRecord(value)) { + return undefined; + } + + const usage: AgentTurnUsage = {}; + for (const field of [ + "inputTokens", + "outputTokens", + "cachedInputTokens", + "cacheCreationTokens", + "totalTokens", + ] as const) { + const count = toFiniteNonNegativeNumber(value[field]); + if (count !== undefined) { + usage[field] = count; + } + } + + return Object.keys(usage).length > 0 ? usage : undefined; +} + function parseAgentTurnSessionCheckpoint( value: unknown, ): AgentTurnSessionCheckpoint | undefined { @@ -64,6 +95,10 @@ function parseAgentTurnSessionCheckpoint( const sliceId = parsed.sliceId; const checkpointVersion = parsed.checkpointVersion; const updatedAtMs = parsed.updatedAtMs; + const cumulativeDurationMs = toFiniteNonNegativeNumber( + parsed.cumulativeDurationMs, + ); + const cumulativeUsage = parseAgentTurnUsage(parsed.cumulativeUsage); if ( typeof conversationId !== "string" || typeof sessionId !== "string" || @@ -81,6 +116,8 @@ function parseAgentTurnSessionCheckpoint( sliceId, state: status, updatedAtMs, + ...(cumulativeDurationMs !== undefined ? { cumulativeDurationMs } : {}), + ...(cumulativeUsage ? { cumulativeUsage } : {}), piMessages: Array.isArray(parsed.piMessages) ? (parsed.piMessages as PiMessage[]) : [], @@ -120,6 +157,8 @@ export async function getAgentTurnSessionCheckpoint( export async function upsertAgentTurnSessionCheckpoint(args: { conversationId: string; + cumulativeDurationMs?: number; + cumulativeUsage?: AgentTurnUsage; sessionId: string; sliceId: number; state: AgentTurnSessionStatus; @@ -145,6 +184,16 @@ export async function upsertAgentTurnSessionCheckpoint(args: { state: args.state, updatedAtMs: Date.now(), piMessages: Array.isArray(args.piMessages) ? args.piMessages : [], + ...(typeof args.cumulativeDurationMs === "number" && + Number.isFinite(args.cumulativeDurationMs) + ? { + cumulativeDurationMs: Math.max( + 0, + Math.floor(args.cumulativeDurationMs), + ), + } + : {}), + ...(args.cumulativeUsage ? { cumulativeUsage: args.cumulativeUsage } : {}), ...(Array.isArray(args.loadedSkillNames) ? { loadedSkillNames: args.loadedSkillNames.filter( @@ -192,6 +241,8 @@ export async function supersedeAgentTurnSessionCheckpoint(args: { sliceId: existing.sliceId, state: "superseded", piMessages: existing.piMessages, + cumulativeDurationMs: existing.cumulativeDurationMs, + cumulativeUsage: existing.cumulativeUsage, loadedSkillNames: existing.loadedSkillNames, resumeReason: existing.resumeReason, resumedFromSliceId: existing.resumedFromSliceId, diff --git a/packages/junior/src/chat/usage.ts b/packages/junior/src/chat/usage.ts index c707dacb..53448d2a 100644 --- a/packages/junior/src/chat/usage.ts +++ b/packages/junior/src/chat/usage.ts @@ -18,3 +18,31 @@ export interface AgentTurnUsage { /** Provider-reported total. May not equal the sum of individual counters across providers. */ totalTokens?: number; } + +/** Return whether any token counter is present on a usage record. */ +export function hasAgentTurnUsage( + usage: AgentTurnUsage | undefined, +): usage is AgentTurnUsage { + return Boolean( + usage && + Object.values(usage).some( + (value) => typeof value === "number" && Number.isFinite(value), + ), + ); +} + +/** Sum token counters across turn slices while preserving absent fields. */ +export function addAgentTurnUsage( + ...usages: Array +): AgentTurnUsage | undefined { + const total: AgentTurnUsage = {}; + for (const usage of usages) { + if (!usage) continue; + for (const field of Object.keys(usage) as (keyof AgentTurnUsage)[]) { + const value = usage[field]; + if (typeof value !== "number" || !Number.isFinite(value)) continue; + total[field] = (total[field] ?? 0) + value; + } + } + return hasAgentTurnUsage(total) ? total : undefined; +} diff --git a/packages/junior/tests/unit/handlers/oauth-resume.test.ts b/packages/junior/tests/unit/handlers/oauth-resume.test.ts index 39350f08..bf0a3223 100644 --- a/packages/junior/tests/unit/handlers/oauth-resume.test.ts +++ b/packages/junior/tests/unit/handlers/oauth-resume.test.ts @@ -182,6 +182,21 @@ describe("resumeAuthorizedRequest", () => { expect(onTimeoutPause).toHaveBeenCalledTimes(1); expect(postMessageMock).toHaveBeenCalledWith( expect.objectContaining({ + blocks: [ + { + type: "markdown", + text: buildTurnContinuationResponse(), + }, + { + type: "context", + elements: [ + { + type: "mrkdwn", + text: "*ID:* slack:C-test:1700000000.0002", + }, + ], + }, + ], channel: "C-test", thread_ts: "1700000000.0002", text: buildTurnContinuationResponse(), @@ -189,6 +204,87 @@ describe("resumeAuthorizedRequest", () => { ); }); + it("uses cumulative checkpoint diagnostics for timeout resume footers", async () => { + const { upsertAgentTurnSessionCheckpoint } = + await import("@/chat/state/turn-session-store"); + + await upsertAgentTurnSessionCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 2, + state: "awaiting_resume", + piMessages: [], + resumeReason: "timeout", + cumulativeDurationMs: 1_000, + cumulativeUsage: { + inputTokens: 2, + outputTokens: 3, + }, + }); + + await resumeSlackTurn({ + messageText: "continue this turn", + channelId: "C-test", + threadTs: "1700000000.0005", + lockKey: "slack:C-test:1700000000.0005", + replyContext: { + requester: { userId: "U-test" }, + correlation: { + conversationId: "conversation-1", + turnId: "turn-1", + }, + }, + generateReply: async () => + ({ + text: "done", + diagnostics: { + assistantMessageCount: 1, + durationMs: 500, + modelId: "fake-agent-model", + outcome: "success", + toolCalls: [], + toolErrorCount: 0, + toolResultCount: 0, + usage: { + outputTokens: 7, + }, + usedPrimaryText: true, + }, + }) as any, + }); + + expect(postMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "C-test", + thread_ts: "1700000000.0005", + text: "done", + blocks: [ + { + type: "markdown", + text: "done", + }, + { + type: "context", + elements: expect.arrayContaining([ + { + type: "mrkdwn", + text: "*ID:* conversation-1", + }, + { + type: "mrkdwn", + text: "*Tokens:* 12", + }, + { + type: "mrkdwn", + text: "*Time:* 1.5s", + }, + ]), + }, + ], + }), + ); + }); + it("posts the canonical failure response when timeout pause handling throws", async () => { const onFailure = vi.fn(async () => undefined); diff --git a/packages/junior/tests/unit/services/turn-checkpoint.test.ts b/packages/junior/tests/unit/services/turn-checkpoint.test.ts index 268fc253..238da5cd 100644 --- a/packages/junior/tests/unit/services/turn-checkpoint.test.ts +++ b/packages/junior/tests/unit/services/turn-checkpoint.test.ts @@ -97,4 +97,55 @@ describe("persistAuthPauseCheckpoint", () => { piMessages: [priorMessages[0]], }); }); + + it("carries cumulative diagnostics across pause checkpoints", async () => { + const { persistTimeoutCheckpoint } = + await import("@/chat/services/turn-checkpoint"); + const { getAgentTurnSessionCheckpoint, upsertAgentTurnSessionCheckpoint } = + await import("@/chat/state/turn-session-store"); + + await upsertAgentTurnSessionCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + state: "awaiting_resume", + piMessages: [], + resumeReason: "timeout", + cumulativeDurationMs: 1_500, + cumulativeUsage: { + inputTokens: 10, + outputTokens: 3, + }, + }); + + await persistTimeoutCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + currentSliceId: 1, + currentDurationMs: 2_250, + currentUsage: { + outputTokens: 7, + cachedInputTokens: 2, + }, + messages: [], + loadedSkillNames: [], + errorMessage: "timed out again", + logContext: { + modelId: "test-model", + }, + }); + + const checkpoint = await getAgentTurnSessionCheckpoint( + "conversation-1", + "turn-1", + ); + expect(checkpoint).toMatchObject({ + cumulativeDurationMs: 3_750, + cumulativeUsage: { + inputTokens: 10, + outputTokens: 10, + cachedInputTokens: 2, + }, + }); + }); }); From 9423a749ba45e86a6e3ec97db11bdaeea43aef05 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Wed, 20 May 2026 20:23:50 -0700 Subject: [PATCH 2/4] fix(junior): Recover from interrupted continuation work Persist safe running checkpoints at Pi-continuable boundaries so interrupted turns retain a previous resumable state without checkpointing mid-tool-call. Treat interrupted sandbox command streams as failed bash results that the agent can inspect or retry, and add eval fault injection to prove the recovery path. Supersedes GH-385 Co-Authored-By: GPT-5 Codex --- packages/junior-evals/README.md | 1 + .../junior-evals/evals/behavior-harness.ts | 17 +++ .../core/lifecycle-and-resilience.eval.ts | 33 +++++ .../resilient-working-directory/SKILL.md | 28 ++++ packages/junior/src/chat/prompt.ts | 2 +- packages/junior/src/chat/respond.ts | 48 ++++++- .../junior/src/chat/runtime/reply-executor.ts | 18 +-- .../junior/src/chat/runtime/slack-resume.ts | 14 +- packages/junior/src/chat/sandbox/errors.ts | 18 +++ .../src/chat/sandbox/fault-injection.ts | 22 +++ packages/junior/src/chat/sandbox/session.ts | 26 ++++ .../src/chat/services/turn-checkpoint.ts | 67 ++++++++- .../chat/slack/turn-continuation-notice.ts | 24 ++++ packages/junior/src/chat/usage.ts | 57 +++++++- .../junior/src/handlers/oauth-callback.ts | 2 + .../integration/oauth-resume-slack.test.ts | 77 ++++++++++ .../integration/slack/bot-handlers.test.ts | 77 ++++++++++ .../integration/turn-resume-slack.test.ts | 119 ++++++++++++++++ .../tests/unit/handlers/oauth-resume.test.ts | 98 ------------- .../tests/unit/misc/sandbox-executor.test.ts | 83 +++++++++++ .../unit/services/turn-checkpoint.test.ts | 132 ++++++++++++++++++ packages/junior/tests/unit/usage.test.ts | 29 ++++ specs/slack-agent-delivery-spec.md | 6 +- 23 files changed, 862 insertions(+), 136 deletions(-) create mode 100644 packages/junior-evals/evals/fixtures/skills/resilient-working-directory/SKILL.md create mode 100644 packages/junior/src/chat/sandbox/fault-injection.ts create mode 100644 packages/junior/src/chat/slack/turn-continuation-notice.ts create mode 100644 packages/junior/tests/unit/usage.test.ts diff --git a/packages/junior-evals/README.md b/packages/junior-evals/README.md index f0b900ed..e31cb5f0 100644 --- a/packages/junior-evals/README.md +++ b/packages/junior-evals/README.md @@ -67,6 +67,7 @@ Harness override knobs (in `EvalOverrides`): - `auto_complete_mcp_oauth`: after our app genuinely starts an MCP OAuth flow for the listed providers, the harness immediately completes the fake provider callback. - `auto_complete_oauth`: after our app genuinely starts a generic OAuth flow for the listed providers, the harness immediately completes the fake provider callback. - `fail_reply_call`: force a non-retryable reply failure on a specific call. +- `faults.sandbox_bash_stream_interrupts`: inject a fixed number of eval-only sandbox bash stream interruptions so the real agent must recover from failed command results. - `mock_image_generation`: stub the image-generation HTTP response with a valid image payload while still exercising the real attachment path. - `plugin_dirs`: load plugin fixtures from eval-local directories without adding workspace packages. - `reply_texts`: override returned reply text per call. diff --git a/packages/junior-evals/evals/behavior-harness.ts b/packages/junior-evals/evals/behavior-harness.ts index e5b6625c..0e8e2478 100644 --- a/packages/junior-evals/evals/behavior-harness.ts +++ b/packages/junior-evals/evals/behavior-harness.ts @@ -130,6 +130,9 @@ export interface EvalOverrides { auto_complete_oauth?: string[]; enable_test_credentials?: boolean; fail_reply_call?: number; + faults?: { + sandbox_bash_stream_interrupts?: number; + }; mock_image_generation?: boolean; plugin_dirs?: string[]; plugin_packages?: string[]; @@ -397,6 +400,8 @@ const HARNESS_ENV_KEYS = [ "EVAL_TEST_CREDENTIAL_TOKEN", "JUNIOR_BASE_URL", "JUNIOR_EXTRA_PLUGIN_ROOTS", + "JUNIOR_EVAL_ENABLE_FAULTS", + "JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS", "JUNIOR_STATE_ADAPTER", "SLACK_BOT_TOKEN", ] as const; @@ -942,6 +947,18 @@ async function setupHarnessEnvironment( scenario.overrides.test_credential_token; } } + const sandboxBashStreamInterrupts = + scenario.overrides?.faults?.sandbox_bash_stream_interrupts; + if ( + typeof sandboxBashStreamInterrupts === "number" && + Number.isFinite(sandboxBashStreamInterrupts) && + sandboxBashStreamInterrupts > 0 + ) { + process.env.JUNIOR_EVAL_ENABLE_FAULTS = "1"; + process.env.JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS = String( + Math.floor(sandboxBashStreamInterrupts), + ); + } process.env.JUNIOR_BASE_URL = "https://junior.example.com"; process.env.JUNIOR_STATE_ADAPTER = "memory"; process.env.JUNIOR_EXTRA_PLUGIN_ROOTS = JSON.stringify(configuredPluginDirs); diff --git a/packages/junior-evals/evals/core/lifecycle-and-resilience.eval.ts b/packages/junior-evals/evals/core/lifecycle-and-resilience.eval.ts index f30a9b56..f46f9ce3 100644 --- a/packages/junior-evals/evals/core/lifecycle-and-resilience.eval.ts +++ b/packages/junior-evals/evals/core/lifecycle-and-resilience.eval.ts @@ -68,4 +68,37 @@ describeEval("Lifecycle and Resilience", slackEvals, (it) => { }), }); }); + + it("when a sandbox command stream is interrupted, recover and finish the request", async ({ + run, + }) => { + await run({ + overrides: { + faults: { + sandbox_bash_stream_interrupts: 1, + }, + skill_dirs: ["evals/fixtures/skills"], + }, + events: [ + mention( + "/resilient-working-directory list files in the working directory", + ), + ], + taskTimeout: 120_000, + criteria: rubric({ + contract: + "A transient sandbox command-stream interruption is treated as recoverable tool output, not a terminal assistant failure.", + pass: [ + "observed_tool_invocations includes at least two `bash` calls, showing the agent retried after the injected interruption.", + "assistant_posts contains exactly one final reply.", + "The reply includes `Working directory files:` and a fenced list of files from the successful retry.", + ], + fail: [ + "Do not post a generic assistant failure reply.", + "Do not stop after reporting only the injected stream interruption.", + "Do not mention Sentry event IDs, stack traces, or provider internals.", + ], + }), + }); + }); }); diff --git a/packages/junior-evals/evals/fixtures/skills/resilient-working-directory/SKILL.md b/packages/junior-evals/evals/fixtures/skills/resilient-working-directory/SKILL.md new file mode 100644 index 00000000..830955c0 --- /dev/null +++ b/packages/junior-evals/evals/fixtures/skills/resilient-working-directory/SKILL.md @@ -0,0 +1,28 @@ +--- +name: resilient-working-directory +description: Use for /resilient-working-directory eval requests that verify command interruption recovery. +allowed-tools: bash +--- + +Generate a short response for `/resilient-working-directory` requests in eval runs. + +## Step 1: List Files + +Call `bash` with this input: + +```json +{ "command": "ls -1", "timeout_ms": 120000, "max_output_chars": 12000 } +``` + +## Step 2: Recover Once + +If the command result has `ok: false` and `stderr` says the command stream ended before the command finished, call the same `bash` command one more time. + +## Step 3: Return Result + +- If the final command result has `ok: true`, return markdown with: + - `Working directory files:` + - a fenced code block containing `stdout` +- If the final command result has `ok: false`, return markdown with: + - `Working directory files: unavailable` + - `Error:` and `stderr` diff --git a/packages/junior/src/chat/prompt.ts b/packages/junior/src/chat/prompt.ts index f3c54c2c..3be8d953 100644 --- a/packages/junior/src/chat/prompt.ts +++ b/packages/junior/src/chat/prompt.ts @@ -438,7 +438,7 @@ const EXECUTION_CONTRACT_RULES = [ const CONVERSATION_RULES = [ "- In thread follow-ups, answer from prior thread context; do not repeat resolved clarifying questions.", "- Preserve attribution roles from thread context: the requester is the person asking now, which may differ from the original reporter or subject.", - "- On resumed turns, post a brief continuation notice, then the resumed answer as a separate message.", + "- Runtime owns continuation and authorization notices; on resumed turns, answer with the final requested content only.", ]; const SLACK_ACTION_RULES = [ diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index 42acf466..18f7668a 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -86,6 +86,7 @@ import { loadTurnCheckpoint, persistCompletedCheckpoint, persistAuthPauseCheckpoint, + persistRunningCheckpoint, persistTimeoutCheckpoint, } from "@/chat/services/turn-checkpoint"; import { createMcpAuthOrchestration } from "@/chat/services/mcp-auth-orchestration"; @@ -927,8 +928,38 @@ export async function generateAssistantReply( }); let hasEmittedText = false; let needsSeparator = false; + const persistSafeBoundary = async ( + messages: PiMessage[], + ): Promise => { + if ( + !checkpointState.canUseTurnSession || + !sessionConversationId || + !sessionId + ) { + return; + } + + await persistRunningCheckpoint({ + conversationId: sessionConversationId, + sessionId, + sliceId: currentSliceId, + messages, + loadedSkillNames: loadedSkillNamesForResume, + logContext: { + threadId: context.correlation?.threadId, + requesterId: context.correlation?.requesterId, + channelId: context.correlation?.channelId, + runId: context.correlation?.runId, + assistantUserName: botConfig.userName, + modelId: botConfig.modelId, + }, + }); + }; const unsubscribe = agent.subscribe((event) => { + if (event.type === "turn_end" && event.toolResults.length > 0) { + return persistSafeBoundary([...agent!.state.messages]); + } if (event.type === "message_start") { Promise.resolve(context.onAssistantMessageStart?.()).catch((error) => { logWarn( @@ -987,13 +1018,20 @@ export async function generateAssistantReply( spanContext, async () => { let promptResult: unknown; + const freshPromptMessage: PiMessage = { + role: "user", + content: promptContentParts, + timestamp: Date.now(), + } as PiMessage; + if (!resumedFromCheckpoint) { + await persistSafeBoundary([ + ...agent.state.messages, + freshPromptMessage, + ]); + } const promptPromise = resumedFromCheckpoint ? agent.continue() - : agent.prompt({ - role: "user", - content: promptContentParts, - timestamp: Date.now(), - }); + : agent.prompt(freshPromptMessage); let timeoutId: ReturnType | undefined; const timeoutPromise = new Promise((_, reject) => { diff --git a/packages/junior/src/chat/runtime/reply-executor.ts b/packages/junior/src/chat/runtime/reply-executor.ts index eb27ec42..57455bd5 100644 --- a/packages/junior/src/chat/runtime/reply-executor.ts +++ b/packages/junior/src/chat/runtime/reply-executor.ts @@ -49,10 +49,7 @@ import { isVisionEnabled, } from "@/chat/services/vision-context"; import { createSlackAdapterAssistantStatusSession } from "@/chat/slack/assistant-thread/status"; -import { - buildSlackReplyBlocks, - buildSlackReplyFooter, -} from "@/chat/slack/footer"; +import { buildSlackReplyFooter } from "@/chat/slack/footer"; import { maybeUpdateAssistantTitle } from "@/chat/slack/assistant-thread/title"; import { appendSlackLegacyAttachmentText } from "@/chat/slack/legacy-attachments"; import { type ThreadArtifactsState } from "@/chat/state/artifacts"; @@ -69,7 +66,7 @@ import { finalizeFailedTurnReply, getAgentTurnDiagnosticsAttributes, } from "@/chat/services/turn-failure-response"; -import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; +import { buildSlackTurnContinuationNotice } from "@/chat/slack/turn-continuation-notice"; import { buildAuthPauseResponse } from "@/chat/services/auth-pause-response"; import { maybeApplyProviderDefaultConfigRequest } from "@/chat/services/provider-default-config"; @@ -235,25 +232,22 @@ export function createReplyToThread(deps: ReplyExecutorDeps) { const postTurnContinuationNotice = async (): Promise => { try { await beforeFirstResponsePost(); - const text = buildTurnContinuationResponse(); - const footer = buildSlackReplyFooter({ conversationId }); + const notice = buildSlackTurnContinuationNotice({ conversationId }); const shouldUseSlackFooter = - Boolean(footer) && + Boolean(notice.blocks?.length) && Boolean(channelId && threadTs) && (thread.adapter as { name?: string } | undefined)?.name === "slack"; if (shouldUseSlackFooter && channelId && threadTs) { - const blocks = buildSlackReplyBlocks(text, footer); await postSlackMessage({ channelId, threadTs, - text, - ...(blocks ? { blocks } : {}), + ...notice, }); return; } - await thread.post(buildSlackOutputMessage(text)); + await thread.post(buildSlackOutputMessage(notice.text)); } catch (error) { logException( error, diff --git a/packages/junior/src/chat/runtime/slack-resume.ts b/packages/junior/src/chat/runtime/slack-resume.ts index b7d5ba28..ec5632a1 100644 --- a/packages/junior/src/chat/runtime/slack-resume.ts +++ b/packages/junior/src/chat/runtime/slack-resume.ts @@ -15,21 +15,18 @@ import { finalizeFailedTurnReply, requireTurnFailureEventId, } from "@/chat/services/turn-failure-response"; -import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; import { persistThreadStateById } from "@/chat/runtime/thread-state"; import { createSlackWebApiAssistantStatusSession, type AssistantStatusSession, } from "@/chat/slack/assistant-thread/status"; -import { - buildSlackReplyBlocks, - buildSlackReplyFooter, -} from "@/chat/slack/footer"; +import { buildSlackReplyFooter } from "@/chat/slack/footer"; import { planSlackReplyPosts, postSlackApiReplyPosts, } from "@/chat/slack/reply"; import { postSlackMessage as postSlackApiMessage } from "@/chat/slack/outbound"; +import { buildSlackTurnContinuationNotice } from "@/chat/slack/turn-continuation-notice"; import { getStateAdapter } from "@/chat/state/adapter"; import { getAgentTurnSessionCheckpoint } from "@/chat/state/turn-session-store"; import { addAgentTurnUsage } from "@/chat/usage"; @@ -177,18 +174,15 @@ async function postTurnContinuationNoticeBestEffort(args: { lockKey: string; resumeArgs: ResumeSlackTurnArgs; }): Promise { - const text = buildTurnContinuationResponse(); - const footer = buildSlackReplyFooter({ + const notice = buildSlackTurnContinuationNotice({ conversationId: args.resumeArgs.replyContext?.correlation?.conversationId ?? args.lockKey, }); - const blocks = buildSlackReplyBlocks(text, footer); try { await postSlackApiMessage({ channelId: args.resumeArgs.channelId, threadTs: args.resumeArgs.threadTs, - text, - ...(blocks ? { blocks } : {}), + ...notice, }); } catch (error) { logException( diff --git a/packages/junior/src/chat/sandbox/errors.ts b/packages/junior/src/chat/sandbox/errors.ts index 15c22143..d32a7f0e 100644 --- a/packages/junior/src/chat/sandbox/errors.ts +++ b/packages/junior/src/chat/sandbox/errors.ts @@ -95,6 +95,24 @@ export function isSnapshottingError(error: unknown): boolean { }); } +/** Detect interrupted command streams where no reliable exit status is available. */ +export function isSandboxCommandStreamInterruptedError( + error: unknown, +): boolean { + return findInErrorChain(error, (candidate) => { + if (!(candidate instanceof Error)) { + return false; + } + + return ( + candidate.name === "StreamError" && + candidate.message + .toLowerCase() + .includes("stream ended before command finished") + ); + }); +} + /** Wrap raw sandbox setup failures into one stable user-facing error contract. */ export function wrapSandboxSetupError(error: unknown): Error { try { diff --git a/packages/junior/src/chat/sandbox/fault-injection.ts b/packages/junior/src/chat/sandbox/fault-injection.ts new file mode 100644 index 00000000..754c5eb4 --- /dev/null +++ b/packages/junior/src/chat/sandbox/fault-injection.ts @@ -0,0 +1,22 @@ +const STREAM_INTERRUPT_FAULT_ENV = + "JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS"; + +/** Consume one eval-only sandbox bash stream interruption fault. */ +export function consumeSandboxBashStreamInterruptFault(): Error | undefined { + if (process.env.JUNIOR_EVAL_ENABLE_FAULTS !== "1") { + return undefined; + } + + const remaining = Number.parseInt( + process.env[STREAM_INTERRUPT_FAULT_ENV] ?? "0", + 10, + ); + if (!Number.isFinite(remaining) || remaining <= 0) { + return undefined; + } + + process.env[STREAM_INTERRUPT_FAULT_ENV] = String(remaining - 1); + return Object.assign(new Error("Stream ended before command finished"), { + name: "StreamError", + }); +} diff --git a/packages/junior/src/chat/sandbox/session.ts b/packages/junior/src/chat/sandbox/session.ts index 12799db8..705d3d6a 100644 --- a/packages/junior/src/chat/sandbox/session.ts +++ b/packages/junior/src/chat/sandbox/session.ts @@ -10,10 +10,12 @@ import { import { getVercelSandboxCredentials } from "@/chat/sandbox/credentials"; import { isAlreadyExistsError, + isSandboxCommandStreamInterruptedError, isSandboxUnavailableError, isSnapshottingError, wrapSandboxSetupError, } from "@/chat/sandbox/errors"; +import { consumeSandboxBashStreamInterruptFault } from "@/chat/sandbox/fault-injection"; import { buildNonInteractiveShellScript } from "@/chat/sandbox/noninteractive-command"; import { SANDBOX_WORKSPACE_ROOT } from "@/chat/sandbox/paths"; import { @@ -138,6 +140,23 @@ function parseKeepAliveMs(): number { return Number.isFinite(parsed) && parsed > 0 ? parsed : 0; } +function getCommandStreamInterruptedResult(): { + stdout: string; + stderr: string; + exitCode: number; + stdoutTruncated: boolean; + stderrTruncated: boolean; +} { + return { + stdout: "", + stderr: + "Command stream ended before the command finished. The command may still have produced side effects; inspect the workspace or rerun only if it is safe.", + exitCode: 125, + stdoutTruncated: false, + stderrTruncated: false, + }; +} + /** Manage sandbox lifecycle, sync, keepalive, and tool executor caching for one executor instance. */ export function createSandboxSessionManager(options?: { sandboxId?: string; @@ -683,6 +702,10 @@ export function createSandboxSessionManager(options?: { controller.abort(); }, input.timeoutMs) : undefined; + const streamInterruptFault = consumeSandboxBashStreamInterruptFault(); + if (streamInterruptFault) { + throw streamInterruptFault; + } const commandResult = await sandboxInstance.runCommand({ cmd: "bash", args: ["-c", script], @@ -702,6 +725,9 @@ export function createSandboxSessionManager(options?: { timedOut: true, }; } + if (isSandboxCommandStreamInterruptedError(error)) { + return getCommandStreamInterruptedResult(); + } throw error; } finally { if (timeoutId) { diff --git a/packages/junior/src/chat/services/turn-checkpoint.ts b/packages/junior/src/chat/services/turn-checkpoint.ts index 21e02581..56e2f59f 100644 --- a/packages/junior/src/chat/services/turn-checkpoint.ts +++ b/packages/junior/src/chat/services/turn-checkpoint.ts @@ -5,7 +5,10 @@ import { } from "@/chat/state/turn-session-store"; import { logException } from "@/chat/logging"; import type { PiMessage } from "@/chat/pi/messages"; -import { trimTrailingAssistantMessages } from "@/chat/respond-helpers"; +import { + getPiMessageRole, + trimTrailingAssistantMessages, +} from "@/chat/respond-helpers"; import { addAgentTurnUsage, type AgentTurnUsage } from "@/chat/usage"; export interface TurnCheckpointContext { @@ -33,6 +36,11 @@ function addDurationMs( return total; } +function isContinuableBoundary(messages: PiMessage[]): boolean { + const lastRole = getPiMessageRole(messages.at(-1)); + return lastRole === "user" || lastRole === "toolResult"; +} + /** Load turn checkpoint state for a conversation/session pair. */ export async function loadTurnCheckpoint( ctx: TurnCheckpointContext, @@ -57,6 +65,63 @@ export async function loadTurnCheckpoint( }; } +/** Persist the latest safe in-progress boundary without scheduling continuation. */ +export async function persistRunningCheckpoint(args: { + conversationId: string; + sessionId: string; + sliceId: number; + messages: PiMessage[]; + loadedSkillNames: string[]; + logContext: { + threadId?: string; + requesterId?: string; + channelId?: string; + runId?: string; + assistantUserName?: string; + modelId: string; + }; +}): Promise { + if (args.messages.length === 0 || !isContinuableBoundary(args.messages)) { + return; + } + + try { + const latestCheckpoint = await getAgentTurnSessionCheckpoint( + args.conversationId, + args.sessionId, + ); + await upsertAgentTurnSessionCheckpoint({ + conversationId: args.conversationId, + cumulativeDurationMs: latestCheckpoint?.cumulativeDurationMs, + cumulativeUsage: latestCheckpoint?.cumulativeUsage, + sessionId: args.sessionId, + sliceId: args.sliceId, + state: "running", + piMessages: args.messages, + loadedSkillNames: args.loadedSkillNames, + }); + } catch (checkpointError) { + logException( + checkpointError, + "agent_turn_running_checkpoint_failed", + { + slackThreadId: args.logContext.threadId, + slackUserId: args.logContext.requesterId, + slackChannelId: args.logContext.channelId, + runId: args.logContext.runId, + assistantUserName: args.logContext.assistantUserName, + modelId: args.logContext.modelId, + }, + { + "app.ai.resume_conversation_id": args.conversationId, + "app.ai.resume_session_id": args.sessionId, + "app.ai.resume_slice_id": args.sliceId, + }, + "Failed to persist running turn checkpoint", + ); + } +} + /** Persist a completed turn checkpoint. */ export async function persistCompletedCheckpoint(args: { conversationId: string; diff --git a/packages/junior/src/chat/slack/turn-continuation-notice.ts b/packages/junior/src/chat/slack/turn-continuation-notice.ts new file mode 100644 index 00000000..53329380 --- /dev/null +++ b/packages/junior/src/chat/slack/turn-continuation-notice.ts @@ -0,0 +1,24 @@ +import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; +import { + buildSlackReplyBlocks, + buildSlackReplyFooter, + type SlackMessageBlock, +} from "@/chat/slack/footer"; + +/** Build the Slack timeout-continuation acknowledgement with correlation-only metadata. */ +export function buildSlackTurnContinuationNotice(args: { + conversationId?: string; +}): { + blocks?: SlackMessageBlock[]; + text: string; +} { + const text = buildTurnContinuationResponse(); + const footer = buildSlackReplyFooter({ + conversationId: args.conversationId, + }); + const blocks = footer ? buildSlackReplyBlocks(text, footer) : undefined; + return { + text, + ...(blocks ? { blocks } : {}), + }; +} diff --git a/packages/junior/src/chat/usage.ts b/packages/junior/src/chat/usage.ts index 53448d2a..971d4fe3 100644 --- a/packages/junior/src/chat/usage.ts +++ b/packages/junior/src/chat/usage.ts @@ -19,6 +19,13 @@ export interface AgentTurnUsage { totalTokens?: number; } +const COMPONENT_USAGE_FIELDS = [ + "inputTokens", + "outputTokens", + "cachedInputTokens", + "cacheCreationTokens", +] as const satisfies ReadonlyArray; + /** Return whether any token counter is present on a usage record. */ export function hasAgentTurnUsage( usage: AgentTurnUsage | undefined, @@ -31,18 +38,54 @@ export function hasAgentTurnUsage( ); } -/** Sum token counters across turn slices while preserving absent fields. */ +function getFiniteCount(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) + ? Math.max(0, Math.floor(value)) + : undefined; +} + +function getComponentTotal(usage: AgentTurnUsage): number | undefined { + let total: number | undefined; + for (const field of COMPONENT_USAGE_FIELDS) { + const value = getFiniteCount(usage[field]); + if (value === undefined) continue; + total = (total ?? 0) + value; + } + return total; +} + +/** Aggregate token usage across slices without double-counting provider totals. */ export function addAgentTurnUsage( ...usages: Array ): AgentTurnUsage | undefined { - const total: AgentTurnUsage = {}; + const components: AgentTurnUsage = {}; + let componentTotal: number | undefined; + let totalOnlyTokens: number | undefined; + for (const usage of usages) { if (!usage) continue; - for (const field of Object.keys(usage) as (keyof AgentTurnUsage)[]) { - const value = usage[field]; - if (typeof value !== "number" || !Number.isFinite(value)) continue; - total[field] = (total[field] ?? 0) + value; + const usageComponentTotal = getComponentTotal(usage); + if (usageComponentTotal !== undefined) { + componentTotal = (componentTotal ?? 0) + usageComponentTotal; + for (const field of COMPONENT_USAGE_FIELDS) { + const value = getFiniteCount(usage[field]); + if (value === undefined) continue; + components[field] = (components[field] ?? 0) + value; + } + continue; + } + + const totalTokens = getFiniteCount(usage.totalTokens); + if (totalTokens !== undefined) { + totalOnlyTokens = (totalOnlyTokens ?? 0) + totalTokens; } } - return hasAgentTurnUsage(total) ? total : undefined; + + if (totalOnlyTokens !== undefined) { + return { + totalTokens: totalOnlyTokens + (componentTotal ?? 0), + }; + } + + return hasAgentTurnUsage(components) ? components : undefined; } diff --git a/packages/junior/src/handlers/oauth-callback.ts b/packages/junior/src/handlers/oauth-callback.ts index 77da8a88..67e28bdb 100644 --- a/packages/junior/src/handlers/oauth-callback.ts +++ b/packages/junior/src/handlers/oauth-callback.ts @@ -264,6 +264,8 @@ async function resumeCheckpointedOAuthTurn( fullName: userMessage.author.fullName, }, correlation: { + conversationId: stored.resumeConversationId, + turnId: resolvedSessionId, channelId: stored.channelId, threadTs: stored.threadTs, requesterId: userMessage.author.userId, diff --git a/packages/junior/tests/integration/oauth-resume-slack.test.ts b/packages/junior/tests/integration/oauth-resume-slack.test.ts index 43b6b926..15e9b7b8 100644 --- a/packages/junior/tests/integration/oauth-resume-slack.test.ts +++ b/packages/junior/tests/integration/oauth-resume-slack.test.ts @@ -124,6 +124,83 @@ describe("oauth resume slack integration", () => { ]); }, 10_000); + it("uses cumulative checkpoint diagnostics for resumed reply footers", async () => { + const { resumeAuthorizedRequest } = + await import("@/chat/runtime/slack-resume"); + const { upsertAgentTurnSessionCheckpoint } = + await import("@/chat/state/turn-session-store"); + + await upsertAgentTurnSessionCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 2, + state: "awaiting_resume", + piMessages: [], + resumeReason: "timeout", + cumulativeDurationMs: 1_000, + cumulativeUsage: { + totalTokens: 1_000, + }, + }); + + await resumeAuthorizedRequest({ + messageText: "continue this turn", + channelId: "C123", + threadTs: "1700000000.007", + connectedText: "", + replyContext: { + requester: { userId: "U123" }, + correlation: { + conversationId: "conversation-1", + turnId: "turn-1", + }, + }, + generateReply: async () => + ({ + text: "done", + diagnostics: makeDiagnostics("success", { + durationMs: 500, + usage: { + outputTokens: 7, + }, + }), + }) as any, + }); + + expect(getCapturedSlackApiCalls("chat.postMessage")).toEqual([ + expect.objectContaining({ + params: expect.objectContaining({ + channel: "C123", + thread_ts: "1700000000.007", + text: "done", + blocks: [ + { + type: "markdown", + text: "done", + }, + { + type: "context", + elements: expect.arrayContaining([ + { + type: "mrkdwn", + text: "*ID:* conversation-1", + }, + { + type: "mrkdwn", + text: "*Tokens:* 1k", + }, + { + type: "mrkdwn", + text: "*Time:* 1.5s", + }, + ]), + }, + ], + }), + }), + ]); + }); + it("chunks long resumed replies into explicit continuation messages", async () => { const { resumeAuthorizedRequest } = await import("@/chat/runtime/slack-resume"); diff --git a/packages/junior/tests/integration/slack/bot-handlers.test.ts b/packages/junior/tests/integration/slack/bot-handlers.test.ts index 798393bb..005c63f5 100644 --- a/packages/junior/tests/integration/slack/bot-handlers.test.ts +++ b/packages/junior/tests/integration/slack/bot-handlers.test.ts @@ -4,6 +4,10 @@ import { makeAssistantStatus } from "@/chat/slack/assistant-thread/status"; import { getSlackInterruptionMarker } from "@/chat/slack/output"; import { RetryableTurnError } from "@/chat/runtime/turn"; import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; +import { + getCapturedSlackApiCalls, + resetSlackApiMockState, +} from "../../msw/handlers/slack-api"; import { FakeSlackAdapter, createTestThread, @@ -77,6 +81,7 @@ function createAwaitingContinuationState(args: { describe("bot handlers (integration)", () => { afterEach(() => { + resetSlackApiMockState(); vi.restoreAllMocks(); }); @@ -556,6 +561,78 @@ describe("bot handlers (integration)", () => { expect(conversation?.processing?.activeTurnId).toBe(sessionId); }); + it("posts a Slack continuation notice with a correlation footer when a live turn times out", async () => { + resetSlackApiMockState(); + const scheduleTurnTimeoutResume = vi.fn().mockResolvedValue(undefined); + const conversationId = "slack:C_TIMEOUT_API:1700000000.000"; + const sessionId = "turn_msg-timeout-api"; + const { slackRuntime } = createRuntime({ + services: { + replyExecutor: { + scheduleTurnTimeoutResume, + generateAssistantReply: async () => { + throw new RetryableTurnError( + "turn_timeout_resume", + "simulated timeout continuation", + { + conversationId, + sessionId, + checkpointVersion: 3, + sliceId: 2, + }, + ); + }, + }, + }, + }); + + const thread = createTestThread({ id: conversationId }); + (thread.adapter as { name?: string }).name = "slack"; + + await expect( + slackRuntime.handleNewMention( + thread, + createTestMessage({ + id: "msg-timeout-api", + threadId: conversationId, + text: "please keep working", + isMention: true, + }), + ), + ).resolves.toBeUndefined(); + + expect(scheduleTurnTimeoutResume).toHaveBeenCalledWith({ + conversationId, + sessionId, + expectedCheckpointVersion: 3, + }); + expect(thread.posts).toEqual([]); + expect(getCapturedSlackApiCalls("chat.postMessage")).toEqual([ + expect.objectContaining({ + params: expect.objectContaining({ + channel: "C_TIMEOUT_API", + thread_ts: "1700000000.000", + text: buildTurnContinuationResponse(), + blocks: [ + { + type: "markdown", + text: buildTurnContinuationResponse(), + }, + { + type: "context", + elements: [ + { + type: "mrkdwn", + text: `*ID:* ${conversationId}`, + }, + ], + }, + ], + }), + }), + ]); + }); + it("reschedules an awaiting turn continuation instead of starting a new turn", async () => { const conversationId = "slack:C_TIMEOUT_RETRY:1700000000.000"; const activeSessionId = "turn_msg-original"; diff --git a/packages/junior/tests/integration/turn-resume-slack.test.ts b/packages/junior/tests/integration/turn-resume-slack.test.ts index c1e7ba4f..8cfa6bfa 100644 --- a/packages/junior/tests/integration/turn-resume-slack.test.ts +++ b/packages/junior/tests/integration/turn-resume-slack.test.ts @@ -1,6 +1,7 @@ import { Buffer } from "node:buffer"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { WaitUntilFn } from "@/handlers/types"; +import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; import { getCapturedSlackApiCalls, getCapturedSlackFileUploadCalls, @@ -351,6 +352,124 @@ describe("turn resume slack integration", () => { expect(conversation.processing?.activeTurnId).toBeUndefined(); }); + it("posts a continuation notice with a correlation footer when a resumed slice times out again", async () => { + const conversationId = "slack:C123:1712345.0006"; + const sessionId = "turn_msg_6"; + const checkpoint = + await turnSessionStoreModule.upsertAgentTurnSessionCheckpoint({ + conversationId, + sessionId, + sliceId: 2, + state: "awaiting_resume", + piMessages: [ + { + role: "user", + content: [{ type: "text", text: "hello" }], + timestamp: 1, + }, + ], + loadedSkillNames: ["demo-skill"], + resumeReason: "timeout", + resumedFromSliceId: 1, + errorMessage: "Agent turn timed out", + }); + + await threadStateModule.persistThreadStateById(conversationId, { + artifacts: { + listColumnMap: {}, + }, + conversation: { + schemaVersion: 1, + backfill: {}, + compactions: [], + piMessages: [], + messages: [ + { + id: "msg.6", + role: "user", + text: "resume this request", + createdAtMs: 1, + author: { + userId: "U123", + }, + }, + ], + processing: { + activeTurnId: sessionId, + }, + stats: { + compactedMessageCount: 0, + estimatedContextTokens: 0, + totalMessageCount: 1, + updatedAtMs: 1, + }, + vision: { + byFileId: {}, + }, + }, + }); + + const { RetryableTurnError } = await import("@/chat/runtime/turn"); + generateAssistantReplyMock.mockRejectedValueOnce( + new RetryableTurnError("turn_timeout_resume", "timed out again", { + conversationId, + sessionId, + checkpointVersion: checkpoint.checkpointVersion + 1, + sliceId: 3, + }), + ); + + const response = await turnResumeHandlerModule.POST( + await buildSignedTurnResumeRequest({ + conversationId, + sessionId, + expectedCheckpointVersion: checkpoint.checkpointVersion, + }), + testWaitUntil, + ); + + expect(response.status).toBe(202); + expect(waitUntilCallbacks).toHaveLength(1); + + const originalFetch = global.fetch; + const fetchMock = vi.fn( + async () => new Response("Accepted", { status: 202 }), + ); + global.fetch = fetchMock as typeof fetch; + try { + await waitUntilCallbacks[0]?.(); + } finally { + global.fetch = originalFetch; + } + + const postCalls = getCapturedSlackApiCalls("chat.postMessage"); + expect(postCalls).toEqual([ + expect.objectContaining({ + params: expect.objectContaining({ + channel: "C123", + thread_ts: "1712345.0006", + text: buildTurnContinuationResponse(), + blocks: [ + { + type: "markdown", + text: buildTurnContinuationResponse(), + }, + { + type: "context", + elements: [ + { + type: "mrkdwn", + text: `*ID:* ${conversationId}`, + }, + ], + }, + ], + }), + }), + ]); + expect(fetchMock).toHaveBeenCalledOnce(); + }); + it("uploads resumed reply files through the shared delivery path", async () => { const conversationId = "slack:C123:1712345.0003"; const sessionId = "turn_msg_3"; diff --git a/packages/junior/tests/unit/handlers/oauth-resume.test.ts b/packages/junior/tests/unit/handlers/oauth-resume.test.ts index bf0a3223..604c0cf6 100644 --- a/packages/junior/tests/unit/handlers/oauth-resume.test.ts +++ b/packages/junior/tests/unit/handlers/oauth-resume.test.ts @@ -1,7 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { RetryableTurnError } from "@/chat/runtime/turn"; import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter"; -import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response"; const { logExceptionMock, postMessageMock, setStatusMock } = vi.hoisted(() => ({ logExceptionMock: vi.fn(), @@ -182,105 +181,8 @@ describe("resumeAuthorizedRequest", () => { expect(onTimeoutPause).toHaveBeenCalledTimes(1); expect(postMessageMock).toHaveBeenCalledWith( expect.objectContaining({ - blocks: [ - { - type: "markdown", - text: buildTurnContinuationResponse(), - }, - { - type: "context", - elements: [ - { - type: "mrkdwn", - text: "*ID:* slack:C-test:1700000000.0002", - }, - ], - }, - ], channel: "C-test", thread_ts: "1700000000.0002", - text: buildTurnContinuationResponse(), - }), - ); - }); - - it("uses cumulative checkpoint diagnostics for timeout resume footers", async () => { - const { upsertAgentTurnSessionCheckpoint } = - await import("@/chat/state/turn-session-store"); - - await upsertAgentTurnSessionCheckpoint({ - conversationId: "conversation-1", - sessionId: "turn-1", - sliceId: 2, - state: "awaiting_resume", - piMessages: [], - resumeReason: "timeout", - cumulativeDurationMs: 1_000, - cumulativeUsage: { - inputTokens: 2, - outputTokens: 3, - }, - }); - - await resumeSlackTurn({ - messageText: "continue this turn", - channelId: "C-test", - threadTs: "1700000000.0005", - lockKey: "slack:C-test:1700000000.0005", - replyContext: { - requester: { userId: "U-test" }, - correlation: { - conversationId: "conversation-1", - turnId: "turn-1", - }, - }, - generateReply: async () => - ({ - text: "done", - diagnostics: { - assistantMessageCount: 1, - durationMs: 500, - modelId: "fake-agent-model", - outcome: "success", - toolCalls: [], - toolErrorCount: 0, - toolResultCount: 0, - usage: { - outputTokens: 7, - }, - usedPrimaryText: true, - }, - }) as any, - }); - - expect(postMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "C-test", - thread_ts: "1700000000.0005", - text: "done", - blocks: [ - { - type: "markdown", - text: "done", - }, - { - type: "context", - elements: expect.arrayContaining([ - { - type: "mrkdwn", - text: "*ID:* conversation-1", - }, - { - type: "mrkdwn", - text: "*Tokens:* 12", - }, - { - type: "mrkdwn", - text: "*Time:* 1.5s", - }, - ]), - }, - ], }), ); }); diff --git a/packages/junior/tests/unit/misc/sandbox-executor.test.ts b/packages/junior/tests/unit/misc/sandbox-executor.test.ts index 0ea0d5cd..e5941503 100644 --- a/packages/junior/tests/unit/misc/sandbox-executor.test.ts +++ b/packages/junior/tests/unit/misc/sandbox-executor.test.ts @@ -228,6 +228,8 @@ describe("createSandboxExecutor", () => { delete process.env.VERCEL_PROJECT_ID; delete process.env.VERCEL_OIDC_TOKEN; delete process.env.VERCEL_SANDBOX_KEEPALIVE_MS; + delete process.env.JUNIOR_EVAL_ENABLE_FAULTS; + delete process.env.JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS; delete process.env.EVAL_ENABLE_TEST_CREDENTIALS; process.env.JUNIOR_BASE_URL = "https://junior.example.com"; }); @@ -827,6 +829,87 @@ describe("createSandboxExecutor", () => { }); }); + it("returns a failed bash result when the command stream ends without a status", async () => { + const streamError = Object.assign( + new Error("Stream ended before command finished"), + { name: "StreamError" }, + ); + const sandbox = makeSandbox("sbx_stream_interrupted"); + sandbox.runCommand.mockRejectedValueOnce(streamError); + sandboxGetMock.mockResolvedValue(sandbox); + vi.mocked(createBashTool).mockResolvedValue({ + tools: { + readFile: { execute: vi.fn(async () => ({ content: "" })) }, + writeFile: { execute: vi.fn(async () => ({ success: true })) }, + }, + } as never); + + const executor = createSandboxExecutor({ + sandboxId: "sbx_stream_interrupted", + }); + executor.configureSkills([]); + + const response = await executor.execute({ + toolName: "bash", + input: { + command: "pnpm test", + }, + }); + + expect(response.result).toMatchObject({ + ok: false, + exit_code: 125, + stderr: + "Command stream ended before the command finished. The command may still have produced side effects; inspect the workspace or rerun only if it is safe.", + }); + }); + + it("supports eval-only bash stream interruption fault injection", async () => { + process.env.JUNIOR_EVAL_ENABLE_FAULTS = "1"; + process.env.JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS = "1"; + const sandbox = makeSandbox("sbx_fault_injection"); + sandbox.runCommand.mockResolvedValueOnce({ + exitCode: 0, + stdout: async () => "ok\n", + stderr: async () => "", + }); + sandboxGetMock.mockResolvedValue(sandbox); + vi.mocked(createBashTool).mockResolvedValue({ + tools: { + readFile: { execute: vi.fn(async () => ({ content: "" })) }, + writeFile: { execute: vi.fn(async () => ({ success: true })) }, + }, + } as never); + + const executor = createSandboxExecutor({ + sandboxId: "sbx_fault_injection", + }); + executor.configureSkills([]); + + const interrupted = await executor.execute({ + toolName: "bash", + input: { + command: "echo first", + }, + }); + const recovered = await executor.execute({ + toolName: "bash", + input: { + command: "echo second", + }, + }); + + expect(interrupted.result).toMatchObject({ + ok: false, + exit_code: 125, + }); + expect(recovered.result).toMatchObject({ + ok: true, + stdout: "ok\n", + }); + expect(sandbox.runCommand).toHaveBeenCalledTimes(1); + }); + it("routes matching bash commands through custom command handler", async () => { const sandbox = makeSandbox("sbx_custom"); sandboxGetMock.mockResolvedValue(sandbox); diff --git a/packages/junior/tests/unit/services/turn-checkpoint.test.ts b/packages/junior/tests/unit/services/turn-checkpoint.test.ts index 238da5cd..efd97f08 100644 --- a/packages/junior/tests/unit/services/turn-checkpoint.test.ts +++ b/packages/junior/tests/unit/services/turn-checkpoint.test.ts @@ -148,4 +148,136 @@ describe("persistAuthPauseCheckpoint", () => { }, }); }); + + it("stores running checkpoints only at continuable message boundaries", async () => { + const { persistRunningCheckpoint } = + await import("@/chat/services/turn-checkpoint"); + const { getAgentTurnSessionCheckpoint } = + await import("@/chat/state/turn-session-store"); + const userBoundary: PiMessage[] = [ + { + role: "user", + content: [{ type: "text", text: "help me" }], + timestamp: 1, + }, + ]; + const unsafeAssistantBoundary: PiMessage[] = [ + ...userBoundary, + { + role: "assistant", + content: [{ type: "text", text: "working" }], + timestamp: 2, + } as PiMessage, + ]; + const toolResultBoundary: PiMessage[] = [ + ...unsafeAssistantBoundary, + { + role: "toolResult", + toolCallId: "call-1", + toolName: "bash", + content: [{ type: "text", text: "ok" }], + timestamp: 3, + } as PiMessage, + ]; + + await persistRunningCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + messages: userBoundary, + loadedSkillNames: [], + logContext: { + modelId: "test-model", + }, + }); + + await persistRunningCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + messages: unsafeAssistantBoundary, + loadedSkillNames: [], + logContext: { + modelId: "test-model", + }, + }); + + let checkpoint = await getAgentTurnSessionCheckpoint( + "conversation-1", + "turn-1", + ); + expect(checkpoint).toMatchObject({ + state: "running", + piMessages: userBoundary, + }); + + await persistRunningCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + messages: toolResultBoundary, + loadedSkillNames: ["demo-skill"], + logContext: { + modelId: "test-model", + }, + }); + + checkpoint = await getAgentTurnSessionCheckpoint( + "conversation-1", + "turn-1", + ); + expect(checkpoint).toMatchObject({ + state: "running", + loadedSkillNames: ["demo-skill"], + piMessages: toolResultBoundary, + }); + }); + + it("promotes the latest running checkpoint when timeout capture has no messages", async () => { + const { persistTimeoutCheckpoint, persistRunningCheckpoint } = + await import("@/chat/services/turn-checkpoint"); + const { getAgentTurnSessionCheckpoint } = + await import("@/chat/state/turn-session-store"); + const messages: PiMessage[] = [ + { + role: "user", + content: [{ type: "text", text: "help me" }], + timestamp: 1, + }, + ]; + + await persistRunningCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + messages, + loadedSkillNames: ["demo-skill"], + logContext: { + modelId: "test-model", + }, + }); + + await persistTimeoutCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + currentSliceId: 1, + messages: [], + loadedSkillNames: ["demo-skill"], + errorMessage: "provider stream interrupted", + logContext: { + modelId: "test-model", + }, + }); + + const checkpoint = await getAgentTurnSessionCheckpoint( + "conversation-1", + "turn-1", + ); + expect(checkpoint).toMatchObject({ + state: "awaiting_resume", + resumeReason: "timeout", + sliceId: 2, + piMessages: messages, + }); + }); }); diff --git a/packages/junior/tests/unit/usage.test.ts b/packages/junior/tests/unit/usage.test.ts new file mode 100644 index 00000000..a94a407b --- /dev/null +++ b/packages/junior/tests/unit/usage.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it } from "vitest"; +import { addAgentTurnUsage } from "@/chat/usage"; + +describe("addAgentTurnUsage", () => { + it("preserves component counters when all slices report components", () => { + expect( + addAgentTurnUsage( + { inputTokens: 10, outputTokens: 3 }, + { outputTokens: 7, cachedInputTokens: 2 }, + ), + ).toEqual({ + inputTokens: 10, + outputTokens: 10, + cachedInputTokens: 2, + }); + }); + + it("uses provider totals only for slices without component counters", () => { + expect( + addAgentTurnUsage( + { totalTokens: 1_000 }, + { outputTokens: 7 }, + { inputTokens: 2, outputTokens: 3, totalTokens: 999 }, + ), + ).toEqual({ + totalTokens: 1_012, + }); + }); +}); diff --git a/specs/slack-agent-delivery-spec.md b/specs/slack-agent-delivery-spec.md index 560c4a6c..a00b6e71 100644 --- a/specs/slack-agent-delivery-spec.md +++ b/specs/slack-agent-delivery-spec.md @@ -3,7 +3,7 @@ ## Metadata - Created: 2026-04-15 -- Last Edited: 2026-05-19 +- Last Edited: 2026-05-20 ## Changelog @@ -25,6 +25,7 @@ - 2026-05-16: Added automatic processing reactions for Slack messages Junior is handling or evaluating for handling. - 2026-05-19: Restored the visible URL-free auth-pause thread acknowledgement and required processing reaction restoration during auth resumes. - 2026-05-19: Deferred subscribed-thread processing reactions until after routing approves a reply. +- 2026-05-20: Allowed timeout continuation acknowledgements to include correlation-only footer metadata for traceability. ## Status @@ -173,7 +174,7 @@ Current rules: 4. If explicit user intent requested an in-channel post and that post already satisfied the request, Junior may suppress the thread text reply according to the reply-delivery plan. 5. Persisted assistant conversation state must reflect the same finalized reply content the user saw, not provisional pre-tool text. 6. Reply text must be rendered through the shared Slack output translator before delivery; raw Slack API writers do not own markdown translation rules. -7. When Junior adds reply footer metadata, it attaches that metadata as a Slack `context` block on the final text chunk only, while keeping the main reply text as the top-level fallback. +7. When Junior adds finalized reply footer metadata, it attaches that metadata as a Slack `context` block on the final text chunk only, while keeping the main reply text as the top-level fallback. 8. Footer metadata is derived from structured reply diagnostics and correlation state. Conversation ID, selected thinking level, token totals, and turn duration may be shown when available; footer rendering must not scrape logs or spans after the fact. 9. Footer metadata is not an assistant-status surface and must not be used to convey in-flight progress. @@ -244,6 +245,7 @@ Current rules: 9. When a turn checkpoint is scheduled for automatic continuation, Junior must post a durable thread acknowledgement that the turn is continuing in the background. Assistant status alone is not sufficient because it is best effort and expires independently of thread history. 10. If a user follow-up or duplicate delivery hits the same awaiting continuation, Junior should acknowledge the existing continuation instead of creating a second visible turn. Checkpoint rescheduling mechanics belong to `./agent-session-resumability-spec.md`. 11. Turn-continuation acknowledgements are not final assistant replies. They do not mark the original turn completed, and the final resumed answer must still be delivered through the normal finalized-reply path. +12. Turn-continuation acknowledgements may include a correlation-only footer with the conversation ID or trace link so operators can connect the durable notice to diagnostics. They must not include final-turn duration, token usage, or thinking-level metadata because those belong to the finalized reply. ### 12. Testing Contract From e44ae95f5252a719df020d3c17ece92943cf8ffe Mon Sep 17 00:00:00 2001 From: David Cramer Date: Wed, 20 May 2026 20:36:08 -0700 Subject: [PATCH 3/4] fix(junior): Keep completed checkpoint failures non-fatal Make completed checkpoint persistence best-effort like the running, timeout, and auth checkpoint paths so state adapter failures cannot block an already generated reply. Co-Authored-By: GPT-5 Codex --- packages/junior/src/chat/respond.ts | 8 +++ .../src/chat/services/turn-checkpoint.ts | 69 +++++++++++++------ .../unit/services/turn-checkpoint.test.ts | 56 +++++++++++++++ 3 files changed, 113 insertions(+), 20 deletions(-) diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index 18f7668a..3e5a19a1 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -1134,6 +1134,14 @@ export async function generateAssistantReply( sliceId: currentSliceId, allMessages: agent.state.messages, loadedSkillNames: activeSkills.map((skill) => skill.name), + logContext: { + threadId: context.correlation?.threadId, + requesterId: context.correlation?.requesterId, + channelId: context.correlation?.channelId, + runId: context.correlation?.runId, + assistantUserName: botConfig.userName, + modelId: botConfig.modelId, + }, }); } diff --git a/packages/junior/src/chat/services/turn-checkpoint.ts b/packages/junior/src/chat/services/turn-checkpoint.ts index 56e2f59f..fcca7fc0 100644 --- a/packages/junior/src/chat/services/turn-checkpoint.ts +++ b/packages/junior/src/chat/services/turn-checkpoint.ts @@ -131,27 +131,56 @@ export async function persistCompletedCheckpoint(args: { sliceId: number; allMessages: PiMessage[]; loadedSkillNames: string[]; + logContext: { + threadId?: string; + requesterId?: string; + channelId?: string; + runId?: string; + assistantUserName?: string; + modelId: string; + }; }): Promise { - const latestCheckpoint = await getAgentTurnSessionCheckpoint( - args.conversationId, - args.sessionId, - ); - await upsertAgentTurnSessionCheckpoint({ - conversationId: args.conversationId, - cumulativeDurationMs: addDurationMs( - latestCheckpoint?.cumulativeDurationMs, - args.currentDurationMs, - ), - cumulativeUsage: addAgentTurnUsage( - latestCheckpoint?.cumulativeUsage, - args.currentUsage, - ), - sessionId: args.sessionId, - sliceId: args.sliceId, - state: "completed", - piMessages: args.allMessages, - loadedSkillNames: args.loadedSkillNames, - }); + try { + const latestCheckpoint = await getAgentTurnSessionCheckpoint( + args.conversationId, + args.sessionId, + ); + await upsertAgentTurnSessionCheckpoint({ + conversationId: args.conversationId, + cumulativeDurationMs: addDurationMs( + latestCheckpoint?.cumulativeDurationMs, + args.currentDurationMs, + ), + cumulativeUsage: addAgentTurnUsage( + latestCheckpoint?.cumulativeUsage, + args.currentUsage, + ), + sessionId: args.sessionId, + sliceId: args.sliceId, + state: "completed", + piMessages: args.allMessages, + loadedSkillNames: args.loadedSkillNames, + }); + } catch (checkpointError) { + logException( + checkpointError, + "agent_turn_completed_checkpoint_failed", + { + slackThreadId: args.logContext.threadId, + slackUserId: args.logContext.requesterId, + slackChannelId: args.logContext.channelId, + runId: args.logContext.runId, + assistantUserName: args.logContext.assistantUserName, + modelId: args.logContext.modelId, + }, + { + "app.ai.resume_conversation_id": args.conversationId, + "app.ai.resume_session_id": args.sessionId, + "app.ai.resume_slice_id": args.sliceId, + }, + "Failed to persist completed turn checkpoint", + ); + } } /** diff --git a/packages/junior/tests/unit/services/turn-checkpoint.test.ts b/packages/junior/tests/unit/services/turn-checkpoint.test.ts index efd97f08..8ef70d61 100644 --- a/packages/junior/tests/unit/services/turn-checkpoint.test.ts +++ b/packages/junior/tests/unit/services/turn-checkpoint.test.ts @@ -17,6 +17,8 @@ describe("persistAuthPauseCheckpoint", () => { afterEach(async () => { const { disconnectStateAdapter } = await import("@/chat/state/adapter"); await disconnectStateAdapter(); + vi.doUnmock("@/chat/logging"); + vi.doUnmock("@/chat/state/turn-session-store"); vi.resetModules(); process.env = { ...ORIGINAL_ENV }; }); @@ -149,6 +151,60 @@ describe("persistAuthPauseCheckpoint", () => { }); }); + it("does not fail a completed turn when checkpoint persistence fails", async () => { + const logException = vi.fn(); + vi.doMock("@/chat/logging", () => ({ + logException, + })); + vi.doMock("@/chat/state/turn-session-store", () => ({ + getAgentTurnSessionCheckpoint: vi.fn(async () => { + throw new Error("state adapter unavailable"); + }), + upsertAgentTurnSessionCheckpoint: vi.fn(), + })); + const { persistCompletedCheckpoint } = + await import("@/chat/services/turn-checkpoint"); + + await expect( + persistCompletedCheckpoint({ + conversationId: "conversation-1", + sessionId: "turn-1", + sliceId: 1, + allMessages: [ + { + role: "user", + content: [{ type: "text", text: "help me" }], + timestamp: 1, + }, + ], + loadedSkillNames: [], + logContext: { + channelId: "C123", + modelId: "test-model", + requesterId: "U123", + threadId: "slack:C123:1", + }, + }), + ).resolves.toBeUndefined(); + + expect(logException).toHaveBeenCalledWith( + expect.any(Error), + "agent_turn_completed_checkpoint_failed", + expect.objectContaining({ + modelId: "test-model", + slackChannelId: "C123", + slackThreadId: "slack:C123:1", + slackUserId: "U123", + }), + expect.objectContaining({ + "app.ai.resume_conversation_id": "conversation-1", + "app.ai.resume_session_id": "turn-1", + "app.ai.resume_slice_id": 1, + }), + "Failed to persist completed turn checkpoint", + ); + }); + it("stores running checkpoints only at continuable message boundaries", async () => { const { persistRunningCheckpoint } = await import("@/chat/services/turn-checkpoint"); From a9d6711d70fb3fe19e1bcae4ed06f69d1862c1ce Mon Sep 17 00:00:00 2001 From: David Cramer Date: Wed, 20 May 2026 20:44:03 -0700 Subject: [PATCH 4/4] ref(junior): Simplify turn checkpoint plumbing Share checkpoint logging context and remove a one-use resume helper so the continuation changes stay smaller without changing behavior. Co-Authored-By: GPT-5 Codex --- packages/junior/src/chat/respond.ts | 44 ++---- .../junior/src/chat/runtime/slack-resume.ts | 25 ++-- .../src/chat/services/turn-checkpoint.ts | 128 +++++++----------- 3 files changed, 72 insertions(+), 125 deletions(-) diff --git a/packages/junior/src/chat/respond.ts b/packages/junior/src/chat/respond.ts index 3e5a19a1..ac2f44b8 100644 --- a/packages/junior/src/chat/respond.ts +++ b/packages/junior/src/chat/respond.ts @@ -405,6 +405,14 @@ export async function generateAssistantReply( let timedOut = false; let turnUsage: AgentTurnUsage | undefined; let thinkingSelection: TurnThinkingSelection | undefined; + const checkpointLogContext = { + threadId: context.correlation?.threadId, + requesterId: context.correlation?.requesterId, + channelId: context.correlation?.channelId, + runId: context.correlation?.runId, + assistantUserName: botConfig.userName, + modelId: botConfig.modelId, + }; const getSandboxMetadata = () => sandboxExecutor @@ -945,14 +953,7 @@ export async function generateAssistantReply( sliceId: currentSliceId, messages, loadedSkillNames: loadedSkillNamesForResume, - logContext: { - threadId: context.correlation?.threadId, - requesterId: context.correlation?.requesterId, - channelId: context.correlation?.channelId, - runId: context.correlation?.runId, - assistantUserName: botConfig.userName, - modelId: botConfig.modelId, - }, + logContext: checkpointLogContext, }); }; @@ -1134,14 +1135,7 @@ export async function generateAssistantReply( sliceId: currentSliceId, allMessages: agent.state.messages, loadedSkillNames: activeSkills.map((skill) => skill.name), - logContext: { - threadId: context.correlation?.threadId, - requesterId: context.correlation?.requesterId, - channelId: context.correlation?.channelId, - runId: context.correlation?.runId, - assistantUserName: botConfig.userName, - modelId: botConfig.modelId, - }, + logContext: checkpointLogContext, }); } @@ -1182,14 +1176,7 @@ export async function generateAssistantReply( messages: timeoutResumeMessages, loadedSkillNames: loadedSkillNamesForResume, errorMessage: error instanceof Error ? error.message : String(error), - logContext: { - threadId: context.correlation?.threadId, - requesterId: context.correlation?.requesterId, - channelId: context.correlation?.channelId, - runId: context.correlation?.runId, - assistantUserName: botConfig.userName, - modelId: botConfig.modelId, - }, + logContext: checkpointLogContext, }); if (checkpoint) { throw new RetryableTurnError( @@ -1226,14 +1213,7 @@ export async function generateAssistantReply( messages: timeoutResumeMessages, loadedSkillNames: loadedSkillNamesForResume, errorMessage: error.message, - logContext: { - threadId: context.correlation?.threadId, - requesterId: context.correlation?.requesterId, - channelId: context.correlation?.channelId, - runId: context.correlation?.runId, - assistantUserName: botConfig.userName, - modelId: botConfig.modelId, - }, + logContext: checkpointLogContext, }); throw new RetryableTurnError( error.kind === "plugin" ? "plugin_auth_resume" : "mcp_auth_resume", diff --git a/packages/junior/src/chat/runtime/slack-resume.ts b/packages/junior/src/chat/runtime/slack-resume.ts index ec5632a1..69c8ab57 100644 --- a/packages/junior/src/chat/runtime/slack-resume.ts +++ b/packages/junior/src/chat/runtime/slack-resume.ts @@ -264,19 +264,6 @@ function createResumeReplyContext( }; } -async function getResumeCheckpoint(args: { - conversationId?: string; - sessionId?: string; -}) { - if (!args.conversationId || !args.sessionId) { - return undefined; - } - return await getAgentTurnSessionCheckpoint( - args.conversationId, - args.sessionId, - ); -} - /** * Resume a paused Slack turn under the normal thread lock. * @@ -330,10 +317,14 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) { const generateReply = args.generateReply ?? generateAssistantReply; const replyContext = createResumeReplyContext(args, status); - const priorCheckpoint = await getResumeCheckpoint({ - conversationId: replyContext.correlation?.conversationId, - sessionId: replyContext.correlation?.turnId, - }); + const priorCheckpoint = + replyContext.correlation?.conversationId && + replyContext.correlation?.turnId + ? await getAgentTurnSessionCheckpoint( + replyContext.correlation.conversationId, + replyContext.correlation.turnId, + ) + : undefined; const replyPromise = generateReply(args.messageText, replyContext); const replyTimeoutMs = resolveReplyTimeoutMs(args.replyTimeoutMs); let reply = diff --git a/packages/junior/src/chat/services/turn-checkpoint.ts b/packages/junior/src/chat/services/turn-checkpoint.ts index fcca7fc0..9e9f8919 100644 --- a/packages/junior/src/chat/services/turn-checkpoint.ts +++ b/packages/junior/src/chat/services/turn-checkpoint.ts @@ -23,6 +23,46 @@ export interface TurnCheckpointState { existingCheckpoint?: AgentTurnSessionCheckpoint; } +interface CheckpointLogContext { + threadId?: string; + requesterId?: string; + channelId?: string; + runId?: string; + assistantUserName?: string; + modelId: string; +} + +function logCheckpointError( + error: unknown, + eventName: string, + args: { + conversationId: string; + sessionId: string; + logContext: CheckpointLogContext; + }, + attributes: Record, + message: string, +): void { + logException( + error, + eventName, + { + slackThreadId: args.logContext.threadId, + slackUserId: args.logContext.requesterId, + slackChannelId: args.logContext.channelId, + runId: args.logContext.runId, + assistantUserName: args.logContext.assistantUserName, + modelId: args.logContext.modelId, + }, + { + "app.ai.resume_conversation_id": args.conversationId, + "app.ai.resume_session_id": args.sessionId, + ...attributes, + }, + message, + ); +} + function addDurationMs( prior: number | undefined, current: number | undefined, @@ -72,14 +112,7 @@ export async function persistRunningCheckpoint(args: { sliceId: number; messages: PiMessage[]; loadedSkillNames: string[]; - logContext: { - threadId?: string; - requesterId?: string; - channelId?: string; - runId?: string; - assistantUserName?: string; - modelId: string; - }; + logContext: CheckpointLogContext; }): Promise { if (args.messages.length === 0 || !isContinuableBoundary(args.messages)) { return; @@ -101,20 +134,11 @@ export async function persistRunningCheckpoint(args: { loadedSkillNames: args.loadedSkillNames, }); } catch (checkpointError) { - logException( + logCheckpointError( checkpointError, "agent_turn_running_checkpoint_failed", + args, { - slackThreadId: args.logContext.threadId, - slackUserId: args.logContext.requesterId, - slackChannelId: args.logContext.channelId, - runId: args.logContext.runId, - assistantUserName: args.logContext.assistantUserName, - modelId: args.logContext.modelId, - }, - { - "app.ai.resume_conversation_id": args.conversationId, - "app.ai.resume_session_id": args.sessionId, "app.ai.resume_slice_id": args.sliceId, }, "Failed to persist running turn checkpoint", @@ -131,14 +155,7 @@ export async function persistCompletedCheckpoint(args: { sliceId: number; allMessages: PiMessage[]; loadedSkillNames: string[]; - logContext: { - threadId?: string; - requesterId?: string; - channelId?: string; - runId?: string; - assistantUserName?: string; - modelId: string; - }; + logContext: CheckpointLogContext; }): Promise { try { const latestCheckpoint = await getAgentTurnSessionCheckpoint( @@ -162,20 +179,11 @@ export async function persistCompletedCheckpoint(args: { loadedSkillNames: args.loadedSkillNames, }); } catch (checkpointError) { - logException( + logCheckpointError( checkpointError, "agent_turn_completed_checkpoint_failed", + args, { - slackThreadId: args.logContext.threadId, - slackUserId: args.logContext.requesterId, - slackChannelId: args.logContext.channelId, - runId: args.logContext.runId, - assistantUserName: args.logContext.assistantUserName, - modelId: args.logContext.modelId, - }, - { - "app.ai.resume_conversation_id": args.conversationId, - "app.ai.resume_session_id": args.sessionId, "app.ai.resume_slice_id": args.sliceId, }, "Failed to persist completed turn checkpoint", @@ -196,14 +204,7 @@ export async function persistAuthPauseCheckpoint(args: { messages: PiMessage[]; loadedSkillNames: string[]; errorMessage: string; - logContext: { - threadId?: string; - requesterId?: string; - channelId?: string; - runId?: string; - assistantUserName?: string; - modelId: string; - }; + logContext: CheckpointLogContext; }): Promise { const nextSliceId = args.currentSliceId + 1; try { @@ -236,20 +237,11 @@ export async function persistAuthPauseCheckpoint(args: { errorMessage: args.errorMessage, }); } catch (checkpointError) { - logException( + logCheckpointError( checkpointError, "agent_turn_auth_resume_checkpoint_failed", + args, { - slackThreadId: args.logContext.threadId, - slackUserId: args.logContext.requesterId, - slackChannelId: args.logContext.channelId, - runId: args.logContext.runId, - assistantUserName: args.logContext.assistantUserName, - modelId: args.logContext.modelId, - }, - { - "app.ai.resume_conversation_id": args.conversationId, - "app.ai.resume_session_id": args.sessionId, "app.ai.resume_from_slice_id": args.currentSliceId, "app.ai.resume_next_slice_id": nextSliceId, }, @@ -272,14 +264,7 @@ export async function persistTimeoutCheckpoint(args: { messages: PiMessage[]; loadedSkillNames: string[]; errorMessage: string; - logContext: { - threadId?: string; - requesterId?: string; - channelId?: string; - runId?: string; - assistantUserName?: string; - modelId: string; - }; + logContext: CheckpointLogContext; }): Promise { const nextSliceId = args.currentSliceId + 1; @@ -313,20 +298,11 @@ export async function persistTimeoutCheckpoint(args: { errorMessage: args.errorMessage, }); } catch (checkpointError) { - logException( + logCheckpointError( checkpointError, "agent_turn_timeout_resume_checkpoint_failed", + args, { - slackThreadId: args.logContext.threadId, - slackUserId: args.logContext.requesterId, - slackChannelId: args.logContext.channelId, - runId: args.logContext.runId, - assistantUserName: args.logContext.assistantUserName, - modelId: args.logContext.modelId, - }, - { - "app.ai.resume_conversation_id": args.conversationId, - "app.ai.resume_session_id": args.sessionId, "app.ai.resume_from_slice_id": args.currentSliceId, "app.ai.resume_next_slice_id": nextSliceId, },