Skip to content

Commit b3e767e

Browse files
committed
Reformat
1 parent cbae24e commit b3e767e

File tree

3 files changed

+107
-99
lines changed

3 files changed

+107
-99
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

+91-83
Original file line numberDiff line numberDiff line change
@@ -541,97 +541,105 @@ class SyncIntegrationTest {
541541
}
542542

543543
@Test
544-
fun `handles checkpoints during uploads`() = runTest {
545-
database.connectInternal(syncStream(), 1000L)
544+
fun `handles checkpoints during uploads`() =
545+
runTest {
546+
database.connectInternal(syncStream(), 1000L)
546547

547-
suspend fun expectUserRows(amount: Int) {
548-
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
549-
assertEquals(amount, row.toInt())
550-
}
548+
suspend fun expectUserRows(amount: Int) {
549+
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
550+
assertEquals(amount, row.toInt())
551+
}
551552

552-
val completeUpload = CompletableDeferred<Unit>()
553-
val uploadStarted = CompletableDeferred<Unit>()
554-
everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) ->
555-
val batch = db.getCrudBatch()
556-
if (batch == null) return@calls
553+
val completeUpload = CompletableDeferred<Unit>()
554+
val uploadStarted = CompletableDeferred<Unit>()
555+
everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) ->
556+
val batch = db.getCrudBatch()
557+
if (batch == null) return@calls
557558

558-
uploadStarted.complete(Unit)
559-
completeUpload.await()
560-
batch.complete.invoke(null)
561-
}
559+
uploadStarted.complete(Unit)
560+
completeUpload.await()
561+
batch.complete.invoke(null)
562+
}
562563

563-
// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
564-
// connected).
565-
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
566-
syncLines.send(SyncLine.KeepAlive(1234))
567-
expectUserRows(1)
568-
uploadStarted.await()
569-
570-
// Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
571-
syncLines.send(SyncLine.FullCheckpoint(Checkpoint(
572-
writeCheckpoint = "1",
573-
lastOpId = "2",
574-
checksums = listOf(BucketChecksum("a", checksum = 0))
575-
)))
576-
turbineScope {
577-
val turbine = database.currentStatus.asFlow().testIn(this)
578-
turbine.waitFor { it.downloading }
579-
turbine.cancel()
580-
}
564+
// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
565+
// connected).
566+
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
567+
syncLines.send(SyncLine.KeepAlive(1234))
568+
expectUserRows(1)
569+
uploadStarted.await()
581570

582-
syncLines.send(SyncLine.SyncDataBucket(
583-
bucket = "a",
584-
data = listOf(
585-
OplogEntry(
586-
checksum = 0,
587-
opId = "1",
588-
op = OpType.PUT,
589-
rowId = "1",
590-
rowType = "users",
591-
data = """{"id": "test1", "name": "from local", "email": ""}"""
571+
// Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
572+
syncLines.send(
573+
SyncLine.FullCheckpoint(
574+
Checkpoint(
575+
writeCheckpoint = "1",
576+
lastOpId = "2",
577+
checksums = listOf(BucketChecksum("a", checksum = 0)),
578+
),
592579
),
593-
OplogEntry(
594-
checksum = 0,
595-
opId = "2",
596-
op = OpType.PUT,
597-
rowId = "2",
598-
rowType = "users",
599-
data = """{"id": "test1", "name": "additional entry", "email": ""}"""
580+
)
581+
turbineScope {
582+
val turbine = database.currentStatus.asFlow().testIn(this)
583+
turbine.waitFor { it.downloading }
584+
turbine.cancel()
585+
}
586+
587+
syncLines.send(
588+
SyncLine.SyncDataBucket(
589+
bucket = "a",
590+
data =
591+
listOf(
592+
OplogEntry(
593+
checksum = 0,
594+
opId = "1",
595+
op = OpType.PUT,
596+
rowId = "1",
597+
rowType = "users",
598+
data = """{"id": "test1", "name": "from local", "email": ""}""",
599+
),
600+
OplogEntry(
601+
checksum = 0,
602+
opId = "2",
603+
op = OpType.PUT,
604+
rowId = "2",
605+
rowType = "users",
606+
data = """{"id": "test1", "name": "additional entry", "email": ""}""",
607+
),
608+
),
609+
after = null,
610+
nextAfter = null,
611+
hasMore = false,
600612
),
601-
),
602-
after = null,
603-
nextAfter = null,
604-
hasMore = false,
605-
))
606-
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))
607-
608-
// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
609-
waitFor {
610-
assertNotNull(
611-
logWriter.logs.find {
612-
it.message.contains("Could not apply checkpoint due to local data")
613-
},
614613
)
615-
}
616-
expectUserCount(1)
614+
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))
615+
616+
// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
617+
waitFor {
618+
assertNotNull(
619+
logWriter.logs.find {
620+
it.message.contains("Could not apply checkpoint due to local data")
621+
},
622+
)
623+
}
624+
expectUserCount(1)
617625

