diff --git a/README.md b/README.md index 3d3a36f0..6a484d9f 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex') const SECIO = require('libp2p-secio') const MulticastDNS = require('libp2p-mdns') const DHT = require('libp2p-kad-dht') +const GossipSub = require('libp2p-gossipsub') const defaultsDeep = require('@nodeutils/defaults-deep') const Protector = require('libp2p-pnet') const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') @@ -154,7 +155,8 @@ class Node extends Libp2p { peerDiscovery: [ MulticastDNS ], - dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components + dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components + pubsub: GossipSub }, // libp2p config options (typically found on a config.json) @@ -187,9 +189,8 @@ class Node extends Libp2p { timeout: 10e3 } }, - // Enable/Disable Experimental features - EXPERIMENTAL: { // Experimental features ("behind a flag") - pubsub: false + pubsub: { + enabled: true } } } diff --git a/package.json b/package.json index 2c06660f..6f060947 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,6 @@ "err-code": "^1.1.2", "fsm-event": "^2.1.0", "libp2p-connection-manager": "^0.1.0", - "libp2p-floodsub": "^0.16.1", "libp2p-ping": "^0.8.5", "libp2p-switch": "^0.42.12", "libp2p-websockets": "^0.12.2", @@ -74,6 +73,8 @@ "libp2p-circuit": "^0.3.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", + "libp2p-floodsub": "~0.17.0", + "libp2p-gossipsub": "~0.0.4", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.8.4", @@ -84,6 +85,7 @@ "libp2p-websocket-star": "~0.10.2", "libp2p-websocket-star-rendezvous": "~0.3.0", "lodash.times": "^4.3.2", + "merge-options": "^1.0.1", "nock": "^10.0.6", "pull-goodbye": "0.0.2", "pull-mplex": "^0.1.2", diff --git a/src/config.js b/src/config.js index 6b718038..f3fef2e8 100644 --- a/src/config.js +++ b/src/config.js @@ -19,6 +19,7 @@ const modulesSchema = s({ connProtector: s.union(['undefined', s.interface({ protect: 'function' })]), contentRouting: optional(list(['object'])), dht: optional(s('null|function|object')), + pubsub: optional(s('null|function|object')), peerDiscovery: optional(list([s('object|function')])), peerRouting: optional(list(['object'])), streamMuxer: optional(list([s('object|function')])), @@ -59,12 +60,10 @@ const configSchema = s({ timeout: 10e3 } }), - // Experimental config - EXPERIMENTAL: s({ - pubsub: 'boolean' - }, { - // Experimental defaults - pubsub: false + // Pubsub config + pubsub: s('object?', { + // DHT defaults + enabled: false }) }, {}) diff --git a/src/index.js b/src/index.js index c77faf67..5be1f7b7 100644 --- a/src/index.js +++ b/src/index.js @@ -122,9 +122,9 @@ class Libp2p extends EventEmitter { }) } - // enable/disable pubsub - if (this._config.EXPERIMENTAL.pubsub) { - this.pubsub = pubsub(this) + // start pubsub + if (this._modules.pubsub && this._config.pubsub.enabled !== false) { + this.pubsub = pubsub(this, this._modules.pubsub) } // Attach remaining APIs @@ -403,8 +403,8 @@ class Libp2p extends EventEmitter { } }, (cb) => { - if (this._floodSub) { - return this._floodSub.start(cb) + if (this.pubsub) { + return this.pubsub.start(cb) } cb() }, @@ -442,8 +442,8 @@ class Libp2p extends EventEmitter { ) }, (cb) => { - if (this._floodSub) { - return this._floodSub.stop(cb) + if (this.pubsub) { + return this.pubsub.stop(cb) } cb() }, diff --git a/src/pubsub.js b/src/pubsub.js index a8b429c9..5c6e6505 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -2,15 +2,12 @@ const nextTick = require('async/nextTick') const { messages, codes } = require('./errors') -const FloodSub = require('libp2p-floodsub') const promisify = require('promisify-es6') const errCode = require('err-code') -module.exports = (node) => { - const floodSub = new FloodSub(node) - - node._floodSub = floodSub +module.exports = (node, Pubsub) => { + const pubsub = new Pubsub(node, { emitSelf: true }) return { /** @@ -41,16 +38,16 @@ module.exports = (node) => { options = {} } - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } function subscribe (cb) { - if (floodSub.listenerCount(topic) === 0) { - floodSub.subscribe(topic) + if (pubsub.listenerCount(topic) === 0) { + pubsub.subscribe(topic) } - floodSub.on(topic, handler) + pubsub.on(topic, handler) nextTick(cb) } @@ -80,18 +77,18 @@ module.exports = (node) => { * libp2p.unsubscribe(topic, handler, callback) */ unsubscribe: promisify((topic, handler, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } if (!handler) { - floodSub.removeAllListeners(topic) + pubsub.removeAllListeners(topic) } else { - floodSub.removeListener(topic, handler) + pubsub.removeListener(topic, handler) } - if (floodSub.listenerCount(topic) === 0) { - floodSub.unsubscribe(topic) + if (pubsub.listenerCount(topic) === 0) { + pubsub.unsubscribe(topic) } if (typeof callback === 'function') { @@ -102,29 +99,31 @@ module.exports = (node) => { }), publish: promisify((topic, data, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } - if (!Buffer.isBuffer(data)) { - return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER')) + try { + data = Buffer.from(data) + } catch (err) { + return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')) } - floodSub.publish(topic, data, callback) + pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } - const subscriptions = Array.from(floodSub.subscriptions) + const subscriptions = Array.from(pubsub.subscriptions) nextTick(() => callback(null, subscriptions)) }), peers: promisify((topic, callback) => { - if (!node.isStarted() && !floodSub.started) { + if (!node.isStarted() && !pubsub.started) { return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } @@ -133,7 +132,7 @@ module.exports = (node) => { topic = null } - const peers = Array.from(floodSub.peers.values()) + const peers = Array.from(pubsub.peers.values()) .filter((peer) => topic ? peer.topics.has(topic) : true) .map((peer) => peer.info.id.toB58String()) @@ -141,7 +140,11 @@ module.exports = (node) => { }), setMaxListeners (n) { - return floodSub.setMaxListeners(n) - } + return pubsub.setMaxListeners(n) + }, + + start: (cb) => pubsub.start(cb), + + stop: (cb) => pubsub.stop(cb) } } diff --git a/test/config.spec.js b/test/config.spec.js index 4b0696f4..329ac2f7 100644 --- a/test/config.spec.js +++ b/test/config.spec.js @@ -82,8 +82,8 @@ describe('configuration', () => { peerDiscovery: { autoDial: true }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, dht: { kBucketSize: 20, @@ -144,8 +144,8 @@ describe('configuration', () => { enabled: true } }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, dht: { kBucketSize: 20, @@ -269,8 +269,8 @@ describe('configuration', () => { dht: DHT }, config: { - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false }, peerDiscovery: { autoDial: true diff --git a/test/create.spec.js b/test/create.spec.js index 750c4378..d73582a6 100644 --- a/test/create.spec.js +++ b/test/create.spec.js @@ -19,8 +19,8 @@ describe('libp2p creation', () => { it('should be able to start and stop successfully', (done) => { createNode([], { config: { - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true }, dht: { enabled: true @@ -32,7 +32,7 @@ describe('libp2p creation', () => { const sw = node._switch const cm = node.connectionManager const dht = node._dht - const pub = node._floodSub + const pub = node.pubsub sinon.spy(sw, 'start') sinon.spy(cm, 'start') @@ -77,13 +77,13 @@ describe('libp2p creation', () => { it('should not create disabled modules', (done) => { createNode([], { config: { - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } }, (err, node) => { expect(err).to.not.exist() - expect(node._floodSub).to.not.exist() + expect(node._pubsub).to.not.exist() done() }) }) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index b45e577c..559b10de 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -11,23 +11,31 @@ const parallel = require('async/parallel') const series = require('async/series') const _times = require('lodash.times') +const Floodsub = require('libp2p-floodsub') +const mergeOptions = require('merge-options') + const { codes } = require('../src/errors') const createNode = require('./utils/create-node') -function startTwo (callback) { +function startTwo (options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + const tasks = _times(2, () => (cb) => { - createNode('/ip4/0.0.0.0/tcp/0', { + createNode('/ip4/0.0.0.0/tcp/0', mergeOptions({ config: { peerDiscovery: { mdns: { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true } } - }, (err, node) => { + }, options), (err, node) => { expect(err).to.not.exist() node.start((err) => cb(err, node)) }) @@ -47,22 +55,17 @@ function stopTwo (nodes, callback) { ], callback) } -// There is a vast test suite on PubSub through js-ipfs -// https://github.com/ipfs/interface-ipfs-core/blob/master/js/src/pubsub.js -// and libp2p-floodsub itself -// https://github.com/libp2p/js-libp2p-floodsub/tree/master/test -// TODO: consider if all or some of those should come here describe('.pubsub', () => { - describe('.pubsub on (default)', (done) => { + describe('.pubsub on (default)', () => { it('start two nodes and send one message, then unsubscribe', (done) => { // Check the final series error, and the publish handler expect(2).checks(done) let nodes - const data = Buffer.from('test') + const data = 'test' const handler = (msg) => { // verify the data is correct and mark the expect - expect(msg.data).to.eql(data).mark() + expect(msg.data.toString()).to.eql(data).mark() } series([ @@ -77,10 +80,6 @@ describe('.pubsub', () => { (cb) => setTimeout(cb, 500), // publish on the second (cb) => nodes[1].pubsub.publish('pubsub', data, cb), - // ls subscripts - (cb) => nodes[1].pubsub.ls(cb), - // get subscribed peers - (cb) => nodes[1].pubsub.peers('pubsub', cb), // Wait a moment before unsubscribing (cb) => setTimeout(cb, 500), // unsubscribe on the first @@ -115,6 +114,123 @@ describe('.pubsub', () => { (cb) => setTimeout(cb, 500), // publish on the second (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // ls subscripts + (cb) => nodes[1].pubsub.ls(cb), + // get subscribed peers + (cb) => nodes[1].pubsub.peers('pubsub', cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe from all + (cb) => nodes[0].pubsub.unsubscribe('pubsub', null, cb), + // Verify unsubscribed + (cb) => { + nodes[0].pubsub.ls((err, topics) => { + expect(topics.length).to.eql(0).mark() + cb(err) + }) + }, + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) + }) + it('publish should fail if data is not a buffer nor a string', (done) => { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + peerDiscovery: { + mdns: { + enabled: false + } + }, + pubsub: { + enabled: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + + node.start((err) => { + expect(err).to.not.exist() + + node.pubsub.publish('pubsub', 10, (err) => { + expect(err).to.exist() + expect(err.code).to.equal('ERR_DATA_IS_NOT_VALID') + + done() + }) + }) + }) + }) + }) + + describe('.pubsub on using floodsub', () => { + it('start two nodes and send one message, then unsubscribe', (done) => { + // Check the final series error, and the publish handler + expect(2).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo({ + modules: { + pubsub: Floodsub + } + }, (err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe on the first + (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) + }) + it('start two nodes and send one message, then unsubscribe without handler', (done) => { + // Check the final series error, and the publish handler + expect(3).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo({ + modules: { + pubsub: Floodsub + } + }, (err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), // Wait a moment before unsubscribing (cb) => setTimeout(cb, 500), // unsubscribe from all @@ -141,9 +257,12 @@ describe('.pubsub', () => { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true } + }, + modules: { + pubsub: Floodsub } }, (err, node) => { expect(err).to.not.exist() @@ -151,9 +270,9 @@ describe('.pubsub', () => { node.start((err) => { expect(err).to.not.exist() - node.pubsub.publish('pubsub', 'datastr', (err) => { + node.pubsub.publish('pubsub', 10, (err) => { expect(err).to.exist() - expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER') + expect(err.code).to.equal('ERR_DATA_IS_NOT_VALID') done() }) @@ -170,9 +289,6 @@ describe('.pubsub', () => { mdns: { enabled: false } - }, - EXPERIMENTAL: { - pubsub: false } } }, (err, node) => { @@ -194,8 +310,8 @@ describe('.pubsub', () => { enabled: false } }, - EXPERIMENTAL: { - pubsub: true + pubsub: { + enabled: true } } }, (err, node) => { diff --git a/test/utils/bundle-browser.js b/test/utils/bundle-browser.js index 5e9b3fec..a5140da9 100644 --- a/test/utils/bundle-browser.js +++ b/test/utils/bundle-browser.js @@ -8,6 +8,7 @@ const SPDY = require('libp2p-spdy') const MPLEX = require('libp2p-mplex') const PULLMPLEX = require('pull-mplex') const KadDHT = require('libp2p-kad-dht') +const GossipSub = require('libp2p-gossipsub') const SECIO = require('libp2p-secio') const defaultsDeep = require('@nodeutils/defaults-deep') const libp2p = require('../..') @@ -57,7 +58,8 @@ class Node extends libp2p { wsStar.discovery, Bootstrap ], - dht: KadDHT + dht: KadDHT, + pubsub: GossipSub }, config: { peerDiscovery: { @@ -88,8 +90,8 @@ class Node extends libp2p { }, enabled: false }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } } diff --git a/test/utils/bundle-nodejs.js b/test/utils/bundle-nodejs.js index d4900dd5..842224ca 100644 --- a/test/utils/bundle-nodejs.js +++ b/test/utils/bundle-nodejs.js @@ -6,6 +6,7 @@ const WS = require('libp2p-websockets') const Bootstrap = require('libp2p-bootstrap') const SPDY = require('libp2p-spdy') const KadDHT = require('libp2p-kad-dht') +const GossipSub = require('libp2p-gossipsub') const MPLEX = require('libp2p-mplex') const PULLMPLEX = require('pull-mplex') const SECIO = require('libp2p-secio') @@ -52,7 +53,8 @@ class Node extends libp2p { MulticastDNS, Bootstrap ], - dht: KadDHT + dht: KadDHT, + pubsub: GossipSub }, config: { peerDiscovery: { @@ -81,8 +83,8 @@ class Node extends libp2p { }, enabled: true }, - EXPERIMENTAL: { - pubsub: false + pubsub: { + enabled: false } } }