Skip to content

Commit 8e9a316

Browse files
authored
Merge pull request #39 from powersync-ja/sync-stats
Log user_id and sync stats for each connection
2 parents c7c3f8c + bdbf95c commit 8e9a316

File tree

7 files changed

+101
-14
lines changed

7 files changed

+101
-14
lines changed

.changeset/plenty-dryers-look.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-core': patch
3+
'@powersync/service-image': patch
4+
---
5+
6+
Log user_id and sync stats for each connection

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { streamResponse } from '../../sync/sync.js';
77
import * as util from '../../util/util-index.js';
88
import { SocketRouteGenerator } from '../router-socket.js';
99
import { SyncRoutes } from './sync-stream.js';
10+
import { RequestTracker } from '../../sync/RequestTracker.js';
1011

1112
export const syncStreamReactive: SocketRouteGenerator = (router) =>
1213
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
@@ -66,6 +67,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
6667
});
6768

6869
Metrics.getInstance().concurrent_connections.add(1);
70+
const tracker = new RequestTracker();
6971
try {
7072
for await (const data of streamResponse({
7173
storage,
@@ -79,6 +81,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
7981
// RSocket handles keepalive events by default
8082
keep_alive: false
8183
},
84+
tracker,
8285
signal: controller.signal
8386
})) {
8487
if (data == null) {
@@ -94,7 +97,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
9497
const serialized = serialize(data) as Buffer;
9598
responder.onNext({ data: serialized }, false);
9699
requestedN--;
97-
Metrics.getInstance().data_synced_bytes.add(serialized.length);
100+
tracker.addDataSynced(serialized.length);
98101
}
99102

100103
if (requestedN <= 0) {
@@ -126,6 +129,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
126129
responder.onComplete();
127130
removeStopHandler();
128131
disposer();
132+
logger.info(`Sync stream complete`, {
133+
user_id: syncParams.user_id,
134+
operations_synced: tracker.operationsSynced,
135+
data_synced_bytes: tracker.dataSyncedBytes
136+
});
129137
Metrics.getInstance().concurrent_connections.add(-1);
130138
}
131139
}

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import * as util from '../../util/util-index.js';
88
import { Metrics } from '../../metrics/Metrics.js';
99
import { authUser } from '../auth.js';
1010
import { routeDefinition } from '../router.js';
11+
import { RequestTracker } from '../../sync/RequestTracker.js';
1112

1213
export enum SyncRoutes {
1314
STREAM = '/sync/stream'
@@ -43,6 +44,7 @@ export const syncStreamed = routeDefinition({
4344
});
4445
}
4546
const controller = new AbortController();
47+
const tracker = new RequestTracker();
4648
try {
4749
Metrics.getInstance().concurrent_connections.add(1);
4850
const stream = Readable.from(
@@ -53,9 +55,11 @@ export const syncStreamed = routeDefinition({
5355
params,
5456
syncParams,
5557
token: payload.context.token_payload!,
58+
tracker,
5659
signal: controller.signal
5760
})
58-
)
61+
),
62+
tracker
5963
),
6064
{ objectMode: false, highWaterMark: 16 * 1024 }
6165
);
@@ -86,6 +90,11 @@ export const syncStreamed = routeDefinition({
8690
afterSend: async () => {
8791
controller.abort();
8892
Metrics.getInstance().concurrent_connections.add(-1);
93+
logger.info(`Sync stream complete`, {
94+
user_id: syncParams.user_id,
95+
operations_synced: tracker.operationsSynced,
96+
data_synced_bytes: tracker.dataSyncedBytes
97+
});
8998
}
9099
});
91100
} catch (ex) {
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Metrics } from '../metrics/Metrics.js';
2+
3+
/**
4+
* Record sync stats per request stream.
5+
*/
6+
export class RequestTracker {
7+
operationsSynced = 0;
8+
dataSyncedBytes = 0;
9+
10+
addOperationsSynced(operations: number) {
11+
this.operationsSynced += operations;
12+
13+
Metrics.getInstance().operations_synced_total.add(operations);
14+
}
15+
16+
addDataSynced(bytes: number) {
17+
this.dataSyncedBytes += bytes;
18+
19+
Metrics.getInstance().data_synced_bytes.add(bytes);
20+
}
21+
}

