-
Notifications
You must be signed in to change notification settings - Fork 28k
/
Copy pathpipe-readable.ts
146 lines (125 loc) · 4.35 KB
/
pipe-readable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import type { ServerResponse } from 'node:http'
import {
ResponseAbortedName,
createAbortController,
} from './web/spec-extension/adapters/next-request'
import { DetachedPromise } from '../lib/detached-promise'
import { getTracer } from './lib/trace/tracer'
import { NextNodeServerSpan } from './lib/trace/constants'
import { getClientComponentLoaderMetrics } from './client-component-renderer-logger'
/**
 * Type guard reporting whether a thrown value represents an aborted
 * operation.
 *
 * A value matches when its `name` property is the literal `'AbortError'` or
 * equals the imported `ResponseAbortedName` constant. `null` and `undefined`
 * are tolerated through optional chaining; whether they match depends only on
 * the `name` comparisons above.
 *
 * @param e - Any caught value.
 * @returns `true` when `e.name` identifies an abort.
 */
export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
  // Standard abort name first; the project-specific name is only consulted
  // when that comparison fails.
  if (e?.name === 'AbortError') {
    return true
  }
  return e?.name === ResponseAbortedName
}
/**
 * Wraps a Node.js `ServerResponse` in a web `WritableStream` so a
 * `ReadableStream` can be piped into it.
 *
 * Behavior of the returned sink:
 * - `write` flushes the headers on the first chunk only, writes the chunk,
 *   calls `res.flush()` when that method exists, and waits for `'drain'`
 *   whenever `res.write()` reports backpressure.
 * - `abort` destroys the response with the abort reason unless the response
 *   has already finished.
 * - `close` optionally awaits `waitUntilForEnd`, ends the response and then
 *   waits until the response has either finished or closed.
 *
 * @param res - The response that receives the chunks.
 * @param waitUntilForEnd - Optional promise awaited before the response is
 * ended.
 * @returns A writable stream that forwards chunks to `res`.
 */
function createWriterFromResponse(
  res: ServerResponse,
  waitUntilForEnd?: Promise<unknown>
): WritableStream<Uint8Array> {
  let started = false

  // Create a promise that will resolve once the response has drained. See
  // https://nodejs.org/api/stream.html#stream_event_drain
  let drained = new DetachedPromise<void>()
  function onDrain() {
    drained.resolve()
  }
  res.on('drain', onDrain)

  // Create a promise that will resolve once the response has finished. See
  // https://nodejs.org/api/http.html#event-finish_1
  const finished = new DetachedPromise<void>()
  res.once('finish', () => {
    finished.resolve()
  })

  // If the close event fires, no further drain or finish event can be relied
  // upon, so release anything that is waiting on either of them. Without
  // resolving `finished` here, a connection that goes away after `close()`
  // below has started (but before the response finished) would leave the
  // sink's close promise pending forever.
  res.once('close', () => {
    res.off('drain', onDrain)
    drained.resolve()
    finished.resolve()
  })

  // Create a writable stream that will write to the response.
  return new WritableStream<Uint8Array>({
    write: async (chunk) => {
      // You'd think we'd want to use `start` instead of placing this in `write`
      // but this ensures that we don't actually flush the headers until we've
      // started writing chunks.
      if (!started) {
        started = true

        // Optionally record how long client component loading took, when the
        // Performance API is available and a measurement prefix is configured.
        if (
          'performance' in globalThis &&
          process.env.NEXT_OTEL_PERFORMANCE_PREFIX
        ) {
          const metrics = getClientComponentLoaderMetrics()
          if (metrics) {
            performance.measure(
              `${process.env.NEXT_OTEL_PERFORMANCE_PREFIX}:next-client-component-loading`,
              {
                start: metrics.clientComponentLoadStart,
                end:
                  metrics.clientComponentLoadStart +
                  metrics.clientComponentLoadTimes,
              }
            )
          }
        }

        res.flushHeaders()

        // Emit a marker span for the start of the response; the traced
        // callback itself does nothing.
        getTracer().trace(
          NextNodeServerSpan.startResponse,
          {
            spanName: 'start response',
          },
          () => undefined
        )
      }

      try {
        const ok = res.write(chunk)

        // Added by the `compression` middleware, this is a function that will
        // flush the partially-compressed response to the client.
        if ('flush' in res && typeof res.flush === 'function') {
          res.flush()
        }

        // If the write returns false, it means there's some backpressure, so
        // wait until it's streamed before continuing.
        if (!ok) {
          await drained.promise

          // Reset the drained promise so that we can wait for the next drain event.
          drained = new DetachedPromise<void>()
        }
      } catch (err) {
        // End the response before surfacing the failure, keeping the original
        // error as the cause.
        res.end()
        throw new Error('failed to write chunk to response', { cause: err })
      }
    },
    abort: (err) => {
      // Nothing to tear down when the response has already been fully written.
      if (res.writableFinished) return

      res.destroy(err)
    },
    close: async () => {
      // if a waitUntil promise was passed, wait for it to resolve before
      // ending the response.
      if (waitUntilForEnd) {
        await waitUntilForEnd
      }

      if (res.writableFinished) return

      res.end()

      // Settles on 'finish', or on 'close' if the response never finishes.
      return finished.promise
    },
  })
}
/**
 * Pipes a web `ReadableStream` into a Node.js `ServerResponse`.
 *
 * Returns immediately when the response is already errored or destroyed.
 * Errors recognised by `isAbortError` are swallowed; every other failure is
 * rethrown wrapped in a new `Error` whose `cause` is the original error.
 *
 * @param readable - The stream whose chunks are sent to the response.
 * @param res - The response to write to.
 * @param waitUntilForEnd - Optional promise forwarded to
 * `createWriterFromResponse`.
 * @throws Error with message `failed to pipe response` for non-abort failures.
 */
export async function pipeToNodeResponse(
  readable: ReadableStream<Uint8Array>,
  res: ServerResponse,
  waitUntilForEnd?: Promise<unknown>
) {
  try {
    // A response that has already failed or been torn down cannot accept data.
    if (res.errored || res.destroyed) {
      return
    }

    // The controller's signal lets the pipe be aborted from the response side,
    // for example when the client disconnects.
    const abortController = createAbortController(res)
    const sink = createWriterFromResponse(res, waitUntilForEnd)

    await readable.pipeTo(sink, { signal: abortController.signal })
  } catch (err: any) {
    // Only errors unrelated to an abort are surfaced to the caller.
    if (!isAbortError(err)) {
      throw new Error('failed to pipe response', { cause: err })
    }
  }
}