Skip to content

Commit 1115e65

Browse files
committed
Fix reported progress around compaction
1 parent d61e679 commit 1115e65

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,61 @@ class SyncProgressTest {
282282
syncLines.close()
283283
}
284284

285+
@Test
286+
fun interruptedWithDefrag() =
287+
databaseTest {
288+
database.connect(connector)
289+
290+
turbineScope {
291+
val turbine = database.currentStatus.asFlow().testIn(this)
292+
turbine.waitFor { it.connected && !it.downloading }
293+
syncLines.send(
294+
SyncLine.FullCheckpoint(
295+
Checkpoint(
296+
lastOpId = "10",
297+
checksums = listOf(bucket("a", 10)),
298+
),
299+
),
300+
)
301+
turbine.expectProgress(0 to 10)
302+
303+
addDataLine("a", 5)
304+
turbine.expectProgress(5 to 10)
305+
306+
turbine.cancel()
307+
}
308+
309+
// Close and re-connect
310+
database.close()
311+
syncLines.close()
312+
database = openDatabase()
313+
syncLines = Channel()
314+
database.connect(connector)
315+
316+
turbineScope {
317+
val turbine = database.currentStatus.asFlow().testIn(this)
318+
turbine.waitFor { it.connected && !it.downloading }
319+
320+
// A sync rule deploy could reset buckets, making the new bucket smaller than the
321+
// existing one.
322+
syncLines.send(
323+
SyncLine.FullCheckpoint(
324+
Checkpoint(
325+
lastOpId = "14",
326+
checksums = listOf(bucket("a", 4)),
327+
),
328+
),
329+
)
330+
331+
// In this special case, don't report 5/4 as progress
332+
turbine.expectProgress(0 to 4)
333+
turbine.cancel()
334+
}
335+
336+
database.close()
337+
syncLines.close()
338+
}
339+
285340
@Test
286341
fun differentPriorities() =
287342
databaseTest {

core/src/commonMain/kotlin/com/powersync/sync/Progress.kt

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.powersync.sync
33
import com.powersync.bucket.BucketPriority
44
import com.powersync.bucket.Checkpoint
55
import com.powersync.bucket.LocalOperationCounters
6+
import kotlin.math.min
67

78
/**
89
* Information about a progressing download.
@@ -83,18 +84,38 @@ public data class SyncDownloadProgress private constructor(
8384
*/
8485
internal constructor(localProgress: Map<String, LocalOperationCounters>, target: Checkpoint) : this(
8586
buildMap {
87+
var invalidated = false
88+
8689
for (entry in target.checksums) {
8790
val savedProgress = localProgress[entry.bucket]
91+
val atLast = savedProgress?.atLast ?: 0
92+
val sinceLast = savedProgress?.sinceLast ?: 0
8893

8994
put(
9095
entry.bucket,
9196
BucketProgress(
9297
priority = entry.priority,
93-
atLast = savedProgress?.atLast ?: 0,
94-
sinceLast = savedProgress?.sinceLast ?: 0,
9598
targetCount = entry.count ?: 0,
99+
atLast = atLast,
100+
sinceLast = sinceLast,
96101
),
97102
)
103+
104+
entry.count?.let { knownCount ->
105+
if (knownCount < atLast + sinceLast) {
106+
// Either due to a defrag / sync rule deploy or a compaction operation, the
107+
// size of the bucket shrank so much that the local ops exceed the ops in
108+
// the updated bucket. We can't possibly report progress in this case (it
109+
// would overshoot 100%).
110+
invalidated = true
111+
}
112+
}
113+
}
114+
115+
if (invalidated) {
116+
for ((key, value) in entries) {
117+
put(key, value.copy(sinceLast = 0, atLast = 0))
118+
}
98119
}
99120
},
100121
)
@@ -120,7 +141,7 @@ public data class SyncDownloadProgress private constructor(
120141
put(
121142
bucket.bucket,
122143
previous.copy(
123-
sinceLast = previous.sinceLast + bucket.data.size,
144+
sinceLast = min(previous.sinceLast + bucket.data.size, previous.total),
124145
),
125146
)
126147
}

0 commit comments

Comments
 (0)