Skip to content

Conversation

davidmartos96
Copy link
Contributor

Fixes #62

This is a proposal API to fix the linked issue. We have been using it succesfully on our project for the past months. The main goal is to be able to attach a database on all the active connections, read and write. Additionally, we have our own open factory, which knows which databases are attached at any given time, so it will also attach new opened connections.

We use it like this:

extension SqliteAsyncExtension on SqliteDatabase {
  Future<void> attachDbOnAllConnections(String path, String alias, {String password = ''}) async {
    await detachDbOnAllConnections(alias);

    // This is the only way we can do an attach in all connections
    //
    // When using sqlcipher, the KEY needs to be provided always, otherwise it "inherits" the main db config
    // so we wouldn't be able to attach to an uneccrypted db
    try {
      await _runOnAllConnections((conn) => conn("ATTACH DATABASE ? AS ? KEY ?", [path, alias, password]));
    } catch (_) {
      // detach the db if it failed to attach on any connection
      await detachDbOnAllConnections(alias);
      rethrow;
    }
  }

  Future<void> detachDbOnAllConnections(String alias) async {
    await _runOnAllConnections((conn) async {
      // We need to check the list of attached databases to make the "warm" the detach command
      // if we try to try-catch it there are cases were it will block/not capture the error in the isolate
      final dbListRes = await conn(
        '''
        SELECT *
        FROM pragma_database_list
        WHERE name = ?
        ''',
        [alias],
      );

      final isAttached = dbListRes.isNotEmpty;
      if (isAttached) {
        // logger.i("Detached db: $alias");
        await conn('DETACH DATABASE ?', [alias]);
      }
    });
  }

  Future<void> _runOnAllConnections(
    Future<void> Function(ConnFun connFun) callback,
  ) async {
    final connections = getAllConnections();
    for (final conn in connections) {
      await callback(conn.getAll);
    }
  }
}

typedef ConnFun = Future<ResultSet> Function(String sql, [List<Object?> parameters]);

Copy link
Contributor

@simolus3 simolus3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I'm wondering about is whether a snapshot of all connections enough - e.g. I could imagine a potential race condition between usages of this call and the pool growing.

Another API which should work as well but might be safer to use could be something like this:

/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
Future<void> withAllConnections(Future<void> Function(List<({SqliteWriteContext context, bool isReadOnlyConnection})> connections) block);

That also feels a bit clumsy though, so I'm not sure if just exposing all connections directly might be easier for this advanced use-case that realistically requires cooperation with a custom open factory anyway.

@davidmartos96
Copy link
Contributor Author

@simolus3 We are happy to discuss a different API. Our only use case is the snippet above, and it's only used for attaching/detaching databases.
We are currently solving the attaching dbs problem with a file that holds the current attached databases. We need to use a file because it's the only way we have at the moment to share data between the pool isolates. More information in the linked issue.
And even though we write to the file synchronously before we run the attach, it's true that there could be a race condition if the pool was growing at that exact moment. It won't be aware of the current attached databases.
Is there any way we can use an existing mutex/lock from the library, that works cross isolates?

@davidmartos96
Copy link
Contributor Author

@simolus3 Has there been any second thought about this proposal?

@simolus3
Copy link
Contributor

Sorry about the delay here! I still think that a callback-based approach would be a safer option, but I agree that this is a useful addition to have.

I know that in Kotlin, we actually have an API for this, and we're using it internally to ensure a schema refresh hits all connections (by running a pragma table_info('sqlite_master') on all read connections). Now I wonder whether not doing that in Dart may even be a bug 🤔

Anyway, I would happy with the following API:

/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
Future<T> withAllConnections<T>(Future<T> Function(SqliteWriteContext writer, List<SqliteReadContext> readers) block);

I know that on Kotlin we're obtaining the read and write locks independently, and that works reasonably well.

Is there any way we can use an existing mutex/lock from the library, that works cross isolates?

Would this still be necessary if the method was part of this package?

@rkistner
Copy link
Contributor

@simolus3 We handle that case here:

@davidmartos96
Copy link
Contributor Author

@simolus3 would it be ok to use the write lock in the implementation? That would lock the readers too, right?

@simolus3
Copy link
Contributor

I think we'd have to lock both, for the connection pool the idea is that readers can run concurrently to the writer thanks to WAL mode.

@davidmartos96
Copy link
Contributor Author

@simolus3 Would this implementation be ok? Not sure what would be the best way to test this.
Also, from the implementation below, would it be necessary a Mutex when calling withAllConnections and when spawning new readers? To avoid spawning new ones while the lock all is executing.

