Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit d2eb925

Browse files
dignifiedquiredaviddias
authored andcommitted
feat(pubsub): Add pubsub api (#493)
* feat(pubsub): Add pubsub api
1 parent 820150c commit d2eb925

9 files changed

+466
-2
lines changed

README.md

+24
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,30 @@ $ ipfs config --json API.HTTPHeaders.Access-Control-Allow-Methods "[\"PUT\", \"P
139139

140140
> `js-ipfs-api` follows the spec defined by [`interface-ipfs-core`](https://github.com/ipfs/interface-ipfs-core), which concerns the interface to expect from IPFS implementations. This interface is a currently active endeavor. You can use it today to consult the methods available.
141141
142+
#### Caveats
143+
144+
##### Pubsub
145+
146+
**Currently, the [PubSub API only works in Node.js envinroment](https://github.com/ipfs/js-ipfs-api/issues/518)**
147+
148+
We currently don't support pubsub when run in the browser, and we test it with separate set of tests to make sure if it's being used in the browser, pubsub errors.
149+
150+
More info: https://github.com/ipfs/js-ipfs-api/issues/518
151+
152+
This means:
153+
- You can use pubsub from js-ipfs-api in Node.js
154+
- You can use pubsub from js-ipfs-api in Electron
155+
(when js-ipfs-api is ran in the main process of Electron)
156+
- You can't use pubsub from js-ipfs-api in the browser
157+
- You can't use pubsub from js-ipfs-api in Electron's
158+
renderer process
159+
- You can use pubsub from js-ipfs in the browsers
160+
- You can use pubsub from js-ipfs in Node.js
161+
- You can use pubsub from js-ipfs in Electron
162+
(in both the main process and the renderer process)
163+
- See https://github.com/ipfs/js-ipfs for details on
164+
pubsub in js-ipfs
165+
142166
##### [bitswap]()
143167

144168
- [`ipfs.bitswap.wantlist()`]()

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"eslint-plugin-react": "^6.10.3",
6767
"gulp": "^3.9.1",
6868
"hapi": "^16.1.0",
69-
"interface-ipfs-core": "~0.26.1",
69+
"interface-ipfs-core": "~0.26.2",
7070
"ipfsd-ctl": "~0.20.0",
7171
"pre-commit": "^1.2.2",
7272
"socket.io": "^1.7.3",

src/api/pubsub.js

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const EventEmitter = require('events')
5+
const eos = require('end-of-stream')
6+
const isNode = require('detect-node')
7+
const PubsubMessageStream = require('../pubsub-message-stream')
8+
const stringlistToArray = require('../stringlist-to-array')
9+
10+
const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser')
11+
12+
/* Public API */
13+
module.exports = (send) => {
14+
/* Internal subscriptions state and functions */
15+
const ps = new EventEmitter()
16+
const subscriptions = {}
17+
ps.id = Math.random()
18+
return {
19+
subscribe: (topic, options, handler, callback) => {
20+
const defaultOptions = {
21+
discover: false
22+
}
23+
24+
if (typeof options === 'function') {
25+
callback = handler
26+
handler = options
27+
options = defaultOptions
28+
}
29+
30+
if (!options) {
31+
options = defaultOptions
32+
}
33+
34+
// Throw an error if ran in the browsers
35+
if (!isNode) {
36+
if (!callback) {
37+
return Promise.reject(NotSupportedError())
38+
}
39+
return callback(NotSupportedError())
40+
}
41+
42+
// promisify doesn't work as we always pass a
43+
// function as last argument (`handler`)
44+
if (!callback) {
45+
return new Promise((resolve, reject) => {
46+
subscribe(topic, options, handler, (err) => {
47+
if (err) {
48+
return reject(err)
49+
}
50+
resolve()
51+
})
52+
})
53+
}
54+
55+
subscribe(topic, options, handler, callback)
56+
},
57+
unsubscribe: (topic, handler) => {
58+
if (!isNode) {
59+
throw NotSupportedError()
60+
}
61+
62+
if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
63+
throw new Error(`Not subscribed to '${topic}'`)
64+
}
65+
66+
ps.removeListener(topic, handler)
67+
68+
// Drop the request once we are actualy done
69+
if (ps.listenerCount(topic) === 0) {
70+
subscriptions[topic].abort()
71+
subscriptions[topic] = null
72+
}
73+
},
74+
publish: promisify((topic, data, callback) => {
75+
if (!isNode) {
76+
return callback(NotSupportedError())
77+
}
78+
79+
if (!Buffer.isBuffer(data)) {
80+
return callback(new Error('data must be a Buffer'))
81+
}
82+
83+
const request = {
84+
path: 'pubsub/pub',
85+
args: [topic, data]
86+
}
87+
88+
send(request, callback)
89+
}),
90+
ls: promisify((callback) => {
91+
if (!isNode) {
92+
return callback(NotSupportedError())
93+
}
94+
95+
const request = {
96+
path: 'pubsub/ls'
97+
}
98+
99+
send.andTransform(request, stringlistToArray, callback)
100+
}),
101+
peers: promisify((topic, callback) => {
102+
if (!isNode) {
103+
return callback(NotSupportedError())
104+
}
105+
106+
const request = {
107+
path: 'pubsub/peers',
108+
args: [topic]
109+
}
110+
111+
send.andTransform(request, stringlistToArray, callback)
112+
}),
113+
setMaxListeners (n) {
114+
return ps.setMaxListeners(n)
115+
}
116+
}
117+
118+
function subscribe (topic, options, handler, callback) {
119+
ps.on(topic, handler)
120+
if (subscriptions[topic]) {
121+
return callback()
122+
}
123+
124+
// Request params
125+
const request = {
126+
path: 'pubsub/sub',
127+
args: [topic],
128+
qs: {
129+
discover: options.discover
130+
}
131+
}
132+
133+
// Start the request and transform the response
134+
// stream to Pubsub messages stream
135+
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
136+
if (err) {
137+
subscriptions[topic] = null
138+
ps.removeListener(topic, handler)
139+
return callback(err)
140+
}
141+
142+
stream.on('data', (msg) => {
143+
ps.emit(topic, msg)
144+
})
145+
146+
stream.on('error', (err) => {
147+
ps.emit('error', err)
148+
})
149+
150+
eos(stream, (err) => {
151+
if (err) {
152+
ps.emit('error', err)
153+
}
154+
155+
subscriptions[topic] = null
156+
ps.removeListener(topic, handler)
157+
})
158+
159+
callback()
160+
})
161+
}
162+
}

