diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 151fa5646ba..39e7f2c9521 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -102,4 +102,12 @@ export class ACPSessionManager { this.sessions.set(sessionId, session) return session } + + remove(sessionId: string) { + const deleted = this.sessions.delete(sessionId) + if (deleted) { + log.info("removed session", { sessionId }) + } + return deleted + } } diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts index 35c780fbdd5..5e0c9d8b11a 100644 --- a/packages/opencode/src/file/time.ts +++ b/packages/opencode/src/file/time.ts @@ -66,4 +66,10 @@ export namespace FileTime { ) } } + + export function clearSession(sessionID: string) { + const current = state() + delete current.read[sessionID] + log.info("cleared session", { sessionID }) + } } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index bab758030b9..84073923a9e 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -102,36 +102,45 @@ export namespace Format { export function init() { log.info("init") - Bus.subscribe(File.Event.Edited, async (payload) => { - const file = payload.properties.file - log.info("formatting", { file }) - const ext = path.extname(file) - for (const item of await getFormatter(ext)) { - log.info("running", { command: item.command }) - try { - const proc = Bun.spawn({ - cmd: item.command.map((x) => x.replace("$FILE", file)), - cwd: Instance.directory, - env: { ...process.env, ...item.environment }, - stdout: "ignore", - stderr: "ignore", - }) - const exit = await proc.exited - if (exit !== 0) - log.error("failed", { - command: item.command, - ...item.environment, - }) - } catch (error) { - log.error("failed to format file", { - error, - command: item.command, - ...item.environment, - file, - }) - } - } - }) + Instance.state( + () => { + const unsubscribe = Bus.subscribe(File.Event.Edited, async (payload) => { + const file = payload.properties.file + log.info("formatting", { file }) + const ext = path.extname(file) + + for (const item of await getFormatter(ext)) { + log.info("running", { command: item.command }) + try { + const proc = Bun.spawn({ + cmd: item.command.map((x) => x.replace("$FILE", file)), + cwd: Instance.directory, + env: { ...process.env, ...item.environment }, + stdout: "ignore", + stderr: "ignore", + }) + const exit = await proc.exited + if (exit !== 0) + log.error("failed", { + command: item.command, + ...item.environment, + }) + } catch (error) { + log.error("failed to format file", { + error, + command: item.command, + ...item.environment, + file, + }) + } + } + }) + return { unsubscribe } + }, + async (state) => { + state.unsubscribe() + }, + )() } } diff --git a/packages/opencode/src/permission/next.ts b/packages/opencode/src/permission/next.ts index 2481f104ed1..a29a6dcad20 100644 --- a/packages/opencode/src/permission/next.ts +++ b/packages/opencode/src/permission/next.ts @@ -115,6 +115,7 @@ export namespace PermissionNext { info: Request resolve: () => void reject: (e: any) => void + timeout: ReturnType } > = {} @@ -143,10 +144,28 @@ export namespace PermissionNext { id, ...request, } + + const timeout = setTimeout( + () => { + if (s.pending[id]) { + delete s.pending[id] + reject(new Error("Permission request timed out")) + } + }, + 5 * 60 * 1000, + ) + s.pending[id] = { info, - resolve, - reject, + resolve: () => { + clearTimeout(timeout) + resolve() + }, + reject: (e) => { + clearTimeout(timeout) + reject(e) + }, + timeout, } Bus.publish(Event.Asked, info) }) @@ -166,6 +185,7 @@ export namespace PermissionNext { const s = await state() const existing = s.pending[input.requestID] if (!existing) return + clearTimeout(existing.timeout) delete s.pending[input.requestID] Bus.publish(Event.Replied, { sessionID: existing.info.sessionID, @@ -178,6 +198,7 @@ export namespace PermissionNext { const sessionID = existing.info.sessionID for (const [id, pending] of Object.entries(s.pending)) { if (pending.info.sessionID === sessionID) { + clearTimeout(pending.timeout) delete s.pending[id] Bus.publish(Event.Replied, { sessionID: pending.info.sessionID, @@ -211,6 +232,7 @@ export namespace PermissionNext { (pattern) => evaluate(pending.info.permission, pattern, s.approved).action === "allow", ) if (!ok) continue + clearTimeout(pending.timeout) delete s.pending[id] Bus.publish(Event.Replied, { sessionID: pending.info.sessionID, @@ -277,4 +299,15 @@ export namespace PermissionNext { export async function list() { return state().then((x) => Object.values(x.pending).map((x) => x.info)) } + + export async function clearSession(sessionID: string) { + const s = await state() + for (const [id, pending] of Object.entries(s.pending)) { + if (pending.info.sessionID === sessionID) { + clearTimeout(pending.timeout) + delete s.pending[id] + pending.reject(new Error("Session ended")) + } + } + } } diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 76d34b845ae..b1b92a7660f 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -123,13 +123,22 @@ export namespace Plugin { // @ts-expect-error this is because we haven't moved plugin to sdk v2 await hook.config?.(config) } - Bus.subscribeAll(async (input) => { - const hooks = await state().then((x) => x.hooks) - for (const hook of hooks) { - hook["event"]?.({ - event: input, + + Instance.state( + () => { + const unsubscribe = Bus.subscribeAll(async (input) => { + const hooks = await state().then((x) => x.hooks) + for (const hook of hooks) { + hook["event"]?.({ + event: input, + }) + } }) - } - }) + return { unsubscribe } + }, + async (state) => { + state.unsubscribe() + }, + )() } } diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index fdd4ccdfb61..7de425da683 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -13,6 +13,7 @@ import { Env } from "../env" import { Instance } from "../project/instance" import { Flag } from "../flag/flag" import { iife } from "@/util/iife" +import { LRUMap } from "@/util/lru" // Direct imports for bundled providers import { createAmazonBedrock, type AmazonBedrockProviderSettings } from "@ai-sdk/amazon-bedrock" @@ -689,11 +690,11 @@ export namespace Provider { } const providers: { [providerID: string]: Info } = {} - const languages = new Map() + const languages = new LRUMap(100) const modelLoaders: { [providerID: string]: CustomModelLoader } = {} - const sdk = new Map() + const sdk = new LRUMap(50) log.info("init") diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts index 41029ecbbdb..eb04c3718dd 100644 --- a/packages/opencode/src/question/index.ts +++ b/packages/opencode/src/question/index.ts @@ -86,6 +86,7 @@ export namespace Question { info: Request resolve: (answers: Answer[]) => void reject: (e: any) => void + timeout: ReturnType } > = {} @@ -111,10 +112,28 @@ export namespace Question { questions: input.questions, tool: input.tool, } + + const timeout = setTimeout( + () => { + if (s.pending[id]) { + delete s.pending[id] + reject(new Error("Question timed out")) + } + }, + 5 * 60 * 1000, + ) + s.pending[id] = { info, - resolve, - reject, + resolve: (answers) => { + clearTimeout(timeout) + resolve(answers) + }, + reject: (e) => { + clearTimeout(timeout) + reject(e) + }, + timeout, } Bus.publish(Event.Asked, info) }) @@ -127,6 +146,7 @@ export namespace Question { log.warn("reply for unknown request", { requestID: input.requestID }) return } + clearTimeout(existing.timeout) delete s.pending[input.requestID] log.info("replied", { requestID: input.requestID, answers: input.answers }) @@ -147,6 +167,7 @@ export namespace Question { log.warn("reject for unknown request", { requestID }) return } + clearTimeout(existing.timeout) delete s.pending[requestID] log.info("rejected", { requestID }) @@ -168,4 +189,15 @@ export namespace Question { export async function list() { return state().then((x) => Object.values(x.pending).map((x) => x.info)) } + + export async function clearSession(sessionID: string) { + const s = await state() + for (const [id, pending] of Object.entries(s.pending)) { + if (pending.info.sessionID === sessionID) { + clearTimeout(pending.timeout) + delete s.pending[id] + pending.reject(new Error("Session ended")) + } + } + } } diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index fb382530291..71c6608df58 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -81,11 +81,13 @@ export namespace SessionCompaction { if (pruned > PRUNE_MINIMUM) { for (const part of toPrune) { if (part.state.status === "completed") { + part.state.output = "[Old tool result content cleared]" + part.state.attachments = [] part.state.time.compacted = Date.now() await Session.updatePart(part) } } - log.info("pruned", { count: toPrune.length }) + log.info("pruned", { count: toPrune.length, estimatedMB: Math.round(pruned / 1024 / 1024) }) } } @@ -189,6 +191,7 @@ export namespace SessionCompaction { } if (processor.message.error) return "stop" Bus.publish(Event.Compacted, { sessionID: input.sessionID }) + if (global.gc) global.gc() return "continue" } diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index b81a21a57be..cabe3ddf047 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -21,7 +21,9 @@ import { Snapshot } from "@/snapshot" import type { Provider } from "@/provider/provider" import { PermissionNext } from "@/permission/next" +import { Question } from "@/question" import { Global } from "@/global" +import { FileTime } from "@/file/time" export namespace Session { const log = Log.create({ service: "session" }) @@ -341,6 +343,11 @@ export namespace Session { await remove(child.id) } await unshare(sessionID).catch(() => {}) + + FileTime.clearSession(sessionID) + await PermissionNext.clearSession(sessionID) + await Question.clearSession(sessionID) + for (const msg of await Storage.list(["message", sessionID])) { for (const part of await Storage.list(["part", msg.at(-1)!])) { await Storage.remove(part) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 27071056180..6e3957fc295 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -50,6 +50,8 @@ export namespace SessionProcessor { try { let currentText: MessageV2.TextPart | undefined let reasoningMap: Record = {} + const reasoningChunks: Record = {} + let textChunks: string[] = [] const stream = await LLM.stream(streamInput) for await (const value of stream.fullStream) { @@ -63,6 +65,7 @@ export namespace SessionProcessor { if (value.id in reasoningMap) { continue } + reasoningChunks[value.id] = [] reasoningMap[value.id] = { id: Identifier.ascending("part"), messageID: input.assistantMessage.id, @@ -79,7 +82,8 @@ export namespace SessionProcessor { case "reasoning-delta": if (value.id in reasoningMap) { const part = reasoningMap[value.id] - part.text += value.text + reasoningChunks[value.id].push(value.text) + part.text = reasoningChunks[value.id].join("") if (value.providerMetadata) part.metadata = value.providerMetadata if (part.text) await Session.updatePart({ part, delta: value.text }) } @@ -97,6 +101,7 @@ export namespace SessionProcessor { if (value.providerMetadata) part.metadata = value.providerMetadata await Session.updatePart(part) delete reasoningMap[value.id] + delete reasoningChunks[value.id] } break @@ -277,6 +282,7 @@ export namespace SessionProcessor { break case "text-start": + textChunks = [] currentText = { id: Identifier.ascending("part"), messageID: input.assistantMessage.id, @@ -292,7 +298,8 @@ export namespace SessionProcessor { case "text-delta": if (currentText) { - currentText.text += value.text + textChunks.push(value.text) + currentText.text = textChunks.join("") if (value.providerMetadata) currentText.metadata = value.providerMetadata if (currentText.text) await Session.updatePart({ @@ -323,6 +330,7 @@ export namespace SessionProcessor { await Session.updatePart(currentText) } currentText = undefined + textChunks = [] break case "finish": @@ -361,6 +369,7 @@ export namespace SessionProcessor { error: input.assistantMessage.error, }) } + for (const id in toolcalls) delete toolcalls[id] if (snapshot) { const patch = await Snapshot.patch(snapshot) if (patch.files.length) { diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index de62788200b..e7860cc14d1 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -570,24 +570,39 @@ export namespace SessionPrompt { }) } - const sessionMessages = clone(msgs) + const sessionMessages: MessageV2.WithParts[] = [] + + for (const msg of msgs) { + if (step > 1 && lastFinished && msg.info.role === "user" && msg.info.id > lastFinished.id) { + const wrappedParts: MessageV2.Part[] = [] + let hasWrapped = false - // Ephemerally wrap queued user messages with a reminder to stay on track - if (step > 1 && lastFinished) { - for (const msg of sessionMessages) { - if (msg.info.role !== "user" || msg.info.id <= lastFinished.id) continue for (const part of msg.parts) { - if (part.type !== "text" || part.ignored || part.synthetic) continue - if (!part.text.trim()) continue - part.text = [ - "", - "The user sent the following message:", - part.text, - "", - "Please address this message and continue with your tasks.", - "", - ].join("\n") + if (part.type === "text" && !part.ignored && !part.synthetic && part.text.trim()) { + wrappedParts.push({ + ...part, + text: [ + "", + "The user sent the following message:", + part.text, + "", + "Please address this message and continue with your tasks.", + "", + ].join("\n"), + }) + hasWrapped = true + } else { + wrappedParts.push(part) + } + } + + if (hasWrapped) { + sessionMessages.push({ info: msg.info, parts: wrappedParts }) + } else { + sessionMessages.push(msg) } + } else { + sessionMessages.push(msg) } } diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index dddce95cb4f..e36ca4bdfd3 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -6,6 +6,7 @@ import { Session } from "@/session" import { MessageV2 } from "@/session/message-v2" import { Storage } from "@/storage/storage" import { Log } from "@/util/log" +import { Instance } from "@/project/instance" import type * as SDK from "@opencode-ai/sdk/v2" export namespace ShareNext { @@ -19,50 +20,65 @@ export namespace ShareNext { export async function init() { if (disabled) return - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync(evt.properties.info.id, [ - { - type: "session", - data: evt.properties.info, - }, - ]) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync(evt.properties.info.sessionID, [ - { - type: "message", - data: evt.properties.info, - }, - ]) - if (evt.properties.info.role === "user") { - await sync(evt.properties.info.sessionID, [ - { - type: "model", - data: [ - await Provider.getModel(evt.properties.info.model.providerID, evt.properties.info.model.modelID).then( - (m) => m, - ), - ], - }, - ]) - } - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync(evt.properties.part.sessionID, [ - { - type: "part", - data: evt.properties.part, - }, - ]) - }) - Bus.subscribe(Session.Event.Diff, async (evt) => { - await sync(evt.properties.sessionID, [ - { - type: "session_diff", - data: evt.properties.diff, - }, - ]) - }) + + // Use Instance.state to ensure subscriptions are cleaned up when instance is disposed + Instance.state( + () => { + const unsubscribers = [ + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync(evt.properties.info.id, [ + { + type: "session", + data: evt.properties.info, + }, + ]) + }), + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + await sync(evt.properties.info.sessionID, [ + { + type: "message", + data: evt.properties.info, + }, + ]) + if (evt.properties.info.role === "user") { + await sync(evt.properties.info.sessionID, [ + { + type: "model", + data: [ + await Provider.getModel( + evt.properties.info.model.providerID, + evt.properties.info.model.modelID, + ).then((m) => m), + ], + }, + ]) + } + }), + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync(evt.properties.part.sessionID, [ + { + type: "part", + data: evt.properties.part, + }, + ]) + }), + Bus.subscribe(Session.Event.Diff, async (evt) => { + await sync(evt.properties.sessionID, [ + { + type: "session_diff", + data: evt.properties.diff, + }, + ]) + }), + ] + return { unsubscribers } + }, + async (state) => { + for (const unsub of state.unsubscribers) { + unsub() + } + }, + )() } export async function create(sessionID: string) { diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index f7bf4b3fa52..f2fcaec1d43 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -3,6 +3,7 @@ import { Installation } from "../installation" import { Session } from "../session" import { MessageV2 } from "../session/message-v2" import { Log } from "../util/log" +import { Instance } from "../project/instance" export namespace Share { const log = Log.create({ service: "share" }) @@ -47,23 +48,39 @@ export namespace Share { } export function init() { - Bus.subscribe(Session.Event.Updated, async (evt) => { - await sync("session/info/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.Updated, async (evt) => { - await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) - }) - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { - await sync( - "session/part/" + - evt.properties.part.sessionID + - "/" + - evt.properties.part.messageID + - "/" + - evt.properties.part.id, - evt.properties.part, - ) - }) + // Use Instance.state to ensure subscriptions are cleaned up when instance is disposed + Instance.state( + () => { + const unsubscribers = [ + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync("session/info/" + evt.properties.info.id, evt.properties.info) + }), + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + await sync( + "session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, + evt.properties.info, + ) + }), + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync( + "session/part/" + + evt.properties.part.sessionID + + "/" + + evt.properties.part.messageID + + "/" + + evt.properties.part.id, + evt.properties.part, + ) + }), + ] + return { unsubscribers } + }, + async (state) => { + for (const unsub of state.unsubscribers) { + unsub() + } + }, + )() } export const URL = diff --git a/packages/opencode/src/tool/bash.ts b/packages/opencode/src/tool/bash.ts index bf7c524941f..055d0d23e91 100644 --- a/packages/opencode/src/tool/bash.ts +++ b/packages/opencode/src/tool/bash.ts @@ -164,7 +164,7 @@ export const BashTool = Tool.define("bash", async () => { detached: process.platform !== "win32", }) - let output = "" + const chunks: Buffer[] = [] // Initialize metadata with empty output ctx.metadata({ @@ -175,7 +175,8 @@ export const BashTool = Tool.define("bash", async () => { }) const append = (chunk: Buffer) => { - output += chunk.toString() + chunks.push(chunk) + const output = Buffer.concat(chunks).toString() ctx.metadata({ metadata: { // truncate the metadata to avoid GIANT blobs of data (has nothing to do w/ what agent can access) @@ -240,6 +241,7 @@ export const BashTool = Tool.define("bash", async () => { resultMetadata.push("User aborted the command") } + let output = Buffer.concat(chunks).toString() if (resultMetadata.length > 0) { output += "\n\n\n" + resultMetadata.join("\n") + "\n" } diff --git a/packages/opencode/src/util/lru.ts b/packages/opencode/src/util/lru.ts new file mode 100644 index 00000000000..63292d4a908 --- /dev/null +++ b/packages/opencode/src/util/lru.ts @@ -0,0 +1,67 @@ +export class LRUMap { + private cache = new Map() + private maxSize: number + + constructor(maxSize: number) { + if (maxSize < 1) throw new Error("LRUMap maxSize must be at least 1") + this.maxSize = maxSize + } + + get(key: K): V | undefined { + const value = this.cache.get(key) + if (value === undefined) return undefined + this.cache.delete(key) + this.cache.set(key, value) + return value + } + + set(key: K, value: V): this { + if (this.cache.has(key)) { + this.cache.delete(key) + } + while (this.cache.size >= this.maxSize) { + const oldest = this.cache.keys().next().value + if (oldest !== undefined) { + this.cache.delete(oldest) + } + } + this.cache.set(key, value) + return this + } + + has(key: K): boolean { + return this.cache.has(key) + } + + delete(key: K): boolean { + return this.cache.delete(key) + } + + clear(): void { + this.cache.clear() + } + + get size(): number { + return this.cache.size + } + + keys(): IterableIterator { + return this.cache.keys() + } + + values(): IterableIterator { + return this.cache.values() + } + + entries(): IterableIterator<[K, V]> { + return this.cache.entries() + } + + [Symbol.iterator](): IterableIterator<[K, V]> { + return this.cache[Symbol.iterator]() + } + + forEach(fn: (value: V, key: K, map: LRUMap) => void): void { + this.cache.forEach((value, key) => fn(value, key, this)) + } +}