Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/metering/lib/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { DefaultTransport } from '@nangohq/pubsub';
import { Clickhouse, getUsageTracker, migrate as migrateUsage } from '@nangohq/usage';
import { initSentry, once, report } from '@nangohq/utils';

import { billingEventsS3DLQMonitorCron } from './crons/billingEventsS3DLQMonitor.js';
import { billingEventsS3ExportCron } from './crons/billingEventsS3Export.js';
import { exportUsageCron } from './crons/usage.js';
import { e2bSandboxesDaemon } from './daemons/e2b-sandboxes.daemon.js';
Expand Down Expand Up @@ -58,6 +59,7 @@ try {
// Crons
exportUsageCron();
billingEventsS3ExportCron();
billingEventsS3DLQMonitorCron();
const e2bSandboxesDaemonHandle = e2bSandboxesDaemon();

// Graceful shutdown
Expand Down
101 changes: 101 additions & 0 deletions packages/metering/lib/crons/billingEventsS3DLQMonitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3';
import tracer from 'dd-trace';
import * as cron from 'node-cron';

import { getLocking } from '@nangohq/kvstore';
import { getLogger, metrics } from '@nangohq/utils';

import { envs } from '../env.js';

import type { Lock } from '@nangohq/kvstore';

// Logger tagged with this cron's name.
const logger = getLogger('cron.billingEventsS3DLQMonitor');
// Minute of the hour at which the cron is scheduled; a negative value disables it.
const cronMinute = envs.CRON_BILLING_EVENTS_S3_DLQ_MONITOR_MINUTE;
// Name of the S3 bucket whose objects are counted; the cron is not scheduled when unset.
const bucket = envs.BILLING_EVENTS_S3_DLQ_BUCKET;
// AWS region handed to the S3 client.
const region = envs.BILLING_EVENTS_S3_REGION;

// Key passed to the kvstore locking helper so that a tick runs under a lock.
const LOCK_KEY = 'lock:cron:billingEventsS3DLQMonitor';
// Cron fires hourly; lock should expire well before the next tick (30 minutes, in milliseconds).
const lockTtlMs = 30 * 60 * 1000;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it hourly? If this is detecting things being broken, wouldn't it be worth knowing faster? I might be wrong, but it doesn't look like it's an expensive operation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We upload files once a day, so finding out after one hour, in the worst case, is OK. From there we have "plenty" of time to fix the issue and re-export the data. Until we fix the issue, we will simply keep being alerted that there are files in the DLQ that require our attention; once the issue is fixed, we must manually remove the files from the DLQ.

I'm planning to provide a full demo of all of this.


// Module-level S3 client, built once when this module is loaded and configured with `region`.
const s3 = new S3Client({ region });

/**
 * Registers the DLQ monitor with node-cron.
 *
 * Nothing is scheduled when the configured minute is negative or when no DLQ
 * bucket is configured; in both cases a single log line explains why.
 * Otherwise `exec` runs once per hour at the configured minute, and any
 * rejection escaping it is logged rather than left unhandled.
 */
export function billingEventsS3DLQMonitorCron(): void {
    const isDisabled = cronMinute < 0;
    if (isDisabled) {
        logger.info(`Skipping (CRON_BILLING_EVENTS_S3_DLQ_MONITOR_MINUTE=${cronMinute})`);
        return;
    }

    const hasBucket = Boolean(bucket);
    if (!hasBucket) {
        logger.warning(`Skipping (BILLING_EVENTS_S3_DLQ_BUCKET not set)`);
        return;
    }

    // Fire-and-forget: the tick handler itself stays synchronous and only attaches a rejection handler.
    const onTick = (): void => {
        exec().catch((err: unknown) => {
            logger.error('Cron tick failed unexpectedly', err);
        });
    };

    const everyHourAtMinute = `${cronMinute} * * * *`;
    cron.schedule(everyHourAtMinute, onTick);
}

/**
 * Runs one monitoring pass inside a tracer span.
 *
 * Under the lock, the pass counts the objects in the DLQ bucket, publishes the
 * count as a gauge, and logs whether the bucket is empty. Failures while
 * inspecting the bucket are logged and not rethrown. A run-result counter
 * tagged with `success` is incremented whether the inspection succeeded or not.
 * If the lock is not acquired, the inspection is skipped.
 */
export async function exec(): Promise<void> {
    const inspectBucket = async (): Promise<void> => {
        // Stays 'false' unless every step of the inspection completes.
        let outcome: 'true' | 'false' = 'false';
        try {
            const fileCount = await countObjects();
            metrics.gauge(metrics.Types.BILLING_EVENTS_S3_DLQ_FILES, fileCount);
            if (fileCount > 0) {
                logger.warning(`DLQ bucket s3://${bucket!} has ${fileCount} file(s); alert should fire.`);
            } else {
                logger.info(`DLQ bucket s3://${bucket!} is empty.`);
            }
            outcome = 'true';
        } catch (err) {
            logger.error(`Failed to inspect DLQ bucket s3://${bucket!}`, err);
        } finally {
            metrics.increment(metrics.Types.BILLING_EVENTS_S3_DLQ_MONITOR_RUN_RESULT, 1, { success: outcome });
        }
    };

    await tracer.trace<Promise<void>>('nango.cron.billingEventsS3DLQMonitor', async () => {
        logger.info(`Starting`);
        await withLock(inspectBucket);
    });
}

// We're already broken past 100 files; no need to paginate further.
const MAX_FILES_TO_COUNT = 100;

/**
 * Counts the objects in the DLQ bucket, saturating at MAX_FILES_TO_COUNT.
 *
 * Each request asks S3 for no more keys than are still needed to reach the
 * cap, so a large backlog never causes a full default-sized page to be
 * listed just to be discarded. Pagination continues only while S3 reports the
 * listing as truncated and the cap has not been reached.
 *
 * @returns the number of objects found, or MAX_FILES_TO_COUNT when at least that many exist.
 * @throws whatever `s3.send` rejects with; no error is handled here.
 */
async function countObjects(): Promise<number> {
    let count = 0;
    let continuationToken: string | undefined;
    do {
        const res = await s3.send(
            new ListObjectsV2Command({
                Bucket: bucket,
                // Only request what is still needed to reach the cap.
                MaxKeys: MAX_FILES_TO_COUNT - count,
                ContinuationToken: continuationToken
            })
        );
        count += res.KeyCount ?? 0;
        if (count >= MAX_FILES_TO_COUNT) return MAX_FILES_TO_COUNT;
        // A page may hold fewer keys than requested; keep going only if S3 says there is more.
        continuationToken = res.IsTruncated ? res.NextContinuationToken : undefined;
    } while (continuationToken);
    return count;
}

/**
 * Runs `fn` while holding the cron lock, and releases the lock afterwards.
 *
 * If the lock cannot be acquired the call returns without running `fn`. The
 * acquisition error is attached to the log line so that a failure of the
 * locking backend can be told apart from the lock simply being held elsewhere.
 * A failure to release the lock is logged and not rethrown.
 *
 * @param fn - work to perform under the lock; its rejection propagates to the caller.
 */
async function withLock(fn: () => Promise<void>): Promise<void> {
    const locking = await getLocking();
    let lock: Lock;
    try {
        lock = await locking.acquire(LOCK_KEY, lockTtlMs);
    } catch (err: unknown) {
        logger.info(`Could not acquire lock, skipping`, { lock: LOCK_KEY, error: err });
        return;
    }
    logger.info(`Lock acquired`);
    try {
        await fn();
    } finally {
        // Release even when `fn` throws; a release failure must not mask the error from `fn`.
        try {
            await locking.release(lock);
        } catch (err) {
            logger.error('Error releasing lock', { lock: lock.key, error: err });
        }
    }
}
5 changes: 5 additions & 0 deletions packages/utils/lib/environment/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ export const ENVS = z.object({
// ClickHouse a ~15min buffer to ingest the previous UTC day's tail before we
// snapshot it.
CRON_BILLING_EVENTS_S3_HOURLY_EXPORT_MINUTE: z.coerce.number().min(-1).max(59).optional().default(-1),
// Triggers the count of files in BILLING_EVENTS_S3_DLQ_BUCKET (must also be set). -1 disables.
CRON_BILLING_EVENTS_S3_DLQ_MONITOR_MINUTE: z.coerce.number().min(-1).max(59).optional().default(-1),

// Metering
METERING_USAGE_EVENTS_SUBSCRIBE_CONCURRENCY: z.coerce.number().int().min(1).optional().default(1),
Expand Down Expand Up @@ -341,6 +343,9 @@ export const ENVS = z.object({
BILLING_EVENTS_S3_WRITER_ROLE_ARN: z.string().optional(),
BILLING_EVENTS_S3_EVENT_NAME_SUFFIX: z.string().optional(),
BILLING_EVENTS_S3_REGION: z.string().optional().default('us-west-2'),
// DLQ bucket Orb writes to when it can't ingest a billing event. Watched by the
// metering DLQ monitor cron (CRON_BILLING_EVENTS_S3_DLQ_MONITOR_MINUTE).
BILLING_EVENTS_S3_DLQ_BUCKET: z.string().optional(),

// ClickHouse
CLICKHOUSE_URL: z.string().optional(),
Expand Down
2 changes: 2 additions & 0 deletions packages/utils/lib/telemetry/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ export enum Types {
BILLING_USAGE_CLICKHOUSE_S3_EXPORT_RUN_RESULT = 'nango.billing.usage.clickhouse.s3_export.run.result',
BILLING_USAGE_CLICKHOUSE_S3_EXPORT_DURATION_MS = 'nango.billing.usage.clickhouse.s3_export.duration_ms',
BILLING_USAGE_TRACKER_CALLS = 'nango.billing.usage.tracker.calls',
BILLING_EVENTS_S3_DLQ_FILES = 'nango.billing.events.s3.dlq.files',
BILLING_EVENTS_S3_DLQ_MONITOR_RUN_RESULT = 'nango.billing.events.s3.dlq.monitor.run.result',

USAGE_IS_CAPPED = 'nango.capping.isCapped',

Expand Down
Loading