Skip to content

Commit b97be61

Browse files
Get all Sqlite connections in the pool (#101)
1 parent 0234d4f commit b97be61

File tree

9 files changed

+314
-0
lines changed

9 files changed

+314
-0
lines changed

packages/sqlite_async/lib/src/common/sqlite_database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
3939
///
4040
/// Use this to access the database in background isolates.
4141
IsolateConnectionFactory isolateConnectionFactory();
42+
43+
/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
44+
/// This can be useful to run the same statement on all connections. For instance,
45+
/// ATTACHing a database, that is expected to be available in all connections.
46+
Future<T> withAllConnections<T>(
47+
Future<T> Function(
48+
SqliteWriteContext writer, List<SqliteReadContext> readers)
49+
block);
4250
}
4351

4452
/// A SQLite database instance.

packages/sqlite_async/lib/src/impl/single_connection_database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,12 @@ final class SingleConnectionDatabase
5757
return connection.writeLock(callback,
5858
lockTimeout: lockTimeout, debugContext: debugContext);
5959
}
60+
61+
@override
62+
Future<T> withAllConnections<T>(
63+
Future<T> Function(
64+
SqliteWriteContext writer, List<SqliteReadContext> readers)
65+
block) {
66+
return writeLock((_) => block(connection, []));
67+
}
6068
}

packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,12 @@ class SqliteDatabaseImpl
6464
Future<bool> getAutoCommit() {
6565
throw UnimplementedError();
6666
}
67+
68+
@override
69+
Future<T> withAllConnections<T>(
70+
Future<T> Function(
71+
SqliteWriteContext writer, List<SqliteReadContext> readers)
72+
block) {
73+
throw UnimplementedError();
74+
}
6775
}

packages/sqlite_async/lib/src/native/database/connection_pool.dart

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
3131

3232
final MutexImpl mutex;
3333

34+
int _runningWithAllConnectionsCount = 0;
35+
3436
@override
3537
bool closed = false;
3638

@@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
8890
return;
8991
}
9092

93+
if (_availableReadConnections.isEmpty &&
94+
_runningWithAllConnectionsCount > 0) {
95+
// Wait until [withAllConnections] is done. Otherwise we could spawn a new
96+
// reader while the user is configuring all the connections,
97+
// e.g. a global open factory configuration shared across all connections.
98+
return;
99+
}
100+
91101
var nextItem = _queue.removeFirst();
92102
while (nextItem.completer.isCompleted) {
93103
// This item already timed out - try the next one if available
@@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
232242
await connection.refreshSchema();
233243
}
234244
}
245+
246+
Future<T> withAllConnections<T>(
247+
Future<T> Function(
248+
SqliteWriteContext writer, List<SqliteReadContext> readers)
249+
block) async {
250+
try {
251+
_runningWithAllConnectionsCount++;
252+
253+
final blockCompleter = Completer<T>();
254+
final (write, reads) = await _lockAllConns<T>(blockCompleter);
255+
256+
try {
257+
final res = await block(write, reads);
258+
blockCompleter.complete(res);
259+
return res;
260+
} catch (e, st) {
261+
blockCompleter.completeError(e, st);
262+
rethrow;
263+
}
264+
} finally {
265+
_runningWithAllConnectionsCount--;
266+
267+
// Continue processing any pending read requests that may have been queued while
268+
// the block was running.
269+
Timer.run(_nextRead);
270+
}
271+
}
272+
273+
/// Locks all connections, returning the acquired contexts.
274+
/// We pass a completer that would be called after the locks are taken.
275+
Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
276+
Completer<T> lockCompleter) async {
277+
final List<Completer<SqliteReadContext>> readLockedCompleters = [];
278+
final Completer<SqliteWriteContext> writeLockedCompleter = Completer();
279+
280+
// Take the write lock
281+
writeLock((ctx) {
282+
writeLockedCompleter.complete(ctx);
283+
return lockCompleter.future;
284+
});
285+
286+
// Take all the read locks
287+
for (final readConn in _allReadConnections) {
288+
final completer = Completer<SqliteReadContext>();
289+
readLockedCompleters.add(completer);
290+
291+
readConn.readLock((ctx) {
292+
completer.complete(ctx);
293+
return lockCompleter.future;
294+
});
295+
}
296+
297+
// Wait after all locks are taken
298+
final [writer as SqliteWriteContext, ...readers] = await Future.wait([
299+
writeLockedCompleter.future,
300+
...readLockedCompleters.map((e) => e.future)
301+
]);
302+
303+
return (writer, readers);
304+
}
235305
}
236306

