Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions packages/powersync/lib/src/powersync_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:isolate';

import 'package:logging/logging.dart';
import 'package:powersync/src/log_internal.dart';
import 'package:sqlite_async/mutex.dart';
import 'package:sqlite_async/sqlite3.dart' as sqlite;
import 'package:sqlite_async/sqlite_async.dart';

Expand Down Expand Up @@ -69,6 +70,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
/// null when disconnected, present when connecting or connected
AbortController? _disconnecter;

/// Use to prevent multiple connections from being opened concurrently
final Mutex _connectMutex = Mutex();

/// The Logger used by this [PowerSyncDatabase].
///
/// The default is [autoLogger], which logs to the console in debug builds.
Expand Down Expand Up @@ -190,6 +194,13 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
/// Throttle time between CRUD operations
/// Defaults to 10 milliseconds.
Duration crudThrottleTime = const Duration(milliseconds: 10)}) async {
_connectMutex.lock(() =>
_connect(connector: connector, crudThrottleTime: crudThrottleTime));
}

Future<void> _connect(
{required PowerSyncBackendConnector connector,
required Duration crudThrottleTime}) async {
await initialize();

// Disconnect if connected
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies:
flutter:
sdk: flutter

sqlite_async: ^0.6.0
sqlite_async: ^0.6.1
sqlite3_flutter_libs: ^0.5.15
http: ^1.1.0
uuid: ^4.2.0
Expand Down
60 changes: 44 additions & 16 deletions packages/powersync/test/streaming_sync_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';

import 'package:powersync/powersync.dart';
Expand Down Expand Up @@ -44,12 +43,8 @@ void main() {
var server = await createServer();

credentialsCallback() async {
final endpoint = 'http://${server.address.host}:${server.port}';
return PowerSyncCredentials(
endpoint: endpoint,
token: 'token',
userId: 'u1',
expiresAt: DateTime.now());
endpoint: server.endpoint, token: 'token');
}

final pdb = await setupPowerSync(path: path);
Expand All @@ -59,12 +54,12 @@ void main() {

await Future.delayed(Duration(milliseconds: random.nextInt(100)));
if (random.nextBool()) {
server.close(force: true).ignore();
server.close();
}

await pdb.close();

server.close(force: true).ignore();
server.close();
}
});

Expand All @@ -81,18 +76,13 @@ void main() {
// [PowerSync] WARNING: 2023-06-29 16:10:17.667537: Sync Isolate error
// [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)

HttpServer? server;
TestServer? server;

credentialsCallback() async {
if (server == null) {
throw AssertionError('No active server');
}
final endpoint = 'http://${server.address.host}:${server.port}';
return PowerSyncCredentials(
endpoint: endpoint,
token: 'token',
userId: 'u1',
expiresAt: DateTime.now());
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
}

final pdb = await setupPowerSync(path: path);
Expand All @@ -107,9 +97,47 @@ void main() {
// 2ms: HttpException: HttpServer is not bound to a socket
// 20ms: Connection closed while receiving data
await Future.delayed(Duration(milliseconds: 20));
server.close(force: true).ignore();
server.close();
}
await pdb.close();
});

test('multiple connect calls', () async {
// Test calling connect() multiple times.
// We check that this does not cause multiple connections to be opened concurrently.
final random = Random();
var server = await createServer();

credentialsCallback() async {
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
}

final pdb = await setupPowerSync(path: path);
pdb.retryDelay = Duration(milliseconds: 5000);
var connector = TestConnector(credentialsCallback);
pdb.connect(connector: connector);
pdb.connect(connector: connector);

final watch = Stopwatch()..start();

// Wait for at least one connection
while (server.connectionCount < 1 && watch.elapsedMilliseconds < 500) {
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
}
// Give some time for a second connection if any
await Future.delayed(Duration(milliseconds: random.nextInt(50)));

await pdb.close();

// Give some time for connections to close
while (server.connectionCount != 0 && watch.elapsedMilliseconds < 1000) {
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
}

expect(server.connectionCount, equals(0));
expect(server.maxConnectionCount, equals(1));

server.close();
});
});
}
79 changes: 54 additions & 25 deletions packages/powersync/test/test_server.dart
Original file line number Diff line number Diff line change
@@ -1,18 +1,67 @@
import 'dart:async';
import 'dart:convert' as convert;
import 'dart:io';
import 'dart:math';

import 'package:http/http.dart' show ByteStream;
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_router/shelf_router.dart';

Future<HttpServer> createServer() async {
var app = Router();
class TestServer {
late HttpServer server;
Router app = Router();
int connectionCount = 0;
int maxConnectionCount = 0;
int tokenExpiresIn;

app.post('/sync/stream', handleSyncStream);
// Open on an arbitrary open port
var server = await shelf_io.serve(app.call, 'localhost', 0);
TestServer({this.tokenExpiresIn = 65});

Future<void> init() async {
app.post('/sync/stream', handleSyncStream);
// Open on an arbitrary open port
server = await shelf_io.serve(app.call, 'localhost', 0);
}

String get endpoint {
return 'http://${server.address.host}:${server.port}';
}

Future<Response> handleSyncStream(Request request) async {
connectionCount += 1;
maxConnectionCount = max(connectionCount, maxConnectionCount);

stream() async* {
try {
var blob = "*" * 5000;
for (var i = 0; i < 50; i++) {
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
await Future.delayed(Duration(microseconds: 1));
}
} finally {
connectionCount -= 1;
}
}

return Response.ok(
encodeNdjson(stream()),
headers: {
'Content-Type': 'application/x-ndjson',
},
context: {
'shelf.io.buffer_output': false,
},
);
}

void close() {
server.close(force: true).ignore();
}
}

Future<TestServer> createServer() async {
var server = TestServer();
await server.init();
return server;
}

Expand All @@ -22,23 +71,3 @@ ByteStream encodeNdjson(Stream<Object> jsonInput) {
final byteInput = stringInput.transform(convert.utf8.encoder);
return ByteStream(byteInput);
}

Future<Response> handleSyncStream(Request request) async {
stream() async* {
var blob = "*" * 5000;
for (var i = 0; i < 50; i++) {
yield {"token_expires_in": 5, "blob": blob};
await Future.delayed(Duration(microseconds: 1));
}
}

return Response.ok(
encodeNdjson(stream()),
headers: {
'Content-Type': 'application/x-ndjson',
},
context: {
'shelf.io.buffer_output': false,
},
);
}