Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions STRUCTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ cloud/
├── workers/ # Cloudflare Workers (queues/cron/polling)
│ ├── clickhouseQueueConsumer.ts # Queue consumer for outbox sync
│ ├── clickhouseCron.ts # Cron trigger for re-enqueue
│ ├── clickhouseSyncLocal.ts # Local polling worker
│ └── outboxProcessor.ts # Shared processing logic
├── tests/ # Test utilities
│ ├── api.ts # API test utilities
Expand Down Expand Up @@ -298,7 +297,7 @@ PostgreSQL (OLTP)
1) spans_outbox row is created when a span is inserted (db/traces.ts)
2) Worker picks up the outbox row:
- Production: Cloudflare Queue -> clickhouseQueueConsumer.ts
- Local dev: polling loop -> clickhouseSyncLocal.ts
- Local dev: run Queue Consumer via `bun run dev` (Workers)
3) processOutboxMessages (outboxProcessor.ts):
- lock row (status=processing, lockedAt/lockedBy)
- load span + trace from Postgres
Expand Down
1 change: 0 additions & 1 deletion cloud/workers/clickhouseQueueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ export interface QueueConsumerEnvironment extends CloudflareEnvironment {
* - uuid8: First 8 characters of a UUID for uniqueness
*
* Note: In Workers environment, os.hostname() and process.pid are not available.
* This format is consistent with clickhouseSyncLocal.ts but adapted for Workers.
*/
const generateWorkerId = (queueName: string): string => {
const uuid = crypto.randomUUID().slice(0, 8);
Expand Down
4 changes: 2 additions & 2 deletions cloud/workers/outboxProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @fileoverview Outbox processor for ClickHouse sync.
*
* Shared processing logic for both Queue Consumer and local polling worker.
* Shared processing logic for the Queue Consumer.
* Handles span extraction from PostgreSQL, transformation to ClickHouse format,
* and batch insertion with retry logic.
*/
Expand Down Expand Up @@ -247,7 +247,7 @@ export const transformSpanForClickHouse = (
/**
* Process a batch of outbox messages.
*
* Shared between Queue Consumer and local polling worker.
* Shared between Queue Consumer and outbox handling utilities.
* Handles locking, transformation, ClickHouse insertion, and status updates.
*
* @param messages - Array of outbox messages to process
Expand Down
Loading