diff --git a/bin/test b/bin/test new file mode 100755 index 00000000..166495fe --- /dev/null +++ b/bin/test @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# Runs the same checks as CI by parsing .github/workflows/ci.yml directly. +# If CI steps change, this script automatically picks them up. +# +# Local adaptations: +# - `npm ci` checks if node_modules is in sync with package-lock.json +# and runs a clean install if not (CI always does npm ci). +# - `npm run format:check` checks only git-tracked files because CI +# runs on a clean checkout but locally we have untracked x.* scratch +# files that fail prettier. +set -euo pipefail + +cd "$(git rev-parse --show-toplevel)" + +ci_yaml=".github/workflows/ci.yml" + +if ! command -v yq &>/dev/null; then + echo "error: yq is required (brew install yq)" >&2 + exit 1 +fi + +# Extract run steps +mapfile -t names < <(yq '.jobs.build.steps[] | select(.run) | .name' "$ci_yaml") +mapfile -t commands < <(yq '.jobs.build.steps[] | select(.run) | .run' "$ci_yaml") + +for i in "${!commands[@]}"; do + cmd="${commands[$i]}" + name="${names[$i]}" + + echo "=== ${name} ===" + + if [[ "$cmd" == "npm ci" ]]; then + # Check if node_modules matches package-lock.json. If not, run + # npm ci to match what CI does. This catches stale-dependency bugs + # like sdk-tools.d.ts resolving locally but not in CI. 
+ if npm ls --all >/dev/null 2>&1; then + echo "(node_modules in sync — skipping npm ci)" + else + echo "(node_modules out of sync — running npm ci)" + npm ci + fi + elif [[ "$cmd" == "npm run format:check" ]]; then + # Local override: format:check on git-tracked files only + git ls-files -z '*.ts' '*.tsx' '*.js' '*.jsx' '*.json' '*.md' '*.yml' '*.yaml' '*.css' '*.html' \ + | xargs -0 npx prettier --check + else + eval "$cmd" + fi + + echo "" +done + +echo "=== All CI checks passed ===" diff --git a/src/acp-agent.ts b/src/acp-agent.ts index 972c28f8..a798d739 100644 --- a/src/acp-agent.ts +++ b/src/acp-agent.ts @@ -60,6 +60,7 @@ import { ContentBlockParam } from "@anthropic-ai/sdk/resources"; import { BetaContentBlock, BetaRawContentBlockDelta } from "@anthropic-ai/sdk/resources/beta.mjs"; import { randomUUID } from "node:crypto"; import * as fs from "node:fs"; +import * as fsp from "node:fs/promises"; import * as os from "node:os"; import * as path from "node:path"; import packageJson from "../package.json" with { type: "json" }; @@ -288,7 +289,9 @@ export class ClaudeAcpAgent implements Agent { // Bypasses standard auth by routing requests through a custom Anthropic-protocol gateway. // Only offered when the client advertises `auth._meta.gateway` capability. - const supportsGatewayAuth = request.clientCapabilities?.auth?._meta?.gateway === true; + // auth is an ACP extension not yet in the ClientCapabilities type + const caps = request.clientCapabilities as any; + const supportsGatewayAuth = caps?.auth?._meta?.gateway === true; const gatewayAuthMethod: AuthMethod = { id: "gateway", @@ -308,7 +311,7 @@ export class ClaudeAcpAgent implements Agent { type: "terminal", args: ["--cli"], }; - const supportsTerminalAuth = request.clientCapabilities?.auth?.terminal === true; + const supportsTerminalAuth = caps?.auth?.terminal === true; // If client supports terminal-auth capability, use that instead. 
const supportsMetaTerminalAuth = request.clientCapabilities?._meta?.["terminal-auth"] === true; @@ -469,7 +472,14 @@ export class ClaudeAcpAgent implements Agent { const promptUuid = randomUUID(); userMessage.uuid = promptUuid; - let promptReplayed = false; + // backgroundInitPending tracks whether a system "init" message arrived + // without any subsequent activity (stream_event, assistant, or system + // messages that indicate real processing). A background task that + // completed after the previous prompt loop ended produces + // init → result with nothing in between. Our own prompt's processing + // always generates stream_event / assistant messages before its result, + // so the flag gets cleared before we see the result. + let backgroundInitPending = false; if (session.promptRunning) { session.input.push(userMessage); @@ -480,15 +490,31 @@ export class ClaudeAcpAgent implements Agent { if (cancelled) { return { stopReason: "cancelled" }; } - // The replay resolved the promise, mark in this loop too, - // so we don't treat the next result as a background task's result. - promptReplayed = true; } else { session.input.push(userMessage); } session.promptRunning = true; let handedOff = false; + // Saved prompt response when consuming internal turns from background + // task completions. See the "success" result handler below. + let savedPromptResponse: PromptResponse | undefined; + // Track background task IDs from system/task_started messages. + // Resolved when we see system/task_notification with a terminal status. + // Used to decide whether to poll for internal turns after a result. + const pendingTaskIds = new Map< + string, + { + outputPath: string | null; + taskType: string; + toolUseId: string | null; + firstSeenAt: number; + lastActivityAt: number; + } + >(); + // Cache output paths parsed from terminal_output before task_started arrives. + // The SDK may emit terminal_output before task_started depending on ordering. 
+ const earlyOutputPaths = new Map(); try { while (true) { @@ -505,6 +531,10 @@ export class ClaudeAcpAgent implements Agent { case "system": switch (message.subtype) { case "init": + // A new processing cycle started. If this is a + // background task, its result will follow immediately + // without intervening stream_event / assistant messages. + backgroundInitPending = true; break; case "status": { if (message.status === "compacting") { @@ -529,7 +559,7 @@ export class ClaudeAcpAgent implements Agent { content: { type: "text", text: "\n\nCompacting completed." }, }, }); - promptReplayed = true; + backgroundInitPending = false; break; } case "local_command_output": { @@ -540,19 +570,49 @@ export class ClaudeAcpAgent implements Agent { content: { type: "text", text: message.content }, }, }); - promptReplayed = true; + backgroundInitPending = false; break; } case "hook_started": case "hook_progress": case "hook_response": case "files_persisted": - case "task_started": - case "task_notification": - case "task_progress": case "elicitation_complete": // Todo: process via status api: https://docs.claude.com/en/docs/claude-code/hooks#hook-output break; + case "task_started": + // Track background task IDs so we can detect when an + // internal turn (task_notification → assistant → result) + // may follow the user's result message. Only track + // local_bash tasks — the SDK already defers results for + // local_agent tasks (via its internal iP() check — + // minified name, likely hasRunningDeferrableTasks). + if (message.task_id && (message as any).task_type === "local_bash") { + const earlyPath = earlyOutputPaths.get(message.task_id); + const now = Date.now(); + pendingTaskIds.set(message.task_id, { + outputPath: earlyPath ?? null, + taskType: (message as any).task_type, + toolUseId: (message as any).tool_use_id ?? 
null, + firstSeenAt: now, + lastActivityAt: now, + }); + if (earlyPath) earlyOutputPaths.delete(message.task_id); + } + break; + case "task_notification": { + // Resolve the task ID when the SDK reports completion. + const status = (message as any).status; + if ( + message.task_id && + (status === "completed" || status === "failed" || status === "stopped") + ) { + pendingTaskIds.delete(message.task_id); + } + break; + } + case "task_progress": + break; default: unreachable(message, this.logger); break; @@ -586,18 +646,20 @@ export class ClaudeAcpAgent implements Agent { }); } - if (!promptReplayed) { - // This result is from a background task that finished after - // the previous prompt loop ended. Consume it and continue - // waiting for our own prompt's result. - this.logger.log(`Session ${params.sessionId}: consuming background task result`); - break; - } - if (session.cancelled) { return { stopReason: "cancelled" }; } + if (backgroundInitPending) { + // This result immediately followed an init with no + // intervening activity — it belongs to a background task + // that finished after the previous prompt loop ended. + // Consume it and continue waiting for our own result. + this.logger.log(`Session ${params.sessionId}: consuming background task result`); + backgroundInitPending = false; + break; + } + // Build the usage response const usage: PromptResponse["usage"] = { inputTokens: session.accumulatedUsage.inputTokens, @@ -622,6 +684,128 @@ export class ClaudeAcpAgent implements Agent { if (message.is_error) { throw RequestError.internalError(undefined, message.result); } + + // Workaround: detect and consume SDK internal turns from + // background task completions before returning. The SDK defers + // result messages for running local_agent tasks but NOT for + // local_bash background tasks, so background Bash completions + // arrive after the result as: task_notification → assistant → + // result. 
See x.sdk-trace-bg-leak.ndjson for the observed + // message sequence. + // + // Strategy: keep the turn open indefinitely, polling until + // all background tasks resolve (task_notification arrives) + // or the user cancels. There is no inactivity timeout — the + // turn stays open to guarantee internal turns are consumed. + // + // Defense layers: + // 1. Only poll if we saw task_started during the turn + // (pendingTaskIds tracks unresolved background tasks) + // 2. Each poll iteration scans the SDK internal queue + // (inputStream.queue) for any buffered task_notification + // 3. Output file growth is tracked as a per-task heartbeat + // 4. Cancellation is checked each iteration + // + // This accesses SDK internals (not a public API). The SDK + // doesn't expose an idle/hasPending signal. + // See: x.codex-review-fix2.md, x.codex-review-fix3.md + if (0 < pendingTaskIds.size) { + // Poll indefinitely for background task completion. + // The SDK enqueues task_notification asynchronously after + // the result; polling gives it time to land while also + // monitoring output file growth as a per-task heartbeat. + const POLL_INTERVAL_MS = 1_000; + const PROGRESS_LOG_INTERVAL_MS = 30_000; + let lastProgressLog = 0; // force immediate first log + const fileSizes = new Map(); + + const formatTaskSummary = () => { + const now = Date.now(); + const parts: string[] = []; + for (const [taskId, entry] of pendingTaskIds) { + const ref = entry.lastActivityAt ?? entry.firstSeenAt; + const inactiveFor = Math.round((now - ref) / 1000); + parts.push( + `task_id=${taskId} task_type=${entry.taskType} ` + + `tool_use_id=${entry.toolUseId ?? "unknown"} ` + + `outputPath=${entry.outputPath ?? 
"unknown"} ` + + `inactiveFor=${inactiveFor}s`, + ); + } + return ( + `[bg-task-poll] waiting for background tasks: ` + + parts.join(", ") + + ` — cancellation risks later prompt contamination` + ); + }; + + // Log immediately on entering the poll loop + this.logger.log(formatTaskSummary()); + lastProgressLog = Date.now(); + + while (0 < pendingTaskIds.size) { + // Yield to event loop + await new Promise((r) => setTimeout(r, POLL_INTERVAL_MS)); + + // Bail early if the session was cancelled during the wait + if (session.cancelled) { + return { stopReason: "cancelled" }; + } + + // Throttled progress log every PROGRESS_LOG_INTERVAL_MS + const now = Date.now(); + if (PROGRESS_LOG_INTERVAL_MS <= now - lastProgressLog) { + lastProgressLog = now; + this.logger.log(formatTaskSummary()); + } + + // Check if task_notification arrived anywhere in the queue. + // Other system messages (e.g., init) may be queued before it, + // so scanning past queue[0] avoids sleeping until timeout. + const queryInternal = session.query as any; + const queueArr = queryInternal.inputStream?.queue; + const hasTaskNotification = + Array.isArray(queueArr) && + queueArr.some( + (m: any) => m.type === "system" && m.subtype === "task_notification", + ); + if (hasTaskNotification) { + // Internal turn ready to consume — save response, + // continue outer while loop + savedPromptResponse = { stopReason: "end_turn", usage }; + break; + } + + // Check output files for activity + for (const [taskId, entry] of pendingTaskIds) { + if (!entry.outputPath) continue; + try { + const stat = await fsp.stat(entry.outputPath); + const prevSize = fileSizes.get(taskId) ?? 
-1; + if (stat.size !== prevSize) { + fileSizes.set(taskId, stat.size); + entry.lastActivityAt = Date.now(); + this.logger.log( + `[bg-task-poll] output activity — ` + + `task_id=${taskId} size=${stat.size}`, + ); + } + } catch { + // File may not exist yet — that's fine + } + } + } + + // task_notification arrived — continue the outer while + // loop to consume the internal turn + if (savedPromptResponse) break; + } + + // No pending tasks — return the response. + if (savedPromptResponse) { + savedPromptResponse.usage = usage; + return savedPromptResponse; + } return { stopReason: "end_turn", usage }; } case "error_during_execution": @@ -652,6 +836,9 @@ export class ClaudeAcpAgent implements Agent { break; } case "stream_event": { + // Stream events indicate active processing for our prompt, + // so clear any pending background init. + backgroundInitPending = false; for (const notification of streamEventToAcpNotifications( message, params.sessionId, @@ -663,6 +850,27 @@ export class ClaudeAcpAgent implements Agent { cwd: session.cwd, }, )) { + // Extract background task output file path from terminal_output + // data. The SDK emits "Command running in background with ID: {id}. + // Output is being written to: {path}" as terminal output when a + // bash task starts in the background. The terminal_output may arrive + // before or after task_started, so we cache the path in + // earlyOutputPaths if the task isn't tracked yet. 
+ const termData = (notification.update as any)?._meta?.terminal_output?.data; + if (typeof termData === "string") { + const bgMatch = termData.match( + /Command running in background with ID: (\S+)\.\s*Output is being written to: (.+)$/, + ); + if (bgMatch) { + const [, taskId, outputPath] = bgMatch; + const entry = pendingTaskIds.get(taskId); + if (entry) { + entry.outputPath = outputPath; + } else { + earlyOutputPaths.set(taskId, outputPath); + } + } + } await this.client.sessionUpdate(notification); } break; @@ -676,9 +884,9 @@ export class ClaudeAcpAgent implements Agent { // Check for prompt replay if (message.type === "user" && "uuid" in message && message.uuid) { if (message.uuid === promptUuid) { - // Our own prompt was replayed back — we're now processing - // our prompt's response (not a background task's). - promptReplayed = true; + // Our own prompt was replayed back — clear any pending + // background init since we're now processing our prompt. + backgroundInitPending = false; break; } const pending = session.pendingMessages.get(message.uuid as string); @@ -696,6 +904,9 @@ export class ClaudeAcpAgent implements Agent { } } + // Non-replay user/assistant messages indicate active processing. + backgroundInitPending = false; + // Store latest assistant usage (excluding subagents) if ((message.message as any).usage && message.parent_tool_use_id === null) { const messageWithUsage = message.message as unknown as SDKResultMessage; @@ -1866,34 +2077,38 @@ export function toAcpNotifications( } else { // Only register hooks on first encounter to avoid double-firing if (registerHooks && !alreadyCached) { - registerHookCallback(chunk.id, { - onPostToolUseHook: async (toolUseId, toolInput, toolResponse) => { - const toolUse = toolUseCache[toolUseId]; - if (toolUse) { - const editDiff = - toolUse.name === "Edit" ? 
toolUpdateFromEditToolResponse(toolResponse) : {}; - const update: SessionNotification["update"] = { - _meta: { - claudeCode: { - toolResponse, - toolName: toolUse.name, - }, - } satisfies ToolUpdateMeta, - toolCallId: toolUseId, - sessionUpdate: "tool_call_update", - ...editDiff, - }; - await client.sessionUpdate({ - sessionId, - update, - }); - } else { - logger.error( - `[claude-agent-acp] Got a tool response for tool use that wasn't tracked: ${toolUseId}`, - ); - } + registerHookCallback( + chunk.id, + { + onPostToolUseHook: async (toolUseId, toolInput, toolResponse) => { + const toolUse = toolUseCache[toolUseId]; + if (toolUse) { + const editDiff = + toolUse.name === "Edit" ? toolUpdateFromEditToolResponse(toolResponse) : {}; + const update: SessionNotification["update"] = { + _meta: { + claudeCode: { + toolResponse, + toolName: toolUse.name, + }, + } satisfies ToolUpdateMeta, + toolCallId: toolUseId, + sessionUpdate: "tool_call_update", + ...editDiff, + }; + await client.sessionUpdate({ + sessionId, + update, + }); + } else { + logger.error( + `[claude-agent-acp] Got a tool response for tool use that wasn't tracked: ${toolUseId}`, + ); + } + }, }, - }); + logger, + ); } let rawInput; diff --git a/src/embed.d.ts b/src/embed.d.ts new file mode 100644 index 00000000..8fdd4108 --- /dev/null +++ b/src/embed.d.ts @@ -0,0 +1,26 @@ +// Type declaration for the embed module that only exists in the +// single-file bun build (CLAUDE_AGENT_ACP_IS_SINGLE_FILE_BUN). +declare module "@anthropic-ai/claude-agent-sdk/embed" { + const cliPath: string; + export default cliPath; +} + +// The SDK ships sdk-tools.d.ts but doesn't export ./sdk-tools in +// package.json. This shim lets tsc resolve the .js import path used +// in src/tools.ts. The actual types come from the SDK's sdk-tools.d.ts +// which TypeScript finds via paths resolution. 
+/// +declare module "@anthropic-ai/claude-agent-sdk/sdk-tools.js" { + export { + AgentInput, + BashInput, + FileEditInput, + FileReadInput, + FileWriteInput, + GlobInput, + GrepInput, + TodoWriteInput, + WebFetchInput, + WebSearchInput, + } from "@anthropic-ai/claude-agent-sdk/sdk-tools"; +} diff --git a/src/tests/authorization.test.ts b/src/tests/authorization.test.ts index fd4a6814..a7e2b00b 100644 --- a/src/tests/authorization.test.ts +++ b/src/tests/authorization.test.ts @@ -148,7 +148,7 @@ describe("authorization", () => { const initializeResponse = await agent.initialize({ protocolVersion: 1, - clientCapabilities: { auth: { terminal: true } }, + clientCapabilities: { auth: { terminal: true } } as any, }); expect(initializeResponse.authMethods).toContainEqual( diff --git a/src/tests/bg-task-leak.test.ts b/src/tests/bg-task-leak.test.ts new file mode 100644 index 00000000..fd5cb6f6 --- /dev/null +++ b/src/tests/bg-task-leak.test.ts @@ -0,0 +1,1134 @@ +/** + * Tests for background task notification leak across prompt turns. + * + * Bug: when a subagent launches a background Bash task (run_in_background), + * and the background task completes after the main turn's `result` message, + * the SDK yields additional messages (task_notification, assistant response, + * second result) that belong to an "internal turn." The adapter's prompt() + * currently returns at the first `result`, leaving the internal turn's + * messages in the iterator buffer. The next prompt() call picks them up, + * causing the model to respond to the background task output instead of + * the user's actual message. + * + * Evidence: captured from x.sdk-trace-bg-leak.ndjson (real SDK output). 
+ * + * The SDK iterator yields this sequence for one ACP prompt turn: + * + * [0..146] system/init, stream_events, assistant, user messages (normal turn) + * [147] stream_event/message_stop (end of streaming) + * [148] result/success ← USER TURN ENDS (prompt() returns here today) + * [149] system/task_notification ← bg bash task completed + * [150] system/init ← internal turn starts + * [151-167] stream_event deltas ← model responding to task_notification + * [168] assistant message ← "The background task completed..." + * [169-171] stream_event stop ← streaming ends + * [172] result/success ← INTERNAL TURN ENDS + * + * The fix should NOT return the prompt response until the iterator is truly + * idle — meaning all internal turns (task_notification → assistant → result) + * have been consumed. Intermediate messages should be forwarded as ACP + * session/update notifications. + */ + +import { describe, it, expect, vi } from "vitest"; +import { ClaudeAcpAgent } from "../acp-agent.js"; +import type { AgentSideConnection } from "@agentclientprotocol/sdk"; +import type { Query, SDKUserMessage } from "@anthropic-ai/claude-agent-sdk"; +import { Pushable } from "../utils.js"; + +// ── Mock helpers ───────────────────────────────────────────────────── + +/** + * Creates a mock Query (AsyncGenerator) that yields messages in sequence. + * After all messages are consumed, next() blocks forever (simulates idle SDK). + */ +function createMockQuery(messages: any[]): Query { + // Simulate the SDK's internal queue structure (q4 in cli.js — + // minified name, likely InputStreamQueue or MessageBuffer). + // The real SDK's Query wraps an inputStream with a queue array. + // Messages are pre-loaded into the queue so that prompt() can scan + // `query.inputStream.queue` for a buffered task_notification — + // this is how prompt() detects internal turns without + // consuming from the iterator. 
+ const queue = [...messages]; + const gen = { + async next(): Promise> { + if (queue.length > 0) { + return { value: queue.shift(), done: false }; + } + // Block forever — simulates SDK waiting for next user input + return new Promise(() => {}); + }, + async return(): Promise> { + return { value: undefined, done: true }; + }, + async throw(e: any): Promise> { + throw e; + }, + [Symbol.asyncIterator]() { + return gen; + }, + // Expose inputStream.queue to match SDK internals + inputStream: { queue }, + interrupt: vi.fn(async () => {}), + setPermissionMode: vi.fn(async () => {}), + setModel: vi.fn(async () => {}), + } as unknown as Query; + return gen; +} + +/** Creates a mock AgentSideConnection that records sessionUpdate calls. */ +function createMockClient() { + const updates: any[] = []; + const client = { + sessionUpdate: vi.fn(async (params: any) => { + updates.push(params); + }), + } as unknown as AgentSideConnection; + return { client, updates }; +} + +/** Creates a ClaudeAcpAgent with a fake session backed by the mock query. */ +function createAgentWithSession( + mockQuery: Query, + mockClient: AgentSideConnection, + sessionId = "test-session", +) { + const agent = new ClaudeAcpAgent(mockClient); + // Create an input stream that injects a user replay message into the + // mock query's queue when prompt() pushes. The real SDK replays the + // user message through the query iterator; the mock must do the same + // so that backgroundInitPending is cleared before the result arrives. + const queue = (mockQuery as any).inputStream.queue as any[]; + const input = new Pushable(); + const origPush = input.push.bind(input); + input.push = (msg: SDKUserMessage) => { + // Insert replay message right before the first result so the + // prompt loop sees it in time. + const resultIdx = queue.findIndex((m: any) => m.type === "result"); + const replay = { + type: "user", + message: { role: "user", content: (msg as any).content ?? 
[] }, + uuid: (msg as any).uuid, + isReplay: true, + session_id: sessionId, + parent_tool_use_id: null, + }; + if (0 <= resultIdx) { + queue.splice(resultIdx, 0, replay); + } else { + queue.push(replay); + } + return origPush(msg); + }; + // Inject a fake session directly + (agent as any).sessions[sessionId] = { + query: mockQuery, + input, + cancelled: false, + cwd: "/test", + permissionMode: "default", + settingsManager: { getSettings: () => ({}) }, + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + configOptions: [], + promptRunning: false, + pendingMessages: new Map(), + nextPendingOrder: 0, + }; + return agent; +} + +// ── Fixtures from real SDK trace ───────────────────────────────────── +// Extracted from x.sdk-trace-bg-leak.ndjson + +const SESSION_ID = "test-session"; + +/** Minimal result message (from SDK trace line 148). */ +function makeResultMessage(text: string, inputTokens = 10, outputTokens = 5): any { + return { + type: "result", + subtype: "success", + is_error: false, + stop_reason: null, + duration_ms: 100, + result: text, + session_id: SESSION_ID, + total_cost_usd: 0.001, + usage: { + input_tokens: inputTokens, + output_tokens: outputTokens, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + server_tool_use: { web_search_requests: 0, web_fetch_requests: 0 }, + service_tier: "standard", + }, + modelUsage: { + "test-model": { + inputTokens, + outputTokens, + cacheReadInputTokens: 0, + cacheCreationInputTokens: 0, + webSearchRequests: 0, + costUSD: 0.001, + contextWindow: 200000, + maxOutputTokens: 4096, + }, + }, + }; +} + +/** Normal turn: init → streaming → assistant → result. No bg tasks. 
*/ +function makeNormalTurnMessages(text = "Hello"): any[] { + return [ + { type: "system", subtype: "init", session_id: SESSION_ID }, + { + type: "stream_event", + event: { type: "message_start", message: { model: "test", role: "assistant", content: [] } }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_start", index: 0, content_block: { type: "text", text: "" } }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_delta", index: 0, delta: { type: "text_delta", text } }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_stop", index: 0 }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "message_stop" }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "assistant", + message: { + role: "assistant", + content: [{ type: "text", text }], + model: "test", + id: "msg_1", + type: "message", + stop_reason: null, + stop_sequence: null, + usage: { + input_tokens: 10, + output_tokens: 5, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + makeResultMessage(text), + ]; +} + +/** + * Internal turn messages that follow the first result when a bg task completes. + * Extracted from SDK trace lines 149-172. + */ +function makeBgTaskInternalTurnMessages(): any[] { + const bgText = + "\n\nThe background task from the subagent completed. 
Still waiting on your answer."; + return [ + // task_notification: bg bash completed + { + type: "system", + subtype: "task_notification", + task_id: "bmugj42hj", + tool_use_id: "toolu_013dxfKLvos4vXxWcSvDsGiw", + status: "completed", + output_file: "/tmp/tasks/bmugj42hj.output", + summary: 'Background command "Sleep 8 seconds then print message" completed (exit code 0)', + session_id: SESSION_ID, + }, + // init: internal turn begins + { + type: "system", + subtype: "init", + cwd: "/test", + session_id: SESSION_ID, + tools: [], + model: "test", + }, + // streaming + { + type: "stream_event", + event: { + type: "message_start", + message: { model: "test", role: "assistant", content: [], id: "msg_internal" }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_start", index: 0, content_block: { type: "text", text: "" } }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: bgText } }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + // assistant message + { + type: "assistant", + message: { + role: "assistant", + content: [{ type: "text", text: bgText }], + model: "test", + id: "msg_internal", + type: "message", + stop_reason: null, + stop_sequence: null, + usage: { + input_tokens: 3, + output_tokens: 31, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_stop", index: 0 }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { + type: "message_delta", + delta: { stop_reason: "end_turn" }, + usage: { output_tokens: 31 }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "message_stop" }, + parent_tool_use_id: null, + 
session_id: SESSION_ID, + }, + // result: internal turn ends + makeResultMessage(bgText, 3, 31), + ]; +} + +/** Full sequence: normal user turn (with task_started) + bg task internal turn. */ +function makeFullBgTaskSequence() { + const normalTurn = makeNormalTurnMessages("Shall I create a summary?"); + // Insert task_started before the result (matches SDK trace line 105: + // task_started arrives during the turn, before result at line 148). + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "bmugj42hj", + tool_use_id: "toolu_013dxfKLvos4vXxWcSvDsGiw", + description: "Sleep 8 seconds then print message", + task_type: "local_bash", + session_id: SESSION_ID, + }); + return [...normalTurn, ...makeBgTaskInternalTurnMessages()]; +} + +// ── Tests ──────────────────────────────────────────────────────────── + +describe("Background task notification leak", () => { + describe("fixture structure verification", () => { + it("full sequence has exactly two result messages", () => { + const msgs = makeFullBgTaskSequence(); + const results = msgs.filter((m: any) => m.type === "result"); + expect(results.length).toBe(2); + }); + + it("task_notification appears between the two results", () => { + const msgs = makeFullBgTaskSequence(); + const firstResultIdx = msgs.findIndex((m: any) => m.type === "result"); + const taskNotifIdx = msgs.findIndex( + (m: any) => m.type === "system" && m.subtype === "task_notification", + ); + const secondResultIdx = msgs.findIndex( + (m: any, i: number) => i !== firstResultIdx && m.type === "result", + ); + + expect(firstResultIdx).toBeLessThan(taskNotifIdx); + expect(taskNotifIdx).toBeLessThan(secondResultIdx); + }); + + it("internal turn starts with task_notification, not user input", () => { + const internalTurn = makeBgTaskInternalTurnMessages(); + expect(internalTurn[0].type).toBe("system"); + expect((internalTurn[0] as 
any).subtype).toBe("task_notification"); + expect((internalTurn[0] as any).status).toBe("completed"); + }); + }); + + describe("prompt() behavior with background task internal turns", () => { + it("prompt() should consume the full sequence including internal turn before returning", async () => { + const allMessages = makeFullBgTaskSequence(); + const mockQuery = createMockQuery(allMessages); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // With the fix, prompt() should consume all 18+ messages before returning. + // It should forward the task_notification and assistant text as notifications. + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "test" }], + }); + + expect(result.stopReason).toBe("end_turn"); + + // The client should have received the bg task notification content + const allText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? 
"") + .join(""); + + // Fixed: bg task completion should be forwarded as notification + expect(allText).toContain("background task from the subagent completed"); + }); + + it("subsequent prompt() should NOT see stale internal turn messages", async () => { + const firstTurnMessages = makeFullBgTaskSequence(); + const secondTurnMessages = makeNormalTurnMessages("Created summary file."); + const allMessages = [...firstTurnMessages, ...secondTurnMessages]; + + const mockQuery = createMockQuery(allMessages); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // First prompt consumes everything including internal turn + await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "do the task" }], + }); + + updates.length = 0; + + // Second prompt gets clean "yes" response + const result2 = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "yes" }], + }); + + expect(result2.stopReason).toBe("end_turn"); + + const secondPromptText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? "") + .join(""); + + // Fixed: no bg task leak in second prompt + expect(secondPromptText).not.toContain("background task"); + expect(secondPromptText).toContain("Created summary file"); + }); + + it("local_agent task_started should NOT trigger internal turn detection", async () => { + // The SDK already defers results for local_agent tasks, so agent + // subagent task_started events should be ignored by our workaround. + // This was a real bug found in E2E: agent task IDs were inflating + // pendingTaskIds and were never resolved (no task_notification). 
+ const normalTurn = makeNormalTurnMessages("Done."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "agent-task-123", + tool_use_id: "toolu_agent_1", + description: "Subagent exploring codebase", + task_type: "local_agent", + session_id: SESSION_ID, + }); + + const mockQuery = createMockQuery(normalTurn); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // Should return promptly at the result — local_agent task_started + // must NOT cause the code to peek for task_notification. + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "explore" }], + }); + + expect(result.stopReason).toBe("end_turn"); + }); + + it("consumes internal turn when task_notification arrives during poll loop", async () => { + // Simulates the real-world race: task_notification isn't in the + // queue when the poll loop starts but arrives during a poll interval. + // + // Uses fake timers so the production POLL_INTERVAL_MS (1000ms) and + // the test's deferred message push are deterministic. + vi.useFakeTimers(); + try { + const normalTurn = makeNormalTurnMessages("Shall I proceed?"); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "delayed-task", + tool_use_id: "toolu_delayed_1", + description: "Sleep then print", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + const internalTurnMessages = makeBgTaskInternalTurnMessages(); + (internalTurnMessages[0] as any).task_id = "delayed-task"; + + const mockQuery = createMockQuery(normalTurn); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // Schedule internal turn messages to arrive after 500ms — within + // the first poll interval (1000ms). 
+ const queue = (mockQuery as any).inputStream.queue as any[]; + setTimeout(() => { + for (const msg of internalTurnMessages) { + queue.push(msg); + } + }, 500); + + // Start prompt (don't await yet — it will block at poll sleep) + const promptPromise = agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "run it" }], + }); + + // Flush microtasks so the production code's setTimeout(1000) is + // registered, then advance fake timers past the first poll interval. + await Promise.resolve(); + // Advance 500ms to fire our message push, then 500ms more to fire + // the poll interval timer. + await vi.advanceTimersByTimeAsync(1000); + + const result = await promptPromise; + + expect(result.stopReason).toBe("end_turn"); + + const allText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? "") + .join(""); + expect(allText).toContain("background task from the subagent completed"); + } finally { + vi.useRealTimers(); + } + }); + + it("emits aggregated waiting log immediately and every 30s, escaped by cancellation", async () => { + // The poll loop stays open indefinitely. Verify that it emits the + // aggregated log immediately on entry and again after 30s, then + // use cancellation as the escape hatch. 
+ vi.useFakeTimers(); + try { + const normalTurn = makeNormalTurnMessages("Done."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "log-task", + tool_use_id: "toolu_log_1", + description: "Long running task", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + const mockQuery = createMockQuery(normalTurn); + const logs: string[] = []; + const spyLogger = { + log: (...args: any[]) => logs.push(args.join(" ")), + error: () => {}, + }; + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + (agent as any).logger = spyLogger; + + const promptPromise = agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "run it" }], + }); + + // Advance 1s to enter the poll loop (immediate log fires before first sleep) + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(1_000); + + // Should have the immediate entry log. Assert on the count itself so a + // failure reports how many matching lines were actually captured. + const entryLogs = logs.filter((l) => + l.includes("[bg-task-poll] waiting for background tasks"), + ); + expect(entryLogs.length).toBeGreaterThanOrEqual(1); + expect(entryLogs[0]).toContain("log-task"); + expect(entryLogs[0]).toContain("cancellation risks later prompt contamination"); + + // Advance 30s more for the throttled repeat log + await vi.advanceTimersByTimeAsync(30_000); + const repeatLogs = logs.filter((l) => + l.includes("[bg-task-poll] waiting for background tasks"), + ); + expect(repeatLogs.length).toBeGreaterThanOrEqual(2); + + // Cancel to escape the indefinite loop + const session = (agent as any).sessions[SESSION_ID]; + session.cancelled = true; + await vi.advanceTimersByTimeAsync(1_000); + + const result = await promptPromise; + expect(result.stopReason).toBe("cancelled"); + } finally { + vi.useRealTimers(); + } + }); + + it("clears pendingTaskIds when task_notification reports failed status", async () => { + // A task_notification with status "failed" should still 
clear the + // pending task, preventing false-positive internal turn detection. + const normalTurn = makeNormalTurnMessages("Task failed."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + // Insert task_started, then a failed task_notification BEFORE the result + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "fail-task-1", + tool_use_id: "toolu_fail_1", + description: "Will fail", + task_type: "local_bash", + session_id: SESSION_ID, + }); + // Re-find resultIdx since we just spliced + const newResultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(newResultIdx, 0, { + type: "system", + subtype: "task_notification", + task_id: "fail-task-1", + tool_use_id: "toolu_fail_1", + status: "failed", + summary: "Command failed with exit code 1", + session_id: SESSION_ID, + }); + + const mockQuery = createMockQuery(normalTurn); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // Should return cleanly — task was resolved by the failed notification + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }); + + expect(result.stopReason).toBe("end_turn"); + }); + + it("clears pendingTaskIds when task_notification reports stopped status", async () => { + // Same as the failed test above but with "stopped" status — + // verifies all terminal statuses clear the set. 
+ const normalTurn = makeNormalTurnMessages("Stopped."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "stop-task-1", + tool_use_id: "toolu_stop_1", + description: "Will be stopped", + task_type: "local_bash", + session_id: SESSION_ID, + }); + const newResultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(newResultIdx, 0, { + type: "system", + subtype: "task_notification", + task_id: "stop-task-1", + tool_use_id: "toolu_stop_1", + status: "stopped", + summary: "Task was cancelled", + session_id: SESSION_ID, + }); + + const mockQuery = createMockQuery(normalTurn); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }); + + expect(result.stopReason).toBe("end_turn"); + }); + + it("error_during_execution result does NOT drain internal turns (known limitation)", async () => { + // The internal turn drain only runs for result/success. If the + // prompt errors while a bg task is pending, internal turn messages + // can still leak. This test documents the current behavior. 
+ const messages = [ + { type: "system", subtype: "init", session_id: SESSION_ID }, + { + type: "system", + subtype: "task_started", + task_id: "err-task-1", + tool_use_id: "toolu_err_1", + description: "bg task during error", + task_type: "local_bash", + session_id: SESSION_ID, + }, + { + type: "result", + subtype: "error_during_execution", + is_error: true, + stop_reason: null, + duration_ms: 50, + result: "something broke", + errors: ["tool execution failed"], + session_id: SESSION_ID, + total_cost_usd: 0.001, + usage: { + input_tokens: 5, + output_tokens: 2, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + server_tool_use: { web_search_requests: 0, web_fetch_requests: 0 }, + service_tier: "standard", + }, + modelUsage: { + "test-model": { + inputTokens: 5, + outputTokens: 2, + cacheReadInputTokens: 0, + cacheCreationInputTokens: 0, + webSearchRequests: 0, + costUSD: 0.001, + contextWindow: 200000, + maxOutputTokens: 4096, + }, + }, + }, + // These would be the internal turn — left in the queue + ...makeBgTaskInternalTurnMessages(), + ]; + + const mockQuery = createMockQuery(messages); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // error_during_execution with is_error throws + await expect( + agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }), + ).rejects.toThrow("tool execution failed"); + + // The internal turn messages are still in the queue (known limitation). + // This documents the behavior rather than asserting a fix. Asserting on + // the length directly makes a failure print the actual queue size. + const queue = (mockQuery as any).inputStream.queue as any[]; + expect(queue.length).toBeGreaterThan(0); + }); + + it("multiple back-to-back bg task internal turns are all consumed", async () => { + // Two background tasks complete after the first result, each + // producing its own internal turn (task_notification → assistant → result). 
+ const normalTurn = makeNormalTurnMessages("Two tasks launched."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + + // Insert two task_started messages + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "bg-task-a", + tool_use_id: "toolu_a", + description: "First bg task", + task_type: "local_bash", + session_id: SESSION_ID, + }); + const resultIdx2 = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx2, 0, { + type: "system", + subtype: "task_started", + task_id: "bg-task-b", + tool_use_id: "toolu_b", + description: "Second bg task", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + // First internal turn (task A completes) + const internalTurnA = makeBgTaskInternalTurnMessages(); + (internalTurnA[0] as any).task_id = "bg-task-a"; + + // Second internal turn (task B completes) + const bgTextB = "\n\nSecond background task also completed."; + const internalTurnB = [ + { + type: "system", + subtype: "task_notification", + task_id: "bg-task-b", + tool_use_id: "toolu_b", + status: "completed", + output_file: "/tmp/tasks/bg-task-b.output", + summary: "Second background command completed", + session_id: SESSION_ID, + }, + { + type: "system", + subtype: "init", + cwd: "/test", + session_id: SESSION_ID, + tools: [], + model: "test", + }, + { + type: "stream_event", + event: { + type: "message_start", + message: { model: "test", role: "assistant", content: [], id: "msg_b" }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: bgTextB }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "assistant", + message: { + 
role: "assistant", + content: [{ type: "text", text: bgTextB }], + model: "test", + id: "msg_b", + type: "message", + stop_reason: null, + stop_sequence: null, + usage: { + input_tokens: 3, + output_tokens: 10, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "content_block_stop", index: 0 }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + { + type: "stream_event", + event: { type: "message_stop" }, + parent_tool_use_id: null, + session_id: SESSION_ID, + }, + makeResultMessage(bgTextB, 3, 10), + ]; + + const allMessages = [...normalTurn, ...internalTurnA, ...internalTurnB]; + const mockQuery = createMockQuery(allMessages); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "launch both" }], + }); + + expect(result.stopReason).toBe("end_turn"); + + // Both internal turns should have been consumed and forwarded + const allText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? 
"") + .join(""); + + expect(allText).toContain("background task from the subagent completed"); + expect(allText).toContain("Second background task also completed"); + + // Queue should be empty (all consumed) + const queue = (mockQuery as any).inputStream.queue as any[]; + expect(queue.length).toBe(0); + }); + + it("normal turns without bg tasks should be unaffected", async () => { + const messages = makeNormalTurnMessages("Hello"); + const mockQuery = createMockQuery(messages); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "hi" }], + }); + + expect(result.stopReason).toBe("end_turn"); + // No hang, no extra consumption — returns cleanly at first result + }); + + it("task_notification arriving mid-poll is consumed before timeout", async () => { + // Verifies that the poll loop detects a task_notification that arrives + // after several seconds of polling (not immediately). This tests the + // real-world scenario where a background task takes a few seconds. 
+ vi.useFakeTimers(); + try { + const normalTurn = makeNormalTurnMessages("Running."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "mid-poll-task", + tool_use_id: "toolu_mid_1", + description: "Takes a few seconds", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + const internalTurnMessages = makeBgTaskInternalTurnMessages(); + (internalTurnMessages[0] as any).task_id = "mid-poll-task"; + + const mockQuery = createMockQuery(normalTurn); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const queue = (mockQuery as any).inputStream.queue as any[]; + + // Schedule internal turn messages to arrive after 5 seconds + setTimeout(() => { + for (const msg of internalTurnMessages) { + queue.push(msg); + } + }, 5_000); + + const promptPromise = agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }); + + // Advance past 5s to trigger the message push, then 1s more for + // the poll to detect it + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(6_000); + + const result = await promptPromise; + expect(result.stopReason).toBe("end_turn"); + + const allText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? "") + .join(""); + expect(allText).toContain("background task from the subagent completed"); + } finally { + vi.useRealTimers(); + } + }); + + it("cancellation during poll loop returns immediately", async () => { + // If the session is cancelled while the poll loop is waiting, + // prompt() should return { stopReason: "cancelled" } at the next poll + // check rather than continuing to wait for the background task. + // NOTE(review): the original wording mentioned an "inactivity timeout", but + // the aggregated-waiting-log test above describes the poll loop as staying + // open indefinitely — confirm whether any timeout still exists. 
+ vi.useFakeTimers(); + try { + const normalTurn = makeNormalTurnMessages("Starting."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "cancel-task", + tool_use_id: "toolu_cancel_1", + description: "Will be cancelled", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + const mockQuery = createMockQuery(normalTurn); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const promptPromise = agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }); + + // Advance 2s into the poll loop, then cancel + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(2_000); + + // Cancel the session mid-poll + const session = (agent as any).sessions[SESSION_ID]; + session.cancelled = true; + + // Advance one more poll interval for the check to fire + await vi.advanceTimersByTimeAsync(1_000); + + const result = await promptPromise; + expect(result.stopReason).toBe("cancelled"); + } finally { + vi.useRealTimers(); + } + }); + + it("cancellation during backgroundInitPending consumption returns immediately", async () => { + // Regression test: if cancel arrives while the prompt loop is consuming + // background task init → result cycles (backgroundInitPending), the + // cancellation check must fire before the init consumption loop re-enters + // await session.query.next(). Otherwise prompt() hangs forever because + // the SDK was cancelled and will never produce "our" result. + // + // Sequence: init (bg) → result (consumed by backgroundInitPending) → + // cancel arrives → init (bg) → result → should return cancelled + // + // Before the fix, session.cancelled was checked AFTER backgroundInitPending, + // so the loop kept consuming init → result pairs indefinitely. 
+ const messages: any[] = [ + // Our prompt's init + { type: "system", subtype: "init", session_id: SESSION_ID }, + // Background task init → result (consumed by backgroundInitPending) + { type: "system", subtype: "init", session_id: SESSION_ID }, + makeResultMessage("bg task 1 done"), + // Another background task init → result (cancel should fire before this is consumed) + { type: "system", subtype: "init", session_id: SESSION_ID }, + makeResultMessage("bg task 2 done"), + ]; + + const mockQuery = createMockQuery(messages); + const { client } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + // Grab the session up front so the wrapped next() below can flip + // `cancelled` mid-stream — nothing is cancelled before prompt() starts. + const session = (agent as any).sessions[SESSION_ID]; + + // Simulate cancel arriving while the SDK is emitting background task + // sequences: cancel as soon as next() hands back the first result (the + // one backgroundInitPending consumes). The prompt loop processes + // messages synchronously per iteration, so we set cancelled between + // iterations. + const origNext = mockQuery.next.bind(mockQuery); + (mockQuery as any).next = async () => { + const item = await origNext(); + if (item.value?.type === "result") { + session.cancelled = true; + } + return item; + }; + + const result = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "test" }], + }); + + expect(result.stopReason).toBe("cancelled"); + }); + + it("detects task_notification even when other messages are queued before it", async () => { + // The poll loop scans the entire queue, not just queue[0]. If an + // init message is queued before the task_notification, the loop + // should still detect it and continue consuming. 
+ vi.useFakeTimers(); + try { + const normalTurn = makeNormalTurnMessages("Running."); + const resultIdx = normalTurn.findIndex((m: any) => m.type === "result"); + normalTurn.splice(resultIdx, 0, { + type: "system", + subtype: "task_started", + task_id: "queue-scan-task", + tool_use_id: "toolu_qs_1", + description: "Queue scan test", + task_type: "local_bash", + session_id: SESSION_ID, + }); + + const internalTurnMessages = makeBgTaskInternalTurnMessages(); + (internalTurnMessages[0] as any).task_id = "queue-scan-task"; + + const mockQuery = createMockQuery(normalTurn); + const { client, updates } = createMockClient(); + const agent = createAgentWithSession(mockQuery, client); + + const queue = (mockQuery as any).inputStream.queue as any[]; + + // After 3s, push an init message BEFORE the task_notification + // to simulate the SDK queuing other system messages first. + setTimeout(() => { + queue.push({ + type: "system", + subtype: "init", + cwd: "/test", + session_id: SESSION_ID, + tools: [], + model: "test", + }); + for (const msg of internalTurnMessages) { + queue.push(msg); + } + }, 3_000); + + const promptPromise = agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "go" }], + }); + + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(4_000); + + const result = await promptPromise; + expect(result.stopReason).toBe("end_turn"); + + const allText = updates + .filter((u: any) => u.update?.sessionUpdate === "agent_message_chunk") + .map((u: any) => u.update?.content?.text ?? 
"") + .join(""); + expect(allText).toContain("background task from the subagent completed"); + } finally { + vi.useRealTimers(); + } + }); + }); +}); diff --git a/src/tests/tools.test.ts b/src/tests/tools.test.ts index 7ba34146..95fcd3d6 100644 --- a/src/tests/tools.test.ts +++ b/src/tests/tools.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from "vitest"; +import { afterEach, describe, it, expect } from "vitest"; import { AgentSideConnection, ClientCapabilities } from "@agentclientprotocol/sdk"; import { ImageBlockParam, ToolResultBlockParam } from "@anthropic-ai/sdk/resources"; import { @@ -11,7 +11,12 @@ import { BetaBashCodeExecutionToolResultBlockParam, } from "@anthropic-ai/sdk/resources/beta.mjs"; import { toAcpNotifications, ToolUseCache, Logger } from "../acp-agent.js"; -import { toolUpdateFromToolResult, createPostToolUseHook } from "../tools.js"; +import { + toolUpdateFromToolResult, + createPostToolUseHook, + registerHookCallback, + stashedHookInputs, +} from "../tools.js"; describe("rawOutput in tool call updates", () => { const mockClient = {} as AgentSideConnection; @@ -873,7 +878,7 @@ describe("Bash terminal output", () => { const hook = createPostToolUseHook(mockLogger); await hook( { - hook_event_name: "PostToolUse", + hook_event_name: "PostToolUse" as const, tool_name: "Edit", tool_input: { file_path: "/Users/test/project/file.ts", @@ -951,7 +956,7 @@ describe("Bash terminal output", () => { const hook = createPostToolUseHook(mockLogger); await hook( { - hook_event_name: "PostToolUse", + hook_event_name: "PostToolUse" as const, tool_name: "Edit", tool_input: { file_path: "/Users/test/project/file.ts", @@ -1031,7 +1036,7 @@ describe("Bash terminal output", () => { const hook = createPostToolUseHook(mockLogger); await hook( { - hook_event_name: "PostToolUse", + hook_event_name: "PostToolUse" as const, tool_name: "Bash", tool_input: { command: "echo hi" }, tool_response: "hi", @@ -1131,7 +1136,7 @@ describe("Bash terminal output", () => { 
const hook = createPostToolUseHook(mockLogger); await hook( { - hook_event_name: "PostToolUse", + hook_event_name: "PostToolUse" as const, tool_name: "Bash", tool_input: { command: "ls -la" }, tool_response: "file1.txt", @@ -1212,7 +1217,7 @@ describe("Bash terminal output", () => { const hook = createPostToolUseHook(mockLogger); await hook( { - hook_event_name: "PostToolUse", + hook_event_name: "PostToolUse" as const, tool_name: "Bash", tool_input: { command: "echo hi" }, tool_response: "hi", @@ -1234,4 +1239,387 @@ describe("Bash terminal output", () => { expect(hookMeta.terminal_exit).toBeUndefined(); }); }); + + describe("PostToolUse callback execution contract (fire-and-stash)", () => { + // These tests verify the observable contract between PostToolUse + // hooks and registerHookCallback using the non-blocking + // fire-and-stash model: + // + // 1. Callback registered THEN hook fires → callback executes synchronously + // 2. Hook fires THEN callback registered → input is stashed, callback + // executes when registration arrives (no blocking, no timeout) + // 3. No errors logged in either ordering + // 4. Callback receives correct toolInput and toolResponse + // 5. Multiple hooks with mixed ordering don't interfere + // 6. Hook NEVER blocks — always returns { continue: true } immediately + // 7. Subagent child tool uses (callback arrives much later) work correctly + // + // The fire-and-forget callbacks use .then() chains (microtasks). + // Flush them deterministically instead of using real setTimeout delays. + // Depth 5 covers: async callback execution (1-2) + .then cleanup (1) + // + .catch chain (1) + headroom (1). If future changes add awaits + // to registerHookCallback's fire-and-forget path, increase this. 
+ async function flushMicrotasks() { + for (let i = 0; i < 5; i++) await Promise.resolve(); + } + // Builds a PostToolUse hook payload for one tool use. Only the tool name, + // input, response and tool_use_id vary; session_id, transcript_path and cwd + // are fixed placeholder values. + function postToolUseInput( + toolUseId: string, + toolName: string, + toolInput: unknown = {}, + toolResponse: unknown = "", + ) { + return { + hook_event_name: "PostToolUse" as const, + tool_name: toolName, + tool_input: toolInput, + tool_response: toolResponse, + tool_use_id: toolUseId, + session_id: "test-session", + transcript_path: "/tmp/test", + cwd: "/tmp", + }; + } + + // Clean up stashed state between tests to avoid cross-contamination. + // Only stashedHookInputs is reset here; callbacks registered through + // registerHookCallback are not cleared by this hook, which is presumably + // why each test uses its own tool-use IDs. + afterEach(() => { + for (const key of Object.keys(stashedHookInputs)) { + delete stashedHookInputs[key]; + } + }); + + it("executes callback immediately when registered before hook fires", async () => { + const received: { id: string; input: unknown; response: unknown }[] = []; + + registerHookCallback("toolu_before_1", { + onPostToolUseHook: async (id, input, response) => { + received.push({ id, input, response }); + }, + }); + + const hook = createPostToolUseHook(mockLogger); + const result = await hook( + postToolUseInput("toolu_before_1", "Bash", { command: "ls" }, "file.txt"), + "toolu_before_1", + { signal: AbortSignal.abort() }, + ); + + expect(result).toEqual({ continue: true }); + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + id: "toolu_before_1", + input: { command: "ls" }, + response: "file.txt", + }); + // Stash should be empty — happy path doesn't use it. + expect(stashedHookInputs["toolu_before_1"]).toBeUndefined(); + }); + + it("stashes input and executes callback when registered after hook fires (42ms race condition)", async () => { + // This is the original bug from PR #353: the SDK fires PostToolUse + // ~42ms before the streaming handler processes the tool_use block. + const received: { id: string; input: unknown; response: unknown }[] = []; + const hook = createPostToolUseHook(mockLogger); + + // Hook fires first — no callback registered yet. 
+ const result = await hook( + postToolUseInput("toolu_race_1", "Read", { file_path: "/tmp/f" }, "contents"), + "toolu_race_1", + { signal: AbortSignal.abort() }, + ); + + // Hook returns immediately (non-blocking). + expect(result).toEqual({ continue: true }); + + // Input should be stashed. + expect(stashedHookInputs["toolu_race_1"]).toBeDefined(); + expect(stashedHookInputs["toolu_race_1"].toolInput).toEqual({ file_path: "/tmp/f" }); + expect(stashedHookInputs["toolu_race_1"].toolResponse).toBe("contents"); + + // Registration arrives on the next tick (simulates streaming lag). + registerHookCallback("toolu_race_1", { + onPostToolUseHook: async (id, input, response) => { + received.push({ id, input, response }); + }, + }); + + // The callback fires asynchronously — flush microtasks. + await flushMicrotasks(); + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + id: "toolu_race_1", + input: { file_path: "/tmp/f" }, + response: "contents", + }); + // Stash should be cleaned up after execution. 
+ expect(stashedHookInputs["toolu_race_1"]).toBeUndefined(); + }); + + it("does not log errors regardless of registration ordering", async () => { + const errors: string[] = []; + const spyLogger: Logger = { + log: () => {}, + error: (...args: any[]) => { + errors.push(args.map(String).join(" ")); + }, + }; + + const hook = createPostToolUseHook(spyLogger); + + // Case A: register-then-fire + registerHookCallback("toolu_noerr_a", { + onPostToolUseHook: async () => {}, + }); + await hook(postToolUseInput("toolu_noerr_a", "Bash"), "toolu_noerr_a", { + signal: AbortSignal.abort(), + }); + + // Case B: fire-then-register + await hook(postToolUseInput("toolu_noerr_b", "Grep"), "toolu_noerr_b", { + signal: AbortSignal.abort(), + }); + registerHookCallback( + "toolu_noerr_b", + { + onPostToolUseHook: async () => {}, + }, + spyLogger, + ); + await flushMicrotasks(); + + expect(errors).toHaveLength(0); + }); + + it("keeps hooks independent when some are pre-registered and some are late", async () => { + const callOrder: string[] = []; + const hook = createPostToolUseHook(mockLogger); + + // Register callback A upfront. + registerHookCallback("toolu_indep_a", { + onPostToolUseHook: async (id) => { + callOrder.push(id); + }, + }); + + // Fire hook B first (no registration yet), then hook A. + await hook(postToolUseInput("toolu_indep_b", "Read"), "toolu_indep_b", { + signal: AbortSignal.abort(), + }); + + await hook(postToolUseInput("toolu_indep_a", "Bash"), "toolu_indep_a", { + signal: AbortSignal.abort(), + }); + + // A should have executed immediately (happy path). + expect(callOrder).toEqual(["toolu_indep_a"]); + + // Now register B — it should find the stash and execute. 
+ registerHookCallback("toolu_indep_b", { + onPostToolUseHook: async (id) => { + callOrder.push(id); + }, + }); + + await flushMicrotasks(); + expect(callOrder).toEqual(["toolu_indep_a", "toolu_indep_b"]); + }); + + it("hook NEVER blocks — returns { continue: true } immediately even when callback is missing", async () => { + // This is the critical regression test. The old blocking model + // waited up to 5 seconds; the new model must return instantly. + const hook = createPostToolUseHook(mockLogger); + + const start = Date.now(); + const result = await hook( + postToolUseInput("toolu_noblock_1", "Bash", { command: "slow" }, "output"), + "toolu_noblock_1", + { signal: AbortSignal.abort() }, + ); + const elapsed = Date.now() - start; + + expect(result).toEqual({ continue: true }); + // Must complete in well under 1 second — the old code would take 5s. + // NOTE(review): this is a wall-clock (Date.now) bound; 100ms could flake + // on a heavily loaded CI runner — consider loosening it if that shows up. + expect(elapsed).toBeLessThan(100); + // Input should be stashed for later. + expect(stashedHookInputs["toolu_noblock_1"]).toBeDefined(); + }); + + it("subagent child tool uses: callback arrives much later and still executes", async () => { + // Reproduces the real-world scenario from protocol logs: the SDK + // fires PostToolUse for subagent child tool uses, but the callback + // isn't registered until the subagent finishes (potentially tens + // of seconds later). + const received: { id: string; input: unknown; response: unknown }[] = []; + const hook = createPostToolUseHook(mockLogger); + + // Subagent child tools fire their hooks. + await hook( + postToolUseInput("toolu_sub_1", "Bash", { command: "ls" }, "file.txt"), + "toolu_sub_1", + { signal: AbortSignal.abort() }, + ); + await hook( + postToolUseInput("toolu_sub_2", "Glob", { pattern: "*.ts" }, "found.ts"), + "toolu_sub_2", + { signal: AbortSignal.abort() }, + ); + await hook( + postToolUseInput("toolu_sub_3", "Read", { file_path: "/f" }, "data"), + "toolu_sub_3", + { signal: AbortSignal.abort() }, + ); + + // All three should be stashed (no blocking). 
+ expect(Object.keys(stashedHookInputs)).toContain("toolu_sub_1"); + expect(Object.keys(stashedHookInputs)).toContain("toolu_sub_2"); + expect(Object.keys(stashedHookInputs)).toContain("toolu_sub_3"); + + // Simulate delay — subagent finishes and messages are relayed. + // (In real life this could be 30+ seconds.) + // No real delay needed — just flush microtasks after registration. + + // Now registration arrives for all three. + for (const id of ["toolu_sub_1", "toolu_sub_2", "toolu_sub_3"]) { + registerHookCallback(id, { + onPostToolUseHook: async (toolId, input, response) => { + received.push({ id: toolId, input, response }); + }, + }); + } + + await flushMicrotasks(); + + expect(received).toHaveLength(3); + expect(received.map((r) => r.id).sort()).toEqual([ + "toolu_sub_1", + "toolu_sub_2", + "toolu_sub_3", + ]); + expect(received.find((r) => r.id === "toolu_sub_1")!.input).toEqual({ command: "ls" }); + expect(received.find((r) => r.id === "toolu_sub_2")!.response).toBe("found.ts"); + expect(received.find((r) => r.id === "toolu_sub_3")!.input).toEqual({ file_path: "/f" }); + + // All stashes should be cleaned up. + expect(stashedHookInputs["toolu_sub_1"]).toBeUndefined(); + expect(stashedHookInputs["toolu_sub_2"]).toBeUndefined(); + expect(stashedHookInputs["toolu_sub_3"]).toBeUndefined(); + }); + + it("stale callbacks from earlier turns are not consumed by later hooks for different IDs", async () => { + // Verifies that a PostToolUse hook for ID X does not accidentally + // consume a callback registered for ID Y, even though Y's callback + // sits in the map. + const callbackCalled: string[] = []; + const hook = createPostToolUseHook(mockLogger); + + // Register callback for "stale" ID — its hook will never fire. + registerHookCallback("toolu_stale_iso_1", { + onPostToolUseHook: async (id) => { + callbackCalled.push(id); + }, + }); + + // Fire hook for a completely different ID. 
+ await hook(postToolUseInput("toolu_diff_iso_1", "Bash"), "toolu_diff_iso_1", { + signal: AbortSignal.abort(), + }); + + await flushMicrotasks(); + + // The stale callback should NOT have been invoked. + expect(callbackCalled).toHaveLength(0); + // The different ID should be stashed. + expect(stashedHookInputs["toolu_diff_iso_1"]).toBeDefined(); + }); + + it("batch of subagent hooks all stash and resolve correctly when callbacks arrive", async () => { + // Protocol logs show hooks arriving in batches (e.g., 3 Bash + 2 + // Glob from a single Agent subagent call). + const received: string[] = []; + const hook = createPostToolUseHook(mockLogger); + + // Fire 3 hooks in quick succession — none have callbacks yet. + const resultA = await hook(postToolUseInput("toolu_bat_a", "Bash"), "toolu_bat_a", { + signal: AbortSignal.abort(), + }); + const resultB = await hook(postToolUseInput("toolu_bat_b", "Glob"), "toolu_bat_b", { + signal: AbortSignal.abort(), + }); + const resultC = await hook(postToolUseInput("toolu_bat_c", "Read"), "toolu_bat_c", { + signal: AbortSignal.abort(), + }); + + // All return immediately. + expect(resultA).toEqual({ continue: true }); + expect(resultB).toEqual({ continue: true }); + expect(resultC).toEqual({ continue: true }); + + // All stashed. + expect(Object.keys(stashedHookInputs).sort()).toEqual([ + "toolu_bat_a", + "toolu_bat_b", + "toolu_bat_c", + ]); + + // Register callbacks (simulating subagent message relay). + for (const id of ["toolu_bat_a", "toolu_bat_b", "toolu_bat_c"]) { + registerHookCallback(id, { + onPostToolUseHook: async (toolId) => { + received.push(toolId); + }, + }); + } + + await flushMicrotasks(); + + expect(received.sort()).toEqual(["toolu_bat_a", "toolu_bat_b", "toolu_bat_c"]); + }); + + it("always returns { continue: true } even in the stash case", async () => { + const hook = createPostToolUseHook(mockLogger); + + // Fire without registration — should stash and return immediately. 
+      const result = await hook(postToolUseInput("toolu_cont_1", "Agent"), "toolu_cont_1", {
+        signal: AbortSignal.abort(),
+      });
+
+      expect(result).toEqual({ continue: true });
+    });
+
+    it("callback error in stash path is caught and logged, not thrown", async () => {
+      const errors: string[] = [];
+      const spyLogger: Logger = {
+        log: () => {},
+        error: (...args: any[]) => {
+          errors.push(args.map(String).join(" "));
+        },
+      };
+
+      const hook = createPostToolUseHook(spyLogger);
+
+      // Fire hook — stash input.
+      await hook(postToolUseInput("toolu_err_1", "Bash", {}, ""), "toolu_err_1", {
+        signal: AbortSignal.abort(),
+      });
+
+      // Register a callback that throws.
+      registerHookCallback(
+        "toolu_err_1",
+        {
+          onPostToolUseHook: async () => {
+            throw new Error("callback boom");
+          },
+        },
+        spyLogger,
+      );
+
+      await flushMicrotasks();
+
+      // Error should be logged, not thrown.
+      expect(
+        errors.some((e) => e.includes("stashed hook callback error") && e.includes("toolu_err_1")),
+      ).toBe(true);
+    });
+  });
 });
diff --git a/src/tools.ts b/src/tools.ts
index 7d8b8b07..1a439f7d 100644
--- a/src/tools.ts
+++ b/src/tools.ts
@@ -76,7 +76,6 @@ type ToolResultContent =
   | BetaTextEditorCodeExecutionCreateResultBlock
   | BetaTextEditorCodeExecutionStrReplaceResultBlock
   | BetaTextEditorCodeExecutionToolResultError;
-
 interface ToolInfo {
   title: string;
   kind: ToolKind;
@@ -747,6 +746,64 @@ const toolUseCallbacks: {
   };
 } = {};
 
+/*
+ * When the SDK fires PostToolUse before the streaming handler has called
+ * registerHookCallback(), we stash the hook's input data here. When
+ * registerHookCallback() later runs, it finds the stashed data and
+ * executes the callback immediately — no blocking, no timeout.
+ *
+ * This handles both the common race condition (callback arrives ~42ms
+ * later on the next tick) and the subagent case (callback arrives tens
+ * of seconds later when the subagent's messages are relayed).
+ */
+/** @internal Exported for testing only. */
+export const stashedHookInputs: {
+  [toolUseId: string]: {
+    toolInput: unknown;
+    toolResponse: unknown;
+    stashedAt: number;
+  };
+} = {};
+
+/** How long (ms) to keep stashed inputs before considering them orphaned. */
+const STASH_TTL_MS = 5 * 60 * 1_000; // 5 minutes
+
+/** Sweep interval (ms) for cleaning up orphaned stashed inputs. */
+const STASH_SWEEP_INTERVAL_MS = 60 * 1_000; // 1 minute
+
+let stashSweepTimer: ReturnType<typeof setInterval> | null = null;
+
+/**
+ * Start the periodic sweep that evicts stashed hook inputs older than
+ * STASH_TTL_MS. A no-op while a sweep is already running; the timer stops
+ * itself once the stash is empty and is restarted by the next stash.
+ */
+function ensureStashSweep(logger: Logger): void {
+  if (stashSweepTimer) return;
+  const timer = setInterval(() => {
+    const now = Date.now();
+    for (const [id, entry] of Object.entries(stashedHookInputs)) {
+      if (now - entry.stashedAt >= STASH_TTL_MS) {
+        logger.log(`[hook-trace] sweeping orphaned stashed hook input id=${id}`);
+        delete stashedHookInputs[id];
+      }
+    }
+    // NOTE: only the stash is swept. toolUseCallbacks entries that were
+    // registered but never consumed by a hook carry no timestamp, so
+    // they cannot be aged out here.
+    if (Object.keys(stashedHookInputs).length === 0) {
+      // Nothing left to expire: stop ticking so this logger is not pinned forever.
+      clearInterval(timer);
+      stashSweepTimer = null;
+    }
+  }, STASH_SWEEP_INTERVAL_MS);
+  stashSweepTimer = timer;
+  // Don't prevent process exit.
+  if (typeof timer === "object" && "unref" in timer) {
+    timer.unref();
+  }
+}
+
 /* Setup callbacks that will be called when receiving hooks from Claude Code */
 export const registerHookCallback = (
   toolUseID: string,
@@ -759,10 +816,32 @@ export const registerHookCallback = (
       toolResponse: unknown,
     ) => Promise<void>;
   },
+  logger: Logger = console,
 ) => {
   toolUseCallbacks[toolUseID] = {
     onPostToolUseHook,
   };
+
+  // If a PostToolUse hook already fired for this ID, execute the
+  // callback now with the stashed input data.
+  const callback = onPostToolUseHook;
+  const stashed = stashedHookInputs[toolUseID];
+  if (callback && stashed) {
+    delete stashedHookInputs[toolUseID];
+    logger.log(`[hook-trace] registerHookCallback executing stashed hook id=${toolUseID}`);
+    // Fire-and-forget: don't block the streaming handler. The async
+    // wrapper still invokes the callback synchronously, but turns a
+    // synchronous throw into a logged error instead of an exception here.
+    void (async () => {
+      try {
+        await callback(toolUseID, stashed.toolInput, stashed.toolResponse);
+      } catch (err) {
+        logger.error(`[hook-trace] stashed hook callback error id=${toolUseID}:`, err);
+      } finally {
+        delete toolUseCallbacks[toolUseID];
+      }
+    })();
+  }
 };
 
 /* A callback for Claude Code that is called when receiving a PostToolUse hook */
@@ -783,11 +862,23 @@ export const createPostToolUseHook =
       if (toolUseID) {
         const onPostToolUseHook = toolUseCallbacks[toolUseID]?.onPostToolUseHook;
         if (onPostToolUseHook) {
+          // Happy path: callback already registered, execute immediately.
           await onPostToolUseHook(toolUseID, input.tool_input, input.tool_response);
-          delete toolUseCallbacks[toolUseID]; // Cleanup after execution
-        } else {
-          logger.error(`No onPostToolUseHook found for tool use ID: ${toolUseID}`);
           delete toolUseCallbacks[toolUseID];
+        } else {
+          // The SDK fired PostToolUse before the streaming handler called
+          // registerHookCallback(). Stash the input and return immediately
+          // so the SDK is not blocked. When registerHookCallback() runs
+          // later, it will find the stash and execute the callback then.
+          logger.log(
+            `[hook-trace] PostToolUse fired id=${toolUseID} tool=${input.tool_name} callbackReady=false (stashing for deferred execution)`,
+          );
+          stashedHookInputs[toolUseID] = {
+            toolInput: input.tool_input,
+            toolResponse: input.tool_response,
+            stashedAt: Date.now(),
+          };
+          ensureStashSweep(logger);
         }
       }
     }