Skip to content

Commit 87759eb

Browse files
committed
Adopt isolate mutex implementations
1 parent 3387dc0 commit 87759eb

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

Diff for: packages/powersync_core/lib/src/database/native/native_powersync_database.dart

+44-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import 'package:powersync_core/src/streaming_sync.dart';
1919
import 'package:powersync_core/src/sync_status.dart';
2020
import 'package:sqlite_async/sqlite3_common.dart';
2121
import 'package:sqlite_async/sqlite_async.dart';
22+
// ignore: implementation_imports
23+
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
2224

2325
/// A PowerSync managed database.
2426
///
@@ -44,6 +46,8 @@ class PowerSyncDatabaseImpl
4446
@protected
4547
late Future<void> isInitialized;
4648

49+
final SimpleMutex _syncMutex = SimpleMutex(), _crudMutex = SimpleMutex();
50+
4751
@override
4852

4953
/// The Logger used by this [PowerSyncDatabase].
@@ -224,7 +228,13 @@ class PowerSyncDatabaseImpl
224228
await Isolate.spawn(
225229
_syncIsolate,
226230
_PowerSyncDatabaseIsolateArgs(
227-
receiveMessages.sendPort, dbRef, retryDelay, clientParams),
231+
receiveMessages.sendPort,
232+
dbRef,
233+
retryDelay,
234+
clientParams,
235+
_crudMutex.shared,
236+
_syncMutex.shared,
237+
),
228238
debugName: 'Sync ${database.openFactory.path}',
229239
onError: receiveUnhandledErrors.sendPort,
230240
errorsAreFatal: true,
@@ -259,16 +269,32 @@ class PowerSyncDatabaseImpl
259269
return database.writeLock(callback,
260270
debugContext: debugContext, lockTimeout: lockTimeout);
261271
}
272+
273+
@override
274+
Future<void> close() async {
275+
await super.close();
276+
277+
await _crudMutex.close();
278+
await _crudMutex.close();
279+
}
262280
}
263281

264282
class _PowerSyncDatabaseIsolateArgs {
265283
final SendPort sPort;
266284
final IsolateConnectionFactory dbRef;
267285
final Duration retryDelay;
268286
final Map<String, dynamic>? parameters;
287+
final SerializedMutex crudMutex;
288+
final SerializedMutex syncMutex;
269289

270290
_PowerSyncDatabaseIsolateArgs(
271-
this.sPort, this.dbRef, this.retryDelay, this.parameters);
291+
this.sPort,
292+
this.dbRef,
293+
this.retryDelay,
294+
this.parameters,
295+
this.crudMutex,
296+
this.syncMutex,
297+
);
272298
}
273299

274300
Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
@@ -277,6 +303,9 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
277303
StreamController<String> crudUpdateController = StreamController.broadcast();
278304
final upstreamDbClient = args.dbRef.upstreamPort.open();
279305

306+
final crudMutex = args.crudMutex.open();
307+
final syncMutex = args.syncMutex.open();
308+
280309
CommonDatabase? db;
281310
final Mutex mutex = args.dbRef.mutex.open();
282311
StreamingSyncImplementation? openedStreamingSync;
@@ -294,6 +323,8 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
294323
// It needs to be closed before killing the isolate
295324
// in order to free the mutex for other operations.
296325
await mutex.close();
326+
await crudMutex.close();
327+
await syncMutex.close();
297328
rPort.close();
298329

299330
// TODO: If we closed our resources properly, this wouldn't be necessary...
@@ -348,14 +379,17 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
348379

349380
final storage = BucketStorage(connection);
350381
final sync = StreamingSyncImplementation(
351-
adapter: storage,
352-
credentialsCallback: loadCredentials,
353-
invalidCredentialsCallback: invalidateCredentials,
354-
uploadCrud: uploadCrud,
355-
crudUpdateTriggerStream: crudUpdateController.stream,
356-
retryDelay: args.retryDelay,
357-
client: http.Client(),
358-
syncParameters: args.parameters);
382+
adapter: storage,
383+
credentialsCallback: loadCredentials,
384+
invalidCredentialsCallback: invalidateCredentials,
385+
uploadCrud: uploadCrud,
386+
crudUpdateTriggerStream: crudUpdateController.stream,
387+
retryDelay: args.retryDelay,
388+
client: http.Client(),
389+
syncParameters: args.parameters,
390+
crudMutex: crudMutex,
391+
syncMutex: syncMutex,
392+
);
359393
openedStreamingSync = sync;
360394
sync.streamingSync();
361395
sync.statusStream.listen((event) {

Diff for: packages/powersync_core/lib/src/streaming_sync.dart

+17-15
Original file line numberDiff line numberDiff line change
@@ -74,21 +74,23 @@ class StreamingSyncImplementation implements StreamingSync {
7474

7575
String? clientId;
7676

77-
StreamingSyncImplementation(
78-
{required this.adapter,
79-
required this.credentialsCallback,
80-
this.invalidCredentialsCallback,
81-
required this.uploadCrud,
82-
required this.crudUpdateTriggerStream,
83-
required this.retryDelay,
84-
this.syncParameters,
85-
required http.Client client,
86-
87-
/// A unique identifier for this streaming sync implementation
88-
/// A good value is typically the DB file path which it will mutate when syncing.
89-
String? identifier = "unknown"})
90-
: syncMutex = Mutex(identifier: "sync-$identifier"),
91-
crudMutex = Mutex(identifier: "crud-$identifier"),
77+
StreamingSyncImplementation({
78+
required this.adapter,
79+
required this.credentialsCallback,
80+
this.invalidCredentialsCallback,
81+
required this.uploadCrud,
82+
required this.crudUpdateTriggerStream,
83+
required this.retryDelay,
84+
this.syncParameters,
85+
required http.Client client,
86+
Mutex? syncMutex,
87+
Mutex? crudMutex,
88+
89+
/// A unique identifier for this streaming sync implementation
90+
/// A good value is typically the DB file path which it will mutate when syncing.
91+
String? identifier = "unknown",
92+
}) : syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"),
93+
crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"),
9294
_userAgentHeaders = userAgentHeaders() {
9395
_client = client;
9496
statusStream = _statusStreamController.stream;

0 commit comments

Comments
 (0)