Skip to content

Commit 833e8f2

Browse files
authored
Optimize write checkpoints lookups (#230)
* Implement write checkpoint watching, instead of polling for each connection. * Handle initial write checkpoint lookup. * Use a single shared changestream. * Cleanup. * Fixes for watching write checkpoints. * Move stream utilities to a separate folder. * Use LastValueSink for Demultiplexer. * Add tests; fix abort issue. * Fix import. * Error when sync rules are not active. * Add write checkpoint tests. * Remove createCustomWriteCheckpoint in favor of batchCreateCustomWriteCheckpoints. * Tests and fixes for custom write checkpoints. * Use a shared iterator for custom write checkpoints. * Workaround to make tests more stable. * Actual fix for the tests. * Changesets. * Fix typo.
1 parent 2cb5252 commit 833e8f2

File tree

22 files changed

+1013
-116
lines changed

22 files changed

+1013
-116
lines changed

.changeset/plenty-jokes-admire.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-postgres-storage': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
[Postgres Storage] Fix issue when creating custom write checkpoints

.changeset/short-experts-fetch.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': minor
3+
'@powersync/service-core-tests': minor
4+
'@powersync/service-module-postgres': minor
5+
'@powersync/service-core': minor
6+
'@powersync/service-image': minor
7+
---
8+
9+
[MongoDB Storage] Stream write checkpoint changes instead of polling, reducing overhead for large numbers of concurrent connections

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,23 @@ import {
99
} from '@powersync/lib-services-framework';
1010
import {
1111
BroadcastIterable,
12+
CHECKPOINT_INVALIDATE_ALL,
1213
CheckpointChanges,
14+
deserializeParameterLookup,
1315
GetCheckpointChangesOptions,
1416
InternalOpId,
1517
internalToExternalOpId,
18+
mergeAsyncIterables,
1619
ProtocolOpId,
1720
ReplicationCheckpoint,
1821
storage,
1922
utils,
2023
WatchWriteCheckpointOptions,
21-
CHECKPOINT_INVALIDATE_ALL,
22-
deserializeParameterLookup
24+
WriteCheckpointResult
2325
} from '@powersync/service-core';
24-
import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
26+
import { JSONBig } from '@powersync/service-jsonbig';
27+
import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
2528
import * as bson from 'bson';
26-
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
2729
import { LRUCache } from 'lru-cache';
2830
import * as timers from 'timers/promises';
2931
import { MongoBucketStorage } from '../MongoBucketStorage.js';
@@ -41,7 +43,6 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
4143
import { MongoCompactor } from './MongoCompactor.js';
4244
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4345
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
44-
import { JSONBig } from '@powersync/service-jsonbig';
4546

4647
export class MongoSyncBucketStorage
4748
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -68,7 +69,8 @@ export class MongoSyncBucketStorage
6869
this.db = factory.db;
6970
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
7071
db: this.db,
71-
mode: writeCheckpointMode
72+
mode: writeCheckpointMode,
73+
sync_rules_id: group_id
7274
});
7375
}
7476

@@ -86,13 +88,6 @@ export class MongoSyncBucketStorage
8688
);
8789
}
8890

89-
createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise<bigint> {
90-
return this.writeCheckpointAPI.createCustomWriteCheckpoint({
91-
...checkpoint,
92-
sync_rules_id: this.group_id
93-
});
94-
}
95-
9691
createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise<bigint> {
9792
return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint);
9893
}
@@ -704,8 +699,7 @@ export class MongoSyncBucketStorage
704699
if (doc == null) {
705700
// Sync rules not present or not active.
706701
// Abort the connections - clients will have to retry later.
707-
// Should this error instead?
708-
return;
702+
throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available');
709703
}
710704

711705
yield this.makeActiveCheckpoint(doc);
@@ -749,7 +743,7 @@ export class MongoSyncBucketStorage
749743
}
750744
if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
751745
// Sync rules have changed - abort and restart.
752-
// Should this error instead?
746+
// We do a soft close of the stream here - no error
753747
break;
754748
}
755749

@@ -772,28 +766,60 @@ export class MongoSyncBucketStorage
772766
/**
773767
* User-specific watch on the latest checkpoint and/or write checkpoint.
774768
*/
775-
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
776-
const { user_id, signal } = options;
769+
async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
770+
const { signal } = options;
777771
let lastCheckpoint: utils.InternalOpId | null = null;
778772
let lastWriteCheckpoint: bigint | null = null;
773+
let lastWriteCheckpointDoc: WriteCheckpointResult | null = null;
774+
let nextWriteCheckpoint: bigint | null = null;
775+
let lastCheckpointEvent: ReplicationCheckpoint | null = null;
776+
let receivedWriteCheckpoint = false;
777+
778+
const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({
779+
user_id: options.user_id,
780+
signal,
781+
sync_rules_id: this.group_id
782+
});
783+
const iter = mergeAsyncIterables<ReplicationCheckpoint | storage.WriteCheckpointResult>(
784+
[this.sharedIter, writeCheckpointIter],
785+
signal
786+
);
779787

780-
const iter = wrapWithAbort(this.sharedIter, signal);
781788
for await (const event of iter) {
782-
const { checkpoint, lsn } = event;
789+
if ('checkpoint' in event) {
790+
lastCheckpointEvent = event;
791+
} else {
792+
lastWriteCheckpointDoc = event;
793+
receivedWriteCheckpoint = true;
794+
}
795+
796+
if (lastCheckpointEvent == null || !receivedWriteCheckpoint) {
797+
// We need to wait until we received at least on checkpoint, and one write checkpoint.
798+
continue;
799+
}
783800

784801
// lsn changes are not important by itself.
785802
// What is important is:
786803
// 1. checkpoint (op_id) changes.
787804
// 2. write checkpoint changes for the specific user
788805

789-
const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};
806+
const lsn = lastCheckpointEvent?.lsn;
790807

791-
const currentWriteCheckpoint = await this.lastWriteCheckpoint({
792-
user_id,
793-
heads: {
794-
...lsnFilters
808+
if (
809+
lastWriteCheckpointDoc != null &&
810+
(lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn))
811+
) {
812+
const writeCheckpoint = lastWriteCheckpointDoc.id;
813+
if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) {
814+
nextWriteCheckpoint = writeCheckpoint;
795815
}
796-
});
816+
// We used the doc - clear it
817+
lastWriteCheckpointDoc = null;
818+
}
819+
820+
const { checkpoint } = lastCheckpointEvent;
821+
822+
const currentWriteCheckpoint = nextWriteCheckpoint;
797823

798824
if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) {
799825
// No change - wait for next one
@@ -815,7 +841,7 @@ export class MongoSyncBucketStorage
815841
lastCheckpoint = checkpoint;
816842

817843
yield {
818-
base: event,
844+
base: lastCheckpointEvent,
819845
writeCheckpoint: currentWriteCheckpoint,
820846
update: updates
821847
};

0 commit comments

Comments
 (0)