Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/server/lib/controllers/records/getRecords.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ENVS, metrics, parseEnvs, zodErrorToHTTP } from '@nangohq/utils';

import { connectionIdSchema, modelSchema, providerConfigKeySchema, variantSchema } from '../../helpers/validation.js';
import { asyncWrapper } from '../../utils/asyncWrapper.js';
import { egressTelemetryRecorder } from '../../utils/egressTelemetry.js';

import type { GetPublicRecords } from '@nangohq/types';

Expand Down Expand Up @@ -107,6 +108,18 @@ export const getPublicRecords = asyncWrapper<GetPublicRecords>(async (req, res)
metrics.increment(metrics.Types.GET_RECORDS_SIZE_IN_BYTES, responseSize, { accountId: account.id });
metrics.distribution(metrics.Types.GET_RECORDS_RESPONSE_SIZE_BYTES, responseSize);

egressTelemetryRecorder.record({
accountId: account.id,
environmentId: environment.id,
environmentName: environment.name,
integrationId: headers['provider-config-key'],
connectionId: connection.connection_id,
package: 'server',
callsite: 'get_/records',
egressedBytes: responseSize,
count: 1
});

if (result.value.budgetTruncated) {
metrics.increment(metrics.Types.RECORDS_BUDGET_TRUNCATE, 1, {
accountId: account.id,
Expand Down
8 changes: 7 additions & 1 deletion packages/server/lib/middleware/egress-meter.middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { metrics } from '@nangohq/utils';
import type { RequestLocals } from '../utils/express.js';
import type { NextFunction, Request, Response } from 'express';

/**
 * Builds the `callsite` tag attached to the egress metric.
 *
 * The tag is the lower-cased HTTP method and the matched route path, joined
 * with an underscore. When no route (or no route path) is available, the path
 * part is empty and the tag ends with the underscore.
 *
 * @param req - The incoming Express request.
 * @returns A string of the form `<method>_<route path>`.
 */
function getCallsite(req: Request): string {
    const verb = req.method.toLowerCase();
    const routePath = req.route?.path ?? '';
    // Array#join stringifies each part the same way the template literal did.
    return [verb, routePath].join('_');
}

export const egressMeterMiddleware = (req: Request, res: Response<any, RequestLocals>, next: NextFunction) => {
if (res.locals['apiKeyAuthSource'] !== 'customer_key') {
next();
Expand All @@ -15,7 +21,7 @@ export const egressMeterMiddleware = (req: Request, res: Response<any, RequestLo
const meterEgressedBytes = () => {
if (recorded) return;
const bytes = (req.socket?.bytesWritten ?? 0) - baseline;
metrics.increment(metrics.Types.EGRESS_BYTES, bytes);
metrics.increment(metrics.Types.EGRESS_BYTES, bytes, { callsite: getCallsite(req) });
recorded = true;
};

Expand Down
2 changes: 2 additions & 0 deletions packages/server/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { migrateFleets, stopFleets } from './fleet.js';
import { beginShutdown } from './ready.js';
import { router } from './routes.js';
import { tasks } from './tasks/index.js';
import { egressTelemetryRecorder } from './utils/egressTelemetry.js';
import migrate from './utils/migrate.js';

import type { WebSocket } from 'ws';
Expand Down Expand Up @@ -138,6 +139,7 @@ const close = once(() => {
await destroyKvstore();
await destroyFeatureFlags();
await billing.shutdown();
await egressTelemetryRecorder.shutdown();
await pubsub.disconnect();

logger.close();
Expand Down
76 changes: 76 additions & 0 deletions packages/server/lib/utils/egressTelemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { makeDataTransferEvent, pubsub } from '@nangohq/shared';
import { Batcher, ENVS, getLogger, parseEnvs } from '@nangohq/utils';

import type { DataTransferCallsite } from '@nangohq/types';
import type { Grouping, Result } from '@nangohq/utils';

const envs = parseEnvs(ENVS);
const logger = getLogger('server.egress.telemetry');

/**
 * A single egress measurement recorded by the server package.
 *
 * Entries are handed to the egress telemetry batcher, which merges entries
 * sharing the same callsite/account/environment/integration/connection and
 * publishes them as data-transfer usage events.
 */
export interface ServerEgressTelemetry {
    // --- origin of the measurement ---
    /** Emitting package; fixed to `'server'` for this recorder. */
    package: 'server';
    /** Callsite the bytes are attributed to; currently only `'get_/records'` is allowed. */
    callsite: Extract<DataTransferCallsite, 'get_/records'>;

    // --- attribution ---
    accountId: number;
    environmentId: number;
    environmentName: string;
    integrationId: string;
    connectionId: string;

    // --- measurement ---
    /** Bytes sent out; summed when entries are aggregated. */
    egressedBytes: number;
    /** Number of occurrences this entry represents; summed when entries are aggregated. */
    count: number;
}

/**
 * Grouping strategy for egress telemetry entries.
 *
 * - `groupingKey` identifies entries that can be merged: same callsite,
 *   account, environment, integration and connection.
 * - `aggregate` merges an incoming entry into the accumulated one by summing
 *   `egressedBytes` (capped at `Number.MAX_SAFE_INTEGER`) and `count`; all
 *   other fields are taken from the accumulated entry.
 */
const grouping: Grouping<ServerEgressTelemetry> = {
    groupingKey: (entry) => {
        const parts = [entry.callsite, entry.accountId, entry.environmentId, entry.integrationId, entry.connectionId];
        return parts.map((part) => `${part}`).join(':');
    },
    aggregate: (merged, incoming) => {
        const summedBytes = merged.egressedBytes + incoming.egressedBytes;
        const summedCount = merged.count + incoming.count;
        return {
            ...merged,
            // Cap the byte total so it stays an exactly representable integer.
            egressedBytes: Math.min(summedBytes, Number.MAX_SAFE_INTEGER),
            count: summedCount
        };
    }
};

/**
 * Batcher that buffers server egress telemetry and publishes it as
 * data-transfer events on the `usage` subject.
 *
 * Batch size, flush interval and queue capacity come from the
 * `SERVER_EGRESS_TELEMETRY_*` environment variables.
 */
const batcher = new Batcher<ServerEgressTelemetry>({
    grouping,
    logger,
    maxBatchSize: envs.SERVER_EGRESS_TELEMETRY_BATCH_SIZE,
    maxQueueSize: envs.SERVER_EGRESS_TELEMETRY_MAX_QUEUE_SIZE,
    flushIntervalMs: envs.SERVER_EGRESS_TELEMETRY_FLUSH_INTERVAL_MS,
    process: async (batch) => {
        // Convert each telemetry entry into a data-transfer event.
        const events = batch.map((entry) => {
            const { package: pkg, callsite, accountId, connectionId, integrationId, environmentId, environmentName, egressedBytes, count } = entry;
            return makeDataTransferEvent({
                pkg,
                callsite,
                accountId,
                connectionId,
                integrationId,
                environmentId,
                environmentName,
                // Egress only: everything is reported as sent, nothing as received.
                meteredBytes: { sent: egressedBytes, received: 0 },
                count
            });
        });

        const published = await pubsub.publisher.publishBatch({ subject: 'usage', events });
        if (published.isErr()) {
            // throw so the Batcher re-queues and retries
            throw published.error;
        }
    }
});

export const egressTelemetryRecorder = {
record(entry: ServerEgressTelemetry): void {
const res = batcher.add(entry);
if (res.isErr()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will tell me, but do we need some cohesive metric here for knowing when the batcher is struggling and when we are dropping metrics? For ClickHouse we implemented this metric for knowing when events are dropped, either because the queue was full, we reached the max number of retries, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm currently relying on log-based filters and was planning on extracting metrics out of those logs with this. So the tl;dr is that I'm tracking it, just not with a regular metric maintained at the app level.

Alternatively, since the Batcher class is a shared utility, we could introduce a unified metric that adds a dimension based on where the Batcher is used (or something similar), so we'd be able to track specific instances of it. This feels like more than I'd like to do in this PR, though, and I'm trying to move fast with this PR to unlock the next analysis phase.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with doing this in a new PR!

logger.error(`Dropped server egress telemetry: ${res.error.message}`);
}
},
/**
 * Shuts down the underlying batcher and logs any shutdown error.
 *
 * @param opts - Optional settings forwarded unchanged to `batcher.shutdown`.
 * @returns The result returned by `batcher.shutdown`, whether it succeeded or failed.
 */
async shutdown(opts?: { timeoutMs: number }): Promise<Result<void>> {
    const outcome = await batcher.shutdown(opts);
    if (!outcome.isErr()) {
        return outcome;
    }

    logger.error(`Server egress telemetry recorder shutdown error: ${outcome.error.message}`);
    return outcome;
}
};
3 changes: 2 additions & 1 deletion packages/types/lib/pubsub/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ export type DataTransferCallsite =
| 'proxy'
| 'uncontrolled_fetch'
| 'persist_logs'
| 'persist_records';
| 'persist_records'
| 'get_/records';

export type UsageDataTransferEvent = UsageEventBase<
'usage.data_transfer',
Expand Down
3 changes: 3 additions & 0 deletions packages/utils/lib/environment/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ export const ENVS = z.object({
NANGO_ADMIN_INVITE_TOKEN: z.string().optional(),
NANGO_SERVER_PUBLIC_BODY_LIMIT: z.string().optional().default('75mb'),
SERVER_SHUTDOWN_DELAY_MS: z.coerce.number().optional().default(0),
SERVER_EGRESS_TELEMETRY_BATCH_SIZE: z.coerce.number().int().positive().default(1_000),
SERVER_EGRESS_TELEMETRY_FLUSH_INTERVAL_MS: z.coerce.number().int().nonnegative().default(60_000),
SERVER_EGRESS_TELEMETRY_MAX_QUEUE_SIZE: z.coerce.number().int().positive().default(100_000),
Comment on lines +36 to +38

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should they be prefixed with NANGO?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, if they should, this boat has kind of sailed already as I introduced these two in a previous PR:

    RUNNER_TELEMETRY_BATCH_SIZE: z.coerce.number().int().positive().max(1000).default(500),
    RUNNER_TELEMETRY_FLUSH_INTERVAL_MS: z.coerce.number().int().nonnegative().default(10_000),

But I also see a bunch of other env vars without the NANGO_ prefix, so I'm not sure. Are we shooting to have that prefix as a standard?

NANGO_PROXY_BASE_URL_OVERRIDE_ENABLED: z.stringbool().optional().default(true),
NANGO_PROXY_BASE_URL_OVERRIDE_DENYLIST: z
.string()
Expand Down
Loading