Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions harness/src/turn-orchestrator/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,33 @@
* cannot recover a session before a provider call.
*/

export class ContextOverflowError extends Error {
/** Thrown by a handler for a genuinely retryable failure. runTransition
* re-throws it so the turn-step queue applies backoff/retry/DLQ. Any other
* throw is treated as terminal and routes the session to `failed`. */
export class TransientError extends Error {
constructor(message: string) {
super(message);
this.name = 'ContextOverflowError';
this.name = 'TransientError';
}
}

export class CompactionBusyError extends Error {
export class ContextOverflowError extends Error {
constructor(message: string) {
super(message);
this.name = 'CompactionBusyError';
this.name = 'ContextOverflowError';
}
}

/** Thrown by a handler for a genuinely retryable failure. runTransition
* re-throws it so the turn-step queue applies backoff/retry/DLQ. Any other
* throw is treated as terminal and routes the session to `failed`. */
export class TransientError extends Error {
/** Another compaction holds the session's compaction lease (e.g. the async
* post-turn summarize of a large session outlives `busyTimeoutMs`). This is
* transient by construction — the lease TTL (300s) bounds it — so it extends
* {@link TransientError}: runTransition re-throws and the turn-step queue
* retries the step once the in-flight compaction releases, instead of
* killing the session with a terminal `failed`. */
export class CompactionBusyError extends TransientError {
constructor(message: string) {
super(message);
this.name = 'TransientError';
this.name = 'CompactionBusyError';
}
}

Expand Down
16 changes: 14 additions & 2 deletions harness/src/turn-orchestrator/preflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
* Pre-flight overflow check. Returns 'compacted' when compact_now ran;
* caller must then reload messages from persistence.
*
* @throws ContextOverflowError when the session is too large to compact.
* @throws CompactionBusyError when another compaction is in progress.
* @throws ContextOverflowError when the session is too large to compact (terminal).
* @throws CompactionBusyError when another compaction is in progress — a
* TransientError subclass, so the turn-step queue retries the step after
* the in-flight compaction releases the lease instead of failing the run.
*/

import { fetchModelLimit } from '../context-compaction/model-resolver.js';
Expand Down Expand Up @@ -96,6 +98,16 @@ export async function runPreflight(
if (res?.status === 'busy') {
throw new CompactionBusyError('compaction already in progress');
}
if (res?.status !== 'ok') {
// 'empty' (nothing eligible to compact) or an unknown/drifted status:
// no compaction ran, so do NOT claim 'compacted' — the caller would
// reload messages and proceed as if the context shrank.
logger.warn('preflight: compact_now returned non-ok status; proceeding without reload', {
session_id,
status: res?.status,
});
return 'ok';
}

return 'compacted';
}
7 changes: 7 additions & 0 deletions harness/src/turn-orchestrator/state-runtime/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import { toView, type TurnStateView } from '../schemas.js';
import { mirrorMessagesToSessionTree } from '../session-tree-mirror.js';
import { type TurnState, type TurnStateRecord, parseTurnStateRecord } from '../state.js';

/**
 * Name of the engine queue that turn-step wakes are enqueued to: the engine's
 * `default` queue.
 *
 * NOTE: engine.config.yaml also defines a dedicated `turn-step` FIFO queue
 * (session_id grouping, max_retries: 5, concurrency: 1) that is currently
 * ORPHANED — nothing enqueues to it. Pointing this constant at that queue
 * would change scheduling semantics for every step (per-session ordering,
 * retry bound), so it is a deliberate follow-up, not a drive-by rename.
 */
export const TURN_STEP_QUEUE = 'default';

// Turn states grouped as "non-stepable".
// NOTE(review): presumably no turn-step wake is enqueued for a session that is
// in one of these states — the consuming code is not visible here; confirm.
const NON_STEPABLE_STATES = new Set<TurnState>(['stopped', 'failed', 'function_awaiting_approval']);
Expand Down
22 changes: 22 additions & 0 deletions harness/tests/turn-orchestrator/preflight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,28 @@ describe('runPreflight', () => {
).rejects.toBeInstanceOf(CompactionBusyError);
});

it("returns ok without claiming compacted when compact_now reports 'empty'", async () => {
  // compact_now answers 'empty', i.e. no compaction ran, so the preflight
  // result must be 'ok' rather than 'compacted'.
  const harness = makeIii({
    modelsGetResult: { context_window: 10, max_output_tokens: 0 },
    compactNowResult: { status: 'empty' },
  });

  await expect(
    runPreflight(harness.iii, 'session-1', [smallMessage], 'anthropic', 'claude-3'),
  ).resolves.toBe('ok');
});

it('returns ok on an unknown compact_now status instead of claiming compacted', async () => {
  // A status value the preflight does not recognise must be treated like
  // "nothing was compacted": the result is 'ok', never 'compacted'.
  const harness = makeIii({
    modelsGetResult: { context_window: 10, max_output_tokens: 0 },
    compactNowResult: { status: 'something-new' },
  });

  await expect(
    runPreflight(harness.iii, 'session-1', [smallMessage], 'anthropic', 'claude-3'),
  ).resolves.toBe('ok');
});

it('passes session_id and model info to compact_now', async () => {
const { iii, calls } = makeIii({
modelsGetResult: { context_window: 10, max_output_tokens: 0 },
Expand Down
37 changes: 36 additions & 1 deletion harness/tests/turn-orchestrator/run-transition.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { ISdk } from '../../src/runtime/iii.js';
import { TransientError } from '../../src/turn-orchestrator/errors.js';
import { CompactionBusyError, TransientError } from '../../src/turn-orchestrator/errors.js';
import { TURN_STATE_SCOPE } from '../../src/turn-orchestrator/state.js';
import { runTransition } from '../../src/turn-orchestrator/run-transition.js';
import {
Expand Down Expand Up @@ -171,4 +171,39 @@ describe('runTransition error model', () => {
),
).rejects.toThrow('retry me');
});

it('CompactionBusyError IS a TransientError — the subclass relation is the fix', () => {
  // Pins the inheritance contract at the source: reverting
  // `extends TransientError` to `extends Error` would bring back the
  // terminal-failure bug, and this test would catch that one-line revert.
  const busy = new CompactionBusyError('compaction already in progress');

  expect(busy.name).toBe('CompactionBusyError');
  expect(busy).toBeInstanceOf(Error);
  expect(busy).toBeInstanceOf(TransientError);
});

it('re-throws CompactionBusyError (transient) instead of failing the session', async () => {
  // Regression guard. A busy compaction lease (async post-turn summarize of a
  // large session) used to route the turn to terminal `failed` with
  // "response failed: from assistant_streaming: compaction already in
  // progress". Busy is bounded by the lease TTL, so the queue must retry.
  const throwBusy = async () => {
    throw new CompactionBusyError('compaction already in progress');
  };
  const { iii, writes } = fakeIii({ ...base, state: 'assistant_streaming' });

  await expect(
    runTransition(iii, 'assistant_streaming', throwBusy, { session_id: 's1' }),
  ).rejects.toThrow('compaction already in progress');

  // No state write may have moved the session record to `failed`.
  const routedToFailed = writes.some(
    (write) =>
      write.function_id === 'state::set' &&
      write.payload.scope === TURN_STATE_SCOPE &&
      write.payload.value?.state === 'failed',
  );
  expect(routedToFailed).toBe(false);
});
});
Loading
Loading