Skip to content

Add a mechanism to "Transfer" JSObject between Workers #292

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
28d5ec0
Add `JSObject.transfer` and `JSObject.receive` APIs
kateinoigakukun Mar 10, 2025
e406cd3
Stop hardcoding the Swift toolchain version in the Multithreading exa…
kateinoigakukun Mar 10, 2025
cfa1b2d
Update Multithreading example to support transferable objects
kateinoigakukun Mar 10, 2025
9d335a8
Add OffscreenCanvas example
kateinoigakukun Mar 10, 2025
98cec71
Rename `JSObject.receive` to `JSObject.Transferring.receive`
kateinoigakukun Mar 10, 2025
9b84176
Update test harness to support transferring
kateinoigakukun Mar 10, 2025
c481614
Fix JSObject lifetime issue while transferring
kateinoigakukun Mar 10, 2025
65ddcd3
Add basic tests for transferring objects between threads
kateinoigakukun Mar 10, 2025
f0bd60c
Fix native build
kateinoigakukun Mar 10, 2025
8d4bba6
Add cautionary notes to the documentation of `JSObject.transfer()`.
kateinoigakukun Mar 10, 2025
09d5311
Rename `JSObject.Transferring` to `JSTransferring<T>`
kateinoigakukun Mar 11, 2025
f25bfec
MessageBroker
kateinoigakukun Mar 11, 2025
58f91c3
Relax deinit requirement
kateinoigakukun Mar 11, 2025
2a081de
Remove dead code and fix error message
kateinoigakukun Mar 11, 2025
4fe37e7
Rename JSTransferring to JSSending
kateinoigakukun Mar 11, 2025
eeff111
Add `JSSending.receive(...)` to receive multiple objects at once
kateinoigakukun Mar 11, 2025
44a5dba
Build fix
kateinoigakukun Mar 11, 2025
b678f71
Skip multi-transfer tests
kateinoigakukun Mar 11, 2025
f5e3a95
Rename JSObject+Transferring.swift to JSSending.swift
kateinoigakukun Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add JSObject.transfer and JSObject.receive APIs
These APIs allow transferring a `JSObject` between worker threads. The
`JSObject.transfer` method creates a `JSObject.Transferring` instance
that is `Sendable` and can be sent to another worker thread. The
`JSObject.receive` method requests the object from the source worker
thread and postMessage it to the destination worker thread.
  • Loading branch information
kateinoigakukun committed Mar 10, 2025
commit 28d5ec060749d2ed386b554e282977a4ecee9a4a
147 changes: 137 additions & 10 deletions Runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,45 @@ import {
pointer,
TypedArray,
ImportedFunctions,
MAIN_THREAD_TID,
} from "./types.js";
import * as JSValue from "./js-value.js";
import { Memory } from "./memory.js";

type TransferMessage = {
type: "transfer";
data: {
object: any;
transferring: pointer;
destinationTid: number;
};
};

type RequestTransferMessage = {
type: "requestTransfer";
data: {
objectRef: ref;
objectSourceTid: number;
transferring: pointer;
destinationTid: number;
};
};

type TransferErrorMessage = {
type: "transferError";
data: {
error: string;
};
};

type MainToWorkerMessage = {
type: "wake";
};
} | RequestTransferMessage | TransferMessage | TransferErrorMessage;

type WorkerToMainMessage = {
type: "job";
data: number;
};
} | RequestTransferMessage | TransferMessage | TransferErrorMessage;

