Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions dashboard/public/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,17 @@ let viewerMode = null; // "conversation" | "status-md" | null
let viewerTarget = null; // session name (conversation) or taskId (status-md)
let lastBatchId = null; // TP-178: track batchId for stale viewer detection (#487)

// #507: Debounce the no-batch transition. A single missed poll happens
// transiently during batch-state.json writes at batch startup, and was
// causing the dashboard to flash the previous batch's history view before
// switching to the new live batch. Require N consecutive no-batch polls
// before clearing the viewer / showing history. With the server's 2s
// POLL_INTERVAL, a threshold of 3 corresponds to ~6s of confirmed silence —
// well past the typical batch-state.json write window (sub-second) while
// still cleaning up promptly when a batch genuinely ends.
let consecutiveNoBatchPolls = 0;
const NO_BATCH_DEBOUNCE_THRESHOLD = 3;

// ─── Repo Helpers ───────────────────────────────────────────────────────────

/**
Expand Down Expand Up @@ -1818,6 +1829,16 @@ function render(data) {
$lastUpdate.textContent = new Date().toLocaleTimeString();

if (!batch) {
// #507: A single missed poll during batch startup (batch-state.json being
// written) is not a real "batch disappeared" signal. Only act on no-batch
// after N consecutive polls confirm it, so we don't flash the history
// view between two live batches.
consecutiveNoBatchPolls += 1;
if (consecutiveNoBatchPolls < NO_BATCH_DEBOUNCE_THRESHOLD) {
// Hold the previous render in place. Still tick the timestamp so the
// user knows the SSE stream is alive.
return;
}
// TP-178: Clear viewer when batch disappears (#487)
if (lastBatchId && viewerMode) closeViewer();
lastBatchId = null;
Expand All @@ -1830,6 +1851,9 @@ function render(data) {
return;
}

// Batch present — reset the no-batch debounce counter (#507).
consecutiveNoBatchPolls = 0;

// TP-178: Detect batchId change — clear stale viewer state (#487)
if (batch.batchId && lastBatchId && batch.batchId !== lastBatchId && viewerMode) {
closeViewer();
Expand Down
29 changes: 25 additions & 4 deletions dashboard/server.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,26 @@ function loadRuntimeLaneSnapshots(batchId) {
/**
* Load Runtime V2 merge agent snapshots for the current batch.
*
* Reads all `merge-N.json` files from `.pi/runtime/{batchId}/lanes/`.
* Returns a map of mergeNumber (string) → snapshot data.
* Reads all `merge-*.json` files from `.pi/runtime/{batchId}/lanes/`.
* Returns a map of unique key → snapshot data, where the key is a composite
* of waveIndex and mergeNumber.
*
* The composite key is essential because lane numbers (and therefore
* `mergeNumber`) repeat across waves — keying solely by `mergeNumber` caused
* wave N+1's snapshots to silently overwrite wave N's in the intermediate
* map, which is the root cause of #509 ('merge agent telemetry missing for
* some waves').
*
* Follows the same pattern as {@link loadRuntimeLaneSnapshots}.
*
* @since TP-164
* Filename accepted patterns (back-compat-tolerant):
* merge-w{waveIndex}-{mergeNumber}.json (current, post-#509)
* merge-{mergeNumber}.json (legacy, pre-#509)
*
* Both patterns embed waveIndex inside the snapshot JSON itself, so the key
* derivation works for either filename.
*
* @since TP-164 (composite key added in #509 remediation)
*/
function loadRuntimeMergeSnapshots(batchId) {
if (!batchId) return {};
Expand All @@ -484,7 +498,14 @@ function loadRuntimeMergeSnapshots(batchId) {
for (const file of files) {
try {
const data = JSON.parse(fs.readFileSync(path.join(lanesDir, file), "utf-8"));
if (data.mergeNumber != null) snapshots[data.mergeNumber] = data;
if (data.mergeNumber == null) continue;
// Composite key keeps cross-wave snapshots from colliding in this map.
// Falls back to mergeNumber-only for legacy snapshots that pre-date
// the waveIndex-in-filename change.
const key = data.waveIndex != null
? `w${data.waveIndex}-${data.mergeNumber}`
: String(data.mergeNumber);
snapshots[key] = data;
} catch { continue; }
}
} catch { /* dir missing */ }
Expand Down
8 changes: 4 additions & 4 deletions extensions/taskplane/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ export async function spawnMergeAgentV2(
agent: buildAgentSnap(tel, "running"),
updatedAt: Date.now(),
};
writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap);
writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap);
} catch {
/* non-fatal */
}
Expand All @@ -915,7 +915,7 @@ export async function spawnMergeAgentV2(
agent: buildAgentSnap({}, "running"),
updatedAt: Date.now(),
};
writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, initialSnap);
writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, initialSnap);
} catch {
/* non-fatal */
}
Expand Down Expand Up @@ -964,7 +964,7 @@ export async function spawnMergeAgentV2(
agent: buildAgentSnap(result, terminalStatus === "complete" ? "exited" : "crashed"),
updatedAt: Date.now(),
};
writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap);
writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap);
} catch {
/* non-fatal */
}
Expand All @@ -987,7 +987,7 @@ export async function spawnMergeAgentV2(
agent: buildAgentSnap({}, "crashed"),
updatedAt: Date.now(),
};
writeMergeSnapshot(mergeStateRoot, bid, mergeNumber, snap);
writeMergeSnapshot(mergeStateRoot, bid, waveIndex ?? 0, mergeNumber, snap);
} catch {
/* non-fatal */
}
Expand Down
15 changes: 11 additions & 4 deletions extensions/taskplane/process-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,20 +400,25 @@ export function readLaneSnapshot(
* Stored in the `lanes/` directory alongside lane snapshots so the dashboard
* server picks it up with the same scan that reads lane-N.json files.
*
* Filename includes BOTH waveIndex and mergeNumber so wave-N+1's merges
* cannot overwrite wave-N's snapshots before the dashboard polls them (#509).
*
* @param stateRoot - Repository root (where `.pi/` lives)
* @param batchId - Current batch identifier
* @param waveIndex - 0-based wave index for the merge
* @param mergeNumber - 1-indexed merge agent number
* @param snapshot - Snapshot data to persist
*
* @since TP-164
* @since TP-164 (waveIndex parameter added in #509 remediation)
*/
export function writeMergeSnapshot(
stateRoot: string,
batchId: string,
waveIndex: number,
mergeNumber: number,
snapshot: RuntimeMergeSnapshot,
): void {
const path = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber);
const path = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber);
mkdirSync(dirname(path), { recursive: true });
const tmpPath = path + ".tmp";
writeFileSync(tmpPath, JSON.stringify(snapshot, null, 2) + "\n", "utf-8");
Expand All @@ -426,17 +431,19 @@ export function writeMergeSnapshot(
*
* @param stateRoot - Repository root (where `.pi/` lives)
* @param batchId - Current batch identifier
* @param waveIndex - 0-based wave index for the merge
* @param mergeNumber - 1-indexed merge agent number
*
* @since TP-164
* @since TP-164 (waveIndex parameter added in #509 remediation)
*/
export function readMergeSnapshot(
stateRoot: string,
batchId: string,
waveIndex: number,
mergeNumber: number,
): RuntimeMergeSnapshot | null {
try {
const p = runtimeMergeSnapshotPath(stateRoot, batchId, mergeNumber);
const p = runtimeMergeSnapshotPath(stateRoot, batchId, waveIndex, mergeNumber);
if (!existsSync(p)) return null;
return JSON.parse(readFileSync(p, "utf-8")) as RuntimeMergeSnapshot;
} catch {
Expand Down
16 changes: 15 additions & 1 deletion extensions/taskplane/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4323,12 +4323,26 @@ export interface RuntimeMergeSnapshot {
*
* @since TP-164
*/
/**
* Path to a merge agent snapshot file.
*
* The filename includes BOTH `waveIndex` and `mergeNumber` because lane
* numbers (and therefore the legacy `mergeNumber`-only filename) repeat
* across waves — a wave-2 lane-1 merge would overwrite the wave-1 lane-1
* snapshot before the dashboard's next poll could read it. Per-wave
* namespacing keeps each merge's snapshot durable until the runtime
* directory itself is cleaned up at end-of-batch. See #509.
*
* @param waveIndex 0-based wave index for the merge
* @param mergeNumber 1-based merge agent number (derived from lane number)
*/
export function runtimeMergeSnapshotPath(
stateRoot: string,
batchId: string,
waveIndex: number,
mergeNumber: number,
): string {
return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-${mergeNumber}.json`;
return `${stateRoot}/.pi/runtime/${batchId}/lanes/merge-w${waveIndex}-${mergeNumber}.json`;
}

/**
Expand Down
85 changes: 85 additions & 0 deletions extensions/tests/process-registry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ import {
cleanupBatchRuntime,
appendAgentEvent,
writeLaneSnapshot,
writeMergeSnapshot,
readMergeSnapshot,
} from "../taskplane/process-registry.ts";

import {
runtimeManifestPath,
runtimeRegistryPath,
runtimeAgentEventsPath,
runtimeLaneSnapshotPath,
runtimeMergeSnapshotPath,
type RuntimeAgentManifest,
type RuntimeMergeSnapshot,
} from "../taskplane/types.ts";

let tmpDir: string;
Expand Down Expand Up @@ -417,6 +421,87 @@ describe("7.x: Event and snapshot persistence", () => {
const data = JSON.parse(readFileSync(path, "utf-8"));
expect(data.laneNumber).toBe(1);
});

// ── #509 regression: merge snapshots are namespaced per wave ──────────
// Pre-fix, writeMergeSnapshot keyed the on-disk filename by mergeNumber
// alone (which is derived from lane.laneNumber). Because lane numbers
// reset every wave, wave N+1's first merge would overwrite wave N's
// first merge before the dashboard could observe the terminal snapshot,
// causing the dashboard's merge telemetry column to render '—' for any
// wave whose lane numbers were reused by a subsequent wave.

function sampleSnapshot(
waveIndex: number,
mergeNumber: number,
status: RuntimeMergeSnapshot["status"] = "complete",
): RuntimeMergeSnapshot {
return {
batchId,
mergeNumber,
sessionName: `orch-${batchId}-merge-w${waveIndex}-${mergeNumber}`,
waveIndex,
status,
agent: {
contextPct: 0,
costUsd: 0,
elapsedMs: 0,
inputTokens: 0,
outputTokens: 0,
cacheReadTokens: 0,
cacheWriteTokens: 0,
toolCalls: 0,
lastTool: "",
status: status === "running" ? "running" : "exited",
},
updatedAt: Date.now(),
} as RuntimeMergeSnapshot;
}

it("7.4: writeMergeSnapshot namespaces filename by wave (#509 regression)", () => {
writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1));
const expected = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1);
expect(existsSync(expected)).toBe(true);
// The new filename must include the waveIndex so wave-N+1's lane-1
// merge cannot reuse the same path as wave-N's lane-1 merge.
expect(expected).toMatch(/merge-w0-1\.json$/);
});

it("7.5: same mergeNumber across two waves writes to distinct files (#509)", () => {
writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete"));
writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running"));

const wave0Path = runtimeMergeSnapshotPath(tmpDir, batchId, 0, 1);
const wave1Path = runtimeMergeSnapshotPath(tmpDir, batchId, 1, 1);

expect(wave0Path).not.toBe(wave1Path);
expect(existsSync(wave0Path)).toBe(true);
expect(existsSync(wave1Path)).toBe(true);
});

it("7.6: writing wave 1 does not overwrite wave 0 with same mergeNumber (#509)", () => {
// This is the exact failure mode from the issue: wave-2 lane-1 merge
// trampling wave-1 lane-1's terminal snapshot.
writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1, "complete"));
writeMergeSnapshot(tmpDir, batchId, 1, 1, sampleSnapshot(1, 1, "running"));

const wave0Snap = readMergeSnapshot(tmpDir, batchId, 0, 1);
const wave1Snap = readMergeSnapshot(tmpDir, batchId, 1, 1);

expect(wave0Snap).not.toBeNull();
expect(wave1Snap).not.toBeNull();
expect(wave0Snap?.waveIndex).toBe(0);
expect(wave0Snap?.status).toBe("complete");
expect(wave1Snap?.waveIndex).toBe(1);
expect(wave1Snap?.status).toBe("running");
});

it("7.7: readMergeSnapshot returns null for absent (wave, mergeNumber) tuple (#509)", () => {
writeMergeSnapshot(tmpDir, batchId, 0, 1, sampleSnapshot(0, 1));
// Same wave, different mergeNumber — absent
expect(readMergeSnapshot(tmpDir, batchId, 0, 99)).toBeNull();
// Different wave, same mergeNumber — also absent (no cross-wave fallback)
expect(readMergeSnapshot(tmpDir, batchId, 5, 1)).toBeNull();
});
});

// ── 8. Agent-host export contract ───────────────────────────────────
Expand Down