1
1
'use strict'
2
2
3
- const { Duplex } = require ( 'stream' )
3
+ const { Duplex, PassThrough } = require ( 'stream' )
4
4
const { isSource } = require ( 'is-pull-stream' )
5
- const toStream = require ( 'pull-stream-to-stream' )
5
+ const pump = require ( 'pump' )
6
+ const pullToStream = require ( 'pull-stream-to-stream' )
7
+ const bufferToStream = require ( 'buffer-to-stream' )
6
8
7
9
/** @private @typedef {import("../files/add-experimental").AddOptions } AddOptions */
8
10
@@ -63,7 +65,8 @@ class Multipart extends Duplex {
63
65
constructor ( options ) {
64
66
super ( {
65
67
writableObjectMode : true ,
66
- writableHighWaterMark : 1
68
+ writableHighWaterMark : 1 ,
69
+ readableHighWaterMark : options . chunkSize ? Math . max ( 136 , options . chunkSize ) : 16384 // min is 136
67
70
} )
68
71
69
72
this . _boundary = generateBoundary ( )
@@ -72,12 +75,11 @@ class Multipart extends Duplex {
72
75
this . buffer = Buffer . alloc ( this . chunkSize )
73
76
this . bufferOffset = 0
74
77
this . extraBytes = 0
78
+ this . sourceReadable = false
75
79
}
76
80
77
81
_read ( ) {
78
- if ( this . source && ! this . isPaused ( ) ) {
79
- this . source . resume ( )
80
- }
82
+ // empty read
81
83
}
82
84
83
85
_write ( file , encoding , callback ) {
@@ -87,30 +89,34 @@ class Multipart extends Duplex {
87
89
}
88
90
89
91
_final ( callback ) {
90
- this . pushChunk ( Buffer . from ( PADDING + this . _boundary + PADDING + NEW_LINE ) , true )
91
92
// Flush the rest and finish
92
- if ( this . bufferOffset && ! this . destroyed ) {
93
+ const tail = Buffer . from ( PADDING + this . _boundary + PADDING + NEW_LINE )
94
+ if ( this . chunkSize === 0 ) {
95
+ this . push ( tail )
96
+ } else {
97
+ this . extraBytes += tail . length
93
98
const slice = this . buffer . slice ( 0 , this . bufferOffset )
94
- this . push ( slice )
99
+
95
100
this . bufferOffset = 0
101
+ this . push ( Buffer . concat ( [ slice , tail ] , slice . length + tail . length ) )
96
102
}
103
+
97
104
this . push ( null )
98
105
callback ( )
99
106
}
100
107
101
- pauseAll ( ) {
102
- this . pause ( )
103
- if ( this . source ) {
104
- this . source . pause ( )
105
- }
106
- }
108
+ resume ( ) {
109
+ super . resume ( )
107
110
108
- resumeAll ( ) {
109
- this . resume ( )
110
- if ( this . source ) {
111
- this . source . resume ( )
111
+ // Chunked mode
112
+ if ( this . chunkSize > 0 && this . sourceReadable ) {
113
+ let chunk
114
+ while ( ! this . isPaused ( ) && ( chunk = this . source . read ( this . chunkSize - this . bufferOffset ) ) !== null ) {
115
+ this . pushChunk ( chunk )
116
+ }
112
117
}
113
118
}
119
+
114
120
/**
115
121
* Push chunk
116
122
*
@@ -119,7 +125,6 @@ class Multipart extends Duplex {
119
125
* @return {boolean }
120
126
*/
121
127
pushChunk ( chunk , isExtra = false ) {
122
- let result = true
123
128
if ( chunk === null ) {
124
129
return this . push ( null )
125
130
}
@@ -132,61 +137,65 @@ class Multipart extends Duplex {
132
137
this . extraBytes += chunk . length
133
138
}
134
139
135
- // If we have enough bytes in this chunk to get buffer up to chunkSize,
136
- // fill in buffer, push it, and reset its offset.
137
- // Otherwise, just copy the entire chunk in to buffer.
140
+ if ( this . bufferOffset === 0 && chunk . length === this . chunkSize ) {
141
+ return this . push ( chunk )
142
+ }
143
+
138
144
const bytesNeeded = ( this . chunkSize - this . bufferOffset )
139
- if ( chunk . length >= bytesNeeded ) {
140
- chunk . copy ( this . buffer , this . bufferOffset , 0 , bytesNeeded )
141
- result = this . push ( this . buffer )
145
+ // make sure we have the correct amount of bytes
146
+ if ( chunk . length === bytesNeeded ) {
147
+ // chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded)
148
+ const slice = this . buffer . slice ( 0 , this . bufferOffset )
142
149
this . bufferOffset = 0
143
- // Handle leftovers from the chunk
144
- const leftovers = chunk . slice ( 0 , chunk . length - bytesNeeded )
145
- let size = leftovers . length
146
- while ( size >= this . chunkSize ) {
147
- result = this . push ( chunk . slice ( this . bufferOffset , this . bufferOffset + this . chunkSize ) )
148
- this . bufferOffset += this . chunkSize
149
- size -= this . chunkSize
150
- }
151
- // if we still have anything left copy to the buffer
152
- chunk . copy ( this . buffer , 0 , this . bufferOffset , this . bufferOffset + size )
153
- this . bufferOffset = size
154
- } else {
155
- chunk . copy ( this . buffer , this . bufferOffset )
156
- this . bufferOffset += chunk . length
150
+ return this . push ( Buffer . concat ( [ slice , chunk ] , slice . length + chunk . length ) )
157
151
}
158
152
159
- return result
153
+ if ( chunk . length > bytesNeeded ) {
154
+ this . emit ( 'error' , new RangeError ( `Chunk is too big needed ${ bytesNeeded } got ${ chunk . length } ` ) )
155
+ return false
156
+ }
157
+
158
+ chunk . copy ( this . buffer , this . bufferOffset )
159
+ this . bufferOffset += chunk . length
160
+
161
+ return true
160
162
}
161
163
162
164
pushFile ( file , callback ) {
163
165
this . pushChunk ( leading ( file . headers , this . _boundary ) , true )
164
166
165
- let content = file . content || Buffer . alloc ( 0 )
167
+ this . source = file . content || Buffer . alloc ( 0 )
166
168
167
- if ( Buffer . isBuffer ( content ) ) {
168
- this . pushChunk ( content )
169
- this . pushChunk ( NEW_LINE_BUFFER , true )
170
- return callback ( )
169
+ if ( Buffer . isBuffer ( this . source ) ) {
170
+ this . source = bufferToStream ( this . source )
171
171
}
172
172
173
- if ( isSource ( content ) ) {
174
- content = toStream . source ( content )
173
+ if ( isSource ( file . content ) ) {
174
+ // pull-stream-to-stream doesn't support readable event...
175
+ this . source = pump ( [ pullToStream . source ( file . content ) , new PassThrough ( ) ] )
175
176
}
176
- this . source = content
177
177
178
- // From now on we assume content is a stream
179
- content . on ( 'data' , ( data ) => {
180
- if ( ! this . pushChunk ( data ) ) {
181
- content . pause ( )
178
+ this . source . on ( 'readable' , ( ) => {
179
+ this . sourceReadable = true
180
+ let chunk = null
181
+ if ( this . chunkSize === 0 ) {
182
+ if ( ( chunk = this . source . read ( ) ) !== null ) {
183
+ this . pushChunk ( chunk )
184
+ }
185
+ } else {
186
+ while ( ! this . isPaused ( ) && ( chunk = this . source . read ( this . chunkSize - this . bufferOffset ) ) !== null ) {
187
+ this . pushChunk ( chunk )
188
+ }
182
189
}
183
190
} )
184
- content . once ( 'error' , this . emit . bind ( this , 'error' ) )
185
191
186
- content . once ( 'end' , ( ) => {
192
+ this . source . on ( 'end' , ( ) => {
193
+ this . sourceReadable = false
187
194
this . pushChunk ( NEW_LINE_BUFFER , true )
188
195
callback ( )
189
196
} )
197
+
198
+ this . source . on ( 'error' , err => this . emit ( 'error' , err ) )
190
199
}
191
200
}
192
201
0 commit comments