Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Remove internal SQLDelight and SQLiter dependencies.
* Add `rawConnection` getter to `ConnectionContext`, which is a `SQLiteConnection` instance from
`androidx.sqlite` that can be used to step through statements in a custom way.
* Support asynchronous transactions and locks.

## 1.5.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class CrudTest {
),
)

database.writeTransaction { tx ->
database.writeTransactionAsync { tx ->
tx.execute(
"INSERT INTO foo (id,a,b,c) VALUES (uuid(), ?, ?, ?)",
listOf(
Expand Down
104 changes: 93 additions & 11 deletions core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.powersync

import androidx.sqlite.SQLiteConnection
import androidx.sqlite.execSQL
import app.cash.turbine.test
import app.cash.turbine.turbineScope
import co.touchlab.kermit.ExperimentalKermitApi
Expand Down Expand Up @@ -80,6 +78,7 @@ class DatabaseTest {
// Start a long running writeTransaction
val transactionJob =
scope.async {
@Suppress("DEPRECATION")
database.writeTransaction { tx ->
// Create another user
// External readers should not see this user while the transaction is open
Expand Down Expand Up @@ -115,6 +114,57 @@ class DatabaseTest {
assertEquals(afterTx.size, 2)
}

@Test
fun testConcurrentReadsAsync() =
databaseTest {
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf(
"steven",
"s@journeyapps.com",
),
)

val pausedTransaction = CompletableDeferred<Unit>()
val transactionItemCreated = CompletableDeferred<Unit>()
// Start a long running writeTransaction
val transactionJob =
scope.async {
database.writeTransactionAsync { tx ->
// Create another user
// External readers should not see this user while the transaction is open
tx.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf(
"steven",
"s@journeyapps.com",
),
)

transactionItemCreated.complete(Unit)

// Block this transaction until we free it
runBlocking {
pausedTransaction.await()
}
}
}

// Make sure to wait for the item to have been created in the transaction
transactionItemCreated.await()
// Try and read while the write transaction is busy
val result = database.getAll("SELECT * FROM users") { UserRow.from(it) }
// The transaction is not commited yet, we should only read 1 user
assertEquals(result.size, 1)

// Let the transaction complete
pausedTransaction.complete(Unit)
transactionJob.await()

val afterTx = database.getAll("SELECT * FROM users") { UserRow.from(it) }
assertEquals(afterTx.size, 2)
}

@Test
fun testTransactionReads() =
databaseTest {
Expand All @@ -126,6 +176,7 @@ class DatabaseTest {
),
)

@Suppress("DEPRECATION")
database.writeTransaction { tx ->
val userCount =
tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! }
Expand All @@ -146,6 +197,37 @@ class DatabaseTest {
}
}

@Test
fun testTransactionReadsAsync() =
databaseTest {
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf(
"steven",
"s@journeyapps.com",
),
)

database.writeTransactionAsync { tx ->
val userCount =
tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! }
assertEquals(userCount[0], 1)

tx.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf(
"steven",
"s@journeyapps.com",
),
)

// Getters inside the transaction should be able to see the latest update
val userCount2 =
tx.getAll("SELECT COUNT(*) as count FROM users") { cursor -> cursor.getLong(0)!! }
assertEquals(userCount2[0], 2)
}
}

@Test
fun testTableUpdates() =
databaseTest {
Expand All @@ -161,7 +243,7 @@ class DatabaseTest {
)
query.awaitItem() shouldHaveSize 1

database.writeTransaction {
database.writeTransactionAsync {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("Test2", "test2@example.org"),
Expand All @@ -175,11 +257,11 @@ class DatabaseTest {
query.awaitItem() shouldHaveSize 3

try {
database.writeTransaction {
database.writeTransactionAsync {
it.execute("DELETE FROM users;")
it.execute("syntax error, revert please (this is intentional from the unit test)")
}
} catch (e: Exception) {
} catch (_: Exception) {
// Ignore
}

Expand Down Expand Up @@ -229,7 +311,7 @@ class DatabaseTest {
// Request a lock
val lockJob =
scope.async {
database.readLock {
database.readLockAsync {
inLock.complete(Unit)
runBlocking {
pausedLock.await()
Expand All @@ -255,7 +337,7 @@ class DatabaseTest {
assertEquals(actual = database.closed, expected = false)

// Any new readLocks should throw
val exception = shouldThrow<PowerSyncException> { database.readLock {} }
val exception = shouldThrow<PowerSyncException> { database.readLockAsync {} }
exception.message shouldBe "Cannot process connection pool request"

// Release the lock
Expand Down Expand Up @@ -327,7 +409,7 @@ class DatabaseTest {
fun basicReadTransaction() =
databaseTest {
val count =
database.readTransaction { it ->
database.readTransactionAsync { it ->
it.get("SELECT COUNT(*) from users") { it.getLong(0)!! }
}
count shouldBe 0
Expand Down Expand Up @@ -416,7 +498,7 @@ class DatabaseTest {
listOf("a", "a@example.org"),
)

database.writeTransaction {
database.writeTransactionAsync {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "b@example.org"),
Expand All @@ -442,7 +524,7 @@ class DatabaseTest {
fun testCrudTransactions() =
databaseTest {
suspend fun insertInTransaction(size: Int) {
database.writeTransaction { tx ->
database.writeTransactionAsync { tx ->
repeat(size) {
tx.execute("INSERT INTO users (id, name, email) VALUES (uuid(), null, null)")
}
Expand Down Expand Up @@ -478,7 +560,7 @@ class DatabaseTest {
listOf("a", "a@example.org"),
)

database.writeTransaction {
database.writeTransactionAsync {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "b@example.org"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ internal class ActiveDatabaseTest(
return db
}

suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } }
suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLockAsync { } }

@OptIn(ExperimentalPowerSyncAPI::class)
fun createSyncClient(): HttpClient {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.powersync.bucket

import com.powersync.db.ScopedWriteQueries
import com.powersync.db.SqlCursor
import com.powersync.db.crud.CrudEntry
import com.powersync.db.internal.PowerSyncTransaction
import com.powersync.db.schema.SerializableSchema
import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
Expand All @@ -18,11 +18,11 @@ internal interface BucketStorage {

suspend fun nextCrudItem(): CrudEntry?

fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry?
suspend fun nextCrudItem(transaction: ScopedWriteQueries): CrudEntry?

suspend fun hasCrud(): Boolean

fun hasCrud(transaction: PowerSyncTransaction): Boolean
suspend fun hasCrud(transaction: ScopedWriteQueries): Boolean

fun mapCrudEntry(row: SqlCursor): CrudEntry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.powersync.bucket

import co.touchlab.kermit.Logger
import co.touchlab.stately.concurrency.AtomicBoolean
import com.powersync.db.ScopedWriteQueries
import com.powersync.db.SqlCursor
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.CrudRow
import com.powersync.db.internal.InternalDatabase
import com.powersync.db.internal.InternalTable
import com.powersync.db.internal.PowerSyncTransaction
import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncDataBatch
Expand Down Expand Up @@ -37,7 +37,7 @@ internal class BucketStorageImpl(

override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)

override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? =
override suspend fun nextCrudItem(transaction: ScopedWriteQueries): CrudEntry? =
transaction.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)

private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1"
Expand All @@ -56,7 +56,7 @@ internal class BucketStorageImpl(
return res == 1L
}

override fun hasCrud(transaction: PowerSyncTransaction): Boolean {
override suspend fun hasCrud(transaction: ScopedWriteQueries): Boolean {
val res = transaction.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper)
return res == 1L
}
Expand Down Expand Up @@ -85,10 +85,10 @@ internal class BucketStorageImpl(

logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" }

return db.writeTransaction { tx ->
return db.writeTransactionAsync { tx ->
if (hasCrud(tx)) {
logger.w { "[updateLocalTarget] ps crud is not empty" }
return@writeTransaction false
return@writeTransactionAsync false
}

val seqAfter =
Expand All @@ -101,21 +101,21 @@ internal class BucketStorageImpl(
if (seqAfter != seqBefore) {
logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore")
// New crud data may have been uploaded since we got the checkpoint. Abort.
return@writeTransaction false
return@writeTransactionAsync false
}

tx.execute(
"UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
listOf(opId),
)

return@writeTransaction true
return@writeTransactionAsync true
}
}

@LegacySyncImplementation
override suspend fun saveSyncData(syncDataBatch: SyncDataBatch) {
db.writeTransaction { tx ->
db.writeTransactionAsync { tx ->
val jsonString = JsonUtil.json.encodeToString(syncDataBatch)
tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
Expand Down Expand Up @@ -155,7 +155,7 @@ internal class BucketStorageImpl(

@LegacySyncImplementation
private suspend fun deleteBucket(bucketName: String) {
db.writeTransaction { tx ->
db.writeTransactionAsync { tx ->
tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
listOf("delete_bucket", bucketName),
Expand Down Expand Up @@ -219,7 +219,7 @@ internal class BucketStorageImpl(
}
}.map { it.bucket }

db.writeTransaction { tx ->
db.writeTransactionAsync { tx ->
tx.execute(
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)),
Expand Down Expand Up @@ -306,7 +306,7 @@ internal class BucketStorageImpl(
""
}

return db.writeTransaction { tx ->
return db.writeTransactionAsync { tx ->
tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
listOf("sync_local", args),
Expand Down Expand Up @@ -338,7 +338,7 @@ internal class BucketStorageImpl(
)
}

return@writeTransaction didApply
return@writeTransactionAsync didApply
}
}

Expand All @@ -355,7 +355,7 @@ internal class BucketStorageImpl(
}

override suspend fun control(args: PowerSyncControlArguments): List<Instruction> =
db.writeTransaction { tx ->
db.writeTransactionAsync { tx ->
logger.v { "powersync_control: $args" }

val (op: String, data: Any?) =
Expand Down
Loading
Loading