Skip to content

Commit d976830

Browse files
authored
Fix SnapshotTooOld in parameter queries (#366)
* Implement fix for SnapshotTooOld on parameter queries. * Fix error handling for test-client concurrent-connections on http streams. * Remove filter. * More fixes around streams ending. * Changeset.
1 parent 3f01b05 commit d976830

File tree

3 files changed

+92
-49
lines changed

3 files changed

+92
-49
lines changed

.changeset/lovely-olives-type.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
Fix SnapshotTooOld on parameter queries in some cases.

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ export interface MongoSyncBucketStorageOptions {
4141
checksumOptions?: MongoChecksumOptions;
4242
}
4343

44+
/**
45+
* Only keep checkpoints around for a minute, before fetching a fresh one.
46+
*
47+
* The reason is that we keep a MongoDB snapshot reference (clusterTime) with the checkpoint,
48+
* and they expire after 5 minutes by default. This is an issue if the checkpoint stream is idle,
49+
* but new clients connect and use an outdated checkpoint snapshot for parameter queries.
50+
*
51+
* These will be filtered out for existing clients, so should not create significant overhead.
52+
*/
53+
const CHECKPOINT_TIMEOUT_MS = 60_000;
54+
4455
export class MongoSyncBucketStorage
4556
extends BaseObserver<storage.SyncRulesBucketStorageListener>
4657
implements storage.SyncRulesBucketStorage
@@ -680,25 +691,45 @@ export class MongoSyncBucketStorage
680691

681692
// We only watch changes to the active sync rules.
682693
// If it changes to inactive, we abort and restart with the new sync rules.
683-
let lastOp: storage.ReplicationCheckpoint | null = null;
694+
try {
695+
while (true) {
696+
// If the stream is idle, we wait a max of a minute (CHECKPOINT_TIMEOUT_MS)
697+
// before we get another checkpoint, to avoid stale checkpoint snapshots.
698+
const timeout = timers
699+
.setTimeout(CHECKPOINT_TIMEOUT_MS, { done: false }, { signal })
700+
.catch(() => ({ done: true }));
701+
try {
702+
const result = await Promise.race([stream.next(), timeout]);
703+
if (result.done) {
704+
break;
705+
}
706+
} catch (e) {
707+
if (e.name == 'AbortError') {
708+
break;
709+
}
710+
throw e;
711+
}
684712

685-
for await (const _ of stream) {
686-
if (signal.aborted) {
687-
break;
688-
}
713+
if (signal.aborted) {
714+
// Would likely have been caught by the signal on the timeout or the upstream stream, but we check here anyway
715+
break;
716+
}
689717

690-
const op = await this.getCheckpointInternal();
691-
if (op == null) {
692-
// Sync rules have changed - abort and restart.
693-
// We do a soft close of the stream here - no error
694-
break;
695-
}
718+
const op = await this.getCheckpointInternal();
719+
if (op == null) {
720+
// Sync rules have changed - abort and restart.
721+
// We do a soft close of the stream here - no error
722+
break;
723+
}
696724

697-
// Check for LSN / checkpoint changes - ignore other metadata changes
698-
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
699-
lastOp = op;
725+
// Previously, we only yielded when the checkpoint or lsn changed.
726+
// However, we always want to use the latest snapshotTime, so we skip that filtering here.
727+
// That filtering could be added in the per-user streams if needed, but in general the capped collection
728+
// should already only contain useful changes in most cases.
700729
yield op;
701730
}
731+
} finally {
732+
await stream.return(null);
702733
}
703734
}
704735

test-client/src/ndjson.ts

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,48 +10,53 @@ export function ndjsonStream<T>(
1010
var decoder = new TextDecoder();
1111
var data_buf = '';
1212

13-
reader.read().then(function processResult(result): void | Promise<any> {
14-
if (result.done) {
15-
if (cancellationRequest) {
16-
// Immediately exit
17-
return;
18-
}
19-
20-
data_buf = data_buf.trim();
21-
if (data_buf.length !== 0) {
22-
try {
23-
var data_l = JSON.parse(data_buf);
24-
controller.enqueue({ chunk: data_l, size: data_l.length });
25-
} catch (e) {
26-
controller.error(e);
13+
reader
14+
.read()
15+
.then(function processResult(result): void | Promise<any> {
16+
if (result.done) {
17+
if (cancellationRequest) {
18+
// Immediately exit
2719
return;
2820
}
21+
22+
data_buf = data_buf.trim();
23+
if (data_buf.length !== 0) {
24+
try {
25+
var data_l = JSON.parse(data_buf);
26+
controller.enqueue({ chunk: data_l, size: data_l.length });
27+
} catch (e) {
28+
controller.error(e);
29+
return;
30+
}
31+
}
32+
controller.close();
33+
return;
2934
}
30-
controller.close();
31-
return;
32-
}
3335

34-
var data = decoder.decode(result.value, { stream: true });
35-
data_buf += data;
36-
var lines = data_buf.split('\n');
37-
for (var i = 0; i < lines.length - 1; ++i) {
38-
var l = lines[i].trim();
39-
if (l.length > 0) {
40-
try {
41-
var data_line = JSON.parse(l);
42-
controller.enqueue({ chunk: data_line, size: l.length });
43-
} catch (e) {
44-
controller.error(e);
45-
cancellationRequest = true;
46-
reader.cancel();
47-
return;
36+
var data = decoder.decode(result.value, { stream: true });
37+
data_buf += data;
38+
var lines = data_buf.split('\n');
39+
for (var i = 0; i < lines.length - 1; ++i) {
40+
var l = lines[i].trim();
41+
if (l.length > 0) {
42+
try {
43+
var data_line = JSON.parse(l);
44+
controller.enqueue({ chunk: data_line, size: l.length });
45+
} catch (e) {
46+
controller.error(e);
47+
cancellationRequest = true;
48+
reader.cancel();
49+
return;
50+
}
4851
}
4952
}
50-
}
51-
data_buf = lines[lines.length - 1];
53+
data_buf = lines[lines.length - 1];
5254

53-
return reader.read().then(processResult);
54-
});
55+
return reader.read().then(processResult);
56+
})
57+
.catch((e) => {
58+
controller.error(e);
59+
});
5560
},
5661
cancel: function (reason) {
5762
cancellationRequest = true;

0 commit comments

Comments
 (0)