Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/common/sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
///
/// Use this to access the database in background isolates.
IsolateConnectionFactory isolateConnectionFactory();

/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
/// This can be useful to run the same statement on all connections. For instance,
/// ATTACHing a database, that is expected to be available in all connections.
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block);
}

/// A SQLite database instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,12 @@ final class SingleConnectionDatabase
return connection.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(connection, []));
}
}
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,12 @@ class SqliteDatabaseImpl
Future<bool> getAutoCommit() {
throw UnimplementedError();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
throw UnimplementedError();
}
}
70 changes: 70 additions & 0 deletions packages/sqlite_async/lib/src/native/database/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {

final MutexImpl mutex;

int _runningWithAllConnectionsCount = 0;

@override
bool closed = false;

Expand Down Expand Up @@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
return;
}

if (_availableReadConnections.isEmpty &&
_runningWithAllConnectionsCount > 0) {
// Wait until [withAllConnections] is done. Otherwise we could spawn a new
// reader while the user is configuring all the connections,
// e.g. a global open factory configuration shared across all connections.
return;
}

var nextItem = _queue.removeFirst();
while (nextItem.completer.isCompleted) {
// This item already timed out - try the next one if available
Expand Down Expand Up @@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
await connection.refreshSchema();
}
}

Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) async {
try {
_runningWithAllConnectionsCount++;

final blockCompleter = Completer<T>();
final (write, reads) = await _lockAllConns<T>(blockCompleter);

try {
final res = await block(write, reads);
blockCompleter.complete(res);
return res;
} catch (e, st) {
blockCompleter.completeError(e, st);
rethrow;
}
} finally {
_runningWithAllConnectionsCount--;

// Continue processing any pending read requests that may have been queued while
// the block was running.
Timer.run(_nextRead);
}
}

/// Locks all connections, returning the acquired contexts.
/// We pass a completer that would be called after the locks are taken.
Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
Completer<T> lockCompleter) async {
final List<Completer<SqliteReadContext>> readLockedCompleters = [];
final Completer<SqliteWriteContext> writeLockedCompleter = Completer();

// Take the write lock
writeLock((ctx) {
writeLockedCompleter.complete(ctx);
return lockCompleter.future;
});

// Take all the read locks
for (final readConn in _allReadConnections) {
final completer = Completer<SqliteReadContext>();
readLockedCompleters.add(completer);

readConn.readLock((ctx) {
completer.complete(ctx);
return lockCompleter.future;
});
}

// Wait after all locks are taken
final [writer as SqliteWriteContext, ...readers] = await Future.wait([
writeLockedCompleter.future,
...readLockedCompleters.map((e) => e.future)
]);

return (writer, readers);
}
}

typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,12 @@ class SqliteDatabaseImpl
Future<void> refreshSchema() {
return _pool.refreshSchema();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return _pool.withAllConnections(block);
}
}
8 changes: 8 additions & 0 deletions packages/sqlite_async/lib/src/web/database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ class WebDatabase
await isInitialized;
return _database.fileSystem.flush();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(this, []));
}
}

final class _UnscopedContext extends UnscopedContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,12 @@ class SqliteDatabaseImpl
Future<WebDatabaseEndpoint> exposeEndpoint() async {
return await _connection.exposeEndpoint();
}

@override
Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) {
return writeLock((_) => block(_connection, []));
}
}
44 changes: 44 additions & 0 deletions packages/sqlite_async/test/basic_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();
const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm');
const _isWeb = identical(0, 0.0) || _isDart2Wasm;

void main() {
group('Shared Basic Tests', () {
Expand Down Expand Up @@ -301,6 +302,49 @@ void main() {
'Web locks are managed with a shared worker, which does not support timeouts',
)
});

test('with all connections', () async {
final maxReaders = _isWeb ? 0 : 3;

final db = SqliteDatabase.withFactory(
await testUtils.testFactory(path: path),
maxReaders: maxReaders,
);
await db.initialize();
await createTables(db);

// Warm up to spawn the max readers
await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]);

bool finishedWithAllConns = false;

late Future<void> readsCalledWhileWithAllConnsRunning;

final parentZone = Zone.current;
await db.withAllConnections((writer, readers) async {
expect(readers.length, maxReaders);

// Run some reads during the block that they should run after the block finishes and releases
// all locks
// Need a root zone here to avoid recursive lock errors.
readsCalledWhileWithAllConnsRunning =
Future(parentZone.bindCallback(() async {
await Future.wait(
[1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
await db.readLock((c) async {
expect(finishedWithAllConns, isTrue);
await Future.delayed(const Duration(milliseconds: 100));
});
}),
);
}));

await Future.delayed(const Duration(milliseconds: 200));
finishedWithAllConns = true;
});

await readsCalledWhileWithAllConnsRunning;
});
});
}

Expand Down
Loading