-
-
Notifications
You must be signed in to change notification settings - Fork 14
Rename AsyncQueue -> FIFOQueue. Create ActorQueue #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2b30c2f
9511ffe
0070e39
67910a7
0565678
17894cc
200034c
86f564f
96b497f
e5a7f55
6fa22de
1f656db
64f00e3
583ca33
79b9999
b313127
2539cae
2dc1d8c
2863b37
8de9d14
d1a5560
744db4a
031fb5b
63fa685
dca625d
e258cb8
f70f19d
b8357db
10ec535
ddff295
847c0ed
01c2dac
47a1753
be96303
139f874
2435bfb
3f6d789
4b66f40
ada3d6c
771e5e4
ea6ade9
1fb3643
bf3a0c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,26 +6,89 @@ | |
| [](https://cocoapods.org/pods/AsyncQueue) | ||
| [](https://cocoapods.org/pods/AsyncQueue) | ||
|
|
||
| A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts. | ||
| A library of queues that enable sending ordered tasks from synchronous to asynchronous contexts. | ||
|
|
||
| ## Usage | ||
| ## Task Ordering and Swift Concurrency | ||
|
|
||
| ### Basic Initialization | ||
| Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test: | ||
|
|
||
| ```swift | ||
| let asyncQueue = AsyncQueue() | ||
| @MainActor | ||
| func test_mainActor_taskOrdering() async { | ||
| var counter = 0 | ||
| var tasks = [Task<Void, Never>]() | ||
| for iteration in 1...100 { | ||
| tasks.append(Task { | ||
| counter += 1 | ||
| XCTAssertEqual(counter, iteration) // often fails | ||
| }) | ||
| } | ||
| for task in tasks { | ||
| _ = await task.value | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ### Sending events from a synchronous context | ||
| Despite the spawned `Task` inheriting the serial `@MainActor` execution context, the ordering of the scheduled asynchronous work is not guaranteed. | ||
|
|
||
| While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID645) are great at serializing tasks, there is no simple way in the standard Swift library to send ordered tasks to them from a synchronous context. | ||
|
|
||
| ### Executing asynchronous tasks in FIFO order | ||
|
|
||
| Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued. | ||
|
|
||
| ```swift | ||
| asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ } | ||
| let queue = FIFOQueue() | ||
| queue.async { | ||
| /* | ||
| `async` context that executes after all other enqueued work is completed. | ||
| Work enqueued after this task will wait for this task to complete. | ||
| */ | ||
| try? await Task.sleep(nanoseconds: 1_000_000) | ||
| } | ||
| queue.async { | ||
| /* | ||
| This task begins execution once the above one-second sleep completes. | ||
| */ | ||
| } | ||
| Task { | ||
| await queue.await { | ||
| /* | ||
| `async` context that can return a value or throw an error. | ||
| Executes after all other enqueued work is completed. | ||
| Work enqueued after this task will wait for this task to complete. | ||
| */ | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ### Awaiting work from an asynchronous context | ||
| ### Sending ordered asynchronous tasks to Actors | ||
|
|
||
| Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code. | ||
|
|
||
| ```swift | ||
| await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ } | ||
| let queue = ActorQueue() | ||
| queue.async { | ||
| /* | ||
| `async` context that executes after all other enqueued work has begun executing. | ||
| Work enqueued after this task will wait for this task to complete or suspend. | ||
| */ | ||
| try? await Task.sleep(nanoseconds: 1_000_000) | ||
| } | ||
| queue.async { | ||
| /* | ||
| This task begins execution once the above task suspends due to the one-second sleep. | ||
| */ | ||
| } | ||
| Task { | ||
| await queue.await { | ||
| /* | ||
| `async` context that can return a value or throw an error. | ||
| Executes after all other enqueued work has begun executing. | ||
| Work enqueued after this task will wait for this task to complete or suspend. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| */ | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Requirements | ||
|
|
@@ -45,7 +108,7 @@ To install swift-async-queue in your iOS project with [Swift Package Manager](ht | |
|
|
||
| ```swift | ||
| dependencies: [ | ||
| .package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.1"), | ||
| .package(url: "https://github.com/dfed/swift-async-queue", from: "0.1.0"), | ||
| ] | ||
| ``` | ||
|
|
||
|
|
@@ -55,7 +118,7 @@ To install swift-async-queue in your iOS project with [CocoaPods](http://cocoapo | |
|
|
||
| ``` | ||
| platform :ios, '13.0' | ||
| pod 'AsyncQueue', '~> 0.1' | ||
| pod 'AsyncQueue', '~> 0.1.0' | ||
| ``` | ||
|
|
||
| ## Contributing | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| // MIT License | ||
| // | ||
| // Copyright (c) 2022 Dan Federman | ||
| // | ||
| // Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| // of this software and associated documentation files (the "Software"), to deal | ||
| // in the Software without restriction, including without limitation the rights | ||
| // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| // copies of the Software, and to permit persons to whom the Software is | ||
| // furnished to do so, subject to the following conditions: | ||
| // | ||
| // The above copyright notice and this permission notice shall be included in all | ||
| // copies or substantial portions of the Software. | ||
| // | ||
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| // SOFTWARE. | ||
|
|
||
| /// A queue that executes asynchronous tasks enqueued from a nonisolated context. | ||
dfed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing. | ||
| /// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends. | ||
| /// | ||
| /// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order. | ||
| /// Here is an example of how an `ActorQueue` should be utilized within an `actor`: | ||
| /// ```swift | ||
| /// public actor LogStore { | ||
| /// | ||
| /// nonisolated | ||
| /// public func log(_ message: String) { | ||
| /// queue.async { | ||
| /// await self.append(message) | ||
| /// } | ||
| /// } | ||
| /// | ||
| /// nonisolated | ||
| /// public func retrieveLogs() async -> [String] { | ||
| /// await queue.await { await self.logs } | ||
| /// } | ||
| /// | ||
| /// private func append(_ message: String) { | ||
| /// logs.append(message) | ||
| /// } | ||
| /// | ||
| /// private let queue = ActorQueue() | ||
| /// private var logs = [String]() | ||
| /// } | ||
| /// ``` | ||
| /// | ||
| /// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance. | ||
dfed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public final class ActorQueue { | ||
|
|
||
| // MARK: Initialization | ||
|
|
||
| /// Instantiates an actor queue. | ||
| /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. | ||
| public init(priority: TaskPriority? = nil) { | ||
| var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil | ||
| let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in | ||
| capturedTaskStreamContinuation = continuation | ||
| } | ||
| // Continuation will be captured during stream creation, so it is safe to force unwrap here. | ||
| // If this force-unwrap fails, something is fundamentally broken in the Swift runtime. | ||
| taskStreamContinuation = capturedTaskStreamContinuation! | ||
|
|
||
| Task.detached(priority: priority) { | ||
| let executor = ActorExecutor() | ||
| for await task in taskStream { | ||
| await executor.suspendUntilStarted(task) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| deinit { | ||
| taskStreamContinuation.finish() | ||
| } | ||
|
|
||
| // MARK: Public | ||
|
|
||
| /// Schedules an asynchronous task for execution and immediately returns. | ||
| /// The scheduled task will not execute until all prior tasks have completed or suspended. | ||
| /// - Parameter task: The task to enqueue. | ||
| public func async(_ task: @escaping @Sendable () async -> Void) { | ||
| taskStreamContinuation.yield(task) | ||
| } | ||
|
|
||
| /// Schedules an asynchronous task and returns after the task is complete. | ||
| /// The scheduled task will not execute until all prior tasks have completed or suspended. | ||
| /// - Parameter task: The task to enqueue. | ||
| /// - Returns: The value returned from the enqueued task. | ||
| public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T { | ||
| await withUnsafeContinuation { continuation in | ||
| taskStreamContinuation.yield { | ||
| continuation.resume(returning: await task()) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Schedules an asynchronous throwing task and returns after the task is complete. | ||
| /// The scheduled task will not execute until all prior tasks have completed or suspended. | ||
| /// - Parameter task: The task to enqueue. | ||
| /// - Returns: The value returned from the enqueued task. | ||
| public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this function is accepts a throwing task should the above one accept a throwing task too?
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally we'd use |
||
| try await withUnsafeThrowingContinuation { continuation in | ||
| taskStreamContinuation.yield { | ||
| do { | ||
| continuation.resume(returning: try await task()) | ||
| } catch { | ||
| continuation.resume(throwing: error) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // MARK: Private | ||
|
|
||
| private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation | ||
|
|
||
| // MARK: - ActorExecutor | ||
|
|
||
| private actor ActorExecutor { | ||
| func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { | ||
| // Suspend the calling code until our enqueued task starts. | ||
| await withUnsafeContinuation { continuation in | ||
| // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. | ||
| Task { | ||
| // Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance. | ||
| // This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503 | ||
| _ = void | ||
|
|
||
| // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. | ||
| continuation.resume() | ||
| await task() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private let void: Void = () | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,12 +20,14 @@ | |
| // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| // SOFTWARE. | ||
|
|
||
| /// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts | ||
| public final class AsyncQueue: Sendable { | ||
| /// A queue that executes asynchronous tasks enqueued from a nonisolated context in FIFO order. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As before I think it would be helpful to have a code example showing how this class is intended to be used.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've been reflecting on this, and this queue doesn't really have an "intended" way to be used. It enables executing async tasks from a synchronous context in FIFO order, but unlike the |
||
| /// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued. | ||
| /// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock. | ||
| public final class FIFOQueue: Sendable { | ||
|
|
||
| // MARK: Initialization | ||
|
|
||
| /// Instantiates an asynchronous queue. | ||
| /// Instantiates a FIFO queue. | ||
| /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. | ||
| public init(priority: TaskPriority? = nil) { | ||
| var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil | ||
|
|
@@ -56,7 +58,7 @@ public final class AsyncQueue: Sendable { | |
| taskStreamContinuation.yield(task) | ||
| } | ||
|
|
||
| /// Schedules an asynchronous throwing task and returns after the task is complete. | ||
| /// Schedules an asynchronous task and returns after the task is complete. | ||
| /// The scheduled task will not execute until all prior tasks have completed. | ||
| /// - Parameter task: The task to enqueue. | ||
| /// - Returns: The value returned from the enqueued task. | ||
|
|
@@ -68,7 +70,7 @@ public final class AsyncQueue: Sendable { | |
| } | ||
| } | ||
|
|
||
| /// Schedules an asynchronous task and returns after the task is complete. | ||
| /// Schedules an asynchronous throwing task and returns after the task is complete. | ||
| /// The scheduled task will not execute until all prior tasks have completed. | ||
| /// - Parameter task: The task to enqueue. | ||
| /// - Returns: The value returned from the enqueued task. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.