packages/service-core/src/sync/sync.ts

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { logger } from '@powersync/lib-services-framework';
1111
import { Metrics } from '../metrics/Metrics.js';
1212
import { mergeAsyncIterables } from './merge.js';
1313
import { TokenStreamOptions, tokenStream } from './util.js';
14+
import { RequestTracker } from './RequestTracker.js';
1415

1516
/**
1617
* Maximum number of connections actively fetching data.
@@ -28,12 +29,14 @@ export interface SyncStreamParameters {
2829
*/
2930
signal?: AbortSignal;
3031
tokenStreamOptions?: Partial<TokenStreamOptions>;
32+
33+
tracker: RequestTracker;
3134
}
3235

3336
export async function* streamResponse(
3437
options: SyncStreamParameters
3538
): AsyncIterable<util.StreamingSyncLine | string | null> {
36-
const { storage, params, syncParams, token, tokenStreamOptions, signal } = options;
39+
const { storage, params, syncParams, token, tokenStreamOptions, tracker, signal } = options;
3740
// We also need to be able to abort, so we create our own controller.
3841
const controller = new AbortController();
3942
if (signal) {
@@ -49,7 +52,7 @@ export async function* streamResponse(
4952
}
5053
}
5154
const ki = tokenStream(token, controller.signal, tokenStreamOptions);
52-
const stream = streamResponseInner(storage, params, syncParams, controller.signal);
55+
const stream = streamResponseInner(storage, params, syncParams, tracker, controller.signal);
5356
// Merge the two streams, and abort as soon as one of the streams end.
5457
const merged = mergeAsyncIterables([stream, ki], controller.signal);
5558

@@ -72,6 +75,7 @@ async function* streamResponseInner(
7275
storage: storage.BucketStorageFactory,
7376
params: util.StreamingSyncRequest,
7477
syncParams: RequestParameters,
78+
tracker: RequestTracker,
7579
signal: AbortSignal
7680
): AsyncGenerator<util.StreamingSyncLine | string | null> {
7781
// Bucket state of bucket id -> op_id.
@@ -109,6 +113,11 @@ async function* streamResponseInner(
109113
});
110114

111115
if (allBuckets.length > 1000) {
116+
logger.error(`Too many buckets`, {
117+
checkpoint,
118+
user_id: syncParams.user_id,
119+
buckets: allBuckets.length
120+
});
112121
// TODO: Limit number of buckets even before we get to this point
113122
throw new Error(`Too many buckets: ${allBuckets.length}`);
114123
}
@@ -137,11 +146,18 @@ async function* streamResponseInner(
137146
}
138147
bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket);
139148

