Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 58 additions & 38 deletions apps/labs/dist-electron/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ const logFile = path.join(configDir, "runtime.log");
let mainWindow = null;
let localHost = null;
let eventAbort = null;
let syncTimer = null;
let connection = { kind: "none" };
let sessions = [];
let currentSessionID = null;
let messages = [];
const logs = [];
const pendingSync = { sessions: false, messages: false };
function snapshot() {
return { connection, sessions, currentSessionID, messages };
}
Expand Down Expand Up @@ -116,6 +118,9 @@ async function api(pathname, init) {
}
return response.json();
}
function formatHostError(error) {
return error instanceof Error ? error.message : String(error);
}
async function refreshSessions() {
const data = (await api("/session", { method: "GET" }));
sessions = data.sessions;
Expand All @@ -133,35 +138,58 @@ async function refreshMessages() {
const data = (await api(`/session/${currentSessionID}/message`, { method: "GET" }));
messages = data.messages;
}
function upsertSession(summary) {
const existing = sessions.findIndex((session) => session.id === summary.id);
if (existing >= 0) {
sessions[existing] = summary;
}
else {
sessions = [summary, ...sessions];
}
sessions = [...sessions].sort((a, b) => b.updatedAt.localeCompare(a.updatedAt));
function queueSync(request) {
pendingSync.sessions ||= request.sessions === true;
pendingSync.messages ||= request.messages === true;
if (syncTimer)
return;
syncTimer = setTimeout(() => {
syncTimer = null;
const next = { ...pendingSync };
pendingSync.sessions = false;
pendingSync.messages = false;
void (async () => {
if (next.sessions)
await refreshSessions();
if (next.messages)
await refreshMessages();
emitState();
})().catch((error) => {
pushLog("main", "error", formatHostError(error));
});
}, 75);
}
function applyHostEvent(event) {
if (event.type === "session.created") {
upsertSession(event.session);
currentSessionID = event.session.id;
messages = [];
}
if (event.type === "session.updated") {
upsertSession(event.session);
if (event.type === "ready") {
return;
}
if (event.type === "message.created" && event.sessionID === currentSessionID) {
messages = [...messages, event.message];
if (event.type === "sessions.changed") {
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}
if (event.type === "message.delta" && event.sessionID === currentSessionID) {
messages = messages.map((message) => message.id === event.messageID ? { ...message, text: `${message.text}${event.delta}` } : message);
if (event.type === "messages.changed") {
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}
if (event.type === "run.failed") {
pushLog("host", "error", event.error);
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}
emitState();
}
async function waitForHealth(url) {
for (let attempt = 0; attempt < 80; attempt += 1) {
try {
const response = await fetch(`${url}/global/health`);
if (response.ok)
return;
}
catch {
// Retry until the child is ready.
}
await new Promise((resolve) => setTimeout(resolve, 250));
}
throw new Error("openwork-host did not become healthy in time");
}
async function openEventStream() {
eventAbort?.abort();
Expand Down Expand Up @@ -206,26 +234,18 @@ async function openEventStream() {
async function stopLocalHost() {
eventAbort?.abort();
eventAbort = null;
if (syncTimer) {
clearTimeout(syncTimer);
syncTimer = null;
}
pendingSync.sessions = false;
pendingSync.messages = false;
if (!localHost)
return;
localHost.kill();
await once(localHost, "exit").catch(() => undefined);
localHost = null;
}
async function waitForHealth(url) {
for (let attempt = 0; attempt < 80; attempt += 1) {
try {
const response = await fetch(`${url}/global/health`);
if (response.ok)
return;
}
catch {
// Retry until the child is ready.
}
await new Promise((resolve) => setTimeout(resolve, 250));
}
throw new Error("openwork-host did not become healthy in time");
}
async function connectLocal(workspacePath) {
await stopLocalHost();
await ensureHostBuilt();
Expand Down Expand Up @@ -274,8 +294,8 @@ async function connectRemote(urlInput, token) {
async function runMainSmokeFlow(promptValue) {
const data = (await api("/session", { method: "POST", body: JSON.stringify({ title: "Smoke chat" }) }));
currentSessionID = data.session.id;
messages = [];
upsertSession(data.session);
await refreshMessages();
await refreshSessions();
pushLog("main", "info", `Smoke created session ${data.session.id}`);
await api(`/session/${data.session.id}/prompt_async`, {
method: "POST",
Expand Down Expand Up @@ -332,7 +352,7 @@ ipcMain.handle("ow:createSession", async () => {
const data = (await api("/session", { method: "POST", body: JSON.stringify({}) }));
currentSessionID = data.session.id;
messages = [];
upsertSession(data.session);
await refreshSessions();
emitState();
return snapshot();
});
Expand Down
114 changes: 69 additions & 45 deletions apps/labs/electron/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ type LogEntry = {
};

type HostEvent =
| { type: "session.created"; session: SessionSummary }
| { type: "session.updated"; session: SessionSummary }
| { type: "message.created"; sessionID: string; message: Message }
| { type: "message.delta"; sessionID: string; messageID: string; delta: string }
| { type: "run.finished"; sessionID: string }
| { type: "run.failed"; sessionID: string; error: string }
| { type: "ready" };
| { type: "ready" }
| { type: "sessions.changed"; sessionID?: string }
| { type: "messages.changed"; sessionID: string }
| { type: "run.failed"; sessionID?: string; error: string };

type SyncRequest = {
sessions?: boolean;
messages?: boolean;
};

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const packageRoot = path.resolve(__dirname, "..");
Expand All @@ -68,11 +70,13 @@ const logFile = path.join(configDir, "runtime.log");
let mainWindow: BrowserWindow | null = null;
let localHost: ReturnType<typeof spawn> | null = null;
let eventAbort: AbortController | null = null;
let syncTimer: ReturnType<typeof setTimeout> | null = null;
let connection: ConnectionState = { kind: "none" };
let sessions: SessionSummary[] = [];
let currentSessionID: string | null = null;
let messages: Message[] = [];
const logs: LogEntry[] = [];
const pendingSync: Required<SyncRequest> = { sessions: false, messages: false };

type PersistedConnection = Extract<ConnectionState, { kind: "local" | "remote" }>;

Expand Down Expand Up @@ -170,6 +174,10 @@ async function api(pathname: string, init?: RequestInit): Promise<unknown> {
return response.json();
}

function formatHostError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

async function refreshSessions(): Promise<void> {
const data = (await api("/session", { method: "GET" })) as { sessions: SessionSummary[] };
sessions = data.sessions;
Expand All @@ -188,37 +196,60 @@ async function refreshMessages(): Promise<void> {
messages = data.messages;
}

function upsertSession(summary: SessionSummary): void {
const existing = sessions.findIndex((session) => session.id === summary.id);
if (existing >= 0) {
sessions[existing] = summary;
} else {
sessions = [summary, ...sessions];
}
sessions = [...sessions].sort((a, b) => b.updatedAt.localeCompare(a.updatedAt));
function queueSync(request: SyncRequest): void {
pendingSync.sessions ||= request.sessions === true;
pendingSync.messages ||= request.messages === true;
if (syncTimer) return;

syncTimer = setTimeout(() => {
syncTimer = null;
const next = { ...pendingSync };
pendingSync.sessions = false;
pendingSync.messages = false;

void (async () => {
if (next.sessions) await refreshSessions();
if (next.messages) await refreshMessages();
emitState();
})().catch((error: unknown) => {
pushLog("main", "error", formatHostError(error));
});
}, 75);
}

function applyHostEvent(event: HostEvent): void {
if (event.type === "session.created") {
upsertSession(event.session);
currentSessionID = event.session.id;
messages = [];
}
if (event.type === "session.updated") {
upsertSession(event.session);
if (event.type === "ready") {
return;
}
if (event.type === "message.created" && event.sessionID === currentSessionID) {
messages = [...messages, event.message];

if (event.type === "sessions.changed") {
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}
if (event.type === "message.delta" && event.sessionID === currentSessionID) {
messages = messages.map((message) =>
message.id === event.messageID ? { ...message, text: `${message.text}${event.delta}` } : message,
);

if (event.type === "messages.changed") {
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}

if (event.type === "run.failed") {
pushLog("host", "error", event.error);
queueSync({ sessions: true, messages: event.sessionID === currentSessionID });
return;
}
emitState();
}

async function waitForHealth(url: string): Promise<void> {
for (let attempt = 0; attempt < 80; attempt += 1) {
try {
const response = await fetch(`${url}/global/health`);
if (response.ok) return;
} catch {
// Retry until the child is ready.
}
await new Promise((resolve) => setTimeout(resolve, 250));
}
throw new Error("openwork-host did not become healthy in time");
}

async function openEventStream(): Promise<void> {
Expand Down Expand Up @@ -263,25 +294,18 @@ async function openEventStream(): Promise<void> {
async function stopLocalHost(): Promise<void> {
eventAbort?.abort();
eventAbort = null;
if (syncTimer) {
clearTimeout(syncTimer);
syncTimer = null;
}
pendingSync.sessions = false;
pendingSync.messages = false;
if (!localHost) return;
localHost.kill();
await once(localHost, "exit").catch(() => undefined);
localHost = null;
}

async function waitForHealth(url: string): Promise<void> {
for (let attempt = 0; attempt < 80; attempt += 1) {
try {
const response = await fetch(`${url}/global/health`);
if (response.ok) return;
} catch {
// Retry until the child is ready.
}
await new Promise((resolve) => setTimeout(resolve, 250));
}
throw new Error("openwork-host did not become healthy in time");
}

async function connectLocal(workspacePath: string): Promise<Snapshot> {
await stopLocalHost();
await ensureHostBuilt();
Expand Down Expand Up @@ -335,8 +359,8 @@ async function runMainSmokeFlow(promptValue: string): Promise<void> {
session: SessionSummary;
};
currentSessionID = data.session.id;
messages = [];
upsertSession(data.session);
await refreshMessages();
await refreshSessions();
pushLog("main", "info", `Smoke created session ${data.session.id}`);
await api(`/session/${data.session.id}/prompt_async`, {
method: "POST",
Expand Down Expand Up @@ -395,7 +419,7 @@ ipcMain.handle("ow:createSession", async () => {
const data = (await api("/session", { method: "POST", body: JSON.stringify({}) })) as { session: SessionSummary };
currentSessionID = data.session.id;
messages = [];
upsertSession(data.session);
await refreshSessions();
emitState();
return snapshot();
});
Expand Down
2 changes: 1 addition & 1 deletion apps/labs/src/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export function App() {
) : (
<>
<div className="connection">
<div>{snapshot.connection.kind === "local" ? "Local runtime" : "Remote runtime"}</div>
<div>{snapshot.connection.kind === "local" ? "Local openwork-host" : "Remote runtime"}</div>
<code>{snapshot.connection.kind === "local" ? snapshot.connection.workspacePath : snapshot.connection.url}</code>
</div>
<button className="primary" onClick={() => void window.openwork.createSession()}>
Expand Down
5 changes: 3 additions & 2 deletions apps/openwork-host/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
"private": true,
"version": "0.0.0",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc -p tsconfig.json",
"start": "node dist/index.js",
"typecheck": "tsc -p tsconfig.json --noEmit"
},
"dependencies": {
"@mariozechner/pi-ai": "^0.64.0",
"@mariozechner/pi-coding-agent": "^0.64.0"
"@opencode-ai/sdk": "^1.1.31"
},
"devDependencies": {
"@types/node": "^24.3.0",
Expand Down
Loading
Loading