Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/common/sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
///
/// Use this to access the database in background isolates.
IsolateConnectionFactory isolateConnectionFactory();

/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
/// This can be useful to run the same statement on all connections. For instance,
/// ATTACHing a database, that is expected to be available in all connections.
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block);
}

/// A SQLite database instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,12 @@ final class SingleConnectionDatabase
return connection.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(connection, []));
}
}
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,12 @@ class SqliteDatabaseImpl
Future<bool> getAutoCommit() {
throw UnimplementedError();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
throw UnimplementedError();
}
}
67 changes: 67 additions & 0 deletions packages/sqlite_async/lib/src/native/database/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {

final MutexImpl mutex;

bool _runningWithAllConnections = false;

@override
bool closed = false;

Expand Down Expand Up @@ -88,6 +90,11 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
return;
}

if (_availableReadConnections.isEmpty && _runningWithAllConnections) {
// Wait until withAllConnections is done
return;
}

var nextItem = _queue.removeFirst();
while (nextItem.completer.isCompleted) {
// This item already timed out - try the next one if available
Expand Down Expand Up @@ -232,6 +239,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
await connection.refreshSchema();
}
}

Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) async {
try {
_runningWithAllConnections = true;

final blockCompleter = Completer<T>();
final (write, reads) = await _lockAllConns<T>(blockCompleter);

try {
final res = await block(write, reads);
blockCompleter.complete(res);
return res;
} catch (e, st) {
blockCompleter.completeError(e, st);
rethrow;
}
} finally {
_runningWithAllConnections = false;

// Continue processing any pending read requests that may have been queued while
// the block was running.
Timer.run(_nextRead);
}
}

/// Locks all connections, returning the acquired contexts.
/// We pass a completer that would be called after the locks are taken.
Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
Completer<T> lockCompleter) async {
final List<Completer<SqliteReadContext>> readLockedCompleters = [];
final Completer<SqliteWriteContext> writeLockedCompleter = Completer();

// Take the write lock
writeLock((ctx) {
writeLockedCompleter.complete(ctx);
return lockCompleter.future;
});

// Take all the read locks
for (final readConn in _allReadConnections) {
final completer = Completer<SqliteReadContext>();
readLockedCompleters.add(completer);

readConn.readLock((ctx) {
completer.complete(ctx);
return lockCompleter.future;
});
}

// Wait after all locks are taken
final [writer as SqliteWriteContext, ...readers] = await Future.wait([
writeLockedCompleter.future,
...readLockedCompleters.map((e) => e.future)
]);

return (writer, readers);
}
}

typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,12 @@ class SqliteDatabaseImpl
Future<void> refreshSchema() {
return _pool.refreshSchema();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return _pool.withAllConnections(block);
}
}
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ class WebDatabase
await isInitialized;
return _database.fileSystem.flush();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(this, []));
}
}

final class _UnscopedContext extends UnscopedContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,12 @@ class SqliteDatabaseImpl
Future<WebDatabaseEndpoint> exposeEndpoint() async {
return await _connection.exposeEndpoint();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(_connection, []));
}
}
149 changes: 149 additions & 0 deletions packages/sqlite_async/test/native/basic_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
library;

import 'dart:async';
import 'dart:io';
import 'dart:math';

import 'package:collection/collection.dart';
import 'package:path/path.dart' show join;
import 'package:sqlite3/common.dart' as sqlite;
import 'package:sqlite3/sqlite3.dart' show Row;
import 'package:sqlite_async/sqlite_async.dart';
import 'package:test/test.dart';

import '../utils/abstract_test_utils.dart';
import '../utils/test_utils_impl.dart';

final testUtils = TestUtils();
Expand Down Expand Up @@ -100,6 +105,122 @@ void main() {
print("${DateTime.now()} done");
});

