From 323f9904b76de0ae3a091a48ce40027fa81b1889 Mon Sep 17 00:00:00 2001 From: "Claw (AINYC Agent)" Date: Thu, 4 Jun 2026 20:36:08 +0000 Subject: [PATCH] fix(traffic): bound the Vercel sync drain so a dense/slow window can't wedge a source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A drifted watermark or a dense/slow request-logs window made the synchronous Vercel sync drain run for many minutes — timing out the caller and leaving the run stuck 'running', the source ingesting nothing until a manual reset. Two bounds on the incremental Vercel sync, on top of the existing per-fetch 30s timeout + retry (which never bounded the TOTAL drain): - Window cap (VERCEL_MAX_SYNC_WINDOW_MS = 24h): clamp the start forward so a watermark that drifted past the cap can't request a multi-day pull. The skipped span is surfaced (warn), not silent — a backfill recovers it. - Drain wall-clock deadline (DEFAULT_VERCEL_SYNC_DEADLINE_MS = 4m, override via vercelSyncDeadlineMs): the adaptive drain stops before a sub-window once the budget elapses and reports how far it got. The route commits that partial window and advances lastSyncedAt only to there (the additive rollup makes a partial window safe), so the next sync resumes from the boundary instead of one sync grinding unbounded. If nothing drained before the budget the run fails (visible) instead of orphaning a 'running' row. No API surface change (internal options only), so no SDK regen. 4.70.0 -> 4.70.1. 
Co-Authored-By: Claude Opus 4.8 --- package.json | 2 +- packages/api-routes/AGENTS.md | 2 +- packages/api-routes/src/index.ts | 3 + packages/api-routes/src/traffic.ts | 94 +++++++++++++++++-- packages/api-routes/test/traffic.test.ts | 80 ++++++++++++++++ packages/canonry/package.json | 2 +- packages/integration-vercel/AGENTS.md | 12 ++- packages/integration-vercel/src/drain.ts | 48 +++++++++- .../integration-vercel/test/drain.test.ts | 53 +++++++++++ 9 files changed, 281 insertions(+), 15 deletions(-) diff --git a/package.json b/package.json index 580d1b7d..9db7210f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "canonry", "private": true, - "version": "4.70.0", + "version": "4.70.1", "type": "module", "packageManager": "pnpm@10.28.2", "scripts": { diff --git a/packages/api-routes/AGENTS.md b/packages/api-routes/AGENTS.md index 1f63375c..0130c512 100644 --- a/packages/api-routes/AGENTS.md +++ b/packages/api-routes/AGENTS.md @@ -25,7 +25,7 @@ Shared Fastify route plugins used by both the local server (`packages/canonry`) | `src/report.ts` | `GET /projects/:name/report` (JSON DTO) and `GET /projects/:name/report.html` (standalone downloadable HTML) — aggregated client-facing AEO report bundle (13 sections) | | `src/report-renderer.ts` | `renderReportHtml(report)` — server-side HTML renderer with inline SVG charts and inline CSS, re-exported from `@ainyc/canonry-api-routes` for the CLI | | `src/wordpress.ts` | WordPress integration routes | -| `src/traffic.ts` | Server-side traffic ingestion routes — `POST /traffic/connect/cloud-run`, `POST /traffic/connect/wordpress`, `POST /traffic/connect/vercel` (Vercel connect seeds `lastSyncedAt = NOW` so the first scheduled sync uses a tight window — leaving it null would fall back to `DEFAULT_SYNC_WINDOW_MINUTES = 30 days`, which exceeds Vercel `request-logs` retention (~14d) and would make every first sync throw a retention error — and **auto-creates the project's `traffic-sync` schedule** (`*/30 * * * 
*`, idempotent via the unique `(project, kind)` index, registered with the live scheduler through `onScheduleUpdated`) in the same transaction as the source upsert, so the source actually keeps syncing without a manual `schedule set` step: seeding `lastSyncedAt = NOW` only keeps the FIRST window tight, and the schedule is what stops the watermark drifting into an unbounded — wedging — pull on a later trigger), `POST /traffic/sources/:id/sync`, `POST /traffic/sources/:id/backfill` (async — returns `{ runId, status: "running" }` immediately, background task replaces rollup buckets + sample slice in the window inside one transaction, days clamped to `MAX_BACKFILL_DAYS=30` to match Cloud Logging `_Default` retention, `lastSyncedAt` only advances forward so backfill never undoes incremental progress; supports `cloud-run`, `wordpress`, and `vercel` source types), `POST /traffic/sources/:id/reset` (operator recovery: requires `{ advanceToNow: true }` — advances `lastSyncedAt` to NOW, sets `status` back to `connected`, clears `last_error`; used when an idle source has aged past the upstream retention boundary and every sync now throws), plus reads: `GET /traffic/sources` (list non-archived), `GET /traffic/status` (composite of detail-per-source — single call powering `canonry traffic status`), `GET /traffic/sources/:id` (detail + last-24h totals + latest run, run filtered by `runs.source_id` so multi-source is correct), `GET /traffic/events` (windowed crawler / ai-referral rollups, defaults to last 24h, totals reflect the full window even when `limit` truncates). Credentials resolved through injected stores (`cloudRunCredentialStore`, `wordpressTrafficCredentialStore`, `vercelTrafficCredentialStore`); the per-adapter pull functions and access-token resolver are also injectable for tests. Upstream/auth failures throw `providerError()` (502) so CLI exit codes signal system errors. 
**Sync dispatcher:** the sync route resolves the source row, sets up the run + shared error path, then branches by `sourceType`. Cloud Run uses a clamped time window (`startTime`/`endTime` + `lastSyncedAt` clamp); WordPress pages through the plugin's opaque `next_cursor` driven by the response's `hasMore` flag, persisting the final cursor to `traffic_sources.last_cursor` inside the same transaction as the rollup writes; Vercel uses a clamped time window like Cloud Run but the `request-logs` endpoint paginates by page number with no resumable cursor — so a Vercel sync drains the whole window in one pass via a generous `DEFAULT_VERCEL_MAX_PAGES=50` budget and **fails loudly (never advances `lastSyncedAt`) if the adapter still reports `hasMore`**, so a partially-drained window is retried rather than silently skipped. Dedupe + rollup + telemetry are shared across all three branches. **Backfill dispatcher:** the backfill route mirrors the same shape — `runBackfillTask` is adapter-agnostic and takes an injected `pullForBackfill: () => Promise` closure plus a `pullErrorPrefix` string so error attribution stays specific. The route handler validates credentials per `sourceType` up-front, then builds the closure: Cloud Run pulls a single `[startTime, endTime]` window via the Cloud Logging API; WordPress pages through the plugin's `[since, until)` window via opaque cursor; Vercel pulls the `[windowStart, windowEnd]` window with the large `BACKFILL_MAX_PAGES` budget — replace mode, so a budget exhaustion (`hasMore` still true) fails the run loudly rather than wiping the window's rollups and leaving a partial set. All reuse the shared replace-mode rollup transaction and the `lastSyncedAt`-never-rewinds invariant. 
**Cross-sync dedupe:** for Cloud Run and Vercel, `lastSyncedAt` clamps the fetch window forward to avoid wholesale re-pulls; the boundary second is then deduped via `traffic_sources.last_event_ids` (bounded ring buffer of `MAX_TRACKED_EVENT_IDS=1000` normalized event IDs from prior syncs, persisted inside the same transaction as the rollup writes). New sync IDs are prepended to retained previous IDs so a dup that re-appears across multiple subsequent syncs stays deduped. WordPress reuses the same ring-buffer logic for plugin-side cursor-boundary re-emissions. | +| `src/traffic.ts` | Server-side traffic ingestion routes — `POST /traffic/connect/cloud-run`, `POST /traffic/connect/wordpress`, `POST /traffic/connect/vercel` (Vercel connect seeds `lastSyncedAt = NOW` so the first scheduled sync uses a tight window — leaving it null would fall back to `DEFAULT_SYNC_WINDOW_MINUTES = 30 days`, which exceeds Vercel `request-logs` retention (~14d) and would make every first sync throw a retention error — and **auto-creates the project's `traffic-sync` schedule** (`*/30 * * * *`, idempotent via the unique `(project, kind)` index, registered with the live scheduler through `onScheduleUpdated`) in the same transaction as the source upsert, so the source actually keeps syncing without a manual `schedule set` step: seeding `lastSyncedAt = NOW` only keeps the FIRST window tight, and the schedule is what stops the watermark drifting into an unbounded — wedging — pull on a later trigger), `POST /traffic/sources/:id/sync`, `POST /traffic/sources/:id/backfill` (async — returns `{ runId, status: "running" }` immediately, background task replaces rollup buckets + sample slice in the window inside one transaction, days clamped to `MAX_BACKFILL_DAYS=30` to match Cloud Logging `_Default` retention, `lastSyncedAt` only advances forward so backfill never undoes incremental progress; supports `cloud-run`, `wordpress`, and `vercel` source types), `POST /traffic/sources/:id/reset` (operator 
recovery: requires `{ advanceToNow: true }` — advances `lastSyncedAt` to NOW, sets `status` back to `connected`, clears `last_error`; used when an idle source has aged past the upstream retention boundary and every sync now throws), plus reads: `GET /traffic/sources` (list non-archived), `GET /traffic/status` (composite of detail-per-source — single call powering `canonry traffic status`), `GET /traffic/sources/:id` (detail + last-24h totals + latest run, run filtered by `runs.source_id` so multi-source is correct), `GET /traffic/events` (windowed crawler / ai-referral rollups, defaults to last 24h, totals reflect the full window even when `limit` truncates). Credentials resolved through injected stores (`cloudRunCredentialStore`, `wordpressTrafficCredentialStore`, `vercelTrafficCredentialStore`); the per-adapter pull functions and access-token resolver are also injectable for tests. Upstream/auth failures throw `providerError()` (502) so CLI exit codes signal system errors. **Sync dispatcher:** the sync route resolves the source row, sets up the run + shared error path, then branches by `sourceType`. Cloud Run uses a clamped time window (`startTime`/`endTime` + `lastSyncedAt` clamp); WordPress pages through the plugin's opaque `next_cursor` driven by the response's `hasMore` flag, persisting the final cursor to `traffic_sources.last_cursor` inside the same transaction as the rollup writes; Vercel uses a clamped time window like Cloud Run but the `request-logs` endpoint paginates by page number with no resumable cursor, so the window is drained in adaptive time sub-windows (`drainVercelTrafficEvents`, `DEFAULT_VERCEL_MAX_PAGES=50` per sub-window). 
Two bounds keep a dense or drifted window from wedging the synchronous sync: **(1)** the start is capped to at most `VERCEL_MAX_SYNC_WINDOW_MS=24h` before the sync instant — a watermark that drifted further is clamped forward and the skipped span is surfaced via `warn` (a backfill recovers it); **(2)** the drain runs under a wall-clock budget (`DEFAULT_VERCEL_SYNC_DEADLINE_MS=4m`, override via `vercelSyncDeadlineMs`) — once the budget is exhausted the drain stops and the route commits the partial window and advances `lastSyncedAt` **only to where it drained** (the additive rollup makes a partial window safe), so the next sync resumes from there instead of one sync grinding for many minutes; if nothing drained before the budget ran out, the run **fails (visible)** rather than orphaning a `running` row. Retention is still enforced: if the drain can only serve a clamped tail it fails so `lastSyncedAt` never advances across missing history. Dedupe + rollup + telemetry are shared across all three branches. **Backfill dispatcher:** the backfill route mirrors the same shape — `runBackfillTask` is adapter-agnostic and takes an injected `pullForBackfill: () => Promise` closure plus a `pullErrorPrefix` string so error attribution stays specific. The route handler validates credentials per `sourceType` up-front, then builds the closure: Cloud Run pulls a single `[startTime, endTime]` window via the Cloud Logging API; WordPress pages through the plugin's `[since, until)` window via opaque cursor; Vercel pulls the `[windowStart, windowEnd]` window with the large `BACKFILL_MAX_PAGES` budget — replace mode, so a budget exhaustion (`hasMore` still true) fails the run loudly rather than wiping the window's rollups and leaving a partial set. All reuse the shared replace-mode rollup transaction and the `lastSyncedAt`-never-rewinds invariant. 
**Cross-sync dedupe:** for Cloud Run and Vercel, `lastSyncedAt` clamps the fetch window forward to avoid wholesale re-pulls; the boundary second is then deduped via `traffic_sources.last_event_ids` (bounded ring buffer of `MAX_TRACKED_EVENT_IDS=1000` normalized event IDs from prior syncs, persisted inside the same transaction as the rollup writes). New sync IDs are prepended to retained previous IDs so a dup that re-appears across multiple subsequent syncs stays deduped. WordPress reuses the same ring-buffer logic for plugin-side cursor-boundary re-emissions. | | `src/backlinks.ts` | Backlinks (Common Crawl sync + per-project extract/summary/domains/history) routes | | `src/doctor.ts` | `GET /doctor` and `GET /projects/:name/doctor` — runs check registry, returns `DoctorReport` | | `src/doctor/registry.ts` | `ALL_CHECKS` — single source of truth for the doctor check catalog | diff --git a/packages/api-routes/src/index.ts b/packages/api-routes/src/index.ts index 2d0cad6d..7909251d 100644 --- a/packages/api-routes/src/index.ts +++ b/packages/api-routes/src/index.ts @@ -154,6 +154,8 @@ export interface ApiRoutesOptions { vercelTrafficCredentialStore?: TrafficRoutesOptions['vercelTrafficCredentialStore'] /** Override Vercel traffic pull (tests) — see `TrafficRoutesOptions` */ pullVercelTrafficEvents?: TrafficRoutesOptions['pullVercelTrafficEvents'] + /** Wall-clock budget (ms) for a Vercel sync's adaptive drain — see `TrafficRoutesOptions` */ + vercelSyncDeadlineMs?: TrafficRoutesOptions['vercelSyncDeadlineMs'] /** Fired after every traffic sync (success OR failure). Used by canonry to emit `traffic.synced` telemetry. */ onTrafficSynced?: TrafficRoutesOptions['onTrafficSynced'] /** Discovery feature callback — fires after a discovery_sessions row + matching runs row are inserted. 
*/ @@ -384,6 +386,7 @@ export async function apiRoutes(app: FastifyInstance, opts: ApiRoutesOptions) { pullWordpressTrafficEvents: opts.pullWordpressTrafficEvents, vercelTrafficCredentialStore: opts.vercelTrafficCredentialStore, pullVercelTrafficEvents: opts.pullVercelTrafficEvents, + vercelSyncDeadlineMs: opts.vercelSyncDeadlineMs, onTrafficSynced: opts.onTrafficSynced, onScheduleUpdated: opts.onScheduleUpdated, allowLoopbackWebhooks: opts.allowLoopbackWebhooks, diff --git a/packages/api-routes/src/traffic.ts b/packages/api-routes/src/traffic.ts index 980b7490..7ea3c22b 100644 --- a/packages/api-routes/src/traffic.ts +++ b/packages/api-routes/src/traffic.ts @@ -205,6 +205,14 @@ export interface TrafficRoutesOptions { defaultWordpressMaxPages?: number /** Cap on the number of raw_event_samples written per sync. */ defaultSampleLimit?: number + /** + * Wall-clock budget (ms) for a single incremental Vercel sync's adaptive + * drain. On hit the drain stops and the route commits the partial window + + * advances `lastSyncedAt` to where it reached, so a dense or slow window can't + * run unbounded. Defaults to `DEFAULT_VERCEL_SYNC_DEADLINE_MS`; tests pass a + * tiny value to exercise the deadline path. + */ + vercelSyncDeadlineMs?: number /** Fire-and-forget hook called after every sync completes (success OR failure). Used by canonry to emit telemetry. */ onTrafficSynced?: (event: TrafficSyncedEvent) => void /** @@ -245,6 +253,21 @@ const DEFAULT_VERCEL_MAX_PAGES = 50 // gives up. This bounds provider calls for pathological windows while still // leaving room for bursty minutes to drain through one-second slices. const VERCEL_MAX_SUB_WINDOWS = 5_000 +// Cap how far back a single incremental Vercel sync reaches. A watermark that +// has drifted — the source idled while its schedule was paused or missing — +// would otherwise request a pull back to DEFAULT_SYNC_WINDOW_MINUTES (30 days) +// and make the adaptive drain grind through days of sub-windows on one sync. 
+// Clamp the start to at most this far before the sync instant; the skipped +// pre-cap span is surfaced (warn), never silently dropped — a backfill recovers +// it. The per-sync deadline below bounds the drain even within this window. +const VERCEL_MAX_SYNC_WINDOW_MS = 24 * 60 * 60_000 +// Wall-clock budget for a single incremental Vercel sync's adaptive drain. The +// drain checks this before each sub-window pull; on hit it stops and reports how +// far it got, and the route commits that partial window + advances `lastSyncedAt` +// to it. Without this bound a dense or slow window runs for many minutes — timing +// out the caller and leaving an orphaned 'running' run. Override via +// `vercelSyncDeadlineMs`; a fully-drained window never approaches it. +const DEFAULT_VERCEL_SYNC_DEADLINE_MS = 4 * 60_000 // Vercel request-logs uses page-number pagination inside a fixed time window. // Backfill large ranges as independent hour chunks so each chunk gets the full // adaptive sub-window budget and one dense hour cannot make a multi-day @@ -672,6 +695,7 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt } } const vercelMaxPages = opts.defaultVercelMaxPages ?? DEFAULT_VERCEL_MAX_PAGES + const vercelSyncDeadlineMs = opts.vercelSyncDeadlineMs ?? DEFAULT_VERCEL_SYNC_DEADLINE_MS const syncWindowMinutes = opts.defaultSyncWindowMinutes ?? DEFAULT_SYNC_WINDOW_MINUTES const pageSize = opts.defaultPageSize ?? DEFAULT_PAGE_SIZE const maxPages = opts.defaultMaxPages ?? DEFAULT_MAX_PAGES @@ -1240,6 +1264,10 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt let allEvents: NormalizedTrafficRequest[] let nextCursor: string | undefined let auditAction: string + // The instant `lastSyncedAt` advances to on success. Defaults to windowEnd + // (the pull's upper bound); the Vercel branch lowers it to the partial + // boundary when an adaptive drain stops at its deadline mid-window. 
+ let effectiveWindowEnd = windowEnd if (sourceRow.sourceType === TrafficSourceTypes['cloud-run']) { auditAction = 'traffic.cloud-run.synced' @@ -1432,9 +1460,24 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt const lastSyncedMs = sourceRow.lastSyncedAt ? new Date(sourceRow.lastSyncedAt).getTime() : Number.NEGATIVE_INFINITY - windowStart = new Date( - Math.min(windowEnd.getTime(), Math.max(requestedStartMs, lastSyncedMs)), - ) + const clampedStartMs = Math.min(windowEnd.getTime(), Math.max(requestedStartMs, lastSyncedMs)) + // Cap how far back one sync reaches. A watermark that drifted past the cap + // (source idle while its schedule was paused/missing) would otherwise make + // the adaptive drain grind through days of sub-windows in a single sync. + // Skipping the pre-cap span is surfaced, not silent — a backfill recovers it. + const cappedStartMs = Math.max(clampedStartMs, windowEnd.getTime() - VERCEL_MAX_SYNC_WINDOW_MS) + if (cappedStartMs > clampedStartMs) { + request.log.warn( + { + sourceId: sourceRow.id, + requestedStart: new Date(clampedStartMs).toISOString(), + cappedStart: new Date(cappedStartMs).toISOString(), + }, + 'Vercel sync window exceeded the max single-sync span; clamped the start forward ' + + '(older traffic skipped — run a backfill to recover it)', + ) + } + windowStart = new Date(cappedStartMs) try { const drained = await drainVercelTrafficEvents({ @@ -1447,10 +1490,39 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt endDate: windowEnd.getTime(), pagesPerSubWindow: vercelMaxPages, maxSubWindows: VERCEL_MAX_SUB_WINDOWS, + // Bound the drain's wall-clock so a dense/slow window can't run for + // many minutes. On hit the drain stops and reports how far it got. 
+ deadlineMs: syncStartedAtMs + vercelSyncDeadlineMs, }) if (drained.retentionClamped) { throw vercelRetentionClampError(windowStart.getTime(), drained.effectiveStartMs) } + if (drained.deadlineReached) { + if (drained.drainedThroughMs <= windowStart.getTime()) { + // No sub-window completed before the budget elapsed — request-logs is + // slow or unavailable. Fail (the catch marks the run failed) rather + // than committing an empty 'completed' window or orphaning a 'running' + // run; the schedule retries next tick. + throw new Error( + `sync exceeded its ${vercelSyncDeadlineMs}ms drain budget without ` + + 'completing any sub-window (request-logs slow or unavailable)', + ) + } + // Partial progress: commit what drained and advance `lastSyncedAt` only + // to there, so the next sync resumes from the boundary instead of + // re-pulling. The additive rollup makes a partial window safe. + effectiveWindowEnd = new Date(drained.drainedThroughMs) + request.log.warn( + { + sourceId: sourceRow.id, + drainedThrough: effectiveWindowEnd.toISOString(), + requestedEnd: windowEnd.toISOString(), + subWindows: drained.subWindowCount, + }, + 'Vercel drain hit its time budget; committing the partial window and advancing to it ' + + '— next sync resumes from here', + ) + } if (drained.truncatedSliceCount > 0) { // A one-second slice exceeded the page budget and could not be sliced // thinner. The drain ingested a sample and advanced rather than @@ -1702,11 +1774,13 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt // untouched (drizzle omits undefined fields from the SET clause). const sourceUpdate: Partial = { status: TrafficSourceStatuses.connected, - // Advance to windowEnd, not finishedAt — events arriving at the - // source between windowEnd and finishedAt aren't in this pull's - // range. If we stored finishedAt, the next sync's clamp would skip - // past them and they'd be lost. 
- lastSyncedAt: windowEnd.toISOString(), + // Advance to effectiveWindowEnd, not finishedAt — events arriving at the + // source between the window end and finishedAt aren't in this pull's + // range. If we stored finishedAt, the next sync's clamp would skip past + // them and they'd be lost. effectiveWindowEnd equals windowEnd on a full + // sync; for a Vercel drain that stopped at its deadline it is the partial + // boundary, so the next sync resumes exactly where this one left off. + lastSyncedAt: effectiveWindowEnd.toISOString(), lastError: null, lastEventIds: nextEventIds, updatedAt: finishedAt, @@ -1777,7 +1851,9 @@ export async function trafficRoutes(app: FastifyInstance, opts: TrafficRoutesOpt aiReferralBucketRows, sampleRows, windowStart: windowStart.toISOString(), - windowEnd: windowEnd.toISOString(), + // The window actually synced: equals windowEnd on a full sync, or the + // partial boundary when a Vercel drain stopped at its deadline. + windowEnd: effectiveWindowEnd.toISOString(), } return response }) diff --git a/packages/api-routes/test/traffic.test.ts b/packages/api-routes/test/traffic.test.ts index c8e1aa4b..ccd19c35 100644 --- a/packages/api-routes/test/traffic.test.ts +++ b/packages/api-routes/test/traffic.test.ts @@ -156,6 +156,8 @@ async function buildHarness( maxPages: number | undefined environment: string | undefined }) => VercelTrafficEventsPage + /** Wall-clock budget (ms) for the Vercel sync drain. Tests set a tiny/zero value to exercise the deadline path. 
*/ + vercelSyncDeadlineMs?: number } = {}, ) { const trafficSyncedEvents: Array = [] @@ -312,6 +314,7 @@ async function buildHarness( endpoint: 'https://vercel.com/api/logs/request-logs', } }, + vercelSyncDeadlineMs: options.vercelSyncDeadlineMs, onTrafficSynced: (event) => { trafficSyncedEvents.push(event) }, onScheduleUpdated: (action, projectId, kind) => { scheduleUpdates.push({ action, projectId, kind }) }, }) @@ -1062,6 +1065,83 @@ describe('POST /traffic/sources/:id/sync — Vercel', () => { await h.close() } }) + + it('fails the run (not eternal running) when the drain budget elapses before any sub-window completes', async () => { + // Regression for the production wedge: a dense/slow window made the + // synchronous drain run for many minutes, timing out the caller and leaving + // the run stuck 'running'. A zero budget trips the deadline before the first + // pull, so the drain makes no progress — the route must fail the run rather + // than complete an empty window or orphan a 'running' row. + const h = await buildHarness([], { + vercelSyncDeadlineMs: 0, + vercelPullPages: ({ maxPages }) => { + if (maxPages === 1) return { events: [], rawEntryCount: 0, skippedEntryCount: 0, hasMore: false, endpoint: '' } + // Never reached — the deadline trips before the first sub-window pull. + return { events: [], rawEntryCount: 0, skippedEntryCount: 0, hasMore: true, endpoint: '' } + }, + }) + try { + const sourceId = await connectVercel(h) + const stale = backdateLastSyncedAt(h.db, sourceId, 60 * 60_000) + + const syncRes = await h.app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: {}, + }) + expect(syncRes.statusCode).toBe(502) + expect(JSON.parse(syncRes.payload).error.message).toMatch(/drain budget without completing any sub-window/) + + const sourceRow = h.db.select().from(trafficSources).where(eq(trafficSources.id, sourceId)).get()! 
+ expect(sourceRow.status).toBe(TrafficSourceStatuses.error) + // Zero progress → the watermark must not advance. + expect(sourceRow.lastSyncedAt).toBe(stale) + + const runRows = h.db.select().from(runs).all() + expect(runRows.length).toBe(1) + expect(runRows[0].status).toBe(RunStatuses.failed) + } finally { + await h.close() + } + }) + + it('caps a drifted sync window to the last 24h instead of pulling from the stale watermark', async () => { + // A watermark that drifted days back (schedule paused/missing) must not make + // the drain request a multi-day window. The start is clamped forward to the + // cap; the skipped span is surfaced and the watermark still advances to ~now. + const observedStarts: number[] = [] + const h = await buildHarness([], { + vercelPullPages: ({ startDate, maxPages }) => { + if (maxPages !== 1) observedStarts.push(startDate) + return { events: [], rawEntryCount: 0, skippedEntryCount: 0, hasMore: false, endpoint: '' } + }, + }) + try { + const sourceId = await connectVercel(h) + backdateLastSyncedAt(h.db, sourceId, 5 * 86_400_000) // 5 days + const beforeMs = Date.now() + + const syncRes = await h.app.inject({ + method: 'POST', + url: `/api/v1/projects/test-project/traffic/sources/${sourceId}/sync`, + payload: {}, + }) + expect(syncRes.statusCode).toBe(200) + + // No real pull reached back past the 24h cap (with a minute of slack). + expect(observedStarts.length).toBeGreaterThan(0) + const earliestStart = Math.min(...observedStarts) + expect(earliestStart).toBeGreaterThanOrEqual(beforeMs - 24 * 60 * 60_000 - 60_000) + + // The capped window drained and committed, advancing past the drift. + const sourceRow = h.db.select().from(trafficSources).where(eq(trafficSources.id, sourceId)).get()! 
+ expect(new Date(sourceRow.lastSyncedAt!).getTime()).toBeGreaterThanOrEqual(beforeMs) + expect(sourceRow.status).toBe(TrafficSourceStatuses.connected) + expect(sourceRow.lastError).toBeNull() + } finally { + await h.close() + } + }) }) describe('POST /traffic/sources/:id/backfill — Vercel', () => { diff --git a/packages/canonry/package.json b/packages/canonry/package.json index 26594bba..3bded066 100644 --- a/packages/canonry/package.json +++ b/packages/canonry/package.json @@ -1,6 +1,6 @@ { "name": "@ainyc/canonry", - "version": "4.70.0", + "version": "4.70.1", "type": "module", "description": "Agent-first open-source AEO operating platform - track how answer engines cite your domain", "license": "FSL-1.1-ALv2", diff --git a/packages/integration-vercel/AGENTS.md b/packages/integration-vercel/AGENTS.md index 2cf3b403..08cec759 100644 --- a/packages/integration-vercel/AGENTS.md +++ b/packages/integration-vercel/AGENTS.md @@ -17,7 +17,7 @@ with **no in-app instrumentation** required on the user's Vercel project. | File | Role | |------|------| | `src/client.ts` | `listVercelTrafficEvents` — page-paginated `request-logs` pull, `VercelLogsApiError` | -| `src/drain.ts` | `drainVercelTrafficEvents` — adaptive time-sub-window drain over a wide window; retention-clamps the start | +| `src/drain.ts` | `drainVercelTrafficEvents` — adaptive time-sub-window drain over a wide window; retention-clamps the start; optional wall-clock `deadlineMs` stops early with `drainedThroughMs` for resumable partial progress | | `src/normalize.ts` | `normalizeVercelLogRow` — converts a raw `request-logs` row into a `NormalizedTrafficRequest` | | `src/types.ts` | Adapter option/response shapes (`VercelRequestLogRow`, `ListVercelTrafficEventsOptions`, `VercelTrafficEventsPage`) | | `src/index.ts` | Re-exports public API | @@ -54,6 +54,16 @@ with **no in-app instrumentation** required on the user's Vercel project. 
safe and `lastSyncedAt` keeps moving); the **replace-mode backfill** instead fails loud on truncation so it never overwrites a full window with a partial sample. +- **Wall-clock deadline (optional).** `deadlineMs` (with an injectable `now`, + defaulting to `Date.now`) bounds a single drain's wall-clock cost: the loop + stops before starting a sub-window once the clock reaches it, returning + `deadlineReached: true` and `drainedThroughMs` at the last fully-drained + boundary. An additive incremental caller commits `[startDate, drainedThroughMs]` + and advances `lastSyncedAt` there, so a dense or slow window converges over + several syncs instead of one unbounded grind — which would time out the + synchronous sync route and orphan a `running` run. Left unset, the drain runs + until the window is fully drained, or throws at `maxSubWindows` (replace-mode backfill keeps that). One + in-flight pull can overrun the deadline, so the bound is approximate. - **Retention clamp.** Vercel rejects a window starting before the plan's `request-logs` retention with HTTP 400 `ExceedsBillingLimitError`. `drainVercelTrafficEvents` detects that, binary-searches the retention diff --git a/packages/integration-vercel/src/drain.ts b/packages/integration-vercel/src/drain.ts index 632abe35..f7b8bfad 100644 --- a/packages/integration-vercel/src/drain.ts +++ b/packages/integration-vercel/src/drain.ts @@ -67,6 +67,20 @@ export interface DrainVercelTrafficEventsOptions { pagesPerSubWindow: number /** Hard cap on sub-window pulls before the drain gives up. */ maxSubWindows: number + /** + * Optional wall-clock deadline (epoch ms). Checked before each sub-window + * pull: once `now() >= deadlineMs` the drain stops and returns what it has + * drained so far with `deadlineReached: true` and `drainedThroughMs` set to + * the last fully-drained instant. 
An additive incremental-sync caller commits + * that partial window and advances `lastSyncedAt` to `drainedThroughMs`, so a + * dense or slow window makes forward progress every run instead of grinding + * unbounded (timing out the caller and orphaning a 'running' run). Left unset, + * the drain runs until the window is fully drained or `maxSubWindows` is hit — + * the original behaviour, which replace-mode backfill keeps. + */ + deadlineMs?: number + /** Injectable clock for the deadline check; defaults to `Date.now`. Tests override it. */ + now?: () => number /** * Fail immediately if a floor-width slice overflows even `FLOOR_SLICE_MAX_PAGES` * instead of sampling-and-advancing past it. Replace-mode callers (backfill) @@ -104,6 +118,19 @@ export interface DrainVercelTrafficEventsResult { truncatedSliceCount: number /** Epoch-ms start of each truncated floor slice, for operator logging. */ truncatedSliceStartsMs: number[] + /** + * Furthest instant (epoch ms) the drain fully completed. Equals `endDate` on + * a normal full drain; on a `deadlineReached` stop it is the last cleanly + * drained sub-window boundary, so `[startDate, drainedThroughMs]` is complete + * and the caller can safely advance `lastSyncedAt` to it. + */ + drainedThroughMs: number + /** + * True when the drain stopped early because it reached `deadlineMs` before + * fully draining the window. `[drainedThroughMs, endDate]` is still undrained; + * the caller resumes from `drainedThroughMs` on the next run. + */ + deadlineReached: boolean } function toMs(value: Date | number): number { @@ -206,17 +233,25 @@ async function resolveRetainedStart( * slice so a replace-mode caller fails fast instead of draining the rest of a * window it will reject. Throws also if `maxSubWindows` is reached before the * window is fully drained. 
+ * + * If `deadlineMs` is set, the drain stops before starting a sub-window once the + * wall clock reaches it, returning `deadlineReached: true` and `drainedThroughMs` + * at the last fully-drained boundary. This bounds a single sync's wall-clock + * cost: an additive caller commits the partial window and resumes next run, so a + * dense or slow window converges over several syncs instead of one unbounded + * grind. (One in-flight pull can overrun the deadline; the bound is approximate.) */ export async function drainVercelTrafficEvents( options: DrainVercelTrafficEventsOptions, ): Promise<DrainVercelTrafficEventsResult> { const startMs = toMs(options.startDate) const endMs = toMs(options.endDate) + const now = options.now ?? (() => Date.now()) const events: NormalizedTrafficRequest[] = [] const seenEventIds = new Set() if (endMs <= startMs) { - return { events, subWindowCount: 0, effectiveStartMs: startMs, retentionClamped: false, truncatedSliceCount: 0, truncatedSliceStartsMs: [] } + return { events, subWindowCount: 0, effectiveStartMs: startMs, retentionClamped: false, truncatedSliceCount: 0, truncatedSliceStartsMs: [], drainedThroughMs: startMs, deadlineReached: false } } let cursorMs = startMs @@ -228,9 +263,18 @@ export async function drainVercelTrafficEvents( let floorSpanProbeCountdown = 0 let floorPageBudgetCountdown = 0 let truncatedSliceCount = 0 + let deadlineReached = false const truncatedSliceStartsMs: number[] = [] while (cursorMs < endMs) { + // Wall-clock budget: stop before starting another sub-window once the + // deadline passes. `cursorMs` is the last fully-drained boundary, so the + // caller can commit `[startMs, cursorMs]` and resume from there next run + // rather than letting one sync grind the whole window unbounded. 
+ if (options.deadlineMs !== undefined && now() >= options.deadlineMs) { + deadlineReached = true + break + } if (subWindowCount >= options.maxSubWindows) { throw new Error( `Vercel window not drained within ${options.maxSubWindows} sub-windows — narrow the time range`, @@ -343,5 +387,5 @@ export async function drainVercelTrafficEvents( } } - return { events, subWindowCount, effectiveStartMs, retentionClamped, truncatedSliceCount, truncatedSliceStartsMs } + return { events, subWindowCount, effectiveStartMs, retentionClamped, truncatedSliceCount, truncatedSliceStartsMs, drainedThroughMs: cursorMs, deadlineReached } } diff --git a/packages/integration-vercel/test/drain.test.ts b/packages/integration-vercel/test/drain.test.ts index 54ed5921..f80dc41e 100644 --- a/packages/integration-vercel/test/drain.test.ts +++ b/packages/integration-vercel/test/drain.test.ts @@ -337,4 +337,57 @@ describe('drainVercelTrafficEvents', () => { expect(result.events).toEqual([]) expect(result.subWindowCount).toBe(0) }) + + test('reports a full drain through endDate with no deadline set', async () => { + const pull = vi.fn(async () => page([makeEvent('a')], false)) + const result = await drainVercelTrafficEvents({ + ...baseOptions, + pull, + startDate: 0, + endDate: 4 * HOUR, + }) + expect(result.deadlineReached).toBe(false) + expect(result.drainedThroughMs).toBe(4 * HOUR) + }) + + test('stops before the first pull and makes no progress when the deadline has already passed', async () => { + const pull = vi.fn(async () => page([makeEvent('x')], false)) + const result = await drainVercelTrafficEvents({ + ...baseOptions, + pull, + startDate: 0, + endDate: 4 * HOUR, + deadlineMs: 100, + now: () => 1_000, // already past the deadline + }) + expect(result.deadlineReached).toBe(true) + expect(result.drainedThroughMs).toBe(0) // == startDate: nothing drained + expect(result.events).toEqual([]) + expect(pull).not.toHaveBeenCalled() + }) + + test('stops at the deadline after partial progress and 
reports the boundary it reached', async () => { + // Slices wider than an hour overflow and get subdivided; one-hour-or-less + // slices drain cleanly. The injected clock advances one tick per sub-window + // check, so the deadline trips after several hours have drained but well + // before the full 100-hour window is done. + const pull = vi.fn(async (o: ListVercelTrafficEventsOptions) => { + const span = Number(o.endDate) - Number(o.startDate) + if (span > HOUR) return page([], true) + return page([makeEvent(`ev-${Number(o.startDate)}`)], false) + }) + let tick = 0 + const result = await drainVercelTrafficEvents({ + ...baseOptions, + pull, + startDate: 0, + endDate: 100 * HOUR, + deadlineMs: 25, + now: () => (tick += 1), + }) + expect(result.deadlineReached).toBe(true) + expect(result.drainedThroughMs).toBeGreaterThan(0) // made progress + expect(result.drainedThroughMs).toBeLessThan(100 * HOUR) // but did not finish + expect(result.events.length).toBeGreaterThan(0) + }) })