1
1
'use strict'
2
2
3
3
const { Readable, Transform } = require ( 'stream' )
4
- const toStream = require ( 'buffer-to-stream' )
5
4
const pump = require ( 'pump' )
6
5
const Multipart = require ( './multipart2' )
7
6
const { prepareWithHeaders} = require ( './../utils/prepare-file' )
@@ -24,106 +23,101 @@ const prepareTransform = (options) => new Transform({
24
23
} )
25
24
26
25
module . exports = ( send ) => ( files , options ) => {
27
- const multipart = new Multipart ( )
28
-
29
- // add pump
30
- arrayToStream ( [ ] . concat ( files ) )
31
- . pipe ( prepareTransform ( options ) )
32
- . pipe ( multipart )
26
+ const multipart = pump (
27
+ arrayToStream ( [ ] . concat ( files ) ) ,
28
+ prepareTransform ( options ) ,
29
+ new Multipart ( options ) ,
30
+ ( err ) => {
31
+ if ( err ) {
32
+ // probably better to create a rejected Promise to return
33
+ console . error ( err )
34
+ }
35
+ }
36
+ )
33
37
34
38
return sendChunked ( multipart , send , options )
35
39
}
36
40
37
41
const sendChunked = ( multipartStream , send , options ) => {
38
42
return new Promise ( ( resolve , reject ) => {
39
- const boundary = multipartStream . _boundary
40
- let index = 0
41
- let rangeStart = 0
42
- let rangeEnd = 0
43
- let size = 0
44
- let ended = false
45
- let running = false
46
- const name = createName ( )
43
+ const state = {
44
+ boundary : multipartStream . _boundary ,
45
+ id : uuid ( ) ,
46
+ index : 0 ,
47
+ rangeStart : 0 ,
48
+ rangeEnd : 0 ,
49
+ rangeTotal : 0 ,
50
+ ended : false ,
51
+ running : false
52
+ }
47
53
54
+ multipartStream . on ( 'error' , reject )
48
55
multipartStream . on ( 'end' , ( ) => {
49
- ended = true
50
- console . log ( 'end' , size )
56
+ state . ended = true
57
+ console . log ( 'end' , state . rangeTotal )
51
58
52
59
// multipart ended and no request is running send last request
53
- if ( ! running ) {
54
- // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size)
55
- sendChunkRequest ( send , options , '' , - 1 , rangeEnd , rangeEnd , name , boundary , size )
56
- . then ( rsp => {
57
- resolve ( rsp )
58
- } )
60
+ if ( ! state . running ) {
61
+ sendChunkRequest ( send , options , '' , state )
62
+ . then ( resolve )
63
+ . catch ( reject )
59
64
}
60
65
} )
61
66
62
67
multipartStream . on ( 'data' , ( chunk ) => {
63
68
console . log ( 'Sending ' , chunk . length )
69
+ // stop producing chunks
64
70
multipartStream . pause ( )
65
- index ++
66
- rangeEnd = rangeStart + chunk . length
67
- size += chunk . length
68
- running = true
71
+ state . index ++
72
+ state . rangeEnd = state . rangeStart + chunk . length
73
+ state . rangeTotal += chunk . length
74
+ state . running = true
69
75
70
- // sendChunk(chunk, index, rangeStart, rangeEnd, name, boundary)
71
- sendChunkRequest ( send , options , chunk , index , rangeStart , rangeEnd , name , boundary )
76
+ sendChunkRequest ( send , options , chunk , state )
72
77
. then ( rsp => {
73
78
console . log ( 'Response' , rsp )
74
- rangeStart = rangeEnd
79
+ state . running = false
80
+ state . rangeStart = state . rangeEnd
81
+ // resume producing chunks
75
82
multipartStream . resume ( )
83
+
76
84
// if multipart already ended send last request
77
- if ( ended ) {
78
- console . log ( 'sending last' )
79
- // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size)
80
- sendChunkRequest ( send , options , '' , - 1 , rangeEnd , rangeEnd , name , boundary , size )
81
- . then ( rsp => {
82
- resolve ( rsp )
83
- } )
85
+ if ( state . ended ) {
86
+ return sendChunkRequest ( send , options , '' , state )
87
+ . then ( resolve )
84
88
}
85
- running = false
86
89
} )
87
90
. catch ( reject )
88
91
} )
89
92
} )
90
93
}
91
94
92
- const sendChunk = ( chunk , id , start , end , name , boundary , size = '*' ) => {
93
- const url = new URL ( 'http://localhost' )
94
- const search = new URLSearchParams ( )
95
- search . set ( 'stream-channels' , true )
96
- url . port = 5002
97
- url . pathname = 'api/v0/add-chunked'
98
- url . search = search
99
-
100
- return window . fetch ( url . href , {
101
- method : 'POST' ,
102
- body : chunk ,
103
- headers : {
104
- 'Content-Type' : 'application/octet-stream' ,
105
- 'Content-Range' : `bytes ${ start } -${ end } /${ size } ` ,
106
- 'Ipfs-Chunk-Name' : name ,
107
- 'Ipfs-Chunk-Id' : id ,
108
- 'Ipfs-Chunk-Boundary' : boundary
109
- }
110
- } )
111
- . then ( res => res . json ( ) )
112
- }
113
-
114
- function createName ( ) {
115
- const date = new Date ( Date . now ( ) ) . toISOString ( )
95
+ /**
96
+ * Poor man's uuid
97
+ *
98
+ * @returns {String }
99
+ */
100
+ function uuid ( ) {
116
101
function chr4 ( ) {
117
102
return Math . random ( ) . toString ( 16 ) . slice ( - 4 )
118
103
}
119
- return date + '--' + chr4 ( ) + chr4 ( ) +
104
+ return chr4 ( ) + chr4 ( ) +
120
105
'-' + chr4 ( ) +
121
106
'-' + chr4 ( ) +
122
107
'-' + chr4 ( ) +
123
108
'-' + chr4 ( ) + chr4 ( ) + chr4 ( )
124
109
}
125
110
126
- const sendChunkRequest = ( send , options , chunk , id , start , end , name , boundary , size = '*' ) => {
111
+ /**
112
+ * Send http request
113
+ *
114
+ * @param {function } send
115
+ * @param {Object } options - http request options
116
+ * @param {Uint8Array } chunk - chunk to send
117
+ * @param {Object } {id, start, end, name, boundary, size = '*' } - uploading session state
118
+ * @returns {Promise }
119
+ */
120
+ const sendChunkRequest = ( send , options , chunk , { boundary, id, index, rangeStart, rangeEnd, rangeTotal = '*' } ) => {
127
121
return new Promise ( ( resolve , reject ) => {
128
122
const qs = {
129
123
'cid-version' : options [ 'cid-version' ] ,
@@ -141,21 +135,21 @@ const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary,
141
135
progress : options . progress ,
142
136
headers : {
143
137
'Content-Type' : 'application/octet-stream' ,
144
- 'Content-Range' : `bytes ${ start } -${ end } /${ size } ` ,
145
- 'Ipfs-Chunk-Name ' : name ,
146
- 'Ipfs-Chunk-Id ' : id ,
147
- 'Ipfs-Chunk-Boundary' : boundary
138
+ 'Content-Range' : `bytes ${ rangeStart } -${ rangeEnd } /${ rangeTotal } ` ,
139
+ 'X- Ipfs-Chunk-Group-Uuid ' : id ,
140
+ 'X- Ipfs-Chunk-Index ' : index ,
141
+ 'X- Ipfs-Chunk-Boundary' : boundary
148
142
}
149
143
}
150
144
151
145
const req = send ( args , ( err , res ) => {
152
146
if ( err ) {
153
147
return reject ( err )
154
148
}
155
-
156
149
resolve ( res )
157
150
} )
158
151
152
+ // write and send
159
153
req . write ( Buffer . from ( chunk ) )
160
154
req . end ( )
161
155
} )
0 commit comments