Skip to content

Commit dd1819e

Browse files
committed
Rename AsyncQueue -> FIFOQueue. Create ActorQueue
1 parent 5405eff commit dd1819e

File tree

7 files changed

+423
-68
lines changed

7 files changed

+423
-68
lines changed

README.md

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,77 @@
55
[![License](https://img.shields.io/cocoapods/l/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue)
66
[![Platform](https://img.shields.io/cocoapods/p/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue)
77

8-
A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts.
8+
A library of queues that enables sending ordered tasks from synchronous to asynchronous contexts.
99

10-
## Usage
10+
## Task Ordering and Swift Concurrency
1111

12-
### Basic Initialization
12+
Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test:
1313

14-
```swift
15-
let asyncQueue = AsyncQueue()
1614
```
15+
@MainActor
16+
func test_mainActor_taskOrdering() async {
17+
var counter = 0
18+
var tasks = [Task<Void, Never>]()
19+
for iteration in 1...100 {
20+
tasks.append(Task {
21+
counter += 1
22+
XCTAssertEqual(counter, iteration) // often fails
23+
})
24+
}
25+
for task in tasks {
26+
_ = await task.value
27+
}
28+
}
29+
```
30+
31+
Despite the spawned `Task` inheriting the serial `@MainActor` execution context, the ordering of the scheduled asynchronous work is not guaranteed.
1732

18-
### Sending events from a synchronous context
33+
While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID645) are great at serializing tasks, there is no simple way in the standard Swift library to send ordered tasks to them from a synchronous context.
34+
35+
### Enqueueing asynchronous tasks in FIFO order
36+
37+
Use a `FIFOQueue` queue to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued.
1938

2039
```swift
21-
asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ }
40+
let queue = FIFOQueue()
41+
queue.async {
42+
/*
43+
`async` context that executes after all other enqueued work is completed.
44+
Work enqueued after this task will wait for this task to complete.
45+
*/
46+
}
47+
Task {
48+
await queue.await {
49+
/*
50+
`async` context that can return a value or throw an error.
51+
Executes after all other enqueued work is completed.
52+
Work enqueued after this task will wait for this task to complete.
53+
*/
54+
}
55+
}
2256
```
2357

24-
### Awaiting work from an asynchronous context
58+
### Enqueueing asynchronous tasks in Actor order
59+
60+
Use an `ActorQueue` queue to execute asynchronous tasks enqueued from a nonisolated context in order. Tasks sent to one of these queues are guaranteed to begin _but not end_ executing in the order in which they are enqueued.
2561

2662
```swift
27-
await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ }
63+
let queue = ActorQueue()
64+
queue.async {
65+
/*
66+
`async` context that executes after all other enqueued work has begun executing.
67+
Work enqueued after this task will wait for this task to complete or suspend.
68+
*/
69+
}
70+
Task {
71+
await queue.await {
72+
/*
73+
`async` context that can return a value or throw an error.
74+
Executes after all other enqueued work has completed or suspended.
75+
Work enqueued after this task will wait for this task to complete or suspend.
76+
*/
77+
}
78+
}
2879
```
2980

3081
## Requirements
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// MIT License
2+
//
3+
// Copyright (c) 2022 Dan Federman
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
/// A queue that executes asynchronous tasks enqueued from a nonisolated context.
24+
/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow tasks that were enqueued to begin executing.
25+
/// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends.
26+
public final class ActorQueue: Sendable {
27+
28+
// MARK: Initialization
29+
30+
/// Instantiates an asynchronous queue.
31+
/// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue.
32+
public init(priority: TaskPriority? = nil) {
33+
var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil
34+
let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in
35+
capturedTaskStreamContinuation = continuation
36+
}
37+
guard let capturedTaskStreamContinuation else {
38+
fatalError("Continuation not captured during stream creation!")
39+
}
40+
taskStreamContinuation = capturedTaskStreamContinuation
41+
42+
streamTask = Task.detached(priority: priority) {
43+
actor ActorExecutor {
44+
init(priority: TaskPriority?) {
45+
self.priority = priority
46+
}
47+
48+
func seriallyExecute(_ task: @escaping @Sendable () async -> Void) async {
49+
let semaphore = Semaphore()
50+
Task.detached(priority: priority) {
51+
await self.execute(task, afterSignaling: semaphore)
52+
}
53+
// Wait for the task to start.
54+
await semaphore.wait()
55+
}
56+
57+
private let priority: TaskPriority?
58+
private func execute(
59+
_ task: @escaping @Sendable () async -> Void,
60+
afterSignaling semaphore: Semaphore
61+
) async {
62+
// Signal that the task has started.
63+
await semaphore.signal()
64+
await task()
65+
}
66+
}
67+
68+
let executor = ActorExecutor(priority: priority)
69+
for await task in taskStream {
70+
await executor.seriallyExecute(task)
71+
}
72+
}
73+
}
74+
75+
deinit {
76+
taskStreamContinuation.finish()
77+
}
78+
79+
// MARK: Public
80+
81+
/// Schedules an asynchronous task for execution and immediately returns.
82+
/// The schedueled task will not execute until all prior tasks have completed.
83+
/// - Parameter task: The task to enqueue.
84+
public func async(_ task: @escaping @Sendable () async -> Void) {
85+
taskStreamContinuation.yield(task)
86+
}
87+
88+
/// Schedules an asynchronous throwing task and returns after the task is complete.
89+
/// The schedueled task will not execute until all prior tasks have completed.
90+
/// - Parameter task: The task to enqueue.
91+
/// - Returns: The value returned from the enqueued task.
92+
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
93+
await withUnsafeContinuation { continuation in
94+
taskStreamContinuation.yield {
95+
continuation.resume(returning: await task())
96+
}
97+
}
98+
}
99+
100+
/// Schedules an asynchronous task and returns after the task is complete.
101+
/// The schedueled task will not execute until all prior tasks have completed.
102+
/// - Parameter task: The task to enqueue.
103+
/// - Returns: The value returned from the enqueued task.
104+
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
105+
try await withUnsafeThrowingContinuation { continuation in
106+
taskStreamContinuation.yield {
107+
do {
108+
continuation.resume(returning: try await task())
109+
} catch {
110+
continuation.resume(throwing: error)
111+
}
112+
}
113+
}
114+
}
115+
116+
// MARK: Private
117+
118+
private let streamTask: Task<Void, Never>
119+
private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation
120+
}

Sources/AsyncQueue/AsyncQueue.swift renamed to Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
// SOFTWARE.
2222

23-
/// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts
24-
public final class AsyncQueue: Sendable {
23+
/// A queue that executes asynchronous tasks enqueued from a nonisolated context in FIFO order.
24+
/// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued.
25+
/// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock.
26+
public final class FIFOQueue: Sendable {
2527

2628
// MARK: Initialization
2729

Sources/AsyncQueue/Semaphore.swift

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// MIT License
2+
//
3+
// Copyright (c) 2022 Dan Federman
4+
//
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
/// A thread-safe semaphore implementation.
24+
actor Semaphore {
25+
func wait() async {
26+
count -= 1
27+
guard count < 0 else {
28+
// We don't need to wait because count is greater than or equal to zero.
29+
return
30+
}
31+
32+
await withCheckedContinuation { continuation in
33+
continuations.append(continuation)
34+
}
35+
}
36+
37+
func signal() {
38+
count += 1
39+
guard !isWaiting else {
40+
// Continue waiting.
41+
return
42+
}
43+
44+
for continuation in continuations {
45+
continuation.resume()
46+
}
47+
48+
continuations.removeAll()
49+
}
50+
51+
var isWaiting: Bool {
52+
count < 0
53+
}
54+
55+
private var continuations = [CheckedContinuation<Void, Never>]()
56+
private var count = 0
57+
}

0 commit comments

Comments
 (0)