From eb56701996b9e7ff1edd2e9333eb79048c875b2f Mon Sep 17 00:00:00 2001 From: popododo0720 Date: Thu, 12 Feb 2026 11:38:11 +0900 Subject: [PATCH] fix: reduce session.messages() calls with event-based caching to prevent memory leaks - Replace session.messages() fetch in context-window-monitor with message.updated event cache - Replace session.messages() fetch in preemptive-compaction with message.updated event cache - Add per-session transcript cache (5min TTL) to avoid full rebuild per tool call - Remove session.messages() from background-agent polling (use event-based progress) - Add TTL pruning to todo-continuation-enforcer session state Map - Add setInterval.unref() to tool-input-cache cleanup timer Fixes #1222 --- src/features/background-agent/manager.ts | 96 +------- .../claude-code-hooks/tool-input-cache.ts | 6 +- .../claude-code-hooks/transcript.test.ts | 102 ++++++++ src/hooks/claude-code-hooks/transcript.ts | 219 +++++++++-------- src/hooks/context-window-monitor.test.ts | 185 ++++++++++++++ src/hooks/context-window-monitor.ts | 83 +++---- src/hooks/preemptive-compaction.test.ts | 225 ++++++++++-------- src/hooks/preemptive-compaction.ts | 98 ++++---- .../session-state.ts | 57 ++++- 9 files changed, 700 insertions(+), 371 deletions(-) create mode 100644 src/hooks/claude-code-hooks/transcript.test.ts create mode 100644 src/hooks/context-window-monitor.test.ts diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index dad9dca252..c17a29a063 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -1424,94 +1424,16 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea continue } - const messagesResult = await this.client.session.messages({ - path: { id: sessionID }, + // Session is still actively running (not idle). + // Progress is already tracked via handleEvent(message.part.updated), + // so we skip the expensive session.messages() fetch here. + // Completion will be detected when session transitions to idle. + log("[background-agent] Session still running, relying on event-based progress:", { + taskId: task.id, + sessionID, + sessionStatus: sessionStatus?.type ?? "not_in_status", + toolCalls: task.progress?.toolCalls ?? 0, }) - - if (!messagesResult.error && messagesResult.data) { - const messages = messagesResult.data as Array<{ - info?: { role?: string } - parts?: Array<{ type?: string; tool?: string; name?: string; text?: string }> - }> - const assistantMsgs = messages.filter( - (m) => m.info?.role === "assistant" - ) - - let toolCalls = 0 - let lastTool: string | undefined - let lastMessage: string | undefined - - for (const msg of assistantMsgs) { - const parts = msg.parts ?? 
[] - for (const part of parts) { - if (part.type === "tool_use" || part.tool) { - toolCalls++ - lastTool = part.tool || part.name || "unknown" - } - if (part.type === "text" && part.text) { - lastMessage = part.text - } - } - } - - if (!task.progress) { - task.progress = { toolCalls: 0, lastUpdate: new Date() } - } - task.progress.toolCalls = toolCalls - task.progress.lastTool = lastTool - task.progress.lastUpdate = new Date() - if (lastMessage) { - task.progress.lastMessage = lastMessage - task.progress.lastMessageAt = new Date() - } - - // Stability detection: complete when message count unchanged for 3 polls - const currentMsgCount = messages.length - const startedAt = task.startedAt - if (!startedAt) continue - - const elapsedMs = Date.now() - startedAt.getTime() - - if (elapsedMs >= MIN_STABILITY_TIME_MS) { - if (task.lastMsgCount === currentMsgCount) { - task.stablePolls = (task.stablePolls ?? 0) + 1 - if (task.stablePolls >= 3) { - // Re-fetch session status to confirm agent is truly idle - const recheckStatus = await this.client.session.status() - const recheckData = (recheckStatus.data ?? {}) as Record - const currentStatus = recheckData[sessionID] - - if (currentStatus?.type !== "idle") { - log("[background-agent] Stability reached but session not idle, resetting:", { - taskId: task.id, - sessionStatus: currentStatus?.type ?? "not_in_status" - }) - task.stablePolls = 0 - continue - } - - // Edge guard: Validate session has actual output before completing - const hasValidOutput = await this.validateSessionHasOutput(sessionID) - if (!hasValidOutput) { - log("[background-agent] Stability reached but no valid output, waiting:", task.id) - continue - } - - // Re-check status after async operation - if (task.status !== "running") continue - - const hasIncompleteTodos = await this.checkSessionTodos(sessionID) - if (!hasIncompleteTodos) { - await this.tryCompleteTask(task, "stability detection") - continue - } - } - } else { - task.stablePolls = 0 - } - } - task.lastMsgCount = currentMsgCount - } } catch (error) { log("[background-agent] Poll error for task:", { taskId: task.id, error }) } diff --git a/src/hooks/claude-code-hooks/tool-input-cache.ts b/src/hooks/claude-code-hooks/tool-input-cache.ts index 7aa7d388a6..3b47317c63 100644 --- a/src/hooks/claude-code-hooks/tool-input-cache.ts +++ b/src/hooks/claude-code-hooks/tool-input-cache.ts @@ -37,7 +37,7 @@ export function getToolInput( } // Periodic cleanup (every minute) -setInterval(() => { +const cleanupInterval = setInterval(() => { const now = Date.now() for (const [key, entry] of cache.entries()) { if (now - entry.timestamp > CACHE_TTL) { @@ -45,3 +45,7 @@ setInterval(() => { } } }, CACHE_TTL) +// Allow process to exit naturally even if interval is running +if (typeof cleanupInterval === "object" && "unref" in cleanupInterval) { + cleanupInterval.unref() +} diff --git a/src/hooks/claude-code-hooks/transcript.test.ts b/src/hooks/claude-code-hooks/transcript.test.ts new file mode 100644 index 0000000000..a31aa3922e --- /dev/null +++ b/src/hooks/claude-code-hooks/transcript.test.ts @@ -0,0 +1,102 @@ +import { describe, it, expect, mock, beforeEach, afterEach } from "bun:test" +import { existsSync, unlinkSync, readFileSync } from "fs" +import { + buildTranscriptFromSession, + deleteTempTranscript, + clearTranscriptCache, +} from "./transcript" + +function createMockClient(messages: unknown[] = []) { + return { + session: { + messages: mock(() => + Promise.resolve({ + data: messages, + }) + ), + }, + } +} + +describe("transcript 
caching", () => { + afterEach(() => { + clearTranscriptCache() + }) + + // #given same session called twice + // #when buildTranscriptFromSession is invoked + // #then session.messages() should be called only once (cached) + it("should cache transcript and not re-fetch for same session", async () => { + const client = createMockClient([ + { + info: { role: "assistant" }, + parts: [ + { + type: "tool", + tool: "bash", + state: { status: "completed", input: { command: "ls" } }, + }, + ], + }, + ]) + + const path1 = await buildTranscriptFromSession( + client, + "ses_cache1", + "/tmp", + "bash", + { command: "echo hi" } + ) + + const path2 = await buildTranscriptFromSession( + client, + "ses_cache1", + "/tmp", + "read", + { path: "/tmp/file" } + ) + + // session.messages() called only once + expect(client.session.messages).toHaveBeenCalledTimes(1) + + // Both return valid paths + expect(path1).not.toBeNull() + expect(path2).not.toBeNull() + + // Second call should append the new tool entry + if (path2) { + const content = readFileSync(path2, "utf-8") + expect(content).toContain("Read") + } + + deleteTempTranscript(path1) + deleteTempTranscript(path2) + }) + + // #given different sessions + // #when buildTranscriptFromSession called for each + // #then session.messages() should be called for each + it("should not share cache between different sessions", async () => { + const client = createMockClient([]) + + await buildTranscriptFromSession(client, "ses_a", "/tmp", "bash", {}) + await buildTranscriptFromSession(client, "ses_b", "/tmp", "bash", {}) + + expect(client.session.messages).toHaveBeenCalledTimes(2) + + clearTranscriptCache() + }) + + // #given clearTranscriptCache is called + // #when buildTranscriptFromSession called again + // #then should re-fetch + it("should re-fetch after cache is cleared", async () => { + const client = createMockClient([]) + + await buildTranscriptFromSession(client, "ses_clear", "/tmp", "bash", {}) + clearTranscriptCache() + await buildTranscriptFromSession(client, "ses_clear", "/tmp", "bash", {}) + + expect(client.session.messages).toHaveBeenCalledTimes(2) + }) +}) diff --git a/src/hooks/claude-code-hooks/transcript.ts b/src/hooks/claude-code-hooks/transcript.ts index 5ee2054ebc..3c1693db99 100644 --- a/src/hooks/claude-code-hooks/transcript.ts +++ b/src/hooks/claude-code-hooks/transcript.ts @@ -29,12 +29,9 @@ export function appendTranscriptEntry( } // ============================================================================ -// Claude Code Compatible Transcript Builder (PORT FROM DISABLED) +// Claude Code Compatible Transcript Builder // ============================================================================ -/** - * OpenCode API response type (loosely typed) - */ interface OpenCodeMessagePart { type: string tool?: string @@ -51,9 +48,6 @@ interface OpenCodeMessage { parts?: OpenCodeMessagePart[] } -/** - * Claude Code compatible transcript entry (from disabled file) - */ interface DisabledTranscriptEntry { type: "assistant" message: { @@ -66,18 +60,93 @@ interface DisabledTranscriptEntry { } } +// ============================================================================ +// Session-scoped transcript cache to avoid full session.messages() rebuild +// on every tool call. Cache stores base entries from initial fetch; +// subsequent calls append new tool entries without re-fetching. 
+// ============================================================================
+
+interface TranscriptCacheEntry {
+  baseEntries: string[]
+  tempPath: string | null
+  createdAt: number
+}
+
+const TRANSCRIPT_CACHE_TTL_MS = 5 * 60 * 1000 // 5 minutes
+
+const transcriptCache = new Map<string, TranscriptCacheEntry>()
+
 /**
- * Build Claude Code compatible transcript from session messages
- *
- * PORT FROM DISABLED: This calls client.session.messages() API to fetch
- * the full session history and builds a JSONL file in Claude Code format.
- *
- * @param client OpenCode client instance
- * @param sessionId Session ID
- * @param directory Working directory
- * @param currentToolName Current tool being executed (added as last entry)
- * @param currentToolInput Current tool input
- * @returns Temp file path (caller must call deleteTempTranscript!)
+ * Clear transcript cache for a specific session or all sessions.
+ * Call on session.deleted to prevent memory accumulation.
+ */
+export function clearTranscriptCache(sessionId?: string): void {
+  if (sessionId) {
+    const entry = transcriptCache.get(sessionId)
+    if (entry?.tempPath) {
+      try { unlinkSync(entry.tempPath) } catch { /* ignore */ }
+    }
+    transcriptCache.delete(sessionId)
+  } else {
+    for (const [, entry] of transcriptCache) {
+      if (entry.tempPath) {
+        try { unlinkSync(entry.tempPath) } catch { /* ignore */ }
+      }
+    }
+    transcriptCache.clear()
+  }
+}
+
+function isCacheValid(entry: TranscriptCacheEntry): boolean {
+  return Date.now() - entry.createdAt < TRANSCRIPT_CACHE_TTL_MS
+}
+
+function buildCurrentEntry(toolName: string, toolInput: Record<string, unknown>): string {
+  const entry: DisabledTranscriptEntry = {
+    type: "assistant",
+    message: {
+      role: "assistant",
+      content: [
+        {
+          type: "tool_use",
+          name: transformToolName(toolName),
+          input: toolInput,
+        },
+      ],
+    },
+  }
+  return JSON.stringify(entry)
+}
+
+function parseMessagesToEntries(messages: OpenCodeMessage[]): string[] {
+  const entries: string[] = []
+  for (const msg of messages) {
+    if (msg.info?.role !== "assistant") continue
+    for (const part of msg.parts || []) {
+      if (part.type !== "tool") continue
+      if (part.state?.status !== "completed") continue
+      if (!part.state?.input) continue
+
+      const rawToolName = part.tool as string
+      const toolName = transformToolName(rawToolName)
+
+      const entry: DisabledTranscriptEntry = {
+        type: "assistant",
+        message: {
+          role: "assistant",
+          content: [{ type: "tool_use", name: toolName, input: part.state.input }],
+        },
+      }
+      entries.push(JSON.stringify(entry))
+    }
+  }
+  return entries
+}
+
+/**
+ * Build Claude Code compatible transcript from session messages.
+ * Uses per-session cache to avoid redundant session.messages() API calls.
+ * First call fetches and caches; subsequent calls reuse cached base entries.
  */
 export async function buildTranscriptFromSession(
   client: {
@@ -91,97 +160,63 @@
   currentToolInput: Record<string, unknown>
 ): Promise<string | null> {
   try {
-    const response = await client.session.messages({
-      path: { id: sessionId },
-      query: { directory },
-    })
-
-    // Handle various response formats
-    const messages = (response as { "200"?: unknown[]; data?: unknown[] })["200"]
-      ?? (response as { data?: unknown[] }).data
-      ?? (Array.isArray(response) ? 
response : []) - - const entries: string[] = [] - - if (Array.isArray(messages)) { - for (const msg of messages as OpenCodeMessage[]) { - if (msg.info?.role !== "assistant") continue - - for (const part of msg.parts || []) { - if (part.type !== "tool") continue - if (part.state?.status !== "completed") continue - if (!part.state?.input) continue - - const rawToolName = part.tool as string - const toolName = transformToolName(rawToolName) - - const entry: DisabledTranscriptEntry = { - type: "assistant", - message: { - role: "assistant", - content: [ - { - type: "tool_use", - name: toolName, - input: part.state.input, - }, - ], - }, - } - entries.push(JSON.stringify(entry)) - } + let baseEntries: string[] + + const cached = transcriptCache.get(sessionId) + if (cached && isCacheValid(cached)) { + baseEntries = cached.baseEntries + } else { + // Fetch full session messages (only on first call or cache expiry) + const response = await client.session.messages({ + path: { id: sessionId }, + query: { directory }, + }) + + const messages = (response as { "200"?: unknown[]; data?: unknown[] })["200"] + ?? (response as { data?: unknown[] }).data + ?? (Array.isArray(response) ? response : []) + + baseEntries = Array.isArray(messages) + ? parseMessagesToEntries(messages as OpenCodeMessage[]) + : [] + + // Clean up old temp file if exists + if (cached?.tempPath) { + try { unlinkSync(cached.tempPath) } catch { /* ignore */ } } - } - // Always add current tool call as the last entry - const currentEntry: DisabledTranscriptEntry = { - type: "assistant", - message: { - role: "assistant", - content: [ - { - type: "tool_use", - name: transformToolName(currentToolName), - input: currentToolInput, - }, - ], - }, + transcriptCache.set(sessionId, { + baseEntries, + tempPath: null, + createdAt: Date.now(), + }) } - entries.push(JSON.stringify(currentEntry)) - // Write to temp file + // Append current tool call + const allEntries = [...baseEntries, buildCurrentEntry(currentToolName, currentToolInput)] + const tempPath = join( tmpdir(), `opencode-transcript-${sessionId}-${randomUUID()}.jsonl` ) - writeFileSync(tempPath, entries.join("\n") + "\n") + writeFileSync(tempPath, allEntries.join("\n") + "\n") + + // Update cache temp path for cleanup tracking + const cacheEntry = transcriptCache.get(sessionId) + if (cacheEntry) { + cacheEntry.tempPath = tempPath + } return tempPath } catch { - // CRITICAL FIX: Even on API failure, create file with current tool entry only - // (matching original disabled behavior - never return null with incompatible format) try { - const currentEntry: DisabledTranscriptEntry = { - type: "assistant", - message: { - role: "assistant", - content: [ - { - type: "tool_use", - name: transformToolName(currentToolName), - input: currentToolInput, - }, - ], - }, - } const tempPath = join( tmpdir(), `opencode-transcript-${sessionId}-${randomUUID()}.jsonl` ) - writeFileSync(tempPath, JSON.stringify(currentEntry) + "\n") + writeFileSync(tempPath, buildCurrentEntry(currentToolName, currentToolInput) + "\n") return tempPath } catch { - // If even this fails, return null (truly catastrophic failure) return null } } @@ -189,8 +224,6 @@ export async function buildTranscriptFromSession( /** * Delete temp transcript file (call in finally block) - * - * PORT FROM DISABLED: Cleanup mechanism to avoid disk accumulation */ export function deleteTempTranscript(path: string | null): void { if (!path) return diff --git a/src/hooks/context-window-monitor.test.ts b/src/hooks/context-window-monitor.test.ts new file 
mode 100644 index 0000000000..6d3d56d6b1 --- /dev/null +++ b/src/hooks/context-window-monitor.test.ts @@ -0,0 +1,185 @@ +import { describe, it, expect, mock, beforeEach } from "bun:test" +import { createContextWindowMonitorHook } from "./context-window-monitor" + +function createMockCtx() { + return { + client: { + session: { + messages: mock(() => Promise.resolve({ data: [] })), + }, + }, + directory: "/tmp/test", + } +} + +describe("context-window-monitor", () => { + let ctx: ReturnType + + beforeEach(() => { + ctx = createMockCtx() + }) + + // #given event caches token info from message.updated + // #when tool.execute.after is called + // #then session.messages() should NOT be called + it("should use cached token info instead of fetching session.messages()", async () => { + const hook = createContextWindowMonitorHook(ctx as never) + const sessionID = "ses_test1" + + // Simulate message.updated event with token info + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + finish: true, + tokens: { + input: 50000, + output: 1000, + reasoning: 0, + cache: { read: 10000, write: 0 }, + }, + }, + }, + }, + }) + + const output = { title: "", output: "test output", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output + ) + + // session.messages() should NOT have been called + expect(ctx.client.session.messages).not.toHaveBeenCalled() + }) + + // #given no cached token info exists + // #when tool.execute.after is called + // #then should skip gracefully without fetching + it("should skip gracefully when no cached token info exists", async () => { + const hook = createContextWindowMonitorHook(ctx as never) + const sessionID = "ses_no_cache" + + const output = { title: "", output: "test output", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output + ) + + // No fetch, no crash + expect(ctx.client.session.messages).not.toHaveBeenCalled() + expect(output.output).toBe("test output") + }) + + // #given token usage exceeds 70% threshold + // #when tool.execute.after is called + // #then context reminder should be appended to output + it("should append context reminder when usage exceeds threshold", async () => { + const hook = createContextWindowMonitorHook(ctx as never) + const sessionID = "ses_high_usage" + + // 150K input + 10K cache read = 160K, which is 80% of 200K limit + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + finish: true, + tokens: { + input: 150000, + output: 1000, + reasoning: 0, + cache: { read: 10000, write: 0 }, + }, + }, + }, + }, + }) + + const output = { title: "", output: "original", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output + ) + + expect(output.output).toContain("context remaining") + expect(ctx.client.session.messages).not.toHaveBeenCalled() + }) + + // #given session is deleted + // #when session.deleted event fires + // #then cached data should be cleaned up + it("should clean up cache on session.deleted", async () => { + const hook = createContextWindowMonitorHook(ctx as never) + const sessionID = "ses_deleted" + + // Cache some data + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + finish: true, + tokens: { 
input: 150000, output: 0, reasoning: 0, cache: { read: 10000, write: 0 } }, + }, + }, + }, + }) + + // Delete session + await hook.event({ + event: { + type: "session.deleted", + properties: { info: { id: sessionID } }, + }, + }) + + // After deletion, no reminder should fire (cache gone, reminded set gone) + const output = { title: "", output: "test", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output + ) + expect(output.output).toBe("test") + }) + + // #given non-anthropic provider + // #when message.updated fires + // #then should not trigger reminder + it("should ignore non-anthropic providers", async () => { + const hook = createContextWindowMonitorHook(ctx as never) + const sessionID = "ses_openai" + + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "openai", + finish: true, + tokens: { input: 200000, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + }, + }, + }) + + const output = { title: "", output: "test", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output + ) + expect(output.output).toBe("test") + }) +}) diff --git a/src/hooks/context-window-monitor.ts b/src/hooks/context-window-monitor.ts index 3b92191146..b617ad8242 100644 --- a/src/hooks/context-window-monitor.ts +++ b/src/hooks/context-window-monitor.ts @@ -15,23 +15,21 @@ You are using Anthropic Claude with 1M context window. You have plenty of context remaining - do NOT rush or skip tasks. Complete your work thoroughly and methodically.` -interface AssistantMessageInfo { - role: "assistant" - providerID: string - tokens: { - input: number - output: number - reasoning: number - cache: { read: number; write: number } - } +interface TokenInfo { + input: number + output: number + reasoning: number + cache: { read: number; write: number } } -interface MessageWrapper { - info: { role: string } & Partial +interface CachedTokenState { + providerID: string + tokens: TokenInfo } export function createContextWindowMonitorHook(ctx: PluginInput) { const remindedSessions = new Set() + const tokenCache = new Map() const toolExecuteAfter = async ( input: { tool: string; sessionID: string; callID: string }, @@ -41,44 +39,28 @@ export function createContextWindowMonitorHook(ctx: PluginInput) { if (remindedSessions.has(sessionID)) return - try { - const response = await ctx.client.session.messages({ - path: { id: sessionID }, - }) - - const messages = (response.data ?? response) as MessageWrapper[] - - const assistantMessages = messages - .filter((m) => m.info.role === "assistant") - .map((m) => m.info as AssistantMessageInfo) + const cached = tokenCache.get(sessionID) + if (!cached) return - if (assistantMessages.length === 0) return + if (cached.providerID !== "anthropic") return - const lastAssistant = assistantMessages[assistantMessages.length - 1] - if (lastAssistant.providerID !== "anthropic") return + const lastTokens = cached.tokens + const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0) - // Use only the last assistant message's input tokens - // This reflects the ACTUAL current context window usage (post-compaction) - const lastTokens = lastAssistant.tokens - const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 
0) + const actualUsagePercentage = totalInputTokens / ANTHROPIC_ACTUAL_LIMIT - const actualUsagePercentage = totalInputTokens / ANTHROPIC_ACTUAL_LIMIT + if (actualUsagePercentage < CONTEXT_WARNING_THRESHOLD) return - if (actualUsagePercentage < CONTEXT_WARNING_THRESHOLD) return + remindedSessions.add(sessionID) - remindedSessions.add(sessionID) + const displayUsagePercentage = totalInputTokens / ANTHROPIC_DISPLAY_LIMIT + const usedPct = (displayUsagePercentage * 100).toFixed(1) + const remainingPct = ((1 - displayUsagePercentage) * 100).toFixed(1) + const usedTokens = totalInputTokens.toLocaleString() + const limitTokens = ANTHROPIC_DISPLAY_LIMIT.toLocaleString() - const displayUsagePercentage = totalInputTokens / ANTHROPIC_DISPLAY_LIMIT - const usedPct = (displayUsagePercentage * 100).toFixed(1) - const remainingPct = ((1 - displayUsagePercentage) * 100).toFixed(1) - const usedTokens = totalInputTokens.toLocaleString() - const limitTokens = ANTHROPIC_DISPLAY_LIMIT.toLocaleString() - - output.output += `\n\n${CONTEXT_REMINDER} + output.output += `\n\n${CONTEXT_REMINDER} [Context Status: ${usedPct}% used (${usedTokens}/${limitTokens} tokens), ${remainingPct}% remaining]` - } catch { - // Graceful degradation - do not disrupt tool execution - } } const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => { @@ -88,8 +70,27 @@ export function createContextWindowMonitorHook(ctx: PluginInput) { const sessionInfo = props?.info as { id?: string } | undefined if (sessionInfo?.id) { remindedSessions.delete(sessionInfo.id) + tokenCache.delete(sessionInfo.id) } } + + if (event.type === "message.updated") { + const info = props?.info as { + role?: string + sessionID?: string + providerID?: string + finish?: boolean + tokens?: TokenInfo + } | undefined + + if (!info || info.role !== "assistant" || !info.finish) return + if (!info.sessionID || !info.providerID || !info.tokens) return + + tokenCache.set(info.sessionID, { + providerID: info.providerID, + tokens: info.tokens, + }) + } } return { diff --git a/src/hooks/preemptive-compaction.test.ts b/src/hooks/preemptive-compaction.test.ts index 17f64afa59..81bb6bc786 100644 --- a/src/hooks/preemptive-compaction.test.ts +++ b/src/hooks/preemptive-compaction.test.ts @@ -1,132 +1,155 @@ -import { describe, expect, mock, test } from "bun:test" -import { createPreemptiveCompactionHook } from "./preemptive-compaction.ts" +import { describe, it, expect, mock, beforeEach } from "bun:test" +import { createPreemptiveCompactionHook } from "./preemptive-compaction" -describe("preemptive-compaction", () => { - const sessionID = "preemptive-compaction-session" - - function createMockCtx(overrides?: { - messages?: ReturnType - summarize?: ReturnType - }) { - const messages = overrides?.messages ?? mock(() => Promise.resolve({ data: [] })) - const summarize = overrides?.summarize ?? 
mock(() => Promise.resolve()) - - return { - client: { - session: { - messages, - summarize, - }, - tui: { - showToast: mock(() => Promise.resolve()), - }, +function createMockCtx() { + return { + client: { + session: { + messages: mock(() => Promise.resolve({ data: [] })), + summarize: mock(() => Promise.resolve({})), + }, + tui: { + showToast: mock(() => Promise.resolve()), }, - directory: "/tmp/test", - } as never + }, + directory: "/tmp/test", } +} + +describe("preemptive-compaction", () => { + let ctx: ReturnType + + beforeEach(() => { + ctx = createMockCtx() + }) + + // #given event caches token info from message.updated + // #when tool.execute.after is called + // #then session.messages() should NOT be called + it("should use cached token info instead of fetching session.messages()", async () => { + const hook = createPreemptiveCompactionHook(ctx as never) + const sessionID = "ses_test1" - test("triggers summarize when usage exceeds threshold", async () => { - // #given - const messages = mock(() => - Promise.resolve({ - data: [ - { - info: { - role: "assistant", - providerID: "anthropic", - modelID: "claude-opus-4-6", - tokens: { - input: 180000, - output: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, + // Simulate message.updated with token info below threshold + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + modelID: "claude-sonnet-4-5", + finish: true, + tokens: { + input: 50000, + output: 1000, + reasoning: 0, + cache: { read: 5000, write: 0 }, }, }, - ], - }) + }, + }, + }) + + const output = { title: "", output: "test", metadata: null } + await hook["tool.execute.after"]( + { tool: "bash", sessionID, callID: "call_1" }, + output ) - const summarize = mock(() => Promise.resolve()) - const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize })) - const output = { title: "", output: "", metadata: {} } - // #when + expect(ctx.client.session.messages).not.toHaveBeenCalled() + }) + + // #given no cached token info + // #when tool.execute.after is called + // #then should skip without fetching + it("should skip gracefully when no cached token info exists", async () => { + const hook = createPreemptiveCompactionHook(ctx as never) + + const output = { title: "", output: "test", metadata: null } await hook["tool.execute.after"]( - { tool: "Read", sessionID, callID: "call-1" }, + { tool: "bash", sessionID: "ses_none", callID: "call_1" }, output ) - // #then - expect(summarize).toHaveBeenCalled() + expect(ctx.client.session.messages).not.toHaveBeenCalled() }) - test("triggers summarize for non-anthropic providers when usage exceeds threshold", async () => { - //#given - const messages = mock(() => - Promise.resolve({ - data: [ - { - info: { - role: "assistant", - providerID: "openai", - modelID: "gpt-5.2", - tokens: { - input: 180000, - output: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, + // #given usage above 78% threshold + // #when tool.execute.after runs + // #then should trigger summarize + it("should trigger compaction when usage exceeds threshold", async () => { + const hook = createPreemptiveCompactionHook(ctx as never) + const sessionID = "ses_high" + + // 170K input + 10K cache = 180K → 90% of 200K + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + modelID: "claude-sonnet-4-5", + finish: true, + tokens: { + input: 170000, + output: 1000, + 
reasoning: 0, + cache: { read: 10000, write: 0 }, }, }, - ], - }) - ) - const summarize = mock(() => Promise.resolve()) - const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize })) - const output = { title: "", output: "", metadata: {} } + }, + }, + }) - //#when + const output = { title: "", output: "test", metadata: null } await hook["tool.execute.after"]( - { tool: "Read", sessionID, callID: "call-3" }, + { tool: "bash", sessionID, callID: "call_1" }, output ) - //#then - expect(summarize).toHaveBeenCalled() + expect(ctx.client.session.messages).not.toHaveBeenCalled() + expect(ctx.client.session.summarize).toHaveBeenCalled() }) - test("does not summarize when usage is below threshold", async () => { - // #given - const messages = mock(() => - Promise.resolve({ - data: [ - { - info: { - role: "assistant", - providerID: "anthropic", - modelID: "claude-opus-4-6", - tokens: { - input: 100000, - output: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - }, + // #given session deleted + // #then cache should be cleaned up + it("should clean up cache on session.deleted", async () => { + const hook = createPreemptiveCompactionHook(ctx as never) + const sessionID = "ses_del" + + await hook.event({ + event: { + type: "message.updated", + properties: { + info: { + role: "assistant", + sessionID, + providerID: "anthropic", + modelID: "claude-sonnet-4-5", + finish: true, + tokens: { input: 180000, output: 0, reasoning: 0, cache: { read: 10000, write: 0 } }, }, - ], - }) - ) - const summarize = mock(() => Promise.resolve()) - const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize })) - const output = { title: "", output: "", metadata: {} } + }, + }, + }) + + await hook.event({ + event: { + type: "session.deleted", + properties: { info: { id: sessionID } }, + }, + }) - // #when + const output = { title: "", output: "test", metadata: null } await hook["tool.execute.after"]( - { tool: "Read", sessionID, callID: "call-2" }, + { tool: "bash", sessionID, callID: "call_1" }, output ) - // #then - expect(summarize).not.toHaveBeenCalled() + expect(ctx.client.session.summarize).not.toHaveBeenCalled() }) }) diff --git a/src/hooks/preemptive-compaction.ts b/src/hooks/preemptive-compaction.ts index e567a6b0d0..87190415a3 100644 --- a/src/hooks/preemptive-compaction.ts +++ b/src/hooks/preemptive-compaction.ts @@ -8,29 +8,29 @@ const ANTHROPIC_ACTUAL_LIMIT = const PREEMPTIVE_COMPACTION_THRESHOLD = 0.78 -interface AssistantMessageInfo { - role: "assistant" - providerID: string - modelID?: string - tokens: { - input: number - output: number - reasoning: number - cache: { read: number; write: number } - } +interface TokenInfo { + input: number + output: number + reasoning: number + cache: { read: number; write: number } } -interface MessageWrapper { - info: { role: string } & Partial +interface CachedCompactionState { + providerID: string + modelID: string + tokens: TokenInfo } type PluginInput = { client: { session: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any messages: (...args: any[]) => any + // eslint-disable-next-line @typescript-eslint/no-explicit-any summarize: (...args: any[]) => any } tui: { + // eslint-disable-next-line @typescript-eslint/no-explicit-any showToast: (...args: any[]) => any } } @@ -40,6 +40,7 @@ type PluginInput = { export function createPreemptiveCompactionHook(ctx: PluginInput) { const compactionInProgress = new Set() const compactedSessions = new Set() + const tokenCache = new Map() const toolExecuteAfter = async 
( input: { tool: string; sessionID: string; callID: string }, @@ -48,38 +49,29 @@ export function createPreemptiveCompactionHook(ctx: PluginInput) { const { sessionID } = input if (compactedSessions.has(sessionID) || compactionInProgress.has(sessionID)) return - try { - const response = await ctx.client.session.messages({ - path: { id: sessionID }, - }) - const payload = response as { data?: MessageWrapper[] } | MessageWrapper[] - const messages = Array.isArray(payload) ? payload : (payload.data ?? []) - const assistantMessages = messages - .filter((m) => m.info.role === "assistant") - .map((m) => m.info as AssistantMessageInfo) - - if (assistantMessages.length === 0) return + const cached = tokenCache.get(sessionID) + if (!cached) return - const lastAssistant = assistantMessages[assistantMessages.length - 1] - const actualLimit = - lastAssistant.providerID === "anthropic" - ? ANTHROPIC_ACTUAL_LIMIT - : DEFAULT_ACTUAL_LIMIT + const actualLimit = + cached.providerID === "anthropic" + ? ANTHROPIC_ACTUAL_LIMIT + : DEFAULT_ACTUAL_LIMIT - const lastTokens = lastAssistant.tokens - const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0) - const usageRatio = totalInputTokens / actualLimit + const lastTokens = cached.tokens + const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0) + const usageRatio = totalInputTokens / actualLimit - if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD) return + if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD) return - const modelID = lastAssistant.modelID - if (!modelID) return + const modelID = cached.modelID + if (!modelID) return - compactionInProgress.add(sessionID) + compactionInProgress.add(sessionID) + try { await ctx.client.session.summarize({ path: { id: sessionID }, - body: { providerID: lastAssistant.providerID, modelID, auto: true } as never, + body: { providerID: cached.providerID, modelID, auto: true } as never, query: { directory: ctx.directory }, }) @@ -92,12 +84,36 @@ export function createPreemptiveCompactionHook(ctx: PluginInput) { } const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => { - if (event.type !== "session.deleted") return const props = event.properties as Record | undefined - const sessionInfo = props?.info as { id?: string } | undefined - if (sessionInfo?.id) { - compactionInProgress.delete(sessionInfo.id) - compactedSessions.delete(sessionInfo.id) + + if (event.type === "session.deleted") { + const sessionInfo = props?.info as { id?: string } | undefined + if (sessionInfo?.id) { + compactionInProgress.delete(sessionInfo.id) + compactedSessions.delete(sessionInfo.id) + tokenCache.delete(sessionInfo.id) + } + return + } + + if (event.type === "message.updated") { + const info = props?.info as { + role?: string + sessionID?: string + providerID?: string + modelID?: string + finish?: boolean + tokens?: TokenInfo + } | undefined + + if (!info || info.role !== "assistant" || !info.finish) return + if (!info.sessionID || !info.providerID || !info.tokens) return + + tokenCache.set(info.sessionID, { + providerID: info.providerID, + modelID: info.modelID ?? 
"", + tokens: info.tokens, + }) } } diff --git a/src/hooks/todo-continuation-enforcer/session-state.ts b/src/hooks/todo-continuation-enforcer/session-state.ts index fc96437ab7..93d0064df5 100644 --- a/src/hooks/todo-continuation-enforcer/session-state.ts +++ b/src/hooks/todo-continuation-enforcer/session-state.ts @@ -1,33 +1,69 @@ import type { SessionState } from "./types" +// TTL for idle session state entries (10 minutes) +const SESSION_STATE_TTL_MS = 10 * 60 * 1000 +// Prune interval (every 2 minutes) +const SESSION_STATE_PRUNE_INTERVAL_MS = 2 * 60 * 1000 + +interface TrackedSessionState { + state: SessionState + lastAccessedAt: number +} + export interface SessionStateStore { getState: (sessionID: string) => SessionState getExistingState: (sessionID: string) => SessionState | undefined cancelCountdown: (sessionID: string) => void cleanup: (sessionID: string) => void cancelAllCountdowns: () => void + shutdown: () => void } export function createSessionStateStore(): SessionStateStore { - const sessions = new Map() + const sessions = new Map() + + // Periodic pruning of stale session states to prevent unbounded Map growth + let pruneInterval: ReturnType | undefined + pruneInterval = setInterval(() => { + const now = Date.now() + for (const [sessionID, tracked] of sessions.entries()) { + if (now - tracked.lastAccessedAt > SESSION_STATE_TTL_MS) { + cancelCountdown(sessionID) + sessions.delete(sessionID) + } + } + }, SESSION_STATE_PRUNE_INTERVAL_MS) + // Allow process to exit naturally even if interval is running + if (typeof pruneInterval === "object" && "unref" in pruneInterval) { + pruneInterval.unref() + } function getState(sessionID: string): SessionState { - const existingState = sessions.get(sessionID) - if (existingState) return existingState + const existing = sessions.get(sessionID) + if (existing) { + existing.lastAccessedAt = Date.now() + return existing.state + } const state: SessionState = {} - sessions.set(sessionID, state) + sessions.set(sessionID, { state, lastAccessedAt: Date.now() }) return state } function getExistingState(sessionID: string): SessionState | undefined { - return sessions.get(sessionID) + const existing = sessions.get(sessionID) + if (existing) { + existing.lastAccessedAt = Date.now() + return existing.state + } + return undefined } function cancelCountdown(sessionID: string): void { - const state = sessions.get(sessionID) - if (!state) return + const tracked = sessions.get(sessionID) + if (!tracked) return + const state = tracked.state if (state.countdownTimer) { clearTimeout(state.countdownTimer) state.countdownTimer = undefined @@ -52,11 +88,18 @@ export function createSessionStateStore(): SessionStateStore { } } + function shutdown(): void { + clearInterval(pruneInterval) + cancelAllCountdowns() + sessions.clear() + } + return { getState, getExistingState, cancelCountdown, cleanup, cancelAllCountdowns, + shutdown, } }