Skip to content

Commit 66fed0f

Browse files
committed
Fix handling of checkpoints only containing a write checkpoint update.
1 parent 1a76b2c commit 66fed0f

File tree

1 file changed

+11
-23
lines changed

1 file changed

+11
-23
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ export class BucketChecksumState {
7070
const storage = this.bucketStorage;
7171

7272
const update = await this.parameterState.getCheckpointUpdate(next);
73-
if (update == null) {
73+
if (update == null && this.lastWriteCheckpoint == writeCheckpoint) {
7474
return null;
7575
}
76-
7776
const { buckets: allBuckets, updatedBuckets } = update;
7877

7978
let dataBucketsNew = new Map<string, BucketSyncState>();
@@ -115,16 +114,19 @@ export class BucketChecksumState {
115114
}
116115
}
117116

118-
let updatedChecksums = await storage.getChecksums(base.checkpoint, checksumLookups);
119-
for (let [bucket, value] of updatedChecksums.entries()) {
120-
newChecksums.set(bucket, value);
117+
if (checksumLookups.length > 0) {
118+
let updatedChecksums = await storage.getChecksums(base.checkpoint, checksumLookups);
119+
for (let [bucket, value] of updatedChecksums.entries()) {
120+
newChecksums.set(bucket, value);
121+
}
121122
}
122123
checksumMap = newChecksums;
123124
} else {
124125
// Re-check all buckets
125126
const bucketList = [...dataBucketsNew.keys()];
126127
checksumMap = await storage.getChecksums(base.checkpoint, bucketList);
127128
}
129+
128130
// Subset of buckets for which there may be new data in this batch.
129131
let bucketsToFetch: BucketDescription[];
130132

@@ -291,19 +293,15 @@ export class BucketParameterState {
291293
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
292294
}
293295

294-
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate | null> {
296+
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
295297
const querier = this.querier;
296-
let update: CheckpointUpdate | null;
298+
let update: CheckpointUpdate;
297299
if (querier.hasDynamicBuckets) {
298300
update = await this.getCheckpointUpdateDynamic(checkpoint);
299301
} else {
300302
update = await this.getCheckpointUpdateStatic(checkpoint);
301303
}
302304

303-
if (update == null) {
304-
return null;
305-
}
306-
307305
if (update.buckets.length > this.context.maxParameterQueryResults) {
308306
// TODO: Limit number of results even before we get to this point
309307
// This limit applies _before_ we get the unique set
@@ -325,9 +323,7 @@ export class BucketParameterState {
325323
/**
326324
* For static buckets, we can keep track of which buckets have been updated.
327325
*/
328-
private async getCheckpointUpdateStatic(
329-
checkpoint: storage.StorageCheckpointUpdate
330-
): Promise<CheckpointUpdate | null> {
326+
private async getCheckpointUpdateStatic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
331327
const querier = this.querier;
332328
const update = checkpoint.update;
333329

@@ -339,12 +335,6 @@ export class BucketParameterState {
339335
}
340336

341337
const updatedBuckets = new Set<string>(getIntersection(this.staticBuckets, update.updatedDataBuckets));
342-
343-
if (updatedBuckets.size == 0) {
344-
// No change - skip this checkpoint
345-
return null;
346-
}
347-
348338
return {
349339
buckets: querier.staticBuckets,
350340
updatedBuckets
@@ -354,9 +344,7 @@ export class BucketParameterState {
354344
/**
355345
* For dynamic buckets, we need to re-query the list of buckets every time.
356346
*/
357-
private async getCheckpointUpdateDynamic(
358-
checkpoint: storage.StorageCheckpointUpdate
359-
): Promise<CheckpointUpdate | null> {
347+
private async getCheckpointUpdateDynamic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
360348
const querier = this.querier;
361349
const storage = this.bucketStorage;
362350
const staticBuckets = querier.staticBuckets;

0 commit comments

Comments
 (0)