From 1bcceeb5de5d5a509ebef08aebfe23476692993a Mon Sep 17 00:00:00 2001 From: Benjamin Shafii Date: Fri, 3 Apr 2026 08:42:43 -0700 Subject: [PATCH 1/2] feat(labs): replace the PI host shim with embedded opencode --- apps/labs/dist-electron/main.js | 414 ++++---- apps/labs/electron/main.ts | 450 +++++---- apps/labs/package.json | 1 + apps/labs/src/app.tsx | 8 +- apps/openwork-host/package.json | 20 - apps/openwork-host/src/index.ts | 341 ------- apps/openwork-host/tsconfig.json | 16 - pnpm-lock.yaml | 1600 +----------------------------- 8 files changed, 507 insertions(+), 2343 deletions(-) delete mode 100644 apps/openwork-host/package.json delete mode 100644 apps/openwork-host/src/index.ts delete mode 100644 apps/openwork-host/tsconfig.json diff --git a/apps/labs/dist-electron/main.js b/apps/labs/dist-electron/main.js index c83453f1a..80294e1ff 100644 --- a/apps/labs/dist-electron/main.js +++ b/apps/labs/dist-electron/main.js @@ -1,16 +1,11 @@ import { randomUUID } from "node:crypto"; -import { once } from "node:events"; -import { access, appendFile, mkdir, readFile, writeFile } from "node:fs/promises"; -import net from "node:net"; +import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises"; import path from "node:path"; import { fileURLToPath } from "node:url"; -import { spawn } from "node:child_process"; import { app, BrowserWindow, dialog, ipcMain } from "electron"; +import { createOpencode, createOpencodeClient, } from "@opencode-ai/sdk/v2"; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const packageRoot = path.resolve(__dirname, ".."); -const repoRoot = path.resolve(packageRoot, "..", ".."); -const hostPackageRoot = path.resolve(repoRoot, "apps", "openwork-host"); -const hostEntry = path.resolve(hostPackageRoot, "dist", "index.js"); const rendererUrl = process.env.OPENWORK_LABS_RENDERER_URL ?? "http://127.0.0.1:4174"; const smokeWorkspaceDir = process.env.OPENWORK_LABS_SMOKE_WORKSPACE_DIR ?? ""; const smokePrompt = process.env.OPENWORK_LABS_SMOKE_PROMPT ?? ""; @@ -18,13 +13,18 @@ const configDir = path.join(app.getPath("userData"), "labs"); const configFile = path.join(configDir, "workspace.json"); const logFile = path.join(configDir, "runtime.log"); let mainWindow = null; -let localHost = null; +let localOpencode = null; +let activeClient = null; +let activeDirectory; let eventAbort = null; +let syncTimer = null; let connection = { kind: "none" }; let sessions = []; let currentSessionID = null; let messages = []; const logs = []; +const sessionStatuses = new Map(); +const pendingSync = { sessions: false, messages: false }; function snapshot() { return { connection, sessions, currentSessionID, messages }; } @@ -39,25 +39,6 @@ function pushLog(scope, level, message) { mainWindow?.webContents.send("ow:log", entry); void appendFile(logFile, `[${entry.at}] [${scope}] [${level}] ${message}\n`).catch(() => undefined); } -async function ensureHostBuilt() { - try { - await access(hostEntry); - return; - } - catch { - pushLog("main", "info", "Building openwork-host before launch"); - const child = spawn("pnpm", ["--filter", "openwork-host", "build"], { - cwd: repoRoot, - stdio: ["ignore", "pipe", "pipe"], - }); - child.stdout.on("data", (chunk) => pushLog("main", "info", String(chunk).trim())); - child.stderr.on("data", (chunk) => pushLog("main", "error", String(chunk).trim())); - const [code] = (await once(child, "exit")); - if (code !== 0) { - throw new Error(`openwork-host build failed with code ${code}`); - } - } -} async function saveConnection() { await mkdir(configDir, { recursive: true }); if (connection.kind === "none") { @@ -80,45 +61,107 @@ async function loadConnection() { return null; } } -async function getPort() { - const server = net.createServer(); - server.listen(0, "127.0.0.1"); - await once(server, "listening"); - const address = server.address(); - server.close(); - if (!address || typeof address === "string") - throw new Error("Unable to allocate port"); - return address.port; +function clientOrThrow() { + if (!activeClient) { + throw new Error("No active opencode connection"); + } + return activeClient; } -function baseUrl() { - if (connection.kind === "local" || connection.kind === "remote") - return connection.url; - throw new Error("No active connection"); +function requestArgs(input) { + if (!activeDirectory) { + return { ...(input ?? {}) }; + } + return { ...(input ?? {}), directory: activeDirectory }; } -function authHeaders() { - if (connection.kind === "remote" && connection.token) { - return { Authorization: `Bearer ${connection.token}` }; +function formatOpencodeError(error) { + if (!error) + return "Unknown error"; + if (error instanceof Error) + return error.message; + if (typeof error === "string") + return error; + if (typeof error === "object") { + const value = error; + if (typeof value.data?.message === "string" && value.data.message.trim()) { + return value.data.message; + } + if (typeof value.name === "string" && value.name.trim()) { + return value.name; + } } - return {}; + return JSON.stringify(error); } -async function api(pathname, init) { - const response = await fetch(`${baseUrl()}${pathname}`, { - ...init, - headers: { - "content-type": "application/json", - ...authHeaders(), - ...(init?.headers ?? {}), - }, - }); - if (!response.ok) { - const text = await response.text(); - throw new Error(text || `${response.status} ${response.statusText}`); +function basicAuthHeader(username, password) { + return `Basic ${Buffer.from(`${username}:${password}`, "utf8").toString("base64")}`; +} +function localServerHeaders() { + const password = process.env.OPENCODE_SERVER_PASSWORD?.trim(); + if (!password) + return undefined; + const username = process.env.OPENCODE_SERVER_USERNAME?.trim() || "opencode"; + return { Authorization: basicAuthHeader(username, password) }; +} +function isSessionActive(status) { + return status?.type === "busy" || status?.type === "retry"; +} +function toSessionSummary(session) { + return { + id: session.id, + title: session.title || "Untitled chat", + createdAt: new Date(session.time.created).toISOString(), + updatedAt: new Date(session.time.updated).toISOString(), + active: isSessionActive(sessionStatuses.get(session.id)), + }; +} +function partText(part) { + if (part.type === "text") { + return part.text; + } + if (part.type === "tool" && part.state.status === "error") { + return `Tool ${part.tool} failed: ${part.state.error}`; } - return response.json(); + return ""; +} +function messageText(info, parts) { + const text = parts + .map((part) => partText(part)) + .filter((value) => value.length > 0) + .join(""); + if (text.trim()) + return text; + if (info.role === "assistant" && info.error) + return formatOpencodeError(info.error); + return ""; +} +function toMessage(entry) { + return { + id: entry.info.id, + role: entry.info.role === "assistant" && entry.info.error ? "error" : entry.info.role, + text: messageText(entry.info, entry.parts), + createdAt: new Date(entry.info.time.created).toISOString(), + }; +} +function resetSnapshot() { + sessions = []; + currentSessionID = null; + messages = []; + sessionStatuses.clear(); } async function refreshSessions() { - const data = (await api("/session", { method: "GET" })); - sessions = data.sessions; + const sdk = clientOrThrow(); + const [listResult, statusResult] = await Promise.all([ + sdk.session.list(requestArgs({ roots: true, limit: 100 })), + sdk.session.status(requestArgs()), + ]); + if (listResult.error) + throw new Error(formatOpencodeError(listResult.error)); + if (statusResult.error) + throw new Error(formatOpencodeError(statusResult.error)); + sessionStatuses.clear(); + for (const [sessionID, status] of Object.entries(statusResult.data ?? {})) { + sessionStatuses.set(sessionID, status); + } + sessions = (listResult.data ?? []).map((session) => toSessionSummary(session)); if (!currentSessionID && sessions[0]) currentSessionID = sessions[0].id; if (currentSessionID && !sessions.some((session) => session.id === currentSessionID)) { @@ -130,139 +173,136 @@ async function refreshMessages() { messages = []; return; } - const data = (await api(`/session/${currentSessionID}/message`, { method: "GET" })); - messages = data.messages; + const result = await clientOrThrow().session.messages(requestArgs({ sessionID: currentSessionID, limit: 200 })); + if (result.error) + throw new Error(formatOpencodeError(result.error)); + messages = (result.data ?? []).map((entry) => toMessage(entry)); } -function upsertSession(summary) { - const existing = sessions.findIndex((session) => session.id === summary.id); - if (existing >= 0) { - sessions[existing] = summary; +function queueSync(request) { + pendingSync.sessions ||= request.sessions === true; + pendingSync.messages ||= request.messages === true; + if (syncTimer) + return; + syncTimer = setTimeout(() => { + syncTimer = null; + const next = { ...pendingSync }; + pendingSync.sessions = false; + pendingSync.messages = false; + void (async () => { + if (next.sessions) + await refreshSessions(); + if (next.messages) + await refreshMessages(); + emitState(); + })().catch((error) => { + pushLog("main", "error", formatOpencodeError(error)); + }); + }, 75); +} +function applyEvent(event) { + if (event.type === "session.created" || event.type === "session.updated" || event.type === "session.deleted") { + queueSync({ sessions: true, messages: event.type !== "session.deleted" }); + return; } - else { - sessions = [summary, ...sessions]; + if (event.type === "session.status" || event.type === "session.idle") { + queueSync({ sessions: true, messages: event.properties.sessionID === currentSessionID }); + return; } - sessions = [...sessions].sort((a, b) => b.updatedAt.localeCompare(a.updatedAt)); -} -function applyHostEvent(event) { - if (event.type === "session.created") { - upsertSession(event.session); - currentSessionID = event.session.id; - messages = []; + if (event.type === "message.updated") { + queueSync({ sessions: true, messages: event.properties.info.sessionID === currentSessionID }); + return; } - if (event.type === "session.updated") { - upsertSession(event.session); + if (event.type === "message.removed") { + queueSync({ messages: event.properties.sessionID === currentSessionID }); + return; } - if (event.type === "message.created" && event.sessionID === currentSessionID) { - messages = [...messages, event.message]; + if (event.type === "message.part.updated") { + queueSync({ messages: event.properties.part.sessionID === currentSessionID }); + return; } - if (event.type === "message.delta" && event.sessionID === currentSessionID) { - messages = messages.map((message) => message.id === event.messageID ? { ...message, text: `${message.text}${event.delta}` } : message); + if (event.type === "message.part.removed") { + queueSync({ messages: event.properties.sessionID === currentSessionID }); + return; } - if (event.type === "run.failed") { - pushLog("host", "error", event.error); + if (event.type === "session.error") { + pushLog("host", "error", formatOpencodeError(event.properties.error)); + queueSync({ sessions: true, messages: event.properties.sessionID === currentSessionID }); + return; } - emitState(); } async function openEventStream() { eventAbort?.abort(); const controller = new AbortController(); eventAbort = controller; - const response = await fetch(`${baseUrl()}/event`, { - method: "GET", - headers: { Accept: "text/event-stream", ...authHeaders() }, - signal: controller.signal, - }); - if (!response.ok || !response.body) - throw new Error(`SSE failed: ${response.status}`); - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - const read = async () => { - while (!controller.signal.aborted) { - const { done, value } = await reader.read(); - if (done) + const stream = await clientOrThrow().event.subscribe(requestArgs(), { signal: controller.signal }); + void (async () => { + for await (const event of stream.stream) { + applyEvent(event); + if (controller.signal.aborted) break; - buffer += decoder.decode(value, { stream: true }); - const frames = buffer.split("\n\n"); - buffer = frames.pop() ?? ""; - for (const frame of frames) { - const dataLine = frame - .split("\n") - .find((line) => line.startsWith("data:")); - if (!dataLine) - continue; - const payload = dataLine.slice(5).trim(); - if (!payload) - continue; - applyHostEvent(JSON.parse(payload)); - } + await new Promise((resolve) => setTimeout(resolve, 0)); + } + })().catch((error) => { + if (!controller.signal.aborted) { + pushLog("main", "error", formatOpencodeError(error)); } - }; - void read().catch((error) => { - if (!controller.signal.aborted) - pushLog("main", "error", String(error)); }); } async function stopLocalHost() { eventAbort?.abort(); eventAbort = null; - if (!localHost) - return; - localHost.kill(); - await once(localHost, "exit").catch(() => undefined); - localHost = null; -} -async function waitForHealth(url) { - for (let attempt = 0; attempt < 80; attempt += 1) { - try { - const response = await fetch(`${url}/global/health`); - if (response.ok) - return; - } - catch { - // Retry until the child is ready. - } - await new Promise((resolve) => setTimeout(resolve, 250)); + if (syncTimer) { + clearTimeout(syncTimer); + syncTimer = null; } - throw new Error("openwork-host did not become healthy in time"); + pendingSync.sessions = false; + pendingSync.messages = false; + connection = { kind: "none" }; + resetSnapshot(); + activeClient = null; + activeDirectory = undefined; + sessionStatuses.clear(); + if (!localOpencode) + return; + localOpencode.server.close(); + localOpencode = null; + pushLog("host", "info", "Embedded opencode server stopped"); } async function connectLocal(workspacePath) { await stopLocalHost(); - await ensureHostBuilt(); - const port = await getPort(); - const child = spawn(process.execPath, [hostEntry], { - cwd: hostPackageRoot, - env: { - ...process.env, - PORT: String(port), - OPENWORK_HOST_WORKSPACE_DIR: workspacePath, - }, - stdio: ["ignore", "pipe", "pipe"], - }); - child.stdout.on("data", (chunk) => pushLog("host", "info", String(chunk).trim())); - child.stderr.on("data", (chunk) => pushLog("host", "error", String(chunk).trim())); - child.on("exit", (code, signal) => pushLog("host", code === 0 ? "info" : "error", `host exited code=${code} signal=${signal}`)); - localHost = child; - const url = `http://127.0.0.1:${port}`; - await waitForHealth(url); - connection = { kind: "local", workspacePath, url }; - sessions = []; - currentSessionID = null; - messages = []; + pushLog("host", "info", `Starting embedded opencode for ${workspacePath}`); + localOpencode = await createOpencode({ hostname: "127.0.0.1", port: 0 }); + activeClient = createOpencodeClient({ baseUrl: localOpencode.server.url, headers: localServerHeaders() }); + const health = await activeClient.global.health(); + if (health.error || health.data?.healthy !== true) { + throw new Error(formatOpencodeError(health.error ?? "Embedded opencode health check failed")); + } + activeDirectory = workspacePath; + connection = { kind: "local", workspacePath, url: localOpencode.server.url }; + resetSnapshot(); await refreshSessions(); await refreshMessages(); await openEventStream(); emitState(); + pushLog("host", "info", `Embedded opencode ready at ${localOpencode.server.url}`); pushLog("main", "info", `Connected local workspace ${workspacePath}`); await saveConnection(); return snapshot(); } async function connectRemote(urlInput, token) { await stopLocalHost(); - connection = { kind: "remote", url: urlInput.replace(/\/$/, ""), token }; - sessions = []; - currentSessionID = null; - messages = []; + const url = urlInput.trim().replace(/\/$/, ""); + const trimmedToken = token.trim(); + const headers = trimmedToken ? { Authorization: `Bearer ${trimmedToken}` } : undefined; + const client = createOpencodeClient({ baseUrl: url, headers }); + const health = await client.global.health(); + if (health.error || health.data?.healthy !== true) { + throw new Error(formatOpencodeError(health.error ?? `Remote opencode health check failed for ${url}`)); + } + activeClient = client; + activeDirectory = undefined; + connection = { kind: "remote", url, token: trimmedToken }; + resetSnapshot(); await refreshSessions(); await refreshMessages(); await openEventStream(); @@ -272,17 +312,22 @@ async function connectRemote(urlInput, token) { return snapshot(); } async function runMainSmokeFlow(promptValue) { - const data = (await api("/session", { method: "POST", body: JSON.stringify({ title: "Smoke chat" }) })); - currentSessionID = data.session.id; - messages = []; - upsertSession(data.session); - pushLog("main", "info", `Smoke created session ${data.session.id}`); - await api(`/session/${data.session.id}/prompt_async`, { - method: "POST", - body: JSON.stringify({ prompt: promptValue }), - }); - pushLog("main", "info", `Smoke prompt submitted for ${data.session.id}`); - await waitForAssistantReply(data.session.id); + const created = await clientOrThrow().session.create(requestArgs({ title: "Smoke chat" })); + if (created.error || !created.data) { + throw new Error(formatOpencodeError(created.error ?? "Smoke session create failed")); + } + currentSessionID = created.data.id; + await refreshSessions(); + await refreshMessages(); + pushLog("main", "info", `Smoke created session ${created.data.id}`); + const prompt = await clientOrThrow().session.promptAsync(requestArgs({ + sessionID: created.data.id, + parts: [{ type: "text", text: promptValue }], + })); + if (prompt.error) + throw new Error(formatOpencodeError(prompt.error)); + pushLog("main", "info", `Smoke prompt submitted for ${created.data.id}`); + await waitForAssistantReply(created.data.id); emitState(); } async function waitForAssistantReply(sessionID, timeoutMs = 30_000) { @@ -291,7 +336,7 @@ async function waitForAssistantReply(sessionID, timeoutMs = 30_000) { currentSessionID = sessionID; await refreshMessages(); await refreshSessions(); - const done = messages.some((message) => message.role === "assistant" && message.text.trim().length > 0); + const done = messages.some((message) => (message.role === "assistant" || message.role === "error") && message.text.trim().length > 0); if (done) { pushLog("main", "info", `Assistant reply observed for ${sessionID}`); return; @@ -329,10 +374,13 @@ ipcMain.handle("ow:connectRemote", async (_event, payload) => { return connectRemote(payload.url, payload.token); }); ipcMain.handle("ow:createSession", async () => { - const data = (await api("/session", { method: "POST", body: JSON.stringify({}) })); - currentSessionID = data.session.id; - messages = []; - upsertSession(data.session); + const created = await clientOrThrow().session.create(requestArgs({})); + if (created.error || !created.data) { + throw new Error(formatOpencodeError(created.error ?? "Session create failed")); + } + currentSessionID = created.data.id; + await refreshSessions(); + await refreshMessages(); emitState(); return snapshot(); }); @@ -344,14 +392,22 @@ ipcMain.handle("ow:selectSession", async (_event, sessionID) => { }); ipcMain.handle("ow:sendPrompt", async (_event, payload) => { pushLog("main", "info", `Sending prompt for ${payload.sessionID}`); - await api(`/session/${payload.sessionID}/prompt_async`, { - method: "POST", - body: JSON.stringify({ prompt: payload.prompt }), - }); + const result = await clientOrThrow().session.promptAsync(requestArgs({ + sessionID: payload.sessionID, + parts: [{ type: "text", text: payload.prompt }], + })); + if (result.error) + throw new Error(formatOpencodeError(result.error)); + await refreshSessions(); + await refreshMessages(); return snapshot(); }); ipcMain.handle("ow:abortSession", async (_event, sessionID) => { - await api(`/session/${sessionID}/abort`, { method: "POST", body: JSON.stringify({}) }); + const result = await clientOrThrow().session.abort(requestArgs({ sessionID })); + if (result.error) + throw new Error(formatOpencodeError(result.error)); + await refreshSessions(); + await refreshMessages(); return snapshot(); }); ipcMain.on("ow:rendererLog", (_event, payload) => { diff --git a/apps/labs/electron/main.ts b/apps/labs/electron/main.ts index 78f06dade..ecb19ec47 100644 --- a/apps/labs/electron/main.ts +++ b/apps/labs/electron/main.ts @@ -1,11 +1,17 @@ import { randomUUID } from "node:crypto"; -import { once } from "node:events"; -import { access, appendFile, mkdir, readFile, writeFile } from "node:fs/promises"; -import net from "node:net"; +import { appendFile, mkdir, readFile, writeFile } from "node:fs/promises"; import path from "node:path"; import { fileURLToPath } from "node:url"; -import { spawn } from "node:child_process"; import { app, BrowserWindow, dialog, ipcMain } from "electron"; +import { + createOpencode, + createOpencodeClient, + type Event as OpencodeEvent, + type Message as OpencodeMessage, + type Part as OpencodePart, + type Session as OpencodeSession, + type SessionStatus as OpencodeSessionStatus, +} from "@opencode-ai/sdk/v2"; type Role = "user" | "assistant" | "error"; @@ -44,20 +50,15 @@ type LogEntry = { at: string; }; -type HostEvent = - | { type: "session.created"; session: SessionSummary } - | { type: "session.updated"; session: SessionSummary } - | { type: "message.created"; sessionID: string; message: Message } - | { type: "message.delta"; sessionID: string; messageID: string; delta: string } - | { type: "run.finished"; sessionID: string } - | { type: "run.failed"; sessionID: string; error: string } - | { type: "ready" }; +type Client = ReturnType; +type EmbeddedOpencode = Awaited>; +type SyncRequest = { + sessions?: boolean; + messages?: boolean; +}; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const packageRoot = path.resolve(__dirname, ".."); -const repoRoot = path.resolve(packageRoot, "..", ".."); -const hostPackageRoot = path.resolve(repoRoot, "apps", "openwork-host"); -const hostEntry = path.resolve(hostPackageRoot, "dist", "index.js"); const rendererUrl = process.env.OPENWORK_LABS_RENDERER_URL ?? "http://127.0.0.1:4174"; const smokeWorkspaceDir = process.env.OPENWORK_LABS_SMOKE_WORKSPACE_DIR ?? ""; const smokePrompt = process.env.OPENWORK_LABS_SMOKE_PROMPT ?? ""; @@ -66,13 +67,18 @@ const configFile = path.join(configDir, "workspace.json"); const logFile = path.join(configDir, "runtime.log"); let mainWindow: BrowserWindow | null = null; -let localHost: ReturnType | null = null; +let localOpencode: EmbeddedOpencode | null = null; +let activeClient: Client | null = null; +let activeDirectory: string | undefined; let eventAbort: AbortController | null = null; +let syncTimer: ReturnType | null = null; let connection: ConnectionState = { kind: "none" }; let sessions: SessionSummary[] = []; let currentSessionID: string | null = null; let messages: Message[] = []; const logs: LogEntry[] = []; +const sessionStatuses = new Map(); +const pendingSync: Required = { sessions: false, messages: false }; type PersistedConnection = Extract; @@ -92,25 +98,6 @@ function pushLog(scope: LogEntry["scope"], level: LogEntry["level"], message: st void appendFile(logFile, `[${entry.at}] [${scope}] [${level}] ${message}\n`).catch(() => undefined); } -async function ensureHostBuilt(): Promise { - try { - await access(hostEntry); - return; - } catch { - pushLog("main", "info", "Building openwork-host before launch"); - const child = spawn("pnpm", ["--filter", "openwork-host", "build"], { - cwd: repoRoot, - stdio: ["ignore", "pipe", "pipe"], - }); - child.stdout.on("data", (chunk) => pushLog("main", "info", String(chunk).trim())); - child.stderr.on("data", (chunk) => pushLog("main", "error", String(chunk).trim())); - const [code] = (await once(child, "exit")) as [number | null]; - if (code !== 0) { - throw new Error(`openwork-host build failed with code ${code}`); - } - } -} - async function saveConnection(): Promise { await mkdir(configDir, { recursive: true }); if (connection.kind === "none") { @@ -132,47 +119,112 @@ async function loadConnection(): Promise { } } -async function getPort(): Promise { - const server = net.createServer(); - server.listen(0, "127.0.0.1"); - await once(server, "listening"); - const address = server.address(); - server.close(); - if (!address || typeof address === "string") throw new Error("Unable to allocate port"); - return address.port; +function clientOrThrow(): Client { + if (!activeClient) { + throw new Error("No active opencode connection"); + } + return activeClient; } -function baseUrl(): string { - if (connection.kind === "local" || connection.kind === "remote") return connection.url; - throw new Error("No active connection"); +function requestArgs>(input?: T): T & { directory?: string } { + if (!activeDirectory) { + return { ...(input ?? {}) } as T & { directory?: string }; + } + return { ...(input ?? {}), directory: activeDirectory } as T & { directory?: string }; } -function authHeaders(): Record { - if (connection.kind === "remote" && connection.token) { - return { Authorization: `Bearer ${connection.token}` }; +function formatOpencodeError(error: unknown): string { + if (!error) return "Unknown error"; + if (error instanceof Error) return error.message; + if (typeof error === "string") return error; + if (typeof error === "object") { + const value = error as { name?: unknown; data?: { message?: unknown } }; + if (typeof value.data?.message === "string" && value.data.message.trim()) { + return value.data.message; + } + if (typeof value.name === "string" && value.name.trim()) { + return value.name; + } } - return {}; + return JSON.stringify(error); } -async function api(pathname: string, init?: RequestInit): Promise { - const response = await fetch(`${baseUrl()}${pathname}`, { - ...init, - headers: { - "content-type": "application/json", - ...authHeaders(), - ...(init?.headers ?? {}), - }, - }); - if (!response.ok) { - const text = await response.text(); - throw new Error(text || `${response.status} ${response.statusText}`); +function basicAuthHeader(username: string, password: string): string { + return `Basic ${Buffer.from(`${username}:${password}`, "utf8").toString("base64")}`; +} + +function localServerHeaders(): Record | undefined { + const password = process.env.OPENCODE_SERVER_PASSWORD?.trim(); + if (!password) return undefined; + const username = process.env.OPENCODE_SERVER_USERNAME?.trim() || "opencode"; + return { Authorization: basicAuthHeader(username, password) }; +} + +function isSessionActive(status: OpencodeSessionStatus | undefined): boolean { + return status?.type === "busy" || status?.type === "retry"; +} + +function toSessionSummary(session: OpencodeSession): SessionSummary { + return { + id: session.id, + title: session.title || "Untitled chat", + createdAt: new Date(session.time.created).toISOString(), + updatedAt: new Date(session.time.updated).toISOString(), + active: isSessionActive(sessionStatuses.get(session.id)), + }; +} + +function partText(part: OpencodePart): string { + if (part.type === "text") { + return part.text; + } + if (part.type === "tool" && part.state.status === "error") { + return `Tool ${part.tool} failed: ${part.state.error}`; } - return response.json(); + return ""; +} + +function messageText(info: OpencodeMessage, parts: OpencodePart[]): string { + const text = parts + .map((part) => partText(part)) + .filter((value) => value.length > 0) + .join(""); + if (text.trim()) return text; + if (info.role === "assistant" && info.error) return formatOpencodeError(info.error); + return ""; +} + +function toMessage(entry: { info: OpencodeMessage; parts: OpencodePart[] }): Message { + return { + id: entry.info.id, + role: entry.info.role === "assistant" && entry.info.error ? "error" : entry.info.role, + text: messageText(entry.info, entry.parts), + createdAt: new Date(entry.info.time.created).toISOString(), + }; +} + +function resetSnapshot(): void { + sessions = []; + currentSessionID = null; + messages = []; + sessionStatuses.clear(); } async function refreshSessions(): Promise { - const data = (await api("/session", { method: "GET" })) as { sessions: SessionSummary[] }; - sessions = data.sessions; + const sdk = clientOrThrow(); + const [listResult, statusResult] = await Promise.all([ + sdk.session.list(requestArgs({ roots: true, limit: 100 })), + sdk.session.status(requestArgs()), + ]); + if (listResult.error) throw new Error(formatOpencodeError(listResult.error)); + if (statusResult.error) throw new Error(formatOpencodeError(statusResult.error)); + + sessionStatuses.clear(); + for (const [sessionID, status] of Object.entries(statusResult.data ?? {})) { + sessionStatuses.set(sessionID, status); + } + + sessions = (listResult.data ?? []).map((session) => toSessionSummary(session)); if (!currentSessionID && sessions[0]) currentSessionID = sessions[0].id; if (currentSessionID && !sessions.some((session) => session.id === currentSessionID)) { currentSessionID = sessions[0]?.id ?? null; @@ -184,132 +236,127 @@ async function refreshMessages(): Promise { messages = []; return; } - const data = (await api(`/session/${currentSessionID}/message`, { method: "GET" })) as { messages: Message[] }; - messages = data.messages; + const result = await clientOrThrow().session.messages(requestArgs({ sessionID: currentSessionID, limit: 200 })); + if (result.error) throw new Error(formatOpencodeError(result.error)); + messages = (result.data ?? []).map((entry) => toMessage(entry)); } -function upsertSession(summary: SessionSummary): void { - const existing = sessions.findIndex((session) => session.id === summary.id); - if (existing >= 0) { - sessions[existing] = summary; - } else { - sessions = [summary, ...sessions]; - } - sessions = [...sessions].sort((a, b) => b.updatedAt.localeCompare(a.updatedAt)); +function queueSync(request: SyncRequest): void { + pendingSync.sessions ||= request.sessions === true; + pendingSync.messages ||= request.messages === true; + if (syncTimer) return; + + syncTimer = setTimeout(() => { + syncTimer = null; + const next = { ...pendingSync }; + pendingSync.sessions = false; + pendingSync.messages = false; + + void (async () => { + if (next.sessions) await refreshSessions(); + if (next.messages) await refreshMessages(); + emitState(); + })().catch((error: unknown) => { + pushLog("main", "error", formatOpencodeError(error)); + }); + }, 75); } -function applyHostEvent(event: HostEvent): void { - if (event.type === "session.created") { - upsertSession(event.session); - currentSessionID = event.session.id; - messages = []; +function applyEvent(event: OpencodeEvent): void { + if (event.type === "session.created" || event.type === "session.updated" || event.type === "session.deleted") { + queueSync({ sessions: true, messages: event.type !== "session.deleted" }); + return; } - if (event.type === "session.updated") { - upsertSession(event.session); + + if (event.type === "session.status" || event.type === "session.idle") { + queueSync({ sessions: true, messages: event.properties.sessionID === currentSessionID }); + return; } - if (event.type === "message.created" && event.sessionID === currentSessionID) { - messages = [...messages, event.message]; + + if (event.type === "message.updated") { + queueSync({ sessions: true, messages: event.properties.info.sessionID === currentSessionID }); + return; } - if (event.type === "message.delta" && event.sessionID === currentSessionID) { - messages = messages.map((message) => - message.id === event.messageID ? { ...message, text: `${message.text}${event.delta}` } : message, - ); + + if (event.type === "message.removed") { + queueSync({ messages: event.properties.sessionID === currentSessionID }); + return; } - if (event.type === "run.failed") { - pushLog("host", "error", event.error); + + if (event.type === "message.part.updated") { + queueSync({ messages: event.properties.part.sessionID === currentSessionID }); + return; + } + + if (event.type === "message.part.removed") { + queueSync({ messages: event.properties.sessionID === currentSessionID }); + return; + } + + if (event.type === "session.error") { + pushLog("host", "error", formatOpencodeError(event.properties.error)); + queueSync({ sessions: true, messages: event.properties.sessionID === currentSessionID }); + return; } - emitState(); } async function openEventStream(): Promise { eventAbort?.abort(); const controller = new AbortController(); eventAbort = controller; - const response = await fetch(`${baseUrl()}/event`, { - method: "GET", - headers: { Accept: "text/event-stream", ...authHeaders() }, - signal: controller.signal, - }); - if (!response.ok || !response.body) throw new Error(`SSE failed: ${response.status}`); - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - const read = async (): Promise => { - while (!controller.signal.aborted) { - const { done, value } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); - const frames = buffer.split("\n\n"); - buffer = frames.pop() ?? ""; - for (const frame of frames) { - const dataLine = frame - .split("\n") - .find((line) => line.startsWith("data:")); - if (!dataLine) continue; - const payload = dataLine.slice(5).trim(); - if (!payload) continue; - applyHostEvent(JSON.parse(payload) as HostEvent); - } - } - }; + const stream = await clientOrThrow().event.subscribe(requestArgs(), { signal: controller.signal }); - void read().catch((error: unknown) => { - if (!controller.signal.aborted) pushLog("main", "error", String(error)); + void (async () => { + for await (const event of stream.stream as AsyncIterable) { + applyEvent(event); + if (controller.signal.aborted) break; + await new Promise((resolve) => setTimeout(resolve, 0)); + } + })().catch((error: unknown) => { + if (!controller.signal.aborted) { + pushLog("main", "error", formatOpencodeError(error)); + } }); } async function stopLocalHost(): Promise { eventAbort?.abort(); eventAbort = null; - if (!localHost) return; - localHost.kill(); - await once(localHost, "exit").catch(() => undefined); - localHost = null; -} - -async function waitForHealth(url: string): Promise { - for (let attempt = 0; attempt < 80; attempt += 1) { - try { - const response = await fetch(`${url}/global/health`); - if (response.ok) return; - } catch { - // Retry until the child is ready. - } - await new Promise((resolve) => setTimeout(resolve, 250)); + if (syncTimer) { + clearTimeout(syncTimer); + syncTimer = null; } - throw new Error("openwork-host did not become healthy in time"); + pendingSync.sessions = false; + pendingSync.messages = false; + connection = { kind: "none" }; + resetSnapshot(); + activeClient = null; + activeDirectory = undefined; + sessionStatuses.clear(); + if (!localOpencode) return; + localOpencode.server.close(); + localOpencode = null; + pushLog("host", "info", "Embedded opencode server stopped"); } async function connectLocal(workspacePath: string): Promise { await stopLocalHost(); - await ensureHostBuilt(); - const port = await getPort(); - const child = spawn(process.execPath, [hostEntry], { - cwd: hostPackageRoot, - env: { - ...process.env, - PORT: String(port), - OPENWORK_HOST_WORKSPACE_DIR: workspacePath, - }, - stdio: ["ignore", "pipe", "pipe"], - }); - child.stdout.on("data", (chunk) => pushLog("host", "info", String(chunk).trim())); - child.stderr.on("data", (chunk) => pushLog("host", "error", String(chunk).trim())); - child.on("exit", (code, signal) => pushLog("host", code === 0 ? "info" : "error", `host exited code=${code} signal=${signal}`)); - localHost = child; - - const url = `http://127.0.0.1:${port}`; - await waitForHealth(url); - connection = { kind: "local", workspacePath, url }; - sessions = []; - currentSessionID = null; - messages = []; + pushLog("host", "info", `Starting embedded opencode for ${workspacePath}`); + localOpencode = await createOpencode({ hostname: "127.0.0.1", port: 0 }); + activeClient = createOpencodeClient({ baseUrl: localOpencode.server.url, headers: localServerHeaders() }); + const health = await activeClient.global.health(); + if (health.error || health.data?.healthy !== true) { + throw new Error(formatOpencodeError(health.error ?? "Embedded opencode health check failed")); + } + + activeDirectory = workspacePath; + connection = { kind: "local", workspacePath, url: localOpencode.server.url }; + resetSnapshot(); await refreshSessions(); await refreshMessages(); await openEventStream(); emitState(); + pushLog("host", "info", `Embedded opencode ready at ${localOpencode.server.url}`); pushLog("main", "info", `Connected local workspace ${workspacePath}`); await saveConnection(); return snapshot(); @@ -317,10 +364,19 @@ async function connectLocal(workspacePath: string): Promise { async function connectRemote(urlInput: string, token: string): Promise { await stopLocalHost(); - connection = { kind: "remote", url: urlInput.replace(/\/$/, ""), token }; - sessions = []; - currentSessionID = null; - messages = []; + const url = urlInput.trim().replace(/\/$/, ""); + const trimmedToken = token.trim(); + const headers = trimmedToken ? { Authorization: `Bearer ${trimmedToken}` } : undefined; + const client = createOpencodeClient({ baseUrl: url, headers }); + const health = await client.global.health(); + if (health.error || health.data?.healthy !== true) { + throw new Error(formatOpencodeError(health.error ?? `Remote opencode health check failed for ${url}`)); + } + + activeClient = client; + activeDirectory = undefined; + connection = { kind: "remote", url, token: trimmedToken }; + resetSnapshot(); await refreshSessions(); await refreshMessages(); await openEventStream(); @@ -331,19 +387,26 @@ async function connectRemote(urlInput: string, token: string): Promise } async function runMainSmokeFlow(promptValue: string): Promise { - const data = (await api("/session", { method: "POST", body: JSON.stringify({ title: "Smoke chat" }) })) as { - session: SessionSummary; - }; - currentSessionID = data.session.id; - messages = []; - upsertSession(data.session); - pushLog("main", "info", `Smoke created session ${data.session.id}`); - await api(`/session/${data.session.id}/prompt_async`, { - method: "POST", - body: JSON.stringify({ prompt: promptValue }), - }); - pushLog("main", "info", `Smoke prompt submitted for ${data.session.id}`); - await waitForAssistantReply(data.session.id); + const created = await clientOrThrow().session.create(requestArgs({ title: "Smoke chat" })); + if (created.error || !created.data) { + throw new Error(formatOpencodeError(created.error ?? "Smoke session create failed")); + } + + currentSessionID = created.data.id; + await refreshSessions(); + await refreshMessages(); + pushLog("main", "info", `Smoke created session ${created.data.id}`); + + const prompt = await clientOrThrow().session.promptAsync( + requestArgs({ + sessionID: created.data.id, + parts: [{ type: "text", text: promptValue }], + }), + ); + if (prompt.error) throw new Error(formatOpencodeError(prompt.error)); + + pushLog("main", "info", `Smoke prompt submitted for ${created.data.id}`); + await waitForAssistantReply(created.data.id); emitState(); } @@ -353,7 +416,9 @@ async function waitForAssistantReply(sessionID: string, timeoutMs = 30_000): Pro currentSessionID = sessionID; await refreshMessages(); await refreshSessions(); - const done = messages.some((message) => message.role === "assistant" && message.text.trim().length > 0); + const done = messages.some( + (message) => (message.role === "assistant" || message.role === "error") && message.text.trim().length > 0, + ); if (done) { pushLog("main", "info", `Assistant reply observed for ${sessionID}`); return; @@ -392,10 +457,13 @@ ipcMain.handle("ow:connectRemote", async (_event, payload: { url: string; token: return connectRemote(payload.url, payload.token); }); ipcMain.handle("ow:createSession", async () => { - const data = (await api("/session", { method: "POST", body: JSON.stringify({}) })) as { session: SessionSummary }; - currentSessionID = data.session.id; - messages = []; - upsertSession(data.session); + const created = await clientOrThrow().session.create(requestArgs({})); + if (created.error || !created.data) { + throw new Error(formatOpencodeError(created.error ?? "Session create failed")); + } + currentSessionID = created.data.id; + await refreshSessions(); + await refreshMessages(); emitState(); return snapshot(); }); @@ -407,14 +475,22 @@ ipcMain.handle("ow:selectSession", async (_event, sessionID: string) => { }); ipcMain.handle("ow:sendPrompt", async (_event, payload: { sessionID: string; prompt: string }) => { pushLog("main", "info", `Sending prompt for ${payload.sessionID}`); - await api(`/session/${payload.sessionID}/prompt_async`, { - method: "POST", - body: JSON.stringify({ prompt: payload.prompt }), - }); + const result = await clientOrThrow().session.promptAsync( + requestArgs({ + sessionID: payload.sessionID, + parts: [{ type: "text", text: payload.prompt }], + }), + ); + if (result.error) throw new Error(formatOpencodeError(result.error)); + await refreshSessions(); + await refreshMessages(); return snapshot(); }); ipcMain.handle("ow:abortSession", async (_event, sessionID: string) => { - await api(`/session/${sessionID}/abort`, { method: "POST", body: JSON.stringify({}) }); + const result = await clientOrThrow().session.abort(requestArgs({ sessionID })); + if (result.error) throw new Error(formatOpencodeError(result.error)); + await refreshSessions(); + await refreshMessages(); return snapshot(); }); ipcMain.on("ow:rendererLog", (_event, payload: { level: string; message: string }) => { diff --git a/apps/labs/package.json b/apps/labs/package.json index 64d7088d2..2a17f084e 100644 --- a/apps/labs/package.json +++ b/apps/labs/package.json @@ -12,6 +12,7 @@ "start": "electron ./dist-electron/main.js" }, "dependencies": { + "@opencode-ai/sdk": "^1.1.31", "react": "19.2.4", "react-dom": "19.2.4" }, diff --git a/apps/labs/src/app.tsx b/apps/labs/src/app.tsx index f2504fb08..71dc33102 100644 --- a/apps/labs/src/app.tsx +++ b/apps/labs/src/app.tsx @@ -50,18 +50,18 @@ export function App() { {snapshot.connection.kind === "none" ? ( <>
setRemoteUrl(event.target.value)} placeholder="Remote URL" /> - setRemoteToken(event.target.value)} placeholder="Token" /> - + setRemoteToken(event.target.value)} placeholder="Bearer token (optional)" /> +
) : ( <>
-
{snapshot.connection.kind === "local" ? "Local runtime" : "Remote runtime"}
+
{snapshot.connection.kind === "local" ? "Embedded opencode" : "Remote opencode"}
{snapshot.connection.kind === "local" ? snapshot.connection.workspacePath : snapshot.connection.url}
setRemoteUrl(event.target.value)} placeholder="Remote URL" /> - setRemoteToken(event.target.value)} placeholder="Bearer token (optional)" /> - + setRemoteToken(event.target.value)} placeholder="Token" /> +
) : ( <>
-
{snapshot.connection.kind === "local" ? "Embedded opencode" : "Remote opencode"}
+
{snapshot.connection.kind === "local" ? "Local openwork-host" : "Remote runtime"}
{snapshot.connection.kind === "local" ? snapshot.connection.workspacePath : snapshot.connection.url}