diff --git a/sdk/AGENTS.md b/sdk/AGENTS.md index 293d883..8cf1330 100644 --- a/sdk/AGENTS.md +++ b/sdk/AGENTS.md @@ -182,7 +182,7 @@ chat UIs that interleave protocol events, system events, and custom events. |------|-----------|------| | `acp_protocol` | `SessionUpdate \| unknown` | Known ACP protocol event (agent or client method) | | `claude_protocol` | `SDKMessage` | Known Claude protocol event | -| `system` | `SystemEvent` | Broker system event (`turn.started`, `turn.completed`, `broker.error`) | +| `system` | `SystemEvent` | Broker system event (`turn.started`, `turn.completed`, `turn.failed`, `broker.error`) | | `unknown` | `null` | Anything else — inspect `axonEvent` for details | Every timeline event has `{ kind, data, axonEvent }` where `axonEvent` is the @@ -198,7 +198,7 @@ conn.onTimelineEvent((event) => { // event.data is SessionUpdate | unknown break; case "system": - // event.data is SystemEvent ({ type: "turn.started" | "turn.completed", turnId, ... }) + // event.data is SystemEvent ({ type: "turn.started" | "turn.completed" | "turn.failed", turnId, ... }) break; case "unknown": // event.data is null — check event.axonEvent for raw data diff --git a/sdk/README.md b/sdk/README.md index f490976..af84aac 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -93,7 +93,7 @@ conn.onTimelineEvent((event) => { // Typed ACP payload — narrow further with event.eventType break; case "system": - // event.data: { type: "turn.started" | "turn.completed", turnId, ... } + // event.data: { type: "turn.started" | "turn.completed" | "turn.failed", turnId, ... } break; case "unknown": break; @@ -487,7 +487,7 @@ conn.onTimelineEvent((event: ACPTimelineEvent) => { // Use isFromAgent(event) / isFromUser(event) to check direction (or event.axonEvent.origin directly) break; case "system": - // event.data is SystemEvent: { type: "turn.started", turnId } | { type: "turn.completed", turnId, stopReason? } | { type: "broker.error", message } + // event.data is SystemEvent: { type: "turn.started", turnId } | { type: "turn.completed", turnId, stopReason? } | { type: "turn.failed", turnId, error, stopReason? } | { type: "broker.error", message } break; case "unknown": // event.data is null — use axonEvent to identify and parse the event yourself @@ -679,9 +679,16 @@ Typed representation of recognized broker system events: ```typescript type SystemEvent = | { type: "turn.started"; turnId: string } - | { type: "turn.completed"; turnId: string; stopReason?: string }; + | { type: "turn.completed"; turnId: string; stopReason?: string } + | { type: "turn.failed"; turnId: string; error: string; stopReason?: string } + | { type: "broker.error"; message: string }; ``` +`turn.failed` is emitted when the broker terminates an in-flight turn (for +example, on a model error). The Axon stream layer also rejects any pending +ACP JSON-RPC request with a JSON-RPC error (`code: -32000`, `message: error`), +keeping the SSE stream alive so subsequent prompts can still flow. + ### `WireData` (Claude module) Generic JSON wire format used by the Claude transport: @@ -721,6 +728,10 @@ This means **`await conn.prompt(...)` returns before the agent's response text h case SYSTEM_EVENT_TYPES.TURN_COMPLETED: // All content for this turn has been delivered break; + case SYSTEM_EVENT_TYPES.TURN_FAILED: + // Turn was terminated by the broker (e.g. model error) — + // surface event.data.error to the user + break; } break; case "acp_protocol": diff --git a/sdk/src/acp/axon-stream.test.ts b/sdk/src/acp/axon-stream.test.ts index 1af5dd7..b2ee3b0 100644 --- a/sdk/src/acp/axon-stream.test.ts +++ b/sdk/src/acp/axon-stream.test.ts @@ -5,6 +5,7 @@ import { createMockAxon, drain, makeAgentEvent, + makeSystemEvent, makeSystemEventWithRawPayload, makeUserEvent, type PublishCall, @@ -388,6 +389,182 @@ describe("axonStream", () => { expect(sysErr.sequence).toBe(42); } }); + + it("converts turn.failed SYSTEM_EVENT to JSON-RPC error for the pending request", async () => { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl.stream); + + const { readable, writable } = axonStream({ axon: axon as never }); + + const writer = writable.getWriter(); + await writer.write({ + jsonrpc: "2.0", + id: 7, + method: "session/prompt", + params: { sessionId: "s1", prompt: [{ type: "text", text: "hi" }] }, + } as never); + writer.releaseLock(); + + ctrl.push( + makeSystemEvent("turn.failed", { + turn_id: "t-1", + error: "You have exhausted your daily quota on this model.", + stop_reason: "Error", + }), + ); + ctrl.end(); + + const messages = await drain(readable); + + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + jsonrpc: "2.0", + id: 7, + error: { + code: -32000, + message: "You have exhausted your daily quota on this model.", + data: { event_type: "turn.failed" }, + }, + }); + }); + + it("rejects every pending request when turn.failed arrives with multiple in-flight", async () => { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl.stream); + + const { readable, writable } = axonStream({ axon: axon as never }); + + const writer = writable.getWriter(); + await writer.write({ + jsonrpc: "2.0", + id: 1, + method: "session/prompt", + params: { sessionId: "s1", prompt: [{ type: "text", text: "a" }] }, + } as never); + await writer.write({ + jsonrpc: "2.0", + id: 2, + method: "session/new", + params: {}, + } as never); + writer.releaseLock(); + + ctrl.push(makeSystemEvent("turn.failed", { error: "boom" })); + ctrl.end(); + + const messages = await drain(readable); + + expect(messages).toHaveLength(2); + expect(messages[0]).toMatchObject({ + id: 1, + error: { code: -32000, message: "boom", data: { event_type: "turn.failed" } }, + }); + expect(messages[1]).toMatchObject({ + id: 2, + error: { code: -32000, message: "boom", data: { event_type: "turn.failed" } }, + }); + }); + + it("falls back to raw payload string when turn.failed payload has no error field", async () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + try { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl.stream); + + const { readable, writable } = axonStream({ axon: axon as never }); + + const writer = writable.getWriter(); + await writer.write({ + jsonrpc: "2.0", + id: 9, + method: "session/prompt", + params: { sessionId: "s1", prompt: [{ type: "text", text: "hi" }] }, + } as never); + writer.releaseLock(); + + ctrl.push(makeSystemEventWithRawPayload("turn.failed", "raw failure string")); + ctrl.end(); + + const messages = await drain(readable); + + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ + jsonrpc: "2.0", + id: 9, + error: { + code: -32000, + message: "raw failure string", + data: { event_type: "turn.failed" }, + }, + }); + } finally { + warnSpy.mockRestore(); + } + }); + + it("keeps the SSE stream open after turn.failed so subsequent prompts can flow", async () => { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl.stream); + + const { readable, writable } = axonStream({ axon: axon as never }); + + const writer = writable.getWriter(); + await writer.write({ + jsonrpc: "2.0", + id: 1, + method: "session/prompt", + params: { sessionId: "s1", prompt: [{ type: "text", text: "first" }] }, + } as never); + + ctrl.push(makeSystemEvent("turn.failed", { turn_id: "t-1", error: "quota exhausted" })); + + const reader = readable.getReader(); + const firstMsg = await reader.read(); + expect(firstMsg.done).toBe(false); + expect(firstMsg.value).toMatchObject({ + id: 1, + error: { code: -32000, message: "quota exhausted" }, + }); + + // Stream is still alive — a second prompt + response can flow. + await writer.write({ + jsonrpc: "2.0", + id: 2, + method: "session/prompt", + params: { sessionId: "s1", prompt: [{ type: "text", text: "second" }] }, + } as never); + writer.releaseLock(); + ctrl.push(makeAgentEvent("session/prompt", { stopReason: "end_turn" })); + ctrl.end(); + + const secondMsg = await reader.read(); + expect(secondMsg.done).toBe(false); + expect(secondMsg.value).toMatchObject({ + jsonrpc: "2.0", + id: 2, + result: { stopReason: "end_turn" }, + }); + const closing = await reader.read(); + expect(closing.done).toBe(true); + reader.releaseLock(); + }); + + it("does not emit a JSON-RPC error for turn.failed when there are no pending requests", async () => { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl.stream); + + const { readable } = axonStream({ axon: axon as never }); + + ctrl.push(makeSystemEvent("turn.failed", { error: "boom" })); + ctrl.push(makeAgentEvent("session/update", { sessionUpdate: "usage_update" })); + ctrl.end(); + + const messages = await drain(readable); + + // The session/update notification still flows; turn.failed alone produces nothing. + expect(messages).toHaveLength(1); + expect(messages[0]).toMatchObject({ method: "session/update" }); + }); }); describe("auto-reconnect", () => { diff --git a/sdk/src/acp/axon-stream.ts b/sdk/src/acp/axon-stream.ts index 6c0b4a9..5f0be00 100644 --- a/sdk/src/acp/axon-stream.ts +++ b/sdk/src/acp/axon-stream.ts @@ -6,6 +6,7 @@ import { isSystemError, SystemError } from "../shared/errors/system-error.js"; import { makeDefaultOnError } from "../shared/logging.js"; import { isFromAgent, isFromUser } from "../shared/origin-guards.js"; import { getJsonRpcId, isNonNullObject } from "../shared/structural-guards.js"; +import { isTurnFailedAxonEvent, tryParseSystemEvent } from "../shared/timeline.js"; import type { LogFn } from "../shared/types.js"; import type { AxonStreamOptions } from "./types.js"; @@ -169,6 +170,33 @@ function createReadable( // --- Normal (live) processing --- + // Reject in-flight requests on `turn.failed` SYSTEM_EVENTs but + // keep the SSE loop alive — the agent process is still healthy + // and can accept further sessions/prompts. + if (isTurnFailedAxonEvent(axonEvent)) { + const parsed = tryParseSystemEvent(axonEvent); + const message = + parsed?.type === "turn.failed" + ? parsed.error || "turn failed" + : String(axonEvent.payload ?? "turn failed"); + log?.("read", `#${totalEvents} TURN_FAILED: ${message}`); + for (const [method, id] of pendingRequests) { + if (id !== undefined && id !== null) { + controller.enqueue({ + jsonrpc: "2.0", + id, + error: { + code: -32000, + message, + data: { event_type: axonEvent.event_type }, + }, + }); + } + pendingRequests.delete(method); + } + continue; + } + if (isSystemError(axonEvent)) { log?.("read", `#${totalEvents} SYSTEM_ERROR: ${axonEvent.payload}`); if (pendingRequests.size === 0) { diff --git a/sdk/src/acp/connection.test.ts b/sdk/src/acp/connection.test.ts index d17d7f3..4bc8f37 100644 --- a/sdk/src/acp/connection.test.ts +++ b/sdk/src/acp/connection.test.ts @@ -982,6 +982,39 @@ describe("ACPAxonConnection", () => { conn.disconnect(); }); + it("classifies SYSTEM_EVENT turn.failed as system", async () => { + const ctrl = createControllableStream(); + const { axon } = createMockAxon(ctrl); + const conn = new ACPAxonConnection(axon as never, { id: "dbx-test" } as never, { + replay: false, + }); + await conn.connect(); + + const events: ACPTimelineEvent[] = []; + conn.onTimelineEvent((ev) => events.push(ev)); + + ctrl.push( + makeSystemEvent("turn.failed", { + turn_id: "t-1", + error: "You have exhausted your daily quota on this model.", + stop_reason: "Error", + }), + ); + + await waitFor(() => events.length > 0); + expect(events[0].kind).toBe("system"); + if (events[0].kind === "system") { + expect(events[0].data.type).toBe("turn.failed"); + if (events[0].data.type === "turn.failed") { + expect(events[0].data.turnId).toBe("t-1"); + expect(events[0].data.error).toBe("You have exhausted your daily quota on this model."); + expect(events[0].data.stopReason).toBe("Error"); + } + } + + conn.disconnect(); + }); + it("classifies SYSTEM_EVENT broker.error as system", () => { const ev = makeFullAxonEvent({ event_type: "broker.error", @@ -1141,6 +1174,7 @@ describe("isACPProtocolEventType", () => { it("returns false for system event types", () => { expect(isACPProtocolEventType("turn.started")).toBe(false); expect(isACPProtocolEventType("turn.completed")).toBe(false); + expect(isACPProtocolEventType("turn.failed")).toBe(false); expect(isACPProtocolEventType("broker.error")).toBe(false); }); diff --git a/sdk/src/acp/connection.ts b/sdk/src/acp/connection.ts index 427a7f6..1f7c5bc 100644 --- a/sdk/src/acp/connection.ts +++ b/sdk/src/acp/connection.ts @@ -395,7 +395,8 @@ export class ACPAxonConnection { * * Every Axon event on the channel is classified into one of: * - `acp_protocol` — a known ACP protocol event (agent or client method) - * - `system` — a broker system event (`turn.started`, `turn.completed`, `broker.error`) + * - `system` — a broker system event (`turn.started`, `turn.completed`, + * `turn.failed`, `broker.error`) * - `unknown` — anything else * * For a pull-based alternative, see {@link receiveTimelineEvents}. @@ -566,7 +567,8 @@ export class ACPAxonConnection { * Classifies a raw Axon event into an {@link ACPTimelineEvent}. * * Classification rules: - * 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `broker.error` -> `system` + * 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `turn.failed` / + * `broker.error` -> `system` * 2. Known ACP protocol `event_type` (agent or client method) -> `acp_protocol` * 3. Everything else -> `unknown` * diff --git a/sdk/src/acp/index.ts b/sdk/src/acp/index.ts index 748eb98..270764a 100644 --- a/sdk/src/acp/index.ts +++ b/sdk/src/acp/index.ts @@ -131,6 +131,7 @@ export type { ElicitationCompleteTimelineEvent, ElicitationTimelineEvent, TurnCompletedTimelineEvent, + TurnFailedTimelineEvent, TurnStartedTimelineEvent, } from "./timeline-event-guards.js"; export { @@ -149,6 +150,7 @@ export { isSessionUpdateEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "./timeline-event-guards.js"; diff --git a/sdk/src/acp/timeline-event-guards.test.ts b/sdk/src/acp/timeline-event-guards.test.ts index 43d462e..d9bd2ef 100644 --- a/sdk/src/acp/timeline-event-guards.test.ts +++ b/sdk/src/acp/timeline-event-guards.test.ts @@ -13,6 +13,7 @@ import { isSessionUpdateEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "./timeline-event-guards.js"; @@ -47,7 +48,7 @@ function makeProtocolTimelineEvent( } function makeSystemTimelineEvent( - type: "turn.started" | "turn.completed" | "broker.error", + type: "turn.started" | "turn.completed" | "turn.failed" | "broker.error", extra: Record = {}, ): ACPTimelineEvent { const data = @@ -55,7 +56,9 @@ function makeSystemTimelineEvent( ? { type, turnId: "t-1" } : type === "turn.completed" ? { type, turnId: "t-1", ...extra } - : { type, message: "test error" }; + : type === "turn.failed" + ? { type, turnId: "t-1", error: "boom", ...extra } + : { type, message: "test error" }; return { kind: "system", data, @@ -113,6 +116,7 @@ describe("isTurnCompletedEvent", () => { it("returns false for other system events", () => { expect(isTurnCompletedEvent(makeSystemTimelineEvent("turn.started"))).toBe(false); + expect(isTurnCompletedEvent(makeSystemTimelineEvent("turn.failed"))).toBe(false); expect(isTurnCompletedEvent(makeSystemTimelineEvent("broker.error"))).toBe(false); }); @@ -121,6 +125,26 @@ describe("isTurnCompletedEvent", () => { }); }); +describe("isTurnFailedEvent", () => { + it("returns true for turn.failed system events", () => { + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.failed"))).toBe(true); + }); + + it("returns false for other system events", () => { + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.started"))).toBe(false); + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.completed"))).toBe(false); + expect(isTurnFailedEvent(makeSystemTimelineEvent("broker.error"))).toBe(false); + }); + + it("returns false for protocol events", () => { + expect(isTurnFailedEvent(makeProtocolTimelineEvent("session/update"))).toBe(false); + }); + + it("returns false for unknown events", () => { + expect(isTurnFailedEvent(makeUnknownTimelineEvent())).toBe(false); + }); +}); + describe("isBrokerErrorEvent", () => { it("returns true for broker.error system events", () => { expect(isBrokerErrorEvent(makeSystemTimelineEvent("broker.error"))).toBe(true); diff --git a/sdk/src/acp/timeline-event-guards.ts b/sdk/src/acp/timeline-event-guards.ts index 90c03e2..1a1e2ee 100644 --- a/sdk/src/acp/timeline-event-guards.ts +++ b/sdk/src/acp/timeline-event-guards.ts @@ -43,6 +43,7 @@ export type { BrokerErrorTimelineEvent, DevboxLifecycleTimelineEvent, TurnCompletedTimelineEvent, + TurnFailedTimelineEvent, TurnStartedTimelineEvent, } from "../shared/timeline-event-guards.js"; export { @@ -53,6 +54,7 @@ export { isDevboxLifecycleEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "../shared/timeline-event-guards.js"; diff --git a/sdk/src/claude/classify-claude-axon-event.ts b/sdk/src/claude/classify-claude-axon-event.ts index d8f70be..34557c2 100644 --- a/sdk/src/claude/classify-claude-axon-event.ts +++ b/sdk/src/claude/classify-claude-axon-event.ts @@ -21,7 +21,8 @@ export function isClaudeProtocolEventType(eventType: string): boolean { * Classifies a raw Axon event into a {@link ClaudeTimelineEvent}. * * Classification rules: - * 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `broker.error` -> `system` + * 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `turn.failed` / + * `broker.error` -> `system` * 2. Known Claude protocol `event_type` -> `claude_protocol` with `eventType` discriminator * 3. Everything else -> `unknown` * diff --git a/sdk/src/claude/connection.ts b/sdk/src/claude/connection.ts index 075d6da..b461bb2 100644 --- a/sdk/src/claude/connection.ts +++ b/sdk/src/claude/connection.ts @@ -480,7 +480,8 @@ export class ClaudeAxonConnection { * * Every Axon event on the channel is classified into one of: * - `claude_protocol` — a known Claude protocol event (user or agent message) - * - `system` — a broker system event (`turn.started`, `turn.completed`, `broker.error`) + * - `system` — a broker system event (`turn.started`, `turn.completed`, + * `turn.failed`, `broker.error`) * - `unknown` — anything else * * For a pull-based alternative, see {@link receiveTimelineEvents}. diff --git a/sdk/src/claude/index.ts b/sdk/src/claude/index.ts index 96f6a8f..bd53588 100644 --- a/sdk/src/claude/index.ts +++ b/sdk/src/claude/index.ts @@ -103,6 +103,7 @@ export type { BrokerErrorTimelineEvent, DevboxLifecycleTimelineEvent, TurnCompletedTimelineEvent, + TurnFailedTimelineEvent, TurnStartedTimelineEvent, } from "./timeline-event-guards.js"; export { @@ -121,6 +122,7 @@ export { isDevboxLifecycleEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "./timeline-event-guards.js"; diff --git a/sdk/src/claude/timeline-event-guards.test.ts b/sdk/src/claude/timeline-event-guards.test.ts index 501387d..5e85e93 100644 --- a/sdk/src/claude/timeline-event-guards.test.ts +++ b/sdk/src/claude/timeline-event-guards.test.ts @@ -12,6 +12,7 @@ import { isClaudeSystemInitEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "./timeline-event-guards.js"; @@ -47,14 +48,16 @@ function makeProtocolTimelineEvent( } function makeSystemTimelineEvent( - type: "turn.started" | "turn.completed" | "broker.error", + type: "turn.started" | "turn.completed" | "turn.failed" | "broker.error", ): ClaudeTimelineEvent { const data = type === "turn.started" ? { type, turnId: "t-1" } : type === "turn.completed" ? { type, turnId: "t-1" } - : { type, message: "test error" }; + : type === "turn.failed" + ? { type, turnId: "t-1", error: "boom" } + : { type, message: "test error" }; return { kind: "system", data, @@ -112,6 +115,7 @@ describe("isTurnCompletedEvent", () => { it("returns false for other system events", () => { expect(isTurnCompletedEvent(makeSystemTimelineEvent("turn.started"))).toBe(false); + expect(isTurnCompletedEvent(makeSystemTimelineEvent("turn.failed"))).toBe(false); expect(isTurnCompletedEvent(makeSystemTimelineEvent("broker.error"))).toBe(false); }); @@ -120,6 +124,26 @@ describe("isTurnCompletedEvent", () => { }); }); +describe("isTurnFailedEvent", () => { + it("returns true for turn.failed system events", () => { + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.failed"))).toBe(true); + }); + + it("returns false for other system events", () => { + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.started"))).toBe(false); + expect(isTurnFailedEvent(makeSystemTimelineEvent("turn.completed"))).toBe(false); + expect(isTurnFailedEvent(makeSystemTimelineEvent("broker.error"))).toBe(false); + }); + + it("returns false for protocol events", () => { + expect(isTurnFailedEvent(makeProtocolTimelineEvent("assistant"))).toBe(false); + }); + + it("returns false for unknown events", () => { + expect(isTurnFailedEvent(makeUnknownTimelineEvent())).toBe(false); + }); +}); + describe("isBrokerErrorEvent", () => { it("returns true for broker.error system events", () => { expect(isBrokerErrorEvent(makeSystemTimelineEvent("broker.error"))).toBe(true); diff --git a/sdk/src/claude/timeline-event-guards.ts b/sdk/src/claude/timeline-event-guards.ts index bc98018..c69760c 100644 --- a/sdk/src/claude/timeline-event-guards.ts +++ b/sdk/src/claude/timeline-event-guards.ts @@ -43,6 +43,7 @@ export type { BrokerErrorTimelineEvent, DevboxLifecycleTimelineEvent, TurnCompletedTimelineEvent, + TurnFailedTimelineEvent, TurnStartedTimelineEvent, } from "../shared/timeline-event-guards.js"; export { @@ -53,6 +54,7 @@ export { isDevboxLifecycleEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "../shared/timeline-event-guards.js"; diff --git a/sdk/src/shared/index.ts b/sdk/src/shared/index.ts index 6bab306..c1aaef4 100644 --- a/sdk/src/shared/index.ts +++ b/sdk/src/shared/index.ts @@ -52,6 +52,7 @@ export type { BrokerErrorTimelineEvent, DevboxLifecycleTimelineEvent, TurnCompletedTimelineEvent, + TurnFailedTimelineEvent, TurnStartedTimelineEvent, } from "./timeline-event-guards.js"; export { @@ -62,6 +63,7 @@ export { isDevboxLifecycleEvent, isSystemTimelineEvent, isTurnCompletedEvent, + isTurnFailedEvent, isTurnStartedEvent, isUnknownTimelineEvent, } from "./timeline-event-guards.js"; diff --git a/sdk/src/shared/timeline-event-guards.ts b/sdk/src/shared/timeline-event-guards.ts index 67abfaa..c17e4e6 100644 --- a/sdk/src/shared/timeline-event-guards.ts +++ b/sdk/src/shared/timeline-event-guards.ts @@ -38,6 +38,14 @@ export type TurnCompletedTimelineEvent = SystemTimelineEvent & { data: { type: "turn.completed"; turnId: string; stopReason?: string }; }; +/** + * Narrowed type for a `turn.failed` system event. + * @category Timeline + */ +export type TurnFailedTimelineEvent = SystemTimelineEvent & { + data: { type: "turn.failed"; turnId: string; error: string; stopReason?: string }; +}; + /** * Narrowed type for a `broker.error` system event. * @category Timeline @@ -91,6 +99,20 @@ export function isTurnCompletedEvent( ); } +/** + * Type guard for `turn.failed` system events. + * + * @param event - The timeline event to test. + * @returns `true` if `event` is a turn-failed system event. + * @category Timeline + */ +export function isTurnFailedEvent(event: BaseTimelineEvent): event is TurnFailedTimelineEvent { + return ( + event.kind === "system" && + (event.data as { type?: string }).type === SYSTEM_EVENT_TYPES.TURN_FAILED + ); +} + /** * Type guard for `broker.error` system events. * diff --git a/sdk/src/shared/timeline.test.ts b/sdk/src/shared/timeline.test.ts index ad43df6..1af0609 100644 --- a/sdk/src/shared/timeline.test.ts +++ b/sdk/src/shared/timeline.test.ts @@ -1,6 +1,11 @@ import { describe, expect, it, vi } from "vitest"; import { makeFullAxonEvent as makeAxonEvent } from "../__test-utils__/mock-axon.js"; -import { createClassifier, tryParseSystemEvent, tryParseTimelinePayload } from "./timeline.js"; +import { + createClassifier, + isTurnFailedAxonEvent, + tryParseSystemEvent, + tryParseTimelinePayload, +} from "./timeline.js"; describe("tryParseTimelinePayload", () => { it("parses a JSON string payload", () => { @@ -66,6 +71,68 @@ describe("tryParseSystemEvent", () => { }); }); + it("parses turn.failed with full payload", () => { + const ev = makeAxonEvent({ + event_type: "turn.failed", + payload: JSON.stringify({ + turn_id: "t-4", + error: "You have exhausted your daily quota on this model.", + stop_reason: "Error", + }), + }); + expect(tryParseSystemEvent(ev)).toEqual({ + type: "turn.failed", + turnId: "t-4", + error: "You have exhausted your daily quota on this model.", + stopReason: "Error", + }); + }); + + it("parses turn.failed with missing turn_id and stop_reason", () => { + const ev = makeAxonEvent({ + event_type: "turn.failed", + payload: JSON.stringify({ error: "boom" }), + }); + expect(tryParseSystemEvent(ev)).toEqual({ + type: "turn.failed", + turnId: "", + error: "boom", + stopReason: undefined, + }); + }); + + it("parses turn.failed missing error field falls back to stringified payload", () => { + const ev = makeAxonEvent({ + event_type: "turn.failed", + payload: JSON.stringify({ turn_id: "t-5", stop_reason: "Error" }), + }); + expect(tryParseSystemEvent(ev)).toEqual({ + type: "turn.failed", + turnId: "t-5", + error: JSON.stringify({ turn_id: "t-5", stop_reason: "Error" }), + stopReason: "Error", + }); + }); + + it("parses turn.failed with raw string payload", () => { + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + try { + const ev = makeAxonEvent({ + event_type: "turn.failed", + payload: "raw failure string", + }); + expect(tryParseSystemEvent(ev)).toEqual({ + type: "turn.failed", + turnId: "", + error: "raw failure string", + stopReason: undefined, + }); + expect(warnSpy).toHaveBeenCalledOnce(); + } finally { + warnSpy.mockRestore(); + } + }); + it("parses broker.error with message field", () => { const ev = makeAxonEvent({ event_type: "broker.error", @@ -267,6 +334,44 @@ describe("tryParseSystemEvent", () => { }); }); +describe("isTurnFailedAxonEvent", () => { + it("returns true for SYSTEM_EVENT origin with turn.failed event_type", () => { + const ev = makeAxonEvent({ + origin: "SYSTEM_EVENT", + event_type: "turn.failed", + payload: JSON.stringify({ error: "boom" }), + }); + expect(isTurnFailedAxonEvent(ev)).toBe(true); + }); + + it("returns false for SYSTEM_EVENT with a different event_type", () => { + const ev = makeAxonEvent({ + origin: "SYSTEM_EVENT", + event_type: "turn.completed", + payload: JSON.stringify({ turn_id: "t-1" }), + }); + expect(isTurnFailedAxonEvent(ev)).toBe(false); + }); + + it("returns false for turn.failed event_type with a non-SYSTEM origin", () => { + const ev = makeAxonEvent({ + origin: "AGENT_EVENT", + event_type: "turn.failed", + payload: JSON.stringify({ error: "boom" }), + }); + expect(isTurnFailedAxonEvent(ev)).toBe(false); + }); + + it("returns false for unrelated SYSTEM_EVENTs", () => { + const ev = makeAxonEvent({ + origin: "SYSTEM_EVENT", + event_type: "broker.error", + payload: "agent crashed", + }); + expect(isTurnFailedAxonEvent(ev)).toBe(false); + }); +}); + describe("createClassifier", () => { const classify = createClassifier<{ kind: "test"; data: unknown; axonEvent: unknown }>({ label: "testClassifier", diff --git a/sdk/src/shared/timeline.ts b/sdk/src/shared/timeline.ts index 7685db3..3bfa50e 100644 --- a/sdk/src/shared/timeline.ts +++ b/sdk/src/shared/timeline.ts @@ -3,6 +3,7 @@ */ import type { AxonEventView } from "@runloop/api-client/resources/axons"; +import { SYSTEM_EVENT_ORIGIN } from "./errors/system-error.js"; import type { DevboxLifecycleKind, SystemEvent, @@ -23,6 +24,7 @@ import type { export const SYSTEM_EVENT_TYPES = { TURN_STARTED: "turn.started", TURN_COMPLETED: "turn.completed", + TURN_FAILED: "turn.failed", BROKER_ERROR: "broker.error", DEVBOX_RUNNING: "devbox.running", DEVBOX_SUSPENDED: "devbox.suspended", @@ -40,6 +42,26 @@ export function isSystemEventType(eventType: string): boolean { return SYSTEM_EVENT_TYPE_SET.has(eventType); } +/** + * Checks whether a raw Axon event is a `turn.failed` SYSTEM_EVENT. + * + * Counterpart to {@link isSystemError} for the per-turn failure case. + * Use in the Axon-stream layer (before timeline classification) to detect + * turn-level failures and reject in-flight JSON-RPC requests without + * tearing the SSE stream down. + * + * @param event - The raw Axon event to check. + * @returns `true` if the event is a `turn.failed` system event. + * + * @category Utilities + * @internal + */ +export function isTurnFailedAxonEvent(event: AxonEventView): boolean { + return ( + event.origin === SYSTEM_EVENT_ORIGIN && event.event_type === SYSTEM_EVENT_TYPES.TURN_FAILED + ); +} + // --------------------------------------------------------------------------- // System event payload shapes // --------------------------------------------------------------------------- @@ -53,6 +75,12 @@ interface TurnCompletedPayload { stop_reason?: string; } +interface TurnFailedPayload { + turn_id?: string; + error?: string; + stop_reason?: string; +} + interface BrokerErrorPayload { message?: string; } @@ -139,6 +167,20 @@ export function tryParseSystemEvent(ev: AxonEventView): SystemEvent | null { }; } + if (ev.event_type === SYSTEM_EVENT_TYPES.TURN_FAILED) { + const parsed = tryParseTimelinePayload({ axonEvent: ev }); + // Mirror the broker.error fallback: when the payload is unparseable or + // missing the `error` field, surface the raw payload string. + const error = + parsed != null ? (parsed.error ?? String(ev.payload ?? "")) : String(ev.payload ?? ""); + return { + type: "turn.failed", + turnId: parsed?.turn_id ?? "", + error, + stopReason: parsed?.stop_reason, + }; + } + if (ev.event_type === SYSTEM_EVENT_TYPES.BROKER_ERROR) { const parsed = tryParseTimelinePayload({ axonEvent: ev }); const message = diff --git a/sdk/src/shared/types.ts b/sdk/src/shared/types.ts index 19816d8..401aa19 100644 --- a/sdk/src/shared/types.ts +++ b/sdk/src/shared/types.ts @@ -68,6 +68,7 @@ export interface AgentLogEvent { export type SystemEvent = | { type: "turn.started"; turnId: string } | { type: "turn.completed"; turnId: string; stopReason?: string } + | { type: "turn.failed"; turnId: string; error: string; stopReason?: string } | { type: "broker.error"; message: string } | DevboxLifecycleEvent | AgentErrorEvent