File: connection_pool.dart

Future<T> withAllConnections<T>(
      Future<T> Function(
              SqliteWriteContext writer, List<SqliteReadContext> readers)
          block) async {
    final blockCompleter = Completer<T>();
    final (write, reads) = await _lockAllConns<T>(blockCompleter);

    try {
      final res = await block(write, reads);
      blockCompleter.complete(res);
      return res;
    } catch (e, st) {
      blockCompleter.completeError(e, st);
      rethrow;
    }
  }

  /// Locks all connections, returning the acquired contexts.
  /// We pass a completer that would be called after the locks are taken.
  Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
      Completer<T> lockCompleter) async {
    final List<Completer<SqliteReadContext>> readLockedCompleters = [];
    final Completer<SqliteWriteContext> writeLockedCompleter = Completer();

    // Take the write lock
    writeLock((ctx) {
      writeLockedCompleter.complete(ctx);
      return lockCompleter.future;
    });

    // Take all the read locks
    for (final readConn in _allReadConnections) {
      final completer = Completer<SqliteReadContext>();
      readLockedCompleters.add(completer);

      readConn.readLock((ctx) {
        completer.complete(ctx);
        return lockCompleter.future;
      });
    }

    // Wait after all locks are taken
    final contexts = await Future.wait([
      writeLockedCompleter.future,
      ...readLockedCompleters.map((e) => e.future)
    ]);
    return (contexts.first as SqliteWriteContext, contexts.sublist(1));
  }

@simolus3
Copy link
Contributor

simolus3 commented Sep 24, 2025

Yes, that implementation looks good to me (btw I think you can use final [writer as SqliteWriteContext ...readers] = await Future.wait to potentially simplify the last thing).

would it be necessary a Mutex when calling withAllConnections and when spawning new readers? To avoid spawning new ones while the lock all is executing.

I think we can ignore that case. It sounds like a better way to handle that race is to also write a corresponding open factory to ensure new connections are in the state you want them to be in. I don't see much of a difference between this and a new reader being added after you return from withAllConnections: In either case you need to configure added connections for consistency.

Not sure what would be the best way to test this.

IMO a simple test in say test/basic_test.dart that first acquires all connections and runs a write followed by reads on all read connections would be a good way to exercise this.
Perhaps we can also asynchronously fetch a new write / reader connection in the block and assert that those futures only complete after we've returned from withAllConnections.

@davidmartos96
Copy link
Contributor Author

@simolus3 Thank you!

I don't see much of a difference between this and a new reader being added after you return from withAllConnections: In either case you need to configure added connections for consistency.

There is a difference in our only use case for this. We save information about the currently attached databases into a file, and the factory reads that file. So if we call withAllConnections, we need to make sure to wait for its completion before spawning a new reader, so that it knows about all the attachs, and it doesn't miss one because of a race condition

Copy link
Contributor

@simolus3 simolus3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation looks good to me apart from one comment about a potential race condition.

@simolus3
Copy link
Contributor

simolus3 commented Oct 7, 2025

The implementation looks good to me. For the web test to work, we also need to take dart2js into consideration (and apply some web-specific fixes). It looks like I don't have access to your branch, could you apply the following patch copying the test to native_test.dart and then using a simplified version for the web?

diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart
index abe54a3..e2914b3 100644
--- a/packages/sqlite_async/test/basic_test.dart
+++ b/packages/sqlite_async/test/basic_test.dart
@@ -1,6 +1,4 @@
 import 'dart:async';
-import 'dart:math';
-import 'package:collection/collection.dart';
 import 'package:sqlite_async/sqlite3_common.dart';
 import 'package:sqlite_async/sqlite_async.dart';
 import 'package:test/test.dart';
@@ -9,6 +7,7 @@ import 'utils/test_utils_impl.dart';
 
 final testUtils = TestUtils();
 const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm');
