From 21a28e951252d26ee78381feb7382028011fb13b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 11 Apr 2024 13:39:56 +0200 Subject: [PATCH 1/5] sqlite_async v0.6.1 --- packages/powersync/pubspec.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/powersync/pubspec.yaml b/packages/powersync/pubspec.yaml index 5b9f44d7..820f9d7b 100644 --- a/packages/powersync/pubspec.yaml +++ b/packages/powersync/pubspec.yaml @@ -10,7 +10,7 @@ dependencies: flutter: sdk: flutter - sqlite_async: ^0.6.0 + sqlite_async: ^0.6.1 sqlite3_flutter_libs: ^0.5.15 http: ^1.1.0 uuid: ^4.2.0 From 90cff10d0230646a3253f70aab1d5d593cb9b049 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 11 Apr 2024 13:54:01 +0200 Subject: [PATCH 2/5] Improve streaming_sync_tests. --- .../powersync/test/streaming_sync_test.dart | 21 ++--- packages/powersync/test/test_server.dart | 79 +++++++++++++------ 2 files changed, 60 insertions(+), 40 deletions(-) diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index aff9c71c..6cd64ff6 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -44,12 +44,8 @@ void main() { var server = await createServer(); credentialsCallback() async { - final endpoint = 'http://${server.address.host}:${server.port}'; return PowerSyncCredentials( - endpoint: endpoint, - token: 'token', - userId: 'u1', - expiresAt: DateTime.now()); + endpoint: server.endpoint, token: 'token'); } final pdb = await setupPowerSync(path: path); @@ -59,12 +55,12 @@ void main() { await Future.delayed(Duration(milliseconds: random.nextInt(100))); if (random.nextBool()) { - server.close(force: true).ignore(); + server.close(); } await pdb.close(); - server.close(force: true).ignore(); + server.close(); } }); @@ -81,18 +77,13 @@ void main() { // [PowerSync] WARNING: 2023-06-29 16:10:17.667537: Sync Isolate error // [Connection closed while receiving data, #0 IOClient.send. (package:http/src/io_client.dart:76:13) - HttpServer? server; + TestServer? server; credentialsCallback() async { if (server == null) { throw AssertionError('No active server'); } - final endpoint = 'http://${server.address.host}:${server.port}'; - return PowerSyncCredentials( - endpoint: endpoint, - token: 'token', - userId: 'u1', - expiresAt: DateTime.now()); + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); } final pdb = await setupPowerSync(path: path); @@ -107,7 +98,7 @@ void main() { // 2ms: HttpException: HttpServer is not bound to a socket // 20ms: Connection closed while receiving data await Future.delayed(Duration(milliseconds: 20)); - server.close(force: true).ignore(); + server.close(); } await pdb.close(); }); diff --git a/packages/powersync/test/test_server.dart b/packages/powersync/test/test_server.dart index c85101a9..2b9ac8b6 100644 --- a/packages/powersync/test/test_server.dart +++ b/packages/powersync/test/test_server.dart @@ -1,18 +1,67 @@ import 'dart:async'; import 'dart:convert' as convert; import 'dart:io'; +import 'dart:math'; import 'package:http/http.dart' show ByteStream; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_router/shelf_router.dart'; -Future createServer() async { - var app = Router(); +class TestServer { + late HttpServer server; + Router app = Router(); + int connectionCount = 0; + int maxConnectionCount = 0; + int tokenExpiresIn; - app.post('/sync/stream', handleSyncStream); - // Open on an arbitrary open port - var server = await shelf_io.serve(app.call, 'localhost', 0); + TestServer({this.tokenExpiresIn = 65}); + + Future init() async { + app.post('/sync/stream', handleSyncStream); + // Open on an arbitrary open port + server = await shelf_io.serve(app.call, 'localhost', 0); + } + + String get endpoint { + return 'http://${server.address.host}:${server.port}'; + } + + Future handleSyncStream(Request request) async { + connectionCount += 1; + maxConnectionCount = max(connectionCount, maxConnectionCount); + + stream() async* { + try { + var blob = "*" * 5000; + for (var i = 0; i < 50; i++) { + yield {"token_expires_in": tokenExpiresIn, "blob": blob}; + await Future.delayed(Duration(microseconds: 1)); + } + } finally { + connectionCount -= 1; + } + } + + return Response.ok( + encodeNdjson(stream()), + headers: { + 'Content-Type': 'application/x-ndjson', + }, + context: { + 'shelf.io.buffer_output': false, + }, + ); + } + + void close() { + server.close(force: true).ignore(); + } +} + +Future createServer() async { + var server = TestServer(); + await server.init(); return server; } @@ -22,23 +71,3 @@ ByteStream encodeNdjson(Stream jsonInput) { final byteInput = stringInput.transform(convert.utf8.encoder); return ByteStream(byteInput); } - -Future handleSyncStream(Request request) async { - stream() async* { - var blob = "*" * 5000; - for (var i = 0; i < 50; i++) { - yield {"token_expires_in": 5, "blob": blob}; - await Future.delayed(Duration(microseconds: 1)); - } - } - - return Response.ok( - encodeNdjson(stream()), - headers: { - 'Content-Type': 'application/x-ndjson', - }, - context: { - 'shelf.io.buffer_output': false, - }, - ); -} From 58fa056bc6bc1c63dfa88156e0ec92b7b89ef043 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 11 Apr 2024 13:54:34 +0200 Subject: [PATCH 3/5] Add failing test. --- .../powersync/test/streaming_sync_test.dart | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index 6cd64ff6..e86da411 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -102,5 +102,43 @@ void main() { } await pdb.close(); }); + + test('multiple connect calls', () async { + // Test repeatedly creating new PowerSync connections, then disconnect + // and close the connection. + final random = Random(); + var server = await createServer(); + + credentialsCallback() async { + return PowerSyncCredentials(endpoint: server.endpoint, token: 'token'); + } + + final pdb = await setupPowerSync(path: path); + pdb.retryDelay = Duration(milliseconds: 5000); + var connector = TestConnector(credentialsCallback); + pdb.connect(connector: connector); + pdb.connect(connector: connector); + + final watch = Stopwatch()..start(); + + // Wait for at least one connection + while (server.connectionCount < 1 && watch.elapsedMilliseconds < 500) { + await Future.delayed(Duration(milliseconds: random.nextInt(10))); + } + // Give some time for a second connection if any + await Future.delayed(Duration(milliseconds: random.nextInt(50))); + + await pdb.close(); + + // Give some time for connections to close + while (server.connectionCount != 0 && watch.elapsedMilliseconds < 1000) { + await Future.delayed(Duration(milliseconds: random.nextInt(10))); + } + + expect(server.connectionCount, equals(0)); + expect(server.maxConnectionCount, equals(1)); + + server.close(); + }); }); } From f32b30ccde88d7f75a39e5c2eb721206e93c5ab7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 11 Apr 2024 13:58:44 +0200 Subject: [PATCH 4/5] Fix issue when calling connect() multiple times. --- packages/powersync/lib/src/powersync_database.dart | 11 +++++++++++ packages/powersync/test/streaming_sync_test.dart | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/powersync/lib/src/powersync_database.dart b/packages/powersync/lib/src/powersync_database.dart index 3445fd2b..434d3c22 100644 --- a/packages/powersync/lib/src/powersync_database.dart +++ b/packages/powersync/lib/src/powersync_database.dart @@ -3,6 +3,7 @@ import 'dart:isolate'; import 'package:logging/logging.dart'; import 'package:powersync/src/log_internal.dart'; +import 'package:sqlite_async/mutex.dart'; import 'package:sqlite_async/sqlite3.dart' as sqlite; import 'package:sqlite_async/sqlite_async.dart'; @@ -69,6 +70,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// null when disconnected, present when connecting or connected AbortController? _disconnecter; + /// Use to prevent multiple connections from being opened concurrently + final Mutex _connectMutex = Mutex(); + /// The Logger used by this [PowerSyncDatabase]. /// /// The default is [autoLogger], which logs to the console in debug builds. @@ -190,6 +194,13 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection { /// Throttle time between CRUD operations /// Defaults to 10 milliseconds. Duration crudThrottleTime = const Duration(milliseconds: 10)}) async { + _connectMutex.lock(() => + _connect(connector: connector, crudThrottleTime: crudThrottleTime)); + } + + Future _connect( + {required PowerSyncBackendConnector connector, + required Duration crudThrottleTime}) async { await initialize(); // Disconnect if connected diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index e86da411..85d647a5 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -104,8 +104,8 @@ void main() { }); test('multiple connect calls', () async { - // Test repeatedly creating new PowerSync connections, then disconnect - // and close the connection. + // Test calling connect() multiple times. + // We check that this does not cause multiple connections to be opened concurrently. final random = Random(); var server = await createServer(); From ccaf293c10af24842dd4b4e4d5e3069c6c5b83a2 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 11 Apr 2024 14:01:13 +0200 Subject: [PATCH 5/5] Fix lint issue. --- packages/powersync/test/streaming_sync_test.dart | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/powersync/test/streaming_sync_test.dart b/packages/powersync/test/streaming_sync_test.dart index 85d647a5..adda1399 100644 --- a/packages/powersync/test/streaming_sync_test.dart +++ b/packages/powersync/test/streaming_sync_test.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:io'; import 'dart:math'; import 'package:powersync/powersync.dart';