Skip to content

Commit 3eda581

Browse files
authored
Merge pull request #160 from powersync-ja/fix-download-race-condition
Fix checkpoints during uploads not being applied
2 parents fc06c3b + 2761784 commit 3eda581

File tree

10 files changed

+258
-49
lines changed

10 files changed

+258
-49
lines changed

.github/workflows/test.yml

+11-1
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,14 @@ jobs:
4444
./gradlew \
4545
-PGITHUB_PUBLISH_TOKEN=${{ secrets.GITHUB_TOKEN }} \
4646
${{ matrix.targets }}
47-
shell: bash
47+
shell: bash
48+
49+
# Credit: https://github.com/gradle/actions/issues/76#issuecomment-2007584323
50+
- name: Upload reports on failure
51+
if: failure()
52+
uses: actions/upload-artifact@v4
53+
with:
54+
name: report-for-${{ matrix.os }}
55+
path: |
56+
**/build/reports/
57+
**/build/test-results/

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
```
99
Sqlite operation failure database is locked attempted to run migration and failed. closing connection
1010
```
11+
* Fix race condition causing data received during uploads not to be applied.
1112

1213
## 1.0.0-BETA28
1314

core/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ tasks.withType<KotlinTest> {
346346
testLogging {
347347
events("PASSED", "FAILED", "SKIPPED")
348348
exceptionFormat = TestExceptionFormat.FULL
349+
showCauses = true
349350
showStandardStreams = true
350351
showStackTraces = true
351352
}

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

+116-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import com.powersync.bucket.BucketPriority
77
import com.powersync.bucket.Checkpoint
88
import com.powersync.bucket.OpType
99
import com.powersync.bucket.OplogEntry
10+
import com.powersync.bucket.WriteCheckpointData
11+
import com.powersync.bucket.WriteCheckpointResponse
1012
import com.powersync.db.PowerSyncDatabaseImpl
1113
import com.powersync.db.schema.Schema
1214
import com.powersync.sync.SyncLine
@@ -17,6 +19,7 @@ import com.powersync.utils.JsonUtil
1719
import dev.mokkery.verify
1820
import io.kotest.matchers.collections.shouldHaveSize
1921
import io.kotest.matchers.shouldBe
22+
import kotlinx.coroutines.CompletableDeferred
2023
import kotlinx.coroutines.DelicateCoroutinesApi
2124
import kotlinx.serialization.encodeToString
2225
import kotlin.test.Test
@@ -35,7 +38,7 @@ class SyncIntegrationTest {
3538
@OptIn(DelicateCoroutinesApi::class)
3639
fun closesResponseStreamOnDatabaseClose() =
3740
databaseTest {
38-
val syncStream = syncStream()
41+
val syncStream = database.syncStream()
3942
database.connectInternal(syncStream, 1000L)
4043

4144
turbineScope(timeout = 10.0.seconds) {
@@ -55,7 +58,7 @@ class SyncIntegrationTest {
5558
@OptIn(DelicateCoroutinesApi::class)
5659
fun cleansResourcesOnDisconnect() =
5760
databaseTest {
58-
val syncStream = syncStream()
61+
val syncStream = database.syncStream()
5962
database.connectInternal(syncStream, 1000L)
6063

6164
turbineScope(timeout = 10.0.seconds) {
@@ -77,7 +80,7 @@ class SyncIntegrationTest {
7780
@Test
7881
fun cannotUpdateSchemaWhileConnected() =
7982
databaseTest {
80-
val syncStream = syncStream()
83+
val syncStream = database.syncStream()
8184
database.connectInternal(syncStream, 1000L)
8285

8386
turbineScope(timeout = 10.0.seconds) {
@@ -96,7 +99,7 @@ class SyncIntegrationTest {
9699
@Test
97100
fun testPartialSync() =
98101
databaseTest {
99-
val syncStream = syncStream()
102+
val syncStream = database.syncStream()
100103
database.connectInternal(syncStream, 1000L)
101104

102105
val checksums =
@@ -188,7 +191,7 @@ class SyncIntegrationTest {
188191
@Test
189192
fun testRemembersLastPartialSync() =
190193
databaseTest {
191-
val syncStream = syncStream()
194+
val syncStream = database.syncStream()
192195
database.connectInternal(syncStream, 1000L)
193196

194197
syncLines.send(
@@ -225,7 +228,7 @@ class SyncIntegrationTest {
225228
@Test
226229
fun setsDownloadingState() =
227230
databaseTest {
228-
val syncStream = syncStream()
231+
val syncStream = database.syncStream()
229232
database.connectInternal(syncStream, 1000L)
230233

231234
turbineScope(timeout = 10.0.seconds) {
@@ -258,7 +261,7 @@ class SyncIntegrationTest {
258261
fun setsConnectingState() =
259262
databaseTest {
260263
turbineScope(timeout = 10.0.seconds) {
261-
val syncStream = syncStream()
264+
val syncStream = database.syncStream()
262265
val turbine = database.currentStatus.asFlow().testIn(this)
263266

264267
database.connectInternal(syncStream, 1000L)
@@ -274,7 +277,7 @@ class SyncIntegrationTest {
274277
@Test
275278
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
276279
databaseTest {
277-
val syncStream = syncStream()
280+
val syncStream = database.syncStream()
278281
database.connectInternal(syncStream, 1000L)
279282

280283
turbineScope(timeout = 10.0.seconds) {
@@ -404,4 +407,109 @@ class SyncIntegrationTest {
404407
turbine.cancel()
405408
}
406409
}
410+
411+
@Test
412+
@OptIn(ExperimentalKermitApi::class)
413+
fun `handles checkpoints during uploads`() =
414+
databaseTest {
415+
val testConnector = TestConnector()
416+
connector = testConnector
417+
database.connectInternal(database.syncStream(), 1000L)
418+
419+
suspend fun expectUserRows(amount: Int) {
420+
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
421+
assertEquals(amount, row.toInt())
422+
}
423+
424+
val completeUpload = CompletableDeferred<Unit>()
425+
val uploadStarted = CompletableDeferred<Unit>()
426+
testConnector.uploadDataCallback = { db ->
427+
db.getCrudBatch()?.let { batch ->
428+
uploadStarted.complete(Unit)
429+
completeUpload.await()
430+
batch.complete.invoke(null)
431+
}
432+
}
433+
434+
// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
435+
// connected).
436+
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
437+
syncLines.send(SyncLine.KeepAlive(1234))
438+
expectUserRows(1)
439+
uploadStarted.await()
440+
441+
// Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
442+
syncLines.send(
443+
SyncLine.FullCheckpoint(
444+
Checkpoint(
445+
writeCheckpoint = "1",
446+
lastOpId = "2",
447+
checksums = listOf(BucketChecksum("a", checksum = 0)),
448+
),
449+
),
450+
)
451+
turbineScope {
452+
val turbine = database.currentStatus.asFlow().testIn(this)
453+
turbine.waitFor { it.downloading }
454+
turbine.cancelAndIgnoreRemainingEvents()
455+
}
456+
457+
syncLines.send(
458+
SyncLine.SyncDataBucket(
459+
bucket = "a",
460+
data =
461+
listOf(
462+
OplogEntry(
463+
checksum = 0,
464+
opId = "1",
465+
op = OpType.PUT,
466+
rowId = "1",
467+
rowType = "users",
468+
data = """{"id": "test1", "name": "from local", "email": ""}""",
469+
),
470+
OplogEntry(
471+
checksum = 0,
472+
opId = "2",
473+
op = OpType.PUT,
474+
rowId = "2",
475+
rowType = "users",
476+
data = """{"id": "test1", "name": "additional entry", "email": ""}""",
477+
),
478+
),
479+
after = null,
480+
nextAfter = null,
481+
hasMore = false,
482+
),
483+
)
484+
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))
485+
486+
// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
487+
waitFor {
488+
assertNotNull(
489+
logWriter.logs.find {
490+
it.message.contains("Could not apply checkpoint due to local data")
491+
},
492+
)
493+
}
494+
database.expectUserCount(1)
495+
496+
// Mark the upload as completed, this should trigger a write_checkpoint.json request
497+
val requestedCheckpoint = CompletableDeferred<Unit>()
498+
checkpointResponse = {
499+
requestedCheckpoint.complete(Unit)
500+
WriteCheckpointResponse(WriteCheckpointData("1"))
501+
}
502+
completeUpload.complete(Unit)
503+
requestedCheckpoint.await()
504+
505+
// This should apply the checkpoint
506+
turbineScope {
507+
val turbine = database.currentStatus.asFlow().testIn(this)
508+
turbine.waitFor { !it.downloading }
509+
turbine.cancelAndIgnoreRemainingEvents()
510+
}
511+
512+
// Meaning that the two rows are now visible
513+
database.expectUserCount(2)
514+
}
407515
}

core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt

+10-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import co.touchlab.kermit.TestConfig
88
import co.touchlab.kermit.TestLogWriter
99
import com.powersync.DatabaseDriverFactory
1010
import com.powersync.PowerSyncDatabase
11+
import com.powersync.bucket.WriteCheckpointData
12+
import com.powersync.bucket.WriteCheckpointResponse
1113
import com.powersync.connectors.PowerSyncBackendConnector
1214
import com.powersync.connectors.PowerSyncCredentials
1315
import com.powersync.db.PowerSyncDatabaseImpl
@@ -75,6 +77,9 @@ internal class ActiveDatabaseTest(
7577
)
7678

7779
val syncLines = Channel<SyncLine>()
80+
var checkpointResponse: () -> WriteCheckpointResponse = {
81+
WriteCheckpointResponse(WriteCheckpointData("1000"))
82+
}
7883

7984
val testDirectory by lazy { getTempDir() }
8085
val databaseName by lazy {
@@ -84,7 +89,7 @@ internal class ActiveDatabaseTest(
8489
"db-$suffix"
8590
}
8691

87-
val connector =
92+
var connector =
8893
mock<PowerSyncBackendConnector> {
8994
everySuspend { getCredentialsCached() } returns
9095
PowerSyncCredentials(
@@ -113,16 +118,17 @@ internal class ActiveDatabaseTest(
113118

114119
suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } }
115120

116-
fun syncStream(): SyncStream {
117-
val client = MockSyncService(syncLines)
121+
fun PowerSyncDatabase.syncStream(): SyncStream {
122+
val client = MockSyncService(syncLines) { checkpointResponse() }
118123
return SyncStream(
119124
bucketStorage = database.bucketStorage,
120125
connector = connector,
121126
httpEngine = client,
122-
uploadCrud = { },
127+
uploadCrud = { connector.uploadData(this) },
123128
retryDelayMs = 10,
124129
logger = logger,
125130
params = JsonObject(emptyMap()),
131+
scope = scope,
126132
)
127133
}
128134

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ internal class PowerSyncDatabaseImpl(
160160
retryDelayMs = retryDelayMs,
161161
logger = logger,
162162
params = params.toJsonObject(),
163+
scope = scope,
163164
),
164165
crudThrottleMs,
165166
)
@@ -232,7 +233,7 @@ internal class PowerSyncDatabaseImpl(
232233
.filter { it.contains(InternalTable.CRUD.toString()) }
233234
.throttle(crudThrottleMs)
234235
.collect {
235-
stream.triggerCrudUpload()
236+
stream.triggerCrudUploadAsync().join()
236237
}
237238
}
238239
}

0 commit comments

Comments
 (0)