test('with all connections', () async {
final db = SqliteDatabase.withFactory(
await testUtils.testFactory(path: path),
maxReaders: 3);
await db.initialize();
await createTables(db);

Future<Row> readWithRandomDelay(SqliteReadContext ctx, int id) async {
return await ctx.get(
'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
[id, 5 + Random().nextInt(10)]);
}

// Warm up to spawn the max readers
await Future.wait(
[1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)),
);

bool finishedWithAllConns = false;

late Future<void> readsCalledWhileWithAllConnsRunning;

print("${DateTime.now()} start");
await db.withAllConnections((writer, readers) async {
expect(readers.length, 3);

// Run some reads during the block that they should run after the block finishes and releases
// all locks
readsCalledWhileWithAllConnsRunning = Future.wait(
[1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
final r = await db.readLock((c) async {
expect(finishedWithAllConns, isTrue);
return await readWithRandomDelay(c, i);
});
print(
"${DateTime.now()} After withAllConnections, started while running $r");
}),
);

await Future.wait([
writer.execute(
"INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *",
[
123,
5 + Random().nextInt(20)
]).then((value) =>
print("${DateTime.now()} withAllConnections writer done $value")),
...readers
.mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) {
print(
"${DateTime.now()} withAllConnections readers done $results");
}))
]);
}).then((_) => finishedWithAllConns = true);

await readsCalledWhileWithAllConnsRunning;
});

test('prevent opening new readers while in withAllConnections', () async {
final sharedStateDir = Directory.systemTemp.createTempSync();
addTearDown(() => sharedStateDir.deleteSync(recursive: true));

final File sharedStateFile =
File(join(sharedStateDir.path, 'shared-state.txt'));

sharedStateFile.writeAsStringSync('initial');

final db = SqliteDatabase.withFactory(
_TestSqliteOpenFactoryWithSharedStateFile(
path: path, sharedStateFilePath: sharedStateFile.path),
maxReaders: 3);
await db.initialize();
await createTables(db);

// The writer saw 'initial' in the file when opening the connection
expect(
await db
.writeLock((c) => c.get('SELECT file_contents_on_open() AS state')),
{'state': 'initial'},
);

final withAllConnectionsCompleter = Completer<void>();

final withAllConnsFut = db.withAllConnections((writer, readers) async {
expect(readers.length, 0); // No readers yet

// Simulate some work until the file is updated
await Future.delayed(const Duration(milliseconds: 200));
sharedStateFile.writeAsStringSync('updated');

await withAllConnectionsCompleter.future;
});

// Start a reader that gets the contents of the shared file
bool readFinished = false;
final someReadFut =
db.get('SELECT file_contents_on_open() AS state', []).then((r) {
readFinished = true;
return r;
});

// The withAllConnections should prevent the reader from opening
await Future.delayed(const Duration(milliseconds: 100));
expect(readFinished, isFalse);

// Free all the locks
withAllConnectionsCompleter.complete();
await withAllConnsFut;

final readerInfo = await someReadFut;
expect(readFinished, isTrue);
// The read should see the updated value in the file. This checks
// that a reader doesn't spawn while running withAllConnections
expect(readerInfo, {'state': 'updated'});
});

test('read-only transactions', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
Expand Down Expand Up @@ -379,3 +500,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory {
];
}
}

class _TestSqliteOpenFactoryWithSharedStateFile
extends TestDefaultSqliteOpenFactory {
final String sharedStateFilePath;

_TestSqliteOpenFactoryWithSharedStateFile(
{required super.path, required this.sharedStateFilePath});

@override
sqlite.CommonDatabase open(SqliteOpenOptions options) {
final File sharedStateFile = File(sharedStateFilePath);
final String sharedState = sharedStateFile.readAsStringSync();

final db = super.open(options);

// Function to return the contents of the shared state file at the time of opening
// so that we know at which point the factory was called.
db.createFunction(
functionName: 'file_contents_on_open',
argumentCount: const sqlite.AllowedArgumentCount(0),
function: (args) {
return sharedState;
},
);

return db;
}
}