diff --git a/cloud/eslint.config.ts b/cloud/eslint.config.ts index 0232b70f82..c3c8984fcd 100644 --- a/cloud/eslint.config.ts +++ b/cloud/eslint.config.ts @@ -19,6 +19,9 @@ export default [ globals: { ...globals.browser, ...globals.node, + // Cloudflare Workers globals + ExportedHandlerFetchHandler: "readonly", + ExecutionContext: "readonly", }, }, plugins: { diff --git a/cloud/package.json b/cloud/package.json index 8f25273d55..e70a0a76a6 100644 --- a/cloud/package.json +++ b/cloud/package.json @@ -8,6 +8,8 @@ "build": "vite build", "preview": "vite preview", "deploy": "bun run build && wrangler deploy", + "cron:dev": "bun run build && wrangler dev --test-scheduled --config dist/server/wrangler.json --port 8787", + "cron:trigger": "curl -s -o /dev/null -w \"%{http_code}\\n\" http://localhost:8787/__scheduled", "cf-typegen": "wrangler types", "db:start": "docker compose -f docker/compose.yml up -d", "db:stop": "docker compose -f docker/compose.yml down", diff --git a/cloud/server-entry.ts b/cloud/server-entry.ts new file mode 100644 index 0000000000..48afc83498 --- /dev/null +++ b/cloud/server-entry.ts @@ -0,0 +1,38 @@ +import { + createStartHandler, + defaultStreamHandler, +} from "@tanstack/react-start/server"; +import clickhouseCron, { type CronTriggerEnv } from "@/workers/clickhouseCron"; + +interface ScheduledEvent { + readonly scheduledTime: number; + readonly cron: string; +} + +const fetchHandler = createStartHandler(defaultStreamHandler); +const scheduled = clickhouseCron.scheduled.bind(clickhouseCron); + +const fetch: ExportedHandlerFetchHandler = ( + request, + env, + ctx, +) => { + if (env.ENVIRONMENT === "local") { + const { pathname } = new URL(request.url); + if (pathname === "/__scheduled") { + const event: ScheduledEvent = { + cron: "local", + scheduledTime: Date.now(), + }; + ctx.waitUntil(scheduled(event, env)); + return new Response("Ran scheduled event"); + } + } + + return fetchHandler(request); +}; + +export default { + fetch, + scheduled, +}; diff --git a/cloud/workers/clickhouseCron.ts b/cloud/workers/clickhouseCron.ts new file mode 100644 index 0000000000..897f057b32 --- /dev/null +++ b/cloud/workers/clickhouseCron.ts @@ -0,0 +1,215 @@ +/** + * @fileoverview Cloudflare Cron Trigger for outbox re-enqueue. + * + * Periodically processes pending outbox rows that failed to be + * synced or were not processed within the timeout. + * + * ## Responsibilities + * + * 1. **Lock Recovery**: Reclaims stale processing rows where the worker + * died mid-processing (lockedAt > LOCK_TIMEOUT) + * + * 2. **Process**: Polls pending rows and syncs them to ClickHouse + * + * ## Trigger Configuration + * + * Configure in wrangler.jsonc with a cron expression like "0/5 * * * *" (every 5 min). + */ + +import { Effect, Layer } from "effect"; +import { DrizzleORM } from "@/db/client"; +import { spansOutbox } from "@/db/schema"; +import { and, eq, lt, or, isNull, lte } from "drizzle-orm"; +import { ClickHouse } from "@/clickhouse/client"; +import { + SettingsService, + getSettingsFromEnvironment, + type CloudflareEnvironment, +} from "@/settings"; +import { DatabaseError } from "@/errors"; +import { processOutboxMessages } from "@/workers/outboxProcessor"; + +// ============================================================================= +// Constants +// ============================================================================= + +/** + * Lock timeout in milliseconds. + * If a row has been in 'processing' state longer than this, it's considered stale. + */ +const LOCK_TIMEOUT_MS = 30000; // 30 seconds + +/** + * Maximum retry count before a row is marked as failed. + */ +const MAX_RETRIES = 5; + +/** + * Maximum number of rows to process per cron invocation. + * Prevents overwhelming the queue. + */ +const BATCH_LIMIT = 500; + +// ============================================================================= +// Types +// ============================================================================= + +/** + * Cloudflare Scheduled Event type. + */ +interface ScheduledEvent { + readonly scheduledTime: number; + readonly cron: string; +} + +/** + * Extended Cloudflare environment bindings for Cron Trigger. + */ +export interface CronTriggerEnv extends CloudflareEnvironment { + /** Hyperdrive binding for PostgreSQL connection pooling */ + readonly HYPERDRIVE?: { + readonly connectionString: string; + }; + /** Direct database URL (fallback when Hyperdrive is not configured) */ + readonly DATABASE_URL?: string; +} + +// ============================================================================= +// Cron Handler +// ============================================================================= + +export default { + /** + * Scheduled event handler. + * + * @param _event - Cloudflare scheduled event (unused, but required by interface) + * @param env - Cloudflare Workers environment bindings + */ + async scheduled(_event: ScheduledEvent, env: CronTriggerEnv): Promise { + // Database connection string from Hyperdrive or direct URL + const databaseUrl = env.HYPERDRIVE?.connectionString ?? env.DATABASE_URL; + if (!databaseUrl) { + console.error( + "No database connection available (HYPERDRIVE or DATABASE_URL required)", + ); + return; + } + + const program = Effect.gen(function* () { + const client = yield* DrizzleORM; + const workerId = `workers-cron-${Date.now()}`; + + const staleTime = new Date(Date.now() - LOCK_TIMEOUT_MS); + const now = new Date(); + + // ===================================================================== + // 1. Reclaim stale processing rows + // ===================================================================== + // Rows that have been in 'processing' state longer than LOCK_TIMEOUT + // are likely from a worker that crashed. Reset them to 'pending'. + + yield* client + .update(spansOutbox) + .set({ + status: "pending", + lockedAt: null, + lockedBy: null, + }) + .where( + and( + eq(spansOutbox.status, "processing"), + lt(spansOutbox.lockedAt, staleTime), + ), + ) + .pipe( + Effect.mapError( + (e) => + new DatabaseError({ + message: "Failed to reclaim stale locks", + cause: e, + }), + ), + ); + + // ===================================================================== + // 2. Query pending rows ready for processing + // ===================================================================== + // Conditions: + // - status = 'pending' + // - processAfter <= now (backoff expired) + // - retryCount < MAX_RETRIES + // - not currently locked (lockedAt is null or stale) + + const pendingRows = yield* client + .select({ + spanId: spansOutbox.spanId, + operation: spansOutbox.operation, + }) + .from(spansOutbox) + .where( + and( + eq(spansOutbox.status, "pending"), + lte(spansOutbox.processAfter, now), + lt(spansOutbox.retryCount, MAX_RETRIES), + or( + isNull(spansOutbox.lockedAt), + lt(spansOutbox.lockedAt, staleTime), + ), + ), + ) + .orderBy(spansOutbox.createdAt) + .limit(BATCH_LIMIT) + .pipe( + Effect.mapError( + (e) => + new DatabaseError({ + message: "Failed to query pending rows", + cause: e, + }), + ), + ); + + if (pendingRows.length === 0) { + return; + } + + console.log(`Processing ${pendingRows.length} pending outbox rows`); + + const messages = pendingRows.map((row) => ({ + spanId: row.spanId, + operation: row.operation as "INSERT" | "UPDATE" | "DELETE", + messageKey: `${row.spanId}:${row.operation}`, + })); + + yield* processOutboxMessages( + messages, + () => {}, + () => {}, + workerId, + ); + }); + + // Build layers + const settingsLayer = Layer.succeed( + SettingsService, + getSettingsFromEnvironment(env), + ); + const drizzleLayer = DrizzleORM.layer({ connectionString: databaseUrl }); + const clickhouseLayer = ClickHouse.Default.pipe( + Layer.provide(settingsLayer), + ); + + // Run the program + await Effect.runPromise( + program.pipe( + Effect.provide(drizzleLayer), + Effect.provide(clickhouseLayer), + Effect.provide(settingsLayer), + Effect.catchAll((error) => { + console.error("Cron trigger error:", error); + return Effect.void; + }), + ), + ); + }, +}; diff --git a/cloud/workers/outboxProcessor.ts b/cloud/workers/outboxProcessor.ts index f7c8f5a975..8adbef4695 100644 --- a/cloud/workers/outboxProcessor.ts +++ b/cloud/workers/outboxProcessor.ts @@ -1,7 +1,7 @@ /** * @fileoverview Outbox processor for ClickHouse sync. * - * Shared processing logic for both Queue Consumer and local polling worker. + * Shared processing logic for the Queue Consumer and Cron Trigger. * Handles span extraction from PostgreSQL, transformation to ClickHouse format, * and batch insertion with retry logic. */ @@ -247,7 +247,7 @@ export const transformSpanForClickHouse = ( /** * Process a batch of outbox messages. * - * Shared between Queue Consumer and local polling worker. + * Shared between Queue Consumer and Cron Trigger. * Handles locking, transformation, ClickHouse insertion, and status updates. * * @param messages - Array of outbox messages to process diff --git a/cloud/wrangler.jsonc b/cloud/wrangler.jsonc index 40384e3d1d..f1bdf7f599 100644 --- a/cloud/wrangler.jsonc +++ b/cloud/wrangler.jsonc @@ -3,13 +3,13 @@ "compatibility_date": "2025-06-17", "compatibility_flags": ["nodejs_compat", "enable_nodejs_http_modules"], "$schema": "node_modules/wrangler/config-schema.json", - "main": "@tanstack/react-start/server-entry", + "main": "server-entry.ts", "observability": { "enabled": true, }, - // "triggers": { - // "crons": ["0 0 * * 7"] - // } + "triggers": { + "crons": ["*/1 * * * *"], + }, // "env": { // "production": { // "name": "mirascope",