Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize write checkpoints lookups #230

Merged
merged 19 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/plenty-jokes-admire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[Postgres Storage] Fix issue when creating custom write checkpoints
9 changes: 9 additions & 0 deletions .changeset/short-experts-fetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-core': minor
'@powersync/service-image': minor
---

[MongoDB Storage] Stream write checkpoint changes instead of polling, reducing overhead for large numbers of concurrent connections
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@ import {
} from '@powersync/lib-services-framework';
import {
BroadcastIterable,
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
deserializeParameterLookup,
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
mergeAsyncIterables,
ProtocolOpId,
ReplicationCheckpoint,
storage,
utils,
WatchWriteCheckpointOptions,
CHECKPOINT_INVALIDATE_ALL,
deserializeParameterLookup
WriteCheckpointResult
} from '@powersync/service-core';
import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
import { JSONBig } from '@powersync/service-jsonbig';
import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache';
import * as timers from 'timers/promises';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
Expand All @@ -41,7 +43,6 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
import { JSONBig } from '@powersync/service-jsonbig';

export class MongoSyncBucketStorage
extends BaseObserver<storage.SyncRulesBucketStorageListener>
Expand All @@ -68,7 +69,8 @@ export class MongoSyncBucketStorage
this.db = factory.db;
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
db: this.db,
mode: writeCheckpointMode
mode: writeCheckpointMode,
sync_rules_id: group_id
});
}

Expand All @@ -86,13 +88,6 @@ export class MongoSyncBucketStorage
);
}

createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise<bigint> {
return this.writeCheckpointAPI.createCustomWriteCheckpoint({
...checkpoint,
sync_rules_id: this.group_id
});
}

createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise<bigint> {
return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint);
}
Expand Down Expand Up @@ -704,8 +699,7 @@ export class MongoSyncBucketStorage
if (doc == null) {
// Sync rules not present or not active.
// Abort the connections - clients will have to retry later.
// Should this error instead?
return;
throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available');
}

yield this.makeActiveCheckpoint(doc);
Expand Down Expand Up @@ -749,7 +743,7 @@ export class MongoSyncBucketStorage
}
if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) {
// Sync rules have changed - abort and restart.
// Should this error instead?
// We do a soft close of the stream here - no error
break;
}

Expand All @@ -772,28 +766,60 @@ export class MongoSyncBucketStorage
/**
* User-specific watch on the latest checkpoint and/or write checkpoint.
*/
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
const { user_id, signal } = options;
async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
const { signal } = options;
let lastCheckpoint: utils.InternalOpId | null = null;
let lastWriteCheckpoint: bigint | null = null;
let lastWriteCheckpointDoc: WriteCheckpointResult | null = null;
let nextWriteCheckpoint: bigint | null = null;
let lastCheckpointEvent: ReplicationCheckpoint | null = null;
let receivedWriteCheckpoint = false;

const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({
user_id: options.user_id,
signal,
sync_rules_id: this.group_id
});
const iter = mergeAsyncIterables<ReplicationCheckpoint | storage.WriteCheckpointResult>(
[this.sharedIter, writeCheckpointIter],
signal
);

const iter = wrapWithAbort(this.sharedIter, signal);
for await (const event of iter) {
const { checkpoint, lsn } = event;
if ('checkpoint' in event) {
lastCheckpointEvent = event;
} else {
lastWriteCheckpointDoc = event;
receivedWriteCheckpoint = true;
}

if (lastCheckpointEvent == null || !receivedWriteCheckpoint) {
// We need to wait until we received at least on checkpoint, and one write checkpoint.
continue;
}

// lsn changes are not important by itself.
// What is important is:
// 1. checkpoint (op_id) changes.
// 2. write checkpoint changes for the specific user

const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};
const lsn = lastCheckpointEvent?.lsn;

const currentWriteCheckpoint = await this.lastWriteCheckpoint({
user_id,
heads: {
...lsnFilters
if (
lastWriteCheckpointDoc != null &&
(lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn))
) {
const writeCheckpoint = lastWriteCheckpointDoc.id;
if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) {
nextWriteCheckpoint = writeCheckpoint;
}
});
// We used the doc - clear it
lastWriteCheckpointDoc = null;
}

const { checkpoint } = lastCheckpointEvent;

const currentWriteCheckpoint = nextWriteCheckpoint;

if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) {
// No change - wait for next one
Expand All @@ -815,7 +841,7 @@ export class MongoSyncBucketStorage
lastCheckpoint = checkpoint;

yield {
base: event,
base: lastCheckpointEvent,
writeCheckpoint: currentWriteCheckpoint,
update: updates
};
Expand Down
Loading