140-
let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
149+
let message = `Updated checkpoint: ${checkpoint} | `;
150+
message += `write: ${writeCheckpoint} | `;
141151
message += `buckets: ${allBuckets.length} | `;
142152
message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `;
143-
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `;
144-
logger.info(message);
153+
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`;
154+
logger.info(message, {
155+
checkpoint,
156+
user_id: syncParams.user_id,
157+
buckets: allBuckets.length,
158+
updated: diff.updatedBuckets.length,
159+
removed: diff.removedBuckets.length
160+
});
145161

146162
const checksum_line: util.StreamingSyncCheckpointDiff = {
147163
checkpoint_diff: {
@@ -156,7 +172,7 @@ async function* streamResponseInner(
156172
} else {
157173
let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
158174
message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
159-
logger.info(message);
175+
logger.info(message, { checkpoint, user_id: syncParams.user_id, buckets: allBuckets.length });
160176
bucketsToFetch = allBuckets;
161177
const checksum_line: util.StreamingSyncCheckpoint = {
162178
checkpoint: {
@@ -172,7 +188,16 @@ async function* streamResponseInner(
172188

173189
// This incrementally updates dataBuckets with each individual bucket position.
174190
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
175-
yield* bucketDataInBatches({ storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal });
191+
yield* bucketDataInBatches({
192+
storage,
193+
checkpoint,
194+
bucketsToFetch,
195+
dataBuckets,
196+
raw_data,
197+
binary_data,
198+
signal,
199+
tracker
200+
});
176201

177202
await new Promise((resolve) => setTimeout(resolve, 10));
178203
}
@@ -186,6 +211,7 @@ interface BucketDataRequest {
186211
dataBuckets: Map<string, string>;
187212
raw_data: boolean | undefined;
188213
binary_data: boolean | undefined;
214+
tracker: RequestTracker;
189215
signal: AbortSignal;
190216
}
191217

@@ -221,11 +247,16 @@ async function* bucketDataInBatches(request: BucketDataRequest) {
221247
}
222248
}
223249

250+
interface BucketDataBatchResult {
251+
done: boolean;
252+
data: any;
253+
}
254+
224255
/**
225256
* Extracted as a separate internal function just to avoid memory leaks.
226257
*/
227-
async function* bucketDataBatch(request: BucketDataRequest) {
228-
const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request;
258+
async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<BucketDataBatchResult, void> {
259+
const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, tracker, signal } = request;
229260

230261
const [_, release] = await syncSemaphore.acquire();
231262
try {
@@ -272,7 +303,7 @@ async function* bucketDataBatch(request: BucketDataRequest) {
272303
// iterator memory in case if large data sent.
273304
yield { data: null, done: false };
274305
}
275-
Metrics.getInstance().operations_synced_total.add(r.data.length);
306+
tracker.addOperationsSynced(r.data.length);
276307

277308
dataBuckets.set(r.bucket, r.next_after);
278309
}

packages/service-core/src/sync/util.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as timers from 'timers/promises';
22

33
import * as util from '../util/util-index.js';
44
import { Metrics } from '../metrics/Metrics.js';
5+
import { RequestTracker } from './RequestTracker.js';
56

67
export type TokenStreamOptions = {
78
/**
@@ -89,10 +90,13 @@ export async function* ndjson(iterator: AsyncIterable<string | null | Record<str
8990
}
9091
}
9192

92-
export async function* transformToBytesTracked(iterator: AsyncIterable<string>): AsyncGenerator<Buffer> {
93+
export async function* transformToBytesTracked(
94+
iterator: AsyncIterable<string>,
95+
tracker: RequestTracker
96+
): AsyncGenerator<Buffer> {
9397
for await (let data of iterator) {
9498
const encoded = Buffer.from(data, 'utf8');
95-
Metrics.getInstance().data_synced_bytes.add(encoded.length);
99+
tracker.addDataSynced(encoded.length);
96100
yield encoded;
97101
}
98102
}

packages/service-core/test/src/sync.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { streamResponse } from '../../src/sync/sync.js';
99
import * as timers from 'timers/promises';
1010
import { lsnMakeComparable } from '@powersync/service-jpgwire';
1111
import { RequestParameters } from '@powersync/service-sync-rules';
12+
import { RequestTracker } from '@/sync/RequestTracker.js';
1213

1314
describe('sync - mongodb', function () {
1415
defineTests(MONGO_STORAGE_FACTORY);
@@ -38,6 +39,8 @@ bucket_definitions:
3839
`;
3940

4041
function defineTests(factory: StorageFactory) {
42+
const tracker = new RequestTracker();
43+
4144
test('sync global data', async () => {
4245
const f = await factory();
4346

@@ -78,6 +81,7 @@ function defineTests(factory: StorageFactory) {
7881
include_checksum: true,
7982
raw_data: true
8083
},
84+
tracker,
8185
syncParams: new RequestParameters({ sub: '' }, {}),
8286
token: { exp: Date.now() / 1000 + 10 } as any
8387
});
@@ -118,6 +122,7 @@ function defineTests(factory: StorageFactory) {
118122
include_checksum: true,
119123
raw_data: false
120124
},
125+
tracker,
121126
syncParams: new RequestParameters({ sub: '' }, {}),
122127
token: { exp: Date.now() / 1000 + 10 } as any
123128
});
@@ -146,6 +151,7 @@ function defineTests(factory: StorageFactory) {
146151
include_checksum: true,
147152
raw_data: true
148153
},
154+
tracker,
149155
syncParams: new RequestParameters({ sub: '' }, {}),
150156
token: { exp: 0 } as any
151157
});
@@ -172,6 +178,7 @@ function defineTests(factory: StorageFactory) {
172178
include_checksum: true,
173179
raw_data: true
174180
},
181+
tracker,
175182
syncParams: new RequestParameters({ sub: '' }, {}),
176183
token: { exp: Date.now() / 1000 + 10 } as any
177184
});
@@ -232,6 +239,7 @@ function defineTests(factory: StorageFactory) {
232239
include_checksum: true,
233240
raw_data: true
234241
},
242+
tracker,
235243
syncParams: new RequestParameters({ sub: '' }, {}),
236244
token: { exp: exp } as any
237245
});

0 commit comments

Comments
 (0)