Skip to content

Commit ada5638

Browse files
committed
Enable executing the enqueued task on a desired actor's execution context
1 parent ada3d6c commit ada5638

File tree

2 files changed

+247
-0
lines changed

2 files changed

+247
-0
lines changed

Sources/AsyncQueue/FIFOQueue.swift

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ public final class FIFOQueue: Sendable {
5858
taskStreamContinuation.yield(task)
5959
}
6060

61+
/// Schedules an asynchronous task for execution and immediately returns.
62+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
63+
/// - Parameters:
64+
/// - isolatedActor: The actor within which the task is isolated.
65+
/// - task: The task to enqueue.
66+
public func async<ActorType: Actor>(on isolatedActor: ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> Void) {
67+
taskStreamContinuation.yield { await task(isolatedActor) }
68+
}
69+
6170
/// Schedules an asynchronous task and returns after the task is complete.
6271
/// The scheduled task will not execute until all prior tasks have completed.
6372
/// - Parameter task: The task to enqueue.
@@ -70,6 +79,20 @@ public final class FIFOQueue: Sendable {
7079
}
7180
}
7281

82+
/// Schedules an asynchronous task and returns after the task is complete.
83+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
84+
/// - Parameters:
85+
/// - isolatedActor: The actor within which the task is isolated.
86+
/// - task: The task to enqueue.
87+
/// - Returns: The value returned from the enqueued task.
88+
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async -> T) async -> T {
89+
await withUnsafeContinuation { continuation in
90+
taskStreamContinuation.yield {
91+
continuation.resume(returning: await task(isolatedActor))
92+
}
93+
}
94+
}
95+
7396
/// Schedules an asynchronous throwing task and returns after the task is complete.
7497
/// The scheduled task will not execute until all prior tasks have completed.
7598
/// - Parameter task: The task to enqueue.
@@ -86,6 +109,24 @@ public final class FIFOQueue: Sendable {
86109
}
87110
}
88111

112+
/// Schedules an asynchronous throwing task and returns after the task is complete.
113+
/// The scheduled task will not execute until all prior tasks have completed or suspended.
114+
/// - Parameters:
115+
/// - isolatedActor: The actor within which the task is isolated.
116+
/// - task: The task to enqueue.
117+
/// - Returns: The value returned from the enqueued task.
118+
public func await<ActorType: Actor, T>(on isolatedActor: isolated ActorType, _ task: @escaping @Sendable (isolated ActorType) async throws -> T) async throws -> T {
119+
try await withUnsafeThrowingContinuation { continuation in
120+
taskStreamContinuation.yield {
121+
do {
122+
continuation.resume(returning: try await task(isolatedActor))
123+
} catch {
124+
continuation.resume(throwing: error)
125+
}
126+
}
127+
}
128+
}
129+
89130
// MARK: Private
90131

91132
private let streamTask: Task<Void, Never>

Tests/AsyncQueueTests/FIFOQueueTests.swift

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,32 @@ final class FIFOQueueTests: XCTestCase {
4646
await systemUnderTest.await { /* Drain the queue */ }
4747
}
4848

