From 2b30c2f767bce1f1dc38ff4adf1386184680f055 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Wed, 23 Nov 2022 21:12:51 -0800 Subject: [PATCH 01/39] Rename AsyncQueue -> FIFOQueue. Create ActorQueue --- README.md | 69 ++++++-- Sources/AsyncQueue/ActorQueue.swift | 116 +++++++++++++ .../{AsyncQueue.swift => FIFOQueue.swift} | 6 +- Sources/AsyncQueue/Semaphore.swift | 57 +++++++ Tests/AsyncQueueTests/ActorQueueTests.swift | 153 ++++++++++++++++++ ...cQueueTests.swift => FIFOQueueTests.swift} | 61 +------ Tests/AsyncQueueTests/Utilities/Counter.swift | 36 +++++ 7 files changed, 430 insertions(+), 68 deletions(-) create mode 100644 Sources/AsyncQueue/ActorQueue.swift rename Sources/AsyncQueue/{AsyncQueue.swift => FIFOQueue.swift} (90%) create mode 100644 Sources/AsyncQueue/Semaphore.swift create mode 100644 Tests/AsyncQueueTests/ActorQueueTests.swift rename Tests/AsyncQueueTests/{AsyncQueueTests.swift => FIFOQueueTests.swift} (82%) create mode 100644 Tests/AsyncQueueTests/Utilities/Counter.swift diff --git a/README.md b/README.md index 53b58ea..fe2d1d9 100644 --- a/README.md +++ b/README.md @@ -5,26 +5,77 @@ [![License](https://img.shields.io/cocoapods/l/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue) [![Platform](https://img.shields.io/cocoapods/p/swift-async-queue.svg)](https://cocoapods.org/pods/swift-async-queue) -A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts. +A library of queues that enables sending ordered tasks from synchronous to asynchronous contexts. -## Usage +## Task Ordering and Swift Concurrency -### Basic Initialization +Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test: -```swift -let asyncQueue = AsyncQueue() ``` +@MainActor +func test_mainActor_taskOrdering() async { + var counter = 0 + var tasks = [Task]() + for iteration in 1...100 { + tasks.append(Task { + counter += 1 + XCTAssertEqual(counter, iteration) // often fails + }) + } + for task in tasks { + _ = await task.value + } +} +``` + +Despite the spawned `Task` inheriting the serial `@MainActor` execution context, the ordering of the scheduled asynchronous work is not guaranteed. -### Sending events from a synchronous context +While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID645) are great at serializing tasks, there is no simple way in the standard Swift library to send ordered tasks to them from a synchronous context. + +### Executing asynchronous tasks in FIFO order + +Use a `FIFOQueue` queue to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued. ```swift -asyncQueue.async { /* awaitable context that executes after all other enqueued work is completed */ } +let queue = FIFOQueue() +queue.async { + /* + `async` context that executes after all other enqueued work is completed. + Work enqueued after this task will wait for this task to complete. + */ +} +Task { + await queue.await { + /* + `async` context that can return a value or throw an error. + Executes after all other enqueued work is completed. + Work enqueued after this task will wait for this task to complete. + */ + } +} ``` -### Awaiting work from an asynchronous context +### Sending ordered asynchronous tasks to Actors + +Use an `ActorQueue` queue to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin _but not end_ executing in the order in which they are enqueued. ```swift -await asyncQueue.await { /* throw-able, return-able, awaitable context that executes after all other enqueued work is completed */ } +let queue = ActorQueue() +queue.async { + /* + `async` context that executes after all other enqueued work has begun executing. + Work enqueued after this task will wait for this task to complete or suspend. + */ +} +Task { + await queue.await { + /* + `async` context that can return a value or throw an error. + Executes after all other enqueued work has completed or suspended. + Work enqueued after this task will wait for this task to complete or suspend. + */ + } +} ``` ## Requirements diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift new file mode 100644 index 0000000..fe59c64 --- /dev/null +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -0,0 +1,116 @@ +// MIT License +// +// Copyright (c) 2022 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +/// A queue that executes asynchronous tasks enqueued from a nonisolated context. +/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow tasks that were enqueued to begin executing. +/// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends. +/// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance. +public final class ActorQueue: Sendable { + + // MARK: Initialization + + /// Instantiates an asynchronous queue. + /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. + public init(priority: TaskPriority? = nil) { + var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil + let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in + capturedTaskStreamContinuation = continuation + } + guard let capturedTaskStreamContinuation else { + fatalError("Continuation not captured during stream creation!") + } + taskStreamContinuation = capturedTaskStreamContinuation + + streamTask = Task.detached(priority: priority) { + actor ActorExecutor { + func seriallyExecute(_ task: @escaping @Sendable () async -> Void) async { + let semaphore = Semaphore() + Task { + await self.execute(task, afterSignaling: semaphore) + } + // Wait for the task to start. + await semaphore.wait() + } + + private func execute( + _ task: @escaping @Sendable () async -> Void, + afterSignaling semaphore: Semaphore + ) async { + // Signal that the task has started. + await semaphore.signal() + await task() + } + } + + let executor = ActorExecutor() + for await task in taskStream { + await executor.seriallyExecute(task) + } + } + } + + deinit { + taskStreamContinuation.finish() + } + + // MARK: Public + + /// Schedules an asynchronous task for execution and immediately returns. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + public func async(_ task: @escaping @Sendable () async -> Void) { + taskStreamContinuation.yield(task) + } + + /// Schedules an asynchronous throwing task and returns after the task is complete. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + /// - Returns: The value returned from the enqueued task. + public func await(_ task: @escaping @Sendable () async -> T) async -> T { + await withUnsafeContinuation { continuation in + taskStreamContinuation.yield { + continuation.resume(returning: await task()) + } + } + } + + /// Schedules an asynchronous task and returns after the task is complete. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + /// - Returns: The value returned from the enqueued task. + public func await(_ task: @escaping @Sendable () async throws -> T) async throws -> T { + try await withUnsafeThrowingContinuation { continuation in + taskStreamContinuation.yield { + do { + continuation.resume(returning: try await task()) + } catch { + continuation.resume(throwing: error) + } + } + } + } + + // MARK: Private + + private let streamTask: Task + private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation +} diff --git a/Sources/AsyncQueue/AsyncQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift similarity index 90% rename from Sources/AsyncQueue/AsyncQueue.swift rename to Sources/AsyncQueue/FIFOQueue.swift index da611fe..f2fd812 100644 --- a/Sources/AsyncQueue/AsyncQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -20,8 +20,10 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -/// A queue that enables sending FIFO-ordered tasks from synchronous to asynchronous contexts -public final class AsyncQueue: Sendable { +/// A queue that executes asynchronous tasks enqueued from a nonisolated context in FIFO order. +/// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued. +/// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock. +public final class FIFOQueue: Sendable { // MARK: Initialization diff --git a/Sources/AsyncQueue/Semaphore.swift b/Sources/AsyncQueue/Semaphore.swift new file mode 100644 index 0000000..f595635 --- /dev/null +++ b/Sources/AsyncQueue/Semaphore.swift @@ -0,0 +1,57 @@ +// MIT License +// +// Copyright (c) 2022 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +/// A thread-safe semaphore implementation. +actor Semaphore { + func wait() async { + count -= 1 + guard count < 0 else { + // We don't need to wait because count is greater than or equal to zero. + return + } + + await withCheckedContinuation { continuation in + continuations.append(continuation) + } + } + + func signal() { + count += 1 + guard !isWaiting else { + // Continue waiting. + return + } + + for continuation in continuations { + continuation.resume() + } + + continuations.removeAll() + } + + var isWaiting: Bool { + count < 0 + } + + private var continuations = [CheckedContinuation]() + private var count = 0 +} diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift new file mode 100644 index 0000000..c4d955d --- /dev/null +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -0,0 +1,153 @@ +// MIT License +// +// Copyright (c) 2022 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +import XCTest + +@testable import AsyncQueue + +final class ActorQueueTests: XCTestCase { + + // MARK: XCTestCase + + override func setUp() async throws { + try await super.setUp() + + systemUnderTest = ActorQueue() + } + + // MARK: Behavior Tests + + func test_async_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + systemUnderTest.async { + await counter.incrementAndExpectCount(equals: iteration) + } + } + await systemUnderTest.await { /* Drain the queue */ } + } + + func test_async_startsExecutionOfNextTaskAfterSuspension() async { + let counter = Counter() + let semaphore = Semaphore() + for iteration in 1...1_000 { + systemUnderTest.async { + await counter.incrementAndExpectCount(equals: iteration) + await semaphore.wait() + } + } + systemUnderTest.async { + for _ in 1...1_000 { + await semaphore.signal() + } + } + await systemUnderTest.await { /* Drain the queue */ } + } + + func test_async_allowsReentrancy() async { + let counter = Counter() + await systemUnderTest.await { [systemUnderTest] in + await systemUnderTest.await { + await counter.incrementAndExpectCount(equals: 1) + } + await counter.incrementAndExpectCount(equals: 2) + } + await systemUnderTest.await { /* Drain the queue */ } + } + + @MainActor + func test_async_doesNotRetainTaskAfterExecution() async { + let expectation = self.expectation(description: #function) + final class Reference: Sendable {} + final class ReferenceHolder: @unchecked Sendable { + var reference: Reference? = Reference() + } + let referenceHolder = ReferenceHolder() + weak var weakReference = referenceHolder.reference + let asyncSemaphore = Semaphore() + let syncSemaphore = Semaphore() + systemUnderTest.async { [reference = referenceHolder.reference] in + // Now that we've started the task and captured the reference, release the synchronous code. + await syncSemaphore.signal() + // Wait for the synchronous setup to complete and the reference to be nil'd out. + await asyncSemaphore.wait() + // Retain the unsafe counter until the task is completed. + _ = reference + // Signal that this task is about to clean up. + expectation.fulfill() + } + // Wait for the asynchronous task to start. + await syncSemaphore.wait() + referenceHolder.reference = nil + XCTAssertNotNil(weakReference) + // Allow the enqueued task to complete. + await asyncSemaphore.signal() + // Make sure the task has completed. + waitForExpectations(timeout: 1.0) + // Wait a runloop to ensure the previously enqueued task has had time to deallocate. + DispatchQueue.main.async { + XCTAssertNil(weakReference) + } + } + + func test_await_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + systemUnderTest.async { + await counter.incrementAndExpectCount(equals: iteration) + } + + guard iteration % 25 == 0 else { + // Keep sending async events to the queue. + continue + } + + await systemUnderTest.await { + let count = await counter.count + XCTAssertEqual(count, iteration) + } + } + await systemUnderTest.await { /* Drain the queue */ } + } + + func test_await_canReturn() async { + let expectedValue = UUID() + let returnedValue = await systemUnderTest.await { expectedValue } + XCTAssertEqual(expectedValue, returnedValue) + } + + func test_await_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await systemUnderTest.await { throw expectedError } + } catch { + XCTAssertEqual(error as? TestError, expectedError) + } + } + + // MARK: Private + + private var systemUnderTest = ActorQueue() +} diff --git a/Tests/AsyncQueueTests/AsyncQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift similarity index 82% rename from Tests/AsyncQueueTests/AsyncQueueTests.swift rename to Tests/AsyncQueueTests/FIFOQueueTests.swift index c3dabdb..b032e58 100644 --- a/Tests/AsyncQueueTests/AsyncQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -24,14 +24,14 @@ import XCTest @testable import AsyncQueue -final class AsyncQueueTests: XCTestCase { +final class FIFOQueueTests: XCTestCase { // MARK: XCTestCase override func setUp() async throws { try await super.setUp() - systemUnderTest = AsyncQueue() + systemUnderTest = FIFOQueue() } // MARK: Behavior Tests @@ -80,7 +80,7 @@ final class AsyncQueueTests: XCTestCase { } func test_async_retainsReceiverUntilFlushed() async { - var systemUnderTest: AsyncQueue? = AsyncQueue() + var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() let expectation = self.expectation(description: #function) let semaphore = Semaphore() @@ -170,58 +170,5 @@ final class AsyncQueueTests: XCTestCase { // MARK: Private - private var systemUnderTest = AsyncQueue() - - // MARK: - Counter - - private actor Counter { - func incrementAndExpectCount(equals expectedCount: Int) { - increment() - XCTAssertEqual(expectedCount, count) - } - - func increment() { - count += 1 - } - - var count = 0 - } - - // MARK: - Semaphore - - private actor Semaphore { - - func wait() async { - count -= 1 - guard count < 0 else { - // We don't need to wait because count is greater than or equal to zero. - return - } - - await withCheckedContinuation { continuation in - continuations.append(continuation) - } - } - - func signal() { - count += 1 - guard !isWaiting else { - // Continue waiting. - return - } - - for continuation in continuations { - continuation.resume() - } - - continuations.removeAll() - } - - var isWaiting: Bool { - count < 0 - } - - private var continuations = [CheckedContinuation]() - private var count = 0 - } + private var systemUnderTest = FIFOQueue() } diff --git a/Tests/AsyncQueueTests/Utilities/Counter.swift b/Tests/AsyncQueueTests/Utilities/Counter.swift new file mode 100644 index 0000000..5a01e74 --- /dev/null +++ b/Tests/AsyncQueueTests/Utilities/Counter.swift @@ -0,0 +1,36 @@ +// MIT License +// +// Copyright (c) 2022 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +import XCTest + +actor Counter { + func incrementAndExpectCount(equals expectedCount: Int) { + increment() + XCTAssertEqual(expectedCount, count) + } + + func increment() { + count += 1 + } + + var count = 0 +} From 9511ffeaab7ea55847adf7f95488cac081ccefb6 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Wed, 23 Nov 2022 22:15:14 -0800 Subject: [PATCH 02/39] Create SemaphoreTests.swift to up coverage --- Tests/AsyncQueueTests/SemaphoreTests.swift | 91 ++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 Tests/AsyncQueueTests/SemaphoreTests.swift diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift new file mode 100644 index 0000000..3e1a478 --- /dev/null +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -0,0 +1,91 @@ +// MIT License +// +// Copyright (c) 2022 Dan Federman +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +import XCTest + +@testable import AsyncQueue + +final class SemaphoreTests: XCTestCase { + + // MARK: XCTestCase + + override func setUp() async throws { + try await super.setUp() + + systemUnderTest = Semaphore() + } + + override func tearDown() async throws { + try await super.tearDown() + + while await systemUnderTest.isWaiting { + await systemUnderTest.signal() + } + } + + // MARK: Behavior Tests + + func test_wait_suspendsUntilEqualNumberOfSignalCalls() async { + let counter = Counter() + let iterationCount = 1_000 + + var waits = [Task]() + for _ in 1...iterationCount { + waits.append(Task { + await self.systemUnderTest.wait() + await counter.increment() + }) + } + + var signals = [Task]() + for _ in 0..<(iterationCount-1) { + signals.append(Task { + await self.systemUnderTest.signal() + let count = await counter.count + XCTAssertEqual(0, count) + }) + } + + for signal in signals { + await signal.value + } + + await self.systemUnderTest.signal() + + for wait in waits { + await wait.value + } + + let count = await counter.count + XCTAssertEqual(iterationCount, count) + } + + func test_wait_doesNotSuspendIfSignalCalledFirst() async { + await systemUnderTest.signal() + await systemUnderTest.wait() + // If the test doesn't hang forever, we've succeeded! + } + + // MARK: Private + + private var systemUnderTest = Semaphore() +} From 0070e39cdbf4a1246cc2825b3fd78837638eeace Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Thu, 24 Nov 2022 06:54:03 -0800 Subject: [PATCH 03/39] Simplify --- Sources/AsyncQueue/ActorQueue.swift | 25 +++++++++++++------------ Sources/AsyncQueue/Semaphore.swift | 4 ++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index fe59c64..5f35ec2 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -42,28 +42,29 @@ public final class ActorQueue: Sendable { streamTask = Task.detached(priority: priority) { actor ActorExecutor { - func seriallyExecute(_ task: @escaping @Sendable () async -> Void) async { + func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { let semaphore = Semaphore() - Task { - await self.execute(task, afterSignaling: semaphore) - } - // Wait for the task to start. + executeWithoutWaiting(task, afterSignaling: semaphore) + // Suspend the calling code until our enqueued task starts. await semaphore.wait() } - private func execute( + private func executeWithoutWaiting( _ task: @escaping @Sendable () async -> Void, - afterSignaling semaphore: Semaphore - ) async { - // Signal that the task has started. - await semaphore.signal() - await task() + afterSignaling semaphore: Semaphore) + { + // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. + Task { + // Now that we're back within the serial Actor context, signal that the task has started. + await semaphore.signal() + await task() + } } } let executor = ActorExecutor() for await task in taskStream { - await executor.seriallyExecute(task) + await executor.suspendUntilStarted(task) } } } diff --git a/Sources/AsyncQueue/Semaphore.swift b/Sources/AsyncQueue/Semaphore.swift index f595635..811169d 100644 --- a/Sources/AsyncQueue/Semaphore.swift +++ b/Sources/AsyncQueue/Semaphore.swift @@ -29,7 +29,7 @@ actor Semaphore { return } - await withCheckedContinuation { continuation in + await withUnsafeContinuation { continuation in continuations.append(continuation) } } @@ -52,6 +52,6 @@ actor Semaphore { count < 0 } - private var continuations = [CheckedContinuation]() + private var continuations = [UnsafeContinuation]() private var count = 0 } From 67910a774beffb654f53ead8cc30c9e5d68b33cf Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Thu, 24 Nov 2022 07:11:12 -0800 Subject: [PATCH 04/39] Get to 100% coverage --- Sources/AsyncQueue/ActorQueue.swift | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 5f35ec2..eebf3d2 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -35,10 +35,9 @@ public final class ActorQueue: Sendable { let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in capturedTaskStreamContinuation = continuation } - guard let capturedTaskStreamContinuation else { - fatalError("Continuation not captured during stream creation!") - } - taskStreamContinuation = capturedTaskStreamContinuation + // Continuation will be captured during stream creation, so it is safe to force unwrap here. + // If this force-unwrap fails, something is fundamentally broken in the Swift runtime. + taskStreamContinuation = capturedTaskStreamContinuation! streamTask = Task.detached(priority: priority) { actor ActorExecutor { From 0565678b069a5c6d195113e261ecf449d626cbfa Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 07:17:49 -0800 Subject: [PATCH 05/39] Enable sending synchronous tasks to the ActorQueue --- Sources/AsyncQueue/ActorQueue.swift | 113 ++++++++++++++------ Tests/AsyncQueueTests/ActorQueueTests.swift | 31 ++++++ 2 files changed, 113 insertions(+), 31 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index eebf3d2..ea119b8 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -31,36 +31,15 @@ public final class ActorQueue: Sendable { /// Instantiates an asynchronous queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. public init(priority: TaskPriority? = nil) { - var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil - let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in + var capturedTaskStreamContinuation: AsyncStream.Continuation? = nil + let taskStream = AsyncStream { continuation in capturedTaskStreamContinuation = continuation } // Continuation will be captured during stream creation, so it is safe to force unwrap here. // If this force-unwrap fails, something is fundamentally broken in the Swift runtime. taskStreamContinuation = capturedTaskStreamContinuation! - streamTask = Task.detached(priority: priority) { - actor ActorExecutor { - func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { - let semaphore = Semaphore() - executeWithoutWaiting(task, afterSignaling: semaphore) - // Suspend the calling code until our enqueued task starts. - await semaphore.wait() - } - - private func executeWithoutWaiting( - _ task: @escaping @Sendable () async -> Void, - afterSignaling semaphore: Semaphore) - { - // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. - Task { - // Now that we're back within the serial Actor context, signal that the task has started. - await semaphore.signal() - await task() - } - } - } - + Task.detached(priority: priority) { let executor = ActorExecutor() for await task in taskStream { await executor.suspendUntilStarted(task) @@ -78,7 +57,7 @@ public final class ActorQueue: Sendable { /// The schedueled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. public func async(_ task: @escaping @Sendable () async -> Void) { - taskStreamContinuation.yield(task) + taskStreamContinuation.yield(.async(task)) } /// Schedules an asynchronous throwing task and returns after the task is complete. @@ -87,9 +66,9 @@ public final class ActorQueue: Sendable { /// - Returns: The value returned from the enqueued task. public func await(_ task: @escaping @Sendable () async -> T) async -> T { await withUnsafeContinuation { continuation in - taskStreamContinuation.yield { + taskStreamContinuation.yield(.async({ continuation.resume(returning: await task()) - } + })) } } @@ -99,18 +78,90 @@ public final class ActorQueue: Sendable { /// - Returns: The value returned from the enqueued task. public func await(_ task: @escaping @Sendable () async throws -> T) async throws -> T { try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield { + taskStreamContinuation.yield(.async({ do { continuation.resume(returning: try await task()) } catch { continuation.resume(throwing: error) } - } + })) + } + } + + /// Schedules a synchronous task for execution and immediately returns. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + public func async(_ task: @escaping @Sendable () -> Void) { + taskStreamContinuation.yield(.sync(task)) + } + + /// Schedules an throwing task and returns after the task is complete. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + /// - Returns: The value returned from the enqueued task. + public func await(_ task: @escaping @Sendable () -> T) async -> T { + await withUnsafeContinuation { continuation in + taskStreamContinuation.yield(.sync({ + continuation.resume(returning: task()) + })) + } + } + + /// Schedules an task and returns after the task is complete. + /// The schedueled task will not execute until all prior tasks have completed or suspended. + /// - Parameter task: The task to enqueue. + /// - Returns: The value returned from the enqueued task. + public func await(_ task: @escaping @Sendable () throws -> T) async throws -> T { + try await withUnsafeThrowingContinuation { continuation in + taskStreamContinuation.yield(.sync({ + do { + continuation.resume(returning: try task()) + } catch { + continuation.resume(throwing: error) + } + })) } } // MARK: Private - private let streamTask: Task - private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation + private let taskStreamContinuation: AsyncStream.Continuation + + // MARK: - TaskType + + enum TaskType { + case sync(@Sendable () -> Void) + case async(@Sendable () async -> Void) + } + + // MARK: - ActorExecutor + + private actor ActorExecutor { + func suspendUntilStarted(_ task: TaskType) async { + let semaphore = Semaphore() + executeWithoutWaiting(task, afterSignaling: semaphore) + // Suspend the calling code until our enqueued task starts. + await semaphore.wait() + } + + private func executeWithoutWaiting( + _ task: TaskType, + afterSignaling semaphore: Semaphore) + { + // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. + Task { + switch task { + case let .sync(task): + task() + // Synchronous tasks can not re-enter this queue, so it is safe to wait until the task completes prior to signaling the semaphore. + await semaphore.signal() + case let .async(task): + // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. + await semaphore.signal() + await task() + } + } + } + } + } diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index c4d955d..7186269 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -46,6 +46,37 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } + func test_async_sendsSynchronousEventsInOrder() async { + final class SynchronousCounter: @unchecked Sendable { + func incrementAndExpectCount(equals expectedCount: Int) { + increment() + XCTAssertEqual(expectedCount, count) + } + + func increment() { + count += 1 + } + + var count: Int { + set { + dispatchQueue.async { self.unsafeCount = newValue } + } + get { + dispatchQueue.sync { unsafeCount } + } + } + var unsafeCount = 0 + let dispatchQueue = DispatchQueue(label: #function) + } + let counter = SynchronousCounter() + for iteration in 1...1_000 { + systemUnderTest.async { + counter.incrementAndExpectCount(equals: iteration) + } + } + await systemUnderTest.await { /* Drain the queue */ } + } + func test_async_startsExecutionOfNextTaskAfterSuspension() async { let counter = Counter() let semaphore = Semaphore() From 17894cc8e356de009fe93d3855715d47ab0b4794 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 07:27:10 -0800 Subject: [PATCH 06/39] Improve explanation of ActorQueue's ordering guarantee --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fe2d1d9..2efaa81 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Task { ### Sending ordered asynchronous tasks to Actors -Use an `ActorQueue` queue to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin _but not end_ executing in the order in which they are enqueued. +Use an `ActorQueue` queue to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code. ```swift let queue = ActorQueue() From 86f564f3d99ec17c4c2438e798f917f828c28008 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 12:47:40 -0800 Subject: [PATCH 07/39] The queues are the antecedent Co-authored-by: Michael Bachand --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 643e5aa..2589ec2 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Swift Package Manager compatible](https://img.shields.io/badge/SPM-compatible-4BC51D.svg?style=flat)](https://github.com/apple/swift-package-manager) [![codecov](https://codecov.io/gh/dfed/swift-async-queue/branch/main/graph/badge.svg?token=nZBHcZZ63F)](https://codecov.io/gh/dfed/swift-async-queue) -A library of queues that enables sending ordered tasks from synchronous to asynchronous contexts. +A library of queues that enable sending ordered tasks from synchronous to asynchronous contexts. ## Task Ordering and Swift Concurrency From 96b497f09bed9db5bd1a3c4242612e39e41e2191 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 12:48:00 -0800 Subject: [PATCH 08/39] Improve accuracy of ActorQueue description in README Co-authored-by: Michael Bachand --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2589ec2..fc8ebc7 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Task { await queue.await { /* `async` context that can return a value or throw an error. - Executes after all other enqueued work has completed or suspended. + Executes after all other enqueued work has begun executing. Work enqueued after this task will wait for this task to complete or suspend. */ } From e5a7f55787871ac2de33b7cf30c23e32283ab16e Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 12:49:08 -0800 Subject: [PATCH 09/39] ActorQueue does not need to be Sendable --- Sources/AsyncQueue/ActorQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 351d05a..e7cd950 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -24,7 +24,7 @@ /// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow tasks that were enqueued to begin executing. /// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends. /// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance. -public final class ActorQueue: Sendable { +public final class ActorQueue { // MARK: Initialization From 6fa22defed007b8487ace0ed1ad6d77f21c92245 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 12:50:22 -0800 Subject: [PATCH 10/39] Update comments --- Sources/AsyncQueue/ActorQueue.swift | 6 +++--- Sources/AsyncQueue/FIFOQueue.swift | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index e7cd950..f08171b 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -28,7 +28,7 @@ public final class ActorQueue { // MARK: Initialization - /// Instantiates an asynchronous queue. + /// Instantiates an actor queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. public init(priority: TaskPriority? = nil) { var capturedTaskStreamContinuation: AsyncStream.Continuation? = nil @@ -60,7 +60,7 @@ public final class ActorQueue { taskStreamContinuation.yield(.async(task)) } - /// Schedules an asynchronous throwing task and returns after the task is complete. + /// Schedules an asynchronous task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. @@ -72,7 +72,7 @@ public final class ActorQueue { } } - /// Schedules an asynchronous task and returns after the task is complete. + /// Schedules an asynchronous throwing task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index 083bfbe..689d7ac 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -27,7 +27,7 @@ public final class FIFOQueue: Sendable { // MARK: Initialization - /// Instantiates an asynchronous queue. + /// Instantiates a FIFO queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. public init(priority: TaskPriority? = nil) { var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil @@ -58,7 +58,7 @@ public final class FIFOQueue: Sendable { taskStreamContinuation.yield(task) } - /// Schedules an asynchronous throwing task and returns after the task is complete. + /// Schedules an asynchronous task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. @@ -70,7 +70,7 @@ public final class FIFOQueue: Sendable { } } - /// Schedules an asynchronous task and returns after the task is complete. + /// Schedules an asynchronous throwing task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. From 1f656dbfc9893dfce90cc5f325c36872972d30d3 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 12:51:30 -0800 Subject: [PATCH 11/39] documentation copy/pasta fix --- Sources/AsyncQueue/ActorQueue.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index f08171b..69100a6 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -95,7 +95,7 @@ public final class ActorQueue { taskStreamContinuation.yield(.sync(task)) } - /// Schedules an throwing task and returns after the task is complete. + /// Schedules a task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. @@ -107,7 +107,7 @@ public final class ActorQueue { } } - /// Schedules an task and returns after the task is complete. + /// Schedules a throwing task and returns after the task is complete. /// The scheduled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. /// - Returns: The value returned from the enqueued task. From 64f00e371d2f48bf0076af89310b8676cadd7638 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 28 Nov 2022 13:47:42 -0800 Subject: [PATCH 12/39] Disable sending synchronous tasks to the ActorQueue --- Sources/AsyncQueue/ActorQueue.swift | 75 ++++----------------- Tests/AsyncQueueTests/ActorQueueTests.swift | 31 --------- 2 files changed, 13 insertions(+), 93 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 69100a6..81865e1 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -31,8 +31,8 @@ public final class ActorQueue { /// Instantiates an actor queue. /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. public init(priority: TaskPriority? = nil) { - var capturedTaskStreamContinuation: AsyncStream.Continuation? = nil - let taskStream = AsyncStream { continuation in + var capturedTaskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation? = nil + let taskStream = AsyncStream<@Sendable () async -> Void> { continuation in capturedTaskStreamContinuation = continuation } // Continuation will be captured during stream creation, so it is safe to force unwrap here. @@ -57,7 +57,7 @@ public final class ActorQueue { /// The scheduled task will not execute until all prior tasks have completed or suspended. /// - Parameter task: The task to enqueue. public func async(_ task: @escaping @Sendable () async -> Void) { - taskStreamContinuation.yield(.async(task)) + taskStreamContinuation.yield(task) } /// Schedules an asynchronous task and returns after the task is complete. @@ -66,9 +66,9 @@ public final class ActorQueue { /// - Returns: The value returned from the enqueued task. public func await(_ task: @escaping @Sendable () async -> T) async -> T { await withUnsafeContinuation { continuation in - taskStreamContinuation.yield(.async({ + taskStreamContinuation.yield { continuation.resume(returning: await task()) - })) + } } } @@ -78,66 +78,24 @@ public final class ActorQueue { /// - Returns: The value returned from the enqueued task. public func await(_ task: @escaping @Sendable () async throws -> T) async throws -> T { try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield(.async({ + taskStreamContinuation.yield { do { continuation.resume(returning: try await task()) } catch { continuation.resume(throwing: error) } - })) - } - } - - /// Schedules a synchronous task for execution and immediately returns. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - public func async(_ task: @escaping @Sendable () -> Void) { - taskStreamContinuation.yield(.sync(task)) - } - - /// Schedules a task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func await(_ task: @escaping @Sendable () -> T) async -> T { - await withUnsafeContinuation { continuation in - taskStreamContinuation.yield(.sync({ - continuation.resume(returning: task()) - })) - } - } - - /// Schedules a throwing task and returns after the task is complete. - /// The scheduled task will not execute until all prior tasks have completed or suspended. - /// - Parameter task: The task to enqueue. - /// - Returns: The value returned from the enqueued task. - public func await(_ task: @escaping @Sendable () throws -> T) async throws -> T { - try await withUnsafeThrowingContinuation { continuation in - taskStreamContinuation.yield(.sync({ - do { - continuation.resume(returning: try task()) - } catch { - continuation.resume(throwing: error) - } - })) + } } } // MARK: Private - private let taskStreamContinuation: AsyncStream.Continuation - - // MARK: - TaskType - - enum TaskType { - case sync(@Sendable () -> Void) - case async(@Sendable () async -> Void) - } + private let taskStreamContinuation: AsyncStream<@Sendable () async -> Void>.Continuation // MARK: - ActorExecutor private actor ActorExecutor { - func suspendUntilStarted(_ task: TaskType) async { + func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { let semaphore = Semaphore() executeWithoutWaiting(task, afterSignaling: semaphore) // Suspend the calling code until our enqueued task starts. @@ -145,21 +103,14 @@ public final class ActorQueue { } private func executeWithoutWaiting( - _ task: TaskType, + _ task: @escaping @Sendable () async -> Void, afterSignaling semaphore: Semaphore) { // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. Task { - switch task { - case let .sync(task): - task() - // Synchronous tasks can not re-enter this queue, so it is safe to wait until the task completes prior to signaling the semaphore. - await semaphore.signal() - case let .async(task): - // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. - await semaphore.signal() - await task() - } + // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. + await semaphore.signal() + await task() } } } diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 7186269..c4d955d 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -46,37 +46,6 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - func test_async_sendsSynchronousEventsInOrder() async { - final class SynchronousCounter: @unchecked Sendable { - func incrementAndExpectCount(equals expectedCount: Int) { - increment() - XCTAssertEqual(expectedCount, count) - } - - func increment() { - count += 1 - } - - var count: Int { - set { - dispatchQueue.async { self.unsafeCount = newValue } - } - get { - dispatchQueue.sync { unsafeCount } - } - } - var unsafeCount = 0 - let dispatchQueue = DispatchQueue(label: #function) - } - let counter = SynchronousCounter() - for iteration in 1...1_000 { - systemUnderTest.async { - counter.incrementAndExpectCount(equals: iteration) - } - } - await systemUnderTest.await { /* Drain the queue */ } - } - func test_async_startsExecutionOfNextTaskAfterSuspension() async { let counter = Counter() let semaphore = Semaphore() From 583ca33164c5145d83651ead513bb6e54fa4bab8 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 3 Dec 2022 15:17:26 -0800 Subject: [PATCH 13/39] Better explain how test_async_startsExecutionOfNextTaskAfterSuspension works + simplify test --- Tests/AsyncQueueTests/ActorQueueTests.swift | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index c4d955d..5239cf0 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -47,18 +47,15 @@ final class ActorQueueTests: XCTestCase { } func test_async_startsExecutionOfNextTaskAfterSuspension() async { - let counter = Counter() let semaphore = Semaphore() - for iteration in 1...1_000 { - systemUnderTest.async { - await counter.incrementAndExpectCount(equals: iteration) - await semaphore.wait() - } + systemUnderTest.async { + await semaphore.wait() } systemUnderTest.async { - for _ in 1...1_000 { - await semaphore.signal() - } + // Signal the semaphore from the actor queue. + // If the actor queue were FIFO, this test would hang since this code would never execute: + // we'd still be waiting for the prior `wait()` tasks to finish. + await semaphore.signal() } await systemUnderTest.await { /* Drain the queue */ } } From b313127e5d0f33ab9be0f18a2665f42dd99f9aa9 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 3 Dec 2022 15:48:44 -0800 Subject: [PATCH 14/39] Do not wait a runloop to test --- Tests/AsyncQueueTests/ActorQueueTests.swift | 31 +++++++++++++-------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 5239cf0..c9168c5 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -73,15 +73,23 @@ final class ActorQueueTests: XCTestCase { @MainActor func test_async_doesNotRetainTaskAfterExecution() async { - let expectation = self.expectation(description: #function) final class Reference: Sendable {} final class ReferenceHolder: @unchecked Sendable { - var reference: Reference? = Reference() + init() { + reference = Reference() + weakReference = reference + } + private(set) var reference: Reference? + private(set) weak var weakReference: Reference? + + func clearReference() { + reference = nil + } } let referenceHolder = ReferenceHolder() - weak var weakReference = referenceHolder.reference let asyncSemaphore = Semaphore() let syncSemaphore = Semaphore() + let asyncTaskCompletingExpectation = expectation(description: "async task completing") systemUnderTest.async { [reference = referenceHolder.reference] in // Now that we've started the task and captured the reference, release the synchronous code. await syncSemaphore.signal() @@ -89,21 +97,22 @@ final class ActorQueueTests: XCTestCase { await asyncSemaphore.wait() // Retain the unsafe counter until the task is completed. _ = reference - // Signal that this task is about to clean up. - expectation.fulfill() + self.systemUnderTest.async { + // Signal that this task has cleaned up. + // This closure will not execute until the prior closure completes. + asyncTaskCompletingExpectation.fulfill() + } } // Wait for the asynchronous task to start. await syncSemaphore.wait() - referenceHolder.reference = nil - XCTAssertNotNil(weakReference) + referenceHolder.clearReference() + XCTAssertNotNil(referenceHolder.weakReference) // Allow the enqueued task to complete. await asyncSemaphore.signal() // Make sure the task has completed. waitForExpectations(timeout: 1.0) - // Wait a runloop to ensure the previously enqueued task has had time to deallocate. - DispatchQueue.main.async { - XCTAssertNil(weakReference) - } + + XCTAssertNil(referenceHolder.weakReference) } func test_await_sendsEventsInOrder() async { From 2539caeeceac665a8f913af4a70737e3a84ae7c8 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 3 Dec 2022 15:50:15 -0800 Subject: [PATCH 15/39] further test cleanup --- Tests/AsyncQueueTests/ActorQueueTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index c9168c5..e78868f 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -89,7 +89,7 @@ final class ActorQueueTests: XCTestCase { let referenceHolder = ReferenceHolder() let asyncSemaphore = Semaphore() let syncSemaphore = Semaphore() - let asyncTaskCompletingExpectation = expectation(description: "async task completing") + let expectation = self.expectation(description: #function) systemUnderTest.async { [reference = referenceHolder.reference] in // Now that we've started the task and captured the reference, release the synchronous code. await syncSemaphore.signal() @@ -100,7 +100,7 @@ final class ActorQueueTests: XCTestCase { self.systemUnderTest.async { // Signal that this task has cleaned up. // This closure will not execute until the prior closure completes. - asyncTaskCompletingExpectation.fulfill() + expectation.fulfill() } } // Wait for the asynchronous task to start. From 2dc1d8c2de8cf8f3563751f37addb39a64ead09e Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sun, 4 Dec 2022 22:24:07 -0800 Subject: [PATCH 16/39] Write comments to explain the test --- Tests/AsyncQueueTests/SemaphoreTests.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index 3e1a478..2726728 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -57,6 +57,8 @@ final class SemaphoreTests: XCTestCase { } var signals = [Task]() + // Loop one fewer than iterationCount. + // The count will be zero each time because we haven't `signal`ed `iterationCount` times yet. for _ in 0..<(iterationCount-1) { signals.append(Task { await self.systemUnderTest.signal() @@ -65,16 +67,22 @@ final class SemaphoreTests: XCTestCase { }) } + // Wait for every looped `signal` task above to complete before we signal the final time. + // If we didn't wait here, we could introduce a race condition that would lead the above `XCTAssertEqual` to fail. for signal in signals { await signal.value } + // Signal one more time, matching the number of `wait`s above. await self.systemUnderTest.signal() + // Now that we have a matching number of `signal`s to the number of enqueued `wait`s, we can await the completion of every wait task. + // Waiting for the `waits` prior to now would have deadlocked. for wait in waits { await wait.value } + // Now that we've executed a matching number of `wait` and `signal` calls, the counter will have been incremented `iterationCount` times. let count = await counter.count XCTAssertEqual(iterationCount, count) } From 2863b37a580ee373cec3887af3a9a09be42fd41c Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Mon, 5 Dec 2022 08:20:49 -0800 Subject: [PATCH 17/39] Make wait() return whether it suspended --- Sources/AsyncQueue/Semaphore.swift | 9 +++++++-- Tests/AsyncQueueTests/SemaphoreTests.swift | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/Sources/AsyncQueue/Semaphore.swift b/Sources/AsyncQueue/Semaphore.swift index 811169d..4a46d00 100644 --- a/Sources/AsyncQueue/Semaphore.swift +++ b/Sources/AsyncQueue/Semaphore.swift @@ -22,18 +22,23 @@ /// A thread-safe semaphore implementation. actor Semaphore { - func wait() async { + /// Decrement the counting semaphore. If the resulting value is less than zero, this function waits for a signal to occur before returning. + /// - Returns: Whether the call triggered a suspension + @discardableResult + func wait() async -> Bool { count -= 1 guard count < 0 else { // We don't need to wait because count is greater than or equal to zero. - return + return false } await withUnsafeContinuation { continuation in continuations.append(continuation) } + return true } + /// Increment the counting semaphore. If the previous value was less than zero, this function resumes a waiting thread before returning. func signal() { count += 1 guard !isWaiting else { diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index 2726728..1b5c88e 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -89,8 +89,8 @@ final class SemaphoreTests: XCTestCase { func test_wait_doesNotSuspendIfSignalCalledFirst() async { await systemUnderTest.signal() - await systemUnderTest.wait() - // If the test doesn't hang forever, we've succeeded! + let didSuspend = await systemUnderTest.wait() + XCTAssertFalse(didSuspend) } // MARK: Private From 8de9d144e56cde36aadae92425193c24827fb471 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Tue, 6 Dec 2022 08:55:32 -0800 Subject: [PATCH 18/39] Improve test_wait_suspendsUntilEqualNumberOfSignalCalls --- Tests/AsyncQueueTests/SemaphoreTests.swift | 83 ++++++++++++++-------- 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index 1b5c88e..9268f2b 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -45,46 +45,73 @@ final class SemaphoreTests: XCTestCase { // MARK: Behavior Tests func test_wait_suspendsUntilEqualNumberOfSignalCalls() async { - let counter = Counter() + actor CountingExecutor { + func enqueue(andCount incrementOnCompletion: Bool = false, _ task: @escaping @Sendable () async -> Void) async { + await withCheckedContinuation { continuation in + // Re-enter the actor context but don't wait for the result. + Task { + // Now that we're back in the actor context, resume. + continuation.resume() + await task() + if incrementOnCompletion { + incrementTasksCompleted() + } + } + } + } + + func execute(_ task: @Sendable () async -> Void) async { + await task() + } + + var countedTasksCompleted = 0 + + private func incrementTasksCompleted() { + countedTasksCompleted += 1 + } + } + let executor = CountingExecutor() let iterationCount = 1_000 - var waits = [Task]() for _ in 1...iterationCount { - waits.append(Task { - await self.systemUnderTest.wait() - await counter.increment() - }) + await executor.enqueue(andCount: true) { + let didSuspend = await self.systemUnderTest.wait() + XCTAssertTrue(didSuspend) + + // Signal without waiting that our prior wait completed. + Task { + await self.systemUnderTest.signal() + } + } } - var signals = [Task]() // Loop one fewer than iterationCount. - // The count will be zero each time because we haven't `signal`ed `iterationCount` times yet. + // The count will be zero each time because we have more `wait` than `signal` calls. for _ in 0..<(iterationCount-1) { - signals.append(Task { + await executor.enqueue { await self.systemUnderTest.signal() - let count = await counter.count - XCTAssertEqual(0, count) - }) - } - - // Wait for every looped `signal` task above to complete before we signal the final time. - // If we didn't wait here, we could introduce a race condition that would lead the above `XCTAssertEqual` to fail. - for signal in signals { - await signal.value + // Enqueue a task to check the completed task count to give the suspended tasks above time to resume (if they were to resume, which they won't). + await executor.enqueue { + let completedCountedTasks = await executor.countedTasksCompleted + XCTAssertEqual(completedCountedTasks, 0) + } + } } - // Signal one more time, matching the number of `wait`s above. - await self.systemUnderTest.signal() + await executor.execute { + // Signal one last time, enabling all of the original `wait` calls to resume. + await self.systemUnderTest.signal() - // Now that we have a matching number of `signal`s to the number of enqueued `wait`s, we can await the completion of every wait task. - // Waiting for the `waits` prior to now would have deadlocked. - for wait in waits { - await wait.value + for _ in 1...iterationCount { + // Wait for all enqueued `wait`s to have completed and signaled their completion. + await self.systemUnderTest.wait() + } + // Enqueue a task to check the completed task count to give the suspended tasks above time to resume. + await executor.enqueue { + let tasksCompleted = await executor.countedTasksCompleted + XCTAssertEqual(iterationCount, tasksCompleted) + } } - - // Now that we've executed a matching number of `wait` and `signal` calls, the counter will have been incremented `iterationCount` times. - let count = await counter.count - XCTAssertEqual(iterationCount, count) } func test_wait_doesNotSuspendIfSignalCalledFirst() async { From d1a55600431fc690cd4001acac76a60c4c0290aa Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Wed, 7 Dec 2022 07:58:01 -0800 Subject: [PATCH 19/39] Better comments --- Tests/AsyncQueueTests/SemaphoreTests.swift | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index 9268f2b..e6f841c 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -45,12 +45,29 @@ final class SemaphoreTests: XCTestCase { // MARK: Behavior Tests func test_wait_suspendsUntilEqualNumberOfSignalCalls() async { + /* + This test is tricky to pull off! + Our requirements: + 1. We need to call `wait()` before `signal()` + 2. We need to ensure that the `wait()` call suspends _before_ we call `signal()` + 3. We can't `await` the `wait()` call before calling `signal()` since that would effectively deadlock the test. + + In order to ensure that we are executing the `wait()` calls before we call `signal()` _without awaiting a `wait()` call_, + we utilize an Actor (which has inherently ordered execution) to enqueue ordered `Task`s. We make calls to this actor + `await` the beginning of the `Task` to ensure that each `Task` has begun before resuming the test's execution. + */ + actor CountingExecutor { + /// Enqueues an asynchronous task. This method suspends the caller until the asynchronous task has begun, ensuring ordered execution of enqueued tasks. + /// - Parameters: + /// - incrementOnCompletion: Whether to increment `countedTasksCompleted` once the `task` completes. + /// - task: A unit of work. func enqueue(andCount incrementOnCompletion: Bool = false, _ task: @escaping @Sendable () async -> Void) async { + // Await the start of the soon-to-be-enqueued `Task` with a continuation. await withCheckedContinuation { continuation in // Re-enter the actor context but don't wait for the result. Task { - // Now that we're back in the actor context, resume. + // Now that we're back in the actor context, resume the calling code. continuation.resume() await task() if incrementOnCompletion { @@ -78,7 +95,8 @@ final class SemaphoreTests: XCTestCase { let didSuspend = await self.systemUnderTest.wait() XCTAssertTrue(didSuspend) - // Signal without waiting that our prior wait completed. + // Signal without waiting that the suspended wait call above has resumed. + // This signal allows us to `wait()` for all of these enqueued `wait()` tasks to have completed later in this test. Task { await self.systemUnderTest.signal() } From 744db4a39befd17ec1e9f05debaa111bfc0c607a Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Thu, 8 Dec 2022 08:34:29 -0800 Subject: [PATCH 20/39] Better README documentation --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index fc8ebc7..03ede92 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,12 @@ queue.async { `async` context that executes after all other enqueued work is completed. Work enqueued after this task will wait for this task to complete. */ + try? await Task.sleep(nanoseconds: 1_000_000) +} +queue.async { + /* + This task begins execution once the above one-second sleep completes. + */ } Task { await queue.await { @@ -63,6 +69,12 @@ queue.async { `async` context that executes after all other enqueued work has begun executing. Work enqueued after this task will wait for this task to complete or suspend. */ + try? await Task.sleep(nanoseconds: 1_000_000) +} +queue.async { + /* + This task begins execution once the above task suspends due to the one-second sleep. + */ } Task { await queue.await { @@ -75,6 +87,35 @@ Task { } ``` +In practice, an `ActorQueue` should be utilized with a single instance of an `actor` type to bridge nonisolated and isolated contexts: +``` +public actor LogStore { + + // MARK: Public + + nonisolated + public func log(_ message: String) { + queue.async { + await self.append(message) + } + } + + nonisolated + public func retrieveLogs() async -> [String] { + await queue.await { await self.logs } + } + + // MARK: Private + + private func append(_ message: String) { + logs.append(message) + } + + private let queue = ActorQueue() + private var logs = [String]() +} +``` + ## Requirements * Xcode 14.1 or later. From 031fb5b46d53dd65f96684959fcedde6da6c28df Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Thu, 8 Dec 2022 08:37:42 -0800 Subject: [PATCH 21/39] Remove duplicative label 'queue' from README discussion --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 03ede92..a30db6f 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ While [actors](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html# ### Executing asynchronous tasks in FIFO order -Use a `FIFOQueue` queue to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued. +Use a `FIFOQueue` to execute asynchronous tasks enqueued from a nonisolated context in FIFO order. Tasks sent to one of these queues are guaranteed to begin _and end_ executing in the order in which they are enqueued. ```swift let queue = FIFOQueue() @@ -60,7 +60,7 @@ Task { ### Sending ordered asynchronous tasks to Actors -Use an `ActorQueue` queue to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code. +Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code. ```swift let queue = ActorQueue() From 63fa6856df1fdd47d7eff1c44b432617cd66e972 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Thu, 8 Dec 2022 10:07:49 -0800 Subject: [PATCH 22/39] Eliminate race in test_wait_suspendsUntilEqualNumberOfSignalCalls() --- Tests/AsyncQueueTests/SemaphoreTests.swift | 57 ++++++++++------------ 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index e6f841c..e21d1e9 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -59,64 +59,61 @@ final class SemaphoreTests: XCTestCase { actor CountingExecutor { /// Enqueues an asynchronous task. This method suspends the caller until the asynchronous task has begun, ensuring ordered execution of enqueued tasks. - /// - Parameters: - /// - incrementOnCompletion: Whether to increment `countedTasksCompleted` once the `task` completes. - /// - task: A unit of work. - func enqueue(andCount incrementOnCompletion: Bool = false, _ task: @escaping @Sendable () async -> Void) async { + /// - Parameter task: A unit of work that returns work to execute after the task completes and the count is incremented. + func enqueueAndCount(_ task: @escaping @Sendable () async -> (() async -> Void)?) async { // Await the start of the soon-to-be-enqueued `Task` with a continuation. await withCheckedContinuation { continuation in // Re-enter the actor context but don't wait for the result. Task { // Now that we're back in the actor context, resume the calling code. continuation.resume() - await task() - if incrementOnCompletion { - incrementTasksCompleted() - } + let executeAfterIncrement = await task() + countedTasksCompleted += 1 + await executeAfterIncrement?() } } } - func execute(_ task: @Sendable () async -> Void) async { - await task() + func execute(_ task: @Sendable () async throws -> Void) async rethrows { + try await task() } var countedTasksCompleted = 0 - - private func incrementTasksCompleted() { - countedTasksCompleted += 1 - } } + let executor = CountingExecutor() let iterationCount = 1_000 for _ in 1...iterationCount { - await executor.enqueue(andCount: true) { + await executor.enqueueAndCount { let didSuspend = await self.systemUnderTest.wait() XCTAssertTrue(didSuspend) - // Signal without waiting that the suspended wait call above has resumed. - // This signal allows us to `wait()` for all of these enqueued `wait()` tasks to have completed later in this test. - Task { + return { + // Signal that the suspended wait call above has resumed. + // This signal allows us to `wait()` for all of these enqueued `wait()` tasks to have completed later in this test. await self.systemUnderTest.signal() } } } // Loop one fewer than iterationCount. - // The count will be zero each time because we have more `wait` than `signal` calls. - for _ in 0..<(iterationCount-1) { - await executor.enqueue { + for _ in 1.. Date: Thu, 8 Dec 2022 18:22:15 -1000 Subject: [PATCH 23/39] Move ActorQueue example to a doc comment --- README.md | 29 ----------------------------- Sources/AsyncQueue/ActorQueue.swift | 29 ++++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index a30db6f..2abc694 100644 --- a/README.md +++ b/README.md @@ -87,35 +87,6 @@ Task { } ``` -In practice, an `ActorQueue` should be utilized with a single instance of an `actor` type to bridge nonisolated and isolated contexts: -``` -public actor LogStore { - - // MARK: Public - - nonisolated - public func log(_ message: String) { - queue.async { - await self.append(message) - } - } - - nonisolated - public func retrieveLogs() async -> [String] { - await queue.await { await self.logs } - } - - // MARK: Private - - private func append(_ message: String) { - logs.append(message) - } - - private let queue = ActorQueue() - private var logs = [String]() -} -``` - ## Requirements * Xcode 14.1 or later. diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 81865e1..71143a4 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -21,8 +21,35 @@ // SOFTWARE. /// A queue that executes asynchronous tasks enqueued from a nonisolated context. -/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow tasks that were enqueued to begin executing. +/// Tasks are guaranteed to begin executing in the order in which they are enqueued. However, if a task suspends it will allow subsequently enqueued tasks to begin executing. /// Asynchronous tasks sent to this queue execute as they would in an `actor` type, allowing for re-entrancy and non-FIFO behavior when an individual task suspends. +/// +/// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order. +/// Here is an example of how an `ActorQueue` should be utilized within an `actor`: +/// ``` +/// public actor LogStore { +/// +/// nonisolated +/// public func log(_ message: String) { +/// queue.async { +/// await self.append(message) +/// } +/// } +/// +/// nonisolated +/// public func retrieveLogs() async -> [String] { +/// await queue.await { await self.logs } +/// } +/// +/// private func append(_ message: String) { +/// logs.append(message) +/// } +/// +/// private let queue = ActorQueue() +/// private var logs = [String]() +/// } +/// ``` +/// /// - Warning: Execution order is not guaranteed unless the enqueued tasks interact with a single `actor` instance. public final class ActorQueue { From f70f19d1010779fa890e502f207d16dc508ef3ca Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 09:31:59 -1000 Subject: [PATCH 24/39] Bump version and update README --- AsyncQueue.podspec | 2 +- README.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/AsyncQueue.podspec b/AsyncQueue.podspec index fe333ee..9d73f34 100644 --- a/AsyncQueue.podspec +++ b/AsyncQueue.podspec @@ -1,6 +1,6 @@ Pod::Spec.new do |s| s.name = 'AsyncQueue' - s.version = '0.0.1' + s.version = '0.0.2' s.license = 'MIT' s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.' s.homepage = 'https://github.com/dfed/swift-async-queue' diff --git a/README.md b/README.md index 06e6f7e..020e980 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,7 @@ To install swift-async-queue in your iOS project with [Swift Package Manager](ht ```swift dependencies: [ - .package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.1"), + .package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.2"), ] ``` @@ -117,7 +117,7 @@ To install swift-async-queue in your iOS project with [CocoaPods](http://cocoapo ``` platform :ios, '13.0' -pod 'AsyncQueue', '~> 0.1' +pod 'AsyncQueue', '~> 0.0.2' ``` ## Contributing From b8357db3a769d0d20aa7dcd1041c7cfe2cb3a441 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 10:01:21 -1000 Subject: [PATCH 25/39] Simplify ActorExecutor --- Sources/AsyncQueue/ActorQueue.swift | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 71143a4..48bd456 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -124,21 +124,14 @@ public final class ActorQueue { private actor ActorExecutor { func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { let semaphore = Semaphore() - executeWithoutWaiting(task, afterSignaling: semaphore) - // Suspend the calling code until our enqueued task starts. - await semaphore.wait() - } - - private func executeWithoutWaiting( - _ task: @escaping @Sendable () async -> Void, - afterSignaling semaphore: Semaphore) - { // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. Task { // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. await semaphore.signal() await task() } + // Suspend the calling code until our enqueued task starts. + await semaphore.wait() } } From 10ec535169963d24db12ae4e64da147aa5644aed Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 16:48:15 -1000 Subject: [PATCH 26/39] Do not ship Semaphore. Use fewer 'await' calls in ActorExecutor --- Sources/AsyncQueue/ActorQueue.swift | 22 ++++++++++++------- .../Utilities}/Semaphore.swift | 0 2 files changed, 14 insertions(+), 8 deletions(-) rename {Sources/AsyncQueue => Tests/AsyncQueueTests/Utilities}/Semaphore.swift (100%) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 48bd456..ac4f6b0 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -123,16 +123,22 @@ public final class ActorQueue { private actor ActorExecutor { func suspendUntilStarted(_ task: @escaping @Sendable () async -> Void) async { - let semaphore = Semaphore() - // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. - Task { - // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. - await semaphore.signal() - await task() - } // Suspend the calling code until our enqueued task starts. - await semaphore.wait() + await withUnsafeContinuation { continuation in + // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. + Task { + // Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance. + // Without this line the task executes on a random context, causing execution order to be nondeterministic. + _ = void + + // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. + continuation.resume() + await task() + } + } } + + private let void: Void = () } } diff --git a/Sources/AsyncQueue/Semaphore.swift b/Tests/AsyncQueueTests/Utilities/Semaphore.swift similarity index 100% rename from Sources/AsyncQueue/Semaphore.swift rename to Tests/AsyncQueueTests/Utilities/Semaphore.swift From ddff295a6be9f9e29d6b263c2ba75f1731f3fd84 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 20:07:06 -1000 Subject: [PATCH 27/39] Add link to Swift bug report --- Sources/AsyncQueue/ActorQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index ac4f6b0..e7c9954 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -128,7 +128,7 @@ public final class ActorQueue { // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. Task { // Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance. - // Without this line the task executes on a random context, causing execution order to be nondeterministic. + // This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503 _ = void // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. From 847c0ed73a435db6a80de5c6507c378e885befc9 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 20:38:10 -1000 Subject: [PATCH 28/39] Since we're renaming we should bump a major version (i.e. a minor version in beta semver) --- AsyncQueue.podspec | 2 +- README.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/AsyncQueue.podspec b/AsyncQueue.podspec index 9d73f34..54ef9da 100644 --- a/AsyncQueue.podspec +++ b/AsyncQueue.podspec @@ -1,6 +1,6 @@ Pod::Spec.new do |s| s.name = 'AsyncQueue' - s.version = '0.0.2' + s.version = '0.1.0' s.license = 'MIT' s.summary = 'A queue that enables ordered sending of events from synchronous to asynchronous code.' s.homepage = 'https://github.com/dfed/swift-async-queue' diff --git a/README.md b/README.md index 020e980..b5d774e 100644 --- a/README.md +++ b/README.md @@ -107,7 +107,7 @@ To install swift-async-queue in your iOS project with [Swift Package Manager](ht ```swift dependencies: [ - .package(url: "https://github.com/dfed/swift-async-queue", from: "0.0.2"), + .package(url: "https://github.com/dfed/swift-async-queue", from: "0.1.0"), ] ``` @@ -117,7 +117,7 @@ To install swift-async-queue in your iOS project with [CocoaPods](http://cocoapo ``` platform :ios, '13.0' -pod 'AsyncQueue', '~> 0.0.2' +pod 'AsyncQueue', '~> 0.1.0' ``` ## Contributing From 01c2dac2ccaf03f92c5f316aecd1c31da74ed545 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 21:10:37 -1000 Subject: [PATCH 29/39] Delete SemaphoreTests.swift since it is no longer required --- Tests/AsyncQueueTests/SemaphoreTests.swift | 139 --------------------- 1 file changed, 139 deletions(-) delete mode 100644 Tests/AsyncQueueTests/SemaphoreTests.swift diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift deleted file mode 100644 index e21d1e9..0000000 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ /dev/null @@ -1,139 +0,0 @@ -// MIT License -// -// Copyright (c) 2022 Dan Federman -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -import XCTest - -@testable import AsyncQueue - -final class SemaphoreTests: XCTestCase { - - // MARK: XCTestCase - - override func setUp() async throws { - try await super.setUp() - - systemUnderTest = Semaphore() - } - - override func tearDown() async throws { - try await super.tearDown() - - while await systemUnderTest.isWaiting { - await systemUnderTest.signal() - } - } - - // MARK: Behavior Tests - - func test_wait_suspendsUntilEqualNumberOfSignalCalls() async { - /* - This test is tricky to pull off! - Our requirements: - 1. We need to call `wait()` before `signal()` - 2. We need to ensure that the `wait()` call suspends _before_ we call `signal()` - 3. We can't `await` the `wait()` call before calling `signal()` since that would effectively deadlock the test. - - In order to ensure that we are executing the `wait()` calls before we call `signal()` _without awaiting a `wait()` call_, - we utilize an Actor (which has inherently ordered execution) to enqueue ordered `Task`s. We make calls to this actor - `await` the beginning of the `Task` to ensure that each `Task` has begun before resuming the test's execution. - */ - - actor CountingExecutor { - /// Enqueues an asynchronous task. This method suspends the caller until the asynchronous task has begun, ensuring ordered execution of enqueued tasks. - /// - Parameter task: A unit of work that returns work to execute after the task completes and the count is incremented. - func enqueueAndCount(_ task: @escaping @Sendable () async -> (() async -> Void)?) async { - // Await the start of the soon-to-be-enqueued `Task` with a continuation. - await withCheckedContinuation { continuation in - // Re-enter the actor context but don't wait for the result. - Task { - // Now that we're back in the actor context, resume the calling code. - continuation.resume() - let executeAfterIncrement = await task() - countedTasksCompleted += 1 - await executeAfterIncrement?() - } - } - } - - func execute(_ task: @Sendable () async throws -> Void) async rethrows { - try await task() - } - - var countedTasksCompleted = 0 - } - - let executor = CountingExecutor() - let iterationCount = 1_000 - - for _ in 1...iterationCount { - await executor.enqueueAndCount { - let didSuspend = await self.systemUnderTest.wait() - XCTAssertTrue(didSuspend) - - return { - // Signal that the suspended wait call above has resumed. - // This signal allows us to `wait()` for all of these enqueued `wait()` tasks to have completed later in this test. - await self.systemUnderTest.signal() - } - } - } - - // Loop one fewer than iterationCount. - for _ in 1.. Date: Fri, 9 Dec 2022 21:48:04 -1000 Subject: [PATCH 30/39] Remove unnecessary '@MainActor' decoration on test --- Tests/AsyncQueueTests/ActorQueueTests.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index e78868f..6725d57 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -71,7 +71,6 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - @MainActor func test_async_doesNotRetainTaskAfterExecution() async { final class Reference: Sendable {} final class ReferenceHolder: @unchecked Sendable { @@ -110,7 +109,7 @@ final class ActorQueueTests: XCTestCase { // Allow the enqueued task to complete. await asyncSemaphore.signal() // Make sure the task has completed. - waitForExpectations(timeout: 1.0) + await waitForExpectations(timeout: 1.0) XCTAssertNil(referenceHolder.weakReference) } From be963033cb0b91b2d09781654263ad73134ae653 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Fri, 9 Dec 2022 21:52:00 -1000 Subject: [PATCH 31/39] Add test_async_retainsReceiverUntilFlushed() to ActorQueueTests to match FIFOQueueTests --- Tests/AsyncQueueTests/ActorQueueTests.swift | 23 +++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 6725d57..28a5297 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -71,6 +71,29 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } + func test_async_retainsReceiverUntilFlushed() async { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = self.expectation(description: #function) + let semaphore = Semaphore() + systemUnderTest?.async { + // Make the queue wait. + await semaphore.wait() + await counter.incrementAndExpectCount(equals: 1) + } + systemUnderTest?.async { + // This async task should not execute until the semaphore is released. + await counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + } + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await waitForExpectations(timeout: 1.0) + } + func test_async_doesNotRetainTaskAfterExecution() async { final class Reference: Sendable {} final class ReferenceHolder: @unchecked Sendable { From 139f87428e17ddc3158535fda05d13aaba25ab05 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 10 Dec 2022 07:33:47 -1000 Subject: [PATCH 32/39] Simply workaround for Task not executing within actor execution context --- Sources/AsyncQueue/ActorQueue.swift | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index e7c9954..7774ae1 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -127,18 +127,19 @@ public final class ActorQueue { await withUnsafeContinuation { continuation in // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. Task { - // Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance. - // This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503 - _ = void - - // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. - continuation.resume() - await task() + // Call through to an instance method to ensure we're executing the below within the Actor context. + // For more information, see https://github.com/apple/swift/issues/62505 + await execute { + // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. + continuation.resume() + await task() + } } } } - private let void: Void = () + private func execute(_ task: () async -> Void) async { + await task() + } } - } From 2435bfbab6810828f2b9bf016a7f4b4ff10946cc Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 10 Dec 2022 07:35:10 -1000 Subject: [PATCH 33/39] Revert "Simply workaround for Task not executing within actor execution context" This reverts commit 139f87428e17ddc3158535fda05d13aaba25ab05. --- Sources/AsyncQueue/ActorQueue.swift | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index 7774ae1..e7c9954 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -127,19 +127,18 @@ public final class ActorQueue { await withUnsafeContinuation { continuation in // Utilize the serial (but not FIFO) Actor context to execute the task without requiring the calling method to wait for the task to complete. Task { - // Call through to an instance method to ensure we're executing the below within the Actor context. - // For more information, see https://github.com/apple/swift/issues/62505 - await execute { - // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. - continuation.resume() - await task() - } + // Force this task to execute within the ActorExecutor's context by accessing an ivar on the instance. + // This works around a bug when compiling with Xcode 14.1: https://github.com/apple/swift/issues/62503 + _ = void + + // Signal that the task has started. As long as the `task` below interacts with another `actor` the order of execution is guaranteed. + continuation.resume() + await task() } } } - private func execute(_ task: () async -> Void) async { - await task() - } + private let void: Void = () } + } From 3f6d78960dbc0db9ae87742a8fa708646bebbe43 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 10 Dec 2022 11:41:09 -1000 Subject: [PATCH 34/39] Better test name --- Tests/AsyncQueueTests/ActorQueueTests.swift | 2 +- Tests/AsyncQueueTests/FIFOQueueTests.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 28a5297..8109f7e 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -71,7 +71,7 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - func test_async_retainsReceiverUntilFlushed() async { + func test_async_executesAfterReceiverIsDeallocated() async { var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() let expectation = self.expectation(description: #function) diff --git a/Tests/AsyncQueueTests/FIFOQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift index b032e58..b5960ff 100644 --- a/Tests/AsyncQueueTests/FIFOQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -79,7 +79,7 @@ final class FIFOQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - func test_async_retainsReceiverUntilFlushed() async { + func test_async_executesAfterReceiverIsDeallocated() async { var systemUnderTest: FIFOQueue? = FIFOQueue() let counter = Counter() let expectation = self.expectation(description: #function) From 4b66f403c88e8c65c404420fdfd1db9cc7706296 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 10 Dec 2022 11:42:26 -1000 Subject: [PATCH 35/39] Make explicit that queue is deallocated in tests --- Tests/AsyncQueueTests/ActorQueueTests.swift | 2 ++ Tests/AsyncQueueTests/FIFOQueueTests.swift | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 8109f7e..53d81f1 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -86,8 +86,10 @@ final class ActorQueueTests: XCTestCase { await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() } + weak var queue = systemUnderTest // Nil out our reference to the queue to show that the enqueued tasks will still complete systemUnderTest = nil + XCTAssertNil(queue) // Signal the semaphore to unlock the remaining enqueued tasks. await semaphore.signal() diff --git a/Tests/AsyncQueueTests/FIFOQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift index b5960ff..1ae297c 100644 --- a/Tests/AsyncQueueTests/FIFOQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -94,8 +94,10 @@ final class FIFOQueueTests: XCTestCase { await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() } + weak var queue = systemUnderTest // Nil out our reference to the queue to show that the enqueued tasks will still complete systemUnderTest = nil + XCTAssertNil(queue) // Signal the semaphore to unlock the remaining enqueued tasks. await semaphore.signal() From ada3d6c694dc44897dec042e15ea39f75760efa7 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sat, 10 Dec 2022 11:46:55 -1000 Subject: [PATCH 36/39] Update test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() in ActorQueueTests to use an ActorQueue --- Tests/AsyncQueueTests/ActorQueueTests.swift | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 53d81f1..fdea04c 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -71,26 +71,22 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - func test_async_executesAfterReceiverIsDeallocated() async { - var systemUnderTest: FIFOQueue? = FIFOQueue() + func test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() async { + var systemUnderTest: ActorQueue? = ActorQueue() let counter = Counter() let expectation = self.expectation(description: #function) let semaphore = Semaphore() systemUnderTest?.async { - // Make the queue wait. + // Make the task wait. await semaphore.wait() await counter.incrementAndExpectCount(equals: 1) - } - systemUnderTest?.async { - // This async task should not execute until the semaphore is released. - await counter.incrementAndExpectCount(equals: 2) expectation.fulfill() } weak var queue = systemUnderTest // Nil out our reference to the queue to show that the enqueued tasks will still complete systemUnderTest = nil XCTAssertNil(queue) - // Signal the semaphore to unlock the remaining enqueued tasks. + // Signal the semaphore to unlock the enqueued tasks. await semaphore.signal() await waitForExpectations(timeout: 1.0) From 771e5e44acdc2d7f9dc3ce8f8a7182f669f497a3 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sun, 11 Dec 2022 08:05:39 -1000 Subject: [PATCH 37/39] Accurate test method naming --- Tests/AsyncQueueTests/ActorQueueTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index fdea04c..25c5b53 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -60,7 +60,7 @@ final class ActorQueueTests: XCTestCase { await systemUnderTest.await { /* Drain the queue */ } } - func test_async_allowsReentrancy() async { + func test_await_allowsReentrancy() async { let counter = Counter() await systemUnderTest.await { [systemUnderTest] in await systemUnderTest.await { From ea6ade9cfed311ae2bbf06a410916a5fb4483984 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sun, 11 Dec 2022 09:16:09 -1000 Subject: [PATCH 38/39] Remove unnecessary await --- Tests/AsyncQueueTests/ActorQueueTests.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 25c5b53..9d29927 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -68,7 +68,6 @@ final class ActorQueueTests: XCTestCase { } await counter.incrementAndExpectCount(equals: 2) } - await systemUnderTest.await { /* Drain the queue */ } } func test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() async { From bf3a0c2f7bc74486b365d93cfb08b489e61853f6 Mon Sep 17 00:00:00 2001 From: Dan Federman Date: Sun, 8 Jan 2023 15:12:31 -0800 Subject: [PATCH 39/39] Add swift tag to multi-line code examples --- README.md | 2 +- Sources/AsyncQueue/ActorQueue.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0c0026c..6dc8b63 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ A library of queues that enable sending ordered tasks from synchronous to asynch Tasks sent from a synchronous context to an asynchronous context in Swift Concurrency are inherently unordered. Consider the following test: -``` +```swift @MainActor func test_mainActor_taskOrdering() async { var counter = 0 diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index e7c9954..1c18bd6 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -26,7 +26,7 @@ /// /// An `ActorQueue` is used to ensure tasks sent from a nonisolated context to a single `actor`'s isolated context begin execution in order. /// Here is an example of how an `ActorQueue` should be utilized within an `actor`: -/// ``` +/// ```swift /// public actor LogStore { /// /// nonisolated