Skip to content

Commit 4cba4f8

Browse files
committed
Make table updates a multi-subscription stream
1 parent eaab81e commit 4cba4f8

File tree

3 files changed

+74
-54
lines changed

3 files changed

+74
-54
lines changed

packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
303303
final server = params.portServer;
304304
final commandPort = ReceivePort();
305305

306-
db.throttledUpdatedTables.listen((changedTables) {
306+
db.updatedTables.listen((changedTables) {
307307
client.fire(UpdateNotification(changedTables));
308308
});
309309

packages/sqlite_async/lib/src/utils/shared_utils.dart

Lines changed: 71 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -79,68 +79,87 @@ List<Object?> mapParameters(List<Object?> parameters) {
7979
}
8080

8181
extension ThrottledUpdates on CommonDatabase {
82-
/// Wraps [updatesSync] to:
82+
/// An unthrottled stream of updated tables that emits on every commit.
8383
///
84-
/// - Not fire in transactions.
85-
/// - Fire asynchronously.
86-
/// - Only report table names, which are buffered to avoid duplicates.
87-
Stream<Set<String>> get throttledUpdatedTables {
88-
StreamController<Set<String>>? controller;
89-
var pendingUpdates = <String>{};
90-
var paused = false;
91-
92-
Timer? updateDebouncer;
93-
94-
void maybeFireUpdates() {
95-
updateDebouncer?.cancel();
96-
updateDebouncer = null;
97-
98-
if (paused) {
99-
// Continue collecting updates, but don't fire any
100-
return;
84+
/// A paused subscription on this stream will buffer changed tables into a
85+
/// growing set instead of losing events, so this stream is simple to throttle
86+
/// downstream.
87+
Stream<Set<String>> get updatedTables {
88+
final listeners = <_UpdateListener>[];
89+
var uncommitedUpdates = <String>{};
90+
var underlyingSubscriptions = <StreamSubscription<void>>[];
91+
92+
void handleUpdate(SqliteUpdate update) {
93+
uncommitedUpdates.add(update.tableName);
94+
}
95+
96+
void afterCommit() {
97+
for (final listener in listeners) {
98+
listener.notify(uncommitedUpdates);
10199
}
102100

103-
if (!autocommit) {
104-
// Inside a transaction - do not fire updates
105-
return;
101+
uncommitedUpdates.clear();
102+
}
103+
104+
void afterRollback() {
105+
uncommitedUpdates.clear();
106+
}
107+
108+
void addListener(_UpdateListener listener) {
109+
listeners.add(listener);
110+
111+
if (listeners.length == 1) {
112+
// First listener, start listening for raw updates on underlying
113+
// database.
114+
underlyingSubscriptions = [
115+
updatesSync.listen(handleUpdate),
116+
commits.listen((_) => afterCommit()),
117+
commits.listen((_) => afterRollback())
118+
];
106119
}
120+
}
107121

108-
if (pendingUpdates.isNotEmpty) {
109-
controller!.add(pendingUpdates);
110-
pendingUpdates = {};
122+
void removeListener(_UpdateListener listener) {
123+
listeners.remove(listener);
124+
if (listeners.isEmpty) {
125+
for (final sub in underlyingSubscriptions) {
126+
sub.cancel();
127+
}
111128
}
112129
}
113130

114-
void collectUpdate(SqliteUpdate event) {
115-
pendingUpdates.add(event.tableName);
131+
return Stream.multi(
132+
(listener) {
133+
final wrapped = _UpdateListener(listener);
134+
addListener(wrapped);
116135

117-
updateDebouncer ??=
118-
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
136+
listener.onCancel = () => removeListener(wrapped);
137+
},
138+
isBroadcast: true,
139+
);
140+
}
141+
}
142+
143+
class _UpdateListener {
144+
final MultiStreamController<Set<String>> downstream;
145+
Set<String> buffered = {};
146+
147+
_UpdateListener(this.downstream);
148+
149+
void notify(Set<String> pendingUpdates) {
150+
buffered.addAll(pendingUpdates);
151+
if (!downstream.isPaused) {
152+
downstream.add(buffered);
153+
buffered = {};
119154
}
155+
}
156+
}
120157

121-
StreamSubscription? txSubscription;
122-
StreamSubscription? sourceSubscription;
123-
124-
controller = StreamController(onListen: () {
125-
txSubscription = commits.listen((_) {
126-
maybeFireUpdates();
127-
}, onError: (error) {
128-
controller?.addError(error);
129-
});
130-
131-
sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) {
132-
controller?.addError(error);
133-
});
134-
}, onPause: () {
135-
paused = true;
136-
}, onResume: () {
137-
paused = false;
138-
maybeFireUpdates();
139-
}, onCancel: () {
140-
txSubscription?.cancel();
141-
sourceSubscription?.cancel();
142-
});
143-
144-
return controller.stream;
158+
extension StreamUtils<T> on Stream<T> {
159+
Stream<T> pauseAfterEvent(Duration duration) async* {
160+
await for (final event in this) {
161+
yield event;
162+
await Future.delayed(duration);
163+
}
145164
}
146165
}

packages/sqlite_async/lib/src/web/worker/worker_utils.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ class AsyncSqliteDatabase extends WorkerDatabase {
5656
final Map<ClientConnection, _ConnectionState> _state = {};
5757

5858
AsyncSqliteDatabase({required this.database})
59-
: _updates = database.throttledUpdatedTables;
59+
: _updates = database.updatedTables
60+
.pauseAfterEvent(const Duration(milliseconds: 1));
6061

6162
_ConnectionState _findState(ClientConnection connection) {
6263
return _state.putIfAbsent(connection, _ConnectionState.new);

0 commit comments

Comments
 (0)