
Commit b66135a

Merge pull request #14 from powersync-ja/fix-snapshot-performance
Fix snapshot performance issues
2 parents 8d8149a + 5f5163f commit b66135a

5 files changed (+27 −12 lines)


.changeset/tender-emus-beat.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'@powersync/service-core': patch
+---
+
+Fix performance issues and improve logging for initial snapshot replication.

.changeset/tender-emus-drum.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'@powersync/service-jpgwire': patch
+---
+
+Fix performance issue when reading a lot of data from a socket.

packages/jpgwire/src/pgwire_node.js

Lines changed: 8 additions & 6 deletions
@@ -86,6 +86,7 @@ class SocketAdapter {
     this._readPauseAsync = (resolve) => (this._readResume = resolve);
     this._writePauseAsync = (resolve) => (this._writeResume = resolve);
     this._error = null;
+    /** @type {net.Socket} */
     this._socket = socket;
     this._socket.on('readable', (_) => this._readResume());
     this._socket.on('end', (_) => this._readResume());
@@ -126,16 +127,17 @@ class SocketAdapter {
     for (;;) {
       if (this._error) throw this._error; // TODO callstack
       if (this._socket.readableEnded) return null;
-      buf = this._socket.read();
+      // POWERSYNC FIX: Read only as much data as available, instead of reading everything and
+      // unshifting back onto the socket
+      const toRead = Math.min(out.length, this._socket.readableLength);
+      buf = this._socket.read(toRead);
+
       if (buf?.length) break;
       if (!buf) await new Promise(this._readPauseAsync);
     }
+
     if (buf.length > out.length) {
-      out.set(buf.subarray(0, out.length));
-      this._socket.unshift(buf.subarray(out.length));
-      // POWERSYNC: Add metrics
-      recordBytesRead(out.length);
-      return out.length;
+      throw new Error('Read more data than expected');
     }
     out.set(buf);
     // POWERSYNC: Add metrics
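
The key change here: the previous code called socket.read() with no size argument, which drains the socket's entire internal buffer and then has to unshift() the unread remainder back onto the stream; when a lot of data is buffered, that copy-and-unshift cycle becomes expensive. Below is a minimal standalone sketch of the same read pattern, assuming a plain Node.js net.Socket; the helper name and signature are illustrative, not the actual SocketAdapter code.

import * as net from 'net';

// Illustrative sketch: fill `out` with up to `out.length` bytes from a socket.
// Reading Math.min(out.length, readableLength) bytes guarantees the returned
// chunk never exceeds the output buffer, so nothing has to be unshift()ed back
// onto the stream's internal buffer.
async function readInto(socket: net.Socket, out: Uint8Array): Promise<number | null> {
  for (;;) {
    if (socket.readableEnded) return null;
    const toRead = Math.min(out.length, socket.readableLength);
    const buf: Buffer | null = socket.read(toRead);
    if (buf != null && buf.length > 0) {
      out.set(buf);
      return buf.length;
    }
    // Nothing buffered yet: wait until more data arrives or the stream ends.
    await new Promise<void>((resolve) => {
      socket.once('readable', () => resolve());
      socket.once('end', () => resolve());
    });
  }
}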

packages/service-core/src/replication/WalStream.ts

Lines changed: 8 additions & 5 deletions
@@ -274,7 +274,7 @@ WHERE oid = $1::regclass`,
       params: [{ value: table.qualifiedName, type: 'varchar' }]
     });
     const row = results.rows[0];
-    if (row?.[0] ?? -1n == -1n) {
+    if ((row?.[0] ?? -1n) == -1n) {
       return '?';
     } else {
       return `~${row[0]}`;
@@ -374,8 +374,12 @@ WHERE oid = $1::regclass`,
     micro.logger.info(`${this.slot_name} Replicating ${table.qualifiedName}`);
     const estimatedCount = await this.estimatedCount(db, table);
     let at = 0;
+    let lastLogIndex = 0;
     const cursor = await db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
     let columns: { i: number; name: string }[] = [];
+    // pgwire streams rows in chunks.
+    // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
+
     for await (let chunk of cursor) {
       if (chunk.tag == 'RowDescription') {
         let i = 0;
@@ -392,22 +396,21 @@ WHERE oid = $1::regclass`,
         }
         return q;
       });
-      if (at % 5000 == 0 && rows.length > 0) {
+      if (rows.length > 0 && at - lastLogIndex >= 5000) {
         micro.logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
+        lastLogIndex = at;
       }
       if (this.abort_signal.aborted) {
         throw new Error(`Aborted initial replication of ${this.slot_name}`);
       }
 
       for (let record of WalStream.getQueryData(rows)) {
+        // This auto-flushes when the batch reaches its size limit
         await batch.save({ tag: 'insert', sourceTable: table, before: undefined, after: record });
       }
       at += rows.length;
       Metrics.getInstance().rows_replicated_total.add(rows.length);
 
-      // pgwire streaming uses reasonable chunk sizes, so we flush at the end
-      // of each chunk.
-      await batch.flush();
       await touch();
     }
 
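
Two details in the WalStream change are easy to miss. In the first hunk, == binds tighter than ?? in JavaScript, so the old condition effectively evaluated row?.[0] ?? (-1n == -1n) and reported the estimated count as '?' for any table with a non-zero estimate; the added parentheses restore the intended comparison. In the last hunk, chunk row counts from pgwire are arbitrary, so at % 5000 == 0 would rarely be true; tracking the count at the last log line guarantees a progress message roughly every 5000 rows. A small sketch of that throttle, with illustrative names rather than the actual WalStream members:

// `at` counts rows replicated so far; a progress line is emitted once at least
// 5000 new rows have been replicated since the previous line, regardless of how
// many rows each chunk happens to contain.
let at = 0;
let lastLogIndex = 0;

function reportProgress(rowsInChunk: number, estimatedCount: string, log: (msg: string) => void) {
  if (rowsInChunk > 0 && at - lastLogIndex >= 5000) {
    log(`replicating ${at}/${estimatedCount}`);
    lastLogIndex = at;
  }
  at += rowsInChunk;
}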

packages/service-core/src/storage/mongo/OperationBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ const MAX_BATCH_COUNT = 2000;
 /**
  * Maximum size of operations in the batch (estimated).
  */
-const MAX_RECORD_BATCH_SIZE = 14_000_000;
+const MAX_RECORD_BATCH_SIZE = 5_000_000;
 
 /**
  * Maximum size of size of current_data documents we lookup at a time.
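
Lowering MAX_RECORD_BATCH_SIZE works together with the WalStream change above: since chunks are no longer flushed one by one, the batch itself decides when to write, and a smaller byte threshold keeps each write to storage bounded. A generic sketch of that size-estimated auto-flush pattern follows; the class and method names are illustrative, not the actual OperationBatch API.

// Size-bounded batch with auto-flush (illustrative, not the real OperationBatch).
// save() accumulates records and flushes automatically once either the record
// count or the estimated byte size crosses its limit.
const MAX_BATCH_COUNT = 2000;
const MAX_RECORD_BATCH_SIZE = 5_000_000;

class RecordBatch<T> {
  private records: T[] = [];
  private estimatedSize = 0;

  constructor(private write: (records: T[]) => Promise<void>) {}

  async save(record: T, estimatedBytes: number): Promise<void> {
    this.records.push(record);
    this.estimatedSize += estimatedBytes;
    if (this.records.length >= MAX_BATCH_COUNT || this.estimatedSize >= MAX_RECORD_BATCH_SIZE) {
      await this.flush();
    }
  }

  async flush(): Promise<void> {
    if (this.records.length === 0) return;
    await this.write(this.records);
    this.records = [];
    this.estimatedSize = 0;
  }
}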
