Skip to content

Commit 5f412e1

Browse files
committed
Implement fix for SnapshotTooOld on parameter queries.
1 parent 49f2bd7 commit 5f412e1

File tree

1 file changed

+41
-14
lines changed

1 file changed

+41
-14
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ export interface MongoSyncBucketStorageOptions {
4141
checksumOptions?: MongoChecksumOptions;
4242
}
4343

44+
/**
45+
* Only keep checkpoints around for a minute, before fetching a fresh one.
46+
*
47+
* The reason is that we keep a MongoDB snapshot reference (clusterTime) with the checkpoint,
48+
* and they expire after 5 minutes by default. This is an issue if the checkpoint stream is idle,
49+
* but new clients connect and use an outdated checkpoint snapshot for parameter queries.
50+
*
51+
* These will be filtered out for existing clients, so should not create significant overhead.
52+
*/
53+
const CHECKPOINT_TIMEOUT_MS = 60_000;
54+
4455
export class MongoSyncBucketStorage
4556
extends BaseObserver<storage.SyncRulesBucketStorageListener>
4657
implements storage.SyncRulesBucketStorage
@@ -682,23 +693,39 @@ export class MongoSyncBucketStorage
682693
// If it changes to inactive, we abort and restart with the new sync rules.
683694
let lastOp: storage.ReplicationCheckpoint | null = null;
684695

685-
for await (const _ of stream) {
686-
if (signal.aborted) {
687-
break;
688-
}
696+
try {
697+
while (true) {
698+
// If the stream is idle, we wait a max of a minute (CHECKPOINT_TIMEOUT_MS)
699+
// before we get another checkpoint, to avoid stale checkpoint snapshots.
700+
const timeout = timers.setTimeout(CHECKPOINT_TIMEOUT_MS, null, { signal });
701+
try {
702+
await Promise.race([stream.next(), timeout]);
703+
} catch (e) {
704+
if (e.name == 'AbortError') {
705+
break;
706+
}
707+
throw e;
708+
}
689709

690-
const op = await this.getCheckpointInternal();
691-
if (op == null) {
692-
// Sync rules have changed - abort and restart.
693-
// We do a soft close of the stream here - no error
694-
break;
695-
}
710+
if (signal.aborted) {
711+
break;
712+
}
713+
714+
const op = await this.getCheckpointInternal();
715+
if (op == null) {
716+
// Sync rules have changed - abort and restart.
717+
// We do a soft close of the stream here - no error
718+
break;
719+
}
696720

697-
// Check for LSN / checkpoint changes - ignore other metadata changes
698-
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
699-
lastOp = op;
700-
yield op;
721+
// Check for LSN / checkpoint changes - ignore other metadata changes
722+
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
723+
lastOp = op;
724+
yield op;
725+
}
701726
}
727+
} finally {
728+
await stream.return(null);
702729
}
703730
}
704731

0 commit comments

Comments
 (0)