From e5d31b2d17c21c278b54d55608e0e940ef026f35 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 13 Jun 2025 17:59:49 +0200 Subject: [PATCH 01/29] Support nested transactions --- .../connection/sync_sqlite_connection.dart | 22 ++- .../sqlite_async/lib/src/impl/context.dart | 186 ++++++++++++++++++ .../native_sqlite_connection_impl.dart | 13 +- .../lib/src/sqlite_connection.dart | 22 ++- .../sqlite_async/lib/src/sqlite_queries.dart | 2 +- .../lib/src/utils/shared_utils.dart | 18 -- .../sqlite_async/lib/src/web/database.dart | 67 +++---- 7 files changed, 251 insertions(+), 79 deletions(-) create mode 100644 packages/sqlite_async/lib/src/impl/context.dart diff --git a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart index d84bdaa..f63e0df 100644 --- a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart @@ -8,9 +8,11 @@ import 'package:sqlite_async/src/sqlite_queries.dart'; import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; +import '../../impl/context.dart'; + /// A simple "synchronous" connection which provides the async SqliteConnection /// implementation using a synchronous SQLite connection -class SyncSqliteConnection extends SqliteConnection with SqliteQueries { +class SyncSqliteConnection with SqliteQueries implements SqliteConnection { final CommonDatabase db; late Mutex mutex; @override @@ -44,7 +46,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncReadContext(db, parent: task)); + return ScopedReadContext.assumeReadLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -59,7 +64,10 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { return mutex.lock( () { task?.finish(); - return callback(SyncWriteContext(db, parent: task)); + return ScopedWriteContext.assumeWriteLock( + _UnsafeSyncContext(db, parent: task), + callback, + ); }, timeout: lockTimeout, ); @@ -80,12 +88,12 @@ class SyncSqliteConnection extends SqliteConnection with SqliteQueries { } } -class SyncReadContext implements SqliteReadContext { +final class _UnsafeSyncContext extends UnscopedContext { final TimelineTask? task; CommonDatabase db; - SyncReadContext(this.db, {TimelineTask? parent}) + _UnsafeSyncContext(this.db, {TimelineTask? parent}) : task = TimelineTask(parent: parent); @override @@ -129,10 +137,6 @@ class SyncReadContext implements SqliteReadContext { Future getAutoCommit() async { return db.autocommit; } -} - -class SyncWriteContext extends SyncReadContext implements SqliteWriteContext { - SyncWriteContext(super.db, {super.parent}); @override Future execute(String sql, diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart new file mode 100644 index 0000000..8a06998 --- /dev/null +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -0,0 +1,186 @@ +import 'package:sqlite3/common.dart'; + +import '../sqlite_connection.dart'; + +abstract base class UnscopedContext implements SqliteReadContext { + Future execute(String sql, List parameters); + Future executeBatch(String sql, List> parameterSets); + + /// Returns an [UnscopedContext] useful as the outermost transaction. + /// + /// This is called by [ScopedWriteContext.writeTransaction] _after_ executing + /// the first `BEGIN` statement. + /// This is used on the web to assert that the auto-commit state is false + /// before running statements. + UnscopedContext interceptOutermostTransaction() { + return this; + } +} + +final class ScopedReadContext implements SqliteReadContext { + final UnscopedContext _context; + + /// Whether this context view is locked on an inner operation like a + /// transaction. + /// + /// We don't use a mutex because we don't want to serialize access - we just + /// want to forbid concurrent operations. + bool _isLocked = false; + + /// Whether this particular view of a read context has been closed, e.g. + /// because the callback owning it has returned. + bool _closed = false; + + ScopedReadContext(this._context); + + void _checkNotLocked() { + _checkStillOpen(); + + if (_isLocked) { + throw StateError( + 'The context from the callback was locked, e.g. due to a nested ' + 'transaction.'); + } + } + + void _checkStillOpen() { + if (_closed) { + throw StateError('This context to a callback is no longer open. ' + 'Make sure to await all statements on a database to avoid a context ' + 'still being used after its callback has finished.'); + } + } + + @override + bool get closed => _closed || _context.closed; + + @override + Future computeWithDatabase( + Future Function(CommonDatabase db) compute) async { + _checkNotLocked(); + return await _context.computeWithDatabase(compute); + } + + @override + Future get(String sql, [List parameters = const []]) async { + _checkNotLocked(); + final rows = await getAll(sql, parameters); + return rows.first; + } + + @override + Future getAll(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.getAll(sql, parameters); + } + + @override + Future getAutoCommit() async { + _checkStillOpen(); + return _context.getAutoCommit(); + } + + @override + Future getOptional(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + final rows = await getAll(sql, parameters); + return rows.firstOrNull; + } + + void invalidate() => _closed = true; + + static Future assumeReadLock( + UnscopedContext unsafe, + Future Function(SqliteReadContext) callback, + ) async { + final scoped = ScopedReadContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} + +final class ScopedWriteContext extends ScopedReadContext + implements SqliteWriteContext { + /// The "depth" of virtual nested transaction. + /// + /// A value of `0` indicates that this is operating outside of a transaction. + /// A value of `1` indicates a regular transaction (guarded with `BEGIN` and + /// `COMMIT` statements). + /// All higher values indicate a nested transaction implemented with + /// savepoint statements. + final int transactionDepth; + + ScopedWriteContext(super._context, {this.transactionDepth = 0}); + + @override + Future execute(String sql, + [List parameters = const []]) async { + _checkNotLocked(); + return await _context.execute(sql, parameters); + } + + @override + Future executeBatch( + String sql, List> parameterSets) async { + _checkNotLocked(); + + return await _context.executeBatch(sql, parameterSets); + } + + @override + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback) async { + _checkNotLocked(); + final (begin, commit, rollback) = _beginCommitRollback(transactionDepth); + ScopedWriteContext? inner; + + try { + _isLocked = true; + + await _context.execute(begin, const []); + inner = + ScopedWriteContext(_context, transactionDepth: transactionDepth + 1); + final result = await callback(inner); + await _context.execute(commit, const []); + return result; + } catch (e) { + try { + await _context.execute(rollback, const []); + } catch (e) { + // In rare cases, a ROLLBACK may fail. + // Safe to ignore. + } + rethrow; + } finally { + inner?.invalidate(); + } + } + + static (String, String, String) _beginCommitRollback(int level) { + return switch (level) { + 0 => ('BEGIN IMMEDIATE', 'COMMIT', 'ROLLBACK'), + final nested => ( + 'SAVEPOINT s$nested', + 'RELEASE s$nested', + 'ROLLBACK TO s$nested' + ) + }; + } + + static Future assumeWriteLock( + UnscopedContext unsafe, + Future Function(SqliteWriteContext) callback, + ) async { + final scoped = ScopedWriteContext(unsafe); + try { + return await callback(scoped); + } finally { + scoped.invalidate(); + } + } +} diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 38c184e..65f9db8 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -14,6 +14,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/utils/shared_utils.dart'; +import '../../impl/context.dart'; import 'upstream_updates.dart'; typedef TxCallback = Future Function(CommonDatabase db); @@ -64,8 +65,8 @@ class SqliteConnectionImpl return _isolateClient.closed; } - _TransactionContext _context() { - return _TransactionContext( + _UnsafeContext _context() { + return _UnsafeContext( _isolateClient, profileQueries ? TimelineTask() : null); } @@ -137,7 +138,7 @@ class SqliteConnectionImpl return _connectionMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return ScopedReadContext.assumeReadLock(ctx, callback); } finally { await ctx.close(); } @@ -160,7 +161,7 @@ class SqliteConnectionImpl return await _writeMutex.lock(() async { final ctx = _context(); try { - return await callback(ctx); + return ScopedWriteContext.assumeWriteLock(ctx, callback); } finally { await ctx.close(); } @@ -177,14 +178,14 @@ class SqliteConnectionImpl int _nextCtxId = 1; -class _TransactionContext implements SqliteWriteContext { +final class _UnsafeContext extends UnscopedContext { final PortClient _sendPort; bool _closed = false; final int ctxId = _nextCtxId++; final TimelineTask? task; - _TransactionContext(this._sendPort, this.task); + _UnsafeContext(this._sendPort, this.task); @override bool get closed { diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index bd2292b..0e360fc 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -8,7 +8,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'common/connection/sync_sqlite_connection.dart'; /// Abstract class representing calls available in a read-only or read-write context. -abstract class SqliteReadContext { +abstract interface class SqliteReadContext { /// Execute a read-only (SELECT) query and return the results. Future getAll(String sql, [List parameters = const []]); @@ -66,7 +66,7 @@ abstract class SqliteReadContext { } /// Abstract class representing calls available in a read-write context. -abstract class SqliteWriteContext extends SqliteReadContext { +abstract interface class SqliteWriteContext extends SqliteReadContext { /// Execute a write query (INSERT, UPDATE, DELETE) and return the results (if any). Future execute(String sql, [List parameters = const []]); @@ -75,13 +75,28 @@ abstract class SqliteWriteContext extends SqliteReadContext { /// parameter set. This is faster than executing separately with each /// parameter set. Future executeBatch(String sql, List> parameterSets); + + /// Open a read-write transaction on this write context. + /// + /// When called on a [SqliteConnection], this takes a global lock - only one + /// write write transaction can execute against the database at a time. This + /// applies even when constructing separate [SqliteDatabase] instances for the + /// same database file. + /// + /// Statements within the transaction must be done on the provided + /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] + /// instance will error. + /// It is forbidden to use the [SqliteWriteContext] after the [callback] + /// completes. + Future writeTransaction( + Future Function(SqliteWriteContext tx) callback); } /// Abstract class representing a connection to the SQLite database. /// /// This package typically pools multiple [SqliteConnection] instances into a /// managed [SqliteDatabase] automatically. -abstract class SqliteConnection extends SqliteWriteContext { +abstract interface class SqliteConnection extends SqliteWriteContext { /// Default constructor for subclasses. SqliteConnection(); @@ -123,6 +138,7 @@ abstract class SqliteConnection extends SqliteWriteContext { /// Statements within the transaction must be done on the provided /// [SqliteWriteContext] - attempting statements on the [SqliteConnection] /// instance will error. + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}); diff --git a/packages/sqlite_async/lib/src/sqlite_queries.dart b/packages/sqlite_async/lib/src/sqlite_queries.dart index 80bc568..367d23f 100644 --- a/packages/sqlite_async/lib/src/sqlite_queries.dart +++ b/packages/sqlite_async/lib/src/sqlite_queries.dart @@ -107,7 +107,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection { Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout}) async { return writeLock((ctx) async { - return await internalWriteTransaction(ctx, callback); + return ctx.writeTransaction(callback); }, lockTimeout: lockTimeout, debugContext: 'writeTransaction()'); } diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index c911bbc..9faf928 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -21,24 +21,6 @@ Future internalReadTransaction(SqliteReadContext ctx, } } -Future internalWriteTransaction(SqliteWriteContext ctx, - Future Function(SqliteWriteContext tx) callback) async { - try { - await ctx.execute('BEGIN IMMEDIATE'); - final result = await callback(ctx); - await ctx.execute('COMMIT'); - return result; - } catch (e) { - try { - await ctx.execute('ROLLBACK'); - } catch (e) { - // In rare cases, a ROLLBACK may fail. - // Safe to ignore. - } - rethrow; - } -} - /// Given a SELECT query, return the tables that the query depends on. Future> getSourceTablesText( SqliteReadContext ctx, String sql) async { diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cd7e215..0464ca0 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -8,9 +8,9 @@ import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; -import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; import 'package:sqlite_async/web.dart'; +import '../impl/context.dart'; import 'protocol.dart'; import 'web_mutex.dart'; @@ -95,12 +95,8 @@ class WebDatabase {Duration? lockTimeout, String? debugContext}) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _SharedContext(this); - try { - return await callback(context); - } finally { - context.markClosed(); - } + return ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); }); } else { // No custom mutex, coordinate locks through shared worker. @@ -108,7 +104,8 @@ class WebDatabase CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); try { - return await callback(_SharedContext(this)); + return ScopedReadContext.assumeReadLock( + _UnscopedContext(this), callback); } finally { await _database.customRequest( CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); @@ -125,30 +122,24 @@ class WebDatabase Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock( - (writeContext) => - internalWriteTransaction(writeContext, (context) async { - // All execute calls done in the callback will be checked for the - // autocommit state - return callback(_ExclusiveTransactionContext(this, writeContext)); - }), + return writeLock((writeContext) { + return ScopedWriteContext.assumeWriteLock( + _UnscopedContext(this), callback); + }, debugContext: 'writeTransaction()', lockTimeout: lockTimeout, flush: flush); } @override - - /// Internal writeLock which intercepts transaction context's to verify auto commit is not active Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -158,11 +149,10 @@ class WebDatabase // No custom mutex, coordinate locks through shared worker. await _database.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _ExclusiveContext(this); + final context = _UnscopedContext(this); try { - return await callback(context); + return await ScopedWriteContext.assumeWriteLock(context, callback); } finally { - context.markClosed(); if (flush != false) { await this.flush(); } @@ -179,17 +169,16 @@ class WebDatabase } } -class _SharedContext implements SqliteReadContext { +final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; - bool _contextClosed = false; final TimelineTask? _task; - _SharedContext(this._database) + _UnscopedContext(this._database) : _task = _database.profileQueries ? TimelineTask() : null; @override - bool get closed => _contextClosed || _database.closed; + bool get closed => _database.closed; @override Future computeWithDatabase( @@ -230,14 +219,6 @@ class _SharedContext implements SqliteReadContext { return results.firstOrNull; } - void markClosed() { - _contextClosed = true; - } -} - -class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { - _ExclusiveContext(super.database); - @override Future execute(String sql, [List parameters = const []]) { return _task.timeAsync('execute', sql: sql, parameters: parameters, () { @@ -258,15 +239,17 @@ class _ExclusiveContext extends _SharedContext implements SqliteWriteContext { }); }); } -} - -class _ExclusiveTransactionContext extends _ExclusiveContext { - SqliteWriteContext baseContext; - - _ExclusiveTransactionContext(super.database, this.baseContext); @override - bool get closed => baseContext.closed; + UnscopedContext interceptOutermostTransaction() { + // All execute calls done in the callback will be checked for the + // autocommit state + return _ExclusiveTransactionContext(_database); + } +} + +final class _ExclusiveTransactionContext extends _UnscopedContext { + _ExclusiveTransactionContext(super._database); Future _executeInternal( String sql, List parameters) async { From 8212a2323931e6738044b5a2d82c8124be3e9af0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 16 Jun 2025 11:44:29 +0200 Subject: [PATCH 02/29] Fix tests --- packages/sqlite_async/lib/src/impl/context.dart | 13 +++++++++---- .../database/native_sqlite_connection_impl.dart | 4 ++-- packages/sqlite_async/lib/src/web/database.dart | 10 +++++++--- packages/sqlite_async/test/basic_test.dart | 2 +- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart index 8a06998..e568b33 100644 --- a/packages/sqlite_async/lib/src/impl/context.dart +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -139,18 +139,23 @@ final class ScopedWriteContext extends ScopedReadContext final (begin, commit, rollback) = _beginCommitRollback(transactionDepth); ScopedWriteContext? inner; + final innerContext = transactionDepth == 0 + ? _context.interceptOutermostTransaction() + : _context; + try { _isLocked = true; await _context.execute(begin, const []); - inner = - ScopedWriteContext(_context, transactionDepth: transactionDepth + 1); + + inner = ScopedWriteContext(innerContext, + transactionDepth: transactionDepth + 1); final result = await callback(inner); - await _context.execute(commit, const []); + await innerContext.execute(commit, const []); return result; } catch (e) { try { - await _context.execute(rollback, const []); + await innerContext.execute(rollback, const []); } catch (e) { // In rare cases, a ROLLBACK may fail. // Safe to ignore. diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 65f9db8..e2df0f3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -138,7 +138,7 @@ class SqliteConnectionImpl return _connectionMutex.lock(() async { final ctx = _context(); try { - return ScopedReadContext.assumeReadLock(ctx, callback); + return await ScopedReadContext.assumeReadLock(ctx, callback); } finally { await ctx.close(); } @@ -161,7 +161,7 @@ class SqliteConnectionImpl return await _writeMutex.lock(() async { final ctx = _context(); try { - return ScopedWriteContext.assumeWriteLock(ctx, callback); + return await ScopedWriteContext.assumeWriteLock(ctx, callback); } finally { await ctx.close(); } diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 0464ca0..3e0797b 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -94,7 +94,7 @@ class WebDatabase Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () async { + return await mutex.lock(timeout: lockTimeout, () { return ScopedReadContext.assumeReadLock( _UnscopedContext(this), callback); }); @@ -104,7 +104,7 @@ class WebDatabase CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); try { - return ScopedReadContext.assumeReadLock( + return await ScopedReadContext.assumeReadLock( _UnscopedContext(this), callback); } finally { await _database.customRequest( @@ -124,7 +124,11 @@ class WebDatabase bool? flush}) { return writeLock((writeContext) { return ScopedWriteContext.assumeWriteLock( - _UnscopedContext(this), callback); + _UnscopedContext(this), + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); }, debugContext: 'writeTransaction()', lockTimeout: lockTimeout, diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index fb98e17..4bbd86b 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -122,7 +122,7 @@ void main() { ['Test Data']); expect(rs.rows[0], equals(['Test Data'])); }); - expect(await savedTx!.getAutoCommit(), equals(true)); + expect(await db.getAutoCommit(), equals(true)); expect(savedTx!.closed, equals(true)); }); From d15545911878a20d2358782e48e621d3ce00d5e2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 16 Jun 2025 12:00:14 +0200 Subject: [PATCH 03/29] Add tests --- .../sqlite_async/lib/src/impl/context.dart | 1 + packages/sqlite_async/test/basic_test.dart | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart index e568b33..afc2ba5 100644 --- a/packages/sqlite_async/lib/src/impl/context.dart +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -162,6 +162,7 @@ final class ScopedWriteContext extends ScopedReadContext } rethrow; } finally { + _isLocked = false; inner?.invalidate(); } } diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index 4bbd86b..6a315da 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -189,6 +189,73 @@ void main() { ); }); + group('nested transaction', () { + const insert = 'INSERT INTO test_data (description) VALUES(?);'; + late SqliteDatabase db; + + setUp(() async { + db = await testUtils.setupDatabase(path: path); + await createTables(db); + }); + + tearDown(() => db.close()); + + test('run in outer transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('can rollback inner transaction', () async { + await db.writeTransaction((tx) async { + await tx.execute(insert, ['first']); + + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['second']); + }); + + await expectLater(() async { + await tx.writeTransaction((tx) async { + await tx.execute(insert, ['third']); + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(3)); + throw 'rollback please'; + }); + }, throwsA(anything)); + + expect(await tx.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + expect(await db.getAll('SELECT * FROM test_data'), hasLength(2)); + }); + + test('cannot use outer transaction while inner is active', () async { + await db.writeTransaction((outer) async { + await outer.writeTransaction((inner) async { + await expectLater(outer.execute('SELECT 1'), throwsStateError); + }); + }); + }); + + test('cannot use inner after leaving scope', () async { + await db.writeTransaction((tx) async { + late SqliteWriteContext inner; + await tx.writeTransaction((tx) async { + inner = tx; + }); + + await expectLater(inner.execute('SELECT 1'), throwsStateError); + }); + }); + }); + test('can use raw database instance', () async { final factory = await testUtils.testFactory(); final raw = await factory.openDatabaseForSingleConnection(); From 97bc2c83484b1a6ab6c7f929c666dbaae24f996c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 16 Jun 2025 14:07:54 +0200 Subject: [PATCH 04/29] Docs --- .../sqlite_async/lib/src/impl/context.dart | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart index afc2ba5..548681e 100644 --- a/packages/sqlite_async/lib/src/impl/context.dart +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -2,6 +2,13 @@ import 'package:sqlite3/common.dart'; import '../sqlite_connection.dart'; +/// A context that can be used to run both reading and writing queries - +/// basically a [SqliteWriteContext] without the ability to start transactions. +/// +/// Instances of this are not given out to clients - instead, they are wrapped +/// with [ScopedReadContext] and [ScopedWriteContext] after obtaining a lock. +/// Those wrapped views have a shorter lifetime (they can be closed +/// independently, and verify that they're not being used after being closed). abstract base class UnscopedContext implements SqliteReadContext { Future execute(String sql, List parameters); Future executeBatch(String sql, List> parameterSets); @@ -17,6 +24,7 @@ abstract base class UnscopedContext implements SqliteReadContext { } } +/// A view over an [UnscopedContext] implementing [SqliteReadContext]. final class ScopedReadContext implements SqliteReadContext { final UnscopedContext _context; @@ -91,6 +99,11 @@ final class ScopedReadContext implements SqliteReadContext { void invalidate() => _closed = true; + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] read-access to the database. + /// + /// Assumes that a read lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. static Future assumeReadLock( UnscopedContext unsafe, Future Function(SqliteReadContext) callback, @@ -178,6 +191,11 @@ final class ScopedWriteContext extends ScopedReadContext }; } + /// Creates a short-lived wrapper around the [unsafe] context to safely give + /// [callback] access to the database. + /// + /// Assumes that a write lock providing sound access to the inner + /// [UnscopedContext] is held until this future returns. static Future assumeWriteLock( UnscopedContext unsafe, Future Function(SqliteWriteContext) callback, From daa5e7a5bef3f91f9075f7ecb9b2054085fd8ce4 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 16 Jun 2025 14:14:58 +0200 Subject: [PATCH 05/29] Forward get and getOptional calls --- packages/sqlite_async/lib/src/impl/context.dart | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/sqlite_async/lib/src/impl/context.dart b/packages/sqlite_async/lib/src/impl/context.dart index 548681e..0e3eef6 100644 --- a/packages/sqlite_async/lib/src/impl/context.dart +++ b/packages/sqlite_async/lib/src/impl/context.dart @@ -72,8 +72,7 @@ final class ScopedReadContext implements SqliteReadContext { @override Future get(String sql, [List parameters = const []]) async { _checkNotLocked(); - final rows = await getAll(sql, parameters); - return rows.first; + return _context.get(sql, parameters); } @override @@ -93,8 +92,7 @@ final class ScopedReadContext implements SqliteReadContext { Future getOptional(String sql, [List parameters = const []]) async { _checkNotLocked(); - final rows = await getAll(sql, parameters); - return rows.firstOrNull; + return _context.getOptional(sql, parameters); } void invalidate() => _closed = true; From 4b5607625b1d3f37d6f1610ec562f7be2a79ec86 Mon Sep 17 00:00:00 2001 From: Jorge Sardina Date: Fri, 20 Jun 2025 12:52:04 +0200 Subject: [PATCH 06/29] Allow transforming table updates from sqlite_async. --- packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ .../lib/src/connection.dart | 20 ++++++++++++++----- packages/drift_sqlite_async/pubspec.yaml | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index b101ac4..fa769a1 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.3 + +- Allow transforming table updates from sqlite_async. + ## 0.2.2 - Fix write detection when using UPDATE/INSERT/DELETE with RETURNING in raw queries. diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index a1af55a..bcde1bf 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,12 +15,22 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; - SqliteAsyncDriftConnection(SqliteConnection db, {bool logStatements = false}) - : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { + SqliteAsyncDriftConnection( + SqliteConnection db, { + bool logStatements = false, + Set Function(UpdateNotification)? transformTableUpdate, + }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { - var setUpdates = {}; - for (var tableName in event.tables) { - setUpdates.add(TableUpdate(tableName)); + final Set setUpdates; + // This is useful to map local table names from PowerSync that are backed by a view name + // which is the entity that the user interacts with. + if (transformTableUpdate != null) { + setUpdates = transformTableUpdate(event); + } else { + setUpdates = {}; + for (var tableName in event.tables) { + setUpdates.add(TableUpdate(tableName)); + } } super.streamQueries.handleTableUpdates(setUpdates); }); diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 62a64da..c17e833 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.2 +version: 0.2.3 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. From a2b6c4cd2226f867569b03d25318b87d6aea6fde Mon Sep 17 00:00:00 2001 From: David Martos Date: Sun, 22 Jun 2025 12:35:16 +0200 Subject: [PATCH 07/29] typo --- packages/drift_sqlite_async/lib/src/connection.dart | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index bcde1bf..fa56e2e 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -18,14 +18,14 @@ class SqliteAsyncDriftConnection extends DatabaseConnection { SqliteAsyncDriftConnection( SqliteConnection db, { bool logStatements = false, - Set Function(UpdateNotification)? transformTableUpdate, + Set Function(UpdateNotification)? transformTableUpdates, }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { final Set setUpdates; // This is useful to map local table names from PowerSync that are backed by a view name // which is the entity that the user interacts with. - if (transformTableUpdate != null) { - setUpdates = transformTableUpdate(event); + if (transformTableUpdates != null) { + setUpdates = transformTableUpdates(event); } else { setUpdates = {}; for (var tableName in event.tables) { From 6e51f5dec53947909869fc7b889712f9f62c81a1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 29 Jul 2025 16:41:49 +0200 Subject: [PATCH 08/29] Support 3.8.0 of `package:sqlite3`. --- .../web/worker/throttled_common_database.dart | 28 +++++++++++-------- packages/sqlite_async/pubspec.yaml | 2 +- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart index 5f33b6d..8a0e901 100644 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -28,12 +28,14 @@ class ThrottledCommonDatabase extends CommonDatabase { DatabaseConfig get config => _db.config; @override - void createAggregateFunction( - {required String functionName, - required AggregateFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true}) { + void createAggregateFunction({ + required String functionName, + required AggregateFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true, + bool subtype = false, + }) { _db.createAggregateFunction(functionName: functionName, function: function); } @@ -44,12 +46,14 @@ class ThrottledCommonDatabase extends CommonDatabase { } @override - void createFunction( - {required String functionName, - required ScalarFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true}) { + void createFunction({ + required String functionName, + required ScalarFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true, + bool subtype = false, + }) { _db.createFunction(functionName: functionName, function: function); } diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index bb27d60..e610d7f 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -12,7 +12,7 @@ topics: - flutter dependencies: - sqlite3: ^2.7.2 + sqlite3: ^2.8.0 sqlite3_web: ^0.3.0 async: ^2.10.0 collection: ^1.17.0 From 40243bf761ecc0b5268edab6a28408f5bd44b4ff Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 15 Jul 2025 17:42:48 +0200 Subject: [PATCH 09/29] Drift: Support nested transactions --- packages/drift_sqlite_async/CHANGELOG.md | 4 +++ .../drift_sqlite_async/lib/src/executor.dart | 17 ++++++++++- packages/drift_sqlite_async/pubspec.yaml | 6 ++-- packages/drift_sqlite_async/test/db_test.dart | 30 +++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index b101ac4..e70cd40 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.3 + +- Support nested transactions. + ## 0.2.2 - Fix write detection when using UPDATE/INSERT/DELETE with RETURNING in raw queries. diff --git a/packages/drift_sqlite_async/lib/src/executor.dart b/packages/drift_sqlite_async/lib/src/executor.dart index 41886f3..127462e 100644 --- a/packages/drift_sqlite_async/lib/src/executor.dart +++ b/packages/drift_sqlite_async/lib/src/executor.dart @@ -129,13 +129,28 @@ class _SqliteAsyncTransactionDelegate extends SupportedTransactionDelegate { _SqliteAsyncTransactionDelegate(this._db); + @override + FutureOr Function(QueryDelegate, Future Function(QueryDelegate))? + get startNested => _startNested; + @override Future startTransaction(Future Function(QueryDelegate p1) run) async { - await _db.writeTransaction((context) async { + await _startTransaction(_db, run); + } + + Future _startTransaction( + SqliteWriteContext context, Future Function(QueryDelegate p1) run) async { + await context.writeTransaction((context) async { final delegate = _SqliteAsyncQueryDelegate(context, null); return run(delegate); }); } + + Future _startNested( + QueryDelegate outer, Future Function(QueryDelegate) block) async { + await _startTransaction( + (outer as _SqliteAsyncQueryDelegate)._context, block); + } } class _SqliteAsyncVersionDelegate extends DynamicVersionDelegate { diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 62a64da..aea987f 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.2 +version: 0.2.3 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -14,12 +14,12 @@ topics: environment: sdk: ">=3.0.0 <4.0.0" dependencies: - drift: ">=2.19.0 <3.0.0" + drift: ">=2.28.0 <3.0.0" sqlite_async: ^0.11.0 dev_dependencies: build_runner: ^2.4.8 - drift_dev: ">=2.19.0 <3.0.0" + drift_dev: ">=2.28.0 <3.0.0" glob: ^2.1.2 lints: ^5.0.0 sqlite3: ^2.4.0 diff --git a/packages/drift_sqlite_async/test/db_test.dart b/packages/drift_sqlite_async/test/db_test.dart index bda09df..f5b269a 100644 --- a/packages/drift_sqlite_async/test/db_test.dart +++ b/packages/drift_sqlite_async/test/db_test.dart @@ -117,5 +117,35 @@ void main() { final deleted = await dbu.delete(dbu.todoItems).go(); expect(deleted, 10); }); + + test('nested transactions', () async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'root')); + + await dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx0')); + + await dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx1')); + + await expectLater(() { + return dbu.transaction(() async { + await dbu + .into(dbu.todoItems) + .insert(TodoItemsCompanion.insert(description: 'tx2')); + throw 'rollback'; + }); + }, throwsA(anything)); + }); + }); + + final items = await dbu.todoItems.all().get(); + expect(items.map((e) => e.description).toSet(), {'root', 'tx0', 'tx1'}); + }); }); } From 1f720565467cd16400c4c97e871d70947fb1ea33 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 15 Jul 2025 17:47:12 +0200 Subject: [PATCH 10/29] Prepare for release --- packages/drift_sqlite_async/pubspec.yaml | 2 +- packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index aea987f..77f250d 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.11.0 + sqlite_async: ^0.11.8 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 97dbf50..eedfb93 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.11.8 + +- Support nested transactions (emulated with `SAVEPOINT` statements). + ## 0.11.7 - Shared worker: Release locks owned by connected client tab when it closes. diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index e610d7f..a155f2b 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.7 +version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" From 17012edf88a498e0c04255dbb4e0f6a2d054c299 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 29 Jul 2025 17:29:02 +0200 Subject: [PATCH 11/29] Update changelog --- CHANGELOG.md | 24 ++++++++++++++++++++++++ packages/sqlite_async/CHANGELOG.md | 1 + 2 files changed, 25 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9cda07..cafde71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,30 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-07-29 + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.11.8`](#sqlite_async---v0118) + - [`drift_sqlite_async` - `v0.2.3`](#drift_sqlite_async---v023) + +--- + +#### `sqlite_async` - `v0.11.8` + +- Support nested transactions (emulated with `SAVEPOINT` statements). +- Fix web compilation issues with version `2.8.0` of `package:sqlite3`. + +#### `drift_sqlite_async` - `v0.2.3` + +- Support nested transactions. + ## 2025-06-03 --- diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index eedfb93..387c944 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,6 +1,7 @@ ## 0.11.8 - Support nested transactions (emulated with `SAVEPOINT` statements). +- Fix web compilation issues with version `2.8.0` of `package:sqlite3`. ## 0.11.7 From 7e0a0c74a89b7481e42a882f2b0fdc97122b684c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 10:30:30 +0200 Subject: [PATCH 12/29] Refactor update streams on web --- .../lib/src/utils/shared_utils.dart | 69 +++++++ .../sqlite_async/lib/src/web/database.dart | 8 +- .../sqlite_async/lib/src/web/protocol.dart | 2 + .../lib/src/web/update_notifications.dart | 57 ++++++ .../lib/src/web/web_sqlite_open_factory.dart | 14 +- .../web/worker/throttled_common_database.dart | 191 ------------------ .../lib/src/web/worker/worker_utils.dart | 48 ++++- packages/sqlite_async/lib/web.dart | 25 ++- 8 files changed, 205 insertions(+), 209 deletions(-) create mode 100644 packages/sqlite_async/lib/src/web/update_notifications.dart delete mode 100644 packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 9faf928..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:convert'; +import 'package:sqlite3/common.dart'; + import '../sqlite_connection.dart'; Future internalReadTransaction(SqliteReadContext ctx, @@ -75,3 +77,70 @@ Object? mapParameter(Object? parameter) { List mapParameters(List parameters) { return [for (var p in parameters) mapParameter(p)]; } + +extension ThrottledUpdates on CommonDatabase { + /// Wraps [updatesSync] to: + /// + /// - Not fire in transactions. + /// - Fire asynchronously. + /// - Only report table names, which are buffered to avoid duplicates. + Stream> get throttledUpdatedTables { + StreamController>? controller; + var pendingUpdates = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (pendingUpdates.isNotEmpty) { + controller!.add(pendingUpdates); + pendingUpdates = {}; + } + } + + void collectUpdate(SqliteUpdate event) { + pendingUpdates.add(event.tableName); + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = commits.listen((_) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 3e0797b..cfaf987 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -21,6 +21,9 @@ class WebDatabase final Mutex? _mutex; final bool profileQueries; + @override + final Stream updates; + /// For persistent databases that aren't backed by a shared worker, we use /// web broadcast channels to forward local update events to other tabs. final BroadcastUpdates? broadcastUpdates; @@ -32,6 +35,7 @@ class WebDatabase this._database, this._mutex, { required this.profileQueries, + required this.updates, this.broadcastUpdates, }); @@ -113,10 +117,6 @@ class WebDatabase } } - @override - Stream get updates => - _database.updates.map((event) => UpdateNotification({event.tableName})); - @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index cb3a5fd..d17c06b 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -13,6 +13,8 @@ enum CustomDatabaseMessageKind { getAutoCommit, executeInTransaction, executeBatchInTransaction, + updateSubscriptionManagement, + notifyUpdates, } extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart new file mode 100644 index 0000000..ecea60d --- /dev/null +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -0,0 +1,57 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:sqlite3_web/sqlite3_web.dart'; + +import '../update_notification.dart'; +import 'protocol.dart'; + +/// Utility to request a stream of update notifications from the worker. +/// +/// Because we want to debounce update notifications on the worker, we're using +/// custom requests instead of the default [Database.updates] stream. +/// +/// Clients send a message to the worker to subscribe or unsubscribe, providing +/// an id for the subscription. The worker distributes update notifications with +/// custom requests to the client, which [handleRequest] distributes to the +/// original streams. +final class UpdateNotificationStreams { + var _idCounter = 0; + final Map> _updates = {}; + + Future handleRequest(JSAny? request) async { + final customRequest = request as CustomDatabaseMessage; + if (customRequest.kind == CustomDatabaseMessageKind.notifyUpdates) { + final notification = UpdateNotification(customRequest.rawParameters.toDart + .map((e) => (e as JSString).toDart) + .toSet()); + + _updates[customRequest.rawSql.toDart]?.add(notification); + } + + return null; + } + + Stream updatesFor(Database database) { + final id = (_idCounter++).toString(); + final controller = _updates[id] = StreamController(); + + controller + ..onListen = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [true], + )); + } + ..onCancel = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [false], + )); + }; + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index a724329..205734f 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -69,15 +69,19 @@ class DefaultSqliteOpenFactory ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier - BroadcastUpdates? updates; + BroadcastUpdates? broadcastUpdates; if (connection.access != AccessMode.throughSharedWorker && connection.storage != StorageMode.inMemory) { - updates = BroadcastUpdates(path); + broadcastUpdates = BroadcastUpdates(path); } - return WebDatabase(connection.database, options.mutex ?? mutex, - broadcastUpdates: updates, - profileQueries: sqliteOptions.profileQueries); + return WebDatabase( + connection.database, + options.mutex ?? mutex, + broadcastUpdates: broadcastUpdates, + profileQueries: sqliteOptions.profileQueries, + updates: updatesFor(connection.database), + ); } @override diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart deleted file mode 100644 index 8a0e901..0000000 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ /dev/null @@ -1,191 +0,0 @@ -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_wasm.dart'; - -/// Wrap a CommonDatabase to throttle its updates stream. -/// This is so that we can throttle the updates _within_ -/// the worker process, avoiding mass notifications over -/// the MessagePort. -class ThrottledCommonDatabase extends CommonDatabase { - final CommonDatabase _db; - final StreamController _transactionController = - StreamController.broadcast(); - - ThrottledCommonDatabase(this._db); - - @override - int get userVersion => _db.userVersion; - - @override - set userVersion(int userVersion) { - _db.userVersion = userVersion; - } - - @override - bool get autocommit => _db.autocommit; - - @override - DatabaseConfig get config => _db.config; - - @override - void createAggregateFunction({ - required String functionName, - required AggregateFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createAggregateFunction(functionName: functionName, function: function); - } - - @override - void createCollation( - {required String name, required CollatingFunction function}) { - _db.createCollation(name: name, function: function); - } - - @override - void createFunction({ - required String functionName, - required ScalarFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createFunction(functionName: functionName, function: function); - } - - @override - void dispose() { - _db.dispose(); - } - - @override - void execute(String sql, [List parameters = const []]) { - _db.execute(sql, parameters); - } - - @override - int getUpdatedRows() { - // ignore: deprecated_member_use - return _db.getUpdatedRows(); - } - - @override - int get lastInsertRowId => _db.lastInsertRowId; - - @override - CommonPreparedStatement prepare(String sql, - {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { - return _db.prepare(sql, - persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); - } - - @override - List prepareMultiple(String sql, - {bool persistent = false, bool vtab = true}) { - return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); - } - - @override - ResultSet select(String sql, [List parameters = const []]) { - bool preAutocommit = _db.autocommit; - final result = _db.select(sql, parameters); - bool postAutocommit = _db.autocommit; - if (!preAutocommit && postAutocommit) { - _transactionController.add(true); - } - return result; - } - - @override - int get updatedRows => _db.updatedRows; - - @override - Stream get updates { - return throttledUpdates(_db, _transactionController.stream); - } - - @override - VoidPredicate? get commitFilter => _db.commitFilter; - - @override - set commitFilter(VoidPredicate? filter) => _db.commitFilter = filter; - - @override - Stream get commits => _db.commits; - - @override - Stream get rollbacks => _db.rollbacks; -} - -/// This throttles the database update stream to: -/// 1. Trigger max once every 1ms. -/// 2. Only trigger _after_ transactions. -Stream throttledUpdates( - CommonDatabase source, Stream transactionStream) { - StreamController? controller; - Set pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; - } - - if (!source.autocommit) { - // Inside a transaction - do not fire updates - return; - } - - if (pendingUpdates.isNotEmpty) { - for (var update in pendingUpdates) { - controller!.add(update); - } - - pendingUpdates.clear(); - } - } - - void collectUpdate(SqliteUpdate event) { - // We merge updates with the same kind and tableName. - // rowId is never used in sqlite_async. - pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); - - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - } - - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = transactionStream.listen((event) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 3ecb257..7603306 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; @@ -6,8 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; - -import 'throttled_common_database.dart'; +import 'package:sqlite_async/src/utils/database_utils.dart'; import '../protocol.dart'; @@ -22,13 +22,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - final throttled = ThrottledCommonDatabase(db); - - return AsyncSqliteDatabase(database: throttled); + return AsyncSqliteDatabase(database: db); } - /// Opens a database with the `sqlite3` package that will be wrapped in a - /// [ThrottledCommonDatabase] for [openDatabase]. @visibleForOverriding CommonDatabase openUnderlying( WasmSqlite3 sqlite3, @@ -51,6 +47,7 @@ base class AsyncSqliteController extends DatabaseController { class AsyncSqliteDatabase extends WorkerDatabase { @override final CommonDatabase database; + final Stream> _updates; // This mutex is only used for lock requests from clients. Clients only send // these requests for shared workers, so we can assume each database is only @@ -58,7 +55,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final mutex = ReadWriteMutex(); final Map _state = {}; - AsyncSqliteDatabase({required this.database}); + AsyncSqliteDatabase({required this.database}) + : _updates = database.throttledUpdatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -67,9 +65,15 @@ class AsyncSqliteDatabase extends WorkerDatabase { void _markHoldsMutex(ClientConnection connection) { final state = _findState(connection); state.holdsMutex = true; + _registerCloseListener(state, connection); + } + + void _registerCloseListener( + _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { state.hasOnCloseListener = true; connection.closed.then((_) { + state.unsubscribeUpdates(); if (state.holdsMutex) { mutex.release(); } @@ -93,6 +97,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { _findState(connection).holdsMutex = false; mutex.release(); case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; @@ -129,6 +134,25 @@ class AsyncSqliteDatabase extends WorkerDatabase { "Transaction rolled back by earlier statement. Cannot execute: $sql"); } database.execute(sql, parameters); + case CustomDatabaseMessageKind.updateSubscriptionManagement: + final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final id = message.rawSql.toDart; + final state = _findState(connection); + + if (shouldSubscribe) { + state.unsubscribeUpdates(); + _registerCloseListener(state, connection); + + state.updatesNotification = _updates.listen((tables) { + connection.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.notifyUpdates, + id, + tables.toList(), + )); + }); + } else { + state.unsubscribeUpdates(); + } } return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); @@ -148,4 +172,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { final class _ConnectionState { bool hasOnCloseListener = false; bool holdsMutex = false; + StreamSubscription>? updatesNotification; + + void unsubscribeUpdates() { + if (updatesNotification case final active?) { + updatesNotification = null; + active.cancel(); + } + } } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index 3a65115..f151b81 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -4,12 +4,15 @@ /// workers. library; +import 'dart:js_interop'; + import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:web/web.dart'; import 'sqlite3_common.dart'; import 'sqlite_async.dart'; import 'src/web/database.dart'; +import 'src/web/update_notifications.dart'; /// An endpoint that can be used, by any running JavaScript context in the same /// website, to connect to an existing [WebSqliteConnection]. @@ -33,6 +36,13 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { + final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + + /// Handles a custom request sent from the worker to the client. + Future handleCustomRequest(JSAny? request) { + return _updateStreams.handleRequest(request); + } + /// Opens a [WebSqlite] instance for the given [options]. /// /// This method can be overriden in scenarios where the way [WebSqlite] is @@ -43,6 +53,7 @@ abstract mixin class WebSqliteOpenFactory return WebSqlite.open( worker: Uri.parse(options.workerUri), wasmModule: Uri.parse(options.wasmUri), + handleCustomRequest: handleCustomRequest, ); } @@ -54,6 +65,14 @@ abstract mixin class WebSqliteOpenFactory WebSqlite sqlite, String name) { return sqlite.connectToRecommended(name); } + + /// Obtains a stream of [UpdateNotification]s from a [database]. + /// + /// The default implementation uses custom requests to allow workers to + /// debounce the stream on their side to avoid messages where possible. + Stream updatesFor(Database database) { + return _updateStreams.updatesFor(database); + } } /// A [SqliteConnection] interface implemented by opened connections when @@ -85,8 +104,11 @@ abstract class WebSqliteConnection implements SqliteConnection { /// contexts to exchange opened database connections. static Future connectToEndpoint( WebDatabaseEndpoint endpoint) async { + final updates = UpdateNotificationStreams(); final rawSqlite = await WebSqlite.connectToPort( - (endpoint.connectPort, endpoint.connectName)); + (endpoint.connectPort, endpoint.connectName), + handleCustomRequest: updates.handleRequest, + ); final database = WebDatabase( rawSqlite, @@ -95,6 +117,7 @@ abstract class WebSqliteConnection implements SqliteConnection { null => null, }, profileQueries: false, + updates: updates.updatesFor(rawSqlite), ); return database; } From 64260569f6491a137dd680d7df62938befef48e0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:02:24 +0200 Subject: [PATCH 13/29] Fix tests --- .../native_sqlite_connection_impl.dart | 43 +++---------------- .../lib/src/utils/shared_utils.dart | 1 + .../lib/src/web/update_notifications.dart | 5 ++- .../lib/src/web/web_sqlite_open_factory.dart | 11 ++--- packages/sqlite_async/lib/web.dart | 4 +- scripts/sqlite3_wasm_download.dart | 2 +- 6 files changed, 19 insertions(+), 47 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index e2df0f3..301d0af 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,40 +303,13 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - Timer? updateDebouncer; - Set updatedTables = {}; + db.throttledUpdatedTables.listen((changedTables) { + client.fire(UpdateNotification(changedTables)); + }); + int? txId; Object? txError; - void maybeFireUpdates() { - // We keep buffering the set of updated tables until we are not - // in a transaction. Firing transactions inside a transaction - // has multiple issues: - // 1. Watched queries would detect changes to the underlying tables, - // but the data would not be visible to queries yet. - // 2. It would trigger many more notifications than required. - // - // This still includes updates for transactions that are rolled back. - // We could handle those better at a later stage. - - if (updatedTables.isNotEmpty && db.autocommit) { - client.fire(UpdateNotification(updatedTables)); - updatedTables.clear(); - } - updateDebouncer?.cancel(); - updateDebouncer = null; - } - - db.updates.listen((event) { - updatedTables.add(event.tableName); - - // This handles two cases: - // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). - // 2. Long-running _SqliteIsolateClosure that should fire updates while running. - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - }); - ResultSet runStatement(_SqliteIsolateStatement data) { if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { if (txId != null) { @@ -388,8 +361,6 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, throw sqlite.SqliteException( 0, 'Transaction must be closed within the read or write lock'); } - // We would likely have received updates by this point - fire now. - maybeFireUpdates(); return null; case _SqliteIsolateStatement(): return task.timeSync( @@ -399,11 +370,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, parameters: data.args, ); case _SqliteIsolateClosure(): - try { - return await data.cb(db); - } finally { - maybeFireUpdates(); - } + return await data.cb(db); case _SqliteIsolateConnectionClose(): db.dispose(); return null; diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..0e9c9d6 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,6 +123,7 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { + print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart index ecea60d..f04a785 100644 --- a/packages/sqlite_async/lib/src/web/update_notifications.dart +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -26,7 +26,8 @@ final class UpdateNotificationStreams { .map((e) => (e as JSString).toDart) .toSet()); - _updates[customRequest.rawSql.toDart]?.add(notification); + final controller = _updates[customRequest.rawSql.toDart]; + controller?.add(notification); } return null; @@ -50,6 +51,8 @@ final class UpdateNotificationStreams { id, [false], )); + + _updates.remove(id); }; return controller.stream; diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 205734f..513b1f2 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -10,7 +10,7 @@ import 'package:sqlite_async/web.dart'; import 'database.dart'; import 'worker/worker_utils.dart'; -Map> webSQLiteImplementations = {}; +Map> _webSQLiteImplementations = {}; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -20,13 +20,13 @@ class DefaultSqliteOpenFactory final cacheKey = sqliteOptions.webSqliteOptions.wasmUri + sqliteOptions.webSqliteOptions.workerUri; - if (webSQLiteImplementations.containsKey(cacheKey)) { - return webSQLiteImplementations[cacheKey]!; + if (_webSQLiteImplementations.containsKey(cacheKey)) { + return _webSQLiteImplementations[cacheKey]!; } - webSQLiteImplementations[cacheKey] = + _webSQLiteImplementations[cacheKey] = openWebSqlite(sqliteOptions.webSqliteOptions); - return webSQLiteImplementations[cacheKey]!; + return _webSQLiteImplementations[cacheKey]!; }); DefaultSqliteOpenFactory( @@ -42,6 +42,7 @@ class DefaultSqliteOpenFactory wasmModule: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), worker: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), controller: AsyncSqliteController(), + handleCustomRequest: handleCustomRequest, ); } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index f151b81..e318697 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -29,6 +29,8 @@ typedef WebDatabaseEndpoint = ({ String? lockName, }); +final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + /// An additional interface for [SqliteOpenFactory] exposing additional /// functionality that is only relevant when compiling to the web. /// @@ -36,8 +38,6 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { - final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); - /// Handles a custom request sent from the worker to the client. Future handleCustomRequest(JSAny? request) { return _updateStreams.handleRequest(request); diff --git a/scripts/sqlite3_wasm_download.dart b/scripts/sqlite3_wasm_download.dart index 62acbbe..9716eee 100644 --- a/scripts/sqlite3_wasm_download.dart +++ b/scripts/sqlite3_wasm_download.dart @@ -4,7 +4,7 @@ library; import 'dart:io'; final sqliteUrl = - 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.4.3/sqlite3.wasm'; + 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.8.0/sqlite3.wasm'; void main() async { // Create assets directory if it doesn't exist From bc66337140941e8e57dab988a2339b70c8a02273 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:18:58 +0200 Subject: [PATCH 14/29] Remove debug print --- packages/sqlite_async/lib/src/utils/shared_utils.dart | 1 - packages/sqlite_async/pubspec.yaml | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 0e9c9d6..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,7 +123,6 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { - print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index a155f2b..7d7c1c3 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.5.0 <4.0.0" + sdk: ">=3.6.0 <4.0.0" topics: - sqlite @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.8.0 - sqlite3_web: ^0.3.0 + sqlite3: ^2.9.0 + sqlite3_web: ^0.3.1 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From ed46766541cc90a5c2f459772665deacba7f988b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 14:39:11 +0200 Subject: [PATCH 15/29] Update sqlite3_web as well --- packages/sqlite_async/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 7d7c1c3..586bcc6 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -13,7 +13,7 @@ topics: dependencies: sqlite3: ^2.9.0 - sqlite3_web: ^0.3.1 + sqlite3_web: ^0.3.2 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From e0a1a3c0424892aeed965814d4538ac75fc07de0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 15:44:28 +0200 Subject: [PATCH 16/29] Fix import --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 7603306..265f4f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -7,7 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; -import 'package:sqlite_async/src/utils/database_utils.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import '../protocol.dart'; From 6f4c6a065c8da6805e633d92fa9d75cbb09bb0a1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 10:49:36 +0200 Subject: [PATCH 17/29] Avoid updating minimum SDK constraint --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 3 ++- packages/sqlite_async/pubspec.yaml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 265f4f7..0c3e8f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -135,7 +135,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { } database.execute(sql, parameters); case CustomDatabaseMessageKind.updateSubscriptionManagement: - final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final shouldSubscribe = + (message.rawParameters.toDart[0] as JSBoolean).toDart; final id = message.rawSql.toDart; final state = _findState(connection); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 586bcc6..464bd2d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.6.0 <4.0.0" + sdk: ">=3.5.0 <4.0.0" topics: - sqlite From eaab81ea6aacf8e8ac3da08f1d32cb6db91bdf6a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 11:18:17 +0200 Subject: [PATCH 18/29] chore(release): publish packages - sqlite_async@0.12.0 - drift_sqlite_async@0.2.3+1 --- CHANGELOG.md | 28 ++++++++++++++++++++++++ packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ packages/drift_sqlite_async/pubspec.yaml | 4 ++-- packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 5 files changed, 39 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cafde71..93a8b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,34 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-08-08 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) + - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) + +Packages with dependency updates only: + +> Packages listed below depend on other packages in this workspace that have had changes. Their versions have been incremented to bump the minimum dependency versions of the packages they depend upon in this project. + + - `drift_sqlite_async` - `v0.2.3+1` + +--- + +#### `sqlite_async` - `v0.12.0` + + - Avoid large transactions creating a large internal update queue. + + ## 2025-07-29 --- diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index e70cd40..7c381da 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.3+1 + + - Update a dependency to the latest release. + ## 0.2.3 - Support nested transactions. diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 77f250d..b865809 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.3 +version: 0.2.3+1 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.11.8 + sqlite_async: ^0.12.0 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 387c944..8cda2a6 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.0 + + - Avoid large transactions creating a large internal update queue. + ## 0.11.8 - Support nested transactions (emulated with `SAVEPOINT` statements). diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 464bd2d..6e62e52 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.8 +version: 0.12.0 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" From 0e2fa41567e96f181cd43d5bb25a3321c941c78f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:39:36 +0200 Subject: [PATCH 19/29] Make table updates a multi-subscription stream --- .../native_sqlite_connection_impl.dart | 2 +- .../lib/src/utils/shared_utils.dart | 123 ++++++++++-------- .../lib/src/web/worker/worker_utils.dart | 3 +- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 301d0af..e90739e 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,7 +303,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - db.throttledUpdatedTables.listen((changedTables) { + db.updatedTables.listen((changedTables) { client.fire(UpdateNotification(changedTables)); }); diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..ad9a2f0 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -79,68 +79,87 @@ List mapParameters(List parameters) { } extension ThrottledUpdates on CommonDatabase { - /// Wraps [updatesSync] to: + /// An unthrottled stream of updated tables that emits on every commit. /// - /// - Not fire in transactions. - /// - Fire asynchronously. - /// - Only report table names, which are buffered to avoid duplicates. - Stream> get throttledUpdatedTables { - StreamController>? controller; - var pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; + /// A paused subscription on this stream will buffer changed tables into a + /// growing set instead of losing events, so this stream is simple to throttle + /// downstream. + Stream> get updatedTables { + final listeners = <_UpdateListener>[]; + var uncommitedUpdates = {}; + var underlyingSubscriptions = >[]; + + void handleUpdate(SqliteUpdate update) { + uncommitedUpdates.add(update.tableName); + } + + void afterCommit() { + for (final listener in listeners) { + listener.notify(uncommitedUpdates); } - if (!autocommit) { - // Inside a transaction - do not fire updates - return; + uncommitedUpdates.clear(); + } + + void afterRollback() { + uncommitedUpdates.clear(); + } + + void addListener(_UpdateListener listener) { + listeners.add(listener); + + if (listeners.length == 1) { + // First listener, start listening for raw updates on underlying + // database. + underlyingSubscriptions = [ + updatesSync.listen(handleUpdate), + commits.listen((_) => afterCommit()), + commits.listen((_) => afterRollback()) + ]; } + } - if (pendingUpdates.isNotEmpty) { - controller!.add(pendingUpdates); - pendingUpdates = {}; + void removeListener(_UpdateListener listener) { + listeners.remove(listener); + if (listeners.isEmpty) { + for (final sub in underlyingSubscriptions) { + sub.cancel(); + } } } - void collectUpdate(SqliteUpdate event) { - pendingUpdates.add(event.tableName); + return Stream.multi( + (listener) { + final wrapped = _UpdateListener(listener); + addListener(wrapped); - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); + listener.onCancel = () => removeListener(wrapped); + }, + isBroadcast: true, + ); + } +} + +class _UpdateListener { + final MultiStreamController> downstream; + Set buffered = {}; + + _UpdateListener(this.downstream); + + void notify(Set pendingUpdates) { + buffered.addAll(pendingUpdates); + if (!downstream.isPaused) { + downstream.add(buffered); + buffered = {}; } + } +} - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = commits.listen((_) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; +extension StreamUtils on Stream { + Stream pauseAfterEvent(Duration duration) async* { + await for (final event in this) { + yield event; + await Future.delayed(duration); + } } } diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 0c3e8f7..a6dae4c 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -56,7 +56,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.throttledUpdatedTables; + : _updates = database.updatedTables + .pauseAfterEvent(const Duration(milliseconds: 1)); _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); From e0938c4ef0138ee7b77b5911f54fa44fe8722f58 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:45:53 +0200 Subject: [PATCH 20/29] Support multiple listeners for table updates --- packages/sqlite_async/lib/src/utils/shared_utils.dart | 9 --------- .../sqlite_async/lib/src/web/worker/worker_utils.dart | 10 +++++----- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ad9a2f0..ae38e85 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -154,12 +154,3 @@ class _UpdateListener { } } } - -extension StreamUtils on Stream { - Stream pauseAfterEvent(Duration duration) async* { - await for (final event in this) { - yield event; - await Future.delayed(duration); - } - } -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index a6dae4c..059c281 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -56,8 +56,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.updatedTables - .pauseAfterEvent(const Duration(milliseconds: 1)); + : _updates = database.updatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -145,12 +144,13 @@ class AsyncSqliteDatabase extends WorkerDatabase { state.unsubscribeUpdates(); _registerCloseListener(state, connection); - state.updatesNotification = _updates.listen((tables) { - connection.customRequest(CustomDatabaseMessage( + late StreamSubscription subscription; + subscription = state.updatesNotification = _updates.listen((tables) { + subscription.pause(connection.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.notifyUpdates, id, tables.toList(), - )); + ))); }); } else { state.unsubscribeUpdates(); From b9c8d2d6a9879eda6e7293ab967c529f1cb9e9af Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:48:40 +0200 Subject: [PATCH 21/29] Prepare release --- CHANGELOG.md | 5 +++++ packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93a8b0d..821f8db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Packages with breaking changes: Packages with other changes: + - [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121) - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) @@ -26,6 +27,10 @@ Packages with dependency updates only: --- +#### `sqlite_async` - `v0.12.1` + +- Fix distributing updates from shared worker. + #### `sqlite_async` - `v0.12.0` - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 8cda2a6..342adca 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.1 + +- Fix distributing updates from shared worker. + ## 0.12.0 - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 6e62e52..4b45d57 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.12.0 +version: 0.12.1 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" From 6705d13335df92b8f548a182e53a097a91c16e37 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:57:24 +0200 Subject: [PATCH 22/29] Add tests --- .../lib/src/utils/shared_utils.dart | 7 +++ .../sqlite_async/test/native/watch_test.dart | 46 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ae38e85..542611e 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -133,6 +133,7 @@ extension ThrottledUpdates on CommonDatabase { final wrapped = _UpdateListener(listener); addListener(wrapped); + listener.onResume = wrapped.addPending; listener.onCancel = () => removeListener(wrapped); }, isBroadcast: true, @@ -149,6 +150,12 @@ class _UpdateListener { void notify(Set pendingUpdates) { buffered.addAll(pendingUpdates); if (!downstream.isPaused) { + addPending(); + } + } + + void addPending() { + if (buffered.isNotEmpty) { downstream.add(buffered); buffered = {}; } diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 4e4fb83..0a97b17 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -7,6 +7,7 @@ import 'dart:math'; import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:test/test.dart'; import '../utils/test_utils_impl.dart'; @@ -31,6 +32,51 @@ void main() { return db; }); + test('raw update notifications', () async { + final factory = await testUtils.testFactory(path: path); + final db = factory + .openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + + db.execute('CREATE TABLE a (bar INTEGER);'); + db.execute('CREATE TABLE b (bar INTEGER);'); + final events = >[]; + final subscription = db.updatedTables.listen(events.add); + + db.execute('insert into a default values'); + expect(events, isEmpty); // should be async + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('insert into b default values'); + await pumpEventQueue(); + expect(events, isEmpty); // should only trigger on commit + db.execute('commit'); + + await pumpEventQueue(); + expect(events.removeLast(), {'a', 'b'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('rollback'); + expect(events, isEmpty); + await pumpEventQueue(); + expect(events, isEmpty); // should ignore cancelled transactions + + // Should still listen during pause, and dispatch on resume + subscription.pause(); + db.execute('insert into a default values'); + await pumpEventQueue(); + expect(events, isEmpty); + + subscription.resume(); + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + subscription.pause(); + }); + test('watch in isolate', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); From d76435962555b05f472ae0d2aa2621f0ead38004 Mon Sep 17 00:00:00 2001 From: David Martos Date: Fri, 19 Sep 2025 12:11:17 +0200 Subject: [PATCH 23/29] update doc --- packages/drift_sqlite_async/lib/src/connection.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index fa56e2e..6c34ed4 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,6 +15,8 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; + /// [transformTableUpdates] is useful to map local table names from PowerSync that are backed by a view name + /// which is the entity that the user interacts with. SqliteAsyncDriftConnection( SqliteConnection db, { bool logStatements = false, @@ -22,8 +24,6 @@ class SqliteAsyncDriftConnection extends DatabaseConnection { }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { final Set setUpdates; - // This is useful to map local table names from PowerSync that are backed by a view name - // which is the entity that the user interacts with. if (transformTableUpdates != null) { setUpdates = transformTableUpdates(event); } else { From 09b841ba9b34a1e309504808e399695239e08c17 Mon Sep 17 00:00:00 2001 From: David Martos Date: Fri, 19 Sep 2025 14:14:10 +0200 Subject: [PATCH 24/29] add test --- .../drift_sqlite_async/test/basic_test.dart | 40 +++++++++++++++++++ .../test/generated/database.dart | 2 + 2 files changed, 42 insertions(+) diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index 339cc04..dd7a159 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -11,6 +11,7 @@ import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import './utils/test_utils.dart'; +import 'generated/database.dart'; class EmptyDatabase extends GeneratedDatabase { EmptyDatabase(super.executor); @@ -245,4 +246,43 @@ INSERT INTO test_data(description) VALUES('test data'); expect(row, isEmpty); }); }); + + test('transform table updates', () async { + final path = dbPath(); + await cleanDb(path: path); + + final db = await setupDatabase(path: path); + final connection = SqliteAsyncDriftConnection( + db, + // tables with the local_ prefix are mapped to the name without the prefix + transformTableUpdates: (event) { + final updates = {}; + + for (final originalTableName in event.tables) { + final effectiveName = originalTableName.startsWith("local_") + ? originalTableName.substring(6) + : originalTableName; + updates.add(TableUpdate(effectiveName)); + } + + return updates; + }, + ); + + // Create table with a different name than drift. (Mimicking a table name backed by a view in PowerSync with the optional sync strategy) + await db.execute( + 'CREATE TABLE local_todos(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)', + ); + + final dbu = TodoDatabase.fromSqliteAsyncConnection(connection); + + final tableUpdatesFut = + dbu.tableUpdates(TableUpdateQuery.onTableName("todos")).first; + + // This insert will trigger the sqlite_async "updates" stream + await db.execute("INSERT INTO local_todos(description) VALUES('Test 1')"); + + expect(await tableUpdatesFut.timeout(const Duration(seconds: 2)), + {TableUpdate("todos")}); + }); } diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index 928c7dd..b8a5109 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -15,6 +15,8 @@ class TodoItems extends Table { @DriftDatabase(tables: [TodoItems]) class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); + + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); @override int get schemaVersion => 1; From 0a3d31e3b686ae23673f458eef1e6afb8658acf4 Mon Sep 17 00:00:00 2001 From: David Martos Date: Thu, 25 Sep 2025 19:17:54 +0200 Subject: [PATCH 25/29] Update basic_test.dart Remove legacy skip. It now supports nested transactions --- packages/drift_sqlite_async/test/basic_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index dd7a159..cac6f55 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -183,7 +183,7 @@ void main() { {'description': 'Test 1'}, {'description': 'Test 3'} ])); - }, skip: 'sqlite_async does not support nested transactions'); + }); test('Concurrent select', () async { var completer1 = Completer(); From 1252190e3522de429b751ecaadeb301137cb3d68 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 29 Sep 2025 16:07:25 +0200 Subject: [PATCH 26/29] Run CI for pull requests --- .github/workflows/test.yaml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cb1814b..0e0d34b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -3,13 +3,15 @@ name: Test on: push: branches: - - "**" + - "*" + pull_request: jobs: build: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 - name: Install Melos @@ -29,6 +31,7 @@ jobs: test: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) strategy: fail-fast: false matrix: @@ -49,7 +52,7 @@ jobs: sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" dart_sdk: stable steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 with: sdk: ${{ matrix.dart_sdk }} From 0234d4fa469702f2709d6933ba199ffac9a19941 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 29 Sep 2025 16:07:40 +0200 Subject: [PATCH 27/29] Format --- packages/drift_sqlite_async/test/generated/database.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index b8a5109..4e03600 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -15,7 +15,7 @@ class TodoItems extends Table { @DriftDatabase(tables: [TodoItems]) class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); - + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); @override From b97be616b5ab5095480cf71848a74d812ee74fac Mon Sep 17 00:00:00 2001 From: David Martos Date: Tue, 7 Oct 2025 16:26:06 +0100 Subject: [PATCH 28/29] Get all Sqlite connections in the pool (#101) --- .../lib/src/common/sqlite_database.dart | 8 + .../src/impl/single_connection_database.dart | 8 + .../lib/src/impl/stub_sqlite_database.dart | 8 + .../src/native/database/connection_pool.dart | 70 ++++++++ .../database/native_sqlite_database.dart | 8 + .../sqlite_async/lib/src/web/database.dart | 8 + .../src/web/database/web_sqlite_database.dart | 8 + packages/sqlite_async/test/basic_test.dart | 44 +++++ .../sqlite_async/test/native/basic_test.dart | 152 ++++++++++++++++++ 9 files changed, 314 insertions(+) diff --git a/packages/sqlite_async/lib/src/common/sqlite_database.dart b/packages/sqlite_async/lib/src/common/sqlite_database.dart index 3cb12bb..3201135 100644 --- a/packages/sqlite_async/lib/src/common/sqlite_database.dart +++ b/packages/sqlite_async/lib/src/common/sqlite_database.dart @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { /// /// Use this to access the database in background isolates. IsolateConnectionFactory isolateConnectionFactory(); + + /// Locks all underlying connections making up this database, and gives [block] access to all of them at once. + /// This can be useful to run the same statement on all connections. For instance, + /// ATTACHing a database, that is expected to be available in all connections. + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block); } /// A SQLite database instance. diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart index 4cd3144..7ca4357 100644 --- a/packages/sqlite_async/lib/src/impl/single_connection_database.dart +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -57,4 +57,12 @@ final class SingleConnectionDatabase return connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart index 29db641..ee254f3 100644 --- a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart @@ -64,4 +64,12 @@ class SqliteDatabaseImpl Future getAutoCommit() { throw UnimplementedError(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + throw UnimplementedError(); + } } diff --git a/packages/sqlite_async/lib/src/native/database/connection_pool.dart b/packages/sqlite_async/lib/src/native/database/connection_pool.dart index 9521b34..8dab27e 100644 --- a/packages/sqlite_async/lib/src/native/database/connection_pool.dart +++ b/packages/sqlite_async/lib/src/native/database/connection_pool.dart @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final MutexImpl mutex; + int _runningWithAllConnectionsCount = 0; + @override bool closed = false; @@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } + if (_availableReadConnections.isEmpty && + _runningWithAllConnectionsCount > 0) { + // Wait until [withAllConnections] is done. Otherwise we could spawn a new + // reader while the user is configuring all the connections, + // e.g. a global open factory configuration shared across all connections. + return; + } + var nextItem = _queue.removeFirst(); while (nextItem.completer.isCompleted) { // This item already timed out - try the next one if available @@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await connection.refreshSchema(); } } + + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) async { + try { + _runningWithAllConnectionsCount++; + + final blockCompleter = Completer(); + final (write, reads) = await _lockAllConns(blockCompleter); + + try { + final res = await block(write, reads); + blockCompleter.complete(res); + return res; + } catch (e, st) { + blockCompleter.completeError(e, st); + rethrow; + } + } finally { + _runningWithAllConnectionsCount--; + + // Continue processing any pending read requests that may have been queued while + // the block was running. + Timer.run(_nextRead); + } + } + + /// Locks all connections, returning the acquired contexts. + /// We pass a completer that would be called after the locks are taken. + Future<(SqliteWriteContext, List)> _lockAllConns( + Completer lockCompleter) async { + final List> readLockedCompleters = []; + final Completer writeLockedCompleter = Completer(); + + // Take the write lock + writeLock((ctx) { + writeLockedCompleter.complete(ctx); + return lockCompleter.future; + }); + + // Take all the read locks + for (final readConn in _allReadConnections) { + final completer = Completer(); + readLockedCompleters.add(completer); + + readConn.readLock((ctx) { + completer.complete(ctx); + return lockCompleter.future; + }); + } + + // Wait after all locks are taken + final [writer as SqliteWriteContext, ...readers] = await Future.wait([ + writeLockedCompleter.future, + ...readLockedCompleters.map((e) => e.future) + ]); + + return (writer, readers); + } } typedef ReadCallback = Future Function(SqliteReadContext tx); diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 7bea111..22cacf3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -171,4 +171,12 @@ class SqliteDatabaseImpl Future refreshSchema() { return _pool.refreshSchema(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return _pool.withAllConnections(block); + } } diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cfaf987..f2dc998 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -171,6 +171,14 @@ class WebDatabase await isInitialized; return _database.fileSystem.flush(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(this, [])); + } } final class _UnscopedContext extends UnscopedContext { diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index c6d1b75..69f01ab 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -178,4 +178,12 @@ class SqliteDatabaseImpl Future exposeEndpoint() async { return await _connection.exposeEndpoint(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(_connection, [])); + } } diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index 6a315da..e2914b3 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm'); +const _isWeb = identical(0, 0.0) || _isDart2Wasm; void main() { group('Shared Basic Tests', () { @@ -301,6 +302,49 @@ void main() { 'Web locks are managed with a shared worker, which does not support timeouts', ) }); + + test('with all connections', () async { + final maxReaders = _isWeb ? 0 : 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + // Warm up to spawn the max readers + await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + final parentZone = Zone.current; + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + // Need a root zone here to avoid recursive lock errors. + readsCalledWhileWithAllConnsRunning = + Future(parentZone.bindCallback(() async { + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + await Future.delayed(const Duration(milliseconds: 100)); + }); + }), + ); + })); + + await Future.delayed(const Duration(milliseconds: 200)); + finishedWithAllConns = true; + }); + + await readsCalledWhileWithAllConnsRunning; + }); }); } diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index dec1fed..3f348e6 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -2,12 +2,16 @@ library; import 'dart:async'; +import 'dart:io'; import 'dart:math'; +import 'package:collection/collection.dart'; +import 'package:path/path.dart' show join; import 'package:sqlite3/common.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; +import '../utils/abstract_test_utils.dart'; import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); @@ -100,6 +104,126 @@ void main() { print("${DateTime.now()} done"); }); + test('prevent opening new readers while in withAllConnections', () async { + final sharedStateDir = Directory.systemTemp.createTempSync(); + addTearDown(() => sharedStateDir.deleteSync(recursive: true)); + + final File sharedStateFile = + File(join(sharedStateDir.path, 'shared-state.txt')); + + sharedStateFile.writeAsStringSync('initial'); + + final db = SqliteDatabase.withFactory( + _TestSqliteOpenFactoryWithSharedStateFile( + path: path, sharedStateFilePath: sharedStateFile.path), + maxReaders: 3); + await db.initialize(); + await createTables(db); + + // The writer saw 'initial' in the file when opening the connection + expect( + await db + .writeLock((c) => c.get('SELECT file_contents_on_open() AS state')), + {'state': 'initial'}, + ); + + final withAllConnectionsCompleter = Completer(); + + final withAllConnsFut = db.withAllConnections((writer, readers) async { + expect(readers.length, 0); // No readers yet + + // Simulate some work until the file is updated + await Future.delayed(const Duration(milliseconds: 200)); + sharedStateFile.writeAsStringSync('updated'); + + await withAllConnectionsCompleter.future; + }); + + // Start a reader that gets the contents of the shared file + bool readFinished = false; + final someReadFut = + db.get('SELECT file_contents_on_open() AS state', []).then((r) { + readFinished = true; + return r; + }); + + // The withAllConnections should prevent the reader from opening + await Future.delayed(const Duration(milliseconds: 100)); + expect(readFinished, isFalse); + + // Free all the locks + withAllConnectionsCompleter.complete(); + await withAllConnsFut; + + final readerInfo = await someReadFut; + expect(readFinished, isTrue); + // The read should see the updated value in the file. This checks + // that a reader doesn't spawn while running withAllConnections + expect(readerInfo, {'state': 'updated'}); + }); + + test('with all connections', () async { + final maxReaders = 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + Future readWithRandomDelay( + SqliteReadContext ctx, int id) async { + return await ctx.get( + 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection', + [id, 5 + Random().nextInt(10)]); + } + + // Warm up to spawn the max readers + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)), + ); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + print("${DateTime.now()} start"); + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + readsCalledWhileWithAllConnsRunning = Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + final r = await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + return await readWithRandomDelay(c, i); + }); + print( + "${DateTime.now()} After withAllConnections, started while running $r"); + }), + ); + + await Future.wait([ + writer.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + 123, + 5 + Random().nextInt(20) + ]).then((value) => + print("${DateTime.now()} withAllConnections writer done $value")), + ...readers + .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) { + print( + "${DateTime.now()} withAllConnections readers done $results"); + })) + ]); + }).then((_) => finishedWithAllConns = true); + + await readsCalledWhileWithAllConnsRunning; + }); + test('read-only transactions', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); @@ -379,3 +503,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory { ]; } } + +class _TestSqliteOpenFactoryWithSharedStateFile + extends TestDefaultSqliteOpenFactory { + final String sharedStateFilePath; + + _TestSqliteOpenFactoryWithSharedStateFile( + {required super.path, required this.sharedStateFilePath}); + + @override + sqlite.CommonDatabase open(SqliteOpenOptions options) { + final File sharedStateFile = File(sharedStateFilePath); + final String sharedState = sharedStateFile.readAsStringSync(); + + final db = super.open(options); + + // Function to return the contents of the shared state file at the time of opening + // so that we know at which point the factory was called. + db.createFunction( + functionName: 'file_contents_on_open', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + return sharedState; + }, + ); + + return db; + } +} From 21df6da19d3e69762047a638c1e6fef8b710e713 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 13 Oct 2025 17:25:46 +0200 Subject: [PATCH 29/29] chore(release): publish packages - sqlite_async@0.12.2 - drift_sqlite_async@0.2.5 --- CHANGELOG.md | 26 ++++++++++++++++++++++++ packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ packages/drift_sqlite_async/pubspec.yaml | 4 ++-- packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 821f8db..5181152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,32 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-10-13 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.2`](#sqlite_async---v0122) + - [`drift_sqlite_async` - `v0.2.5`](#drift_sqlite_async---v025) + +--- + +#### `sqlite_async` - `v0.12.2` + + - Add `withAllConnections` method to run statements on all connections in the pool. + +#### `drift_sqlite_async` - `v0.2.5` + + - Allow customizing update notifications from `sqlite_async`. + + ## 2025-08-08 ### Changes diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index 8dd5e5a..bd36dd1 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.5 + + - Allow customizing update notifications from `sqlite_async`. + ## 0.2.4 - Allow transforming table updates from sqlite_async. diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 06e2963..bd2b701 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.4 +version: 0.2.5 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.12.0 + sqlite_async: ^0.12.2 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 342adca..1bbc939 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.2 + + - Add `withAllConnections` method to run statements on all connections in the pool. + ## 0.12.1 - Fix distributing updates from shared worker. diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 4b45d57..3d5130d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.12.1 +version: 0.12.2 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0"