Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ chat UIs that interleave protocol events, system events, and custom events.
|------|-----------|------|
| `acp_protocol` | `SessionUpdate \| unknown` | Known ACP protocol event (agent or client method) |
| `claude_protocol` | `SDKMessage` | Known Claude protocol event |
| `system` | `SystemEvent` | Broker system event (`turn.started`, `turn.completed`, `broker.error`) |
| `system` | `SystemEvent` | Broker system event (`turn.started`, `turn.completed`, `turn.failed`, `broker.error`) |
| `unknown` | `null` | Anything else — inspect `axonEvent` for details |

Every timeline event has `{ kind, data, axonEvent }` where `axonEvent` is the
Expand All @@ -198,7 +198,7 @@ conn.onTimelineEvent((event) => {
// event.data is SessionUpdate | unknown
break;
case "system":
// event.data is SystemEvent ({ type: "turn.started" | "turn.completed", turnId, ... })
// event.data is SystemEvent ({ type: "turn.started" | "turn.completed" | "turn.failed", turnId, ... })
break;
case "unknown":
// event.data is null — check event.axonEvent for raw data
Expand Down
17 changes: 14 additions & 3 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ conn.onTimelineEvent((event) => {
// Typed ACP payload — narrow further with event.eventType
break;
case "system":
// event.data: { type: "turn.started" | "turn.completed", turnId, ... }
// event.data: { type: "turn.started" | "turn.completed" | "turn.failed", turnId, ... }
break;
case "unknown":
break;
Expand Down Expand Up @@ -487,7 +487,7 @@ conn.onTimelineEvent((event: ACPTimelineEvent) => {
// Use isFromAgent(event) / isFromUser(event) to check direction (or event.axonEvent.origin directly)
break;
case "system":
// event.data is SystemEvent: { type: "turn.started", turnId } | { type: "turn.completed", turnId, stopReason? } | { type: "broker.error", message }
// event.data is SystemEvent: { type: "turn.started", turnId } | { type: "turn.completed", turnId, stopReason? } | { type: "turn.failed", turnId, error, stopReason? } | { type: "broker.error", message }
break;
case "unknown":
// event.data is null — use axonEvent to identify and parse the event yourself
Expand Down Expand Up @@ -679,9 +679,16 @@ Typed representation of recognized broker system events:
```typescript
type SystemEvent =
| { type: "turn.started"; turnId: string }
| { type: "turn.completed"; turnId: string; stopReason?: string };
| { type: "turn.completed"; turnId: string; stopReason?: string }
| { type: "turn.failed"; turnId: string; error: string; stopReason?: string }
| { type: "broker.error"; message: string };
```

`turn.failed` is emitted when the broker terminates an in-flight turn (for
example, on a model error). The Axon stream layer also rejects any pending
ACP JSON-RPC request with a JSON-RPC error (`code: -32000`, `message: error`),
keeping the SSE stream alive so subsequent prompts can still flow.

### `WireData` (Claude module)

Generic JSON wire format used by the Claude transport:
Expand Down Expand Up @@ -721,6 +728,10 @@ This means **`await conn.prompt(...)` returns before the agent's response text h
case SYSTEM_EVENT_TYPES.TURN_COMPLETED:
// All content for this turn has been delivered
break;
case SYSTEM_EVENT_TYPES.TURN_FAILED:
// Turn was terminated by the broker (e.g. model error) —
// surface event.data.error to the user
break;
}
break;
case "acp_protocol":
Expand Down
177 changes: 177 additions & 0 deletions sdk/src/acp/axon-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
createMockAxon,
drain,
makeAgentEvent,
makeSystemEvent,
makeSystemEventWithRawPayload,
makeUserEvent,
type PublishCall,
Expand Down Expand Up @@ -388,6 +389,182 @@ describe("axonStream", () => {
expect(sysErr.sequence).toBe(42);
}
});

it("converts turn.failed SYSTEM_EVENT to JSON-RPC error for the pending request", async () => {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl.stream);

const { readable, writable } = axonStream({ axon: axon as never });

const writer = writable.getWriter();
await writer.write({
jsonrpc: "2.0",
id: 7,
method: "session/prompt",
params: { sessionId: "s1", prompt: [{ type: "text", text: "hi" }] },
} as never);
writer.releaseLock();

ctrl.push(
makeSystemEvent("turn.failed", {
turn_id: "t-1",
error: "You have exhausted your daily quota on this model.",
stop_reason: "Error",
}),
);
ctrl.end();

const messages = await drain(readable);

expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
jsonrpc: "2.0",
id: 7,
error: {
code: -32000,
message: "You have exhausted your daily quota on this model.",
data: { event_type: "turn.failed" },
},
});
});

it("rejects every pending request when turn.failed arrives with multiple in-flight", async () => {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl.stream);

const { readable, writable } = axonStream({ axon: axon as never });

const writer = writable.getWriter();
await writer.write({
jsonrpc: "2.0",
id: 1,
method: "session/prompt",
params: { sessionId: "s1", prompt: [{ type: "text", text: "a" }] },
} as never);
await writer.write({
jsonrpc: "2.0",
id: 2,
method: "session/new",
params: {},
} as never);
writer.releaseLock();

ctrl.push(makeSystemEvent("turn.failed", { error: "boom" }));
ctrl.end();

const messages = await drain(readable);

expect(messages).toHaveLength(2);
expect(messages[0]).toMatchObject({
id: 1,
error: { code: -32000, message: "boom", data: { event_type: "turn.failed" } },
});
expect(messages[1]).toMatchObject({
id: 2,
error: { code: -32000, message: "boom", data: { event_type: "turn.failed" } },
});
});

