Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/acp-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,23 @@ export class ClaudeAcpAgent implements Agent {
const { query, input } = this.sessions[params.sessionId];

input.push(promptToClaude(params));

// Background task completions from between prompts generate their own
// system/task_notification, system/init, stream_events, assistant/user
// messages, and result messages BEFORE the real response starts.
//
// The message sequence for a background task completion is:
// task_notification → init → [streaming turn] → result
//
// The real prompt response starts with:
// init → [streaming turn] → result
//
// We distinguish them: an init preceded by task_notification starts a
// background turn; an init NOT preceded by task_notification starts the
// real response. We drain all background turns before processing.
let lastWasTaskNotification = false;
let backgroundTurnInProgress = false;

while (true) {
const { value: message, done } = await (query as AsyncGenerator<SDKMessageTemp, void>).next();

Expand All @@ -446,10 +463,18 @@ export class ClaudeAcpAgent implements Agent {
}
switch (message.subtype) {
case "init":
if (lastWasTaskNotification) {
backgroundTurnInProgress = true;
lastWasTaskNotification = false;
} else {
backgroundTurnInProgress = false;
}
break;
case "task_notification":
lastWasTaskNotification = true;
break;
case "compact_boundary":
case "hook_started":
case "task_notification":
case "hook_progress":
case "hook_response":
case "status":
Expand All @@ -468,6 +493,12 @@ export class ClaudeAcpAgent implements Agent {
return { stopReason: "cancelled" };
}

if (backgroundTurnInProgress) {
// This result ends a background task processing turn.
backgroundTurnInProgress = false;
break;
}

// Accumulate usage from this result
const session = this.sessions[params.sessionId];
session.accumulatedUsage.inputTokens += message.usage.input_tokens;
Expand Down Expand Up @@ -544,6 +575,9 @@ export class ClaudeAcpAgent implements Agent {
break;
}
case "stream_event": {
if (backgroundTurnInProgress) {
break;
}
for (const notification of streamEventToAcpNotifications(
message,
params.sessionId,
Expand All @@ -561,6 +595,9 @@ export class ClaudeAcpAgent implements Agent {
if (this.sessions[params.sessionId].cancelled) {
break;
}
if (backgroundTurnInProgress) {
break;
}

// Store latest assistant usage (excluding subagents)
if ((message.message as any).usage && message.parent_tool_use_id === null) {
Expand Down
250 changes: 250 additions & 0 deletions src/tests/background-task-drain.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { AgentSideConnection, SessionNotification } from "@agentclientprotocol/sdk";
import type { ClaudeAcpAgent as ClaudeAcpAgentType } from "../acp-agent.js";

vi.mock("../tools.js", async () => {
const actual = await vi.importActual<typeof import("../tools.js")>("../tools.js");
return {
...actual,
registerHookCallback: vi.fn(),
};
});

const SESSION_ID = "test-session-id";
const UUID = "00000000-0000-0000-0000-000000000000";

// Minimal messages matching SDK types
function taskNotification(taskId: string) {
return {
type: "system" as const,
subtype: "task_notification" as const,
task_id: taskId,
status: "completed" as const,
output_file: `/tmp/tasks/${taskId}.output`,
summary: `Background task ${taskId} completed`,
uuid: UUID,
session_id: SESSION_ID,
};
}

function initMessage() {
return {
type: "system" as const,
subtype: "init" as const,
apiKeySource: "api_key" as const,
claude_code_version: "1.0.0",
cwd: "/tmp",
tools: [],
mcp_servers: [],
model: "claude-haiku-4-5",
permissionMode: "default" as const,
slash_commands: [],
output_style: "text",
skills: [],
plugins: [],
uuid: UUID,
session_id: SESSION_ID,
};
}

function streamEvent(text: string) {
return {
type: "stream_event" as const,
event: {
type: "content_block_delta" as const,
index: 0,
delta: { type: "text_delta" as const, text },
},
parent_tool_use_id: null,
uuid: UUID,
session_id: SESSION_ID,
};
}

function assistantMessage(text: string) {
return {
type: "assistant" as const,
message: {
content: [{ type: "text" as const, text }],
model: "claude-haiku-4-5",
},
parent_tool_use_id: null,
uuid: UUID,
session_id: SESSION_ID,
};
}

function resultSuccess(text: string) {
return {
type: "result" as const,
subtype: "success" as const,
duration_ms: 100,
duration_api_ms: 50,
is_error: false,
num_turns: 1,
result: text,
stop_reason: "end_turn",
total_cost_usd: 0.001,
usage: {
input_tokens: 10,
output_tokens: 5,
cache_read_input_tokens: 0,
cache_creation_input_tokens: 0,
},
modelUsage: {
"claude-haiku-4-5": {
inputTokens: 10,
outputTokens: 5,
cacheReadTokens: 0,
cacheWriteTokens: 0,
contextWindow: 200000,
},
},
permission_denials: [],
uuid: UUID,
session_id: SESSION_ID,
};
}

describe("background task drain", () => {
let agent: ClaudeAcpAgentType;
let ClaudeAcpAgent: typeof ClaudeAcpAgentType;
let sessionUpdates: SessionNotification[];

function createMockClient(): AgentSideConnection {
return {
sessionUpdate: async (notification: SessionNotification) => {
sessionUpdates.push(notification);
},
requestPermission: async () => ({ outcome: { outcome: "cancelled" } }),
readTextFile: async () => ({ content: "" }),
writeTextFile: async () => ({}),
} as unknown as AgentSideConnection;
}

beforeEach(async () => {
sessionUpdates = [];
vi.resetModules();
const acpAgent = await import("../acp-agent.js");
ClaudeAcpAgent = acpAgent.ClaudeAcpAgent;
agent = new ClaudeAcpAgent(createMockClient());
});

function populateSession(generatorMessages: unknown[]) {
const input = { push: vi.fn() };
const query = (async function* () {
for (const msg of generatorMessages) {
yield msg;
}
})();

(agent as unknown as { sessions: Record<string, unknown> }).sessions[SESSION_ID] = {
query,
input,
cancelled: false,
accumulatedUsage: {
inputTokens: 0,
outputTokens: 0,
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
};
}

it("skips background task turn and processes real response", async () => {
// Generator yields: background task turn, then real response
populateSession([
// Background task turn
taskNotification("bg-task-1"),
initMessage(),
streamEvent("background output"),
assistantMessage("background output"),
resultSuccess("background output"),
// Real response
initMessage(),
streamEvent("HELLO"),
assistantMessage("HELLO"),
resultSuccess("HELLO"),
]);

const response = await agent.prompt({
sessionId: SESSION_ID,
prompt: [{ type: "text", text: "say hello" }],
});

expect(response.stopReason).toBe("end_turn");

// Only the real response's stream events should have been sent
const textChunks = sessionUpdates
.filter((n) => n.update.sessionUpdate === "agent_message_chunk")
.map((n) => (n.update as any).content?.text)
.filter(Boolean);

expect(textChunks).not.toContain("background output");
expect(textChunks.join("")).toContain("HELLO");
});

it("drains multiple background task turns", async () => {
populateSession([
// Background task 1
taskNotification("bg-1"),
initMessage(),
streamEvent("task 1 done"),
resultSuccess("task 1 done"),
// Background task 2
taskNotification("bg-2"),
initMessage(),
streamEvent("task 2 done"),
resultSuccess("task 2 done"),
// Background task 3
taskNotification("bg-3"),
initMessage(),
streamEvent("task 3 done"),
resultSuccess("task 3 done"),
// Real response
initMessage(),
streamEvent("HELLO"),
resultSuccess("HELLO"),
]);

const response = await agent.prompt({
sessionId: SESSION_ID,
prompt: [{ type: "text", text: "say hello" }],
});

expect(response.stopReason).toBe("end_turn");

const textChunks = sessionUpdates
.filter((n) => n.update.sessionUpdate === "agent_message_chunk")
.map((n) => (n.update as any).content?.text)
.filter(Boolean);

expect(textChunks).not.toContain("task 1 done");
expect(textChunks).not.toContain("task 2 done");
expect(textChunks).not.toContain("task 3 done");
expect(textChunks.join("")).toContain("HELLO");
});

it("works normally when no background tasks are pending", async () => {
populateSession([
// Just a normal response, no background tasks
initMessage(),
streamEvent("HELLO"),
resultSuccess("HELLO"),
]);

const response = await agent.prompt({
sessionId: SESSION_ID,
prompt: [{ type: "text", text: "say hello" }],
});

expect(response.stopReason).toBe("end_turn");

const textChunks = sessionUpdates
.filter((n) => n.update.sessionUpdate === "agent_message_chunk")
.map((n) => (n.update as any).content?.text)
.filter(Boolean);

expect(textChunks.join("")).toContain("HELLO");
});
});
8 changes: 6 additions & 2 deletions src/tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ export const registerHookCallback = (
/* A callback for Claude Code that is called when receiving a PostToolUse hook */
export const createPostToolUseHook =
(
logger: Logger = console,
_logger: Logger = console,
options?: {
onEnterPlanMode?: () => Promise<void>;
},
Expand All @@ -766,7 +766,11 @@ export const createPostToolUseHook =
await onPostToolUseHook(toolUseID, input.tool_input, input.tool_response);
delete toolUseCallbacks[toolUseID]; // Cleanup after execution
} else {
logger.error(`No onPostToolUseHook found for tool use ID: ${toolUseID}`);
// No callback registered — expected for subagent tools (the SDK
// fires PostToolUse during subagent execution before we see the
// tool_use block) and server_tool_use (API-side tools like
// WebSearch). Silently ignore: logging here goes to stderr,
// which acp.el surfaces as user-visible "Notices".
delete toolUseCallbacks[toolUseID];
}
}
Expand Down