Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions dispatch/completion.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const HUMAN_SUMMARY_SECTION_RE = /(?:^|\n)\s*(?:Human-readable summary|Human sum
const TECHNICAL_DETAILS_SECTION_RE = /(?:^|\n)\s*(?:Technical details?|Details(?:_technical)?)\s*:\s*/i;
const HUMAN_SUMMARY_LABEL_RE = /^(?:human-readable summary|human summary)\s*:\s*/i;
const TECHNICAL_DETAILS_LABEL_RE = /^(?:technical details?|details(?:_technical)?)\s*:\s*/i;
const FINAL_REPORT_HEADING_RE = /^(?:#{1,6}\s*)?(?:root cause|files? changed|changes|validation|tests?(?: run| passed)?|sacrificial(?: delivery)?(?: result)?|deployment(?:\/live-runtime)?(?: step)?|live-runtime(?: step)?|result|results|summary|highlights?|notes?|follow[- ]ups?|next steps?|blockers?|implementation|what changed|verification)\s*:?$/i;
const FINAL_REPORT_CUE_RE = /\b(?:root cause|files? changed|tests? run|validation|sacrificial(?: delivery)?(?: result)?|deployment(?:\/live-runtime)?(?: step)?|live-runtime(?: step)?|final report|human-readable report|files changed|tests passed)\b/i;

export function normalizeCompletionText(value) {
if (typeof value !== 'string') return null;
Expand Down Expand Up @@ -76,6 +78,56 @@ function cleanMarkdown(text) {
.replace(/^>\s?/gm, '');
}

function normalizeReportLineEndings(text) {
const normalized = normalizeCompletionText(text);
if (!normalized) return null;
return stripAnsi(normalized)
.replace(/\r\n?/g, '\n')
.split('\n')
.map(line => line.replace(/[ \t]+$/g, ''))
.join('\n')
.replace(/\n{3,}/g, '\n\n')
.trim();
}

function isLikelyHumanFinalReport(text) {
const normalized = normalizeReportLineEndings(text);
if (!normalized) return false;
if (isGenericOrTrivial(normalized)) return false;
if (isInternalTransportNoiseText(normalized)) return false;
if (looksLikeRawPayloadText(normalized)) return false;
if (looksLikeGunbrokerReport(normalized)) return false;

const rawLines = normalized
.split('\n')
.map(line => line.trim())
.filter(Boolean)
.filter(line => !/^```/.test(line));
if (rawLines.length < 3) return false;

const cleanedLines = rawLines.map(line => cleanMarkdown(line).replace(/\s+/g, ' ').trim());
const headingCount = cleanedLines.filter(line => FINAL_REPORT_HEADING_RE.test(line)).length;
const itemCount = rawLines.filter(isItemLine).length;
const hasCue = FINAL_REPORT_CUE_RE.test(normalized);
const hasSectionLabel = /^#{1,6}\s+\S|^[A-Za-z][A-Za-z0-9 /_-]{2,60}:$/m.test(normalized);

// This is the key path for real completion reports from agents: multiple
// human-readable sections plus bullets. Those reports are already the final
// answer and must not be collapsed into "Files changed: Validation: ...".
if (hasCue && headingCount >= 2 && (itemCount >= 1 || rawLines.length >= 5)) return true;

// Allow slightly shorter reports with an explicit root cause / validation shape.
if (hasCue && headingCount >= 1 && itemCount >= 2 && hasSectionLabel) return true;

return false;
}

function getPassThroughHumanFinalReport(text) {
const normalized = normalizeReportLineEndings(text);
if (!normalized) return null;
return isLikelyHumanFinalReport(normalized) ? normalized : null;
}

function isGenericOrTrivial(text) {
const normalized = normalizeCompletionText(text)?.toLowerCase().replace(/\s+/g, ' ').trim();
if (!normalized) return true;
Expand Down Expand Up @@ -758,6 +810,9 @@ export function humanizeCompletionText(value) {
const raw = normalizeCompletionText(value);
if (!raw) return null;

const passThroughReport = getPassThroughHumanFinalReport(raw);
if (passThroughReport) return passThroughReport;

const structuredSections = extractStructuredSummarySections(raw);
const summarySource = normalizeCompletionText(structuredSections?.summary || raw);
if (!summarySource) return null;
Expand Down Expand Up @@ -894,6 +949,9 @@ export function summarizeCompletionText(value, { skipEmbeddedObject = false } =
const raw = normalizeCompletionText(value);
if (!raw) return null;

const passThroughReport = getPassThroughHumanFinalReport(raw);
if (passThroughReport) return passThroughReport;

if (!skipEmbeddedObject) {
const parsed = extractEmbeddedCompletionObject(raw);
if (parsed !== null) {
Expand Down
98 changes: 58 additions & 40 deletions dispatch/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ function setLabel(name, data) {
return labels[name];
}

function setLabelDone(name, data) {
const labels = mutateLabels((current) => {
current[name] = {
...current[name],
...data,
status: 'done',
updatedAt: new Date().toISOString(),
};
delete current[name].error;
});
return labels[name];
}

// -- Gateway Calls --------------------------------------------

/**
Expand Down Expand Up @@ -352,14 +365,27 @@ function getSessionJsonlPath(agent = 'main', sessionId) {

function inspectSessionActivitySignal(sessionKey, sessionsStore) {
if (!sessionKey || !sessionsStore?.[sessionKey]) {
return { found: false, hasActivitySignal: false, messageCount: null, jsonlExists: false, hasTokens: false, updatedAtMs: null };
return {
found: false,
hasStartedSignal: false,
hasActivitySignal: false,
messageCount: null,
jsonlExists: false,
hasTokens: false,
updatedAtMs: null,
sessionStartedAtMs: null,
sessionId: null,
};
}

const agent = agentFromSessionKey(sessionKey) || 'main';
const entry = sessionsStore[sessionKey];
const jsonlPath = getSessionJsonlPath(agent, entry.sessionId);
const jsonlExists = jsonlPath ? existsSync(jsonlPath) : false;
const hasTokens = typeof entry.totalTokens === 'number' && entry.totalTokens > 0;
const sessionStartedAtMs = toTimestampMs(entry.sessionStartedAt || entry.startedAt);
const updatedAtMs = toTimestampMs(entry.updatedAt);
const hasStartedSignal = Boolean(entry.sessionId) || sessionStartedAtMs !== null || updatedAtMs !== null;
let messageCount = null;

try {
Expand All @@ -371,11 +397,14 @@ function inspectSessionActivitySignal(sessionKey, sessionsStore) {

return {
found: true,
hasStartedSignal,
hasActivitySignal: jsonlExists || hasTokens || (typeof messageCount === 'number' && messageCount > 0),
messageCount,
jsonlExists,
hasTokens,
updatedAtMs: toTimestampMs(entry.updatedAt),
updatedAtMs,
sessionStartedAtMs,
sessionId: entry.sessionId || null,
};
}

Expand All @@ -385,12 +414,7 @@ function inspectSessionBootstrapFailure(sessionKey, sessionsStore, spawnedAtMs,
}

const ageMs = spawnedAtMs ? Date.now() - spawnedAtMs : Infinity;
if (ageMs < startupGraceMs || ageMs > startupGraceMs * 2) {
return { shouldResolve: false, reason: null, errorMsg: null };
}

const signal = inspectSessionActivitySignal(sessionKey, sessionsStore);
if (signal.hasActivitySignal) {
if (ageMs < startupGraceMs) {
return { shouldResolve: false, reason: null, errorMsg: null };
}

Expand All @@ -403,22 +427,10 @@ function inspectSessionBootstrapFailure(sessionKey, sessionsStore, spawnedAtMs,
};
}

if (signal.messageCount === 0) {
return {
shouldResolve: true,
reason: 'session entered sessions store but never wrote transcript/history',
errorMsg: 'spawn-failure: session entered sessions store but never wrote transcript/history',
};
}

if (signal.updatedAtMs !== null && spawnedAtMs && signal.updatedAtMs <= spawnedAtMs + 5000) {
return {
shouldResolve: true,
reason: 'session entered sessions store but never showed any activity',
errorMsg: 'spawn-failure: session entered sessions store but never showed any activity',
};
}

// A Codex session can enter the sessions store before chat.history, JSONL, or
// token counters are written. Treat that as "still booting"; the watcher and
// job timeout own later failure handling. Only fail fast when the gateway has
// recorded an explicit lane error above.
return { shouldResolve: false, reason: null, errorMsg: null };
}

Expand Down Expand Up @@ -683,7 +695,7 @@ function quoteForSingleQuotedShell(value) {
}

/**
* Schedule a one-shot delivery watcher shell job for a dispatch label.
* Schedule a quick-poll delivery watcher shell job for a dispatch label.
* Used both for the initial watcher registration and SIGTERM handoffs.
*/
function scheduleDeliveryWatcherJob({
Expand All @@ -704,13 +716,19 @@ function scheduleDeliveryWatcherJob({
const watcherTimeoutS = Number(timeoutSeconds) + 120;
const idleThresholdS = Number(idleThresholdSeconds) || 300;
const sq = quoteForSingleQuotedShell;
const watcherCmd = `DISPATCH_LABELS_PATH='${sq(LABELS_PATH)}' '${sq(process.execPath)}' '${sq(watcherPath)}' --label '${sq(label)}' --timeout ${watcherTimeoutS} --poll-interval 20 --idle-threshold ${idleThresholdS}`;
const watcherCmd =
`DISPATCH_LABELS_PATH='${sq(LABELS_PATH)}' ` +
`DISPATCH_INDEX_PATH='${sq(join(__dirname, 'index.mjs'))}' ` +
`'${sq(process.execPath)}' '${sq(watcherPath)}' ` +
`--label '${sq(label)}' --timeout ${watcherTimeoutS} ` +
`--poll-interval 20 --idle-threshold ${idleThresholdS} --once`;

const nowUtc = new Date().toISOString().replace('T', ' ').slice(0, 19);
const jobSpec = {
name: `${agentBrand}-deliver:${label}${nameSuffix}`,
schedule_kind: 'at',
schedule_at: nowUtc,
schedule_kind: 'cron',
schedule_cron: config.deliver_watcher_cron || '* * * * *',
next_run_at: nowUtc,
session_target: 'shell',
payload_kind: 'shellCommand',
payload_message: watcherCmd,
Expand All @@ -720,8 +738,7 @@ function scheduleDeliveryWatcherJob({
delivery_guarantee: 'at-least-once',
ttl_hours: config.deliver_watcher_ttl_hours ?? 48,
overlap_policy: 'skip',
run_timeout_ms: Math.max(watcherTimeoutS, 4 * 3600) * 1000
+ 420 * 1000,
run_timeout_ms: 120_000,
delete_after_run: 1,
origin: origin || 'system',
};
Expand Down Expand Up @@ -1088,9 +1105,10 @@ async function cmdEnqueue(flags) {
}

// -- Register scheduler watcher for delivery ---------------
// Creates a one-shot shell job that runs watcher.mjs (blocks until session
// completes, outputs result). The scheduler's handleDelivery delivers with
// retry, alias resolution, and audit trail in scheduler.db.
// Creates a quick-poll shell job that runs watcher.mjs once per tick. Empty
// stdout means "still running" and advances the next tick without delivery.
// Terminal stdout goes through the scheduler's handleDelivery with retry,
// alias resolution, and audit trail in scheduler.db.
// The watcher is the only final-delivery path for dispatched jobs.
const sq = s => String(s).replace(/'/g, "'\\''");
let schedulerWatcherOk = false;
Expand Down Expand Up @@ -1204,17 +1222,18 @@ async function cmdEnqueue(flags) {

// -- Post-spawn verification (Fix 3) --------------------------------
// Canary: poll sessions.json up to 3 times at 10s intervals to confirm the
// session appeared in the store. Non-fatal -- output is already written above.
// If the session never shows up, stderr gets a loud warning and ledger status
// is set to 'spawn-warning'. The watcher provides the definitive error path.
// session appeared in the store. A session store entry with sessionId or
// startedAt/sessionStartedAt is enough: long first turns may not flush JSONL,
// token counts, or chat.history until the model call completes. The delivery
// watcher owns later completion/failure handling.
const SPAWN_POLL_MAX = 3;
const SPAWN_POLL_DELAY_MS = 10_000;
let spawnConfirmed = false;
for (let spawnPoll = 0; spawnPoll < SPAWN_POLL_MAX; spawnPoll++) {
await sleep(SPAWN_POLL_DELAY_MS);
const spawnStore = readSessionsStore(agent);
const signal = inspectSessionActivitySignal(sessionKey, spawnStore);
if (signal.hasActivitySignal) {
if (signal.hasStartedSignal || signal.hasActivitySignal) {
spawnConfirmed = true;
break;
}
Expand Down Expand Up @@ -1972,7 +1991,7 @@ async function cmdDone(flags) {
// Label was never registered (e.g. direct subagent spawn, not via enqueue).
// This is not an error -- the work completed, the label just wasn't tracked.
process.stderr.write(`[${BRAND}] warn: no session found for label "${label}" -- registering as done\n`);
setLabel(label, { status: 'done', summary, completion, ...(sha ? { sha } : {}) });
setLabelDone(label, { summary, completion, ...(sha ? { sha } : {}) });

// No watcher is polling for this label, so actively notify via the gateway
// post office using delivery config from config.json as fallback target.
Expand Down Expand Up @@ -2001,8 +2020,7 @@ async function cmdDone(flags) {
return;
}

setLabel(label, {
status: 'done',
setLabelDone(label, {
summary,
completion,
...(sha ? { sha } : {}),
Expand Down
Loading