Skip to content

Commit 394c406

Browse files
feat: use navigator locks
1 parent 5d0fd64 commit 394c406

File tree

7 files changed

+268
-58
lines changed

7 files changed

+268
-58
lines changed

packages/sqlite_async/lib/src/common/mutex.dart

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import 'package:sqlite_async/src/impl/mutex_impl.dart';
22

33
abstract class Mutex {
4-
factory Mutex() {
5-
return MutexImpl();
4+
factory Mutex(
5+
{
6+
/// An optional identifier for this Mutex instance.
7+
/// This could be used for platform specific logic or debugging purposes.
8+
String? identifier}) {
9+
return MutexImpl(identifier: identifier);
610
}
711

812
/// timeout is a timeout for acquiring the lock, not for the callback

packages/sqlite_async/lib/src/impl/stub_mutex.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import 'package:sqlite_async/src/common/mutex.dart';
22

33
class MutexImpl implements Mutex {
4+
String? identifier;
5+
6+
MutexImpl({this.identifier});
7+
48
@override
59
Future<void> close() {
610
throw UnimplementedError();

packages/sqlite_async/lib/src/web/web_mutex.dart

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,36 @@
1+
import 'dart:async';
2+
import 'dart:js_interop_unsafe';
3+
import 'dart:math';
4+
15
import 'package:mutex/mutex.dart' as mutex;
26
import 'package:sqlite_async/src/common/mutex.dart';
7+
import 'dart:js_interop';
8+
import 'package:web/web.dart';
9+
10+
@JS('navigator')
11+
external Navigator get _navigator;
12+
13+
@JS('AbortController')
14+
external AbortController get _abortController;
315

416
/// Web implementation of [Mutex]
5-
/// This should use `navigator.locks` in future
617
class MutexImpl implements Mutex {
7-
late final mutex.Mutex m;
18+
late final mutex.Mutex fallback;
19+
String? identifier;
20+
String _resolvedIdentifier;
21+
22+
MutexImpl({this.identifier})
823

9-
MutexImpl() {
10-
m = mutex.Mutex();
24+
/// On web a lock name is required for Navigator locks.
25+
/// Having exclusive Mutex instances requires a somewhat unique lock name.
26+
/// This provides a best effort unique identifier, if no identifier is provided.
27+
/// This should be fine for most use cases:
28+
/// - The uuid package could be added for better uniqueness if required.
29+
/// - This would add another package dependency to `sqlite_async` which is potentially unnecessary at this point.
30+
/// An identifier should be supplied for better exclusion.
31+
: _resolvedIdentifier = identifier ??
32+
"${DateTime.now().microsecondsSinceEpoch}-${Random().nextDouble()}" {
33+
fallback = mutex.Mutex();
1134
}
1235

1336
@override
@@ -17,8 +40,77 @@ class MutexImpl implements Mutex {
1740

1841
@override
1942
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) {
20-
// Note this lock is only valid in a single web tab
21-
return m.protect(callback);
43+
if ((_navigator as JSObject).hasProperty('locks'.toJS).toDart) {
44+
return _webLock(callback, timeout: timeout);
45+
} else {
46+
return _fallbackLock(callback, timeout: timeout);
47+
}
48+
}
49+
50+
Future<T> _fallbackLock<T>(Future<T> Function() callback,
51+
{Duration? timeout}) {
52+
final completer = Completer<T>();
53+
// Need to implement timeout manually for this
54+
bool isTimedOut = false;
55+
bool lockObtained = false;
56+
if (timeout != null) {
57+
Future.delayed(timeout, () {
58+
isTimedOut = true;
59+
if (lockObtained == false) {
60+
completer.completeError(LockError('Timeout reached'));
61+
}
62+
});
63+
}
64+
65+
fallback.protect(() async {
66+
try {
67+
if (isTimedOut) {
68+
// Don't actually run logic
69+
return;
70+
}
71+
lockObtained = true;
72+
final result = await callback();
73+
completer.complete(result);
74+
} catch (ex) {
75+
completer.completeError(ex);
76+
}
77+
});
78+
79+
return completer.future;
80+
}
81+
82+
Future<T> _webLock<T>(Future<T> Function() callback, {Duration? timeout}) {
83+
final completer = Completer<T>();
84+
// Navigator locks can be timed out by using an AbortSignal
85+
final controller = AbortController();
86+
87+
bool lockAcquired = false;
88+
if (timeout != null) {
89+
// Can't really abort the `delayed` call easily :(
90+
Future.delayed(timeout, () {
91+
if (lockAcquired == true) {
92+
return;
93+
}
94+
completer.completeError(LockError('Timeout reached'));
95+
controller.abort('Timeout'.toJS);
96+
});
97+
}
98+
99+
JSPromise jsCallback(JSAny lock) {
100+
lockAcquired = true;
101+
callback().then((value) {
102+
completer.complete(value);
103+
}).catchError((error) {
104+
completer.completeError(error);
105+
});
106+
return completer.future.toJS;
107+
}
108+
109+
final lockOptions = JSObject();
110+
lockOptions['signal'] = controller.signal;
111+
_navigator.locks.request(_resolvedIdentifier, lockOptions, jsCallback.toJS);
112+
113+
return completer.future;
22114
}
23115

24116
@override

packages/sqlite_async/pubspec.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ dependencies:
1818
collection: ^1.17.0
1919
mutex: ^3.1.0
2020
meta: ^1.10.0
21+
web: ^0.5.1
2122

2223
dev_dependencies:
2324
dcli: ^4.0.0
Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,80 @@
1-
@TestOn('!browser')
2-
import 'dart:isolate';
1+
import 'dart:math';
32

4-
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
3+
import 'package:sqlite_async/sqlite_async.dart';
54
import 'package:test/test.dart';
65

6+
import 'utils/test_utils_impl.dart';
7+
8+
final testUtils = TestUtils();
9+
710
void main() {
8-
group('Mutex Tests', () {
9-
test('Closing', () async {
10-
// Test that locks are properly released when calling SharedMutex.close()
11-
// in in Isolate.
12-
// A timeout in this test indicates a likely error.
13-
for (var i = 0; i < 50; i++) {
14-
final mutex = SimpleMutex();
15-
final serialized = mutex.shared;
16-
17-
final result = await Isolate.run(() async {
18-
return _lockInIsolate(serialized);
11+
group('Shared Mutex Tests', () {
12+
test('Queue exclusive operations', () async {
13+
final m = Mutex();
14+
final collection = List.generate(10, (index) => index);
15+
final results = <int>[];
16+
17+
final futures = collection.map((element) async {
18+
return m.lock(() async {
19+
// Simulate some asynchronous work
20+
await Future.delayed(Duration(milliseconds: Random().nextInt(100)));
21+
results.add(element);
22+
return element;
1923
});
24+
}).toList();
2025

21-
await mutex.lock(() async {});
26+
// Await all the promises
27+
await Future.wait(futures);
2228

23-
expect(result, equals(5));
24-
}
29+
// Check if the results are in ascending order
30+
expect(results, equals(collection));
2531
});
32+
});
2633

27-
test('Re-use after closing', () async {
28-
// Test that shared locks can be opened and closed multiple times.
29-
final mutex = SimpleMutex();
30-
final serialized = mutex.shared;
34+
test('Timeout should throw a LockError', () async {
35+
final m = Mutex();
36+
m.lock(() async {
37+
await Future.delayed(Duration(milliseconds: 300));
38+
});
3139

32-
final result = await Isolate.run(() async {
33-
return _lockInIsolate(serialized);
34-
});
40+
await expectLater(
41+
m.lock(() async {
42+
print('This should not get executed');
43+
}, timeout: Duration(milliseconds: 200)),
44+
throwsA((e) => e is LockError && e.message.contains('Timeout')));
45+
});
3546

36-
final result2 = await Isolate.run(() async {
37-
return _lockInIsolate(serialized);
38-
});
47+
test('In-time timeout should function normally', () async {
48+
final m = Mutex();
49+
final results = [];
50+
m.lock(() async {
51+
await Future.delayed(Duration(milliseconds: 100));
52+
results.add(1);
53+
});
3954

40-
await mutex.lock(() async {});
55+
await m.lock(() async {
56+
results.add(2);
57+
}, timeout: Duration(milliseconds: 200));
4158

42-
expect(result, equals(5));
43-
expect(result2, equals(5));
44-
});
45-
}, timeout: const Timeout(Duration(milliseconds: 5000)));
46-
}
59+
expect(results, equals([1, 2]));
60+
});
4761

48-
Future<Object> _lockInIsolate(
49-
SerializedMutex smutex,
50-
) async {
51-
final mutex = smutex.open();
52-
// Start a "thread" that repeatedly takes a lock
53-
_infiniteLock(mutex).ignore();
54-
await Future.delayed(const Duration(milliseconds: 10));
55-
// Then close the mutex while the above loop is running.
56-
await mutex.close();
57-
58-
return 5;
59-
}
62+
test('Different Mutex instances should cause separate locking', () async {
63+
final m1 = Mutex();
64+
final m2 = Mutex();
6065

61-
Future<void> _infiniteLock(SharedMutex mutex) async {
62-
while (true) {
63-
await mutex.lock(() async {
64-
await Future.delayed(const Duration(milliseconds: 1));
66+
final results = [];
67+
final p1 = m1.lock(() async {
68+
await Future.delayed(Duration(milliseconds: 300));
69+
results.add(1);
6570
});
66-
}
71+
72+
final p2 = m2.lock(() async {
73+
results.add(2);
74+
});
75+
76+
await p1;
77+
await p2;
78+
expect(results, equals([2, 1]));
79+
});
6780
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
@TestOn('!browser')
2+
import 'dart:isolate';
3+
4+
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
5+
import 'package:test/test.dart';
6+
7+
void main() {
8+
group('Mutex Tests', () {
9+
test('Closing', () async {
10+
// Test that locks are properly released when calling SharedMutex.close()
11+
// in in Isolate.
12+
// A timeout in this test indicates a likely error.
13+
for (var i = 0; i < 50; i++) {
14+
final mutex = SimpleMutex();
15+
final serialized = mutex.shared;
16+
17+
final result = await Isolate.run(() async {
18+
return _lockInIsolate(serialized);
19+
});
20+
21+
await mutex.lock(() async {});
22+
23+
expect(result, equals(5));
24+
}
25+
});
26+
27+
test('Re-use after closing', () async {
28+
// Test that shared locks can be opened and closed multiple times.
29+
final mutex = SimpleMutex();
30+
final serialized = mutex.shared;
31+
32+
final result = await Isolate.run(() async {
33+
return _lockInIsolate(serialized);
34+
});
35+
36+
final result2 = await Isolate.run(() async {
37+
return _lockInIsolate(serialized);
38+
});
39+
40+
await mutex.lock(() async {});
41+
42+
expect(result, equals(5));
43+
expect(result2, equals(5));
44+
});
45+
}, timeout: const Timeout(Duration(milliseconds: 5000)));
46+
}
47+
48+
Future<Object> _lockInIsolate(
49+
SerializedMutex smutex,
50+
) async {
51+
final mutex = smutex.open();
52+
// Start a "thread" that repeatedly takes a lock
53+
_infiniteLock(mutex).ignore();
54+
await Future.delayed(const Duration(milliseconds: 10));
55+
// Then close the mutex while the above loop is running.
56+
await mutex.close();
57+
58+
return 5;
59+
}
60+
61+
Future<void> _infiniteLock(SharedMutex mutex) async {
62+
while (true) {
63+
await mutex.lock(() async {
64+
await Future.delayed(const Duration(milliseconds: 1));
65+
});
66+
}
67+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import 'package:sqlite_async/sqlite_async.dart';
2+
import 'package:test/test.dart';
3+
4+
import '../utils/test_utils_impl.dart';
5+
6+
final testUtils = TestUtils();
7+
8+
void main() {
9+
group('Web Mutex Tests', () {
10+
test('Web should share locking with identical identifiers', () async {
11+
final m1 = Mutex(identifier: 'sync');
12+
final m2 = Mutex(identifier: 'sync');
13+
14+
final results = [];
15+
final p1 = m1.lock(() async {
16+
results.add(1);
17+
});
18+
19+
final p2 = m2.lock(() async {
20+
results.add(2);
21+
});
22+
23+
await p1;
24+
await p2;
25+
// It should be correctly ordered as if it was the same mutex
26+
expect(results, equals([1, 2]));
27+
});
28+
});
29+
}

0 commit comments

Comments
 (0)