Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 775213a

Browse files
committed
feat: add support for chunked uploads
1 parent 231c4d7 commit 775213a

File tree

7 files changed

+355
-22
lines changed

7 files changed

+355
-22
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"async": "^2.6.1",
3030
"big.js": "^5.1.2",
3131
"bs58": "^4.0.1",
32+
"buffer-to-stream": "^1.0.0",
3233
"cids": "~0.5.3",
3334
"concat-stream": "^1.6.2",
3435
"debug": "^3.1.0",
@@ -58,6 +59,7 @@
5859
"pump": "^3.0.0",
5960
"qs": "^6.5.2",
6061
"readable-stream": "^2.3.6",
62+
"readable-stream-node-to-web": "^1.0.1",
6163
"stream-http": "^2.8.3",
6264
"stream-to-pull-stream": "^1.7.2",
6365
"streamifier": "~0.1.1",

src/add2/add2.js

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
'use strict'
2+
3+
const { Readable, Transform } = require('stream')
4+
const toStream = require('buffer-to-stream')
5+
const pump = require('pump')
6+
const Multipart = require('./multipart2')
7+
const {prepareWithHeaders} = require('./../utils/prepare-file')
8+
9+
/**
 * Wrap an in-memory array in an object-mode Readable stream that emits
 * each element in order and then ends.
 *
 * @param {Array} data - items to emit
 * @returns {Readable} object-mode stream over the array's elements
 */
const arrayToStream = (data) => {
  let cursor = 0
  return new Readable({
    objectMode: true,
    read () {
      // Emit the next element, or end the stream once exhausted.
      if (cursor < data.length) {
        this.push(data[cursor])
        cursor += 1
      } else {
        this.push(null)
      }
    }
  })
}
18+
19+
/**
 * Create an object-mode Transform that converts each incoming file entry
 * into its prepared form (content plus multipart headers) via
 * `prepareWithHeaders`.
 *
 * @param {Object} options - options forwarded to prepareWithHeaders
 * @returns {Transform} object-mode transform stream
 */
const prepareTransform = (options) => {
  return new Transform({
    objectMode: true,
    transform (file, encoding, done) {
      done(null, prepareWithHeaders(file, options))
    }
  })
}
25+
26+
module.exports = (send) => (files, options) => {
27+
const multipart = new Multipart()
28+
29+
// add pump
30+
arrayToStream([].concat(files))
31+
.pipe(prepareTransform(options))
32+
.pipe(multipart)
33+
34+
return sendChunked(multipart, send, options)
35+
}
36+
37+
/**
 * Consume the multipart stream and upload it chunk by chunk.
 *
 * Each 'data' chunk is sent as its own request while the stream is
 * paused; after the stream ends and no request is in flight, a final
 * empty request (id -1) carrying the total size tells the server to
 * assemble the upload. Resolves with the response of that closing
 * request.
 *
 * @param {Multipart} multipartStream - multipart byte stream to upload
 * @param {Function} send - request function
 * @param {Object} options - add options forwarded with every chunk
 * @returns {Promise<Object>} response of the closing request
 */
const sendChunked = (multipartStream, send, options) => {
  return new Promise((resolve, reject) => {
    const boundary = multipartStream._boundary
    let index = 0
    let rangeStart = 0
    let rangeEnd = 0
    let size = 0
    let ended = false
    let running = false
    const name = createName()

    multipartStream.on('end', () => {
      ended = true

      // If no chunk request is pending, close the upload right away;
      // otherwise the in-flight request's handler sends the closing
      // request when it completes (see the 'data' handler below).
      if (!running) {
        sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size)
          .then(resolve)
          .catch(reject) // previously missing: a failure here left the promise pending forever
      }
    })

    multipartStream.on('data', (chunk) => {
      // One request at a time: pause until the server acknowledges.
      multipartStream.pause()
      index++
      rangeEnd = rangeStart + chunk.length
      size += chunk.length
      running = true

      sendChunkRequest(send, options, chunk, index, rangeStart, rangeEnd, name, boundary)
        .then(() => {
          rangeStart = rangeEnd
          multipartStream.resume()
          // The stream may have ended while this request was in
          // flight; if so, send the closing request now.
          if (ended) {
            sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size)
              .then(resolve)
              .catch(reject) // previously missing as well
          }
          running = false
        })
        .catch(reject)
    })
  })
}
91+
92+
/**
 * Debug variant of the chunk upload that talks directly to a local
 * daemon via window.fetch (browser only). Not wired up by default —
 * sendChunkRequest is the path used by sendChunked.
 *
 * @param {Buffer|string} chunk - payload ('' for the closing request)
 * @param {number} id - 1-based chunk index, or -1 for the closing request
 * @param {number} start - byte offset of the first byte of this chunk
 * @param {number} end - byte offset one past the last byte
 * @param {string} name - upload session name (see createName)
 * @param {string} boundary - multipart boundary of the overall stream
 * @param {number|string} [size='*'] - total size; only known at the end
 * @returns {Promise<Object>} parsed JSON response
 */
