diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 107319d8..ef208310 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -11,7 +11,7 @@ jobs: outputs: tag: ${{ steps.tag.outputs.tag }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: fetch-depth: 0 @@ -32,7 +32,7 @@ jobs: maven_publish: runs-on: macos-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Validate Gradle Wrapper uses: gradle/wrapper-validation-action@v1 - uses: actions/cache@v3 @@ -81,14 +81,14 @@ jobs: with: cache-encryption-key: ${{ secrets.GRADLE_ENCRYPTION_KEY }} - name: Build frameworks - run: "./gradlew PowerSyncKotlin:buildRelease" + run: "./gradlew internal:PowerSyncKotlin:buildRelease" - uses: actions/upload-artifact@v4 with: name: XCFramework retention-days: 1 # Only used temporarily compression-level: 0 # We're already uploading a compressed file - path: PowerSyncKotlin/build/FrameworkArchives/PowersyncKotlinRelease.zip + path: internal/PowerSyncKotlin/build/FrameworkArchives/PowersyncKotlinRelease.zip if-no-files-found: error add_assets: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9c5cde47..4b6bf5f3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,3 +74,55 @@ jobs: path: | **/build/reports/ **/build/test-results/ + + android_emulator: + runs-on: ubuntu-latest + timeout-minutes: 30 + env: + AVD_NAME: ubuntu-avd-x86_64-31 + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Validate Gradle Wrapper + uses: gradle/actions/wrapper-validation@v4 + - uses: actions/cache@v4 + with: + path: ~/.konan + key: ${{ runner.os }}-${{ hashFiles('**/.lock') }} + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + - name: Set up Gradle + uses: gradle/actions/setup-gradle@v4 + with: + cache-encryption-key: ${{ secrets.GRADLE_ENCRYPTION_KEY }} + - name: AVD Cache + uses: actions/cache@v4 + id: avd-cache + with: + path: | + ~/.android/avd/* + ~/.android/adb* + key: avd-31 + + # https://github.com/ReactiveCircus/android-emulator-runner?tab=readme-ov-file#usage--examples + - name: Enable KVM + run: | + echo 'KERNEL=="kvm", GROUP="kvm", MODE="0666", OPTIONS+="static_node=kvm"' | sudo tee /etc/udev/rules.d/99-kvm4all.rules + sudo udevadm control --reload-rules + sudo udevadm trigger --name-match=kvm + + - name: emulator tests + uses: reactivecircus/android-emulator-runner@v2 + with: + api-level: 31 + force-avd-creation: false + target: google_apis + arch: x86_64 + disable-animations: false + avd-name: $AVD_NAME + emulator-options: -no-window -gpu swiftshader_indirect -noaudio -no-boot-anim -camera-back none + script: ./gradlew --scan core-tests-android:connectedCheck diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b42907a..f3123235 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 1.7.0 (unreleased) + +- Add `PowerSyncDatabase.inMemory` to create an in-memory SQLite database with PowerSync. + This may be useful for testing. +- The Supabase connector can now be subclassed to customize how rows are uploaded and how errors are handled. +- Experimental support for sync streams. + ## 1.6.1 * Fix `dlopen failed: library "libpowersync.so.so" not found` errors on Android. diff --git a/README.md b/README.md index 56c60fcc..ad0d92ed 100644 --- a/README.md +++ b/README.md @@ -26,19 +26,14 @@ and API documentation [here](https://powersync-ja.github.io/powersync-kotlin/). 
- This is the Kotlin Multiplatform SDK implementation. -- [connectors](./connectors/) - - - [SupabaseConnector.kt](./connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt) An example connector implementation for Supabase (Postgres). The backend - connector provides the connection between your application backend and the PowerSync managed database. It is used to: - 1. Retrieve a token to connect to the PowerSync service. - 2. Apply local changes on your backend application server (and from there, to your backend database). - - [integrations](./integrations/) - - [room](./integrations/room/README.md): Allows using the [Room database library](https://developer.android.com/jetpack/androidx/releases/room) - with PowerSync, making it easier to run typed queries on the database. + - [room](./integrations/room/README.md): Allows using the [Room database library](https://developer.android.com/jetpack/androidx/releases/room) with PowerSync, making it easier to run typed queries on the database. - [sqldelight](./integrations/sqldelight/README.md): Allows using [SQLDelight](https://sqldelight.github.io/sqldelight) with PowerSync, also enabling typed statements on the database. - + - [SupabaseConnector.kt](./integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt) An example connector implementation for Supabase (Postgres). The backend + connector provides the connection between your application backend and the PowerSync managed database. It is used to: + 1. Retrieve a token to connect to the PowerSync service. + 2. Apply local changes on your backend application server (and from there, to your backend database). ## Demo Apps / Example Projects diff --git a/build.gradle.kts b/build.gradle.kts index 967269ec..e5d1df9c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -68,10 +68,10 @@ tasks.getByName("clean") { // Merges individual module docs into a single HTML output dependencies { dokka(project(":core:")) - dokka(project(":connectors:supabase")) dokka(project(":compose:")) dokka(project(":integrations:room")) dokka(project(":integrations:sqldelight")) + dokka(project(":integrations:supabase")) } dokka { diff --git a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt index a1221e29..8ee2b28f 100644 --- a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt +++ b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt @@ -26,3 +26,5 @@ public fun BundledSQLiteDriver.addPowerSyncExtension() { @ExperimentalPowerSyncAPI @Throws(PowerSyncException::class) public actual fun resolvePowerSyncLoadableExtensionPath(): String? 
= "libpowersync.so" + +internal actual fun openInMemoryConnection(): SQLiteConnection = BundledSQLiteDriver().also { it.addPowerSyncExtension() }.open(":memory:") diff --git a/core/src/appleNonWatchOsMain/kotlin/com/powersync/DatabaseDriverFactory.appleNonWatchOs.kt b/core/src/appleNonWatchOsMain/kotlin/com/powersync/DatabaseDriverFactory.appleNonWatchOs.kt index dda195e8..54d2033b 100644 --- a/core/src/appleNonWatchOsMain/kotlin/com/powersync/DatabaseDriverFactory.appleNonWatchOs.kt +++ b/core/src/appleNonWatchOsMain/kotlin/com/powersync/DatabaseDriverFactory.appleNonWatchOs.kt @@ -25,3 +25,5 @@ public actual class DatabaseDriverFactory { @ExperimentalPowerSyncAPI @Throws(PowerSyncException::class) public actual fun resolvePowerSyncLoadableExtensionPath(): String? = powerSyncExtensionPath + +internal actual fun openInMemoryConnection(): SQLiteConnection = DatabaseDriverFactory().openConnection(":memory:", 0x02) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 810d25d9..97eee155 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,7 +1,5 @@ package com.powersync -import androidx.sqlite.SQLiteConnection -import androidx.sqlite.execSQL import app.cash.turbine.test import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/db/InMemoryTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/db/InMemoryTest.kt new file mode 100644 index 00000000..ebc04960 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/db/InMemoryTest.kt @@ -0,0 +1,76 @@ +package com.powersync.db + +import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.PowerSyncDatabase +import com.powersync.db.schema.Column +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +@OptIn(ExperimentalKermitApi::class) +class InMemoryTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + + private val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(logWriter), + ), + ) + + @Test + fun createsSchema() = + runTest { + val db = PowerSyncDatabase.Companion.inMemory(schema, this, logger) + try { + db.getAll("SELECT * FROM users") { } shouldHaveSize 0 + } finally { + db.close() + } + } + + @Test + fun watch() = + runTest { + val db = PowerSyncDatabase.Companion.inMemory(schema, this, logger) + try { + turbineScope { + val turbine = + db.watch("SELECT name FROM users", mapper = { it.getString(0)!! 
}).testIn(this) + + turbine.awaitItem() shouldBe listOf() + + db.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("test user")) + turbine.awaitItem() shouldBe listOf("test user") + turbine.cancelAndIgnoreRemainingEvents() + } + } finally { + db.close() + } + } + + companion object { + private val schema = + Schema( + Table( + name = "users", + columns = + listOf( + Column.Companion.text("name"), + ), + ), + ) + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 306db1f9..a1eb15d2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException import com.powersync.TestConnector import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector @@ -165,7 +165,7 @@ abstract class BaseSyncIntegrationTest( add( BucketChecksum( bucket = "bucket$prio", - priority = BucketPriority(prio), + priority = StreamPriority(prio), checksum = 10 + prio, ), ) @@ -218,7 +218,7 @@ abstract class BaseSyncIntegrationTest( // Emit a partial sync complete for each priority but the last. for (priorityNo in 0..<3) { - val priority = BucketPriority(priorityNo) + val priority = StreamPriority(priorityNo) pushData(priorityNo) syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -258,7 +258,7 @@ abstract class BaseSyncIntegrationTest( listOf( BucketChecksum( bucket = "bkt", - priority = BucketPriority(1), + priority = StreamPriority(1), checksum = 0, ), ), @@ -268,17 +268,17 @@ abstract class BaseSyncIntegrationTest( syncLines.send( SyncLine.CheckpointPartiallyComplete( lastOpId = "0", - priority = BucketPriority(1), + priority = StreamPriority(1), ), ) - database.waitForFirstSync(BucketPriority(1)) + database.waitForFirstSync(StreamPriority(1)) database.close() // Connect to the same database again database = openDatabaseAndInitialize() database.currentStatus.hasSynced shouldBe false - database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true + database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true } @Test diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index c2b3cb9e..a3f1e0c2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,10 +3,10 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.testutils.ActiveDatabaseTest import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor @@ -35,7 +35,7 @@ 
abstract class BaseSyncProgressTest( private fun bucket( name: String, count: Int, - priority: BucketPriority = BucketPriority(3), + priority: StreamPriority = StreamPriority(3), ): BucketChecksum = BucketChecksum( bucket = name, @@ -68,7 +68,7 @@ abstract class BaseSyncProgressTest( ) } - private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) { + private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) { if (priority != null) { syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -93,7 +93,7 @@ abstract class BaseSyncProgressTest( private suspend fun ReceiveTurbine.expectProgress( total: Pair, - priorities: Map> = emptyMap(), + priorities: Map> = emptyMap(), ) { val item = awaitItem() val progress = item.downloadProgress ?: error("Expected download progress on $item") @@ -357,7 +357,7 @@ abstract class BaseSyncProgressTest( ) { turbine.expectProgress( prio2, - mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2), + mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2), ) } @@ -367,8 +367,8 @@ abstract class BaseSyncProgressTest( lastOpId = "10", checksums = listOf( - bucket("a", 5, BucketPriority(0)), - bucket("b", 5, BucketPriority(2)), + bucket("a", 5, StreamPriority(0)), + bucket("b", 5, StreamPriority(2)), ), ), ), @@ -378,7 +378,7 @@ abstract class BaseSyncProgressTest( addDataLine("a", 5) expectProgress(5 to 5, 5 to 10) - addCheckpointComplete(BucketPriority(0)) + addCheckpointComplete(StreamPriority(0)) expectProgress(5 to 5, 5 to 10) addDataLine("b", 2) @@ -390,8 +390,8 @@ abstract class BaseSyncProgressTest( lastOpId = "14", updatedBuckets = listOf( - bucket("a", 8, BucketPriority(0)), - bucket("b", 6, BucketPriority(2)), + bucket("a", 8, StreamPriority(0)), + bucket("b", 6, StreamPriority(2)), ), removedBuckets = emptyList(), ), diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt new file mode 100644 index 00000000..9f23f9b4 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -0,0 +1,289 @@ +package com.powersync.sync + +import app.cash.turbine.turbineScope +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.StreamPriority +import com.powersync.testutils.databaseTest +import com.powersync.testutils.waitFor +import com.powersync.utils.JsonParam +import com.powersync.utils.JsonUtil +import io.kotest.matchers.collections.shouldHaveSingleElement +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.put +import kotlin.test.Test + +@OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) +class SyncStreamTest : AbstractSyncTest(true) { + @Test + fun `can disable default streams`() = + databaseTest { + database.connect( + connector, + options = + SyncOptions( + newClientImplementation = true, + 
includeDefaultStreams = false, + clientConfiguration = SyncClientConfiguration.ExistingClient(createSyncClient()), + ), + ) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + streams["include_defaults"]!!.jsonPrimitive.content shouldBe "false" + + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscribes with streams`() = + databaseTest { + val a = database.syncStream("stream", mapOf("foo" to JsonParam.String("a"))).subscribe() + val b = database.syncStream("stream", mapOf("foo" to JsonParam.String("b"))).subscribe(priority = StreamPriority(1)) + + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Should request subscriptions + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 2 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe + """{"stream":"stream","parameters":{"foo":"a"},"override_priority":null}""" + JsonUtil.json.encodeToString(subscriptions[1]) shouldBe + """{"stream":"stream","parameters":{"foo":"b"},"override_priority":1}""" + true + } + + syncLines.send( + checkpointLine( + listOf( + bucket( + "a", + 3, + subscriptions = + buildJsonArray { + add(defaultSubscription(0)) + }, + ), + bucket( + "b", + 1, + subscriptions = + buildJsonArray { + add(defaultSubscription(1)) + }, + ), + ), + listOf(stream("stream", false)), + ), + ) + + // Subscriptions should be active now, but not marked as synced. + var status = turbine.awaitItem() + for (subscription in listOf(a, b)) { + val subscriptionStatus = status.forStream(subscription)!! + subscriptionStatus.subscription.active shouldBe true + subscriptionStatus.subscription.lastSyncedAt shouldBe null + subscriptionStatus.subscription.hasExplicitSubscription shouldBe true + } + + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = "0", + priority = StreamPriority(1), + ), + ) + status = turbine.awaitItem() + status.forStream(a)!!.subscription.lastSyncedAt shouldBe null + status.forStream(b)!!.subscription.lastSyncedAt shouldNotBeNull {} + b.waitForFirstSync() + + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "0")) + a.waitForFirstSync() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `reports default streams`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + syncLines.send(checkpointLine(listOf(), listOf(stream("default_stream", true)))) + + val status = turbine.awaitItem() + status.syncStreams!! 
shouldHaveSingleElement { + it.subscription.name shouldBe "default_stream" + it.subscription.parameters shouldBe null + it.subscription.isDefault shouldBe true + it.subscription.hasExplicitSubscription shouldBe false + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `changes subscriptions dynamically`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + requestedSyncStreams.clear() + + val subscription = database.syncStream("a").subscribe() + waitForSyncLinesChannelClosed() + + // Adding the subscription should reconnect + turbine.waitFor { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 1 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe """{"stream":"a","parameters":null,"override_priority":null}""" + true + } + + // Given that the subscription has a default TTL, unsubscribing should not re-subscribe. + subscription.unsubscribe() + delay(100) + turbine.expectNoEvents() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscriptions update while offline`() = + databaseTest { + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.awaitItem() // Ignore initial + + // Subscribing while offline should add the stream to the subscriptions reported in the + // status. + val subscription = database.syncStream("foo").subscribe() + val status = turbine.awaitItem() + status.forStream(subscription) shouldNotBeNull {} + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `unsubscribing multiple times has no effect`() = + databaseTest { + val a = database.syncStream("a").subscribe() + val aAgain = database.syncStream("a").subscribe() + a.unsubscribe() + a.unsubscribe() + + // Pretend the streams are expired - they should still be requested because the core + // extension extends the lifetime of streams currently referenced before connecting. + database.execute("UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000") + + database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 1 + true + } + aAgain.unsubscribe() + } + + @Test + fun unsubscribeAll() = + databaseTest { + val a = database.syncStream("a").subscribe() + database.syncStream("a").unsubscribeAll() + + // Despite a being active, it should not be requested. 
+ database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 0 + true + } + a.unsubscribe() + } +} + +@OptIn(ExperimentalSerializationApi::class) +private fun checkpointLine( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("checkpoint", checkpoint(buckets, streams)) + } + +@OptIn(ExperimentalSerializationApi::class) +private fun checkpoint( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("last_op_id", "0") + put("buckets", buildJsonArray { addAll(buckets) }) + put("streams", buildJsonArray { addAll(streams) }) + } + +private fun bucket( + name: String, + priority: Int, + subscriptions: JsonArray? = null, +): JsonObject = + buildJsonObject { + put("bucket", name) + put("priority", priority) + put("checksum", 0) + subscriptions?.let { put("subscriptions", it) } + } + +private fun stream( + name: String, + isDefault: Boolean, +): JsonObject = + buildJsonObject { + put("name", name) + put("is_default", isDefault) + put("errors", JsonArray(emptyList())) + } + +private fun defaultSubscription(index: Int): JsonObject = buildJsonObject { put("sub", index) } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 0b533cfd..fed689ff 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -23,10 +23,12 @@ import io.ktor.client.HttpClient import io.ktor.client.engine.mock.toByteArray import io.ktor.http.ContentType import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path import kotlinx.serialization.json.JsonElement +import kotlin.coroutines.resume expect val factory: DatabaseDriverFactory @@ -102,6 +104,23 @@ internal class ActiveDatabaseTest( var connector = TestConnector() + suspend fun waitForSyncLinesChannelClosed() { + suspendCancellableCoroutine { continuation -> + var cancelled = false + continuation.invokeOnCancellation { + cancelled = true + } + + syncLines.invokeOnClose { + if (!cancelled) { + continuation.resume(Unit) + } + + syncLines = Channel() + } + } + } + fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } val db = @@ -123,7 +142,7 @@ internal class ActiveDatabaseTest( fun createSyncClient(): HttpClient { val engine = MockSyncService( - lines = syncLines, + lines = { syncLines }, generateCheckpoint = { checkpointResponse() }, syncLinesContentType = { syncLinesContentType }, trackSyncRequest = { diff --git a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt index eb2d67a9..18698c95 100644 --- a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt @@ -17,6 +17,8 @@ public expect class DatabaseDriverFactory { ): SQLiteConnection } +internal expect fun openInMemoryConnection(): SQLiteConnection + /** * Resolves a path to the loadable PowerSync core extension 
library. * diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index de2b1399..bdf8e66b 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -1,7 +1,7 @@ package com.powersync import co.touchlab.kermit.Logger -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.ActiveDatabaseGroup import com.powersync.db.ActiveDatabaseResource @@ -10,10 +10,13 @@ import com.powersync.db.Queries import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudTransaction import com.powersync.db.driver.SQLiteConnectionPool +import com.powersync.db.driver.SingleConnectionPool import com.powersync.db.schema.Schema import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam +import com.powersync.utils.generateLogger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.firstOrNull @@ -66,7 +69,7 @@ public interface PowerSyncDatabase : Queries { * given [priority] (or a higher one, since those would be synchronized first) has completed. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun waitForFirstSync(priority: BucketPriority) + public suspend fun waitForFirstSync(priority: StreamPriority) /** * Connect to the PowerSync service, and keep the databases in sync. @@ -181,6 +184,17 @@ public interface PowerSyncDatabase : Queries { @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getPowerSyncVersion(): String + /** + * Create a [SyncStream] instance for the given [name] and [parameters]. + * + * Use [SyncStream.subscribe] on the returned instance to subscribe to the stream. + */ + @ExperimentalPowerSyncAPI + public fun syncStream( + name: String, + parameters: Map? = null, + ): SyncStream + /** * Close the sync connection. * @@ -233,6 +247,29 @@ public interface PowerSyncDatabase : Queries { return openedWithGroup(pool, scope, schema, logger, group) } + /** + * Creates an in-memory PowerSync database instance, useful for testing. + */ + @OptIn(ExperimentalPowerSyncAPI::class) + public fun inMemory( + schema: Schema, + scope: CoroutineScope, + logger: Logger? = null, + ): PowerSyncDatabase { + val logger = generateLogger(logger) + // Since this returns a fresh in-memory database every time, use a fresh group to avoid warnings about the + // same database being opened multiple times. 
+ val collection = ActiveDatabaseGroup.GroupsCollection().referenceDatabase(logger, "test") + + return openedWithGroup( + SingleConnectionPool(openInMemoryConnection()), + scope, + schema, + logger, + collection, + ) + } + @ExperimentalPowerSyncAPI internal fun openedWithGroup( pool: SQLiteConnectionPool, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 2fe4c042..30269d69 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable internal data class BucketChecksum( val bucket: String, - val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY, + val priority: StreamPriority = StreamPriority.DEFAULT_PRIORITY, val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index bf4fa151..8ea60a38 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,6 +1,7 @@ package com.powersync.bucket import com.powersync.db.SqlCursor +import com.powersync.db.StreamKey import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.SerializableSchema @@ -9,6 +10,7 @@ import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject @@ -49,7 +51,7 @@ internal interface BucketStorage { @LegacySyncImplementation suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority? = null, + partialPriority: StreamPriority? 
= null, ): SyncLocalDatabaseResult suspend fun control(args: PowerSyncControlArguments): List @@ -65,6 +67,10 @@ internal sealed interface PowerSyncControlArguments { class Start( val parameters: JsonObject, val schema: SerializableSchema, + @SerialName("include_defaults") + val includeDefaults: Boolean, + @SerialName("active_streams") + val activeStreams: List, ) : PowerSyncControlArguments { override val sqlArguments: Pair get() = "start" to JsonUtil.json.encodeToString(this) @@ -99,6 +105,12 @@ internal sealed interface PowerSyncControlArguments { data object ResponseStreamEnd : PowerSyncControlArguments { override val sqlArguments: Pair = "connection" to "end" } + + class UpdateSubscriptions( + activeStreams: List, + ) : PowerSyncControlArguments { + override val sqlArguments: Pair = "update_subscriptions" to JsonUtil.json.encodeToString(activeStreams) + } } @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index b9c9e58a..15b126b2 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -196,7 +196,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation override suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority?, + partialPriority: StreamPriority?, ): SyncLocalDatabaseResult { val result = validateChecksums(targetCheckpoint, partialPriority) @@ -250,7 +250,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun validateChecksums( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? = null, ): SyncLocalDatabaseResult { val serializedCheckpoint = JsonUtil.json.encodeToString( @@ -286,11 +286,11 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun updateObjectsFromBuckets( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? 
= null, ): Boolean { @Serializable data class SyncLocalArgs( - val priority: BucketPriority, + val priority: StreamPriority, val buckets: List, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index 5dc4823b..4cf92bdf 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -3,6 +3,7 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement @LegacySyncImplementation @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt similarity index 52% rename from core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt rename to core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt index 60073707..333e4274 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt @@ -2,25 +2,29 @@ package com.powersync.bucket import kotlinx.serialization.Serializable import kotlin.jvm.JvmInline +import kotlin.jvm.JvmName + +@Deprecated("Use StreamPriority instead") +public typealias BucketPriority = StreamPriority @JvmInline @Serializable -public value class BucketPriority( +public value class StreamPriority( private val priorityCode: Int, -) : Comparable { +) : Comparable { init { require(priorityCode >= 0) } - override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode) + override fun compareTo(other: StreamPriority): Int = other.priorityCode.compareTo(priorityCode) - public companion object { - internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE) + public companion object Companion { + internal val FULL_SYNC_PRIORITY: StreamPriority = StreamPriority(Int.MAX_VALUE) /** * The assumed priority for buckets when talking to older sync service instances that don't * support bucket priorities. 
*/ - internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3) + internal val DEFAULT_PRIORITY: StreamPriority = StreamPriority(3) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 2a02d86e..ddb8abe5 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -4,9 +4,9 @@ import co.touchlab.kermit.Logger import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException -import com.powersync.bucket.BucketPriority import com.powersync.bucket.BucketStorage import com.powersync.bucket.BucketStorageImpl +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry @@ -19,7 +19,8 @@ import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.PriorityStatusEntry +import com.powersync.sync.CoreSyncStatus +import com.powersync.sync.StreamingSyncClient import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData @@ -42,11 +43,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlinx.datetime.LocalDateTime -import kotlinx.datetime.TimeZone -import kotlinx.datetime.toInstant import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Instant /** * A PowerSync managed database. @@ -80,6 +77,7 @@ internal class PowerSyncDatabaseImpl( get() = activeDatabaseGroup.first.group.identifier private val resource = activeDatabaseGroup.first + private val streams = StreamTracker(this) private val internalDb = InternalDatabaseImpl(pool, logger) @@ -111,7 +109,7 @@ internal class PowerSyncDatabaseImpl( } updateSchemaInternal(schema) - updateHasSynced() + resolveOfflineSyncStatus() } private suspend fun waitReady() { @@ -150,7 +148,7 @@ internal class PowerSyncDatabaseImpl( disconnectInternal() connectInternal(crudThrottleMs) { scope -> - SyncStream( + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = suspend { connector.uploadData(this) }, @@ -160,6 +158,7 @@ internal class PowerSyncDatabaseImpl( uploadScope = scope, options = options, schema = schema, + activeSubscriptions = streams.currentlyReferencedStreams, ) } } @@ -167,12 +166,12 @@ internal class PowerSyncDatabaseImpl( private fun connectInternal( crudThrottleMs: Long, - createStream: (CoroutineScope) -> SyncStream, + createStream: (CoroutineScope) -> StreamingSyncClient, ) { val db = this val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job - var activeStream: SyncStream? = null + var activeStream: StreamingSyncClient? = null scope.launch(job) { // Create the stream in this scope so that everything launched by the stream is bound to @@ -316,6 +315,11 @@ internal class PowerSyncDatabaseImpl( } } + override fun syncStream( + name: String, + parameters: Map?, + ): SyncStream = PendingStream(streams, name, parameters) + override suspend fun getPowerSyncVersion(): String { // The initialization sets powerSyncVersion. 
waitReady() @@ -466,57 +470,31 @@ internal class PowerSyncDatabaseImpl( currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } } - private suspend fun updateHasSynced() { - data class SyncedAt( - val priority: BucketPriority, - val syncedAt: Instant?, - ) - - // Query the database to see if any data has been synced - val syncedAtRows = - internalDb.getAll("SELECT * FROM ps_sync_state ORDER BY priority") { - val rawTime = it.getString(1)!! - - SyncedAt( - priority = BucketPriority(it.getLong(0)!!.toInt()), - syncedAt = - LocalDateTime - .parse(rawTime.replace(" ", "T")) - .toInstant(TimeZone.UTC), - ) + internal suspend fun resolveOfflineSyncStatusIfNotConnected() { + mutex.withLock { + if (syncSupervisorJob == null) { + // Not connected or connecting + resolveOfflineSyncStatus() } + } + } - val priorityStatus = mutableListOf() - var lastSyncedAt: Instant? = null - - for (row in syncedAtRows) { - if (row.priority == BucketPriority.FULL_SYNC_PRIORITY) { - lastSyncedAt = row.syncedAt - } else { - priorityStatus.add( - PriorityStatusEntry( - priority = row.priority, - lastSyncedAt = row.syncedAt, - hasSynced = true, - ), - ) + private suspend fun resolveOfflineSyncStatus() { + val offlineSyncStatus = + internalDb.get("SELECT powersync_offline_sync_status()") { + JsonUtil.json.decodeFromString(it.getString(0)!!) } - } currentStatus.update { - copy( - hasSynced = lastSyncedAt != null, - lastSyncedAt = lastSyncedAt, - priorityStatusEntries = priorityStatus, - ) + applyCoreChanges(offlineSyncStatus) } } override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) - override suspend fun waitForFirstSync(priority: BucketPriority) = waitForFirstSyncImpl(priority) + override suspend fun waitForFirstSync(priority: StreamPriority) = waitForFirstSyncImpl(priority) - private suspend fun waitForFirstSyncImpl(priority: BucketPriority?) { + private suspend fun waitForFirstSyncImpl(priority: StreamPriority?) 
{ val predicate: (SyncStatusData) -> Boolean = if (priority == null) { { it.hasSynced == true } @@ -524,6 +502,10 @@ internal class PowerSyncDatabaseImpl( { it.statusForPriority(priority).hasSynced == true } } + waitForStatusMatching(predicate) + } + + internal suspend fun waitForStatusMatching(predicate: (SyncStatusData) -> Boolean) { if (predicate(currentStatus)) { return } diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt new file mode 100644 index 00000000..200a410b --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -0,0 +1,172 @@ +package com.powersync.db + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow +import com.powersync.sync.SyncStream +import com.powersync.sync.SyncStreamSubscription +import com.powersync.utils.JsonParam +import com.powersync.utils.toJsonObject +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlin.time.Duration + +internal class StreamTracker( + val db: PowerSyncDatabaseImpl, +) { + val groupMutex = Mutex() + val streamGroups = mutableMapOf() + val currentlyReferencedStreams = MutableStateFlow(listOf()) + + suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { + db.writeTransaction { tx -> + tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) + } + db.resolveOfflineSyncStatusIfNotConnected() + } + + internal suspend fun subscribe( + stream: PendingStream, + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription { + val key = stream.key + subscriptionsCommand( + RustSubscriptionChangeRequest( + subscribe = + SubscribeToStream( + stream = key, + ttl = ttl?.inWholeSeconds?.toInt(), + priority = priority, + ), + ), + ) + + return groupMutex.withLock { + var didAddNewGroup = false + val group = + streamGroups.getOrPut(key) { + didAddNewGroup = true + SubscriptionGroup(this, key) + } + + if (didAddNewGroup) { + val updatedStreams = streamGroups.values.toList() + currentlyReferencedStreams.value = updatedStreams + } + + SubscriptionImplementation(group) + } + } + + internal fun removeStreamGroup(key: StreamKey) { + streamGroups.remove(key)?.also { it.active = false } + currentlyReferencedStreams.value = streamGroups.values.toList() + } + + private companion object { + private val jsonDontEncodeDefaults = + Json { + // We don't want to encode defaults so that the RustSubscriptionChangeRequest encodes to the + // correct enum structure with only one field set. + encodeDefaults = false + } + } +} + +internal class PendingStream( + private val tracker: StreamTracker, + override val name: String, + val userParameters: Map?, +) : SyncStream { + override val parameters: Map? 
+ get() { + val obj = userParameters?.toJsonObject() ?: return null + return TypedRow(obj) + } + + val key: StreamKey get() { + val jsonParameters = userParameters?.toJsonObject() + return StreamKey(name, jsonParameters) + } + + override suspend fun subscribe( + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription = tracker.subscribe(this, ttl, priority) + + override suspend fun unsubscribeAll() { + tracker.groupMutex.withLock { + tracker.removeStreamGroup(key) + tracker.subscriptionsCommand(RustSubscriptionChangeRequest(unsubscribe = key)) + } + } +} + +internal class SubscriptionGroup( + val tracker: StreamTracker, + val key: StreamKey, + var refcount: Int = 0, + var active: Boolean = true, +) { + suspend fun decrementRefCount() { + tracker.groupMutex.withLock { + refcount-- + if (refcount == 0 && active) { + tracker.removeStreamGroup(key) + } + } + } +} + +private class SubscriptionImplementation( + val group: SubscriptionGroup, +) : SyncStreamSubscription { + init { + group.refcount++ + } + + private var subscribed = true + + override val name: String + get() = group.key.name + + override val parameters: Map? = group.key.params?.let { TypedRow(it) } + + @OptIn(ExperimentalPowerSyncAPI::class) + override suspend fun waitForFirstSync() { + group.tracker.db.waitForStatusMatching { it.forStream(this)?.subscription?.hasSynced == true } + } + + override suspend fun unsubscribe() { + if (subscribed) { + subscribed = false + group.decrementRefCount() + } + } +} + +@Serializable +internal class RustSubscriptionChangeRequest( + // this is actually an enum with associated data, but this serializes into the form we want + // when only a single field is set. + val subscribe: SubscribeToStream? = null, + val unsubscribe: StreamKey? = null, +) + +@Serializable +internal class SubscribeToStream( + val stream: StreamKey, + val ttl: Int? = null, + val priority: StreamPriority? = null, +) + +@Serializable +internal data class StreamKey( + val name: String, + val params: JsonObject?, +) diff --git a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt index 43e7e127..78c384c7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt @@ -1,5 +1,13 @@ package com.powersync.db.crud +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.serialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -8,6 +16,7 @@ import kotlinx.serialization.json.contentOrNull import kotlinx.serialization.json.jsonPrimitive import kotlin.experimental.ExperimentalObjCRefinement import kotlin.native.HiddenFromObjC +import kotlin.time.Instant /** * A named collection of values as they appear in a SQLite row. 
@@ -55,14 +64,29 @@ private data class ToStringEntry( get() = inner.value.jsonPrimitive.contentOrNull } -private class TypedRow( - inner: JsonObject, +@Serializable(with = TypedRow.Serializer::class) +internal class TypedRow( + private val inner: JsonObject, ) : AbstractMap() { override val entries: Set> = inner.entries.mapTo( mutableSetOf(), ::ToTypedEntry, ) + + private object Serializer : KSerializer { + override val descriptor: SerialDescriptor + get() = serialDescriptor() + + override fun deserialize(decoder: Decoder): TypedRow = TypedRow(JsonObject.serializer().deserialize(decoder)) + + override fun serialize( + encoder: Encoder, + value: TypedRow, + ) { + encoder.encodeSerializableValue(JsonObject.serializer(), value.inner) + } + } } private data class ToTypedEntry( diff --git a/core/src/commonMain/kotlin/com/powersync/db/driver/InternalConnectionPool.kt b/core/src/commonMain/kotlin/com/powersync/db/driver/InternalConnectionPool.kt index b5258f2a..682d5163 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/driver/InternalConnectionPool.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/driver/InternalConnectionPool.kt @@ -36,33 +36,7 @@ internal class InternalConnectionPool( readOnly = false, ) - connection.execSQL("pragma journal_mode = WAL") - connection.execSQL("pragma journal_size_limit = ${6 * 1024 * 1024}") - connection.execSQL("pragma busy_timeout = 30000") - connection.execSQL("pragma cache_size = ${50 * 1024}") - - if (readOnly) { - connection.execSQL("pragma query_only = TRUE") - } - - // Older versions of the SDK used to set up an empty schema and raise the user version to 1. - // Keep doing that for consistency. - if (!readOnly) { - val version = - connection.prepare("pragma user_version").use { - require(it.step()) - if (it.isNull(0)) 0L else it.getLong(0) - } - if (version < 1L) { - connection.execSQL("pragma user_version = 1") - } - - // Also install a commit, rollback and update hooks in the core extension to implement - // the updates flow here (not all our driver implementations support hooks, so this is - // a more reliable fallback). - connection.execSQL("select powersync_update_hooks('install');") - } - + connection.setupDefaultPragmas(readOnly) return connection } @@ -75,13 +49,10 @@ internal class InternalConnectionPool( } finally { // When we've leased a write connection, we may have to update table update flows // after users ran their custom statements. - writeConnection.prepare("SELECT powersync_update_hooks('get')").use { - check(it.step()) - val updatedTables = JsonUtil.json.decodeFromString>(it.getText(0)) - if (updatedTables.isNotEmpty()) { - scope.launch { - tableUpdatesFlow.emit(updatedTables) - } + val updatedTables = writeConnection.readPendingUpdates() + if (updatedTables.isNotEmpty()) { + scope.launch { + tableUpdatesFlow.emit(updatedTables) } } } @@ -106,3 +77,39 @@ internal class InternalConnectionPool( readPool.close() } } + +internal fun SQLiteConnection.setupDefaultPragmas(readOnly: Boolean) { + execSQL("pragma journal_mode = WAL") + execSQL("pragma journal_size_limit = ${6 * 1024 * 1024}") + execSQL("pragma busy_timeout = 30000") + execSQL("pragma cache_size = ${50 * 1024}") + + if (readOnly) { + execSQL("pragma query_only = TRUE") + } + + // Older versions of the SDK used to set up an empty schema and raise the user version to 1. + // Keep doing that for consistency. 
+ if (!readOnly) { + val version = + prepare("pragma user_version").use { + require(it.step()) + if (it.isNull(0)) 0L else it.getLong(0) + } + if (version < 1L) { + execSQL("pragma user_version = 1") + } + + // Also install a commit, rollback and update hooks in the core extension to implement + // the updates flow here (not all our driver implementations support hooks, so this is + // a more reliable fallback). + execSQL("select powersync_update_hooks('install');") + } +} + +internal fun SQLiteConnection.readPendingUpdates(): Set = + prepare("SELECT powersync_update_hooks('get')").use { + check(it.step()) + val updatedTables = JsonUtil.json.decodeFromString>(it.getText(0)) + updatedTables + } diff --git a/core/src/commonMain/kotlin/com/powersync/db/driver/SingleConnectionPool.kt b/core/src/commonMain/kotlin/com/powersync/db/driver/SingleConnectionPool.kt new file mode 100644 index 00000000..41337f96 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/driver/SingleConnectionPool.kt @@ -0,0 +1,58 @@ +package com.powersync.db.driver + +import androidx.sqlite.SQLiteConnection +import com.powersync.ExperimentalPowerSyncAPI +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * A [SQLiteConnectionPool] backed by a single database connection. + * + * This does not provide any concurrency, but is still a reasonable implementation to use for e.g. tests. + */ +@OptIn(ExperimentalPowerSyncAPI::class) +internal class SingleConnectionPool( + private val conn: SQLiteConnection, +) : SQLiteConnectionPool { + private val mutex: Mutex = Mutex() + private var closed = false + private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) + + init { + conn.setupDefaultPragmas(false) + } + + override suspend fun read(callback: suspend (SQLiteConnectionLease) -> T): T = write(callback) + + override suspend fun write(callback: suspend (SQLiteConnectionLease) -> T): T = + mutex.withLock { + check(!closed) { "Connection closed" } + + try { + callback(RawConnectionLease(conn)) + } finally { + val updates = conn.readPendingUpdates() + if (updates.isNotEmpty()) { + tableUpdatesFlow.emit(updates) + } + } + } + + override suspend fun withAllConnections( + action: suspend (writer: SQLiteConnectionLease, readers: List) -> R, + ) = write { writer -> + action(writer, emptyList()) + Unit + } + + override val updates: SharedFlow> + get() = tableUpdatesFlow + + override suspend fun close() { + mutex.withLock { + conn.close() + } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt index ced90401..fb0bbe65 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt @@ -1,6 +1,7 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -46,7 +47,11 @@ internal sealed interface Instruction { data object FlushSileSystem : Instruction - data object CloseSyncStream : Instruction + @Serializable + data class CloseSyncStream( + @SerialName("hide_disconnect") + val hideDisconnect: Boolean, + ) : Instruction data object DidCompleteSync : Instruction @@ -60,7 +65,7 @@ internal sealed interface Instruction { 
private val establishSyncStream = serializer() private val fetchCredentials = serializer() private val flushFileSystem = serializer() - private val closeSyncStream = serializer() + private val closeSyncStream = serializer() private val didCompleteSync = serializer() override val descriptor = @@ -88,7 +93,6 @@ internal sealed interface Instruction { } 5 -> { decodeSerializableElement(descriptor, 5, closeSyncStream) - CloseSyncStream } 6 -> { decodeSerializableElement(descriptor, 6, didCompleteSync) @@ -127,8 +131,31 @@ internal data class CoreSyncStatus( val downloading: CoreDownloadProgress?, @SerialName("priority_status") val priorityStatus: List, + val streams: List, ) +@Serializable +internal data class CoreActiveStreamSubscription( + override val name: String, + override val parameters: TypedRow?, + val priority: StreamPriority?, + val progress: ProgressInfo, + override val active: Boolean, + @SerialName("is_default") + override val isDefault: Boolean, + @SerialName("has_explicit_subscription") + override val hasExplicitSubscription: Boolean, + @SerialName("expires_at") + @Serializable(with = InstantTimestampSerializer::class) + override val expiresAt: Instant?, + @SerialName("last_synced_at") + @Serializable(with = InstantTimestampSerializer::class) + override val lastSyncedAt: Instant?, +) : SyncSubscriptionDescription { + override val hasSynced: Boolean + get() = lastSyncedAt != null +} + @Serializable internal data class CoreDownloadProgress( val buckets: Map, @@ -136,7 +163,7 @@ internal data class CoreDownloadProgress( @Serializable internal data class CoreBucketProgress( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("at_last") val atLast: Long, @SerialName("since_last") @@ -147,7 +174,7 @@ internal data class CoreBucketProgress( @Serializable internal data class CorePriorityStatus( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("last_synced_at") @Serializable(with = InstantTimestampSerializer::class) val lastSyncedAt: Instant?, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index d260624c..ae5833f2 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -1,8 +1,10 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.LocalOperationCounters +import com.powersync.bucket.StreamPriority +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable import kotlin.math.min /** @@ -41,8 +43,11 @@ public interface ProgressWithOperations { } } +@Serializable internal data class ProgressInfo( + @SerialName("downloaded") override val downloadedOperations: Int, + @SerialName("total") override val totalOperations: Int, ) : ProgressWithOperations @@ -73,7 +78,7 @@ public data class SyncDownloadProgress internal constructor( override val totalOperations: Int init { - val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY) + val (target, completed) = targetAndCompletedCounts(StreamPriority.FULL_SYNC_PRIORITY) totalOperations = target downloadedOperations = completed } @@ -127,7 +132,7 @@ public data class SyncDownloadProgress internal constructor( * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded * in total and how many of them have already been received. 
 */
-    public fun untilPriority(priority: BucketPriority): ProgressWithOperations {
+    public fun untilPriority(priority: StreamPriority): ProgressWithOperations {
         val (total, completed) = targetAndCompletedCounts(priority)
         return ProgressInfo(totalOperations = total, downloadedOperations = completed)
     }
@@ -150,7 +155,7 @@ public data class SyncDownloadProgress internal constructor(
             },
         )
 
-    private fun targetAndCompletedCounts(priority: BucketPriority): Pair<Int, Int> =
+    private fun targetAndCompletedCounts(priority: StreamPriority): Pair<Int, Int> =
         buckets.values
             .asSequence()
             .filter { it.priority >= priority }
diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt
new file mode 100644
index 00000000..c5977157
--- /dev/null
+++ b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt
@@ -0,0 +1,102 @@
+package com.powersync.sync
+
+import com.powersync.bucket.StreamPriority
+import kotlin.time.Duration
+import kotlin.time.Instant
+
+public interface SyncStreamDescription {
+    /**
+     * The name of the stream as it appears in the stream definition for the PowerSync service.
+     */
+    public val name: String
+
+    /**
+     * The parameters used to subscribe to the stream, if any.
+     *
+     * The same stream can be subscribed to multiple times with different parameters.
+     */
+    public val parameters: Map?
+}
+
+/**
+ * Information about a subscribed sync stream.
+ *
+ * This includes the [SyncStreamDescription] along with information about the current sync status.
+ */
+public interface SyncSubscriptionDescription : SyncStreamDescription {
+    /**
+     * Whether this stream is active, meaning that the subscription has been acknowledged by the
+     * sync service.
+     */
+    public val active: Boolean
+
+    /**
+     * Whether this stream subscription is included by default, regardless of whether the stream has
+     * explicitly been subscribed to or not.
+     *
+     * Default streams are created by applying `auto_subscribe: true` in their definition on the
+     * sync service.
+     *
+     * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time.
+     * This happens when a default stream was subscribed explicitly.
+     */
+    public val isDefault: Boolean
+
+    /**
+     * Whether this stream has been subscribed to explicitly.
+     *
+     * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time.
+     * This happens when a default stream was subscribed explicitly.
+     */
+    public val hasExplicitSubscription: Boolean
+
+    /**
+     * For sync streams that have a time-to-live, the current time at which the stream would expire
+     * if not subscribed to again.
+     */
+    public val expiresAt: Instant?
+
+    /**
+     * Whether this stream subscription has been synced at least once.
+     */
+    public val hasSynced: Boolean
+
+    /**
+     * If [hasSynced] is true, the last time data from this stream has been synced.
+     */
+    public val lastSyncedAt: Instant?
+}
+
+/**
+ * A handle to a [SyncStreamDescription] that allows subscribing to the stream.
+ *
+ * To obtain an instance of [SyncStream], call [com.powersync.PowerSyncDatabase.syncStream].
+ */
+public interface SyncStream : SyncStreamDescription {
+    public suspend fun subscribe(
+        ttl: Duration? = null,
+        priority: StreamPriority? = null,
+    ): SyncStreamSubscription
+
+    public suspend fun unsubscribeAll()
+}
+
+/**
+ * A [SyncStream] that has been subscribed to.
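As a usage sketch for the stream API defined in `Stream.kt` above: the snippet below subscribes to a stream, waits for its first sync, and later releases the subscription. The stream name is made up, and the exact shape of `PowerSyncDatabase.syncStream` (referenced in the KDoc) is assumed, so treat this as an outline rather than a drop-in snippet.

```kotlin
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import kotlin.time.Duration.Companion.hours

// Sketch only: "todo_lists" and the syncStream accessor arguments are assumptions.
@OptIn(ExperimentalPowerSyncAPI::class)
suspend fun syncTodoLists(db: PowerSyncDatabase) {
    // Obtain a handle for a stream defined on the PowerSync service.
    val stream = db.syncStream("todo_lists")

    // Subscribing with a ttl keeps the stream alive for that duration after the last
    // subscription is removed.
    val subscription = stream.subscribe(ttl = 12.hours)

    // Wait until data for this particular stream has been downloaded at least once.
    subscription.waitForFirstSync()

    // ... read the synced data, then release the subscription when it is no longer needed.
    subscription.unsubscribe()
}
```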
+ */ +public interface SyncStreamSubscription : SyncStreamDescription { + /** + * A variant of [com.powersync.PowerSyncDatabase.waitForFirstSync] that is specific to this + * stream subscription. + */ + public suspend fun waitForFirstSync() + + /** + * Removes this subscription. + * + * Once all [SyncStreamSubscription]s for a [SyncStream] have been unsubscribed, the `ttl` for + * that stream starts running. When it expires without subscribing again, the stream will be + * evicted. + */ + public suspend fun unsubscribe() +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt similarity index 93% rename from core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt rename to core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 00ead40f..10997a34 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -12,10 +12,11 @@ import com.powersync.bucket.Checkpoint import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.db.SubscriptionGroup import com.powersync.db.crud.CrudEntry import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.SyncStream.Companion.SOCKET_TIMEOUT +import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig @@ -52,6 +53,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map @@ -102,7 +104,7 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA } @OptIn(ExperimentalPowerSyncAPI::class) -internal class SyncStream( +internal class StreamingSyncClient( private val bucketStorage: BucketStorage, private val connector: PowerSyncBackendConnector, private val uploadCrud: suspend () -> Unit, @@ -112,6 +114,7 @@ internal class SyncStream( private val uploadScope: CoroutineScope, private val options: SyncOptions, private val schema: Schema, + private val activeSubscriptions: StateFlow>, ) { private var isUploadingCrud = AtomicReference(null) private var completedCrudUploads = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) @@ -148,8 +151,14 @@ internal class SyncStream( suspend fun streamingSync() { var invalidCredentials = false clientId = bucketStorage.getClientId() + var result = SyncIterationResult() + while (true) { - status.update { copy(connecting = true) } + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connecting = true) } + } + result = SyncIterationResult() + try { if (invalidCredentials) { // This may error. 
In that case it will be retried again on the next @@ -157,7 +166,7 @@ internal class SyncStream( connector.invalidateCredentials() invalidCredentials = false } - streamingSyncIteration() + result = streamingSyncIteration() } catch (e: Exception) { if (e is CancellationException) { throw e @@ -166,8 +175,10 @@ internal class SyncStream( logger.e("Error in streamingSync: ${e.message}") status.update { copy(downloadError = e) } } finally { - status.update { copy(connected = false, connecting = true, downloading = false) } - delay(retryDelayMs) + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connected = false, connecting = true, downloading = false) } + delay(retryDelayMs) + } } } } @@ -329,7 +340,7 @@ internal class SyncStream( } } - private suspend fun streamingSyncIteration() { + private suspend fun streamingSyncIteration(): SyncIterationResult = coroutineScope { if (options.newClientImplementation) { val iteration = ActiveIteration(this) @@ -345,9 +356,9 @@ internal class SyncStream( } } else { legacySyncIteration() + SyncIterationResult() } } - } @OptIn(LegacySyncImplementation::class) private suspend fun CoroutineScope.legacySyncIteration() { @@ -363,26 +374,42 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, - var fetchLinesJob: Job? = null, - var credentialsInvalidation: Job? = null, ) { + var fetchLinesJob: Job? = null + var credentialsInvalidation: Job? = null + // Using a channel for control invocations so that they're handled by a single coroutine, // avoiding races between concurrent jobs like fetching credentials. private val controlInvocations = Channel() + private var result = SyncIterationResult() private suspend fun invokeControl(args: PowerSyncControlArguments) { val instructions = bucketStorage.control(args) instructions.forEach { handleInstruction(it) } } - suspend fun start() { + suspend fun start(): SyncIterationResult { + var subscriptions = activeSubscriptions.value + invokeControl( PowerSyncControlArguments.Start( parameters = params, schema = schema.toSerializable(), + includeDefaults = options.includeDefaultStreams, + activeStreams = subscriptions.map { it.key }, ), ) + val listenForUpdatedSubscriptions = + scope.launch { + activeSubscriptions.collect { + if (subscriptions !== it) { + subscriptions = it + controlInvocations.send(PowerSyncControlArguments.UpdateSubscriptions(activeSubscriptions.value.map { it.key })) + } + } + } + var hadSyncLine = false for (line in controlInvocations) { val instructions = bucketStorage.control(line) @@ -398,6 +425,9 @@ internal class SyncStream( triggerCrudUploadAsync() } } + + listenForUpdatedSubscriptions.cancel() + return result } suspend fun stop() { @@ -429,8 +459,10 @@ internal class SyncStream( } } - Instruction.CloseSyncStream -> { - logger.v { "Closing sync stream connection" } + is Instruction.CloseSyncStream -> { + val hideDisconnect = instruction.hideDisconnect + logger.v { "Closing sync stream connection. Hide disconnect: $hideDisconnect" } + result = SyncIterationResult(hideDisconnect) fetchLinesJob!!.cancelAndJoin() fetchLinesJob = null logger.v { "Sync stream connection shut down" } @@ -771,7 +803,7 @@ internal class SyncStream( } } - internal companion object { + internal companion object Companion { // The sync service sends a token keepalive message roughly every 20 seconds. So if we don't receive a message // in twice that time, assume the connection is broken. 
internal const val SOCKET_TIMEOUT: Long = 40_000 @@ -854,3 +886,7 @@ internal data class SyncStreamState( private class PendingCrudUpload( val done: CompletableDeferred, ) + +private class SyncIterationResult( + val hideDisconnectStateAndReconnectImmediately: Boolean = false, +) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt index a9a9a4f3..557b8346 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt @@ -1,9 +1,9 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -39,7 +39,7 @@ internal sealed interface SyncLine { @Serializable data class CheckpointPartiallyComplete( @SerialName("last_op_id") val lastOpId: String, - @SerialName("priority") val priority: BucketPriority, + @SerialName("priority") val priority: StreamPriority, ) : SyncLine @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index c8c89f5b..30902d3d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -57,6 +57,11 @@ public class SyncOptions * Allows configuring the [HttpClient] used for connecting to the PowerSync service. */ public val clientConfiguration: SyncClientConfiguration? = null, + /** + * Whether streams that have been defined with `auto_subscribe: true` should be synced even + * when they don't have an explicit subscription. + */ + public val includeDefaultStreams: Boolean = true, ) { public companion object { /** diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 180e1429..9bdc9354 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,6 +1,7 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow @@ -10,32 +11,32 @@ import kotlin.time.Instant @ConsistentCopyVisibility public data class PriorityStatusEntry internal constructor( - val priority: BucketPriority, + val priority: StreamPriority, val lastSyncedAt: Instant?, val hasSynced: Boolean?, ) -public interface SyncStatusData { +public sealed class SyncStatusData { /** * true if currently connected. * * This means the PowerSync connection is ready to download, and [PowerSyncBackendConnector.uploadData] may be called for any local changes. */ - public val connected: Boolean + public abstract val connected: Boolean /** * true if the PowerSync connection is busy connecting. * * During this stage, [PowerSyncBackendConnector.uploadData] may already be called, and [uploading] may be true. */ - public val connecting: Boolean + public abstract val connecting: Boolean /** * true if actively downloading changes. * * This is only true when [connected] is also true. 
*/ - public val downloading: Boolean + public abstract val downloading: Boolean /** * Realtime progress information about downloaded operations during an active sync. @@ -44,45 +45,45 @@ public interface SyncStatusData { * For more information on what progress is reported, see [SyncDownloadProgress]. * This value will be non-null only if [downloading] is true. */ - public val downloadProgress: SyncDownloadProgress? + public abstract val downloadProgress: SyncDownloadProgress? /** * true if uploading changes */ - public val uploading: Boolean + public abstract val uploading: Boolean /** * Time that a last sync has fully completed, if any. * * Currently this is reset to null after a restart. */ - public val lastSyncedAt: Instant? + public abstract val lastSyncedAt: Instant? /** * Indicates whether there has been at least one full sync, if any. * * Is null when unknown, for example when state is still being loaded from the database. */ - public val hasSynced: Boolean? + public abstract val hasSynced: Boolean? /** * Error during uploading. * * Cleared on the next successful upload. */ - public val uploadError: Any? + public abstract val uploadError: Any? /** * Error during downloading (including connecting). * * Cleared on the next successful data download. */ - public val downloadError: Any? + public abstract val downloadError: Any? /** * Convenience getter for either the value of downloadError or uploadError */ - public val anyError: Any? + public abstract val anyError: Any? /** * Available [PriorityStatusEntry] reporting the sync status for buckets within priorities. @@ -91,12 +92,14 @@ public interface SyncStatusData { * and [lastSyncedAt] are set to indicate that a partial (but no complete) sync has completed. * A completed [PriorityStatusEntry] at one priority level always includes all higher priorities too. */ - public val priorityStatusEntries: List + public abstract val priorityStatusEntries: List + + internal abstract val internalSubscriptions: List? /** * Status information for whether buckets in [priority] have been synchronized. */ - public fun statusForPriority(priority: BucketPriority): PriorityStatusEntry { + public fun statusForPriority(priority: StreamPriority): PriorityStatusEntry { val byDescendingPriorities = priorityStatusEntries.sortedByDescending { it.priority } for (entry in byDescendingPriorities) { @@ -110,6 +113,38 @@ public interface SyncStatusData { // A complete sync necessarily includes all priorities. return PriorityStatusEntry(priority, lastSyncedAt, hasSynced) } + + /** + * All sync streams currently being tracked in the database. + * + * This returns null when the database is currently being opened and we don't have reliable + * information about included streams yet. + */ + @ExperimentalPowerSyncAPI + public val syncStreams: List? get() = internalSubscriptions?.map(this::exposeStreamStatus) + + /** + * Status information for [stream], if it's a stream that is currently tracked by the sync + * client. + */ + @ExperimentalPowerSyncAPI + public fun forStream(stream: SyncStreamDescription): SyncStreamStatus? { + val raw = internalSubscriptions?.firstOrNull { it.name == stream.name && it.parameters == stream.parameters } ?: return null + return exposeStreamStatus(raw) + } + + private fun exposeStreamStatus(internal: CoreActiveStreamSubscription): SyncStreamStatus { + val progress = + if (this.downloadProgress == null) { + null + } else { + // The core extension will always give us progress numbers, but we should only expose + // them when that makes sense (i.e. 
we're actually downloading). + internal.progress + } + + return SyncStreamStatus(progress, internal) + } } internal data class SyncStatusDataContainer( @@ -123,12 +158,13 @@ internal data class SyncStatusDataContainer( override val uploadError: Any? = null, override val downloadError: Any? = null, override val priorityStatusEntries: List = emptyList(), -) : SyncStatusData { + override val internalSubscriptions: List = emptyList(), +) : SyncStatusData() { override val anyError get() = downloadError ?: uploadError internal fun applyCoreChanges(status: CoreSyncStatus): SyncStatusDataContainer { - val completeSync = status.priorityStatus.firstOrNull { it.priority == BucketPriority.FULL_SYNC_PRIORITY } + val completeSync = status.priorityStatus.firstOrNull { it.priority == StreamPriority.FULL_SYNC_PRIORITY } return copy( connected = status.connected, @@ -145,6 +181,7 @@ internal data class SyncStatusDataContainer( hasSynced = it.hasSynced, ) }, + internalSubscriptions = status.streams, ) } @@ -169,7 +206,7 @@ internal data class SyncStatusDataContainer( @ConsistentCopyVisibility public data class SyncStatus internal constructor( private var data: SyncStatusDataContainer = SyncStatusDataContainer(), -) : SyncStatusData { +) : SyncStatusData() { private val stateFlow: MutableStateFlow = MutableStateFlow(data) /** @@ -224,6 +261,9 @@ public data class SyncStatus internal constructor( override val priorityStatusEntries: List get() = data.priorityStatusEntries + override val internalSubscriptions: List + get() = data.internalSubscriptions + override fun toString(): String = "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced=$hasSynced, error=$anyError)" @@ -231,3 +271,30 @@ public data class SyncStatus internal constructor( public fun empty(): SyncStatus = SyncStatus() } } + +/** + * Current information about a [SyncStreamSubscription]. + */ +@ConsistentCopyVisibility +public data class SyncStreamStatus internal constructor( + /** + * If the sync status is currently downloading, information about download progress related to + * this stream. + */ + val progress: ProgressWithOperations?, + internal val internal: CoreActiveStreamSubscription, +) { + /** + * The [SyncSubscriptionDescription] providing information about the subscription. + */ + val subscription: SyncSubscriptionDescription + get() = internal + + /** + * The priority of this stream. + * + * New data on higher-priority streams can interrupt low-priority streams. 
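To make the new status surface concrete, here is a sketch of reading per-stream information through `SyncStatusData.forStream` and the `SyncStreamStatus` type above. The output formatting is illustrative, and `progress` is only non-null while a download is active.

```kotlin
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.sync.SyncStreamDescription

// Sketch: logging the status of one tracked stream.
@OptIn(ExperimentalPowerSyncAPI::class)
fun logStreamStatus(db: PowerSyncDatabase, stream: SyncStreamDescription) {
    val streamStatus = db.currentStatus.forStream(stream) ?: return
    val subscription = streamStatus.subscription

    println("${stream.name}: default=${subscription.isDefault}, hasSynced=${subscription.hasSynced}")

    streamStatus.progress?.let { progress ->
        val total = progress.totalOperations
        val percent = if (total == 0) 100 else (100 * progress.downloadedOperations) / total
        println("${stream.name}: downloading, $percent% of $total operations")
    }
}
```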
+ */ + val priority: StreamPriority + get() = internal.priority ?: StreamPriority.FULL_SYNC_PRIORITY +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt similarity index 88% rename from core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt rename to core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt index 3872da0a..5592dfa6 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt @@ -13,7 +13,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.schema.Schema -import com.powersync.sync.SyncStream.Companion.bsonObjects +import com.powersync.sync.StreamingSyncClient.Companion.bsonObjects import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock @@ -24,6 +24,7 @@ import io.ktor.client.engine.mock.MockEngine import io.ktor.utils.io.ByteChannel import io.ktor.utils.io.writeByteArray import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout @@ -34,10 +35,10 @@ import kotlin.test.assertContains import kotlin.test.assertEquals @OptIn(ExperimentalKermitApi::class, ExperimentalPowerSyncAPI::class) -class SyncStreamTest { +class StreamingSyncClientTest { private lateinit var bucketStorage: BucketStorage private lateinit var connector: PowerSyncBackendConnector - private lateinit var syncStream: SyncStream + private lateinit var streamingSyncClient: StreamingSyncClient private val testLogWriter = TestLogWriter( loggable = Severity.Verbose, @@ -66,8 +67,8 @@ class SyncStreamTest { @Test fun testInvalidateCredentials() = runTest { - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = {}, @@ -84,10 +85,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) connector.cachedCredentials = TestConnector.testCredentials - syncStream.invalidateCredentials() + streamingSyncClient.invalidateCredentials() connector.cachedCredentials shouldBe null } @@ -109,8 +111,8 @@ class SyncStreamTest { everySuspend { nextCrudItem() } returns mockCrudEntry } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -128,10 +130,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) - syncStream.status.update { copy(connected = true) } - syncStream.triggerCrudUploadAsync().join() + streamingSyncClient.status.update { copy(connected = true) } + streamingSyncClient.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -160,8 +163,8 @@ class SyncStreamTest { everySuspend { getClientId() } returns "test-client-id" } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -179,24 +182,25 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) // Launch streaming sync in a coroutine that we'll cancel after verification val job = launch { - syncStream.streamingSync() + 
streamingSyncClient.streamingSync() } // Wait for status to update withTimeout(1000) { - while (!syncStream.status.connecting) { + while (!streamingSyncClient.status.connecting) { delay(10) } } // Verify initial state - assertEquals(true, syncStream.status.connecting) - assertEquals(false, syncStream.status.connected) + assertEquals(true, streamingSyncClient.status.connecting) + assertEquals(false, streamingSyncClient.status.connected) // Clean up job.cancel() diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt index 5236d1e0..c5fff921 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt @@ -1,8 +1,8 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint +import com.powersync.bucket.StreamPriority import com.powersync.utils.JsonUtil import kotlin.test.Test import kotlin.test.assertEquals @@ -35,7 +35,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(3), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(3), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}""", @@ -48,7 +48,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(1), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(1), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}""", @@ -77,7 +77,7 @@ class SyncLineTest { checkDeserializing( SyncLine.CheckpointPartiallyComplete( lastOpId = "10", - priority = BucketPriority(1), + priority = StreamPriority(1), ), """{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}""", ) diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 48cf0dfd..20c0dc12 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -27,6 +27,7 @@ import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume +import kotlinx.serialization.json.JsonElement /** * A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel]. 
@@ -37,7 +38,7 @@ import kotlinx.coroutines.channels.consume */ @OptIn(LegacySyncImplementation::class) internal class MockSyncService( - private val lines: ReceiveChannel, + private val lines: () -> ReceiveChannel, private val syncLinesContentType: () -> ContentType, private val generateCheckpoint: () -> WriteCheckpointResponse, private val trackSyncRequest: suspend (HttpRequestData) -> Unit, @@ -59,16 +60,19 @@ internal class MockSyncService( trackSyncRequest(data) val job = scope.writer { - lines.consume { + lines().consume { while (true) { // Wait for a downstream listener being ready before requesting a sync line channel.awaitFreeSpace() - val line = receive() - when (line) { + when (val line = receive()) { is SyncLine -> { val serializedLine = JsonUtil.json.encodeToString(line) channel.writeStringUtf8("$serializedLine\n") } + is JsonElement -> { + val serializedLine = JsonUtil.json.encodeToString(line) + channel.writeStringUtf8("$serializedLine\n") + } is ByteArray -> { channel.writeByteArray(line) } diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index 68a9bc47..3467e699 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -1,6 +1,7 @@ package com.powersync import androidx.sqlite.SQLiteConnection +import androidx.sqlite.driver.bundled.BundledSQLiteConnection import androidx.sqlite.driver.bundled.BundledSQLiteDriver import com.powersync.db.runWrapped @@ -25,3 +26,5 @@ private val powersyncExtension: String by lazy { extractLib("powersync") } @ExperimentalPowerSyncAPI @Throws(PowerSyncException::class) public actual fun resolvePowerSyncLoadableExtensionPath(): String? = runWrapped { powersyncExtension } + +internal actual fun openInMemoryConnection(): SQLiteConnection = DatabaseDriverFactory().openConnection(":memory:", 0x02) diff --git a/core/src/watchosMain/kotlin/com/powersync/DatabaseDriverFactory.watchos.kt b/core/src/watchosMain/kotlin/com/powersync/DatabaseDriverFactory.watchos.kt index 10f73537..29bd81ee 100644 --- a/core/src/watchosMain/kotlin/com/powersync/DatabaseDriverFactory.watchos.kt +++ b/core/src/watchosMain/kotlin/com/powersync/DatabaseDriverFactory.watchos.kt @@ -38,3 +38,5 @@ public actual fun resolvePowerSyncLoadableExtensionPath(): String? 
{ didLoadExtension return null } + +internal actual fun openInMemoryConnection(): SQLiteConnection = DatabaseDriverFactory().openConnection(":memory:", 0x02) diff --git a/demos/android-supabase-todolist/build.gradle.kts b/demos/android-supabase-todolist/build.gradle.kts index 391e8711..737ffd68 100644 --- a/demos/android-supabase-todolist/build.gradle.kts +++ b/demos/android-supabase-todolist/build.gradle.kts @@ -127,7 +127,7 @@ dependencies { // When adopting the PowerSync dependencies into your project, use the latest version available at // https://central.sonatype.com/artifact/com.powersync/core implementation(projects.core) // "com.powersync:core:latest.release" - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase:latest.release" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase:latest.release" implementation(projects.compose) // "com.powersync:compose:latest.release" implementation(libs.uuid) implementation(libs.kermit) diff --git a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt index 2b95c3e3..0cb1b4a1 100644 --- a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt +++ b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt @@ -13,7 +13,7 @@ import androidx.compose.runtime.Composable import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.Input import com.powersync.demos.components.ListContent @@ -66,7 +66,7 @@ internal fun HomeScreen( // When giving lists a higher priority than items, we can have a consistent snapshot of // lists without items. In the case where many items exist (that might take longer to // sync initially), this allows us to display lists earlier. 
- if (status.statusForPriority(BucketPriority(1)).hasSynced == true) { + if (status.statusForPriority(StreamPriority(1)).hasSynced == true) { ListContent( items = items, onItemClicked = onItemClicked, diff --git a/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts b/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts index d8144eed..70fad978 100644 --- a/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts +++ b/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts @@ -44,7 +44,7 @@ android { dependencies { // When copying this example, use the the current version available // at: https://central.sonatype.com/artifact/com.powersync/connector-supabase - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase" implementation(projects.demos.supabaseTodolist.shared) diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts index 43b4a7a7..3a0fe227 100644 --- a/demos/supabase-todolist/shared/build.gradle.kts +++ b/demos/supabase-todolist/shared/build.gradle.kts @@ -42,7 +42,7 @@ kotlin { // When copying this example, use the current version available // at: https://central.sonatype.com/artifact/com.powersync/core api(projects.core) // "com.powersync:core" - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase" implementation(projects.compose) // "com.powersync:compose" implementation(libs.uuid) implementation(compose.runtime) diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt index 52e5b521..9ebd9596 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -15,7 +15,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.unit.dp import com.powersync.PowerSyncDatabase -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.compose.composeState import com.powersync.sync.SyncStatusData import org.koin.compose.koinInject @@ -28,7 +28,7 @@ import org.koin.compose.koinInject @Composable fun GuardBySync( db: PowerSyncDatabase = koinInject(), - priority: BucketPriority? = null, + priority: StreamPriority? 
= null, content: @Composable () -> Unit ) { val state: SyncStatusData by db.currentStatus.composeState() diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index 4e53852e..97eb9dce 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -1,22 +1,18 @@ package com.powersync.demos.screens -import androidx.compose.foundation.background import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.Spacer -import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.width -import androidx.compose.material.MaterialTheme import androidx.compose.material.Text import androidx.compose.material.TopAppBar import androidx.compose.runtime.Composable -import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input @@ -64,7 +60,7 @@ internal fun HomeScreen( // than items, we can have a consistent snapshot of lists without items. In the case where // many items exist (that might take longer to sync initially), this allows us to display // lists earlier. - GuardBySync(priority = BucketPriority(1)) { + GuardBySync(priority = StreamPriority(1)) { Input( text = inputText, onAddClicked = onAddItemClicked, diff --git a/gradle.properties b/gradle.properties index 72b5be9e..a2776781 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.6.1 +LIBRARY_VERSION=1.7.0 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/ diff --git a/integrations/room/README.md b/integrations/room/README.md index 31925dd1..9b6ce1bd 100644 --- a/integrations/room/README.md +++ b/integrations/room/README.md @@ -3,62 +3,6 @@ > [!NOTE] > Note that this package is currently in alpha. -This module provides the ability to use PowerSync with Room databases. This module aims for complete -Room support, meaning that: +This module integrates PowerSync with Room databases. This allows you run typed queries against the local database with compile-time validation. -1. Changes synced from PowerSync automatically update your Room `Flow`s. -2. Room and PowerSync cooperate on the write connection, avoiding "database is locked errors". -3. Changes from Room trigger a CRUD upload. - -For more details on using this module, see its page on the [PowerSync documentation](https://docs.powersync.com/client-sdk-references/kotlin-multiplatform/libraries/room). - -## Setup - -Add a dependency on `com.powersync:integration-room` with the same version you use for the main -PowerSync SDK. - -PowerSync can use an existing Room database, provided that the PowerSync core SQLite extension has -been loaded. To do that: - -1. 
Add a dependency on `androidx.sqlite:sqlite-bundled`. Using the SQLite version from the Android - framework will not work as it doesn't support loading extensions. -2. On your `RoomDatabase.Builder`, call `setDriver()` with a PowerSync-enabled driver: - ```Kotlin - val driver = BundledSQLiteDriver().also { - it.loadPowerSyncExtension() // Extension method by this module - } - - Room.databaseBuilder(...).setDriver(driver).build() - ``` -3. Configure raw tables for your Room databases. - -After these steps, you can open your Room database like you normally would. Then, you can use the -following method to obtain a `PowerSyncDatabase` instance which is backed by Room: - -```Kotlin -// With Room, you need to use raw tables (https://docs.powersync.com/usage/use-case-examples/raw-tables). -// This is because Room verifies your schema at runtime, and PowerSync-managed views will not -// pass those checks. -val schema = Schema(...) -val pool = RoomConnectionPool(yourRoomDatabase, schema) -val powersync = PowerSyncDatabase.opened( - pool = pool, - scope = this, - schema = schema, - identifier = "databaseName", // Prefer to use the same path/name as your Room database - logger = Logger, -) -powersync.connect(...) -``` - -Changes from PowerSync (regardless of whether they've been made with `powersync.execute` or from a -sync operation) will automatically trigger updates in Room. - -To also transfer local writes to PowerSync, you need to - -1. Create triggers on your Room tables to insert into `ps_crud` (see the - [PowerSync documentation on raw tables](https://docs.powersync.com/usage/use-case-examples/raw-tables#capture-local-writes-with-triggers) - for details). -2. Pass the schema as a second parameter to the `RoomConnectionPool` constructor. This will make the - pool notify PowerSync on Room writes for every raw table mentioned in the schema. - Alternatively, call `transferPendingRoomUpdatesToPowerSync` after writes in Room. +For details on using this integration, see its page on the [PowerSync documentation](https://docs.powersync.com/client-sdk-references/kotlin-multiplatform/libraries/room). 
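For orientation alongside the trimmed README, a condensed sketch of the Room wiring that the linked documentation describes; the database type, identifier, and schema contents are placeholders, and raw tables plus `ps_crud` triggers still need to be configured as documented.

```kotlin
import androidx.room.RoomDatabase
import co.touchlab.kermit.Logger
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.db.schema.Schema
import com.powersync.integrations.room.RoomConnectionPool
import kotlinx.coroutines.CoroutineScope

// Sketch: obtaining a PowerSyncDatabase backed by an existing Room database.
@OptIn(ExperimentalPowerSyncAPI::class)
fun openPowerSyncBackedByRoom(
    room: RoomDatabase,
    schema: Schema, // must use raw tables, since Room validates the schema at runtime
    scope: CoroutineScope,
): PowerSyncDatabase {
    val pool = RoomConnectionPool(room, schema)
    return PowerSyncDatabase.opened(
        pool = pool,
        scope = scope,
        schema = schema,
        identifier = "app.db", // prefer the same name as the Room database
        logger = Logger,
    )
}
```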
\ No newline at end of file diff --git a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt index e9267193..49b44ec6 100644 --- a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt +++ b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt @@ -2,7 +2,9 @@ package com.powersync.integrations.room import androidx.sqlite.driver.bundled.BundledSQLiteDriver import app.cash.turbine.turbineScope +import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity import co.touchlab.kermit.loggerConfigInit import com.powersync.PowerSyncDatabase import com.powersync.db.getString @@ -28,6 +30,7 @@ class PowerSyncRoomTest { @AfterTest fun tearDown() { + logger.i { "Closing Room database" } database.close() } @@ -35,7 +38,6 @@ class PowerSyncRoomTest { fun roomWritePowerSyncRead() = runTest { database.userDao().create(User(id = "test", name = "Test user")) - val logger = Logger(loggerConfigInit()) val powersync = PowerSyncDatabase.opened( @@ -61,7 +63,6 @@ class PowerSyncRoomTest { @Test fun roomWritePowerSyncWatch() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -88,12 +89,13 @@ class PowerSyncRoomTest { turbine.awaitItem() shouldHaveSize 1 turbine.cancel() } + + powersync.close() } @Test fun powersyncWriteRoomRead() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -108,12 +110,12 @@ class PowerSyncRoomTest { database.userDao().getAll() shouldHaveSize 0 powersync.execute("insert into user values (uuid(), ?)", listOf("PowerSync user")) database.userDao().getAll() shouldHaveSize 1 + powersync.close() } @Test fun powersyncWriteRoomWatch() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -133,5 +135,11 @@ class PowerSyncRoomTest { turbine.awaitItem() shouldHaveSize 1 turbine.cancel() } + + powersync.close() } + + companion object { + private val logger = Logger(loggerConfigInit(CommonWriter())) + } } diff --git a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt index 17bc4f6a..1dffff4a 100644 --- a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt +++ b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt @@ -37,7 +37,7 @@ interface UserDao { suspend fun delete(user: User) } -@Database(entities = [User::class], version = 1) +@Database(entities = [User::class], version = 1, exportSchema = false) @ConstructedBy(TestDatabaseConstructor::class) abstract class TestDatabase : RoomDatabase() { abstract fun userDao(): UserDao diff --git a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt index 3f8aad30..6da84ca3 100644 --- a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt +++ 
b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt @@ -5,6 +5,7 @@ import androidx.room.Transactor import androidx.room.execSQL import androidx.room.useReaderConnection import androidx.room.useWriterConnection +import androidx.sqlite.SQLiteException import androidx.sqlite.SQLiteStatement import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.driver.SQLiteConnectionPool @@ -73,7 +74,18 @@ public class RoomConnectionPool( db.getCoroutineScope().launch { val tables = schema.rawTables.map { it.name }.toTypedArray() db.invalidationTracker.createFlow(*tables, emitInitialState = false).collect { - transferPendingRoomUpdatesToPowerSync() + try { + transferPendingRoomUpdatesToPowerSync() + } catch (e: SQLiteException) { + // It can happen that we get an update shortly before the database is closed. Since this is + // asynchronous, we'd then be using the database in a closed state, which fails. We handle that by + // stopping the flow collection. + if (e.message == "Connection pool is closed") { + return@collect + } + + throw e + } } } } @@ -91,7 +103,7 @@ public class RoomConnectionPool( val changed = it.usePrepared("SELECT powersync_update_hooks('get')") { stmt -> check(stmt.step()) - json.decodeFromString>(stmt.getText(0)) + Json.decodeFromString>(stmt.getText(0)) } val userTables = @@ -114,10 +126,6 @@ public class RoomConnectionPool( override suspend fun close() { // Noop, Room database managed independently } - - private companion object { - val json = Json {} - } } private class RoomTransactionLease( diff --git a/integrations/sqldelight/README.md b/integrations/sqldelight/README.md index de94ad7f..4e8a7399 100644 --- a/integrations/sqldelight/README.md +++ b/integrations/sqldelight/README.md @@ -3,65 +3,6 @@ > [!NOTE] > Note that this package is currently in beta. -This library provides the `PowerSyncDriver` class, which implements an `SqlDriver` for `SQLDelight` -backed by PowerSync. +This library integrates PowerSync with SQLDelight. This allows you run typed queries against the local database with compile-time validation. It provides the `PowerSyncDriver` class, which implements a `SqlDriver` for `SQLDelight` backed by PowerSync. -For more details on using this module, see its page on the [PowerSync documentation](https://docs.powersync.com/client-sdk-references/kotlin-multiplatform/libraries/sqldelight). - -## Setup - -Add a dependency on `com.powersync:integration-sqldelight`, using the same version you use for the -PowerSync SDK. - -## Usage - -To get started, ensure that SQLDelight is not linking sqlite3 (the PowerSync SDK takes care of that, -and you don't want to link it twice). Also, ensure the async generator is active because the -PowerSync driver does not support synchronous reads: - -```kotlin -sqldelight { - databases { - linkSqlite.set(false) - - create("MyAppDatabase") { - generateAsync.set(true) - deriveSchemaFromMigrations.set(false) - - dialect("app.cash.sqldelight:sqlite-3-38-dialect") - } - } -} -``` - -Next, define your tables in `.sq` files (but note that the `CREATE TABLE` statement won't be used, -PowerSync creates JSON-backed views for tables instead). 
-Open a PowerSync database [in the usual way](https://docs.powersync.com/client-sdk-references/kotlin-multiplatform#getting-started) -and finally pass it to the constructor of your generated SQLDelight database: - -```kotlin -val db: PowerSyncDatabase = openPowerSyncDatabase() -val yourSqlDelightDatabase = YourDatabase(PowerSyncDriver(db)) -``` - -Afterwards, writes on both databases (the original `PowerSyncDatabase` instance and the SQLDelight -database) will be visible to each other, update each other's query flows and will get synced -properly. - -## Limitations - -Please note that this library is currently in alpha. It is tested, but API changes are still -possible. - -There are also some limitations to be aware of: - -1. Due to historical reasons, the PowerSync SDK migrates all databases to `user_version` 1 when - created (but it will never downgrade a database). - So if you want to use SQLDelight's schema tools, the first version would have to be `2`. -2. The `CREATE TABLE` statements in your `.sq` files are only used at build time to verify your - queries. At runtime, PowerSync will create tables from your schema as views, the defined - statements are ignored. - If you want to use the schema managed by SQLDelight, configure PowerSync to use - [raw tables](https://docs.powersync.com/usage/use-case-examples/raw-tables). -3. Functions and tables contributed by the PowerSync core extension are not visible to `.sq` files - at the moment. We might revisit this with a custom dialect in the future. +For details on using this integration, see its page on the [PowerSync documentation](https://docs.powersync.com/client-sdk-references/kotlin-multiplatform/libraries/sqldelight). \ No newline at end of file diff --git a/integrations/sqldelight/build.gradle.kts b/integrations/sqldelight/build.gradle.kts index 77007786..d9eb1aba 100644 --- a/integrations/sqldelight/build.gradle.kts +++ b/integrations/sqldelight/build.gradle.kts @@ -22,21 +22,21 @@ kotlin { implementation(libs.kotlinx.coroutines.core) } - commonTest.dependencies { - // Separate project because SQLDelight can't generate code in test source sets. - implementation(projects.integrations.sqldelightTestDatabase) + val commonIntegrationTest by creating { + dependsOn(commonTest.get()) - implementation(libs.kotlin.test) - implementation(libs.kotlinx.io) - implementation(libs.test.turbine) - implementation(libs.test.coroutines) - implementation(libs.test.kotest.assertions) + dependencies { + // Separate project because SQLDelight can't generate code in test source sets. + implementation(projects.integrations.sqldelightTestDatabase) - implementation(libs.sqldelight.coroutines) - } + implementation(libs.kotlin.test) + implementation(libs.kotlinx.io) + implementation(libs.test.turbine) + implementation(libs.test.coroutines) + implementation(libs.test.kotest.assertions) - val commonIntegrationTest by creating { - dependsOn(commonTest.get()) + implementation(libs.sqldelight.coroutines) + } } // The PowerSync SDK links the core extension, so we can just run tests as-is. 
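Since the test below is converted to the new `PowerSyncDatabase.inMemory` helper, here is a standalone sketch of how a self-contained unit test might use it; the `todos` table and the query are illustrative.

```kotlin
import com.powersync.PowerSyncDatabase
import com.powersync.db.schema.Column
import com.powersync.db.schema.Schema
import com.powersync.db.schema.Table
import kotlinx.coroutines.test.runTest
import kotlin.test.Test

class InMemoryExampleTest {
    @Test
    fun insertAndQuery() =
        runTest {
            // Opens an in-memory SQLite database with the PowerSync extension loaded;
            // nothing is persisted to disk, which keeps tests isolated.
            val db =
                PowerSyncDatabase.inMemory(
                    scope = this,
                    schema = Schema(Table("todos", listOf(Column.text("content")))),
                )

            try {
                db.execute("INSERT INTO todos (id, content) VALUES (uuid(), ?)", listOf("buy milk"))
                val count = db.get("SELECT COUNT(*) FROM todos") { it.getLong(0)!! }
                check(count == 1L)
            } finally {
                db.close()
            }
        }
}
```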
diff --git a/integrations/sqldelight/src/appleTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.apple.kt b/integrations/sqldelight/src/appleTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.apple.kt
deleted file mode 100644
index 8cc9d44d..00000000
--- a/integrations/sqldelight/src/appleTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.apple.kt
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.powersync.integrations.sqldelight
-
-import com.powersync.DatabaseDriverFactory
-
-actual fun databaseDriverFactory(): DatabaseDriverFactory = DatabaseDriverFactory()
diff --git a/integrations/sqldelight/src/commonIntegrationTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.kt b/integrations/sqldelight/src/commonIntegrationTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.kt
index f3edf259..2cdbf85a 100644
--- a/integrations/sqldelight/src/commonIntegrationTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.kt
+++ b/integrations/sqldelight/src/commonIntegrationTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.kt
@@ -144,12 +144,9 @@ class SqlDelightTest {
 
 private fun databaseTest(body: suspend TestScope.(PowerSyncDatabase) -> Unit) {
     runTest {
-        val allowedChars = ('A'..'Z') + ('a'..'z') + ('0'..'9')
-        val suffix = CharArray(8) { allowedChars.random() }.concatToString()
-
         val db =
-            PowerSyncDatabase(
-                databaseDriverFactory(),
+            PowerSyncDatabase.inMemory(
+                scope = this,
                 schema =
                     Schema(
                         Table(
@@ -160,13 +157,9 @@ private fun databaseTest(body: suspend TestScope.(PowerSyncDatabase) -> Unit) {
                             ),
                         ),
                     ),
-                dbFilename = "db-$suffix",
-                dbDirectory = SystemTemporaryDirectory.toString(),
             )
 
         body(db)
         db.close()
     }
 }
-
-expect fun databaseDriverFactory(): DatabaseDriverFactory
diff --git a/integrations/sqldelight/src/jvmTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.jvm.kt b/integrations/sqldelight/src/jvmTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.jvm.kt
deleted file mode 100644
index 8cc9d44d..00000000
--- a/integrations/sqldelight/src/jvmTest/kotlin/com/powersync/integrations/sqldelight/SqlDelightTest.jvm.kt
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.powersync.integrations.sqldelight
-
-import com.powersync.DatabaseDriverFactory
-
-actual fun databaseDriverFactory(): DatabaseDriverFactory = DatabaseDriverFactory()
diff --git a/connectors/README.md b/integrations/supabase/README.md
similarity index 57%
rename from connectors/README.md
rename to integrations/supabase/README.md
index 0c9a09c4..76218687 100644
--- a/connectors/README.md
+++ b/integrations/supabase/README.md
@@ -1,16 +1,12 @@
-# PowerSync Backend Connectors
+# PowerSync Supabase Connector
 
-Convenience implementations of backend connectors that provide the connection between your application backend and the PowerSync managed database.
+Convenience implementation of a backend connector that provides the connection between your application backend and the PowerSync managed database
+by delegating to Supabase.
 
 It is used to:
 
 1. Retrieve a token to connect to the PowerSync service.
 2. Apply local changes on your backend application server (and from there, to your backend database).
 
+The connector is fairly basic, and also serves as an example for getting started.
-
-## Provided Connectors
-
-### Supabase (Postgres)
-
-A basic implementation of a PowerSync Backend Connector for Supabase, that serves as getting started example.
- -See a step-by-step tutorial for connecting to Supabase, [here](https://docs.powersync.com/integration-guides/supabase-+-powersync). \ No newline at end of file +See a step-by-step tutorial for connecting to Supabase, [here](https://docs.powersync.com/integration-guides/supabase-+-powersync). diff --git a/connectors/supabase/build.gradle.kts b/integrations/supabase/build.gradle.kts similarity index 58% rename from connectors/supabase/build.gradle.kts rename to integrations/supabase/build.gradle.kts index 0410af6a..9004bbcb 100644 --- a/connectors/supabase/build.gradle.kts +++ b/integrations/supabase/build.gradle.kts @@ -7,6 +7,7 @@ plugins { alias(libs.plugins.android.library) alias(libs.plugins.kotlinter) id("com.powersync.plugins.sonatype") + id("com.powersync.plugins.sharedbuild") id("dokka-convention") } @@ -22,15 +23,37 @@ kotlin { } explicitApi() + applyDefaultHierarchyTemplate() sourceSets { commonMain.dependencies { - api(project(":core")) + api(projects.core) implementation(libs.kotlinx.coroutines.core) implementation(libs.supabase.client) api(libs.supabase.auth) api(libs.supabase.storage) } + + val commonIntegrationTest by creating { + dependsOn(commonTest.get()) + + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.io) + implementation(libs.test.turbine) + implementation(libs.test.coroutines) + implementation(libs.test.kotest.assertions) + + implementation(libs.sqldelight.coroutines) + } + } + + // The PowerSync SDK links the core extension, so we can just run tests as-is. + jvmTest.get().dependsOn(commonIntegrationTest) + + // We have special setup in this build configuration to make these tests link the PowerSync extension, so they + // can run integration tests along with the executable for unit testing. 
+ nativeTest.orNull?.dependsOn(commonIntegrationTest) } } diff --git a/connectors/supabase/gradle.properties b/integrations/supabase/gradle.properties similarity index 100% rename from connectors/supabase/gradle.properties rename to integrations/supabase/gradle.properties diff --git a/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt b/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt new file mode 100644 index 00000000..14728a6b --- /dev/null +++ b/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt @@ -0,0 +1,67 @@ +package com.powersync.connector.supabase + +import com.powersync.PowerSyncDatabase +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudTransaction +import com.powersync.db.schema.Column +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.equals.shouldBeEqual +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +class SupabaseConnectorTest { + @Test + fun errorHandling() = + runTest { + val db = + PowerSyncDatabase.inMemory( + scope = this, + schema = + Schema( + Table( + "users", + listOf( + Column.text("name"), + ), + ), + ), + ) + + try { + db.writeTransaction { tx -> + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("a")) + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("b")) + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("c")) + } + + var calledErrorHandler = false + val connector = + object : SupabaseConnector("", "", "") { + override suspend fun uploadCrudEntry(entry: CrudEntry): Unit = + throw Exception("Expected exception, failing in uploadCrudEntry") + + override suspend fun handleError( + tx: CrudTransaction, + entry: CrudEntry, + exception: Exception, + errorCode: String?, + ) { + calledErrorHandler = true + + tx.crud shouldHaveSize 3 + entry shouldBeEqual tx.crud[0] + exception.message shouldBe "Expected exception, failing in uploadCrudEntry" + tx.complete(null) + } + } + + connector.uploadData(db) + calledErrorHandler shouldBe true + } finally { + db.close() + } + } +} diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt similarity index 66% rename from connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt rename to integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt index 12913cda..15ebbc3e 100644 --- a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt +++ b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt @@ -5,6 +5,7 @@ import com.powersync.PowerSyncDatabase import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudTransaction import com.powersync.db.crud.UpdateType import com.powersync.db.runWrapped import io.github.jan.supabase.SupabaseClient @@ -27,12 +28,13 @@ import io.ktor.utils.io.InternalAPI import kotlinx.coroutines.flow.StateFlow import kotlinx.serialization.json.Json 
import kotlinx.serialization.json.JsonPrimitive +import kotlin.toString /** * Get a Supabase token to authenticate against the PowerSync instance. */ @OptIn(SupabaseInternal::class, InternalAPI::class) -public class SupabaseConnector( +public open class SupabaseConnector( public val supabaseClient: SupabaseClient, public val powerSyncEndpoint: String, private val storageBucket: String? = null, @@ -40,7 +42,7 @@ public class SupabaseConnector( private val json = Json { coerceInputValues = true } private var errorCode: String? = null - private object PostgresFatalCodes { + public companion object PostgresFatalCodes { // Using Regex patterns for Postgres error codes private val FATAL_RESPONSE_CODES = listOf( @@ -52,7 +54,7 @@ public class SupabaseConnector( "^42501$".toRegex(), ) - fun isFatalError(code: String): Boolean = + public fun isFatalError(code: String): Boolean = FATAL_RESPONSE_CODES.any { pattern -> pattern.matches(code) } @@ -172,6 +174,78 @@ public class SupabaseConnector( ) } + /** + * Uses the PostgREST APIs to upload a given [entry] to the backend database. + * + * This method should report errors during the upload as an exception that would be caught by [uploadData]. + */ + public open suspend fun uploadCrudEntry(entry: CrudEntry) { + val table = supabaseClient.from(entry.table) + + when (entry.op) { + UpdateType.PUT -> { + val data = + buildMap { + put("id", JsonPrimitive(entry.id)) + entry.opData?.jsonValues?.let { putAll(it) } + } + table.upsert(data) + } + UpdateType.PATCH -> { + table.update(entry.opData!!.jsonValues) { + filter { + eq("id", entry.id) + } + } + } + UpdateType.DELETE -> { + table.delete { + filter { + eq("id", entry.id) + } + } + } + } + } + + /** + * Handles an error during the upload. This method can be overridden to log errors or customize error handling. + * + * By default, it discards the rest of a transaction when the error code indicates that this is a fatal postgres + * error that can't be retried. Otherwise, it rethrows the exception so that the PowerSync SDK will retry. + * + * @param tx The full [CrudTransaction] we're in the process of uploading. + * @param entry The [CrudEntry] for which an upload has failed. + * @param exception The [Exception] thrown by the Supabase client. + * @param [errorCode] The postgres error code, if any. + * @throws Exception If the upload should be retried. If this method doesn't throw, it should mark [tx] as complete + * by invoking [CrudTransaction.complete]. In that case, the local write would be lost. + */ + public open suspend fun handleError( + tx: CrudTransaction, + entry: CrudEntry, + exception: Exception, + errorCode: String?, + ) { + if (errorCode != null && isFatalError(errorCode)) { + /** + * Instead of blocking the queue with these errors, + * discard the (rest of the) transaction. + * + * Note that these errors typically indicate a bug in the application. + * If protecting against data loss is important, save the failing records + * elsewhere instead of discarding, and/or notify the user. + */ + Logger.e("Data upload error: ${exception.message}") + Logger.e("Discarding entry: $entry") + tx.complete(null) + return + } + + Logger.e("Data upload error - retrying last entry: $entry, $exception") + throw exception + } + /** * Upload local changes to the app backend (in this case Supabase). 
* @@ -186,54 +260,16 @@ public class SupabaseConnector( try { for (entry in transaction.crud) { lastEntry = entry - - val table = supabaseClient.from(entry.table) - - when (entry.op) { - UpdateType.PUT -> { - val data = - buildMap { - put("id", JsonPrimitive(entry.id)) - entry.opData?.jsonValues?.let { putAll(it) } - } - table.upsert(data) - } - UpdateType.PATCH -> { - table.update(entry.opData!!.jsonValues) { - filter { - eq("id", entry.id) - } - } - } - UpdateType.DELETE -> { - table.delete { - filter { - eq("id", entry.id) - } - } - } - } + uploadCrudEntry(entry) } transaction.complete(null) } catch (e: Exception) { - if (errorCode != null && PostgresFatalCodes.isFatalError(errorCode.toString())) { - /** - * Instead of blocking the queue with these errors, - * discard the (rest of the) transaction. - * - * Note that these errors typically indicate a bug in the application. - * If protecting against data loss is important, save the failing records - * elsewhere instead of discarding, and/or notify the user. - */ - Logger.e("Data upload error: ${e.message}") - Logger.e("Discarding entry: $lastEntry") - transaction.complete(null) - return@runWrapped + if (lastEntry != null) { + handleError(transaction, lastEntry, e, errorCode) + } else { + throw e } - - Logger.e("Data upload error - retrying last entry: $lastEntry, $e") - throw e } } } diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt similarity index 100% rename from connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt rename to integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt diff --git a/PowerSyncKotlin/Package.swift b/internal/PowerSyncKotlin/Package.swift similarity index 100% rename from PowerSyncKotlin/Package.swift rename to internal/PowerSyncKotlin/Package.swift diff --git a/PowerSyncKotlin/README.md b/internal/PowerSyncKotlin/README.md similarity index 100% rename from PowerSyncKotlin/README.md rename to internal/PowerSyncKotlin/README.md diff --git a/PowerSyncKotlin/build.gradle.kts b/internal/PowerSyncKotlin/build.gradle.kts similarity index 100% rename from PowerSyncKotlin/build.gradle.kts rename to internal/PowerSyncKotlin/build.gradle.kts diff --git a/PowerSyncKotlin/gradle.properties b/internal/PowerSyncKotlin/gradle.properties similarity index 100% rename from PowerSyncKotlin/gradle.properties rename to internal/PowerSyncKotlin/gradle.properties diff --git a/PowerSyncKotlin/powersync.podspec b/internal/PowerSyncKotlin/powersync.podspec similarity index 90% rename from PowerSyncKotlin/powersync.podspec rename to internal/PowerSyncKotlin/powersync.podspec index 6b965f9a..57131b1a 100644 --- a/PowerSyncKotlin/powersync.podspec +++ b/internal/PowerSyncKotlin/powersync.podspec @@ -12,7 +12,7 @@ Pod::Spec.new do |spec| spec.pod_target_xcconfig = { - 'KOTLIN_PROJECT_PATH' => ':PowerSyncKotlin', + 'KOTLIN_PROJECT_PATH' => ':internal:PowerSyncKotlin', 'PRODUCT_MODULE_NAME' => 'PowerSyncKotlin', } @@ -28,7 +28,7 @@ Pod::Spec.new do |spec| fi set -ev REPO_ROOT="$PODS_TARGET_SRCROOT" - "$REPO_ROOT/../gradlew" -p "$REPO_ROOT" $KOTLIN_PROJECT_PATH:syncFramework \ + "$REPO_ROOT/../../gradlew" -p "$REPO_ROOT" $KOTLIN_PROJECT_PATH:syncFramework \ -Pkotlin.native.cocoapods.platform=$PLATFORM_NAME \ -Pkotlin.native.cocoapods.archs="$ARCHS" \ 
-Pkotlin.native.cocoapods.configuration="$CONFIGURATION" diff --git a/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Connector.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Connector.kt similarity index 100% rename from PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Connector.kt rename to internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Connector.kt diff --git a/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Locks.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Locks.kt similarity index 100% rename from PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Locks.kt rename to internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/Locks.kt diff --git a/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt b/internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt similarity index 100% rename from PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt rename to internal/PowerSyncKotlin/src/appleMain/kotlin/com/powersync/SDK.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index c66d4964..3a0d5b12 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,17 +28,16 @@ plugins { rootProject.name = "powersync-root" include(":internal:download-core-extension") +include(":internal:PowerSyncKotlin") include(":core") include(":core-tests-android") -include(":connectors:supabase") include(":integrations:room") include(":static-sqlite-driver") include(":integrations:sqldelight") include(":integrations:sqldelight-test-database") - -include(":PowerSyncKotlin") +include(":integrations:supabase") include(":compose")
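
The connector changes above make `SupabaseConnector` an `open` class with two overridable hooks, `uploadCrudEntry` and `handleError`. Below is a minimal sketch of a subclass that adds logging around both hooks; it assumes the three-string constructor used by the integration test takes the Supabase URL, the anon key, and the PowerSync endpoint in that order, and the class name plus `println` logging are illustrative only.

```kotlin
import com.powersync.connector.supabase.SupabaseConnector
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.CrudTransaction

// Hypothetical subclass for illustration; the constructor argument order is an
// assumption about the three-string constructor used by SupabaseConnectorTest.
class LoggingSupabaseConnector(
    supabaseUrl: String,
    supabaseKey: String,
    powerSyncEndpoint: String,
) : SupabaseConnector(supabaseUrl, supabaseKey, powerSyncEndpoint) {
    override suspend fun uploadCrudEntry(entry: CrudEntry) {
        println("Uploading ${entry.op} on ${entry.table} (id=${entry.id})")
        super.uploadCrudEntry(entry) // default PostgREST upload
    }

    override suspend fun handleError(
        tx: CrudTransaction,
        entry: CrudEntry,
        exception: Exception,
        errorCode: String?,
    ) {
        println("Upload failed for $entry (code=$errorCode): ${exception.message}")
        // Keep the default behaviour: discard the transaction on fatal Postgres
        // codes, otherwise rethrow so the SDK retries the upload.
        super.handleError(tx, entry, exception, errorCode)
    }
}
```

Because `PostgresFatalCodes` is now a public companion object, such an override can also call `SupabaseConnector.isFatalError(code)` directly when deciding whether an error is worth retrying.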
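
Since the module moved from `connectors/supabase` to `integrations/supabase`, project dependencies inside this repository need the new path; the `settings.gradle.kts` change above already swaps the `include`. A minimal sketch for a consuming module (the module itself is hypothetical):

```kotlin
// build.gradle.kts of a hypothetical module inside this repository
dependencies {
    // Previously: implementation(project(":connectors:supabase"))
    implementation(project(":integrations:supabase"))
    // Or, with the type-safe project accessors used elsewhere in this diff:
    // implementation(projects.integrations.supabase)
}
```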
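
The new `commonIntegrationTest` source set and `PowerSyncDatabase.inMemory` also make it cheap to fake the upload path entirely. The sketch below is modelled on the integration test above but overrides only `uploadCrudEntry` to capture queued writes instead of calling PostgREST; the empty constructor arguments mirror that test and are never used because no network call is made, and it assumes one local write transaction yields one CRUD entry here.

```kotlin
import com.powersync.PowerSyncDatabase
import com.powersync.connector.supabase.SupabaseConnector
import com.powersync.db.crud.CrudEntry
import com.powersync.db.schema.Column
import com.powersync.db.schema.Schema
import com.powersync.db.schema.Table
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class CapturingConnectorTest {
    @Test
    fun capturesLocalWrites() =
        runTest {
            val db =
                PowerSyncDatabase.inMemory(
                    scope = this,
                    schema = Schema(Table("users", listOf(Column.text("name")))),
                )

            try {
                db.writeTransaction { tx ->
                    tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("a"))
                }

                val uploaded = mutableListOf<CrudEntry>()
                val connector =
                    object : SupabaseConnector("", "", "") {
                        // Capture the entry instead of uploading it to Supabase.
                        override suspend fun uploadCrudEntry(entry: CrudEntry) {
                            uploaded += entry
                        }
                    }

                connector.uploadData(db)
                assertEquals(1, uploaded.size)
            } finally {
                db.close()
            }
        }
}
```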