Skip to content

Commit 37c441d

Browse files
committed
Make ActorQueue's tasks isolated to an actor's context
1 parent 771e5e4 commit 37c441d

File tree

2 files changed

+41
-36
lines changed

2 files changed

+41
-36
lines changed

Sources/AsyncQueue/ActorQueue.swift

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@
3131
///
3232
/// nonisolated
3333
/// public func log(_ message: String) {
34-
/// queue.async {
35-
/// await self.append(message)
34+
/// queue.async(on: self) { myself in
35+
/// myself.append(message)
3636
/// }
3737
/// }
3838
///
3939
/// nonisolated
4040
/// public func retrieveLogs() async -> [String] {
41-
/// await queue.await { await self.logs }
41+
/// await queue.await(on: self) { myself in myself.logs }
4242
/// }
4343
///
4444
/// private func append(_ message: String) {
@@ -82,32 +82,38 @@ public final class ActorQueue {
8282

8383
/// Schedules an asynchronous task for execution and immediately returns.
8484
/// The scheduled task will not execute until all prior tasks have completed or suspended.
85-
/// - Parameter task: The task to enqueue.
86-
public func async(_ task: @escaping @Sendable () async -> Void) {
87-
taskStreamContinuation.yield(task)
85+
/// - Parameters:
86+
/// - isolatedActor: The actor within which the task is isolated.
87+
/// - task: The task to enqueue.
88+
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
89+
taskStreamContinuation.yield { await task(isolatedActor) }
8890
}
8991

9092
/// Schedules an asynchronous task and returns after the task is complete.
9193
/// The scheduled task will not execute until all prior tasks have completed or suspended.
92-
/// - Parameter task: The task to enqueue.
94+
/// - Parameters:
95+
/// - isolatedActor: The actor within which the task is isolated.
96+
/// - task: The task to enqueue.
9397
/// - Returns: The value returned from the enqueued task.
94-
public func await<T>(_ task: @escaping @Sendable () async -> T) async -> T {
98+
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
9599
await withUnsafeContinuation { continuation in
96100
taskStreamContinuation.yield {
97-
continuation.resume(returning: await task())
101+
continuation.resume(returning: await task(isolatedActor))
98102
}
99103
}
100104
}
101105

102106
/// Schedules an asynchronous throwing task and returns after the task is complete.
103107
/// The scheduled task will not execute until all prior tasks have completed or suspended.
104-
/// - Parameter task: The task to enqueue.
108+
/// - Parameters:
109+
/// - isolatedActor: The actor within which the task is isolated.
110+
/// - task: The task to enqueue.
105111
/// - Returns: The value returned from the enqueued task.
106-
public func await<T>(_ task: @escaping @Sendable () async throws -> T) async throws -> T {
112+
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
107113
try await withUnsafeThrowingContinuation { continuation in
108114
taskStreamContinuation.yield {
109115
do {
110-
continuation.resume(returning: try await task())
116+
continuation.resume(returning: try await task(isolatedActor))
111117
} catch {
112118
continuation.resume(throwing: error)
113119
}

Tests/AsyncQueueTests/ActorQueueTests.swift

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,47 +39,46 @@ final class ActorQueueTests: XCTestCase {
3939
func test_async_sendsEventsInOrder() async {
4040
let counter = Counter()
4141
for iteration in 1...1_000 {
42-
systemUnderTest.async {
43-
await counter.incrementAndExpectCount(equals: iteration)
42+
systemUnderTest.async(on: counter) { counter in
43+
counter.incrementAndExpectCount(equals: iteration)
4444
}
4545
}
46-
await systemUnderTest.await { /* Drain the queue */ }
46+
await systemUnderTest.await(on: counter) { _ in /* Drain the queue */ }
4747
}
4848

4949
func test_async_startsExecutionOfNextTaskAfterSuspension() async {
5050
let semaphore = Semaphore()
51-
systemUnderTest.async {
51+
systemUnderTest.async(on: semaphore) { semaphore in
5252
await semaphore.wait()
5353
}
54-
systemUnderTest.async {
54+
systemUnderTest.async(on: semaphore) { semaphore in
5555
// Signal the semaphore from the actor queue.
5656
// If the actor queue were FIFO, this test would hang since this code would never execute:
5757
// we'd still be waiting for the prior `wait()` tasks to finish.
58-
await semaphore.signal()
58+
semaphore.signal()
5959
}
60-
await systemUnderTest.await { /* Drain the queue */ }
60+
await systemUnderTest.await(on: semaphore) { _ in /* Drain the queue */ }
6161
}
6262

6363
func test_await_allowsReentrancy() async {
6464
let counter = Counter()
65-
await systemUnderTest.await { [systemUnderTest] in
66-
await systemUnderTest.await {
67-
await counter.incrementAndExpectCount(equals: 1)
65+
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
66+
await systemUnderTest.await(on: counter) { counter in
67+
counter.incrementAndExpectCount(equals: 1)
6868
}
69-
await counter.incrementAndExpectCount(equals: 2)
69+
counter.incrementAndExpectCount(equals: 2)
7070
}
71-
await systemUnderTest.await { /* Drain the queue */ }
7271
}
7372

7473
func test_async_executesEnqueuedTasksAfterReceiverIsDeallocated() async {
7574
var systemUnderTest: ActorQueue? = ActorQueue()
7675
let counter = Counter()
7776
let expectation = self.expectation(description: #function)
7877
let semaphore = Semaphore()
79-
systemUnderTest?.async {
78+
systemUnderTest?.async(on: counter) { counter in
8079
// Make the task wait.
8180
await semaphore.wait()
82-
await counter.incrementAndExpectCount(equals: 1)
81+
counter.incrementAndExpectCount(equals: 1)
8382
expectation.fulfill()
8483
}
8584
weak var queue = systemUnderTest
@@ -110,14 +109,14 @@ final class ActorQueueTests: XCTestCase {
110109
let asyncSemaphore = Semaphore()
111110
let syncSemaphore = Semaphore()
112111
let expectation = self.expectation(description: #function)
113-
systemUnderTest.async { [reference = referenceHolder.reference] in
112+
systemUnderTest.async(on: syncSemaphore) { [reference = referenceHolder.reference] syncSemaphore in
114113
// Now that we've started the task and captured the reference, release the synchronous code.
115-
await syncSemaphore.signal()
114+
syncSemaphore.signal()
116115
// Wait for the synchronous setup to complete and the reference to be nil'd out.
117116
await asyncSemaphore.wait()
118117
// Retain the unsafe counter until the task is completed.
119118
_ = reference
120-
self.systemUnderTest.async {
119+
self.systemUnderTest.async(on: syncSemaphore) { _ in
121120
// Signal that this task has cleaned up.
122121
// This closure will not execute until the prior closure completes.
123122
expectation.fulfill()
@@ -138,26 +137,26 @@ final class ActorQueueTests: XCTestCase {
138137
func test_await_sendsEventsInOrder() async {
139138
let counter = Counter()
140139
for iteration in 1...1_000 {
141-
systemUnderTest.async {
142-
await counter.incrementAndExpectCount(equals: iteration)
140+
systemUnderTest.async(on: counter) { counter in
141+
counter.incrementAndExpectCount(equals: iteration)
143142
}
144143

145144
guard iteration % 25 == 0 else {
146145
// Keep sending async events to the queue.
147146
continue
148147
}
149148

150-
await systemUnderTest.await {
151-
let count = await counter.count
149+
await systemUnderTest.await(on: counter) { counter in
150+
let count = counter.count
152151
XCTAssertEqual(count, iteration)
153152
}
154153
}
155-
await systemUnderTest.await { /* Drain the queue */ }
154+
await systemUnderTest.await(on: counter) { counter in /* Drain the queue */ }
156155
}
157156

158157
func test_await_canReturn() async {
159158
let expectedValue = UUID()
160-
let returnedValue = await systemUnderTest.await { expectedValue }
159+
let returnedValue = await systemUnderTest.await(on: Counter()) { _ in expectedValue }
161160
XCTAssertEqual(expectedValue, returnedValue)
162161
}
163162

@@ -167,7 +166,7 @@ final class ActorQueueTests: XCTestCase {
167166
}
168167
let expectedError = TestError()
169168
do {
170-
try await systemUnderTest.await { throw expectedError }
169+
try await systemUnderTest.await(on: Counter()) { _ in throw expectedError }
171170
} catch {
172171
XCTAssertEqual(error as? TestError, expectedError)
173172
}

0 commit comments

Comments
 (0)