Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a0b25ca

Browse files
committedMar 20, 2025··
Use a shared iterator for custom write checkpoints.
1 parent 1a59edc commit a0b25ca

File tree

2 files changed

+106
-75
lines changed

2 files changed

+106
-75
lines changed
 

‎modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ export class MongoSyncBucketStorage
6969
this.db = factory.db;
7070
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
7171
db: this.db,
72-
mode: writeCheckpointMode
72+
mode: writeCheckpointMode,
73+
sync_rules_id: group_id
7374
});
7475
}
7576

‎modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts

+104-74
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,18 @@ import { CustomWriteCheckpointDocument, WriteCheckpointDocument } from './models
1313
export type MongoCheckpointAPIOptions = {
1414
db: PowerSyncMongo;
1515
mode: storage.WriteCheckpointMode;
16+
sync_rules_id: number;
1617
};
1718

1819
export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
1920
readonly db: PowerSyncMongo;
2021
private _mode: storage.WriteCheckpointMode;
22+
private sync_rules_id: number;
2123

2224
constructor(options: MongoCheckpointAPIOptions) {
2325
this.db = options.db;
2426
this._mode = options.mode;
27+
this.sync_rules_id = options.sync_rules_id;
2528
}
2629

2730
get writeCheckpointMode() {
@@ -89,14 +92,8 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
8992
}
9093
}
9194

