-
Notifications
You must be signed in to change notification settings - Fork 28k
/
Copy pathnode-web-streams-helper.ts
671 lines (580 loc) · 20.3 KB
/
node-web-streams-helper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
import { getTracer } from '../lib/trace/tracer'
import { AppRenderSpan } from '../lib/trace/constants'
import { DetachedPromise } from '../../lib/detached-promise'
import { scheduleImmediate, atLeastOneTask } from '../../lib/scheduler'
import { ENCODED_TAGS } from './encodedTags'
import {
indexOfUint8Array,
isEquivalentUint8Arrays,
removeFromUint8Array,
} from './uint8array-helpers'
function voidCatch() {
// this catcher is designed to be used with pipeTo where we expect the underlying
// pipe implementation to forward errors but we don't want the pipeTo promise to reject
// and be unhandled
}
export type ReactReadableStream = ReadableStream<Uint8Array> & {
allReady?: Promise<void> | undefined
}
// We can share the same encoder instance everywhere
// Notably we cannot do the same for TextDecoder because it is stateful
// when handling streaming data
const encoder = new TextEncoder()
export function chainStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
// We could encode this invariant in the arguments but current uses of this function pass
// use spread so it would be missed by
if (streams.length === 0) {
throw new Error('Invariant: chainStreams requires at least one stream')
}
// If we only have 1 stream we fast path it by returning just this stream
if (streams.length === 1) {
return streams[0]
}
const { readable, writable } = new TransformStream()
// We always initiate pipeTo immediately. We know we have at least 2 streams
// so we need to avoid closing the writable when this one finishes.
let promise = streams[0].pipeTo(writable, { preventClose: true })
let i = 1
for (; i < streams.length - 1; i++) {
const nextStream = streams[i]
promise = promise.then(() =>
nextStream.pipeTo(writable, { preventClose: true })
)
}
// We can omit the length check because we halted before the last stream and there
// is at least two streams so the lastStream here will always be defined
const lastStream = streams[i]
promise = promise.then(() => lastStream.pipeTo(writable))
// Catch any errors from the streams and ignore them, they will be handled
// by whatever is consuming the readable stream.
promise.catch(voidCatch)
return readable
}
export function streamFromString(str: string): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(str))
controller.close()
},
})
}
export function streamFromBuffer(chunk: Buffer): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
controller.enqueue(chunk)
controller.close()
},
})
}
export async function streamToBuffer(
stream: ReadableStream<Uint8Array>
): Promise<Buffer> {
const buffers: Buffer[] = []
// @ts-expect-error TypeScript gets this wrong (https://nodejs.org/api/webstreams.html#async-iteration)
for await (const chunk of stream) {
buffers.push(chunk)
}
return Buffer.concat(buffers)
}
export async function streamToString(
stream: ReadableStream<Uint8Array>
): Promise<string> {
const decoder = new TextDecoder('utf-8', { fatal: true })
let string = ''
// @ts-expect-error TypeScript gets this wrong (https://nodejs.org/api/webstreams.html#async-iteration)
for await (const chunk of stream) {
string += decoder.decode(chunk, { stream: true })
}
string += decoder.decode()
return string
}
export function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let bufferedChunks: Array<Uint8Array> = []
let bufferByteLength: number = 0
let pending: DetachedPromise<void> | undefined
const flush = (controller: TransformStreamDefaultController) => {
// If we already have a pending flush, then return early.
if (pending) return
const detached = new DetachedPromise<void>()
pending = detached
scheduleImmediate(() => {
try {
const chunk = new Uint8Array(bufferByteLength)
let copiedBytes = 0
for (let i = 0; i < bufferedChunks.length; i++) {
const bufferedChunk = bufferedChunks[i]
chunk.set(bufferedChunk, copiedBytes)
copiedBytes += bufferedChunk.byteLength
}
// We just wrote all the buffered chunks so we need to reset the bufferedChunks array
// and our bufferByteLength to prepare for the next round of buffered chunks
bufferedChunks.length = 0
bufferByteLength = 0
controller.enqueue(chunk)
} catch {
// If an error occurs while enqueuing it can't be due to this
// transformers fault. It's likely due to the controller being
// errored due to the stream being cancelled.
} finally {
pending = undefined
detached.resolve()
}
})
}
return new TransformStream({
transform(chunk, controller) {
// Combine the previous buffer with the new chunk.
bufferedChunks.push(chunk)
bufferByteLength += chunk.byteLength
// Flush the buffer to the controller.
flush(controller)
},
flush() {
if (!pending) return
return pending.promise
},
})
}
function createInsertedHTMLStream(
getServerInsertedHTML: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
return new TransformStream({
transform: async (chunk, controller) => {
const html = await getServerInsertedHTML()
if (html) {
controller.enqueue(encoder.encode(html))
}
controller.enqueue(chunk)
},
})
}
export function renderToInitialFizzStream({
ReactDOMServer,
element,
streamOptions,
}: {
ReactDOMServer: typeof import('react-dom/server.edge')
element: React.ReactElement
streamOptions?: any
}): Promise<ReactReadableStream> {
return getTracer().trace(AppRenderSpan.renderToReadableStream, async () =>
ReactDOMServer.renderToReadableStream(element, streamOptions)
)
}
function createHeadInsertionTransformStream(
insert: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
let inserted = false
let freezing = false
// We need to track if this transform saw any bytes because if it didn't
// we won't want to insert any server HTML at all
let hasBytes = false
return new TransformStream({
async transform(chunk, controller) {
hasBytes = true
// While react is flushing chunks, we don't apply insertions
if (freezing) {
controller.enqueue(chunk)
return
}
const insertion = await insert()
if (inserted) {
if (insertion) {
const encodedInsertion = encoder.encode(insertion)
controller.enqueue(encodedInsertion)
}
controller.enqueue(chunk)
freezing = true
} else {
// TODO (@Ethan-Arrowood): Replace the generic `indexOfUint8Array` method with something finely tuned for the subset of things actually being checked for.
const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD)
if (index !== -1) {
if (insertion) {
const encodedInsertion = encoder.encode(insertion)
const insertedHeadContent = new Uint8Array(
chunk.length + encodedInsertion.length
)
insertedHeadContent.set(chunk.slice(0, index))
insertedHeadContent.set(encodedInsertion, index)
insertedHeadContent.set(
chunk.slice(index),
index + encodedInsertion.length
)
controller.enqueue(insertedHeadContent)
} else {
controller.enqueue(chunk)
}
freezing = true
inserted = true
}
}
if (!inserted) {
controller.enqueue(chunk)
} else {
scheduleImmediate(() => {
freezing = false
})
}
},
async flush(controller) {
// Check before closing if there's anything remaining to insert.
if (hasBytes) {
const insertion = await insert()
if (insertion) {
controller.enqueue(encoder.encode(insertion))
}
}
},
})
}
// Suffix after main body content - scripts before </body>,
// but wait for the major chunks to be enqueued.
function createDeferredSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
let flushed = false
let pending: DetachedPromise<void> | undefined
const flush = (controller: TransformStreamDefaultController) => {
const detached = new DetachedPromise<void>()
pending = detached
scheduleImmediate(() => {
try {
controller.enqueue(encoder.encode(suffix))
} catch {
// If an error occurs while enqueuing it can't be due to this
// transformers fault. It's likely due to the controller being
// errored due to the stream being cancelled.
} finally {
pending = undefined
detached.resolve()
}
})
}
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
// If we've already flushed, we're done.
if (flushed) return
// Schedule the flush to happen.
flushed = true
flush(controller)
},
flush(controller) {
if (pending) return pending.promise
if (flushed) return
// Flush now.
controller.enqueue(encoder.encode(suffix))
},
})
}
// Merge two streams into one. Ensure the final transform stream is closed
// when both are finished.
function createMergedTransformStream(
stream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let pull: Promise<void> | null = null
let donePulling = false
async function startPulling(controller: TransformStreamDefaultController) {
if (pull) {
return
}
const reader = stream.getReader()
// NOTE: streaming flush
// We are buffering here for the inlined data stream because the
// "shell" stream might be chunkenized again by the underlying stream
// implementation, e.g. with a specific high-water mark. To ensure it's
// the safe timing to pipe the data stream, this extra tick is
// necessary.
// We don't start reading until we've left the current Task to ensure
// that it's inserted after flushing the shell. Note that this implementation
// might get stale if impl details of Fizz change in the future.
await atLeastOneTask()
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
donePulling = true
return
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
}
}
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
// Start the streaming if it hasn't already been started yet.
if (!pull) {
pull = startPulling(controller)
}
},
flush(controller) {
if (donePulling) {
return
}
return pull || startPulling(controller)
},
})
}
/**
* This transform stream moves the suffix to the end of the stream, so results
* like `</body></html><script>...</script>` will be transformed to
* `<script>...</script></body></html>`.
*/
function createMoveSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
let foundSuffix = false
const encodedSuffix = encoder.encode(suffix)
return new TransformStream({
transform(chunk, controller) {
if (foundSuffix) {
return controller.enqueue(chunk)
}
const index = indexOfUint8Array(chunk, encodedSuffix)
if (index > -1) {
foundSuffix = true
// If the whole chunk is the suffix, then don't write anything, it will
// be written in the flush.
if (chunk.length === suffix.length) {
return
}
// Write out the part before the suffix.
const before = chunk.slice(0, index)
controller.enqueue(before)
// In the case where the suffix is in the middle of the chunk, we need
// to split the chunk into two parts.
if (chunk.length > suffix.length + index) {
// Write out the part after the suffix.
const after = chunk.slice(index + suffix.length)
controller.enqueue(after)
}
} else {
controller.enqueue(chunk)
}
},
flush(controller) {
// Even if we didn't find the suffix, the HTML is not valid if we don't
// add it, so insert it at the end.
controller.enqueue(encodedSuffix)
},
})
}
function createStripDocumentClosingTagsTransform(): TransformStream<
Uint8Array,
Uint8Array
> {
return new TransformStream({
transform(chunk, controller) {
// We rely on the assumption that chunks will never break across a code unit.
// This is reasonable because we currently concat all of React's output from a single
// flush into one chunk before streaming it forward which means the chunk will represent
// a single coherent utf-8 string. This is not safe to use if we change our streaming to no
// longer do this large buffered chunk
if (
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY_AND_HTML) ||
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY) ||
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.HTML)
) {
// the entire chunk is the closing tags; return without enqueueing anything.
return
}
// We assume these tags will go at together at the end of the document and that
// they won't appear anywhere else in the document. This is not really a safe assumption
// but until we revamp our streaming infra this is a performant way to string the tags
chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.BODY)
chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.HTML)
controller.enqueue(chunk)
},
})
}
/*
* Checks if the root layout is missing the html or body tags
* and if so, it will inject a script tag to throw an error in the browser, showing the user
* the error message in the error overlay.
*/
export function createRootLayoutValidatorStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let foundHtml = false
let foundBody = false
return new TransformStream({
async transform(chunk, controller) {
// Peek into the streamed chunk to see if the tags are present.
if (
!foundHtml &&
indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.HTML) > -1
) {
foundHtml = true
}
if (
!foundBody &&
indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.BODY) > -1
) {
foundBody = true
}
controller.enqueue(chunk)
},
flush(controller) {
const missingTags: typeof window.__next_root_layout_missing_tags = []
if (!foundHtml) missingTags.push('html')
if (!foundBody) missingTags.push('body')
if (!missingTags.length) return
controller.enqueue(
encoder.encode(
`<script>self.__next_root_layout_missing_tags=${JSON.stringify(
missingTags
)}</script>`
)
)
},
})
}
function chainTransformers<T>(
readable: ReadableStream<T>,
transformers: ReadonlyArray<TransformStream<T, T> | null>
): ReadableStream<T> {
let stream = readable
for (const transformer of transformers) {
if (!transformer) continue
stream = stream.pipeThrough(transformer)
}
return stream
}
export type ContinueStreamOptions = {
inlinedDataStream: ReadableStream<Uint8Array> | undefined
isStaticGeneration: boolean
getServerInsertedHTML: (() => Promise<string>) | undefined
serverInsertedHTMLToHead: boolean
validateRootLayout?: boolean
/**
* Suffix to inject after the buffered data, but before the close tags.
*/
suffix?: string | undefined
}
export async function continueFizzStream(
renderStream: ReactReadableStream,
{
suffix,
inlinedDataStream,
isStaticGeneration,
getServerInsertedHTML,
serverInsertedHTMLToHead,
validateRootLayout,
}: ContinueStreamOptions
): Promise<ReadableStream<Uint8Array>> {
const closeTag = '</body></html>'
// Suffix itself might contain close tags at the end, so we need to split it.
const suffixUnclosed = suffix ? suffix.split(closeTag, 1)[0] : null
// If we're generating static HTML and there's an `allReady` promise on the
// stream, we need to wait for it to resolve before continuing.
if (isStaticGeneration && 'allReady' in renderStream) {
await renderStream.allReady
}
return chainTransformers(renderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
// Insert generated tags to head
getServerInsertedHTML && !serverInsertedHTMLToHead
? createInsertedHTMLStream(getServerInsertedHTML)
: null,
// Insert suffix content
suffixUnclosed != null && suffixUnclosed.length > 0
? createDeferredSuffixStream(suffixUnclosed)
: null,
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
inlinedDataStream ? createMergedTransformStream(inlinedDataStream) : null,
// Validate the root layout for missing html or body tags
validateRootLayout ? createRootLayoutValidatorStream() : null,
// Close tags should always be deferred to the end
createMoveSuffixStream(closeTag),
// Special head insertions
// TODO-APP: Insert server side html to end of head in app layout rendering, to avoid
// hydration errors. Remove this once it's ready to be handled by react itself.
getServerInsertedHTML && serverInsertedHTMLToHead
? createHeadInsertionTransformStream(getServerInsertedHTML)
: null,
])
}
type ContinueDynamicPrerenderOptions = {
getServerInsertedHTML: () => Promise<string>
}
export async function continueDynamicPrerender(
prerenderStream: ReadableStream<Uint8Array>,
{ getServerInsertedHTML }: ContinueDynamicPrerenderOptions
) {
return (
prerenderStream
// Buffer everything to avoid flushing too frequently
.pipeThrough(createBufferedTransformStream())
.pipeThrough(createStripDocumentClosingTagsTransform())
// Insert generated tags to head
.pipeThrough(createHeadInsertionTransformStream(getServerInsertedHTML))
)
}
type ContinueStaticPrerenderOptions = {
inlinedDataStream: ReadableStream<Uint8Array>
getServerInsertedHTML: () => Promise<string>
}
export async function continueStaticPrerender(
prerenderStream: ReadableStream<Uint8Array>,
{ inlinedDataStream, getServerInsertedHTML }: ContinueStaticPrerenderOptions
) {
const closeTag = '</body></html>'
return (
prerenderStream
// Buffer everything to avoid flushing too frequently
.pipeThrough(createBufferedTransformStream())
// Insert generated tags to head
.pipeThrough(createHeadInsertionTransformStream(getServerInsertedHTML))
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
.pipeThrough(createMergedTransformStream(inlinedDataStream))
// Close tags should always be deferred to the end
.pipeThrough(createMoveSuffixStream(closeTag))
)
}
type ContinueResumeOptions = {
inlinedDataStream: ReadableStream<Uint8Array>
getServerInsertedHTML: () => Promise<string>
}
export async function continueDynamicHTMLResume(
renderStream: ReadableStream<Uint8Array>,
{ inlinedDataStream, getServerInsertedHTML }: ContinueResumeOptions
) {
const closeTag = '</body></html>'
return (
renderStream
// Buffer everything to avoid flushing too frequently
.pipeThrough(createBufferedTransformStream())
// Insert generated tags to head
.pipeThrough(createHeadInsertionTransformStream(getServerInsertedHTML))
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
.pipeThrough(createMergedTransformStream(inlinedDataStream))
// Close tags should always be deferred to the end
.pipeThrough(createMoveSuffixStream(closeTag))
)
}
type ContinueDynamicDataResumeOptions = {
inlinedDataStream: ReadableStream<Uint8Array>
}
export async function continueDynamicDataResume(
renderStream: ReadableStream<Uint8Array>,
{ inlinedDataStream }: ContinueDynamicDataResumeOptions
) {
const closeTag = '</body></html>'
return (
renderStream
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
.pipeThrough(createMergedTransformStream(inlinedDataStream))
// Close tags should always be deferred to the end
.pipeThrough(createMoveSuffixStream(closeTag))
)
}