diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cb1814b..0e0d34b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -3,13 +3,15 @@ name: Test on: push: branches: - - "**" + - "*" + pull_request: jobs: build: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 - name: Install Melos @@ -29,6 +31,7 @@ jobs: test: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) strategy: fail-fast: false matrix: @@ -49,7 +52,7 @@ jobs: sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" dart_sdk: stable steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 with: sdk: ${{ matrix.dart_sdk }} diff --git a/CHANGELOG.md b/CHANGELOG.md index d9cda07..5181152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,89 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-10-13 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.2`](#sqlite_async---v0122) + - [`drift_sqlite_async` - `v0.2.5`](#drift_sqlite_async---v025) + +--- + +#### `sqlite_async` - `v0.12.2` + + - Add `withAllConnections` method to run statements on all connections in the pool. + +#### `drift_sqlite_async` - `v0.2.5` + + - Allow customizing update notifications from `sqlite_async`. + + +## 2025-08-08 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121) + - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) + - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) + +Packages with dependency updates only: + +> Packages listed below depend on other packages in this workspace that have had changes. Their versions have been incremented to bump the minimum dependency versions of the packages they depend upon in this project. + + - `drift_sqlite_async` - `v0.2.3+1` + +--- + +#### `sqlite_async` - `v0.12.1` + +- Fix distributing updates from shared worker. + +#### `sqlite_async` - `v0.12.0` + + - Avoid large transactions creating a large internal update queue. + + +## 2025-07-29 + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.11.8`](#sqlite_async---v0118) + - [`drift_sqlite_async` - `v0.2.3`](#drift_sqlite_async---v023) + +--- + +#### `sqlite_async` - `v0.11.8` + +- Support nested transactions (emulated with `SAVEPOINT` statements). +- Fix web compilation issues with version `2.8.0` of `package:sqlite3`. + +#### `drift_sqlite_async` - `v0.2.3` + +- Support nested transactions. + ## 2025-06-03 --- diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index b101ac4..bd36dd1 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,19 @@ +## 0.2.5 + + - Allow customizing update notifications from `sqlite_async`. + +## 0.2.4 + +- Allow transforming table updates from sqlite_async. + +## 0.2.3+1 + + - Update a dependency to the latest release. + +## 0.2.3 + +- Support nested transactions. + ## 0.2.2 - Fix write detection when using UPDATE/INSERT/DELETE with RETURNING in raw queries. diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index a1af55a..6c34ed4 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,12 +15,22 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; - SqliteAsyncDriftConnection(SqliteConnection db, {bool logStatements = false}) - : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { + /// [transformTableUpdates] is useful to map local table names from PowerSync that are backed by a view name + /// which is the entity that the user interacts with. + SqliteAsyncDriftConnection( + SqliteConnection db, { + bool logStatements = false, + Set Function(UpdateNotification)? transformTableUpdates, + }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { - var setUpdates = {}; - for (var tableName in event.tables) { - setUpdates.add(TableUpdate(tableName)); + final Set setUpdates; + if (transformTableUpdates != null) { + setUpdates = transformTableUpdates(event); + } else { + setUpdates = {}; + for (var tableName in event.tables) { + setUpdates.add(TableUpdate(tableName)); + } } super.streamQueries.handleTableUpdates(setUpdates); }); diff --git a/packages/drift_sqlite_async/lib/src/executor.dart b/packages/drift_sqlite_async/lib/src/executor.dart index 41886f3..127462e 100644 --- a/packages/drift_sqlite_async/lib/src/executor.dart +++ b/packages/drift_sqlite_async/lib/src/executor.dart @@ -129,13 +129,28 @@ class _SqliteAsyncTransactionDelegate extends SupportedTransactionDelegate { _SqliteAsyncTransactionDelegate(this._db); + @override + FutureOr Function(QueryDelegate, Future Function(QueryDelegate))? + get startNested => _startNested; + @override Future startTransaction(Future Function(QueryDelegate p1) run) async { - await _db.writeTransaction((context) async { + await _startTransaction(_db, run); + } + + Future _startTransaction( + SqliteWriteContext context, Future Function(QueryDelegate p1) run) async { + await context.writeTransaction((context) async { final delegate = _SqliteAsyncQueryDelegate(context, null); return run(delegate); }); } + + Future _startNested( + QueryDelegate outer, Future Function(QueryDelegate) block) async { + await _startTransaction( + (outer as _SqliteAsyncQueryDelegate)._context, block); + } } class _SqliteAsyncVersionDelegate extends DynamicVersionDelegate { diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 62a64da..bd2b701 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.2 +version: 0.2.5 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -14,12 +14,12 @@ topics: environment: sdk: ">=3.0.0 <4.0.0" dependencies: - drift: ">=2.19.0 <3.0.0" - sqlite_async: ^0.11.0 + drift: ">=2.28.0 <3.0.0" + sqlite_async: ^0.12.2 dev_dependencies: build_runner: ^2.4.8 - drift_dev: ">=2.19.0 <3.0.0" + drift_dev: ">=2.28.0 <3.0.0" glob: ^2.1.2 lints: ^5.0.0 sqlite3: ^2.4.0 diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index 339cc04..cac6f55 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -11,6 +11,7 @@ import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import './utils/test_utils.dart'; +import 'generated/database.dart'; class EmptyDatabase extends GeneratedDatabase { EmptyDatabase(super.executor); @@ -182,7 +183,7 @@ void main() { {'description': 'Test 1'}, {'description': 'Test 3'} ])); - }, skip: 'sqlite_async does not support nested transactions'); + }); test('Concurrent select', () async { var completer1 = Completer(); @@ -245,4 +246,43 @@ INSERT INTO test_data(description) VALUES('test data'); expect(row, isEmpty); }); }); + + test('transform table updates', () async { + final path = dbPath(); + await cleanDb(path: path); + + final db = await setupDatabase(path: path); + final connection = SqliteAsyncDriftConnection( + db, + // tables with the local_ prefix are mapped to the name without the prefix + transformTableUpdates: (event) { + final updates = {}; + + for (final originalTableName in event.tables) { + final effectiveName = originalTableName.startsWith("local_") + ? originalTableName.substring(6) + : originalTableName; + updates.add(TableUpdate(effectiveName)); + } + + return updates; + }, + ); + + // Create table with a different name than drift. (Mimicking a table name backed by a view in PowerSync with the optional sync strategy) + await db.execute( + 'CREATE TABLE local_todos(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)', + ); + + final dbu = TodoDatabase.fromSqliteAsyncConnection(connection); + + final tableUpdatesFut = + dbu.tableUpdates(TableUpdateQuery.onTableName("todos")).first; + + // This insert will trigger the sqlite_async "updates" stream + await db.execute("INSERT INTO local_todos(description) VALUES('Test 1')"); + + expect(await tableUpdatesFut.timeout(const Duration(seconds: 2)), + {TableUpdate("todos")}); + }); } diff --git a/packages/drift_sqlite_async/test/db_test.dart b/packages/drift_sqlite_async/test/db_test.dart index bda09df..f5b269a 100644 --- a/packages/drift_sqlite_async/test/db_test.dart +++ b/packages/drift_sqlite_async/test/db_test.dart @@ -117,5 +117,35 @@ void main() { final deleted = await dbu.delete(dbu.todoItems).go(); expect(deleted, 10); }); + + test('nested transactions', () async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'root')); + + await dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx0')); + + await dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx1')); + + await expectLater(() { + return dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx2')); + throw 'rollback'; + }); + }, throwsA(anything)); + }); + }); + + final items = await dbu.todoItems.all().get(); + expect(items.map((e) => e.description).toSet(), {'root', 'tx0', 'tx1'}); + }); }); } diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index 928c7dd..4e03600 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -16,6 +16,8 @@ class TodoItems extends Table { class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); + @override int get schemaVersion => 1; } diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 97dbf50..1bbc939 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,20 @@ +## 0.12.2 + + - Add `withAllConnections` method to run statements on all connections in the pool. + +## 0.12.1 + +- Fix distributing updates from shared worker. + +## 0.12.0 + + - Avoid large transactions creating a large internal update queue. + +## 0.11.8 + +- Support nested transactions (emulated with `SAVEPOINT` statements). +- Fix web compilation issues with version `2.8.0` of `package:sqlite3`. + ## 0.11.7 - Shared worker: Release locks owned by connected client tab when it closes. diff --git a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart index d84bdaa..f63e0df 100644 --- a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart @@ -8,9 +8,11 @@ import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; +import '../../impl/context.dart'; + /// A simple "synchronous" connection which provides the async SqliteConnection /// implementation using a synchronous SQLite connection -class SyncSqliteConnection extends SqliteConnection with SqliteQueries { +class SyncSqliteConnection with SqliteQueries implements SqliteConnection { final CommonDatabase db; late Mutex mutex; @override @@ -44,7 +46,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncReadContext(db, parent: task)); + return ScopedReadContext.assumeReadLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -59,7 +64,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncWriteContext(db, parent: task)); + return ScopedWriteContext.assumeWriteLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -80,12 +88,12 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { } } -class SyncReadContext implements SqliteReadContext { +final class _UnsafeSyncContext extends UnscopedContext { final TimelineTask? task; CommonDatabase db; - SyncReadContext(this.db, {TimelineTask? parent}) + _UnsafeSyncContext(this.db, {TimelineTask? parent}) : task = TimelineTask(parent: parent); @override @@ -129,10 +137,6 @@ class SyncReadContext implements SqliteReadContext { Future getAutoCommit() async { return db.autocommit; } -} - -class SyncWriteContext extends SyncReadContext implements SqliteWriteContext { - SyncWriteContext(super.db, {super.parent}); @override Future execute(String sql, diff --git a/packages/sqlite_async/lib/src/common/sqlite_database.dart b/packages/sqlite_async/lib/src/common/sqlite_database.dart index 3cb12bb..3201135 100644 --- a/packages/sqlite_async/lib/src/common/sqlite_database.dart +++ b/packages/sqlite_async/lib/src/common/sqlite_database.dart @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { /// /// Use this to access the database in background isolates. IsolateConnectionFactory isolateConnectionFactory(); + + /// Locks all underlying connections making up this database, and gives [block] access to all of them at once. + /// This can be useful to run the same statement on all connections. For instance, + /// ATTACHing a database, that is expected to be available in all connections. + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block); } /// A SQLite database instance. diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart new file mode 100644 index 0000000..0e3eef6 --- /dev/null +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -0,0 +1,208 @@ +import 'package:sqlite3/common.dart'; + +import '../sqlite_connection.dart'; + +/// A context that can be used to run both reading and writing queries - +/// basically a [SqliteWriteContext] without the ability to start transactions. +/// +/// Instances of this are not given out to clients - instead, they are wrapped +/// with [ScopedReadContext] and [ScopedWriteContext] after obtaining a lock. +/// Those wrapped views have a shorter lifetime (they can be closed +/// independently, and verify that they're not being used after being closed). +abstract base class UnscopedContext implements SqliteReadContext { + Future execute(String sql, List parameters); + Future executeBatch(String sql, List> parameterSets); + + /// Returns an [UnscopedContext] useful as the outermost transaction. + /// + /// This is called by [ScopedWriteContext.writeTransaction] _after_ executing + /// the first `BEGIN` statement. + /// This is used on the web to assert that the auto-commit state is false + /// before running statements. + UnscopedContext interceptOutermostTransaction() { + return this; + } +} + +/// A view over an [UnscopedContext] implementing [SqliteReadContext]. +final class ScopedReadContext implements SqliteReadContext { + final UnscopedContext _context; + + /// Whether this context view is locked on an inner operation like a + /// transaction. + /// + /// We don't use a mutex because we don't want to serialize access - we just + /// want to forbid concurrent operations. + bool _isLocked = false; + + /// Whether this particular view of a read context has been closed, e.g. + /// because the callback owning it has returned. + bool _closed = false; + + ScopedReadContext(this._context); + + void _checkNotLocked() { + _checkStillOpen(); + + if (_isLocked) { + throw StateError( + 'The context from the callback was locked, e.g. due to a nested ' + 'transaction.'); + } + } + + void _checkStillOpen() { + if (_closed) { + throw StateError('This context to a callback is no longer open. ' + 'Make sure to await all statements on a database to avoid a context ' + 'still being used after its callback has finished.'); + } + } + + @override + bool get closed => _closed || _context.closed; + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) async { + _checkNotLocked(); + return await _context.computeWithDatabase(compute); + } + + @override + Future get(String sql, [List parameters = const []]) async { + _checkNotLocked(); + return _context.get(sql, parameters); + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.getAll(sql, parameters); + } + + @override + Future getAutoCommit() async { + _checkStillOpen(); + return _context.getAutoCommit(); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return _context.getOptional(sql, parameters); + } + + void invalidate() => _closed = true; + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] read-access to the database. + /// + /// Assumes that a read lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeReadLock( + UnscopedContext unsafe, + Future Function(SqliteReadContext) callback, + ) async { + final scoped = ScopedReadContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} + +final class ScopedWriteContext extends ScopedReadContext + implements SqliteWriteContext { + /// The "depth" of virtual nested transaction. + /// + /// A value of `0` indicates that this is operating outside of a transaction. + /// A value of `1` indicates a regular transaction (guarded with `BEGIN` and + /// `COMMIT` statements). + /// All higher values indicate a nested transaction implemented with + /// savepoint statements. + final int transactionDepth; + + ScopedWriteContext(super._context, {this.transactionDepth = 0}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.execute(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + _checkNotLocked(); + + return await _context.executeBatch(sql, parameterSets); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback) async { + _checkNotLocked(); + final (begin, commit, rollback) = _beginCommitRollback(transactionDepth); + ScopedWriteContext? inner; + + final innerContext = transactionDepth == 0 + ? _context.interceptOutermostTransaction() + : _context; + + try { + _isLocked = true; + + await _context.execute(begin, const []); + + inner = ScopedWriteContext(innerContext, + transactionDepth: transactionDepth + 1); + final result = await callback(inner); + await innerContext.execute(commit, const []); + return result; + } catch (e) { + try { + await innerContext.execute(rollback, const []); + } catch (e) { + // In rare cases, a ROLLBACK may fail. + // Safe to ignore. + } + rethrow; + } finally { + _isLocked = false; + inner?.invalidate(); + } + } + + static (String, String, String) _beginCommitRollback(int level) { + return switch (level) { + 0 => ('BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK'), + final nested => ( + 'SAVEPOINT s$nested', + 'RELEASE s$nested', + 'ROLLBACK TO s$nested' + ) + }; + } + + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] access to the database. + /// + /// Assumes that a write lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. + static Future assumeWriteLock( + UnscopedContext unsafe, + Future Function(SqliteWriteContext) callback, + ) async { + final scoped = ScopedWriteContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart index 4cd3144..7ca4357 100644 --- a/packages/sqlite_async/lib/src/impl/single_connection_database.dart +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -57,4 +57,12 @@ final class SingleConnectionDatabase return connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart index 29db641..ee254f3 100644 --- a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart @@ -64,4 +64,12 @@ class SqliteDatabaseImpl Future getAutoCommit() { throw UnimplementedError(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + throw UnimplementedError(); + } } diff --git a/packages/sqlite_async/lib/src/native/database/connection_pool.dart b/packages/sqlite_async/lib/src/native/database/connection_pool.dart index 9521b34..8dab27e 100644 --- a/packages/sqlite_async/lib/src/native/database/connection_pool.dart +++ b/packages/sqlite_async/lib/src/native/database/connection_pool.dart @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final MutexImpl mutex; + int _runningWithAllConnectionsCount = 0; + @override bool closed = false; @@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } + if (_availableReadConnections.isEmpty && + _runningWithAllConnectionsCount > 0) { + // Wait until [withAllConnections] is done. Otherwise we could spawn a new + // reader while the user is configuring all the connections, + // e.g. a global open factory configuration shared across all connections. + return; + } + var nextItem = _queue.removeFirst(); while (nextItem.completer.isCompleted) { // This item already timed out - try the next one if available @@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await connection.refreshSchema(); } } + + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) async { + try { + _runningWithAllConnectionsCount++; + + final blockCompleter = Completer(); + final (write, reads) = await _lockAllConns(blockCompleter); + + try { + final res = await block(write, reads); + blockCompleter.complete(res); + return res; + } catch (e, st) { + blockCompleter.completeError(e, st); + rethrow; + } + } finally { + _runningWithAllConnectionsCount--; + + // Continue processing any pending read requests that may have been queued while + // the block was running. + Timer.run(_nextRead); + } + } + + /// Locks all connections, returning the acquired contexts. + /// We pass a completer that would be called after the locks are taken. + Future<(SqliteWriteContext, List)> _lockAllConns( + Completer lockCompleter) async { + final List> readLockedCompleters = []; + final Completer writeLockedCompleter = Completer(); + + // Take the write lock + writeLock((ctx) { + writeLockedCompleter.complete(ctx); + return lockCompleter.future; + }); + + // Take all the read locks + for (final readConn in _allReadConnections) { + final completer = Completer(); + readLockedCompleters.add(completer); + + readConn.readLock((ctx) { + completer.complete(ctx); + return lockCompleter.future; + }); + } + + // Wait after all locks are taken + final [writer as SqliteWriteContext, ...readers] = await Future.wait([ + writeLockedCompleter.future, + ...readLockedCompleters.map((e) => e.future) + ]); + + return (writer, readers); + } } typedef ReadCallback = Future Function(SqliteReadContext tx); diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 38c184e..e90739e 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -14,6 +14,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import '../../impl/context.dart'; import 'upstream_updates.dart'; typedef TxCallback = Future Function(CommonDatabase db); @@ -64,8 +65,8 @@ class SqliteConnectionImpl return _isolateClient.closed; } - _TransactionContext _context() { - return _TransactionContext( + _UnsafeContext _context() { + return _UnsafeContext( _isolateClient, profileQueries ? TimelineTask() : null); } @@ -137,7 +138,7 @@ class SqliteConnectionImpl return _connectionMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return await ScopedReadContext.assumeReadLock(ctx, callback); } finally { await ctx.close(); } @@ -160,7 +161,7 @@ class SqliteConnectionImpl return await _writeMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return await ScopedWriteContext.assumeWriteLock(ctx, callback); } finally { await ctx.close(); } @@ -177,14 +178,14 @@ class SqliteConnectionImpl int _nextCtxId = 1; -class _TransactionContext implements SqliteWriteContext { +final class _UnsafeContext extends UnscopedContext { final PortClient _sendPort; bool _closed = false; final int ctxId = _nextCtxId++; final TimelineTask? task; - _TransactionContext(this._sendPort, this.task); + _UnsafeContext(this._sendPort, this.task); @override bool get closed { @@ -302,40 +303,13 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - Timer? updateDebouncer; - Set updatedTables = {}; + db.updatedTables.listen((changedTables) { + client.fire(UpdateNotification(changedTables)); + }); + int? txId; Object? txError; - void maybeFireUpdates() { - // We keep buffering the set of updated tables until we are not - // in a transaction. Firing transactions inside a transaction - // has multiple issues: - // 1. Watched queries would detect changes to the underlying tables, - // but the data would not be visible to queries yet. - // 2. It would trigger many more notifications than required. - // - // This still includes updates for transactions that are rolled back. - // We could handle those better at a later stage. - - if (updatedTables.isNotEmpty && db.autocommit) { - client.fire(UpdateNotification(updatedTables)); - updatedTables.clear(); - } - updateDebouncer?.cancel(); - updateDebouncer = null; - } - - db.updates.listen((event) { - updatedTables.add(event.tableName); - - // This handles two cases: - // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). - // 2. Long-running _SqliteIsolateClosure that should fire updates while running. - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - }); - ResultSet runStatement(_SqliteIsolateStatement data) { if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { if (txId != null) { @@ -387,8 +361,6 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, throw sqlite.SqliteException( 0, 'Transaction must be closed within the read or write lock'); } - // We would likely have received updates by this point - fire now. - maybeFireUpdates(); return null; case _SqliteIsolateStatement(): return task.timeSync( @@ -398,11 +370,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, parameters: data.args, ); case _SqliteIsolateClosure(): - try { - return await data.cb(db); - } finally { - maybeFireUpdates(); - } + return await data.cb(db); case _SqliteIsolateConnectionClose(): db.dispose(); return null; diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 7bea111..22cacf3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -171,4 +171,12 @@ class SqliteDatabaseImpl Future refreshSchema() { return _pool.refreshSchema(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return _pool.withAllConnections(block); + } } diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index bd2292b..0e360fc 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -8,7 +8,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'common/connection/sync_sqlite_connection.dart'; /// Abstract class representing calls available in a read-only or read-write context. -abstract class SqliteReadContext { +abstract interface class SqliteReadContext { /// Execute a read-only (SELECT) query and return the results. Future getAll(String sql, [List parameters = const []]); @@ -66,7 +66,7 @@ abstract class SqliteReadContext { } /// Abstract class representing calls available in a read-write context. -abstract class SqliteWriteContext extends SqliteReadContext { +abstract interface class SqliteWriteContext extends SqliteReadContext { /// Execute a write query (INSERT, UPDATE, DELETE) and return the results (if any). Future execute(String sql, [List parameters = const []]); @@ -75,13 +75,28 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// parameter set. This is faster than executing separately with each /// parameter set. Future executeBatch(String sql, List> parameterSets); + + /// Open a read-write transaction on this write context. + /// + /// When called on a [SqliteConnection], this takes a global lock - only one + /// write write transaction can execute against the database at a time. This + /// applies even when constructing separate [SqliteDatabase] instances for the + /// same database file. + /// + /// Statements within the transaction must be done on the provided + /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] + /// instance will error. + /// It is forbidden to use the [SqliteWriteContext] after the [callback] + /// completes. + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback); } /// Abstract class representing a connection to the SQLite database. /// /// This package typically pools multiple [SqliteConnection] instances into a /// managed [SqliteDatabase] automatically. -abstract class SqliteConnection extends SqliteWriteContext { +abstract interface class SqliteConnection extends SqliteWriteContext { /// Default constructor for subclasses. SqliteConnection(); @@ -123,6 +138,7 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] /// instance will error. + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}); diff --git a/packages/sqlite_async/lib/src/sqlite_queries.dart b/packages/sqlite_async/lib/src/sqlite_queries.dart index 80bc568..367d23f 100644 --- a/packages/sqlite_async/lib/src/sqlite_queries.dart +++ b/packages/sqlite_async/lib/src/sqlite_queries.dart @@ -107,7 +107,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) async { return writeLock((ctx) async { - return await internalWriteTransaction(ctx, callback); + return ctx.writeTransaction(callback); }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); } diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index c911bbc..542611e 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:convert'; +import 'package:sqlite3/common.dart'; + import '../sqlite_connection.dart'; Future internalReadTransaction(SqliteReadContext ctx, @@ -21,24 +23,6 @@ Future internalReadTransaction(SqliteReadContext ctx, } } -Future internalWriteTransaction(SqliteWriteContext ctx, - Future Function(SqliteWriteContext tx) callback) async { - try { - await ctx.execute('BEGIN IMMEDIATE'); - final result = await callback(ctx); - await ctx.execute('COMMIT'); - return result; - } catch (e) { - try { - await ctx.execute('ROLLBACK'); - } catch (e) { - // In rare cases, a ROLLBACK may fail. - // Safe to ignore. - } - rethrow; - } -} - /// Given a SELECT query, return the tables that the query depends on. Future> getSourceTablesText( SqliteReadContext ctx, String sql) async { @@ -93,3 +77,87 @@ Object? mapParameter(Object? parameter) { List mapParameters(List parameters) { return [for (var p in parameters) mapParameter(p)]; } + +extension ThrottledUpdates on CommonDatabase { + /// An unthrottled stream of updated tables that emits on every commit. + /// + /// A paused subscription on this stream will buffer changed tables into a + /// growing set instead of losing events, so this stream is simple to throttle + /// downstream. + Stream> get updatedTables { + final listeners = <_UpdateListener>[]; + var uncommitedUpdates = {}; + var underlyingSubscriptions = >[]; + + void handleUpdate(SqliteUpdate update) { + uncommitedUpdates.add(update.tableName); + } + + void afterCommit() { + for (final listener in listeners) { + listener.notify(uncommitedUpdates); + } + + uncommitedUpdates.clear(); + } + + void afterRollback() { + uncommitedUpdates.clear(); + } + + void addListener(_UpdateListener listener) { + listeners.add(listener); + + if (listeners.length == 1) { + // First listener, start listening for raw updates on underlying + // database. + underlyingSubscriptions = [ + updatesSync.listen(handleUpdate), + commits.listen((_) => afterCommit()), + commits.listen((_) => afterRollback()) + ]; + } + } + + void removeListener(_UpdateListener listener) { + listeners.remove(listener); + if (listeners.isEmpty) { + for (final sub in underlyingSubscriptions) { + sub.cancel(); + } + } + } + + return Stream.multi( + (listener) { + final wrapped = _UpdateListener(listener); + addListener(wrapped); + + listener.onResume = wrapped.addPending; + listener.onCancel = () => removeListener(wrapped); + }, + isBroadcast: true, + ); + } +} + +class _UpdateListener { + final MultiStreamController> downstream; + Set buffered = {}; + + _UpdateListener(this.downstream); + + void notify(Set pendingUpdates) { + buffered.addAll(pendingUpdates); + if (!downstream.isPaused) { + addPending(); + } + } + + void addPending() { + if (buffered.isNotEmpty) { + downstream.add(buffered); + buffered = {}; + } + } +} diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cd7e215..f2dc998 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -8,9 +8,9 @@ import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/web.dart'; +import '../impl/context.dart'; import 'protocol.dart'; import 'web_mutex.dart'; @@ -21,6 +21,9 @@ class WebDatabase final Mutex? _mutex; final bool profileQueries; + @override + final Stream updates; + /// For persistent databases that aren't backed by a shared worker, we use /// web broadcast channels to forward local update events to other tabs. final BroadcastUpdates? broadcastUpdates; @@ -32,6 +35,7 @@ class WebDatabase this._database, this._mutex, { required this.profileQueries, + required this.updates, this.broadcastUpdates, }); @@ -94,13 +98,9 @@ class WebDatabase Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () async { - final context = _SharedContext(this); - try { - return await callback(context); - } finally { - context.markClosed(); - } + return await mutex.lock(timeout: lockTimeout, () { + return ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); }); } else { // No custom mutex, coordinate locks through shared worker. @@ -108,7 +108,8 @@ class WebDatabase CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); try { - return await callback(_SharedContext(this)); + return await ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); } finally { await _database.customRequest( CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); @@ -116,39 +117,33 @@ class WebDatabase } } - @override - Stream get updates => - _database.updates.map((event) => UpdateNotification({event.tableName})); - @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock( - (writeContext) => - internalWriteTransaction(writeContext, (context) async { - // All execute calls done in the callback will be checked for the - // autocommit state - return callback(_ExclusiveTransactionContext(this, writeContext)); - }), + return writeLock((writeContext) { + return ScopedWriteContext.assumeWriteLock( + _UnscopedContext(this), + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); + }, debugContext: 'writeTransaction()', lockTimeout: lockTimeout, flush: flush); } @override - - /// Internal writeLock which intercepts transaction context's to verify auto commit is not active Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -158,11 +153,10 @@ class WebDatabase // No custom mutex, coordinate locks through shared worker. await _database.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -177,19 +171,26 @@ class WebDatabase await isInitialized; return _database.fileSystem.flush(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(this, [])); + } } -class _SharedContext implements SqliteReadContext { +final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; - bool _contextClosed = false; final TimelineTask? _task; - _SharedContext(this._database) + _UnscopedContext(this._database) : _task = _database.profileQueries ? TimelineTask() : null; @override - bool get closed => _contextClosed || _database.closed; + bool get closed => _database.closed; @override Future computeWithDatabase( @@ -230,14 +231,6 @@ class _SharedContext implements SqliteReadContext { return results.firstOrNull; } - void markClosed() { - _contextClosed = true; - } -} - -class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { - _ExclusiveContext(super.database); - @override Future execute(String sql, [List parameters = const []]) { return _task.timeAsync('execute', sql: sql, parameters: parameters, () { @@ -258,15 +251,17 @@ class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { }); }); } -} - -class _ExclusiveTransactionContext extends _ExclusiveContext { - SqliteWriteContext baseContext; - - _ExclusiveTransactionContext(super.database, this.baseContext); @override - bool get closed => baseContext.closed; + UnscopedContext interceptOutermostTransaction() { + // All execute calls done in the callback will be checked for the + // autocommit state + return _ExclusiveTransactionContext(_database); + } +} + +final class _ExclusiveTransactionContext extends _UnscopedContext { + _ExclusiveTransactionContext(super._database); Future _executeInternal( String sql, List parameters) async { diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index c6d1b75..69f01ab 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -178,4 +178,12 @@ class SqliteDatabaseImpl Future exposeEndpoint() async { return await _connection.exposeEndpoint(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(_connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index cb3a5fd..d17c06b 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -13,6 +13,8 @@ enum CustomDatabaseMessageKind { getAutoCommit, executeInTransaction, executeBatchInTransaction, + updateSubscriptionManagement, + notifyUpdates, } extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart new file mode 100644 index 0000000..f04a785 --- /dev/null +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -0,0 +1,60 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:sqlite3_web/sqlite3_web.dart'; + +import '../update_notification.dart'; +import 'protocol.dart'; + +/// Utility to request a stream of update notifications from the worker. +/// +/// Because we want to debounce update notifications on the worker, we're using +/// custom requests instead of the default [Database.updates] stream. +/// +/// Clients send a message to the worker to subscribe or unsubscribe, providing +/// an id for the subscription. The worker distributes update notifications with +/// custom requests to the client, which [handleRequest] distributes to the +/// original streams. +final class UpdateNotificationStreams { + var _idCounter = 0; + final Map> _updates = {}; + + Future handleRequest(JSAny? request) async { + final customRequest = request as CustomDatabaseMessage; + if (customRequest.kind == CustomDatabaseMessageKind.notifyUpdates) { + final notification = UpdateNotification(customRequest.rawParameters.toDart + .map((e) => (e as JSString).toDart) + .toSet()); + + final controller = _updates[customRequest.rawSql.toDart]; + controller?.add(notification); + } + + return null; + } + + Stream updatesFor(Database database) { + final id = (_idCounter++).toString(); + final controller = _updates[id] = StreamController(); + + controller + ..onListen = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [true], + )); + } + ..onCancel = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [false], + )); + + _updates.remove(id); + }; + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index a724329..513b1f2 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -10,7 +10,7 @@ import 'package:sqlite_async/web.dart'; import 'database.dart'; import 'worker/worker_utils.dart'; -Map> webSQLiteImplementations = {}; +Map> _webSQLiteImplementations = {}; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -20,13 +20,13 @@ class DefaultSqliteOpenFactory final cacheKey = sqliteOptions.webSqliteOptions.wasmUri + sqliteOptions.webSqliteOptions.workerUri; - if (webSQLiteImplementations.containsKey(cacheKey)) { - return webSQLiteImplementations[cacheKey]!; + if (_webSQLiteImplementations.containsKey(cacheKey)) { + return _webSQLiteImplementations[cacheKey]!; } - webSQLiteImplementations[cacheKey] = + _webSQLiteImplementations[cacheKey] = openWebSqlite(sqliteOptions.webSqliteOptions); - return webSQLiteImplementations[cacheKey]!; + return _webSQLiteImplementations[cacheKey]!; }); DefaultSqliteOpenFactory( @@ -42,6 +42,7 @@ class DefaultSqliteOpenFactory wasmModule: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), worker: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), controller: AsyncSqliteController(), + handleCustomRequest: handleCustomRequest, ); } @@ -69,15 +70,19 @@ class DefaultSqliteOpenFactory ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier - BroadcastUpdates? updates; + BroadcastUpdates? broadcastUpdates; if (connection.access != AccessMode.throughSharedWorker && connection.storage != StorageMode.inMemory) { - updates = BroadcastUpdates(path); + broadcastUpdates = BroadcastUpdates(path); } - return WebDatabase(connection.database, options.mutex ?? mutex, - broadcastUpdates: updates, - profileQueries: sqliteOptions.profileQueries); + return WebDatabase( + connection.database, + options.mutex ?? mutex, + broadcastUpdates: broadcastUpdates, + profileQueries: sqliteOptions.profileQueries, + updates: updatesFor(connection.database), + ); } @override diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart deleted file mode 100644 index 5f33b6d..0000000 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ /dev/null @@ -1,187 +0,0 @@ -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_wasm.dart'; - -/// Wrap a CommonDatabase to throttle its updates stream. -/// This is so that we can throttle the updates _within_ -/// the worker process, avoiding mass notifications over -/// the MessagePort. -class ThrottledCommonDatabase extends CommonDatabase { - final CommonDatabase _db; - final StreamController _transactionController = - StreamController.broadcast(); - - ThrottledCommonDatabase(this._db); - - @override - int get userVersion => _db.userVersion; - - @override - set userVersion(int userVersion) { - _db.userVersion = userVersion; - } - - @override - bool get autocommit => _db.autocommit; - - @override - DatabaseConfig get config => _db.config; - - @override - void createAggregateFunction( - {required String functionName, - required AggregateFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true}) { - _db.createAggregateFunction(functionName: functionName, function: function); - } - - @override - void createCollation( - {required String name, required CollatingFunction function}) { - _db.createCollation(name: name, function: function); - } - - @override - void createFunction( - {required String functionName, - required ScalarFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true}) { - _db.createFunction(functionName: functionName, function: function); - } - - @override - void dispose() { - _db.dispose(); - } - - @override - void execute(String sql, [List parameters = const []]) { - _db.execute(sql, parameters); - } - - @override - int getUpdatedRows() { - // ignore: deprecated_member_use - return _db.getUpdatedRows(); - } - - @override - int get lastInsertRowId => _db.lastInsertRowId; - - @override - CommonPreparedStatement prepare(String sql, - {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { - return _db.prepare(sql, - persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); - } - - @override - List prepareMultiple(String sql, - {bool persistent = false, bool vtab = true}) { - return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); - } - - @override - ResultSet select(String sql, [List parameters = const []]) { - bool preAutocommit = _db.autocommit; - final result = _db.select(sql, parameters); - bool postAutocommit = _db.autocommit; - if (!preAutocommit && postAutocommit) { - _transactionController.add(true); - } - return result; - } - - @override - int get updatedRows => _db.updatedRows; - - @override - Stream get updates { - return throttledUpdates(_db, _transactionController.stream); - } - - @override - VoidPredicate? get commitFilter => _db.commitFilter; - - @override - set commitFilter(VoidPredicate? filter) => _db.commitFilter = filter; - - @override - Stream get commits => _db.commits; - - @override - Stream get rollbacks => _db.rollbacks; -} - -/// This throttles the database update stream to: -/// 1. Trigger max once every 1ms. -/// 2. Only trigger _after_ transactions. -Stream throttledUpdates( - CommonDatabase source, Stream transactionStream) { - StreamController? controller; - Set pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; - } - - if (!source.autocommit) { - // Inside a transaction - do not fire updates - return; - } - - if (pendingUpdates.isNotEmpty) { - for (var update in pendingUpdates) { - controller!.add(update); - } - - pendingUpdates.clear(); - } - } - - void collectUpdate(SqliteUpdate event) { - // We merge updates with the same kind and tableName. - // rowId is never used in sqlite_async. - pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); - - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - } - - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = transactionStream.listen((event) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 3ecb257..059c281 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; @@ -6,8 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; - -import 'throttled_common_database.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import '../protocol.dart'; @@ -22,13 +22,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - final throttled = ThrottledCommonDatabase(db); - - return AsyncSqliteDatabase(database: throttled); + return AsyncSqliteDatabase(database: db); } - /// Opens a database with the `sqlite3` package that will be wrapped in a - /// [ThrottledCommonDatabase] for [openDatabase]. @visibleForOverriding CommonDatabase openUnderlying( WasmSqlite3 sqlite3, @@ -51,6 +47,7 @@ base class AsyncSqliteController extends DatabaseController { class AsyncSqliteDatabase extends WorkerDatabase { @override final CommonDatabase database; + final Stream> _updates; // This mutex is only used for lock requests from clients. Clients only send // these requests for shared workers, so we can assume each database is only @@ -58,7 +55,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final mutex = ReadWriteMutex(); final Map _state = {}; - AsyncSqliteDatabase({required this.database}); + AsyncSqliteDatabase({required this.database}) + : _updates = database.updatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -67,9 +65,15 @@ class AsyncSqliteDatabase extends WorkerDatabase { void _markHoldsMutex(ClientConnection connection) { final state = _findState(connection); state.holdsMutex = true; + _registerCloseListener(state, connection); + } + + void _registerCloseListener( + _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { state.hasOnCloseListener = true; connection.closed.then((_) { + state.unsubscribeUpdates(); if (state.holdsMutex) { mutex.release(); } @@ -93,6 +97,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { _findState(connection).holdsMutex = false; mutex.release(); case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; @@ -129,6 +134,27 @@ class AsyncSqliteDatabase extends WorkerDatabase { "Transaction rolled back by earlier statement. Cannot execute: $sql"); } database.execute(sql, parameters); + case CustomDatabaseMessageKind.updateSubscriptionManagement: + final shouldSubscribe = + (message.rawParameters.toDart[0] as JSBoolean).toDart; + final id = message.rawSql.toDart; + final state = _findState(connection); + + if (shouldSubscribe) { + state.unsubscribeUpdates(); + _registerCloseListener(state, connection); + + late StreamSubscription subscription; + subscription = state.updatesNotification = _updates.listen((tables) { + subscription.pause(connection.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.notifyUpdates, + id, + tables.toList(), + ))); + }); + } else { + state.unsubscribeUpdates(); + } } return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); @@ -148,4 +174,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { final class _ConnectionState { bool hasOnCloseListener = false; bool holdsMutex = false; + StreamSubscription>? updatesNotification; + + void unsubscribeUpdates() { + if (updatesNotification case final active?) { + updatesNotification = null; + active.cancel(); + } + } } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index 3a65115..e318697 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -4,12 +4,15 @@ /// workers. library; +import 'dart:js_interop'; + import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:web/web.dart'; import 'sqlite3_common.dart'; import 'sqlite_async.dart'; import 'src/web/database.dart'; +import 'src/web/update_notifications.dart'; /// An endpoint that can be used, by any running JavaScript context in the same /// website, to connect to an existing [WebSqliteConnection]. @@ -26,6 +29,8 @@ typedef WebDatabaseEndpoint = ({ String? lockName, }); +final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + /// An additional interface for [SqliteOpenFactory] exposing additional /// functionality that is only relevant when compiling to the web. /// @@ -33,6 +38,11 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { + /// Handles a custom request sent from the worker to the client. + Future handleCustomRequest(JSAny? request) { + return _updateStreams.handleRequest(request); + } + /// Opens a [WebSqlite] instance for the given [options]. /// /// This method can be overriden in scenarios where the way [WebSqlite] is @@ -43,6 +53,7 @@ abstract mixin class WebSqliteOpenFactory return WebSqlite.open( worker: Uri.parse(options.workerUri), wasmModule: Uri.parse(options.wasmUri), + handleCustomRequest: handleCustomRequest, ); } @@ -54,6 +65,14 @@ abstract mixin class WebSqliteOpenFactory WebSqlite sqlite, String name) { return sqlite.connectToRecommended(name); } + + /// Obtains a stream of [UpdateNotification]s from a [database]. + /// + /// The default implementation uses custom requests to allow workers to + /// debounce the stream on their side to avoid messages where possible. + Stream updatesFor(Database database) { + return _updateStreams.updatesFor(database); + } } /// A [SqliteConnection] interface implemented by opened connections when @@ -85,8 +104,11 @@ abstract class WebSqliteConnection implements SqliteConnection { /// contexts to exchange opened database connections. static Future connectToEndpoint( WebDatabaseEndpoint endpoint) async { + final updates = UpdateNotificationStreams(); final rawSqlite = await WebSqlite.connectToPort( - (endpoint.connectPort, endpoint.connectName)); + (endpoint.connectPort, endpoint.connectName), + handleCustomRequest: updates.handleRequest, + ); final database = WebDatabase( rawSqlite, @@ -95,6 +117,7 @@ abstract class WebSqliteConnection implements SqliteConnection { null => null, }, profileQueries: false, + updates: updates.updatesFor(rawSqlite), ); return database; } diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index bb27d60..3d5130d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.7 +version: 0.12.2 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.7.2 - sqlite3_web: ^0.3.0 + sqlite3: ^2.9.0 + sqlite3_web: ^0.3.2 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index fb98e17..e2914b3 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm'); +const _isWeb = identical(0, 0.0) || _isDart2Wasm; void main() { group('Shared Basic Tests', () { @@ -122,7 +123,7 @@ void main() { ['Test Data']); expect(rs.rows[0], equals(['Test Data'])); }); - expect(await savedTx!.getAutoCommit(), equals(true)); + expect(await db.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); @@ -189,6 +190,73 @@ void main() { ); }); + group('nested transaction', () { + const insert = 'INSERT INTO test_data (description) VALUES(?);'; + late SqliteDatabase db; + + setUp(() async { + db = await testUtils.setupDatabase(path: path); + await createTables(db); + }); + + tearDown(() => db.close()); + + test('run in outer transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('can rollback inner transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + await expectLater(() async { + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['third']); + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(3)); + throw 'rollback please'; + }); + }, throwsA(anything)); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('cannot use outer transaction while inner is active', () async { + await db.writeTransaction((outer) async { + await outer.writeTransaction((inner) async { + await expectLater(outer.execute('SELECT 1'), throwsStateError); + }); + }); + }); + + test('cannot use inner after leaving scope', () async { + await db.writeTransaction((tx) async { + late SqliteWriteContext inner; + await tx.writeTransaction((tx) async { + inner = tx; + }); + + await expectLater(inner.execute('SELECT 1'), throwsStateError); + }); + }); + }); + test('can use raw database instance', () async { final factory = await testUtils.testFactory(); final raw = await factory.openDatabaseForSingleConnection(); @@ -234,6 +302,49 @@ void main() { 'Web locks are managed with a shared worker, which does not support timeouts', ) }); + + test('with all connections', () async { + final maxReaders = _isWeb ? 0 : 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + // Warm up to spawn the max readers + await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + final parentZone = Zone.current; + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + // Need a root zone here to avoid recursive lock errors. + readsCalledWhileWithAllConnsRunning = + Future(parentZone.bindCallback(() async { + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + await Future.delayed(const Duration(milliseconds: 100)); + }); + }), + ); + })); + + await Future.delayed(const Duration(milliseconds: 200)); + finishedWithAllConns = true; + }); + + await readsCalledWhileWithAllConnsRunning; + }); }); } diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index dec1fed..3f348e6 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -2,12 +2,16 @@ library; import 'dart:async'; +import 'dart:io'; import 'dart:math'; +import 'package:collection/collection.dart'; +import 'package:path/path.dart' show join; import 'package:sqlite3/common.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; +import '../utils/abstract_test_utils.dart'; import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); @@ -100,6 +104,126 @@ void main() { print("${DateTime.now()} done"); }); + test('prevent opening new readers while in withAllConnections', () async { + final sharedStateDir = Directory.systemTemp.createTempSync(); + addTearDown(() => sharedStateDir.deleteSync(recursive: true)); + + final File sharedStateFile = + File(join(sharedStateDir.path, 'shared-state.txt')); + + sharedStateFile.writeAsStringSync('initial'); + + final db = SqliteDatabase.withFactory( + _TestSqliteOpenFactoryWithSharedStateFile( + path: path, sharedStateFilePath: sharedStateFile.path), + maxReaders: 3); + await db.initialize(); + await createTables(db); + + // The writer saw 'initial' in the file when opening the connection + expect( + await db + .writeLock((c) => c.get('SELECT file_contents_on_open() AS state')), + {'state': 'initial'}, + ); + + final withAllConnectionsCompleter = Completer(); + + final withAllConnsFut = db.withAllConnections((writer, readers) async { + expect(readers.length, 0); // No readers yet + + // Simulate some work until the file is updated + await Future.delayed(const Duration(milliseconds: 200)); + sharedStateFile.writeAsStringSync('updated'); + + await withAllConnectionsCompleter.future; + }); + + // Start a reader that gets the contents of the shared file + bool readFinished = false; + final someReadFut = + db.get('SELECT file_contents_on_open() AS state', []).then((r) { + readFinished = true; + return r; + }); + + // The withAllConnections should prevent the reader from opening + await Future.delayed(const Duration(milliseconds: 100)); + expect(readFinished, isFalse); + + // Free all the locks + withAllConnectionsCompleter.complete(); + await withAllConnsFut; + + final readerInfo = await someReadFut; + expect(readFinished, isTrue); + // The read should see the updated value in the file. This checks + // that a reader doesn't spawn while running withAllConnections + expect(readerInfo, {'state': 'updated'}); + }); + + test('with all connections', () async { + final maxReaders = 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + Future readWithRandomDelay( + SqliteReadContext ctx, int id) async { + return await ctx.get( + 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection', + [id, 5 + Random().nextInt(10)]); + } + + // Warm up to spawn the max readers + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)), + ); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + print("${DateTime.now()} start"); + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + readsCalledWhileWithAllConnsRunning = Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + final r = await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + return await readWithRandomDelay(c, i); + }); + print( + "${DateTime.now()} After withAllConnections, started while running $r"); + }), + ); + + await Future.wait([ + writer.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + 123, + 5 + Random().nextInt(20) + ]).then((value) => + print("${DateTime.now()} withAllConnections writer done $value")), + ...readers + .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) { + print( + "${DateTime.now()} withAllConnections readers done $results"); + })) + ]); + }).then((_) => finishedWithAllConns = true); + + await readsCalledWhileWithAllConnsRunning; + }); + test('read-only transactions', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); @@ -379,3 +503,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory { ]; } } + +class _TestSqliteOpenFactoryWithSharedStateFile + extends TestDefaultSqliteOpenFactory { + final String sharedStateFilePath; + + _TestSqliteOpenFactoryWithSharedStateFile( + {required super.path, required this.sharedStateFilePath}); + + @override + sqlite.CommonDatabase open(SqliteOpenOptions options) { + final File sharedStateFile = File(sharedStateFilePath); + final String sharedState = sharedStateFile.readAsStringSync(); + + final db = super.open(options); + + // Function to return the contents of the shared state file at the time of opening + // so that we know at which point the factory was called. + db.createFunction( + functionName: 'file_contents_on_open', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + return sharedState; + }, + ); + + return db; + } +} diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 4e4fb83..0a97b17 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -7,6 +7,7 @@ import 'dart:math'; import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:test/test.dart'; import '../utils/test_utils_impl.dart'; @@ -31,6 +32,51 @@ void main() { return db; }); + test('raw update notifications', () async { + final factory = await testUtils.testFactory(path: path); + final db = factory + .openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + + db.execute('CREATE TABLE a (bar INTEGER);'); + db.execute('CREATE TABLE b (bar INTEGER);'); + final events = >[]; + final subscription = db.updatedTables.listen(events.add); + + db.execute('insert into a default values'); + expect(events, isEmpty); // should be async + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('insert into b default values'); + await pumpEventQueue(); + expect(events, isEmpty); // should only trigger on commit + db.execute('commit'); + + await pumpEventQueue(); + expect(events.removeLast(), {'a', 'b'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('rollback'); + expect(events, isEmpty); + await pumpEventQueue(); + expect(events, isEmpty); // should ignore cancelled transactions + + // Should still listen during pause, and dispatch on resume + subscription.pause(); + db.execute('insert into a default values'); + await pumpEventQueue(); + expect(events, isEmpty); + + subscription.resume(); + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + subscription.pause(); + }); + test('watch in isolate', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); diff --git a/scripts/sqlite3_wasm_download.dart b/scripts/sqlite3_wasm_download.dart index 62acbbe..9716eee 100644 --- a/scripts/sqlite3_wasm_download.dart +++ b/scripts/sqlite3_wasm_download.dart @@ -4,7 +4,7 @@ library; import 'dart:io'; final sqliteUrl = - 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.4.3/sqlite3.wasm'; + 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.8.0/sqlite3.wasm'; void main() async { // Create assets directory if it doesn't exist