it("falls back to raw payload string when turn.failed payload has no error field", async () => {
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
try {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl.stream);

const { readable, writable } = axonStream({ axon: axon as never });

const writer = writable.getWriter();
await writer.write({
jsonrpc: "2.0",
id: 9,
method: "session/prompt",
params: { sessionId: "s1", prompt: [{ type: "text", text: "hi" }] },
} as never);
writer.releaseLock();

ctrl.push(makeSystemEventWithRawPayload("turn.failed", "raw failure string"));
ctrl.end();

const messages = await drain(readable);

expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
jsonrpc: "2.0",
id: 9,
error: {
code: -32000,
message: "raw failure string",
data: { event_type: "turn.failed" },
},
});
} finally {
warnSpy.mockRestore();
}
});

it("keeps the SSE stream open after turn.failed so subsequent prompts can flow", async () => {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl.stream);

const { readable, writable } = axonStream({ axon: axon as never });

const writer = writable.getWriter();
await writer.write({
jsonrpc: "2.0",
id: 1,
method: "session/prompt",
params: { sessionId: "s1", prompt: [{ type: "text", text: "first" }] },
} as never);

ctrl.push(makeSystemEvent("turn.failed", { turn_id: "t-1", error: "quota exhausted" }));

const reader = readable.getReader();
const firstMsg = await reader.read();
expect(firstMsg.done).toBe(false);
expect(firstMsg.value).toMatchObject({
id: 1,
error: { code: -32000, message: "quota exhausted" },
});

// Stream is still alive — a second prompt + response can flow.
await writer.write({
jsonrpc: "2.0",
id: 2,
method: "session/prompt",
params: { sessionId: "s1", prompt: [{ type: "text", text: "second" }] },
} as never);
writer.releaseLock();
ctrl.push(makeAgentEvent("session/prompt", { stopReason: "end_turn" }));
ctrl.end();

const secondMsg = await reader.read();
expect(secondMsg.done).toBe(false);
expect(secondMsg.value).toMatchObject({
jsonrpc: "2.0",
id: 2,
result: { stopReason: "end_turn" },
});
const closing = await reader.read();
expect(closing.done).toBe(true);
reader.releaseLock();
});

it("does not emit a JSON-RPC error for turn.failed when there are no pending requests", async () => {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl.stream);

const { readable } = axonStream({ axon: axon as never });

ctrl.push(makeSystemEvent("turn.failed", { error: "boom" }));
ctrl.push(makeAgentEvent("session/update", { sessionUpdate: "usage_update" }));
ctrl.end();

const messages = await drain(readable);

// The session/update notification still flows; turn.failed alone produces nothing.
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({ method: "session/update" });
});
});

