diff --git a/src/acp-agent.ts b/src/acp-agent.ts index b5507de9..60f0cd23 100644 --- a/src/acp-agent.ts +++ b/src/acp-agent.ts @@ -427,6 +427,23 @@ export class ClaudeAcpAgent implements Agent { const { query, input } = this.sessions[params.sessionId]; input.push(promptToClaude(params)); + + // Background task completions from between prompts generate their own + // system/task_notification, system/init, stream_events, assistant/user + // messages, and result messages BEFORE the real response starts. + // + // The message sequence for a background task completion is: + // task_notification → init → [streaming turn] → result + // + // The real prompt response starts with: + // init → [streaming turn] → result + // + // We distinguish them: an init preceded by task_notification starts a + // background turn; an init NOT preceded by task_notification starts the + // real response. We drain all background turns before processing. + let lastWasTaskNotification = false; + let backgroundTurnInProgress = false; + while (true) { const { value: message, done } = await (query as AsyncGenerator).next(); @@ -446,10 +463,18 @@ export class ClaudeAcpAgent implements Agent { } switch (message.subtype) { case "init": + if (lastWasTaskNotification) { + backgroundTurnInProgress = true; + lastWasTaskNotification = false; + } else { + backgroundTurnInProgress = false; + } + break; + case "task_notification": + lastWasTaskNotification = true; break; case "compact_boundary": case "hook_started": - case "task_notification": case "hook_progress": case "hook_response": case "status": @@ -468,6 +493,12 @@ export class ClaudeAcpAgent implements Agent { return { stopReason: "cancelled" }; } + if (backgroundTurnInProgress) { + // This result ends a background task processing turn. + backgroundTurnInProgress = false; + break; + } + // Accumulate usage from this result const session = this.sessions[params.sessionId]; session.accumulatedUsage.inputTokens += message.usage.input_tokens; @@ -544,6 +575,9 @@ export class ClaudeAcpAgent implements Agent { break; } case "stream_event": { + if (backgroundTurnInProgress) { + break; + } for (const notification of streamEventToAcpNotifications( message, params.sessionId, @@ -561,6 +595,9 @@ export class ClaudeAcpAgent implements Agent { if (this.sessions[params.sessionId].cancelled) { break; } + if (backgroundTurnInProgress) { + break; + } // Store latest assistant usage (excluding subagents) if ((message.message as any).usage && message.parent_tool_use_id === null) { diff --git a/src/tests/background-task-drain.test.ts b/src/tests/background-task-drain.test.ts new file mode 100644 index 00000000..59768eae --- /dev/null +++ b/src/tests/background-task-drain.test.ts @@ -0,0 +1,250 @@ +import { describe, it, expect, beforeEach, vi } from "vitest"; +import { AgentSideConnection, SessionNotification } from "@agentclientprotocol/sdk"; +import type { ClaudeAcpAgent as ClaudeAcpAgentType } from "../acp-agent.js"; + +vi.mock("../tools.js", async () => { + const actual = await vi.importActual("../tools.js"); + return { + ...actual, + registerHookCallback: vi.fn(), + }; +}); + +const SESSION_ID = "test-session-id"; +const UUID = "00000000-0000-0000-0000-000000000000"; + +// Minimal messages matching SDK types +function taskNotification(taskId: string) { + return { + type: "system" as const, + subtype: "task_notification" as const, + task_id: taskId, + status: "completed" as const, + output_file: `/tmp/tasks/${taskId}.output`, + summary: `Background task ${taskId} completed`, + uuid: UUID, + session_id: SESSION_ID, + }; +} + +function initMessage() { + return { + type: "system" as const, + subtype: "init" as const, + apiKeySource: "api_key" as const, + claude_code_version: "1.0.0", + cwd: "/tmp", + tools: [], + mcp_servers: [], + model: "claude-haiku-4-5", + permissionMode: "default" as const, + slash_commands: [], + output_style: "text", + skills: [], + plugins: [], + uuid: UUID, + session_id: SESSION_ID, + }; +} + +function streamEvent(text: string) { + return { + type: "stream_event" as const, + event: { + type: "content_block_delta" as const, + index: 0, + delta: { type: "text_delta" as const, text }, + }, + parent_tool_use_id: null, + uuid: UUID, + session_id: SESSION_ID, + }; +} + +function assistantMessage(text: string) { + return { + type: "assistant" as const, + message: { + content: [{ type: "text" as const, text }], + model: "claude-haiku-4-5", + }, + parent_tool_use_id: null, + uuid: UUID, + session_id: SESSION_ID, + }; +} + +function resultSuccess(text: string) { + return { + type: "result" as const, + subtype: "success" as const, + duration_ms: 100, + duration_api_ms: 50, + is_error: false, + num_turns: 1, + result: text, + stop_reason: "end_turn", + total_cost_usd: 0.001, + usage: { + input_tokens: 10, + output_tokens: 5, + cache_read_input_tokens: 0, + cache_creation_input_tokens: 0, + }, + modelUsage: { + "claude-haiku-4-5": { + inputTokens: 10, + outputTokens: 5, + cacheReadTokens: 0, + cacheWriteTokens: 0, + contextWindow: 200000, + }, + }, + permission_denials: [], + uuid: UUID, + session_id: SESSION_ID, + }; +} + +describe("background task drain", () => { + let agent: ClaudeAcpAgentType; + let ClaudeAcpAgent: typeof ClaudeAcpAgentType; + let sessionUpdates: SessionNotification[]; + + function createMockClient(): AgentSideConnection { + return { + sessionUpdate: async (notification: SessionNotification) => { + sessionUpdates.push(notification); + }, + requestPermission: async () => ({ outcome: { outcome: "cancelled" } }), + readTextFile: async () => ({ content: "" }), + writeTextFile: async () => ({}), + } as unknown as AgentSideConnection; + } + + beforeEach(async () => { + sessionUpdates = []; + vi.resetModules(); + const acpAgent = await import("../acp-agent.js"); + ClaudeAcpAgent = acpAgent.ClaudeAcpAgent; + agent = new ClaudeAcpAgent(createMockClient()); + }); + + function populateSession(generatorMessages: unknown[]) { + const input = { push: vi.fn() }; + const query = (async function* () { + for (const msg of generatorMessages) { + yield msg; + } + })(); + + (agent as unknown as { sessions: Record }).sessions[SESSION_ID] = { + query, + input, + cancelled: false, + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + }; + } + + it("skips background task turn and processes real response", async () => { + // Generator yields: background task turn, then real response + populateSession([ + // Background task turn + taskNotification("bg-task-1"), + initMessage(), + streamEvent("background output"), + assistantMessage("background output"), + resultSuccess("background output"), + // Real response + initMessage(), + streamEvent("HELLO"), + assistantMessage("HELLO"), + resultSuccess("HELLO"), + ]); + + const response = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "say hello" }], + }); + + expect(response.stopReason).toBe("end_turn"); + + // Only the real response's stream events should have been sent + const textChunks = sessionUpdates + .filter((n) => n.update.sessionUpdate === "agent_message_chunk") + .map((n) => (n.update as any).content?.text) + .filter(Boolean); + + expect(textChunks).not.toContain("background output"); + expect(textChunks.join("")).toContain("HELLO"); + }); + + it("drains multiple background task turns", async () => { + populateSession([ + // Background task 1 + taskNotification("bg-1"), + initMessage(), + streamEvent("task 1 done"), + resultSuccess("task 1 done"), + // Background task 2 + taskNotification("bg-2"), + initMessage(), + streamEvent("task 2 done"), + resultSuccess("task 2 done"), + // Background task 3 + taskNotification("bg-3"), + initMessage(), + streamEvent("task 3 done"), + resultSuccess("task 3 done"), + // Real response + initMessage(), + streamEvent("HELLO"), + resultSuccess("HELLO"), + ]); + + const response = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "say hello" }], + }); + + expect(response.stopReason).toBe("end_turn"); + + const textChunks = sessionUpdates + .filter((n) => n.update.sessionUpdate === "agent_message_chunk") + .map((n) => (n.update as any).content?.text) + .filter(Boolean); + + expect(textChunks).not.toContain("task 1 done"); + expect(textChunks).not.toContain("task 2 done"); + expect(textChunks).not.toContain("task 3 done"); + expect(textChunks.join("")).toContain("HELLO"); + }); + + it("works normally when no background tasks are pending", async () => { + populateSession([ + // Just a normal response, no background tasks + initMessage(), + streamEvent("HELLO"), + resultSuccess("HELLO"), + ]); + + const response = await agent.prompt({ + sessionId: SESSION_ID, + prompt: [{ type: "text", text: "say hello" }], + }); + + expect(response.stopReason).toBe("end_turn"); + + const textChunks = sessionUpdates + .filter((n) => n.update.sessionUpdate === "agent_message_chunk") + .map((n) => (n.update as any).content?.text) + .filter(Boolean); + + expect(textChunks.join("")).toContain("HELLO"); + }); +}); diff --git a/src/tools.ts b/src/tools.ts index 83200bb1..78d3449a 100644 --- a/src/tools.ts +++ b/src/tools.ts @@ -748,7 +748,7 @@ export const registerHookCallback = ( /* A callback for Claude Code that is called when receiving a PostToolUse hook */ export const createPostToolUseHook = ( - logger: Logger = console, + _logger: Logger = console, options?: { onEnterPlanMode?: () => Promise; }, @@ -766,7 +766,11 @@ export const createPostToolUseHook = await onPostToolUseHook(toolUseID, input.tool_input, input.tool_response); delete toolUseCallbacks[toolUseID]; // Cleanup after execution } else { - logger.error(`No onPostToolUseHook found for tool use ID: ${toolUseID}`); + // No callback registered — expected for subagent tools (the SDK + // fires PostToolUse during subagent execution before we see the + // tool_use block) and server_tool_use (API-side tools like + // WebSearch). Silently ignore: logging here goes to stderr, + // which acp.el surfaces as user-visible "Notices". delete toolUseCallbacks[toolUseID]; } }