
Commit bfece49

Cache parameter queries and buckets (#200)

* POC of incremental update lookups.
* Cache dynamic bucket lookups.
* Fix sizeCalculation.
* Optimization: skip checking for bucket_parameters changes if there are none.
* Fix tests.
* Fix import.
* Improve parameter query filtering.
* Track last_op for each bucket.
* Fix cache size calculation.
* Update bucket counts after compact.
* Fix some tests.
* Use an explicit ParameterLookup class for better typing.
* Fix sync-rules tests.
* Fix another test.
* Add changeset.
* Remove op_count from bucket_state.
* Fix compact.
* Cleanup and comments.
* Simplify type guard.
* Tweaks and tests for hasIntersection.
* Use set intersection.
* Fix handling of checkpoints only containing a write checkpoint update.
* Use a Symbol instead of null for INVALIDATE_ALL_BUCKETS.
* Fix typo.
* Add tests; fix parameter query lookup issue.
* Update snapshots for postgres.

1 parent 613d20f commit bfece49

28 files changed, +1224 −228 lines changed
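The commit notes mention "Use set intersection" and "Tweaks and tests for hasIntersection". That helper is not part of the files shown below, but a minimal sketch of such a set-intersection check (hypothetical TypeScript, not the service's actual implementation) could look like:

// Hypothetical sketch only - the real hasIntersection helper lives in the
// PowerSync service packages and is not included in this diff excerpt.
function hasIntersection<T>(a: Set<T>, b: Set<T>): boolean {
  // Iterate the smaller set and probe the larger one.
  const [small, large] = a.size <= b.size ? [a, b] : [b, a];
  for (const item of small) {
    if (large.has(item)) {
      return true;
    }
  }
  return false;
}

// Example use: does a checkpoint's set of updated buckets overlap with the
// buckets a particular connection is syncing?
// hasIntersection(updatedDataBuckets, bucketsForConnection)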

.changeset/swift-wolves-sleep.md (+9)

@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': minor
+'@powersync/service-module-mongodb-storage': minor
+'@powersync/service-core-tests': minor
+'@powersync/service-core': minor
+'@powersync/service-sync-rules': minor
+---
+
+Cache parameter queries and buckets to reduce incremental sync overhead
New migration in modules/module-mongodb-storage (+40)

@@ -0,0 +1,40 @@
+import { migrations } from '@powersync/service-core';
+import * as storage from '../../../storage/storage-index.js';
+import { MongoStorageConfig } from '../../../types/types.js';
+
+const INDEX_NAME = 'bucket_updates';
+
+export const up: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    await db.bucket_state.createIndex(
+      {
+        '_id.g': 1,
+        last_op: 1
+      },
+      { name: INDEX_NAME, unique: true }
+    );
+  } finally {
+    await db.client.close();
+  }
+};
+
+export const down: migrations.PowerSyncMigrationFunction = async (context) => {
+  const {
+    service_context: { configuration }
+  } = context;
+
+  const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
+
+  try {
+    if (await db.bucket_state.indexExists(INDEX_NAME)) {
+      await db.bucket_state.dropIndex(INDEX_NAME);
+    }
+  } finally {
+    await db.client.close();
+  }
+};
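For context, the (_id.g, last_op) index created above is shaped for the incremental bucket lookup added later in this commit (getDataBucketChanges in MongoSyncBucketStorage). A simplified sketch of the query it serves, with illustrative parameter names:

import { Collection, Document } from 'mongodb';

// Simplified sketch of the lookup the 'bucket_updates' index supports:
// find buckets in this sync-rules group whose last_op advanced past the
// previous checkpoint. Names here are illustrative, not the service's API.
async function changedBuckets(
  bucketState: Collection<Document>,
  groupId: number,
  lastCheckpoint: bigint,
  limit = 1000
): Promise<string[]> {
  const docs = await bucketState
    .find(
      { '_id.g': groupId, last_op: { $gt: lastCheckpoint } },
      // Fetch one extra document so the caller can detect "too many changes".
      { projection: { '_id.b': 1 }, limit: limit + 1, batchSize: limit + 1, singleBatch: true }
    )
    .toArray();
  return docs.map((doc) => doc._id.b);
}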

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts (+3 −1)

@@ -314,10 +314,12 @@ export class MongoCompactor {
     let lastOpId: BucketDataKey | null = null;
     let targetOp: bigint | null = null;
     let gotAnOp = false;
+    let numberOfOpsToClear = 0;
     for await (let op of query.stream()) {
       if (op.op == 'MOVE' || op.op == 'REMOVE' || op.op == 'CLEAR') {
         checksum = utils.addChecksums(checksum, op.checksum);
         lastOpId = op._id;
+        numberOfOpsToClear += 1;
         if (op.op != 'CLEAR') {
           gotAnOp = true;
         }
@@ -337,7 +339,7 @@
       return;
     }

-    logger.info(`Flushing CLEAR at ${lastOpId?.o}`);
+    logger.info(`Flushing CLEAR for ${numberOfOpsToClear} ops at ${lastOpId?.o}`);
     await this.db.bucket_data.deleteMany(
       {
         _id: {
_id: {

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts (+116 −13)

@@ -9,27 +9,29 @@ import {
 } from '@powersync/lib-services-framework';
 import {
   BroadcastIterable,
-  CHECKPOINT_INVALIDATE_ALL,
   CheckpointChanges,
   GetCheckpointChangesOptions,
   InternalOpId,
   internalToExternalOpId,
   ProtocolOpId,
   ReplicationCheckpoint,
-  SourceTable,
   storage,
   utils,
-  WatchWriteCheckpointOptions
+  WatchWriteCheckpointOptions,
+  CHECKPOINT_INVALIDATE_ALL,
+  deserializeParameterLookup
 } from '@powersync/service-core';
-import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
+import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
 import * as bson from 'bson';
 import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
+import { LRUCache } from 'lru-cache';
 import * as timers from 'timers/promises';
 import { MongoBucketStorage } from '../MongoBucketStorage.js';
 import { PowerSyncMongo } from './db.js';
 import {
   BucketDataDocument,
   BucketDataKey,
+  BucketStateDocument,
   SourceKey,
   SourceTableDocument,
   SyncRuleCheckpointState,
@@ -39,6 +41,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
 import { MongoCompactor } from './MongoCompactor.js';
 import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
 import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
+import { JSONBig } from '@powersync/service-jsonbig';

 export class MongoSyncBucketStorage
   extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -154,7 +157,7 @@ export class MongoSyncBucketStorage

     await callback(batch);
     await batch.flush();
-    if (batch.last_flushed_op) {
+    if (batch.last_flushed_op != null) {
       return { flushed_op: batch.last_flushed_op };
     } else {
       return null;
@@ -252,7 +255,7 @@
     return result!;
   }

-  async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
+  async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
     const lookupFilter = lookups.map((lookup) => {
       return storage.serializeLookup(lookup);
     });
@@ -585,6 +588,13 @@
       { maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
     );

+    await this.db.bucket_state.deleteMany(
+      {
+        _id: idPrefixFilter<BucketStateDocument['_id']>({ g: this.group_id }, ['b'])
+      },
+      { maxTimeMS: lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
+    );
+
     await this.db.source_tables.deleteMany(
       {
         group_id: this.group_id
@@ -795,12 +805,7 @@

     const updates: CheckpointChanges =
       lastCheckpoint == null
-        ? {
-            invalidateDataBuckets: true,
-            invalidateParameterBuckets: true,
-            updatedDataBuckets: [],
-            updatedParameterBucketDefinitions: []
-          }
+        ? CHECKPOINT_INVALIDATE_ALL
        : await this.getCheckpointChanges({
            lastCheckpoint: lastCheckpoint,
            nextCheckpoint: checkpoint
@@ -869,7 +874,105 @@
     return pipeline;
   }

+  private async getDataBucketChanges(
+    options: GetCheckpointChangesOptions
+  ): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
+    const limit = 1000;
+    const bucketStateUpdates = await this.db.bucket_state
+      .find(
+        {
+          // We have an index on (_id.g, last_op).
+          '_id.g': this.group_id,
+          last_op: { $gt: BigInt(options.lastCheckpoint) }
+        },
+        {
+          projection: {
+            '_id.b': 1
+          },
+          limit: limit + 1,
+          batchSize: limit + 1,
+          singleBatch: true
+        }
+      )
+      .toArray();
+
+    const buckets = bucketStateUpdates.map((doc) => doc._id.b);
+    const invalidateDataBuckets = buckets.length > limit;
+
+    return {
+      invalidateDataBuckets: invalidateDataBuckets,
+      updatedDataBuckets: invalidateDataBuckets ? new Set<string>() : new Set(buckets)
+    };
+  }
+
+  private async getParameterBucketChanges(
+    options: GetCheckpointChangesOptions
+  ): Promise<Pick<CheckpointChanges, 'updatedParameterLookups' | 'invalidateParameterBuckets'>> {
+    const limit = 1000;
+    const parameterUpdates = await this.db.bucket_parameters
+      .find(
+        {
+          _id: { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) },
+          'key.g': this.group_id
+        },
+        {
+          projection: {
+            lookup: 1
+          },
+          limit: limit + 1,
+          batchSize: limit + 1,
+          singleBatch: true
+        }
+      )
+      .toArray();
+    const invalidateParameterUpdates = parameterUpdates.length > limit;
+
+    return {
+      invalidateParameterBuckets: invalidateParameterUpdates,
+      updatedParameterLookups: invalidateParameterUpdates
+        ? new Set<string>()
+        : new Set<string>(parameterUpdates.map((p) => JSONBig.stringify(deserializeParameterLookup(p.lookup))))
+    };
+  }
+
+  // If we processed all connections together for each checkpoint, we could do a single lookup for all connections.
+  // In practice, specific connections may fall behind. So instead, we just cache the results of each specific lookup.
+  // TODO (later):
+  // We can optimize this by implementing it like ChecksumCache: We can use partial cache results to do
+  // more efficient lookups in some cases.
+  private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
+    // Limit to 50 cache entries, or 10MB, whichever comes first.
+    // Some rough calculations:
+    // If we process 10 checkpoints per second, and a connection may be 2 seconds behind, we could have
+    // up to 20 relevant checkpoints. That gives us 20*20 = 400 potentially-relevant cache entries.
+    // That is a worst-case scenario, so we don't actually store that many. In real life, the cache keys
+    // would likely be clustered around a few values, rather than spread over all 400 potential values.
+    max: 50,
+    maxSize: 10 * 1024 * 1024,
+    sizeCalculation: (value: CheckpointChanges) => {
+      // Estimate of memory usage
+      const paramSize = [...value.updatedParameterLookups].reduce<number>((a, b) => a + b.length, 0);
+      const bucketSize = [...value.updatedDataBuckets].reduce<number>((a, b) => a + b.length, 0);
+      return 100 + paramSize + bucketSize;
+    },
+    fetchMethod: async (_key, _staleValue, options) => {
+      return this.getCheckpointChangesInternal(options.context.options);
+    }
+  });
+
   async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
-    return CHECKPOINT_INVALIDATE_ALL;
+    const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
+    const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
+    return result!;
+  }
+
+  private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
+    const dataUpdates = await this.getDataBucketChanges(options);
+    const parameterUpdates = await this.getParameterBucketChanges(options);

+    return {
+      ...dataUpdates,
+      ...parameterUpdates
+    };
   }
 }
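The checkpointChangesCache above relies on lru-cache's fetch/fetchMethod API: fetch() returns a cached value when present, and otherwise calls fetchMethod once, so connections asking for the same checkpoint range share a single lookup. A stripped-down sketch of the same pattern outside the class (hypothetical types, not the service's actual code):

import { LRUCache } from 'lru-cache';

// Hypothetical stand-ins for the real CheckpointChanges / options types.
interface ChangesOptions {
  lastCheckpoint: bigint;
  nextCheckpoint: bigint;
}
interface Changes {
  updatedDataBuckets: Set<string>;
}

// In the real code this would query bucket_state and bucket_parameters.
async function lookupChanges(options: ChangesOptions): Promise<Changes> {
  return { updatedDataBuckets: new Set([`bucket_after_${options.lastCheckpoint}`]) };
}

const cache = new LRUCache<string, Changes, { options: ChangesOptions }>({
  max: 50,
  maxSize: 10 * 1024 * 1024,
  // Rough per-entry size: fixed overhead plus the total length of bucket names.
  sizeCalculation: (value) => 100 + [...value.updatedDataBuckets].reduce((a, b) => a + b.length, 0),
  // Called on a cache miss; concurrent fetches for the same key await the same lookup.
  fetchMethod: async (_key, _stale, { context }) => lookupChanges(context.options)
});

async function getChanges(options: ChangesOptions): Promise<Changes> {
  const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
  return (await cache.fetch(key, { context: { options } }))!;
}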

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts (+52)

@@ -11,6 +11,7 @@ import { PowerSyncMongo } from './db.js';
 import {
   BucketDataDocument,
   BucketParameterDocument,
+  BucketStateDocument,
   CurrentBucket,
   CurrentDataDocument,
   SourceKey
@@ -48,6 +49,7 @@ export class PersistedBatch {
   bucketData: mongo.AnyBulkWriteOperation<BucketDataDocument>[] = [];
   bucketParameters: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
   currentData: mongo.AnyBulkWriteOperation<CurrentDataDocument>[] = [];
+  bucketStates: Map<string, BucketStateUpdate> = new Map();

   /**
    * For debug logging only.
@@ -66,6 +68,19 @@
     this.currentSize = writtenSize;
   }

+  private incrementBucket(bucket: string, op_id: InternalOpId) {
+    let existingState = this.bucketStates.get(bucket);
+    if (existingState) {
+      existingState.lastOp = op_id;
+      existingState.incrementCount += 1;
+    } else {
+      this.bucketStates.set(bucket, {
+        lastOp: op_id,
+        incrementCount: 1
+      });
+    }
+  }
+
   saveBucketData(options: {
     op_seq: MongoIdSequence;
     sourceKey: storage.ReplicaId;
@@ -120,6 +135,7 @@
           }
         }
       });
+      this.incrementBucket(k.bucket, op_id);
     }

     for (let bd of remaining_buckets.values()) {
@@ -147,6 +163,7 @@
         }
       });
       this.currentSize += 200;
+      this.incrementBucket(bd.bucket, op_id);
     }
   }

@@ -277,6 +294,14 @@
       });
     }

+    if (this.bucketStates.size > 0) {
+      await db.bucket_state.bulkWrite(this.getBucketStateUpdates(), {
+        session,
+        // Per-bucket operation - order doesn't matter
+        ordered: false
+      });
+    }
+
     const duration = performance.now() - startAt;
     logger.info(
       `powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
@@ -287,7 +312,34 @@
     this.bucketData = [];
     this.bucketParameters = [];
     this.currentData = [];
+    this.bucketStates.clear();
     this.currentSize = 0;
     this.debugLastOpId = null;
   }
+
+  private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {
+    return Array.from(this.bucketStates.entries()).map(([bucket, state]) => {
+      return {
+        updateOne: {
+          filter: {
+            _id: {
+              g: this.group_id,
+              b: bucket
+            }
+          },
+          update: {
+            $set: {
+              last_op: state.lastOp
+            }
+          },
+          upsert: true
+        }
+      } satisfies mongo.AnyBulkWriteOperation<BucketStateDocument>;
+    });
+  }
+}
+
+interface BucketStateUpdate {
+  lastOp: InternalOpId;
+  incrementCount: number;
 }
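The bulk upserts generated by getBucketStateUpdates leave one small document per touched bucket, which the checkpoint-change lookup in MongoSyncBucketStorage then scans by last_op. A hedged sketch of the resulting document shape and the equivalent single-document write (illustrative types only, not the module's actual schema definitions):

import { Collection } from 'mongodb';

// Illustrative shape of a bucket_state document after a flush:
// { _id: { g: <sync rules group>, b: <bucket name> }, last_op: <op id> }
interface BucketStateDoc {
  _id: { g: number; b: string };
  last_op: bigint;
}

// Equivalent single-document form of one entry in the ordered:false bulkWrite above.
async function touchBucket(
  bucketState: Collection<BucketStateDoc>,
  groupId: number,
  bucket: string,
  opId: bigint
): Promise<void> {
  await bucketState.updateOne(
    { _id: { g: groupId, b: bucket } },
    { $set: { last_op: opId } },
    { upsert: true }
  );
}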
