-
Notifications
You must be signed in to change notification settings - Fork 13
Get all Sqlite connections in the pool #101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
# Conflicts: # packages/drift_sqlite_async/lib/src/executor.dart # packages/sqlite_async/lib/src/web/database.dart # packages/sqlite_async/lib/src/web/database/web_sqlite_database.dart
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I'm wondering about is whether a snapshot of all connections enough - e.g. I could imagine a potential race condition between usages of this call and the pool growing.
Another API which should work as well but might be safer to use could be something like this:
/// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
Future<void> withAllConnections(Future<void> Function(List<({SqliteWriteContext context, bool isReadOnlyConnection})> connections) block);
That also feels a bit clumsy though, so I'm not sure if just exposing all connections directly might be easier for this advanced use-case that realistically requires cooperation with a custom open factory anyway.
@simolus3 We are happy to discuss a different API. Our only use case is the snippet above, and it's only used for attaching/detaching databases. |
@simolus3 Has there been any second thought about this proposal? |
Sorry about the delay here! I still think that a callback-based approach would be a safer option, but I agree that this is a useful addition to have. I know that in Kotlin, we actually have an API for this, and we're using it internally to ensure a schema refresh hits all connections (by running a Anyway, I would happy with the following API: /// Locks all underlying connections making up this database, and gives [block] access to all of them at once.
Future<T> withAllConnections<T>(Future<T> Function(SqliteWriteContext writer, List<SqliteReadContext> readers) block); I know that on Kotlin we're obtaining the read and write locks independently, and that works reasonably well.
Would this still be necessary if the method was part of this package? |
@simolus3 We handle that case here: sqlite_async.dart/packages/sqlite_async/lib/src/native/database/connection_pool.dart Line 226 in 6705d13
|
@simolus3 would it be ok to use the write lock in the implementation? That would lock the readers too, right? |
I think we'd have to lock both, for the connection pool the idea is that readers can run concurrently to the writer thanks to WAL mode. |
@simolus3 Would this implementation be ok? Not sure what would be the best way to test this. File: Future<T> withAllConnections<T>(
Future<T> Function(
SqliteWriteContext writer, List<SqliteReadContext> readers)
block) async {
final blockCompleter = Completer<T>();
final (write, reads) = await _lockAllConns<T>(blockCompleter);
try {
final res = await block(write, reads);
blockCompleter.complete(res);
return res;
} catch (e, st) {
blockCompleter.completeError(e, st);
rethrow;
}
}
/// Locks all connections, returning the acquired contexts.
/// We pass a completer that would be called after the locks are taken.
Future<(SqliteWriteContext, List<SqliteReadContext>)> _lockAllConns<T>(
Completer<T> lockCompleter) async {
final List<Completer<SqliteReadContext>> readLockedCompleters = [];
final Completer<SqliteWriteContext> writeLockedCompleter = Completer();
// Take the write lock
writeLock((ctx) {
writeLockedCompleter.complete(ctx);
return lockCompleter.future;
});
// Take all the read locks
for (final readConn in _allReadConnections) {
final completer = Completer<SqliteReadContext>();
readLockedCompleters.add(completer);
readConn.readLock((ctx) {
completer.complete(ctx);
return lockCompleter.future;
});
}
// Wait after all locks are taken
final contexts = await Future.wait([
writeLockedCompleter.future,
...readLockedCompleters.map((e) => e.future)
]);
return (contexts.first as SqliteWriteContext, contexts.sublist(1));
} |
Yes, that implementation looks good to me (btw I think you can use
I think we can ignore that case. It sounds like a better way to handle that race is to also write a corresponding open factory to ensure new connections are in the state you want them to be in. I don't see much of a difference between this and a new reader being added after you return from
IMO a simple test in say |
@simolus3 Thank you!
There is a difference in our only use case for this. We save information about the currently attached databases into a file, and the factory reads that file. So if we call withAllConnections, we need to make sure to wait for its completion before spawning a new reader, so that it knows about all the attachs, and it doesn't miss one because of a race condition |
98e183f
to
dfb1dbe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation looks good to me apart from one comment about a potential race condition.
packages/sqlite_async/lib/src/native/database/connection_pool.dart
Outdated
Show resolved
Hide resolved
packages/sqlite_async/lib/src/native/database/connection_pool.dart
Outdated
Show resolved
Hide resolved
The implementation looks good to me. For the web test to work, we also need to take dart2js into consideration (and apply some web-specific fixes). It looks like I don't have access to your branch, could you apply the following patch copying the test to diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart
index abe54a3..e2914b3 100644
--- a/packages/sqlite_async/test/basic_test.dart
+++ b/packages/sqlite_async/test/basic_test.dart
@@ -1,6 +1,4 @@
import 'dart:async';
-import 'dart:math';
-import 'package:collection/collection.dart';
import 'package:sqlite_async/sqlite3_common.dart';
import 'package:sqlite_async/sqlite_async.dart';
import 'package:test/test.dart';
@@ -9,6 +7,7 @@ import 'utils/test_utils_impl.dart';
final testUtils = TestUtils();
const _isDart2Wasm = bool.fromEnvironment('dart.tool.dart2wasm');
+const _isWeb = identical(0, 0.0) || _isDart2Wasm;
void main() {
group('Shared Basic Tests', () {
@@ -305,8 +304,7 @@ void main() {
});
test('with all connections', () async {
- // TODO: Is this right?
- final maxReaders = _isDart2Wasm ? 0 : 3;
+ final maxReaders = _isWeb ? 0 : 3;
final db = SqliteDatabase.withFactory(
await testUtils.testFactory(path: path),
@@ -315,53 +313,35 @@ void main() {
await db.initialize();
await createTables(db);
- Future<Row> readWithRandomDelay(SqliteReadContext ctx, int id) async {
- return await ctx.get(
- 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
- [id, 5 + Random().nextInt(10)]);
- }
-
// Warm up to spawn the max readers
- await Future.wait(
- [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)),
- );
+ await Future.wait([for (var i = 0; i < 10; i++) db.get('SELECT $i')]);
bool finishedWithAllConns = false;
late Future<void> readsCalledWhileWithAllConnsRunning;
- print("${DateTime.now()} start");
+ final parentZone = Zone.current;
await db.withAllConnections((writer, readers) async {
expect(readers.length, maxReaders);
// Run some reads during the block that they should run after the block finishes and releases
// all locks
- readsCalledWhileWithAllConnsRunning = Future.wait(
- [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
- final r = await db.readLock((c) async {
- expect(finishedWithAllConns, isTrue);
- return await readWithRandomDelay(c, i);
- });
- print(
- "${DateTime.now()} After withAllConnections, started while running $r");
- }),
- );
-
- await Future.wait([
- writer.execute(
- "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *",
- [
- 123,
- 5 + Random().nextInt(20)
- ]).then((value) =>
- print("${DateTime.now()} withAllConnections writer done $value")),
- ...readers
- .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) {
- print(
- "${DateTime.now()} withAllConnections readers done $results");
- }))
- ]);
- }).then((_) => finishedWithAllConns = true);
+ // Need a root zone here to avoid recursive lock errors.
+ readsCalledWhileWithAllConnsRunning =
+ Future(parentZone.bindCallback(() async {
+ await Future.wait(
+ [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
+ await db.readLock((c) async {
+ expect(finishedWithAllConns, isTrue);
+ await Future.delayed(const Duration(milliseconds: 100));
+ });
+ }),
+ );
+ }));
+
+ await Future.delayed(const Duration(milliseconds: 200));
+ finishedWithAllConns = true;
+ });
await readsCalledWhileWithAllConnsRunning;
});
diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart
index c580306..3f348e6 100644
--- a/packages/sqlite_async/test/native/basic_test.dart
+++ b/packages/sqlite_async/test/native/basic_test.dart
@@ -5,6 +5,7 @@ import 'dart:async';
import 'dart:io';
import 'dart:math';
+import 'package:collection/collection.dart';
import 'package:path/path.dart' show join;
import 'package:sqlite3/common.dart' as sqlite;
import 'package:sqlite_async/sqlite_async.dart';
@@ -161,6 +162,68 @@ void main() {
expect(readerInfo, {'state': 'updated'});
});
+ test('with all connections', () async {
+ final maxReaders = 3;
+
+ final db = SqliteDatabase.withFactory(
+ await testUtils.testFactory(path: path),
+ maxReaders: maxReaders,
+ );
+ await db.initialize();
+ await createTables(db);
+
+ Future<sqlite.Row> readWithRandomDelay(
+ SqliteReadContext ctx, int id) async {
+ return await ctx.get(
+ 'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
+ [id, 5 + Random().nextInt(10)]);
+ }
+
+ // Warm up to spawn the max readers
+ await Future.wait(
+ [1, 2, 3, 4, 5, 6, 7, 8].map((i) => readWithRandomDelay(db, i)),
+ );
+
+ bool finishedWithAllConns = false;
+
+ late Future<void> readsCalledWhileWithAllConnsRunning;
+
+ print("${DateTime.now()} start");
+ await db.withAllConnections((writer, readers) async {
+ expect(readers.length, maxReaders);
+
+ // Run some reads during the block that they should run after the block finishes and releases
+ // all locks
+ readsCalledWhileWithAllConnsRunning = Future.wait(
+ [1, 2, 3, 4, 5, 6, 7, 8].map((i) async {
+ final r = await db.readLock((c) async {
+ expect(finishedWithAllConns, isTrue);
+ return await readWithRandomDelay(c, i);
+ });
+ print(
+ "${DateTime.now()} After withAllConnections, started while running $r");
+ }),
+ );
+
+ await Future.wait([
+ writer.execute(
+ "INSERT OR REPLACE INTO test_data(id, description) SELECT ? as i, test_sleep(?) || ' ' || test_connection_name() || ' 1 ' || datetime() as connection RETURNING *",
+ [
+ 123,
+ 5 + Random().nextInt(20)
+ ]).then((value) =>
+ print("${DateTime.now()} withAllConnections writer done $value")),
+ ...readers
+ .mapIndexed((i, r) => readWithRandomDelay(r, i).then((results) {
+ print(
+ "${DateTime.now()} withAllConnections readers done $results");
+ }))
+ ]);
+ }).then((_) => finishedWithAllConns = true);
+
+ await readsCalledWhileWithAllConnsRunning;
+ });
+
test('read-only transactions', () async {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Fixes #62
This is a proposal API to fix the linked issue. We have been using it succesfully on our project for the past months. The main goal is to be able to attach a database on all the active connections, read and write. Additionally, we have our own open factory, which knows which databases are attached at any given time, so it will also attach new opened connections.
We use it like this: