diff --git a/demos/benchmarks/pubspec.lock b/demos/benchmarks/pubspec.lock index 026996f9..b45942df 100644 --- a/demos/benchmarks/pubspec.lock +++ b/demos/benchmarks/pubspec.lock @@ -111,10 +111,10 @@ packages: dependency: "direct main" description: name: http - sha256: "2c11f3f94c687ee9bad77c171151672986360b2b001d109814ee7140b2cf261b" + sha256: bb2ce4590bc2667c96f318d68cac1b5a7987ec819351d32b1c987239a815e007 url: "https://pub.dev" source: hosted - version: "1.4.0" + version: "1.5.0" http_parser: dependency: transitive description: @@ -135,26 +135,26 @@ packages: dependency: transitive description: name: leak_tracker - sha256: "6bb818ecbdffe216e81182c2f0714a2e62b593f4a4f13098713ff1685dfb6ab0" + sha256: "8dcda04c3fc16c14f48a7bb586d4be1f0d1572731b6d81d51772ef47c02081e0" url: "https://pub.dev" source: hosted - version: "10.0.9" + version: "11.0.1" leak_tracker_flutter_testing: dependency: transitive description: name: leak_tracker_flutter_testing - sha256: f8b613e7e6a13ec79cfdc0e97638fddb3ab848452eff057653abd3edba760573 + sha256: "1dbc140bb5a23c75ea9c4811222756104fbcd1a27173f0c34ca01e16bea473c1" url: "https://pub.dev" source: hosted - version: "3.0.9" + version: "3.0.10" leak_tracker_testing: dependency: transitive description: name: leak_tracker_testing - sha256: "6ba465d5d76e67ddf503e1161d1f4a6bc42306f9d66ca1e8f079a47290fb06d3" + sha256: "8d5a2d49f4a66b49744b23b018848400d23e54caf9463f4eb20df3eb8acb2eb1" url: "https://pub.dev" source: hosted - version: "3.0.1" + version: "3.0.2" lints: dependency: transitive description: @@ -281,21 +281,21 @@ packages: path: "../../packages/powersync" relative: true source: path - version: "1.15.0" + version: "1.15.2" powersync_core: dependency: "direct overridden" description: path: "../../packages/powersync_core" relative: true source: path - version: "1.5.0" + version: "1.5.2" powersync_flutter_libs: dependency: "direct overridden" description: path: "../../packages/powersync_flutter_libs" relative: true source: path - version: "0.4.10" + version: "0.4.11" pub_semver: dependency: transitive description: @@ -337,10 +337,10 @@ packages: dependency: transitive description: name: sqlite3 - sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e" + sha256: f393d92c71bdcc118d6203d07c991b9be0f84b1a6f89dd4f7eed348131329924 url: "https://pub.dev" source: hosted - version: "2.7.5" + version: "2.9.0" sqlite3_flutter_libs: dependency: transitive description: @@ -353,18 +353,18 @@ packages: dependency: transitive description: name: sqlite3_web - sha256: "967e076442f7e1233bd7241ca61f3efe4c7fc168dac0f38411bdb3bdf471eb3c" + sha256: "0f6ebcb4992d1892ac5c8b5ecd22a458ab9c5eb6428b11ae5ecb5d63545844da" url: "https://pub.dev" source: hosted - version: "0.3.1" + version: "0.3.2" sqlite_async: dependency: "direct main" description: name: sqlite_async - sha256: a60e8d5c8df8e694933bd5a312c38393e79ad77d784bb91c6f38ba627bfb7aec + sha256: "6116bfc6aef6ce77730b478385ba4a58873df45721f6a9bc6ffabf39b6576e36" url: "https://pub.dev" source: hosted - version: "0.11.4" + version: "0.12.1" stack_trace: dependency: transitive description: @@ -401,10 +401,10 @@ packages: dependency: transitive description: name: test_api - sha256: fb31f383e2ee25fbbfe06b40fe21e1e458d14080e3c67e7ba0acfde4df4e0bbd + sha256: "522f00f556e73044315fa4585ec3270f1808a4b186c936e612cab0b565ff1e00" url: "https://pub.dev" source: hosted - version: "0.7.4" + version: "0.7.6" typed_data: dependency: transitive description: @@ -433,10 +433,10 @@ packages: dependency: transitive description: name: vector_math - sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803" + sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b url: "https://pub.dev" source: hosted - version: "2.1.4" + version: "2.2.0" vm_service: dependency: transitive description: @@ -470,5 +470,5 @@ packages: source: hosted version: "3.1.3" sdks: - dart: ">=3.7.0 <4.0.0" + dart: ">=3.8.0-0 <4.0.0" flutter: ">=3.27.0" diff --git a/demos/django-todolist/lib/widgets/guard_by_sync.dart b/demos/django-todolist/lib/widgets/guard_by_sync.dart index 000b5c8d..b65986b0 100644 --- a/demos/django-todolist/lib/widgets/guard_by_sync.dart +++ b/demos/django-todolist/lib/widgets/guard_by_sync.dart @@ -7,9 +7,9 @@ import 'package:powersync_django_todolist_demo/powersync.dart'; class GuardBySync extends StatelessWidget { final Widget child; - /// When set, wait only for a complete sync within the [BucketPriority] + /// When set, wait only for a complete sync within the [StreamPriority] /// instead of a full sync. - final BucketPriority? priority; + final StreamPriority? priority; const GuardBySync({ super.key, diff --git a/demos/supabase-todolist-drift/ios/Runner.xcworkspace/xcshareddata/swiftpm/Package.resolved b/demos/supabase-todolist-drift/ios/Runner.xcworkspace/xcshareddata/swiftpm/Package.resolved index 8e5eb05f..0c12c1e5 100644 --- a/demos/supabase-todolist-drift/ios/Runner.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/demos/supabase-todolist-drift/ios/Runner.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -5,7 +5,7 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/simolus3/CSQLite.git", "state" : { - "revision" : "a8d28afef08ad8faa4ee9ef7845f61c2e8ac5810" + "revision" : "a268235ae86718e66d6a29feef3bd22c772eb82b" } }, { @@ -13,8 +13,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "state" : { - "revision" : "00776db5157c8648671b00e6673603144fafbfeb", - "version" : "0.4.5" + "revision" : "b2a81af14e9ad83393eb187bb02e62e6db8b5ad6", + "version" : "0.4.6" } } ], diff --git a/demos/supabase-todolist-drift/lib/powersync/powersync.dart b/demos/supabase-todolist-drift/lib/powersync/powersync.dart index cfd73336..cb327bfd 100644 --- a/demos/supabase-todolist-drift/lib/powersync/powersync.dart +++ b/demos/supabase-todolist-drift/lib/powersync/powersync.dart @@ -47,18 +47,19 @@ Future powerSyncInstance(Ref ref) async { return db; } -final _syncStatusInternal = StreamProvider((ref) { +final _syncStatusInternal = StreamProvider((ref) { return Stream.fromFuture( ref.watch(powerSyncInstanceProvider.future), - ).asyncExpand((db) => db.statusStream).startWith(const SyncStatus()); + ).asyncExpand((db) => db.statusStream).startWith(null); }); final syncStatus = Provider((ref) { + // ignore: invalid_use_of_internal_member return ref.watch(_syncStatusInternal).value ?? const SyncStatus(); }); @riverpod -bool didCompleteSync(Ref ref, [BucketPriority? priority]) { +bool didCompleteSync(Ref ref, [StreamPriority? priority]) { final status = ref.watch(syncStatus); if (priority != null) { return status.statusForPriority(priority).hasSynced ?? false; diff --git a/demos/supabase-todolist-drift/lib/screens/lists.dart b/demos/supabase-todolist-drift/lib/screens/lists.dart index ebaa0857..c995a275 100644 --- a/demos/supabase-todolist-drift/lib/screens/lists.dart +++ b/demos/supabase-todolist-drift/lib/screens/lists.dart @@ -35,7 +35,7 @@ final class _ListsWidget extends ConsumerWidget { @override Widget build(BuildContext context, WidgetRef ref) { final lists = ref.watch(listsNotifierProvider); - final didSync = ref.watch(didCompleteSyncProvider(BucketPriority(1))); + final didSync = ref.watch(didCompleteSyncProvider(StreamPriority(1))); if (!didSync) { return const Text('Busy with sync...'); diff --git a/demos/supabase-todolist/lib/app_config_template.dart b/demos/supabase-todolist/lib/app_config_template.dart index ccfdfa21..05ea5164 100644 --- a/demos/supabase-todolist/lib/app_config_template.dart +++ b/demos/supabase-todolist/lib/app_config_template.dart @@ -6,4 +6,7 @@ class AppConfig { static const String powersyncUrl = 'https://foo.powersync.journeyapps.com'; static const String supabaseStorageBucket = ''; // Optional. Only required when syncing attachments and using Supabase Storage. See packages/powersync_attachments_helper. + // Whether the PowerSync instance uses sync streams to make fetching todo + // items optional. + static const bool hasSyncStreams = false; } diff --git a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart index d55ed4e3..6d4d12b9 100644 --- a/demos/supabase-todolist/lib/widgets/guard_by_sync.dart +++ b/demos/supabase-todolist/lib/widgets/guard_by_sync.dart @@ -7,9 +7,9 @@ import 'package:powersync_flutter_demo/powersync.dart'; class GuardBySync extends StatelessWidget { final Widget child; - /// When set, wait only for a complete sync within the [BucketPriority] + /// When set, wait only for a complete sync within the [StreamPriority] /// instead of a full sync. - final BucketPriority? priority; + final StreamPriority? priority; const GuardBySync({ super.key, diff --git a/demos/supabase-todolist/lib/widgets/list_item_sync_stream.dart b/demos/supabase-todolist/lib/widgets/list_item_sync_stream.dart new file mode 100644 index 00000000..e400f8aa --- /dev/null +++ b/demos/supabase-todolist/lib/widgets/list_item_sync_stream.dart @@ -0,0 +1,78 @@ +import 'package:flutter/material.dart'; + +import '../powersync.dart'; +import './todo_list_page.dart'; +import '../models/todo_list.dart'; + +/// A variant of the `ListItem` that only shows a summary of completed and +/// pending items when the respective list has an active sync stream. +class SyncStreamsAwareListItem extends StatelessWidget { + SyncStreamsAwareListItem({ + required this.list, + }) : super(key: ObjectKey(list)); + + final TodoList list; + + Future delete() async { + // Server will take care of deleting related todos + await list.delete(); + } + + @override + Widget build(BuildContext context) { + viewList() { + var navigator = Navigator.of(context); + + navigator.push( + MaterialPageRoute(builder: (context) => TodoListPage(list: list))); + } + + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, asyncSnapshot) { + final status = asyncSnapshot.requireData; + final stream = + status.forStream(db.syncStream('todos', {'list': list.id})); + + String subtext; + if (stream == null || !stream.subscription.active) { + subtext = 'Items not loaded - click to fetch.'; + } else { + subtext = + '${list.pendingCount} pending, ${list.completedCount} completed'; + } + + return Card( + child: Column( + mainAxisSize: MainAxisSize.min, + children: [ + ListTile( + onTap: viewList, + leading: const Icon(Icons.list), + title: Text(list.name), + subtitle: Text(subtext), + ), + Row( + mainAxisAlignment: MainAxisAlignment.end, + children: [ + IconButton( + iconSize: 30, + icon: const Icon( + Icons.delete, + color: Colors.red, + ), + tooltip: 'Delete List', + alignment: Alignment.centerRight, + onPressed: delete, + ), + const SizedBox(width: 8), + ], + ), + ], + ), + ); + }, + ); + } +} diff --git a/demos/supabase-todolist/lib/widgets/lists_page.dart b/demos/supabase-todolist/lib/widgets/lists_page.dart index c41aabbe..564e03ee 100644 --- a/demos/supabase-todolist/lib/widgets/lists_page.dart +++ b/demos/supabase-todolist/lib/widgets/lists_page.dart @@ -1,11 +1,13 @@ import 'package:flutter/material.dart'; import 'package:powersync/powersync.dart'; +import '../app_config.dart'; import './list_item.dart'; import './list_item_dialog.dart'; import '../main.dart'; import '../models/todo_list.dart'; import 'guard_by_sync.dart'; +import 'list_item_sync_stream.dart'; void _showAddDialog(BuildContext context) async { return showDialog( @@ -55,7 +57,9 @@ class ListsWidget extends StatelessWidget { return ListView( padding: const EdgeInsets.symmetric(vertical: 8.0), children: todoLists.map((list) { - return ListItemWidget(list: list); + return AppConfig.hasSyncStreams + ? SyncStreamsAwareListItem(list: list) + : ListItemWidget(list: list); }).toList(), ); } else { @@ -66,5 +70,5 @@ class ListsWidget extends StatelessWidget { ); } - static final _listsPriority = BucketPriority(1); + static final _listsPriority = StreamPriority(1); } diff --git a/demos/supabase-todolist/lib/widgets/todo_list_page.dart b/demos/supabase-todolist/lib/widgets/todo_list_page.dart index 7e28238e..b1245321 100644 --- a/demos/supabase-todolist/lib/widgets/todo_list_page.dart +++ b/demos/supabase-todolist/lib/widgets/todo_list_page.dart @@ -1,5 +1,7 @@ import 'package:flutter/material.dart'; +import 'package:powersync/powersync.dart'; +import '../app_config.dart'; import '../powersync.dart'; import './status_app_bar.dart'; import './todo_item_dialog.dart'; @@ -32,9 +34,12 @@ class TodoListPage extends StatelessWidget { ); return Scaffold( - appBar: StatusAppBar(title: Text(list.name)), - floatingActionButton: button, - body: TodoListWidget(list: list)); + appBar: StatusAppBar(title: Text(list.name)), + floatingActionButton: button, + body: AppConfig.hasSyncStreams + ? _SyncStreamTodoListWidget(list: list) + : TodoListWidget(list: list), + ); } } @@ -66,3 +71,84 @@ class TodoListWidget extends StatelessWidget { ); } } + +class _SyncStreamTodoListWidget extends StatefulWidget { + final TodoList list; + + const _SyncStreamTodoListWidget({required this.list}); + + @override + State<_SyncStreamTodoListWidget> createState() => _SyncStreamTodosState(); +} + +class _SyncStreamTodosState extends State<_SyncStreamTodoListWidget> { + SyncStreamSubscription? _listSubscription; + + void _subscribe(String listId) { + db + .syncStream('todos', {'list': listId}) + .subscribe(ttl: const Duration(hours: 1)) + .then((sub) { + if (mounted && widget.list.id == listId) { + setState(() { + _listSubscription = sub; + }); + } else { + sub.unsubscribe(); + } + }); + } + + @override + void initState() { + super.initState(); + _subscribe(widget.list.id); + } + + @override + void didUpdateWidget(covariant _SyncStreamTodoListWidget oldWidget) { + super.didUpdateWidget(oldWidget); + _subscribe(widget.list.id); + } + + @override + void dispose() { + super.dispose(); + _listSubscription?.unsubscribe(); + } + + @override + Widget build(BuildContext context) { + return StreamBuilder( + stream: db.statusStream, + initialData: db.currentStatus, + builder: (context, snapshot) { + final hasSynced = switch (_listSubscription) { + null => null, + final sub => snapshot.requireData.forStream(sub), + } + ?.subscription + .hasSynced ?? + false; + + if (!hasSynced) { + return const Center(child: CircularProgressIndicator()); + } else { + return StreamBuilder( + stream: widget.list.watchItems(), + builder: (context, snapshot) { + final items = snapshot.data ?? const []; + + return ListView( + padding: const EdgeInsets.symmetric(vertical: 8.0), + children: items.map((todo) { + return TodoItemWidget(todo: todo); + }).toList(), + ); + }, + ); + } + }, + ); + } +} diff --git a/packages/powersync_core/lib/powersync_core.dart b/packages/powersync_core/lib/powersync_core.dart index b4dfe35d..ef2e97c7 100644 --- a/packages/powersync_core/lib/powersync_core.dart +++ b/packages/powersync_core/lib/powersync_core.dart @@ -11,6 +11,7 @@ export 'src/log.dart'; export 'src/open_factory.dart'; export 'src/schema.dart'; export 'src/sync/options.dart' hide ResolvedSyncOptions; +export 'src/sync/stream.dart' hide CoreActiveStreamSubscription; export 'src/sync/sync_status.dart' - hide BucketProgress, InternalSyncDownloadProgress; + hide BucketProgress, InternalSyncDownloadProgress, InternalSyncStatusAccess; export 'src/uuid.dart'; diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 3ebb95f9..40489365 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -133,6 +133,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, required AbortController abort, required Zone asyncWorkZone, }) async { @@ -140,6 +142,7 @@ class PowerSyncDatabaseImpl bool triedSpawningIsolate = false; StreamSubscription? crudUpdateSubscription; + StreamSubscription? activeStreamsSubscription; final receiveMessages = ReceivePort(); final receiveUnhandledErrors = ReceivePort(); final receiveExit = ReceivePort(); @@ -157,6 +160,7 @@ class PowerSyncDatabaseImpl // Cleanup crudUpdateSubscription?.cancel(); + activeStreamsSubscription?.cancel(); receiveMessages.close(); receiveUnhandledErrors.close(); receiveExit.close(); @@ -198,6 +202,10 @@ class PowerSyncDatabaseImpl crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); + + activeStreamsSubscription = activeStreams.listen((streams) { + port.send(['changed_subscriptions', streams]); + }); } else if (action == 'uploadCrud') { await (data[1] as PortCompleter).handle(() async { await connector.uploadData(this); @@ -366,6 +374,9 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { } } else if (action == 'close') { await shutdown(); + } else if (action == 'changed_subscriptions') { + openedStreamingSync + ?.updateSubscriptions(message[1] as List); } } }); diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index a4f0b419..ae891cb7 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -7,6 +7,7 @@ import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/open_factory/abstract_powersync_open_factory.dart'; import '../sync/options.dart'; +import '../sync/streaming_sync.dart'; import 'powersync_database.dart'; import '../connector.dart'; @@ -115,6 +116,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required AbortController abort, + required List initiallyActiveStreams, + required Stream> activeStreams, required Zone asyncWorkZone, required ResolvedSyncOptions options, }) { diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index a53b5049..fd722a2a 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -14,9 +14,13 @@ import 'package:powersync_core/src/powersync_update_notification.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/schema_logic.dart'; import 'package:powersync_core/src/schema_logic.dart' as schema_logic; +import 'package:powersync_core/src/sync/connection_manager.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:powersync_core/src/sync/sync_status.dart'; +import '../sync/stream.dart'; +import '../sync/streaming_sync.dart'; + mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Schema used for the local database. Schema get schema; @@ -42,16 +46,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @Deprecated("This field is unused, pass params to connect() instead") Map? clientParams; + late final ConnectionManager _connections; + /// Current connection status. - SyncStatus currentStatus = - const SyncStatus(connected: false, lastSyncedAt: null); + SyncStatus get currentStatus => _connections.currentStatus; /// Use this stream to subscribe to connection status updates. - late final Stream statusStream; - - @protected - StreamController statusStreamController = - StreamController.broadcast(); + Stream get statusStream => _connections.statusStream; late final ActiveDatabaseGroup _activeGroup; @@ -81,15 +82,6 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @protected Future get isInitialized; - /// The abort controller for the current sync iteration. - /// - /// null when disconnected, present when connecting or connected. - /// - /// The controller must only be accessed from within a critical section of the - /// sync mutex. - @protected - AbortController? _abortActiveSync; - @protected Future baseInit() async { String identifier = 'memory'; @@ -107,15 +99,14 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { 'instantiation logic if this is not intentional', ); } - - statusStream = statusStreamController.stream; + _connections = ConnectionManager(this); updates = powerSyncUpdateNotifications(database.updates); await database.initialize(); await _checkVersion(); await database.execute('SELECT powersync_init()'); await updateSchema(schema); - await _updateHasSynced(); + await _connections.resolveOfflineSyncStatus(); } /// Check that a supported version of the powersync extension is loaded. @@ -141,55 +132,15 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { return isInitialized; } - Future _updateHasSynced() async { - // Query the database to see if any data has been synced. - final result = await database.getAll( - 'SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority;', - ); - const prioritySentinel = 2147483647; - var hasSynced = false; - DateTime? lastCompleteSync; - final priorityStatusEntries = []; - - DateTime parseDateTime(String sql) { - return DateTime.parse('${sql}Z').toLocal(); - } - - for (final row in result) { - final priority = row.columnAt(0) as int; - final lastSyncedAt = parseDateTime(row.columnAt(1) as String); - - if (priority == prioritySentinel) { - hasSynced = true; - lastCompleteSync = lastSyncedAt; - } else { - priorityStatusEntries.add(( - hasSynced: true, - lastSyncedAt: lastSyncedAt, - priority: BucketPriority(priority) - )); - } - } - - if (hasSynced != currentStatus.hasSynced) { - final status = SyncStatus( - hasSynced: hasSynced, - lastSyncedAt: lastCompleteSync, - priorityStatusEntries: priorityStatusEntries, - ); - setStatus(status); - } - } - /// Returns a [Future] which will resolve once at least one full sync cycle /// has completed (meaninng that the first consistent checkpoint has been /// reached across all buckets). /// /// When [priority] is null (the default), this method waits for the first - /// full sync checkpoint to complete. When set to a [BucketPriority] however, + /// full sync checkpoint to complete. When set to a [StreamPriority] however, /// it completes once all buckets within that priority (as well as those in /// higher priorities) have been synchronized at least once. - Future waitForFirstSync({BucketPriority? priority}) async { + Future waitForFirstSync({StreamPriority? priority}) async { bool matches(SyncStatus status) { if (priority == null) { return status.hasSynced == true; @@ -198,46 +149,13 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { } } - if (matches(currentStatus)) { - return; - } - await for (final result in statusStream) { - if (matches(result)) { - break; - } - } + return _connections.firstStatusMatching(matches); } @protected @visibleForTesting void setStatus(SyncStatus status) { - if (status != currentStatus) { - final newStatus = SyncStatus( - connected: status.connected, - downloading: status.downloading, - uploading: status.uploading, - connecting: status.connecting, - uploadError: status.uploadError, - downloadError: status.downloadError, - priorityStatusEntries: status.priorityStatusEntries, - downloadProgress: status.downloadProgress, - // Note that currently the streaming sync implementation will never set - // hasSynced. lastSyncedAt implies that syncing has completed at some - // point (hasSynced = true). - // The previous values of hasSynced should be preserved here. - lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, - hasSynced: status.lastSyncedAt != null - ? true - : status.hasSynced ?? currentStatus.hasSynced, - ); - - // If the absence of hasSynced was the only difference, the new states - // would be equal and don't require an event. So, check again. - if (newStatus != currentStatus) { - currentStatus = newStatus; - statusStreamController.add(currentStatus); - } - } + _connections.manuallyChangeSyncStatus(status); } @override @@ -262,9 +180,9 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // Now we can close the database await database.close(); - // If there are paused subscriptionso n the status stream, don't delay + // If there are paused subscriptions on the status stream, don't delay // closing the database because of that. - unawaited(statusStreamController.close()); + _connections.close(); await _activeGroup.close(); } } @@ -298,67 +216,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { params: params, ); - if (schema.rawTables.isNotEmpty && - resolvedOptions.source.syncImplementation != - SyncClientImplementation.rust) { - throw UnsupportedError( - 'Raw tables are only supported by the Rust client.'); - } - - // ignore: deprecated_member_use_from_same_package - clientParams = params; - var thisConnectAborter = AbortController(); - final zone = Zone.current; - - late void Function() retryHandler; - - Future connectWithSyncLock() async { - // Ensure there has not been a subsequent connect() call installing a new - // sync client. - assert(identical(_abortActiveSync, thisConnectAborter)); - assert(!thisConnectAborter.aborted); - - await connectInternal( - connector: connector, - options: resolvedOptions, - abort: thisConnectAborter, - // Run follow-up async tasks in the parent zone, a new one is introduced - // while we hold the lock (and async tasks won't hold the sync lock). - asyncWorkZone: zone, - ); - - thisConnectAborter.onCompletion.whenComplete(retryHandler); - } - - // If the sync encounters a failure without being aborted, retry - retryHandler = Zone.current.bindCallback(() async { - _activeGroup.syncConnectMutex.lock(() async { - // Is this still supposed to be active? (abort is only called within - // mutex) - if (!thisConnectAborter.aborted) { - // We only change _abortActiveSync after disconnecting, which resets - // the abort controller. - assert(identical(_abortActiveSync, thisConnectAborter)); - - // We need a new abort controller for this attempt - _abortActiveSync = thisConnectAborter = AbortController(); - - logger.warning('Sync client failed, retrying...'); - await connectWithSyncLock(); - } - }); - }); - - await _activeGroup.syncConnectMutex.lock(() async { - // Disconnect a previous sync client, if one is active. - await _abortCurrentSync(); - assert(_abortActiveSync == null); - - // Install the abort controller for this particular connect call, allowing - // it to be disconnected. - _abortActiveSync = thisConnectAborter; - await connectWithSyncLock(); - }); + await _connections.connect(connector: connector, options: resolvedOptions); } /// Internal method to establish a sync client connection. @@ -372,6 +230,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { Future connectInternal({ required PowerSyncBackendConnector connector, required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, required AbortController abort, required Zone asyncWorkZone, }); @@ -380,27 +240,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// /// Use [connect] to connect again. Future disconnect() async { - // Also wrap this in the sync mutex to ensure there's no race between us - // connecting and disconnecting. - await _activeGroup.syncConnectMutex.lock(_abortCurrentSync); - - setStatus( - SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); - } - - Future _abortCurrentSync() async { - if (_abortActiveSync case final disconnector?) { - /// Checking `disconnecter.aborted` prevents race conditions - /// where multiple calls to `disconnect` can attempt to abort - /// the controller more than once before it has finished aborting. - if (disconnector.aborted == false) { - await disconnector.abort(); - _abortActiveSync = null; - } else { - /// Wait for the abort to complete. Continue updating the sync status after completed - await disconnector.onCompletion; - } - } + await _connections.disconnect(); } /// Disconnect and clear the database. @@ -418,8 +258,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await tx.execute('select powersync_clear(?)', [clearLocal ? 1 : 0]); }); // The data has been deleted - reset these - currentStatus = SyncStatus(lastSyncedAt: null, hasSynced: false); - statusStreamController.add(currentStatus); + setStatus(SyncStatus(lastSyncedAt: null, hasSynced: false)); } @Deprecated('Use [disconnectAndClear] instead.') @@ -441,9 +280,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { schema.validate(); await _activeGroup.syncConnectMutex.lock(() async { - if (_abortActiveSync != null) { - throw AssertionError('Cannot update schema while connected'); - } + _connections.checkNotConnected(); this.schema = schema; await database.writeLock((tx) => schema_logic.updateSchema(tx, schema)); @@ -666,6 +503,13 @@ SELECT * FROM crud_entries; Future refreshSchema() async { await database.refreshSchema(); } + + /// Create a [SyncStream] instance for the given [name] and [parameters]. + /// + /// Use [SyncStream.subscribe] to subscribe to the returned stream. + SyncStream syncStream(String name, [Map? parameters]) { + return _connections.syncStream(name, parameters); + } } Stream powerSyncUpdateNotifications( diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 4af2821e..15a83c7d 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -128,6 +128,8 @@ class PowerSyncDatabaseImpl Future connectInternal({ required PowerSyncBackendConnector connector, required AbortController abort, + required List initiallyActiveStreams, + required Stream> activeStreams, required Zone asyncWorkZone, required ResolvedSyncOptions options, }) async { @@ -141,6 +143,7 @@ class PowerSyncDatabaseImpl connector: connector, options: options.source, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), + subscriptions: initiallyActiveStreams, ); } catch (e) { logger.warning( @@ -157,6 +160,7 @@ class PowerSyncDatabaseImpl crudUpdateTriggerStream: crudStream, options: options, client: BrowserClient(), + activeSubscriptions: initiallyActiveStreams, // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. identifier: database.openFactory.path, @@ -168,7 +172,10 @@ class PowerSyncDatabaseImpl }); sync.streamingSync(); + final subscriptions = activeStreams.listen(sync.updateSubscriptions); + abort.onAbort.then((_) async { + subscriptions.cancel(); await sync.abort(); abort.completeAbort(); }).ignore(); diff --git a/packages/powersync_core/lib/src/sync/connection_manager.dart b/packages/powersync_core/lib/src/sync/connection_manager.dart new file mode 100644 index 00000000..8a326642 --- /dev/null +++ b/packages/powersync_core/lib/src/sync/connection_manager.dart @@ -0,0 +1,396 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:meta/meta.dart'; +import 'package:powersync_core/src/abort_controller.dart'; +import 'package:powersync_core/src/connector.dart'; +import 'package:powersync_core/src/database/active_instances.dart'; +import 'package:powersync_core/src/database/powersync_db_mixin.dart'; +import 'package:powersync_core/src/sync/options.dart'; +import 'package:powersync_core/src/sync/stream.dart'; +import 'package:powersync_core/src/sync/sync_status.dart'; + +import 'instruction.dart'; +import 'mutable_sync_status.dart'; +import 'streaming_sync.dart'; + +/// A (stream name, JSON parameters) pair that uniquely identifies a stream +/// instantiation to subscribe to. +typedef _RawStreamKey = (String, String); + +@internal +final class ConnectionManager { + final PowerSyncDatabaseMixin db; + final ActiveDatabaseGroup _activeGroup; + + /// All streams (with parameters) for which a subscription has been requested + /// explicitly. + final Map<_RawStreamKey, _ActiveSubscription> _locallyActiveSubscriptions = + {}; + + final StreamController _statusController = + StreamController.broadcast(); + + /// Fires when an entry is added or removed from [_locallyActiveSubscriptions] + /// while we're connected. + StreamController? _subscriptionsChanged; + + SyncStatus _currentStatus = + const SyncStatus(connected: false, lastSyncedAt: null); + + SyncStatus get currentStatus => _currentStatus; + Stream get statusStream => _statusController.stream; + + /// The abort controller for the current sync iteration. + /// + /// null when disconnected, present when connecting or connected. + /// + /// The controller must only be accessed from within a critical section of the + /// sync mutex. + AbortController? _abortActiveSync; + + ConnectionManager(this.db) : _activeGroup = db.group; + + void checkNotConnected() { + if (_abortActiveSync != null) { + throw StateError('Cannot update schema while connected'); + } + } + + Future _abortCurrentSync() async { + if (_abortActiveSync case final disconnector?) { + /// Checking `disconnecter.aborted` prevents race conditions + /// where multiple calls to `disconnect` can attempt to abort + /// the controller more than once before it has finished aborting. + if (disconnector.aborted == false) { + await disconnector.abort(); + _abortActiveSync = null; + } else { + /// Wait for the abort to complete. Continue updating the sync status after completed + await disconnector.onCompletion; + } + } + } + + Future disconnect() async { + // Also wrap this in the sync mutex to ensure there's no race between us + // connecting and disconnecting. + await _activeGroup.syncConnectMutex.lock(() async { + await _abortCurrentSync(); + _subscriptionsChanged?.close(); + _subscriptionsChanged = null; + }); + + manuallyChangeSyncStatus( + SyncStatus(connected: false, lastSyncedAt: currentStatus.lastSyncedAt)); + } + + Future firstStatusMatching(bool Function(SyncStatus) predicate) async { + if (predicate(currentStatus)) { + return; + } + await for (final result in statusStream) { + if (predicate(result)) { + break; + } + } + } + + List get _subscribedStreams => [ + for (final active in _locallyActiveSubscriptions.values) + (name: active.name, parameters: active.encodedParameters) + ]; + + Future connect({ + required PowerSyncBackendConnector connector, + required ResolvedSyncOptions options, + }) async { + if (db.schema.rawTables.isNotEmpty && + options.source.syncImplementation != SyncClientImplementation.rust) { + throw UnsupportedError( + 'Raw tables are only supported by the Rust client.'); + } + + var thisConnectAborter = AbortController(); + final zone = Zone.current; + + late void Function() retryHandler; + + final subscriptionsChanged = StreamController(); + + Future connectWithSyncLock() async { + // Ensure there has not been a subsequent connect() call installing a new + // sync client. + assert(identical(_abortActiveSync, thisConnectAborter)); + assert(!thisConnectAborter.aborted); + + // ignore: invalid_use_of_protected_member + await db.connectInternal( + connector: connector, + options: options, + abort: thisConnectAborter, + initiallyActiveStreams: _subscribedStreams, + activeStreams: subscriptionsChanged.stream.map((_) { + return _subscribedStreams; + }), + // Run follow-up async tasks in the parent zone, a new one is introduced + // while we hold the lock (and async tasks won't hold the sync lock). + asyncWorkZone: zone, + ); + + thisConnectAborter.onCompletion.whenComplete(retryHandler); + } + + // If the sync encounters a failure without being aborted, retry + retryHandler = Zone.current.bindCallback(() async { + _activeGroup.syncConnectMutex.lock(() async { + // Is this still supposed to be active? (abort is only called within + // mutex) + if (!thisConnectAborter.aborted) { + // We only change _abortActiveSync after disconnecting, which resets + // the abort controller. + assert(identical(_abortActiveSync, thisConnectAborter)); + + // We need a new abort controller for this attempt + _abortActiveSync = thisConnectAborter = AbortController(); + + db.logger.warning('Sync client failed, retrying...'); + await connectWithSyncLock(); + } + }); + }); + + await _activeGroup.syncConnectMutex.lock(() async { + // Disconnect a previous sync client, if one is active. + await _abortCurrentSync(); + assert(_abortActiveSync == null); + _subscriptionsChanged = subscriptionsChanged; + + // Install the abort controller for this particular connect call, allowing + // it to be disconnected. + _abortActiveSync = thisConnectAborter; + await connectWithSyncLock(); + }); + } + + void manuallyChangeSyncStatus(SyncStatus status) { + if (status != currentStatus) { + final newStatus = SyncStatus( + connected: status.connected, + downloading: status.downloading, + uploading: status.uploading, + connecting: status.connecting, + uploadError: status.uploadError, + downloadError: status.downloadError, + priorityStatusEntries: status.priorityStatusEntries, + downloadProgress: status.downloadProgress, + // Note that currently the streaming sync implementation will never set + // hasSynced. lastSyncedAt implies that syncing has completed at some + // point (hasSynced = true). + // The previous values of hasSynced should be preserved here. + lastSyncedAt: status.lastSyncedAt ?? currentStatus.lastSyncedAt, + hasSynced: status.lastSyncedAt != null + ? true + : status.hasSynced ?? currentStatus.hasSynced, + streamSubscriptions: status.internalSubscriptions, + ); + + // If the absence of hasSynced was the only difference, the new states + // would be equal and don't require an event. So, check again. + if (newStatus != currentStatus) { + _currentStatus = newStatus; + _statusController.add(_currentStatus); + } + } + } + + _SyncStreamSubscriptionHandle _referenceStreamSubscription( + String stream, Map? parameters) { + final key = (stream, json.encode(parameters)); + _ActiveSubscription active; + + if (_locallyActiveSubscriptions[key] case final current?) { + active = current; + } else { + active = _ActiveSubscription(this, + name: stream, parameters: parameters, encodedParameters: key.$2); + _locallyActiveSubscriptions[key] = active; + _subscriptionsChanged?.add(null); + } + + return _SyncStreamSubscriptionHandle(active); + } + + void _clearSubscription(_ActiveSubscription subscription) { + assert(subscription.refcount == 0); + _locallyActiveSubscriptions + .remove((subscription.name, subscription.encodedParameters)); + _subscriptionsChanged?.add(null); + } + + Future _subscriptionsCommand(Object? command) async { + await db.writeTransaction((tx) { + return tx.execute( + 'SELECT powersync_control(?, ?)', + ['subscriptions', json.encode(command)], + ); + }); + _subscriptionsChanged?.add(null); + } + + Future subscribe({ + required String stream, + required Map? parameters, + Duration? ttl, + StreamPriority? priority, + }) async { + await _subscriptionsCommand({ + 'subscribe': { + 'stream': { + 'name': stream, + 'params': parameters, + }, + 'ttl': ttl?.inSeconds, + 'priority': priority, + }, + }); + + await _activeGroup.syncConnectMutex.lock(() async { + if (_abortActiveSync == null) { + // Since we're not connected, update the offline sync status to reflect + // the new subscription. + // With a connection, the sync client would include it in its state. + await resolveOfflineSyncStatus(); + } + }); + } + + Future unsubscribeAll({ + required String stream, + required Object? parameters, + }) async { + await _subscriptionsCommand({ + 'unsubscribe': { + 'name': stream, + 'params': parameters, + }, + }); + } + + Future resolveOfflineSyncStatus() async { + final row = await db.database.get( + 'SELECT powersync_offline_sync_status() AS r;', + ); + + final status = CoreSyncStatus.fromJson( + json.decode(row['r'] as String) as Map); + + manuallyChangeSyncStatus((MutableSyncStatus()..applyFromCore(status)) + .immutableSnapshot(setLastSynced: true)); + } + + SyncStream syncStream(String name, Map? parameters) { + return _SyncStreamImplementation(this, name, parameters); + } + + void close() { + _statusController.close(); + } +} + +final class _SyncStreamImplementation implements SyncStream { + @override + final String name; + + @override + final Map? parameters; + + final ConnectionManager _connections; + + _SyncStreamImplementation(this._connections, this.name, this.parameters); + + @override + Future subscribe({ + Duration? ttl, + StreamPriority? priority, + }) async { + await _connections.subscribe( + stream: name, + parameters: parameters, + ttl: ttl, + priority: priority, + ); + + return _connections._referenceStreamSubscription(name, parameters); + } + + @override + Future unsubscribeAll() async { + await _connections.unsubscribeAll(stream: name, parameters: parameters); + } +} + +final class _ActiveSubscription { + final ConnectionManager connections; + var refcount = 0; + + final String name; + final String encodedParameters; + final Map? parameters; + + _ActiveSubscription( + this.connections, { + required this.name, + required this.encodedParameters, + required this.parameters, + }); + + void decrementRefCount() { + refcount--; + if (refcount == 0) { + connections._clearSubscription(this); + } + } +} + +final class _SyncStreamSubscriptionHandle implements SyncStreamSubscription { + final _ActiveSubscription _source; + var _active = true; + + _SyncStreamSubscriptionHandle(this._source) { + _source.refcount++; + _finalizer.attach(this, _source, detach: this); + } + + @override + String get name => _source.name; + + @override + Map? get parameters => _source.parameters; + + @override + void unsubscribe() { + if (_active) { + _active = false; + _finalizer.detach(this); + _source.decrementRefCount(); + } + } + + @override + Future waitForFirstSync() async { + return _source.connections.firstStatusMatching((status) { + final currentProgress = status.forStream(this); + return currentProgress?.subscription.hasSynced ?? false; + }); + } + + static final Finalizer<_ActiveSubscription> _finalizer = Finalizer((sub) { + sub.connections.db.logger.warning( + 'A subscription to ${sub.name} (with parameters ${sub.parameters}) ' + 'leaked! Please ensure calling SyncStreamSubscription.unsubscribe() ' + "when you don't need a subscription anymore. For global " + 'subscriptions, consider storing them in global fields to avoid this ' + 'warning.'); + }); +} diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart index f0146e8e..3479e281 100644 --- a/packages/powersync_core/lib/src/sync/instruction.dart +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -1,3 +1,4 @@ +import 'stream.dart'; import 'sync_status.dart'; /// An internal instruction emitted by the sync client in the core extension in @@ -13,7 +14,8 @@ sealed class Instruction { EstablishSyncStream.fromJson(establish as Map), {'FetchCredentials': final creds} => FetchCredentials.fromJson(creds as Map), - {'CloseSyncStream': _} => const CloseSyncStream(), + {'CloseSyncStream': final closeOptions as Map} => + CloseSyncStream(closeOptions['hide_disconnect'] as bool), {'FlushFileSystem': _} => const FlushFileSystem(), {'DidCompleteSync': _} => const DidCompleteSync(), _ => UnknownSyncInstruction(json) @@ -62,12 +64,14 @@ final class CoreSyncStatus { final bool connecting; final List priorityStatus; final DownloadProgress? downloading; + final List? streams; CoreSyncStatus({ required this.connected, required this.connecting, required this.priorityStatus, required this.downloading, + required this.streams, }); factory CoreSyncStatus.fromJson(Map json) { @@ -82,12 +86,16 @@ final class CoreSyncStatus { null => null, final raw as Map => DownloadProgress.fromJson(raw), }, + streams: (json['streams'] as List) + .map((e) => + CoreActiveStreamSubscription.fromJson(e as Map)) + .toList(), ); } static SyncPriorityStatus _priorityStatusFromJson(Map json) { return ( - priority: BucketPriority(json['priority'] as int), + priority: StreamPriority(json['priority'] as int), hasSynced: json['has_synced'] as bool?, lastSyncedAt: switch (json['last_synced_at']) { null => null, @@ -116,7 +124,7 @@ final class DownloadProgress { static BucketProgress _bucketProgressFromJson(Map json) { return ( - priority: BucketPriority(json['priority'] as int), + priority: StreamPriority(json['priority'] as int), atLast: json['at_last'] as int, sinceLast: json['since_last'] as int, targetCount: json['target_count'] as int, @@ -135,7 +143,9 @@ final class FetchCredentials implements Instruction { } final class CloseSyncStream implements Instruction { - const CloseSyncStream(); + final bool hideDisconnect; + + const CloseSyncStream(this.hideDisconnect); } final class FlushFileSystem implements Instruction { diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 23e3becb..273cd597 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'instruction.dart'; +import 'stream.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; @@ -15,6 +16,7 @@ final class MutableSyncStatus { InternalSyncDownloadProgress? downloadProgress; List priorityStatusEntries = const []; + List? streams; DateTime? lastSyncedAt; @@ -51,9 +53,9 @@ final class MutableSyncStatus { hasSynced: true, lastSyncedAt: now, priority: maxBy( - applied.checksums.map((cs) => BucketPriority(cs.priority)), + applied.checksums.map((cs) => StreamPriority(cs.priority)), (priority) => priority, - compare: BucketPriority.comparator, + compare: StreamPriority.comparator, )!, ) ]; @@ -90,11 +92,12 @@ final class MutableSyncStatus { final downloading => InternalSyncDownloadProgress(downloading.buckets), }; lastSyncedAt = status.priorityStatus - .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) + .firstWhereOrNull((s) => s.priority == StreamPriority.fullSyncPriority) ?.lastSyncedAt; + streams = status.streams; } - SyncStatus immutableSnapshot() { + SyncStatus immutableSnapshot({bool setLastSynced = false}) { return SyncStatus( connected: connected, connecting: connecting, @@ -103,9 +106,10 @@ final class MutableSyncStatus { downloadProgress: downloadProgress?.asSyncDownloadProgress, priorityStatusEntries: UnmodifiableListView(priorityStatusEntries), lastSyncedAt: lastSyncedAt, - hasSynced: null, // Stream client is not supposed to set this value. + hasSynced: setLastSynced ? lastSyncedAt != null : null, uploadError: uploadError, downloadError: downloadError, + streamSubscriptions: streams, ); } } diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index 6ae94b25..ee8b3c63 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -27,11 +27,18 @@ final class SyncOptions { /// The [SyncClientImplementation] to use. final SyncClientImplementation syncImplementation; + /// Whether streams that have been defined with `auto_subscribe: true` should + /// be synced when they don't have an explicit subscription. + /// + /// This is enabled by default. + final bool? includeDefaultStreams; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, this.syncImplementation = SyncClientImplementation.defaultClient, + this.includeDefaultStreams, }); SyncOptions _copyWith({ @@ -44,6 +51,7 @@ final class SyncOptions { retryDelay: retryDelay, params: params ?? this.params, syncImplementation: syncImplementation, + includeDefaultStreams: includeDefaultStreams, ); } } @@ -96,16 +104,23 @@ extension type ResolvedSyncOptions(SyncOptions source) { Map get params => source.params ?? const {}; + bool get includeDefaultStreams => source.includeDefaultStreams ?? true; + (ResolvedSyncOptions, bool) applyFrom(SyncOptions other) { final newOptions = SyncOptions( crudThrottleTime: other.crudThrottleTime ?? crudThrottleTime, retryDelay: other.retryDelay ?? retryDelay, params: other.params ?? params, + syncImplementation: other.syncImplementation, + includeDefaultStreams: + other.includeDefaultStreams ?? includeDefaultStreams, ); final didChange = !_mapEquality.equals(newOptions.params, params) || newOptions.crudThrottleTime != crudThrottleTime || - newOptions.retryDelay != retryDelay; + newOptions.retryDelay != retryDelay || + newOptions.syncImplementation != source.syncImplementation || + newOptions.includeDefaultStreams != includeDefaultStreams; return (ResolvedSyncOptions(newOptions), didChange); } diff --git a/packages/powersync_core/lib/src/sync/stream.dart b/packages/powersync_core/lib/src/sync/stream.dart new file mode 100644 index 00000000..80c447be --- /dev/null +++ b/packages/powersync_core/lib/src/sync/stream.dart @@ -0,0 +1,176 @@ +import 'package:meta/meta.dart'; + +import 'sync_status.dart'; +import '../database/powersync_database.dart'; + +/// A description of a sync stream, consisting of its [name] and the +/// [parameters] used when subscribing. +abstract interface class SyncStreamDescription { + /// The name of the stream as it appears in the stream definition for the + /// PowerSync service. + String get name; + + /// The parameters used to subscribe to the stream, if any. + /// + /// The same stream can be subscribed to multiple times with different + /// parameters. + Map? get parameters; +} + +/// Information about a subscribed sync stream. +/// +/// This includes the [SyncStreamDescription] along with information about the +/// current sync status. +abstract interface class SyncSubscriptionDescription + extends SyncStreamDescription { + /// Whether this stream is active, meaning that the subscription has been + /// acknowledged by the sync serivce. + bool get active; + + /// Whether this stream subscription is included by default, regardless of + /// whether the stream has explicitly been subscribed to or not. + /// + /// It's possible for both [isDefault] and [hasExplicitSubscription] to be + /// true at the same time - this happens when a default stream was subscribed + /// explicitly. + bool get isDefault; + + /// Whether this stream has been subscribed to explicitly. + /// + /// It's possible for both [isDefault] and [hasExplicitSubscription] to be + /// true at the same time - this happens when a default stream was subscribed + /// explicitly. + bool get hasExplicitSubscription; + + /// For sync streams that have a time-to-live, the current time at which the + /// stream would expire if not subscribed to again. + DateTime? get expiresAt; + + /// Whether this stream subscription has been synced at least once. + bool get hasSynced; + + /// If [hasSynced] is true, the last time data from this stream has been + /// synced. + DateTime? get lastSyncedAt; +} + +/// A handle to a [SyncStreamDescription] that allows subscribing to the stream. +/// +/// To obtain an instance of [SyncStream], call [PowerSyncDatabase.syncStream]. +abstract interface class SyncStream extends SyncStreamDescription { + /// Adds a subscription to this stream, requesting it to be included when + /// connecting to the sync service. + /// + /// The [priority] can be used to override the priority of this stream. + Future subscribe({ + Duration? ttl, + StreamPriority? priority, + }); + + Future unsubscribeAll(); +} + +/// A [SyncStream] that has been subscribed to. +abstract interface class SyncStreamSubscription + implements SyncStreamDescription { + /// A variant of [PowerSyncDatabase.waitForFirstSync] that is specific to + /// this stream subscription. + Future waitForFirstSync(); + + /// Removes this subscription. + /// + /// Once all [SyncStreamSubscription]s for a [SyncStream] have been + /// unsubscribed, the `ttl` for that stream starts running. When it expires + /// without subscribing again, the stream will be evicted. + void unsubscribe(); +} + +/// An `ActiveStreamSubscription` as part of the sync status in Rust. +@internal +final class CoreActiveStreamSubscription + implements SyncSubscriptionDescription { + @override + final String name; + @override + final Map? parameters; + final StreamPriority priority; + final ({int total, int downloaded}) progress; + @override + final bool active; + @override + final bool isDefault; + @override + final bool hasExplicitSubscription; + @override + final DateTime? expiresAt; + @override + final DateTime? lastSyncedAt; + + @override + bool get hasSynced => lastSyncedAt != null; + + CoreActiveStreamSubscription._({ + required this.name, + required this.parameters, + required this.priority, + required this.progress, + required this.active, + required this.isDefault, + required this.hasExplicitSubscription, + required this.expiresAt, + required this.lastSyncedAt, + }); + + factory CoreActiveStreamSubscription.fromJson(Map json) { + return CoreActiveStreamSubscription._( + name: json['name'] as String, + parameters: json['parameters'] as Map?, + priority: switch (json['priority'] as int?) { + final prio? => StreamPriority(prio), + null => StreamPriority.fullSyncPriority, + }, + progress: _progressFromJson(json['progress'] as Map), + active: json['active'] as bool, + isDefault: json['is_default'] as bool, + hasExplicitSubscription: json['has_explicit_subscription'] as bool, + expiresAt: switch (json['expires_at']) { + null => null, + final timestamp as int => + DateTime.fromMillisecondsSinceEpoch(timestamp * 1000), + }, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final timestamp as int => + DateTime.fromMillisecondsSinceEpoch(timestamp * 1000), + }, + ); + } + + Map toJson() { + return { + 'name': name, + 'parameters': parameters, + 'priority': priority.priorityNumber, + 'progress': { + 'total': progress.total, + 'downloaded': progress.downloaded, + }, + 'active': active, + 'is_default': isDefault, + 'has_explicit_subscription': hasExplicitSubscription, + 'expires_at': switch (expiresAt) { + null => null, + final expiresAt => expiresAt.millisecondsSinceEpoch / 1000, + }, + 'last_synced_at': switch (lastSyncedAt) { + null => null, + final lastSyncedAt => lastSyncedAt.millisecondsSinceEpoch / 1000, + } + }; + } + + static ({int total, int downloaded}) _progressFromJson( + Map json) { + return (total: json['total'] as int, downloaded: json['downloaded'] as int); + } +} diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index ad0886a1..60deef12 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -21,6 +21,8 @@ import 'stream_utils.dart'; import 'sync_status.dart'; import 'protocol.dart'; +typedef SubscribedStream = ({String name, String parameters}); + abstract interface class StreamingSync { Stream get statusStream; @@ -28,6 +30,8 @@ abstract interface class StreamingSync { /// Close any active streams. Future abort(); + + void updateSubscriptions(List streams); } @internal @@ -36,6 +40,7 @@ class StreamingSyncImplementation implements StreamingSync { final BucketStorage adapter; final InternalConnector connector; final ResolvedSyncOptions options; + List _activeSubscriptions; final Logger logger; @@ -69,6 +74,7 @@ class StreamingSyncImplementation implements StreamingSync { required this.crudUpdateTriggerStream, required this.options, required http.Client client, + List activeSubscriptions = const [], Mutex? syncMutex, Mutex? crudMutex, Logger? logger, @@ -80,7 +86,8 @@ class StreamingSyncImplementation implements StreamingSync { syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"), crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"), _userAgentHeaders = userAgentHeaders(), - logger = logger ?? isolateLogger; + logger = logger ?? isolateLogger, + _activeSubscriptions = activeSubscriptions; Duration get _retryDelay => options.retryDelay; @@ -124,6 +131,14 @@ class StreamingSyncImplementation implements StreamingSync { return _abort?.aborted ?? false; } + @override + void updateSubscriptions(List streams) { + _activeSubscriptions = streams; + if (_nonLineSyncEvents.hasListener) { + _nonLineSyncEvents.add(HandleChangedSubscriptions(streams)); + } + } + @override Future streamingSync() async { try { @@ -294,7 +309,12 @@ class StreamingSyncImplementation implements StreamingSync { } Future _rustStreamingSyncIteration() async { - await _ActiveRustStreamingIteration(this).syncIteration(); + logger.info('Starting Rust sync iteration'); + final response = await _ActiveRustStreamingIteration(this).syncIteration(); + logger.info( + 'Ending Rust sync iteration. Immediate restart: ${response.immediateRestart}'); + // Note: With the current loop in streamingSync(), any return value that + // isn't an exception triggers an immediate restart. } Future<(List, Map)> @@ -370,7 +390,7 @@ class StreamingSyncImplementation implements StreamingSync { // checkpoint later. } else { _updateStatusForPriority(( - priority: BucketPriority(bucketPriority), + priority: StreamPriority(bucketPriority), lastSyncedAt: DateTime.now(), hasSynced: true, )); @@ -449,6 +469,7 @@ class StreamingSyncImplementation implements StreamingSync { _state.updateStatus((s) => s.setConnected()); await handleLine(line as StreamingSyncLine); case UploadCompleted(): + case HandleChangedSubscriptions(): // Only relevant for the Rust sync implementation. break; case AbortCurrentIteration(): @@ -507,7 +528,8 @@ class StreamingSyncImplementation implements StreamingSync { } Future _postStreamRequest( - Object? data, bool acceptBson) async { + Object? data, bool acceptBson, + {Future? onAbort}) async { const ndJson = 'application/x-ndjson'; const bson = 'application/vnd.powersync.bson-stream'; @@ -517,8 +539,8 @@ class StreamingSyncImplementation implements StreamingSync { } final uri = credentials.endpointUri('sync/stream'); - final request = - http.AbortableRequest('POST', uri, abortTrigger: _abort!.onAbort); + final request = http.AbortableRequest('POST', uri, + abortTrigger: onAbort ?? _abort!.onAbort); request.headers['Content-Type'] = 'application/json'; request.headers['Authorization'] = "Token ${credentials.token}"; request.headers['Accept'] = @@ -589,25 +611,35 @@ typedef BucketDescription = ({ final class _ActiveRustStreamingIteration { final StreamingSyncImplementation sync; + var _isActive = true; var _hadSyncLine = false; StreamSubscription? _completedUploads; - final Completer _completedStream = Completer(); + final Completer _completedStream = Completer(); _ActiveRustStreamingIteration(this.sync); - Future syncIteration() async { + List _encodeSubscriptions(List subscriptions) { + return sync._activeSubscriptions + .map((s) => + {'name': s.name, 'params': convert.json.decode(s.parameters)}) + .toList(); + } + + Future syncIteration() async { try { await _control( 'start', convert.json.encode({ 'parameters': sync.options.params, 'schema': convert.json.decode(sync.schemaJson), + 'include_defaults': sync.options.includeDefaultStreams, + 'active_streams': _encodeSubscriptions(sync._activeSubscriptions), }), ); assert(_completedStream.isCompleted, 'Should have started streaming'); - await _completedStream.future; + return await _completedStream.future; } finally { _isActive = false; _completedUploads?.cancel(); @@ -615,9 +647,10 @@ final class _ActiveRustStreamingIteration { } } - Stream _receiveLines(Object? data) { + Stream _receiveLines(Object? data, + {required Future onAbort}) { return streamFromFutureAwaitInCancellation( - sync._postStreamRequest(data, true)) + sync._postStreamRequest(data, true, onAbort: onAbort)) .asyncExpand((response) { if (response == null) { return null; @@ -630,31 +663,72 @@ final class _ActiveRustStreamingIteration { }).map(ReceivedLine.new); } - Future _handleLines(EstablishSyncStream request) async { + Future _handleLines( + EstablishSyncStream request) async { + // This is a workaround for https://github.com/dart-lang/http/issues/1820: + // When cancelling the stream subscription of an HTTP response with the + // fetch-based client implementation, cancelling the subscription is delayed + // until the next chunk (typically a token_expires_in message in our case). + // So, before cancelling, we complete an abort controller for the request to + // speed things up. This is not an issue in most cases because the abort + // controller on this stream would be completed when disconnecting. But + // when switching sync streams, that's not the case and we need a second + // abort controller for the inner iteration. + final innerAbort = Completer.sync(); final events = addBroadcast( - _receiveLines(request.request), sync._nonLineSyncEvents.stream); - + _receiveLines( + request.request, + onAbort: Future.any([ + sync._abort!.onAbort, + innerAbort.future, + ]), + ), + sync._nonLineSyncEvents.stream, + ); + + var needsImmediateRestart = false; loop: - await for (final event in events) { - if (!_isActive || sync.aborted) { - break; - } + try { + await for (final event in events) { + if (!_isActive || sync.aborted) { + innerAbort.complete(); + break; + } - switch (event) { - case ReceivedLine(line: final Uint8List line): - _triggerCrudUploadOnFirstLine(); - await _control('line_binary', line); - case ReceivedLine(line: final line as String): - _triggerCrudUploadOnFirstLine(); - await _control('line_text', line); - case UploadCompleted(): - await _control('completed_upload'); - case AbortCurrentIteration(): - break loop; - case TokenRefreshComplete(): - await _control('refreshed_token'); + switch (event) { + case ReceivedLine(line: final Uint8List line): + _triggerCrudUploadOnFirstLine(); + await _control('line_binary', line); + case ReceivedLine(line: final line as String): + _triggerCrudUploadOnFirstLine(); + await _control('line_text', line); + case UploadCompleted(): + await _control('completed_upload'); + case AbortCurrentIteration(:final hideDisconnectState): + innerAbort.complete(); + needsImmediateRestart = hideDisconnectState; + break loop; + case TokenRefreshComplete(): + await _control('refreshed_token'); + case HandleChangedSubscriptions(:final currentSubscriptions): + await _control( + 'update_subscriptions', + convert.json + .encode(_encodeSubscriptions(currentSubscriptions))); + } + } + } on http.RequestAbortedException { + // Unlike a regular cancellation, cancelling via the abort controller + // emits an error. We did mean to just cancel the stream, so we can + // safely ignore that. + if (innerAbort.isCompleted) { + // ignore + } else { + rethrow; } } + + return (immediateRestart: needsImmediateRestart); } /// Triggers a local CRUD upload when the first sync line has been received. @@ -708,10 +782,11 @@ final class _ActiveRustStreamingIteration { sync.logger.warning('Could not prefetch credentials', e, s); }); } - case CloseSyncStream(): + case CloseSyncStream(:final hideDisconnect): if (!sync.aborted) { _isActive = false; - sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + sync._nonLineSyncEvents + .add(AbortCurrentIteration(hideDisconnectState: hideDisconnect)); } case FlushFileSystem(): await sync.adapter.flushFileSystem(); @@ -723,6 +798,8 @@ final class _ActiveRustStreamingIteration { } } +typedef RustSyncIterationResult = ({bool immediateRestart}); + sealed class SyncEvent {} final class ReceivedLine implements SyncEvent { @@ -740,5 +817,18 @@ final class TokenRefreshComplete implements SyncEvent { } final class AbortCurrentIteration implements SyncEvent { - const AbortCurrentIteration(); + /// Whether we should immediately disconnect and hide the `disconnected` + /// state. + /// + /// This is used when we're changing subscription, to hide the brief downtime + /// we have while reconnecting. + final bool hideDisconnectState; + + const AbortCurrentIteration({this.hideDisconnectState = false}); +} + +final class HandleChangedSubscriptions implements SyncEvent { + final List currentSubscriptions; + + HandleChangedSubscriptions(this.currentSubscriptions); } diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index 62c48df1..61ae7c5f 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -5,6 +5,7 @@ import 'package:meta/meta.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; +import 'stream.dart'; final class SyncStatus { /// true if currently connected. @@ -54,6 +55,9 @@ final class SyncStatus { final List priorityStatusEntries; + final List? _internalSubscriptions; + + @internal const SyncStatus({ this.connected = false, this.connecting = false, @@ -65,7 +69,8 @@ final class SyncStatus { this.downloadError, this.uploadError, this.priorityStatusEntries = const [], - }); + List? streamSubscriptions, + }) : _internalSubscriptions = streamSubscriptions; @override bool operator ==(Object other) { @@ -78,8 +83,10 @@ final class SyncStatus { other.uploadError == uploadError && other.lastSyncedAt == lastSyncedAt && other.hasSynced == hasSynced && - _statusEquality.equals( + _listEquality.equals( other.priorityStatusEntries, priorityStatusEntries) && + _listEquality.equals( + other._internalSubscriptions, _internalSubscriptions) && other.downloadProgress == downloadProgress); } @@ -110,6 +117,16 @@ final class SyncStatus { ); } + /// All sync streams currently being tracked in the database. + /// + /// This returns null when the database is currently being opened and we + /// don't have reliable information about all included streams yet. + Iterable? get syncStreams { + return _internalSubscriptions?.map((subscription) { + return SyncStreamStatus._(subscription, downloadProgress); + }); + } + /// Get the current [downloadError] or [uploadError]. Object? get anyError { return downloadError ?? uploadError; @@ -128,9 +145,9 @@ final class SyncStatus { /// information extracted from the lower priority `2` since each partial sync /// in priority `2` necessarily includes a consistent view over data in /// priority `1`. - SyncPriorityStatus statusForPriority(BucketPriority priority) { + SyncPriorityStatus statusForPriority(StreamPriority priority) { assert(priorityStatusEntries.isSortedByCompare( - (e) => e.priority, BucketPriority.comparator)); + (e) => e.priority, StreamPriority.comparator)); for (final known in priorityStatusEntries) { // Lower-priority buckets are synchronized after higher-priority buckets, @@ -149,6 +166,21 @@ final class SyncStatus { ); } + /// If the [stream] appears in [syncStreams], returns the current status for + /// that stream. + SyncStreamStatus? forStream(SyncStreamDescription stream) { + final raw = _internalSubscriptions?.firstWhereOrNull( + (e) => + e.name == stream.name && + _mapEquality.equals(e.parameters, stream.parameters), + ); + + if (raw == null) { + return null; + } + return SyncStreamStatus._(raw, downloadProgress); + } + @override int get hashCode { return Object.hash( @@ -159,8 +191,9 @@ final class SyncStatus { uploadError, downloadError, lastSyncedAt, - _statusEquality.hash(priorityStatusEntries), + _listEquality.hash(priorityStatusEntries), downloadProgress, + _listEquality.hash(_internalSubscriptions), ); } @@ -169,37 +202,66 @@ final class SyncStatus { return "SyncStatus"; } - // This should be a ListEquality, but that appears to - // cause weird type errors with DDC (but only after hot reloads?!) - static const _statusEquality = ListEquality(); + static const _listEquality = ListEquality(); + static const _mapEquality = MapEquality(); +} + +@internal +extension InternalSyncStatusAccess on SyncStatus { + List? get internalSubscriptions => + _internalSubscriptions; +} + +/// Current information about a [SyncStream] that the sync client is subscribed +/// to. +final class SyncStreamStatus { + /// If the [SyncStatus] is currently [SyncStatus.downloading], download + /// progress for this stream. + final ProgressWithOperations? progress; + final CoreActiveStreamSubscription _internal; + + /// The [SyncSubscriptionDescription] providing information about the current + /// stream state. + SyncSubscriptionDescription get subscription => _internal; + + /// The [StreamPriority] of the current stream. + /// + /// New data on higher-priority streams can interrupt lower-priority streams. + StreamPriority get priority => _internal.priority; + + SyncStreamStatus._(this._internal, SyncDownloadProgress? progress) + : progress = progress?._internal._forStream(_internal); } -/// The priority of a PowerSync bucket. -extension type const BucketPriority._(int priorityNumber) { +@Deprecated('Use StreamPriority instead') +typedef BucketPriority = StreamPriority; + +/// The priority of a PowerSync stream. +extension type const StreamPriority._(int priorityNumber) { static const _highest = 0; - factory BucketPriority(int i) { + factory StreamPriority(int i) { assert(i >= _highest); - return BucketPriority._(i); + return StreamPriority._(i); } - bool operator >(BucketPriority other) => comparator(this, other) > 0; - bool operator >=(BucketPriority other) => comparator(this, other) >= 0; - bool operator <(BucketPriority other) => comparator(this, other) < 0; - bool operator <=(BucketPriority other) => comparator(this, other) <= 0; + bool operator >(StreamPriority other) => comparator(this, other) > 0; + bool operator >=(StreamPriority other) => comparator(this, other) >= 0; + bool operator <(StreamPriority other) => comparator(this, other) < 0; + bool operator <=(StreamPriority other) => comparator(this, other) <= 0; - /// A [Comparator] instance suitable for comparing [BucketPriority] values. - static int comparator(BucketPriority a, BucketPriority b) => + /// A [Comparator] instance suitable for comparing [StreamPriority] values. + static int comparator(StreamPriority a, StreamPriority b) => -a.priorityNumber.compareTo(b.priorityNumber); /// The priority used by PowerSync to indicate that a full sync was completed. - static const fullSyncPriority = BucketPriority._(2147483647); + static const fullSyncPriority = StreamPriority._(2147483647); } /// Partial information about the synchronization status for buckets within a /// priority. typedef SyncPriorityStatus = ({ - BucketPriority priority, + StreamPriority priority, DateTime? lastSyncedAt, bool? hasSynced, }); @@ -227,7 +289,7 @@ class UploadQueueStats { /// Per-bucket download progress information. @internal typedef BucketProgress = ({ - BucketPriority priority, + StreamPriority priority, int atLast, int sinceLast, int targetCount, @@ -253,7 +315,7 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { final sinceLast = savedProgress?.sinceLast ?? 0; buckets[bucket.bucket] = ( - priority: BucketPriority._(bucket.priority), + priority: StreamPriority._(bucket.priority), atLast: atLast, sinceLast: sinceLast, targetCount: bucket.count ?? 0, @@ -268,7 +330,7 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { return InternalSyncDownloadProgress({ for (final bucket in target.checksums) bucket.bucket: ( - priority: BucketPriority(bucket.priority), + priority: StreamPriority(bucket.priority), atLast: 0, sinceLast: 0, targetCount: knownCount, @@ -287,17 +349,16 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { /// Sums the total target and completed operations for all buckets up until /// the given [priority] (inclusive). - ProgressWithOperations untilPriority(BucketPriority priority) { - final (total, downloaded) = - buckets.values.where((e) => e.priority >= priority).fold( - (0, 0), - (prev, entry) { - final downloaded = entry.sinceLast; - final total = entry.targetCount - entry.atLast; - return (prev.$1 + total, prev.$2 + downloaded); - }, - ); + ProgressWithOperations untilPriority(StreamPriority priority) { + final (total, downloaded) = buckets.values + .where((e) => e.priority >= priority) + .fold((0, 0), _addProgress); + + return ProgressWithOperations._(total, downloaded); + } + ProgressWithOperations _forStream(CoreActiveStreamSubscription subscription) { + final (:total, :downloaded) = subscription.progress; return ProgressWithOperations._(total, downloaded); } @@ -340,6 +401,12 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations { } static const _mapEquality = MapEquality(); + + (int, int) _addProgress((int, int) prev, BucketProgress entry) { + final downloaded = entry.sinceLast; + final total = entry.targetCount - entry.atLast; + return (prev.$1 + total, prev.$2 + downloaded); + } } /// Information about a progressing download. @@ -403,7 +470,7 @@ extension type SyncDownloadProgress._(InternalSyncDownloadProgress _internal) /// The returned [ProgressWithOperations] tracks the target amount of /// operations that need to be downloaded in total and how many of them have /// already been received. - ProgressWithOperations untilPriority(BucketPriority priority) { + ProgressWithOperations untilPriority(StreamPriority priority) { return _internal.untilPriority(priority); } } diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index 7f05cff3..b3f0ef18 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -15,6 +15,7 @@ class SyncWorkerHandle implements StreamingSync { final PowerSyncBackendConnector connector; final SyncOptions options; late final WorkerCommunicationChannel _channel; + List subscriptions; final StreamController _status = StreamController.broadcast(); @@ -24,6 +25,7 @@ class SyncWorkerHandle implements StreamingSync { required this.options, required MessagePort sendToWorker, required SharedWorker worker, + required this.subscriptions, }) { _channel = WorkerCommunicationChannel( port: sendToWorker, @@ -81,6 +83,7 @@ class SyncWorkerHandle implements StreamingSync { required PowerSyncBackendConnector connector, required Uri workerUri, required SyncOptions options, + required List subscriptions, }) async { final worker = SharedWorker(workerUri.toString().toJS); final handle = SyncWorkerHandle._( @@ -89,6 +92,7 @@ class SyncWorkerHandle implements StreamingSync { connector: connector, sendToWorker: worker.port, worker: worker, + subscriptions: subscriptions, ); // Make sure that the worker is working, or throw immediately. @@ -116,6 +120,13 @@ class SyncWorkerHandle implements StreamingSync { database.database.openFactory.path, ResolvedSyncOptions(options), database.schema, + subscriptions, ); } + + @override + void updateSubscriptions(List streams) { + subscriptions = streams; + _channel.updateSubscriptions(streams); + } } diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index ddc4eaf0..1c92808f 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -8,6 +8,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'package:async/async.dart'; +import 'package:collection/collection.dart'; import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; @@ -45,8 +46,12 @@ class _SyncWorker { }); } - _SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options, - String schemaJson, _ConnectedClient client) { + _SyncRunner referenceSyncTask( + String databaseIdentifier, + SyncOptions options, + String schemaJson, + List subscriptions, + _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { return _SyncRunner(databaseIdentifier); }) @@ -54,6 +59,7 @@ class _SyncWorker { client, options, schemaJson, + subscriptions, ); } } @@ -90,13 +96,22 @@ class _ConnectedClient { }, ); - _runner = _worker.referenceSyncTask(request.databaseName, - recoveredOptions, request.schemaJson, this); + _runner = _worker.referenceSyncTask( + request.databaseName, + recoveredOptions, + request.schemaJson, + request.subscriptions?.toDart ?? const [], + this, + ); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.disconnectClient(this); _runner = null; return (JSObject(), null); + case SyncWorkerMessageType.updateSubscriptions: + _runner?.updateClientSubscriptions( + this, (payload as UpdateSubscriptions).toDart); + return (JSObject(), null); default: throw StateError('Unexpected message type $type'); } @@ -137,9 +152,10 @@ class _SyncRunner { final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); - StreamingSync? sync; + StreamingSyncImplementation? sync; _ConnectedClient? databaseHost; - final connections = <_ConnectedClient>[]; + final connections = <_ConnectedClient, List>{}; + List currentStreams = []; _SyncRunner(this.identifier) { _group.add(_mainEvents.stream); @@ -152,8 +168,9 @@ class _SyncRunner { :final client, :final options, :final schemaJson, + :final subscriptions, ): - connections.add(client); + connections[client] = subscriptions; final (newOptions, reconnect) = this.options.applyFrom(options); this.options = newOptions; this.schemaJson = schemaJson; @@ -165,6 +182,8 @@ class _SyncRunner { sync?.abort(); sync = null; await _requestDatabase(client); + } else { + reindexSubscriptions(); } case _RemoveConnection(:final client): connections.remove(client); @@ -191,6 +210,12 @@ class _SyncRunner { } else { await _requestDatabase(newHost); } + case _ClientSubscriptionsChanged( + :final client, + :final subscriptions + ): + connections[client] = subscriptions; + reindexSubscriptions(); } } catch (e, s) { _logger.warning('Error handling $event', e, s); @@ -199,12 +224,24 @@ class _SyncRunner { }); } + /// Updates [currentStreams] to the union of values in [connections]. + void reindexSubscriptions() { + final before = currentStreams.toSet(); + final after = connections.values.flattenedToSet; + if (!const SetEquality().equals(before, after)) { + _logger.info( + 'Subscriptions across tabs have changed, checking whether a reconnect is necessary'); + currentStreams = after.toList(); + sync?.updateSubscriptions(currentStreams); + } + } + /// Pings all current [connections], removing those that don't answer in 5s /// (as they are likely closed tabs as well). /// /// Returns the first client that responds (without waiting for others). Future<_ConnectedClient?> _collectActiveClients() async { - final candidates = connections.toList(); + final candidates = connections.keys.toList(); if (candidates.isEmpty) { return null; } @@ -269,6 +306,7 @@ class _SyncRunner { ); } + currentStreams = connections.values.flattenedToSet.toList(); sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), schemaJson: client._runner!.schemaJson, @@ -283,10 +321,12 @@ class _SyncRunner { options: options, client: BrowserClient(), identifier: identifier, + activeSubscriptions: currentStreams, + logger: _logger, ); sync!.statusStream.listen((event) { _logger.fine('Broadcasting sync event: $event'); - for (final client in connections) { + for (final client in connections.keys) { client.channel.notify(SyncWorkerMessageType.notifySyncStatus, SerializedSyncStatus.from(event)); } @@ -294,9 +334,9 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient( - _ConnectedClient client, SyncOptions options, String schemaJson) { - _mainEvents.add(_AddConnection(client, options, schemaJson)); + void registerClient(_ConnectedClient client, SyncOptions options, + String schemaJson, List subscriptions) { + _mainEvents.add(_AddConnection(client, options, schemaJson, subscriptions)); } /// Remove a client, disconnecting if no clients remain.. @@ -308,6 +348,11 @@ class _SyncRunner { void disconnectClient(_ConnectedClient client) { _mainEvents.add(_DisconnectClient(client)); } + + void updateClientSubscriptions( + _ConnectedClient client, List subscriptions) { + _mainEvents.add(_ClientSubscriptionsChanged(client, subscriptions)); + } } sealed class _RunnerEvent {} @@ -316,8 +361,10 @@ final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; final SyncOptions options; final String schemaJson; + final List subscriptions; - _AddConnection(this.client, this.options, this.schemaJson); + _AddConnection( + this.client, this.options, this.schemaJson, this.subscriptions); } final class _RemoveConnection implements _RunnerEvent { @@ -332,6 +379,13 @@ final class _DisconnectClient implements _RunnerEvent { _DisconnectClient(this.client); } +final class _ClientSubscriptionsChanged implements _RunnerEvent { + final _ConnectedClient client; + final List subscriptions; + + _ClientSubscriptionsChanged(this.client, this.subscriptions); +} + final class _ActiveDatabaseClosed implements _RunnerEvent { const _ActiveDatabaseClosed(); } diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 3c64d90f..950cd1d6 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -5,10 +5,12 @@ import 'dart:js_interop'; import 'package:logging/logging.dart'; import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/sync/options.dart'; +import 'package:powersync_core/src/sync/stream.dart'; import 'package:web/web.dart'; import '../connector.dart'; import '../log.dart'; +import '../sync/streaming_sync.dart'; import '../sync/sync_status.dart'; /// Names used in [SyncWorkerMessage] @@ -20,6 +22,9 @@ enum SyncWorkerMessageType { /// If parameters change, the sync worker reconnects. startSynchronization, + /// Update the active subscriptions that this client is interested in. + updateSubscriptions, + /// The [SyncWorkerMessage.payload] for the request is a numeric id, the /// response can be anything (void). /// This disconnects immediately, even if other clients are still open. @@ -74,6 +79,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String implementationName, required String schemaJson, String? syncParamsEncoded, + UpdateSubscriptions? subscriptions, }); external String get databaseName; @@ -83,6 +89,36 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external String? get implementationName; external String get schemaJson; external String? get syncParamsEncoded; + external UpdateSubscriptions? get subscriptions; +} + +@anonymous +extension type UpdateSubscriptions._raw(JSObject _inner) implements JSObject { + external factory UpdateSubscriptions._({ + required int requestId, + required JSArray content, + }); + + factory UpdateSubscriptions(int requestId, List streams) { + return UpdateSubscriptions._( + requestId: requestId, + content: streams + .map((e) => [e.name.toJS, e.parameters.toJS].toJS) + .toList() + .toJS, + ); + } + + external int get requestId; + external JSArray get content; + + List get toDart { + return content.toDart.map((e) { + final [name, parameters] = (e as JSArray).toDart; + + return (name: name.toDart, parameters: parameters.toDart); + }).toList(); + } } @anonymous @@ -190,7 +226,7 @@ extension type SerializedBucketProgress._(JSObject _) implements JSObject { return { for (final entry in array.toDart) entry.name: ( - priority: BucketPriority(entry.priority), + priority: StreamPriority(entry.priority), atLast: entry.atLast, sinceLast: entry.sinceLast, targetCount: entry.targetCount, @@ -212,6 +248,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { required String? downloadError, required JSArray? priorityStatusEntries, required JSArray? syncProgress, + required JSString streamSubscriptions, }); factory SerializedSyncStatus.from(SyncStatus status) { @@ -237,6 +274,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { var other => SerializedBucketProgress.serialize( InternalSyncDownloadProgress.ofPublic(other).buckets), }, + streamSubscriptions: json.encode(status.internalSubscriptions).toJS, ); } @@ -250,8 +288,11 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { external String? downloadError; external JSArray? priorityStatusEntries; external JSArray? syncProgress; + external JSString? streamSubscriptions; SyncStatus asSyncStatus() { + final streamSubscriptions = this.streamSubscriptions?.toDart; + return SyncStatus( connected: connected, connecting: connecting, @@ -271,7 +312,7 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { final syncedMillis = (rawSynced as JSNumber?)?.toDartInt; return ( - priority: BucketPriority((rawPriority as JSNumber).toDartInt), + priority: StreamPriority((rawPriority as JSNumber).toDartInt), lastSyncedAt: syncedMillis != null ? DateTime.fromMicrosecondsSinceEpoch(syncedMillis) : null, @@ -285,6 +326,13 @@ extension type SerializedSyncStatus._(JSObject _) implements JSObject { SerializedBucketProgress.deserialize(serializedProgress)) .asSyncDownloadProgress, }, + streamSubscriptions: switch (streamSubscriptions) { + null => null, + final serialized => (json.decode(serialized) as List) + .map((e) => CoreActiveStreamSubscription.fromJson( + e as Map)) + .toList(), + }, ); } } @@ -339,6 +387,8 @@ final class WorkerCommunicationChannel { return; case SyncWorkerMessageType.startSynchronization: requestId = (message.payload as StartSynchronization).requestId; + case SyncWorkerMessageType.updateSubscriptions: + requestId = (message.payload as UpdateSubscriptions).requestId; case SyncWorkerMessageType.requestEndpoint: case SyncWorkerMessageType.abortSynchronization: case SyncWorkerMessageType.credentialsCallback: @@ -413,7 +463,11 @@ final class WorkerCommunicationChannel { } Future startSynchronization( - String databaseName, ResolvedSyncOptions options, Schema schema) async { + String databaseName, + ResolvedSyncOptions options, + Schema schema, + List streams, + ) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, @@ -428,11 +482,22 @@ final class WorkerCommunicationChannel { null => null, final params => jsonEncode(params), }, + subscriptions: UpdateSubscriptions(-1, streams), ), )); await completion; } + Future updateSubscriptions(List streams) async { + final (id, completion) = _newRequest(); + port.postMessage(SyncWorkerMessage( + type: SyncWorkerMessageType.updateSubscriptions.name, + payload: UpdateSubscriptions(id, streams), + )); + + await completion; + } + Future abortSynchronization() async { await _numericRequest(SyncWorkerMessageType.abortSynchronization); } diff --git a/packages/powersync_core/pubspec.yaml b/packages/powersync_core/pubspec.yaml index fa3dd02a..929b2d5e 100644 --- a/packages/powersync_core/pubspec.yaml +++ b/packages/powersync_core/pubspec.yaml @@ -19,7 +19,7 @@ dependencies: uuid: ^4.2.0 async: ^2.10.0 logging: ^1.1.1 - collection: ^1.17.0 + collection: ^1.19.0 web: ^1.0.0 # Only used internally to download WASM / worker files. diff --git a/packages/powersync_core/test/bucket_storage_test.dart b/packages/powersync_core/test/sync/bucket_storage_test.dart similarity index 99% rename from packages/powersync_core/test/bucket_storage_test.dart rename to packages/powersync_core/test/sync/bucket_storage_test.dart index 94338791..496e5a49 100644 --- a/packages/powersync_core/test/bucket_storage_test.dart +++ b/packages/powersync_core/test/sync/bucket_storage_test.dart @@ -4,8 +4,9 @@ import 'package:powersync_core/src/sync/protocol.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/test_utils_impl.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; final testUtils = TestUtils(); @@ -39,11 +40,6 @@ const removeAsset1_4 = OplogEntry( const removeAsset1_5 = OplogEntry( opId: '5', op: OpType.remove, rowType: 'assets', rowId: 'O1', checksum: 5); -BucketChecksum checksum( - {required String bucket, required int checksum, int priority = 1}) { - return BucketChecksum(bucket: bucket, priority: priority, checksum: checksum); -} - SyncDataBatch syncDataBatch(List data) { return SyncDataBatch(data); } diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/sync/in_memory_sync_test.dart similarity index 88% rename from packages/powersync_core/test/in_memory_sync_test.dart rename to packages/powersync_core/test/sync/in_memory_sync_test.dart index e4ab531b..87c18d82 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/sync/in_memory_sync_test.dart @@ -5,17 +5,16 @@ import 'package:async/async.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/sqlite3_common.dart'; -import 'package:powersync_core/src/sync/streaming_sync.dart'; import 'package:powersync_core/src/sync/protocol.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf_router/shelf_router.dart'; import 'package:test/test.dart'; -import 'bucket_storage_test.dart'; -import 'server/sync_server/in_memory_sync_server.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/in_memory_http.dart'; -import 'utils/test_utils_impl.dart'; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; void main() { _declareTests( @@ -55,35 +54,32 @@ void _declareTests(String name, SyncOptions options, bool bson) { late TestPowerSyncFactory factory; late CommonDatabase raw; - late PowerSyncDatabase database; + late TestDatabase database; late MockSyncService syncService; late Logger logger; - late StreamingSync syncClient; var credentialsCallbackCount = 0; Future Function(PowerSyncDatabase) uploadData = (db) async {}; - void createSyncClient({Schema? schema}) { + Future connect() async { final (client, server) = inMemoryServer(); server.mount((req) => syncService.router(req)); - final thisSyncClient = syncClient = database.connectWithMockService( - client, - TestConnector(() async { - credentialsCallbackCount++; - return PowerSyncCredentials( - endpoint: server.url.toString(), - token: 'token$credentialsCallbackCount', - expiresAt: DateTime.now(), - ); - }, uploadData: (db) => uploadData(db)), + database.httpClient = client; + await database.connect( + connector: TestConnector( + () async { + credentialsCallbackCount++; + return PowerSyncCredentials( + endpoint: server.url.toString(), + token: 'token$credentialsCallbackCount', + expiresAt: DateTime.now(), + ); + }, + uploadData: (db) => uploadData(db), + ), options: options, - customSchema: schema, ); - - addTearDown(() async { - await thisSyncClient.abort(); - }); } setUp(() async { @@ -94,7 +90,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { factory = await testUtils.testFactory(); (raw, database) = await factory.openInMemoryDatabase(); await database.initialize(); - createSyncClient(); }); tearDown(() async { @@ -111,7 +106,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { } }); } - syncClient.streamingSync(); + await connect(); await syncService.waitForListener; expect(database.currentStatus.lastSyncedAt, isNull); @@ -146,7 +141,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); await expectLater( status, emits(isSyncStatus(downloading: false, hasSynced: true))); - await syncClient.abort(); + await database.disconnect(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -157,7 +152,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { // A complete sync also means that all partial syncs have completed expect( independentDb.currentStatus - .statusForPriority(BucketPriority(3)) + .statusForPriority(StreamPriority(3)) .hasSynced, isTrue); }); @@ -251,7 +246,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { database.watch('SELECT * FROM lists', throttle: Duration.zero)); await expectLater(query, emits(isEmpty)); - createSyncClient(schema: schema); + await database.updateSchema(schema); await waitForConnection(); syncService @@ -376,13 +371,13 @@ void _declareTests(String name, SyncOptions options, bool bson) { status, emitsThrough( isSyncStatus(downloading: true, hasSynced: false).having( - (e) => e.statusForPriority(BucketPriority(0)).hasSynced, + (e) => e.statusForPriority(StreamPriority(0)).hasSynced, 'status for $prio', isTrue, )), ); - await database.waitForFirstSync(priority: BucketPriority(prio)); + await database.waitForFirstSync(priority: StreamPriority(prio)); expect(await database.getAll('SELECT * FROM customers'), hasLength(prio + 1)); } @@ -419,9 +414,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { 'priority': 1, } }); - await database.waitForFirstSync(priority: BucketPriority(1)); + await database.waitForFirstSync(priority: StreamPriority(1)); expect(database.currentStatus.hasSynced, isFalse); - await syncClient.abort(); + await database.disconnect(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -430,12 +425,12 @@ void _declareTests(String name, SyncOptions options, bool bson) { // Completing a sync for prio 1 implies a completed sync for prio 0 expect( independentDb.currentStatus - .statusForPriority(BucketPriority(0)) + .statusForPriority(StreamPriority(0)) .hasSynced, isTrue); expect( independentDb.currentStatus - .statusForPriority(BucketPriority(3)) + .statusForPriority(StreamPriority(3)) .hasSynced, isFalse); }); @@ -623,7 +618,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { Future expectProgress( StreamQueue status, { required Object total, - Map priorities = const {}, + Map priorities = const {}, }) async { await expectLater( status, @@ -683,10 +678,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress(status, total: progress(5, 10)); // Emulate the app closing - create a new independent sync client. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); status = await waitForConnection(); // Send same checkpoint again @@ -717,10 +711,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress(status, total: progress(5, 10)); // Emulate the app closing - create a new independent sync client. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); status = await waitForConnection(); // Send checkpoint with additional data @@ -751,9 +744,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { // A sync rule deploy could reset buckets, making the new bucket smaller // than the existing one. - await syncClient.abort(); + await database.disconnect(); syncService.endCurrentListener(); - createSyncClient(); + status = await waitForConnection(); syncService.addLine({ 'checkpoint': Checkpoint( @@ -772,8 +765,8 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectProgress( status, priorities: { - BucketPriority(0): prio0, - BucketPriority(2): prio2, + StreamPriority(0): prio0, + StreamPriority(2): prio2, }, total: prio2, ); @@ -837,7 +830,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); await expectLater(status, emits(isSyncStatus(downloading: true))); - await syncClient.abort(); + await database.disconnect(); expect(syncService.controller.hasListener, isFalse); }); @@ -856,9 +849,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { syncService.addLine({ 'checkpoint_complete': {'last_op_id': '10'} }); - - await pumpEventQueue(); - expect(syncService.controller.hasListener, isFalse); syncService.endCurrentListener(); // Should reconnect after delay. @@ -878,9 +868,6 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectLater(status, emits(isSyncStatus(downloading: true))); syncService.addKeepAlive(0); - - await pumpEventQueue(); - expect(syncService.controller.hasListener, isFalse); syncService.endCurrentListener(); // Should reconnect after delay. @@ -952,11 +939,11 @@ void _declareTests(String name, SyncOptions options, bool bson) { await Completer().future; })); - syncClient.streamingSync(); + await connect(); await requestStarted.future; expect(database.currentStatus, isSyncStatus(connecting: true)); - await syncClient.abort(); + await database.disconnect(); expect(database.currentStatus.anyError, isNull); }); @@ -975,57 +962,9 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); await expectLater(status, emits(isSyncStatus(downloading: true))); - await syncClient.abort(); + await database.disconnect(); expect(database.currentStatus.anyError, isNull); }); }); }); } - -TypeMatcher isSyncStatus({ - Object? downloading, - Object? connected, - Object? connecting, - Object? hasSynced, - Object? downloadProgress, -}) { - var matcher = isA(); - if (downloading != null) { - matcher = matcher.having((e) => e.downloading, 'downloading', downloading); - } - if (connected != null) { - matcher = matcher.having((e) => e.connected, 'connected', connected); - } - if (connecting != null) { - matcher = matcher.having((e) => e.connecting, 'connecting', connecting); - } - if (hasSynced != null) { - matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); - } - if (downloadProgress != null) { - matcher = matcher.having( - (e) => e.downloadProgress, 'downloadProgress', downloadProgress); - } - - return matcher; -} - -TypeMatcher isSyncDownloadProgress({ - required Object progress, - Map priorities = const {}, -}) { - var matcher = - isA().having((e) => e, 'untilCompletion', progress); - priorities.forEach((priority, expected) { - matcher = matcher.having( - (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); - }); - - return matcher; -} - -TypeMatcher progress(int completed, int total) { - return isA() - .having((e) => e.downloadedOperations, 'completed', completed) - .having((e) => e.totalOperations, 'total', total); -} diff --git a/packages/powersync_core/test/sync/stream_test.dart b/packages/powersync_core/test/sync/stream_test.dart new file mode 100644 index 00000000..1625656c --- /dev/null +++ b/packages/powersync_core/test/sync/stream_test.dart @@ -0,0 +1,263 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:async/async.dart'; +import 'package:logging/logging.dart'; +import 'package:powersync_core/powersync_core.dart'; + +import 'package:test/test.dart'; + +import '../server/sync_server/in_memory_sync_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/in_memory_http.dart'; +import '../utils/test_utils_impl.dart'; +import 'utils.dart'; + +void main() { + late final testUtils = TestUtils(); + + late TestPowerSyncFactory factory; + + late TestDatabase database; + late MockSyncService syncService; + late Logger logger; + late SyncOptions options; + + var credentialsCallbackCount = 0; + + Future connect() async { + final (client, server) = inMemoryServer(); + server.mount(syncService.router.call); + + database.httpClient = client; + await database.connect( + connector: TestConnector( + () async { + credentialsCallbackCount++; + return PowerSyncCredentials( + endpoint: server.url.toString(), + token: 'token$credentialsCallbackCount', + expiresAt: DateTime.now(), + ); + }, + uploadData: (db) async {}, + ), + options: options, + ); + } + + setUp(() async { + options = SyncOptions(syncImplementation: SyncClientImplementation.rust); + logger = Logger.detached('powersync.active')..level = Level.ALL; + credentialsCallbackCount = 0; + syncService = MockSyncService(); + + factory = await testUtils.testFactory(); + (_, database) = await factory.openInMemoryDatabase(); + await database.initialize(); + }); + + tearDown(() async { + await database.close(); + await syncService.stop(); + }); + + Future> waitForConnection( + {bool expectNoWarnings = true}) async { + if (expectNoWarnings) { + logger.onRecord.listen((e) { + if (e.level >= Level.WARNING) { + fail('Unexpected log: $e, ${e.stackTrace}'); + } + }); + } + await connect(); + await syncService.waitForListener; + + expect(database.currentStatus.lastSyncedAt, isNull); + expect(database.currentStatus.downloading, isFalse); + final status = StreamQueue(database.statusStream); + addTearDown(status.cancel); + + syncService.addKeepAlive(); + await expectLater( + status, emitsThrough(isSyncStatus(connected: true, hasSynced: false))); + return status; + } + + test('can disable default streams', () async { + options = SyncOptions( + syncImplementation: SyncClientImplementation.rust, + includeDefaultStreams: false, + ); + + await waitForConnection(); + final request = await syncService.waitForListener; + expect(json.decode(await request.readAsString()), + containsPair('streams', containsPair('include_defaults', false))); + }); + + test('subscribes with streams', () async { + final a = await database.syncStream('stream', {'foo': 'a'}).subscribe(); + final b = await database.syncStream('stream', {'foo': 'b'}).subscribe( + priority: StreamPriority(1)); + + final statusStream = await waitForConnection(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', [ + { + 'stream': 'stream', + 'parameters': {'foo': 'a'}, + 'override_priority': null, + }, + { + 'stream': 'stream', + 'parameters': {'foo': 'b'}, + 'override_priority': 1, + }, + ]), + ), + ); + + syncService.addLine( + checkpoint( + lastOpId: 0, + buckets: [ + bucketDescription('a', subscriptions: [ + {'sub': 0} + ]), + bucketDescription('b', priority: 1, subscriptions: [ + {'sub': 1} + ]) + ], + streams: [ + stream('stream', false), + ], + ), + ); + + var status = await statusStream.next; + for (final subscription in [a, b]) { + expect(status.forStream(subscription)!.subscription.active, true); + expect(status.forStream(subscription)!.subscription.lastSyncedAt, isNull); + expect( + status.forStream(subscription)!.subscription.hasExplicitSubscription, + true, + ); + } + + syncService.addLine(checkpointComplete(priority: 1)); + status = await statusStream.next; + expect(status.forStream(a)!.subscription.lastSyncedAt, isNull); + expect(status.forStream(b)!.subscription.lastSyncedAt, isNotNull); + await b.waitForFirstSync(); + + syncService.addLine(checkpointComplete()); + await a.waitForFirstSync(); + }); + + test('reports default streams', () async { + final status = await waitForConnection(); + syncService.addLine( + checkpoint(lastOpId: 0, streams: [stream('default_stream', true)]), + ); + + await expectLater( + status, + emits( + isSyncStatus( + syncStreams: [ + isStreamStatus( + subscription: isSyncSubscription( + name: 'default_stream', + parameters: null, + isDefault: true, + ), + ), + ], + ), + ), + ); + }); + + test('changes subscriptions dynamically', () async { + await waitForConnection(); + syncService.addKeepAlive(); + + final subscription = await database.syncStream('a').subscribe(); + syncService.endCurrentListener(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', [ + { + 'stream': 'a', + 'parameters': null, + 'override_priority': null, + }, + ]), + ), + ); + + // Given that the subscription has a TTL, dropping the handle should not + // re-subscribe. + subscription.unsubscribe(); + await pumpEventQueue(); + expect(syncService.controller.hasListener, isTrue); + }); + + test('subscriptions update while offline', () async { + final stream = StreamQueue(database.statusStream); + + final subscription = await database.syncStream('foo').subscribe(); + var status = await stream.next; + expect(status.forStream(subscription), isNotNull); + }); + + test('unsubscribing multiple times has no effect', () async { + final a = await database.syncStream('a').subscribe(); + final aAgain = await database.syncStream('a').subscribe(); + a.unsubscribe(); + a.unsubscribe(); // Should not decrement the refcount again + + // Pretend the streams are expired - they should still be requested because + // the core extension extends the lifetime of streams currently referenced + // before connecting. + await database.execute( + 'UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000'); + + await waitForConnection(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', isNotEmpty), + ), + ); + aAgain.unsubscribe(); + }); + + test('unsubscribeAll', () async { + final a = await database.syncStream('a').subscribe(); + await database.syncStream('a').unsubscribeAll(); + + // Despite a being active, it should not be requested. + await waitForConnection(); + final request = await syncService.waitForListener; + expect( + json.decode(await request.readAsString()), + containsPair( + 'streams', + containsPair('subscriptions', isEmpty), + ), + ); + a.unsubscribe(); + }); +} diff --git a/packages/powersync_core/test/streaming_sync_test.dart b/packages/powersync_core/test/sync/streaming_sync_test.dart similarity index 97% rename from packages/powersync_core/test/streaming_sync_test.dart rename to packages/powersync_core/test/sync/streaming_sync_test.dart index 40becd16..5017993f 100644 --- a/packages/powersync_core/test/streaming_sync_test.dart +++ b/packages/powersync_core/test/sync/streaming_sync_test.dart @@ -9,10 +9,10 @@ import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; import 'package:test/test.dart'; -import 'server/sync_server/in_memory_sync_server.dart'; -import 'test_server.dart'; -import 'utils/abstract_test_utils.dart'; -import 'utils/test_utils_impl.dart'; +import '../server/sync_server/in_memory_sync_server.dart'; +import '../test_server.dart'; +import '../utils/abstract_test_utils.dart'; +import '../utils/test_utils_impl.dart'; final testUtils = TestUtils(); diff --git a/packages/powersync_core/test/sync_types_test.dart b/packages/powersync_core/test/sync/sync_types_test.dart similarity index 95% rename from packages/powersync_core/test/sync_types_test.dart rename to packages/powersync_core/test/sync/sync_types_test.dart index 261152b2..5cd24c9d 100644 --- a/packages/powersync_core/test/sync_types_test.dart +++ b/packages/powersync_core/test/sync/sync_types_test.dart @@ -216,11 +216,11 @@ void main() { } }); - test('bucket priority comparisons', () { - expect(BucketPriority(0) < BucketPriority(3), isFalse); - expect(BucketPriority(0) > BucketPriority(3), isTrue); - expect(BucketPriority(0) >= BucketPriority(3), isTrue); - expect(BucketPriority(0) >= BucketPriority(0), isTrue); + test('stream priority comparisons', () { + expect(StreamPriority(0) < StreamPriority(3), isFalse); + expect(StreamPriority(0) > StreamPriority(3), isTrue); + expect(StreamPriority(0) >= StreamPriority(3), isTrue); + expect(StreamPriority(0) >= StreamPriority(0), isTrue); }); }); } diff --git a/packages/powersync_core/test/sync/utils.dart b/packages/powersync_core/test/sync/utils.dart new file mode 100644 index 00000000..53654f12 --- /dev/null +++ b/packages/powersync_core/test/sync/utils.dart @@ -0,0 +1,136 @@ +import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/sync/protocol.dart'; +import 'package:test/test.dart'; + +TypeMatcher isSyncStatus({ + Object? downloading, + Object? connected, + Object? connecting, + Object? hasSynced, + Object? downloadProgress, + Object? syncStreams, +}) { + var matcher = isA(); + if (downloading != null) { + matcher = matcher.having((e) => e.downloading, 'downloading', downloading); + } + if (connected != null) { + matcher = matcher.having((e) => e.connected, 'connected', connected); + } + if (connecting != null) { + matcher = matcher.having((e) => e.connecting, 'connecting', connecting); + } + if (hasSynced != null) { + matcher = matcher.having((e) => e.hasSynced, 'hasSynced', hasSynced); + } + if (downloadProgress != null) { + matcher = matcher.having( + (e) => e.downloadProgress, 'downloadProgress', downloadProgress); + } + if (syncStreams != null) { + matcher = matcher.having((e) => e.syncStreams, 'syncStreams', syncStreams); + } + + return matcher; +} + +TypeMatcher isSyncDownloadProgress({ + required Object progress, + Map priorities = const {}, +}) { + var matcher = + isA().having((e) => e, 'untilCompletion', progress); + priorities.forEach((priority, expected) { + matcher = matcher.having( + (e) => e.untilPriority(priority), 'untilPriority($priority)', expected); + }); + + return matcher; +} + +TypeMatcher progress(int completed, int total) { + return isA() + .having((e) => e.downloadedOperations, 'completed', completed) + .having((e) => e.totalOperations, 'total', total); +} + +TypeMatcher isStreamStatus({ + required Object? subscription, + Object? progress, +}) { + var matcher = isA() + .having((e) => e.subscription, 'subscription', subscription); + if (progress case final progress?) { + matcher = matcher.having((e) => e.progress, 'progress', progress); + } + + return matcher; +} + +TypeMatcher isSyncSubscription({ + required Object name, + required Object? parameters, + bool? isDefault, +}) { + var matcher = isA() + .having((e) => e.name, 'name', name) + .having((e) => e.parameters, 'parameters', parameters); + + if (isDefault != null) { + matcher = matcher.having((e) => e.isDefault, 'isDefault', isDefault); + } + + return matcher; +} + +BucketChecksum checksum( + {required String bucket, required int checksum, int priority = 1}) { + return BucketChecksum(bucket: bucket, priority: priority, checksum: checksum); +} + +/// Creates a `checkpoint` line. +Object checkpoint({ + required int lastOpId, + List buckets = const [], + String? writeCheckpoint, + List streams = const [], +}) { + return { + 'checkpoint': { + 'last_op_id': '$lastOpId', + 'write_checkpoint': null, + 'buckets': buckets, + 'streams': streams, + } + }; +} + +Object stream(String name, bool isDefault, {List errors = const []}) { + return {'name': name, 'is_default': isDefault, 'errors': errors}; +} + +/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line. +Object checkpointComplete({int? priority, String lastOpId = '1'}) { + return { + priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': { + 'last_op_id': lastOpId, + if (priority != null) 'priority': priority, + }, + }; +} + +Object bucketDescription( + String name, { + int checksum = 0, + int priority = 3, + int count = 1, + Object? subscriptions, +}) { + return { + 'bucket': name, + 'checksum': checksum, + 'priority': priority, + 'count': count, + if (subscriptions != null) 'subscriptions': subscriptions, + }; +} diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index a95d2604..96469c5a 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,8 +1,11 @@ +import 'dart:async'; import 'dart:convert'; import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; +import 'package:powersync_core/src/abort_controller.dart'; +import 'package:powersync_core/src/database/powersync_db_mixin.dart'; import 'package:powersync_core/src/sync/bucket_storage.dart'; import 'package:powersync_core/src/sync/internal_connector.dart'; import 'package:powersync_core/src/sync/options.dart'; @@ -63,7 +66,7 @@ Logger _makeTestLogger({Level level = Level.ALL, String? name}) { abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory { Future openRawInMemoryDatabase(); - Future<(CommonDatabase, PowerSyncDatabase)> openInMemoryDatabase({ + Future<(CommonDatabase, TestDatabase)> openInMemoryDatabase({ Schema? schema, Logger? logger, }) async { @@ -71,16 +74,16 @@ abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory { return (raw, wrapRaw(raw, customSchema: schema, logger: logger)); } - PowerSyncDatabase wrapRaw( + TestDatabase wrapRaw( CommonDatabase raw, { Logger? logger, Schema? customSchema, }) { - return PowerSyncDatabase.withDatabase( - schema: customSchema ?? schema, + return TestDatabase( database: SqliteDatabase.singleConnection( SqliteConnection.synchronousWrapper(raw)), - logger: logger, + logger: logger ?? Logger.detached('PowerSync.test'), + schema: customSchema ?? schema, ); } } @@ -151,6 +154,83 @@ class TestConnector extends PowerSyncBackendConnector { } } +/// A [PowerSyncDatabase] implemented by a single in-memory database connection +/// and a mock-HTTP sync client. +/// +/// This ensures tests for sync cover the `ConnectionManager` and other methods +/// exposed by the mixin. +final class TestDatabase + with SqliteQueries, PowerSyncDatabaseMixin + implements PowerSyncDatabase { + @override + final SqliteDatabase database; + @override + final Logger logger; + @override + Schema schema; + + @override + late final Future isInitialized; + + Client? httpClient; + + TestDatabase({ + required this.database, + required this.logger, + required this.schema, + }) { + isInitialized = baseInit(); + } + + @override + Future connectInternal({ + required PowerSyncBackendConnector connector, + required ResolvedSyncOptions options, + required List initiallyActiveStreams, + required Stream> activeStreams, + required AbortController abort, + required Zone asyncWorkZone, + }) async { + final impl = StreamingSyncImplementation( + adapter: BucketStorage(this), + schemaJson: jsonEncode(schema), + client: httpClient!, + options: options, + connector: InternalConnector.wrap(connector, this), + logger: logger, + crudUpdateTriggerStream: database + .onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)), + activeSubscriptions: initiallyActiveStreams, + ); + impl.statusStream.listen(setStatus); + + asyncWorkZone.run(impl.streamingSync); + final subscriptions = activeStreams.listen(impl.updateSubscriptions); + + abort.onAbort.then((_) async { + subscriptions.cancel(); + await impl.abort(); + abort.completeAbort(); + }).ignore(); + } + + @override + Future readLock(Future Function(SqliteReadContext tx) callback, + {String? debugContext, Duration? lockTimeout}) async { + await isInitialized; + return database.readLock(callback, + debugContext: debugContext, lockTimeout: lockTimeout); + } + + @override + Future writeLock(Future Function(SqliteWriteContext tx) callback, + {String? debugContext, Duration? lockTimeout}) async { + await isInitialized; + return database.writeLock(callback, + debugContext: debugContext, lockTimeout: lockTimeout); + } +} + extension MockSync on PowerSyncDatabase { StreamingSyncImplementation connectWithMockService( Client client,