92-
private sharedIter = new Demultiplexer<WriteCheckpointResult>((signal) => {
93-
const clusterTimePromise = (async () => {
94-
const hello = await this.db.db.command({ hello: 1 });
95-
// Note: This is not valid on sharded clusters.
96-
const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
97-
startClusterTime;
98-
return startClusterTime;
99-
})();
95+
private sharedManagedIter = new Demultiplexer<WriteCheckpointResult>((signal) => {
96+
const clusterTimePromise = this.getClusterTime();
10097

10198
return {
10299
iterator: this.watchAllManagedWriteCheckpoints(clusterTimePromise, signal),
@@ -170,13 +167,6 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
170167
}
171168
);
172169

173-
const hello = await this.db.db.command({ hello: 1 });
174-
// Note: This is not valid on sharded clusters.
175-
const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
176-
if (startClusterTime == null) {
177-
throw new framework.ServiceAssertionError('Could not get clusterTime');
178-
}
179-
180170
signal.onabort = () => {
181171
stream.close();
182172
};
@@ -202,55 +192,75 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
202192
}
203193
}
204194

205-
async *watchManagedWriteCheckpoint(
206-
options: WatchUserWriteCheckpointOptions
207-
): AsyncIterable<storage.WriteCheckpointResult> {
208-
const stream = this.sharedIter.subscribe(options.user_id, options.signal);
195+
watchManagedWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpointResult> {
196+
const stream = this.sharedManagedIter.subscribe(options.user_id, options.signal);
197+
return this.orderedStream(stream);
198+
}
209199

210-
let lastId = -1n;
200+
private sharedCustomIter = new Demultiplexer<WriteCheckpointResult>((signal) => {
201+
const clusterTimePromise = this.getClusterTime();
211202

212-
for await (let doc of stream) {
213-
// Guard against out-of-order events
214-
if (lastId == -1n || (doc.id != null && doc.id > lastId)) {
215-
yield doc;
216-
if (doc.id != null) {
217-
lastId = doc.id;
218-
}
219-
}
220-
}
221-
}
203+
return {
204+
iterator: this.watchAllCustomWriteCheckpoints(clusterTimePromise, signal),
205+
getFirstValue: async (user_id: string) => {
206+
// We cater for the same potential race conditions as for managed write checkpoints.
222207

223-
async *watchCustomWriteCheckpoint(
224-
options: WatchUserWriteCheckpointOptions
225-
): AsyncIterable<storage.WriteCheckpointResult> {
226-
const { user_id, sync_rules_id, signal } = options;
208+
const changeStreamStart = await clusterTimePromise;
227209

228-
let doc = null as CustomWriteCheckpointDocument | null;
229-
let clusterTime = null as mongo.Timestamp | null;
210+
let doc = null as CustomWriteCheckpointDocument | null;
211+
let clusterTime = null as mongo.Timestamp | null;
230212

231-
await this.db.client.withSession(async (session) => {
232-
doc = await this.db.custom_write_checkpoints.findOne(
233-
{
234-
user_id: user_id,
235-
sync_rules_id: sync_rules_id
236-
},
237-
{
238-
session
213+
await this.db.client.withSession(async (session) => {
214+
doc = await this.db.custom_write_checkpoints.findOne(
215+
{
216+
user_id: user_id,
217+
sync_rules_id: this.sync_rules_id
218+
},
219+
{
220+
session
221+
}
222+
);
223+
const time = session.clusterTime?.clusterTime ?? null;
224+
clusterTime = time;
225+
});
226+
if (clusterTime == null) {
227+
throw new framework.ServiceAssertionError('Could not get clusterTime for write checkpoint');
228+
}
229+
230+
if (clusterTime.lessThan(changeStreamStart)) {
231+
throw new framework.ServiceAssertionError(
232+
'clusterTime for write checkpoint is older than changestream start'
233+
);
234+
}
235+
236+
if (doc == null) {
237+
// No write checkpoint, but we still need to return a result
238+
return {
239+
id: null,
240+
lsn: null
241+
};
239242
}
240-
);
241-
const time = session.clusterTime?.clusterTime ?? null;
242-
clusterTime = time;
243-
});
244-
if (clusterTime == null) {
245-
throw new framework.ServiceAssertionError('Could not get clusterTime');
246-
}
243+
244+
return {
245+
id: doc.checkpoint,
246+
// custom write checkpoints are not tied to a LSN
247+
lsn: null
248+
};
249+
}
250+
};
251+
});
252+
253+
private async *watchAllCustomWriteCheckpoints(
254+
clusterTimePromise: Promise<mongo.BSON.Timestamp>,
255+
signal: AbortSignal
256+
): AsyncGenerator<DemultiplexerValue<WriteCheckpointResult>> {
257+
const clusterTime = await clusterTimePromise;
247258

248259
const stream = this.db.custom_write_checkpoints.watch(
249260
[
250261
{
251262
$match: {
252-
'fullDocument.user_id': user_id,
253-
'fullDocument.sync_rules_id': sync_rules_id,
263+
'fullDocument.sync_rules_id': this.sync_rules_id,
254264
operationType: { $in: ['insert', 'update', 'replace'] }
255265
}
256266
}
@@ -270,34 +280,30 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
270280
return;
271281
}
272282

273-
let lastId = -1n;
274-
275-
if (doc != null) {
276-
yield {
277-
id: doc.checkpoint,
278-
lsn: null
279-
};
280-
lastId = doc.checkpoint;
281-
} else {
282-
yield {
283-
id: null,
284-
lsn: null
285-
};
286-
}
287-
288283
for await (let event of stream) {
289284
if (!('fullDocument' in event) || event.fullDocument == null) {
290285
continue;
291286
}
292-
// Guard against out-of-order events
293-
if (event.fullDocument.checkpoint > lastId) {
294-
yield {
287+
288+
const user_id = event.fullDocument.user_id;
289+
yield {
290+
key: user_id,
291+
value: {
295292
id: event.fullDocument.checkpoint,
293+
// Custom write checkpoints are not tied to a specific LSN
296294
lsn: null
297-
};
298-
lastId = event.fullDocument.checkpoint;
299-
}
295+
}
296+
};
297+
}
298+
}
299+
300+
watchCustomWriteCheckpoint(options: WatchUserWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpointResult> {
301+
if (options.sync_rules_id != this.sync_rules_id) {
302+
throw new framework.ServiceAssertionError('sync_rules_id does not match');
300303
}
304+
305+
const stream = this.sharedCustomIter.subscribe(options.user_id, options.signal);
306+
return this.orderedStream(stream);
301307
}
302308

303309
protected async lastCustomWriteCheckpoint(filters: storage.CustomWriteCheckpointFilters) {
@@ -323,6 +329,30 @@ export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
323329
});
324330
return lastWriteCheckpoint?.client_id ?? null;
325331
}
332+
333+
private async getClusterTime(): Promise<mongo.Timestamp> {
334+
const hello = await this.db.db.command({ hello: 1 });
335+
// Note: This is not valid on sharded clusters.
336+
const startClusterTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
337+
return startClusterTime;
338+
}
339+
340+
/**
341+
* Makes a write checkpoint stream an orderered one - any out-of-order events are discarded.
342+
*/
343+
private async *orderedStream(stream: AsyncIterable<storage.WriteCheckpointResult>) {
344+
let lastId = -1n;
345+
346+
for await (let event of stream) {
347+
// Guard against out-of-order events
348+
if (lastId == -1n || (event.id != null && event.id > lastId)) {
349+
yield event;
350+
if (event.id != null) {
351+
lastId = event.id;
352+
}
353+
}
354+
}
355+
}
326356
}
327357

328358
export async function batchCreateCustomWriteCheckpoints(

0 commit comments

Comments
 (0)
Please sign in to comment.