@@ -7,6 +7,8 @@ import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersy
7
7
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js' ;
8
8
import { BucketSyncState } from './sync.js' ;
9
9
import { SyncContext } from './SyncContext.js' ;
10
+ import { JSONBig } from '@powersync/service-jsonbig' ;
11
+ import { hasIntersection } from './util.js' ;
10
12
11
13
export interface BucketChecksumStateOptions {
12
14
syncContext : SyncContext ;
@@ -269,6 +271,9 @@ export class BucketParameterState {
269
271
private readonly querier : BucketParameterQuerier ;
270
272
private readonly staticBuckets : Map < string , BucketDescription > ;
271
273
private cachedDynamicBuckets : BucketDescription [ ] | null = null ;
274
+ private cachedDynamicBucketSet : Set < string > | null = null ;
275
+
276
+ private readonly lookups : Set < string > ;
272
277
273
278
constructor (
274
279
context : SyncContext ,
@@ -283,6 +288,7 @@ export class BucketParameterState {
283
288
284
289
this . querier = syncRules . getBucketParameterQuerier ( this . syncParams ) ;
285
290
this . staticBuckets = new Map < string , BucketDescription > ( this . querier . staticBuckets . map ( ( b ) => [ b . bucket , b ] ) ) ;
291
+ this . lookups = new Set < string > ( this . querier . parameterQueryLookups . map ( ( l ) => JSONBig . stringify ( l ) ) ) ;
286
292
}
287
293
288
294
async getCheckpointUpdate ( checkpoint : storage . StorageCheckpointUpdate ) : Promise < CheckpointUpdate | null > {
@@ -362,46 +368,61 @@ export class BucketParameterState {
362
368
const staticBuckets = querier . staticBuckets ;
363
369
const update = checkpoint . update ;
364
370
365
- let hasDataChange = false ;
366
371
let hasParameterChange = false ;
367
- if ( update . invalidateDataBuckets || update . updatedDataBuckets ?. length > 0 ) {
368
- hasDataChange = true ;
372
+ let invalidateDataBuckets = false ;
373
+ // If hasParameterChange == true, then invalidateDataBuckets = true
374
+ // If invalidateDataBuckets == true, we ignore updatedBuckets
375
+ let updatedBuckets = new Set < string > ( ) ;
376
+
377
+ if ( update . invalidateDataBuckets ) {
378
+ invalidateDataBuckets = true ;
369
379
}
370
380
371
381
if ( update . invalidateParameterBuckets ) {
372
382
hasParameterChange = true ;
373
383
} else {
374
- for ( let bucket of update . updatedParameterBucketDefinitions ) {
375
- // This is a very coarse check, but helps
376
- if ( querier . dynamicBucketDefinitions . has ( bucket ) ) {
377
- hasParameterChange = true ;
378
- break ;
379
- }
384
+ if ( hasIntersection ( this . lookups , update . updatedParameterLookups ) ) {
385
+ // This is a very coarse re-check of all queries
386
+ hasParameterChange = true ;
380
387
}
381
388
}
382
389
383
- if ( ! hasDataChange && ! hasParameterChange ) {
384
- return null ;
385
- }
386
-
387
390
let dynamicBuckets : BucketDescription [ ] ;
388
- if ( hasParameterChange || this . cachedDynamicBuckets == null ) {
391
+ if ( hasParameterChange || this . cachedDynamicBuckets == null || this . cachedDynamicBucketSet == null ) {
389
392
dynamicBuckets = await querier . queryDynamicBucketDescriptions ( {
390
393
getParameterSets ( lookups ) {
391
394
return storage . getParameterSets ( checkpoint . base . checkpoint , lookups ) ;
392
395
}
393
396
} ) ;
394
397
this . cachedDynamicBuckets = dynamicBuckets ;
398
+ this . cachedDynamicBucketSet = new Set < string > ( dynamicBuckets . map ( ( b ) => b . bucket ) ) ;
399
+ invalidateDataBuckets = true ;
395
400
} else {
396
401
dynamicBuckets = this . cachedDynamicBuckets ;
402
+
403
+ if ( ! invalidateDataBuckets ) {
404
+ // TODO: Do set intersection instead
405
+ for ( let bucket of update . updatedDataBuckets ?? [ ] ) {
406
+ if ( this . staticBuckets . has ( bucket ) || this . cachedDynamicBucketSet . has ( bucket ) ) {
407
+ updatedBuckets . add ( bucket ) ;
408
+ }
409
+ }
410
+ }
397
411
}
398
412
const allBuckets = [ ...staticBuckets , ...dynamicBuckets ] ;
399
413
400
- return {
401
- buckets : allBuckets ,
402
- // We cannot track individual bucket updates for dynamic lookups yet
403
- updatedBuckets : null
404
- } ;
414
+ if ( invalidateDataBuckets ) {
415
+ return {
416
+ buckets : allBuckets ,
417
+ // We cannot track individual bucket updates for dynamic lookups yet
418
+ updatedBuckets : null
419
+ } ;
420
+ } else {
421
+ return {
422
+ buckets : allBuckets ,
423
+ updatedBuckets : updatedBuckets
424
+ } ;
425
+ }
405
426
}
406
427
}
407
428
0 commit comments