Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func testFIFOQueueOrdering() async {
actor Counter {
nonisolated
func incrementAndAssertCountEquals(_ expectedCount: Int) {
queue.async {
queue.enqueue {
await self.increment()
let incrementedCount = await self.count
XCTAssertEqual(incrementedCount, expectedCount) // always succeeds
Expand All @@ -62,7 +62,7 @@ func testFIFOQueueOrdering() async {

nonisolated
func flushQueue() async {
await queue.await { }
await queue.enqueueAndWait { }
}

func increment() {
Expand Down Expand Up @@ -104,15 +104,15 @@ func testActorQueueOrdering() async {

nonisolated
func incrementAndAssertCountEquals(_ expectedCount: Int) {
queue.async { myself in
queue.enqueue { myself in
myself.count += 1
XCTAssertEqual(expectedCount, myself.count) // always succeeds
}
}

nonisolated
func flushQueue() async {
await queue.await { _ in }
await queue.enqueueAndWait { _ in }
}

private var count = 0
Expand Down
12 changes: 6 additions & 6 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
///
/// nonisolated
/// public func log(_ message: String) {
/// queue.async { myself in
/// queue.enqueue { myself in
/// myself.logs.append(message)
/// }
/// }
///
/// nonisolated
/// public func retrieveLogs() async -> [String] {
/// await queue.await { myself in myself.logs }
/// await queue.enqueueAndWait { myself in myself.logs }
/// }
///
/// private let queue = ActorQueue<LogStore>()
Expand Down Expand Up @@ -78,7 +78,7 @@ public final class ActorQueue<ActorType: Actor> {

// MARK: Public

/// Sets the actor context within which each `async` and `await`ed task will execute.
/// Sets the actor context within which each `enqueue` and `enqueueAndWait` task will execute.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"enqueue and enqueueAndWait" -> "enqueueed and enqueueAndWaited"?

/// It is recommended that this method be called in the adopted actor’s `init` method.
/// **Must be called prior to enqueuing any work on the receiver.**
///
Expand All @@ -92,15 +92,15 @@ public final class ActorQueue<ActorType: Actor> {
/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
public func async(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
public func enqueue(_ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield(ActorTask(executionContext: executionContext, task: task))
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
return await withUnsafeContinuation { continuation in
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
Expand All @@ -113,7 +113,7 @@ public final class ActorQueue<ActorType: Actor> {
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue. The task's parameter is a reference to the actor whose execution context has been adopted.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
let executionContext = self.executionContext // Capture/retain the executionContext before suspending.
return try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield(ActorTask(executionContext: executionContext) { executionContext in
Expand Down
12 changes: 6 additions & 6 deletions Sources/AsyncQueue/FIFOQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class FIFOQueue: Sendable {
/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
public func async(_ task: @escaping @Sendable () async -> Void) {
public func enqueue(_ task: @escaping @Sendable () async -> Void) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to edit this await in this line?

/// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `await` this queue from a task executing on this queue will result in a deadlock.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooooh solid find!

taskStreamContinuation.yield(task)
}

Expand All @@ -63,15 +63,15 @@ public final class FIFOQueue: Sendable {
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
public func enqueue<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield { await task(isolatedActor) }
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable () async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task())
Expand All @@ -85,7 +85,7 @@ public final class FIFOQueue: Sendable {
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
public func enqueueAndWait<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task(isolatedActor))
Expand All @@ -97,7 +97,7 @@ public final class FIFOQueue: Sendable {
/// The scheduled task will not execute until all prior tasks – including suspended tasks – have completed.
/// - Parameter task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
public func enqueueAndWait<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
Expand All @@ -115,7 +115,7 @@ public final class FIFOQueue: Sendable {
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
public func enqueueAndWait<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
Expand Down
58 changes: 29 additions & 29 deletions Tests/AsyncQueueTests/ActorQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ final class ActorQueueTests: XCTestCase {
XCTAssertNil(weakCounter)
}

func test_async_retainsAdoptedActorUntilEnqueuedTasksComplete() async {
func test_enqueue_retainsAdoptedActorUntilEnqueuedTasksComplete() async {
let systemUnderTest = ActorQueue<Counter>()
var counter: Counter? = Counter()
weak var weakCounter = counter
systemUnderTest.adoptExecutionContext(of: counter!)

let semaphore = Semaphore()
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
await semaphore.wait()
}

Expand All @@ -63,64 +63,64 @@ final class ActorQueueTests: XCTestCase {
await semaphore.signal()
}

func test_async_taskParameterIsAdoptedActor() async {
func test_enqueue_taskParameterIsAdoptedActor() async {
let semaphore = Semaphore()
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
XCTAssertTrue(counter === self.counter)
await semaphore.signal()
}

await semaphore.wait()
}

func test_await_taskParameterIsAdoptedActor() async {
await systemUnderTest.await { counter in
func test_enqueueAndWait_taskParameterIsAdoptedActor() async {
await systemUnderTest.enqueueAndWait { counter in
XCTAssertTrue(counter === self.counter)
}
}

func test_async_sendsEventsInOrder() async {
func test_enqueue_sendsEventsInOrder() async {
for iteration in 1...1_000 {
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
counter.incrementAndExpectCount(equals: iteration)
}
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_async_startsExecutionOfNextTaskAfterSuspension() async {
func test_enqueue_startsExecutionOfNextTaskAfterSuspension() async {
let systemUnderTest = ActorQueue<Semaphore>()
let semaphore = Semaphore()
systemUnderTest.adoptExecutionContext(of: semaphore)

systemUnderTest.async { semaphore in
systemUnderTest.enqueue { semaphore in
await semaphore.wait()
}
systemUnderTest.async { semaphore in
systemUnderTest.enqueue { semaphore in
// Signal the semaphore from the actor queue.
// If the actor queue were FIFO, this test would hang since this code would never execute:
// we'd still be waiting for the prior `wait()` tasks to finish.
semaphore.signal()
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_await_allowsReentrancy() async {
await systemUnderTest.await { [systemUnderTest] counter in
await systemUnderTest.await { counter in
func test_enqueueAndWait_allowsReentrancy() async {
await systemUnderTest.enqueueAndWait { [systemUnderTest] counter in
await systemUnderTest.enqueueAndWait { counter in
counter.incrementAndExpectCount(equals: 1)
}
counter.incrementAndExpectCount(equals: 2)
}
}

func test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() async {
func test_enqueue_executesEnqueuedTasksAfterReceiverIsDeallocated() async {
var systemUnderTest: ActorQueue<Counter>? = ActorQueue()
systemUnderTest?.adoptExecutionContext(of: counter)

let expectation = self.expectation(description: #function)
let semaphore = Semaphore()
systemUnderTest?.async { counter in
systemUnderTest?.enqueue { counter in
// Make the task wait.
await semaphore.wait()
counter.incrementAndExpectCount(equals: 1)
Expand All @@ -136,7 +136,7 @@ final class ActorQueueTests: XCTestCase {
await waitForExpectations(timeout: 1.0)
}

func test_async_doesNotRetainTaskAfterExecution() async {
func test_enqueue_doesNotRetainTaskAfterExecution() async {
final class Reference: Sendable {}
final class ReferenceHolder: @unchecked Sendable {
init() {
Expand All @@ -157,14 +157,14 @@ final class ActorQueueTests: XCTestCase {
systemUnderTest.adoptExecutionContext(of: syncSemaphore)

let expectation = self.expectation(description: #function)
systemUnderTest.async { [reference = referenceHolder.reference] syncSemaphore in
systemUnderTest.enqueue { [reference = referenceHolder.reference] syncSemaphore in
// Now that we've started the task and captured the reference, release the synchronous code.
syncSemaphore.signal()
// Wait for the synchronous setup to complete and the reference to be nil'd out.
await asyncSemaphore.wait()
// Retain the unsafe counter until the task is completed.
_ = reference
systemUnderTest.async { _ in
systemUnderTest.enqueue { _ in
// Signal that this task has cleaned up.
// This closure will not execute until the prior closure completes.
expectation.fulfill()
Expand All @@ -182,9 +182,9 @@ final class ActorQueueTests: XCTestCase {
XCTAssertNil(referenceHolder.weakReference)
}

func test_await_sendsEventsInOrder() async {
func test_enqueueAndWait_sendsEventsInOrder() async {
for iteration in 1...1_000 {
systemUnderTest.async { counter in
systemUnderTest.enqueue { counter in
counter.incrementAndExpectCount(equals: iteration)
}

Expand All @@ -193,26 +193,26 @@ final class ActorQueueTests: XCTestCase {
continue
}

await systemUnderTest.await { counter in
await systemUnderTest.enqueueAndWait { counter in
XCTAssertEqual(counter.count, iteration)
}
}
await systemUnderTest.await { _ in /* Drain the queue */ }
await systemUnderTest.enqueueAndWait { _ in /* Drain the queue */ }
}

func test_await_canReturn() async {
func test_enqueueAndWait_canReturn() async {
let expectedValue = UUID()
let returnedValue = await systemUnderTest.await { _ in expectedValue }
let returnedValue = await systemUnderTest.enqueueAndWait { _ in expectedValue }
XCTAssertEqual(expectedValue, returnedValue)
}

func test_await_canThrow() async {
func test_enqueueAndWait_canThrow() async {
struct TestError: Error, Equatable {
private let identifier = UUID()
}
let expectedError = TestError()
do {
try await systemUnderTest.await { _ in throw expectedError }
try await systemUnderTest.enqueueAndWait { _ in throw expectedError }
} catch {
XCTAssertEqual(error as? TestError, expectedError)
}
Expand Down
Loading