diff --git a/src/adapters/workspace-log-sink.ts b/src/adapters/workspace-log-sink.ts index 81e0dab3..95cfd753 100644 --- a/src/adapters/workspace-log-sink.ts +++ b/src/adapters/workspace-log-sink.ts @@ -9,6 +9,7 @@ const WORKSPACE_EVENTS = new Set([ "bundle.crashed", "bundle.recovered", "bundle.dead", + "bundle.startFailed", "data.changed", "config.changed", "skill.created", diff --git a/src/api/events.ts b/src/api/events.ts index 16528e84..8dbadb18 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -49,6 +49,7 @@ const SSE_ROUTES: Partial> = { "bundle.crashed": { scope: "workspace", wsIdField: "wsId" }, "bundle.recovered": { scope: "workspace", wsIdField: "wsId" }, "bundle.dead": { scope: "workspace", wsIdField: "wsId" }, + "bundle.startFailed": { scope: "workspace", wsIdField: "wsId" }, // Per-principal connection state — workspace-scoped. Drives the // pending-auth banner; without forwarding here, the banner never auto-clears // after a user completes interactive OAuth. diff --git a/src/api/server.ts b/src/api/server.ts index 1ce7bc04..a96c3b18 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -62,7 +62,9 @@ export function startServer(options: ServerOptions): ServerHandle { const internalToken = runtime.getInternalToken(); const mcpSources = runtime.mcpSources(); - const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink()); + const healthMonitor = new HealthMonitor(mcpSources, runtime.getEventSink(), { + startFailures: runtime.bundleStartFailures(), + }); healthMonitor.start(); // SSE event manager — listens to runtime events and broadcasts to clients diff --git a/src/engine/types.ts b/src/engine/types.ts index 1729b34d..6ffa11e6 100644 --- a/src/engine/types.ts +++ b/src/engine/types.ts @@ -66,6 +66,12 @@ export type EngineEventType = | "bundle.crashed" | "bundle.recovered" | "bundle.dead" + /** + * Bundle failed to start at boot (no McpSource was ever produced). + * Distinct from `bundle.crashed`, which requires a running source that + * went away. Payload: { wsId, serverName, bundleName, error }. + */ + | "bundle.startFailed" /** * Per-principal connection state change for a remote URL bundle. * Payload: { wsId, serverName, principalId, state, authorizationUrl? }. diff --git a/src/runtime/runtime.ts b/src/runtime/runtime.ts index 5814140d..98eb55f6 100644 --- a/src/runtime/runtime.ts +++ b/src/runtime/runtime.ts @@ -192,6 +192,13 @@ export class Runtime { _systemSource: import("../tools/types.ts").ToolSource | null; /** Platform sources (home, conversations, files, etc.) — retained for JIT workspace registration. */ private _platformSources: import("../tools/types.ts").ToolSource[] = []; + /** + * Boot-time bundle startup failures recorded by `startWorkspaceBundles`. + * These never produced an McpSource; HealthMonitor reads this list at + * construction so `/v1/health` reports the failures as terminal `dead` + * entries rather than silently omitting them. + */ + private _bundleStartFailures: import("./workspace-runtime.ts").BundleStartFailure[] = []; /** * Domain-context getter for the automations bundle. Set by the * automations source factory; consumed by internal callers (CLI's @@ -540,13 +547,24 @@ export class Runtime { // Phase 3: Start workspace bundles with per-workspace registries const configDir = config.configPath ? dirname(config.configPath) : undefined; - const { registries: workspaceRegistries, entries: workspaceBundleEntries } = - await startWorkspaceBundles(workspaceStore, platformSources, systemTools, events, configDir, { + const { + registries: workspaceRegistries, + entries: workspaceBundleEntries, + failures: bundleStartFailures, + } = await startWorkspaceBundles( + workspaceStore, + platformSources, + systemTools, + events, + configDir, + { workDir: resolveWorkDir(config), allowInsecureRemotes: config.allowInsecureRemotes, - }); + }, + ); rt._workspaceRegistries = workspaceRegistries; rt._platformSources = platformSources; + rt._bundleStartFailures = bundleStartFailures; // Wire the workspace registries into lifecycle so workspace-scope // startAuth / disconnect / install can add+remove sources without @@ -1083,6 +1101,14 @@ export class Runtime { return [...names]; } + /** + * Boot-time bundle startup failures. Read once at HealthMonitor + * construction so failed bundles appear as `dead` in `/v1/health`. + */ + bundleStartFailures(): import("./workspace-runtime.ts").BundleStartFailure[] { + return [...this._bundleStartFailures]; + } + /** Get MCP sources across all workspace registries (for health monitoring). */ mcpSources(): McpSource[] { const sources: McpSource[] = []; diff --git a/src/runtime/workspace-runtime.ts b/src/runtime/workspace-runtime.ts index d4f1a009..f3cf85dc 100644 --- a/src/runtime/workspace-runtime.ts +++ b/src/runtime/workspace-runtime.ts @@ -22,6 +22,19 @@ import type { WorkspaceStore } from "../workspace/workspace-store.ts"; // Types // --------------------------------------------------------------------------- +/** + * A boot-time bundle startup failure — recorded when `startBundleSource` + * throws. Surfaced via `bundle.startFailed` event (workspace log + SSE) + * and threaded into HealthMonitor so `/v1/health` reports the failed + * bundle as `state: "dead"` rather than omitting it entirely. + */ +export interface BundleStartFailure { + wsId: string; + serverName: string; + bundleName: string; + error: string; +} + /** A single entry in the process inventory — one per (workspace, bundle) pair. */ export interface ProcessInventoryEntry { /** Workspace id (e.g., "ws_engineering"). */ @@ -133,7 +146,11 @@ export async function startWorkspaceBundles( allowInsecureRemotes?: boolean; workDir?: string; }, -): Promise<{ registries: Map; entries: ProcessInventoryEntry[] }> { +): Promise<{ + registries: Map; + entries: ProcessInventoryEntry[]; + failures: BundleStartFailure[]; +}> { const workDir = opts?.workDir ?? join(process.env.NB_WORK_DIR ?? "", ".nimblebrain"); const workspaces = await workspaceStore.list(); const inventory = buildProcessInventory(workspaces, workDir); @@ -174,6 +191,7 @@ export async function startWorkspaceBundles( wsEntries.map((entry) => ({ wsId, entry })), ); const resultEntries: ProcessInventoryEntry[] = new Array(flat.length); + const failures: BundleStartFailure[] = []; const concurrency = resolveBundleStartConcurrency(); const startMs = Date.now(); @@ -260,9 +278,18 @@ export async function startWorkspaceBundles( resultEntries[idx] = { ...entry, serverName: result.sourceName, meta: result.meta }; } catch (err) { const msg = err instanceof Error ? err.message : String(err); + const bundleName = bundleNameFromRef(entry.bundle); process.stderr.write( `[workspace-runtime] Failed to start ${entry.serverName} in ${wsId}: ${msg}\n`, ); + // Persistent observability: workspace log (via WorkspaceLogSink) + + // SSE broadcast (via SseEventManager). Without this, operators have + // to grep container stderr to discover a failed boot. + eventSink.emit({ + type: "bundle.startFailed", + data: { wsId, serverName: entry.serverName, bundleName, error: msg }, + }); + failures.push({ wsId, serverName: entry.serverName, bundleName, error: msg }); } }); @@ -273,7 +300,7 @@ export async function startWorkspaceBundles( `[workspace-runtime] Started ${finalEntries.length}/${flat.length} bundles in ${elapsedMs}ms (concurrency=${concurrency})`, ); } - return { registries, entries: finalEntries }; + return { registries, entries: finalEntries, failures }; } /** diff --git a/src/tools/health-monitor.ts b/src/tools/health-monitor.ts index 3d1266e9..d348a6e2 100644 --- a/src/tools/health-monitor.ts +++ b/src/tools/health-monitor.ts @@ -1,4 +1,5 @@ import type { EventSink } from "../engine/types.ts"; +import type { BundleStartFailure } from "../runtime/workspace-runtime.ts"; import type { McpSource } from "./mcp-source.ts"; export type BundleState = "healthy" | "restarting" | "dead"; @@ -8,6 +9,12 @@ export interface BundleHealth { state: BundleState; uptime: number | null; restartCount: number; + /** + * Workspace id — populated only for boot-time start failures (live + * sources don't carry a wsId on `McpSource`). Lets `/v1/health` + * consumers distinguish same-named failed bundles across workspaces. + */ + wsId?: string; } interface BundleRecord { @@ -23,6 +30,13 @@ const DEFAULT_CHECK_INTERVAL_MS = 30_000; export interface HealthMonitorOptions { checkIntervalMs?: number; baseDelayMs?: number; + /** + * Boot-time start failures from `startWorkspaceBundles`. These never + * produced an `McpSource`, so they can't be restarted; we surface them + * as terminal `dead` entries in `getStatus()` so `/v1/health` reflects + * the failure instead of silently omitting the bundle. + */ + startFailures?: BundleStartFailure[]; } /** @@ -31,6 +45,7 @@ export interface HealthMonitorOptions { */ export class HealthMonitor { private records: BundleRecord[]; + private startFailures: BundleStartFailure[]; private timer: ReturnType | null = null; private checkIntervalMs: number; private baseDelayMs: number; @@ -47,6 +62,7 @@ export class HealthMonitor { state: "healthy" as BundleState, restartCount: 0, })); + this.startFailures = opts.startFailures ?? []; } /** Start the periodic health check loop. */ @@ -71,12 +87,20 @@ export class HealthMonitor { /** Get per-bundle health info. */ getStatus(): BundleHealth[] { - return this.records.map((r) => ({ + const live: BundleHealth[] = this.records.map((r) => ({ name: r.source.name, state: r.state, uptime: r.source.uptime(), restartCount: r.restartCount, })); + const dead: BundleHealth[] = this.startFailures.map((f) => ({ + name: f.serverName, + state: "dead" as const, + uptime: null, + restartCount: 0, + wsId: f.wsId, + })); + return [...live, ...dead]; } private async checkOne(record: BundleRecord): Promise { diff --git a/test/unit/health-monitor.test.ts b/test/unit/health-monitor.test.ts index baf59b45..327b1645 100644 --- a/test/unit/health-monitor.test.ts +++ b/test/unit/health-monitor.test.ts @@ -147,6 +147,76 @@ describe("HealthMonitor", () => { monitor.stop(); }); + it("includes boot-time start failures as dead entries in getStatus", async () => { + const sink = makeEventCollector(); + const monitor = new HealthMonitor([], sink, { + checkIntervalMs: 60_000, + baseDelayMs: 1, + startFailures: [ + { wsId: "ws_a", serverName: "broken", bundleName: "@nb/broken", error: "no manifest" }, + { wsId: "ws_b", serverName: "remote-x", bundleName: "https://x", error: "ECONNREFUSED" }, + ], + }); + + const status = monitor.getStatus(); + expect(status).toHaveLength(2); + + const broken = status.find((s) => s.name === "broken"); + expect(broken?.state).toBe("dead"); + expect(broken?.uptime).toBeNull(); + expect(broken?.restartCount).toBe(0); + expect(broken?.wsId).toBe("ws_a"); + + const remote = status.find((s) => s.name === "remote-x"); + expect(remote?.state).toBe("dead"); + expect(remote?.wsId).toBe("ws_b"); + + monitor.stop(); + }); + + it("does not attempt to restart boot-time failed bundles", async () => { + const sink = makeEventCollector(); + const monitor = new HealthMonitor([], sink, { + checkIntervalMs: 60_000, + baseDelayMs: 1, + startFailures: [ + { wsId: "ws_a", serverName: "broken", bundleName: "@nb/broken", error: "no manifest" }, + ], + }); + + await monitor.check(); + + // No restart, no extra events — these never produced a source to restart. + expect(sink.events).toHaveLength(0); + const status = monitor.getStatus(); + expect(status[0]!.state).toBe("dead"); + + monitor.stop(); + }); + + it("merges live sources and start failures in getStatus", async () => { + const source = makeMockSource("live-one"); + const sink = makeEventCollector(); + const monitor = new HealthMonitor([source], sink, { + checkIntervalMs: 60_000, + baseDelayMs: 1, + startFailures: [ + { wsId: "ws_a", serverName: "dead-one", bundleName: "@nb/dead", error: "boom" }, + ], + }); + + const status = monitor.getStatus(); + expect(status).toHaveLength(2); + const names = status.map((s) => s.name).sort(); + expect(names).toEqual(["dead-one", "live-one"]); + expect(status.find((s) => s.name === "live-one")?.state).toBe("healthy"); + expect(status.find((s) => s.name === "live-one")?.wsId).toBeUndefined(); + expect(status.find((s) => s.name === "dead-one")?.state).toBe("dead"); + expect(status.find((s) => s.name === "dead-one")?.wsId).toBe("ws_a"); + + monitor.stop(); + }); + it("stop() clears the interval so no more checks run", async () => { const source = makeMockSource("interval-bundle"); const sink = makeEventCollector(); diff --git a/test/unit/runtime/workspace-runtime.test.ts b/test/unit/runtime/workspace-runtime.test.ts index 8137ca89..4ba25400 100644 --- a/test/unit/runtime/workspace-runtime.test.ts +++ b/test/unit/runtime/workspace-runtime.test.ts @@ -1,13 +1,18 @@ import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; import { join } from "node:path"; import type { BundleRef } from "../../../src/bundles/types.ts"; +import type { EngineEvent, EventSink } from "../../../src/engine/types.ts"; import type { Workspace } from "../../../src/workspace/types.ts"; import { buildProcessInventory, mapWithConcurrency, type ProcessInventoryEntry, resolveBundleStartConcurrency, + startWorkspaceBundles, } from "../../../src/runtime/workspace-runtime.ts"; +import type { WorkspaceStore } from "../../../src/workspace/workspace-store.ts"; // --------------------------------------------------------------------------- // Fixtures @@ -301,3 +306,128 @@ describe("mapWithConcurrency", () => { ).rejects.toBe(err); }); }); + +// --------------------------------------------------------------------------- +// startWorkspaceBundles — bundle.startFailed observability +// --------------------------------------------------------------------------- + +function makeStore(workspaces: Workspace[]): WorkspaceStore { + return { + async list() { + return workspaces; + }, + } as unknown as WorkspaceStore; +} + +function makeEventCollector(): EventSink & { events: EngineEvent[] } { + const events: EngineEvent[] = []; + return { + events, + emit(event: EngineEvent) { + events.push(event); + }, + }; +} + +describe("startWorkspaceBundles — bundle.startFailed", () => { + let workDir: string; + + beforeEach(() => { + workDir = mkdtempSync(join(tmpdir(), "ws-runtime-test-")); + }); + + afterEach(() => { + rmSync(workDir, { recursive: true, force: true }); + }); + + it("emits bundle.startFailed when a bundle fails to start", async () => { + // Path-based bundle pointing at a nonexistent directory: startBundleSource + // throws from buildLocalSource. No fs setup needed. + const ws: Workspace = { + id: "ws_test", + name: "Test", + members: [], + bundles: [{ path: join(workDir, "does-not-exist") }], + createdAt: "2026-01-01T00:00:00Z", + updatedAt: "2026-01-01T00:00:00Z", + }; + const sink = makeEventCollector(); + + const result = await startWorkspaceBundles(makeStore([ws]), [], null, sink, undefined, { + workDir, + }); + + // Catch path keeps siblings unaffected — registry exists, just no source registered. + expect(result.registries.get("ws_test")).toBeDefined(); + expect(result.entries).toHaveLength(0); + + const failedEvents = sink.events.filter((e) => e.type === "bundle.startFailed"); + expect(failedEvents).toHaveLength(1); + const { data } = failedEvents[0]!; + expect(data.wsId).toBe("ws_test"); + expect(data.serverName).toBe("does-not-exist"); + expect(typeof data.bundleName).toBe("string"); + expect((data.bundleName as string).length).toBeGreaterThan(0); + expect(typeof data.error).toBe("string"); + expect((data.error as string).length).toBeGreaterThan(0); + }); + + it("returns failures array alongside entries", async () => { + const ws: Workspace = { + id: "ws_test", + name: "Test", + members: [], + bundles: [{ path: join(workDir, "missing-bundle") }], + createdAt: "2026-01-01T00:00:00Z", + updatedAt: "2026-01-01T00:00:00Z", + }; + const sink = makeEventCollector(); + + const result = await startWorkspaceBundles(makeStore([ws]), [], null, sink, undefined, { + workDir, + }); + + expect(result.failures).toBeDefined(); + expect(result.failures).toHaveLength(1); + const failure = result.failures![0]!; + expect(failure.wsId).toBe("ws_test"); + expect(failure.serverName).toBe("missing-bundle"); + expect(typeof failure.bundleName).toBe("string"); + expect(failure.bundleName.length).toBeGreaterThan(0); + expect(typeof failure.error).toBe("string"); + expect(failure.error.length).toBeGreaterThan(0); + }); + + it("a failed bundle does not abort siblings — workspace still gets a registry", async () => { + const ws1: Workspace = { + id: "ws_failing", + name: "Failing", + members: [], + bundles: [{ path: join(workDir, "broken") }], + createdAt: "2026-01-01T00:00:00Z", + updatedAt: "2026-01-01T00:00:00Z", + }; + const ws2: Workspace = { + id: "ws_empty", + name: "Empty", + members: [], + bundles: [], + createdAt: "2026-01-01T00:00:00Z", + updatedAt: "2026-01-01T00:00:00Z", + }; + const sink = makeEventCollector(); + + const result = await startWorkspaceBundles( + makeStore([ws1, ws2]), + [], + null, + sink, + undefined, + { workDir }, + ); + + expect(result.registries.get("ws_failing")).toBeDefined(); + expect(result.registries.get("ws_empty")).toBeDefined(); + expect(result.failures).toHaveLength(1); + }); +}); diff --git a/test/unit/sse-event-manager.test.ts b/test/unit/sse-event-manager.test.ts index ffcc7d4a..bf56908b 100644 --- a/test/unit/sse-event-manager.test.ts +++ b/test/unit/sse-event-manager.test.ts @@ -117,6 +117,26 @@ describe("SseEventManager — routing table", () => { expect(wsB.events).toEqual(["bundle.crashed"]); }); + test("bundle.startFailed is forwarded to the matching workspace only", async () => { + const wsA = collect(mgr.addClient("ws_a")); + const wsB = collect(mgr.addClient("ws_b")); + released.push(wsA.release, wsB.release); + + mgr.emit({ + type: "bundle.startFailed", + data: { + wsId: "ws_a", + serverName: "broken", + bundleName: "@nb/broken", + error: "spawn failed", + }, + }); + await flush(); + + expect(wsA.events).toContain("bundle.startFailed"); + expect(wsB.events).not.toContain("bundle.startFailed"); + }); + test("workspace-scoped event with missing wsId is dropped (no global fan-out)", async () => { const wsA = collect(mgr.addClient("ws_a")); const wsB = collect(mgr.addClient("ws_b")); diff --git a/test/unit/workspace-log-sink.test.ts b/test/unit/workspace-log-sink.test.ts index fd181484..c2a066b6 100644 --- a/test/unit/workspace-log-sink.test.ts +++ b/test/unit/workspace-log-sink.test.ts @@ -91,6 +91,7 @@ describe("WorkspaceLogSink", () => { "bundle.crashed", "bundle.recovered", "bundle.dead", + "bundle.startFailed", "data.changed", "config.changed", "skill.created", @@ -113,6 +114,28 @@ describe("WorkspaceLogSink", () => { expect(lines).toHaveLength(workspaceTypes.length); }); + it("writes bundle.startFailed event with error details", () => { + const sink = new WorkspaceLogSink({ dir }); + sink.emit( + makeEvent("bundle.startFailed", { + wsId: "ws_a", + serverName: "broken", + bundleName: "@nb/broken", + error: "ENOENT: manifest.json missing", + }), + ); + + const files = readdirSync(join(dir, "workspace")); + const lines = readFileSync(join(dir, "workspace", files[0]!), "utf-8").trim().split("\n"); + expect(lines).toHaveLength(1); + + const record = JSON.parse(lines[0]!); + expect(record.event).toBe("bundle.startFailed"); + expect(record.wsId).toBe("ws_a"); + expect(record.serverName).toBe("broken"); + expect(record.error).toBe("ENOENT: manifest.json missing"); + }); + it("close() is a no-op", () => { const sink = new WorkspaceLogSink({ dir }); expect(() => sink.close()).not.toThrow();