diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0cc6c341..e9912bc8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,4 +44,14 @@ jobs: ./gradlew \ -PGITHUB_PUBLISH_TOKEN=${{ secrets.GITHUB_TOKEN }} \ ${{ matrix.targets }} - shell: bash \ No newline at end of file + shell: bash + + # Credit: https://github.com/gradle/actions/issues/76#issuecomment-2007584323 + - name: Upload reports on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: report-for-${{ matrix.os }} + path: | + **/build/reports/ + **/build/test-results/ diff --git a/CHANGELOG.md b/CHANGELOG.md index a6554b2b..c634d0e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ``` Sqlite operation failure database is locked attempted to run migration and failed. closing connection ``` +* Fix race condition causing data received during uploads not to be applied. ## 1.0.0-BETA28 diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 57a8b494..fd5ab33c 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -346,6 +346,7 @@ tasks.withType { testLogging { events("PASSED", "FAILED", "SKIPPED") exceptionFormat = TestExceptionFormat.FULL + showCauses = true showStandardStreams = true showStackTraces = true } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 598595e4..3ce4a791 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -7,6 +7,8 @@ import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.WriteCheckpointData +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.sync.SyncLine @@ -17,6 +19,7 @@ import com.powersync.utils.JsonUtil import dev.mokkery.verify import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.shouldBe +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.serialization.encodeToString import kotlin.test.Test @@ -35,7 +38,7 @@ class SyncIntegrationTest { @OptIn(DelicateCoroutinesApi::class) fun closesResponseStreamOnDatabaseClose() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { @@ -55,7 +58,7 @@ class SyncIntegrationTest { @OptIn(DelicateCoroutinesApi::class) fun cleansResourcesOnDisconnect() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { @@ -77,7 +80,7 @@ class SyncIntegrationTest { @Test fun cannotUpdateSchemaWhileConnected() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { @@ -96,7 +99,7 @@ class SyncIntegrationTest { @Test fun testPartialSync() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) val checksums = @@ -188,7 +191,7 @@ class SyncIntegrationTest { @Test fun testRemembersLastPartialSync() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) syncLines.send( @@ -225,7 +228,7 @@ class SyncIntegrationTest { @Test fun setsDownloadingState() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { @@ -258,7 +261,7 @@ class SyncIntegrationTest { fun setsConnectingState() = databaseTest { turbineScope(timeout = 10.0.seconds) { - val syncStream = syncStream() + val syncStream = database.syncStream() val turbine = database.currentStatus.asFlow().testIn(this) database.connectInternal(syncStream, 1000L) @@ -274,7 +277,7 @@ class SyncIntegrationTest { @Test fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = databaseTest { - val syncStream = syncStream() + val syncStream = database.syncStream() database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { @@ -404,4 +407,109 @@ class SyncIntegrationTest { turbine.cancel() } } + + @Test + @OptIn(ExperimentalKermitApi::class) + fun `handles checkpoints during uploads`() = + databaseTest { + val testConnector = TestConnector() + connector = testConnector + database.connectInternal(database.syncStream(), 1000L) + + suspend fun expectUserRows(amount: Int) { + val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } + assertEquals(amount, row.toInt()) + } + + val completeUpload = CompletableDeferred() + val uploadStarted = CompletableDeferred() + testConnector.uploadDataCallback = { db -> + db.getCrudBatch()?.let { batch -> + uploadStarted.complete(Unit) + completeUpload.await() + batch.complete.invoke(null) + } + } + + // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully + // connected). + database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org")) + syncLines.send(SyncLine.KeepAlive(1234)) + expectUserRows(1) + uploadStarted.await() + + // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns. + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + writeCheckpoint = "1", + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.downloading } + turbine.cancelAndIgnoreRemainingEvents() + } + + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from local", "email": ""}""", + ), + OplogEntry( + checksum = 0, + opId = "2", + op = OpType.PUT, + rowId = "2", + rowType = "users", + data = """{"id": "test1", "name": "additional entry", "email": ""}""", + ), + ), + after = null, + nextAfter = null, + hasMore = false, + ), + ) + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2")) + + // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data. + waitFor { + assertNotNull( + logWriter.logs.find { + it.message.contains("Could not apply checkpoint due to local data") + }, + ) + } + database.expectUserCount(1) + + // Mark the upload as completed, this should trigger a write_checkpoint.json request + val requestedCheckpoint = CompletableDeferred() + checkpointResponse = { + requestedCheckpoint.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("1")) + } + completeUpload.complete(Unit) + requestedCheckpoint.await() + + // This should apply the checkpoint + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { !it.downloading } + turbine.cancelAndIgnoreRemainingEvents() + } + + // Meaning that the two rows are now visible + database.expectUserCount(2) + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 5d164515..298f433c 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -8,6 +8,8 @@ import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase +import com.powersync.bucket.WriteCheckpointData +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.PowerSyncDatabaseImpl @@ -75,6 +77,9 @@ internal class ActiveDatabaseTest( ) val syncLines = Channel() + var checkpointResponse: () -> WriteCheckpointResponse = { + WriteCheckpointResponse(WriteCheckpointData("1000")) + } val testDirectory by lazy { getTempDir() } val databaseName by lazy { @@ -84,7 +89,7 @@ internal class ActiveDatabaseTest( "db-$suffix" } - val connector = + var connector = mock { everySuspend { getCredentialsCached() } returns PowerSyncCredentials( @@ -113,16 +118,17 @@ internal class ActiveDatabaseTest( suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } } - fun syncStream(): SyncStream { - val client = MockSyncService(syncLines) + fun PowerSyncDatabase.syncStream(): SyncStream { + val client = MockSyncService(syncLines) { checkpointResponse() } return SyncStream( bucketStorage = database.bucketStorage, connector = connector, httpEngine = client, - uploadCrud = { }, + uploadCrud = { connector.uploadData(this) }, retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = scope, ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 40973e81..bc47feb8 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -160,6 +160,7 @@ internal class PowerSyncDatabaseImpl( retryDelayMs = retryDelayMs, logger = logger, params = params.toJsonObject(), + scope = scope, ), crudThrottleMs, ) @@ -232,7 +233,7 @@ internal class PowerSyncDatabaseImpl( .filter { it.contains(InternalTable.CRUD.toString()) } .throttle(crudThrottleMs) .collect { - stream.triggerCrudUpload() + stream.triggerCrudUploadAsync().join() } } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 23b3e5ed..60225952 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -1,7 +1,7 @@ package com.powersync.sync import co.touchlab.kermit.Logger -import co.touchlab.stately.concurrency.AtomicBoolean +import co.touchlab.stately.concurrency.AtomicReference import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage @@ -29,9 +29,13 @@ import io.ktor.http.contentType import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch import kotlinx.datetime.Clock import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonObject @@ -43,9 +47,10 @@ internal class SyncStream( private val retryDelayMs: Long = 5000L, private val logger: Logger, private val params: JsonObject, + private val scope: CoroutineScope, httpEngine: HttpClientEngine? = null, ) { - private var isUploadingCrud = AtomicBoolean(false) + private var isUploadingCrud = AtomicReference(null) /** * The current sync status. This instance is updated as changes occur @@ -116,14 +121,26 @@ internal class SyncStream( } } - suspend fun triggerCrudUpload() { - if (!status.connected || isUploadingCrud.value) { - return + fun triggerCrudUploadAsync(): Job = + scope.launch { + val thisIteration = PendingCrudUpload(CompletableDeferred()) + var holdingUploadLock = false + + try { + if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) { + return@launch + } + + holdingUploadLock = true + uploadAllCrud() + } finally { + if (holdingUploadLock) { + isUploadingCrud.set(null) + } + + thisIteration.done.complete(Unit) + } } - isUploadingCrud.value = true - uploadAllCrud() - isUploadingCrud.value = false - } private suspend fun uploadAllCrud() { var checkedCrudItem: CrudEntry? = null @@ -153,8 +170,13 @@ internal class SyncStream( break } } catch (e: Exception) { - logger.e { "Error uploading crud: ${e.message}" } status.update(uploading = false, uploadError = e) + + if (e is CancellationException) { + throw e + } + + logger.e { "Error uploading crud: ${e.message}" } delay(retryDelayMs) break } @@ -237,7 +259,6 @@ internal class SyncStream( validatedCheckpoint = null, appliedCheckpoint = null, bucketSet = initialBuckets.keys.toMutableSet(), - retry = false, ) bucketEntries.forEach { entry -> @@ -253,7 +274,12 @@ internal class SyncStream( streamingSyncRequest(req).collect { value -> val line = JsonUtil.json.decodeFromString(value) + state = handleInstruction(line, value, state) + + if (state.abortIteration) { + return@collect + } } status.update(downloading = false) @@ -314,30 +340,40 @@ internal class SyncStream( } private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { - val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!) + val checkpoint = state.targetCheckpoint!! + var result = bucketStorage.syncLocalDatabase(checkpoint) + val pending = isUploadingCrud.get() + if (!result.checkpointValid) { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off delay(50) - state.retry = true + state.abortIteration = true // TODO handle retries return state - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // landing here the whole time - } else { - state.appliedCheckpoint = state.targetCheckpoint!!.clone() - logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + } else if (!result.ready && pending != null) { + // We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which + // prevented this checkpoint from applying. Wait for that to complete and try again. + logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." } + pending.done.await() + + result = bucketStorage.syncLocalDatabase(checkpoint) } - state.validatedCheckpoint = state.targetCheckpoint - status.update( - lastSyncedAt = Clock.System.now(), - downloading = false, - hasSynced = true, - clearDownloadError = true, - ) + if (result.checkpointValid && result.ready) { + state.appliedCheckpoint = checkpoint.clone() + logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + + state.validatedCheckpoint = state.targetCheckpoint + status.update( + lastSyncedAt = Clock.System.now(), + downloading = false, + hasSynced = true, + clearDownloadError = true, + ) + } else { + logger.d { "Could not apply checkpoint. Waiting for next sync complete line" } + } return state } @@ -352,12 +388,12 @@ internal class SyncStream( // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off delay(50) - state.retry = true + state.abortIteration = true // TODO handle retries return state } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. + // Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this + // once we have a complete checkpoint if the problem persists. } else { logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } } @@ -441,10 +477,11 @@ internal class SyncStream( // Connection would be closed automatically right after this logger.i { "Token expiring reconnect" } connector.invalidateCredentials() - state.retry = true + state.abortIteration = true return state } - triggerCrudUpload() + // Don't await the upload job, we can keep receiving sync lines + triggerCrudUploadAsync() return state } } @@ -454,5 +491,9 @@ internal data class SyncStreamState( var validatedCheckpoint: Checkpoint?, var appliedCheckpoint: Checkpoint?, var bucketSet: MutableSet?, - var retry: Boolean, + var abortIteration: Boolean = false, +) + +private class PendingCrudUpload( + val done: CompletableDeferred, ) diff --git a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt new file mode 100644 index 00000000..1319e637 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt @@ -0,0 +1,24 @@ +package com.powersync + +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials + +class TestConnector : PowerSyncBackendConnector() { + var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = { + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + } + var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = { + val tx = it.getNextCrudTransaction() + tx?.complete(null) + } + + override suspend fun fetchCredentials(): PowerSyncCredentials? = fetchCredentialsCallback() + + override suspend fun uploadData(database: PowerSyncDatabase) { + uploadDataCallback(database) + } +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 41573e3f..069139bd 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -11,6 +11,8 @@ import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.WriteCheckpointData +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry @@ -105,6 +107,7 @@ class SyncStreamTest { uploadCrud = {}, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) syncStream.invalidateCredentials() @@ -141,10 +144,11 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) syncStream.status.update(connected = true) - syncStream.triggerCrudUpload() + syncStream.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -180,6 +184,7 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) // Launch streaming sync in a coroutine that we'll cancel after verification @@ -209,7 +214,7 @@ class SyncStreamTest { // TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything // Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point val syncLines = Channel() - val client = MockSyncService(syncLines) + val client = MockSyncService(syncLines, { WriteCheckpointResponse(WriteCheckpointData("1000")) }) syncStream = SyncStream( @@ -220,6 +225,7 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) val job = launch { syncStream.streamingSync() } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 56ab90ec..42831a11 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -1,6 +1,7 @@ package com.powersync.testutils import app.cash.turbine.ReceiveTurbine +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.sync.SyncLine import com.powersync.sync.SyncStatusData import com.powersync.utils.JsonUtil @@ -33,6 +34,7 @@ import kotlinx.serialization.encodeToString */ internal class MockSyncService( private val lines: ReceiveChannel, + private val generateCheckpoint: () -> WriteCheckpointResponse, ) : HttpClientEngineBase("sync-service") { override val config: HttpClientEngineConfig get() = Config @@ -70,6 +72,15 @@ internal class MockSyncService( job.channel, context, ) + } else if (data.url.encodedPath == "/write-checkpoint2.json") { + HttpResponseData( + HttpStatusCode.OK, + GMTDate(), + headersOf(), + HttpProtocolVersion.HTTP_1_1, + JsonUtil.json.encodeToString(generateCheckpoint()), + context, + ) } else { HttpResponseData( HttpStatusCode.BadRequest,