Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,32 @@ Task {

### Sending ordered asynchronous tasks to Actors

Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code.
Use an `ActorQueue` to send ordered asynchronous tasks from a nonisolated context to an `actor` instance's isolated `async` context. Tasks sent to one of these queues are guaranteed to begin executing in the order in which they are enqueued. Ordering of execution is guaranteed up until the first [suspension point](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID639) within the called `actor` code.

```swift
let targetActor = MyActor()
let queue = ActorQueue()
queue.async {
queue.async(on: targetActor) { targetActor in
/*
`async` context that executes after all other enqueued work has begun executing.
Work enqueued after this task will wait for this task to complete or suspend.
This task executes on the `targetActor`'s isolated context.
*/
try? await Task.sleep(nanoseconds: 1_000_000)
}
queue.async {
queue.async(on: targetActor) { targetActor in
/*
This task begins execution once the above task suspends due to the one-second sleep.
This task executes on the `targetActor`'s isolated context.
*/
}
Task {
await queue.await {
await queue.await(on: targetActor) { targetActor in
/*
`async` context that can return a value or throw an error.
Executes after all other enqueued work has begun executing.
Work enqueued after this task will wait for this task to complete or suspend.
This task executes on the targetActor's isolated context.
*/
}
}
Expand Down
30 changes: 18 additions & 12 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
///
/// nonisolated
/// public func log(_ message: String) {
/// queue.async {
/// await self.append(message)
/// queue.async(on: self) { myself in
/// myself.append(message)
/// }
/// }
///
/// nonisolated
/// public func retrieveLogs() async -> [String] {
/// await queue.await { await self.logs }
/// await queue.await(on: self) { myself in myself.logs }
/// }
///
/// private func append(_ message: String) {
Expand Down Expand Up @@ -82,32 +82,38 @@ public final class ActorQueue {

/// Schedules an asynchronous task for execution and immediately returns.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
public func async(_ task: @escaping @Sendable () async -> Void) {
taskStreamContinuation.yield(task)
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
taskStreamContinuation.yield { await task(isolatedActor) }
}

/// Schedules an asynchronous task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
await withUnsafeContinuation { continuation in
taskStreamContinuation.yield {
continuation.resume(returning: await task())
continuation.resume(returning: await task(isolatedActor))
}
}
}

/// Schedules an asynchronous throwing task and returns after the task is complete.
/// The scheduled task will not execute until all prior tasks have completed or suspended.
/// - Parameter task: The task to enqueue.
/// - Parameters:
/// - isolatedActor: The actor within which the task is isolated.
/// - task: The task to enqueue.
/// - Returns: The value returned from the enqueued task.
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
try await withUnsafeThrowingContinuation { continuation in
taskStreamContinuation.yield {
do {
continuation.resume(returning: try await task())
continuation.resume(returning: try await task(isolatedActor))
} catch {
continuation.resume(throwing: error)
}
Expand Down
46 changes: 23 additions & 23 deletions Tests/AsyncQueueTests/ActorQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,34 @@ final class ActorQueueTests: XCTestCase {
func test_async_sendsEventsInOrder() async {
let counter = Counter()
for iteration in 1...1_000 {
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: iteration)
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: iteration)
}
}
await systemUnderTest.await { /* Drain the queue */ }
await systemUnderTest.await(on: counter) { _ in /* Drain the queue */ }
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tempted to add a new internal method called func allEnqueuedTasksSuspendedOrCompleted() async to make this test call-site read more clearly since we have a few of these.

}

func test_async_startsExecutionOfNextTaskAfterSuspension() async {
let semaphore = Semaphore()
systemUnderTest.async {
systemUnderTest.async(on: semaphore) { semaphore in
await semaphore.wait()
}
systemUnderTest.async {
systemUnderTest.async(on: semaphore) { semaphore in
// Signal the semaphore from the actor queue.
// If the actor queue were FIFO, this test would hang since this code would never execute:
// we'd still be waiting for the prior `wait()` tasks to finish.
await semaphore.signal()
semaphore.signal()
}
await systemUnderTest.await { /* Drain the queue */ }
await systemUnderTest.await(on: semaphore) { _ in /* Drain the queue */ }
}

func test_await_allowsReentrancy() async {
let counter = Counter()
await systemUnderTest.await { [systemUnderTest] in
await systemUnderTest.await {
await counter.incrementAndExpectCount(equals: 1)
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
await systemUnderTest.await(on: counter) { counter in
counter.incrementAndExpectCount(equals: 1)
}
await counter.incrementAndExpectCount(equals: 2)
counter.incrementAndExpectCount(equals: 2)
}
}

Expand All @@ -75,10 +75,10 @@ final class ActorQueueTests: XCTestCase {
let counter = Counter()
let expectation = self.expectation(description: #function)
let semaphore = Semaphore()
systemUnderTest?.async {
systemUnderTest?.async(on: counter) { counter in
// Make the task wait.
await semaphore.wait()
await counter.incrementAndExpectCount(equals: 1)
counter.incrementAndExpectCount(equals: 1)
expectation.fulfill()
}
weak var queue = systemUnderTest
Expand Down Expand Up @@ -109,14 +109,14 @@ final class ActorQueueTests: XCTestCase {
let asyncSemaphore = Semaphore()
let syncSemaphore = Semaphore()
let expectation = self.expectation(description: #function)
systemUnderTest.async { [reference = referenceHolder.reference] in
systemUnderTest.async(on: syncSemaphore) { [reference = referenceHolder.reference] syncSemaphore in
// Now that we've started the task and captured the reference, release the synchronous code.
await syncSemaphore.signal()
syncSemaphore.signal()
// Wait for the synchronous setup to complete and the reference to be nil'd out.
await asyncSemaphore.wait()
// Retain the unsafe counter until the task is completed.
_ = reference
self.systemUnderTest.async {
self.systemUnderTest.async(on: syncSemaphore) { _ in
// Signal that this task has cleaned up.
// This closure will not execute until the prior closure completes.
expectation.fulfill()
Expand All @@ -137,26 +137,26 @@ final class ActorQueueTests: XCTestCase {
func test_await_sendsEventsInOrder() async {
let counter = Counter()
for iteration in 1...1_000 {
systemUnderTest.async {
await counter.incrementAndExpectCount(equals: iteration)
systemUnderTest.async(on: counter) { counter in
counter.incrementAndExpectCount(equals: iteration)
}

guard iteration % 25 == 0 else {
// Keep sending async events to the queue.
continue
}

await systemUnderTest.await {
let count = await counter.count
await systemUnderTest.await(on: counter) { counter in
let count = counter.count
XCTAssertEqual(count, iteration)
}
}
await systemUnderTest.await { /* Drain the queue */ }
await systemUnderTest.await(on: counter) { counter in /* Drain the queue */ }
}

func test_await_canReturn() async {
let expectedValue = UUID()
let returnedValue = await systemUnderTest.await { expectedValue }
let returnedValue = await systemUnderTest.await(on: Counter()) { _ in expectedValue }
XCTAssertEqual(expectedValue, returnedValue)
}

Expand All @@ -166,7 +166,7 @@ final class ActorQueueTests: XCTestCase {
}
let expectedError = TestError()
do {
try await systemUnderTest.await { throw expectedError }
try await systemUnderTest.await(on: Counter()) { _ in throw expectedError }
} catch {
XCTAssertEqual(error as? TestError, expectedError)
}
Expand Down