Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2b30c2f
Rename AsyncQueue -> FIFOQueue. Create ActorQueue
dfed Nov 24, 2022
9511ffe
Create SemaphoreTests.swift to up coverage
dfed Nov 24, 2022
0070e39
Simplify
dfed Nov 24, 2022
67910a7
Get to 100% coverage
dfed Nov 24, 2022
0565678
Enable sending synchronous tasks to the ActorQueue
dfed Nov 28, 2022
17894cc
Improve explanation of ActorQueue's ordering guarantee
dfed Nov 28, 2022
200034c
Merge branch 'main' into dfed--fifo-vs-actor
dfed Nov 28, 2022
86f564f
The queues are the antecedent
dfed Nov 28, 2022
96b497f
Improve accuracy of ActorQueue description in README
dfed Nov 28, 2022
e5a7f55
ActorQueue does not need to be Sendable
dfed Nov 28, 2022
6fa22de
Update comments
dfed Nov 28, 2022
1f656db
documentation copy/pasta fix
dfed Nov 28, 2022
64f00e3
Disable sending synchronous tasks to the ActorQueue
dfed Nov 28, 2022
583ca33
Better explain how test_async_startsExecutionOfNextTaskAfterSuspensio…
dfed Dec 3, 2022
79b9999
Merge branch 'main' into dfed--fifo-vs-actor
dfed Dec 3, 2022
b313127
Do not wait a runloop to test
dfed Dec 3, 2022
2539cae
further test cleanup
dfed Dec 3, 2022
2dc1d8c
Write comments to explain the test
dfed Dec 5, 2022
2863b37
Make wait() return whether it suspended
dfed Dec 5, 2022
8de9d14
Improve test_wait_suspendsUntilEqualNumberOfSignalCalls
dfed Dec 6, 2022
d1a5560
Better comments
dfed Dec 7, 2022
744db4a
Better README documentation
dfed Dec 8, 2022
031fb5b
Remove duplicative label 'queue' from README discussion
dfed Dec 8, 2022
63fa685
Eliminate race in test_wait_suspendsUntilEqualNumberOfSignalCalls()
dfed Dec 8, 2022
dca625d
Move ActorQueue example to a doc comment
dfed Dec 9, 2022
e258cb8
Merge branch 'main' into dfed--fifo-vs-actor
dfed Dec 9, 2022
f70f19d
Bump version and update README
dfed Dec 9, 2022
b8357db
Simplify ActorExecutor
dfed Dec 9, 2022
10ec535
Do not ship Semaphore. Use fewer 'await' calls in ActorExecutor
dfed Dec 10, 2022
ddff295
Add link to Swift bug report
dfed Dec 10, 2022
847c0ed
Since we're renaming we should bump a major version (i.e. a minor ver…
dfed Dec 10, 2022
01c2dac
Delete SemaphoreTests.swift since it is no longer required
dfed Dec 10, 2022
47a1753
Remove unnecessary '@MainActor' decoration on test
dfed Dec 10, 2022
be96303
Add test_async_retainsReceiverUntilFlushed() to ActorQueueTests to ma…
dfed Dec 10, 2022
139f874
Simply workaround for Task not executing within actor execution context
dfed Dec 10, 2022
2435bfb
Revert "Simply workaround for Task not executing within actor executi…
dfed Dec 10, 2022
3f6d789
Better test name
dfed Dec 10, 2022
4b66f40
Make explicit that queue is deallocated in tests
dfed Dec 10, 2022
ada3d6c
Update test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() i…
dfed Dec 10, 2022
771e5e4
Accurate test method naming
dfed Dec 11, 2022
ea6ade9
Remove unnecessary await
dfed Dec 11, 2022
1fb3643
Merge branch 'main' into dfed--fifo-vs-actor
dfed Dec 21, 2022
bf3a0c2
Add swift tag to multi-line code examples
dfed Jan 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AsyncQueue.podspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Pod::Spec.new do |s|
s.name = 'AsyncQueue'
s.version = '0.0.1'
s.version = '0.1.0'
s.license = 'MIT'
s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.'
s.homepage = 'https://github.com/dfed/swift-async-queue'
Expand Down
83 changes: 73 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,89 @@
[![License](https://img.shields.io/cocoapods/l/AsyncQueue.svg)](https://cocoapods.org/pods/AsyncQueue)
[![Platform](https://img.shields.io/cocoapods/p/AsyncQueue.svg)](https://cocoapods.org/pods/AsyncQueue)

A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts.
A library of queues that enable sending ordered tasks from synchronous to asynchronous contexts.

## Usage
## Task Ordering and Swift Concurrency

### Basic Initialization
Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test:

```swift
let asyncQueue = AsyncQueue()
@MainActor
func test_mainActor_taskOrdering() async {
var counter = 0
var tasks = [Task<Void, Never>]()
for iteration in 1...100 {
tasks.append(Task {
counter += 1
XCTAssertEqual(counter, iteration) // often fails
})
}
for task in tasks {
_ = await task.value
}
}
```

### Sending events from a synchronous context
Despite the spawned `Task` inheriting the serial `@MainActor` execution context, the ordering of the scheduled asynchronous work is not guaranteed.

While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID645) are great at serializing tasks, there is no simple way in the standard Swift library to send ordered tasks to them from a synchronous context.

### Executing asynchronous tasks in FIFO order

Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued.

```swift
asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ }
let queue = FIFOQueue()
queue.async {
/*
`async` context that executes after all other enqueued work is completed.
Work enqueued after this task will wait for this task to complete.
*/
try? await Task.sleep(nanoseconds: 1_000_000)
}
queue.async {
/*
This task begins execution once the above one-second sleep completes.
*/
}
Task {
await queue.await {
/*
`async` context that can return a value or throw an error.
Executes after all other enqueued work is completed.
Work enqueued after this task will wait for this task to complete.
*/
}
}
```

### Awaiting work from an asynchronous context
### Sending ordered asynchronous tasks to Actors

Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code.

```swift
await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ }
let queue = ActorQueue()
queue.async {
/*
`async` context that executes after all other enqueued work has begun executing.
Work enqueued after this task will wait for this task to complete or suspend.
*/
try? await Task.sleep(nanoseconds: 1_000_000)
}
queue.async {
/*
This task begins execution once the above task suspends due to the one-second sleep.
*/
}
Task {
await queue.await {
/*
`async` context that can return a value or throw an error.
Executes after all other enqueued work has begun executing.
Work enqueued after this task will wait for this task to complete or suspend.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*/
}
}
```

## Requirements
Expand All @@ -45,7 +108,7 @@ To install swift-async-queue in your iOS project with [Swift Package Manager](ht

```swift
dependencies: [
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.1"),
.package(url: "https://github.com/dfed/swift-async-queue", from: "0.1.0"),
]
```

Expand All @@ -55,7 +118,7 @@ To install swift-async-queue in your iOS project with [CocoaPods](http://cocoapo

```
platform :ios, '13.0'
pod 'AsyncQueue', '~> 0.1'
pod 'AsyncQueue', '~> 0.1.0'
```

## Contributing
Expand Down
144 changes: 144 additions & 0 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// MIT License
//
// Copyright (c) 2022 Dan Federman
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

/// A queue that executes asynchronous tasks enqueued from a nonisolated context.
/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing.
/// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends.
///
/// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order.
/// Here is an example of how an `ActorQueue` should be utilized within an `actor`:
/// ```swift
/// public actor LogStore {
///
/// nonisolated
/// public func log(_ message: String) {
/// queue.async {
/// await self.append(message)
/// }
/// }
///
/// nonisolated
/// public func retrieveLogs() async -> [String] {
/// await queue.await { await self.logs }
/// }
///
/// private func append(_ message: String) {
/// logs.append(message)
/// }
///
/// private let queue = ActorQueue()
/// private var logs = [String]()
/// }
/// ```
///
/// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance.
public final class ActorQueue {

// MARK: Initialization

/// Instantiates an actor queue.
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
public init(priority: TaskPriority? = nil) {
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
capturedTaskStreamContinuation = continuation
}
// Continuation will be captured during stream creation, so it is safe to force unwrap here.
// If this force-unwrap fails, something is fundamentally broken in the Swift runtime.
taskStreamContinuation = capturedTaskStreamContinuation!

Task.detached(priority: priority) {
let executor = ActorExecutor()
for await task in taskStream {
await executor.suspendUntilStarted(task)
}
}
}

deinit {
taskStreamContinuation.finish()
}

// MARK: Public

/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
public func async(_ task: @escaping @Sendable () async -> Void) {
taskStreamContinuation.yield(task)
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task())
}
}
}

/// Schedules an asynchronous throwing task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this function is accepts a throwing task should the above one accept a throwing task too?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd use rethrows here so we only need to declare one method, however I couldn't get rethrows to work here: there's no way today (that I know of) to signal to the compiler that we're only going to throw within withUnsafeThrowingContinuation if and only if the task throws. So I created one method that can throw and one that can't.

try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
continuation.resume(returning: try await task())
} catch {
continuation.resume(throwing: error)
}
}
}
}

