diff --git a/dispatch/completion.mjs b/dispatch/completion.mjs index 92a599a..c7bd218 100644 --- a/dispatch/completion.mjs +++ b/dispatch/completion.mjs @@ -39,6 +39,8 @@ const HUMAN_SUMMARY_SECTION_RE = /(?:^|\n)\s*(?:Human-readable summary|Human sum const TECHNICAL_DETAILS_SECTION_RE = /(?:^|\n)\s*(?:Technical details?|Details(?:_technical)?)\s*:\s*/i; const HUMAN_SUMMARY_LABEL_RE = /^(?:human-readable summary|human summary)\s*:\s*/i; const TECHNICAL_DETAILS_LABEL_RE = /^(?:technical details?|details(?:_technical)?)\s*:\s*/i; +const FINAL_REPORT_HEADING_RE = /^(?:#{1,6}\s*)?(?:root cause|files? changed|changes|validation|tests?(?: run| passed)?|sacrificial(?: delivery)?(?: result)?|deployment(?:\/live-runtime)?(?: step)?|live-runtime(?: step)?|result|results|summary|highlights?|notes?|follow[- ]ups?|next steps?|blockers?|implementation|what changed|verification)\s*:?$/i; +const FINAL_REPORT_CUE_RE = /\b(?:root cause|files? changed|tests? run|validation|sacrificial(?: delivery)?(?: result)?|deployment(?:\/live-runtime)?(?: step)?|live-runtime(?: step)?|final report|human-readable report|files changed|tests passed)\b/i; export function normalizeCompletionText(value) { if (typeof value !== 'string') return null; @@ -76,6 +78,56 @@ function cleanMarkdown(text) { .replace(/^>\s?/gm, ''); } +function normalizeReportLineEndings(text) { + const normalized = normalizeCompletionText(text); + if (!normalized) return null; + return stripAnsi(normalized) + .replace(/\r\n?/g, '\n') + .split('\n') + .map(line => line.replace(/[ \t]+$/g, '')) + .join('\n') + .replace(/\n{3,}/g, '\n\n') + .trim(); +} + +function isLikelyHumanFinalReport(text) { + const normalized = normalizeReportLineEndings(text); + if (!normalized) return false; + if (isGenericOrTrivial(normalized)) return false; + if (isInternalTransportNoiseText(normalized)) return false; + if (looksLikeRawPayloadText(normalized)) return false; + if (looksLikeGunbrokerReport(normalized)) return false; + + const rawLines = normalized + .split('\n') + .map(line => line.trim()) + .filter(Boolean) + .filter(line => !/^```/.test(line)); + if (rawLines.length < 3) return false; + + const cleanedLines = rawLines.map(line => cleanMarkdown(line).replace(/\s+/g, ' ').trim()); + const headingCount = cleanedLines.filter(line => FINAL_REPORT_HEADING_RE.test(line)).length; + const itemCount = rawLines.filter(isItemLine).length; + const hasCue = FINAL_REPORT_CUE_RE.test(normalized); + const hasSectionLabel = /^#{1,6}\s+\S|^[A-Za-z][A-Za-z0-9 /_-]{2,60}:$/m.test(normalized); + + // This is the key path for real completion reports from agents: multiple + // human-readable sections plus bullets. Those reports are already the final + // answer and must not be collapsed into "Files changed: Validation: ...". + if (hasCue && headingCount >= 2 && (itemCount >= 1 || rawLines.length >= 5)) return true; + + // Allow slightly shorter reports with an explicit root cause / validation shape. + if (hasCue && headingCount >= 1 && itemCount >= 2 && hasSectionLabel) return true; + + return false; +} + +function getPassThroughHumanFinalReport(text) { + const normalized = normalizeReportLineEndings(text); + if (!normalized) return null; + return isLikelyHumanFinalReport(normalized) ? normalized : null; +} + function isGenericOrTrivial(text) { const normalized = normalizeCompletionText(text)?.toLowerCase().replace(/\s+/g, ' ').trim(); if (!normalized) return true; @@ -758,6 +810,9 @@ export function humanizeCompletionText(value) { const raw = normalizeCompletionText(value); if (!raw) return null; + const passThroughReport = getPassThroughHumanFinalReport(raw); + if (passThroughReport) return passThroughReport; + const structuredSections = extractStructuredSummarySections(raw); const summarySource = normalizeCompletionText(structuredSections?.summary || raw); if (!summarySource) return null; @@ -894,6 +949,9 @@ export function summarizeCompletionText(value, { skipEmbeddedObject = false } = const raw = normalizeCompletionText(value); if (!raw) return null; + const passThroughReport = getPassThroughHumanFinalReport(raw); + if (passThroughReport) return passThroughReport; + if (!skipEmbeddedObject) { const parsed = extractEmbeddedCompletionObject(raw); if (parsed !== null) { diff --git a/dispatch/index.mjs b/dispatch/index.mjs index 02251e0..1fd9ed2 100644 --- a/dispatch/index.mjs +++ b/dispatch/index.mjs @@ -205,6 +205,19 @@ function setLabel(name, data) { return labels[name]; } +function setLabelDone(name, data) { + const labels = mutateLabels((current) => { + current[name] = { + ...current[name], + ...data, + status: 'done', + updatedAt: new Date().toISOString(), + }; + delete current[name].error; + }); + return labels[name]; +} + // -- Gateway Calls -------------------------------------------- /** @@ -352,7 +365,17 @@ function getSessionJsonlPath(agent = 'main', sessionId) { function inspectSessionActivitySignal(sessionKey, sessionsStore) { if (!sessionKey || !sessionsStore?.[sessionKey]) { - return { found: false, hasActivitySignal: false, messageCount: null, jsonlExists: false, hasTokens: false, updatedAtMs: null }; + return { + found: false, + hasStartedSignal: false, + hasActivitySignal: false, + messageCount: null, + jsonlExists: false, + hasTokens: false, + updatedAtMs: null, + sessionStartedAtMs: null, + sessionId: null, + }; } const agent = agentFromSessionKey(sessionKey) || 'main'; @@ -360,6 +383,9 @@ function inspectSessionActivitySignal(sessionKey, sessionsStore) { const jsonlPath = getSessionJsonlPath(agent, entry.sessionId); const jsonlExists = jsonlPath ? existsSync(jsonlPath) : false; const hasTokens = typeof entry.totalTokens === 'number' && entry.totalTokens > 0; + const sessionStartedAtMs = toTimestampMs(entry.sessionStartedAt || entry.startedAt); + const updatedAtMs = toTimestampMs(entry.updatedAt); + const hasStartedSignal = Boolean(entry.sessionId) || sessionStartedAtMs !== null || updatedAtMs !== null; let messageCount = null; try { @@ -371,11 +397,14 @@ function inspectSessionActivitySignal(sessionKey, sessionsStore) { return { found: true, + hasStartedSignal, hasActivitySignal: jsonlExists || hasTokens || (typeof messageCount === 'number' && messageCount > 0), messageCount, jsonlExists, hasTokens, - updatedAtMs: toTimestampMs(entry.updatedAt), + updatedAtMs, + sessionStartedAtMs, + sessionId: entry.sessionId || null, }; } @@ -385,12 +414,7 @@ function inspectSessionBootstrapFailure(sessionKey, sessionsStore, spawnedAtMs, } const ageMs = spawnedAtMs ? Date.now() - spawnedAtMs : Infinity; - if (ageMs < startupGraceMs || ageMs > startupGraceMs * 2) { - return { shouldResolve: false, reason: null, errorMsg: null }; - } - - const signal = inspectSessionActivitySignal(sessionKey, sessionsStore); - if (signal.hasActivitySignal) { + if (ageMs < startupGraceMs) { return { shouldResolve: false, reason: null, errorMsg: null }; } @@ -403,22 +427,10 @@ function inspectSessionBootstrapFailure(sessionKey, sessionsStore, spawnedAtMs, }; } - if (signal.messageCount === 0) { - return { - shouldResolve: true, - reason: 'session entered sessions store but never wrote transcript/history', - errorMsg: 'spawn-failure: session entered sessions store but never wrote transcript/history', - }; - } - - if (signal.updatedAtMs !== null && spawnedAtMs && signal.updatedAtMs <= spawnedAtMs + 5000) { - return { - shouldResolve: true, - reason: 'session entered sessions store but never showed any activity', - errorMsg: 'spawn-failure: session entered sessions store but never showed any activity', - }; - } - + // A Codex session can enter the sessions store before chat.history, JSONL, or + // token counters are written. Treat that as "still booting"; the watcher and + // job timeout own later failure handling. Only fail fast when the gateway has + // recorded an explicit lane error above. return { shouldResolve: false, reason: null, errorMsg: null }; } @@ -683,7 +695,7 @@ function quoteForSingleQuotedShell(value) { } /** - * Schedule a one-shot delivery watcher shell job for a dispatch label. + * Schedule a quick-poll delivery watcher shell job for a dispatch label. * Used both for the initial watcher registration and SIGTERM handoffs. */ function scheduleDeliveryWatcherJob({ @@ -704,13 +716,19 @@ function scheduleDeliveryWatcherJob({ const watcherTimeoutS = Number(timeoutSeconds) + 120; const idleThresholdS = Number(idleThresholdSeconds) || 300; const sq = quoteForSingleQuotedShell; - const watcherCmd = `DISPATCH_LABELS_PATH='${sq(LABELS_PATH)}' '${sq(process.execPath)}' '${sq(watcherPath)}' --label '${sq(label)}' --timeout ${watcherTimeoutS} --poll-interval 20 --idle-threshold ${idleThresholdS}`; + const watcherCmd = + `DISPATCH_LABELS_PATH='${sq(LABELS_PATH)}' ` + + `DISPATCH_INDEX_PATH='${sq(join(__dirname, 'index.mjs'))}' ` + + `'${sq(process.execPath)}' '${sq(watcherPath)}' ` + + `--label '${sq(label)}' --timeout ${watcherTimeoutS} ` + + `--poll-interval 20 --idle-threshold ${idleThresholdS} --once`; const nowUtc = new Date().toISOString().replace('T', ' ').slice(0, 19); const jobSpec = { name: `${agentBrand}-deliver:${label}${nameSuffix}`, - schedule_kind: 'at', - schedule_at: nowUtc, + schedule_kind: 'cron', + schedule_cron: config.deliver_watcher_cron || '* * * * *', + next_run_at: nowUtc, session_target: 'shell', payload_kind: 'shellCommand', payload_message: watcherCmd, @@ -720,8 +738,7 @@ function scheduleDeliveryWatcherJob({ delivery_guarantee: 'at-least-once', ttl_hours: config.deliver_watcher_ttl_hours ?? 48, overlap_policy: 'skip', - run_timeout_ms: Math.max(watcherTimeoutS, 4 * 3600) * 1000 - + 420 * 1000, + run_timeout_ms: 120_000, delete_after_run: 1, origin: origin || 'system', }; @@ -1088,9 +1105,10 @@ async function cmdEnqueue(flags) { } // -- Register scheduler watcher for delivery --------------- - // Creates a one-shot shell job that runs watcher.mjs (blocks until session - // completes, outputs result). The scheduler's handleDelivery delivers with - // retry, alias resolution, and audit trail in scheduler.db. + // Creates a quick-poll shell job that runs watcher.mjs once per tick. Empty + // stdout means "still running" and advances the next tick without delivery. + // Terminal stdout goes through the scheduler's handleDelivery with retry, + // alias resolution, and audit trail in scheduler.db. // The watcher is the only final-delivery path for dispatched jobs. const sq = s => String(s).replace(/'/g, "'\\''"); let schedulerWatcherOk = false; @@ -1204,9 +1222,10 @@ async function cmdEnqueue(flags) { // -- Post-spawn verification (Fix 3) -------------------------------- // Canary: poll sessions.json up to 3 times at 10s intervals to confirm the - // session appeared in the store. Non-fatal -- output is already written above. - // If the session never shows up, stderr gets a loud warning and ledger status - // is set to 'spawn-warning'. The watcher provides the definitive error path. + // session appeared in the store. A session store entry with sessionId or + // startedAt/sessionStartedAt is enough: long first turns may not flush JSONL, + // token counts, or chat.history until the model call completes. The delivery + // watcher owns later completion/failure handling. const SPAWN_POLL_MAX = 3; const SPAWN_POLL_DELAY_MS = 10_000; let spawnConfirmed = false; @@ -1214,7 +1233,7 @@ async function cmdEnqueue(flags) { await sleep(SPAWN_POLL_DELAY_MS); const spawnStore = readSessionsStore(agent); const signal = inspectSessionActivitySignal(sessionKey, spawnStore); - if (signal.hasActivitySignal) { + if (signal.hasStartedSignal || signal.hasActivitySignal) { spawnConfirmed = true; break; } @@ -1972,7 +1991,7 @@ async function cmdDone(flags) { // Label was never registered (e.g. direct subagent spawn, not via enqueue). // This is not an error -- the work completed, the label just wasn't tracked. process.stderr.write(`[${BRAND}] warn: no session found for label "${label}" -- registering as done\n`); - setLabel(label, { status: 'done', summary, completion, ...(sha ? { sha } : {}) }); + setLabelDone(label, { summary, completion, ...(sha ? { sha } : {}) }); // No watcher is polling for this label, so actively notify via the gateway // post office using delivery config from config.json as fallback target. @@ -2001,8 +2020,7 @@ async function cmdDone(flags) { return; } - setLabel(label, { - status: 'done', + setLabelDone(label, { summary, completion, ...(sha ? { sha } : {}), diff --git a/dispatch/watcher.mjs b/dispatch/watcher.mjs index b2aef76..368161d 100755 --- a/dispatch/watcher.mjs +++ b/dispatch/watcher.mjs @@ -684,6 +684,112 @@ function getJsonlMidTurnReason(sessionId, agentDir = 'main') { return null; // Last assistant entry appears to be a complete text reply -- safe to proceed } +/** + * Check the JSONL tail for a pending tool handoff without requiring recent + * file activity. Long-running tool calls can leave the transcript flat for + * minutes, so stale mtime alone is not enough to declare the agent stuck. + * + * @param {string} sessionId - Internal session UUID + * @param {string} agentDir - Agent directory (default: 'main') + * @returns {string|null} reason string if a tool handoff appears pending + */ +function getJsonlPendingToolReason(sessionId, agentDir = 'main') { + const lastLines = readJsonlLastLines(sessionId, agentDir, 3); + if (!lastLines || lastLines.length === 0) return null; + + const last = lastLines[lastLines.length - 1]; + + if (last?.role === 'assistant') { + const content = Array.isArray(last.content) ? last.content : []; + const toolUse = content.find(c => c?.type === 'tool_use'); + if (toolUse) { + return `last assistant entry has tool_use (${toolUse.name || 'unknown'}) -- awaiting tool result`; + } + if (last.type === 'tool_use') { + return `last entry is tool_use (${last.name || 'unknown'}) -- awaiting tool result`; + } + } + + if (last?.role === 'user') { + const content = Array.isArray(last.content) ? last.content : []; + if (content.some(c => c?.type === 'tool_result')) { + return 'last entry is tool_result (tool executed, awaiting assistant reply)'; + } + } + + if (last?.type === 'tool_result') { + return 'last entry is tool_result (tool executed, awaiting assistant reply)'; + } + + return null; +} + +function parseTimestampMs(value) { + if (!value) return null; + if (typeof value === 'number') { + return Number.isFinite(value) ? value : null; + } + if (value instanceof Date) { + const timestamp = value.getTime(); + return Number.isFinite(timestamp) ? timestamp : null; + } + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : null; +} + +/** + * Detect an agent session that has stopped making progress even though the + * watcher process itself is still alive and writing lastPing. + * + * This closes the failure mode where OpenClaw's Codex app-server retires a + * timed-out turn, but dispatch status keeps reporting "running" because the + * delivery watcher is still polling. + */ +function getRunningSessionStallReason(status, thresholdMs) { + if (!status?.sessionKey) return null; + + const sessionAgent = status.sessionKey.split(':')[1] || 'main'; + const entry = getSessionStoreEntry(status.sessionKey); + if (!entry) return null; + + const sessionId = entry.sessionId || null; + const now = Date.now(); + const activityTimes = [ + parseTimestampMs(entry.updatedAt), + parseTimestampMs(entry.lastActivityAt), + parseTimestampMs(entry.sessionStartedAt), + parseTimestampMs(entry.startedAt), + ].filter(t => typeof t === 'number'); + + const jsonlMtime = sessionId ? getSessionJsonlMtime(sessionId, sessionAgent) : null; + if (typeof jsonlMtime === 'number') activityTimes.push(jsonlMtime); + + if (typeof status?.liveness?.ageMs === 'number' && status.liveness.ageMs < thresholdMs) { + return null; + } + + const lastActivityMs = activityTimes.length ? Math.max(...activityTimes) : null; + if (lastActivityMs !== null && now - lastActivityMs < thresholdMs) { + return null; + } + + const pendingToolReason = sessionId ? getJsonlPendingToolReason(sessionId, sessionAgent) : null; + if (pendingToolReason) { + process.stderr.write( + `[watcher] ${status.label || 'session'} stale telemetry but pending tool handoff detected: ${pendingToolReason}\n` + ); + return null; + } + + const idleMinutes = lastActivityMs === null + ? Math.ceil(thresholdMs / 60000) + : Math.max(1, Math.floor((now - lastActivityMs) / 60000)); + return ( + `agent session stalled: no session/jsonl activity for ~${idleMinutes}min ` + + `while delivery watcher remained alive; likely app-server turn retired or stopped producing events` + ); +} + /** * Read the last assistant entry's stop_reason from the session JSONL. * Returns the stop_reason string (e.g. 'end_turn', 'tool_use') or null if unavailable. @@ -754,6 +860,7 @@ function markLabelError(label, errorSummary) { updateExistingLabel(label, (entry) => { if (entry.status === 'done') return false; entry.status = 'error'; + entry.error = errorSummary || 'failed without result'; entry.summary = errorSummary || 'failed without result'; }); } catch (e) { @@ -761,6 +868,8 @@ function markLabelError(label, errorSummary) { } } +let exitZeroOnTerminal = false; + /** * Format and output the delivery message, then exit 0. * Also marks the label as done in labels.json before exiting. @@ -794,7 +903,7 @@ function deliverResult(label, lastReply, fallbackSummary, completionPayload = nu `**Error:** ${stderr || 'non-zero exit'}\n\n` + `Job marked as \`error\`. The agent may have reported done without completing the actual work.\n` ); - process.exit(1); + process.exit(exitZeroOnTerminal ? 0 : 1); } } } catch (loadErr) { @@ -826,7 +935,7 @@ function deliverResult(label, lastReply, fallbackSummary, completionPayload = nu `⚠️ dispatch [${label}] completed, but no clean user-facing completion was captured. ` + `Internal diagnostics were suppressed; check scheduler run logs for details.\n` ); - process.exit(1); + process.exit(exitZeroOnTerminal ? 0 : 1); } function emitInterruptedOutcome(label, summary, result = null) { @@ -836,12 +945,12 @@ function emitInterruptedOutcome(label, summary, result = null) { `⚠️ dispatch [${label}] session went idle before completing -- work may be incomplete` + `${formatDiagnosticSnippet(result?.diagnosticReply || result?.lastReply || null)}\n` ); - process.exit(1); + process.exit(exitZeroOnTerminal ? 0 : 1); } function emitTimeoutOutcome(label, message, result = null) { process.stdout.write(`${message}${formatDiagnosticSnippet(result?.diagnosticReply || result?.lastReply || null)}\n`); - process.exit(1); + process.exit(exitZeroOnTerminal ? 0 : 1); } // -- Watcher heartbeat interval ref -------------------------------------- @@ -876,6 +985,8 @@ const flags = parseFlags(process.argv.slice(2)); const label = flags.label; const timeoutS = parseInt(flags.timeout || '600', 10); const pollS = parseInt(flags['poll-interval'] || '20', 10); +const once = flags.once === true || flags.once === 'true'; +exitZeroOnTerminal = once; // How long a session must be idle before we proactively check result const IDLE_RESULT_CHECK_MS = 60000; @@ -885,6 +996,144 @@ if (!label) { process.exit(2); } +function touchWatcherPing(label) { + updateExistingLabel(label, (entry) => { + if (entry.status !== 'running') return false; + entry.lastPing = new Date().toISOString(); + }); +} + +function markWatcherPending(label, reason = 'target still running') { + process.stderr.write(`[watcher] WATCHER_PENDING label=${label} reason=${reason}\n`); + process.exit(0); +} + +function clearWatcherRetryAfter(label) { + updateExistingLabel(label, (entry) => { + if (!entry.watcherRetryAfter) return false; + delete entry.watcherRetryAfter; + }); +} + +function handleOnce529(label, errorMsg) { + const labels = loadLabels(); + const entry = labels[label] || {}; + const retryCount = getRetryCount(label); + + if (retryCount >= MAX_529_RETRIES) { + markLabelError(label, `max_retries_exceeded (${retryCount}x 529): ${errorMsg}`); + process.stdout.write( + `🌶️ *dispatch* [${label}] failed after ${MAX_529_RETRIES} retries (529 overload)\n` + + `Error: ${errorMsg}\n` + ); + process.exit(0); + } + + const retryAfterMs = parseTimestampMs(entry.watcherRetryAfter); + if (!retryAfterMs) { + const retryResult = attempt529Retry(label, retryCount, errorMsg); + if (!retryResult.retry) return handleOnce529(label, errorMsg); + updateExistingLabel(label, (current) => { + current.watcherRetryAfter = new Date(Date.now() + retryResult.delayMs).toISOString(); + }); + markWatcherPending(label, `529 retry scheduled for future tick (${retryResult.delayMs / 1000}s)`); + } + + if (Date.now() < retryAfterMs) { + markWatcherPending(label, '529 retry backoff active'); + } + + if (respawnSession(label)) { + clearWatcherRetryAfter(label); + markWatcherPending(label, '529 retry dispatched'); + } + + markLabelError(label, `529 retry failed -- could not respawn session: ${errorMsg}`); + process.stdout.write( + `🌶️ *dispatch* [${label}] 529 retry failed -- could not respawn session\n` + + `Error: ${errorMsg}\n` + ); + process.exit(0); +} + +function runOnceAndExit() { + try { + touchWatcherPing(label); + } catch { + // Best-effort -- a quick-poll tick must not fail because heartbeat metadata raced. + } + + const status = dispatch('status', ['--label', label]); + if (!status?.ok) { + markWatcherPending(label, 'status unavailable'); + } + + if (status.status === 'error') { + const errorMsg = status.error || status.summary || ''; + if (is529Error(errorMsg)) { + handleOnce529(label, errorMsg); + } + } + + if (status.status !== 'running') { + const terminalResult = dispatch('result', ['--label', label]); + const terminalCompletion = terminalResult?.completion || status?.completion || null; + + if (status.status === 'done') { + const currentRetryCount = getRetryCount(label); + if (currentRetryCount > 0) setRetryCount(label, 0); + const gwRetryCount = getGwRestartRetryCount(label); + if (gwRetryCount > 0) setGwRestartRetryCount(label, 0); + deliverResult(label, terminalResult?.lastReply, status.summary, terminalCompletion); + } + + if (status.status === 'interrupted') { + emitInterruptedOutcome(label, status.summary, terminalResult); + } + + const summary = status.error || status.summary || `terminal failure (${status.status || 'unknown'})`; + markLabelError(label, summary); + process.stdout.write(`🌶️ *dispatch* [${label}] failed\nSummary: ${summary}\n`); + process.exit(0); + } + + if (status.sessionKey) { + const entry = getSessionStoreEntry(status.sessionKey); + const sessionId = entry?.sessionId || null; + const sessionAgent = status.sessionKey.split(':')[1] || 'main'; + const terminalJsonlReply = sessionId ? getSessionTerminalReply(sessionId, sessionAgent) : null; + if (sessionId && terminalJsonlReply && isSessionCleanlyFinished(sessionId, sessionAgent)) { + const result = dispatch('result', ['--label', label]); + deliverResult(label, result?.lastReply || terminalJsonlReply, 'completed (stop_reason=end_turn)', result?.completion || null); + } + } + + const ageMs = status.liveness?.ageMs; + if (ageMs != null && ageMs >= IDLE_RESULT_CHECK_MS) { + const result = dispatch('result', ['--label', label]); + if (result?.lastReply || hasCompletionSignal(result?.completion)) { + deliverResult(label, result?.lastReply || null, null, result?.completion || null); + } + + const stallReason = getRunningSessionStallReason(status, IDLE_RESULT_CHECK_MS); + if (stallReason) { + process.stderr.write(`[watcher] [${label}] ${stallReason}\n`); + markLabelError(label, stallReason); + process.stdout.write( + `❌ *dispatch* [${label}] failed\n` + + `Summary: ${stallReason}\n` + ); + process.exit(0); + } + } + + markWatcherPending(label); +} + +if (once) { + runOnceAndExit(); +} + // -- Start heartbeat ----------------------------------------------------- // Write lastPing to labels.json every PING_INTERVAL_MS while the session is // still running. The watchdog guard in index.mjs reads lastPing to know this @@ -1245,6 +1494,17 @@ while (Date.now() < deadline) { if (result?.lastReply || hasCompletionSignal(result?.completion)) { deliverResult(label, result?.lastReply || null, null, result?.completion || null); } + + const stallReason = getRunningSessionStallReason(status, IDLE_RESULT_CHECK_MS); + if (stallReason) { + process.stderr.write(`[watcher] [${label}] ${stallReason}\n`); + markLabelError(label, stallReason); + process.stdout.write( + `❌ *dispatch* [${label}] failed\n` + + `Summary: ${stallReason}\n` + ); + process.exit(1); + } } diff --git a/dispatcher-strategies.js b/dispatcher-strategies.js index 62dce99..de06694 100644 --- a/dispatcher-strategies.js +++ b/dispatcher-strategies.js @@ -1099,6 +1099,11 @@ function isCompletionDeliveryWatcherJob(job) { return /^(?:dispatch|chilisaus)-deliver:/.test(String(job?.name || '')); } +function isCompletionWatcherPendingTick(shellResult) { + return !(shellResult.stdout || '').trim() + && /\bWATCHER_PENDING\b/.test(shellResult.stderr || ''); +} + function buildCompletionWatcherNoPayloadMessage(job, shellResult) { const statusLabel = shellResult.status === 'ok' ? 'completed without a deliverable result' @@ -1147,7 +1152,14 @@ export async function executeShell(job, ctx, deps) { const watcherStdout = (shellResult.stdout || '').trim(); const watcherStderr = (shellResult.stderr || '').trim(); - if (watcherStdout) { + if (isCompletionWatcherPendingTick(shellResult)) { + result.status = 'skipped'; + result.summary = 'Completion delivery watcher pending; target session is still running'; + result.content = ''; + result.errorMessage = null; + result.idemAction = 'release'; + result.skipDelivery = true; + } else if (watcherStdout) { // Completion watcher stdout is the only user-facing contract. Stderr is // diagnostics-only and must never be repackaged as a "successful" final // completion if the watcher suppressed the real payload. diff --git a/test.js b/test.js index 3d9cb8f..6981193 100644 --- a/test.js +++ b/test.js @@ -4982,13 +4982,15 @@ console.log('\n-- Dispatch Spawn Failure Detection --'); assert(indexSrc.includes('Synced as spawn failure'), 'cmdSync can reconcile local bootstrap failures as errors'); assert(indexSrc.includes('never produced transcript/history within'), 'cmdEnqueue escalates silent bootstrap failures'); // post-spawn poll code exists - assert(indexSrc.includes('spawn-warning'), 'Fix 3: legacy spawn-warning marker remains in source for compatibility'); + assert(indexSrc.includes("status: 'error'"), 'Fix 3: post-spawn poll escalates missing startup signal to error'); assert(indexSrc.includes('SPAWN_POLL_MAX'), 'Fix 3: post-spawn poll loop present'); + assert(indexSrc.includes('hasStartedSignal'), 'Fix 3: post-spawn poll accepts session start signal'); + assert(indexSrc.includes('signal.hasStartedSignal || signal.hasActivitySignal'), 'Fix 3: post-spawn poll does not require transcript/history during startup'); - // 2. Status/sync bootstrap reconciliation: session entered sessions store but - // never produced transcript/history, so dispatch should mark it as a spawn failure - // instead of an interrupted idle session. Use an age inside the bootstrap - // reconciliation window (> startupGrace, < startupGrace * 2). + // 2. Status/sync bootstrap reconciliation: a Codex session can enter + // sessions.json before chat.history, JSONL, or token counters are written. + // PR #10 keeps that state running unless the gateway lane recorded an + // explicit startup error; the watcher/job timeout owns later failure handling. const bootstrapDir = mkdtempSync(join(tmpdir(), 'dispatch-bootstrap-')); const bootstrapHome = join(bootstrapDir, 'home'); const bootstrapBin = join(bootstrapDir, 'bin'); @@ -5048,10 +5050,10 @@ process.exit(1); }); const statusJson = JSON.parse(statusOut); const statusLabels = JSON.parse(readFileSync(bootstrapStatusLabels, 'utf8')); - assert(statusJson.status === 'error', 'bootstrap status: auto-resolves local startup failure to error'); - assert((statusJson.syncAction || '').includes('spawn failure'), 'bootstrap status: reports spawn-failure syncAction'); - assert((statusJson.error || '').includes('spawn-failure: session entered sessions store but never wrote transcript/history'), 'bootstrap status: preserves spawn-failure diagnostic'); - assert(statusLabels['bootstrap-status']?.status === 'error', 'bootstrap status: labels.json updated to error'); + assert(statusJson.status === 'running', 'bootstrap status: session with no transcript remains running while booting'); + assert(!statusJson.syncAction, 'bootstrap status: no spawn-failure syncAction without explicit lane error'); + assert(!statusJson.error, 'bootstrap status: no spawn-failure diagnostic without explicit lane error'); + assert(statusLabels['bootstrap-status']?.status === 'running', 'bootstrap status: labels.json remains running'); const bootstrapSyncLabels = join(bootstrapDir, 'labels-sync.json'); writeFileSync(bootstrapSyncLabels, JSON.stringify({ @@ -5077,10 +5079,10 @@ process.exit(1); }); const syncJson = JSON.parse(syncOut); const syncLabels = JSON.parse(readFileSync(bootstrapSyncLabels, 'utf8')); - assert(syncJson.changes === 1, 'bootstrap sync: reports one reconciled label'); - assert(syncJson.details?.[0]?.to === 'error', 'bootstrap sync: reconciles running label to error'); - assert(syncLabels['bootstrap-sync']?.status === 'error', 'bootstrap sync: labels.json updated to error'); - assert((syncLabels['bootstrap-sync']?.summary || '').includes('Synced as spawn failure'), 'bootstrap sync: stores spawn-failure summary'); + assert(syncJson.changes === 0, 'bootstrap sync: does not reconcile booting session without explicit lane error'); + assert((syncJson.details || []).length === 0, 'bootstrap sync: reports no bootstrap failure detail'); + assert(syncLabels['bootstrap-sync']?.status === 'running', 'bootstrap sync: labels.json remains running'); + assert(!(syncLabels['bootstrap-sync']?.summary || '').includes('Synced as spawn failure'), 'bootstrap sync: stores no spawn-failure summary'); // 3. Spawn-failure path: watcher exits non-zero when session never appears in gateway // We use a mock dispatch that always returns status=done with liveness error, @@ -6162,16 +6164,18 @@ if (sub === 'status') { rmSync(tmpDir, { recursive: true, force: true }); } -// -- Watcher run_timeout_ms covers MAX_DEADLINE_EXTENSION -- -console.log('\n-- Watcher run_timeout_ms ceiling --'); +// -- Watcher jobs quick-poll instead of blocking dispatcher -- +console.log('\n-- Watcher quick-poll scheduler jobs --'); { const indexSrc = readFileSync( join(dirname(fileURLToPath(import.meta.url)), 'dispatch', 'index.mjs'), 'utf8' ); - // The run_timeout_ms formula must use Math.max to cover the rolling extension cap - assert(indexSrc.includes('Math.max(watcherTimeoutS, 4 * 3600)'), 'run_timeout_ms: uses Math.max to cover MAX_DEADLINE_EXTENSION (4h) ceiling'); - assert(indexSrc.includes('420 * 1000'), 'run_timeout_ms: includes 7min headroom (2*FLAT_WINDOW + slop)'); + assert(indexSrc.includes('--once'), 'watcher job: invokes watcher.mjs in one-shot quick-poll mode'); + assert(indexSrc.includes("schedule_kind: 'cron'"), 'watcher job: uses cron schedule, not long-running at-job'); + assert(indexSrc.includes("schedule_cron: config.deliver_watcher_cron || '* * * * *'"), 'watcher job: polls every minute by default'); + assert(indexSrc.includes('next_run_at: nowUtc'), 'watcher job: first quick-poll tick is due immediately'); + assert(indexSrc.includes('run_timeout_ms: 120_000'), 'watcher job: has a short per-tick timeout'); } // =========================================================== @@ -6375,6 +6379,60 @@ console.log('\n-- Completion payload helpers --'); assert(alreadyHumanTechnical.deliveryText === 'Updated the completion watcher so structured summaries survive handoff and users now get one clean final report.', 'completion helper: already-human technical summary passes through unchanged'); assert(!alreadyHumanTechnical.deliveryText.includes('Technical details:'), 'completion helper: already-human technical summary does not grow a technical footer'); + const observedBadCaseReport = [ + 'Implemented root cause fix for chilisaus completion delivery:', + '', + 'Files changed:', + '- watcher-delivery.mjs: added plain-human completion summary detection.', + '- tests/watcher-delivery.test.mjs: added regression coverage for a >3-line section/bullet human summary delivered via done.', + '- tests/watcher.test.mjs: aligned existing failure-path assertions with the watcher exit-code contract.', + '', + 'Validation:', + '- npm test -- watcher-delivery.test.mjs', + '- npm test -- watcher.test.mjs', + '', + 'Highlights:', + '- The final agent report is now treated as authoritative human text.', + '- Fallback synthesized summaries are only used when no actual final report exists.', + ].join('\n'); + const observedBadCasePayload = buildTerminalCompletionPayload({ + summary: observedBadCaseReport, + checklist: { work_complete: true, tests_passed: true }, + }); + assert(observedBadCasePayload.summary_human === observedBadCaseReport, 'completion helper: multi-section done summary is preserved before delivery'); + const observedBadCaseDelivery = resolveCompletionDelivery({ + lastReply: null, + completion: observedBadCasePayload, + fallbackSummary: 'completed (agent signal)', + }); + assert(observedBadCaseDelivery.deliveryText === observedBadCaseReport, 'completion helper: exact observed section/bullet report is passed through'); + assert(!observedBadCaseDelivery.deliveryText.includes('Files changed: Validation:'), 'completion helper: section headings are not collapsed into fragments'); + assert(!observedBadCaseDelivery.deliveryText.includes('Highlights: watcher-delivery.mjs'), 'completion helper: bullet details are not rewritten as synthetic highlights'); + + const terminalHumanReport = [ + 'Root cause:', + '- The watcher was summarizing the final assistant report instead of delivering it.', + '', + 'Tests run:', + '- npm test', + ].join('\n'); + const terminalHumanDelivery = resolveCompletionDelivery({ + lastReply: terminalHumanReport, + completion: null, + fallbackSummary: null, + }); + assert(terminalHumanDelivery.deliveryText === terminalHumanReport, 'completion helper: terminal assistant final report is preserved without synthesis'); + + const technicalMetadataOnly = resolveCompletionDelivery({ + lastReply: null, + completion: { + summary: 'completed (agent signal)', + checklist: { work_complete: true, tests_passed: true }, + }, + fallbackSummary: 'completed (agent signal)', + }); + assert(technicalMetadataOnly.source === 'technical-synthesis', 'completion helper: synthesized fallback is explicit when no human final report exists'); + const technicalCommitSummary = 'fix(dispatch): normalize completion delivery; add watcher tests; preserve structured completion summary'; const humanizedTechnicalPayload = buildTerminalCompletionPayload({ summary: technicalCommitSummary, diff --git a/tests/delivery-fixes.test.mjs b/tests/delivery-fixes.test.mjs index c4950b5..d7a8457 100644 --- a/tests/delivery-fixes.test.mjs +++ b/tests/delivery-fixes.test.mjs @@ -1,7 +1,7 @@ import test from 'node:test'; import assert from 'node:assert/strict'; import { spawnSync } from 'node:child_process'; -import { mkdtempSync, readFileSync, rmSync } from 'node:fs'; +import { mkdirSync, mkdtempSync, readFileSync, rmSync, utimesSync, writeFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -120,6 +120,16 @@ test('dispatch watchdog checks terminal result output, not stuck-list stdout', ( assert.match(dispatchIndexSrc, /result --label/); }); +test('delivery watcher jobs are non-blocking cron quick-poll jobs', () => { + const dispatchIndexSrc = readFileSync(join(__dirname, '..', 'dispatch', 'index.mjs'), 'utf8'); + assert.match(dispatchIndexSrc, /--once/); + assert.match(dispatchIndexSrc, /schedule_kind:\s+'cron'/); + assert.match(dispatchIndexSrc, /schedule_cron:\s+config\.deliver_watcher_cron \|\| '\* \* \* \* \*'/); + assert.match(dispatchIndexSrc, /next_run_at:\s+nowUtc/); + assert.match(dispatchIndexSrc, /run_timeout_ms:\s+120_000/); + assert.doesNotMatch(dispatchIndexSrc, /schedule_kind:\s+'at'[\s\S]{0,400}watcherCmd/); +}); + test('main fire-and-forget delivery instructions use the scheduler post office, not the message tool', async () => { const prompts = []; @@ -231,6 +241,191 @@ test('completion watcher stderr-only success is treated as delivery failure, not assert.ok(logs.some(entry => entry.level === 'warn' && /no deliverable stdout/.test(entry.msg))); }); +test('completion watcher pending quick-poll tick is skipped without delivery failure', async () => { + const logs = []; + const result = await executeShell({ + id: 'job-deliver-pending', + name: 'chilisaus-deliver:pending-result', + payload_message: 'node watcher.mjs --once', + delivery_mode: 'announce-always', + run_timeout_ms: 30_000, + }, { run: { id: 'run-deliver-pending' } }, { + runShellCommand: async () => ({ + stdout: '', + stderr: '[watcher] WATCHER_PENDING label=pending-result reason=target still running', + error: null, + }), + normalizeShellResult: (shellExec) => ({ + status: 'ok', + exitCode: 0, + signal: null, + timedOut: false, + stdout: shellExec.stdout, + stderr: shellExec.stderr, + stdoutPath: null, + stderrPath: null, + stdoutBytes: 0, + stderrBytes: shellExec.stderr.length, + stdoutTruncated: false, + stderrTruncated: false, + summary: `stderr:\n${shellExec.stderr}`, + deliveryText: `stderr:\n${shellExec.stderr}`, + imageAttachments: [], + errorMessage: null, + contextSummary: {}, + }), + log: (level, msg, meta) => logs.push({ level, msg, meta }), + }); + + assert.equal(result.status, 'skipped'); + assert.equal(result.skipDelivery, true); + assert.equal(result.deliveryOverride, null); + assert.equal(result.idemAction, 'release'); + assert.equal(logs.some(entry => /no deliverable stdout/.test(entry.msg)), false); +}); + +test('watcher --once exits quickly while target session is incomplete', () => { + const tempDir = mkdtempSync(join(tmpdir(), 'watcher-once-incomplete-')); + const labelsPath = join(tempDir, 'labels.json'); + const mockDispatch = join(tempDir, 'mock-dispatch.mjs'); + const label = 'quick-poll-incomplete'; + const sessionKey = 'agent:main:subagent:quick-poll'; + const watcherPath = join(__dirname, '..', 'dispatch', 'watcher.mjs'); + + try { + writeFileSync(labelsPath, JSON.stringify({ + [label]: { + sessionKey, + status: 'running', + agent: 'main', + spawnedAt: new Date().toISOString(), + timeoutSeconds: 600, + }, + }) + '\n'); + writeFileSync(mockDispatch, ` +const sub = process.argv[2]; +if (sub === 'status') { + process.stdout.write(JSON.stringify({ + ok: true, + label: ${JSON.stringify(label)}, + status: 'running', + sessionKey: ${JSON.stringify(sessionKey)}, + agent: 'main', + liveness: { ageMs: 5000 } + }) + '\\n'); +} else if (sub === 'result') { + process.stdout.write(JSON.stringify({ ok: true, status: 'running' }) + '\\n'); +} else { + process.stdout.write(JSON.stringify({ ok: true }) + '\\n'); +} +`); + + const started = Date.now(); + const run = spawnSync(process.execPath, [ + watcherPath, '--label', label, '--timeout', '600', '--poll-interval', '20', '--once', + ], { + env: { + ...process.env, + HOME: tempDir, + DISPATCH_INDEX_PATH: mockDispatch, + DISPATCH_LABELS_PATH: labelsPath, + OPENCLAW_SCHEDULER_NOTIFY_DISABLED: '1', + }, + encoding: 'utf8', + timeout: 5000, + }); + const elapsedMs = Date.now() - started; + const labels = JSON.parse(readFileSync(labelsPath, 'utf8')); + + assert.equal(run.status, 0); + assert.equal((run.stdout || '').trim(), ''); + assert.match(run.stderr || '', /WATCHER_PENDING/); + assert.ok(elapsedMs < 2000, `watcher --once should exit quickly, elapsed=${elapsedMs}ms`); + assert.equal(labels[label].status, 'running'); + assert.ok(labels[label].lastPing, 'watcher --once records one lastPing'); + } finally { + rmSync(tempDir, { recursive: true, force: true }); + } +}); + +test('watcher --once detects stale sessions despite fresh watcher lastPing', () => { + const tempDir = mkdtempSync(join(tmpdir(), 'watcher-once-stale-')); + const labelsPath = join(tempDir, 'labels.json'); + const mockDispatch = join(tempDir, 'mock-dispatch.mjs'); + const label = 'quick-poll-stale'; + const sessionKey = 'agent:main:subagent:stale-session'; + const sessionId = 'stale-jsonl-id'; + const watcherPath = join(__dirname, '..', 'dispatch', 'watcher.mjs'); + const sessionsDir = join(tempDir, '.openclaw', 'agents', 'main', 'sessions'); + + try { + mkdirSync(sessionsDir, { recursive: true }); + writeFileSync(join(sessionsDir, 'sessions.json'), JSON.stringify({ + [sessionKey]: { + sessionId, + updatedAt: new Date(Date.now() - 77 * 60 * 1000).toISOString(), + model: 'test', + }, + }) + '\n'); + const jsonlPath = join(sessionsDir, `${sessionId}.jsonl`); + writeFileSync(jsonlPath, JSON.stringify({ + role: 'assistant', + content: [{ type: 'text', text: 'Still working.' }], + }) + '\n'); + const staleDate = new Date(Date.now() - 77 * 60 * 1000); + utimesSync(jsonlPath, staleDate, staleDate); + writeFileSync(labelsPath, JSON.stringify({ + [label]: { + sessionKey, + status: 'running', + agent: 'main', + spawnedAt: new Date(Date.now() - 90 * 60 * 1000).toISOString(), + timeoutSeconds: 7200, + lastPing: new Date().toISOString(), + }, + }) + '\n'); + writeFileSync(mockDispatch, ` +const sub = process.argv[2]; +if (sub === 'status') { + process.stdout.write(JSON.stringify({ + ok: true, + label: ${JSON.stringify(label)}, + status: 'running', + sessionKey: ${JSON.stringify(sessionKey)}, + agent: 'main', + liveness: { ageMs: ${77 * 60 * 1000} } + }) + '\\n'); +} else if (sub === 'result') { + process.stdout.write(JSON.stringify({ ok: true, status: 'running' }) + '\\n'); +} else { + process.stdout.write(JSON.stringify({ ok: true }) + '\\n'); +} +`); + + const run = spawnSync(process.execPath, [ + watcherPath, '--label', label, '--timeout', '7200', '--poll-interval', '20', '--once', + ], { + env: { + ...process.env, + HOME: tempDir, + DISPATCH_INDEX_PATH: mockDispatch, + DISPATCH_LABELS_PATH: labelsPath, + OPENCLAW_SCHEDULER_NOTIFY_DISABLED: '1', + }, + encoding: 'utf8', + timeout: 5000, + }); + const labels = JSON.parse(readFileSync(labelsPath, 'utf8')); + + assert.equal(run.status, 0); + assert.match(run.stdout || '', /agent session stalled/); + assert.equal(labels[label].status, 'error'); + assert.match(labels[label].error, /agent session stalled/); + } finally { + rmSync(tempDir, { recursive: true, force: true }); + } +}); + test('messages send accepts channel and delivery-to overrides for durable delivery', async (t) => { const tempDir = mkdtempSync(join(tmpdir(), 'openclaw-scheduler-test-')); const dbPath = join(tempDir, 'scheduler.sqlite');