src/load-commands.js

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ function requireCommands () {
2525
refs: require('./api/refs'),
2626
repo: require('./api/repo'),
2727
swarm: require('./api/swarm'),
28+
pubsub: require('./api/pubsub'),
2829
update: require('./api/update'),
2930
version: require('./api/version')
3031
}

src/pubsub-message-stream.js

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict'
2+
3+
const TransformStream = require('readable-stream').Transform
4+
const PubsubMessage = require('./pubsub-message-utils')
5+
6+
class PubsubMessageStream extends TransformStream {
7+
constructor (options) {
8+
const opts = Object.assign(options || {}, { objectMode: true })
9+
super(opts)
10+
}
11+
12+
static from (inputStream, callback) {
13+
let outputStream = inputStream.pipe(new PubsubMessageStream())
14+
inputStream.on('end', () => outputStream.emit('end'))
15+
callback(null, outputStream)
16+
}
17+
18+
_transform (obj, enc, callback) {
19+
let msg
20+
try {
21+
msg = PubsubMessage.deserialize(obj, 'base64')
22+
} catch (err) {
23+
// Not a valid pubsub message
24+
// go-ipfs returns '{}' as the very first object atm, we skip that
25+
return callback()
26+
}
27+
28+
this.push(msg)
29+
callback()
30+
}
31+
}
32+
33+
module.exports = PubsubMessageStream

src/pubsub-message-utils.js

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
'use strict'
2+
3+
const bs58 = require('bs58')
4+
5+
module.exports = {
6+
deserialize (data, enc) {
7+
enc = enc ? enc.toLowerCase() : 'json'
8+
9+
if (enc === 'json') {
10+
return deserializeFromJson(data)
11+
} else if (enc === 'base64') {
12+
return deserializeFromBase64(data)
13+
}
14+
15+
throw new Error(`Unsupported encoding: '${enc}'`)
16+
}
17+
}
18+
19+
function deserializeFromJson (data) {
20+
const json = JSON.parse(data)
21+
return deserializeFromBase64(json)
22+
}
23+
24+
function deserializeFromBase64 (obj) {
25+
if (!isPubsubMessage(obj)) {
26+
throw new Error(`Not a pubsub message`)
27+
}
28+
29+
return {
30+
from: bs58.encode(new Buffer(obj.from, 'base64')).toString(),
31+
seqno: new Buffer(obj.seqno, 'base64'),
32+
data: new Buffer(obj.data, 'base64'),
33+
topicCIDs: obj.topicIDs || obj.topicCIDs
34+
}
35+
}
36+
37+
function isPubsubMessage (obj) {
38+
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
39+
}

test/interface/pubsub.spec.js

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/* eslint-env mocha */
2+
3+
'use strict'
4+
5+
const test = require('interface-ipfs-core')
6+
const FactoryClient = require('../ipfs-factory/client')
7+
const isNode = require('detect-node')
8+
9+
if (isNode) {
10+
let fc
11+
12+
const common = {
13+
setup: function (callback) {
14+
fc = new FactoryClient()
15+
callback(null, fc)
16+
},
17+
teardown: function (callback) {
18+
fc.dismantle(callback)
19+
}
20+
}
21+
22+
test.pubsub(common)
23+
}

test/ipfs-factory/daemon-spawner.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ function spawnEphemeralNode (callback) {
8484
node.setConfig(configKey, configVal, cb)
8585
}, cb)
8686
},
87-
(cb) => node.startDaemon(cb)
87+
(cb) => node.startDaemon(['--enable-pubsub-experiment'], cb)
8888
], (err) => callback(err, node))
8989
})
9090
}

0 commit comments

Comments
 (0)