diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a2b8d111..09ba96f1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+
+# [17.1.0](https://github.com/ipfs/js-ipfs-api/compare/v17.0.1...v17.1.0) (2017-11-20)
+
+
+### Features
+
+* send files HTTP request should stream ([#629](https://github.com/ipfs/js-ipfs-api/issues/629)) ([dae62cb](https://github.com/ipfs/js-ipfs-api/commit/dae62cb))
+
+
+
## [17.0.1](https://github.com/ipfs/js-ipfs-api/compare/v17.0.0...v17.0.1) (2017-11-20)
diff --git a/package.json b/package.json
index 444aff8ae..08581e219 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "ipfs-api",
- "version": "17.0.1",
+ "version": "17.1.0",
"description": "A client library for the IPFS HTTP API. Follows interface-ipfs-core spec",
"main": "src/index.js",
"browser": {
@@ -38,7 +38,6 @@
"lru-cache": "^4.1.1",
"multiaddr": "^3.0.1",
"multihashes": "~0.4.12",
- "multipart-stream": "^2.0.1",
"ndjson": "^1.5.0",
"once": "^1.4.0",
"peer-id": "~0.10.2",
diff --git a/src/block/put.js b/src/block/put.js
index d27f95314..0ec1e5c5f 100644
--- a/src/block/put.js
+++ b/src/block/put.js
@@ -3,34 +3,35 @@
const promisify = require('promisify-es6')
const Block = require('ipfs-block')
const CID = require('cids')
+const once = require('once')
+const SendOneFile = require('../utils/send-one-file')
module.exports = (send) => {
- return promisify((block, cid, callback) => {
+ const sendOneFile = SendOneFile(send, 'block/put')
+
+ return promisify((block, cid, _callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
if (typeof cid === 'function') {
- callback = cid
+ _callback = cid
cid = {}
}
+ const callback = once(_callback)
+
if (Array.isArray(block)) {
- const err = new Error('block.put() only accepts 1 file')
- return callback(err)
+ return callback(new Error('block.put accepts only one block'))
}
if (typeof block === 'object' && block.data) {
block = block.data
}
- const request = {
- path: 'block/put',
- files: block
- }
-
- // Transform the response to a Block
- const transform = (info, callback) => {
- callback(null, new Block(block, new CID(info.Key)))
- }
+ sendOneFile(block, {}, (err, result) => {
+ if (err) {
+ return callback(err) // early
+ }
- send.andTransform(request, transform, callback)
+ callback(null, new Block(block, new CID(result.Key)))
+ })
})
}
diff --git a/src/files/add-pull-stream.js b/src/files/add-pull-stream.js
index 8c23abc23..daf050de8 100644
--- a/src/files/add-pull-stream.js
+++ b/src/files/add-pull-stream.js
@@ -1,30 +1,6 @@
'use strict'
-const addCmd = require('./add.js')
-const pull = require('pull-stream')
-const pushable = require('pull-pushable')
+const SendFilesStream = require('../utils/send-files-stream')
+const toPull = require('stream-to-pull-stream')
-module.exports = (send) => {
- const add = addCmd(send)
-
- return (options) => {
- options = options || {}
-
- const source = pushable()
- const sink = pull.collect((err, tuples) => {
- if (err) { return source.end(err) }
-
- add(tuples, options, (err, filesAdded) => {
- if (err) { return source.end(err) }
-
- filesAdded.forEach((file) => source.push(file))
- source.end()
- })
- })
-
- return {
- sink: sink,
- source: source
- }
- }
-}
+module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options))
diff --git a/src/files/add-readable-stream.js b/src/files/add-readable-stream.js
index cb4364d1d..b3e03d4e8 100644
--- a/src/files/add-readable-stream.js
+++ b/src/files/add-readable-stream.js
@@ -1,31 +1,5 @@
'use strict'
-const addCmd = require('./add.js')
-const Duplex = require('readable-stream').Duplex
+const SendFilesStream = require('../utils/send-files-stream')
-module.exports = (send) => {
- const add = addCmd(send)
-
- return (options) => {
- options = options || {}
-
- const tuples = []
-
- const ds = new Duplex({ objectMode: true })
- ds._read = (n) => {}
-
- ds._write = (file, enc, next) => {
- tuples.push(file)
- next()
- }
-
- ds.end = () => add(tuples, options, (err, res) => {
- if (err) { return ds.emit('error', err) }
-
- res.forEach((tuple) => ds.push(tuple))
- ds.push(null)
- })
-
- return ds
- }
-}
+module.exports = (send) => SendFilesStream(send, 'add')
diff --git a/src/files/add.js b/src/files/add.js
index 324a49416..6b54bb03d 100644
--- a/src/files/add.js
+++ b/src/files/add.js
@@ -1,51 +1,42 @@
'use strict'
-const isStream = require('is-stream')
const promisify = require('promisify-es6')
-const ProgressStream = require('../utils/progress-stream')
-const converter = require('../utils/converter')
+const ConcatStream = require('concat-stream')
+const once = require('once')
+const isStream = require('is-stream')
+const SendFilesStream = require('../utils/send-files-stream')
module.exports = (send) => {
- return promisify((files, opts, callback) => {
- if (typeof opts === 'function') {
- callback = opts
- opts = {}
- }
+ const createAddStream = SendFilesStream(send, 'add')
- opts = opts || {}
-
- const ok = Buffer.isBuffer(files) ||
- isStream.readable(files) ||
- Array.isArray(files)
-
- if (!ok) {
- return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
+ return promisify((_files, options, _callback) => {
+ if (typeof options === 'function') {
+ _callback = options
+ options = null
}
- const qs = {}
+ const callback = once(_callback)
- if (opts['cid-version'] != null) {
- qs['cid-version'] = opts['cid-version']
- } else if (opts.cidVersion != null) {
- qs['cid-version'] = opts.cidVersion
+ if (!options) {
+ options = {}
}
- if (opts['raw-leaves'] != null) {
- qs['raw-leaves'] = opts['raw-leaves']
- } else if (opts.rawLeaves != null) {
- qs['raw-leaves'] = opts.rawLeaves
- }
+ const ok = Buffer.isBuffer(_files) ||
+ isStream.readable(_files) ||
+ Array.isArray(_files)
- if (opts.hash != null) {
- qs.hash = opts.hash
- } else if (opts.hashAlg != null) {
- qs.hash = opts.hashAlg
+ if (!ok) {
+ return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
}
- const request = { path: 'add', files: files, qs: qs, progress: opts.progress }
+ const files = [].concat(_files)
+
+ const stream = createAddStream(options)
+ const concat = ConcatStream((result) => callback(null, result))
+ stream.once('error', callback)
+ stream.pipe(concat)
- send.andTransform(request, (response, cb) => {
- converter(ProgressStream.fromStream(opts.progress, response), cb)
- }, callback)
+ files.forEach((file) => stream.write(file))
+ stream.end()
})
}
diff --git a/src/files/write.js b/src/files/write.js
index 17c7ce8df..5e9efa03b 100644
--- a/src/files/write.js
+++ b/src/files/write.js
@@ -1,28 +1,42 @@
'use strict'
const promisify = require('promisify-es6')
+const concatStream = require('concat-stream')
+const once = require('once')
+const SendFilesStream = require('../utils/send-files-stream')
module.exports = (send) => {
- return promisify((pathDst, files, opts, callback) => {
+ const sendFilesStream = SendFilesStream(send, 'files/write')
+
+ return promisify((pathDst, _files, opts, _callback) => {
if (typeof opts === 'function' &&
- !callback) {
- callback = opts
+ !_callback) {
+ _callback = opts
opts = {}
}
// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' &&
- typeof callback === 'function') {
- callback = opts
+ typeof _callback === 'function') {
+ _callback = opts
opts = {}
}
- send({
- path: 'files/write',
+ const files = [].concat(_files)
+ const callback = once(_callback)
+
+ const options = {
args: pathDst,
- qs: opts,
- files: files
- }, callback)
+ qs: opts
+ }
+
+ const stream = sendFilesStream(options)
+ const concat = concatStream((result) => callback(null, result))
+ stream.once('error', callback)
+ stream.pipe(concat)
+
+ files.forEach((file) => stream.write(file))
+ stream.end()
})
}
diff --git a/src/index.js b/src/index.js
index 4e6443cf3..b56a521a9 100644
--- a/src/index.js
+++ b/src/index.js
@@ -3,7 +3,7 @@
const multiaddr = require('multiaddr')
const loadCommands = require('./utils/load-commands')
const getConfig = require('./utils/default-config')
-const getRequestAPI = require('./utils/request-api')
+const sendRequest = require('./utils/send-request')
function IpfsAPI (hostOrMultiaddr, port, opts) {
const config = getConfig()
@@ -35,7 +35,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
config.port = split[1]
}
- const requestAPI = getRequestAPI(config)
+ const requestAPI = sendRequest(config)
const cmds = loadCommands(requestAPI)
cmds.send = requestAPI
cmds.Buffer = Buffer
diff --git a/src/object/appendData.js b/src/object/appendData.js
index daf2a572d..fa3783909 100644
--- a/src/object/appendData.js
+++ b/src/object/appendData.js
@@ -1,16 +1,20 @@
'use strict'
const promisify = require('promisify-es6')
+const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
+const SendOneFile = require('../utils/send-one-file')
module.exports = (send) => {
const objectGet = require('./get')(send)
+ const sendOneFile = SendOneFile(send, 'object/patch/append-data')
- return promisify((multihash, data, opts, callback) => {
+ return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
- callback = opts
+ _callback = opts
opts = {}
}
+ const callback = once(_callback)
if (!opts) {
opts = {}
}
@@ -21,14 +25,11 @@ module.exports = (send) => {
return callback(err)
}
- send({
- path: 'object/patch/append-data',
- args: [multihash],
- files: data
- }, (err, result) => {
+ sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}
+
objectGet(result.Hash, { enc: 'base58' }, callback)
})
})
diff --git a/src/object/put.js b/src/object/put.js
index 537c9d34f..1fd6aca11 100644
--- a/src/object/put.js
+++ b/src/object/put.js
@@ -9,13 +9,20 @@ const lruOptions = {
}
const cache = LRU(lruOptions)
+const SendOneFile = require('../utils/send-one-file')
+const once = require('once')
module.exports = (send) => {
- return promisify((obj, options, callback) => {
+ const sendOneFile = SendOneFile(send, 'object/put')
+
+ return promisify((obj, options, _callback) => {
if (typeof options === 'function') {
- callback = options
+ _callback = options
options = {}
}
+
+ const callback = once(_callback)
+
if (!options) {
options = {}
}
@@ -56,13 +63,13 @@ module.exports = (send) => {
}
const enc = options.enc || 'json'
- send({
- path: 'object/put',
- qs: { inputenc: enc },
- files: buf
- }, (err, result) => {
+ const sendOptions = {
+ qs: { inputenc: enc }
+ }
+
+ sendOneFile(buf, sendOptions, (err, result) => {
if (err) {
- return callback(err)
+ return callback(err) // early
}
if (Buffer.isBuffer(obj)) {
diff --git a/src/object/setData.js b/src/object/setData.js
index a7380df60..a4296dddd 100644
--- a/src/object/setData.js
+++ b/src/object/setData.js
@@ -1,16 +1,20 @@
'use strict'
const promisify = require('promisify-es6')
+const once = require('once')
const cleanMultihash = require('../utils/clean-multihash')
+const SendOneFile = require('../utils/send-one-file')
module.exports = (send) => {
const objectGet = require('./get')(send)
+ const sendOneFile = SendOneFile(send, 'object/patch/set-data')
- return promisify((multihash, data, opts, callback) => {
+ return promisify((multihash, data, opts, _callback) => {
if (typeof opts === 'function') {
- callback = opts
+ _callback = opts
opts = {}
}
+ const callback = once(_callback)
if (!opts) {
opts = {}
}
@@ -21,11 +25,7 @@ module.exports = (send) => {
return callback(err)
}
- send({
- path: 'object/patch/set-data',
- args: [multihash],
- files: data
- }, (err, result) => {
+ sendOneFile(data, { args: [multihash] }, (err, result) => {
if (err) {
return callback(err)
}
diff --git a/src/util/fs-add.js b/src/util/fs-add.js
index e305069fd..8a3ea404f 100644
--- a/src/util/fs-add.js
+++ b/src/util/fs-add.js
@@ -2,11 +2,11 @@
const isNode = require('detect-node')
const promisify = require('promisify-es6')
-const converter = require('../utils/converter')
const moduleConfig = require('../utils/module-config')
+const SendOneFile = require('../utils/send-one-file-multiple-results')
module.exports = (arg) => {
- const send = moduleConfig(arg)
+ const sendOneFile = SendOneFile(moduleConfig(arg), 'add')
return promisify((path, opts, callback) => {
if (typeof opts === 'function' &&
@@ -31,7 +31,6 @@ module.exports = (arg) => {
return callback(new Error('"path" must be a string'))
}
- const request = { path: 'add', files: path, qs: opts }
- send.andTransform(request, converter, callback)
+ sendOneFile(path, { qs: opts }, callback)
})
}
diff --git a/src/util/url-add.js b/src/util/url-add.js
index a9889e64f..3caf11cb2 100644
--- a/src/util/url-add.js
+++ b/src/util/url-add.js
@@ -1,14 +1,13 @@
'use strict'
const promisify = require('promisify-es6')
-const once = require('once')
const parseUrl = require('url').parse
const request = require('../utils/request')
-const converter = require('../utils/converter')
const moduleConfig = require('../utils/module-config')
+const SendOneFile = require('../utils/send-one-file-multiple-results')
module.exports = (arg) => {
- const send = moduleConfig(arg)
+ const sendOneFile = SendOneFile(moduleConfig(arg), 'add')
return promisify((url, opts, callback) => {
if (typeof (opts) === 'function' &&
@@ -25,19 +24,17 @@ module.exports = (arg) => {
opts = {}
}
- callback = once(callback)
-
if (!validUrl(url)) {
return callback(new Error('"url" param must be an http(s) url'))
}
- requestWithRedirect(url, opts, send, callback)
+ requestWithRedirect(url, opts, sendOneFile, callback)
})
}
const validUrl = (url) => typeof url === 'string' && url.startsWith('http')
-const requestWithRedirect = (url, opts, send, callback) => {
+const requestWithRedirect = (url, opts, sendOneFile, callback) => {
request(parseUrl(url).protocol)(url, (res) => {
res.once('error', callback)
if (res.statusCode >= 400) {
@@ -50,11 +47,9 @@ const requestWithRedirect = (url, opts, send, callback) => {
if (!validUrl(redirection)) {
return callback(new Error('redirection url must be an http(s) url'))
}
- requestWithRedirect(redirection, opts, send, callback)
+ requestWithRedirect(redirection, opts, sendOneFile, callback)
} else {
- const request = { path: 'add', files: res, qs: opts }
-
- send.andTransform(request, converter, callback)
+ sendOneFile(res, { qs: opts }, callback)
}
}).end()
}
diff --git a/src/utils/converter.js b/src/utils/converter.js
index b372e0d09..444064bf3 100644
--- a/src/utils/converter.js
+++ b/src/utils/converter.js
@@ -26,18 +26,20 @@ const streamToValue = require('./stream-to-value')
*/
class ConverterStream extends TransformStream {
constructor (options) {
- const opts = Object.assign(options || {}, { objectMode: true })
+ const opts = Object.assign({}, options || {}, { objectMode: true })
super(opts)
}
_transform (obj, enc, callback) {
- this.push({
+ if (!obj.Hash) {
+ return callback()
+ }
+
+ callback(null, {
path: obj.Name,
hash: obj.Hash,
size: parseInt(obj.Size, 10)
})
-
- callback(null)
}
}
@@ -54,4 +56,5 @@ function converter (inputStream, callback) {
streamToValue(outputStream, callback)
}
-module.exports = converter
+exports = module.exports = converter
+exports.ConverterStream = ConverterStream
diff --git a/src/utils/module-config.js b/src/utils/module-config.js
index a05b4d3ab..4e1b0e6a1 100644
--- a/src/utils/module-config.js
+++ b/src/utils/module-config.js
@@ -1,7 +1,7 @@
'use strict'
const getConfig = require('./default-config')
-const requestAPI = require('./request-api')
+const sendRequest = require('./send-request')
const multiaddr = require('multiaddr')
module.exports = (arg) => {
@@ -10,12 +10,12 @@ module.exports = (arg) => {
if (typeof arg === 'function') {
return arg
} else if (typeof arg === 'object') {
- return requestAPI(arg)
+ return sendRequest(arg)
} else if (typeof arg === 'string') {
const maddr = multiaddr(arg).nodeAddress()
config.host = maddr.address
config.port = maddr.port
- return requestAPI(config)
+ return sendRequest(config)
} else {
throw new Error('Argument must be a send function or a config object.')
}
diff --git a/src/utils/multipart.js b/src/utils/multipart.js
new file mode 100644
index 000000000..bae39e141
--- /dev/null
+++ b/src/utils/multipart.js
@@ -0,0 +1,122 @@
+'use strict'
+
+const Transform = require('stream').Transform
+const isNode = require('detect-node')
+
+const PADDING = '--'
+const NEW_LINE = '\r\n'
+const NEW_LINE_BUFFER = Buffer.from(NEW_LINE)
+
+class Multipart extends Transform {
+ constructor (options) {
+ super(Object.assign({}, options, { objectMode: true, highWaterMark: 1 }))
+
+ this._boundary = this._generateBoundary()
+ this._files = []
+ this._draining = false
+ }
+
+ _flush () {
+ this.push(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE))
+ this.push(null)
+ }
+
+ _generateBoundary () {
+ var boundary = '--------------------------'
+ for (var i = 0; i < 24; i++) {
+ boundary += Math.floor(Math.random() * 10).toString(16)
+ }
+
+ return boundary
+ }
+
+ _transform (file, encoding, callback) {
+ if (Buffer.isBuffer(file)) {
+ this.push(file)
+ return callback() // early
+ }
+ // not a buffer, must be a file
+ this._files.push(file)
+ this._maybeDrain(callback)
+ }
+
+ _maybeDrain (callback) {
+ if (!this._draining) {
+ if (this._files.length) {
+ this._draining = true
+ const file = this._files.shift()
+ this._pushFile(file, (err) => {
+ this._draining = false
+ if (err) {
+ this.emit('error', err)
+ } else {
+ this._maybeDrain(callback)
+ }
+ })
+ } else {
+ this.emit('drained all files')
+ callback()
+ }
+ } else {
+ this.once('drained all files', callback)
+ }
+ }
+
+ _pushFile (file, callback) {
+ const leading = this._leading(file.headers || {})
+
+ this.push(leading)
+
+ let content = file.content || Buffer.alloc(0)
+
+ if (Buffer.isBuffer(content)) {
+ this.push(content)
+ this.push(NEW_LINE_BUFFER)
+ return callback() // early
+ }
+
+ // From now on we assume content is a stream
+
+ content.once('error', this.emit.bind(this, 'error'))
+
+ content.once('end', () => {
+ this.push(NEW_LINE_BUFFER)
+ callback()
+
+ // TODO: backpressure!!! wait once self is drained so we can proceed
+ // This does not work
+ // this.once('drain', () => {
+ // callback()
+ // })
+ })
+
+ content.on('data', (data) => {
+ const drained = this.push(data)
+ // Only do the drain dance on Node.js.
+ // In browserland, the underlying stream
+ // does NOT drain because the request is only sent
+ // once this stream ends.
+ if (!drained && isNode) {
+ content.pause()
+ this.once('drain', () => content.resume())
+ }
+ })
+ }
+
+ _leading (headers) {
+ var leading = [PADDING + this._boundary]
+
+ Object.keys(headers).forEach((header) => {
+ leading.push(header + ': ' + headers[header])
+ })
+
+ leading.push('')
+ leading.push('')
+
+ const leadingStr = leading.join(NEW_LINE)
+
+ return Buffer.from(leadingStr)
+ }
+}
+
+module.exports = Multipart
diff --git a/src/utils/get-files-stream.js b/src/utils/prepare-file.js
similarity index 76%
rename from src/utils/get-files-stream.js
rename to src/utils/prepare-file.js
index 8a44c8f75..988903df7 100644
--- a/src/utils/get-files-stream.js
+++ b/src/utils/prepare-file.js
@@ -1,28 +1,9 @@
'use strict'
const isNode = require('detect-node')
-const Multipart = require('multipart-stream')
const flatmap = require('flatmap')
const escape = require('glob-escape')
-function headers (file) {
- const name = file.path
- ? encodeURIComponent(file.path)
- : ''
-
- const header = { 'Content-Disposition': `file; filename="${name}"` }
-
- if (file.dir || !file.content) {
- header['Content-Type'] = 'application/x-directory'
- } else if (file.symlink) {
- header['Content-Type'] = 'application/symlink'
- } else {
- header['Content-Type'] = 'application/octet-stream'
- }
-
- return header
-}
-
function strip (name, base) {
const smallBase = base
.split('/')
@@ -99,14 +80,10 @@ function loadPaths (opts, file) {
}
}
-function getFilesStream (files, opts) {
- if (!files) {
- return null
- }
-
- const mp = new Multipart()
+function prepareFile (file, opts) {
+ let files = [].concat(file)
- flatmap(files, (file) => {
+ return flatmap(files, (file) => {
if (typeof file === 'string') {
if (!isNode) {
throw new Error('Can not add paths in node')
@@ -130,14 +107,7 @@ function getFilesStream (files, opts) {
dir: false,
content: file
}
- }).forEach((file) => {
- mp.addPart({
- headers: headers(file),
- body: file.content
- })
})
-
- return mp
}
-exports = module.exports = getFilesStream
+exports = module.exports = prepareFile
diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js
new file mode 100644
index 000000000..0a40c6445
--- /dev/null
+++ b/src/utils/send-files-stream.js
@@ -0,0 +1,151 @@
+'use strict'
+
+const Duplex = require('stream').Duplex
+const eachSeries = require('async/eachSeries')
+const isStream = require('is-stream')
+const once = require('once')
+const prepareFile = require('./prepare-file')
+const Multipart = require('./multipart')
+const Converter = require('./converter').ConverterStream
+
+function headers (file) {
+ const name = file.path
+ ? encodeURIComponent(file.path)
+ : ''
+
+ const header = { 'Content-Disposition': `file; filename="${name}"` }
+
+ if (!file.content) {
+ header['Content-Type'] = 'application/x-directory'
+ } else if (file.symlink) {
+ header['Content-Type'] = 'application/symlink'
+ } else {
+ header['Content-Type'] = 'application/octet-stream'
+ }
+
+ return header
+}
+
+module.exports = (send, path) => {
+ return (options) => {
+ let request
+ let ended = false
+ let writing = false
+
+ if (!options) {
+ options = {}
+ }
+
+ const multipart = new Multipart()
+
+ const retStream = new Duplex({ objectMode: true })
+
+ retStream._read = (n) => {}
+
+ retStream._write = (file, enc, _next) => {
+ const next = once(_next)
+ try {
+ const files = prepareFile(file, Object.assign({}, options, options.qs)).map(
+ (file) => Object.assign({headers: headers(file)}, file))
+
+ writing = true
+ eachSeries(
+ files,
+ (file, cb) => multipart.write(file, enc, cb),
+ (err) => {
+ writing = false
+ if (err) {
+ return next(err)
+ }
+ if (ended) {
+ multipart.end()
+ }
+ next()
+ })
+ } catch (err) {
+ next(err)
+ }
+ }
+
+ retStream.once('finish', () => {
+ if (!ended) {
+ ended = true
+ if (!writing) {
+ multipart.end()
+ }
+ }
+ })
+
+ const qs = options.qs || {}
+
+ if (options['cid-version'] != null) {
+ qs['cid-version'] = options['cid-version']
+ } else if (options.cidVersion != null) {
+ qs['cid-version'] = options.cidVersion
+ }
+
+ if (options['raw-leaves'] != null) {
+ qs['raw-leaves'] = options['raw-leaves']
+ } else if (options.rawLeaves != null) {
+ qs['raw-leaves'] = options.rawLeaves
+ }
+
+ if (options.hash != null) {
+ qs.hash = options.hash
+ } else if (options.hashAlg != null) {
+ qs.hash = options.hashAlg
+ }
+
+ const args = {
+ path: path,
+ qs: qs,
+ args: options.args,
+ multipart: true,
+ multipartBoundary: multipart._boundary,
+ stream: true,
+ recursive: true,
+ progress: options.progress
+ }
+
+ multipart.on('error', (err) => {
+ retStream.emit('error', err)
+ })
+
+ request = send(args, (err, response) => {
+ if (err) {
+ return retStream.emit('error', err)
+ }
+
+ if (!response) {
+ // no response object, which means
+ // everything is ok, so we end the
+ // return stream
+ return retStream.push(null) // early
+ }
+
+ if (!isStream(response)) {
+ retStream.push(response)
+ retStream.push(null)
+ return
+ }
+
+ response.on('data', (d) => {
+ if (d.Bytes && options.progress) {
+ options.progress(d.Bytes)
+ }
+ })
+ const convertedResponse = new Converter()
+ convertedResponse.once('end', () => retStream.push(null))
+ convertedResponse.on('data', (d) => retStream.push(d))
+ response.pipe(convertedResponse)
+ })
+
+ // signal the multipart that the underlying stream has drained and that
+ // it can continue producing data..
+ request.on('drain', () => multipart.emit('drain'))
+
+ multipart.pipe(request)
+
+ return retStream
+ }
+}
diff --git a/src/utils/send-one-file-multiple-results.js b/src/utils/send-one-file-multiple-results.js
new file mode 100644
index 000000000..180a9ad34
--- /dev/null
+++ b/src/utils/send-one-file-multiple-results.js
@@ -0,0 +1,18 @@
+'use strict'
+
+const once = require('once')
+const ConcatStream = require('concat-stream')
+const SendFilesStream = require('./send-files-stream')
+
+module.exports = (send, path) => {
+ const sendFilesStream = SendFilesStream(send, path)
+ return (file, options, _callback) => {
+ const callback = once(_callback)
+ const stream = sendFilesStream(options)
+ const concat = ConcatStream((results) => callback(null, results))
+ stream.once('error', callback)
+ stream.pipe(concat)
+ stream.write(file)
+ stream.end()
+ }
+}
diff --git a/src/utils/send-one-file.js b/src/utils/send-one-file.js
new file mode 100644
index 000000000..80de20842
--- /dev/null
+++ b/src/utils/send-one-file.js
@@ -0,0 +1,18 @@
+'use strict'
+
+const SendOneFileMultipleResults = require('./send-one-file-multiple-results')
+
+module.exports = (send, path) => {
+ const sendFile = SendOneFileMultipleResults(send, path)
+ return (file, options, callback) => {
+ sendFile(file, options, (err, results) => {
+ if (err) {
+ return callback(err)
+ }
+ if (results.length !== 1) {
+ return callback(new Error('expected 1 result and had ' + results.length))
+ }
+ callback(null, results[0])
+ })
+ }
+}
diff --git a/src/utils/request-api.js b/src/utils/send-request.js
similarity index 90%
rename from src/utils/request-api.js
rename to src/utils/send-request.js
index 8cac6cbef..336e356d2 100644
--- a/src/utils/request-api.js
+++ b/src/utils/send-request.js
@@ -6,7 +6,6 @@ const isNode = require('detect-node')
const ndjson = require('ndjson')
const pump = require('pump')
const once = require('once')
-const getFilesStream = require('./get-files-stream')
const streamToValue = require('./stream-to-value')
const streamToJsonValue = require('./stream-to-json-value')
const request = require('./request')
@@ -89,10 +88,6 @@ function requestAPI (config, options, callback) {
callback = once(callback)
options.qs = options.qs || {}
- if (Array.isArray(options.files)) {
- options.qs.recursive = true
- }
-
if (Array.isArray(options.path)) {
options.path = options.path.join('/')
}
@@ -102,9 +97,6 @@ function requestAPI (config, options, callback) {
if (options.args) {
options.qs.arg = options.args
}
- if (options.files && !Array.isArray(options.files)) {
- options.files = [options.files]
- }
if (options.progress) {
options.qs.progress = true
}
@@ -117,9 +109,8 @@ function requestAPI (config, options, callback) {
options.qs['stream-channels'] = true
- let stream
- if (options.files) {
- stream = getFilesStream(options.files, options.qs)
+ if (options.stream) {
+ options.buffer = false
}
// this option is only used internally, not passed to daemon
@@ -133,12 +124,12 @@ function requestAPI (config, options, callback) {
headers['User-Agent'] = config['user-agent']
}
- if (options.files) {
- if (!stream.boundary) {
- return callback(new Error('No boundary in multipart stream'))
+ if (options.multipart) {
+ if (!options.multipartBoundary) {
+ return callback(new Error('No multipartBoundary'))
}
- headers['Content-Type'] = `multipart/form-data; boundary=${stream.boundary}`
+ headers['Content-Type'] = `multipart/form-data; boundary=${options.multipartBoundary}`
}
const qs = Qs.stringify(options.qs, {
@@ -174,22 +165,21 @@ function requestAPI (config, options, callback) {
return qsDefaultEncoder(data)
}
})
- const req = request(config.protocol)({
+ const reqOptions = {
hostname: config.host,
path: `${config['api-path']}${options.path}?${qs}`,
port: config.port,
method: method,
headers: headers,
protocol: `${config.protocol}:`
- }, onRes(options.buffer, callback))
+ }
+ const req = request(config.protocol)(reqOptions, onRes(options.buffer, callback))
req.on('error', (err) => {
callback(err)
})
- if (options.files) {
- stream.pipe(req)
- } else {
+ if (!options.stream) {
req.end()
}