const sendChunk = (chunk, id, start, end, name, boundary, size = '*') => {
  const target = new URL('http://localhost')
  target.port = 5002
  target.pathname = 'api/v0/add-chunked'
  target.searchParams.set('stream-channels', true)

  const headers = {
    'Content-Type': 'application/octet-stream',
    'Content-Range': `bytes ${start}-${end}/${size}`,
    'Ipfs-Chunk-Name': name,
    'Ipfs-Chunk-Id': id,
    'Ipfs-Chunk-Boundary': boundary
  }

  return window.fetch(target.href, { method: 'POST', body: chunk, headers })
    .then((res) => res.json())
}
113+
114+
/**
 * Generate a unique name for one chunked-upload session: an ISO
 * timestamp followed by a UUID-shaped random hex string (8-4-4-4-12).
 *
 * Fix: the previous `Math.random().toString(16).slice(-4)` could yield
 * fewer than 4 characters (or include the '.' for small values),
 * producing malformed names; each group is now always exactly 4 hex
 * digits.
 *
 * @returns {string} e.g. "2018-07-31T12:00:00.000Z--1a2b3c4d-…"
 */
function createName () {
  const date = new Date(Date.now()).toISOString()
  function chr4 () {
    // 4 zero-padded hex digits drawn uniformly from 0x0000-0xffff.
    return Math.floor(Math.random() * 0x10000).toString(16).padStart(4, '0')
  }
  return date + '--' + chr4() + chr4() +
    '-' + chr4() +
    '-' + chr4() +
    '-' + chr4() +
    '-' + chr4() + chr4() + chr4()
}
125+
126+
/**
 * Upload a single chunk of the multipart stream through the client's
 * `send` function, identifying it with Content-Range and Ipfs-Chunk-*
 * headers.
 *
 * @param {Function} send - request function
 * @param {Object} options - add options (cid-version, raw-leaves, …)
 * @param {Buffer|string} chunk - payload ('' for the closing request)
 * @param {number} id - 1-based chunk index, or -1 for the closing request
 * @param {number} start - byte offset of the first byte of this chunk
 * @param {number} end - byte offset one past the last byte
 * @param {string} name - upload session name
 * @param {string} boundary - multipart boundary of the overall stream
 * @param {number|string} [size='*'] - total size; only known at the end
 * @returns {Promise<Object>} resolved with the daemon response
 */
const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, size = '*') => {
  return new Promise((resolve, reject) => {
    // Options the add endpoint understands, sent as the query string.
    const qs = {
      'cid-version': options['cid-version'],
      'raw-leaves': options['raw-leaves'],
      'only-hash': options.onlyHash,
      'wrap-with-directory': options.wrapWithDirectory,
      hash: options.hashAlg || options.hash
    }

    // Chunk bookkeeping travels in headers so the body can stay an
    // opaque byte range of the multipart stream.
    const headers = {
      'Content-Type': 'application/octet-stream',
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Ipfs-Chunk-Name': name,
      'Ipfs-Chunk-Id': id,
      'Ipfs-Chunk-Boundary': boundary
    }

    const req = send({
      path: 'add-chunked',
      qs: qs,
      args: options.args,
      stream: true,
      headers: headers
    }, (err, res) => {
      if (err) {
        return reject(err)
      }

      resolve(res)
    })

    // Stream the chunk body into the request; pump handles teardown.
    pump(toStream(chunk), req)
  })
}

src/add2/multipart2.js

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
'use strict'
2+
3+
const { Duplex } = require('stream')
4+
const { isSource } = require('is-pull-stream')
5+
const toStream = require('pull-stream-to-stream')
6+
7+
const PADDING = '--'
8+
const NEW_LINE = '\r\n'
9+
const NEW_LINE_BUFFER = Buffer.from(NEW_LINE)
10+
11+
/**
 * Generate a multipart boundary: a fixed dash prefix followed by 24
 * random decimal digits (the form-data convention).
 *
 * @returns {string} boundary string
 */
const generateBoundary = () => {
  const digits = []
  for (let i = 0; i < 24; i++) {
    digits.push(Math.floor(Math.random() * 10).toString(16))
  }
  return '--------------------------' + digits.join('')
}
19+
20+
/**
 * Build the leading bytes of one multipart part: the boundary line, any
 * part headers, and the blank line separating headers from the body.
 *
 * @param {Object} [headers={}] - header name → value map for this part
 * @param {string} boundary - multipart boundary (without '--' prefix)
 * @returns {Buffer} encoded header block, ending in CRLF CRLF
 */
