diff --git a/CHANGELOG.md b/CHANGELOG.md index 93a8b0d..821f8db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Packages with breaking changes: Packages with other changes: + - [`sqlite_async` - `v0.12.1`](#sqlite_async---v0121) - [`sqlite_async` - `v0.12.0`](#sqlite_async---v0120) - [`drift_sqlite_async` - `v0.2.3+1`](#drift_sqlite_async---v0231) @@ -26,6 +27,10 @@ Packages with dependency updates only: --- +#### `sqlite_async` - `v0.12.1` + +- Fix distributing updates from shared worker. + #### `sqlite_async` - `v0.12.0` - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 8cda2a6..342adca 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.12.1 + +- Fix distributing updates from shared worker. + ## 0.12.0 - Avoid large transactions creating a large internal update queue. diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 301d0af..e90739e 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -303,7 +303,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, final server = params.portServer; final commandPort = ReceivePort(); - db.throttledUpdatedTables.listen((changedTables) { + db.updatedTables.listen((changedTables) { client.fire(UpdateNotification(changedTables)); }); diff --git a/packages/sqlite_async/lib/src/utils/shared_utils.dart b/packages/sqlite_async/lib/src/utils/shared_utils.dart index ff291f4..542611e 100644 --- a/packages/sqlite_async/lib/src/utils/shared_utils.dart +++ b/packages/sqlite_async/lib/src/utils/shared_utils.dart @@ -79,68 +79,85 @@ List mapParameters(List parameters) { } extension ThrottledUpdates on CommonDatabase { - /// Wraps [updatesSync] to: + /// An unthrottled stream of updated tables that emits on every commit. /// - /// - Not fire in transactions. - /// - Fire asynchronously. - /// - Only report table names, which are buffered to avoid duplicates. - Stream> get throttledUpdatedTables { - StreamController>? controller; - var pendingUpdates = {}; - var paused = false; - - Timer? updateDebouncer; - - void maybeFireUpdates() { - updateDebouncer?.cancel(); - updateDebouncer = null; - - if (paused) { - // Continue collecting updates, but don't fire any - return; + /// A paused subscription on this stream will buffer changed tables into a + /// growing set instead of losing events, so this stream is simple to throttle + /// downstream. + Stream> get updatedTables { + final listeners = <_UpdateListener>[]; + var uncommitedUpdates = {}; + var underlyingSubscriptions = >[]; + + void handleUpdate(SqliteUpdate update) { + uncommitedUpdates.add(update.tableName); + } + + void afterCommit() { + for (final listener in listeners) { + listener.notify(uncommitedUpdates); } - if (!autocommit) { - // Inside a transaction - do not fire updates - return; + uncommitedUpdates.clear(); + } + + void afterRollback() { + uncommitedUpdates.clear(); + } + + void addListener(_UpdateListener listener) { + listeners.add(listener); + + if (listeners.length == 1) { + // First listener, start listening for raw updates on underlying + // database. + underlyingSubscriptions = [ + updatesSync.listen(handleUpdate), + commits.listen((_) => afterCommit()), + commits.listen((_) => afterRollback()) + ]; } + } - if (pendingUpdates.isNotEmpty) { - controller!.add(pendingUpdates); - pendingUpdates = {}; + void removeListener(_UpdateListener listener) { + listeners.remove(listener); + if (listeners.isEmpty) { + for (final sub in underlyingSubscriptions) { + sub.cancel(); + } } } - void collectUpdate(SqliteUpdate event) { - pendingUpdates.add(event.tableName); + return Stream.multi( + (listener) { + final wrapped = _UpdateListener(listener); + addListener(wrapped); - updateDebouncer ??= - Timer(const Duration(milliseconds: 1), maybeFireUpdates); + listener.onResume = wrapped.addPending; + listener.onCancel = () => removeListener(wrapped); + }, + isBroadcast: true, + ); + } +} + +class _UpdateListener { + final MultiStreamController> downstream; + Set buffered = {}; + + _UpdateListener(this.downstream); + + void notify(Set pendingUpdates) { + buffered.addAll(pendingUpdates); + if (!downstream.isPaused) { + addPending(); } + } - StreamSubscription? txSubscription; - StreamSubscription? sourceSubscription; - - controller = StreamController(onListen: () { - txSubscription = commits.listen((_) { - maybeFireUpdates(); - }, onError: (error) { - controller?.addError(error); - }); - - sourceSubscription = updatesSync.listen(collectUpdate, onError: (error) { - controller?.addError(error); - }); - }, onPause: () { - paused = true; - }, onResume: () { - paused = false; - maybeFireUpdates(); - }, onCancel: () { - txSubscription?.cancel(); - sourceSubscription?.cancel(); - }); - - return controller.stream; + void addPending() { + if (buffered.isNotEmpty) { + downstream.add(buffered); + buffered = {}; + } } } diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 0c3e8f7..059c281 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -56,7 +56,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { final Map _state = {}; AsyncSqliteDatabase({required this.database}) - : _updates = database.throttledUpdatedTables; + : _updates = database.updatedTables; _ConnectionState _findState(ClientConnection connection) { return _state.putIfAbsent(connection, _ConnectionState.new); @@ -144,12 +144,13 @@ class AsyncSqliteDatabase extends WorkerDatabase { state.unsubscribeUpdates(); _registerCloseListener(state, connection); - state.updatesNotification = _updates.listen((tables) { - connection.customRequest(CustomDatabaseMessage( + late StreamSubscription subscription; + subscription = state.updatesNotification = _updates.listen((tables) { + subscription.pause(connection.customRequest(CustomDatabaseMessage( CustomDatabaseMessageKind.notifyUpdates, id, tables.toList(), - )); + ))); }); } else { state.unsubscribeUpdates(); diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index 6e62e52..4b45d57 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -1,6 +1,6 @@ name: sqlite_async description: High-performance asynchronous interface for SQLite on Dart and Flutter. -version: 0.12.0 +version: 0.12.1 repository: https://github.com/powersync-ja/sqlite_async.dart environment: sdk: ">=3.5.0 <4.0.0" diff --git a/packages/sqlite_async/test/native/watch_test.dart b/packages/sqlite_async/test/native/watch_test.dart index 4e4fb83..0a97b17 100644 --- a/packages/sqlite_async/test/native/watch_test.dart +++ b/packages/sqlite_async/test/native/watch_test.dart @@ -7,6 +7,7 @@ import 'dart:math'; import 'package:sqlite3/common.dart'; import 'package:sqlite_async/sqlite_async.dart'; +import 'package:sqlite_async/src/utils/shared_utils.dart'; import 'package:test/test.dart'; import '../utils/test_utils_impl.dart'; @@ -31,6 +32,51 @@ void main() { return db; }); + test('raw update notifications', () async { + final factory = await testUtils.testFactory(path: path); + final db = factory + .openDB(SqliteOpenOptions(primaryConnection: true, readOnly: false)); + + db.execute('CREATE TABLE a (bar INTEGER);'); + db.execute('CREATE TABLE b (bar INTEGER);'); + final events = >[]; + final subscription = db.updatedTables.listen(events.add); + + db.execute('insert into a default values'); + expect(events, isEmpty); // should be async + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('insert into b default values'); + await pumpEventQueue(); + expect(events, isEmpty); // should only trigger on commit + db.execute('commit'); + + await pumpEventQueue(); + expect(events.removeLast(), {'a', 'b'}); + + db.execute('begin'); + db.execute('insert into a default values'); + db.execute('rollback'); + expect(events, isEmpty); + await pumpEventQueue(); + expect(events, isEmpty); // should ignore cancelled transactions + + // Should still listen during pause, and dispatch on resume + subscription.pause(); + db.execute('insert into a default values'); + await pumpEventQueue(); + expect(events, isEmpty); + + subscription.resume(); + await pumpEventQueue(); + expect(events.removeLast(), {'a'}); + + subscription.pause(); + }); + test('watch in isolate', () async { final db = await testUtils.setupDatabase(path: path); await createTables(db);