#if compiler(>=6.1) && _runtime(_multithreaded) // @_expose and @_extern are only available in Swift 6.1+ import JavaScriptKit import _CJavaScriptKit import _CJavaScriptEventLoop import Synchronization #if canImport(wasi_pthread) import wasi_pthread import WASILibc #endif // MARK: - Web Worker Task Executor /// A task executor that runs tasks on Web Worker threads. /// /// ## Prerequisites /// /// This task executor is designed to work with [wasi-threads](https://github.com/WebAssembly/wasi-threads) /// but it requires the following single extension: /// The wasi-threads implementation should listen to the `message` event /// from spawned Web Workers, and forward the message to the main thread /// by calling `_swjs_enqueue_main_job_from_worker`. /// /// ## Usage /// /// ```swift /// let executor = WebWorkerTaskExecutor(numberOfThreads: 4) /// defer { executor.terminate() } /// /// await withTaskExecutorPreference(executor) { /// // This block runs on the Web Worker thread. /// await withTaskGroup(of: Int.self) { group in /// for i in 0..<10 { /// // Structured child works are executed on the Web Worker thread. /// group.addTask { fibonacci(of: i) } /// } /// } /// } /// ```` /// /// ## Known limitations /// /// Currently, the Cooperative Global Executor of Swift runtime has a bug around /// main executor detection. The issue leads to ignoring the `@MainActor` /// attribute, which is supposed to run tasks on the main thread, when this web /// worker executor is preferred. /// /// ```swift /// func run(executor: WebWorkerTaskExecutor) async { /// await withTaskExecutorPreference(executor) { /// // This block runs on the Web Worker thread. /// await MainActor.run { /// // This block should run on the main thread, but it runs on /// // the Web Worker thread. /// } /// } /// // Back to the main thread. /// } /// ```` /// @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) // For `Atomic` and `TaskExecutor` types public final class WebWorkerTaskExecutor: TaskExecutor { /// A job worker dedicated to a single Web Worker thread. /// /// ## Lifetime /// The worker instance in Swift world lives as long as the /// `WebWorkerTaskExecutor` instance that spawned it lives. Thus, the worker /// instance may outlive the underlying Web Worker thread. fileprivate final class Worker: Sendable { /// The state of the worker. /// /// State transition: /// /// +---------+ +------------+ /// +----->| Idle |--[terminate]-->| Terminated | /// | +---+-----+ +------------+ /// | | /// | [enqueue] /// | | /// [no more job] | /// | v /// | +---------+ /// +------| Running | /// +---------+ /// enum State: UInt32, AtomicRepresentable { /// The worker is idle and waiting for a new job. case idle = 0 /// The worker is processing a job. case running = 1 /// The worker is terminated. case terminated = 2 } let state: Atomic<State> = Atomic(.idle) /// TODO: Rewrite it to use real queue :-) let jobQueue: Mutex<[UnownedJob]> = Mutex([]) /// The TaskExecutor that spawned this worker. /// This variable must be set only once when the worker is started. nonisolated(unsafe) weak var parentTaskExecutor: WebWorkerTaskExecutor.Executor? /// The thread ID of this worker. let tid: Atomic<pid_t> = Atomic(0) /// A trace statistics struct TraceStats: CustomStringConvertible { var enqueuedJobs: Int = 0 var dequeuedJobs: Int = 0 var processedJobs: Int = 0 var description: String { "TraceStats(E: \(enqueuedJobs), D: \(dequeuedJobs), P: \(processedJobs))" } } #if JAVASCRIPTKIT_STATS private let traceStats = Mutex(TraceStats()) private func statsIncrement(_ keyPath: WritableKeyPath<TraceStats, Int>) { traceStats.withLock { stats in stats[keyPath: keyPath] += 1 } } #else private func statsIncrement(_ keyPath: WritableKeyPath<TraceStats, Int>) {} #endif /// The worker bound to the current thread. /// Returns `nil` if the current thread is not a worker thread. static var currentThread: Worker? { guard let ptr = swjs_thread_local_task_executor_worker else { return nil } return Unmanaged<Worker>.fromOpaque(ptr).takeUnretainedValue() } init() {} /// Enqueue a job to the worker. func enqueue(_ job: UnownedJob) { statsIncrement(\.enqueuedJobs) var locked: Bool repeat { let result: Void? = jobQueue.withLockIfAvailable { queue in queue.append(job) // Wake up the worker to process a job. switch state.exchange(.running, ordering: .sequentiallyConsistent) { case .idle: if Self.currentThread === self { // Enqueueing a new job to the current worker thread, but it's idle now. // This is usually the case when a continuation is resumed by JS events // like `setTimeout` or `addEventListener`. // We can run the job and subsequently spawned jobs immediately. // JSPromise.resolve(JSValue.undefined).then { _ in _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in self.run() return JSValue.undefined }) } else { let tid = self.tid.load(ordering: .sequentiallyConsistent) swjs_wake_up_worker_thread(tid) } case .running: // The worker is already running, no need to wake up. break case .terminated: // Will not wake up the worker because it's already terminated. break } } locked = result != nil } while !locked } func scheduleNextRun() { _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in self.run() return JSValue.undefined }) } /// Run the worker /// /// NOTE: This function must be called from the worker thread. /// It will return when the worker is terminated. func start(executor: WebWorkerTaskExecutor.Executor) { // Get the thread ID of the current worker thread from the JS side. // NOTE: Unfortunately even though `pthread_self` internally holds the thread ID, // there is no public API to get it because it's a part of implementation details // of wasi-libc. So we need to get it from the JS side. let tid = swjs_get_worker_thread_id() // Set the thread-local variable to the current worker. // `self` outlives the worker thread because `Executor` retains the worker. // Thus it's safe to store the reference without extra retain. swjs_thread_local_task_executor_worker = Unmanaged.passUnretained(self).toOpaque() // Start listening events from the main thread. // This must be called after setting the swjs_thread_local_task_executor_worker // because the event listener enqueues jobs to the TLS worker. swjs_listen_message_from_main_thread() // Set the parent executor. parentTaskExecutor = executor // Store the thread ID to the worker. This notifies the main thread that the worker is started. self.tid.store(tid, ordering: .sequentiallyConsistent) trace("Worker.start tid=\(tid)") } /// Process jobs in the queue. /// /// Return when the worker has no more jobs to run or terminated. /// This method must be called from the worker thread after the worker /// is started by `start(executor:)`. func run() { trace("Worker.run") guard let executor = parentTaskExecutor else { preconditionFailure("The worker must be started with a parent executor.") } do { // Assert the state at the beginning of the run. let state = state.load(ordering: .sequentiallyConsistent) assert( state == .running || state == .terminated, "Invalid state: not running (tid=\(self.tid.load(ordering: .sequentiallyConsistent)), \(state))" ) } while true { // Pop a job from the queue. let job = jobQueue.withLock { queue -> UnownedJob? in if let job = queue.first { queue.removeFirst() return job } // No more jobs to run now. Wait for a new job to be enqueued. let (exchanged, original) = state.compareExchange(expected: .running, desired: .idle, ordering: .sequentiallyConsistent) switch (exchanged, original) { case (true, _): trace("Worker.run exited \(original) -> idle") return nil // Regular case case (false, .idle): preconditionFailure("unreachable: Worker/run running in multiple threads!?") case (false, .running): preconditionFailure("unreachable: running -> idle should return exchanged=true") case (false, .terminated): return nil // The worker is terminated, exit the loop. } } guard let job else { return } statsIncrement(\.dequeuedJobs) job.runSynchronously( on: executor.asUnownedTaskExecutor() ) statsIncrement(\.processedJobs) // The job is done. Continue to the next job. } } /// Terminate the worker. func terminate() { trace("Worker.terminate tid=\(tid.load(ordering: .sequentiallyConsistent))") state.store(.terminated, ordering: .sequentiallyConsistent) let tid = self.tid.load(ordering: .sequentiallyConsistent) guard tid != 0 else { // The worker is not started yet. return } swjs_terminate_worker_thread(tid) } } fileprivate final class Executor: TaskExecutor { let numberOfThreads: Int let workers: [Worker] let roundRobinIndex: Mutex<Int> = Mutex(0) init(numberOfThreads: Int) { self.numberOfThreads = numberOfThreads var workers = [Worker]() for _ in 0..<numberOfThreads { let worker = Worker() workers.append(worker) } self.workers = workers } func start(timeout: Duration, checkInterval: Duration) async throws { #if canImport(wasi_pthread) class Context: @unchecked Sendable { let executor: WebWorkerTaskExecutor.Executor let worker: Worker init(executor: WebWorkerTaskExecutor.Executor, worker: Worker) { self.executor = executor self.worker = worker } } trace("Executor.start") // Start worker threads via pthread_create. for worker in workers { // NOTE: The context must be allocated on the heap because // `pthread_create` on WASI does not guarantee the thread is started // immediately. The context must be retained until the thread is started. let context = Context(executor: self, worker: worker) let ptr = Unmanaged.passRetained(context).toOpaque() let ret = pthread_create(nil, nil, { ptr in // Cast to a optional pointer to absorb nullability variations between platforms. let ptr: UnsafeMutableRawPointer? = ptr let context = Unmanaged<Context>.fromOpaque(ptr!).takeRetainedValue() context.worker.start(executor: context.executor) // The worker is started. Throw JS exception to unwind the call stack without // reaching the `pthread_exit`, which is called immediately after this block. swjs_unsafe_event_loop_yield() return nil }, ptr) precondition(ret == 0, "Failed to create a thread") } // Wait until all worker threads are started and wire up messaging channels // between the main thread and workers to notify job enqueuing events each other. let clock = ContinuousClock() let workerInitStarted = clock.now for worker in workers { var tid: pid_t repeat { if workerInitStarted.duration(to: .now) > timeout { fatalError("Worker thread initialization timeout exceeded (\(timeout))") } tid = worker.tid.load(ordering: .sequentiallyConsistent) try await clock.sleep(for: checkInterval) } while tid == 0 swjs_listen_message_from_worker_thread(tid) } #else fatalError("Unsupported platform") #endif } func terminate() { for worker in workers { worker.terminate() } } func enqueue(_ job: UnownedJob) { precondition(!workers.isEmpty, "No worker threads are available") // If the current thread is a worker thread, enqueue the job to the current worker. if let worker = Worker.currentThread { worker.enqueue(job) return } // Otherwise (main thread), enqueue the job to the worker with round-robin scheduling. // TODO: Use a more sophisticated scheduling algorithm with priority. roundRobinIndex.withLock { index in let worker = workers[index] worker.enqueue(job) index = (index + 1) % numberOfThreads } } } private let executor: Executor /// Create a new Web Worker task executor. /// /// - Parameters: /// - numberOfThreads: The number of Web Worker threads to spawn. /// - timeout: The timeout to wait for all worker threads to be started. /// - checkInterval: The interval to check if all worker threads are started. public init(numberOfThreads: Int, timeout: Duration = .seconds(3), checkInterval: Duration = .microseconds(5)) async throws { self.executor = Executor(numberOfThreads: numberOfThreads) try await self.executor.start(timeout: timeout, checkInterval: checkInterval) } /// Terminate child Web Worker threads. /// Jobs enqueued to the executor after calling this method will be ignored. /// /// NOTE: This method must be called after all tasks that prefer this executor are done. /// Otherwise, the tasks may stuck forever. public func terminate() { executor.terminate() } /// The number of Web Worker threads. public var numberOfThreads: Int { executor.numberOfThreads } // MARK: TaskExecutor conformance /// Enqueue a job to the executor. /// /// NOTE: Called from the Swift Concurrency runtime. public func enqueue(_ job: UnownedJob) { Self.traceStatsIncrement(\.enqueueExecutor) executor.enqueue(job) } // MARK: Statistics /// Executor global statistics internal struct ExecutorStats: CustomStringConvertible { var sendJobToMainThread: Int = 0 var receiveJobFromWorkerThread: Int = 0 var enqueueGlobal: Int = 0 var enqueueExecutor: Int = 0 var description: String { "ExecutorStats(sendWtoM: \(sendJobToMainThread), recvWfromM: \(receiveJobFromWorkerThread)), enqueueGlobal: \(enqueueGlobal), enqueueExecutor: \(enqueueExecutor)" } } #if JAVASCRIPTKIT_STATS private static let stats = Mutex(ExecutorStats()) fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath<ExecutorStats, Int>) { stats.withLock { stats in stats[keyPath: keyPath] += 1 } } internal func dumpStats() { Self.stats.withLock { stats in print("WebWorkerTaskExecutor stats: \(stats)") } } #else fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath<ExecutorStats, Int>) {} internal func dumpStats() {} #endif // MARK: Global Executor hack private static var _mainThread: pthread_t? private static var _swift_task_enqueueGlobal_hook_original: UnsafeMutableRawPointer? private static var _swift_task_enqueueGlobalWithDelay_hook_original: UnsafeMutableRawPointer? private static var _swift_task_enqueueGlobalWithDeadline_hook_original: UnsafeMutableRawPointer? /// Install a global executor that forwards jobs from Web Worker threads to the main thread. /// /// This function must be called once before using the Web Worker task executor. public static func installGlobalExecutor() { #if canImport(wasi_pthread) // Ensure this function is called only once. guard _mainThread == nil else { return } _mainThread = pthread_self() assert(swjs_get_worker_thread_id() == -1, "\(#function) must be called on the main thread") _swift_task_enqueueGlobal_hook_original = swift_task_enqueueGlobal_hook typealias swift_task_enqueueGlobal_hook_Fn = @convention(thin) (UnownedJob, swift_task_enqueueGlobal_original) -> Void let swift_task_enqueueGlobal_hook_impl: swift_task_enqueueGlobal_hook_Fn = { job, base in WebWorkerTaskExecutor.traceStatsIncrement(\.enqueueGlobal) // Enter this block only if the current Task has no executor preference. if pthread_equal(pthread_self(), WebWorkerTaskExecutor._mainThread) != 0 { // If the current thread is the main thread, delegate the job // execution to the original hook of JavaScriptEventLoop. let original = unsafeBitCast(WebWorkerTaskExecutor._swift_task_enqueueGlobal_hook_original, to: swift_task_enqueueGlobal_hook_Fn.self) original(job, base) } else { // Notify the main thread to execute the job when a job is // enqueued from a Web Worker thread but without an executor preference. // This is usually the case when hopping back to the main thread // at the end of a task. WebWorkerTaskExecutor.traceStatsIncrement(\.sendJobToMainThread) let jobBitPattern = unsafeBitCast(job, to: UInt.self) swjs_send_job_to_main_thread(jobBitPattern) } } swift_task_enqueueGlobal_hook = unsafeBitCast(swift_task_enqueueGlobal_hook_impl, to: UnsafeMutableRawPointer?.self) #else fatalError("Unsupported platform") #endif } } /// Enqueue a job scheduled from a Web Worker thread to the main thread. /// This function is called when a job is enqueued from a Web Worker thread. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @_expose(wasm, "swjs_enqueue_main_job_from_worker") func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) { WebWorkerTaskExecutor.traceStatsIncrement(\.receiveJobFromWorkerThread) JavaScriptEventLoop.shared.enqueue(ExecutorJob(job)) } /// Wake up the worker thread. /// This function is called when a job is enqueued from the main thread to a worker thread. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @_expose(wasm, "swjs_wake_worker_thread") func _swjs_wake_worker_thread() { WebWorkerTaskExecutor.Worker.currentThread!.run() } #endif fileprivate func trace(_ message: String) { #if JAVASCRIPTKIT_TRACE JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n") #endif }