237307
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);

packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,12 @@ class SqliteDatabaseImpl
171171
Future<void> refreshSchema() {
172172
return _pool.refreshSchema();
173173
}
174+
175+
@override
176+
Future<T> withAllConnections<T>(
177+
Future<T> Function(
178+
SqliteWriteContext writer, List<SqliteReadContext> readers)
179+
block) {
180+
return _pool.withAllConnections(block);
181+
}
174182
}

packages/sqlite_async/lib/src/web/database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,14 @@ class WebDatabase
171171
await isInitialized;
172172
return _database.fileSystem.flush();
173173
}
174+
175+
@override
176+
Future<T> withAllConnections<T>(
177+
Future<T> Function(
178+
SqliteWriteContext writer, List<SqliteReadContext> readers)
179+
block) {
180+
return writeLock((_) => block(this, []));
181+
}
174182
}
175183

176184
final class _UnscopedContext extends UnscopedContext {

packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,12 @@ class SqliteDatabaseImpl
178178
Future<WebDatabaseEndpoint> exposeEndpoint() async {
179179
return await _connection.exposeEndpoint();
180180
}
181+
182+
@override
183+
Future<T> withAllConnections<T>(
184+
Future<T> Function(
185+
SqliteWriteContext writer, List<SqliteReadContext> readers)
186+
block) {
187+
return writeLock((_) => block(_connection, []));
188+
}
181189
}

packages/sqlite_async/test/basic_test.dart

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart';
77

88
final testUtils = TestUtils();
99
const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm');
10+
const _isWeb = identical(0, 0.0) || _isDart2Wasm;
1011

1112
void main() {
1213
group('Shared Basic Tests', () {
@@ -301,6 +302,49 @@ void main() {
301302
'Web locks are managed with a shared worker, which does not support timeouts',
302303
)
303304
});
305+
306+
test('with all connections', () async {
307+
final maxReaders = _isWeb ? 0 : 3;
308+
309+
final db = SqliteDatabase.withFactory(
310+
await testUtils.testFactory(path: path),
311+
maxReaders: maxReaders,
312+
);
313+
await db.initialize();
314+
await createTables(db);
315+
316+
// Warm up to spawn the max readers
317+
await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]);
318+
319+
bool finishedWithAllConns = false;
320+
321+
late Future<void> readsCalledWhileWithAllConnsRunning;
322+
323+
final parentZone = Zone.current;
324+
await db.withAllConnections((writer, readers) async {
325+
expect(readers.length, maxReaders);
326+
327+
// Run some reads during the block that they should run after the block finishes and releases
328+
// all locks
329+
// Need a root zone here to avoid recursive lock errors.
330+
readsCalledWhileWithAllConnsRunning =
331+
Future(parentZone.bindCallback(() async {
332+
await Future.wait(
333+
[1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
334+
await db.readLock((c) async {
335+
expect(finishedWithAllConns, isTrue);
336+
await Future.delayed(const Duration(milliseconds: 100));
337+
});
338+
}),
339+
);
340+
}));
341+
342+
await Future.delayed(const Duration(milliseconds: 200));
343+
finishedWithAllConns = true;
344+
});
345+
346+
await readsCalledWhileWithAllConnsRunning;
347+
});
304348
});
305349
}
306350

0 commit comments

Comments
 (0)