Skip to content

Commit 98e183f

Browse files
committed
prevent readers from spawning while running withAllConnections
1 parent c9830f6 commit 98e183f

File tree

2 files changed

+124
-5
lines changed

2 files changed

+124
-5
lines changed

packages/sqlite_async/lib/src/native/database/connection_pool.dart

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
3131

3232
final MutexImpl mutex;
3333

34+
bool _runningWithAllConnections = false;
35+
3436
@override
3537
bool closed = false;
3638

@@ -88,6 +90,11 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
8890
return;
8991
}
9092

93+
if (_availableReadConnections.isEmpty && _runningWithAllConnections) {
94+
// Wait until withAllConnections is done
95+
return;
96+
}
97+
9198
var nextItem = _queue.removeFirst();
9299
while (nextItem.completer.isCompleted) {
93100
// This item already timed out - try the next one if available
@@ -237,16 +244,26 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
237244
Future<T> Function(
238245
SqliteWriteContext writer, List<SqliteReadContext> readers)
239246
block) async {
240-
final blockCompleter = Completer<T>();
241-
final (write, reads) = await _lockAllConns<T>(blockCompleter);
247+
try {
248+
_runningWithAllConnections = true;
249+
250+
final blockCompleter = Completer<T>();
251+
final (write, reads) = await _lockAllConns<T>(blockCompleter);
242252

243253
try {
244254
final res = await block(write, reads);
245255
blockCompleter.complete(res);
246256
return res;
247257
} catch (e, st) {
248-
blockCompleter.completeError(e, st);
249-
rethrow;
258+
blockCompleter.completeError(e, st);
259+
rethrow;
260+
}
261+
} finally {
262+
_runningWithAllConnections = false;
263+
264+
// Continue processing any pending read requests that may have been queued while
265+
// the block was running.
266+
Timer.run(_nextRead);
250267
}
251268
}
252269

packages/sqlite_async/test/native/basic_test.dart

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
library;
33

44
import 'dart:async';
5+
import 'dart:io';
56
import 'dart:math';
67

78
import 'package:collection/collection.dart';
9+
import 'package:path/path.dart' show join;
810
import 'package:sqlite3/common.dart' as sqlite;
911
import 'package:sqlite3/sqlite3.dart' show Row;
1012
import 'package:sqlite_async/sqlite_async.dart';
1113
import 'package:test/test.dart';
1214

15+
import '../utils/abstract_test_utils.dart';
1316
import '../utils/test_utils_impl.dart';
1417

1518
final testUtils = TestUtils();
@@ -126,7 +129,7 @@ void main() {
126129

127130
print("${DateTime.now()} start");
128131
await db.withAllConnections((writer, readers) async {
129-
assert(readers.length == 3);
132+
expect(readers.length, 3);
130133

131134
// Run some reads during the block that they should run after the block finishes and releases
132135
// all locks
@@ -160,6 +163,77 @@ void main() {
160163
await readsCalledWhileWithAllConnsRunning;
161164
});
162165

166+
test('prevent opening new readers while in withAllConnections', () async {
167+
final sharedStateDir = Directory.systemTemp.createTempSync();
168+
addTearDown(() => sharedStateDir.deleteSync(recursive: true));
169+
170+
final File sharedStateFile =
171+
File(join(sharedStateDir.path, 'shared-state.txt'));
172+
173+
sharedStateFile.writeAsStringSync('initial');
174+
175+
final db = SqliteDatabase.withFactory(
176+
_TestSqliteOpenFactoryWithSharedStateFile(
177+
path: path, sharedStateFilePath: sharedStateFile.path),
178+
maxReaders: 3);
179+
await db.initialize();
180+
await createTables(db);
181+
182+
// The writer saw 'initial' in the file when opening the connection
183+
expect(
184+
await db
185+
.writeLock((c) => c.get('SELECT file_contents_on_open() AS state')),
186+
{'state': 'initial'},
187+
);
188+
189+
final withAllConnectionsCompleter = Completer<void>();
190+
191+
final withAllConnsFut = db.withAllConnections((writer, readers) async {
192+
expect(readers.length, 0); // No readers yet
193+
194+
// Simulate some work until the file is updated
195+
await Future.delayed(const Duration(milliseconds: 200));
196+
sharedStateFile.writeAsStringSync('updated');
197+
198+
await withAllConnectionsCompleter.future;
199+
});
200+
201+
// Start a reader that gets the contents of the shared file
202+
bool readFinished = false;
203+
final someReadFut =
204+
db.get('SELECT file_contents_on_open() AS state', []).then((r) {
205+
readFinished = true;
206+
return r;
207+
});
208+
209+
// The withAllConnections should prevent the reader from opening
210+
await Future.delayed(const Duration(milliseconds: 100));
211+
expect(readFinished, isFalse);
212+
213+
// Free all the locks
214+
withAllConnectionsCompleter.complete();
215+
await withAllConnsFut;
216+
217+
final readerInfo = await someReadFut;
218+
expect(readFinished, isTrue);
219+
// The read should see the updated value in the file. This checks
220+
// that a reader doesn't spawn while running withAllConnections
221+
expect(readerInfo, {'state': 'updated'});
222+
});
223+
224+
test('slow first', () async {
225+
final db = SqliteDatabase.withFactory(
226+
await testUtils.testFactory(path: path),
227+
maxReaders: 1);
228+
await db.initialize();
229+
await createTables(db);
230+
231+
print("STAAAART");
232+
await Future.wait([1000, 10].map((t) => db.get(
233+
'SELECT ? as i, test_sleep(?) as sleep, test_connection_name() as connection',
234+
[t, t])));
235+
});
236+
163237
test('read-only transactions', () async {
164238
final db = await testUtils.setupDatabase(path: path);
165239
await createTables(db);
@@ -439,3 +513,31 @@ class _InvalidPragmaOnOpenFactory extends DefaultSqliteOpenFactory {
439513
];
440514
}
441515
}
516+
517+
class _TestSqliteOpenFactoryWithSharedStateFile
518+
extends TestDefaultSqliteOpenFactory {
519+
final String sharedStateFilePath;
520+
521+
_TestSqliteOpenFactoryWithSharedStateFile(
522+
{required super.path, required this.sharedStateFilePath});
523+
524+
@override
525+
sqlite.CommonDatabase open(SqliteOpenOptions options) {
526+
final File sharedStateFile = File(sharedStateFilePath);
527+
final String sharedState = sharedStateFile.readAsStringSync();
528+
529+
final db = super.open(options);
530+
531+
// Function to return the contents of the shared state file at the time of opening
532+
// so that we know at which point the factory was called.
533+
db.createFunction(
534+
functionName: 'file_contents_on_open',
535+
argumentCount: const sqlite.AllowedArgumentCount(0),
536+
function: (args) {
537+
return sharedState;
538+
},
539+
);
540+
541+
return db;
542+
}
543+
}

0 commit comments

Comments
 (0)