49+
func test_asyncOn_sendsEventsInOrder() async {
50+
let counter = Counter()
51+
for iteration in 1...1_000 {
52+
systemUnderTest.async(on: counter) { counter in
53+
counter.incrementAndExpectCount(equals: iteration)
54+
}
55+
}
56+
await systemUnderTest.await { /* Drain the queue */ }
57+
}
58+
59+
func test_async_asyncOn_sendEventsInOrder() async {
60+
let counter = Counter()
61+
for iteration in 1...1_000 {
62+
if iteration % 2 == 0 {
63+
systemUnderTest.async {
64+
await counter.incrementAndExpectCount(equals: iteration)
65+
}
66+
} else {
67+
systemUnderTest.async(on: counter) { counter in
68+
counter.incrementAndExpectCount(equals: iteration)
69+
}
70+
}
71+
}
72+
await systemUnderTest.await { /* Drain the queue */ }
73+
}
74+
4975
func test_async_executesAsyncBlocksAtomically() async {
5076
let semaphore = Semaphore()
5177
for _ in 1...1_000 {
@@ -65,7 +91,54 @@ final class FIFOQueueTests: XCTestCase {
6591
await systemUnderTest.await { /* Drain the queue */ }
6692
}
6793

94+
func test_asyncOn_executesAsyncBlocksAtomically() async {
95+
let semaphore = Semaphore()
96+
for _ in 1...1_000 {
97+
systemUnderTest.async(on: semaphore) { semaphore in
98+
let isWaiting = semaphore.isWaiting
99+
// This test will fail occasionally if we aren't executing atomically.
100+
// You can prove this to yourself by replacing `systemUnderTest.async` above with `Task`.
101+
XCTAssertFalse(isWaiting)
102+
// Signal the semaphore before or after we wait – let the scheduler decide.
103+
Task {
104+
semaphore.signal()
105+
}
106+
// Wait for the concurrent task to complete.
107+
await semaphore.wait()
108+
}
109+
}
110+
await systemUnderTest.await { /* Drain the queue */ }
111+
}
112+
68113
func test_async_isNotReentrant() async {
114+
let counter = Counter()
115+
systemUnderTest.async { [systemUnderTest] in
116+
systemUnderTest.async {
117+
await counter.incrementAndExpectCount(equals: 2)
118+
}
119+
await counter.incrementAndExpectCount(equals: 1)
120+
systemUnderTest.async {
121+
await counter.incrementAndExpectCount(equals: 3)
122+
}
123+
}
124+
await systemUnderTest.await { /* Drain the queue */ }
125+
}
126+
127+
func test_asyncOn_isNotReentrant() async {
128+
let counter = Counter()
129+
systemUnderTest.async(on: counter) { [systemUnderTest] counter in
130+
systemUnderTest.async(on: counter) { counter in
131+
counter.incrementAndExpectCount(equals: 2)
132+
}
133+
counter.incrementAndExpectCount(equals: 1)
134+
systemUnderTest.async(on: counter) { counter in
135+
counter.incrementAndExpectCount(equals: 3)
136+
}
137+
}
138+
await systemUnderTest.await { /* Drain the queue */ }
139+
}
140+
141+
func test_await_async_areNotReentrant() async {
69142
let counter = Counter()
70143
await systemUnderTest.await { [systemUnderTest] in
71144
systemUnderTest.async {
@@ -79,6 +152,48 @@ final class FIFOQueueTests: XCTestCase {
79152
await systemUnderTest.await { /* Drain the queue */ }
80153
}
81154

155+
func test_awaitOn_asyncOn_areNotReentrant() async {
156+
let counter = Counter()
157+
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
158+
systemUnderTest.async(on: counter) { counter in
159+
counter.incrementAndExpectCount(equals: 2)
160+
}
161+
counter.incrementAndExpectCount(equals: 1)
162+
systemUnderTest.async(on: counter) { counter in
163+
counter.incrementAndExpectCount(equals: 3)
164+
}
165+
}
166+
await systemUnderTest.await { /* Drain the queue */ }
167+
}
168+
169+
func test_await_asyncOn_areNotReentrant() async {
170+
let counter = Counter()
171+
await systemUnderTest.await { [systemUnderTest] in
172+
systemUnderTest.async(on: counter) { counter in
173+
counter.incrementAndExpectCount(equals: 2)
174+
}
175+
await counter.incrementAndExpectCount(equals: 1)
176+
systemUnderTest.async(on: counter) { counter in
177+
counter.incrementAndExpectCount(equals: 3)
178+
}
179+
}
180+
await systemUnderTest.await { /* Drain the queue */ }
181+
}
182+
183+
func test_awaitOn_async_areNotReentrant() async {
184+
let counter = Counter()
185+
await systemUnderTest.await(on: counter) { [systemUnderTest] counter in
186+
systemUnderTest.async {
187+
await counter.incrementAndExpectCount(equals: 2)
188+
}
189+
counter.incrementAndExpectCount(equals: 1)
190+
systemUnderTest.async {
191+
await counter.incrementAndExpectCount(equals: 3)
192+
}
193+
}
194+
await systemUnderTest.await { /* Drain the queue */ }
195+
}
196+
82197
func test_async_executesAfterReceiverIsDeallocated() async {
83198
var systemUnderTest: FIFOQueue? = FIFOQueue()
84199
let counter = Counter()
@@ -104,6 +219,31 @@ final class FIFOQueueTests: XCTestCase {
104219
await waitForExpectations(timeout: 1.0)
105220
}
106221

222+
func test_asyncOn_executesAfterReceiverIsDeallocated() async {
223+
var systemUnderTest: FIFOQueue? = FIFOQueue()
224+
let counter = Counter()
225+
let expectation = self.expectation(description: #function)
226+
let semaphore = Semaphore()
227+
systemUnderTest?.async(on: counter) { counter in
228+
// Make the queue wait.
229+
await semaphore.wait()
230+
counter.incrementAndExpectCount(equals: 1)
231+
}
232+
systemUnderTest?.async(on: counter) { counter in
233+
// This async task should not execute until the semaphore is released.
234+
counter.incrementAndExpectCount(equals: 2)
235+
expectation.fulfill()
236+
}
237+
weak var queue = systemUnderTest
238+
// Nil out our reference to the queue to show that the enqueued tasks will still complete
239+
systemUnderTest = nil
240+
XCTAssertNil(queue)
241+
// Signal the semaphore to unlock the remaining enqueued tasks.
242+
await semaphore.signal()
243+
244+
await waitForExpectations(timeout: 1.0)
245+
}
246+
107247
func test_async_doesNotRetainTaskAfterExecution() async {
108248
final class Reference: Sendable {}
109249
final class ReferenceHolder: @unchecked Sendable {
@@ -132,6 +272,34 @@ final class FIFOQueueTests: XCTestCase {
132272
XCTAssertNil(weakReference)
133273
}
134274

275+
func test_asyncOn_doesNotRetainTaskAfterExecution() async {
276+
final class Reference: Sendable {}
277+
final class ReferenceHolder: @unchecked Sendable {
278+
var reference: Reference? = Reference()
279+
}
280+
let referenceHolder = ReferenceHolder()
281+
weak var weakReference = referenceHolder.reference
282+
let asyncSemaphore = Semaphore()
283+
let syncSemaphore = Semaphore()
284+
systemUnderTest.async(on: syncSemaphore) { [reference = referenceHolder.reference] syncSemaphore in
285+
// Now that we've started the task and captured the reference, release the synchronous code.
286+
syncSemaphore.signal()
287+
// Wait for the synchronous setup to complete and the reference to be nil'd out.
288+
await asyncSemaphore.wait()
289+
// Retain the unsafe counter until the task is completed.
290+
_ = reference
291+
}
292+
// Wait for the asynchronous task to start.
293+
await syncSemaphore.wait()
294+
referenceHolder.reference = nil
295+
XCTAssertNotNil(weakReference)
296+
// Allow the enqueued task to complete.
297+
await asyncSemaphore.signal()
298+
// Make sure the task has completed.
299+
await systemUnderTest.await { /* Drain the queue */ }
300+
XCTAssertNil(weakReference)
301+
}
302+
135303
func test_await_sendsEventsInOrder() async {
136304
let counter = Counter()
137305
for iteration in 1...1_000 {
@@ -152,12 +320,38 @@ final class FIFOQueueTests: XCTestCase {
152320
await systemUnderTest.await { /* Drain the queue */ }
153321
}
154322

323+
func test_awaitOn_sendsEventsInOrder() async {
324+
let counter = Counter()
325+
for iteration in 1...1_000 {
326+
systemUnderTest.async {
327+
await counter.incrementAndExpectCount(equals: iteration)
328+
}
329+
330+
guard iteration % 25 == 0 else {
331+
// Keep sending async events to the queue.
332+
continue
333+
}
334+
335+
await systemUnderTest.await(on: counter) { counter in
336+
let count = counter.count
337+
XCTAssertEqual(count, iteration)
338+
}
339+
}
340+
await systemUnderTest.await { /* Drain the queue */ }
341+
}
342+
155343
func test_await_canReturn() async {
156344
let expectedValue = UUID()
157345
let returnedValue = await systemUnderTest.await { expectedValue }
158346
XCTAssertEqual(expectedValue, returnedValue)
159347
}
160348

349+
func test_awaitOn_canReturn() async {
350+
let expectedValue = UUID()
351+
let returnedValue = await systemUnderTest.await(on: Counter()) { _ in expectedValue }
352+
XCTAssertEqual(expectedValue, returnedValue)
353+
}
354+
161355
func test_await_canThrow() async {
162356
struct TestError: Error, Equatable {
163357
private let identifier = UUID()
@@ -170,6 +364,18 @@ final class FIFOQueueTests: XCTestCase {
170364
}
171365
}
172366

367+
func test_awaitOn_canThrow() async {
368+
struct TestError: Error, Equatable {
369+
private let identifier = UUID()
370+
}
371+
let expectedError = TestError()
372+
do {
373+
try await systemUnderTest.await(on: Counter()) { _ in throw expectedError }
374+
} catch {
375+
XCTAssertEqual(error as? TestError, expectedError)
376+
}
377+
}
378+
173379
// MARK: Private
174380

175381
private var systemUnderTest = FIFOQueue()

0 commit comments

Comments
 (0)