diff --git a/.changeset/plenty-jokes-admire.md b/.changeset/plenty-jokes-admire.md new file mode 100644 index 00000000..4d7bda01 --- /dev/null +++ b/.changeset/plenty-jokes-admire.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[Postgres Storage] Fix issue when creating custom write checkpoints diff --git a/.changeset/short-experts-fetch.md b/.changeset/short-experts-fetch.md new file mode 100644 index 00000000..f62deb69 --- /dev/null +++ b/.changeset/short-experts-fetch.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-core': minor +'@powersync/service-image': minor +--- + +[MongoDB Storage] Stream write checkpoint changes instead of polling, reducing overhead for large numbers of concurrent connections diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 57fc44ec..c2679fc6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -9,21 +9,23 @@ import { } from '@powersync/lib-services-framework'; import { BroadcastIterable, + CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, + deserializeParameterLookup, GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, + mergeAsyncIterables, ProtocolOpId, ReplicationCheckpoint, storage, utils, WatchWriteCheckpointOptions, - CHECKPOINT_INVALIDATE_ALL, - deserializeParameterLookup + WriteCheckpointResult } from '@powersync/service-core'; -import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules'; +import { JSONBig } from '@powersync/service-jsonbig'; +import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules'; import * as bson from 'bson'; -import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; import { LRUCache } from 'lru-cache'; import * as timers from 'timers/promises'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; @@ -41,7 +43,6 @@ import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js'; -import { JSONBig } from '@powersync/service-jsonbig'; export class MongoSyncBucketStorage extends BaseObserver @@ -68,7 +69,8 @@ export class MongoSyncBucketStorage this.db = factory.db; this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ db: this.db, - mode: writeCheckpointMode + mode: writeCheckpointMode, + sync_rules_id: group_id }); } @@ -86,13 +88,6 @@ export class MongoSyncBucketStorage ); } - createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint({ - ...checkpoint, - sync_rules_id: this.group_id - }); - } - createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); } @@ -704,8 +699,7 @@ export class MongoSyncBucketStorage if (doc == null) { // Sync rules not present or not active. // Abort the connections - clients will have to retry later. 
- // Should this error instead? - return; + throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); } yield this.makeActiveCheckpoint(doc); @@ -749,7 +743,7 @@ export class MongoSyncBucketStorage } if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { // Sync rules have changed - abort and restart. - // Should this error instead? + // We do a soft close of the stream here, rather than raising an error. break; } @@ -772,28 +766,60 @@ export class MongoSyncBucketStorage /** * User-specific watch on the latest checkpoint and/or write checkpoint. */ - async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { - const { user_id, signal } = options; + async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { + const { signal } = options; let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; + let lastWriteCheckpointDoc: WriteCheckpointResult | null = null; + let nextWriteCheckpoint: bigint | null = null; + let lastCheckpointEvent: ReplicationCheckpoint | null = null; + let receivedWriteCheckpoint = false; + + const writeCheckpointIter = this.writeCheckpointAPI.watchUserWriteCheckpoint({ + user_id: options.user_id, + signal, + sync_rules_id: this.group_id + }); + const iter = mergeAsyncIterables( + [this.sharedIter, writeCheckpointIter], + signal + ); - const iter = wrapWithAbort(this.sharedIter, signal); for await (const event of iter) { - const { checkpoint, lsn } = event; + if ('checkpoint' in event) { + lastCheckpointEvent = event; + } else { + lastWriteCheckpointDoc = event; + receivedWriteCheckpoint = true; + } + + if (lastCheckpointEvent == null || !receivedWriteCheckpoint) { + // We need to wait until we have received at least one checkpoint and one write checkpoint. + continue; + } // lsn changes are not important by itself. // What is important is: // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - const lsnFilters: Record = lsn ?
{ 1: lsn } : {}; + const lsn = lastCheckpointEvent?.lsn; - const currentWriteCheckpoint = await this.lastWriteCheckpoint({ - user_id, - heads: { - ...lsnFilters + if ( + lastWriteCheckpointDoc != null && + (lastWriteCheckpointDoc.lsn == null || (lsn != null && lsn >= lastWriteCheckpointDoc.lsn)) + ) { + const writeCheckpoint = lastWriteCheckpointDoc.id; + if (nextWriteCheckpoint == null || (writeCheckpoint != null && writeCheckpoint > nextWriteCheckpoint)) { + nextWriteCheckpoint = writeCheckpoint; } - }); + // We used the doc - clear it + lastWriteCheckpointDoc = null; + } + + const { checkpoint } = lastCheckpointEvent; + + const currentWriteCheckpoint = nextWriteCheckpoint; if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) { // No change - wait for next one @@ -815,7 +841,7 @@ export class MongoSyncBucketStorage lastCheckpoint = checkpoint; yield { - base: event, + base: lastCheckpointEvent, writeCheckpoint: currentWriteCheckpoint, update: updates }; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index fe684837..785898cf 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -1,19 +1,30 @@ +import { mongo } from '@powersync/lib-service-mongodb'; import * as framework from '@powersync/lib-services-framework'; -import { storage } from '@powersync/service-core'; +import { + Demultiplexer, + DemultiplexerValue, + storage, + WatchUserWriteCheckpointOptions, + WriteCheckpointResult +} from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; +import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models.js'; export type MongoCheckpointAPIOptions = { db: PowerSyncMongo; mode: storage.WriteCheckpointMode; + sync_rules_id: number; }; export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { readonly db: PowerSyncMongo; private _mode: storage.WriteCheckpointMode; + private sync_rules_id: number; constructor(options: MongoCheckpointAPIOptions) { this.db = options.db; this._mode = options.mode; + this.sync_rules_id = options.sync_rules_id; } get writeCheckpointMode() { @@ -28,29 +39,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } - async createCustomWriteCheckpoint(options: storage.CustomWriteCheckpointOptions): Promise { - if (this.writeCheckpointMode !== storage.WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` - ); - } - - const { checkpoint, user_id, sync_rules_id } = options; - const doc = await this.db.custom_write_checkpoints.findOneAndUpdate( - { - user_id: user_id, - sync_rules_id - }, - { - $set: { - checkpoint - } - }, - { upsert: true, returnDocument: 'after' } - ); - return doc!.checkpoint; - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( @@ -93,6 +81,231 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { } } + watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { + 
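+ // Custom write checkpoints are application-defined documents in custom_write_checkpoints, + // while managed write checkpoints are created by the service in write_checkpoints; + // each mode gets its own changestream pipeline below.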
switch (this.writeCheckpointMode) { + case storage.WriteCheckpointMode.CUSTOM: + return this.watchCustomWriteCheckpoint(options); + case storage.WriteCheckpointMode.MANAGED: + return this.watchManagedWriteCheckpoint(options); + default: + throw new Error('Invalid write checkpoint mode'); + } + } + + private sharedManagedIter = new Demultiplexer((signal) => { + const clusterTimePromise = this.getClusterTime(); + + return { + iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal), + getFirstValue: async (user_id: string) => { + // Potential race conditions we cater for: + + // Case 1: changestream is behind. + // We get a doc now, then the same or older doc again later. + // No problem! + + // Case 2: Query is behind. I.e. the doc has been created and emitted on the changestream, but the query doesn't see it yet. + // As far as we know this is not possible, but it would be good to confirm. + + // Case 3: changestream delays opening. A doc is created after our query here, but before the changestream is opened. + // Awaiting clusterTimePromise should be sufficient here, but as a sanity check we also confirm that our query + // timestamp is >= the startClusterTime. + + const changeStreamStart = await clusterTimePromise; + + let doc = null as WriteCheckpointDocument | null; + let clusterTime = null as mongo.Timestamp | null; + + await this.db.client.withSession(async (session) => { + doc = await this.db.write_checkpoints.findOne( + { + user_id: user_id + }, + { + session + } + ); + const time = session.clusterTime?.clusterTime ?? null; + clusterTime = time; + }); + if (clusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); + } + + if (clusterTime.lessThan(changeStreamStart)) { + throw new framework.ServiceAssertionError( + 'clusterTime for write checkpoint is older than changestream start' + ); + } + + if (doc == null) { + return { + id: null, + lsn: null + }; + } + + return { + id: doc.client_id, + lsn: doc.lsns['1'] + }; + } + }; + }); + + private async *watchAllManagedWriteCheckpoints( + clusterTimePromise: Promise, + signal: AbortSignal + ): AsyncGenerator> { + const clusterTime = await clusterTimePromise; + + const stream = this.db.write_checkpoints.watch( + [{ $match: { operationType: { $in: ['insert', 'update', 'replace'] } } }], + { + fullDocument: 'updateLookup', + startAtOperationTime: clusterTime + } + ); + + signal.onabort = () => { + stream.close(); + }; + + if (signal.aborted) { + stream.close(); + return; + } + + for await (let event of stream) { + if (!('fullDocument' in event) || event.fullDocument == null) { + continue; + } + + const user_id = event.fullDocument.user_id; + yield { + key: user_id, + value: { + id: event.fullDocument.client_id, + lsn: event.fullDocument.lsns['1'] + } + }; + } + } + + watchManagedWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { + const stream = this.sharedManagedIter.subscribe(options.user_id, options.signal); + return this.orderedStream(stream); + } + + private sharedCustomIter = new Demultiplexer((signal) => { + const clusterTimePromise = this.getClusterTime(); + + return { + iterator: this.watchAllCustomWriteCheckpoints(clusterTimePromise, signal), + getFirstValue: async (user_id: string) => { + // We cater for the same potential race conditions as for managed write checkpoints.
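+ // (See the Case 1-3 notes on sharedManagedIter above.)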
+ + const changeStreamStart = await clusterTimePromise; + + let doc = null as CustomWriteCheckpointDocument | null; + let clusterTime = null as mongo.Timestamp | null; + + await this.db.client.withSession(async (session) => { + doc = await this.db.custom_write_checkpoints.findOne( + { + user_id: user_id, + sync_rules_id: this.sync_rules_id + }, + { + session + } + ); + const time = session.clusterTime?.clusterTime ?? null; + clusterTime = time; + }); + if (clusterTime == null) { + throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint'); + } + + if (clusterTime.lessThan(changeStreamStart)) { + throw new framework.ServiceAssertionError( + 'clusterTime for write checkpoint is older than changestream start' + ); + } + + if (doc == null) { + // No write checkpoint, but we still need to return a result + return { + id: null, + lsn: null + }; + } + + return { + id: doc.checkpoint, + // custom write checkpoints are not tied to a LSN + lsn: null + }; + } + }; + }); + + private async *watchAllCustomWriteCheckpoints( + clusterTimePromise: Promise, + signal: AbortSignal + ): AsyncGenerator> { + const clusterTime = await clusterTimePromise; + + const stream = this.db.custom_write_checkpoints.watch( + [ + { + $match: { + 'fullDocument.sync_rules_id': this.sync_rules_id, + operationType: { $in: ['insert', 'update', 'replace'] } + } + } + ], + { + fullDocument: 'updateLookup', + startAtOperationTime: clusterTime + } + ); + + signal.onabort = () => { + stream.close(); + }; + + if (signal.aborted) { + stream.close(); + return; + } + + for await (let event of stream) { + if (!('fullDocument' in event) || event.fullDocument == null) { + continue; + } + + const user_id = event.fullDocument.user_id; + yield { + key: user_id, + value: { + id: event.fullDocument.checkpoint, + // Custom write checkpoints are not tied to a specific LSN + lsn: null + } + }; + } + } + + watchCustomWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable { + if (options.sync_rules_id != this.sync_rules_id) { + throw new framework.ServiceAssertionError('sync_rules_id does not match'); + } + + const stream = this.sharedCustomIter.subscribe(options.user_id, options.signal); + return this.orderedStream(stream); + } + protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) { const { user_id, sync_rules_id } = filters; const lastWriteCheckpoint = await this.db.custom_write_checkpoints.findOne({ @@ -116,13 +329,37 @@ }); return lastWriteCheckpoint?.client_id ?? null; } + + private async getClusterTime(): Promise { + const hello = await this.db.db.command({ hello: 1 }); + // Note: This is not valid on sharded clusters. + const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp; + return startClusterTime; + } + + /** + * Makes a write checkpoint stream an ordered one - any out-of-order events are discarded.
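+ * + * The initial value from getFirstValue() can race with changestream events for the + * same user; the id comparison below keeps the emitted stream monotonic.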
+ */ + private async *orderedStream(stream: AsyncIterable) { + let lastId = -1n; + + for await (let event of stream) { + // Guard against out-of-order events + if (lastId == -1n || (event.id != null && event.id > lastId)) { + yield event; + if (event.id != null) { + lastId = event.id; + } + } + } + } } export async function batchCreateCustomWriteCheckpoints( db: PowerSyncMongo, checkpoints: storage.CustomWriteCheckpointOptions[] ): Promise { - if (!checkpoints.length) { + if (checkpoints.length == 0) { return; } diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index 56d6d9da..00ccd544 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -74,6 +74,7 @@ export class PowerSyncMongo { await this.instance.deleteOne({}); await this.locks.deleteMany({}); await this.bucket_state.deleteMany({}); + await this.custom_write_checkpoints.deleteMany({}); } /** diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index b7d896f9..7aa1bd73 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -113,13 +113,6 @@ export class PostgresSyncRulesStorage ); } - createCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint({ - ...checkpoint, - sync_rules_id: this.group_id - }); - } - lastWriteCheckpoint(filters: storage.SyncStorageLastWriteCheckpointFilters): Promise { return this.writeCheckpointAPI.lastWriteCheckpoint({ ...filters, @@ -748,7 +741,7 @@ export class PostgresSyncRulesStorage return this.makeActiveCheckpoint(activeCheckpoint); } - async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable { + async *watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable { let lastCheckpoint: utils.InternalOpId | null = null; let lastWriteCheckpoint: bigint | null = null; @@ -815,12 +808,12 @@ export class PostgresSyncRulesStorage const sink = new LastValueSink(undefined); const disposeListener = this.db.registerListener({ - notification: (notification) => sink.next(notification.payload) + notification: (notification) => sink.write(notification.payload) }); signal.addEventListener('aborted', async () => { disposeListener(); - sink.complete(); + sink.end(); }); yield this.makeActiveCheckpoint(doc); diff --git a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts index a12c405b..913894f2 100644 --- a/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts +++ b/modules/module-postgres-storage/src/storage/checkpoints/PostgresWriteCheckpointAPI.ts @@ -1,7 +1,7 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as framework from '@powersync/lib-services-framework'; -import { storage } from '@powersync/service-core'; -import { JSONBig } from '@powersync/service-jsonbig'; +import { storage, sync } from '@powersync/service-core'; +import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; import { models } from '../../types/types.js'; export type PostgresCheckpointAPIOptions = { @@ -30,34 +30,6 @@ export 
class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return batchCreateCustomWriteCheckpoints(this.db, checkpoints); } - async createCustomWriteCheckpoint(options: storage.CustomWriteCheckpointOptions): Promise { - if (this.writeCheckpointMode !== storage.WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` - ); - } - - const { checkpoint, user_id, sync_rules_id } = options; - const row = await this.db.sql` - INSERT INTO - custom_write_checkpoints (user_id, write_checkpoint, sync_rules_id) - VALUES - ( - ${{ type: 'varchar', value: user_id }}, - ${{ type: 'int8', value: checkpoint }}, - ${{ type: 'int4', value: sync_rules_id }} - ) - ON CONFLICT DO UPDATE - SET - write_checkpoint = EXCLUDED.write_checkpoint - RETURNING - *; - ` - .decoded(models.CustomWriteCheckpoint) - .first(); - return row!.write_checkpoint; - } - async createManagedWriteCheckpoint(checkpoint: storage.ManagedWriteCheckpointOptions): Promise { if (this.writeCheckpointMode !== storage.WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( @@ -86,6 +58,13 @@ export class PostgresWriteCheckpointAPI implements storage.WriteCheckpointAPI { return row!.write_checkpoint; } + watchUserWriteCheckpoint( + options: storage.WatchUserWriteCheckpointOptions + ): AsyncIterable { + // Not used for Postgres currently + throw new Error('Method not implemented.'); + } + async lastWriteCheckpoint(filters: storage.LastWriteCheckpointFilters): Promise { switch (this.writeCheckpointMode) { case storage.WriteCheckpointMode.CUSTOM: @@ -150,11 +129,22 @@ export async function batchCreateCustomWriteCheckpoints( return; } + // Needs to be encoded using plain JSON.stringify + const mappedCheckpoints = checkpoints.map((cp) => { + return { + user_id: cp.user_id, + // Cannot encode bigint directly using JSON.stringify. 
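+ // (JSON.stringify throws a TypeError when given a BigInt value, hence the String() conversion here.)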
+ // The ::int8 in the query below will take care of casting back to a number + checkpoint: String(cp.checkpoint), + sync_rules_id: cp.sync_rules_id + }; + }); + await db.sql` WITH json_data AS ( SELECT - jsonb_array_elements(${{ type: 'jsonb', value: JSONBig.stringify(checkpoints) }}) AS + jsonb_array_elements(${{ type: 'jsonb', value: mappedCheckpoints }}) AS CHECKPOINT ) INSERT INTO diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index c2f72162..844e2ee8 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -30,7 +30,7 @@ describe.skipIf(!(env.CI || env.SLOW_TESTS))('checkpoint tests', () => { const controller = new AbortController(); try { - const stream = storage.watchWriteCheckpoint({ + const stream = storage.watchCheckpointChanges({ user_id: checkpointUserId('test_user', 'test_client'), signal: controller.signal }); diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index bdb8b851..21ab8986 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1477,4 +1477,238 @@ bucket_definitions: expect(parsedSchema3).not.equals(parsedSchema2); expect(parsedSchema3.getSourceTables()[0].schema).equals('databasename'); }); + + test('managed write checkpoints - checkpoint after write', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + await bucketStorage.autoActivate(); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ + heads: { '1': '5/0' }, + user_id: 'user1' + }); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: writeCheckpoint + } + }); + }); + + test('managed write checkpoints - write after checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + await bucketStorage.autoActivate(); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: null + } + }); + + const writeCheckpoint = await bucketStorage.createManagedWriteCheckpoint({ 
+ heads: { '1': '6/0' }, + user_id: 'user1' + }); + // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. + // This is what is effectively triggered with RouteAPI.createReplicationHead(). + // MongoDB storage doesn't explicitly need this anymore. + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('6/0'); + }); + + let result2 = await iter.next(); + if (result2.value?.base?.lsn == '5/0') { + // Events could arrive in a different order in some cases - this caters for it + result2 = await iter.next(); + } + expect(result2).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '6/0' + }, + writeCheckpoint: writeCheckpoint + } + }); + }); + + test('custom write checkpoints - checkpoint after write', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + await bucketStorage.autoActivate(); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.batchCreateCustomWriteCheckpoints([ + { + checkpoint: 5n, + user_id: 'user1' + } + ]); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: 5n + } + }); + }); + + test('custom write checkpoints - write after checkpoint', async (context) => { + await using factory = await generateStorageFactory(); + const r = await factory.configureSyncRules({ + content: ` +bucket_definitions: + mybucket: + data: [] + `, + validate: false + }); + const bucketStorage = factory.getInstance(r.persisted_sync_rules!); + await bucketStorage.autoActivate(); + bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); + + const abortController = new AbortController(); + context.onTestFinished(() => abortController.abort()); + const iter = bucketStorage + .watchCheckpointChanges({ user_id: 'user1', signal: abortController.signal }) + [Symbol.asyncIterator](); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('5/0'); + }); + + const result = await iter.next(); + expect(result).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n, + lsn: '5/0' + }, + writeCheckpoint: null + } + }); + + await bucketStorage.batchCreateCustomWriteCheckpoints([ + { + checkpoint: 6n, + user_id: 'user1' + } + ]); + // We have to trigger a new keepalive after the checkpoint, at least to cover postgres storage. + // This is what is effectively triggered with RouteAPI.createReplicationHead(). + // MongoDB storage doesn't explicitly need this anymore.
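+ // (The MongoDB write checkpoint changestream emits independently of replication keepalives.)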
+ await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('6/0'); + }); + + let result2 = await iter.next(); + expect(result2).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n + // can be 5/0 or 6/0 - actual value not relevant for custom write checkpoints + // lsn: '6/0' + }, + writeCheckpoint: 6n + } + }); + + await bucketStorage.batchCreateCustomWriteCheckpoints([ + { + checkpoint: 7n, + user_id: 'user1' + } + ]); + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('7/0'); + }); + + let result3 = await iter.next(); + expect(result3).toMatchObject({ + done: false, + value: { + base: { + checkpoint: 0n + // can be 5/0, 6/0 or 7/0 - actual value not relevant for custom write checkpoints + // lsn: '7/0' + }, + writeCheckpoint: 7n + } + }); + }); } diff --git a/packages/service-core/src/index.ts b/packages/service-core/src/index.ts index 9a90ceeb..e0c8a286 100644 --- a/packages/service-core/src/index.ts +++ b/packages/service-core/src/index.ts @@ -38,3 +38,6 @@ export * as system from './system/system-index.js'; export * from './util/util-index.js'; export * as utils from './util/util-index.js'; + +export * from './streams/streams-index.js'; +export * as streams from './streams/streams-index.js'; diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index e59b2455..148f3be1 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -90,7 +90,7 @@ export interface SyncRulesBucketStorage * * The stream stops or errors if this is not the active sync rules (anymore). */ - watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable; + watchCheckpointChanges(options: WatchWriteCheckpointOptions): AsyncIterable; /** * Get a "batch" of data for a checkpoint. diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index d38ac397..b9d14ce9 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -50,11 +50,37 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti heads: Record; } +export interface WriteCheckpointResult { + /** + * Write checkpoint id (also referred to as client_id). + * + * If null, there is no write checkpoint for the client. + */ + id: bigint | null; + + /** + * LSN for the checkpoint. + * + * This will change when we support multiple connections. + * + * For managed write checkpoints, this LSN must be exceeded by the checkpoint / replication head to be valid. + * + * For custom write checkpoints, this will be null, and the write checkpoint is valid for all LSNs. 
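+ * + * For example, a managed write checkpoint created with heads { '1': '5/0' } only + * becomes visible once the replicated checkpoint LSN reaches '5/0'.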
+ */ + lsn: string | null; +} + export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; +export interface WatchUserWriteCheckpointOptions { + user_id: string; + sync_rules_id: number; + signal: AbortSignal; +} + export interface BaseWriteCheckpointAPI { readonly writeCheckpointMode: WriteCheckpointMode; setWriteCheckpointMode(mode: WriteCheckpointMode): void; @@ -68,7 +94,6 @@ export interface BaseWriteCheckpointAPI { */ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise; - createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise; lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise; } @@ -78,8 +103,9 @@ export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { */ export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; - createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; + + watchUserWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable; } export const DEFAULT_WRITE_CHECKPOINT_MODE = WriteCheckpointMode.MANAGED; diff --git a/packages/service-core/src/sync/BroadcastIterable.ts b/packages/service-core/src/streams/BroadcastIterable.ts similarity index 98% rename from packages/service-core/src/sync/BroadcastIterable.ts rename to packages/service-core/src/streams/BroadcastIterable.ts index e8d6e290..2922c709 100644 --- a/packages/service-core/src/sync/BroadcastIterable.ts +++ b/packages/service-core/src/streams/BroadcastIterable.ts @@ -52,13 +52,13 @@ export class BroadcastIterable implements AsyncIterable { } this.last = doc; for (let sink of sinks) { - sink.next(doc); + sink.write(doc); } } // End of stream for (let sink of sinks) { - sink.complete(); + sink.end(); } } catch (e) { // Just in case the error is not from the source diff --git a/packages/service-core/src/streams/Demultiplexer.ts b/packages/service-core/src/streams/Demultiplexer.ts new file mode 100644 index 00000000..86ebcc40 --- /dev/null +++ b/packages/service-core/src/streams/Demultiplexer.ts @@ -0,0 +1,165 @@ +import { AbortError } from 'ix/aborterror.js'; +import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; +import { LastValueSink } from './LastValueSink.js'; + +export interface DemultiplexerValue { + /** + * The key used for demultiplexing, for example the user id. + */ + key: string; + /** + * The stream value. + */ + value: T; +} + +export interface DemultiplexerSource { + /** + * The async iterator providing a stream of values. + */ + iterator: AsyncIterable>; + + /** + * Fetches the first value for a given key. + * + * This is used to get an initial value for each subscription. + */ + getFirstValue(key: string): Promise; +} + +export type DemultiplexerSourceFactory = (signal: AbortSignal) => DemultiplexerSource; + +/** + * Takes a multiplexed stream (e.g. a changestream covering many individual users), + * and allows subscribing to individual streams. + * + * The source subscription is lazy: + * 1. 
We only start subscribing when there is a downstream subscriber. + * 2. When all downstream subscriptions have ended, we end the source subscription. + * + * For each subscriber, if backpressure builds up, we only keep the _last_ value. + */ +export class Demultiplexer { + private subscribers: Map>> | undefined = undefined; + private abortController: AbortController | undefined = undefined; + private currentSource: DemultiplexerSource | undefined = undefined; + + constructor(private source: DemultiplexerSourceFactory) {} + + private start(filter: string, sink: LastValueSink) { + const abortController = new AbortController(); + const listeners = new Map(); + listeners.set(filter, new Set([sink])); + + this.abortController = abortController; + this.subscribers = listeners; + + const source = this.source(abortController.signal); + this.currentSource = source; + this.loop(source, abortController, listeners); + return source; + } + + private async loop( + source: DemultiplexerSource, + abortController: AbortController, + sinks: Map>> + ) { + try { + for await (let doc of source.iterator) { + if (abortController.signal.aborted || sinks.size == 0) { + throw new AbortError(); + } + const key = doc.key; + const keySinks = sinks.get(key); + if (keySinks == null) { + continue; + } + + for (let sink of keySinks) { + sink.write(doc.value); + } + } + + // End of stream + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.end(); + } + } + } catch (e) { + // Just in case the error is not from the source + abortController.abort(); + + for (let keySinks of sinks.values()) { + for (let sink of keySinks) { + sink.error(e); + } + } + } finally { + // Clear state, so that a new subscription may be started + if (this.subscribers === sinks) { + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + } + + private removeSink(key: string, sink: LastValueSink) { + const existing = this.subscribers?.get(key); + if (existing == null) { + return; + } + existing.delete(sink); + if (existing.size == 0) { + this.subscribers!.delete(key); + } + + if (this.subscribers?.size == 0) { + // This is not immediate - there may be a delay until it is fully stopped, + // depending on the underlying source. + this.abortController?.abort(); + this.subscribers = undefined; + this.abortController = undefined; + this.currentSource = undefined; + } + } + + private addSink(key: string, sink: LastValueSink) { + if (this.currentSource == null) { + return this.start(key, sink); + } else { + const existing = this.subscribers!.get(key); + if (existing != null) { + existing.add(sink); + } else { + this.subscribers!.set(key, new Set([sink])); + } + return this.currentSource; + } + } + + /** + * Subscribe to a specific stream. + * + * @param key The key used for demultiplexing, e.g. user id. + * @param signal + */ + async *subscribe(key: string, signal: AbortSignal): AsyncIterable { + const sink = new LastValueSink(undefined); + // Important that we register the sink before calling getFirstValue(). 
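+ // Otherwise an event emitted between the getFirstValue() query and the subscription could be missed.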
+ const source = this.addSink(key, sink); + try { + const firstValue = await source.getFirstValue(key); + yield firstValue; + yield* sink.withSignal(signal); + } finally { + this.removeSink(key, sink); + } + } + + get active() { + return this.subscribers != null; + } +} diff --git a/packages/service-core/src/sync/LastValueSink.ts b/packages/service-core/src/streams/LastValueSink.ts similarity index 98% rename from packages/service-core/src/sync/LastValueSink.ts rename to packages/service-core/src/streams/LastValueSink.ts index 68a164f9..23ef7dfa 100644 --- a/packages/service-core/src/sync/LastValueSink.ts +++ b/packages/service-core/src/streams/LastValueSink.ts @@ -17,7 +17,7 @@ export class LastValueSink implements AsyncIterable { } } - next(value: T) { + write(value: T) { this.push({ value, done: false, @@ -25,7 +25,7 @@ export class LastValueSink implements AsyncIterable { }); } - complete() { + end() { this.push({ value: undefined, done: true, diff --git a/packages/service-core/src/sync/merge.ts b/packages/service-core/src/streams/merge.ts similarity index 99% rename from packages/service-core/src/sync/merge.ts rename to packages/service-core/src/streams/merge.ts index fcc6aca7..ef046fbb 100644 --- a/packages/service-core/src/sync/merge.ts +++ b/packages/service-core/src/streams/merge.ts @@ -1,7 +1,7 @@ import { throwIfAborted } from 'ix/aborterror.js'; import { AsyncIterableX } from 'ix/asynciterable/index.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; -import { safeRace } from './safeRace.js'; +import { safeRace } from '../sync/safeRace.js'; /** * Merge multiple source AsyncIterables into one output AsyncIterable. diff --git a/packages/service-core/src/streams/streams-index.ts b/packages/service-core/src/streams/streams-index.ts new file mode 100644 index 00000000..0d8eafa4 --- /dev/null +++ b/packages/service-core/src/streams/streams-index.ts @@ -0,0 +1,4 @@ +export * from './merge.js'; +export * from './Demultiplexer.js'; +export * from './LastValueSink.js'; +export * from './BroadcastIterable.js'; diff --git a/packages/service-core/src/sync/sync-index.ts b/packages/service-core/src/sync/sync-index.ts index d43145d3..df47b0da 100644 --- a/packages/service-core/src/sync/sync-index.ts +++ b/packages/service-core/src/sync/sync-index.ts @@ -1,6 +1,3 @@ -export * from './BroadcastIterable.js'; -export * from './LastValueSink.js'; -export * from './merge.js'; export * from './RequestTracker.js'; export * from './safeRace.js'; export * from './sync.js'; diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 64e55a41..8c3dafc1 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -9,7 +9,7 @@ import * as util from '../util/util-index.js'; import { logger } from '@powersync/lib-services-framework'; import { BucketChecksumState } from './BucketChecksumState.js'; -import { mergeAsyncIterables } from './merge.js'; +import { mergeAsyncIterables } from '../streams/streams-index.js'; import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; import { SyncContext } from './SyncContext.js'; import { RequestTracker } from './RequestTracker.js'; @@ -105,7 +105,7 @@ async function* streamResponseInner( after: BigInt(bucket.after) })) }); - const stream = bucketStorage.watchWriteCheckpoint({ + const stream = bucketStorage.watchCheckpointChanges({ user_id: checkpointUserId, signal }); diff --git 
a/packages/service-core/test/src/broadcast_iterable.test.ts b/packages/service-core/test/src/broadcast_iterable.test.ts index 4d2cced7..02696313 100644 --- a/packages/service-core/test/src/broadcast_iterable.test.ts +++ b/packages/service-core/test/src/broadcast_iterable.test.ts @@ -1,14 +1,14 @@ -import { BroadcastIterable, IterableSource } from '@/sync/BroadcastIterable.js'; +import { BroadcastIterable, IterableSource } from '@/streams/BroadcastIterable.js'; import { AsyncIterableX, interval } from 'ix/asynciterable/index.js'; import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; import { take } from 'ix/asynciterable/operators/take.js'; import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js'; import { toArray } from 'ix/asynciterable/toarray.js'; import * as timers from 'timers/promises'; -import { describe, expect, test } from 'vitest'; +import { describe, expect, it } from 'vitest'; describe('BroadcastIterable', () => { - test('should iterate', async () => { + it('should iterate', async () => { const range = AsyncIterableX.from([1, 2, 3]); const broadcast = new BroadcastIterable(() => range); @@ -17,7 +17,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should skip values if sink is slow', async () => { + it('should skip values if sink is slow', async () => { const range = AsyncIterableX.from([1, 2, 3]); const broadcast = new BroadcastIterable(() => range); @@ -30,7 +30,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should abort', async () => { + it('should abort', async () => { const range = AsyncIterableX.from([1, 2, 3]); let recordedSignal: AbortSignal | undefined; const broadcast = new BroadcastIterable((signal) => { @@ -46,7 +46,7 @@ describe('BroadcastIterable', () => { expect(recordedSignal!.aborted).toEqual(true); }); - test('should handle indefinite sources', async () => { + it('should handle indefinite sources', async () => { const source: IterableSource = (signal) => { return wrapWithAbort(interval(1), signal); }; @@ -65,7 +65,7 @@ describe('BroadcastIterable', () => { expect(broadcast.active).toBe(false); }); - test('should handle multiple subscribers', async () => { + it('should handle multiple subscribers', async () => { let sourceIndex = 0; const source = async function* (signal: AbortSignal) { // Test value out by 1000 means it may have used the wrong iteration of the source @@ -111,7 +111,7 @@ describe('BroadcastIterable', () => { expect(results3[4]).toBeLessThan(2145); }); - test('should handle errors on multiple subscribers', async () => { + it('should handle errors on multiple subscribers', async () => { let sourceIndex = 0; const source = async function* (signal: AbortSignal) { // Test value out by 1000 means it may have used the wrong iteration of the source diff --git a/packages/service-core/test/src/demultiplexer.test.ts b/packages/service-core/test/src/demultiplexer.test.ts new file mode 100644 index 00000000..69ba9c87 --- /dev/null +++ b/packages/service-core/test/src/demultiplexer.test.ts @@ -0,0 +1,205 @@ +// Vitest Unit Tests +import { Demultiplexer, DemultiplexerSource, DemultiplexerSourceFactory, DemultiplexerValue } from '@/index.js'; +import { delayEach } from 'ix/asynciterable/operators/delayeach.js'; +import { take } from 'ix/asynciterable/operators/take.js'; +import { toArray } from 'ix/asynciterable/toarray.js'; +import * as timers from 'node:timers/promises'; +import { describe, expect, it } from 'vitest'; + 
+describe('Demultiplexer', () => { + it('should start subscription lazily and provide first value', async () => { + const mockSource: DemultiplexerSourceFactory = (signal: AbortSignal) => { + const iterator = (async function* (): AsyncIterable> {})(); + return { + iterator, + getFirstValue: async (key: string) => `first-${key}` + }; + }; + + const demux = new Demultiplexer(mockSource); + const signal = new AbortController().signal; + + const iter = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + const result = await iter.next(); + expect(result.value).toBe('first-user1'); + }); + + it('should handle multiple subscribers to the same key', async () => { + const iter = (async function* () { + yield { key: 'user1', value: 'value1' }; + yield { key: 'user1', value: 'value2' }; + })(); + const source: DemultiplexerSource = { + iterator: iter, + getFirstValue: async (key: string) => `first-${key}` + }; + + const demux = new Demultiplexer(() => source); + const signal = new AbortController().signal; + + const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + + // Due to only keeping the last value, some values are skipped + expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); + expect(await iter1.next()).toEqual({ value: 'value1', done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it('should handle multiple subscribers to the same key (2)', async () => { + const p1 = Promise.withResolvers(); + const p2 = Promise.withResolvers(); + const p3 = Promise.withResolvers(); + + const iter = (async function* () { + await p1.promise; + yield { key: 'user1', value: 'value1' }; + await p2.promise; + yield { key: 'user1', value: 'value2' }; + await p3.promise; + })(); + + const source: DemultiplexerSource = { + iterator: iter, + getFirstValue: async (key: string) => `first-${key}` + }; + + const demux = new Demultiplexer(() => source); + const signal = new AbortController().signal; + + const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + + // Due to only keeping the last value, some values are skipped + expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); + expect(await iter2.next()).toEqual({ value: 'first-user1', done: false }); + p1.resolve(); + + expect(await iter1.next()).toEqual({ value: 'value1', done: false }); + expect(await iter2.next()).toEqual({ value: 'value1', done: false }); + p2.resolve(); + + expect(await iter1.next()).toEqual({ value: 'value2', done: false }); + p3.resolve(); + + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it('should handle multiple subscribers to different keys', async () => { + const p1 = Promise.withResolvers(); + const p2 = Promise.withResolvers(); + const p3 = Promise.withResolvers(); + + const iter = (async function* () { + await p1.promise; + yield { key: 'user1', value: 'value1' }; + await p2.promise; + yield { key: 'user2', value: 'value2' }; + await p3.promise; + })(); + + const source: DemultiplexerSource = { + iterator: iter, + getFirstValue: async (key: string) => `first-${key}` + }; + + const demux = new Demultiplexer(() =>
source); + const signal = new AbortController().signal; + + const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator](); + const iter2 = demux.subscribe('user2', signal)[Symbol.asyncIterator](); + + // Due to only keeping the last value, some values are skipped + expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); + expect(await iter2.next()).toEqual({ value: 'first-user2', done: false }); + p1.resolve(); + + expect(await iter1.next()).toEqual({ value: 'value1', done: false }); + p2.resolve(); + + expect(await iter2.next()).toEqual({ value: 'value2', done: false }); + p3.resolve(); + + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it('should abort', async () => { + const iter = (async function* () { + yield { key: 'user1', value: 'value1' }; + yield { key: 'user1', value: 'value2' }; + })(); + + const source: DemultiplexerSource = { + iterator: iter, + getFirstValue: async (key: string) => `first-${key}` + }; + + const demux = new Demultiplexer(() => source); + const controller = new AbortController(); + + const iter1 = demux.subscribe('user1', controller.signal)[Symbol.asyncIterator](); + + expect(await iter1.next()).toEqual({ value: 'first-user1', done: false }); + controller.abort(); + + await expect(iter1.next()).rejects.toThrow('The operation has been aborted'); + }); + + it('should handle errors on multiple subscribers', async () => { + let sourceIndex = 0; + const sourceFn = async function* (signal: AbortSignal): AsyncIterable> { + // Test value out by 1000 means it may have used the wrong iteration of the source + const base = (sourceIndex += 1000); + const abortedPromise = new Promise((resolve) => { + signal.addEventListener('abort', resolve, { once: true }); + }); + for (let i = 0; !signal.aborted; i++) { + if (base + i == 1005) { + throw new Error('simulated failure'); + } + yield { key: 'u1', value: base + i }; + await Promise.race([abortedPromise, timers.setTimeout(1)]); + } + // Test value out by 100 means this wasn't reached + sourceIndex += 100; + }; + + const sourceFactory: DemultiplexerSourceFactory = (signal) => { + const source: DemultiplexerSource = { + iterator: sourceFn(signal), + getFirstValue: async (key: string) => -1 + }; + return source; + }; + const demux = new Demultiplexer(sourceFactory); + + const controller = new AbortController(); + + const delayed1 = delayEach(9)(demux.subscribe('u1', controller.signal)); + const delayed2 = delayEach(10)(demux.subscribe('u1', controller.signal)); + expect(demux.active).toBe(false); + const results1Promise = toArray(take(5)(delayed1)) as Promise; + const results2Promise = toArray(take(5)(delayed2)) as Promise; + + const [r1, r2] = await Promise.allSettled([results1Promise, results2Promise]); + + expect(r1).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); + expect(r2).toEqual({ status: 'rejected', reason: new Error('simulated failure') }); + + expect(demux.active).toBe(false); + + // This starts a new source + const delayed3 = delayEach(10)(demux.subscribe('u1', controller.signal)); + const results3 = await toArray(take(6)(delayed3)); + expect(results3.length).toEqual(6); + expect(results3[0]).toEqual(-1); // Initial value + // There should be approximately 10ms between each value, but we allow for some slack + expect(results3[5]).toBeGreaterThan(2005); + expect(results3[5]).toBeLessThan(2200); + }); +}); diff --git
a/packages/service-core/test/src/merge_iterable.test.ts b/packages/service-core/test/src/merge_iterable.test.ts index bd512376..237c6e8e 100644 --- a/packages/service-core/test/src/merge_iterable.test.ts +++ b/packages/service-core/test/src/merge_iterable.test.ts @@ -1,4 +1,4 @@ -import { mergeAsyncIterablesNew, mergeAsyncIterablesOld } from '@/sync/merge.js'; +import { mergeAsyncIterablesNew, mergeAsyncIterablesOld } from '@/streams/merge.js'; import * as timers from 'timers/promises'; import { describe, expect, test } from 'vitest';
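Usage sketch for the new Demultiplexer, based only on the API introduced in this diff; the source values and user key below are hypothetical:

```ts
import { Demultiplexer, DemultiplexerSource } from '@powersync/service-core';

// A multiplexed upstream emitting values for many keys.
const source: DemultiplexerSource<string> = {
  iterator: (async function* () {
    yield { key: 'user1', value: 'update-1' };
    yield { key: 'user2', value: 'update-2' };
  })(),
  // Initial per-key value, fetched when a subscriber attaches.
  getFirstValue: async (key) => `initial-${key}`
};

const demux = new Demultiplexer<string>(() => source);
const controller = new AbortController();

// Yields 'initial-user1' first, then 'update-1'; if backpressure
// builds up, a slow consumer only sees the last value.
for await (const value of demux.subscribe('user1', controller.signal)) {
  console.log(value);
}
```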