This repository was archived by the owner on Feb 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathtransport.js
213 lines (192 loc) · 5.81 KB
/
transport.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import { decodeError } from 'ipfs-message-port-protocol/error'
import { DisconnectError, TimeoutError, AbortError } from './error.js'
/**
* @template I,O
* @typedef {import('./query').Query<I, O>} Query
*/
/**
* RPC Transport over `MessagePort` that can execute queries. It takes care of
* executing queries by issuing a message with unique ID and fullfilling a
* query when corresponding response message is received. It also makes sure
* that aborted / timed out queries are cancelled as needed.
*
* It is expected that there will be at most one transport for a message port
* instance.
*
*/
export class MessageTransport {
/**
* Create transport for the underlying message port.
*
* @param {MessagePort} [port]
*/
constructor (port) {
this.port = null
// Assigining a random enough identifier to the transport, to ensure that
// query.id will be unique when multiple tabs are communicating with a
// a server in the SharedWorker.
this.id = Math.random()
.toString(32)
.slice(2)
// Local unique id on the transport which is incremented for each query.
this.nextID = 0
// Dictionary of pending requests
/** @type {Record<string, Query<any, any>>} */
this.queries = Object.create(null)
// If port is provided connect this transport to it. If not transport can
// queue queries and execute those once it's connected.
if (port) {
this.connect(port)
}
}
/**
* Executes given query with this transport and returns promise for it's
* result. Promise fails with an error if query fails.
*
* @template I, O
* @param {Query<I, O>} query
* @returns {Promise<O>}
*/
execute (query) {
const id = `${this.id}@${this.nextID++}`
this.queries[id] = query
// If query has a timeout set a timer.
if (query.timeout > 0 && query.timeout < Infinity) {
query.timerID = setTimeout(MessageTransport.timeout, query.timeout, this, id)
}
if (query.signal) {
query.signal.addEventListener('abort', () => this.abort(id), {
once: true
})
}
// If transport is connected (it has port) post a query, otherwise it
// will remain in the pending queries queue.
if (this.port) {
MessageTransport.postQuery(this.port, id, query)
}
return query.result
}
/**
* Connects this transport to the given message port. Throws `Error` if
* transport is already connected. All the pending queries will be executed
* as connection occurs.
*
* @param {MessagePort} port
*/
connect (port) {
if (this.port) {
throw new Error('Transport is already open')
} else {
this.port = port
this.port.addEventListener('message', this)
this.port.start()
// Go ever pending queries (that were submitted before transport was
// connected) and post them. This loop is safe because messages will not
// arrive while this loop is running so no mutation can occur.
for (const [id, query] of Object.entries(this.queries)) {
MessageTransport.postQuery(port, id, query)
}
}
}
/**
* Disconnects this transport. This will cause all the pending queries
* to be aborted and undelying message port to be closed.
*
* Once disconnected transport can not be reconnected back.
*/
disconnect () {
const error = new DisconnectError()
for (const [id, query] of Object.entries(this.queries)) {
query.fail(error)
this.abort(id)
}
// Note that reference to port is kept that ensures that attempt to
// reconnect will throw an error.
if (this.port) {
this.port.removeEventListener('message', this)
this.port.close()
}
}
/**
* Invoked on query timeout. If query is still pending it will fail and
* abort message will be send to a the server.
*
* @param {MessageTransport} self
* @param {string} id
*/
static timeout (self, id) {
const { queries } = self
const query = queries[id]
if (query) {
delete queries[id]
query.fail(new TimeoutError('request timed out'))
if (self.port) {
self.port.postMessage({ type: 'abort', id })
}
}
}
/**
* Aborts this query by failing with `AbortError` and sending an abort message
* to the server. If query is no longer pending this has no effect.
*
* @param {string} id
*/
abort (id) {
const { queries } = this
const query = queries[id]
if (query) {
delete queries[id]
query.fail(new AbortError())
if (this.port) {
this.port.postMessage({ type: 'abort', id })
}
if (query.timerID != null) {
clearTimeout(query.timerID)
}
}
}
/**
* Sends a given `query` with a given `id` over the message channel.
*
* @param {MessagePort} port
* @param {string} id
* @param {Query<any, any>} query
*/
static postQuery (port, id, query) {
port.postMessage(
{
type: 'query',
namespace: query.namespace,
method: query.method,
id,
input: query.toJSON()
},
// @ts-expect-error - Type signature does not expect 2nd undefined arg
query.transfer()
)
}
/**
* Handler is invoked when message on the message port is received.
*
* @param {MessageEvent} event
*/
handleEvent (event) {
const { id, result } = event.data
const query = this.queries[id]
// If query with a the given ID is found it is completed with the result,
// otherwise it is cancelled.
// Note: query may not be found when it was aborted on the client and at the
// same time server posted response.
if (query) {
delete this.queries[id]
if (result.ok) {
query.succeed(result.value)
} else {
query.fail(decodeError(result.error))
}
if (query.timerID != null) {
clearTimeout(query.timerID)
}
}
}
}