diff --git a/packages/server/lib/controllers/records/getRecords.ts b/packages/server/lib/controllers/records/getRecords.ts index 219700b0fc..79472de81f 100644 --- a/packages/server/lib/controllers/records/getRecords.ts +++ b/packages/server/lib/controllers/records/getRecords.ts @@ -7,6 +7,7 @@ import { ENVS, metrics, parseEnvs, zodErrorToHTTP } from '@nangohq/utils'; import { connectionIdSchema, modelSchema, providerConfigKeySchema, variantSchema } from '../../helpers/validation.js'; import { asyncWrapper } from '../../utils/asyncWrapper.js'; +import { egressTelemetryRecorder } from '../../utils/egressTelemetry.js'; import type { GetPublicRecords } from '@nangohq/types'; @@ -107,6 +108,18 @@ export const getPublicRecords = asyncWrapper(async (req, res) metrics.increment(metrics.Types.GET_RECORDS_SIZE_IN_BYTES, responseSize, { accountId: account.id }); metrics.distribution(metrics.Types.GET_RECORDS_RESPONSE_SIZE_BYTES, responseSize); + egressTelemetryRecorder.record({ + accountId: account.id, + environmentId: environment.id, + environmentName: environment.name, + integrationId: headers['provider-config-key'], + connectionId: connection.connection_id, + package: 'server', + callsite: 'get_/records', + egressedBytes: responseSize, + count: 1 + }); + if (result.value.budgetTruncated) { metrics.increment(metrics.Types.RECORDS_BUDGET_TRUNCATE, 1, { accountId: account.id, diff --git a/packages/server/lib/middleware/egress-meter.middleware.ts b/packages/server/lib/middleware/egress-meter.middleware.ts index 813b10171a..1534a9bdec 100644 --- a/packages/server/lib/middleware/egress-meter.middleware.ts +++ b/packages/server/lib/middleware/egress-meter.middleware.ts @@ -3,6 +3,12 @@ import { metrics } from '@nangohq/utils'; import type { RequestLocals } from '../utils/express.js'; import type { NextFunction, Request, Response } from 'express'; +function getCallsite(req: Request): string { + const method = req.method.toLowerCase(); + const path = req.route?.path ?? ''; + return `${method}_${path}`; +} + export const egressMeterMiddleware = (req: Request, res: Response, next: NextFunction) => { if (res.locals['apiKeyAuthSource'] !== 'customer_key') { next(); @@ -15,7 +21,7 @@ export const egressMeterMiddleware = (req: Request, res: Response { if (recorded) return; const bytes = (req.socket?.bytesWritten ?? 0) - baseline; - metrics.increment(metrics.Types.EGRESS_BYTES, bytes); + metrics.increment(metrics.Types.EGRESS_BYTES, bytes, { callsite: getCallsite(req) }); recorded = true; }; diff --git a/packages/server/lib/server.ts b/packages/server/lib/server.ts index da04e40dac..bd2327fd4b 100644 --- a/packages/server/lib/server.ts +++ b/packages/server/lib/server.ts @@ -30,6 +30,7 @@ import { migrateFleets, stopFleets } from './fleet.js'; import { beginShutdown } from './ready.js'; import { router } from './routes.js'; import { tasks } from './tasks/index.js'; +import { egressTelemetryRecorder } from './utils/egressTelemetry.js'; import migrate from './utils/migrate.js'; import type { WebSocket } from 'ws'; @@ -138,6 +139,7 @@ const close = once(() => { await destroyKvstore(); await destroyFeatureFlags(); await billing.shutdown(); + await egressTelemetryRecorder.shutdown(); await pubsub.disconnect(); logger.close(); diff --git a/packages/server/lib/utils/egressTelemetry.ts b/packages/server/lib/utils/egressTelemetry.ts new file mode 100644 index 0000000000..717c861086 --- /dev/null +++ b/packages/server/lib/utils/egressTelemetry.ts @@ -0,0 +1,76 @@ +import { makeDataTransferEvent, pubsub } from '@nangohq/shared'; +import { Batcher, ENVS, getLogger, parseEnvs } from '@nangohq/utils'; + +import type { DataTransferCallsite } from '@nangohq/types'; +import type { Grouping, Result } from '@nangohq/utils'; + +const envs = parseEnvs(ENVS); +const logger = getLogger('server.egress.telemetry'); + +export interface ServerEgressTelemetry { + package: 'server'; + callsite: Extract; + accountId: number; + connectionId: string; + integrationId: string; + environmentId: number; + environmentName: string; + egressedBytes: number; + count: number; +} + +const grouping: Grouping = { + groupingKey: (t) => `${t.callsite}:${t.accountId}:${t.environmentId}:${t.integrationId}:${t.connectionId}`, + aggregate: (acc, t) => ({ + ...acc, + egressedBytes: Math.min(acc.egressedBytes + t.egressedBytes, Number.MAX_SAFE_INTEGER), + count: acc.count + t.count + }) +}; + +const batcher = new Batcher({ + maxBatchSize: envs.SERVER_EGRESS_TELEMETRY_BATCH_SIZE, + flushIntervalMs: envs.SERVER_EGRESS_TELEMETRY_FLUSH_INTERVAL_MS, + maxQueueSize: envs.SERVER_EGRESS_TELEMETRY_MAX_QUEUE_SIZE, + grouping, + logger, + process: async (events) => { + const res = await pubsub.publisher.publishBatch({ + subject: 'usage', + events: events.map((t) => + makeDataTransferEvent({ + pkg: t.package, + callsite: t.callsite, + accountId: t.accountId, + connectionId: t.connectionId, + integrationId: t.integrationId, + environmentId: t.environmentId, + environmentName: t.environmentName, + meteredBytes: { sent: t.egressedBytes, received: 0 }, + count: t.count + }) + ) + }); + if (res.isErr()) { + // throw so the Batcher re-queues and retries + throw res.error; + } + } +}); + +export const egressTelemetryRecorder = { + record(entry: ServerEgressTelemetry): void { + const res = batcher.add(entry); + if (res.isErr()) { + logger.error(`Dropped server egress telemetry: ${res.error.message}`); + } + }, + async shutdown(opts?: { timeoutMs: number }): Promise> { + const res = await batcher.shutdown(opts); + if (res.isErr()) { + logger.error(`Server egress telemetry recorder shutdown error: ${res.error.message}`); + } + + return res; + } +}; diff --git a/packages/types/lib/pubsub/events.ts b/packages/types/lib/pubsub/events.ts index dfb07eece6..b760734ffa 100644 --- a/packages/types/lib/pubsub/events.ts +++ b/packages/types/lib/pubsub/events.ts @@ -160,7 +160,8 @@ export type DataTransferCallsite = | 'proxy' | 'uncontrolled_fetch' | 'persist_logs' - | 'persist_records'; + | 'persist_records' + | 'get_/records'; export type UsageDataTransferEvent = UsageEventBase< 'usage.data_transfer', diff --git a/packages/utils/lib/environment/parse.ts b/packages/utils/lib/environment/parse.ts index 4551fd00dc..f37f68b5eb 100644 --- a/packages/utils/lib/environment/parse.ts +++ b/packages/utils/lib/environment/parse.ts @@ -33,6 +33,9 @@ export const ENVS = z.object({ NANGO_ADMIN_INVITE_TOKEN: z.string().optional(), NANGO_SERVER_PUBLIC_BODY_LIMIT: z.string().optional().default('75mb'), SERVER_SHUTDOWN_DELAY_MS: z.coerce.number().optional().default(0), + SERVER_EGRESS_TELEMETRY_BATCH_SIZE: z.coerce.number().int().positive().default(1_000), + SERVER_EGRESS_TELEMETRY_FLUSH_INTERVAL_MS: z.coerce.number().int().nonnegative().default(60_000), + SERVER_EGRESS_TELEMETRY_MAX_QUEUE_SIZE: z.coerce.number().int().positive().default(100_000), NANGO_PROXY_BASE_URL_OVERRIDE_ENABLED: z.stringbool().optional().default(true), NANGO_PROXY_BASE_URL_OVERRIDE_DENYLIST: z .string()