Skip to content

Commit 67d0be5

Browse files
committed
Only reset upload lock when held
1 parent b3e767e commit 67d0be5

File tree

3 files changed

+46
-13
lines changed

3 files changed

+46
-13
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

+13-12
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ import com.powersync.testutils.factory
2626
import com.powersync.testutils.generatePrintLogWriter
2727
import com.powersync.testutils.waitFor
2828
import com.powersync.utils.JsonUtil
29-
import dev.mokkery.answering.calls
3029
import dev.mokkery.answering.returns
3130
import dev.mokkery.everySuspend
32-
import dev.mokkery.matcher.any
3331
import dev.mokkery.mock
3432
import dev.mokkery.verify
3533
import kotlinx.coroutines.CompletableDeferred
34+
import kotlinx.coroutines.CoroutineScope
3635
import kotlinx.coroutines.DelicateCoroutinesApi
37-
import kotlinx.coroutines.GlobalScope
3836
import kotlinx.coroutines.channels.Channel
3937
import kotlinx.coroutines.runBlocking
4038
import kotlinx.coroutines.test.runTest
@@ -109,7 +107,7 @@ class SyncIntegrationTest {
109107
) as PowerSyncDatabaseImpl
110108

111109
@OptIn(DelicateCoroutinesApi::class)
112-
private fun syncStream(): SyncStream {
110+
private fun CoroutineScope.syncStream(): SyncStream {
113111
val client = MockSyncService(syncLines, { checkpointResponse() })
114112
return SyncStream(
115113
bucketStorage = database.bucketStorage,
@@ -119,7 +117,7 @@ class SyncIntegrationTest {
119117
retryDelayMs = 10,
120118
logger = logger,
121119
params = JsonObject(emptyMap()),
122-
scope = GlobalScope,
120+
scope = this,
123121
)
124122
}
125123

@@ -543,6 +541,8 @@ class SyncIntegrationTest {
543541
@Test
544542
fun `handles checkpoints during uploads`() =
545543
runTest {
544+
val testConnector = TestConnector()
545+
connector = testConnector
546546
database.connectInternal(syncStream(), 1000L)
547547

548548
suspend fun expectUserRows(amount: Int) {
@@ -552,13 +552,13 @@ class SyncIntegrationTest {
552552

553553
val completeUpload = CompletableDeferred<Unit>()
554554
val uploadStarted = CompletableDeferred<Unit>()
555-
everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) ->
556-
val batch = db.getCrudBatch()
557-
if (batch == null) return@calls
558-
559-
uploadStarted.complete(Unit)
560-
completeUpload.await()
561-
batch.complete.invoke(null)
555+
testConnector.uploadDataCallback = { db ->
556+
println("upload data callback called")
557+
db.getCrudBatch()?.let { batch ->
558+
uploadStarted.complete(Unit)
559+
completeUpload.await()
560+
batch.complete.invoke(null)
561+
}
562562
}
563563

564564
// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
@@ -629,6 +629,7 @@ class SyncIntegrationTest {
629629
requestedCheckpoint.complete(Unit)
630630
WriteCheckpointResponse(WriteCheckpointData(""))
631631
}
632+
println("marking update as completed")
632633
completeUpload.complete(Unit)
633634
requestedCheckpoint.await()
634635

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,20 @@ internal class SyncStream(
124124
fun triggerCrudUploadAsync(): Job =
125125
scope.launch {
126126
val thisIteration = PendingCrudUpload(CompletableDeferred())
127+
var holdingUploadLock = false
128+
127129
try {
128130
if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) {
129131
return@launch
130132
}
131133

134+
holdingUploadLock = true
132135
uploadAllCrud()
133136
} finally {
134-
isUploadingCrud.set(null)
137+
if (holdingUploadLock) {
138+
isUploadingCrud.set(null)
139+
}
140+
135141
thisIteration.done.complete(Unit)
136142
}
137143
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.powersync
2+
3+
import com.powersync.connectors.PowerSyncBackendConnector
4+
import com.powersync.connectors.PowerSyncCredentials
5+
6+
class TestConnector: PowerSyncBackendConnector() {
7+
var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = {
8+
PowerSyncCredentials(
9+
token = "test-token",
10+
userId = "test-user",
11+
endpoint = "https://test.com",
12+
)
13+
}
14+
var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = {
15+
val tx = it.getNextCrudTransaction()
16+
tx?.complete(null)
17+
}
18+
19+
override suspend fun fetchCredentials(): PowerSyncCredentials? {
20+
return fetchCredentialsCallback()
21+
}
22+
23+
override suspend fun uploadData(database: PowerSyncDatabase) {
24+
uploadDataCallback(database)
25+
}
26+
}

0 commit comments

Comments
 (0)