const leading = (headers = {}, boundary) => {
  const lines = ['--' + boundary]

  for (const header of Object.keys(headers)) {
    lines.push(`${header}: ${headers[header]}`)
  }

  // Two trailing empty entries: join() yields the CRLF that ends the
  // last header line plus the blank line that terminates the block.
  lines.push('')
  lines.push('')

  return Buffer.from(lines.join('\r\n'))
}
34+
35+
/**
 * Duplex stream that turns an object-mode stream of prepared files
 * (objects with `headers` and `content`) into a raw multipart byte
 * stream, re-chunked into fixed-size (256 kB) buffers.
 *
 * Writable side: one prepared file object per write.
 * Readable side: multipart-encoded bytes in `chunkSize` pieces (the
 * final piece may be shorter).
 */
class Multipart extends Duplex {
  constructor (options) {
    // Writable side consumes one file object at a time; readable side
    // stays in byte mode.
    super(Object.assign({}, options, { writableObjectMode: true, writableHighWaterMark: 1 }))

    this._boundary = generateBoundary()
    this.source = null            // current file's content stream, if any
    this.chunkSize = 256000       // target size of each emitted buffer
    this.buffer = Buffer.alloc(this.chunkSize) // accumulator for partial chunks
    this.bufferOffset = 0         // bytes currently used in `buffer`
    this.running = true
  }

  _read () {
    // Downstream wants data again: un-pause the current content stream.
    if (this.source) {
      this.source.resume()
    }
  }

  _write (file, encoding, callback) {
    this.pushFile(file, () => {
      // After each part, emit the closing boundary line.
      this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE))
      callback()
    })
  }

  _final (callback) {
    // Flush whatever is left in the accumulator and end the readable side.
    if (this.bufferOffset) {
      this.push(this.buffer.slice(0, this.bufferOffset))
      this.bufferOffset = 0
    }
    this.running = false
    this.push(null)
    callback()
  }

  /**
   * Queue `chunk` for the readable side, coalescing bytes into
   * `chunkSize` buffers. Returns the last push() result
   * (false = back-pressure, caller should pause its source).
   */
  pushChunk (chunk) {
    if (chunk === null) {
      return this.push(null)
    }

    let result = true
    const bytesNeeded = this.chunkSize - this.bufferOffset

    if (chunk.length >= bytesNeeded) {
      // Top up the accumulator to a full chunk and emit it. A fresh
      // buffer is then allocated: the pushed one is owned by the
      // consumer now, and writing into it again would corrupt data
      // that has not been read yet (the old code reused it).
      chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded)
      result = this.push(this.buffer)
      this.buffer = Buffer.alloc(this.chunkSize)
      this.bufferOffset = 0

      // Emit any further full chunks directly from `chunk`, starting
      // AFTER the bytes copied above. The original sliced from offset
      // 0 here, re-sending bytes already emitted and dropping the
      // chunk's true tail.
      let offset = bytesNeeded
      while (chunk.length - offset >= this.chunkSize) {
        result = this.push(chunk.slice(offset, offset + this.chunkSize))
        offset += this.chunkSize
      }

      // Keep the remaining tail in the accumulator.
      chunk.copy(this.buffer, 0, offset, chunk.length)
      this.bufferOffset = chunk.length - offset
    } else {
      // Not enough to fill a chunk yet: just accumulate.
      chunk.copy(this.buffer, this.bufferOffset)
      this.bufferOffset += chunk.length
    }

    return result
  }

  /**
   * Encode one prepared file as a multipart part: leading headers, then
   * its content (Buffer, pull-stream source, or readable stream), then
   * a CRLF. Invokes `callback` once the whole part has been queued.
   */
  pushFile (file, callback) {
    this.pushChunk(leading(file.headers, this._boundary))

    let content = file.content || Buffer.alloc(0)

    if (Buffer.isBuffer(content)) {
      this.pushChunk(content)
      this.pushChunk(NEW_LINE_BUFFER)
      return callback() // early
    }

    if (isSource(content)) {
      content = toStream.source(content)
    }
    this.source = content
    // From now on we assume content is a readable stream.

    content.on('data', (data) => {
      // Honour back-pressure from the readable side.
      if (!this.pushChunk(data)) {
        content.pause()
      }
    })
    content.once('error', this.emit.bind(this, 'error'))

    content.once('end', () => {
      this.pushChunk(NEW_LINE_BUFFER)
      callback()
    })
  }
}
134+
135+
module.exports = Multipart

src/utils/load-commands.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ function requireCommands () {
6464
addFromStream: require('../files/add')(send),
6565
addFromURL: require('../util/url-add')(send),
6666
getEndpointConfig: require('../util/get-endpoint-config')(config),
67+
add2: require('./../add2/add2')(send),
6768
crypto: require('libp2p-crypto'),
6869
isIPFS: require('is-ipfs')
6970
}

0 commit comments

Comments
 (0)