From d31784f9fc6c27830af6baef95a8cd70b178fdba Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 14 Mar 2025 15:45:42 +0200 Subject: [PATCH 01/18] Implement write checkpoint watching, instead of polling for each connection. --- .../implementation/MongoSyncBucketStorage.ts | 63 ++++++++++---- .../implementation/MongoWriteCheckpointAPI.ts | 84 ++++++++++++++++++- .../src/storage/PostgresSyncRulesStorage.ts | 2 +- .../checkpoints/PostgresWriteCheckpointAPI.ts | 7 ++ .../test/src/checkpoints.test.ts | 2 +- .../src/storage/SyncRulesBucketStorage.ts | 2 +- .../src/storage/WriteCheckpointAPI.ts | 22 +++++ packages/service-core/src/sync/sync.ts | 2 +- 8 files changed, 163 insertions(+), 21 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 57fc44ec..5a411ff4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -9,21 +9,23 @@ import { } from '@powersync/lib-services-framework'; import { BroadcastIterable, + CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, + deserializeParameterLookup, GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, + mergeAsyncIterables, ProtocolOpId, ReplicationCheckpoint, storage, utils, WatchWriteCheckpointOptions, - CHECKPOINT_INVALIDATE_ALL, - deserializeParameterLookup + WriteCheckpointResult } from '@powersync/service-core'; -import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules'; +import { JSONBig } from '@powersync/service-jsonbig'; +import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; import { LRUCache } from 'lru-cache'; import * as timers from 'timers/promises'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; @@ -41,7 +43,6 @@ import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js'; -import { JSONBig } from '@powersync/service-jsonbig'; export class MongoSyncBucketStorage extends BaseObserver @@ -772,28 +773,58 @@ export class MongoSyncBucketStorage /** * User-specific watch on the latest checkpoint and/or write checkpoint. 
*/ - async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { - const { user_id, signal } = options; + async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { + const { signal } = options; let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; + let lastWriteCheckpointDoc: WriteCheckpointResult | null = null; + let nextWriteCheckpoint: bigint | null = null; + let lastCheckpointEvent: ReplicationCheckpoint | null = null; + + const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({ + user_id: options.user_id, + signal, + sync_rules_id: this.group_id + }); + const iter = mergeAsyncIterables( + [this.sharedIter, writeCheckpointIter], + signal + ); - const iter = wrapWithAbort(this.sharedIter, signal); for await (const event of iter) { - const { checkpoint, lsn } = event; + if ('checkpoint' in event) { + const { lsn } = event; + lastCheckpointEvent = event; + } else { + lastWriteCheckpointDoc = event; + } + + if (lastCheckpointEvent == null) { + continue; + } // lsn changes are not important by itself. // What is important is: // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - const lsnFilters: Record = lsn ? { 1: lsn } : {}; + const lsn = lastCheckpointEvent?.lsn; - const currentWriteCheckpoint = await this.lastWriteCheckpoint({ - user_id, - heads: { - ...lsnFilters + if ( + lastWriteCheckpointDoc != null && + (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn > lastWriteCheckpointDoc.lsn)) + ) { + const writeCheckpoint = lastWriteCheckpointDoc.id; + if (nextWriteCheckpoint == null || writeCheckpoint > nextWriteCheckpoint) { + nextWriteCheckpoint = writeCheckpoint; } - }); + // We used the doc - clear it + lastWriteCheckpointDoc = null; + } + + const { checkpoint } = lastCheckpointEvent; + + const currentWriteCheckpoint = nextWriteCheckpoint; if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) { // No change - wait for next one @@ -815,7 +846,7 @@ export class MongoSyncBucketStorage lastCheckpoint = checkpoint; yield { - base: event, + base: lastCheckpointEvent, writeCheckpoint: currentWriteCheckpoint, update: updates }; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index fe684837..a3d6a9e3 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,5 +1,5 @@ import * as framework from '@powersync/lib-services-framework'; -import { storage } from '@powersync/service-core'; +import { storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; export type MongoCheckpointAPIOptions = { @@ -93,6 +93,88 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } + async *watchUserWriteCheckpoint( + options: WatchUserWriteCheckpointOptions + ): AsyncIterable { + switch (this.writeCheckpointMode) { + case storage.WriteCheckpointMode.CUSTOM: + return this.watchCustomWriteCheckpoint(options); + case storage.WriteCheckpointMode.MANAGED: + return this.watchManagedWriteCheckpoint(options); + } + } + + async *watchManagedWriteCheckpoint( + options: WatchUserWriteCheckpointOptions + ): AsyncIterable { + // TODO: Share a single changestream across all users + 
const { user_id, signal } = options; + const stream = this.db.write_checkpoints.watch( + [{ $match: { 'fullDocument.user_id': user_id, operationType: { $in: ['insert', 'update', 'replace'] } } }], + { + fullDocument: 'updateLookup' + } + ); + + signal.onabort = () => { + stream.close(); + }; + + if (signal.aborted) { + stream.close(); + return; + } + + for await (let event of stream) { + if (!('fullDocument' in event) || event.fullDocument == null) { + continue; + } + yield { + id: event.fullDocument.client_id, + lsn: event.fullDocument.lsns['1'] + }; + } + } + + async *watchCustomWriteCheckpoint( + options: WatchUserWriteCheckpointOptions + ): AsyncIterable { + const { user_id, sync_rules_id, signal } = options; + const stream = this.db.custom_write_checkpoints.watch( + [ + { + $match: { + 'fullDocument.user_id': user_id, + 'fullDocument.sync_rules_id': sync_rules_id, + operationType: { $in: ['insert', 'update', 'replace'] } + } + } + ], + { + fullDocument: 'updateLookup' + } + ); + + signal.onabort = () => { + stream.close(); + }; + + if (signal.aborted) { + stream.close(); + return; + } + + for await (let event of stream) { + if (!('fullDocument' in event) || event.fullDocument == null) { + continue; + } + yield { + id: event.fullDocument.checkpoint, + lsn: null + }; + } + } + protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) { const { user_id, sync_rules_id } = filters; const lastWriteCheckpoint = await this.db.custom_write_checkpoints.findOne({ diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index b7d896f9..03ea1d1f 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -748,7 +748,7 @@ export class PostgresSyncRulesStorage return this.makeActiveCheckpoint(activeCheckpoint); } - async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { + async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index a12c405b..d04e9412 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -86,6 +86,13 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return row!.write_checkpoint; } + watchUserWriteCheckpoint( + options: storage.WatchUserWriteCheckpointOptions + ): AsyncIterable { + // Not used for Postgres currently + throw new Error('Method not implemented.'); + } + async lastWriteCheckpoint(filters: storage.LastWriteCheckpointFilters): Promise { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index c2f72162..844e2ee8 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -30,7 +30,7 @@ describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => { const controller = new AbortController(); try { - const stream = 
storage.watchWriteCheckpoint({ + const stream = storage.watchCheckpointChanges({ user_id: checkpointUserId('test_user', 'test_client'), signal: controller.signal }); diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index e59b2455..148f3be1 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -90,7 +90,7 @@ export interface SyncRulesBucketStorage * * The stream stops or errors if this is not the active sync rules (anymore). */ - watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable; + watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable; /** * Get a "batch" of data for a checkpoint. diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index d38ac397..4806b61c 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -50,11 +50,31 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti heads: Record; } +export interface WriteCheckpointResult { + id: bigint; + /** + * LSN for the checkpoint. + * + * This will change when we support multiple connections. + * + * For managed write checkpoints, this LSN must be exceeded by the checkpoint / replication head to be valid. + * + * For custom write checkpoints, this will be null, and the write checkpoint is valid for all LSNs. + */ + lsn: string | null; +} + export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; +export interface WatchUserWriteCheckpointOptions { + user_id: string; + sync_rules_id: number; + signal: AbortSignal; +} + export interface BaseWriteCheckpointAPI { readonly writeCheckpointMode: WriteCheckpointMode; setWriteCheckpointMode(mode: WriteCheckpointMode): void; @@ -80,6 +100,8 @@ export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; + + watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable; } export const DEFAULT_WRITE_CHECKPOINT_MODE = WriteCheckpointMode.MANAGED; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 64e55a41..7d02ad07 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -105,7 +105,7 @@ async function* streamResponseInner( after: BigInt(bucket.after) })) }); - const stream = bucketStorage.watchWriteCheckpoint({ + const stream = bucketStorage.watchCheckpointChanges({ user_id: checkpointUserId, signal }); From a505e00f8973a3f00832c0767a1c0688934584e0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 14 Mar 2025 15:54:56 +0200 Subject: [PATCH 02/18] Handle initial write checkpoint lookup. 
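
The change stream only reports writes that happen after it is opened, so a subscriber could miss a write checkpoint that already existed before it subscribed. This patch reads the current document and the session's clusterTime in one session, then starts the change stream at that operation time, so any write after the initial read is replayed. A minimal sketch of the pattern (not the actual PowerSync implementation), assuming the plain 'mongodb' driver and an illustrative document shape and collection name:

    import { Document, MongoClient, Timestamp } from 'mongodb';

    // Hypothetical document shape, for illustration only.
    interface CheckpointDoc extends Document {
      user_id: string;
      client_id: bigint;
    }

    async function* watchWithInitialValue(client: MongoClient, userId: string): AsyncIterable<CheckpointDoc> {
      const collection = client.db('demo').collection<CheckpointDoc>('write_checkpoints');
      let initial = null as CheckpointDoc | null;
      let clusterTime = null as Timestamp | null;
      // Read the current value and capture the cluster time in the same session:
      // any later write is committed at an operation time >= clusterTime.
      await client.withSession(async (session) => {
        initial = await collection.findOne({ user_id: userId }, { session });
        clusterTime = session.clusterTime?.clusterTime ?? null;
      });
      if (clusterTime == null) {
        throw new Error('Could not get clusterTime');
      }
      if (initial != null) {
        yield initial;
      }
      // Resume from the read's cluster time so no intermediate write is missed.
      // The same document may be delivered twice; the consumer deduplicates
      // using the monotonically increasing client_id.
      const stream = collection.watch(
        [{ $match: { 'fullDocument.user_id': userId, operationType: { $in: ['insert', 'update', 'replace'] } } }],
        { fullDocument: 'updateLookup', startAtOperationTime: clusterTime }
      );
      for await (const event of stream) {
        if ('fullDocument' in event && event.fullDocument != null) {
          yield event.fullDocument;
        }
      }
    }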
--- .../implementation/MongoWriteCheckpointAPI.ts | 93 +++++++++++++++++-- 1 file changed, 83 insertions(+), 10 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index a3d6a9e3..3919e7b2 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,6 +1,8 @@ import * as framework from '@powersync/lib-services-framework'; import { storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; +import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; +import { mongo } from '@powersync/lib-service-mongodb'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -108,11 +110,32 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { options: WatchUserWriteCheckpointOptions ): AsyncIterable { // TODO: Share a single changestream across all users + + let doc = null as WriteCheckpointDocument | null; + let clusterTime = null as mongo.Timestamp | null; + + await this.db.client.withSession(async (session) => { + doc = await this.db.write_checkpoints.findOne( + { + user_id: user_id + }, + { + session + } + ); + const time = session.clusterTime?.clusterTime ?? null; + clusterTime = time; + }); + if (clusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime'); + } + const { user_id, signal } = options; const stream = this.db.write_checkpoints.watch( [{ $match: { 'fullDocument.user_id': user_id, operationType: { $in: ['insert', 'update', 'replace'] } } }], { - fullDocument: 'updateLookup' + fullDocument: 'updateLookup', + startAtOperationTime: clusterTime } ); @@ -125,14 +148,28 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return; } + let lastId = -1n; + + if (doc != null) { + yield { + id: doc.client_id, + lsn: doc.lsns['1'] + }; + lastId = doc.client_id; + } + for await (let event of stream) { if (!('fullDocument' in event) || event.fullDocument == null) { continue; } - yield { - id: event.fullDocument.client_id, - lsn: event.fullDocument.lsns['1'] - }; + // Guard against out-of-order events + if (event.fullDocument.client_id > lastId) { + yield { + id: event.fullDocument.client_id, + lsn: event.fullDocument.lsns['1'] + }; + lastId = event.fullDocument.client_id; + } } } @@ -140,6 +177,27 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { options: WatchUserWriteCheckpointOptions ): AsyncIterable { const { user_id, sync_rules_id, signal } = options; + + let doc = null as CustomWriteCheckpointDocument | null; + let clusterTime = null as mongo.Timestamp | null; + + await this.db.client.withSession(async (session) => { + doc = await this.db.custom_write_checkpoints.findOne( + { + user_id: user_id, + sync_rules_id: sync_rules_id + }, + { + session + } + ); + const time = session.clusterTime?.clusterTime ?? 
null; + clusterTime = time; + }); + if (clusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime'); + } + const stream = this.db.custom_write_checkpoints.watch( [ { @@ -151,7 +209,8 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } ], { - fullDocument: 'updateLookup' + fullDocument: 'updateLookup', + startAtOperationTime: clusterTime } ); @@ -164,14 +223,28 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return; } + let lastId = -1n; + + if (doc != null) { + yield { + id: doc.checkpoint, + lsn: null + }; + lastId = doc.checkpoint; + } + for await (let event of stream) { if (!('fullDocument' in event) || event.fullDocument == null) { continue; } - yield { - id: event.fullDocument.checkpoint, - lsn: null - }; + // Guard against out-of-order events + if (event.fullDocument.checkpoint > lastId) { + yield { + id: event.fullDocument.checkpoint, + lsn: null + }; + lastId = event.fullDocument.checkpoint; + } } } From 9e16d1a2e54b775f6cae8d03edafb0dfc1b25c8a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 14 Mar 2025 16:48:16 +0200 Subject: [PATCH 03/18] Use a single shared changestream. --- .../implementation/MongoWriteCheckpointAPI.ts | 263 +++++++++++++++--- 1 file changed, 226 insertions(+), 37 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 3919e7b2..b01cbd6f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,8 +1,12 @@ import * as framework from '@powersync/lib-services-framework'; -import { storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; +import { IterableSource, storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; import { mongo } from '@powersync/lib-service-mongodb'; +import { AbortError } from 'ix/aborterror.js'; +import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; + +import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -106,39 +110,84 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - async *watchManagedWriteCheckpoint( - options: WatchUserWriteCheckpointOptions - ): AsyncIterable { - // TODO: Share a single changestream across all users - - let doc = null as WriteCheckpointDocument | null; - let clusterTime = null as mongo.Timestamp | null; + private sharedIter = new FilteredIterable((signal) => { + const clusterTimePromise = (async () => { + const hello = await this.db.db.command({ hello: 1 }); + // Note: This is not valid on sharded clusters. + const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; + startClusterTime; + return startClusterTime; + })(); + + return { + iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal), + getFirstValue: async (user_id: string) => { + // Potential race conditions we cater for: + + // Case 1: changestream is behind. + // We get a doc now, then the same or older doc again later. + // No problem! + + // Case 2: Query is behind. I.e. 
doc has been created, and emitted on the changestream, but the query doesn't see it yet. + // Not possible luckily, but can we make sure? + + // Case 3: changestream delays opening. A doc is created after our query here, but before the changestream is opened. + // Awaiting clusterTimePromise should be sufficient here, but as a sanity check we also confirm that our query + // timestamp is > the startClusterTime. + + const changeStreamStart = await clusterTimePromise; + + let doc = null as WriteCheckpointDocument | null; + let clusterTime = null as mongo.Timestamp | null; + + await this.db.client.withSession(async (session) => { + doc = await this.db.write_checkpoints.findOne( + { + user_id: user_id + }, + { + session + } + ); + const time = session.clusterTime?.clusterTime ?? null; + clusterTime = time; + }); + if (clusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); + } - await this.db.client.withSession(async (session) => { - doc = await this.db.write_checkpoints.findOne( - { - user_id: user_id - }, - { - session + if (clusterTime.lessThan(changeStreamStart)) { + throw new framework.ServiceAssertionError( + 'clusterTime for write checkpoint is older than changestream start' + ); } - ); - const time = session.clusterTime?.clusterTime ?? null; - clusterTime = time; - }); - if (clusterTime == null) { - throw new framework.ServiceAssertionError('Could not get clusterTime'); - } - const { user_id, signal } = options; + return doc; + } + }; + }); + + private async *watchAllManagedWriteCheckpoints( + clusterTimePromise: Promise, + signal: AbortSignal + ): AsyncGenerator> { + const clusterTime = await clusterTimePromise; + const stream = this.db.write_checkpoints.watch( - [{ $match: { 'fullDocument.user_id': user_id, operationType: { $in: ['insert', 'update', 'replace'] } } }], + [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }], { fullDocument: 'updateLookup', startAtOperationTime: clusterTime } ); + const hello = await this.db.db.command({ hello: 1 }); + // Note: This is not valid on sharded clusters. 
+ const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; + if (startClusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime'); + } + signal.onabort = () => { stream.close(); }; @@ -148,27 +197,34 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return; } - let lastId = -1n; + for await (let event of stream) { + if (!('fullDocument' in event) || event.fullDocument == null) { + continue; + } - if (doc != null) { + const user_id = event.fullDocument.user_id; yield { - id: doc.client_id, - lsn: doc.lsns['1'] + key: user_id, + value: event.fullDocument }; - lastId = doc.client_id; } + } - for await (let event of stream) { - if (!('fullDocument' in event) || event.fullDocument == null) { - continue; - } + async *watchManagedWriteCheckpoint( + options: WatchUserWriteCheckpointOptions + ): AsyncIterable { + const stream = this.sharedIter.subscribe(options.user_id, options.signal); + + let lastId = -1n; + + for await (let doc of stream) { // Guard against out-of-order events - if (event.fullDocument.client_id > lastId) { + if (doc.client_id > lastId) { yield { - id: event.fullDocument.client_id, - lsn: event.fullDocument.lsns['1'] + id: doc.client_id, + lsn: doc.lsns['1'] }; - lastId = event.fullDocument.client_id; + lastId = doc.client_id; } } } @@ -297,3 +353,136 @@ export async function batchCreateCustomWriteCheckpoints( {} ); } + +export interface FilterIterableValue { + key: string; + value: T; +} + +interface FilteredIterableSource { + iterator: AsyncIterable>; + getFirstValue(key: string): Promise; +} + +type FilteredIterableSourceFactory = (signal: AbortSignal) => FilteredIterableSource; + +export class FilteredIterable { + private subscribers: Map>> | undefined = undefined; + private abortController: AbortController | undefined = undefined; + private currentSource: FilteredIterableSource | undefined = undefined; + + constructor(private source: FilteredIterableSourceFactory) {} + + private start(filter: string, sink: AsyncSink) { + const abortController = new AbortController(); + const listeners = new Map(); + listeners.set(filter, new Set([sink])); + + this.abortController = abortController; + this.subscribers = listeners; + + const source = this.source(abortController.signal); + this.currentSource = source; + this.loop(source, abortController, listeners); + return source; + } + + private async loop( + source: FilteredIterableSource, + abortController: AbortController, + sinks: Map>> + ) { + try { + for await (let doc of source.iterator) { + if (abortController.signal.aborted || sinks.size == 0) { + throw new AbortError(); + } + const key = doc.key; + const keySinks = sinks.get(key); + if (keySinks == null) { + continue; + } + + for (let sink of keySinks) { + sink.write(doc.value); + } + } + + // End of stream + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.end(); + } + } + } catch (e) { + // Just in case the error is not from the source + abortController.abort(); + + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.error(e); + } + } + } finally { + // Clear state, so that a new subscription may be started + if (this.subscribers === sinks) { + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + } + + private removeSink(key: string, sink: AsyncSink) { + const existing = this.subscribers?.get(key); + if (existing == null) { + return; + } + existing.delete(sink); + if (existing.size 
== 0) { + this.subscribers!.delete(key); + } + + if (this.subscribers?.size == 0) { + // This is not immediate - there may be a delay until it is fully stopped, + // depending on the underlying source. + this.abortController?.abort(); + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + + private addSink(key: string, sink: AsyncSink) { + if (this.currentSource == null) { + return this.start(key, sink); + } else { + const existing = this.subscribers!.get(key); + if (existing != null) { + existing.add(sink); + } else { + this.subscribers!.set(key, new Set([sink])); + } + return this.currentSource; + } + } + + async *subscribe(key: string, signal: AbortSignal): AsyncIterable { + const sink = new AsyncSink(); + // Important that we register the sink before calling getFirstValue(). + const source = this.addSink(key, sink); + try { + const firstValue = await source.getFirstValue(key); + if (firstValue != null) { + yield firstValue; + } + yield* wrapWithAbort(sink, signal); + } finally { + this.removeSink(key, sink); + } + } + + get active() { + return this.subscribers != null; + } +} From 6ba51b1439d3297fb9ccb5b4330f5e34ac4083a0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Fri, 14 Mar 2025 16:50:08 +0200 Subject: [PATCH 04/18] Cleanup. --- .../src/storage/implementation/MongoWriteCheckpointAPI.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index b01cbd6f..5b1dd4ec 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,10 +1,10 @@ -import * as framework from '@powersync/lib-services-framework'; -import { IterableSource, storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; -import { PowerSyncMongo } from './db.js'; -import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; import { mongo } from '@powersync/lib-service-mongodb'; +import * as framework from '@powersync/lib-services-framework'; +import { storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; import { AbortError } from 'ix/aborterror.js'; import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; +import { PowerSyncMongo } from './db.js'; +import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; From ced791246e9b4338b2c65efc9d516c30e1a44b15 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 17 Mar 2025 10:28:40 +0200 Subject: [PATCH 05/18] Fixes for watching write checkpoints. 
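
Write checkpoint results can now report id: null when the user has no write checkpoint yet, the merged event loop waits until it has seen at least one checkpoint event and one write checkpoint event before emitting, and the LSN comparison is relaxed from > to >=, so a managed write checkpoint created exactly at the current replication head is accepted. The visibility rule, restated as a small illustrative helper (not part of this diff):

    // A managed write checkpoint (lsn != null) only becomes visible once the
    // replicated LSN has caught up to the LSN at which it was created.
    // Custom write checkpoints (lsn == null) are valid for all LSNs.
    function isWriteCheckpointVisible(
      checkpoint: { id: bigint | null; lsn: string | null },
      replicatedLsn: string | null
    ): boolean {
      return checkpoint.lsn == null || (replicatedLsn != null && replicatedLsn >= checkpoint.lsn);
    }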
--- .../implementation/MongoSyncBucketStorage.ts | 10 ++-- .../implementation/MongoWriteCheckpointAPI.ts | 46 +++++++++++-------- .../src/storage/WriteCheckpointAPI.ts | 8 +++- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 5a411ff4..b40a0147 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -780,6 +780,7 @@ export class MongoSyncBucketStorage let lastWriteCheckpointDoc: WriteCheckpointResult | null = null; let nextWriteCheckpoint: bigint | null = null; let lastCheckpointEvent: ReplicationCheckpoint | null = null; + let receivedWriteCheckpoint = false; const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({ user_id: options.user_id, @@ -793,13 +794,14 @@ export class MongoSyncBucketStorage for await (const event of iter) { if ('checkpoint' in event) { lastCheckpointEvent = event; } else { lastWriteCheckpointDoc = event; + receivedWriteCheckpoint = true; } - if (lastCheckpointEvent == null) { + if (lastCheckpointEvent == null || !receivedWriteCheckpoint) { + // We need to wait until we have received at least one checkpoint and one write checkpoint. continue; } @@ -812,10 +814,10 @@ export class MongoSyncBucketStorage if ( lastWriteCheckpointDoc != null && - (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn > lastWriteCheckpointDoc.lsn)) + (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn)) ) { const writeCheckpoint = lastWriteCheckpointDoc.id; - if (nextWriteCheckpoint == null || writeCheckpoint > nextWriteCheckpoint) { + if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) { nextWriteCheckpoint = writeCheckpoint; } // We used the doc - clear it diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 5b1dd4ec..9f10f8c9 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,6 +1,6 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { storage, WatchUserWriteCheckpointOptions } from '@powersync/service-core'; +import { storage, WatchUserWriteCheckpointOptions, WriteCheckpointResult } from '@powersync/service-core'; import { AbortError } from 'ix/aborterror.js'; import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; import { PowerSyncMongo } from './db.js'; @@ -99,18 +99,18 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - async *watchUserWriteCheckpoint( - options: WatchUserWriteCheckpointOptions - ): AsyncIterable { + watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: return this.watchCustomWriteCheckpoint(options); case storage.WriteCheckpointMode.MANAGED: return this.watchManagedWriteCheckpoint(options); + default: + throw new Error('Invalid write checkpoint mode'); } } - private sharedIter = new FilteredIterable((signal) => { 
+ private sharedIter = new FilteredIterable((signal) => { const clusterTimePromise = (async () => { const hello = await this.db.db.command({ hello: 1 }); // Note: This is not valid on sharded clusters. @@ -162,7 +162,17 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { ); } - return doc; + if (doc == null) { + return { + id: null, + lsn: null + }; + } + + return { + id: doc.client_id, + lsn: doc.lsns['1'] + }; } }; }); @@ -170,7 +180,7 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { private async *watchAllManagedWriteCheckpoints( clusterTimePromise: Promise, signal: AbortSignal - ): AsyncGenerator> { + ): AsyncGenerator> { const clusterTime = await clusterTimePromise; const stream = this.db.write_checkpoints.watch( @@ -205,7 +215,10 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { const user_id = event.fullDocument.user_id; yield { key: user_id, - value: event.fullDocument + value: { + id: event.fullDocument.client_id, + lsn: event.fullDocument.lsns['1'] + } }; } } @@ -219,12 +232,11 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { for await (let doc of stream) { // Guard against out-of-order events - if (doc.client_id > lastId) { - yield { - id: doc.client_id, - lsn: doc.lsns['1'] - }; - lastId = doc.client_id; + if (lastId == -1n || (doc.id != null && doc.id > lastId)) { + yield doc; + if (doc.id != null) { + lastId = doc.id; + } } } } @@ -361,7 +373,7 @@ export interface FilterIterableValue { interface FilteredIterableSource { iterator: AsyncIterable>; - getFirstValue(key: string): Promise; + getFirstValue(key: string): Promise; } type FilteredIterableSourceFactory = (signal: AbortSignal) => FilteredIterableSource; @@ -473,9 +485,7 @@ export class FilteredIterable { const source = this.addSink(key, sink); try { const firstValue = await source.getFirstValue(key); - if (firstValue != null) { - yield firstValue; - } + yield firstValue; yield* wrapWithAbort(sink, signal); } finally { this.removeSink(key, sink); diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index 4806b61c..31126818 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -51,7 +51,13 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti } export interface WriteCheckpointResult { - id: bigint; + /** + * Write checkpoint id (also referred to as client_id). + * + * If null, there is no write checkpoint for the client. + */ + id: bigint | null; + /** * LSN for the checkpoint. * From 2a510244b4fb7bb32b26058221ee15c55ba44fda Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 17 Mar 2025 10:47:26 +0200 Subject: [PATCH 06/18] Move stream utilities to a separate folder. 
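
BroadcastIterable, LastValueSink and the merge utilities move from src/sync/ to a new src/streams/ folder, and the MongoDB-specific FilteredIterable is extracted into service-core as the generic Demultiplexer. A rough usage sketch, assuming only the exports added in this diff (keys, values and timing are made up):

    import { Demultiplexer, DemultiplexerValue } from '@powersync/service-core';
    import * as timers from 'timers/promises';

    // One multiplexed source feeds many per-key subscriptions. The source is
    // only started when the first subscriber arrives, and is aborted again
    // once the last subscriber leaves.
    const demux = new Demultiplexer<number>((signal) => {
      const iterator = (async function* (): AsyncIterable<DemultiplexerValue<number>> {
        for (let i = 0; !signal.aborted; i++) {
          yield { key: 'user1', value: i };
          await timers.setTimeout(10);
        }
      })();
      return { iterator, getFirstValue: async (key: string) => -1 };
    });

    async function consume() {
      const controller = new AbortController();
      for await (const value of demux.subscribe('user1', controller.signal)) {
        console.log(value); // -1 first, then live values
        if (value >= 3) break; // ending the subscription releases the source
      }
    }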
--- .../implementation/MongoWriteCheckpointAPI.ts | 140 +--------------- packages/service-core/src/index.ts | 3 + .../{sync => streams}/BroadcastIterable.ts | 0 .../service-core/src/streams/Demultiplexer.ts | 157 ++++++++++++++++++ .../src/{sync => streams}/LastValueSink.ts | 0 .../src/{sync => streams}/merge.ts | 2 +- .../service-core/src/streams/streams-index.ts | 4 + packages/service-core/src/sync/sync-index.ts | 3 - packages/service-core/src/sync/sync.ts | 2 +- .../test/src/broadcast_iterable.test.ts | 2 +- .../test/src/merge_iterable.test.ts | 2 +- 11 files changed, 171 insertions(+), 144 deletions(-) rename packages/service-core/src/{sync => streams}/BroadcastIterable.ts (100%) create mode 100644 packages/service-core/src/streams/Demultiplexer.ts rename packages/service-core/src/{sync => streams}/LastValueSink.ts (100%) rename packages/service-core/src/{sync => streams}/merge.ts (99%) create mode 100644 packages/service-core/src/streams/streams-index.ts diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 9f10f8c9..cad45e34 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,12 +1,9 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; import { storage, WatchUserWriteCheckpointOptions, WriteCheckpointResult } from '@powersync/service-core'; -import { AbortError } from 'ix/aborterror.js'; -import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; import { PowerSyncMongo } from './db.js'; import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; - -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; +import { Demultiplexer, DemultiplexerValue } from '@powersync/service-core/src/streams/Demultiplexer.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; @@ -110,7 +107,7 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } - private sharedIter = new FilteredIterable((signal) => { + private sharedIter = new Demultiplexer((signal) => { const clusterTimePromise = (async () => { const hello = await this.db.db.command({ hello: 1 }); // Note: This is not valid on sharded clusters. 
@@ -180,7 +177,7 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { private async *watchAllManagedWriteCheckpoints( clusterTimePromise: Promise, signal: AbortSignal - ): AsyncGenerator> { + ): AsyncGenerator> { const clusterTime = await clusterTimePromise; const stream = this.db.write_checkpoints.watch( @@ -365,134 +362,3 @@ export async function batchCreateCustomWriteCheckpoints( {} ); } - -export interface FilterIterableValue { - key: string; - value: T; -} - -interface FilteredIterableSource { - iterator: AsyncIterable>; - getFirstValue(key: string): Promise; -} - -type FilteredIterableSourceFactory = (signal: AbortSignal) => FilteredIterableSource; - -export class FilteredIterable { - private subscribers: Map>> | undefined = undefined; - private abortController: AbortController | undefined = undefined; - private currentSource: FilteredIterableSource | undefined = undefined; - - constructor(private source: FilteredIterableSourceFactory) {} - - private start(filter: string, sink: AsyncSink) { - const abortController = new AbortController(); - const listeners = new Map(); - listeners.set(filter, new Set([sink])); - - this.abortController = abortController; - this.subscribers = listeners; - - const source = this.source(abortController.signal); - this.currentSource = source; - this.loop(source, abortController, listeners); - return source; - } - - private async loop( - source: FilteredIterableSource, - abortController: AbortController, - sinks: Map>> - ) { - try { - for await (let doc of source.iterator) { - if (abortController.signal.aborted || sinks.size == 0) { - throw new AbortError(); - } - const key = doc.key; - const keySinks = sinks.get(key); - if (keySinks == null) { - continue; - } - - for (let sink of keySinks) { - sink.write(doc.value); - } - } - - // End of stream - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.end(); - } - } - } catch (e) { - // Just in case the error is not from the source - abortController.abort(); - - for (let keySinks of sinks.values()) { - for (let sink of keySinks) { - sink.error(e); - } - } - } finally { - // Clear state, so that a new subscription may be started - if (this.subscribers === sinks) { - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - } - - private removeSink(key: string, sink: AsyncSink) { - const existing = this.subscribers?.get(key); - if (existing == null) { - return; - } - existing.delete(sink); - if (existing.size == 0) { - this.subscribers!.delete(key); - } - - if (this.subscribers?.size == 0) { - // This is not immediate - there may be a delay until it is fully stopped, - // depending on the underlying source. - this.abortController?.abort(); - this.subscribers = undefined; - this.abortController = undefined; - this.currentSource = undefined; - } - } - - private addSink(key: string, sink: AsyncSink) { - if (this.currentSource == null) { - return this.start(key, sink); - } else { - const existing = this.subscribers!.get(key); - if (existing != null) { - existing.add(sink); - } else { - this.subscribers!.set(key, new Set([sink])); - } - return this.currentSource; - } - } - - async *subscribe(key: string, signal: AbortSignal): AsyncIterable { - const sink = new AsyncSink(); - // Important that we register the sink before calling getFirstValue(). 
- const source = this.addSink(key, sink); - try { - const firstValue = await source.getFirstValue(key); - yield firstValue; - yield* wrapWithAbort(sink, signal); - } finally { - this.removeSink(key, sink); - } - } - - get active() { - return this.subscribers != null; - } -} diff --git a/packages/service-core/src/index.ts b/packages/service-core/src/index.ts index 9a90ceeb..e0c8a286 100644 --- a/packages/service-core/src/index.ts +++ b/packages/service-core/src/index.ts @@ -38,3 +38,6 @@ export * as system from './system/system-index.js'; export * from './util/util-index.js'; export * as utils from './util/util-index.js'; + +export * from './streams/streams-index.js'; +export * as streams from './streams/streams-index.js'; diff --git a/packages/service-core/src/sync/BroadcastIterable.ts b/packages/service-core/src/streams/BroadcastIterable.ts similarity index 100% rename from packages/service-core/src/sync/BroadcastIterable.ts rename to packages/service-core/src/streams/BroadcastIterable.ts diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts new file mode 100644 index 00000000..6ec6c769 --- /dev/null +++ b/packages/service-core/src/streams/Demultiplexer.ts @@ -0,0 +1,157 @@ +import { AbortError } from 'ix/aborterror.js'; +import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; +import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; + +export interface DemultiplexerValue { + /** + * The key used for demultiplexing, for example the user id. + */ + key: string; + /** + * The stream value. + */ + value: T; +} + +export interface DemultiplexerSource { + iterator: AsyncIterable>; + getFirstValue(key: string): Promise; +} + +export type DemultiplexerSourceFactory = (signal: AbortSignal) => DemultiplexerSource; + +/** + * Takes a multiplexed stream (e.g. a changestream covering many individual users), + * and allows subscribing to individual streams. + * + * The source subscription is lazy: + * 1. We only start subscribing when there is a downstream subscriber. + * 2. When all downstream subscriptions have ended, we end the source subscription. + * + * The Demultiplexer does not handle backpressure. If subscribers are slow, a queue may build up + * for each. 
+ */ +export class Demultiplexer { + private subscribers: Map>> | undefined = undefined; + private abortController: AbortController | undefined = undefined; + private currentSource: DemultiplexerSource | undefined = undefined; + + constructor(private source: DemultiplexerSourceFactory) {} + + private start(filter: string, sink: AsyncSink) { + const abortController = new AbortController(); + const listeners = new Map(); + listeners.set(filter, new Set([sink])); + + this.abortController = abortController; + this.subscribers = listeners; + + const source = this.source(abortController.signal); + this.currentSource = source; + this.loop(source, abortController, listeners); + return source; + } + + private async loop( + source: DemultiplexerSource, + abortController: AbortController, + sinks: Map>> + ) { + try { + for await (let doc of source.iterator) { + if (abortController.signal.aborted || sinks.size == 0) { + throw new AbortError(); + } + const key = doc.key; + const keySinks = sinks.get(key); + if (keySinks == null) { + continue; + } + + for (let sink of keySinks) { + sink.write(doc.value); + } + } + + // End of stream + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.end(); + } + } + } catch (e) { + // Just in case the error is not from the source + abortController.abort(); + + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.error(e); + } + } + } finally { + // Clear state, so that a new subscription may be started + if (this.subscribers === sinks) { + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + } + + private removeSink(key: string, sink: AsyncSink) { + const existing = this.subscribers?.get(key); + if (existing == null) { + return; + } + existing.delete(sink); + if (existing.size == 0) { + this.subscribers!.delete(key); + } + + if (this.subscribers?.size == 0) { + // This is not immediate - there may be a delay until it is fully stopped, + // depending on the underlying source. + this.abortController?.abort(); + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + + private addSink(key: string, sink: AsyncSink) { + if (this.currentSource == null) { + return this.start(key, sink); + } else { + const existing = this.subscribers!.get(key); + if (existing != null) { + existing.add(sink); + } else { + this.subscribers!.set(key, new Set([sink])); + } + return this.currentSource; + } + } + + /** + * Subscribe to a specific stream. + * + * @param key The key used for demultiplexing, e.g. user id. + * @param signal + */ + async *subscribe(key: string, signal: AbortSignal): AsyncIterable { + const sink = new AsyncSink(); + // Important that we register the sink before calling getFirstValue(). 
+ const source = this.addSink(key, sink); + try { + const firstValue = await source.getFirstValue(key); + yield firstValue; + yield* wrapWithAbort(sink, signal); + } finally { + this.removeSink(key, sink); + } + } + + get active() { + return this.subscribers != null; + } +} diff --git a/packages/service-core/src/sync/LastValueSink.ts b/packages/service-core/src/streams/LastValueSink.ts similarity index 100% rename from packages/service-core/src/sync/LastValueSink.ts rename to packages/service-core/src/streams/LastValueSink.ts diff --git a/packages/service-core/src/sync/merge.ts b/packages/service-core/src/streams/merge.ts similarity index 99% rename from packages/service-core/src/sync/merge.ts rename to packages/service-core/src/streams/merge.ts index fcc6aca7..ef046fbb 100644 --- a/packages/service-core/src/sync/merge.ts +++ b/packages/service-core/src/streams/merge.ts @@ -1,7 +1,7 @@ import { throwIfAborted } from 'ix/aborterror.js'; import { AsyncIterableX } from 'ix/asynciterable/index.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; -import { safeRace } from './safeRace.js'; +import { safeRace } from '../sync/safeRace.js'; /** * Merge multiple source AsyncIterables into one output AsyncIterable. diff --git a/packages/service-core/src/streams/streams-index.ts b/packages/service-core/src/streams/streams-index.ts new file mode 100644 index 00000000..0d8eafa4 --- /dev/null +++ b/packages/service-core/src/streams/streams-index.ts @@ -0,0 +1,4 @@ +export * from './merge.js'; +export * from './Demultiplexer.js'; +export * from './LastValueSink.js'; +export * from './BroadcastIterable.js'; diff --git a/packages/service-core/src/sync/sync-index.ts b/packages/service-core/src/sync/sync-index.ts index d43145d3..df47b0da 100644 --- a/packages/service-core/src/sync/sync-index.ts +++ b/packages/service-core/src/sync/sync-index.ts @@ -1,6 +1,3 @@ -export * from './BroadcastIterable.js'; -export * from './LastValueSink.js'; -export * from './merge.js'; export * from './RequestTracker.js'; export * from './safeRace.js'; export * from './sync.js'; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 7d02ad07..8c3dafc1 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -9,7 +9,7 @@ import * as util from '../util/util-index.js'; import { logger } from '@powersync/lib-services-framework'; import { BucketChecksumState } from './BucketChecksumState.js'; -import { mergeAsyncIterables } from './merge.js'; +import { mergeAsyncIterables } from '../streams/streams-index.js'; import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; import { SyncContext } from './SyncContext.js'; import { RequestTracker } from './RequestTracker.js'; diff --git a/packages/service-core/test/src/broadcast_iterable.test.ts b/packages/service-core/test/src/broadcast_iterable.test.ts index 4d2cced7..aa062e5a 100644 --- a/packages/service-core/test/src/broadcast_iterable.test.ts +++ b/packages/service-core/test/src/broadcast_iterable.test.ts @@ -1,4 +1,4 @@ -import { BroadcastIterable, IterableSource } from '@/sync/BroadcastIterable.js'; +import { BroadcastIterable, IterableSource } from '@/streams/BroadcastIterable.js'; import { AsyncIterableX, interval } from 'ix/asynciterable/index.js'; import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; import { take } from 'ix/asynciterable/operators/take.js'; diff --git 
a/packages/service-core/test/src/merge_iterable.test.ts b/packages/service-core/test/src/merge_iterable.test.ts index bd512376..237c6e8e 100644 --- a/packages/service-core/test/src/merge_iterable.test.ts +++ b/packages/service-core/test/src/merge_iterable.test.ts @@ -1,4 +1,4 @@ -import { mergeAsyncIterablesNew, mergeAsyncIterablesOld } from '@/sync/merge.js'; +import { mergeAsyncIterablesNew, mergeAsyncIterablesOld } from '@/streams/merge.js'; import * as timers from 'timers/promises'; import { describe, expect, test } from 'vitest'; From 85467bfd92275b465672fcee2def6a31f581d975 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 17 Mar 2025 10:52:50 +0200 Subject: [PATCH 07/18] Use LastValueSink for Demultiplexer. --- .../src/storage/PostgresSyncRulesStorage.ts | 4 ++-- .../src/streams/BroadcastIterable.ts | 4 ++-- .../service-core/src/streams/Demultiplexer.ts | 17 ++++++++--------- .../service-core/src/streams/LastValueSink.ts | 4 ++-- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 03ea1d1f..63fe01a9 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -815,12 +815,12 @@ export class PostgresSyncRulesStorage const sink = new LastValueSink(undefined); const disposeListener = this.db.registerListener({ - notification: (notification) => sink.next(notification.payload) + notification: (notification) => sink.write(notification.payload) }); signal.addEventListener('aborted', async () => { disposeListener(); - sink.complete(); + sink.end(); }); yield this.makeActiveCheckpoint(doc); diff --git a/packages/service-core/src/streams/BroadcastIterable.ts b/packages/service-core/src/streams/BroadcastIterable.ts index e8d6e290..2922c709 100644 --- a/packages/service-core/src/streams/BroadcastIterable.ts +++ b/packages/service-core/src/streams/BroadcastIterable.ts @@ -52,13 +52,13 @@ export class BroadcastIterable implements AsyncIterable { } this.last = doc; for (let sink of sinks) { - sink.next(doc); + sink.write(doc); } } // End of stream for (let sink of sinks) { - sink.complete(); + sink.end(); } } catch (e) { // Just in case the error is not from the source diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts index 6ec6c769..92c13b73 100644 --- a/packages/service-core/src/streams/Demultiplexer.ts +++ b/packages/service-core/src/streams/Demultiplexer.ts @@ -1,6 +1,6 @@ import { AbortError } from 'ix/aborterror.js'; -import { AsyncSink } from 'ix/asynciterable/asynciterablex.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; +import { LastValueSink } from './LastValueSink.js'; export interface DemultiplexerValue { /** @@ -28,17 +28,16 @@ export type DemultiplexerSourceFactory = (signal: AbortSignal) => Demultiplex * 1. We only start subscribing when there is a downstream subscriber. * 2. When all downstream subscriptions have ended, we end the source subscription. * - * The Demultiplexer does not handle backpressure. If subscribers are slow, a queue may build up - * for each. + * For each subscriber, if backpressure builds up, we only keep the _last_ value. 
*/ export class Demultiplexer { - private subscribers: Map>> | undefined = undefined; + private subscribers: Map>> | undefined = undefined; private abortController: AbortController | undefined = undefined; private currentSource: DemultiplexerSource | undefined = undefined; constructor(private source: DemultiplexerSourceFactory) {} - private start(filter: string, sink: AsyncSink) { + private start(filter: string, sink: LastValueSink) { const abortController = new AbortController(); const listeners = new Map(); listeners.set(filter, new Set([sink])); @@ -55,7 +54,7 @@ export class Demultiplexer { private async loop( source: DemultiplexerSource, abortController: AbortController, - sinks: Map>> + sinks: Map>> ) { try { for await (let doc of source.iterator) { @@ -98,7 +97,7 @@ export class Demultiplexer { } } - private removeSink(key: string, sink: AsyncSink) { + private removeSink(key: string, sink: LastValueSink) { const existing = this.subscribers?.get(key); if (existing == null) { return; @@ -118,7 +117,7 @@ export class Demultiplexer { } } - private addSink(key: string, sink: AsyncSink) { + private addSink(key: string, sink: LastValueSink) { if (this.currentSource == null) { return this.start(key, sink); } else { @@ -139,7 +138,7 @@ export class Demultiplexer { * @param signal */ async *subscribe(key: string, signal: AbortSignal): AsyncIterable { - const sink = new AsyncSink(); + const sink = new LastValueSink(undefined); // Important that we register the sink before calling getFirstValue(). const source = this.addSink(key, sink); try { diff --git a/packages/service-core/src/streams/LastValueSink.ts b/packages/service-core/src/streams/LastValueSink.ts index 68a164f9..23ef7dfa 100644 --- a/packages/service-core/src/streams/LastValueSink.ts +++ b/packages/service-core/src/streams/LastValueSink.ts @@ -17,7 +17,7 @@ export class LastValueSink implements AsyncIterable { } } - next(value: T) { + write(value: T) { this.push({ value, done: false, @@ -25,7 +25,7 @@ export class LastValueSink implements AsyncIterable { }); } - complete() { + end() { this.push({ value: undefined, done: true, From 1418c5029f80b08051a97613ed14f0007cd5f431 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 17 Mar 2025 12:07:51 +0200 Subject: [PATCH 08/18] Add tests; fix abort issue. --- .../service-core/src/streams/Demultiplexer.ts | 11 +- .../test/src/broadcast_iterable.test.ts | 14 +- .../test/src/demultiplexer.test.ts | 205 ++++++++++++++++++ 3 files changed, 222 insertions(+), 8 deletions(-) create mode 100644 packages/service-core/test/src/demultiplexer.test.ts diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts index 92c13b73..86ebcc40 100644 --- a/packages/service-core/src/streams/Demultiplexer.ts +++ b/packages/service-core/src/streams/Demultiplexer.ts @@ -14,7 +14,16 @@ export interface DemultiplexerValue { } export interface DemultiplexerSource { + /** + * The async iterator providing a stream of values. + */ iterator: AsyncIterable>; + + /** + * Fetches the first value for a given key. + * + * This is used to get an initial value for each subscription. 
+ */ getFirstValue(key: string): Promise; } @@ -144,7 +153,7 @@ export class Demultiplexer { try { const firstValue = await source.getFirstValue(key); yield firstValue; - yield* wrapWithAbort(sink, signal); + yield* sink.withSignal(signal); } finally { this.removeSink(key, sink); } diff --git a/packages/service-core/test/src/broadcast_iterable.test.ts b/packages/service-core/test/src/broadcast_iterable.test.ts index aa062e5a..02696313 100644 --- a/packages/service-core/test/src/broadcast_iterable.test.ts +++ b/packages/service-core/test/src/broadcast_iterable.test.ts @@ -5,10 +5,10 @@ import { take } from 'ix/asynciterable/operators/take.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; import { toArray } from 'ix/asynciterable/toarray.js'; import * as timers from 'timers/promises'; -import { describe, expect, test } from 'vitest'; +import { describe, expect, it } from 'vitest'; describe('BroadcastIterable', () => { - test('should iterate', async () => { + it('should iterate', async () => { const range = AsyncIterableX.from([1, 2, 3]); const broadcast = new BroadcastIterable(() => range); @@ -17,7 +17,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should skip values if sink is slow', async () => { + it('should skip values if sink is slow', async () => { const range = AsyncIterableX.from([1, 2, 3]); const broadcast = new BroadcastIterable(() => range); @@ -30,7 +30,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should abort', async () => { + it('should abort', async () => { const range = AsyncIterableX.from([1, 2, 3]); let recordedSignal: AbortSignal | undefined; const broadcast = new BroadcastIterable((signal) => { @@ -46,7 +46,7 @@ describe('BroadcastIterable', () => { expect(recordedSignal!.aborted).toEqual(true); }); - test('should handle indefinite sources', async () => { + it('should handle indefinite sources', async () => { const source: IterableSource = (signal) => { return wrapWithAbort(interval(1), signal); }; @@ -65,7 +65,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should handle multiple subscribers', async () => { + it('should handle multiple subscribers', async () => { let sourceIndex = 0; const source = async function* (signal: AbortSignal) { // Test value out by 1000 means it may have used the wrong iteration of the source @@ -111,7 +111,7 @@ describe('BroadcastIterable', () => { expect(results3[4]).toBeLessThan(2145); }); - test('should handle errors on multiple subscribers', async () => { + it('should handle errors on multiple subscribers', async () => { let sourceIndex = 0; const source = async function* (signal: AbortSignal) { // Test value out by 1000 means it may have used the wrong iteration of the source diff --git a/packages/service-core/test/src/demultiplexer.test.ts b/packages/service-core/test/src/demultiplexer.test.ts new file mode 100644 index 00000000..69ba9c87 --- /dev/null +++ b/packages/service-core/test/src/demultiplexer.test.ts @@ -0,0 +1,205 @@ +// Vitest Unit Tests +import { Demultiplexer, DemultiplexerSource, DemultiplexerSourceFactory, DemultiplexerValue } from '@/index.js'; +import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; +import { take } from 'ix/asynciterable/operators/take.js'; +import { toArray } from 'ix/asynciterable/toarray.js'; +import * as timers from 'node:timers/promises'; +import { describe, expect, it } from 'vitest'; + +describe('Demultiplexer', () => 
+  it('should start subscription lazily and provide first value', async () => {
+    const mockSource: DemultiplexerSourceFactory<string> = (signal: AbortSignal) => {
+      const iterator = (async function* (): AsyncIterable<DemultiplexerValue<string>> {})();
+      return {
+        iterator,
+        getFirstValue: async (key: string) => `first-${key}`
+      };
+    };
+
+    const demux = new Demultiplexer(mockSource);
+    const signal = new AbortController().signal;
+
+    const iter = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+    const result = await iter.next();
+    expect(result.value).toBe('first-user1');
+  });
+
+  it('should handle multiple subscribers to the same key', async () => {
+    const iter = (async function* () {
+      yield { key: 'user1', value: 'value1' };
+      yield { key: 'user1', value: 'value2' };
+    })();
+    const source: DemultiplexerSource<string> = {
+      iterator: iter,
+      getFirstValue: async (key: string) => `first-${key}`
+    };
+
+    const demux = new Demultiplexer(() => source);
+    const signal = new AbortController().signal;
+
+    const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+    const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+
+    // Due to only keeping the last value, some values are skipped
+    expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
+    expect(await iter1.next()).toEqual({ value: 'value1', done: false });
+    expect(await iter1.next()).toEqual({ value: undefined, done: true });
+
+    expect(await iter2.next()).toEqual({ value: 'first-user1', done: false });
+    expect(await iter2.next()).toEqual({ value: undefined, done: true });
+  });
+
+  it('should handle multiple subscribers to the same key (2)', async () => {
+    const p1 = Promise.withResolvers();
+    const p2 = Promise.withResolvers();
+    const p3 = Promise.withResolvers();
+
+    const iter = (async function* () {
+      await p1.promise;
+      yield { key: 'user1', value: 'value1' };
+      await p2.promise;
+      yield { key: 'user1', value: 'value2' };
+      await p3.promise;
+    })();
+
+    const source: DemultiplexerSource<string> = {
+      iterator: iter,
+      getFirstValue: async (key: string) => `first-${key}`
+    };
+
+    const demux = new Demultiplexer(() => source);
+    const signal = new AbortController().signal;
+
+    const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+    const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+
+    // Due to only keeping the last value, some values are skipped
+    expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
+    expect(await iter2.next()).toEqual({ value: 'first-user1', done: false });
+    p1.resolve();
+
+    expect(await iter1.next()).toEqual({ value: 'value1', done: false });
+    expect(await iter2.next()).toEqual({ value: 'value1', done: false });
+    p2.resolve();
+
+    expect(await iter1.next()).toEqual({ value: 'value2', done: false });
+    p3.resolve();
+
+    expect(await iter1.next()).toEqual({ value: undefined, done: true });
+    expect(await iter2.next()).toEqual({ value: undefined, done: true });
+  });
+
+  it('should handle multiple subscribers to different keys', async () => {
+    const p1 = Promise.withResolvers();
+    const p2 = Promise.withResolvers();
+    const p3 = Promise.withResolvers();
+
+    const iter = (async function* () {
+      await p1.promise;
+      yield { key: 'user1', value: 'value1' };
+      await p2.promise;
+      yield { key: 'user2', value: 'value2' };
+      await p3.promise;
+    })();
+
+    const source: DemultiplexerSource<string> = {
+      iterator: iter,
+      getFirstValue: async (key: string) => `first-${key}`
+    };
+
+    const demux = new Demultiplexer(() => source);
+    const signal = new AbortController().signal;
+
+    const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
+    const iter2 = demux.subscribe('user2', signal)[Symbol.asyncIterator]();
+
+    // Due to only keeping the last value, some values are skipped
+    expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
+    expect(await iter2.next()).toEqual({ value: 'first-user2', done: false });
+    p1.resolve();
+
+    expect(await iter1.next()).toEqual({ value: 'value1', done: false });
+    p2.resolve();
+
+    expect(await iter2.next()).toEqual({ value: 'value2', done: false });
+    p3.resolve();
+
+    expect(await iter1.next()).toEqual({ value: undefined, done: true });
+    expect(await iter2.next()).toEqual({ value: undefined, done: true });
+  });
+
+  it('should abort', async () => {
+    const iter = (async function* () {
+      yield { key: 'user1', value: 'value1' };
+      yield { key: 'user1', value: 'value2' };
+    })();
+
+    const source: DemultiplexerSource<string> = {
+      iterator: iter,
+      getFirstValue: async (key: string) => `first-${key}`
+    };
+
+    const demux = new Demultiplexer(() => source);
+    const controller = new AbortController();
+
+    const iter1 = demux.subscribe('user1', controller.signal)[Symbol.asyncIterator]();
+
+    expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
+    controller.abort();
+
+    await expect(iter1.next()).rejects.toThrow('The operation has been aborted');
+  });
+
+  it('should handle errors on multiple subscribers', async () => {
+    let sourceIndex = 0;
+    const sourceFn = async function* (signal: AbortSignal): AsyncIterable<DemultiplexerValue<number>> {
+      // Test value out by 1000 means it may have used the wrong iteration of the source
+      const base = (sourceIndex += 1000);
+      const abortedPromise = new Promise((resolve) => {
+        signal.addEventListener('abort', resolve, { once: true });
+      });
+      for (let i = 0; !signal.aborted; i++) {
+        if (base + i == 1005) {
+          throw new Error('simulated failure');
+        }
+        yield { key: 'u1', value: base + i };
+        await Promise.race([abortedPromise, timers.setTimeout(1)]);
+      }
+      // Test value out by 100 means this wasn't reached
+      sourceIndex += 100;
+    };
+
+    const sourceFactory: DemultiplexerSourceFactory<number> = (signal) => {
+      const source: DemultiplexerSource<number> = {
+        iterator: sourceFn(signal),
+        getFirstValue: async (key: string) => -1
+      };
+      return source;
+    };
+    const demux = new Demultiplexer(sourceFactory);
+
+    const controller = new AbortController();
+
+    const delayed1 = delayEach(9)(demux.subscribe('u1', controller.signal));
+    const delayed2 = delayEach(10)(demux.subscribe('u1', controller.signal));
+    expect(demux.active).toBe(false);
+    const results1Promise = toArray(take(5)(delayed1)) as Promise<number[]>;
+    const results2Promise = toArray(take(5)(delayed2)) as Promise<number[]>;
+
+    const [r1, r2] = await Promise.allSettled([results1Promise, results2Promise]);
+
+    expect(r1).toEqual({ status: 'rejected', reason: new Error('simulated failure') });
+    expect(r2).toEqual({ status: 'rejected', reason: new Error('simulated failure') });
+
+    expect(demux.active).toBe(false);
+
+    // This starts a new source
+    const delayed3 = delayEach(10)(demux.subscribe('u1', controller.signal));
+    const results3 = await toArray(take(6)(delayed3));
+    expect(results3.length).toEqual(6);
+    expect(results3[0]).toEqual(-1); // Initial value
+    // There should be approximately 10ms between each value, but we allow for some slack
+    expect(results3[5]).toBeGreaterThan(2005);
+    expect(results3[5]).toBeLessThan(2200);
+  });
+});

From de6c1ac7621e8d28f659614af99dc6f12772ed26 Mon Sep 17 00:00:00 2001
From: Ralf
Kistner Date: Mon, 17 Mar 2025 14:27:41 +0200 Subject: [PATCH 09/18] Fix import. --- .../storage/implementation/MongoWriteCheckpointAPI.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index cad45e34..c261719c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,9 +1,14 @@ import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { storage, WatchUserWriteCheckpointOptions, WriteCheckpointResult } from '@powersync/service-core'; +import { + Demultiplexer, + DemultiplexerValue, + storage, + WatchUserWriteCheckpointOptions, + WriteCheckpointResult +} from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; -import { Demultiplexer, DemultiplexerValue } from '@powersync/service-core/src/streams/Demultiplexer.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; From 291be9b6c651fae9ea050efece3dde8031f1df9b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 10:05:12 +0200 Subject: [PATCH 10/18] Error when sync rules are not active. --- .../src/storage/implementation/MongoSyncBucketStorage.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index b40a0147..b607ad57 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -705,8 +705,7 @@ export class MongoSyncBucketStorage if (doc == null) { // Sync rules not present or not active. // Abort the connections - clients will have to retry later. - // Should this error instead? - return; + throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); } yield this.makeActiveCheckpoint(doc); @@ -750,7 +749,7 @@ export class MongoSyncBucketStorage } if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { // Sync rules have changed - abort and restart. - // Should this error instead? + // We do a soft close of the stream here - no error break; } From 6865479f2c6230544c9d5b5e0532944c0306d954 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 10:07:50 +0200 Subject: [PATCH 11/18] Add write checkpoint tests. 
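
The tests cover both orderings of events: a managed write checkpoint
created before the keepalive that advances the LSN, and one created
after it. Roughly the pattern they exercise (a sketch using the names
from the tests below; `signal` is a test-owned AbortSignal):

    const iter = bucketStorage
      .watchCheckpointChanges({ user_id: 'user1', signal })
      [Symbol.asyncIterator]();
    // Create a write checkpoint and drive a keepalive through a batch;
    // the next emission should then carry both the checkpoint and the
    // write checkpoint.
    const result = await iter.next();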
---
 .../src/tests/register-data-storage-tests.ts  | 100 ++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
index bdb8b851..0aad307f 100644
--- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts
+++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
@@ -1477,4 +1477,104 @@ bucket_definitions:
     expect(parsedSchema3).not.equals(parsedSchema2);
     expect(parsedSchema3.getSourceTables()[0].schema).equals('databasename');
   });
+
+  test('managed write checkpoints - checkpoint after write', async (context) => {
+    await using factory = await generateStorageFactory();
+    const r = await factory.configureSyncRules({
+      content: `
+bucket_definitions:
+  mybucket:
+    data: []
+      `,
+      validate: false
+    });
+    const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
+    await bucketStorage.autoActivate();
+
+    const abortController = new AbortController();
+    context.onTestFinished(() => abortController.abort());
+    const iter = bucketStorage
+      .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
+      [Symbol.asyncIterator]();
+
+    const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({
+      heads: { '1': '5/0' },
+      user_id: 'user1'
+    });
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('5/0');
+    });
+
+    const result = await iter.next();
+    expect(result).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '5/0'
+        },
+        writeCheckpoint: writeCheckpoint
+      }
+    });
+  });
+
+  test('managed write checkpoints - write after checkpoint', async (context) => {
+    await using factory = await generateStorageFactory();
+    const r = await factory.configureSyncRules({
+      content: `
+bucket_definitions:
+  mybucket:
+    data: []
+      `,
+      validate: false
+    });
+    const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
+    await bucketStorage.autoActivate();
+
+    const abortController = new AbortController();
+    context.onTestFinished(() => abortController.abort());
+    const iter = bucketStorage
+      .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
+      [Symbol.asyncIterator]();
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('5/0');
+    });
+
+    const result = await iter.next();
+    expect(result).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '5/0'
+        },
+        writeCheckpoint: null
+      }
+    });
+
+    const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({
+      heads: { '1': '6/0' },
+      user_id: 'user1'
+    });
+    // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
+    // This is what is effectively triggered with RouteAPI.createReplicationHead().
+    // MongoDB storage doesn't need this.
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('6/0');
+    });
+
+    const result2 = await iter.next();
+    expect(result2).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '6/0'
+        },
+        writeCheckpoint: writeCheckpoint
+      }
+    });
+  });
 }

From d29984a43d5267f98f5b2672e577a9a715e55ccf Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 20 Mar 2025 10:13:39 +0200
Subject: [PATCH 12/18] Remove createCustomWriteCheckpoint in favor of
 batchCreateCustomWriteCheckpoints.
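
A single custom write checkpoint is just a one-element batch, so the
dedicated method added no value. A minimal migration sketch for any
remaining callers (assuming an object implementing the WriteCheckpointAPI
interface):

    // Before:
    // await api.createCustomWriteCheckpoint({ user_id, checkpoint, sync_rules_id });
    // After - an equivalent one-element batch:
    await api.batchCreateCustomWriteCheckpoints([{ user_id, checkpoint, sync_rules_id }]);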
--- .../implementation/MongoWriteCheckpointAPI.ts | 23 --------------- .../src/storage/PostgresSyncRulesStorage.ts | 7 ----- .../checkpoints/PostgresWriteCheckpointAPI.ts | 28 ------------------- .../src/storage/WriteCheckpointAPI.ts | 2 -- 4 files changed, 60 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index c261719c..addcbd43 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -36,29 +36,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } - async createCustomWriteCheckpoint(options: storage.CustomWriteCheckpointOptions): Promise { - if (this.writeCheckpointMode !== storage.WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` - ); - } - - const { checkpoint, user_id, sync_rules_id } = options; - const doc = await this.db.custom_write_checkpoints.findOneAndUpdate( - { - user_id: user_id, - sync_rules_id - }, - { - $set: { - checkpoint - } - }, - { upsert: true, returnDocument: 'after' } - ); - return doc!.checkpoint; - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 63fe01a9..7aa1bd73 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -113,13 +113,6 @@ export class PostgresSyncRulesStorage ); } - createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint({ - ...checkpoint, - sync_rules_id: this.group_id - }); - } - lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { return this.writeCheckpointAPI.lastWriteCheckpoint({ ...filters, diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index d04e9412..e8042e14 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -30,34 +30,6 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } - async createCustomWriteCheckpoint(options: storage.CustomWriteCheckpointOptions): Promise { - if (this.writeCheckpointMode !== storage.WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` - ); - } - - const { checkpoint, user_id, sync_rules_id } = options; - const row = await this.db.sql` - INSERT INTO - custom_write_checkpoints (user_id, write_checkpoint, sync_rules_id) - 
VALUES - ( - ${{ type: 'varchar', value: user_id }}, - ${{ type: 'int8', value: checkpoint }}, - ${{ type: 'int4', value: sync_rules_id }} - ) - ON CONFLICT DO UPDATE - SET - write_checkpoint = EXCLUDED.write_checkpoint - RETURNING - *; - ` - .decoded(models.CustomWriteCheckpoint) - .first(); - return row!.write_checkpoint; - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index 31126818..b9d14ce9 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -94,7 +94,6 @@ export interface BaseWriteCheckpointAPI { */ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise; - createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise; lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise; } @@ -104,7 +103,6 @@ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { */ export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; - createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable; From 1a59edc9c9a0d05c9dc83b8ff9b14058e7deb9f3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 10:41:33 +0200 Subject: [PATCH 13/18] Tests and fixes for custom write checkpoints. 
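
One of the fixes: custom write checkpoints are bigint values, and plain
JSON.stringify cannot serialize a bigint - JSON.stringify({ checkpoint: 6n })
throws a TypeError. The Postgres implementation now stringifies each
checkpoint before embedding the batch as a jsonb parameter, relying on the
::int8 cast in the query to restore the numeric type. Roughly:

    const mapped = checkpoints.map((cp) => ({
      user_id: cp.user_id,
      checkpoint: String(cp.checkpoint), // 6n -> '6'; ::int8 casts it back
      sync_rules_id: cp.sync_rules_id
    }));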
--- .../implementation/MongoSyncBucketStorage.ts | 7 - .../implementation/MongoWriteCheckpointAPI.ts | 7 +- .../src/storage/implementation/db.ts | 1 + .../checkpoints/PostgresWriteCheckpointAPI.ts | 17 ++- .../src/tests/register-data-storage-tests.ts | 133 +++++++++++++++++- 5 files changed, 153 insertions(+), 12 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index b607ad57..7c85d48c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -87,13 +87,6 @@ export class MongoSyncBucketStorage ); } - createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint({ - ...checkpoint, - sync_rules_id: this.group_id - }); - } - createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index addcbd43..5122727b 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -278,6 +278,11 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { lsn: null }; lastId = doc.checkpoint; + } else { + yield { + id: null, + lsn: null + }; } for await (let event of stream) { @@ -324,7 +329,7 @@ export async function batchCreateCustomWriteCheckpoints( db: PowerSyncMongo, checkpoints: storage.CustomWriteCheckpointOptions[] ): Promise { - if (!checkpoints.length) { + if (checkpoints.length == 0) { return; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 56d6d9da..00ccd544 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -74,6 +74,7 @@ export class PowerSyncMongo { await this.instance.deleteOne({}); await this.locks.deleteMany({}); await this.bucket_state.deleteMany({}); + await this.custom_write_checkpoints.deleteMany({}); } /** diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index e8042e14..913894f2 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -1,7 +1,7 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as framework from '@powersync/lib-services-framework'; -import { storage } from '@powersync/service-core'; -import { JSONBig } from '@powersync/service-jsonbig'; +import { storage, sync } from '@powersync/service-core'; +import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { models } from '../../types/types.js'; export type PostgresCheckpointAPIOptions = { @@ -129,11 +129,22 @@ export async function batchCreateCustomWriteCheckpoints( return; } + // Needs to be encoded using 
plain JSON.stringify
+  const mappedCheckpoints = checkpoints.map((cp) => {
+    return {
+      user_id: cp.user_id,
+      // Cannot encode bigint directly using JSON.stringify.
+      // The ::int8 in the query below will take care of casting back to a number
+      checkpoint: String(cp.checkpoint),
+      sync_rules_id: cp.sync_rules_id
+    };
+  });
+
   await db.sql`
     WITH
       json_data AS (
         SELECT
-          jsonb_array_elements(${{ type: 'jsonb', value: JSONBig.stringify(checkpoints) }}) AS
+          jsonb_array_elements(${{ type: 'jsonb', value: mappedCheckpoints }}) AS
           CHECKPOINT
       )
     INSERT INTO
diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
index 0aad307f..42bf7ffa 100644
--- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts
+++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
@@ -1560,7 +1560,7 @@ bucket_definitions:
     });
     // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
     // This is what is effectively triggered with RouteAPI.createReplicationHead().
-    // MongoDB storage doesn't need this.
+    // MongoDB storage doesn't explicitly need this anymore.
     await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
       await batch.keepalive('6/0');
     });
@@ -1577,4 +1577,135 @@ bucket_definitions:
       }
     });
   });
+
+  test('custom write checkpoints - checkpoint after write', async (context) => {
+    await using factory = await generateStorageFactory();
+    const r = await factory.configureSyncRules({
+      content: `
+bucket_definitions:
+  mybucket:
+    data: []
+      `,
+      validate: false
+    });
+    const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
+    await bucketStorage.autoActivate();
+    bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM);
+
+    const abortController = new AbortController();
+    context.onTestFinished(() => abortController.abort());
+    const iter = bucketStorage
+      .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
+      [Symbol.asyncIterator]();
+
+    await bucketStorage.batchCreateCustomWriteCheckpoints([
+      {
+        checkpoint: 5n,
+        user_id: 'user1'
+      }
+    ]);
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('5/0');
+    });
+
+    const result = await iter.next();
+    expect(result).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '5/0'
+        },
+        writeCheckpoint: 5n
+      }
+    });
+  });
+
+  test('custom write checkpoints - write after checkpoint', async (context) => {
+    await using factory = await generateStorageFactory();
+    const r = await factory.configureSyncRules({
+      content: `
+bucket_definitions:
+  mybucket:
+    data: []
+      `,
+      validate: false
+    });
+    const bucketStorage = factory.getInstance(r.persisted_sync_rules!);
+    await bucketStorage.autoActivate();
+    bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM);
+
+    const abortController = new AbortController();
+    context.onTestFinished(() => abortController.abort());
+    const iter = bucketStorage
+      .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal })
+      [Symbol.asyncIterator]();
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('5/0');
+    });
+
+    const result = await iter.next();
+    expect(result).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '5/0'
+        },
+        writeCheckpoint: null
+      }
+    });
+
+    await bucketStorage.batchCreateCustomWriteCheckpoints([
+      {
+        checkpoint: 6n,
+        user_id: 'user1'
+      }
+    ]);
+    // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
+    // This is what is effectively triggered with RouteAPI.createReplicationHead().
+    // MongoDB storage doesn't explicitly need this anymore.
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('6/0');
+    });
+
+    const result2 = await iter.next();
+    expect(result2).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '6/0'
+        },
+        writeCheckpoint: 6n
+      }
+    });
+
+    await bucketStorage.batchCreateCustomWriteCheckpoints([
+      {
+        checkpoint: 7n,
+        user_id: 'user1'
+      }
+    ]);
+    // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage.
+    // This is what is effectively triggered with RouteAPI.createReplicationHead().
+    // MongoDB storage doesn't explicitly need this anymore.
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.keepalive('7/0');
+    });
+
+    const result3 = await iter.next();
+    expect(result3).toMatchObject({
+      done: false,
+      value: {
+        base: {
+          checkpoint: 0n,
+          lsn: '7/0'
+        },
+        writeCheckpoint: 7n
+      }
+    });
+  });
 }

From a0b25ca1c21edc755319ad47de8a770be66cd2d2 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 20 Mar 2025 10:55:18 +0200
Subject: [PATCH 14/18] Use a shared iterator for custom write checkpoints.

---
 .../implementation/MongoSyncBucketStorage.ts  |   3 +-
 .../implementation/MongoWriteCheckpointAPI.ts | 178 ++++++++++--------
 2 files changed, 106 insertions(+), 75 deletions(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 7c85d48c..c2679fc6 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -69,7 +69,8 @@ export class MongoSyncBucketStorage
     this.db = factory.db;
     this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
       db: this.db,
-      mode: writeCheckpointMode
+      mode: writeCheckpointMode,
+      sync_rules_id: group_id
     });
   }

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts
index 5122727b..785898cf 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts
@@ -13,15 +13,18 @@ import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js';
 export type MongoCheckpointAPIOptions = {
   db: PowerSyncMongo;
   mode: storage.WriteCheckpointMode;
+  sync_rules_id: number;
 };

 export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
   readonly db: PowerSyncMongo;
   private _mode: storage.WriteCheckpointMode;
+  private sync_rules_id: number;

   constructor(options: MongoCheckpointAPIOptions) {
     this.db = options.db;
     this._mode = options.mode;
+    this.sync_rules_id = options.sync_rules_id;
   }

   get writeCheckpointMode() {
@@ -89,14 +92,8 @@
     }
   }

-  private sharedIter = new Demultiplexer((signal) => {
-    const clusterTimePromise = (async () => {
-      const hello = await this.db.db.command({ hello: 1 });
-      // Note: This is not valid on sharded clusters.
-      const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
-      startClusterTime;
-      return startClusterTime;
-    })();
+  private sharedManagedIter = new Demultiplexer((signal) => {
+    const clusterTimePromise = this.getClusterTime();

     return {
       iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal),
       getFirstValue: async (user_id: string) => {
@@ -170,13 +167,6 @@
       }
     );

-    const hello = await this.db.db.command({ hello: 1 });
-    // Note: This is not valid on sharded clusters.
-    const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
-    if (startClusterTime == null) {
-      throw new framework.ServiceAssertionError('Could not get clusterTime');
-    }
-
     signal.onabort = () => {
       stream.close();
     };
@@ -202,55 +192,75 @@
     }
   }

-  async *watchManagedWriteCheckpoint(
-    options: WatchUserWriteCheckpointOptions
-  ): AsyncIterable<WriteCheckpointResult> {
-    const stream = this.sharedIter.subscribe(options.user_id, options.signal);
+  watchManagedWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable<WriteCheckpointResult> {
+    const stream = this.sharedManagedIter.subscribe(options.user_id, options.signal);
+    return this.orderedStream(stream);
+  }

-    let lastId = -1n;
+  private sharedCustomIter = new Demultiplexer((signal) => {
+    const clusterTimePromise = this.getClusterTime();

-    for await (let doc of stream) {
-      // Guard against out-of-order events
-      if (lastId == -1n || (doc.id != null && doc.id > lastId)) {
-        yield doc;
-        if (doc.id != null) {
-          lastId = doc.id;
-        }
-      }
-    }
-  }
+    return {
+      iterator: this.watchAllCustomWriteCheckpoints(clusterTimePromise, signal),
+      getFirstValue: async (user_id: string) => {
+        // We cater for the same potential race conditions as for managed write checkpoints.

-  async *watchCustomWriteCheckpoint(
-    options: WatchUserWriteCheckpointOptions
-  ): AsyncIterable<WriteCheckpointResult> {
-    const { user_id, sync_rules_id, signal } = options;
+        const changeStreamStart = await clusterTimePromise;

-    let doc = null as CustomWriteCheckpointDocument | null;
-    let clusterTime = null as mongo.Timestamp | null;
+        let doc = null as CustomWriteCheckpointDocument | null;
+        let clusterTime = null as mongo.Timestamp | null;

-    await this.db.client.withSession(async (session) => {
-      doc = await this.db.custom_write_checkpoints.findOne(
-        {
-          user_id: user_id,
-          sync_rules_id: sync_rules_id
-        },
-        {
-          session
+        await this.db.client.withSession(async (session) => {
+          doc = await this.db.custom_write_checkpoints.findOne(
+            {
+              user_id: user_id,
+              sync_rules_id: this.sync_rules_id
+            },
+            {
+              session
+            }
+          );
+          const time = session.clusterTime?.clusterTime ?? null;
+          clusterTime = time;
+        });
+        if (clusterTime == null) {
+          throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint');
+        }
+
+        if (clusterTime.lessThan(changeStreamStart)) {
+          throw new framework.ServiceAssertionError(
+            'clusterTime for write checkpoint is older than changestream start'
+          );
+        }
+
+        if (doc == null) {
+          // No write checkpoint, but we still need to return a result
+          return {
+            id: null,
+            lsn: null
+          };
         }
-      );
-      const time = session.clusterTime?.clusterTime ?? null;
-      clusterTime = time;
-    });
-    if (clusterTime == null) {
-      throw new framework.ServiceAssertionError('Could not get clusterTime');
-    }
+
+        return {
+          id: doc.checkpoint,
+          // custom write checkpoints are not tied to a LSN
+          lsn: null
+        };
+      }
+    };
+  });
+
+  private async *watchAllCustomWriteCheckpoints(
+    clusterTimePromise: Promise<mongo.Timestamp>,
+    signal: AbortSignal
+  ): AsyncGenerator<DemultiplexerValue<WriteCheckpointResult>> {
+    const clusterTime = await clusterTimePromise;

     const stream = this.db.custom_write_checkpoints.watch(
       [
         {
           $match: {
-            'fullDocument.user_id': user_id,
-            'fullDocument.sync_rules_id': sync_rules_id,
+            'fullDocument.sync_rules_id': this.sync_rules_id,
             operationType: { $in: ['insert', 'update', 'replace'] }
           }
         }
@@ -270,34 +280,30 @@
       return;
     }

-    let lastId = -1n;
-
-    if (doc != null) {
-      yield {
-        id: doc.checkpoint,
-        lsn: null
-      };
-      lastId = doc.checkpoint;
-    } else {
-      yield {
-        id: null,
-        lsn: null
-      };
-    }
-
     for await (let event of stream) {
       if (!('fullDocument' in event) || event.fullDocument == null) {
         continue;
       }
-      // Guard against out-of-order events
-      if (event.fullDocument.checkpoint > lastId) {
-        yield {
+
+      const user_id = event.fullDocument.user_id;
+      yield {
+        key: user_id,
+        value: {
           id: event.fullDocument.checkpoint,
+          // Custom write checkpoints are not tied to a specific LSN
           lsn: null
-        };
-        lastId = event.fullDocument.checkpoint;
-      }
+        }
+      };
+    }
+  }
+
+  watchCustomWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable<WriteCheckpointResult> {
+    if (options.sync_rules_id != this.sync_rules_id) {
+      throw new framework.ServiceAssertionError('sync_rules_id does not match');
     }
+
+    const stream = this.sharedCustomIter.subscribe(options.user_id, options.signal);
+    return this.orderedStream(stream);
   }

   protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) {
@@ -323,6 +329,30 @@
     });
     return lastWriteCheckpoint?.client_id ?? null;
   }
+
+  private async getClusterTime(): Promise<mongo.Timestamp> {
+    const hello = await this.db.db.command({ hello: 1 });
+    // Note: This is not valid on sharded clusters.
+    const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
+    return startClusterTime;
+  }
+
+  /**
+   * Makes a write checkpoint stream an ordered one - any out-of-order events are discarded.
+   */
+  private async *orderedStream(stream: AsyncIterable<WriteCheckpointResult>) {
+    let lastId = -1n;
+
+    for await (let event of stream) {
+      // Guard against out-of-order events
+      if (lastId == -1n || (event.id != null && event.id > lastId)) {
+        yield event;
+        if (event.id != null) {
+          lastId = event.id;
+        }
+      }
+    }
+  }
 }

 export async function batchCreateCustomWriteCheckpoints(

From e4d61d4ec4fac345518e75a50bdafc9d15ca9a1f Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Thu, 20 Mar 2025 11:03:54 +0200
Subject: [PATCH 15/18] Workaround to make tests more stable.
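
watchCheckpointChanges() merges two independent streams - replication
checkpoint events and write checkpoint events - so the relative order in
which a keepalive and a write checkpoint are observed is not guaranteed.
Rather than assuming a fixed interleaving, the tests now tolerate one
extra intermediate emission, roughly:

    let result2 = await iter.next();
    if (result2.value?.base?.lsn == '5/0') {
      // The '6/0' keepalive may not have been observed yet - take the
      // next emission instead.
      result2 = await iter.next();
    }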
--- .../src/tests/register-data-storage-tests.ts | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 42bf7ffa..7fbf0a36 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1565,7 +1565,11 @@ bucket_definitions: await batch.keepalive('6/0'); }); - const result2 = await iter.next(); + let result2 = await iter.next(); + if (result2.value?.base?.lsn == '5/0') { + // Events could arrive in a different order in some cases - this caters for it + result2 = await iter.next(); + } expect(result2).toMatchObject({ done: false, value: { @@ -1671,7 +1675,11 @@ bucket_definitions: await batch.keepalive('6/0'); }); - const result2 = await iter.next(); + let result2 = await iter.next(); + if (result2.value?.base?.lsn == '5/0') { + // Events could arrive in a different order in some cases - this caters for it + result2 = await iter.next(); + } expect(result2).toMatchObject({ done: false, value: { @@ -1689,14 +1697,15 @@ bucket_definitions: user_id: 'user1' } ]); - // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. - // This is what is effetively triggered with RouteAPI.createReplicationHead(). - // MongoDB storage doesn't explicitly need this anymore. await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.keepalive('7/0'); }); - const result3 = await iter.next(); + let result3 = await iter.next(); + if (result3.value?.base?.lsn == '6/0') { + // Events could arrive in a different order in some cases - this caters for it + result3 = await iter.next(); + } expect(result3).toMatchObject({ done: false, value: { From 053db0e88700bcc2029b6ab53bd214aa1f9028b2 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 12:07:16 +0200 Subject: [PATCH 16/18] Actual fix for the tests. 
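
Custom write checkpoints are not tied to an LSN at all, so pinning the
base LSN in these assertions was testing an irrelevant ordering. Instead
of retrying, the assertions simply stop matching on the LSN - a sketch of
the relaxed expectation:

    expect(result2).toMatchObject({
      done: false,
      value: {
        base: { checkpoint: 0n }, // lsn intentionally not asserted
        writeCheckpoint: 6n
      }
    });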
--- .../src/tests/register-data-storage-tests.ts | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 7fbf0a36..21ab8986 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1676,16 +1676,13 @@ bucket_definitions: }); let result2 = await iter.next(); - if (result2.value?.base?.lsn == '5/0') { - // Events could arrive in a different order in some cases - this caters for it - result2 = await iter.next(); - } expect(result2).toMatchObject({ done: false, value: { base: { - checkpoint: 0n, - lsn: '6/0' + checkpoint: 0n + // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints + // lsn: '6/0' }, writeCheckpoint: 6n } @@ -1702,16 +1699,13 @@ bucket_definitions: }); let result3 = await iter.next(); - if (result3.value?.base?.lsn == '6/0') { - // Events could arrive in a different order in some cases - this caters for it - result3 = await iter.next(); - } expect(result3).toMatchObject({ done: false, value: { base: { - checkpoint: 0n, - lsn: '7/0' + checkpoint: 0n + // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints + // lsn: '7/0' }, writeCheckpoint: 7n } From 30cd5a8156bab5e3e2037a251fe57f7f5be69202 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 12:12:58 +0200 Subject: [PATCH 17/18] Changesets. --- .changeset/plenty-jokes-admire.md | 7 +++++++ .changeset/short-experts-fetch.md | 9 +++++++++ 2 files changed, 16 insertions(+) create mode 100644 .changeset/plenty-jokes-admire.md create mode 100644 .changeset/short-experts-fetch.md diff --git a/.changeset/plenty-jokes-admire.md b/.changeset/plenty-jokes-admire.md new file mode 100644 index 00000000..4d7bda01 --- /dev/null +++ b/.changeset/plenty-jokes-admire.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[Postgres Storage] Fix issue when creating custom write checkpoints diff --git a/.changeset/short-experts-fetch.md b/.changeset/short-experts-fetch.md new file mode 100644 index 00000000..73e8f727 --- /dev/null +++ b/.changeset/short-experts-fetch.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-core': minor +'@powersync/service-image': minor +--- + +[MongoDB Storage] Stream write checkpoint changes intead of polling, reducing overhead for large numbers of concurrent connections From 92a856fdb1ebc00330f6b678a3fa7a71652effe0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 20 Mar 2025 12:18:24 +0200 Subject: [PATCH 18/18] Fix typo. --- .changeset/short-experts-fetch.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/short-experts-fetch.md b/.changeset/short-experts-fetch.md index 73e8f727..f62deb69 100644 --- a/.changeset/short-experts-fetch.md +++ b/.changeset/short-experts-fetch.md @@ -6,4 +6,4 @@ '@powersync/service-image': minor --- -[MongoDB Storage] Stream write checkpoint changes intead of polling, reducing overhead for large numbers of concurrent connections +[MongoDB Storage] Stream write checkpoint changes instead of polling, reducing overhead for large numbers of concurrent connections