Skip to content

Commit 71f279b

Browse files
committed
Refactor update streams on web
1 parent 76e3a20 commit 71f279b

File tree

8 files changed

+205
-209
lines changed

8 files changed

+205
-209
lines changed

packages/sqlite_async/lib/src/utils/shared_utils.dart

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import 'dart:async';
22
import 'dart:convert';
33

4+
import 'package:sqlite3/common.dart';
5+
46
import '../sqlite_connection.dart';
57

68
Future<T> internalReadTransaction<T>(SqliteReadContext ctx,
@@ -75,3 +77,70 @@ Object? mapParameter(Object? parameter) {
7577
List<Object?> mapParameters(List<Object?> parameters) {
7678
return [for (var p in parameters) mapParameter(p)];
7779
}
80+
81+
extension ThrottledUpdates on CommonDatabase {
82+
/// Wraps [updatesSync] to:
83+
///
84+
/// - Not fire in transactions.
85+
/// - Fire asynchronously.
86+
/// - Only report table names, which are buffered to avoid duplicates.
87+
Stream<Set<String>> get throttledUpdatedTables {
88+
StreamController<Set<String>>? controller;
89+
var pendingUpdates = <String>{};
90+
var paused = false;
91+
92+
Timer? updateDebouncer;
93+
94+
void maybeFireUpdates() {
95+
updateDebouncer?.cancel();
96+
updateDebouncer = null;
97+
98+
if (paused) {
99+
// Continue collecting updates, but don't fire any
100+
return;
101+
}
102+
103+
if (!autocommit) {
104+
// Inside a transaction - do not fire updates
105+
return;
106+
}
107+
108+
if (pendingUpdates.isNotEmpty) {
109+
controller!.add(pendingUpdates);
110+
pendingUpdates = {};
111+
}
112+
}
113+
114+
void collectUpdate(SqliteUpdate event) {
115+
pendingUpdates.add(event.tableName);
116+
117+
updateDebouncer ??=
118+
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
119+
}
120+
121+
StreamSubscription? txSubscription;
122+
StreamSubscription? sourceSubscription;
123+
124+
controller = StreamController(onListen: () {
125+
txSubscription = commits.listen((_) {
126+
maybeFireUpdates();
127+
}, onError: (error) {
128+
controller?.addError(error);
129+
});
130+
131+
sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) {
132+
controller?.addError(error);
133+
});
134+
}, onPause: () {
135+
paused = true;
136+
}, onResume: () {
137+
paused = false;
138+
maybeFireUpdates();
139+
}, onCancel: () {
140+
txSubscription?.cancel();
141+
sourceSubscription?.cancel();
142+
});
143+
144+
return controller.stream;
145+
}
146+
}

packages/sqlite_async/lib/src/web/database.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ class WebDatabase
2121
final Mutex? _mutex;
2222
final bool profileQueries;
2323

24+
@override
25+
final Stream<UpdateNotification> updates;
26+
2427
/// For persistent databases that aren't backed by a shared worker, we use
2528
/// web broadcast channels to forward local update events to other tabs.
2629
final BroadcastUpdates? broadcastUpdates;
@@ -32,6 +35,7 @@ class WebDatabase
3235
this._database,
3336
this._mutex, {
3437
required this.profileQueries,
38+
required this.updates,
3539
this.broadcastUpdates,
3640
});
3741

@@ -113,10 +117,6 @@ class WebDatabase
113117
}
114118
}
115119

116-
@override
117-
Stream<UpdateNotification> get updates =>
118-
_database.updates.map((event) => UpdateNotification({event.tableName}));
119-
120120
@override
121121
Future<T> writeTransaction<T>(
122122
Future<T> Function(SqliteWriteContext tx) callback,

packages/sqlite_async/lib/src/web/protocol.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ enum CustomDatabaseMessageKind {
1313
getAutoCommit,
1414
executeInTransaction,
1515
executeBatchInTransaction,
16+
updateSubscriptionManagement,
17+
notifyUpdates,
1618
}
1719

1820
extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import 'dart:async';
2+
import 'dart:js_interop';
3+
4+
import 'package:sqlite3_web/sqlite3_web.dart';
5+
6+
import '../update_notification.dart';
7+
import 'protocol.dart';
8+
9+
/// Utility to request a stream of update notifications from the worker.
10+
///
11+
/// Because we want to debounce update notifications on the worker, we're using
12+
/// custom requests instead of the default [Database.updates] stream.
13+
///
14+
/// Clients send a message to the worker to subscribe or unsubscribe, providing
15+
/// an id for the subscription. The worker distributes update notifications with
16+
/// custom requests to the client, which [handleRequest] distributes to the
17+
/// original streams.
18+
final class UpdateNotificationStreams {
19+
var _idCounter = 0;
20+
final Map<String, StreamController<UpdateNotification>> _updates = {};
21+
22+
Future<JSAny?> handleRequest(JSAny? request) async {
23+
final customRequest = request as CustomDatabaseMessage;
24+
if (customRequest.kind == CustomDatabaseMessageKind.notifyUpdates) {
25+
final notification = UpdateNotification(customRequest.rawParameters.toDart
26+
.map((e) => (e as JSString).toDart)
27+
.toSet());
28+
29+
_updates[customRequest.rawSql.toDart]?.add(notification);
30+
}
31+
32+
return null;
33+
}
34+
35+
Stream<UpdateNotification> updatesFor(Database database) {
36+
final id = (_idCounter++).toString();
37+
final controller = _updates[id] = StreamController();
38+
39+
controller
40+
..onListen = () {
41+
database.customRequest(CustomDatabaseMessage(
42+
CustomDatabaseMessageKind.updateSubscriptionManagement,
43+
id,
44+
[true],
45+
));
46+
}
47+
..onCancel = () {
48+
database.customRequest(CustomDatabaseMessage(
49+
CustomDatabaseMessageKind.updateSubscriptionManagement,
50+
id,
51+
[false],
52+
));
53+
};
54+
55+
return controller.stream;
56+
}
57+
}

packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,19 @@ class DefaultSqliteOpenFactory
6969
? null
7070
: MutexImpl(identifier: path); // Use the DB path as a mutex identifier
7171

72-
BroadcastUpdates? updates;
72+
BroadcastUpdates? broadcastUpdates;
7373
if (connection.access != AccessMode.throughSharedWorker &&
7474
connection.storage != StorageMode.inMemory) {
75-
updates = BroadcastUpdates(path);
75+
broadcastUpdates = BroadcastUpdates(path);
7676
}
7777

78-
return WebDatabase(connection.database, options.mutex ?? mutex,
79-
broadcastUpdates: updates,
80-
profileQueries: sqliteOptions.profileQueries);
78+
return WebDatabase(
79+
connection.database,
80+
options.mutex ?? mutex,
81+
broadcastUpdates: broadcastUpdates,
82+
profileQueries: sqliteOptions.profileQueries,
83+
updates: updatesFor(connection.database),
84+
);
8185
}
8286

8387
@override

packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart

Lines changed: 0 additions & 191 deletions
This file was deleted.

0 commit comments

Comments
 (0)