diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 6b1bbbd..a70c687 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -19,3 +19,20 @@ jobs: xcodebuild test -scheme PowerSync -destination "platform=iOS Simulator,name=iPhone 16" xcodebuild test -scheme PowerSync -destination "platform=macOS,arch=arm64,name=My Mac" xcodebuild test -scheme PowerSync -destination "platform=watchOS Simulator,arch=arm64,name=Apple Watch Ultra 2 (49mm)" + + buildSwift6: + name: Build and test with Swift 6 + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - name: Set up XCode + uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: latest-stable + - name: Use Swift 6 + run: | + sed -i '' 's|// swift-tools-version:[0-9.]*|// swift-tools-version:6.1|' Package.swift + - name: Build and Test + run: | + swift build -Xswiftc -strict-concurrency=complete + swift test -Xswiftc -strict-concurrency=complete diff --git a/.gitignore b/.gitignore index fb8464f..79542bb 100644 --- a/.gitignore +++ b/.gitignore @@ -69,6 +69,7 @@ DerivedData/ .swiftpm/configuration/registries.json .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .netrc - +.vscode +.sourcekit-lsp Secrets.swift \ No newline at end of file diff --git a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 4dfb9de..547bdd5 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "2d885a1b46f17f9239b7876e3889168a6de98024718f2d7af03aede290c8a86a", + "originHash" : "33297127250b66812faa920958a24bae46bf9e9d1c38ea6b84ca413efaf16afd", "pins" : [ { "identity" : "anycodable", @@ -15,8 +15,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "state" : { - "revision" : "21057135ce8269b43582022aa4ca56407332e6a8", - "version" : "0.4.2" + "revision" : "3396dd7eb9d4264b19e3d95bfe0d77347826f4c2", + "version" : "0.4.4" } }, { diff --git a/Demo/PowerSyncExample/PowerSync/SystemManager.swift b/Demo/PowerSyncExample/PowerSync/SystemManager.swift index 4737c52..b2c08bc 100644 --- a/Demo/PowerSyncExample/PowerSync/SystemManager.swift +++ b/Demo/PowerSyncExample/PowerSync/SystemManager.swift @@ -13,6 +13,7 @@ func getAttachmentsDirectoryPath() throws -> String { let logTag = "SystemManager" +@MainActor @Observable class SystemManager { let connector = SupabaseConnector() @@ -225,23 +226,23 @@ class SystemManager { if let attachments, let photoId = todo.photoId { try await attachments.deleteFile( attachmentId: photoId - ) { tx, _ in + ) { transaction, _ in try self.deleteTodoInTX( id: todo.id, - tx: tx + tx: transaction ) } } else { - try await db.writeTransaction { tx in + try await db.writeTransaction { transaction in try self.deleteTodoInTX( id: todo.id, - tx: tx + tx: transaction ) } } } - private func deleteTodoInTX(id: String, tx: ConnectionContext) throws { + private nonisolated func deleteTodoInTX(id: String, tx: ConnectionContext) throws { _ = try tx.execute( sql: "DELETE FROM \(TODOS_TABLE) WHERE id = ?", parameters: [id] diff --git a/Demo/StructuredQueriesExample/README.md b/Demo/StructuredQueriesExample/README.md new file mode 100644 index 0000000..b143d70 --- /dev/null +++ b/Demo/StructuredQueriesExample/README.md @@ -0,0 +1,9 @@ +# Structured Queries Example + +This used PowerSync along side the [structured-queries](https://github.com/pointfreeco/swift-structured-queries) library to perform DB operations using a typed query builder syntax. + +This demo can be executed with + +```bash +swift run StructuredQueriesExample +``` diff --git a/Demo/StructuredQueriesExample/Sources/entry.swift b/Demo/StructuredQueriesExample/Sources/entry.swift new file mode 100644 index 0000000..ffcd74f --- /dev/null +++ b/Demo/StructuredQueriesExample/Sources/entry.swift @@ -0,0 +1,99 @@ +import Foundation +import PowerSync +import PowerSyncStructuredQueries +import StructuredQueries + +@Table("users") +struct User { + var id: String + var name: String + var birthday: Date? +} + +@Table("posts") +struct Post { + var id: String + var description: String + @Column("user_id") + var userId: String +} + +@Selection +struct JoinedResult { + let postDescription: String + let userName: String +} + +@main +struct Main { + static func main() async throws { + // TODO, check if the schema can be shared in some way + let powersync = PowerSyncDatabase( + schema: Schema( + tables: [ + Table( + name: "users", + columns: [ + .text("name"), + .text("birthday") + ] + ), + Table( + name: "posts", + columns: [ + .text("description"), + .text("user_id") + ] + ), + ], + ), + dbFilename: "tests.sqlite" + ) + + let testUserID = UUID().uuidString + + try await User.insert { + ($0.id, $0.name, $0.birthday) + } values: { + (testUserID, "Steven", Date()) + }.execute(powersync) + + try await User.insert { + User( + id: UUID().uuidString, + name: "Nevets" + ) + }.execute(powersync) + + let users = try await User.all.fetchAll(powersync) + print("The users are:") + for user in users { + print(user) + } + + let posts = try await Post.all.fetchAll(powersync) + print("The posts are:") + for post in posts { + print(post) + } + + try await Post.insert { + Post( + id: UUID().uuidString, description: "A Post", userId: testUserID + ) + }.execute(powersync) + + print("Joined posts are:") + let joinedPosts = try await Post.join(User.all) { $0.userId == $1.id } + .select { + JoinedResult.Columns( + postDescription: $0.description, + userName: $1.name + ) + } + .fetchAll(powersync) + for joinedResult in joinedPosts { + print(joinedResult) + } + } +} diff --git a/Package.resolved b/Package.resolved index 86aa79f..9f0c45d 100644 --- a/Package.resolved +++ b/Package.resolved @@ -8,6 +8,60 @@ "revision" : "3396dd7eb9d4264b19e3d95bfe0d77347826f4c2", "version" : "0.4.4" } + }, + { + "identity" : "swift-custom-dump", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-custom-dump", + "state" : { + "revision" : "82645ec760917961cfa08c9c0c7104a57a0fa4b1", + "version" : "1.3.3" + } + }, + { + "identity" : "swift-snapshot-testing", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-snapshot-testing", + "state" : { + "revision" : "d7e40607dcd6bc26543f5d9433103f06e0b28f8f", + "version" : "1.18.6" + } + }, + { + "identity" : "swift-structured-queries", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-structured-queries", + "state" : { + "revision" : "2468f4e34d909d11c053d773562c03ffea40a72e", + "version" : "0.12.1" + } + }, + { + "identity" : "swift-syntax", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swiftlang/swift-syntax", + "state" : { + "revision" : "f99ae8aa18f0cf0d53481901f88a0991dc3bd4a2", + "version" : "601.0.1" + } + }, + { + "identity" : "swift-tagged", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-tagged", + "state" : { + "revision" : "3907a9438f5b57d317001dc99f3f11b46882272b", + "version" : "0.10.0" + } + }, + { + "identity" : "xctest-dynamic-overlay", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/xctest-dynamic-overlay", + "state" : { + "revision" : "b2ed9eabefe56202ee4939dd9fc46b6241c88317", + "version" : "1.6.1" + } } ], "version" : 2 diff --git a/Package.swift b/Package.swift index fdae3ac..ff61b9b 100644 --- a/Package.swift +++ b/Package.swift @@ -2,11 +2,12 @@ // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription + let packageName = "PowerSync" // Set this to the absolute path of your Kotlin SDK checkout if you want to use a local Kotlin // build. Also see docs/LocalBuild.md for details -let localKotlinSdkOverride: String? = nil +let localKotlinSdkOverride: String? = "/Users/stevenontong/Documents/platform_code/powersync/powersync-kotlin" // Set this to the absolute path of your powersync-sqlite-core checkout if you want to use a // local build of the core extension. @@ -53,15 +54,22 @@ let package = Package( platforms: [ .iOS(.v13), .macOS(.v10_15), - .watchOS(.v9) + .watchOS(.v9), ], products: [ // Products define the executables and libraries a package produces, making them visible to other packages. .library( name: packageName, - targets: ["PowerSync"]), + targets: ["PowerSync"] + ), + .library( + name: "PowerSyncStructuredQueries", + targets: ["PowerSyncStructuredQueries"] + ), ], - dependencies: conditionalDependencies, + dependencies: [ + .package(url: "https://github.com/pointfreeco/swift-structured-queries", from: "0.4.0"), + ] + conditionalDependencies, targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. // Targets can depend on other targets in this package and products from dependencies. @@ -69,11 +77,27 @@ let package = Package( name: packageName, dependencies: [ kotlinTargetDependency, - .product(name: "PowerSyncSQLiteCore", package: corePackageName) - ]), + .product(name: "PowerSyncSQLiteCore", package: corePackageName), + ] + ), .testTarget( name: "PowerSyncTests", - dependencies: ["PowerSync"] + dependencies: ["PowerSync"], + ), + .target( + name: "PowerSyncStructuredQueries", + dependencies: [ + .byName(name: packageName), + .product(name: "StructuredQueries", package: "swift-structured-queries"), + ], + path: "Sources/StructuredQueries" + ), + .executableTarget( + name: "StructuredQueriesExample", + dependencies: [ + .byName(name: "PowerSyncStructuredQueries"), + ], + path: "Demo/StructuredQueriesExample" ), ] + conditionalTargets ) diff --git a/README.md b/README.md index 730a12c..0b3a7a6 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,22 @@ This is the PowerSync SDK for Swift clients. The SDK reference is available [her ## Structure: Packages -- [Sources](./Sources/) +- [Sources](./Sources/PowerSync) - This is the Swift SDK implementation. +- [Sources](./Sources/StructuredQueries) + + - A typed query builder using [structured-queries](https://github.com/pointfreeco/swift-structured-queries). + ## Demo Apps / Example Projects The easiest way to test the PowerSync Swift SDK is to run our demo application. - [Demo/PowerSyncExample](./Demo/README.md): A simple to-do list application demonstrating the use of the PowerSync Swift SDK using a Supabase connector. +- [Demo/StructuredQueriesExample](./Demo/StructuredQueriesExample/README.md): A simple Swift executable which uses the `PowerSyncStructuredQueries` library. + ## Installation Add diff --git a/Sources/PowerSync/Kotlin/DatabaseLogger.swift b/Sources/PowerSync/Kotlin/DatabaseLogger.swift index 852dc07..ed49d5a 100644 --- a/Sources/PowerSync/Kotlin/DatabaseLogger.swift +++ b/Sources/PowerSync/Kotlin/DatabaseLogger.swift @@ -6,7 +6,7 @@ import PowerSyncKotlin private class KermitLogWriterAdapter: Kermit_coreLogWriter { /// The underlying Swift log writer to forward log messages to. let logger: any LoggerProtocol - + /// Initializes a new adapter. /// /// - Parameter logger: A Swift log writer that will handle log output. @@ -14,7 +14,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { self.logger = logger super.init() } - + /// Called by Kermit to log a message. /// /// - Parameters: @@ -22,7 +22,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { /// - message: The content of the log message. /// - tag: A string categorizing the log. /// - throwable: An optional Kotlin exception (ignored here). - override func log(severity: Kermit_coreSeverity, message: String, tag: String, throwable: KotlinThrowable?) { + override func log(severity: Kermit_coreSeverity, message: String, tag: String, throwable _: KotlinThrowable?) { switch severity { case PowerSyncKotlin.Kermit_coreSeverity.verbose: return logger.debug(message, tag: tag) @@ -43,7 +43,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { class KotlinKermitLoggerConfig: PowerSyncKotlin.Kermit_coreLoggerConfig { var logWriterList: [Kermit_coreLogWriter] var minSeverity: PowerSyncKotlin.Kermit_coreSeverity - + init(logWriterList: [Kermit_coreLogWriter], minSeverity: PowerSyncKotlin.Kermit_coreSeverity) { self.logWriterList = logWriterList self.minSeverity = minSeverity @@ -54,18 +54,18 @@ class KotlinKermitLoggerConfig: PowerSyncKotlin.Kermit_coreLoggerConfig { /// /// This class bridges Swift log writers with the Kotlin logging system and supports /// runtime configuration of severity levels and writer lists. -class DatabaseLogger: LoggerProtocol { +final class DatabaseLogger: LoggerProtocol { /// The underlying Kermit logger instance provided by the PowerSyncKotlin SDK. public let kLogger: PowerSyncKotlin.KermitLogger public let logger: any LoggerProtocol - + /// Initializes a new logger with an optional list of writers. /// /// - Parameter logger: A logger which will be called for each internal log operation init(_ logger: any LoggerProtocol) { self.logger = logger // Set to the lowest severity. The provided logger should filter by severity - self.kLogger = PowerSyncKotlin.KermitLogger( + kLogger = PowerSyncKotlin.KermitLogger( config: KotlinKermitLoggerConfig( logWriterList: [KermitLogWriterAdapter(logger: logger)], minSeverity: Kermit_coreSeverity.verbose @@ -73,27 +73,27 @@ class DatabaseLogger: LoggerProtocol { tag: "PowerSync" ) } - + /// Logs a debug-level message. public func debug(_ message: String, tag: String?) { logger.debug(message, tag: tag) } - + /// Logs an info-level message. public func info(_ message: String, tag: String?) { logger.info(message, tag: tag) } - + /// Logs a warning-level message. public func warning(_ message: String, tag: String?) { logger.warning(message, tag: tag) } - + /// Logs an error-level message. public func error(_ message: String, tag: String?) { logger.error(message, tag: tag) } - + /// Logs a fault (assert-level) message, typically used for critical issues. public func fault(_ message: String, tag: String?) { logger.fault(message, tag: tag) diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index d78b370..ab59b70 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -1,7 +1,7 @@ import Foundation import PowerSyncKotlin -final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { +final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol, @unchecked Sendable { let logger: any LoggerProtocol private let kotlinDatabase: PowerSyncKotlin.PowerSyncDatabase @@ -98,7 +98,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } @discardableResult - func execute(sql: String, parameters: [Any?]?) async throws -> Int64 { + func execute(sql: String, parameters: [Sendable?]?) async throws -> Int64 { try await writeTransaction { ctx in try ctx.execute( sql: sql, @@ -107,10 +107,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> RowType { try await readLock { ctx in try ctx.get( @@ -121,10 +121,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType { try await readLock { ctx in try ctx.get( @@ -135,10 +135,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> [RowType] { try await readLock { ctx in try ctx.getAll( @@ -149,10 +149,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] { try await readLock { ctx in try ctx.getAll( @@ -163,10 +163,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> RowType? { try await readLock { ctx in try ctx.getOptional( @@ -177,10 +177,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? { try await readLock { ctx in try ctx.getOptional( @@ -191,10 +191,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func watch( + func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) throws -> AsyncThrowingStream<[RowType], any Error> { try watch( options: WatchOptions( @@ -205,10 +205,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { ) } - func watch( + func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], any Error> { try watch( options: WatchOptions( @@ -219,7 +219,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { ) } - func watch( + func watch( options: WatchOptions ) throws -> AsyncThrowingStream<[RowType], Error> { AsyncThrowingStream { continuation in @@ -269,62 +269,54 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func writeLock( + func writeLock( callback: @escaping (any ConnectionContext) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( await kotlinDatabase.writeLock( - callback: LockCallback( - callback: callback - ) + callback: wrapLockContext(callback: callback) ), to: R.self ) } } - func writeTransaction( + func writeTransaction( callback: @escaping (any Transaction) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( await kotlinDatabase.writeTransaction( - callback: TransactionCallback( - callback: callback - ) + callback: wrapTransactionContext(callback: callback) ), to: R.self ) } } - func readLock( - callback: @escaping (any ConnectionContext) throws -> R + func readLock( + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( await kotlinDatabase.readLock( - callback: LockCallback( - callback: callback - ) + callback: wrapLockContext(callback: callback) ), to: R.self ) } } - func readTransaction( + func readTransaction( callback: @escaping (any Transaction) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( await kotlinDatabase.readTransaction( - callback: TransactionCallback( - callback: callback - ) + callback: wrapTransactionContext(callback: callback) ), to: R.self ) @@ -336,7 +328,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } /// Tries to convert Kotlin PowerSyncExceptions to Swift Exceptions - private func wrapPowerSyncException( + private func wrapPowerSyncException( handler: () async throws -> R) async throws -> R { @@ -356,7 +348,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { private func getQuerySourceTables( sql: String, - parameters: [Any?] + parameters: [Sendable?] ) async throws -> Set { let rows = try await getAll( sql: "EXPLAIN \(sql)", @@ -372,11 +364,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } ) - let rootPages = rows.compactMap { r in - if (r.opcode == "OpenRead" || r.opcode == "OpenWrite") && - r.p3 == 0 && r.p2 != 0 + let rootPages = rows.compactMap { row in + if (row.opcode == "OpenRead" || row.opcode == "OpenWrite") && + row.p3 == 0 && row.p2 != 0 { - return r.p2 + return row.p2 } return nil } @@ -389,11 +381,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { message: "Failed to convert pages data to UTF-8 string" ) } - + let tableRows = try await getAll( sql: "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", parameters: [ - pagesString + pagesString, ] ) { try $0.getString(index: 0) } @@ -414,3 +406,50 @@ private struct ExplainQueryResult { let p2: Int64 let p3: Int64 } + +extension Error { + func toPowerSyncError() -> PowerSyncKotlin.PowerSyncException { + return PowerSyncKotlin.PowerSyncException( + message: localizedDescription, + cause: PowerSyncKotlin.KotlinThrowable(message: localizedDescription) + ) + } +} + +func wrapLockContext( + callback: @escaping (any ConnectionContext) throws -> Any +) throws -> PowerSyncKotlin.ThrowableLockCallback { + PowerSyncKotlin.wrapContextHandler { kotlinContext in + do { + return try PowerSyncKotlin.PowerSyncResult.Success( + value: callback( + KotlinConnectionContext( + ctx: kotlinContext + ) + )) + } catch { + return PowerSyncKotlin.PowerSyncResult.Failure( + exception: error.toPowerSyncError() + ) + } + } +} + +func wrapTransactionContext( + callback: @escaping (any Transaction) throws -> Any +) throws -> PowerSyncKotlin.ThrowableTransactionCallback { + PowerSyncKotlin.wrapTransactionContextHandler { kotlinContext in + do { + return try PowerSyncKotlin.PowerSyncResult.Success( + value: callback( + KotlinTransactionContext( + ctx: kotlinContext + ) + )) + } catch { + return PowerSyncKotlin.PowerSyncResult.Failure( + exception: error.toPowerSyncError() + ) + } + } +} diff --git a/Sources/PowerSync/Kotlin/KotlinTypes.swift b/Sources/PowerSync/Kotlin/KotlinTypes.swift index 18edcbd..e0e80de 100644 --- a/Sources/PowerSync/Kotlin/KotlinTypes.swift +++ b/Sources/PowerSync/Kotlin/KotlinTypes.swift @@ -1,5 +1,10 @@ import PowerSyncKotlin -typealias KotlinPowerSyncBackendConnector = PowerSyncKotlin.PowerSyncBackendConnector +typealias KotlinPowerSyncBackendConnector = PowerSyncKotlin.SwiftPowerSyncBackendConnector typealias KotlinPowerSyncCredentials = PowerSyncKotlin.PowerSyncCredentials typealias KotlinPowerSyncDatabase = PowerSyncKotlin.PowerSyncDatabase + +extension KotlinPowerSyncBackendConnector: @retroactive @unchecked Sendable {} +extension KotlinPowerSyncCredentials: @retroactive @unchecked Sendable {} +extension PowerSyncKotlin.KermitLogger: @retroactive @unchecked Sendable {} +extension PowerSyncKotlin.SyncStatus: @retroactive @unchecked Sendable {} diff --git a/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift b/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift index 8e8da4c..01ea256 100644 --- a/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift +++ b/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift @@ -1,6 +1,9 @@ import OSLog -class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { +final class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector, + // We need to declare this since we declared KotlinPowerSyncBackendConnector as @unchecked Sendable + @unchecked Sendable +{ let swiftBackendConnector: PowerSyncBackendConnector let db: any PowerSyncDatabaseProtocol let logTag = "PowerSyncBackendConnector" @@ -26,7 +29,7 @@ class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { } } - override func __uploadData(database: KotlinPowerSyncDatabase) async throws { + override func __performUpload() async throws { do { // Pass the Swift DB protocal to the connector return try await swiftBackendConnector.uploadData(database: db) diff --git a/Sources/PowerSync/Kotlin/SafeCastError.swift b/Sources/PowerSync/Kotlin/SafeCastError.swift index 35ef8cb..bd18664 100644 --- a/Sources/PowerSync/Kotlin/SafeCastError.swift +++ b/Sources/PowerSync/Kotlin/SafeCastError.swift @@ -1,7 +1,7 @@ import Foundation enum SafeCastError: Error, CustomStringConvertible { - case typeMismatch(expected: Any.Type, actual: Any?) + case typeMismatch(expected: String, actual: String?) var description: String { switch self { @@ -25,6 +25,6 @@ func safeCast(_ value: Any?, to type: T.Type) throws -> T { if let castedValue = value as? T { return castedValue } else { - throw SafeCastError.typeMismatch(expected: type, actual: value) + throw SafeCastError.typeMismatch(expected: "\(type)", actual: "\(value ?? "nil")") } } diff --git a/Sources/PowerSync/Kotlin/TransactionCallback.swift b/Sources/PowerSync/Kotlin/TransactionCallback.swift deleted file mode 100644 index 78f460d..0000000 --- a/Sources/PowerSync/Kotlin/TransactionCallback.swift +++ /dev/null @@ -1,67 +0,0 @@ -import PowerSyncKotlin - -/// Internal Wrapper for Kotlin lock context lambdas -class LockCallback: PowerSyncKotlin.ThrowableLockCallback { - let callback: (ConnectionContext) throws -> R - - init(callback: @escaping (ConnectionContext) throws -> R) { - self.callback = callback - } - - // The Kotlin SDK does not gracefully handle exceptions thrown from Swift callbacks. - // If a Swift callback throws an exception, it results in a `BAD ACCESS` crash. - // - // To prevent this, we catch the exception and return it as a `PowerSyncException`, - // allowing Kotlin to propagate the error correctly. - // - // This approach is a workaround. Ideally, we should introduce an internal mechanism - // in the Kotlin SDK to handle errors from Swift more robustly. - // - // Currently, we wrap the public `PowerSyncDatabase` class in Kotlin, which limits our - // ability to handle exceptions cleanly. Instead, we should expose an internal implementation - // from a "core" package in Kotlin that provides better control over exception handling - // and other functionality—without modifying the public `PowerSyncDatabase` API to include - // Swift-specific logic. - func execute(context: PowerSyncKotlin.ConnectionContext) throws -> Any { - do { - return try callback( - KotlinConnectionContext( - ctx: context - ) - ) - } catch { - return PowerSyncKotlin.PowerSyncException( - message: error.localizedDescription, - cause: PowerSyncKotlin.KotlinThrowable( - message: error.localizedDescription - ) - ) - } - } -} - -/// Internal Wrapper for Kotlin transaction context lambdas -class TransactionCallback: PowerSyncKotlin.ThrowableTransactionCallback { - let callback: (Transaction) throws -> R - - init(callback: @escaping (Transaction) throws -> R) { - self.callback = callback - } - - func execute(transaction: PowerSyncKotlin.PowerSyncTransaction) throws -> Any { - do { - return try callback( - KotlinTransactionContext( - ctx: transaction - ) - ) - } catch { - return PowerSyncKotlin.PowerSyncException( - message: error.localizedDescription, - cause: PowerSyncKotlin.KotlinThrowable( - message: error.localizedDescription - ) - ) - } - } -} diff --git a/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift b/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift index dbfca2d..0b7f314 100644 --- a/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift +++ b/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift @@ -10,17 +10,17 @@ protocol KotlinConnectionContextProtocol: ConnectionContext { /// Implements most of `ConnectionContext` using the `ctx` provided. extension KotlinConnectionContextProtocol { - func execute(sql: String, parameters: [Any?]?) throws -> Int64 { + func execute(sql: String, parameters: [Sendable?]?) throws -> Int64 { try ctx.execute( sql: sql, parameters: mapParameters(parameters) ) } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> RowType? { return try wrapQueryCursorTyped( mapper: mapper, @@ -35,10 +35,10 @@ extension KotlinConnectionContextProtocol { ) } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> [RowType] { return try wrapQueryCursorTyped( mapper: mapper, @@ -53,10 +53,10 @@ extension KotlinConnectionContextProtocol { ) } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> RowType { return try wrapQueryCursorTyped( mapper: mapper, @@ -72,7 +72,10 @@ extension KotlinConnectionContextProtocol { } } -class KotlinConnectionContext: KotlinConnectionContextProtocol { +final class KotlinConnectionContext: KotlinConnectionContextProtocol, + // The Kotlin ConnectionContext is technically sendable, but we cannot annotate that + @unchecked Sendable +{ let ctx: PowerSyncKotlin.ConnectionContext init(ctx: PowerSyncKotlin.ConnectionContext) { @@ -80,7 +83,10 @@ class KotlinConnectionContext: KotlinConnectionContextProtocol { } } -class KotlinTransactionContext: Transaction, KotlinConnectionContextProtocol { +final class KotlinTransactionContext: Transaction, KotlinConnectionContextProtocol, + // The Kotlin ConnectionContext is technically sendable, but we cannot annotate that + @unchecked Sendable +{ let ctx: PowerSyncKotlin.ConnectionContext init(ctx: PowerSyncKotlin.PowerSyncTransaction) { diff --git a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift index b71615f..db8ce2d 100644 --- a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift +++ b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift @@ -2,7 +2,7 @@ import Combine import Foundation import PowerSyncKotlin -class KotlinSyncStatus: KotlinSyncStatusDataProtocol, SyncStatus { +final class KotlinSyncStatus: KotlinSyncStatusDataProtocol, SyncStatus { private let baseStatus: PowerSyncKotlin.SyncStatus var base: any PowerSyncKotlin.SyncStatusData { diff --git a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift index 0d2d759..df64951 100644 --- a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift +++ b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift @@ -6,7 +6,10 @@ protocol KotlinSyncStatusDataProtocol: SyncStatusData { var base: PowerSyncKotlin.SyncStatusData { get } } -struct KotlinSyncStatusData: KotlinSyncStatusDataProtocol { +struct KotlinSyncStatusData: KotlinSyncStatusDataProtocol, + // We can't override the PowerSyncKotlin.SyncStatusData's Sendable status + @unchecked Sendable +{ let base: PowerSyncKotlin.SyncStatusData } @@ -15,19 +18,19 @@ extension KotlinSyncStatusDataProtocol { var connected: Bool { base.connected } - + var connecting: Bool { base.connecting } - + var downloading: Bool { base.downloading } - + var uploading: Bool { base.uploading } - + var lastSyncedAt: Date? { guard let lastSyncedAt = base.lastSyncedAt else { return nil } return Date( @@ -36,32 +39,32 @@ extension KotlinSyncStatusDataProtocol { ) ) } - + var downloadProgress: (any SyncDownloadProgress)? { guard let kotlinProgress = base.downloadProgress else { return nil } return KotlinSyncDownloadProgress(progress: kotlinProgress) } - + var hasSynced: Bool? { base.hasSynced?.boolValue } - + var uploadError: Any? { base.uploadError } - + var downloadError: Any? { base.downloadError } - + var anyError: Any? { base.anyError } - + public var priorityStatusEntries: [PriorityStatusEntry] { base.priorityStatusEntries.map { mapPriorityStatus($0) } } - + public func statusForPriority(_ priority: BucketPriority) -> PriorityStatusEntry { mapPriorityStatus( base.statusForPriority( @@ -69,7 +72,7 @@ extension KotlinSyncStatusDataProtocol { ) ) } - + private func mapPriorityStatus(_ status: PowerSyncKotlin.PriorityStatusEntry) -> PriorityStatusEntry { var lastSyncedAt: Date? if let syncedAt = status.lastSyncedAt { @@ -77,7 +80,7 @@ extension KotlinSyncStatusDataProtocol { timeIntervalSince1970: Double(syncedAt.epochSeconds) ) } - + return PriorityStatusEntry( priority: BucketPriority(status.priority), lastSyncedAt: lastSyncedAt, @@ -94,7 +97,7 @@ extension KotlinProgressWithOperationsProtocol { var totalOperations: Int32 { return base.totalOperations } - + var downloadedOperations: Int32 { return base.downloadedOperations } @@ -106,11 +109,11 @@ struct KotlinProgressWithOperations: KotlinProgressWithOperationsProtocol { struct KotlinSyncDownloadProgress: KotlinProgressWithOperationsProtocol, SyncDownloadProgress { let progress: PowerSyncKotlin.SyncDownloadProgress - + var base: any PowerSyncKotlin.ProgressWithOperations { progress } - + func untilPriority(priority: BucketPriority) -> any ProgressWithOperations { return KotlinProgressWithOperations(base: progress.untilPriority(priority: priority.priorityCode)) } diff --git a/Sources/PowerSync/Kotlin/wrapQueryCursor.swift b/Sources/PowerSync/Kotlin/wrapQueryCursor.swift index 05a99ca..ebdd33a 100644 --- a/Sources/PowerSync/Kotlin/wrapQueryCursor.swift +++ b/Sources/PowerSync/Kotlin/wrapQueryCursor.swift @@ -14,9 +14,9 @@ import PowerSyncKotlin /// and other functionality—without modifying the public `PowerSyncDatabase` API to include /// Swift-specific logic. func wrapQueryCursor( - mapper: @escaping (SqlCursor) throws -> RowType, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType, // The Kotlin APIs return the results as Any, we can explicitly cast internally - executor: @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> ReturnType + executor: @Sendable @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> ReturnType ) throws -> ReturnType { var mapperException: Error? @@ -36,7 +36,7 @@ func wrapQueryCursor( } let executionResult = try executor(wrappedMapper) - + if let mapperException { // Allow propagating the error throw mapperException @@ -45,15 +45,14 @@ func wrapQueryCursor( return executionResult } - -func wrapQueryCursorTyped( - mapper: @escaping (SqlCursor) throws -> RowType, +func wrapQueryCursorTyped( + mapper: @Sendable @escaping (SqlCursor) throws -> RowType, // The Kotlin APIs return the results as Any, we can explicitly cast internally - executor: @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> Any?, + executor: @Sendable @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> Any?) throws -> Any?, resultType: ReturnType.Type ) throws -> ReturnType { return try safeCast( - wrapQueryCursor( + wrapQueryCursor( mapper: mapper, executor: executor ), to: @@ -61,7 +60,6 @@ func wrapQueryCursorTyped( ) } - /// Throws a `PowerSyncException` using a helper provided by the Kotlin SDK. /// We can't directly throw Kotlin `PowerSyncException`s from Swift, but we can delegate the throwing /// to the Kotlin implementation. @@ -73,7 +71,7 @@ func wrapQueryCursorTyped( /// to any calling Kotlin stack. /// This only works for SKIEE methods which have an associated completion handler which handles annotated errors. /// This seems to only apply for Kotlin suspending function bindings. -func throwKotlinPowerSyncError (message: String, cause: String? = nil) throws { +func throwKotlinPowerSyncError(message: String, cause: String? = nil) throws { try throwPowerSyncException( exception: PowerSyncKotlin.PowerSyncException( message: message, diff --git a/Sources/PowerSync/Logger.swift b/Sources/PowerSync/Logger.swift index 988d013..cd1c06c 100644 --- a/Sources/PowerSync/Logger.swift +++ b/Sources/PowerSync/Logger.swift @@ -4,7 +4,6 @@ import OSLog /// /// This writer uses `os.Logger` on iOS/macOS/tvOS/watchOS 14+ and falls back to `print` for earlier versions. public class PrintLogWriter: LogWriterProtocol { - private let subsystem: String private let category: String private lazy var logger: Any? = { @@ -13,17 +12,18 @@ public class PrintLogWriter: LogWriterProtocol { } return nil }() - + /// Creates a new PrintLogWriter /// - Parameters: /// - subsystem: The subsystem identifier (typically reverse DNS notation of your app) /// - category: The category within your subsystem public init(subsystem: String = Bundle.main.bundleIdentifier ?? "com.powersync.logger", - category: String = "default") { + category: String = "default") + { self.subsystem = subsystem self.category = category } - + /// Logs a message with a given severity and optional tag. /// - Parameters: /// - severity: The severity level of the message. @@ -32,10 +32,10 @@ public class PrintLogWriter: LogWriterProtocol { public func log(severity: LogSeverity, message: String, tag: String?) { let tagPrefix = tag.map { !$0.isEmpty ? "[\($0)] " : "" } ?? "" let formattedMessage = "\(tagPrefix)\(message)" - + if #available(iOS 14.0, macOS 11.0, tvOS 14.0, watchOS 7.0, *) { guard let logger = logger as? Logger else { return } - + switch severity { case .info: logger.info("\(formattedMessage, privacy: .public)") @@ -54,58 +54,65 @@ public class PrintLogWriter: LogWriterProtocol { } } +/// A default logger configuration that uses `PrintLogWriter` and filters messages by minimum severity. +public final class DefaultLogger: LoggerProtocol, + // The shared state is guarded by the DispatchQueue + @unchecked Sendable +{ + private var minSeverity: LogSeverity + private var writers: [any LogWriterProtocol] + private let queue = DispatchQueue(label: "DefaultLogger.queue") - -/// A default logger configuration that uses `PrintLogWritter` and filters messages by minimum severity. -public class DefaultLogger: LoggerProtocol { - public var minSeverity: LogSeverity - public var writers: [any LogWriterProtocol] - /// Initializes the default logger with an optional minimum severity level. /// /// - Parameters /// - minSeverity: The minimum severity level to log. Defaults to `.debug`. /// - writers: Optional writers which logs should be written to. Defaults to a `PrintLogWriter`. - public init(minSeverity: LogSeverity = .debug, writers: [any LogWriterProtocol]? = nil ) { - self.writers = writers ?? [ PrintLogWriter() ] + public init(minSeverity: LogSeverity = .debug, writers: [any LogWriterProtocol]? = nil) { + self.writers = writers ?? [PrintLogWriter()] self.minSeverity = minSeverity } - - public func setWriters(_ writters: [any LogWriterProtocol]) { - self.writers = writters + + public func setWriters(_ writers: [any LogWriterProtocol]) { + queue.sync { + self.writers = writers + } } - + public func setMinSeverity(_ severity: LogSeverity) { - self.minSeverity = severity + queue.sync { + minSeverity = severity + } } - - + public func debug(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.debug, tag: tag) + writeLog(message, severity: LogSeverity.debug, tag: tag) } - + public func error(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.error, tag: tag) + writeLog(message, severity: LogSeverity.error, tag: tag) } - + public func info(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.info, tag: tag) + writeLog(message, severity: LogSeverity.info, tag: tag) } - + public func warning(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.warning, tag: tag) + writeLog(message, severity: LogSeverity.warning, tag: tag) } - + public func fault(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.fault, tag: tag) + writeLog(message, severity: LogSeverity.fault, tag: tag) } - + private func writeLog(_ message: String, severity: LogSeverity, tag: String?) { - if (severity.rawValue < self.minSeverity.rawValue) { + let currentSeverity = queue.sync { minSeverity } + + if severity.rawValue < currentSeverity.rawValue { return } - - for writer in self.writers { + + for writer in writers { writer.log(severity: severity, message: message, tag: tag) } } diff --git a/Sources/PowerSync/PowerSyncCredentials.swift b/Sources/PowerSync/PowerSyncCredentials.swift index b1e1d27..1de8bae 100644 --- a/Sources/PowerSync/PowerSyncCredentials.swift +++ b/Sources/PowerSync/PowerSyncCredentials.swift @@ -1,10 +1,9 @@ import Foundation - /// /// Temporary credentials to connect to the PowerSync service. /// -public struct PowerSyncCredentials: Codable { +public struct PowerSyncCredentials: Codable, Sendable { /// PowerSync endpoint, e.g. "https://myinstance.powersync.co". public let endpoint: String @@ -14,17 +13,18 @@ public struct PowerSyncCredentials: Codable { /// User ID. @available(*, deprecated, message: "This value is not used anymore.") public let userId: String? = nil - + enum CodingKeys: String, CodingKey { - case endpoint - case token - } + case endpoint + case token + } @available(*, deprecated, message: "Use init(endpoint:token:) instead. `userId` is no longer used.") public init( endpoint: String, token: String, - userId: String? = nil) { + userId _: String? = nil + ) { self.endpoint = endpoint self.token = token } @@ -34,12 +34,12 @@ public struct PowerSyncCredentials: Codable { self.token = token } - internal init(kotlin: KotlinPowerSyncCredentials) { - self.endpoint = kotlin.endpoint - self.token = kotlin.token + init(kotlin: KotlinPowerSyncCredentials) { + endpoint = kotlin.endpoint + token = kotlin.token } - internal var kotlinCredentials: KotlinPowerSyncCredentials { + var kotlinCredentials: KotlinPowerSyncCredentials { return KotlinPowerSyncCredentials(endpoint: endpoint, token: token, userId: nil) } diff --git a/Sources/PowerSync/Protocol/LoggerProtocol.swift b/Sources/PowerSync/Protocol/LoggerProtocol.swift index f2c3396..2169f86 100644 --- a/Sources/PowerSync/Protocol/LoggerProtocol.swift +++ b/Sources/PowerSync/Protocol/LoggerProtocol.swift @@ -1,4 +1,4 @@ -public enum LogSeverity: Int, CaseIterable { +public enum LogSeverity: Int, CaseIterable, Sendable { /// Detailed information typically used for debugging. case debug = 0 @@ -47,35 +47,35 @@ public protocol LogWriterProtocol { /// A protocol defining the interface for a logger that supports severity filtering and multiple writers. /// /// Conformers provide logging APIs and manage attached log writers. -public protocol LoggerProtocol { +public protocol LoggerProtocol: Sendable { /// Logs an informational message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func info(_ message: String, tag: String?) - + /// Logs an error message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func error(_ message: String, tag: String?) - + /// Logs a debug message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func debug(_ message: String, tag: String?) - + /// Logs a warning message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func warning(_ message: String, tag: String?) - + /// Logs a fault message, typically used for critical system-level failures. /// /// - Parameters: diff --git a/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift b/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift index 87fda9a..7c3418b 100644 --- a/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift +++ b/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift @@ -1,4 +1,4 @@ -public protocol PowerSyncBackendConnectorProtocol { +public protocol PowerSyncBackendConnectorProtocol: Sendable { /// /// Get credentials for PowerSync. /// @@ -28,7 +28,7 @@ public protocol PowerSyncBackendConnectorProtocol { /// 1. Creating credentials for connecting to the PowerSync service. /// 2. Applying local changes against the backend application server. /// -/// +@MainActor open class PowerSyncBackendConnector: PowerSyncBackendConnectorProtocol { public init() {} @@ -36,5 +36,5 @@ open class PowerSyncBackendConnector: PowerSyncBackendConnectorProtocol { return nil } - open func uploadData(database: PowerSyncDatabaseProtocol) async throws {} + open func uploadData(database _: PowerSyncDatabaseProtocol) async throws {} } diff --git a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift index 0edde56..4c3bd6b 100644 --- a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift +++ b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift @@ -12,7 +12,7 @@ public struct SyncClientConfiguration { /// /// - SeeAlso: `SyncRequestLoggerConfiguration` for configuration options public let requestLogger: SyncRequestLoggerConfiguration? - + /// Creates a new sync client configuration. /// - Parameter requestLogger: Optional network logger configuration public init(requestLogger: SyncRequestLoggerConfiguration? = nil) { @@ -26,10 +26,10 @@ public struct SyncClientConfiguration { public struct ConnectOptions { /// Defaults to 1 second public static let DefaultCrudThrottle: TimeInterval = 1 - + /// Defaults to 5 seconds public static let DefaultRetryDelay: TimeInterval = 5 - + /// TimeInterval (in seconds) between CRUD (Create, Read, Update, Delete) operations. /// /// Default is ``ConnectOptions/DefaultCrudThrottle``. @@ -54,14 +54,14 @@ public struct ConnectOptions { /// ] /// ``` public var params: JsonParam - + /// Uses a new sync client implemented in Rust instead of the one implemented in Kotlin. /// /// The new client is more efficient and will become the default in the future, but is still marked as experimental for now. /// We encourage interested users to try the new client. @_spi(PowerSyncExperimental) public var newClientImplementation: Bool - + /// Configuration for the sync client used for PowerSync requests. /// /// Provides options to customize network behavior including logging of HTTP @@ -73,7 +73,7 @@ public struct ConnectOptions { /// /// - SeeAlso: `SyncClientConfiguration` for available configuration options public var clientConfiguration: SyncClientConfiguration? - + /// Initializes a `ConnectOptions` instance with optional values. /// /// - Parameters: @@ -90,10 +90,10 @@ public struct ConnectOptions { self.crudThrottle = crudThrottle self.retryDelay = retryDelay self.params = params - self.newClientImplementation = false + newClientImplementation = false self.clientConfiguration = clientConfiguration } - + /// Initializes a ``ConnectOptions`` instance with optional values, including experimental options. @_spi(PowerSyncExperimental) public init( @@ -118,25 +118,25 @@ public struct ConnectOptions { /// Use `PowerSyncDatabase.connect` to connect to the PowerSync service, to keep the local database in sync with the remote database. /// /// All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded. -public protocol PowerSyncDatabaseProtocol: Queries { +public protocol PowerSyncDatabaseProtocol: Queries, Sendable { /// The current sync status. var currentStatus: SyncStatus { get } - + /// Logger used for PowerSync operations var logger: any LoggerProtocol { get } - + /// Wait for the first sync to occur func waitForFirstSync() async throws - + /// Replace the schema with a new version. This is for advanced use cases - typically the schema /// should just be specified once in the constructor. /// /// Cannot be used while connected - this should only be called before connect. func updateSchema(schema: SchemaProtocol) async throws - + /// Wait for the first (possibly partial) sync to occur that contains all buckets in the given priority. func waitForFirstSync(priority: Int32) async throws - + /// Connects to the PowerSync service and keeps the local database in sync with the remote database. /// /// The connection is automatically re-opened if it fails for any reason. @@ -172,7 +172,7 @@ public protocol PowerSyncDatabaseProtocol: Queries { connector: PowerSyncBackendConnector, options: ConnectOptions? ) async throws - + /// Get a batch of crud data to upload. /// /// Returns nil if there is no data to upload. @@ -188,7 +188,7 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// data by transaction. One batch may contain data from multiple transactions, /// and a single transaction may be split over multiple batches. func getCrudBatch(limit: Int32) async throws -> CrudBatch? - + /// Get the next recorded transaction to upload. /// /// Returns nil if there is no data to upload. @@ -201,15 +201,15 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// Unlike `getCrudBatch`, this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. func getNextCrudTransaction() async throws -> CrudTransaction? - + /// Convenience method to get the current version of PowerSync. func getPowerSyncVersion() async throws -> String - + /// Close the sync connection. /// /// Use `connect` to connect again. func disconnect() async throws - + /// Disconnect and clear the database. /// Use this when logging out. /// The database can still be queried after this is called, but the tables @@ -217,7 +217,7 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// /// - Parameter clearLocal: Set to false to preserve data in local-only tables. Defaults to `true`. func disconnectAndClear(clearLocal: Bool) async throws - + /// Close the database, releasing resources. /// Also disconnects any active connection. /// @@ -264,11 +264,11 @@ public extension PowerSyncDatabaseProtocol { ) ) } - + func disconnectAndClear() async throws { try await disconnectAndClear(clearLocal: true) } - + func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? { try await getCrudBatch( limit: limit diff --git a/Sources/PowerSync/Protocol/QueriesProtocol.swift b/Sources/PowerSync/Protocol/QueriesProtocol.swift index 1e94702..0f315f4 100644 --- a/Sources/PowerSync/Protocol/QueriesProtocol.swift +++ b/Sources/PowerSync/Protocol/QueriesProtocol.swift @@ -3,16 +3,16 @@ import Foundation public let DEFAULT_WATCH_THROTTLE: TimeInterval = 0.03 // 30ms -public struct WatchOptions { +public struct WatchOptions: Sendable { public var sql: String - public var parameters: [Any?] + public var parameters: [Sendable?] public var throttle: TimeInterval - public var mapper: (SqlCursor) throws -> RowType + public var mapper: @Sendable (SqlCursor) throws -> RowType public init( - sql: String, parameters: [Any?]? = [], + sql: String, parameters: [Sendable?]? = [], throttle: TimeInterval? = DEFAULT_WATCH_THROTTLE, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) { self.sql = sql self.parameters = parameters ?? [] @@ -25,37 +25,37 @@ public protocol Queries { /// Execute a write query (INSERT, UPDATE, DELETE) /// Using `RETURNING *` will result in an error. @discardableResult - func execute(sql: String, parameters: [Any?]?) async throws -> Int64 + func execute(sql: String, parameters: [Sendable?]?) async throws -> Int64 /// Execute a read-only (SELECT) query and return a single result. /// If there is no result, throws an IllegalArgumentException. /// See `getOptional` for queries where the result might be empty. func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType /// Execute a read-only (SELECT) query and return the results. func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] /// Execute a read-only (SELECT) query and return a single optional result. func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? /// Execute a read-only (SELECT) query every time the source tables are modified /// and return the results as an array in a Publisher. func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> func watch( @@ -66,7 +66,7 @@ public protocol Queries { /// /// In most cases, [writeTransaction] should be used instead. func writeLock( - callback: @escaping (any ConnectionContext) throws -> R + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R /// Takes a read lock, without starting a transaction. @@ -74,17 +74,17 @@ public protocol Queries { /// The lock only applies to a single connection, and multiple /// connections may hold read locks at the same time. func readLock( - callback: @escaping (any ConnectionContext) throws -> R + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R /// Execute a write transaction with the given callback func writeTransaction( - callback: @escaping (any Transaction) throws -> R + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R /// Execute a read transaction with the given callback func readTransaction( - callback: @escaping (any Transaction) throws -> R + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R } @@ -96,29 +96,29 @@ public extension Queries { func get( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType { return try await get(sql: sql, parameters: [], mapper: mapper) } func getAll( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] { return try await getAll(sql: sql, parameters: [], mapper: mapper) } func getOptional( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? { return try await getOptional(sql: sql, parameters: [], mapper: mapper) } func watch( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> { - return try watch(sql: sql, parameters: [Any?](), mapper: mapper) + return try watch(sql: sql, parameters: [Sendable?](), mapper: mapper) } } diff --git a/Sources/PowerSync/Protocol/db/ConnectionContext.swift b/Sources/PowerSync/Protocol/db/ConnectionContext.swift index 13dd939..4f904d1 100644 --- a/Sources/PowerSync/Protocol/db/ConnectionContext.swift +++ b/Sources/PowerSync/Protocol/db/ConnectionContext.swift @@ -1,71 +1,71 @@ import Foundation -public protocol ConnectionContext { +public protocol ConnectionContext: Sendable { /** Executes a SQL statement with optional parameters. - + - Parameters: - sql: The SQL statement to execute - parameters: Optional list of parameters for the SQL statement - + - Returns: A value indicating the number of rows affected - + - Throws: PowerSyncError if execution fails */ @discardableResult - func execute(sql: String, parameters: [Any?]?) throws -> Int64 - + func execute(sql: String, parameters: [Sendable?]?) throws -> Int64 + /** Retrieves an optional value from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps the SQL cursor result to the desired type - + - Returns: An optional value of type RowType or nil if no result - + - Throws: PowerSyncError if the query fails */ - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> RowType? - + /** Retrieves all matching rows from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps each SQL cursor result to the desired type - + - Returns: An array of RowType objects - + - Throws: PowerSyncError if the query fails */ - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> [RowType] - + /** Retrieves a single value from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps the SQL cursor result to the desired type - + - Returns: A value of type RowType - + - Throws: PowerSyncError if the query fails or no result is found */ - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> RowType } diff --git a/Sources/PowerSync/Protocol/sync/BucketPriority.swift b/Sources/PowerSync/Protocol/sync/BucketPriority.swift index 0ff8a1d..6b0f677 100644 --- a/Sources/PowerSync/Protocol/sync/BucketPriority.swift +++ b/Sources/PowerSync/Protocol/sync/BucketPriority.swift @@ -1,7 +1,7 @@ import Foundation /// Represents the priority of a bucket, used for sorting and managing operations based on priority levels. -public struct BucketPriority: Comparable { +public struct BucketPriority: Comparable, Sendable { /// The priority code associated with the bucket. Higher values indicate lower priority. public let priorityCode: Int32 diff --git a/Sources/PowerSync/Protocol/sync/SyncStatusData.swift b/Sources/PowerSync/Protocol/sync/SyncStatusData.swift index 66b836a..f4b5a4d 100644 --- a/Sources/PowerSync/Protocol/sync/SyncStatusData.swift +++ b/Sources/PowerSync/Protocol/sync/SyncStatusData.swift @@ -1,7 +1,7 @@ import Foundation /// A protocol representing the synchronization status of a system, providing various indicators and error states. -public protocol SyncStatusData { +public protocol SyncStatusData: Sendable { /// Indicates whether the system is currently connected. var connected: Bool { get } @@ -12,7 +12,7 @@ public protocol SyncStatusData { var downloading: Bool { get } /// Realtime progress information about downloaded operations during an active sync. - /// + /// /// For more information on what progress is reported, see ``SyncDownloadProgress``. /// This value will be non-null only if ``downloading`` is `true`. var downloadProgress: SyncDownloadProgress? { get } @@ -50,7 +50,7 @@ public protocol SyncStatusData { } /// A protocol extending `SyncStatusData` to include flow-based updates for synchronization status. -public protocol SyncStatus: SyncStatusData { +public protocol SyncStatus: SyncStatusData, Sendable { /// Provides a flow of synchronization status updates. /// - Returns: An `AsyncStream` that emits updates whenever the synchronization status changes. func asFlow() -> AsyncStream diff --git a/Sources/PowerSync/attachments/Attachment.swift b/Sources/PowerSync/attachments/Attachment.swift index 42ad8f5..a14a27f 100644 --- a/Sources/PowerSync/attachments/Attachment.swift +++ b/Sources/PowerSync/attachments/Attachment.swift @@ -1,5 +1,5 @@ /// Enum representing the state of an attachment -public enum AttachmentState: Int { +public enum AttachmentState: Int, Sendable { /// The attachment has been queued for download from the cloud storage case queuedDownload /// The attachment has been queued for upload to the cloud storage @@ -24,7 +24,7 @@ public enum AttachmentState: Int { } /// Struct representing an attachment -public struct Attachment { +public struct Attachment: Sendable { /// Unique identifier for the attachment public let id: String @@ -91,7 +91,7 @@ public struct Attachment { func with( filename: String? = nil, state: AttachmentState? = nil, - timestamp : Int = 0, + timestamp: Int = 0, hasSynced: Bool? = nil, localUri: String?? = .none, mediaType: String?? = .none, @@ -110,20 +110,19 @@ public struct Attachment { metaData: resolveOverride(metaData, current: self.metaData) ) } - + /// Resolves double optionals /// if a non nil value is provided: the override will be used /// if .some(nil) is provided: The value will be set to nil /// // if nil is provided: the current value will be preserved private func resolveOverride(_ override: T??, current: T?) -> T? { if let value = override { - return value // could be nil (explicit clear) or a value + return value // could be nil (explicit clear) or a value } else { - return current // not provided, use current + return current // not provided, use current } } - /// Constructs an `Attachment` from a `SqlCursor`. /// /// - Parameter cursor: The `SqlCursor` containing the attachment data. diff --git a/Sources/PowerSync/attachments/AttachmentContext.swift b/Sources/PowerSync/attachments/AttachmentContext.swift index 394d028..5adff58 100644 --- a/Sources/PowerSync/attachments/AttachmentContext.swift +++ b/Sources/PowerSync/attachments/AttachmentContext.swift @@ -1,66 +1,80 @@ import Foundation -/// Context which performs actions on the attachment records -open class AttachmentContext { - private let db: any PowerSyncDatabaseProtocol - private let tableName: String - private let logger: any LoggerProtocol - private let logTag = "AttachmentService" - private let maxArchivedCount: Int64 - - /// Table used for storing attachments in the attachment queue. - private var table: String { - return tableName - } - - /// Initializes a new `AttachmentContext`. - public init( - db: PowerSyncDatabaseProtocol, - tableName: String, - logger: any LoggerProtocol, - maxArchivedCount: Int64 - ) { - self.db = db - self.tableName = tableName - self.logger = logger - self.maxArchivedCount = maxArchivedCount - } +public protocol AttachmentContext: Sendable { + var db: any PowerSyncDatabaseProtocol { get } + var tableName: String { get } + var logger: any LoggerProtocol { get } + var maxArchivedCount: Int64 { get } /// Deletes the attachment from the attachment queue. - public func deleteAttachment(id: String) async throws { + func deleteAttachment(id: String) async throws + + /// Sets the state of the attachment to ignored (archived). + func ignoreAttachment(id: String) async throws + + /// Gets the attachment from the attachment queue using an ID. + func getAttachment(id: String) async throws -> Attachment? + + /// Saves the attachment to the attachment queue. + func saveAttachment(attachment: Attachment) async throws -> Attachment + + /// Saves multiple attachments to the attachment queue. + func saveAttachments(attachments: [Attachment]) async throws + + /// Gets all the IDs of attachments in the attachment queue. + func getAttachmentIds() async throws -> [String] + + /// Gets all attachments in the attachment queue. + func getAttachments() async throws -> [Attachment] + + /// Gets all active attachments that require an operation to be performed. + func getActiveAttachments() async throws -> [Attachment] + + /// Deletes attachments that have been archived. + /// + /// - Parameter callback: A callback invoked with the list of archived attachments before deletion. + /// - Returns: `true` if all items have been deleted, `false` if there may be more archived items remaining. + func deleteArchivedAttachments( + callback: @Sendable @escaping ([Attachment]) async throws -> Void + ) async throws -> Bool + + /// Clears the attachment queue. + /// + /// - Note: Currently only used for testing purposes. + func clearQueue() async throws +} + +public extension AttachmentContext { + func deleteAttachment(id: String) async throws { _ = try await db.execute( - sql: "DELETE FROM \(table) WHERE id = ?", + sql: "DELETE FROM \(tableName) WHERE id = ?", parameters: [id] ) } - /// Sets the state of the attachment to ignored (archived). - public func ignoreAttachment(id: String) async throws { + func ignoreAttachment(id: String) async throws { _ = try await db.execute( - sql: "UPDATE \(table) SET state = ? WHERE id = ?", + sql: "UPDATE \(tableName) SET state = ? WHERE id = ?", parameters: [AttachmentState.archived.rawValue, id] ) } - /// Gets the attachment from the attachment queue using an ID. - public func getAttachment(id: String) async throws -> Attachment? { + func getAttachment(id: String) async throws -> Attachment? { return try await db.getOptional( - sql: "SELECT * FROM \(table) WHERE id = ?", + sql: "SELECT * FROM \(tableName) WHERE id = ?", parameters: [id] ) { cursor in try Attachment.fromCursor(cursor) } } - /// Saves the attachment to the attachment queue. - public func saveAttachment(attachment: Attachment) async throws -> Attachment { + func saveAttachment(attachment: Attachment) async throws -> Attachment { return try await db.writeTransaction { ctx in try self.upsertAttachment(attachment, context: ctx) } } - /// Saves multiple attachments to the attachment queue. - public func saveAttachments(attachments: [Attachment]) async throws { + func saveAttachments(attachments: [Attachment]) async throws { if attachments.isEmpty { return } @@ -72,24 +86,22 @@ open class AttachmentContext { } } - /// Gets all the IDs of attachments in the attachment queue. - public func getAttachmentIds() async throws -> [String] { + func getAttachmentIds() async throws -> [String] { return try await db.getAll( - sql: "SELECT id FROM \(table) WHERE id IS NOT NULL", + sql: "SELECT id FROM \(tableName) WHERE id IS NOT NULL", parameters: [] ) { cursor in try cursor.getString(name: "id") } } - /// Gets all attachments in the attachment queue. - public func getAttachments() async throws -> [Attachment] { + func getAttachments() async throws -> [Attachment] { return try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE id IS NOT NULL ORDER BY @@ -101,14 +113,13 @@ open class AttachmentContext { } } - /// Gets all active attachments that require an operation to be performed. - public func getActiveAttachments() async throws -> [Attachment] { + func getActiveAttachments() async throws -> [Attachment] { return try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE state = ? OR state = ? @@ -126,25 +137,18 @@ open class AttachmentContext { } } - /// Clears the attachment queue. - /// - /// - Note: Currently only used for testing purposes. - public func clearQueue() async throws { - _ = try await db.execute("DELETE FROM \(table)") + func clearQueue() async throws { + _ = try await db.execute("DELETE FROM \(tableName)") } - /// Deletes attachments that have been archived. - /// - /// - Parameter callback: A callback invoked with the list of archived attachments before deletion. - /// - Returns: `true` if all items have been deleted, `false` if there may be more archived items remaining. - public func deleteArchivedAttachments(callback: @escaping ([Attachment]) async throws -> Void) async throws -> Bool { + func deleteArchivedAttachments(callback: @Sendable @escaping ([Attachment]) async throws -> Void) async throws -> Bool { let limit = 1000 let attachments = try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE state = ? ORDER BY @@ -166,7 +170,7 @@ open class AttachmentContext { let idsString = String(data: ids, encoding: .utf8)! _ = try await db.execute( - sql: "DELETE FROM \(table) WHERE id IN (SELECT value FROM json_each(?));", + sql: "DELETE FROM \(tableName) WHERE id IN (SELECT value FROM json_each(?));", parameters: [idsString] ) @@ -179,7 +183,7 @@ open class AttachmentContext { /// - attachment: The attachment to upsert. /// - context: The database transaction context. /// - Returns: The original attachment. - public func upsertAttachment( + func upsertAttachment( _ attachment: Attachment, context: ConnectionContext ) throws -> Attachment { @@ -198,7 +202,7 @@ open class AttachmentContext { try context.execute( sql: """ INSERT OR REPLACE INTO - \(table) (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) + \(tableName) (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, @@ -218,3 +222,23 @@ open class AttachmentContext { return attachment } } + +/// Context which performs actions on the attachment records +public actor AttachmentContextImpl: AttachmentContext { + public let db: any PowerSyncDatabaseProtocol + public let tableName: String + public let logger: any LoggerProtocol + public let maxArchivedCount: Int64 + + public init( + db: PowerSyncDatabaseProtocol, + tableName: String, + logger: any LoggerProtocol, + maxArchivedCount: Int64 + ) { + self.db = db + self.tableName = tableName + self.logger = logger + self.maxArchivedCount = maxArchivedCount + } +} diff --git a/Sources/PowerSync/attachments/AttachmentQueue.swift b/Sources/PowerSync/attachments/AttachmentQueue.swift index 857d998..e2efc3b 100644 --- a/Sources/PowerSync/attachments/AttachmentQueue.swift +++ b/Sources/PowerSync/attachments/AttachmentQueue.swift @@ -1,227 +1,152 @@ import Combine import Foundation -/// Class used to implement the attachment queue -/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments. -open class AttachmentQueue { - /// Default name of the attachments table - public static let defaultTableName = "attachments" - - let logTag = "AttachmentQueue" - - /// PowerSync database client - public let db: PowerSyncDatabaseProtocol - - /// Remote storage adapter - public let remoteStorage: RemoteStorageAdapter - - /// Directory name for attachments - private let attachmentsDirectory: String +/// Default name of the attachments table +public let defaultAttachmentsTableName = "attachments" - /// Closure which creates a Stream of ``WatchedAttachmentItem`` - private let watchAttachments: () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error> - - /// Local file system adapter - public let localStorage: LocalStorageAdapter - - /// Attachments table name in SQLite - private let attachmentsQueueTableName: String +public protocol AttachmentQueueProtocol: Sendable { + var db: any PowerSyncDatabaseProtocol { get } + var attachmentsService: any AttachmentService { get } + var localStorage: any LocalStorageAdapter { get } + var syncingService: any SyncingService { get } + var downloadAttachments: Bool { get } - /// Optional sync error handler - private let errorHandler: SyncErrorHandler? - - /// Interval between periodic syncs - private let syncInterval: TimeInterval + /// Starts the attachment sync process + func startSync() async throws - /// Limit on number of archived attachments - private let archivedCacheLimit: Int64 + /// Stops active syncing tasks. Syncing can be resumed with ``startSync()`` + func stopSyncing() async throws - /// Duration for throttling sync operations - private let syncThrottleDuration: TimeInterval + /// Closes the attachment queue and cancels all sync tasks + func close() async throws - /// Subdirectories to be created in attachments directory - private let subdirectories: [String]? + /// Resolves the filename for a new attachment + /// - Parameters: + /// - attachmentId: Attachment ID + /// - fileExtension: File extension + /// - Returns: Resolved filename + func resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String? + ) async -> String - /// Whether to allow downloading of attachments - private let downloadAttachments: Bool + /// Processes watched attachment items and updates sync state + /// - Parameter items: List of watched attachment items + func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws - /** - * Logging interface used for all log operations - */ - public let logger: any LoggerProtocol + /// Saves a new file and schedules it for upload + /// - Parameters: + /// - data: File data + /// - mediaType: MIME type + /// - fileExtension: File extension + /// - updateHook: Hook to assign attachment relationships in the same transaction + /// - Returns: The created attachment + @discardableResult + func saveFile( + data: Data, + mediaType: String, + fileExtension: String?, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment - /// Attachment service for interacting with attachment records - public let attachmentsService: AttachmentService + /// Queues a file for deletion + /// - Parameters: + /// - attachmentId: ID of the attachment to delete + /// - updateHook: Hook to perform additional DB updates in the same transaction + @discardableResult + func deleteFile( + attachmentId: String, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment - private var syncStatusTask: Task? - private var cancellables = Set() + /// Returns the local URI where a file is stored based on filename + /// - Parameter filename: The name of the file + /// - Returns: The file path + @Sendable func getLocalUri(_ filename: String) async -> String - /// Indicates whether the queue has been closed - public private(set) var closed: Bool = false + /// Removes all archived items + func expireCache() async throws - /// Syncing service instance - private(set) lazy var syncingService: SyncingService = .init( - remoteStorage: self.remoteStorage, - localStorage: self.localStorage, - attachmentsService: self.attachmentsService, - logger: self.logger, - getLocalUri: { [weak self] filename in - guard let self = self else { return filename } - return self.getLocalUri(filename) - }, - errorHandler: self.errorHandler, - syncThrottle: self.syncThrottleDuration - ) - - private let lock: LockActor + /// Clears the attachment queue and deletes all attachment files + func clearQueue() async throws +} - /// Initializes the attachment queue - /// - Parameters match the stored properties - public init( - db: PowerSyncDatabaseProtocol, - remoteStorage: RemoteStorageAdapter, - attachmentsDirectory: String, - watchAttachments: @escaping () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error>, - localStorage: LocalStorageAdapter = FileManagerStorageAdapter(), - attachmentsQueueTableName: String = defaultTableName, - errorHandler: SyncErrorHandler? = nil, - syncInterval: TimeInterval = 30.0, - archivedCacheLimit: Int64 = 100, - syncThrottleDuration: TimeInterval = 1.0, - subdirectories: [String]? = nil, - downloadAttachments: Bool = true, - logger: (any LoggerProtocol)? = nil - ) { - self.db = db - self.remoteStorage = remoteStorage - self.attachmentsDirectory = attachmentsDirectory - self.watchAttachments = watchAttachments - self.localStorage = localStorage - self.attachmentsQueueTableName = attachmentsQueueTableName - self.errorHandler = errorHandler - self.syncInterval = syncInterval - self.archivedCacheLimit = archivedCacheLimit - self.syncThrottleDuration = syncThrottleDuration - self.subdirectories = subdirectories - self.downloadAttachments = downloadAttachments - self.logger = logger ?? db.logger - self.attachmentsService = AttachmentService( - db: db, - tableName: attachmentsQueueTableName, - logger: self.logger, - maxArchivedCount: archivedCacheLimit - ) - self.lock = LockActor() +public extension AttachmentQueueProtocol { + func resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String? + ) -> String { + return "\(attachmentId).\(fileExtension ?? "attachment")" } - /// Starts the attachment sync process - public func startSync() async throws { - try await lock.withLock { - try guardClosed() - - // Stop any active syncing before starting new Tasks - try await _stopSyncing() - - // Ensure the directory where attachments are downloaded exists - try await localStorage.makeDir(path: attachmentsDirectory) + @discardableResult + func saveFile( + data: Data, + mediaType: String, + fileExtension: String?, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + let id = try await db.get(sql: "SELECT uuid() as id", parameters: [], mapper: { cursor in + try cursor.getString(name: "id") + }) - if let subdirectories = subdirectories { - for subdirectory in subdirectories { - let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path - try await localStorage.makeDir(path: path) - } - } + let filename = await resolveNewAttachmentFilename(attachmentId: id, fileExtension: fileExtension) + let localUri = await getLocalUri(filename) - // Verify initial state - try await attachmentsService.withContext { context in - try await self.verifyAttachments(context: context) - } + // Write the file to the filesystem + let fileSize = try await localStorage.saveFile(filePath: localUri, data: data) - try await syncingService.startSync(period: syncInterval) - - syncStatusTask = Task { - do { - try await withThrowingTaskGroup(of: Void.self) { group in - // Add connectivity monitoring task - group.addTask { - var previousConnected = self.db.currentStatus.connected - for await status in self.db.currentStatus.asFlow() { - try Task.checkCancellation() - if !previousConnected && status.connected { - try await self.syncingService.triggerSync() - } - previousConnected = status.connected - } - } + return try await attachmentsService.withContext { context in + // Start a write transaction. The attachment record and relevant local relationship + // assignment should happen in the same transaction. + try await db.writeTransaction { tx in + let attachment = Attachment( + id: id, + filename: filename, + state: AttachmentState.queuedUpload, + localUri: localUri, + mediaType: mediaType, + size: fileSize + ) - // Add attachment watching task - group.addTask { - for try await items in try self.watchAttachments() { - try await self.processWatchedAttachments(items: items) - } - } + // Allow consumers to set relationships to this attachment id + try updateHook(tx, attachment) - // Wait for any task to complete (which should only happen on cancellation) - try await group.next() - } - } catch { - if !(error is CancellationError) { - logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag) - } - } + return try context.upsertAttachment(attachment, context: tx) } } } - /// Stops active syncing tasks. Syncing can be resumed with ``startSync()`` - public func stopSyncing() async throws { - try await lock.withLock { - try await _stopSyncing() - } - } - - private func _stopSyncing() async throws { - try guardClosed() - - syncStatusTask?.cancel() - // Wait for the task to actually complete - do { - _ = try await syncStatusTask?.value - } catch { - // Task completed with error (likely cancellation) - // This is okay - } - syncStatusTask = nil + @discardableResult + func deleteFile( + attachmentId: String, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + try await attachmentsService.withContext { context in + guard let attachment = try await context.getAttachment(id: attachmentId) else { + throw PowerSyncAttachmentError.notFound("Attachment record with id \(attachmentId) was not found.") + } - try await syncingService.stopSync() - } + let result = try await self.db.writeTransaction { transaction in + try updateHook(transaction, attachment) - /// Closes the attachment queue and cancels all sync tasks - public func close() async throws { - try await lock.withLock { - try guardClosed() + let updatedAttachment = Attachment( + id: attachment.id, + filename: attachment.filename, + state: AttachmentState.queuedDelete, + hasSynced: attachment.hasSynced, + localUri: attachment.localUri, + mediaType: attachment.mediaType, + size: attachment.size + ) - try await _stopSyncing() - try await syncingService.close() - closed = true + return try context.upsertAttachment(updatedAttachment, context: transaction) + } + return result } } - /// Resolves the filename for a new attachment - /// - Parameters: - /// - attachmentId: Attachment ID - /// - fileExtension: File extension - /// - Returns: Resolved filename - public func resolveNewAttachmentFilename( - attachmentId: String, - fileExtension: String? - ) -> String { - return "\(attachmentId).\(fileExtension ?? "attachment")" - } - - /// Processes watched attachment items and updates sync state - /// - Parameter items: List of watched attachment items - public func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws { + func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws { // Need to get all the attachments which are tracked in the DB. // We might need to restore an archived attachment. try await attachmentsService.withContext { context in @@ -231,13 +156,13 @@ open class AttachmentQueue { for item in items { guard let existingQueueItem = currentAttachments.first(where: { $0.id == item.id }) else { // Item is not present in the queue - - if !self.downloadAttachments { + + if !downloadAttachments { continue } // This item should be added to the queue - let filename = self.resolveNewAttachmentFilename( + let filename = await resolveNewAttachmentFilename( attachmentId: item.id, fileExtension: item.fileExtension ) @@ -302,101 +227,218 @@ open class AttachmentQueue { } } - /// Saves a new file and schedules it for upload - /// - Parameters: - /// - data: File data - /// - mediaType: MIME type - /// - fileExtension: File extension - /// - updateHook: Hook to assign attachment relationships in the same transaction - /// - Returns: The created attachment - @discardableResult - public func saveFile( - data: Data, - mediaType: String, - fileExtension: String?, - updateHook: @escaping (ConnectionContext, Attachment) throws -> Void - ) async throws -> Attachment { - let id = try await db.get(sql: "SELECT uuid() as id", parameters: [], mapper: { cursor in - try cursor.getString(name: "id") - }) + func expireCache() async throws { + try await attachmentsService.withContext { context in + var done = false + repeat { + done = try await self.syncingService.deleteArchivedAttachments(context) + } while !done + } + } +} - let filename = resolveNewAttachmentFilename(attachmentId: id, fileExtension: fileExtension) - let localUri = getLocalUri(filename) +/// Class used to implement the attachment queue +/// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments. +public actor AttachmentQueue: AttachmentQueueProtocol { + let logTag = "AttachmentQueue" - // Write the file to the filesystem - let fileSize = try await localStorage.saveFile(filePath: localUri, data: data) + /// PowerSync database client + public let db: PowerSyncDatabaseProtocol - return try await attachmentsService.withContext { context in - // Start a write transaction. The attachment record and relevant local relationship - // assignment should happen in the same transaction. - try await self.db.writeTransaction { tx in - let attachment = Attachment( - id: id, - filename: filename, - state: AttachmentState.queuedUpload, - localUri: localUri, - mediaType: mediaType, - size: fileSize - ) + /// Remote storage adapter + public let remoteStorage: RemoteStorageAdapter - // Allow consumers to set relationships to this attachment id - try updateHook(tx, attachment) + /// Directory name for attachments + private let attachmentsDirectory: String - return try context.upsertAttachment(attachment, context: tx) - } + /// Closure which creates a Stream of ``WatchedAttachmentItem`` + private let watchAttachments: @Sendable () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error> + + /// Local file system adapter + public let localStorage: LocalStorageAdapter + + /// Attachments table name in SQLite + private let attachmentsQueueTableName: String + + /// Optional sync error handler + private let errorHandler: SyncErrorHandler? + + /// Interval between periodic syncs + private let syncInterval: TimeInterval + + /// Limit on number of archived attachments + private let archivedCacheLimit: Int64 + + /// Duration for throttling sync operations + private let syncThrottleDuration: TimeInterval + + /// Subdirectories to be created in attachments directory + private let subdirectories: [String]? + + /// Whether to allow downloading of attachments + public let downloadAttachments: Bool + + /** + * Logging interface used for all log operations + */ + public let logger: any LoggerProtocol + + /// Attachment service for interacting with attachment records + public let attachmentsService: AttachmentService + + private var syncStatusTask: Task? + private var cancellables = Set() + + /// Indicates whether the queue has been closed + public private(set) var closed: Bool = false + + /// Syncing service instance + public let syncingService: SyncingService + + private let _getLocalUri: @Sendable (_ filename: String) async -> String + + /// Initializes the attachment queue + /// - Parameters match the stored properties + public init( + db: PowerSyncDatabaseProtocol, + remoteStorage: RemoteStorageAdapter, + attachmentsDirectory: String, + watchAttachments: @Sendable @escaping () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error>, + localStorage: LocalStorageAdapter = FileManagerStorageAdapter(), + attachmentsQueueTableName: String = defaultAttachmentsTableName, + errorHandler: SyncErrorHandler? = nil, + syncInterval: TimeInterval = 30.0, + archivedCacheLimit: Int64 = 100, + syncThrottleDuration: TimeInterval = 1.0, + subdirectories: [String]? = nil, + downloadAttachments: Bool = true, + logger: (any LoggerProtocol)? = nil, + getLocalUri: (@Sendable (_ filename: String) async -> String)? = nil + ) { + self.db = db + self.remoteStorage = remoteStorage + self.attachmentsDirectory = attachmentsDirectory + self.watchAttachments = watchAttachments + self.localStorage = localStorage + self.attachmentsQueueTableName = attachmentsQueueTableName + self.errorHandler = errorHandler + self.syncInterval = syncInterval + self.archivedCacheLimit = archivedCacheLimit + self.syncThrottleDuration = syncThrottleDuration + self.subdirectories = subdirectories + self.downloadAttachments = downloadAttachments + self.logger = logger ?? db.logger + _getLocalUri = getLocalUri ?? { filename in + URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(filename).path } + attachmentsService = AttachmentServiceImpl( + db: db, + tableName: attachmentsQueueTableName, + logger: self.logger, + maxArchivedCount: archivedCacheLimit + ) + syncingService = SyncingServiceImpl( + remoteStorage: self.remoteStorage, + localStorage: self.localStorage, + attachmentsService: attachmentsService, + logger: self.logger, + getLocalUri: _getLocalUri, + errorHandler: self.errorHandler, + syncThrottle: self.syncThrottleDuration + ) } - /// Queues a file for deletion - /// - Parameters: - /// - attachmentId: ID of the attachment to delete - /// - updateHook: Hook to perform additional DB updates in the same transaction - @discardableResult - public func deleteFile( - attachmentId: String, - updateHook: @escaping (ConnectionContext, Attachment) throws -> Void - ) async throws -> Attachment { - try await attachmentsService.withContext { context in - guard let attachment = try await context.getAttachment(id: attachmentId) else { - throw PowerSyncAttachmentError.notFound("Attachment record with id \(attachmentId) was not found.") - } + public func getLocalUri(_ filename: String) async -> String { + return await _getLocalUri(filename) + } - let result = try await self.db.writeTransaction { tx in - try updateHook(tx, attachment) + public func startSync() async throws { + try guardClosed() - let updatedAttachment = Attachment( - id: attachment.id, - filename: attachment.filename, - state: AttachmentState.queuedDelete, - hasSynced: attachment.hasSynced, - localUri: attachment.localUri, - mediaType: attachment.mediaType, - size: attachment.size - ) + // Stop any active syncing before starting new Tasks + try await _stopSyncing() - return try context.upsertAttachment(updatedAttachment, context: tx) + // Ensure the directory where attachments are downloaded exists + try await localStorage.makeDir(path: attachmentsDirectory) + + if let subdirectories = subdirectories { + for subdirectory in subdirectories { + let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path + try await localStorage.makeDir(path: path) } - return result } + + // Verify initial state + try await attachmentsService.withContext { context in + try await self.verifyAttachments(context: context) + } + + try await syncingService.startSync(period: syncInterval) + _startSyncTask() } - /// Returns the local URI where a file is stored based on filename - /// - Parameter filename: The name of the file - /// - Returns: The file path - public func getLocalUri(_ filename: String) -> String { - return URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(filename).path + public func stopSyncing() async throws { + try await _stopSyncing() } - /// Removes all archived items - public func expireCache() async throws { - try await attachmentsService.withContext { context in - var done = false - repeat { - done = try await self.syncingService.deleteArchivedAttachments(context) - } while !done + private func _startSyncTask() { + syncStatusTask = Task { + do { + try await withThrowingTaskGroup(of: Void.self) { group in + // Add connectivity monitoring task + group.addTask { + var previousConnected = self.db.currentStatus.connected + for await status in self.db.currentStatus.asFlow() { + try Task.checkCancellation() + if !previousConnected, status.connected { + try await self.syncingService.triggerSync() + } + previousConnected = status.connected + } + } + + // Add attachment watching task + group.addTask { + for try await items in try self.watchAttachments() { + try await self.processWatchedAttachments(items: items) + } + } + + // Wait for any task to complete (which should only happen on cancellation) + try await group.next() + } + } catch { + if !(error is CancellationError) { + logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag) + } + } } } + private func _stopSyncing() async throws { + try guardClosed() + + syncStatusTask?.cancel() + // Wait for the task to actually complete + do { + _ = try await syncStatusTask?.value + } catch { + // Task completed with error (likely cancellation) + // This is okay + } + syncStatusTask = nil + + try await syncingService.stopSync() + } + + public func close() async throws { + try guardClosed() + + try await _stopSyncing() + try await syncingService.close() + closed = true + } + /// Clears the attachment queue and deletes all attachment files public func clearQueue() async throws { try await attachmentsService.withContext { context in @@ -421,7 +463,7 @@ open class AttachmentQueue { // The file exists, this is correct continue } - + if attachment.state == AttachmentState.queuedUpload { // The file must have been removed from the local storage before upload was completed updates.append(attachment.with( diff --git a/Sources/PowerSync/attachments/AttachmentService.swift b/Sources/PowerSync/attachments/AttachmentService.swift index 3690439..3bb0c62 100644 --- a/Sources/PowerSync/attachments/AttachmentService.swift +++ b/Sources/PowerSync/attachments/AttachmentService.swift @@ -1,14 +1,23 @@ import Foundation +public protocol AttachmentService: Sendable { + /// Watches for changes to the attachments table. + func watchActiveAttachments() async throws -> AsyncThrowingStream<[String], Error> + + /// Executes a callback with exclusive access to the attachment context. + func withContext( + callback: @Sendable @escaping (AttachmentContext) async throws -> R + ) async throws -> R +} + /// Service which manages attachment records. -open class AttachmentService { +actor AttachmentServiceImpl: AttachmentService { private let db: any PowerSyncDatabaseProtocol private let tableName: String private let logger: any LoggerProtocol private let logTag = "AttachmentService" private let context: AttachmentContext - private let lock: LockActor /// Initializes the attachment service with the specified database, table name, logger, and max archived count. public init( @@ -20,16 +29,14 @@ open class AttachmentService { self.db = db self.tableName = tableName self.logger = logger - context = AttachmentContext( + context = AttachmentContextImpl( db: db, tableName: tableName, logger: logger, maxArchivedCount: maxArchivedCount ) - lock = LockActor() } - /// Watches for changes to the attachments table. public func watchActiveAttachments() throws -> AsyncThrowingStream<[String], Error> { logger.info("Watching attachments...", tag: logTag) @@ -49,17 +56,14 @@ open class AttachmentService { parameters: [ AttachmentState.queuedUpload.rawValue, AttachmentState.queuedDownload.rawValue, - AttachmentState.queuedDelete.rawValue, + AttachmentState.queuedDelete.rawValue ] ) { cursor in try cursor.getString(name: "id") } } - /// Executes a callback with exclusive access to the attachment context. - public func withContext(callback: @Sendable @escaping (AttachmentContext) async throws -> R) async throws -> R { - try await lock.withLock { - try await callback(context) - } + public func withContext(callback: @Sendable @escaping (AttachmentContext) async throws -> R) async throws -> R { + try await callback(context) } } diff --git a/Sources/PowerSync/attachments/FileManagerLocalStorage.swift b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift index cc3915e..65e46e1 100644 --- a/Sources/PowerSync/attachments/FileManagerLocalStorage.swift +++ b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift @@ -3,7 +3,7 @@ import Foundation /** * Implementation of LocalStorageAdapter using FileManager */ -public class FileManagerStorageAdapter: LocalStorageAdapter { +public actor FileManagerStorageAdapter: LocalStorageAdapter { private let fileManager = FileManager.default public init() {} diff --git a/Sources/PowerSync/attachments/LocalStorage.swift b/Sources/PowerSync/attachments/LocalStorage.swift index 071e522..a2187c6 100644 --- a/Sources/PowerSync/attachments/LocalStorage.swift +++ b/Sources/PowerSync/attachments/LocalStorage.swift @@ -4,7 +4,7 @@ import Foundation public enum PowerSyncAttachmentError: Error { /// A general error with an associated message case generalError(String) - + /// Indicates no matching attachment record could be found case notFound(String) @@ -16,13 +16,13 @@ public enum PowerSyncAttachmentError: Error { /// The given file or directory path was invalid case invalidPath(String) - + /// The attachments queue or sub services have been closed case closed(String) } /// Protocol defining an adapter interface for local file storage -public protocol LocalStorageAdapter { +public protocol LocalStorageAdapter: Sendable { /// Saves data to a file at the specified path. /// /// - Parameters: diff --git a/Sources/PowerSync/attachments/LockActor.swift b/Sources/PowerSync/attachments/LockActor.swift deleted file mode 100644 index 94f41db..0000000 --- a/Sources/PowerSync/attachments/LockActor.swift +++ /dev/null @@ -1,48 +0,0 @@ -import Foundation - -actor LockActor { - private var isLocked = false - private var waiters: [(id: UUID, continuation: CheckedContinuation)] = [] - - func withLock(_ operation: @Sendable () async throws -> T) async throws -> T { - try await waitUntilUnlocked() - - isLocked = true - defer { unlockNext() } - - try Task.checkCancellation() // cancellation check after acquiring lock - return try await operation() - } - - private func waitUntilUnlocked() async throws { - if !isLocked { return } - - let id = UUID() - - // Use withTaskCancellationHandler to manage cancellation - await withTaskCancellationHandler { - await withCheckedContinuation { continuation in - waiters.append((id: id, continuation: continuation)) - } - } onCancel: { - // Cancellation logic: remove the waiter when cancelled - Task { - await self.removeWaiter(id: id) - } - } - } - - private func removeWaiter(id: UUID) async { - // Safely remove the waiter from the actor's waiters list - waiters.removeAll { $0.id == id } - } - - private func unlockNext() { - if let next = waiters.first { - waiters.removeFirst() - next.continuation.resume() - } else { - isLocked = false - } - } -} diff --git a/Sources/PowerSync/attachments/RemoteStorage.swift b/Sources/PowerSync/attachments/RemoteStorage.swift index bd94a42..8779655 100644 --- a/Sources/PowerSync/attachments/RemoteStorage.swift +++ b/Sources/PowerSync/attachments/RemoteStorage.swift @@ -1,7 +1,7 @@ import Foundation /// Adapter for interfacing with remote attachment storage. -public protocol RemoteStorageAdapter { +public protocol RemoteStorageAdapter: Sendable { /// Uploads a file to remote storage. /// /// - Parameters: diff --git a/Sources/PowerSync/attachments/SyncErrorHandler.swift b/Sources/PowerSync/attachments/SyncErrorHandler.swift index b9a2b55..e7feb31 100644 --- a/Sources/PowerSync/attachments/SyncErrorHandler.swift +++ b/Sources/PowerSync/attachments/SyncErrorHandler.swift @@ -6,7 +6,7 @@ import Foundation /// operations (download, upload, delete) should be retried upon failure. /// /// If an operation fails and should not be retried, the attachment record is archived. -public protocol SyncErrorHandler { +public protocol SyncErrorHandler: Sendable { /// Handles a download error for a specific attachment. /// /// - Parameters: @@ -44,7 +44,7 @@ public protocol SyncErrorHandler { /// Default implementation of `SyncErrorHandler`. /// /// By default, all operations return `false`, indicating no retry. -public class DefaultSyncErrorHandler: SyncErrorHandler { +public final class DefaultSyncErrorHandler: SyncErrorHandler, Sendable { public init() {} public func onDownloadError(attachment _: Attachment, error _: Error) async -> Bool { diff --git a/Sources/PowerSync/attachments/SyncingService.swift b/Sources/PowerSync/attachments/SyncingService.swift index 3c3a551..28cd209 100644 --- a/Sources/PowerSync/attachments/SyncingService.swift +++ b/Sources/PowerSync/attachments/SyncingService.swift @@ -6,18 +6,37 @@ import Foundation /// This watches for changes to active attachments and performs queued /// download, upload, and delete operations. Syncs can be triggered manually, /// periodically, or based on database changes. -open class SyncingService { +public protocol SyncingService: Sendable { + /// Starts periodic syncing of attachments. + /// + /// - Parameter period: The time interval in seconds between each sync. + func startSync(period: TimeInterval) async throws + + func stopSync() async throws + + /// Cleans up internal resources and cancels any ongoing syncing. + func close() async throws + + /// Triggers a sync operation. Can be called manually. + func triggerSync() async throws + + /// Deletes attachments marked as archived that exist on local storage. + /// + /// - Returns: `true` if any deletions occurred, `false` otherwise. + func deleteArchivedAttachments(_ context: AttachmentContext) async throws -> Bool +} + +actor SyncingServiceImpl: SyncingService { private let remoteStorage: RemoteStorageAdapter private let localStorage: LocalStorageAdapter private let attachmentsService: AttachmentService - private let getLocalUri: (String) async -> String + private let getLocalUri: @Sendable (String) async -> String private let errorHandler: SyncErrorHandler? private let syncThrottle: TimeInterval private var cancellables = Set() private let syncTriggerSubject = PassthroughSubject() private var periodicSyncTimer: Timer? private var syncTask: Task? - private let lock: LockActor let logger: any LoggerProtocol let logTag = "AttachmentSync" @@ -32,12 +51,12 @@ open class SyncingService { /// - getLocalUri: Callback used to resolve a local path for saving downloaded attachments. /// - errorHandler: Optional handler to determine if sync errors should be retried. /// - syncThrottle: Throttle interval to control frequency of sync triggers. - init( + public init( remoteStorage: RemoteStorageAdapter, localStorage: LocalStorageAdapter, attachmentsService: AttachmentService, logger: any LoggerProtocol, - getLocalUri: @escaping (String) async -> String, + getLocalUri: @Sendable @escaping (String) async -> String, errorHandler: SyncErrorHandler? = nil, syncThrottle: TimeInterval = 5.0 ) { @@ -48,29 +67,24 @@ open class SyncingService { self.errorHandler = errorHandler self.syncThrottle = syncThrottle self.logger = logger - self.closed = false - self.lock = LockActor() + closed = false } /// Starts periodic syncing of attachments. /// /// - Parameter period: The time interval in seconds between each sync. public func startSync(period: TimeInterval) async throws { - try await lock.withLock { - try guardClosed() + try guardClosed() - // Close any active sync operations - try await _stopSync() + // Close any active sync operations + try await _stopSync() - setupSyncFlow(period: period) - } + setupSyncFlow(period: period) } public func stopSync() async throws { - try await lock.withLock { - try guardClosed() - try await _stopSync() - } + try guardClosed() + try await _stopSync() } private func _stopSync() async throws { @@ -93,12 +107,10 @@ open class SyncingService { /// Cleans up internal resources and cancels any ongoing syncing. func close() async throws { - try await lock.withLock { - try guardClosed() + try guardClosed() - try await _stopSync() - closed = true - } + try await _stopSync() + closed = true } /// Triggers a sync operation. Can be called manually. @@ -137,7 +149,7 @@ open class SyncingService { .sink { _ in continuation.yield(()) } continuation.onTermination = { _ in - cancellable.cancel() + continuation.finish() } self.cancellables.insert(cancellable) } @@ -150,7 +162,7 @@ open class SyncingService { try await withThrowingTaskGroup(of: Void.self) { group in // Handle sync trigger events group.addTask { - let syncTrigger = self.createSyncTrigger() + let syncTrigger = await self.createSyncTrigger() for await _ in syncTrigger { try Task.checkCancellation() @@ -165,9 +177,9 @@ open class SyncingService { // Watch attachment records. Trigger a sync on change group.addTask { - for try await _ in try self.attachmentsService.watchActiveAttachments() { + for try await _ in try await self.attachmentsService.watchActiveAttachments() { try Task.checkCancellation() - self.syncTriggerSubject.send(()) + await self._triggerSyncSubject() } } @@ -178,7 +190,7 @@ open class SyncingService { try await self.triggerSync() } } - + // Wait for any task to complete try await group.next() } @@ -270,6 +282,11 @@ open class SyncingService { } } + /// Small actor isolated method to trigger the sync subject + private func _triggerSyncSubject() { + syncTriggerSubject.send(()) + } + /// Deletes an attachment from remote and local storage. /// /// - Parameter attachment: The attachment to delete. diff --git a/Sources/PowerSync/attachments/WatchedAttachmentItem.swift b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift index b4cddc7..ff415dd 100644 --- a/Sources/PowerSync/attachments/WatchedAttachmentItem.swift +++ b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift @@ -4,7 +4,7 @@ import Foundation /// A watched attachment record item. /// This is usually returned from watching all relevant attachment IDs. -public struct WatchedAttachmentItem { +public struct WatchedAttachmentItem: Sendable { /// Id for the attachment record public let id: String diff --git a/Sources/StructuredQueries/PowerSyncQueryDecoder.swift b/Sources/StructuredQueries/PowerSyncQueryDecoder.swift new file mode 100644 index 0000000..9183a00 --- /dev/null +++ b/Sources/StructuredQueries/PowerSyncQueryDecoder.swift @@ -0,0 +1,99 @@ +import Foundation +import PowerSync +import StructuredQueries + +@usableFromInline +struct PowerSyncQueryDecoder: QueryDecoder { + /// The structured queries library will decode the typed struct's columns in + /// the same order which it compiled the SELECT statement's column list. + /// The library will call the correct typed `decode` method, we just need to keep + /// track of which column index we're on. + @usableFromInline + var currentIndex: Int = 0 + + /// The PowerSync `SqlCursor` provided in the `mapper` closure. + @usableFromInline + var cursor: SqlCursor + + @usableFromInline + init(cursor: SqlCursor) { + self.cursor = cursor + } + + /// Decodes a single tuple of the given type starting from the current column. + /// + /// - Parameter columnTypes: The types to decode as. + /// - Returns: A tuple of the requested types. + @inlinable + @inline(__always) + public mutating func decodeColumns( + _: (repeat each T).Type + ) throws -> (repeat (each T).QueryOutput) { + try (repeat (each T)(decoder: &self).queryOutput) + } + + @inlinable + mutating func decode(_: [UInt8].Type) throws -> [UInt8]? { + defer { currentIndex += 1 } + // TODO: blob support + return nil + } + + @inlinable + mutating func decode(_: Bool.Type) throws -> Bool? { + try decode(Int64.self).map { $0 != 0 } + } + + @inlinable + mutating func decode(_: Date.Type) throws -> Date? { + try decode(String.self).map { + let formatter = ISO8601DateFormatter() + guard let date = formatter.date(from: $0) else { + throw InvalidDate() + } + return date + } + } + + @inlinable + mutating func decode(_: Double.Type) throws -> Double? { + defer { currentIndex += 1 } + return cursor.getDoubleOptional(index: currentIndex) + } + + @inlinable + mutating func decode(_: Int.Type) throws -> Int? { + try decode(Int64.self).map(Int.init) + } + + @inlinable + mutating func decode(_: Int64.Type) throws -> Int64? { + defer { currentIndex += 1 } + return cursor.getInt64Optional(index: currentIndex) + } + + @inlinable + mutating func decode(_: String.Type) throws -> String? { + defer { currentIndex += 1 } + return cursor.getStringOptional(index: currentIndex) + } + + @inlinable + mutating func decode(_: UUID.Type) throws -> UUID? { + guard let uuidString = try decode(String.self) else { return nil } + guard let uuid = UUID(uuidString: uuidString) else { throw InvalidUUID() } + return uuid + } +} + +@usableFromInline +struct InvalidUUID: Error { + @usableFromInline + init() {} +} + +@usableFromInline +struct InvalidDate: Error { + @usableFromInline + init() {} +} diff --git a/Sources/StructuredQueries/QueryCursor.swift b/Sources/StructuredQueries/QueryCursor.swift new file mode 100644 index 0000000..35c5328 --- /dev/null +++ b/Sources/StructuredQueries/QueryCursor.swift @@ -0,0 +1,71 @@ +import Foundation +import PowerSync +import StructuredQueries + +/// The Structured Queries library is dialect agnostic. +/// For our purposes, we can use "?" for placeholders. +public extension QueryFragment { + func prepareSqlite() -> (sql: String, bindings: [QueryBinding]) { + prepare { _ in "?" } + } +} + +@usableFromInline +final class QueryValueCursor { + public typealias Element = QueryValue.QueryOutput + @usableFromInline + let powerSync: PowerSyncDatabaseProtocol + + @usableFromInline + let query: QueryFragment + + @usableFromInline + init(powerSync: PowerSyncDatabaseProtocol, query: QueryFragment) throws { + self.powerSync = powerSync + self.query = query + } + + /// Performs a `PowerSyncDatabaseProtocol.getAll` to execute a SELECT query. + /// A decoder users the provided `SqlCursor` to map the columns to the Structured Table type. + @inlinable + public func elements() async throws -> [Element] { + let preparedQuery = query.prepareSqlite() + return try await powerSync.getAll( + sql: preparedQuery.sql, + parameters: preparedQuery.bindings.map { try $0.powerSyncValue } + ) { psCursor in + + var decoder = PowerSyncQueryDecoder(cursor: psCursor) + return try QueryValue(decoder: &decoder).queryOutput + } + } +} + +/// The bindings provided by `prepare` seem to be wrapped in a Swift class +/// which causes binding to fail. This converts values to be usable by the Kotlin SDK. +extension QueryBinding { + @inlinable + var powerSyncValue: Sendable? { + get throws { + switch self { + case let .blob(blob): + return blob + case let .date(date): + let formatter = ISO8601DateFormatter() + return formatter.string(from: date) + case let .double(double): + return double + case let .int(int): + return int + case .null: + return nil + case let .text(text): + return text + case let .uuid(uuid): + return uuid.uuidString + case let .invalid(error): + throw error + } + } + } +} diff --git a/Sources/StructuredQueries/Statement+PowerSync.swift b/Sources/StructuredQueries/Statement+PowerSync.swift new file mode 100644 index 0000000..5382780 --- /dev/null +++ b/Sources/StructuredQueries/Statement+PowerSync.swift @@ -0,0 +1,141 @@ +import Foundation +import PowerSync +import StructuredQueriesCore + +public extension StructuredQueriesCore.Statement { + /// Executes a structured query on the given database connection. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// try await Player.insert { $0.name } values: { "Arthur" } + /// .execute(db) + /// // INSERT INTO "players" ("name") + /// // VALUES ('Arthur'); + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + @inlinable + @MainActor + func execute(_ powerSync: PowerSyncDatabaseProtocol) async throws where QueryValue == () { + let preparedQuery = query.prepareSqlite() + try await powerSync.execute( + sql: preparedQuery.sql, + parameters: preparedQuery.bindings.map { try $0.powerSyncValue } + ) + } + + /// Returns an array of all values fetched from the database. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// let lastName = "O'Reilly" + /// let players = try await Player + /// .where { $0.lastName == lastName } + /// .fetchAll(db) + /// // SELECT … FROM "players" + /// // WHERE "players"."lastName" = 'O''Reilly' + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + /// - Returns: An array of all values decoded from the database. + @inlinable + func fetchAll(_ powerSync: PowerSyncDatabaseProtocol) async throws -> [QueryValue.QueryOutput] + where QueryValue: QueryRepresentable + { + let cursor: QueryValueCursor = try QueryValueCursor( + powerSync: powerSync, + query: query + ) + return try await cursor.elements() + } + + /// Returns a single value fetched from the database. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// let lastName = "O'Reilly" + /// let player = try await Player + /// .where { $0.lastName == lastName } + /// .limit(1) + /// .fetchOne(db) + /// // SELECT … FROM "players" + /// // WHERE "players"."lastName" = 'O''Reilly' + /// // LIMIT 1 + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + /// - Returns: A single value decoded from the database. + @inlinable + func fetchOne(_ powerSync: PowerSyncDatabaseProtocol) async throws -> QueryValue.QueryOutput? + where QueryValue: QueryRepresentable + { + let all = try await fetchAll(powerSync) + return all.first + } +} + +public extension SelectStatement where QueryValue == (), Joins == () { + /// Returns the number of rows fetched by the query. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// let count = try await Player.all.fetchCount(db) + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + /// - Returns: The number of rows fetched by the query. + @inlinable + func fetchCount(_ powerSync: PowerSyncDatabaseProtocol) async throws -> Int { + let query = asSelect().count() + return try await query.fetchOne(powerSync) ?? 0 + } +} + +extension SelectStatement where QueryValue == (), Joins == () { + /// Returns an array of all values fetched from the database. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// let users = try await User.all.fetchAll(db) + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + /// - Returns: An array of all values decoded from the database. + @_documentation(visibility: private) + @inlinable + public func fetchAll(_ powerSync: PowerSyncDatabaseProtocol) async throws -> [From.QueryOutput] { + let cursor = try QueryValueCursor( + powerSync: powerSync, + query: query + ) + return try await cursor.elements() + } + + /// Returns a single value fetched from the database. + /// + /// For example: + /// + /// ```swift + /// let db = PowerSyncDatabase(...) + /// let user = try await User.all.fetchOne(db) + /// ``` + /// + /// - Parameter powerSync: A PowerSync database connection. + /// - Returns: A single value decoded from the database. + @_documentation(visibility: private) + @inlinable + public func fetchOne(_ powerSync: PowerSyncDatabaseProtocol) async throws -> From.QueryOutput? { + let all = try await fetchAll(powerSync) + return all.first + } +} diff --git a/Tests/PowerSyncTests/AttachmentTests.swift b/Tests/PowerSyncTests/AttachmentTests.swift index b4427e4..d21eb78 100644 --- a/Tests/PowerSyncTests/AttachmentTests.swift +++ b/Tests/PowerSyncTests/AttachmentTests.swift @@ -1,4 +1,3 @@ - @testable import PowerSync import XCTest @@ -29,7 +28,7 @@ final class AttachmentTests: XCTestCase { database = nil try await super.tearDown() } - + func getAttachmentDirectory() -> String { URL(fileURLWithPath: NSTemporaryDirectory()).appendingPathComponent("attachments").path } @@ -40,37 +39,39 @@ final class AttachmentTests: XCTestCase { remoteStorage: { struct MockRemoteStorage: RemoteStorageAdapter { func uploadFile( - fileData: Data, - attachment: Attachment + fileData _: Data, + attachment _: Attachment ) async throws {} - + /** * Download a file from remote storage */ - func downloadFile(attachment: Attachment) async throws -> Data { + func downloadFile(attachment _: Attachment) async throws -> Data { return Data([1, 2, 3]) } - + /** * Delete a file from remote storage */ - func deleteFile(attachment: Attachment) async throws {} + func deleteFile(attachment _: Attachment) async throws {} } - + return MockRemoteStorage() }(), attachmentsDirectory: getAttachmentDirectory(), - watchAttachments: { try self.database.watch(options: WatchOptions( - sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", - mapper: { cursor in try WatchedAttachmentItem( - id: cursor.getString(name: "photo_id"), - fileExtension: "jpg" - ) } - )) } + watchAttachments: { [database] in + try database!.watch(options: WatchOptions( + sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", + mapper: { cursor in try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) } + )) + } ) - + try await queue.startSync() - + // Create a user which has a photo_id associated. // This will be treated as a download since no attachment record was created. // saveFile creates the attachment record before the updates are made. @@ -78,58 +79,60 @@ final class AttachmentTests: XCTestCase { sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'steven', 'steven@example.com', uuid())", parameters: [] ) - - let attachmentsWatch = try database.watch( - options: WatchOptions( - sql: "SELECT * FROM attachments", - mapper: { cursor in try Attachment.fromCursor(cursor) } - )).makeAsyncIterator() - + let attachmentRecord = try await waitForMatch( - iterator: attachmentsWatch, + iteratorGenerator: { [database] in try database!.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )) }, where: { results in results.first?.state == AttachmentState.synced }, timeout: 5 ).first - - // The file should exist + +// The file should exist let localData = try await queue.localStorage.readFile(filePath: attachmentRecord!.localUri!) XCTAssertEqual(localData.count, 3) - + try await queue.clearQueue() try await queue.close() } - + func testAttachmentUpload() async throws { - class MockRemoteStorage: RemoteStorageAdapter { + actor MockRemoteStorage: RemoteStorageAdapter { public var uploadCalled = false - + + func wasUploadCalled() -> Bool { + return uploadCalled + } + func uploadFile( - fileData: Data, - attachment: Attachment + fileData _: Data, + attachment _: Attachment ) async throws { uploadCalled = true } - + /** * Download a file from remote storage */ - func downloadFile(attachment: Attachment) async throws -> Data { + func downloadFile(attachment _: Attachment) async throws -> Data { return Data([1, 2, 3]) } - + /** * Delete a file from remote storage */ - func deleteFile(attachment: Attachment) async throws {} + func deleteFile(attachment _: Attachment) async throws {} } let mockedRemote = MockRemoteStorage() - + let queue = AttachmentQueue( db: database, remoteStorage: mockedRemote, attachmentsDirectory: getAttachmentDirectory(), - watchAttachments: { try self.database.watch(options: WatchOptions( + watchAttachments: { [database] in try database!.watch(options: WatchOptions( sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", mapper: { cursor in try WatchedAttachmentItem( id: cursor.getString(name: "photo_id"), @@ -137,35 +140,36 @@ final class AttachmentTests: XCTestCase { ) } )) } ) - + try await queue.startSync() - - let attachmentsWatch = try database.watch( - options: WatchOptions( - sql: "SELECT * FROM attachments", - mapper: { cursor in try Attachment.fromCursor(cursor) } - )).makeAsyncIterator() - + _ = try await queue.saveFile( data: Data([3, 4, 5]), mediaType: "image/jpg", fileExtension: "jpg" - ) { tx, attachment in - _ = try tx.execute( + ) { transaction, attachment in + _ = try transaction.execute( sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'john', 'j@j.com', ?)", parameters: [attachment.id] ) } - + _ = try await waitForMatch( - iterator: attachmentsWatch, + iteratorGenerator: { [database] in + try database!.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )) + }, where: { results in results.first?.state == AttachmentState.synced }, timeout: 5 ).first - + + let uploadCalled = await mockedRemote.wasUploadCalled() // Upload should have been called - XCTAssertTrue(mockedRemote.uploadCalled) - + XCTAssertTrue(uploadCalled) + try await queue.clearQueue() try await queue.close() } @@ -176,21 +180,18 @@ public enum WaitForMatchError: Error { case predicateFail(message: String) } -public func waitForMatch( - iterator: AsyncThrowingStream.Iterator, - where predicate: @escaping (T) -> Bool, +public func waitForMatch( + iteratorGenerator: @Sendable @escaping () throws -> AsyncThrowingStream, + where predicate: @Sendable @escaping (T) -> Bool, timeout: TimeInterval ) async throws -> T { let timeoutNanoseconds = UInt64(timeout * 1_000_000_000) return try await withThrowingTaskGroup(of: T.self) { group in // Task to wait for a matching value - group.addTask { - var localIterator = iterator - while let value = try await localIterator.next() { - if predicate(value) { - return value - } + group.addTask { [iteratorGenerator] in + for try await value in try iteratorGenerator() where predicate(value) { + return value } throw WaitForMatchError.timeout() // stream ended before match } @@ -214,13 +215,13 @@ func waitFor( predicate: () async throws -> Void ) async throws { let intervalNanoseconds = UInt64(interval * 1_000_000_000) - + let timeoutDate = Date( timeIntervalSinceNow: timeout ) - + var lastError: Error? - + while Date() < timeoutDate { do { try await predicate() @@ -230,7 +231,7 @@ func waitFor( } try await Task.sleep(nanoseconds: intervalNanoseconds) } - + throw WaitForMatchError.timeout( lastError: lastError ) diff --git a/Tests/PowerSyncTests/ConnectTests.swift b/Tests/PowerSyncTests/ConnectTests.swift index 3c80cb4..22aaea9 100644 --- a/Tests/PowerSyncTests/ConnectTests.swift +++ b/Tests/PowerSyncTests/ConnectTests.swift @@ -73,8 +73,8 @@ final class ConnectTests: XCTestCase { description: "Watch Sync Status" ) - let watchTask = Task { - for try await _ in database.currentStatus.asFlow() { + let watchTask = Task { [database] in + for try await _ in database!.currentStatus.asFlow() { expectation.fulfill() } }