+const _isWeb = identical(0, 0.0) || _isDart2Wasm;
 
 void main() {
   group('Shared Basic Tests', () {
@@ -305,8 +304,7 @@ void main() {
     });
 
     test('with all connections', () async {
-      // TODO: Is this right?
-      final maxReaders = _isDart2Wasm ? 0 : 3;
+      final maxReaders = _isWeb ? 0 : 3;
 
       final db = SqliteDatabase.withFactory(
         await testUtils.testFactory(path: path),
@@ -315,53 +313,35 @@ void main() {
       await db.initialize();
       await createTables(db);
 
-      Future<Row> readWithRandomDelay(SqliteReadContext ctx, int id) async {
-        return await ctx.get(
-            'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
-            [id, 5 + Random().nextInt(10)]);
-      }
-
       // Warm up to spawn the max readers
-      await Future.wait(
-        [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)),
-      );
+      await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]);
 
       bool finishedWithAllConns = false;
 
       late Future<void> readsCalledWhileWithAllConnsRunning;
 
-      print("${DateTime.now()} start");
+      final parentZone = Zone.current;
       await db.withAllConnections((writer, readers) async {
         expect(readers.length, maxReaders);
 
         // Run some reads during the block that they should run after the block finishes and releases
         // all locks
-        readsCalledWhileWithAllConnsRunning = Future.wait(
-          [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
-            final r = await db.readLock((c) async {
-              expect(finishedWithAllConns, isTrue);
-              return await readWithRandomDelay(c, i);
-            });
-            print(
-                "${DateTime.now()} After withAllConnections, started while running $r");
-          }),
-        );
-
-        await Future.wait([
-          writer.execute(
-              "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *",
-              [
-                123,
-                5 + Random().nextInt(20)
-              ]).then((value) =>
-              print("${DateTime.now()} withAllConnections writer done $value")),
-          ...readers
-              .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) {
-                    print(
-                        "${DateTime.now()} withAllConnections readers done $results");
-                  }))
-        ]);
-      }).then((_) => finishedWithAllConns = true);
+        // Need a root zone here to avoid recursive lock errors.
+        readsCalledWhileWithAllConnsRunning =
+            Future(parentZone.bindCallback(() async {
+          await Future.wait(
+            [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
+              await db.readLock((c) async {
+                expect(finishedWithAllConns, isTrue);
+                await Future.delayed(const Duration(milliseconds: 100));
+              });
+            }),
+          );
+        }));
+
+        await Future.delayed(const Duration(milliseconds: 200));
+        finishedWithAllConns = true;
+      });
 
       await readsCalledWhileWithAllConnsRunning;
     });
diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart
index c580306..3f348e6 100644
--- a/packages/sqlite_async/test/native/basic_test.dart
+++ b/packages/sqlite_async/test/native/basic_test.dart
@@ -5,6 +5,7 @@ import 'dart:async';
 import 'dart:io';
 import 'dart:math';
 
+import 'package:collection/collection.dart';
 import 'package:path/path.dart' show join;
 import 'package:sqlite3/common.dart' as sqlite;
 import 'package:sqlite_async/sqlite_async.dart';
@@ -161,6 +162,68 @@ void main() {
       expect(readerInfo, {'state': 'updated'});
     });
 
+    test('with all connections', () async {
+      final maxReaders = 3;
+
+      final db = SqliteDatabase.withFactory(
+        await testUtils.testFactory(path: path),
+        maxReaders: maxReaders,
+      );
+      await db.initialize();
+      await createTables(db);
+
+      Future<sqlite.Row> readWithRandomDelay(
+          SqliteReadContext ctx, int id) async {
+        return await ctx.get(
+            'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
+            [id, 5 + Random().nextInt(10)]);
+      }
+
+      // Warm up to spawn the max readers
+      await Future.wait(
+        [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)),
+      );
+
+      bool finishedWithAllConns = false;
+
+      late Future<void> readsCalledWhileWithAllConnsRunning;
+
+      print("${DateTime.now()} start");
+      await db.withAllConnections((writer, readers) async {
+        expect(readers.length, maxReaders);
+
+        // Run some reads during the block that they should run after the block finishes and releases
+        // all locks
+        readsCalledWhileWithAllConnsRunning = Future.wait(
+          [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
+            final r = await db.readLock((c) async {
+              expect(finishedWithAllConns, isTrue);
+              return await readWithRandomDelay(c, i);
+            });
+            print(
+                "${DateTime.now()} After withAllConnections, started while running $r");
+          }),
+        );
+
+        await Future.wait([
+          writer.execute(
+              "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *",
+              [
+                123,
+                5 + Random().nextInt(20)
+              ]).then((value) =>
+              print("${DateTime.now()} withAllConnections writer done $value")),
+          ...readers
+              .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) {
+                    print(
+                        "${DateTime.now()} withAllConnections readers done $results");
+                  }))
+        ]);
+      }).then((_) => finishedWithAllConns = true);
+
+      await readsCalledWhileWithAllConnsRunning;
+    });
+
     test('read-only transactions', () async {
       final db = await testUtils.setupDatabase(path: path);
       await createTables(db);

Copy link
Contributor

@simolus3 simolus3 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@simolus3 simolus3 merged commit b97be61 into powersync-ja:main Oct 7, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Obtain all active connections in the pool

4 participants