Skip to content

Commit 6320534

Browse files
committed
Cache dynamic bucket lookups.
1 parent b23a3eb commit 6320534

File tree

2 files changed

+85
-33
lines changed

2 files changed

+85
-33
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

+61-21
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
2828
import { MongoCompactor } from './MongoCompactor.js';
2929
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
3030
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
31+
import { LRUCache } from 'lru-cache';
3132

3233
export class MongoSyncBucketStorage
3334
extends DisposableObserver<storage.SyncRulesBucketStorageListener>
@@ -849,25 +850,40 @@ export class MongoSyncBucketStorage
849850
return pipeline;
850851
}
851852

852-
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
853-
const dataBuckets = await this.db.bucket_data
854-
.find(
855-
{
856-
'_id.g': this.group_id,
857-
'_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
858-
},
859-
{
860-
projection: {
861-
'_id.b': 1
862-
},
863-
limit: 1001,
864-
batchSize: 1001,
865-
singleBatch: true
866-
}
867-
)
868-
.toArray();
869-
const invalidateDataBuckets = dataBuckets.length > 1000;
853+
private async getDataBucketChanges(
854+
options: GetCheckpointChangesOptions
855+
): Promise<Pick<CheckpointChanges, 'updatedDataBuckets' | 'invalidateDataBuckets'>> {
856+
// The query below can be slow, since we don't have an index on _id.o.
857+
// We could try to query the oplog for these, but that is risky.
858+
// Or we could store updated buckets in a separate collection, and query those.
859+
// For now, we ignore this optimization
860+
861+
// const dataBuckets = await this.db.bucket_data
862+
// .find(
863+
// {
864+
// '_id.g': this.group_id,
865+
// '_id.o': { $gt: BigInt(options.lastCheckpoint), $lte: BigInt(options.nextCheckpoint) }
866+
// },
867+
// {
868+
// projection: {
869+
// '_id.b': 1
870+
// },
871+
// limit: 1001,
872+
// batchSize: 1001,
873+
// singleBatch: true
874+
// }
875+
// )
876+
// .toArray();
877+
878+
return {
879+
invalidateDataBuckets: true,
880+
updatedDataBuckets: []
881+
};
882+
}
870883

884+
private async getParameterBucketChanges(
885+
options: GetCheckpointChangesOptions
886+
): Promise<Pick<CheckpointChanges, 'updatedParameterBucketDefinitions' | 'invalidateParameterBuckets'>> {
871887
const parameterUpdates = await this.db.bucket_parameters
872888
.find(
873889
{
@@ -887,13 +903,37 @@ export class MongoSyncBucketStorage
887903
const invalidateParameterUpdates = parameterUpdates.length > 1000;
888904

889905
return {
890-
invalidateDataBuckets,
891-
updatedDataBuckets: invalidateDataBuckets ? [] : dataBuckets.map((b) => b._id.b),
892-
893906
invalidateParameterBuckets: invalidateParameterUpdates,
894907
updatedParameterBucketDefinitions: invalidateParameterUpdates
895908
? []
896909
: [...new Set<string>(parameterUpdates.map((p) => getLookupBucketDefinitionName(p.lookup)))]
897910
};
898911
}
912+
913+
private checkpointChangesCache = new LRUCache<string, CheckpointChanges, { options: GetCheckpointChangesOptions }>({
914+
max: 50,
915+
maxSize: 10 * 1024 * 1024,
916+
sizeCalculation: (value: CheckpointChanges) => {
917+
return value.updatedParameterBucketDefinitions.reduce<number>((a, b) => a + b.length, 0);
918+
},
919+
fetchMethod: async (_key, _staleValue, options) => {
920+
return this.getCheckpointChangesInternal(options.context.options);
921+
}
922+
});
923+
924+
async getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
925+
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;
926+
const result = await this.checkpointChangesCache.fetch(key, { context: { options } });
927+
return result!;
928+
}
929+
930+
private async getCheckpointChangesInternal(options: GetCheckpointChangesOptions): Promise<CheckpointChanges> {
931+
const dataUpdates = await this.getDataBucketChanges(options);
932+
const parameterUpdates = await this.getParameterBucketChanges(options);
933+
934+
return {
935+
...dataUpdates,
936+
...parameterUpdates
937+
};
938+
}
899939
}

packages/service-core/src/sync/BucketChecksumState.ts

+24-12
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ export class BucketParameterState {
253253
private readonly querier: BucketParameterQuerier;
254254
private readonly staticBuckets: Map<string, BucketDescription>;
255255

256+
private cachedDynamicBuckets: BucketDescription[] | null = null;
257+
256258
constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) {
257259
this.bucketStorage = bucketStorage;
258260
this.syncRules = syncRules;
@@ -335,29 +337,39 @@ export class BucketParameterState {
335337
const staticBuckets = querier.staticBuckets;
336338
const update = checkpoint.update;
337339

338-
let hasChange = false;
340+
let hasDataChange = false;
341+
let hasParameterChange = false;
339342
if (update.invalidateDataBuckets || update.updatedDataBuckets?.length > 0) {
340-
hasChange = true;
341-
} else if (update.invalidateParameterBuckets) {
342-
hasChange = true;
343+
hasDataChange = true;
344+
}
345+
346+
if (update.invalidateParameterBuckets) {
347+
hasParameterChange = true;
343348
} else {
344-
for (let bucket of update.updatedParameterBucketDefinitions ?? []) {
349+
for (let bucket of update.updatedParameterBucketDefinitions) {
350+
// This is a very coarse check, but helps
345351
if (querier.dynamicBucketDefinitions.has(bucket)) {
346-
hasChange = true;
352+
hasParameterChange = true;
347353
break;
348354
}
349355
}
350356
}
351357

352-
if (!hasChange) {
358+
if (!hasDataChange && !hasParameterChange) {
353359
return null;
354360
}
355361

356-
const dynamicBuckets = await querier.queryDynamicBucketDescriptions({
357-
getParameterSets(lookups) {
358-
return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
359-
}
360-
});
362+
let dynamicBuckets: BucketDescription[];
363+
if (hasParameterChange || this.cachedDynamicBuckets == null) {
364+
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
365+
getParameterSets(lookups) {
366+
return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
367+
}
368+
});
369+
this.cachedDynamicBuckets = dynamicBuckets;
370+
} else {
371+
dynamicBuckets = this.cachedDynamicBuckets;
372+
}
361373
const allBuckets = [...staticBuckets, ...dynamicBuckets];
362374

363375
return {

0 commit comments

Comments
 (0)