Skip to content

Fix checkpoints during uploads not being applied #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,14 @@ jobs:
./gradlew \
-PGITHUB_PUBLISH_TOKEN=${{ secrets.GITHUB_TOKEN }} \
${{ matrix.targets }}
shell: bash
shell: bash

# Credit: https://github.com/gradle/actions/issues/76#issuecomment-2007584323
- name: Upload reports on failure
if: failure()
uses: actions/upload-artifact@v4
with:
name: report-for-${{ matrix.os }}
path: |
**/build/reports/
**/build/test-results/
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
```
Sqlite operation failure database is locked attempted to run migration and failed. closing connection
```
* Fix race condition causing data received during uploads not to be applied.

## 1.0.0-BETA28

Expand Down
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ tasks.withType<KotlinTest> {
testLogging {
events("PASSED", "FAILED", "SKIPPED")
exceptionFormat = TestExceptionFormat.FULL
showCauses = true
showStandardStreams = true
showStackTraces = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.SyncLine
Expand All @@ -17,6 +19,7 @@ import com.powersync.utils.JsonUtil
import dev.mokkery.verify
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.serialization.encodeToString
import kotlin.test.Test
Expand All @@ -35,7 +38,7 @@ class SyncIntegrationTest {
@OptIn(DelicateCoroutinesApi::class)
fun closesResponseStreamOnDatabaseClose() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
Expand All @@ -55,7 +58,7 @@ class SyncIntegrationTest {
@OptIn(DelicateCoroutinesApi::class)
fun cleansResourcesOnDisconnect() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
Expand All @@ -77,7 +80,7 @@ class SyncIntegrationTest {
@Test
fun cannotUpdateSchemaWhileConnected() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
Expand All @@ -96,7 +99,7 @@ class SyncIntegrationTest {
@Test
fun testPartialSync() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

val checksums =
Expand Down Expand Up @@ -188,7 +191,7 @@ class SyncIntegrationTest {
@Test
fun testRemembersLastPartialSync() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

syncLines.send(
Expand Down Expand Up @@ -225,7 +228,7 @@ class SyncIntegrationTest {
@Test
fun setsDownloadingState() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
Expand Down Expand Up @@ -258,7 +261,7 @@ class SyncIntegrationTest {
fun setsConnectingState() =
databaseTest {
turbineScope(timeout = 10.0.seconds) {
val syncStream = syncStream()
val syncStream = database.syncStream()
val turbine = database.currentStatus.asFlow().testIn(this)

database.connectInternal(syncStream, 1000L)
Expand All @@ -274,7 +277,7 @@ class SyncIntegrationTest {
@Test
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
databaseTest {
val syncStream = syncStream()
val syncStream = database.syncStream()
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
Expand Down Expand Up @@ -404,4 +407,109 @@ class SyncIntegrationTest {
turbine.cancel()
}
}

@Test
@OptIn(ExperimentalKermitApi::class)
// Regression test for the race fixed by this PR: a checkpoint received while a CRUD
// upload is still in flight must be deferred (because local writes exist) and then
// applied once the upload completes and a write checkpoint has been obtained.
fun `handles checkpoints during uploads`() =
databaseTest {
// Replace the default mocked connector with one whose uploadData we can block on demand.
val testConnector = TestConnector()
connector = testConnector
database.connectInternal(database.syncStream(), 1000L)

// Helper: assert the number of rows currently visible in the local `users` table.
suspend fun expectUserRows(amount: Int) {
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
assertEquals(amount, row.toInt())
}

// Gates used to pause the connector mid-upload: `uploadStarted` fires once the CRUD
// batch has been picked up; `completeUpload` releases the connector so it can finish.
val completeUpload = CompletableDeferred<Unit>()
val uploadStarted = CompletableDeferred<Unit>()
testConnector.uploadDataCallback = { db ->
db.getCrudBatch()?.let { batch ->
uploadStarted.complete(Unit)
// Block here until the test explicitly completes the upload below.
completeUpload.await()
batch.complete.invoke(null)
}
}

// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
// connected).
// NOTE(review): the email literal "[email protected]" looks like GitHub's email-redaction
// artifact from scraping this diff — confirm the original literal in the repository.
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
syncLines.send(SyncLine.KeepAlive(1234))
expectUserRows(1)
uploadStarted.await()

// Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
// Send a full checkpoint whose write checkpoint ("1") matches the response the mocked
// checkpoint endpoint will later return, covering ops up to lastOpId "2".
syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
writeCheckpoint = "1",
lastOpId = "2",
checksums = listOf(BucketChecksum("a", checksum = 0)),
),
),
)
// The client should report that it is downloading the checkpoint's data.
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.downloading }
turbine.cancelAndIgnoreRemainingEvents()
}

// Deliver the checkpoint's two oplog entries for bucket "a", then mark the checkpoint
// as complete on the server side.
syncLines.send(
SyncLine.SyncDataBucket(
bucket = "a",
data =
listOf(
OplogEntry(
checksum = 0,
opId = "1",
op = OpType.PUT,
rowId = "1",
rowType = "users",
data = """{"id": "test1", "name": "from local", "email": ""}""",
),
OplogEntry(
checksum = 0,
opId = "2",
op = OpType.PUT,
rowId = "2",
rowType = "users",
data = """{"id": "test1", "name": "additional entry", "email": ""}""",
),
),
after = null,
nextAfter = null,
hasMore = false,
),
)
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))

// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
// We detect the deferred application via the log message emitted by the sync client.
waitFor {
assertNotNull(
logWriter.logs.find {
it.message.contains("Could not apply checkpoint due to local data")
},
)
}
// Only the locally-inserted row is visible so far.
database.expectUserCount(1)

// Mark the upload as completed, this should trigger a write_checkpoint.json request
val requestedCheckpoint = CompletableDeferred<Unit>()
checkpointResponse = {
requestedCheckpoint.complete(Unit)
// Return write checkpoint "1", matching the pending checkpoint sent above.
WriteCheckpointResponse(WriteCheckpointData("1"))
}
completeUpload.complete(Unit)
requestedCheckpoint.await()

// This should apply the checkpoint
// (downloading transitions back to false once the deferred checkpoint is applied).
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { !it.downloading }
turbine.cancelAndIgnoreRemainingEvents()
}

// Meaning that the two rows are now visible
database.expectUserCount(2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import co.touchlab.kermit.TestConfig
import co.touchlab.kermit.TestLogWriter
import com.powersync.DatabaseDriverFactory
import com.powersync.PowerSyncDatabase
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.PowerSyncDatabaseImpl
Expand Down Expand Up @@ -75,6 +77,9 @@ internal class ActiveDatabaseTest(
)

val syncLines = Channel<SyncLine>()
var checkpointResponse: () -> WriteCheckpointResponse = {
WriteCheckpointResponse(WriteCheckpointData("1000"))
}

val testDirectory by lazy { getTempDir() }
val databaseName by lazy {
Expand All @@ -84,7 +89,7 @@ internal class ActiveDatabaseTest(
"db-$suffix"
}

val connector =
var connector =
mock<PowerSyncBackendConnector> {
everySuspend { getCredentialsCached() } returns
PowerSyncCredentials(
Expand Down Expand Up @@ -113,16 +118,17 @@ internal class ActiveDatabaseTest(

suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } }

fun syncStream(): SyncStream {
val client = MockSyncService(syncLines)
fun PowerSyncDatabase.syncStream(): SyncStream {
val client = MockSyncService(syncLines) { checkpointResponse() }
return SyncStream(
bucketStorage = database.bucketStorage,
connector = connector,
httpEngine = client,
uploadCrud = { },
uploadCrud = { connector.uploadData(this) },
retryDelayMs = 10,
logger = logger,
params = JsonObject(emptyMap()),
scope = scope,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ internal class PowerSyncDatabaseImpl(
retryDelayMs = retryDelayMs,
logger = logger,
params = params.toJsonObject(),
scope = scope,
),
crudThrottleMs,
)
Expand Down Expand Up @@ -232,7 +233,7 @@ internal class PowerSyncDatabaseImpl(
.filter { it.contains(InternalTable.CRUD.toString()) }
.throttle(crudThrottleMs)
.collect {
stream.triggerCrudUpload()
stream.triggerCrudUploadAsync().join()
}
}
}
Expand Down
Loading