Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions harness/src/approval-gate/settings/verdict.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
*/

import { ApprovalDecisionSchema, STATE_SCOPE } from '../../approval-gate/schemas.js';
import { readSettings } from '../../approval-gate/settings/store.js';
import type { ApprovalSettings } from '../../approval-gate/settings/types.js';
import type { ISdk } from '../../runtime/iii.js';
import type { z } from 'zod';
export type ApprovalDecision = z.infer<typeof ApprovalDecisionSchema>;
Expand All @@ -17,7 +15,6 @@ export function parseApprovalDecision(value: unknown): ApprovalDecision | null {

export type AwaitingApprovalPorts = {
readDecision(session_id: string, function_call_id: string): Promise<ApprovalDecision | null>;
readSettings(session_id: string): Promise<ApprovalSettings>;
};

export function createAwaitingApprovalPorts(iii: ISdk): AwaitingApprovalPorts {
Expand All @@ -30,8 +27,5 @@ export function createAwaitingApprovalPorts(iii: ISdk): AwaitingApprovalPorts {
});
return parseApprovalDecision(raw);
},
readSettings(session_id) {
return readSettings(iii, session_id);
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ export function register(iii: ISdk): void {
'turn::function_awaiting_approval',
async (payload: TurnStepPayload) => {
const parsed = TurnStepPayloadSchema.parse(payload);
return runTransition(iii, 'function_awaiting_approval', handleAwaitingApproval, parsed);
return runTransition(iii, 'function_awaiting_approval', handleAwaitingApproval, parsed, {
serialize: true,
});
},
{
description:
Expand Down
14 changes: 1 addition & 13 deletions harness/src/turn-orchestrator/function-awaiting-approval/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Resolve approval decisions and route the batch after each decision.
*/

import { settingsVerdict } from '../../approval-gate/settings/verdict.js';
import { text } from '../../types/content.js';
import type { FunctionResult } from '../../types/function.js';
import { finalizeBatch, runOneCall } from '../function-execute/run.js';
Expand Down Expand Up @@ -52,11 +51,6 @@
const work = rec.work;
let awaiting = [...rec.awaiting_approval];
const executed = { ...work.executed };
// Lazily snapshotted once per wake: a grant made AFTER a call parked
// (e.g. "approve always" on a sibling of the same function id, or a
// switch to auto/full mode) must release the still-parked calls it now
// covers — otherwise the batch never finalizes and the turn hangs.
let settings: Awaited<ReturnType<AwaitingApprovalPorts['readSettings']>> | null = null;

for (const entry of [...awaiting]) {
const callId = entry.function_call_id;
Expand All @@ -66,16 +60,10 @@
continue;
}

let decision = await readPorts.readDecision(rec.session_id, callId);
if (!decision) {
if (settings === null) settings = await readPorts.readSettings(rec.session_id);
if (settingsVerdict(settings, entry.function_id) === 'allow') {
decision = { decision: 'allow', reason: null };
}
}
const decision = await readPorts.readDecision(rec.session_id, callId);
if (!decision) continue;

const current = work.prepared.find((p) => p.call.id === callId)!;

Check warning on line 66 in harness/src/turn-orchestrator/function-awaiting-approval/run.ts

View workflow job for this annotation

GitHub Actions / harness: node lint + test

lint/style/noNonNullAssertion

Forbidden non-null assertion.
const resolved = applyDecisionToPrepared(current, decision);
await runOneCall(executePorts, rec.session_id, resolved, executed, { skipStart: true });

Expand Down
26 changes: 23 additions & 3 deletions harness/src/turn-orchestrator/hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { permissionsDenyEnvelope } from '../approval-gate/denial.js';
import { DENIAL_SCHEMA_VERSION, type DenialEnvelope } from '../approval-gate/schemas.js';
import { isHumanOnlyApprovalFunction } from '../approval-gate/settings/human-only.js';
import { readSettings } from '../approval-gate/settings/store.js';
import { settingsVerdict } from '../approval-gate/settings/verdict.js';
import type { ApprovalSettings } from '../approval-gate/settings/types.js';
import type {
CheckPermissionsPayload,
PolicyCheckReply,
Expand Down Expand Up @@ -63,6 +63,18 @@ function extractSessionId(args: unknown): string | null {
return null;
}

function isAlwaysAllowed(settings: ApprovalSettings, function_id: string): boolean {
return settings.always_allow.some(
(entry: ApprovalSettings['always_allow'][number]) => entry.function_id === function_id,
);
}

function isApprovedAlways(settings: ApprovalSettings, function_id: string): boolean {
return settings.approved_always.some(
(entry: ApprovalSettings['approved_always'][number]) => entry.function_id === function_id,
);
}

export async function consultBefore(iii: ISdk, function_call: FunctionCall): Promise<HookOutcome> {
if (isHumanOnlyApprovalFunction(function_call.function_id)) {
return {
Expand All @@ -74,8 +86,16 @@ export async function consultBefore(iii: ISdk, function_call: FunctionCall): Pro
const session_id = extractSessionId(function_call.arguments);
const settings = session_id ? await readSettings(iii, session_id) : null;

if (settings && settingsVerdict(settings, function_call.function_id) === 'allow') {
return { kind: 'allow' };
if (settings) {
if (settings.mode === 'full') return { kind: 'allow' };
// Per-session "approve always" grants apply in every mode — they are
// remembered human decisions, not an auto-policy.
if (isApprovedAlways(settings, function_call.function_id)) {
return { kind: 'allow' };
}
if (settings.mode === 'auto' && isAlwaysAllowed(settings, function_call.function_id)) {
return { kind: 'allow' };
}
}

try {
Expand Down
37 changes: 37 additions & 0 deletions harness/src/turn-orchestrator/run-transition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,24 @@ import { logger } from '../runtime/otel.js';
import { TransientError } from './errors.js';
import { emit } from './events.js';
import { type TurnStepPayload, type TurnStepResult } from './schemas.js';
import { acquireSessionLease, releaseSessionLease } from './state-runtime/session-lease.js';
import { createTurnStore } from './state-runtime/store.js';
import { type TurnState, type TurnStateRecord, transitionTo } from './state.js';
import { syntheticAssistant } from './synthetic-assistant.js';

export type TransitionHandler = (iii: ISdk, rec: TurnStateRecord) => Promise<void>;

export type RunTransitionOptions = {
/**
* Serialize this transition behind a per-session lease. Required for states
* woken by a fan-out trigger that can enqueue concurrent steps for one
* session (function_awaiting_approval — one wake per approval::resolve). A
* contender that cannot acquire throws {@link TransientError} so the durable
* queue retries it after the holder releases.
*/
serialize?: boolean;
};

/** Returns a stale skip result when the queue message no longer matches persisted state. */
function staleSkipResult(expectedState: TurnState, rec: TurnStateRecord): TurnStepResult | null {
if (rec.state === expectedState) return null;
Expand Down Expand Up @@ -70,6 +82,31 @@ export async function runTransition(
state: TurnState,
handle: TransitionHandler,
payload: TurnStepPayload,
options?: RunTransitionOptions,
): Promise<TurnStepResult> {
if (!options?.serialize) {
return runTransitionInner(iii, state, handle, payload);
}
const acquired = await acquireSessionLease(iii, payload.session_id);
if (!acquired) {
// Another wake holds the session. Retry via the queue once it releases;
// by then the persisted state has usually advanced and we stale-skip.
throw new TransientError(
`turn::${state}: session ${payload.session_id} held by a concurrent step`,
);
}
try {
return await runTransitionInner(iii, state, handle, payload);
} finally {
await releaseSessionLease(iii, payload.session_id);
}
}

async function runTransitionInner(
iii: ISdk,
state: TurnState,
handle: TransitionHandler,
payload: TurnStepPayload,
): Promise<TurnStepResult> {
const store = createTurnStore(iii);
const rec = await store.loadRecord(payload.session_id);
Expand Down
69 changes: 69 additions & 0 deletions harness/src/turn-orchestrator/state-runtime/session-lease.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Per-session mutual-exclusion lease for turn FSM transitions.
*
* The `turn-step` durable queue has no per-session ordering — `Enqueue` takes
* only a queue name (see iii-sdk `TriggerAction.Enqueue`). So two
* `turn::function_awaiting_approval` wakes for one session (one per
* `approval::resolve` write, fanned out by the `turn::on_approval` state
* trigger) can run concurrently. Without serialization both load the same
* parked `turn_state`, execute every call, and finalize — duplicating side
* effects (the function runs twice) and emitting duplicate
* `function_execution_end` / `turn_end` frames, which wedges the turn.
*
* The only atomic primitive the state worker exposes is `state::update` with
* `increment` — a locked read-modify-write that returns the prior value (the
* kv adapter holds the store write-lock for the whole op). Acquire increments
* a per-session holder counter; the caller that observes prior `0` (or a
* missing key) won and may proceed. Release resets the counter to `0`.
*
* Crash recovery: a holder that dies mid-transition never resets the counter,
* which would wedge the session forever. A contender whose acquire fails
* therefore steals a lease whose recorded acquire time is older than
* {@link LEASE_TTL_MS}. The steal is best-effort (a post-crash window can let
* two contenders through), but that degrades to the pre-fix behavior only
* briefly after a crash — far better than a permanent deadlock.
*/

import type { ISdk } from '../../runtime/iii.js';
import { stateGet, stateSet, stateUpdate } from '../../runtime/state.js';

export const LEASE_SCOPE = 'turn_lease';
export const LEASE_AT_SCOPE = 'turn_lease_at';
/** A holder older than this is assumed crashed and may be stolen. */
export const LEASE_TTL_MS = 30_000;

/** Atomically bump the holder counter; returns the prior count (0 when free/missing). */
async function bumpHolders(iii: ISdk, session_id: string): Promise<number> {
const res = await stateUpdate(iii, LEASE_SCOPE, session_id, [
{ type: 'increment', path: '', by: 1 },
]);
const prior = (res as { old_value?: unknown } | null)?.old_value;
return typeof prior === 'number' ? prior : 0;
}

/**
* Try to acquire the session lease. Returns `true` when this caller holds it
* and must call {@link releaseSessionLease}; `false` when another transition
* holds it (the caller should back off / retry).
*/
export async function acquireSessionLease(iii: ISdk, session_id: string): Promise<boolean> {
if ((await bumpHolders(iii, session_id)) === 0) {
await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now());
return true;
}
// Contended — recover a lease abandoned by a crashed holder.
const acquiredAt = await stateGet(iii, LEASE_AT_SCOPE, session_id);
if (typeof acquiredAt === 'number' && Date.now() - acquiredAt > LEASE_TTL_MS) {
await stateSet(iii, LEASE_SCOPE, session_id, 0);
if ((await bumpHolders(iii, session_id)) === 0) {
await stateSet(iii, LEASE_AT_SCOPE, session_id, Date.now());
return true;
Comment on lines +50 to +60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Split-brain lease acquisition is still possible in the normal (non-crash) path.

At Line 50, the holder counter is acquired before the new acquire timestamp is published (Line 51). A concurrent contender can then hit the stale-steal branch at Lines 56-59 using an old LEASE_AT_SCOPE value and acquire too, so two workers may proceed concurrently.

This reintroduces duplicate transition execution risk. The lease needs ownership fencing (e.g., token/epoch validation on acquire/steal/release) or native KV lock primitives to avoid this race.

}
}
return false;
}

/** Release the session lease. Safe to call only from the holder. */
export async function releaseSessionLease(iii: ISdk, session_id: string): Promise<void> {
await stateSet(iii, LEASE_SCOPE, session_id, 0);
}
61 changes: 55 additions & 6 deletions harness/tests/integration/parallel-approval-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
handleApprovalStateWrite,
handleAwaitingApproval,
} from '../../src/turn-orchestrator/function-awaiting-approval/process.js';
import { TransientError } from '../../src/turn-orchestrator/errors.js';
import { handleExecute } from '../../src/turn-orchestrator/function-execute/process.js';
import { enterFunctionExecute } from '../../src/turn-orchestrator/function-execute/run.js';
import { runTransition } from '../../src/turn-orchestrator/run-transition.js';
Expand Down Expand Up @@ -77,14 +78,39 @@ async function runTurnStep(iii: ISdk, function_id: string, session_id: string):
return;
}
if (function_id === 'turn::function_awaiting_approval') {
await runTransition(iii, 'function_awaiting_approval', handleAwaitingApproval, payload);
await runTransition(iii, 'function_awaiting_approval', handleAwaitingApproval, payload, {
serialize: true,
});
}
}

/**
* Model the durable queue's TransientError retry: a wake that loses the
* per-session lease re-runs after the holder releases. Yields between attempts
* so the lease holder can make progress.
*/
async function runTurnStepWithRetry(
iii: ISdk,
function_id: string,
session_id: string,
): Promise<void> {
for (let attempt = 0; attempt < 100; attempt += 1) {
try {
await runTurnStep(iii, function_id, session_id);
return;
} catch (err) {
if (err instanceof TransientError) {
await flushMicrotasks();
continue;
}
throw err;
}
}
}
Comment on lines +97 to 109
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't silently succeed after retry exhaustion.

If all 100 attempts hit TransientError, this helper currently falls through and returns undefined. That drops the wake instead of surfacing a failed retry cycle, which can hide lease/retry regressions and make the harness pass for the wrong reason.

Proposed fix
 async function runTurnStepWithRetry(
   iii: ISdk,
   function_id: string,
   session_id: string,
 ): Promise<void> {
+  let lastTransient: TransientError | null = null;
   for (let attempt = 0; attempt < 100; attempt += 1) {
     try {
       await runTurnStep(iii, function_id, session_id);
       return;
     } catch (err) {
       if (err instanceof TransientError) {
+        lastTransient = err;
         await flushMicrotasks();
         continue;
       }
       throw err;
     }
   }
+  throw lastTransient ?? new Error(`retry budget exhausted for ${function_id}`);
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@harness/tests/integration/parallel-approval-harness.ts` around lines 97 -
109, The retry loop around runTurnStep silently returns undefined when all 100
attempts throw TransientError; change it so that after the loop completes it
does not fall through but throws a clear error (either rethrow the last caught
error or throw a new Error like "Retries exhausted" that includes the last
error) so the harness surfaces a failed retry cycle; update the for-loop
handling in the function that calls runTurnStep and references TransientError
and flushMicrotasks to capture the last err in the catch and throw it after the
loop instead of returning.


export function createParallelApprovalHarness(): ParallelApprovalHarness {
const stateStore = new Map<string, unknown>();
const emitted: AgentEvent[] = [];
let eventSeq = 0;

const iii = {
trigger: vi.fn(
Expand Down Expand Up @@ -126,12 +152,35 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness {
}

if (function_id === 'state::update') {
eventSeq += 1;
return { old_value: eventSeq - 1 };
// Faithful atomic read-modify-write per (scope, key): the engine's
// kv adapter holds the store write-lock for the whole op, so
// increment returns the prior value (null/absent → treated as 0).
// Both the event counter and the per-session lease depend on this.
const p = payload as {
scope: string;
key: string;
ops?: Array<{ type: string; path?: string; by?: number }>;
};
const storeKey = `${p.scope}/${p.key}`;
const old_value = stateStore.has(storeKey)
? structuredClone(stateStore.get(storeKey))
: null;
let next: unknown = old_value;
for (const op of p.ops ?? []) {
if (op.type === 'increment' && (op.path ?? '') === '') {
next = (typeof next === 'number' ? next : 0) + (op.by ?? 1);
}
}
stateStore.set(storeKey, next);
return { old_value, new_value: structuredClone(next) };
}

if (function_id === 'stream::set') {
const p = payload as { data: AgentEvent };
const p = payload as { stream_name?: string; data: AgentEvent };
// events.ts mirrors every turn_end onto a second `agent::turn_end`
// stream for compaction. Record only the primary `agent::events`
// stream so `emitted` is a faithful one-entry-per-event log.
if (p.stream_name === 'agent::turn_end') return null;
emitted.push(p.data);
return null;
}
Expand All @@ -154,7 +203,7 @@ export function createParallelApprovalHarness(): ParallelApprovalHarness {

if (function_id.startsWith('turn::') && action != null) {
const p = payload as { session_id: string };
await runTurnStep(iii as unknown as ISdk, function_id, p.session_id);
await runTurnStepWithRetry(iii as unknown as ISdk, function_id, p.session_id);
return null;
}

Expand Down
Loading
Loading