From 6a415dd5128d6b73654f1f2c4e4ac06797c7541e Mon Sep 17 00:00:00 2001
From: Koudai Aono
Date: Tue, 6 Jan 2026 16:02:45 +0900
Subject: [PATCH 1/2] fix(workers): update cron settings typing

---
 cloud/workers/clickhouseCron.ts | 235 ++++++++++++++++++++++++++++++++
 1 file changed, 235 insertions(+)
 create mode 100644 cloud/workers/clickhouseCron.ts

diff --git a/cloud/workers/clickhouseCron.ts b/cloud/workers/clickhouseCron.ts
new file mode 100644
index 0000000000..b8c08c80dc
--- /dev/null
+++ b/cloud/workers/clickhouseCron.ts
@@ -0,0 +1,235 @@
+/**
+ * @fileoverview Cloudflare Cron Trigger for outbox re-enqueue.
+ *
+ * Periodically re-enqueues pending outbox rows that failed to be
+ * sent to the queue or were not processed within the timeout.
+ *
+ * ## Responsibilities
+ *
+ * 1. **Lock Recovery**: Reclaims stale processing rows where the worker
+ *    died mid-processing (lockedAt > LOCK_TIMEOUT)
+ *
+ * 2. **Re-enqueue**: Sends pending rows to Cloudflare Queue for processing
+ *    by the Queue Consumer
+ *
+ * ## Trigger Configuration
+ *
+ * Configure in wrangler.jsonc with a cron expression like "0/5 * * * *" (every 5 min).
+ */
+
+import { Effect, Layer } from "effect";
+import { DrizzleORM } from "@/db/client";
+import { spansOutbox } from "@/db/schema";
+import { and, eq, lt, or, isNull, lte } from "drizzle-orm";
+import {
+  SettingsService,
+  getSettingsFromEnvironment,
+  type CloudflareEnvironment,
+} from "@/settings";
+import { DatabaseError } from "@/errors";
+
+// =============================================================================
+// Constants
+// =============================================================================
+
+/**
+ * Lock timeout in milliseconds.
+ * If a row has been in 'processing' state longer than this, it's considered stale.
+ */
+const LOCK_TIMEOUT_MS = 30000; // 30 seconds
+
+/**
+ * Maximum retry count before a row is marked as failed.
+ */
+const MAX_RETRIES = 5;
+
+/**
+ * Maximum number of rows to process per cron invocation.
+ * Prevents overwhelming the queue.
+ */
+const BATCH_LIMIT = 500;
+
+// =============================================================================
+// Types
+// =============================================================================
+
+/**
+ * Cloudflare Scheduled Event type.
+ */
+interface ScheduledEvent {
+  readonly scheduledTime: number;
+  readonly cron: string;
+}
+
+/**
+ * Cloudflare Queue binding type.
+ */
+interface Queue {
+  sendBatch(
+    messages: Array<{ body: { spanId: string; operation: string } }>,
+  ): Promise<void>;
+}
+
+/**
+ * Extended Cloudflare environment bindings for Cron Trigger.
+ */
+export interface CronTriggerEnv extends CloudflareEnvironment {
+  /** Cloudflare Queue binding for spans outbox */
+  readonly SPANS_OUTBOX_QUEUE?: Queue;
+  /** Hyperdrive binding for PostgreSQL connection pooling */
+  readonly HYPERDRIVE?: {
+    readonly connectionString: string;
+  };
+  /** Direct database URL (fallback when Hyperdrive is not configured) */
+  readonly DATABASE_URL?: string;
+}
+
+// =============================================================================
+// Cron Handler
+// =============================================================================
+
+export default {
+  /**
+   * Scheduled event handler.
+   *
+   * @param _event - Cloudflare scheduled event (unused, but required by interface)
+   * @param env - Cloudflare Workers environment bindings
+   */
+  async scheduled(_event: ScheduledEvent, env: CronTriggerEnv): Promise<void> {
+    // Check required bindings
+    if (!env.SPANS_OUTBOX_QUEUE) {
+      console.warn("SPANS_OUTBOX_QUEUE binding not configured, skipping cron");
+      return;
+    }
+
+    // Database connection string from Hyperdrive or direct URL
+    const databaseUrl = env.HYPERDRIVE?.connectionString ?? env.DATABASE_URL;
+    if (!databaseUrl) {
+      console.error(
+        "No database connection available (HYPERDRIVE or DATABASE_URL required)",
+      );
+      return;
+    }
+
+    const queue = env.SPANS_OUTBOX_QUEUE;
+
+    const program = Effect.gen(function* () {
+      const client = yield* DrizzleORM;
+
+      const staleTime = new Date(Date.now() - LOCK_TIMEOUT_MS);
+      const now = new Date();
+
+      // =====================================================================
+      // 1. Reclaim stale processing rows
+      // =====================================================================
+      // Rows that have been in 'processing' state longer than LOCK_TIMEOUT
+      // are likely from a worker that crashed. Reset them to 'pending'.
+
+      yield* client
+        .update(spansOutbox)
+        .set({
+          status: "pending",
+          lockedAt: null,
+          lockedBy: null,
+        })
+        .where(
+          and(
+            eq(spansOutbox.status, "processing"),
+            lt(spansOutbox.lockedAt, staleTime),
+          ),
+        )
+        .pipe(
+          Effect.mapError(
+            (e) =>
+              new DatabaseError({
+                message: "Failed to reclaim stale locks",
+                cause: e,
+              }),
+          ),
+        );
+
+      // =====================================================================
+      // 2. Query pending rows ready for processing
+      // =====================================================================
+      // Conditions:
+      // - status = 'pending'
+      // - processAfter <= now (backoff expired)
+      // - retryCount < MAX_RETRIES
+      // - not currently locked (lockedAt is null or stale)
+
+      const pendingRows = yield* client
+        .select({
+          spanId: spansOutbox.spanId,
+          operation: spansOutbox.operation,
+        })
+        .from(spansOutbox)
+        .where(
+          and(
+            eq(spansOutbox.status, "pending"),
+            lte(spansOutbox.processAfter, now),
+            lt(spansOutbox.retryCount, MAX_RETRIES),
+            or(
+              isNull(spansOutbox.lockedAt),
+              lt(spansOutbox.lockedAt, staleTime),
+            ),
+          ),
+        )
+        .orderBy(spansOutbox.createdAt)
+        .limit(BATCH_LIMIT)
+        .pipe(
+          Effect.mapError(
+            (e) =>
+              new DatabaseError({
+                message: "Failed to query pending rows",
+                cause: e,
+              }),
+          ),
+        );
+
+      if (pendingRows.length === 0) {
+        return;
+      }
+
+      console.log(`Re-enqueuing ${pendingRows.length} pending outbox rows`);
+
+      // =====================================================================
+      // 3. Send to Cloudflare Queue
+      // =====================================================================
+      yield* Effect.tryPromise({
+        try: () =>
+          queue.sendBatch(
+            pendingRows.map((row) => ({
+              body: {
+                spanId: row.spanId,
+                operation: row.operation,
+              },
+            })),
+          ),
+        catch: (e) =>
+          new DatabaseError({
+            message: `Failed to send batch to queue: ${e instanceof Error ? e.message : String(e)}`,
+            cause: e,
+          }),
+      });
+    });
+
+    // Build layers
+    const settingsLayer = Layer.succeed(
+      SettingsService,
+      getSettingsFromEnvironment(env),
+    );
+    const drizzleLayer = DrizzleORM.layer({ connectionString: databaseUrl });
+
+    // Run the program
+    await Effect.runPromise(
+      program.pipe(
+        Effect.provide(drizzleLayer),
+        Effect.provide(settingsLayer),
+        Effect.catchAll((error) => {
+          console.error("Cron trigger error:", error);
+          return Effect.void;
+        }),
+      ),
+    );
+  },
+};

From a9a52923fb14bfd27f25283aded7ae3358378f7a Mon Sep 17 00:00:00 2001
From: Koudai Aono
Date: Tue, 6 Jan 2026 16:10:49 +0900
Subject: [PATCH 2/2] feat(workers): process outbox rows in cron

---
 cloud/eslint.config.ts           |  3 ++
 cloud/package.json               |  2 +
 cloud/server-entry.ts            | 38 ++++++++++++++++++
 cloud/workers/clickhouseCron.ts  | 66 +++++++++++---------------------
 cloud/workers/outboxProcessor.ts |  4 +-
 cloud/wrangler.jsonc             |  8 ++--
 6 files changed, 72 insertions(+), 49 deletions(-)
 create mode 100644 cloud/server-entry.ts

diff --git a/cloud/eslint.config.ts b/cloud/eslint.config.ts
index 0232b70f82..c3c8984fcd 100644
--- a/cloud/eslint.config.ts
+++ b/cloud/eslint.config.ts
@@ -19,6 +19,9 @@ export default [
       globals: {
         ...globals.browser,
         ...globals.node,
+        // Cloudflare Workers globals
+        ExportedHandlerFetchHandler: "readonly",
+        ExecutionContext: "readonly",
       },
     },
     plugins: {
diff --git a/cloud/package.json b/cloud/package.json
index 8f25273d55..e70a0a76a6 100644
--- a/cloud/package.json
+++ b/cloud/package.json
@@ -8,6 +8,8 @@
     "build": "vite build",
     "preview": "vite preview",
     "deploy": "bun run build && wrangler deploy",
+    "cron:dev": "bun run build && wrangler dev --test-scheduled --config dist/server/wrangler.json --port 8787",
+    "cron:trigger": "curl -s -o /dev/null -w \"%{http_code}\\n\" http://localhost:8787/__scheduled",
     "cf-typegen": "wrangler types",
     "db:start": "docker compose -f docker/compose.yml up -d",
     "db:stop": "docker compose -f docker/compose.yml down",
diff --git a/cloud/server-entry.ts b/cloud/server-entry.ts
new file mode 100644
index 0000000000..48afc83498
--- /dev/null
+++ b/cloud/server-entry.ts
@@ -0,0 +1,38 @@
+import {
+  createStartHandler,
+  defaultStreamHandler,
+} from "@tanstack/react-start/server";
+import clickhouseCron, { type CronTriggerEnv } from "@/workers/clickhouseCron";
+
+interface ScheduledEvent {
+  readonly scheduledTime: number;
+  readonly cron: string;
+}
+
+const fetchHandler = createStartHandler(defaultStreamHandler);
+const scheduled = clickhouseCron.scheduled.bind(clickhouseCron);
+
+const fetch: ExportedHandlerFetchHandler<CronTriggerEnv> = (
+  request,
+  env,
+  ctx,
+) => {
+  if (env.ENVIRONMENT === "local") {
+    const { pathname } = new URL(request.url);
+    if (pathname === "/__scheduled") {
+      const event: ScheduledEvent = {
+        cron: "local",
+        scheduledTime: Date.now(),
+      };
+      ctx.waitUntil(scheduled(event, env));
+      return new Response("Ran scheduled event");
+    }
+  }
+
+  return fetchHandler(request);
+};
+
+export default {
+  fetch,
+  scheduled,
+};
diff --git a/cloud/workers/clickhouseCron.ts b/cloud/workers/clickhouseCron.ts
index b8c08c80dc..897f057b32 100644
--- a/cloud/workers/clickhouseCron.ts
+++ b/cloud/workers/clickhouseCron.ts
@@ -1,16 +1,15 @@
 /**
  * @fileoverview Cloudflare Cron Trigger for outbox re-enqueue.
  *
- * Periodically re-enqueues pending outbox rows that failed to be
- * sent to the queue or were not processed within the timeout.
+ * Periodically processes pending outbox rows that failed to be
+ * synced or were not processed within the timeout.
  *
  * ## Responsibilities
  *
  * 1. **Lock Recovery**: Reclaims stale processing rows where the worker
  *    died mid-processing (lockedAt > LOCK_TIMEOUT)
  *
- * 2. **Re-enqueue**: Sends pending rows to Cloudflare Queue for processing
- *    by the Queue Consumer
+ * 2. **Process**: Polls pending rows and syncs them to ClickHouse
  *
  * ## Trigger Configuration
  *
  * Configure in wrangler.jsonc with a cron expression like "0/5 * * * *" (every 5 min).
@@ -21,12 +20,14 @@ import { Effect, Layer } from "effect";
 import { DrizzleORM } from "@/db/client";
 import { spansOutbox } from "@/db/schema";
 import { and, eq, lt, or, isNull, lte } from "drizzle-orm";
+import { ClickHouse } from "@/clickhouse/client";
 import {
   SettingsService,
   getSettingsFromEnvironment,
   type CloudflareEnvironment,
 } from "@/settings";
 import { DatabaseError } from "@/errors";
+import { processOutboxMessages } from "@/workers/outboxProcessor";
 
 // =============================================================================
 // Constants
 // =============================================================================
@@ -61,21 +62,10 @@ interface ScheduledEvent {
   readonly cron: string;
 }
 
-/**
- * Cloudflare Queue binding type.
- */
-interface Queue {
-  sendBatch(
-    messages: Array<{ body: { spanId: string; operation: string } }>,
-  ): Promise<void>;
-}
-
 /**
  * Extended Cloudflare environment bindings for Cron Trigger.
  */
 export interface CronTriggerEnv extends CloudflareEnvironment {
-  /** Cloudflare Queue binding for spans outbox */
-  readonly SPANS_OUTBOX_QUEUE?: Queue;
   /** Hyperdrive binding for PostgreSQL connection pooling */
   readonly HYPERDRIVE?: {
     readonly connectionString: string;
   };
@@ -96,12 +86,6 @@ export default {
    * @param env - Cloudflare Workers environment bindings
    */
   async scheduled(_event: ScheduledEvent, env: CronTriggerEnv): Promise<void> {
-    // Check required bindings
-    if (!env.SPANS_OUTBOX_QUEUE) {
-      console.warn("SPANS_OUTBOX_QUEUE binding not configured, skipping cron");
-      return;
-    }
-
     // Database connection string from Hyperdrive or direct URL
     const databaseUrl = env.HYPERDRIVE?.connectionString ?? env.DATABASE_URL;
     if (!databaseUrl) {
       console.error(
@@ -111,10 +95,9 @@ export default {
       return;
     }
 
-    const queue = env.SPANS_OUTBOX_QUEUE;
-
     const program = Effect.gen(function* () {
       const client = yield* DrizzleORM;
+      const workerId = `workers-cron-${Date.now()}`;
 
       const staleTime = new Date(Date.now() - LOCK_TIMEOUT_MS);
       const now = new Date();
@@ -190,27 +173,20 @@ export default {
         return;
       }
 
-      console.log(`Re-enqueuing ${pendingRows.length} pending outbox rows`);
+      console.log(`Processing ${pendingRows.length} pending outbox rows`);
 
-      // =====================================================================
-      // 3. Send to Cloudflare Queue
-      // =====================================================================
-      yield* Effect.tryPromise({
-        try: () =>
-          queue.sendBatch(
-            pendingRows.map((row) => ({
-              body: {
-                spanId: row.spanId,
-                operation: row.operation,
-              },
-            })),
-          ),
-        catch: (e) =>
-          new DatabaseError({
-            message: `Failed to send batch to queue: ${e instanceof Error ? e.message : String(e)}`,
-            cause: e,
-          }),
-      });
+      const messages = pendingRows.map((row) => ({
+        spanId: row.spanId,
+        operation: row.operation as "INSERT" | "UPDATE" | "DELETE",
+        messageKey: `${row.spanId}:${row.operation}`,
+      }));
+
+      yield* processOutboxMessages(
+        messages,
+        () => {},
+        () => {},
+        workerId,
+      );
     });
 
     // Build layers
@@ -219,11 +195,15 @@ export default {
       getSettingsFromEnvironment(env),
     );
     const drizzleLayer = DrizzleORM.layer({ connectionString: databaseUrl });
+    const clickhouseLayer = ClickHouse.Default.pipe(
+      Layer.provide(settingsLayer),
+    );
 
     // Run the program
     await Effect.runPromise(
       program.pipe(
         Effect.provide(drizzleLayer),
+        Effect.provide(clickhouseLayer),
         Effect.provide(settingsLayer),
         Effect.catchAll((error) => {
           console.error("Cron trigger error:", error);
diff --git a/cloud/workers/outboxProcessor.ts b/cloud/workers/outboxProcessor.ts
index f7c8f5a975..8adbef4695 100644
--- a/cloud/workers/outboxProcessor.ts
+++ b/cloud/workers/outboxProcessor.ts
@@ -1,7 +1,7 @@
 /**
  * @fileoverview Outbox processor for ClickHouse sync.
  *
- * Shared processing logic for both Queue Consumer and local polling worker.
+ * Shared processing logic for the Queue Consumer and Cron Trigger.
  * Handles span extraction from PostgreSQL, transformation to ClickHouse format,
  * and batch insertion with retry logic.
  */
@@ -247,7 +247,7 @@ export const transformSpanForClickHouse = (
 /**
  * Process a batch of outbox messages.
  *
- * Shared between Queue Consumer and local polling worker.
+ * Shared between Queue Consumer and Cron Trigger.
  * Handles locking, transformation, ClickHouse insertion, and status updates.
  *
  * @param messages - Array of outbox messages to process
diff --git a/cloud/wrangler.jsonc b/cloud/wrangler.jsonc
index 40384e3d1d..f1bdf7f599 100644
--- a/cloud/wrangler.jsonc
+++ b/cloud/wrangler.jsonc
@@ -3,13 +3,13 @@
   "compatibility_date": "2025-06-17",
   "compatibility_flags": ["nodejs_compat", "enable_nodejs_http_modules"],
   "$schema": "node_modules/wrangler/config-schema.json",
-  "main": "@tanstack/react-start/server-entry",
+  "main": "server-entry.ts",
   "observability": {
     "enabled": true,
   },
-  // "triggers": {
-  //   "crons": ["0 0 * * 7"]
-  // }
+  "triggers": {
+    "crons": ["*/1 * * * *"],
+  },
   // "env": {
   //   "production": {
   //     "name": "mirascope",
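
Note on local verification: the second patch exposes a local-only "/__scheduled" route in cloud/server-entry.ts (guarded by env.ENVIRONMENT === "local") and adds matching "cron:dev" / "cron:trigger" scripts to cloud/package.json. Below is a minimal smoke-test sketch against that route; it assumes the dev server started by `bun run cron:dev` is listening on port 8787 (the port, route, and response text come from the patch), and the standalone file name is hypothetical, not part of the diff.

// smoke-test-cron.ts (hypothetical helper, not part of the patch)
// Hits the local-only /__scheduled route from server-entry.ts, which kicks off
// clickhouseCron.scheduled(...) via ctx.waitUntil() and replies immediately.
const res = await fetch("http://localhost:8787/__scheduled");

if (res.status !== 200) {
  throw new Error(`Unexpected status ${res.status}; is "bun run cron:dev" running?`);
}

console.log(await res.text()); // expected: "Ran scheduled event"

A 200 response only confirms the handler was enqueued via ctx.waitUntil; whether the pending outbox rows were actually synced to ClickHouse still has to be checked in the worker logs or in ClickHouse itself.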