describe("auto-reconnect", () => {
Expand Down
28 changes: 28 additions & 0 deletions sdk/src/acp/axon-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { isSystemError, SystemError } from "../shared/errors/system-error.js";
import { makeDefaultOnError } from "../shared/logging.js";
import { isFromAgent, isFromUser } from "../shared/origin-guards.js";
import { getJsonRpcId, isNonNullObject } from "../shared/structural-guards.js";
import { isTurnFailedAxonEvent, tryParseSystemEvent } from "../shared/timeline.js";
import type { LogFn } from "../shared/types.js";
import type { AxonStreamOptions } from "./types.js";

Expand Down Expand Up @@ -169,6 +170,33 @@ function createReadable(

// --- Normal (live) processing ---

// Reject in-flight requests on `turn.failed` SYSTEM_EVENTs but
// keep the SSE loop alive — the agent process is still healthy
// and can accept further sessions/prompts.
if (isTurnFailedAxonEvent(axonEvent)) {
const parsed = tryParseSystemEvent(axonEvent);
const message =
parsed?.type === "turn.failed"
? parsed.error || "turn failed"
: String(axonEvent.payload ?? "turn failed");
log?.("read", `#${totalEvents} TURN_FAILED: ${message}`);
for (const [method, id] of pendingRequests) {
if (id !== undefined && id !== null) {
controller.enqueue({
jsonrpc: "2.0",
id,
error: {
code: -32000,
message,
data: { event_type: axonEvent.event_type },
},
});
}
pendingRequests.delete(method);
}
continue;
}

if (isSystemError(axonEvent)) {
log?.("read", `#${totalEvents} SYSTEM_ERROR: ${axonEvent.payload}`);
if (pendingRequests.size === 0) {
Expand Down
34 changes: 34 additions & 0 deletions sdk/src/acp/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,39 @@ describe("ACPAxonConnection", () => {
conn.disconnect();
});

it("classifies SYSTEM_EVENT turn.failed as system", async () => {
const ctrl = createControllableStream();
const { axon } = createMockAxon(ctrl);
const conn = new ACPAxonConnection(axon as never, { id: "dbx-test" } as never, {
replay: false,
});
await conn.connect();

const events: ACPTimelineEvent[] = [];
conn.onTimelineEvent((ev) => events.push(ev));

ctrl.push(
makeSystemEvent("turn.failed", {
turn_id: "t-1",
error: "You have exhausted your daily quota on this model.",
stop_reason: "Error",
}),
);

await waitFor(() => events.length > 0);
expect(events[0].kind).toBe("system");
if (events[0].kind === "system") {
expect(events[0].data.type).toBe("turn.failed");
if (events[0].data.type === "turn.failed") {
expect(events[0].data.turnId).toBe("t-1");
expect(events[0].data.error).toBe("You have exhausted your daily quota on this model.");
expect(events[0].data.stopReason).toBe("Error");
}
}

conn.disconnect();
});

it("classifies SYSTEM_EVENT broker.error as system", () => {
const ev = makeFullAxonEvent({
event_type: "broker.error",
Expand Down Expand Up @@ -1141,6 +1174,7 @@ describe("isACPProtocolEventType", () => {
it("returns false for system event types", () => {
expect(isACPProtocolEventType("turn.started")).toBe(false);
expect(isACPProtocolEventType("turn.completed")).toBe(false);
expect(isACPProtocolEventType("turn.failed")).toBe(false);
expect(isACPProtocolEventType("broker.error")).toBe(false);
});

Expand Down
6 changes: 4 additions & 2 deletions sdk/src/acp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ export class ACPAxonConnection {
*
* Every Axon event on the channel is classified into one of:
* - `acp_protocol` — a known ACP protocol event (agent or client method)
* - `system` — a broker system event (`turn.started`, `turn.completed`, `broker.error`)
* - `system` — a broker system event (`turn.started`, `turn.completed`,
* `turn.failed`, `broker.error`)
* - `unknown` — anything else
*
* For a pull-based alternative, see {@link receiveTimelineEvents}.
Expand Down Expand Up @@ -566,7 +567,8 @@ export class ACPAxonConnection {
* Classifies a raw Axon event into an {@link ACPTimelineEvent}.
*
* Classification rules:
* 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `broker.error` -> `system`
* 1. `SYSTEM_EVENT` with `turn.started` / `turn.completed` / `turn.failed` /
* `broker.error` -> `system`
* 2. Known ACP protocol `event_type` (agent or client method) -> `acp_protocol`
* 3. Everything else -> `unknown`
*
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/acp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export type {
ElicitationCompleteTimelineEvent,
ElicitationTimelineEvent,
TurnCompletedTimelineEvent,
TurnFailedTimelineEvent,
TurnStartedTimelineEvent,
} from "./timeline-event-guards.js";
export {
Expand All @@ -149,6 +150,7 @@ export {
isSessionUpdateEvent,
isSystemTimelineEvent,
isTurnCompletedEvent,
isTurnFailedEvent,
isTurnStartedEvent,
isUnknownTimelineEvent,
} from "./timeline-event-guards.js";
Expand Down
Loading
Loading