// MARK: Private

private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation

// MARK: - ActorExecutor

private actor ActorExecutor {
func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async {
// Suspend the calling code until our enqueued task starts.
await withUnsafeContinuation { continuation in
// Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete.
Task {
// Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance.
// This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503
_ = void

// Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed.
continuation.resume()
await task()
}
}
}

private let void: Void = ()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

/// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts
public final class AsyncQueue: Sendable {
/// A queue that executes asynchronous tasks enqueued from a nonisolated context in FIFO order.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As before I think it would be helpful to have a code example showing how this class is intended to be used.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been reflecting on this, and this queue doesn't really have an "intended" way to be used. It enables executing async tasks from a synchronous context in FIFO order, but unlike the ActorQueue there's no real limitations or intended usage here.

/// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued.
/// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock.
public final class FIFOQueue: Sendable {

// MARK: Initialization

/// Instantiates an asynchronous queue.
/// Instantiates a FIFO queue.
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
public init(priority: TaskPriority? = nil) {
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
Expand Down Expand Up @@ -56,7 +58,7 @@ public final class AsyncQueue: Sendable {
taskStreamContinuation.yield(task)
}

/// Schedules an asynchronous throwing task and returns after the task is complete.
/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
Expand All @@ -68,7 +70,7 @@ public final class AsyncQueue: Sendable {
}
}

/// Schedules an asynchronous task and returns after the task is complete.
/// Schedules an asynchronous throwing task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
Expand Down
Loading