618-
// Mark the upload as completed, this should trigger a write_checkpoint.json request
619-
val requestedCheckpoint = CompletableDeferred<Unit>()
620-
checkpointResponse = {
621-
requestedCheckpoint.complete(Unit)
622-
WriteCheckpointResponse(WriteCheckpointData(""))
623-
}
624-
completeUpload.complete(Unit)
625-
requestedCheckpoint.await()
626-
627-
// This should apply the checkpoint
628-
turbineScope {
629-
val turbine = database.currentStatus.asFlow().testIn(this)
630-
turbine.waitFor { !it.downloading }
631-
turbine.cancel()
632-
}
626+
// Mark the upload as completed, this should trigger a write_checkpoint.json request
627+
val requestedCheckpoint = CompletableDeferred<Unit>()
628+
checkpointResponse = {
629+
requestedCheckpoint.complete(Unit)
630+
WriteCheckpointResponse(WriteCheckpointData(""))
631+
}
632+
completeUpload.complete(Unit)
633+
requestedCheckpoint.await()
633634

634-
// Meaning that the two rows are now visible
635-
expectUserCount(2)
636-
}
635+
// This should apply the checkpoint
636+
turbineScope {
637+
val turbine = database.currentStatus.asFlow().testIn(this)
638+
turbine.waitFor { !it.downloading }
639+
turbine.cancel()
640+
}
641+
642+
// Meaning that the two rows are now visible
643+
expectUserCount(2)
644+
}
637645
}

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

+16-13
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,20 @@ internal class SyncStream(
121121
}
122122
}
123123

124-
fun triggerCrudUploadAsync(): Job = scope.launch {
125-
val thisIteration = PendingCrudUpload(CompletableDeferred())
126-
try {
127-
if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) {
128-
return@launch
129-
}
124+
fun triggerCrudUploadAsync(): Job =
125+
scope.launch {
126+
val thisIteration = PendingCrudUpload(CompletableDeferred())
127+
try {
128+
if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) {
129+
return@launch
130+
}
130131

131-
uploadAllCrud()
132-
} finally {
133-
isUploadingCrud.set(null)
134-
thisIteration.done.complete(Unit)
132+
uploadAllCrud()
133+
} finally {
134+
isUploadingCrud.set(null)
135+
thisIteration.done.complete(Unit)
136+
}
135137
}
136-
}
137138

138139
private suspend fun uploadAllCrud() {
139140
var checkedCrudItem: CrudEntry? = null
@@ -484,7 +485,9 @@ internal data class SyncStreamState(
484485
var validatedCheckpoint: Checkpoint?,
485486
var appliedCheckpoint: Checkpoint?,
486487
var bucketSet: MutableSet<String>?,
487-
var abortIteration: Boolean = false
488+
var abortIteration: Boolean = false,
488489
)
489490

490-
private class PendingCrudUpload(val done: CompletableDeferred<Unit>)
491+
private class PendingCrudUpload(
492+
val done: CompletableDeferred<Unit>,
493+
)

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

-3
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ import dev.mokkery.verify.VerifyMode.Companion.order
3030
import dev.mokkery.verifyNoMoreCalls
3131
import dev.mokkery.verifySuspend
3232
import io.ktor.client.engine.mock.MockEngine
33-
import kotlinx.coroutines.DelicateCoroutinesApi
34-
import kotlinx.coroutines.GlobalScope
3533
import kotlinx.coroutines.channels.Channel
36-
import kotlinx.coroutines.coroutineScope
3734
import kotlinx.coroutines.delay
3835
import kotlinx.coroutines.launch
3936
import kotlinx.coroutines.test.runTest

0 commit comments

Comments
 (0)