Turn-orchestrator: leaner durable FSM (reactive approval, finishSession teardown, readability)#185
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
📝 WalkthroughWalkthroughReplaces the turn orchestrator with per-state durable steps, moves approvals to persisted state with a reactive trigger, centralizes compaction via a shared pipeline, revises state scopes/APIs, updates docs, and aligns tests with the new flows and contracts. ChangesDurable Orchestrator, Approvals, Compaction, and State Runtime
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Poem
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
skill-check — worker1 verified, 12 skipped (no docs/). 1 error, 1 warning across the verified workers.
|
534f9c7 to
90279a2
Compare
90279a2 to
2314cf2
Compare
- Eliminated the isTerminal function from the state module, simplifying the state management logic. - Updated tests to directly check the state property instead of using isTerminal, ensuring clarity and consistency in state validation. - Adjusted the function_id generation in wakeState to use a template string for better readability.
… into run_request
…l, TurnStateView)
…ystem prompt - Add skillIdFromUri; remove the iii:// strip duplicated across system-prompt, provisioning, and bootstrap. - buildSystemPrompt takes a SystemPromptOptions object instead of four positional optionals, removing the unreadable bare-null call sites. - Delete the dead-in-production cwd path (no run-request source fed it). - Drop stale 'Mirrors *.rs' doc comments; use function-schema terminology.
…treaming - New provider-stream.ts owns channel creation, the concurrent provider trigger, and the read loop. A MessagePump bridges channel.onMessage to an async iterator, replacing the hand-rolled messageQueue/resolveNext/done. - assistant-streaming.ts drops 259->154 lines; handleStreaming is now linear orchestration over streamProviderTurn + finalizeAssistant. - Unify the two synthetic-error paths: createChannel failure now also emits a message_update, so the UI surfaces the error like the channel-closed path. - Add provider-stream.test.ts covering done/error frames, the per-delta callback, trigger rejection, create_channel failure, and bad-frame skips.
- Replace the stale 55-line finalize comment and the convoluted tail-walk dedup (incomingIds/existingResultIds/unseen boundary) with a small persistedResultIds helper: a turn's results are the trailing run of function_result messages, so skip ids already present there. - Make augmentFunctionCall pure (it mutated the arguments object in place). - Extract applyAfterHook and toFunctionResultMessage from finalizeExecutedCalls. - Fix the stale 'handleFinalize:' log label.
Drop tests that assert properties set on a literal (type-shape only) or duplicate behavior covered elsewhere: - persistence-prepared.test.ts: asserted PreparedEntry fields it just set; PreparedEntry is exercised for real in functions/awaiting-approval tests - agent-trigger.test.ts: DispatchResult shape block read back the discriminant it constructed; real behavior is covered by dispatchWithHook - state.test.ts: removed negative-tautology asserts, the AwaitingApprovalEntry shape test, the handleAwaitingApproval empty-queue case (dup of awaiting-approval.test.ts), and the duplicated newRecord test (work-undefined check folded into the kept one) No behavioral coverage lost: 228 -> 217 passing, tsc clean.
…e dead abort path - After-function-call hook skips publish_collect when no durable subscriber is registered for the topic (subscriber-presence cache), removing a fixed ~500ms collect wait per executed tool-call result on the turn critical path. - Context compaction subscribes to a dedicated `agent::turn_end` stream (mirrored by the event producer) instead of the full `agent::events` firehose, so it wakes once per turn instead of on every agent event. - Session-create fanout rides a dedicated `session_index` scope marker written once at first persist; the trigger matches by scope alone, dropping the per-write `harness::session::is_create_event` condition RPC. - Remove the orphaned state-based abort subgraph (abort.ts, on-abort-signal.ts, the `turn_abort` scope, the steering_check abort route): it had no production producer (abort/cancel is handled at the ACP layer) and the `router::abort` entrypoint it depended on does not exist. Drops the dead `!router::abort` kernel-deny entry.
…modules (#188) * refactor(turn-orchestrator): split state handlers into ports/process modules Reorganize the turn FSM into per-state directories that separate injected dependencies from pure transition logic, and consolidate the shared runtime behind typed ports. State handlers (flat states/*.ts -> per-state dirs): - assistant-streaming, function-execute, function-awaiting-approval, provisioning, steering-check each gain a ports.ts (I/O dependencies) and process.ts (transition logic); function-execute also splits out run.ts and types.ts New state-runtime/ layer: - store.ts all state::* turn-store I/O (was persistence/turn-state-write) - transcript.ts transcript idempotency helpers (was flat-messages) - turn-end.ts turn-end and FSM resume helpers (was finish) - ports.ts shared TurnStatePorts consumed by every state handler context-compaction: - handler-pipeline.ts holds the shared prune -> summarize -> flat-state path for the sync and async handlers (was emit.ts) Dead code removed (with tests): estimate, wake, subscriber-presence, turn-state-write, flat-messages, and the obsolete flat-state-key test. 945 tests pass; tsc -b clean. * style: format harness to biome 2.4.10 CI runs `biome ci` with biome 2.4.10 while the repo toolchain formats with 1.9.4, so newly written and edited files drift from CI's expected output and fail the lint gate. Apply 2.4.10 formatting across the affected harness files (no logic changes). Includes a few pre-existing base-branch files that had the same drift. 945 tests pass; tsc -b clean; `biome ci harness` reports no errors.
- Update the function-awaiting-approval state to execute resolved calls immediately as decisions arrive, improving responsiveness. - Modify the function-execute state to allow concurrent handling of pending approvals, ensuring that multiple calls can await approval without blocking. - Refactor related logic to streamline the transition between states, including clearer handling of batch completion and decision processing. - Remove outdated tests and add new integration tests to validate parallel approval flows and decision handling. This refactor aims to improve the efficiency and clarity of the approval process within the turn orchestrator, ensuring a more responsive and robust system.
…ransitions - Remove the wakeStep and wakeFromRecord methods from TurnStore, replacing them with inline enqueuing of state transitions directly from saveRecord. - Update the on-approval trigger to enqueue function_awaiting_approval, enhancing the responsiveness of the approval process. - Refactor the function-awaiting-approval logic to ensure that parked calls are handled more efficiently, allowing for better management of concurrent approvals. - Clean up related tests and remove outdated code to improve maintainability and clarity. These changes aim to enhance the efficiency and clarity of the turn orchestrator's approval handling, ensuring a more robust and responsive system.
…gement - Streamline the approval process by updating the `approval-gate` description to clarify its role in persisting decisions and enqueuing `turn::function_awaiting_approval`. - Refactor the handling of approval decisions to ensure that state transitions are triggered correctly when decisions are written to the `approvals` scope. - Remove the obsolete `on-approval.ts` file and integrate its functionality into the existing structure, improving maintainability. - Introduce a new `TurnStateInvariantError` class to handle validation errors related to turn state records. These changes aim to improve the clarity and efficiency of the turn orchestrator's approval handling, ensuring a more robust and responsive system.
eae2cb9 to
0f6f945
Compare
Match the CI biome version on 11 files (3 src, 8 tests).
Header and registered-function blurb still said the subscriber fires on every agent::events message; the wire is agent::turn_end (one wake per turn). The function name kept its historical on_agent_event id.
- architecture.md: worker count 11 → 15; add provider-config + provider-llamacpp rows; note context-compaction rides agent::turn_end. - approval-gate.md: replace dangling on-approval.ts link with function-awaiting-approval/process.ts. - context-compaction.md: use update_parts (batched) in prose + dependencies table; list handler-pipeline.ts and flat-state.ts in source layout. - session.md: add the four session-tree functions the list was missing (compactions, append_synthetic, update_part, update_parts); 11 → 15. - turn-orchestrator.md: drop misleading "in parallel" from the function_execute description — the dispatch loop is sequential, just non-blocking on pending calls.
The README.md file has been updated to remove the section regarding the isolation boundary of `shell::exec`, clarifying that it is not an effective isolation mechanism. This change aims to improve the documentation's accuracy and guidance for users regarding the use of shell execution in untrusted environments.
There was a problem hiding this comment.
Actionable comments posted: 14
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
harness/src/context-compaction/handler-sync.ts (1)
103-123:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPersist the synthetic auto-continue prompt into flat state too.
When
replayis present, Lines 103-114 append both the replayed message and a synthetic assistant continuation to the session tree, but Lines 117-123 only rewrite flat state withreplay.message. That leaves the shared transcript out of sync on the auto-continue path, so the next turn can miss the continuation prompt. Persist the synthetic assistant message alongside the replayed message.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/context-compaction/handler-sync.ts` around lines 103 - 123, The flat-state persist call only writes replay.message but should also include the synthetic assistant continuation appended via the trigger; update the call to persistCompactionFlatState (from handler-sync.ts) to pass both messages when replay is present — e.g., construct an array containing replay.message plus a synthetic assistant message matching the triggered payload (text "Continue if you have next steps..." with metadata {compaction_continue: true} and appropriate role/ids) and pass that array instead of just [replay.message]; ensure this happens in the same conditional flow after reinjectReplay and the append_synthetic trigger so the shared transcript stays in sync.harness/src/context-compaction/flat-state.ts (1)
29-35:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftAvoid last-write-wins overwrites of shared
messagesstate.This now replaces the session’s shared
messagessnapshot with a compacted copy. On the async path, compaction runs afterTurnEndand only holds the compaction lease, so a newer turn can append messages before this write lands; when that happens, this stale overwrite will drop the newer transcript entries. Keep compaction output on its own key again, or gate this write on a head/version check from the source snapshot.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/context-compaction/flat-state.ts` around lines 29 - 35, The current rewriteFlatMessages implementation unconditionally calls stateSet(iii, MESSAGES_SCOPE, session_id, messages) which can clobber newer appends; change it to avoid last-write-wins by either (a) writing compaction output to a separate key (e.g. a compaction-specific scope/key) instead of replacing the shared MESSAGES_SCOPE/session_id snapshot, or (b) perform a conditional/compare-and-set style write: read the source snapshot head/version before writing and only call stateSet if the head/version matches the one used to produce the compacted messages (reject/abort the write if it has advanced). Update rewriteFlatMessages to use one of these approaches and reference stateSet, MESSAGES_SCOPE, session_id, and the source snapshot head/version in the implementation.
🧹 Nitpick comments (4)
harness/src/turn-orchestrator/function-awaiting-approval/process.ts (1)
67-71: ⚡ Quick winUse shared approval scope constant instead of a string literal.
Line 70 hardcodes
'approvals'. ReusingSTATE_SCOPEhere avoids future drift between trigger wiring and approval storage scope.Suggested fix
+import { STATE_SCOPE } from '../../approval-gate/schemas.js'; ... iii.registerTrigger({ type: 'state', function_id: 'turn::on_approval', - config: { scope: 'approvals' }, + config: { scope: STATE_SCOPE }, });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/turn-orchestrator/function-awaiting-approval/process.ts` around lines 67 - 71, Replace the hardcoded scope string in the iii.registerTrigger call with the shared constant STATE_SCOPE (used for approval storage) to avoid drift; update the config line in function-awaiting-approval/process.ts where iii.registerTrigger is called (the object with type: 'state', function_id: 'turn::on_approval') to use STATE_SCOPE, and ensure STATE_SCOPE is imported from the module that defines approval scopes so the trigger and approval storage share the same constant.harness/src/turn-orchestrator/function-execute/run.ts (1)
188-191: ⚡ Quick winReplace the non-null assertion with an explicit invariant check
work.executed[preparedCallId(item)]!isn’t currently enforced by Biome (noNonNullAssertionisoffinbiome.json), but it still hides the batch invariant behind a generic runtime crash; a guarded lookup yields a clearer, intentional error.Proposed fix
function executedInBatchOrder(work: FunctionBatchWork): ExecutedCall[] { - return work.prepared.map((item) => work.executed[preparedCallId(item)]!); + return work.prepared.map((item) => { + const entry = work.executed[preparedCallId(item)]; + if (!entry) { + throw new Error( + 'finalizeBatch invariant violated: missing executed entry for prepared call', + ); + } + return entry; + }); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/turn-orchestrator/function-execute/run.ts` around lines 188 - 191, The function executedInBatchOrder currently uses a non-null assertion work.executed[preparedCallId(item)]! which can mask missing entries; replace this with an explicit guard/invariant: for each prepared item compute id = preparedCallId(item), lookup entry = work.executed[id], and if entry is undefined throw a clear Error (or use an assert helper) that includes the id and contextual info (e.g., which batch or prepared item) so the invariant failure is descriptive; return the collected entries after validating every lookup.harness/src/turn-orchestrator/provisioning/load-skills.ts (1)
12-16: ⚡ Quick winFetch default skill bodies in parallel.
This loop serializes one directory call per default skill, which adds avoidable provisioning latency.
Promise.all(...)keeps the behavior but preserves the PR's latency goals.Possible refactor
export async function loadDefaultSkillBodies( ports: Pick<ProvisioningPorts, 'fetchSkillBody'>, uris: readonly string[], ): Promise<DefaultSkillBody[]> { - const bodies: DefaultSkillBody[] = []; - for (const uri of uris) { - const body = await ports.fetchSkillBody(skillIdFromUri(uri)); - bodies.push(defaultSkillBody(uri, body)); - } - return bodies; + return Promise.all( + uris.map(async (uri) => { + const body = await ports.fetchSkillBody(skillIdFromUri(uri)); + return defaultSkillBody(uri, body); + }), + ); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/turn-orchestrator/provisioning/load-skills.ts` around lines 12 - 16, The current for-loop serially awaits ports.fetchSkillBody for each uri, causing unnecessary latency; change to fetch all skill bodies in parallel by mapping uris to promises that call ports.fetchSkillBody(skillIdFromUri(uri)) and then await Promise.all on that array, then map the resolved results to defaultSkillBody(uri, body) to populate bodies (preserving order), using the existing symbols bodies, DefaultSkillBody[], uris, ports.fetchSkillBody, skillIdFromUri, and defaultSkillBody.harness/src/turn-orchestrator/schemas.ts (1)
179-185: ⚡ Quick winRequire strict approval-event scope/type in schema validation.
Line 180 and Line 181 currently accept missing
type/scope, which weakens filtering and can admit non-approval state-write payloads intoturn::on_approval. Make both fields required literals.Proposed change
const ApprovalDecisionWriteEventSchema = z.object({ - type: z.literal('state').optional(), - scope: z.literal('approvals').optional(), + type: z.literal('state'), + scope: z.literal('approvals'), event_type: z.enum(['state:created', 'state:updated']), key: z.string().regex(/^[^/]+\/[^/]+$/), new_value: z.object({ decision: z.enum(['allow', 'deny', 'aborted']) }).passthrough(), old_value: z.unknown().optional(), });🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@harness/src/turn-orchestrator/schemas.ts` around lines 179 - 185, The ApprovalDecisionWriteEventSchema currently allows missing type/scope which weakens validation: update the ApprovalDecisionWriteEventSchema so that the type and scope fields are required literals (change the `type: z.literal('state').optional()` and `scope: z.literal('approvals').optional()` entries to required `z.literal('state')` and `z.literal('approvals')` respectively), keeping the rest of the schema (event_type, key, new_value, old_value) unchanged so only strict approval state-write payloads match.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@harness/src/runtime/state.ts`:
- Around line 69-73: parseStateListValues currently treats a malformed
state::list response as an empty list (returns []), which silently masks errors;
change it to fail fast by checking the result of stateListResponseRows(response)
and throwing a descriptive error (including the original response) when it is
not an array/valid value instead of returning []; keep using
unwrapStateListEntry<T>(entry) to map entries once the array is validated.
- Around line 169-177: stateListGroups currently returns [] for malformed
responses even when strict mode is requested; update stateListGroups (function
stateListGroups in runtime/state.ts) to validate the trigger result when
tolerant is false: after awaiting iii.trigger, if Array.isArray(result) return
it; otherwise if tolerant === false assert that result is an object with a
groups property that is an array and throw a clear error (e.g., "Invalid
response from state::list_groups") when the shape is wrong; only fall back to
returning result?.groups ?? [] when tolerant is true.
In `@harness/src/turn-orchestrator/bootstrap.ts`:
- Around line 14-15: The current call to skillIdFromUri(uri) can throw and is
executed outside the per-URI error handling, which may abort startup; change
this to guard parsing per-URI by wrapping the skillIdFromUri(uri).split('/')[0]
extraction in a try/catch (for the local variable ns) so any parse error for a
single URI is caught, logged or ignored, and that URI is skipped instead of
throwing, and only call namespaces.add(ns) when ns is successfully obtained;
update the logic around the const ns assignment and namespaces.add to use that
guarded flow.
In `@harness/src/turn-orchestrator/function-awaiting-approval/run.ts`:
- Around line 66-68: The code assumes work.prepared.find((p) => p.call.id ===
callId) always returns a value; change this to explicitly check the result and
throw a clear invariant error with context if missing before calling
applyDecisionToPrepared and runOneCall: retrieve the found item into a variable
(e.g., currentPrepared), if it's undefined throw an Error (or use your project's
invariant helper) that includes callId, rec.session_id and a brief snapshot
(e.g., list of prepared call ids) so the failure is explicit; only call
applyDecisionToPrepared(currentPrepared, decision) and await runOneCall(...)
when currentPrepared is present.
In `@harness/src/turn-orchestrator/provider-stream.ts`:
- Around line 119-150: The drain loop can throw (e.g., params.onDelta) and
currently skips pump.end() and awaiting the provider trigger; wrap the
for-await-of over pump.drain() in a try/finally so that in the finally block you
always call pump.end() and await triggerPromise (or otherwise ensure
triggerPromise completes), and ensure the reader is unblocked on reader close by
tying pump.end() to trigger completion or reader close; update references: the
async iteration over pump.drain(), the triggerPromise created from
iii.trigger({...}), and params.onDelta — move cleanup (pump.end() and await
triggerPromise) into the finally to guarantee teardown even on exceptions or
missing terminal frames.
In `@harness/src/turn-orchestrator/run-transition.ts`:
- Around line 45-59: The failure path currently calls store.saveRecord(rec,
previous) then emits UI events, which can leave the run in a permanently
"failed" state if emits or loadMessages fail (future retries are skipped by
staleSkipResult()); change the flow to persist an idempotency marker before
doing emits (e.g., add a field like failure_emitted:false when calling
store.saveRecord or use a dedicated store.markFailurePending(session_id)),
perform the emits (syntheticAssistant, emit, loadMessages), and only after
successful emits update the persisted record to failure_emitted:true (or
transition state from failed_pending_emit → failed); also update retry logic
(staleSkipResult check) to allow retries when failure_emitted is false so the
message_complete / agent_end can be re-sent until the marker flips true. Ensure
the new marker is set/updated via the existing store API (store.saveRecord / a
new store method) so it is durable across restarts.
- Around line 42-56: The handler exception text (err.message) is being copied
into the synthetic assistant sent to the UI, risking leakage of internal
details; preserve the detailed error in rec.error (as currently set) and instead
change the syntheticAssistant payload used in emit to provide a generic
user-facing message (e.g., "An internal error occurred while processing your
request") while keeping stop_reason: 'error'; update the code around
syntheticAssistant(...) and the emit(...) call so rec.error.message retains the
full detail for logs/record metadata but the emitted message.text is the
non-sensitive generic string (refer to rec.error, syntheticAssistant, emit,
transitionTo, store.saveRecord in your change).
In `@harness/src/turn-orchestrator/session-tree-mirror.ts`:
- Around line 35-44: When resuming an existing mirror (alreadyMirrored > 0) the
call to triggerSessionTree('session-tree::messages', ...) may return no tail
entry_id, causing lastAppended to be set to null and later appends to use
parent_id: null (breaking/forking the chain); change the logic in
session-tree-mirror.ts around the triggerSessionTree response handling so that
if resp.messages?.at(-1)?.entry_id is missing you bail out or trigger a
re-bootstrap instead of assigning lastAppended = null and continuing.
Specifically, update the block that reads resp and sets lastAppended
(referencing alreadyMirrored, triggerSessionTree, resp, tail, lastAppended,
session_id, iii) to detect a missing tail.entry_id and return/fallback to
re-bootstrap before any append occurs.
In `@harness/src/turn-orchestrator/state-runtime/store.ts`:
- Around line 29-39: The enqueueTurnStep function currently swallows failures by
catching errors and only logging them (logger.warn) which can leave a session in
a new state without scheduling the next step; update enqueueTurnStep (and its
try/catch around iii.trigger/TriggerAction.Enqueue with TURN_STEP_QUEUE) to
propagate the error instead of returning silently — either remove the try/catch
or rethrow the caught error after logging so the caller / queue retry semantics
can handle retries.
In `@harness/src/turn-orchestrator/steering-check/ports.ts`:
- Around line 30-39: The current drainInbox function swallows errors from
iii.trigger and returns [] which hides transient backend failures; update
drainInbox (the async drainInbox(name, session_id) that calls iii.trigger and
parseDrainItems) to stop converting exceptions to an empty array — either remove
the try/catch so errors naturally propagate, or replace the empty-array catch
with a rethrow (or throw a new descriptive error including the original error)
so callers can detect transient failures and retry rather than treating them as
an empty inbox.
In `@harness/tests/integration/parallel-approval.e2e.test.ts`:
- Around line 145-154: The test records endsAfterFirst via
executionEvents(h.emitted, 'function_execution_end', 'fc-1') but never verifies
it's non-zero, which can mask regressions; after computing endsAfterFirst (the
first call to h.resolveApproval), add an assertion that endsAfterFirst is
greater than zero (e.g., expect(endsAfterFirst).toBeGreaterThan(0)) before
calling h.resolveApproval again, so h.resolveApproval, executionEvents,
h.emitted, and the subsequent loadTurnRecord/rec?.awaiting_approval checks
operate against a valid non-zero baseline.
In `@harness/tests/turn-orchestrator/functions.test.ts`:
- Around line 191-195: The test currently asserts that there are no replayed
'function_execution_end' events for a previously executed call (it filters
emitted for events with type 'function_execution_end' and function_call_id
'fc-1' into fc1Ends and expects length 0); update this to align with the
run-path contract by expecting the replayed end event to be present (change the
expectation on fc1Ends from 0 to 1) so the test accepts the end-event replay for
skipped/executed calls.
In `@harness/tests/turn-orchestrator/steering.test.ts`:
- Around line 140-141: The test currently only checks that emitSpy was called
with events 'turn_end' and 'agent_end' but not their order; update the
assertions to assert ordering by inspecting emitSpy.mock.calls for the call
indices of the two events (or use expect(emitSpy).toHaveBeenNthCalledWith if
available) and assert that the index for the call containing { type: 'turn_end'
} is less than the index for the call containing { type: 'agent_end' } — locate
and modify the assertions around emitSpy, iii and 's1' to implement this ordered
check.
In `@harness/tests/turn-orchestrator/store.test.ts`:
- Around line 65-74: The test's mock IISdk (iii.trigger) currently throws for
every call, so it accidentally simulates both "stream::set" and "state::set"
failures; change the mock used in the test for createTurnStore so iii.trigger
only throws when the event name equals "stream::set" (inspect the first arg
passed to iii.trigger and only throw for "stream::set"), leaving calls for
"state::set" to succeed; keep references to iii.trigger, ISdk, createTurnStore
and saveRecord so reviewers can locate the change.
---
Outside diff comments:
In `@harness/src/context-compaction/flat-state.ts`:
- Around line 29-35: The current rewriteFlatMessages implementation
unconditionally calls stateSet(iii, MESSAGES_SCOPE, session_id, messages) which
can clobber newer appends; change it to avoid last-write-wins by either (a)
writing compaction output to a separate key (e.g. a compaction-specific
scope/key) instead of replacing the shared MESSAGES_SCOPE/session_id snapshot,
or (b) perform a conditional/compare-and-set style write: read the source
snapshot head/version before writing and only call stateSet if the head/version
matches the one used to produce the compacted messages (reject/abort the write
if it has advanced). Update rewriteFlatMessages to use one of these approaches
and reference stateSet, MESSAGES_SCOPE, session_id, and the source snapshot
head/version in the implementation.
In `@harness/src/context-compaction/handler-sync.ts`:
- Around line 103-123: The flat-state persist call only writes replay.message
but should also include the synthetic assistant continuation appended via the
trigger; update the call to persistCompactionFlatState (from handler-sync.ts) to
pass both messages when replay is present — e.g., construct an array containing
replay.message plus a synthetic assistant message matching the triggered payload
(text "Continue if you have next steps..." with metadata {compaction_continue:
true} and appropriate role/ids) and pass that array instead of just
[replay.message]; ensure this happens in the same conditional flow after
reinjectReplay and the append_synthetic trigger so the shared transcript stays
in sync.
---
Nitpick comments:
In `@harness/src/turn-orchestrator/function-awaiting-approval/process.ts`:
- Around line 67-71: Replace the hardcoded scope string in the
iii.registerTrigger call with the shared constant STATE_SCOPE (used for approval
storage) to avoid drift; update the config line in
function-awaiting-approval/process.ts where iii.registerTrigger is called (the
object with type: 'state', function_id: 'turn::on_approval') to use STATE_SCOPE,
and ensure STATE_SCOPE is imported from the module that defines approval scopes
so the trigger and approval storage share the same constant.
In `@harness/src/turn-orchestrator/function-execute/run.ts`:
- Around line 188-191: The function executedInBatchOrder currently uses a
non-null assertion work.executed[preparedCallId(item)]! which can mask missing
entries; replace this with an explicit guard/invariant: for each prepared item
compute id = preparedCallId(item), lookup entry = work.executed[id], and if
entry is undefined throw a clear Error (or use an assert helper) that includes
the id and contextual info (e.g., which batch or prepared item) so the invariant
failure is descriptive; return the collected entries after validating every
lookup.
In `@harness/src/turn-orchestrator/provisioning/load-skills.ts`:
- Around line 12-16: The current for-loop serially awaits ports.fetchSkillBody
for each uri, causing unnecessary latency; change to fetch all skill bodies in
parallel by mapping uris to promises that call
ports.fetchSkillBody(skillIdFromUri(uri)) and then await Promise.all on that
array, then map the resolved results to defaultSkillBody(uri, body) to populate
bodies (preserving order), using the existing symbols bodies,
DefaultSkillBody[], uris, ports.fetchSkillBody, skillIdFromUri, and
defaultSkillBody.
In `@harness/src/turn-orchestrator/schemas.ts`:
- Around line 179-185: The ApprovalDecisionWriteEventSchema currently allows
missing type/scope which weakens validation: update the
ApprovalDecisionWriteEventSchema so that the type and scope fields are required
literals (change the `type: z.literal('state').optional()` and `scope:
z.literal('approvals').optional()` entries to required `z.literal('state')` and
`z.literal('approvals')` respectively), keeping the rest of the schema
(event_type, key, new_value, old_value) unchanged so only strict approval
state-write payloads match.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b5b45c75-0b2c-48b0-95a4-4a20bfd878a0
📒 Files selected for processing (139)
console/web/src/lib/backend/real.tsharness/README.mdharness/docs/architecture.mdharness/docs/workers/approval-gate.mdharness/docs/workers/context-compaction.mdharness/docs/workers/harness.mdharness/docs/workers/session.mdharness/docs/workers/turn-orchestrator.mdharness/src/approval-gate/iii.worker.yamlharness/src/approval-gate/resolve.tsharness/src/approval-gate/schemas.tsharness/src/context-compaction/config.tsharness/src/context-compaction/emit.tsharness/src/context-compaction/flat-state.tsharness/src/context-compaction/handler-async.tsharness/src/context-compaction/handler-pipeline.tsharness/src/context-compaction/handler-sync.tsharness/src/context-compaction/lease.tsharness/src/context-compaction/model-resolver.tsharness/src/context-compaction/overflow.tsharness/src/context-compaction/register.tsharness/src/context-compaction/summarize.tsharness/src/harness/fanout/sessions-poll.tsharness/src/index.tsharness/src/llm-budget/store.tsharness/src/models-catalog/state.tsharness/src/runtime/state.tsharness/src/session/config.tsharness/src/session/inbox/key.tsharness/src/session/tree/store.tsharness/src/turn-orchestrator/abort.tsharness/src/turn-orchestrator/agent-trigger.tsharness/src/turn-orchestrator/approval-resume.tsharness/src/turn-orchestrator/assistant-streaming/ports.tsharness/src/turn-orchestrator/assistant-streaming/process.tsharness/src/turn-orchestrator/assistant-streaming/run.tsharness/src/turn-orchestrator/bootstrap.tsharness/src/turn-orchestrator/errors.tsharness/src/turn-orchestrator/estimate.tsharness/src/turn-orchestrator/events.tsharness/src/turn-orchestrator/function-awaiting-approval/ports.tsharness/src/turn-orchestrator/function-awaiting-approval/process.tsharness/src/turn-orchestrator/function-awaiting-approval/run.tsharness/src/turn-orchestrator/function-execute/ports.tsharness/src/turn-orchestrator/function-execute/process.tsharness/src/turn-orchestrator/function-execute/run.tsharness/src/turn-orchestrator/function-execute/types.tsharness/src/turn-orchestrator/get-state.tsharness/src/turn-orchestrator/hook.tsharness/src/turn-orchestrator/iii.worker.yamlharness/src/turn-orchestrator/main.tsharness/src/turn-orchestrator/on-abort-signal.tsharness/src/turn-orchestrator/persistence.tsharness/src/turn-orchestrator/preflight.tsharness/src/turn-orchestrator/provider-stream.tsharness/src/turn-orchestrator/provisioning/load-skills.tsharness/src/turn-orchestrator/provisioning/ports.tsharness/src/turn-orchestrator/provisioning/process.tsharness/src/turn-orchestrator/register.tsharness/src/turn-orchestrator/run-request.tsharness/src/turn-orchestrator/run-start.tsharness/src/turn-orchestrator/run-transition.tsharness/src/turn-orchestrator/schemas.tsharness/src/turn-orchestrator/session-tree-mirror.tsharness/src/turn-orchestrator/state-runtime/ports.tsharness/src/turn-orchestrator/state-runtime/store.tsharness/src/turn-orchestrator/state-runtime/transcript.tsharness/src/turn-orchestrator/state-runtime/turn-end.tsharness/src/turn-orchestrator/state.tsharness/src/turn-orchestrator/states/assistant-finished.tsharness/src/turn-orchestrator/states/assistant-streaming.tsharness/src/turn-orchestrator/states/function-awaiting-approval.tsharness/src/turn-orchestrator/states/function-execute.tsharness/src/turn-orchestrator/states/index.tsharness/src/turn-orchestrator/states/provisioning.tsharness/src/turn-orchestrator/states/steering-check.tsharness/src/turn-orchestrator/states/tearing-down.tsharness/src/turn-orchestrator/steering-check/ports.tsharness/src/turn-orchestrator/steering-check/process.tsharness/src/turn-orchestrator/steering-check/run.tsharness/src/turn-orchestrator/synthetic-assistant.tsharness/src/turn-orchestrator/system-prompt.tsharness/src/turn-orchestrator/turn-state-write.tsharness/src/turn-orchestrator/wake.tsharness/src/types/agent-event.tsharness/src/types/function.tsharness/tests/_helpers/stateStoreKey.tsharness/tests/approval-gate/_helpers/fakeIii.tsharness/tests/approval-gate/resolve.test.tsharness/tests/approval-gate/schemas.test.tsharness/tests/context-compaction/e2e/full-session.test.tsharness/tests/context-compaction/flat-state-key.test.tsharness/tests/context-compaction/integration/flow-sync.test.tsharness/tests/context-compaction/lease.test.tsharness/tests/context-compaction/turn-end-subscription.test.tsharness/tests/harness/fanout/sessions-poll.test.tsharness/tests/harness/policy.test.tsharness/tests/integration/approval-resume.e2e.test.tsharness/tests/integration/on-record-written.e2e.test.tsharness/tests/integration/parallel-approval-harness.tsharness/tests/integration/parallel-approval.e2e.test.tsharness/tests/runtime/state-client.test.tsharness/tests/runtime/state-list.test.tsharness/tests/session/inbox.test.tsharness/tests/session/tree/store.test.tsharness/tests/turn-orchestrator/_helpers/mockTurnStore.tsharness/tests/turn-orchestrator/abort.test.tsharness/tests/turn-orchestrator/agent-trigger.test.tsharness/tests/turn-orchestrator/approval-resume.test.tsharness/tests/turn-orchestrator/assistant-streaming.test.tsharness/tests/turn-orchestrator/assistant.test.tsharness/tests/turn-orchestrator/awaiting-approval.test.tsharness/tests/turn-orchestrator/estimate.test.tsharness/tests/turn-orchestrator/events.test.tsharness/tests/turn-orchestrator/finish.test.tsharness/tests/turn-orchestrator/function-awaiting-approval-state-trigger.test.tsharness/tests/turn-orchestrator/function-awaiting-approval.test.tsharness/tests/turn-orchestrator/function-execute.test.tsharness/tests/turn-orchestrator/functions.test.tsharness/tests/turn-orchestrator/get-state.test.tsharness/tests/turn-orchestrator/on-abort-signal.test.tsharness/tests/turn-orchestrator/parse-turn-state-record.test.tsharness/tests/turn-orchestrator/persistence-prepared.test.tsharness/tests/turn-orchestrator/provider-stream.test.tsharness/tests/turn-orchestrator/provisioning-layer.test.tsharness/tests/turn-orchestrator/provisioning.test.tsharness/tests/turn-orchestrator/run-request.test.tsharness/tests/turn-orchestrator/run-start.test.tsharness/tests/turn-orchestrator/run-transition.test.tsharness/tests/turn-orchestrator/state.test.tsharness/tests/turn-orchestrator/steering-check-layer.test.tsharness/tests/turn-orchestrator/steering.test.tsharness/tests/turn-orchestrator/store.test.tsharness/tests/turn-orchestrator/system-prompt.test.tsharness/tests/turn-orchestrator/tearing-down.test.tsharness/tests/turn-orchestrator/turn-state-write.test.tsharness/tests/turn-orchestrator/wake.test.tsiii-permissions.yamlshell/README.md
💤 Files with no reviewable changes (31)
- harness/tests/context-compaction/flat-state-key.test.ts
- harness/tests/turn-orchestrator/wake.test.ts
- harness/tests/turn-orchestrator/abort.test.ts
- harness/src/turn-orchestrator/states/provisioning.ts
- harness/src/turn-orchestrator/estimate.ts
- harness/src/turn-orchestrator/states/tearing-down.ts
- harness/tests/turn-orchestrator/turn-state-write.test.ts
- harness/tests/turn-orchestrator/approval-resume.test.ts
- harness/src/turn-orchestrator/approval-resume.ts
- iii-permissions.yaml
- harness/tests/turn-orchestrator/on-abort-signal.test.ts
- harness/src/turn-orchestrator/turn-state-write.ts
- harness/src/context-compaction/emit.ts
- harness/src/turn-orchestrator/wake.ts
- shell/README.md
- harness/src/turn-orchestrator/states/function-execute.ts
- harness/src/turn-orchestrator/states/steering-check.ts
- harness/tests/turn-orchestrator/persistence-prepared.test.ts
- harness/tests/turn-orchestrator/awaiting-approval.test.ts
- harness/src/turn-orchestrator/on-abort-signal.ts
- harness/tests/integration/approval-resume.e2e.test.ts
- harness/tests/turn-orchestrator/tearing-down.test.ts
- harness/src/turn-orchestrator/abort.ts
- harness/tests/harness/policy.test.ts
- harness/src/turn-orchestrator/states/index.ts
- harness/src/turn-orchestrator/states/assistant-streaming.ts
- harness/src/turn-orchestrator/persistence.ts
- harness/src/turn-orchestrator/states/assistant-finished.ts
- harness/tests/turn-orchestrator/estimate.test.ts
- harness/src/types/function.ts
- harness/src/turn-orchestrator/states/function-awaiting-approval.ts
| export function parseStateListValues<T>(response: unknown): T[] { | ||
| const arr = stateListResponseRows(response); | ||
| if (!arr) return []; | ||
| return arr.map((entry) => unwrapStateListEntry<T>(entry)); | ||
| } |
There was a problem hiding this comment.
Strict mode silently masks malformed state::list responses.
With tolerant: false, a non-array state::list response should fail fast, but parseStateListValues converts it to []. That can make missing/corrupt state look like “no data” in strict consumers (e.g., session tree reads).
Suggested fix
- const resp = await iii.trigger<StateListInput, unknown>({
+ const resp = await iii.trigger<StateListInput, unknown>({
function_id: 'state::list',
payload: input,
});
- return parseStateListValues<TData>(resp);
+ const rows = stateListResponseRows(resp);
+ if (!rows) throw new Error('state::list returned non-array response');
+ return rows.map((entry) => unwrapStateListEntry<TData>(entry));Also applies to: 138-149
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/runtime/state.ts` around lines 69 - 73, parseStateListValues
currently treats a malformed state::list response as an empty list (returns []),
which silently masks errors; change it to fail fast by checking the result of
stateListResponseRows(response) and throwing a descriptive error (including the
original response) when it is not an array/valid value instead of returning [];
keep using unwrapStateListEntry<T>(entry) to map entries once the array is
validated.
| export async function stateListGroups(iii: ISdk, opts: CreateStateOptions = {}): Promise<string[]> { | ||
| const tolerant = opts.tolerant !== false; | ||
| try { | ||
| const v = await iii.trigger<unknown, unknown>({ | ||
| function_id: 'state::get', | ||
| payload: { scope, key }, | ||
| const result = await iii.trigger<Record<string, never>, StateListGroupsResult | string[]>({ | ||
| function_id: 'state::list_groups', | ||
| payload: {}, | ||
| }); | ||
| if (v === null || v === undefined) return null; | ||
| return v; | ||
| if (Array.isArray(result)) return result; | ||
| return result?.groups ?? []; |
There was a problem hiding this comment.
stateListGroups also bypasses strict-mode contract checks.
When tolerant: false, malformed state::list_groups payloads currently return [] instead of throwing. This hides protocol/schema breakages and can cause silent data invisibility.
Suggested fix
- if (Array.isArray(result)) return result;
- return result?.groups ?? [];
+ if (Array.isArray(result)) return result;
+ if (result && typeof result === 'object' && Array.isArray(result.groups)) {
+ return result.groups;
+ }
+ if (tolerant) return [];
+ throw new Error('state::list_groups returned malformed response');📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export async function stateListGroups(iii: ISdk, opts: CreateStateOptions = {}): Promise<string[]> { | |
| const tolerant = opts.tolerant !== false; | |
| try { | |
| const v = await iii.trigger<unknown, unknown>({ | |
| function_id: 'state::get', | |
| payload: { scope, key }, | |
| const result = await iii.trigger<Record<string, never>, StateListGroupsResult | string[]>({ | |
| function_id: 'state::list_groups', | |
| payload: {}, | |
| }); | |
| if (v === null || v === undefined) return null; | |
| return v; | |
| if (Array.isArray(result)) return result; | |
| return result?.groups ?? []; | |
| export async function stateListGroups(iii: ISdk, opts: CreateStateOptions = {}): Promise<string[]> { | |
| const tolerant = opts.tolerant !== false; | |
| try { | |
| const result = await iii.trigger<Record<string, never>, StateListGroupsResult | string[]>({ | |
| function_id: 'state::list_groups', | |
| payload: {}, | |
| }); | |
| if (Array.isArray(result)) return result; | |
| if (result && typeof result === 'object' && Array.isArray(result.groups)) { | |
| return result.groups; | |
| } | |
| if (tolerant) return []; | |
| throw new Error('state::list_groups returned malformed response'); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/runtime/state.ts` around lines 169 - 177, stateListGroups
currently returns [] for malformed responses even when strict mode is requested;
update stateListGroups (function stateListGroups in runtime/state.ts) to
validate the trigger result when tolerant is false: after awaiting iii.trigger,
if Array.isArray(result) return it; otherwise if tolerant === false assert that
result is an object with a groups property that is an array and throw a clear
error (e.g., "Invalid response from state::list_groups") when the shape is
wrong; only fall back to returning result?.groups ?? [] when tolerant is true.
| const ns = skillIdFromUri(uri).split('/')[0]; | ||
| if (ns) namespaces.add(ns); |
There was a problem hiding this comment.
Guard default-skill URI parsing to preserve best-effort startup.
Line 14 can throw before the download try/catch, which would abort boot and violate the “never abort startup” behavior. Parse failures should be handled per URI and skipped.
Suggested fix
const namespaces = new Set<string>();
for (const uri of cfg.system_default_skills) {
- const ns = skillIdFromUri(uri).split('/')[0];
- if (ns) namespaces.add(ns);
+ try {
+ const ns = skillIdFromUri(uri).split('/')[0];
+ if (ns) namespaces.add(ns);
+ } catch (err) {
+ logger.debug('invalid default skill uri (best-effort)', {
+ uri,
+ err: String(err),
+ });
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const ns = skillIdFromUri(uri).split('/')[0]; | |
| if (ns) namespaces.add(ns); | |
| try { | |
| const ns = skillIdFromUri(uri).split('/')[0]; | |
| if (ns) namespaces.add(ns); | |
| } catch (err) { | |
| logger.debug('invalid default skill uri (best-effort)', { | |
| uri, | |
| err: String(err), | |
| }); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/turn-orchestrator/bootstrap.ts` around lines 14 - 15, The current
call to skillIdFromUri(uri) can throw and is executed outside the per-URI error
handling, which may abort startup; change this to guard parsing per-URI by
wrapping the skillIdFromUri(uri).split('/')[0] extraction in a try/catch (for
the local variable ns) so any parse error for a single URI is caught, logged or
ignored, and that URI is skipped instead of throwing, and only call
namespaces.add(ns) when ns is successfully obtained; update the logic around the
const ns assignment and namespaces.add to use that guarded flow.
| const current = work.prepared.find((p) => p.call.id === callId)!; | ||
| const resolved = applyDecisionToPrepared(current, decision); | ||
| await runOneCall(executePorts, rec.session_id, resolved, executed, { skipStart: true }); |
There was a problem hiding this comment.
Handle missing prepared-call invariant before execution.
Line 66 assumes every awaiting_approval entry exists in work.prepared. If state is inconsistent, this crashes mid-transition. Convert this to an explicit invariant failure with context.
Suggested fix
+import { TurnStateInvariantError } from '../errors.js';
...
- const current = work.prepared.find((p) => p.call.id === callId)!;
+ const current = work.prepared.find((p) => p.call.id === callId);
+ if (!current) {
+ throw new TurnStateInvariantError(
+ `Missing prepared call for awaiting approval function_call_id=${callId}`,
+ );
+ }
const resolved = applyDecisionToPrepared(current, decision);🧰 Tools
🪛 GitHub Check: harness: node lint + test
[warning] 66-66: lint/style/noNonNullAssertion
Forbidden non-null assertion.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/turn-orchestrator/function-awaiting-approval/run.ts` around lines
66 - 68, The code assumes work.prepared.find((p) => p.call.id === callId) always
returns a value; change this to explicitly check the result and throw a clear
invariant error with context if missing before calling applyDecisionToPrepared
and runOneCall: retrieve the found item into a variable (e.g., currentPrepared),
if it's undefined throw an Error (or use your project's invariant helper) that
includes callId, rec.session_id and a brief snapshot (e.g., list of prepared
call ids) so the failure is explicit; only call
applyDecisionToPrepared(currentPrepared, decision) and await runOneCall(...)
when currentPrepared is present.
| const triggerPromise = iii | ||
| .trigger<unknown, unknown>({ | ||
| function_id: params.targetFn, | ||
| payload: params.buildInput(channel.writerRef as StreamChannelRef), | ||
| timeoutMs: PROVIDER_STREAM_TIMEOUT_MS, | ||
| }) | ||
| .catch((err) => { | ||
| logger.warn('provider stream trigger failed', { | ||
| targetFn: params.targetFn, | ||
| err: String(err), | ||
| }); | ||
| error = formatProviderError(err); | ||
| pump.end(); | ||
| return null; | ||
| }); | ||
|
|
||
| let final: AssistantMessage | null = null; | ||
| for await (const text of pump.drain()) { | ||
| const event = parseEvent(text, params.session_id); | ||
| if (!event) continue; | ||
| if (event.type === 'done' || event.type === 'error') { | ||
| final = eventMessage(event); | ||
| break; | ||
| } | ||
| const partial = eventMessage(event); | ||
| if (partial) { | ||
| final = partial; | ||
| await params.onDelta(partial, event); | ||
| } | ||
| } | ||
| pump.end(); | ||
| await triggerPromise; |
There was a problem hiding this comment.
Make provider-stream teardown unconditional.
If onDelta on Line 146 throws, this function exits before pump.end() and await triggerPromise, so the original provider invocation can keep running while the durable step retries. Also, the happy path only stops reading after a done/error frame, so a dropped or malformed terminal frame leaves pump.drain() blocked forever. Wrap the drain in try/finally, and tie pump.end() to trigger completion or reader close.
Possible local fix
- const triggerPromise = iii
+ const triggerPromise = iii
.trigger<unknown, unknown>({
function_id: params.targetFn,
payload: params.buildInput(channel.writerRef as StreamChannelRef),
timeoutMs: PROVIDER_STREAM_TIMEOUT_MS,
})
.catch((err) => {
logger.warn('provider stream trigger failed', {
targetFn: params.targetFn,
err: String(err),
});
error = formatProviderError(err);
- pump.end();
return null;
+ })
+ .finally(() => {
+ pump.end();
});
let final: AssistantMessage | null = null;
- for await (const text of pump.drain()) {
- const event = parseEvent(text, params.session_id);
- if (!event) continue;
- if (event.type === 'done' || event.type === 'error') {
- final = eventMessage(event);
- break;
- }
- const partial = eventMessage(event);
- if (partial) {
- final = partial;
- await params.onDelta(partial, event);
+ try {
+ for await (const text of pump.drain()) {
+ const event = parseEvent(text, params.session_id);
+ if (!event) continue;
+ if (event.type === 'done' || event.type === 'error') {
+ final = eventMessage(event);
+ break;
+ }
+ const partial = eventMessage(event);
+ if (partial) {
+ final = partial;
+ await params.onDelta(partial, event);
+ }
}
+ } finally {
+ pump.end();
+ await triggerPromise;
}
- pump.end();
- await triggerPromise;
return { final, error };🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/turn-orchestrator/provider-stream.ts` around lines 119 - 150, The
drain loop can throw (e.g., params.onDelta) and currently skips pump.end() and
awaiting the provider trigger; wrap the for-await-of over pump.drain() in a
try/finally so that in the finally block you always call pump.end() and await
triggerPromise (or otherwise ensure triggerPromise completes), and ensure the
reader is unblocked on reader close by tying pump.end() to trigger completion or
reader close; update references: the async iteration over pump.drain(), the
triggerPromise created from iii.trigger({...}), and params.onDelta — move
cleanup (pump.end() and await triggerPromise) into the finally to guarantee
teardown even on exceptions or missing terminal frames.
| async drainInbox(name, session_id) { | ||
| try { | ||
| const resp = await iii.trigger<unknown, unknown>({ | ||
| function_id: 'session-inbox::drain', | ||
| payload: { name, session_id }, | ||
| }); | ||
| return parseDrainItems(resp); | ||
| } catch { | ||
| return []; | ||
| } |
There was a problem hiding this comment.
Don’t map drain failures to “no messages.”
Line 37–38 converts session-inbox::drain failures into [], which makes transient backend errors indistinguishable from a truly empty inbox and can skip steering/followup work for this turn. Propagate the error (or explicit transient error) so the step can retry.
Proposed change
async drainInbox(name, session_id) {
try {
const resp = await iii.trigger<unknown, unknown>({
function_id: 'session-inbox::drain',
payload: { name, session_id },
});
return parseDrainItems(resp);
- } catch {
- return [];
+ } catch (err) {
+ throw err;
}
},📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async drainInbox(name, session_id) { | |
| try { | |
| const resp = await iii.trigger<unknown, unknown>({ | |
| function_id: 'session-inbox::drain', | |
| payload: { name, session_id }, | |
| }); | |
| return parseDrainItems(resp); | |
| } catch { | |
| return []; | |
| } | |
| async drainInbox(name, session_id) { | |
| try { | |
| const resp = await iii.trigger<unknown, unknown>({ | |
| function_id: 'session-inbox::drain', | |
| payload: { name, session_id }, | |
| }); | |
| return parseDrainItems(resp); | |
| } catch (err) { | |
| throw err; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/src/turn-orchestrator/steering-check/ports.ts` around lines 30 - 39,
The current drainInbox function swallows errors from iii.trigger and returns []
which hides transient backend failures; update drainInbox (the async
drainInbox(name, session_id) that calls iii.trigger and parseDrainItems) to stop
converting exceptions to an empty array — either remove the try/catch so errors
naturally propagate, or replace the empty-array catch with a rethrow (or throw a
new descriptive error including the original error) so callers can detect
transient failures and retry rather than treating them as an empty inbox.
| await h.resolveApproval('sess-dup', 'fc-1', 'allow'); | ||
| const endsAfterFirst = executionEvents(h.emitted, 'function_execution_end', 'fc-1').length; | ||
|
|
||
| await h.resolveApproval('sess-dup', 'fc-1', 'allow'); | ||
| const rec = h.loadTurnRecord('sess-dup'); | ||
|
|
||
| expect(rec?.awaiting_approval).toEqual([]); | ||
| expect(executionEvents(h.emitted, 'function_execution_end', 'fc-1')).toHaveLength( | ||
| endsAfterFirst, | ||
| ); |
There was a problem hiding this comment.
Strengthen the idempotency assertion baseline.
Line 146 records the first function_execution_end count but never verifies it is non-zero. If both first and second resolves emit 0, this test still passes and hides a regression.
Suggested patch
await h.resolveApproval('sess-dup', 'fc-1', 'allow');
const endsAfterFirst = executionEvents(h.emitted, 'function_execution_end', 'fc-1').length;
+ expect(endsAfterFirst).toBe(1);
await h.resolveApproval('sess-dup', 'fc-1', 'allow');
const rec = h.loadTurnRecord('sess-dup');🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/tests/integration/parallel-approval.e2e.test.ts` around lines 145 -
154, The test records endsAfterFirst via executionEvents(h.emitted,
'function_execution_end', 'fc-1') but never verifies it's non-zero, which can
mask regressions; after computing endsAfterFirst (the first call to
h.resolveApproval), add an assertion that endsAfterFirst is greater than zero
(e.g., expect(endsAfterFirst).toBeGreaterThan(0)) before calling
h.resolveApproval again, so h.resolveApproval, executionEvents, h.emitted, and
the subsequent loadTurnRecord/rec?.awaiting_approval checks operate against a
valid non-zero baseline.
| const fc1Ends = emitted.filter( | ||
| (e) => e.type === 'function_execution_end' && e.function_call_id === 'fc-1', | ||
| ); | ||
| expect(fc1Ends).toHaveLength(0); | ||
| }); |
There was a problem hiding this comment.
Align re-entry end-event expectation with the current execution contract.
Line 191–195 expects no function_execution_end replay for an already executed call, but the run-path contract replays end events for skipped executed calls. This assertion can make the suite inconsistent and brittle.
Suggested fix
- expect(fc1Ends).toHaveLength(0);
+ expect(fc1Ends).toHaveLength(1);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const fc1Ends = emitted.filter( | |
| (e) => e.type === 'function_execution_end' && e.function_call_id === 'fc-1', | |
| ); | |
| expect(fc1Ends).toHaveLength(0); | |
| }); | |
| const fc1Ends = emitted.filter( | |
| (e) => e.type === 'function_execution_end' && e.function_call_id === 'fc-1', | |
| ); | |
| expect(fc1Ends).toHaveLength(1); | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/tests/turn-orchestrator/functions.test.ts` around lines 191 - 195,
The test currently asserts that there are no replayed 'function_execution_end'
events for a previously executed call (it filters emitted for events with type
'function_execution_end' and function_call_id 'fc-1' into fc1Ends and expects
length 0); update this to align with the run-path contract by expecting the
replayed end event to be present (change the expectation on fc1Ends from 0 to 1)
so the test accepts the end-event replay for skipped/executed calls.
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'turn_end' })); | ||
| expect(loadSpy).not.toHaveBeenCalled(); | ||
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'agent_end' })); |
There was a problem hiding this comment.
Assert turn_end occurs before agent_end, not just that both exist.
Line 140 and Line 141 only verify presence, so an ordering regression would still pass despite the test name requiring sequence.
Proposed assertion tightening
expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'turn_end' }));
expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'agent_end' }));
+ const emittedTypes = emitSpy.mock.calls.map(([, , evt]) => (evt as { type?: string }).type);
+ expect(emittedTypes.indexOf('turn_end')).toBeLessThan(emittedTypes.indexOf('agent_end'));📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'turn_end' })); | |
| expect(loadSpy).not.toHaveBeenCalled(); | |
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'agent_end' })); | |
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'turn_end' })); | |
| expect(emitSpy).toHaveBeenCalledWith(iii, 's1', expect.objectContaining({ type: 'agent_end' })); | |
| const emittedTypes = emitSpy.mock.calls.map(([, , evt]) => (evt as { type?: string }).type); | |
| expect(emittedTypes.indexOf('turn_end')).toBeLessThan(emittedTypes.indexOf('agent_end')); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/tests/turn-orchestrator/steering.test.ts` around lines 140 - 141, The
test currently only checks that emitSpy was called with events 'turn_end' and
'agent_end' but not their order; update the assertions to assert ordering by
inspecting emitSpy.mock.calls for the call indices of the two events (or use
expect(emitSpy).toHaveBeenNthCalledWith if available) and assert that the index
for the call containing { type: 'turn_end' } is less than the index for the call
containing { type: 'agent_end' } — locate and modify the assertions around
emitSpy, iii and 's1' to implement this ordered check.
| it('swallows emit failures (logs only, never rethrows)', async () => { | ||
| const iii = { | ||
| trigger: vi.fn(async () => { | ||
| throw new Error('stream::set down'); | ||
| }), | ||
| } as unknown as ISdk; | ||
| const store = createTurnStore(iii); | ||
| const rec = newRecord('sess-a'); | ||
| await expect(store.saveRecord(rec)).resolves.toBeUndefined(); | ||
| }); |
There was a problem hiding this comment.
The “emit failure” test currently also simulates state-write failure.
Line 67–69 throws for all iii.trigger calls, so this test can pass even if saveRecord incorrectly swallows state::set failures. Scope the throw to stream::set only.
Proposed mock isolation
- const iii = {
- trigger: vi.fn(async () => {
- throw new Error('stream::set down');
- }),
- } as unknown as ISdk;
+ const iii = {
+ trigger: vi.fn(async ({ function_id, payload }: { function_id: string; payload: unknown }) => {
+ if (function_id === 'state::set') {
+ return { old_value: null, new_value: (payload as { value: unknown }).value };
+ }
+ if (function_id === 'stream::set') {
+ throw new Error('stream::set down');
+ }
+ return null;
+ }),
+ } as unknown as ISdk;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| it('swallows emit failures (logs only, never rethrows)', async () => { | |
| const iii = { | |
| trigger: vi.fn(async () => { | |
| throw new Error('stream::set down'); | |
| }), | |
| } as unknown as ISdk; | |
| const store = createTurnStore(iii); | |
| const rec = newRecord('sess-a'); | |
| await expect(store.saveRecord(rec)).resolves.toBeUndefined(); | |
| }); | |
| it('swallows emit failures (logs only, never rethrows)', async () => { | |
| const iii = { | |
| trigger: vi.fn(async ({ function_id, payload }: { function_id: string; payload: unknown }) => { | |
| if (function_id === 'state::set') { | |
| return { old_value: null, new_value: (payload as { value: unknown }).value }; | |
| } | |
| if (function_id === 'stream::set') { | |
| throw new Error('stream::set down'); | |
| } | |
| return null; | |
| }), | |
| } as unknown as ISdk; | |
| const store = createTurnStore(iii); | |
| const rec = newRecord('sess-a'); | |
| await expect(store.saveRecord(rec)).resolves.toBeUndefined(); | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@harness/tests/turn-orchestrator/store.test.ts` around lines 65 - 74, The
test's mock IISdk (iii.trigger) currently throws for every call, so it
accidentally simulates both "stream::set" and "state::set" failures; change the
mock used in the test for createTurnStore so iii.trigger only throws when the
event name equals "stream::set" (inspect the first arg passed to iii.trigger and
only throw for "stream::set"), leaving calls for "state::set" to succeed; keep
references to iii.trigger, ISdk, createTurnStore and saveRecord so reviewers can
locate the change.
Results
Latency on the turn critical path
after-function-callhook short-circuitspublish_collectvia a subscriber-presence cache when no durable subscriber is registered for the topic — the fixed collect wait is gone from the common path.tearing_downstate is gone; teardown is an inlinefinishSession()port (emitagent_end→stopped) called from the turn-end paths instead of a separate enqueued step.agent::turn_endstream (mirrored by the event producer) instead of the fullagent::eventsfirehose.turn_statematches by scope alone (nocondition_function_id); the handler gates onstate:createdevents in-process. The per-writeharness::session::is_create_eventRPC is gone.recoverPendingApprovalsscan — the singleturn::on_approvalstate trigger covers every pending call.Concurrency
function_executekeeps dispatching the rest of a batch while individual calls park toawaiting_approval[];function_awaiting_approvalexecutes each call as its decision lands (allow → pre-approved dispatch, deny/aborted → synthetic denial), advancing only when no entries remain.processResolvedApprovalscheckpoints per resolved call.Code reduction
assistant_streaminghandler: ~260 → ~150 lines after extractingprovider-stream.ts(channel creation, provider trigger, read loop behind a pull-basedMessagePump).synthetic-assistant.ts) replaces four hand-built error / abort / max_turnsAssistantMessageliterals.approval-resume.ts, the per-callturn::approval_resume::<sid>/<cid>registration, theabort.ts/on-abort-signal.ts/turn_abort-scope subgraph, thetearing_downstate, the standalonewakeStep/wakeFromRecordmethods, theon-record-writtenadapter, the deadpending_function_callsfield, and the stalestaging_*keys.Architecture
provisioning,assistant_streaming,function_execute,function_awaiting_approval,steering_check,stopped,failed);tearing_downfolded into the inlinefinishSession()port.turn::on_approvalstate trigger on scopeapprovalsdrives every wake, replacing N per-call registrations and the startup re-scan.ports.ts(injected deps) +process.ts(registered entry) +run.ts(pure transition). Shared runtime helpers (store, ports, transcript, turn-end emit) live understate-runtime/.saveRecordwake:TurnStore.saveRecordenqueuesturn::{newState}on theturn-stepFIFO directly viashouldWakeStep;turn_state_changedevents are emitted inline frompersistRecordfor the same reason — both adapters gone.Reliability
runTransitioncentralizesload → null-check → stale-skip → handle → savefor every state. An unexpected handler throw routes the session tofailed(acked, queue stops retrying) and surfacesmessage_complete{stop_reason:'error'}+agent_endso the UI shows the reason.TransientError.Turn FSM
Test plan
pnpm -C harness exec vitest run— full suite green (942 tests)pnpm -C harness typecheck— cleanmain; no remaining conflictsharness: node lint + test, rust lints, PR checks;skill-checkis a pre-existingmainfailure unrelated to this PR)Summary by CodeRabbit
Bug Fixes
failedstate.Refactor