Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6ebefa9

Browse files
committedMar 11, 2025··
Improve parameter query filtering.
1 parent 74e66e3 commit 6ebefa9

File tree

10 files changed

+114
-66
lines changed

10 files changed

+114
-66
lines changed
 

‎modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

+20-14
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import {
1818
ReplicationCheckpoint,
1919
storage,
2020
utils,
21-
WatchWriteCheckpointOptions
21+
WatchWriteCheckpointOptions,
22+
CHECKPOINT_INVALIDATE_ALL,
23+
deserializeParameterLookup
2224
} from '@powersync/service-core';
2325
import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
2426
import * as bson from 'bson';
@@ -39,6 +41,7 @@ import { MongoBucketBatch } from './MongoBucketBatch.js';
3941
import { MongoCompactor } from './MongoCompactor.js';
4042
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
4143
import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js';
44+
import { JSONBig } from '@powersync/service-jsonbig';
4245

4346
export class MongoSyncBucketStorage
4447
extends BaseObserver<storage.SyncRulesBucketStorageListener>
@@ -795,12 +798,7 @@ export class MongoSyncBucketStorage
795798

796799
const updates: CheckpointChanges =
797800
lastCheckpoint == null
798-
? {
799-
invalidateDataBuckets: true,
800-
invalidateParameterBuckets: true,
801-
updatedDataBuckets: [],
802-
updatedParameterBucketDefinitions: []
803-
}
801+
? CHECKPOINT_INVALIDATE_ALL
804802
: await this.getCheckpointChanges({
805803
lastCheckpoint: lastCheckpoint,
806804
nextCheckpoint: checkpoint
@@ -877,7 +875,7 @@ export class MongoSyncBucketStorage
877875
// Or we could store updated buckets in a separate collection, and query those.
878876
// For now, we ignore this optimization
879877

880-
// const dataBuckets = await this.db.bucket_data
878+
// const dataBucketDocuments = await this.db.bucket_data
881879
// .find(
882880
// {
883881
// '_id.g': this.group_id,
@@ -894,6 +892,14 @@ export class MongoSyncBucketStorage
894892
// )
895893
// .toArray();
896894

895+
// const buckets = dataBucketDocuments.map((doc) => doc._id.b);
896+
// const invalidateDataBuckets = buckets.length > 1000;
897+
898+
// return {
899+
// invalidateDataBuckets: invalidateDataBuckets,
900+
// updatedDataBuckets: invalidateDataBuckets ? [] : buckets
901+
// };
902+
897903
return {
898904
invalidateDataBuckets: true,
899905
updatedDataBuckets: []
@@ -902,7 +908,7 @@ export class MongoSyncBucketStorage
902908

903909
private async getParameterBucketChanges(
904910
options: GetCheckpointChangesOptions
905-
): Promise<Pick<CheckpointChanges, 'updatedParameterBucketDefinitions' | 'invalidateParameterBuckets'>> {
911+
): Promise<Pick<CheckpointChanges, 'updatedParameterLookups' | 'invalidateParameterBuckets'>> {
906912
// TODO: limit max query running time
907913
const parameterUpdates = await this.db.bucket_parameters
908914
.find(
@@ -924,9 +930,9 @@ export class MongoSyncBucketStorage
924930

925931
return {
926932
invalidateParameterBuckets: invalidateParameterUpdates,
927-
updatedParameterBucketDefinitions: invalidateParameterUpdates
928-
? []
929-
: [...new Set<string>(parameterUpdates.map((p) => getLookupBucketDefinitionName(p.lookup)))]
933+
updatedParameterLookups: invalidateParameterUpdates
934+
? new Set<string>()
935+
: new Set<string>(parameterUpdates.map((p) => JSONBig.stringify(deserializeParameterLookup(p.lookup))))
930936
};
931937
}
932938

@@ -937,7 +943,7 @@ export class MongoSyncBucketStorage
937943
max: 50,
938944
maxSize: 10 * 1024 * 1024,
939945
sizeCalculation: (value: CheckpointChanges) => {
940-
return 100 + value.updatedParameterBucketDefinitions.reduce<number>((a, b) => a + b.length, 0);
946+
return 100 + [...value.updatedParameterLookups].reduce<number>((a, b) => a + b.length, 0);
941947
},
942948
fetchMethod: async (_key, _staleValue, options) => {
943949
return this.getCheckpointChangesInternal(options.context.options);
@@ -966,7 +972,7 @@ export class MongoSyncBucketStorage
966972
invalidateDataBuckets: true,
967973
updatedDataBuckets: [],
968974
invalidateParameterBuckets: false,
969-
updatedParameterBucketDefinitions: []
975+
updatedParameterLookups: new Set<string>()
970976
};
971977
}
972978
const key = `${options.lastCheckpoint}_${options.nextCheckpoint}`;

‎packages/service-core/src/storage/SyncRulesBucketStorage.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,14 @@ export interface GetCheckpointChangesOptions {
253253
export interface CheckpointChanges {
254254
updatedDataBuckets: string[];
255255
invalidateDataBuckets: boolean;
256-
updatedParameterBucketDefinitions: string[];
256+
/** Serialized using JSONBig */
257+
updatedParameterLookups: Set<string>;
257258
invalidateParameterBuckets: boolean;
258259
}
259260

260261
export const CHECKPOINT_INVALIDATE_ALL: CheckpointChanges = {
261262
updatedDataBuckets: [],
262263
invalidateDataBuckets: true,
263-
updatedParameterBucketDefinitions: [],
264+
updatedParameterLookups: new Set<string>(),
264265
invalidateParameterBuckets: true
265266
};

‎packages/service-core/src/storage/bson.ts

+7-9
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,20 @@ export const BSON_DESERIALIZE_DATA_OPTIONS: bson.DeserializeOptions = {
2525
* @param lookup
2626
*/
2727
export const serializeLookupBuffer = (lookup: SqliteJsonValue[]): NodeBuffer => {
28-
const normalized = lookup.map((value) => {
29-
if (typeof value == 'number' && Number.isInteger(value)) {
30-
return BigInt(value);
31-
} else {
32-
return value;
33-
}
34-
});
35-
return bson.serialize({ l: normalized }) as NodeBuffer;
28+
return bson.serialize({ l: lookup }) as NodeBuffer;
3629
};
3730

3831
export const serializeLookup = (lookup: SqliteJsonValue[]) => {
3932
return new bson.Binary(serializeLookupBuffer(lookup));
4033
};
4134

42-
export const getLookupBucketDefinitionName = (lookup: bson.Binary) => {
35+
export const deserializeParameterLookup = (lookup: bson.Binary) => {
4336
const parsed = bson.deserialize(lookup.buffer, BSON_DESERIALIZE_INTERNAL_OPTIONS).l as SqliteJsonValue[];
37+
return parsed;
38+
};
39+
40+
export const getLookupBucketDefinitionName = (lookup: bson.Binary) => {
41+
const parsed = deserializeParameterLookup(lookup);
4442
return parsed[0] as string;
4543
};
4644

‎packages/service-core/src/sync/BucketChecksumState.ts

+40-19
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersy
77
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
88
import { BucketSyncState } from './sync.js';
99
import { SyncContext } from './SyncContext.js';
10+
import { JSONBig } from '@powersync/service-jsonbig';
11+
import { hasIntersection } from './util.js';
1012

1113
export interface BucketChecksumStateOptions {
1214
syncContext: SyncContext;
@@ -269,6 +271,9 @@ export class BucketParameterState {
269271
private readonly querier: BucketParameterQuerier;
270272
private readonly staticBuckets: Map<string, BucketDescription>;
271273
private cachedDynamicBuckets: BucketDescription[] | null = null;
274+
private cachedDynamicBucketSet: Set<string> | null = null;
275+
276+
private readonly lookups: Set<string>;
272277

273278
constructor(
274279
context: SyncContext,
@@ -283,6 +288,7 @@ export class BucketParameterState {
283288

284289
this.querier = syncRules.getBucketParameterQuerier(this.syncParams);
285290
this.staticBuckets = new Map<string, BucketDescription>(this.querier.staticBuckets.map((b) => [b.bucket, b]));
291+
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l)));
286292
}
287293

288294
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate | null> {
@@ -362,46 +368,61 @@ export class BucketParameterState {
362368
const staticBuckets = querier.staticBuckets;
363369
const update = checkpoint.update;
364370

365-
let hasDataChange = false;
366371
let hasParameterChange = false;
367-
if (update.invalidateDataBuckets || update.updatedDataBuckets?.length > 0) {
368-
hasDataChange = true;
372+
let invalidateDataBuckets = false;
373+
// If hasParameterChange == true, then invalidateDataBuckets = true
374+
// If invalidateDataBuckets == true, we ignore updatedBuckets
375+
let updatedBuckets = new Set<string>();
376+
377+
if (update.invalidateDataBuckets) {
378+
invalidateDataBuckets = true;
369379
}
370380

371381
if (update.invalidateParameterBuckets) {
372382
hasParameterChange = true;
373383
} else {
374-
for (let bucket of update.updatedParameterBucketDefinitions) {
375-
// This is a very coarse check, but helps
376-
if (querier.dynamicBucketDefinitions.has(bucket)) {
377-
hasParameterChange = true;
378-
break;
379-
}
384+
if (hasIntersection(this.lookups, update.updatedParameterLookups)) {
385+
// This is a very coarse re-check of all queries
386+
hasParameterChange = true;
380387
}
381388
}
382389

383-
if (!hasDataChange && !hasParameterChange) {
384-
return null;
385-
}
386-
387390
let dynamicBuckets: BucketDescription[];
388-
if (hasParameterChange || this.cachedDynamicBuckets == null) {
391+
if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
389392
dynamicBuckets = await querier.queryDynamicBucketDescriptions({
390393
getParameterSets(lookups) {
391394
return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
392395
}
393396
});
394397
this.cachedDynamicBuckets = dynamicBuckets;
398+
this.cachedDynamicBucketSet = new Set<string>(dynamicBuckets.map((b) => b.bucket));
399+
invalidateDataBuckets = true;
395400
} else {
396401
dynamicBuckets = this.cachedDynamicBuckets;
402+
403+
if (!invalidateDataBuckets) {
404+
// TODO: Do set intersection instead
405+
for (let bucket of update.updatedDataBuckets ?? []) {
406+
if (this.staticBuckets.has(bucket) || this.cachedDynamicBucketSet.has(bucket)) {
407+
updatedBuckets.add(bucket);
408+
}
409+
}
410+
}
397411
}
398412
const allBuckets = [...staticBuckets, ...dynamicBuckets];
399413

400-
return {
401-
buckets: allBuckets,
402-
// We cannot track individual bucket updates for dynamic lookups yet
403-
updatedBuckets: null
404-
};
414+
if (invalidateDataBuckets) {
415+
return {
416+
buckets: allBuckets,
417+
// We cannot track individual bucket updates for dynamic lookups yet
418+
updatedBuckets: null
419+
};
420+
} else {
421+
return {
422+
buckets: allBuckets,
423+
updatedBuckets: updatedBuckets
424+
};
425+
}
405426
}
406427
}
407428

‎packages/service-core/src/sync/util.ts

+12
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,15 @@ export function settledPromise<T>(promise: Promise<T>): Promise<PromiseSettledRe
153153
}
154154
);
155155
}
156+
157+
export function hasIntersection<T>(a: Set<T>, b: Set<T>) {
158+
if (a.size > b.size) {
159+
[a, b] = [b, a];
160+
}
161+
// Now, a is always smaller than b, so iterate over a
162+
for (let value of a) {
163+
if (b.has(value)) {
164+
return true;
165+
}
166+
}
167+
}

‎packages/sync-rules/src/BucketParameterQuerier.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ export interface BucketParameterQuerier {
1919
* True if there are dynamic buckets, meaning queryDynamicBucketDescriptions() should be used.
2020
*
2121
* If this is false, queryDynamicBucketDescriptions() will always return an empty array,
22-
* and dynamicBucketDefinitions.size == 0.
22+
* and parameterQueryLookups.length == 0.
2323
*/
2424
readonly hasDynamicBuckets: boolean;
2525

26-
readonly dynamicBucketDefinitions: Set<string>;
26+
readonly parameterQueryLookups: SqliteJsonValue[][];
2727

2828
/**
2929
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
@@ -47,11 +47,11 @@ export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
4747
}
4848

4949
export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[]): BucketParameterQuerier {
50-
const dynamicBucketDefinitions = new Set<string>(queriers.flatMap((q) => [...q.dynamicBucketDefinitions]));
50+
const parameterQueryLookups = queriers.flatMap((q) => q.parameterQueryLookups);
5151
return {
5252
staticBuckets: queriers.flatMap((q) => q.staticBuckets),
53-
hasDynamicBuckets: dynamicBucketDefinitions.size > 0,
54-
dynamicBucketDefinitions,
53+
hasDynamicBuckets: parameterQueryLookups.length > 0,
54+
parameterQueryLookups: parameterQueryLookups,
5555
async queryDynamicBucketDescriptions(source: ParameterLookupSource) {
5656
let results: BucketDescription[] = [];
5757
for (let q of queriers) {

‎packages/sync-rules/src/SqlBucketDescriptor.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export class SqlBucketDescriptor {
113113
const staticQuerier = {
114114
staticBuckets,
115115
hasDynamicBuckets: false,
116-
dynamicBucketDefinitions: new Set<string>(),
116+
parameterQueryLookups: [],
117117
queryDynamicBucketDescriptions: async () => []
118118
} satisfies BucketParameterQuerier;
119119

‎packages/sync-rules/src/SqlParameterQuery.ts

+7-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import {
2323
SqliteJsonValue,
2424
SqliteRow
2525
} from './types.js';
26-
import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement } from './utils.js';
26+
import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement, normalizeParameterValue } from './utils.js';
2727

2828
/**
2929
* Represents a parameter query, such as:
@@ -308,7 +308,7 @@ export class SqlParameterQuery {
308308
const value = param.parametersToLookupValue(parameters);
309309

310310
if (isJsonValue(value)) {
311-
return value;
311+
return normalizeParameterValue(value);
312312
} else {
313313
valid = false;
314314
return null;
@@ -339,17 +339,18 @@ export class SqlParameterQuery {
339339
.map((expandedValue) => {
340340
let lookup: SqliteJsonValue[] = [this.descriptor_name!, this.id!];
341341
let valid = true;
342+
const normalizedExpandedValue = normalizeParameterValue(expandedValue);
342343
lookup.push(
343344
...this.input_parameters!.map((param): SqliteJsonValue => {
344345
if (param == this.expanded_input_parameter) {
345346
// Expand array value
346-
return expandedValue;
347+
return normalizedExpandedValue;
347348
} else {
348349
// Scalar value
349350
const value = param.parametersToLookupValue(parameters);
350351

351352
if (isJsonValue(value)) {
352-
return value;
353+
return normalizeParameterValue(value);
353354
} else {
354355
valid = false;
355356
return null;
@@ -375,15 +376,15 @@ export class SqlParameterQuery {
375376
return {
376377
staticBuckets: [],
377378
hasDynamicBuckets: false,
378-
dynamicBucketDefinitions: new Set<string>(),
379+
parameterQueryLookups: [],
379380
queryDynamicBucketDescriptions: async () => []
380381
};
381382
}
382383

383384
return {
384385
staticBuckets: [],
385386
hasDynamicBuckets: true,
386-
dynamicBucketDefinitions: new Set<string>([this.descriptor_name!]),
387+
parameterQueryLookups: lookups,
387388
queryDynamicBucketDescriptions: async (source: ParameterLookupSource) => {
388389
const bucketParameters = await source.getParameterSets(lookups);
389390
return this.resolveBucketDescriptions(bucketParameters, requestParameters);

‎packages/sync-rules/src/utils.ts

+10
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,13 @@ export const JSONBucketNameSerialize = {
170170
return stringifyRaw(value, replacer, space)!;
171171
}
172172
};
173+
174+
/**
175+
* Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers.
176+
*/
177+
export function normalizeParameterValue(value: SqliteJsonValue): SqliteJsonValue {
178+
if (typeof value == 'number' && Number.isInteger(value)) {
179+
return BigInt(value);
180+
}
181+
return value;
182+
}

0 commit comments

Comments
 (0)
Please sign in to comment.