diff --git a/package.json b/package.json index adbdaac9..5f9ac49c 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "homepage": "https://github.com/libp2p/js-libp2p", "dependencies": { "async": "^2.6.0", + "libp2p-floodsub": "^0.14.1", "libp2p-ping": "~0.6.1", "libp2p-switch": "~0.36.1", "mafmt": "^4.0.0", diff --git a/src/error-messages.js b/src/error-messages.js new file mode 100644 index 00000000..e3183330 --- /dev/null +++ b/src/error-messages.js @@ -0,0 +1,3 @@ +'use strict' + +exports.NOT_STARTED_YET = 'The libp2p node is not started yet' diff --git a/src/index.js b/src/index.js index b6974988..624105eb 100644 --- a/src/index.js +++ b/src/index.js @@ -14,6 +14,7 @@ const Ping = require('libp2p-ping') const peerRouting = require('./peer-routing') const contentRouting = require('./content-routing') const dht = require('./dht') +const pubsub = require('./pubsub') const getPeerInfo = require('./get-peer-info') exports = module.exports @@ -89,6 +90,7 @@ class Node extends EventEmitter { this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) this.dht = dht(this) + this.pubsub = pubsub(this) this._getPeerInfo = getPeerInfo(this) @@ -149,17 +151,29 @@ class Node extends EventEmitter { cb() }, (cb) => { - // TODO: chicken-and-egg problem: + // TODO: chicken-and-egg problem #1: // have to set started here because DHT requires libp2p is already started this._isStarted = true if (this._dht) { - return this._dht.start(cb) + this._dht.start(cb) + } else { + cb() } - cb() }, + (cb) => { + // TODO: chicken-and-egg problem #2: + // have to set started here because FloodSub requires libp2p is already started + if (this._options !== false) { + this._floodSub.start(cb) + } else { + cb() + } + }, + (cb) => { // detect which multiaddrs we don't have a transport for and remove them const multiaddrs = this.peerInfo.multiaddrs.toArray() + transports.forEach((transport) => { multiaddrs.forEach((multiaddr) => { if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) && @@ -188,6 +202,11 @@ class Node extends EventEmitter { } series([ + (cb) => { + if (this._floodSub.started) { + this._floodSub.stop(cb) + } + }, (cb) => { if (this._dht) { return this._dht.stop(cb) diff --git a/src/pubsub.js b/src/pubsub.js new file mode 100644 index 00000000..51354df8 --- /dev/null +++ b/src/pubsub.js @@ -0,0 +1,89 @@ +'use strict' + +const setImmediate = require('async/setImmediate') +const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET +const FloodSub = require('libp2p-floodsub') + +module.exports = (node) => { + const floodSub = new FloodSub(node) + + node._floodSub = floodSub + + return { + subscribe: (topic, options, handler, callback) => { + if (!node.isStarted()) { + return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + } + + if (typeof options === 'function') { + callback = handler + handler = options + options = {} + } + + function subscribe (cb) { + if (floodSub.listenerCount(topic) === 0) { + floodSub.subscribe(topic) + } + + floodSub.pubsub.on(topic, handler) + setImmediate(cb) + } + + subscribe(callback) + }, + + unsubscribe: (topic, handler) => { + floodSub.removeListener(topic, handler) + + if (floodSub.listenerCount(topic) === 0) { + floodSub.unsubscribe(topic) + } + }, + + publish: (topic, data, callback) => { + if (!node.isStarted()) { + return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + } + + if (!Buffer.isBuffer(data)) { + return setImmediate(() => callback(new Error('data must be a Buffer'))) + } + + floodSub.publish(topic, data) + + setImmediate(() => callback()) + }, + + ls: (callback) => { + if (!node.isStarted()) { + return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + } + + const subscriptions = Array.from(floodSub.subscriptions) + + setImmediate(() => callback(null, subscriptions)) + }, + + peers: (topic, callback) => { + if (!node.isStarted()) { + return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + } + + if (typeof topic === 'function') { + callback = topic + topic = null + } + + const peers = Array.from(floodSub.peers.values()) + .filter((peer) => topic ? peer.topics.has(topic) : true) + .map((peer) => peer.info.id.toB58String()) + + setImmediate(() => callback(null, peers)) + }, + + setMaxListeners (n) { + return floodSub.setMaxListeners(n) + } + } +} diff --git a/test/node.js b/test/node.js index ede2d7f8..0ea47d11 100644 --- a/test/node.js +++ b/test/node.js @@ -4,6 +4,7 @@ require('./base') require('./transports.node') require('./stream-muxing.node') require('./peer-discovery.node') +require('./pubsub.node') require('./peer-routing.node') require('./content-routing.node') require('./circuit-relay.node') diff --git a/test/pubsub.node.js b/test/pubsub.node.js new file mode 100644 index 00000000..8dbde560 --- /dev/null +++ b/test/pubsub.node.js @@ -0,0 +1,52 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ + +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const parallel = require('async/parallel') +const _times = require('lodash.times') +const utils = require('./utils/node') +const createNode = utils.createNode + +describe('.pubsub', () => { + let nodeA + let nodeB + + before(function (done) { + this.timeout(5 * 1000) + + const tasks = _times(2, () => (cb) => { + createNode('/ip4/0.0.0.0/tcp/0', { + mdns: false, + dht: true + }, (err, node) => { + expect(err).to.not.exist() + node.start((err) => cb(err, node)) + }) + }) + + parallel(tasks, (err, nodes) => { + expect(err).to.not.exist() + nodeA = nodes[0] + nodeB = nodes[1] + + nodeA.dial(nodeB.peerInfo, done) + }) + }) + + after((done) => { + parallel([ + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) + ], done) + }) + + describe('.pubsub on (default)', () => { + }) + + describe('.pubsub off', () => { + }) +})