/**
* A thread channel is a set of functions that are used to communicate between
Expand Down Expand Up @@ -60,8 +87,9 @@ export type SwiftRuntimeThreadChannel =
* This function is used to send messages from the worker thread to the main thread.
* The message submitted by this function is expected to be listened by `listenMessageFromWorkerThread`.
* @param message The message to be sent to the main thread.
* @param transfer The array of objects to be transferred to the main thread.
*/
postMessageToMainThread: (message: WorkerToMainMessage) => void;
postMessageToMainThread: (message: WorkerToMainMessage, transfer: any[]) => void;
/**
* This function is expected to be set in the worker thread and should listen
* to messages from the main thread sent by `postMessageToWorkerThread`.
Expand All @@ -75,8 +103,9 @@ export type SwiftRuntimeThreadChannel =
* The message submitted by this function is expected to be listened by `listenMessageFromMainThread`.
* @param tid The thread ID of the worker thread.
* @param message The message to be sent to the worker thread.
* @param transfer The array of objects to be transferred to the worker thread.
*/
postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage) => void;
postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage, transfer: any[]) => void;
/**
* This function is expected to be set in the main thread and should listen
* to messages sent by `postMessageToMainThread` from the worker thread.
Expand Down Expand Up @@ -610,8 +639,37 @@ export class SwiftRuntime {
case "wake":
this.exports.swjs_wake_worker_thread();
break;
case "requestTransfer": {
const object = this.memory.getObject(message.data.objectRef);
const messageToMainThread: TransferMessage = {
type: "transfer",
data: {
object,
destinationTid: message.data.destinationTid,
transferring: message.data.transferring,
},
};
try {
this.postMessageToMainThread(messageToMainThread, [object]);
} catch (error) {
this.postMessageToMainThread({
type: "transferError",
data: { error: String(error) },
});
}
break;
}
case "transfer": {
const objectRef = this.memory.retain(message.data.object);
this.exports.swjs_receive_object(objectRef, message.data.transferring);
break;
}
case "transferError": {
console.error(message.data.error); // TODO: Handle the error
break;
}
default:
const unknownMessage: never = message.type;
const unknownMessage: never = message;
throw new Error(`Unknown message type: ${unknownMessage}`);
}
});
Expand All @@ -632,8 +690,57 @@ export class SwiftRuntime {
case "job":
this.exports.swjs_enqueue_main_job_from_worker(message.data);
break;
case "requestTransfer": {
if (message.data.objectSourceTid == MAIN_THREAD_TID) {
const object = this.memory.getObject(message.data.objectRef);
if (message.data.destinationTid != tid) {
throw new Error("Invariant violation: The destination tid of the transfer request must be the same as the tid of the worker thread that received the request.");
}
this.postMessageToWorkerThread(message.data.destinationTid, {
type: "transfer",
data: {
object,
transferring: message.data.transferring,
destinationTid: message.data.destinationTid,
},
}, [object]);
} else {
// Proxy the transfer request to the worker thread that owns the object
this.postMessageToWorkerThread(message.data.objectSourceTid, {
type: "requestTransfer",
data: {
objectRef: message.data.objectRef,
objectSourceTid: tid,
transferring: message.data.transferring,
destinationTid: message.data.destinationTid,
},
});
}
break;
}
case "transfer": {
if (message.data.destinationTid == MAIN_THREAD_TID) {
const objectRef = this.memory.retain(message.data.object);
this.exports.swjs_receive_object(objectRef, message.data.transferring);
} else {
// Proxy the transfer response to the destination worker thread
this.postMessageToWorkerThread(message.data.destinationTid, {
type: "transfer",
data: {
object: message.data.object,
transferring: message.data.transferring,
destinationTid: message.data.destinationTid,
},
}, [message.data.object]);
}
break;
}
case "transferError": {
console.error(message.data.error); // TODO: Handle the error
break;
}
default:
const unknownMessage: never = message.type;
const unknownMessage: never = message;
throw new Error(`Unknown message type: ${unknownMessage}`);
}
},
Expand All @@ -649,27 +756,47 @@ export class SwiftRuntime {
// Main thread's tid is always -1
return this.tid || -1;
},
swjs_request_transferring_object: (
object_ref: ref,
object_source_tid: number,
transferring: pointer,
) => {
if (this.tid == object_source_tid) {
// Fast path: The object is already in the same thread
this.exports.swjs_receive_object(object_ref, transferring);
return;
}
this.postMessageToMainThread({
type: "requestTransfer",
data: {
objectRef: object_ref,
objectSourceTid: object_source_tid,
transferring,
destinationTid: this.tid ?? MAIN_THREAD_TID,
},
});
},
};
}

private postMessageToMainThread(message: WorkerToMainMessage) {
private postMessageToMainThread(message: WorkerToMainMessage, transfer: any[] = []) {
const threadChannel = this.options.threadChannel;
if (!(threadChannel && "postMessageToMainThread" in threadChannel)) {
throw new Error(
"postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."
);
}
threadChannel.postMessageToMainThread(message);
threadChannel.postMessageToMainThread(message, transfer);
}

private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage) {
private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage, transfer: any[] = []) {
const threadChannel = this.options.threadChannel;
if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) {
throw new Error(
"postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."
);
}
threadChannel.postMessageToWorkerThread(tid, message);
threadChannel.postMessageToWorkerThread(tid, message, transfer);
}
}

Expand Down
8 changes: 8 additions & 0 deletions Runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface ExportedFunctions {

swjs_enqueue_main_job_from_worker(unowned_job: number): void;
swjs_wake_worker_thread(): void;
swjs_receive_object(object: ref, transferring: pointer): void;
}

export interface ImportedFunctions {
Expand Down Expand Up @@ -112,6 +113,11 @@ export interface ImportedFunctions {
swjs_listen_message_from_worker_thread: (tid: number) => void;
swjs_terminate_worker_thread: (tid: number) => void;
swjs_get_worker_thread_id: () => number;
swjs_request_transferring_object: (
object_ref: ref,
object_source_tid: number,
transferring: pointer,
) => void;
}

export const enum LibraryFeatures {
Expand All @@ -133,3 +139,5 @@ export type TypedArray =
export function assertNever(x: never, message: string) {
throw new Error(message);
}

export const MAIN_THREAD_TID = -1;
60 changes: 60 additions & 0 deletions Sources/JavaScriptEventLoop/JSObject+Transferring.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
@_spi(JSObject_id) import JavaScriptKit
import _CJavaScriptKit

extension JSObject {
public class Transferring: @unchecked Sendable {
fileprivate let sourceTid: Int32
fileprivate let idInSource: JavaScriptObjectRef
fileprivate var continuation: CheckedContinuation<JSObject, Error>? = nil

init(sourceTid: Int32, id: JavaScriptObjectRef) {
self.sourceTid = sourceTid
self.idInSource = id
}

func receive(isolation: isolated (any Actor)?) async throws -> JSObject {
#if compiler(>=6.1) && _runtime(_multithreaded)
swjs_request_transferring_object(
idInSource,
sourceTid,
Unmanaged.passRetained(self).toOpaque()
)
return try await withCheckedThrowingContinuation { continuation in
self.continuation = continuation
}
#else
return JSObject(id: idInSource)
#endif
}
}

/// Transfers the ownership of a `JSObject` to be sent to another Worker.
///
/// - Parameter object: The `JSObject` to be transferred.
/// - Returns: A `JSTransferring` instance that can be shared across worker threads.
/// - Note: The original `JSObject` should not be accessed after calling this method.
public static func transfer(_ object: JSObject) -> Transferring {
#if compiler(>=6.1) && _runtime(_multithreaded)
Transferring(sourceTid: object.ownerTid, id: object.id)
#else
Transferring(sourceTid: -1, id: object.id)
#endif
}

/// Receives a transferred `JSObject` from a Worker.
///
/// - Parameter transferring: The `JSTransferring` instance received from other worker threads.
/// - Returns: The reconstructed `JSObject` that can be used in the receiving Worker.
public static func receive(_ transferring: Transferring, isolation: isolated (any Actor)? = #isolation) async throws -> JSObject {
try await transferring.receive(isolation: isolation)
}
}

#if compiler(>=6.1) // @_expose and @_extern are only available in Swift 6.1+
@_expose(wasm, "swjs_receive_object")
@_cdecl("swjs_receive_object")
#endif
func _swjs_receive_object(_ object: JavaScriptObjectRef, _ transferring: UnsafeRawPointer) {
let transferring = Unmanaged<JSObject.Transferring>.fromOpaque(transferring).takeRetainedValue()
transferring.continuation?.resume(returning: JSObject(id: object))
}
16 changes: 4 additions & 12 deletions Sources/JavaScriptKit/FundamentalObjects/JSObject.swift
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
import _CJavaScriptKit

#if arch(wasm32)
#if canImport(wasi_pthread)
import wasi_pthread
#endif
#else
import Foundation // for pthread_t on non-wasi platforms
#endif

/// `JSObject` represents an object in JavaScript and supports dynamic member lookup.
/// Any member access like `object.foo` will dynamically request the JavaScript and Swift
/// runtime bridge library for a member with the specified name in this object.
Expand All @@ -31,14 +23,14 @@ public class JSObject: Equatable {
public var id: JavaScriptObjectRef

#if compiler(>=6.1) && _runtime(_multithreaded)
private let ownerThread: pthread_t
package let ownerTid: Int32
#endif

@_spi(JSObject_id)
public init(id: JavaScriptObjectRef) {
self.id = id
#if compiler(>=6.1) && _runtime(_multithreaded)
self.ownerThread = pthread_self()
self.ownerTid = swjs_get_worker_thread_id_cached()
#endif
}

Expand All @@ -51,14 +43,14 @@ public class JSObject: Equatable {
/// object spaces are not shared across threads backed by Web Workers.
private func assertOnOwnerThread(hint: @autoclosure () -> String) {
#if compiler(>=6.1) && _runtime(_multithreaded)
precondition(pthread_equal(ownerThread, pthread_self()) != 0, "JSObject is being accessed from a thread other than the owner thread: \(hint())")
precondition(ownerTid == swjs_get_worker_thread_id_cached(), "JSObject is being accessed from a thread other than the owner thread: \(hint())")
#endif
}

/// Asserts that the two objects being compared are owned by the same thread.
private static func assertSameOwnerThread(lhs: JSObject, rhs: JSObject, hint: @autoclosure () -> String) {
#if compiler(>=6.1) && _runtime(_multithreaded)
precondition(pthread_equal(lhs.ownerThread, rhs.ownerThread) != 0, "JSObject is being accessed from a thread other than the owner thread: \(hint())")
precondition(lhs.ownerTid == rhs.ownerTid, "JSObject is being accessed from a thread other than the owner thread: \(hint())")
#endif
}

Expand Down
Loading