From 4b5607625b1d3f37d6f1610ec562f7be2a79ec86 Mon Sep 17 00:00:00 2001 From: Jorge Sardina Date: Fri, 20 Jun 2025 12:52:04 +0200 Subject: [PATCH 01/20] Allow transforming table updates from sqlite_async. --- packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ .../lib/src/connection.dart | 20 ++++++++++++++----- packages/drift_sqlite_async/pubspec.yaml | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index b101ac4..fa769a1 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.3 + +- Allow transforming table updates from sqlite_async. + ## 0.2.2 - Fix write detection when using UPDATE/INSERT/DELETE with RETURNING in raw queries. diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index a1af55a..bcde1bf 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,12 +15,22 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; - SqliteAsyncDriftConnection(SqliteConnection db, {bool logStatements = false}) - : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { + SqliteAsyncDriftConnection( + SqliteConnection db, { + bool logStatements = false, + Set Function(UpdateNotification)? transformTableUpdate, + }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { - var setUpdates = {}; - for (var tableName in event.tables) { - setUpdates.add(TableUpdate(tableName)); + final Set setUpdates; + // This is useful to map local table names from PowerSync that are backed by a view name + // which is the entity that the user interacts with. + if (transformTableUpdate != null) { + setUpdates = transformTableUpdate(event); + } else { + setUpdates = {}; + for (var tableName in event.tables) { + setUpdates.add(TableUpdate(tableName)); + } } super.streamQueries.handleTableUpdates(setUpdates); }); diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 62a64da..c17e833 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.2 +version: 0.2.3 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. From a2b6c4cd2226f867569b03d25318b87d6aea6fde Mon Sep 17 00:00:00 2001 From: David Martos Date: Sun, 22 Jun 2025 12:35:16 +0200 Subject: [PATCH 02/20] typo --- packages/drift_sqlite_async/lib/src/connection.dart | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index bcde1bf..fa56e2e 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -18,14 +18,14 @@ class SqliteAsyncDriftConnection extends DatabaseConnection { SqliteAsyncDriftConnection( SqliteConnection db, { bool logStatements = false, - Set Function(UpdateNotification)? transformTableUpdate, + Set Function(UpdateNotification)? transformTableUpdates, }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { final Set setUpdates; // This is useful to map local table names from PowerSync that are backed by a view name // which is the entity that the user interacts with. - if (transformTableUpdate != null) { - setUpdates = transformTableUpdate(event); + if (transformTableUpdates != null) { + setUpdates = transformTableUpdates(event); } else { setUpdates = {}; for (var tableName in event.tables) { From 7e0a0c74a89b7481e42a882f2b0fdc97122b684c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 10:30:30 +0200 Subject: [PATCH 03/20] Refactor update streams on web --- .../lib/src/utils/shared_utils.dart | 69 +++++++ .../sqlite_async/lib/src/web/database.dart | 8 +- .../sqlite_async/lib/src/web/protocol.dart | 2 + .../lib/src/web/update_notifications.dart | 57 ++++++ .../lib/src/web/web_sqlite_open_factory.dart | 14 +- .../web/worker/throttled_common_database.dart | 191 ------------------ .../lib/src/web/worker/worker_utils.dart | 48 ++++- packages/sqlite_async/lib/web.dart | 25 ++- 8 files changed, 205 insertions(+), 209 deletions(-) create mode 100644 packages/sqlite_async/lib/src/web/update_notifications.dart delete mode 100644 packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 9faf928..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -1,6 +1,8 @@ import 'dart:async'; import 'dart:convert'; +import 'package:sqlite3/common.dart'; + import '../sqlite_connection.dart'; Future internalReadTransaction(SqliteReadContext ctx, @@ -75,3 +77,70 @@ Object? mapParameter(Object? parameter) { List mapParameters(List parameters) { return [for (var p in parameters) mapParameter(p)]; } + +extension ThrottledUpdates on CommonDatabase { + /// Wraps [updatesSync] to: + /// + /// - Not fire in transactions. + /// - Fire asynchronously. + /// - Only report table names, which are buffered to avoid duplicates. + Stream> get throttledUpdatedTables { + StreamController>? controller; + var pendingUpdates = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (pendingUpdates.isNotEmpty) { + controller!.add(pendingUpdates); + pendingUpdates = {}; + } + } + + void collectUpdate(SqliteUpdate event) { + pendingUpdates.add(event.tableName); + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = commits.listen((_) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 3e0797b..cfaf987 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -21,6 +21,9 @@ class WebDatabase final Mutex? _mutex; final bool profileQueries; + @override + final Stream updates; + /// For persistent databases that aren't backed by a shared worker, we use /// web broadcast channels to forward local update events to other tabs. final BroadcastUpdates? broadcastUpdates; @@ -32,6 +35,7 @@ class WebDatabase this._database, this._mutex, { required this.profileQueries, + required this.updates, this.broadcastUpdates, }); @@ -113,10 +117,6 @@ class WebDatabase } } - @override - Stream get updates => - _database.updates.map((event) => UpdateNotification({event.tableName})); - @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index cb3a5fd..d17c06b 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -13,6 +13,8 @@ enum CustomDatabaseMessageKind { getAutoCommit, executeInTransaction, executeBatchInTransaction, + updateSubscriptionManagement, + notifyUpdates, } extension type CustomDatabaseMessage._raw(JSObject _) implements JSObject { diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart new file mode 100644 index 0000000..ecea60d --- /dev/null +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -0,0 +1,57 @@ +import 'dart:async'; +import 'dart:js_interop'; + +import 'package:sqlite3_web/sqlite3_web.dart'; + +import '../update_notification.dart'; +import 'protocol.dart'; + +/// Utility to request a stream of update notifications from the worker. +/// +/// Because we want to debounce update notifications on the worker, we're using +/// custom requests instead of the default [Database.updates] stream. +/// +/// Clients send a message to the worker to subscribe or unsubscribe, providing +/// an id for the subscription. The worker distributes update notifications with +/// custom requests to the client, which [handleRequest] distributes to the +/// original streams. +final class UpdateNotificationStreams { + var _idCounter = 0; + final Map> _updates = {}; + + Future handleRequest(JSAny? request) async { + final customRequest = request as CustomDatabaseMessage; + if (customRequest.kind == CustomDatabaseMessageKind.notifyUpdates) { + final notification = UpdateNotification(customRequest.rawParameters.toDart + .map((e) => (e as JSString).toDart) + .toSet()); + + _updates[customRequest.rawSql.toDart]?.add(notification); + } + + return null; + } + + Stream updatesFor(Database database) { + final id = (_idCounter++).toString(); + final controller = _updates[id] = StreamController(); + + controller + ..onListen = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [true], + )); + } + ..onCancel = () { + database.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.updateSubscriptionManagement, + id, + [false], + )); + }; + + return controller.stream; + } +} diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index a724329..205734f 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -69,15 +69,19 @@ class DefaultSqliteOpenFactory ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier - BroadcastUpdates? updates; + BroadcastUpdates? broadcastUpdates; if (connection.access != AccessMode.throughSharedWorker && connection.storage != StorageMode.inMemory) { - updates = BroadcastUpdates(path); + broadcastUpdates = BroadcastUpdates(path); } - return WebDatabase(connection.database, options.mutex ?? mutex, - broadcastUpdates: updates, - profileQueries: sqliteOptions.profileQueries); + return WebDatabase( + connection.database, + options.mutex ?? mutex, + broadcastUpdates: broadcastUpdates, + profileQueries: sqliteOptions.profileQueries, + updates: updatesFor(connection.database), + ); } @override diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart deleted file mode 100644 index 8a0e901..0000000 --- a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart +++ /dev/null @@ -1,191 +0,0 @@ -import 'dart:async'; - -import 'package:sqlite_async/sqlite3_wasm.dart'; - -/// Wrap a CommonDatabase to throttle its updates stream. -/// This is so that we can throttle the updates _within_ -/// the worker process, avoiding mass notifications over -/// the MessagePort. -class ThrottledCommonDatabase extends CommonDatabase { - final CommonDatabase _db; - final StreamController _transactionController = - StreamController.broadcast(); - - ThrottledCommonDatabase(this._db); - - @override - int get userVersion => _db.userVersion; - - @override - set userVersion(int userVersion) { - _db.userVersion = userVersion; - } - - @override - bool get autocommit => _db.autocommit; - - @override - DatabaseConfig get config => _db.config; - - @override - void createAggregateFunction({ - required String functionName, - required AggregateFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createAggregateFunction(functionName: functionName, function: function); - } - - @override - void createCollation( - {required String name, required CollatingFunction function}) { - _db.createCollation(name: name, function: function); - } - - @override - void createFunction({ - required String functionName, - required ScalarFunction function, - AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), - bool deterministic = false, - bool directOnly = true, - bool subtype = false, - }) { - _db.createFunction(functionName: functionName, function: function); - } - - @override - void dispose() { - _db.dispose(); - } - - @override - void execute(String sql, [List parameters = const []]) { - _db.execute(sql, parameters); - } - - @override - int getUpdatedRows() { - // ignore: deprecated_member_use - return _db.getUpdatedRows(); - } - - @override - int get lastInsertRowId => _db.lastInsertRowId; - - @override - CommonPreparedStatement prepare(String sql, - {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { - return _db.prepare(sql, - persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); - } - - @override - List prepareMultiple(String sql, - {bool persistent = false, bool vtab = true}) { - return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); - } - - @override - ResultSet select(String sql, [List parameters = const []]) { - bool preAutocommit = _db.autocommit; - final result = _db.select(sql, parameters); - bool postAutocommit = _db.autocommit; - if (!preAutocommit && postAutocommit) { - _transactionController.add(true); - } - return result; - } - - @override - int get updatedRows => _db.updatedRows; - - @override - Stream get updates { - return throttledUpdates(_db, _transactionController.stream); - } - - @override - VoidPredicate? get commitFilter => _db.commitFilter; - - @override - set commitFilter(VoidPredicate? filter) => _db.commitFilter = filter; - - @override - Stream get commits => _db.commits; - - @override - Stream get rollbacks => _db.rollbacks; -} - -/// This throttles the database update stream to: -/// 1. Trigger max once every 1ms. -/// 2. Only trigger _after_ transactions. -Stream throttledUpdates( - CommonDatabase source, Stream transactionStream) { - StreamController? controller; - Set pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; - } - - if (!source.autocommit) { - // Inside a transaction - do not fire updates - return; - } - - if (pendingUpdates.isNotEmpty) { - for (var update in pendingUpdates) { - controller!.add(update); - } - - pendingUpdates.clear(); - } - } - - void collectUpdate(SqliteUpdate event) { - // We merge updates with the same kind and tableName. - // rowId is never used in sqlite_async. - pendingUpdates.add(SqliteUpdate(event.kind, event.tableName, 0)); - - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - } - - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = transactionStream.listen((event) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 3ecb257..7603306 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:js_interop'; import 'dart:js_interop_unsafe'; @@ -6,8 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; - -import 'throttled_common_database.dart'; +import 'package:sqlite_async/src/utils/database_utils.dart'; import '../protocol.dart'; @@ -22,13 +22,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - final throttled = ThrottledCommonDatabase(db); - - return AsyncSqliteDatabase(database: throttled); + return AsyncSqliteDatabase(database: db); } - /// Opens a database with the `sqlite3` package that will be wrapped in a - /// [ThrottledCommonDatabase] for [openDatabase]. @visibleForOverriding CommonDatabase openUnderlying( WasmSqlite3 sqlite3, @@ -51,6 +47,7 @@ base class AsyncSqliteController extends DatabaseController { class AsyncSqliteDatabase extends WorkerDatabase { @override final CommonDatabase database; + final Stream> _updates; // This mutex is only used for lock requests from clients. Clients only send // these requests for shared workers, so we can assume each database is only @@ -58,7 +55,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final mutex = ReadWriteMutex(); final Map _state = {}; - AsyncSqliteDatabase({required this.database}); + AsyncSqliteDatabase({required this.database}) + : _updates = database.throttledUpdatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -67,9 +65,15 @@ class AsyncSqliteDatabase extends WorkerDatabase { void _markHoldsMutex(ClientConnection connection) { final state = _findState(connection); state.holdsMutex = true; + _registerCloseListener(state, connection); + } + + void _registerCloseListener( + _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { state.hasOnCloseListener = true; connection.closed.then((_) { + state.unsubscribeUpdates(); if (state.holdsMutex) { mutex.release(); } @@ -93,6 +97,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { _findState(connection).holdsMutex = false; mutex.release(); case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; @@ -129,6 +134,25 @@ class AsyncSqliteDatabase extends WorkerDatabase { "Transaction rolled back by earlier statement. Cannot execute: $sql"); } database.execute(sql, parameters); + case CustomDatabaseMessageKind.updateSubscriptionManagement: + final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final id = message.rawSql.toDart; + final state = _findState(connection); + + if (shouldSubscribe) { + state.unsubscribeUpdates(); + _registerCloseListener(state, connection); + + state.updatesNotification = _updates.listen((tables) { + connection.customRequest(CustomDatabaseMessage( + CustomDatabaseMessageKind.notifyUpdates, + id, + tables.toList(), + )); + }); + } else { + state.unsubscribeUpdates(); + } } return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); @@ -148,4 +172,12 @@ class AsyncSqliteDatabase extends WorkerDatabase { final class _ConnectionState { bool hasOnCloseListener = false; bool holdsMutex = false; + StreamSubscription>? updatesNotification; + + void unsubscribeUpdates() { + if (updatesNotification case final active?) { + updatesNotification = null; + active.cancel(); + } + } } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index 3a65115..f151b81 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -4,12 +4,15 @@ /// workers. library; +import 'dart:js_interop'; + import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:web/web.dart'; import 'sqlite3_common.dart'; import 'sqlite_async.dart'; import 'src/web/database.dart'; +import 'src/web/update_notifications.dart'; /// An endpoint that can be used, by any running JavaScript context in the same /// website, to connect to an existing [WebSqliteConnection]. @@ -33,6 +36,13 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { + final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + + /// Handles a custom request sent from the worker to the client. + Future handleCustomRequest(JSAny? request) { + return _updateStreams.handleRequest(request); + } + /// Opens a [WebSqlite] instance for the given [options]. /// /// This method can be overriden in scenarios where the way [WebSqlite] is @@ -43,6 +53,7 @@ abstract mixin class WebSqliteOpenFactory return WebSqlite.open( worker: Uri.parse(options.workerUri), wasmModule: Uri.parse(options.wasmUri), + handleCustomRequest: handleCustomRequest, ); } @@ -54,6 +65,14 @@ abstract mixin class WebSqliteOpenFactory WebSqlite sqlite, String name) { return sqlite.connectToRecommended(name); } + + /// Obtains a stream of [UpdateNotification]s from a [database]. + /// + /// The default implementation uses custom requests to allow workers to + /// debounce the stream on their side to avoid messages where possible. + Stream updatesFor(Database database) { + return _updateStreams.updatesFor(database); + } } /// A [SqliteConnection] interface implemented by opened connections when @@ -85,8 +104,11 @@ abstract class WebSqliteConnection implements SqliteConnection { /// contexts to exchange opened database connections. static Future connectToEndpoint( WebDatabaseEndpoint endpoint) async { + final updates = UpdateNotificationStreams(); final rawSqlite = await WebSqlite.connectToPort( - (endpoint.connectPort, endpoint.connectName)); + (endpoint.connectPort, endpoint.connectName), + handleCustomRequest: updates.handleRequest, + ); final database = WebDatabase( rawSqlite, @@ -95,6 +117,7 @@ abstract class WebSqliteConnection implements SqliteConnection { null => null, }, profileQueries: false, + updates: updates.updatesFor(rawSqlite), ); return database; } From 64260569f6491a137dd680d7df62938befef48e0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:02:24 +0200 Subject: [PATCH 04/20] Fix tests --- .../native_sqlite_connection_impl.dart | 43 +++---------------- .../lib/src/utils/shared_utils.dart | 1 + .../lib/src/web/update_notifications.dart | 5 ++- .../lib/src/web/web_sqlite_open_factory.dart | 11 ++--- packages/sqlite_async/lib/web.dart | 4 +- scripts/sqlite3_wasm_download.dart | 2 +- 6 files changed, 19 insertions(+), 47 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index e2df0f3..301d0af 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,40 +303,13 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - Timer? updateDebouncer; - Set updatedTables = {}; + db.throttledUpdatedTables.listen((changedTables) { + client.fire(UpdateNotification(changedTables)); + }); + int? txId; Object? txError; - void maybeFireUpdates() { - // We keep buffering the set of updated tables until we are not - // in a transaction. Firing transactions inside a transaction - // has multiple issues: - // 1. Watched queries would detect changes to the underlying tables, - // but the data would not be visible to queries yet. - // 2. It would trigger many more notifications than required. - // - // This still includes updates for transactions that are rolled back. - // We could handle those better at a later stage. - - if (updatedTables.isNotEmpty && db.autocommit) { - client.fire(UpdateNotification(updatedTables)); - updatedTables.clear(); - } - updateDebouncer?.cancel(); - updateDebouncer = null; - } - - db.updates.listen((event) { - updatedTables.add(event.tableName); - - // This handles two cases: - // 1. Update arrived after _SqliteIsolateClose (not sure if this could happen). - // 2. Long-running _SqliteIsolateClosure that should fire updates while running. - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); - }); - ResultSet runStatement(_SqliteIsolateStatement data) { if (data.sql == 'BEGIN' || data.sql == 'BEGIN IMMEDIATE') { if (txId != null) { @@ -388,8 +361,6 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, throw sqlite.SqliteException( 0, 'Transaction must be closed within the read or write lock'); } - // We would likely have received updates by this point - fire now. - maybeFireUpdates(); return null; case _SqliteIsolateStatement(): return task.timeSync( @@ -399,11 +370,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, parameters: data.args, ); case _SqliteIsolateClosure(): - try { - return await data.cb(db); - } finally { - maybeFireUpdates(); - } + return await data.cb(db); case _SqliteIsolateConnectionClose(): db.dispose(); return null; diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..0e9c9d6 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,6 +123,7 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { + print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/lib/src/web/update_notifications.dart b/packages/sqlite_async/lib/src/web/update_notifications.dart index ecea60d..f04a785 100644 --- a/packages/sqlite_async/lib/src/web/update_notifications.dart +++ b/packages/sqlite_async/lib/src/web/update_notifications.dart @@ -26,7 +26,8 @@ final class UpdateNotificationStreams { .map((e) => (e as JSString).toDart) .toSet()); - _updates[customRequest.rawSql.toDart]?.add(notification); + final controller = _updates[customRequest.rawSql.toDart]; + controller?.add(notification); } return null; @@ -50,6 +51,8 @@ final class UpdateNotificationStreams { id, [false], )); + + _updates.remove(id); }; return controller.stream; diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 205734f..513b1f2 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -10,7 +10,7 @@ import 'package:sqlite_async/web.dart'; import 'database.dart'; import 'worker/worker_utils.dart'; -Map> webSQLiteImplementations = {}; +Map> _webSQLiteImplementations = {}; /// Web implementation of [AbstractDefaultSqliteOpenFactory] class DefaultSqliteOpenFactory @@ -20,13 +20,13 @@ class DefaultSqliteOpenFactory final cacheKey = sqliteOptions.webSqliteOptions.wasmUri + sqliteOptions.webSqliteOptions.workerUri; - if (webSQLiteImplementations.containsKey(cacheKey)) { - return webSQLiteImplementations[cacheKey]!; + if (_webSQLiteImplementations.containsKey(cacheKey)) { + return _webSQLiteImplementations[cacheKey]!; } - webSQLiteImplementations[cacheKey] = + _webSQLiteImplementations[cacheKey] = openWebSqlite(sqliteOptions.webSqliteOptions); - return webSQLiteImplementations[cacheKey]!; + return _webSQLiteImplementations[cacheKey]!; }); DefaultSqliteOpenFactory( @@ -42,6 +42,7 @@ class DefaultSqliteOpenFactory wasmModule: Uri.parse(sqliteOptions.webSqliteOptions.wasmUri), worker: Uri.parse(sqliteOptions.webSqliteOptions.workerUri), controller: AsyncSqliteController(), + handleCustomRequest: handleCustomRequest, ); } diff --git a/packages/sqlite_async/lib/web.dart b/packages/sqlite_async/lib/web.dart index f151b81..e318697 100644 --- a/packages/sqlite_async/lib/web.dart +++ b/packages/sqlite_async/lib/web.dart @@ -29,6 +29,8 @@ typedef WebDatabaseEndpoint = ({ String? lockName, }); +final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); + /// An additional interface for [SqliteOpenFactory] exposing additional /// functionality that is only relevant when compiling to the web. /// @@ -36,8 +38,6 @@ typedef WebDatabaseEndpoint = ({ /// compiling for the web. abstract mixin class WebSqliteOpenFactory implements SqliteOpenFactory { - final UpdateNotificationStreams _updateStreams = UpdateNotificationStreams(); - /// Handles a custom request sent from the worker to the client. Future handleCustomRequest(JSAny? request) { return _updateStreams.handleRequest(request); diff --git a/scripts/sqlite3_wasm_download.dart b/scripts/sqlite3_wasm_download.dart index 62acbbe..9716eee 100644 --- a/scripts/sqlite3_wasm_download.dart +++ b/scripts/sqlite3_wasm_download.dart @@ -4,7 +4,7 @@ library; import 'dart:io'; final sqliteUrl = - 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.4.3/sqlite3.wasm'; + 'https://github.com/simolus3/sqlite3.dart/releases/download/sqlite3-2.8.0/sqlite3.wasm'; void main() async { // Create assets directory if it doesn't exist From bc66337140941e8e57dab988a2339b70c8a02273 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 12:18:58 +0200 Subject: [PATCH 05/20] Remove debug print --- packages/sqlite_async/lib/src/utils/shared_utils.dart | 1 - packages/sqlite_async/pubspec.yaml | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index 0e9c9d6..ff291f4 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -123,7 +123,6 @@ extension ThrottledUpdates on CommonDatabase { controller = StreamController(onListen: () { txSubscription = commits.listen((_) { - print('did commit'); maybeFireUpdates(); }, onError: (error) { controller?.addError(error); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index a155f2b..7d7c1c3 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.5.0 <4.0.0" + sdk: ">=3.6.0 <4.0.0" topics: - sqlite @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.8.0 - sqlite3_web: ^0.3.0 + sqlite3: ^2.9.0 + sqlite3_web: ^0.3.1 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From ed46766541cc90a5c2f459772665deacba7f988b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 14:39:11 +0200 Subject: [PATCH 06/20] Update sqlite3_web as well --- packages/sqlite_async/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 7d7c1c3..586bcc6 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -13,7 +13,7 @@ topics: dependencies: sqlite3: ^2.9.0 - sqlite3_web: ^0.3.1 + sqlite3_web: ^0.3.2 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From e0a1a3c0424892aeed965814d4538ac75fc07de0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 7 Aug 2025 15:44:28 +0200 Subject: [PATCH 07/20] Fix import --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 7603306..265f4f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -7,7 +7,7 @@ import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; import 'package:sqlite3_web/protocol_utils.dart' as proto; -import 'package:sqlite_async/src/utils/database_utils.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import '../protocol.dart'; From 6f4c6a065c8da6805e633d92fa9d75cbb09bb0a1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 10:49:36 +0200 Subject: [PATCH 08/20] Avoid updating minimum SDK constraint --- packages/sqlite_async/lib/src/web/worker/worker_utils.dart | 3 ++- packages/sqlite_async/pubspec.yaml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 265f4f7..0c3e8f7 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -135,7 +135,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { } database.execute(sql, parameters); case CustomDatabaseMessageKind.updateSubscriptionManagement: - final shouldSubscribe = (message.rawParameters[0] as JSBoolean).toDart; + final shouldSubscribe = + (message.rawParameters.toDart[0] as JSBoolean).toDart; final id = message.rawSql.toDart; final state = _findState(connection); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 586bcc6..464bd2d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -3,7 +3,7 @@ description: High-performance asynchronous interface for SQLite on Dart and Flut version: 0.11.8 repository: https://github.com/powersync-ja/sqlite_async.dart environment: - sdk: ">=3.6.0 <4.0.0" + sdk: ">=3.5.0 <4.0.0" topics: - sqlite From eaab81ea6aacf8e8ac3da08f1d32cb6db91bdf6a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 11:18:17 +0200 Subject: [PATCH 09/20] chore(release): publish packages - sqlite_async@0.12.0 - drift_sqlite_async@0.2.3+1 --- CHANGELOG.md | 28 ++++++++++++++++++++++++ packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ packages/drift_sqlite_async/pubspec.yaml | 4 ++-- packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 5 files changed, 39 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cafde71..93a8b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,34 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-08-08 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) + - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) + +Packages with dependency updates only: + +> Packages listed below depend on other packages in this workspace that have had changes. Their versions have been incremented to bump the minimum dependency versions of the packages they depend upon in this project. + + - `drift_sqlite_async` - `v0.2.3+1` + +--- + +#### `sqlite_async` - `v0.12.0` + + - Avoid large transactions creating a large internal update queue. + + ## 2025-07-29 --- diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index e70cd40..7c381da 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.3+1 + + - Update a dependency to the latest release. + ## 0.2.3 - Support nested transactions. diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 77f250d..b865809 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.3 +version: 0.2.3+1 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.11.8 + sqlite_async: ^0.12.0 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 387c944..8cda2a6 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.0 + + - Avoid large transactions creating a large internal update queue. + ## 0.11.8 - Support nested transactions (emulated with `SAVEPOINT` statements). diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 464bd2d..6e62e52 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.11.8 +version: 0.12.0 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" From 0e2fa41567e96f181cd43d5bb25a3321c941c78f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:39:36 +0200 Subject: [PATCH 10/20] Make table updates a multi-subscription stream --- .../native_sqlite_connection_impl.dart | 2 +- .../lib/src/utils/shared_utils.dart | 123 ++++++++++-------- .../lib/src/web/worker/worker_utils.dart | 3 +- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 301d0af..e90739e 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,7 +303,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - db.throttledUpdatedTables.listen((changedTables) { + db.updatedTables.listen((changedTables) { client.fire(UpdateNotification(changedTables)); }); diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..ad9a2f0 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -79,68 +79,87 @@ List mapParameters(List parameters) { } extension ThrottledUpdates on CommonDatabase { - /// Wraps [updatesSync] to: + /// An unthrottled stream of updated tables that emits on every commit. /// - /// - Not fire in transactions. - /// - Fire asynchronously. - /// - Only report table names, which are buffered to avoid duplicates. - Stream> get throttledUpdatedTables { - StreamController>? controller; - var pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; + /// A paused subscription on this stream will buffer changed tables into a + /// growing set instead of losing events, so this stream is simple to throttle + /// downstream. + Stream> get updatedTables { + final listeners = <_UpdateListener>[]; + var uncommitedUpdates = {}; + var underlyingSubscriptions = >[]; + + void handleUpdate(SqliteUpdate update) { + uncommitedUpdates.add(update.tableName); + } + + void afterCommit() { + for (final listener in listeners) { + listener.notify(uncommitedUpdates); } - if (!autocommit) { - // Inside a transaction - do not fire updates - return; + uncommitedUpdates.clear(); + } + + void afterRollback() { + uncommitedUpdates.clear(); + } + + void addListener(_UpdateListener listener) { + listeners.add(listener); + + if (listeners.length == 1) { + // First listener, start listening for raw updates on underlying + // database. + underlyingSubscriptions = [ + updatesSync.listen(handleUpdate), + commits.listen((_) => afterCommit()), + commits.listen((_) => afterRollback()) + ]; } + } - if (pendingUpdates.isNotEmpty) { - controller!.add(pendingUpdates); - pendingUpdates = {}; + void removeListener(_UpdateListener listener) { + listeners.remove(listener); + if (listeners.isEmpty) { + for (final sub in underlyingSubscriptions) { + sub.cancel(); + } } } - void collectUpdate(SqliteUpdate event) { - pendingUpdates.add(event.tableName); + return Stream.multi( + (listener) { + final wrapped = _UpdateListener(listener); + addListener(wrapped); - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); + listener.onCancel = () => removeListener(wrapped); + }, + isBroadcast: true, + ); + } +} + +class _UpdateListener { + final MultiStreamController> downstream; + Set buffered = {}; + + _UpdateListener(this.downstream); + + void notify(Set pendingUpdates) { + buffered.addAll(pendingUpdates); + if (!downstream.isPaused) { + downstream.add(buffered); + buffered = {}; } + } +} - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = commits.listen((_) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; +extension StreamUtils on Stream { + Stream pauseAfterEvent(Duration duration) async* { + await for (final event in this) { + yield event; + await Future.delayed(duration); + } } } diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 0c3e8f7..a6dae4c 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -56,7 +56,8 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.throttledUpdatedTables; + : _updates = database.updatedTables + .pauseAfterEvent(const Duration(milliseconds: 1)); _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); From e0938c4ef0138ee7b77b5911f54fa44fe8722f58 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:45:53 +0200 Subject: [PATCH 11/20] Support multiple listeners for table updates --- packages/sqlite_async/lib/src/utils/shared_utils.dart | 9 --------- .../sqlite_async/lib/src/web/worker/worker_utils.dart | 10 +++++----- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ad9a2f0..ae38e85 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -154,12 +154,3 @@ class _UpdateListener { } } } - -extension StreamUtils on Stream { - Stream pauseAfterEvent(Duration duration) async* { - await for (final event in this) { - yield event; - await Future.delayed(duration); - } - } -} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index a6dae4c..059c281 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -56,8 +56,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.updatedTables - .pauseAfterEvent(const Duration(milliseconds: 1)); + : _updates = database.updatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -145,12 +144,13 @@ class AsyncSqliteDatabase extends WorkerDatabase { state.unsubscribeUpdates(); _registerCloseListener(state, connection); - state.updatesNotification = _updates.listen((tables) { - connection.customRequest(CustomDatabaseMessage( + late StreamSubscription subscription; + subscription = state.updatesNotification = _updates.listen((tables) { + subscription.pause(connection.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.notifyUpdates, id, tables.toList(), - )); + ))); }); } else { state.unsubscribeUpdates(); From b9c8d2d6a9879eda6e7293ab967c529f1cb9e9af Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:48:40 +0200 Subject: [PATCH 12/20] Prepare release --- CHANGELOG.md | 5 +++++ packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93a8b0d..821f8db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Packages with breaking changes: Packages with other changes: + - [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121) - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) @@ -26,6 +27,10 @@ Packages with dependency updates only: --- +#### `sqlite_async` - `v0.12.1` + +- Fix distributing updates from shared worker. + #### `sqlite_async` - `v0.12.0` - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 8cda2a6..342adca 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.1 + +- Fix distributing updates from shared worker. + ## 0.12.0 - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 6e62e52..4b45d57 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.12.0 +version: 0.12.1 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" From 6705d13335df92b8f548a182e53a097a91c16e37 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 8 Aug 2025 16:57:24 +0200 Subject: [PATCH 13/20] Add tests --- .../lib/src/utils/shared_utils.dart | 7 +++ .../sqlite_async/test/native/watch_test.dart | 46 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ae38e85..542611e 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -133,6 +133,7 @@ extension ThrottledUpdates on CommonDatabase { final wrapped = _UpdateListener(listener); addListener(wrapped); + listener.onResume = wrapped.addPending; listener.onCancel = () => removeListener(wrapped); }, isBroadcast: true, @@ -149,6 +150,12 @@ class _UpdateListener { void notify(Set pendingUpdates) { buffered.addAll(pendingUpdates); if (!downstream.isPaused) { + addPending(); + } + } + + void addPending() { + if (buffered.isNotEmpty) { downstream.add(buffered); buffered = {}; } diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 4e4fb83..0a97b17 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -7,6 +7,7 @@ import 'dart:math'; import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:test/test.dart'; import '../utils/test_utils_impl.dart'; @@ -31,6 +32,51 @@ void main() { return db; }); + test('raw update notifications', () async { + final factory = await testUtils.testFactory(path: path); + final db = factory + .openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + + db.execute('CREATE TABLE a (bar INTEGER);'); + db.execute('CREATE TABLE b (bar INTEGER);'); + final events = >[]; + final subscription = db.updatedTables.listen(events.add); + + db.execute('insert into a default values'); + expect(events, isEmpty); // should be async + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('insert into b default values'); + await pumpEventQueue(); + expect(events, isEmpty); // should only trigger on commit + db.execute('commit'); + + await pumpEventQueue(); + expect(events.removeLast(), {'a', 'b'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('rollback'); + expect(events, isEmpty); + await pumpEventQueue(); + expect(events, isEmpty); // should ignore cancelled transactions + + // Should still listen during pause, and dispatch on resume + subscription.pause(); + db.execute('insert into a default values'); + await pumpEventQueue(); + expect(events, isEmpty); + + subscription.resume(); + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + subscription.pause(); + }); + test('watch in isolate', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); From d76435962555b05f472ae0d2aa2621f0ead38004 Mon Sep 17 00:00:00 2001 From: David Martos Date: Fri, 19 Sep 2025 12:11:17 +0200 Subject: [PATCH 14/20] update doc --- packages/drift_sqlite_async/lib/src/connection.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/drift_sqlite_async/lib/src/connection.dart b/packages/drift_sqlite_async/lib/src/connection.dart index fa56e2e..6c34ed4 100644 --- a/packages/drift_sqlite_async/lib/src/connection.dart +++ b/packages/drift_sqlite_async/lib/src/connection.dart @@ -15,6 +15,8 @@ import 'package:sqlite_async/sqlite_async.dart'; class SqliteAsyncDriftConnection extends DatabaseConnection { late StreamSubscription _updateSubscription; + /// [transformTableUpdates] is useful to map local table names from PowerSync that are backed by a view name + /// which is the entity that the user interacts with. SqliteAsyncDriftConnection( SqliteConnection db, { bool logStatements = false, @@ -22,8 +24,6 @@ class SqliteAsyncDriftConnection extends DatabaseConnection { }) : super(SqliteAsyncQueryExecutor(db, logStatements: logStatements)) { _updateSubscription = (db as SqliteQueries).updates!.listen((event) { final Set setUpdates; - // This is useful to map local table names from PowerSync that are backed by a view name - // which is the entity that the user interacts with. if (transformTableUpdates != null) { setUpdates = transformTableUpdates(event); } else { From 09b841ba9b34a1e309504808e399695239e08c17 Mon Sep 17 00:00:00 2001 From: David Martos Date: Fri, 19 Sep 2025 14:14:10 +0200 Subject: [PATCH 15/20] add test --- .../drift_sqlite_async/test/basic_test.dart | 40 +++++++++++++++++++ .../test/generated/database.dart | 2 + 2 files changed, 42 insertions(+) diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index 339cc04..dd7a159 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -11,6 +11,7 @@ import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; import './utils/test_utils.dart'; +import 'generated/database.dart'; class EmptyDatabase extends GeneratedDatabase { EmptyDatabase(super.executor); @@ -245,4 +246,43 @@ INSERT INTO test_data(description) VALUES('test data'); expect(row, isEmpty); }); }); + + test('transform table updates', () async { + final path = dbPath(); + await cleanDb(path: path); + + final db = await setupDatabase(path: path); + final connection = SqliteAsyncDriftConnection( + db, + // tables with the local_ prefix are mapped to the name without the prefix + transformTableUpdates: (event) { + final updates = {}; + + for (final originalTableName in event.tables) { + final effectiveName = originalTableName.startsWith("local_") + ? originalTableName.substring(6) + : originalTableName; + updates.add(TableUpdate(effectiveName)); + } + + return updates; + }, + ); + + // Create table with a different name than drift. (Mimicking a table name backed by a view in PowerSync with the optional sync strategy) + await db.execute( + 'CREATE TABLE local_todos(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)', + ); + + final dbu = TodoDatabase.fromSqliteAsyncConnection(connection); + + final tableUpdatesFut = + dbu.tableUpdates(TableUpdateQuery.onTableName("todos")).first; + + // This insert will trigger the sqlite_async "updates" stream + await db.execute("INSERT INTO local_todos(description) VALUES('Test 1')"); + + expect(await tableUpdatesFut.timeout(const Duration(seconds: 2)), + {TableUpdate("todos")}); + }); } diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index 928c7dd..b8a5109 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -15,6 +15,8 @@ class TodoItems extends Table { @DriftDatabase(tables: [TodoItems]) class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); + + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); @override int get schemaVersion => 1; From 0a3d31e3b686ae23673f458eef1e6afb8658acf4 Mon Sep 17 00:00:00 2001 From: David Martos Date: Thu, 25 Sep 2025 19:17:54 +0200 Subject: [PATCH 16/20] Update basic_test.dart Remove legacy skip. It now supports nested transactions --- packages/drift_sqlite_async/test/basic_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/drift_sqlite_async/test/basic_test.dart b/packages/drift_sqlite_async/test/basic_test.dart index dd7a159..cac6f55 100644 --- a/packages/drift_sqlite_async/test/basic_test.dart +++ b/packages/drift_sqlite_async/test/basic_test.dart @@ -183,7 +183,7 @@ void main() { {'description': 'Test 1'}, {'description': 'Test 3'} ])); - }, skip: 'sqlite_async does not support nested transactions'); + }); test('Concurrent select', () async { var completer1 = Completer(); From 1252190e3522de429b751ecaadeb301137cb3d68 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 29 Sep 2025 16:07:25 +0200 Subject: [PATCH 17/20] Run CI for pull requests --- .github/workflows/test.yaml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cb1814b..0e0d34b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -3,13 +3,15 @@ name: Test on: push: branches: - - "**" + - "*" + pull_request: jobs: build: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 - name: Install Melos @@ -29,6 +31,7 @@ jobs: test: runs-on: ubuntu-latest + if: github.event_name == 'push' || (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository) strategy: fail-fast: false matrix: @@ -49,7 +52,7 @@ jobs: sqlite_url: "https://www.sqlite.org/2022/sqlite-autoconf-3380000.tar.gz" dart_sdk: stable steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v5 - uses: dart-lang/setup-dart@v1 with: sdk: ${{ matrix.dart_sdk }} From 0234d4fa469702f2709d6933ba199ffac9a19941 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 29 Sep 2025 16:07:40 +0200 Subject: [PATCH 18/20] Format --- packages/drift_sqlite_async/test/generated/database.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/drift_sqlite_async/test/generated/database.dart b/packages/drift_sqlite_async/test/generated/database.dart index b8a5109..4e03600 100644 --- a/packages/drift_sqlite_async/test/generated/database.dart +++ b/packages/drift_sqlite_async/test/generated/database.dart @@ -15,7 +15,7 @@ class TodoItems extends Table { @DriftDatabase(tables: [TodoItems]) class TodoDatabase extends _$TodoDatabase { TodoDatabase(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); - + TodoDatabase.fromSqliteAsyncConnection(SqliteAsyncDriftConnection super.conn); @override From b97be616b5ab5095480cf71848a74d812ee74fac Mon Sep 17 00:00:00 2001 From: David Martos Date: Tue, 7 Oct 2025 16:26:06 +0100 Subject: [PATCH 19/20] Get all Sqlite connections in the pool (#101) --- .../lib/src/common/sqlite_database.dart | 8 + .../src/impl/single_connection_database.dart | 8 + .../lib/src/impl/stub_sqlite_database.dart | 8 + .../src/native/database/connection_pool.dart | 70 ++++++++ .../database/native_sqlite_database.dart | 8 + .../sqlite_async/lib/src/web/database.dart | 8 + .../src/web/database/web_sqlite_database.dart | 8 + packages/sqlite_async/test/basic_test.dart | 44 +++++ .../sqlite_async/test/native/basic_test.dart | 152 ++++++++++++++++++ 9 files changed, 314 insertions(+) diff --git a/packages/sqlite_async/lib/src/common/sqlite_database.dart b/packages/sqlite_async/lib/src/common/sqlite_database.dart index 3cb12bb..3201135 100644 --- a/packages/sqlite_async/lib/src/common/sqlite_database.dart +++ b/packages/sqlite_async/lib/src/common/sqlite_database.dart @@ -39,6 +39,14 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries { /// /// Use this to access the database in background isolates. IsolateConnectionFactory isolateConnectionFactory(); + + /// Locks all underlying connections making up this database, and gives [block] access to all of them at once. + /// This can be useful to run the same statement on all connections. For instance, + /// ATTACHing a database, that is expected to be available in all connections. + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block); } /// A SQLite database instance. diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart index 4cd3144..7ca4357 100644 --- a/packages/sqlite_async/lib/src/impl/single_connection_database.dart +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -57,4 +57,12 @@ final class SingleConnectionDatabase return connection.writeLock(callback, lockTimeout: lockTimeout, debugContext: debugContext); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(connection, [])); + } } diff --git a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart index 29db641..ee254f3 100644 --- a/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/impl/stub_sqlite_database.dart @@ -64,4 +64,12 @@ class SqliteDatabaseImpl Future getAutoCommit() { throw UnimplementedError(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + throw UnimplementedError(); + } } diff --git a/packages/sqlite_async/lib/src/native/database/connection_pool.dart b/packages/sqlite_async/lib/src/native/database/connection_pool.dart index 9521b34..8dab27e 100644 --- a/packages/sqlite_async/lib/src/native/database/connection_pool.dart +++ b/packages/sqlite_async/lib/src/native/database/connection_pool.dart @@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { final MutexImpl mutex; + int _runningWithAllConnectionsCount = 0; + @override bool closed = false; @@ -88,6 +90,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { return; } + if (_availableReadConnections.isEmpty && + _runningWithAllConnectionsCount > 0) { + // Wait until [withAllConnections] is done. Otherwise we could spawn a new + // reader while the user is configuring all the connections, + // e.g. a global open factory configuration shared across all connections. + return; + } + var nextItem = _queue.removeFirst(); while (nextItem.completer.isCompleted) { // This item already timed out - try the next one if available @@ -232,6 +242,66 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection { await connection.refreshSchema(); } } + + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) async { + try { + _runningWithAllConnectionsCount++; + + final blockCompleter = Completer(); + final (write, reads) = await _lockAllConns(blockCompleter); + + try { + final res = await block(write, reads); + blockCompleter.complete(res); + return res; + } catch (e, st) { + blockCompleter.completeError(e, st); + rethrow; + } + } finally { + _runningWithAllConnectionsCount--; + + // Continue processing any pending read requests that may have been queued while + // the block was running. + Timer.run(_nextRead); + } + } + + /// Locks all connections, returning the acquired contexts. + /// We pass a completer that would be called after the locks are taken. + Future<(SqliteWriteContext, List)> _lockAllConns( + Completer lockCompleter) async { + final List> readLockedCompleters = []; + final Completer writeLockedCompleter = Completer(); + + // Take the write lock + writeLock((ctx) { + writeLockedCompleter.complete(ctx); + return lockCompleter.future; + }); + + // Take all the read locks + for (final readConn in _allReadConnections) { + final completer = Completer(); + readLockedCompleters.add(completer); + + readConn.readLock((ctx) { + completer.complete(ctx); + return lockCompleter.future; + }); + } + + // Wait after all locks are taken + final [writer as SqliteWriteContext, ...readers] = await Future.wait([ + writeLockedCompleter.future, + ...readLockedCompleters.map((e) => e.future) + ]); + + return (writer, readers); + } } typedef ReadCallback = Future Function(SqliteReadContext tx); diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 7bea111..22cacf3 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -171,4 +171,12 @@ class SqliteDatabaseImpl Future refreshSchema() { return _pool.refreshSchema(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return _pool.withAllConnections(block); + } } diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index cfaf987..f2dc998 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -171,6 +171,14 @@ class WebDatabase await isInitialized; return _database.fileSystem.flush(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(this, [])); + } } final class _UnscopedContext extends UnscopedContext { diff --git a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart index c6d1b75..69f01ab 100644 --- a/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart @@ -178,4 +178,12 @@ class SqliteDatabaseImpl Future exposeEndpoint() async { return await _connection.exposeEndpoint(); } + + @override + Future withAllConnections( + Future Function( + SqliteWriteContext writer, List readers) + block) { + return writeLock((_) => block(_connection, [])); + } } diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index 6a315da..e2914b3 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -7,6 +7,7 @@ import 'utils/test_utils_impl.dart'; final testUtils = TestUtils(); const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm'); +const _isWeb = identical(0, 0.0) || _isDart2Wasm; void main() { group('Shared Basic Tests', () { @@ -301,6 +302,49 @@ void main() { 'Web locks are managed with a shared worker, which does not support timeouts', ) }); + + test('with all connections', () async { + final maxReaders = _isWeb ? 0 : 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + // Warm up to spawn the max readers + await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + final parentZone = Zone.current; + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + // Need a root zone here to avoid recursive lock errors. + readsCalledWhileWithAllConnsRunning = + Future(parentZone.bindCallback(() async { + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + await Future.delayed(const Duration(milliseconds: 100)); + }); + }), + ); + })); + + await Future.delayed(const Duration(milliseconds: 200)); + finishedWithAllConns = true; + }); + + await readsCalledWhileWithAllConnsRunning; + }); }); } diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index dec1fed..3f348e6 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -2,12 +2,16 @@ library; import 'dart:async'; +import 'dart:io'; import 'dart:math'; +import 'package:collection/collection.dart'; +import 'package:path/path.dart' show join; import 'package:sqlite3/common.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; +import '../utils/abstract_test_utils.dart'; import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); @@ -100,6 +104,126 @@ void main() { print("${DateTime.now()} done"); }); + test('prevent opening new readers while in withAllConnections', () async { + final sharedStateDir = Directory.systemTemp.createTempSync(); + addTearDown(() => sharedStateDir.deleteSync(recursive: true)); + + final File sharedStateFile = + File(join(sharedStateDir.path, 'shared-state.txt')); + + sharedStateFile.writeAsStringSync('initial'); + + final db = SqliteDatabase.withFactory( + _TestSqliteOpenFactoryWithSharedStateFile( + path: path, sharedStateFilePath: sharedStateFile.path), + maxReaders: 3); + await db.initialize(); + await createTables(db); + + // The writer saw 'initial' in the file when opening the connection + expect( + await db + .writeLock((c) => c.get('SELECT file_contents_on_open() AS state')), + {'state': 'initial'}, + ); + + final withAllConnectionsCompleter = Completer(); + + final withAllConnsFut = db.withAllConnections((writer, readers) async { + expect(readers.length, 0); // No readers yet + + // Simulate some work until the file is updated + await Future.delayed(const Duration(milliseconds: 200)); + sharedStateFile.writeAsStringSync('updated'); + + await withAllConnectionsCompleter.future; + }); + + // Start a reader that gets the contents of the shared file + bool readFinished = false; + final someReadFut = + db.get('SELECT file_contents_on_open() AS state', []).then((r) { + readFinished = true; + return r; + }); + + // The withAllConnections should prevent the reader from opening + await Future.delayed(const Duration(milliseconds: 100)); + expect(readFinished, isFalse); + + // Free all the locks + withAllConnectionsCompleter.complete(); + await withAllConnsFut; + + final readerInfo = await someReadFut; + expect(readFinished, isTrue); + // The read should see the updated value in the file. This checks + // that a reader doesn't spawn while running withAllConnections + expect(readerInfo, {'state': 'updated'}); + }); + + test('with all connections', () async { + final maxReaders = 3; + + final db = SqliteDatabase.withFactory( + await testUtils.testFactory(path: path), + maxReaders: maxReaders, + ); + await db.initialize(); + await createTables(db); + + Future readWithRandomDelay( + SqliteReadContext ctx, int id) async { + return await ctx.get( + 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection', + [id, 5 + Random().nextInt(10)]); + } + + // Warm up to spawn the max readers + await Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)), + ); + + bool finishedWithAllConns = false; + + late Future readsCalledWhileWithAllConnsRunning; + + print("${DateTime.now()} start"); + await db.withAllConnections((writer, readers) async { + expect(readers.length, maxReaders); + + // Run some reads during the block that they should run after the block finishes and releases + // all locks + readsCalledWhileWithAllConnsRunning = Future.wait( + [1, 2, 3, 4, 5, 6, 7, 8].map((i) async { + final r = await db.readLock((c) async { + expect(finishedWithAllConns, isTrue); + return await readWithRandomDelay(c, i); + }); + print( + "${DateTime.now()} After withAllConnections, started while running $r"); + }), + ); + + await Future.wait([ + writer.execute( + "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *", + [ + 123, + 5 + Random().nextInt(20) + ]).then((value) => + print("${DateTime.now()} withAllConnections writer done $value")), + ...readers + .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) { + print( + "${DateTime.now()} withAllConnections readers done $results"); + })) + ]); + }).then((_) => finishedWithAllConns = true); + + await readsCalledWhileWithAllConnsRunning; + }); + test('read-only transactions', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db); @@ -379,3 +503,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory { ]; } } + +class _TestSqliteOpenFactoryWithSharedStateFile + extends TestDefaultSqliteOpenFactory { + final String sharedStateFilePath; + + _TestSqliteOpenFactoryWithSharedStateFile( + {required super.path, required this.sharedStateFilePath}); + + @override + sqlite.CommonDatabase open(SqliteOpenOptions options) { + final File sharedStateFile = File(sharedStateFilePath); + final String sharedState = sharedStateFile.readAsStringSync(); + + final db = super.open(options); + + // Function to return the contents of the shared state file at the time of opening + // so that we know at which point the factory was called. + db.createFunction( + functionName: 'file_contents_on_open', + argumentCount: const sqlite.AllowedArgumentCount(0), + function: (args) { + return sharedState; + }, + ); + + return db; + } +} From 21df6da19d3e69762047a638c1e6fef8b710e713 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 13 Oct 2025 17:25:46 +0200 Subject: [PATCH 20/20] chore(release): publish packages - sqlite_async@0.12.2 - drift_sqlite_async@0.2.5 --- CHANGELOG.md | 26 ++++++++++++++++++++++++ packages/drift_sqlite_async/CHANGELOG.md | 4 ++++ packages/drift_sqlite_async/pubspec.yaml | 4 ++-- packages/sqlite_async/CHANGELOG.md | 4 ++++ packages/sqlite_async/pubspec.yaml | 2 +- 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 821f8db..5181152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,32 @@ All notable changes to this project will be documented in this file. See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. +## 2025-10-13 + +### Changes + +--- + +Packages with breaking changes: + + - There are no breaking changes in this release. + +Packages with other changes: + + - [`sqlite_async` - `v0.12.2`](#sqlite_async---v0122) + - [`drift_sqlite_async` - `v0.2.5`](#drift_sqlite_async---v025) + +--- + +#### `sqlite_async` - `v0.12.2` + + - Add `withAllConnections` method to run statements on all connections in the pool. + +#### `drift_sqlite_async` - `v0.2.5` + + - Allow customizing update notifications from `sqlite_async`. + + ## 2025-08-08 ### Changes diff --git a/packages/drift_sqlite_async/CHANGELOG.md b/packages/drift_sqlite_async/CHANGELOG.md index 8dd5e5a..bd36dd1 100644 --- a/packages/drift_sqlite_async/CHANGELOG.md +++ b/packages/drift_sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.2.5 + + - Allow customizing update notifications from `sqlite_async`. + ## 0.2.4 - Allow transforming table updates from sqlite_async. diff --git a/packages/drift_sqlite_async/pubspec.yaml b/packages/drift_sqlite_async/pubspec.yaml index 06e2963..bd2b701 100644 --- a/packages/drift_sqlite_async/pubspec.yaml +++ b/packages/drift_sqlite_async/pubspec.yaml @@ -1,5 +1,5 @@ name: drift_sqlite_async -version: 0.2.4 +version: 0.2.5 homepage: https://github.com/powersync-ja/sqlite_async.dart repository: https://github.com/powersync-ja/sqlite_async.dart description: Use Drift with a sqlite_async database, allowing both to be used in the same application. @@ -15,7 +15,7 @@ environment: sdk: ">=3.0.0 <4.0.0" dependencies: drift: ">=2.28.0 <3.0.0" - sqlite_async: ^0.12.0 + sqlite_async: ^0.12.2 dev_dependencies: build_runner: ^2.4.8 diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 342adca..1bbc939 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.2 + + - Add `withAllConnections` method to run statements on all connections in the pool. + ## 0.12.1 - Fix distributing updates from shared worker. diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 4b45d57..3d5130d 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.12.1 +version: 0.12.2 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0"