Skip to content
34 changes: 19 additions & 15 deletions Sources/AsyncQueue/ActorQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,26 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
let (taskStream, taskStreamContinuation) = AsyncStream<ActorTask>.makeStream()
self.taskStreamContinuation = taskStreamContinuation

Task { @ActorQueueSynchronization in
func beginExecuting(
_ operation: sending @escaping (isolated ActorType) async -> Void,
in context: isolated ActorType
) {
// In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor.
// Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution.
Task {
await operation(context)
}
}

Task {
// In an ideal world, we would isolate this `for await` loop to the `ActorType`.
// However, there's no good way to do that without retaining the actor and creating a cycle.
for await actorTask in taskStream {
// In Swift 6, a `Task` enqueued from a global actor begins executing immediately on that global actor.
// Since we're running on a global actor already, we can just dispatch a Task to get first-enqueued-first-start
// task execution. In an ideal world, we wouldn't need a global actor and would isolate this `for await` loop on
// the `ActorType`. However, there's no good way to do that just yet.
Task {
await actorTask.task(actorTask.executionContext)
}
// Await switching to the ActorType context.
await beginExecuting(
actorTask.task,
in: actorTask.executionContext
)
}
}
}
Expand Down Expand Up @@ -152,10 +163,3 @@ public final class ActorQueue<ActorType: Actor>: @unchecked Sendable {
let task: @Sendable (isolated ActorType) async -> Void
}
}

/// A global actor used for synchronizing task execution.
@globalActor
private struct ActorQueueSynchronization {
fileprivate actor Synchronization {}
fileprivate static let shared = Synchronization()
}