diff --git a/README.md b/README.md index bcd8f540..8c868dc4 100644 --- a/README.md +++ b/README.md @@ -211,22 +211,18 @@ class Node extends Libp2p { **IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670. -#### Create a Node - `Libp2p.createLibp2p(options, callback)` +#### Create a Node - `Libp2p.create(options)` > Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead ```js -const { createLibp2p } = require('libp2p') -createLibp2p(options, (err, libp2p) => { - if (err) throw err - libp2p.start((err) => { - if (err) throw err - }) -}) +const { create } = require('libp2p') +const libp2p = await create(options) + +await libp2p.start() ``` - `options`: Object of libp2p configuration options -- `callback`: Function with signature `function (Error, Libp2p) {}` #### Create a Node alternative - `new Libp2p(options)` diff --git a/package.json b/package.json index 16986a97..7ea9febb 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "multiaddr": "^7.1.0", "multistream-select": "^0.15.0", "once": "^1.4.0", + "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", "peer-id": "^0.13.3", @@ -90,8 +91,8 @@ "libp2p-bootstrap": "^0.9.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", - "libp2p-floodsub": "~0.17.0", - "libp2p-gossipsub": "~0.0.4", + "libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async", + "libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", @@ -103,6 +104,7 @@ "lodash.times": "^4.3.2", "nock": "^10.0.6", "p-defer": "^3.0.0", + "p-wait-for": "^3.1.0", "portfinder": "^1.0.20", "pull-goodbye": "0.0.2", "pull-length-prefixed": "^1.3.3", diff --git a/src/index.js b/src/index.js index 45648db0..5efcc8fe 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,7 @@ 'use strict' const FSM = require('fsm-event') -const EventEmitter = require('events').EventEmitter +const { EventEmitter } = require('events') const debug = require('debug') const log = debug('libp2p') log.error = debug('libp2p:error') @@ -9,7 +9,6 @@ const errCode = require('err-code') const promisify = require('promisify-es6') const each = require('async/each') -const nextTick = require('async/nextTick') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') @@ -66,6 +65,8 @@ class Libp2p extends EventEmitter { this._transport = [] // Transport instances/references this._discovery = [] // Discovery service instances/references + this.peerStore = new PeerStore() + // create the switch, and listen for errors this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch) @@ -147,7 +148,7 @@ class Libp2p extends EventEmitter { } // start pubsub - if (this._modules.pubsub && this._config.pubsub.enabled !== false) { + if (this._modules.pubsub) { this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub) } @@ -251,6 +252,7 @@ class Libp2p extends EventEmitter { this.state('stop') try { + this.pubsub && await this.pubsub.stop() await this.transportManager.close() await this._switch.stop() } catch (err) { @@ -385,10 +387,16 @@ class Libp2p extends EventEmitter { const multiaddrs = this.peerInfo.multiaddrs.toArray() // Start parallel tasks + const tasks = [ + this.transportManager.listen(multiaddrs) + ] + + if (this._config.pubsub.enabled) { + this.pubsub && this.pubsub.start() + } + try { - await Promise.all([ - this.transportManager.listen(multiaddrs) - ]) + await Promise.all(tasks) } catch (err) { log.error(err) this.emit('error', err) @@ -483,16 +491,15 @@ module.exports = Libp2p * Like `new Libp2p(options)` except it will create a `PeerInfo` * instance if one is not provided in options. * @param {object} options Libp2p configuration options - * @param {function(Error, Libp2p)} callback - * @returns {void} + * @returns {Libp2p} */ -module.exports.createLibp2p = promisify((options, callback) => { +module.exports.create = async (options = {}) => { if (options.peerInfo) { - return nextTick(callback, null, new Libp2p(options)) + return new Libp2p(options) } - PeerInfo.create((err, peerInfo) => { - if (err) return callback(err) - options.peerInfo = peerInfo - callback(null, new Libp2p(options)) - }) -}) + + const peerInfo = await PeerInfo.create() + + options.peerInfo = peerInfo + return new Libp2p(options) +} diff --git a/src/pubsub.js b/src/pubsub.js index 2246a2d1..344c2f65 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,52 +1,21 @@ 'use strict' -const nextTick = require('async/nextTick') -const { messages, codes } = require('./errors') -const promisify = require('promisify-es6') - const errCode = require('err-code') +const { messages, codes } = require('./errors') module.exports = (node, Pubsub, config) => { - const pubsub = new Pubsub(node, config) + const pubsub = new Pubsub(node.peerInfo, node.registrar, config) return { /** * Subscribe the given handler to a pubsub topic - * * @param {string} topic * @param {function} handler The handler to subscribe - * @param {object|null} [options] - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Subscribe a handler to a topic - * - * // `null` must be passed for options until subscribe is no longer using promisify - * const handler = (message) => { } - * await libp2p.subscribe(topic, handler, null) - * - * @example Use a callback instead of the Promise api - * - * // `options` may be passed or omitted when supplying a callback - * const handler = (message) => { } - * libp2p.subscribe(topic, handler, callback) + * @returns {void} */ - subscribe: (topic, handler, options, callback) => { - // can't use promisify because it thinks the handler is a callback - if (typeof options === 'function') { - callback = options - options = {} - } - + subscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (pubsub.listenerCount(topic) === 0) { @@ -54,46 +23,16 @@ module.exports = (node, Pubsub, config) => { } pubsub.on(topic, handler) - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, /** * Unsubscribes from a pubsub topic - * * @param {string} topic - * @param {function|null} handler The handler to unsubscribe from - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Unsubscribe a topic for all handlers - * - * // `null` must be passed until unsubscribe is no longer using promisify - * await libp2p.unsubscribe(topic, null) - * - * @example Unsubscribe a topic for 1 handler - * - * await libp2p.unsubscribe(topic, handler) - * - * @example Use a callback instead of the Promise api - * - * libp2p.unsubscribe(topic, handler, callback) + * @param {function} [handler] The handler to unsubscribe from */ - unsubscribe: (topic, handler, callback) => { - // can't use promisify because it thinks the handler is a callback + unsubscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (!handler) { @@ -105,61 +44,46 @@ module.exports = (node, Pubsub, config) => { if (pubsub.listenerCount(topic) === 0) { pubsub.unsubscribe(topic) } - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, - publish: promisify((topic, data, callback) => { + publish: (topic, data) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } try { data = Buffer.from(data) } catch (err) { - return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')) + throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID') } - pubsub.publish(topic, data, callback) - }), + return pubsub.publish(topic, data) + }, - ls: promisify((callback) => { + getTopics: () => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - const subscriptions = Array.from(pubsub.subscriptions) + return pubsub.getTopics() + }, - nextTick(() => callback(null, subscriptions)) - }), - - peers: promisify((topic, callback) => { + getPeersSubscribed: (topic) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - if (typeof topic === 'function') { - callback = topic - topic = null - } - - const peers = Array.from(pubsub.peers.values()) - .filter((peer) => topic ? peer.topics.has(topic) : true) - .map((peer) => peer.info.id.toB58String()) - - nextTick(() => callback(null, peers)) - }), + return pubsub.getPeersSubscribed(topic) + }, setMaxListeners (n) { return pubsub.setMaxListeners(n) }, - start: promisify((cb) => pubsub.start(cb)), + _pubsub: pubsub, - stop: promisify((cb) => pubsub.stop(cb)) + start: () => pubsub.start(), + + stop: () => pubsub.stop() } } diff --git a/src/upgrader.js b/src/upgrader.js index 1699451a..6890c6f7 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -186,7 +186,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol }) + this._onStream({ connection, stream, protocol, remotePeer }) } catch (err) { log.error(err) } @@ -254,9 +254,9 @@ class Upgrader { * @param {Stream} options.stream * @param {string} options.protocol */ - _onStream ({ connection, stream, protocol }) { + _onStream ({ connection, stream, protocol, remotePeer }) { const handler = this.protocols.get(protocol) - handler({ connection, stream, protocol }) + handler({ connection, stream, protocol, remotePeer }) } /** diff --git a/test/pubsub/configuration.node.js b/test/pubsub/configuration.node.js new file mode 100644 index 00000000..7ec68b30 --- /dev/null +++ b/test/pubsub/configuration.node.js @@ -0,0 +1,93 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { baseOptions, subsystemOptions } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is configurable', () => { + let libp2p + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should not exist if no module is provided', async () => { + libp2p = await create(baseOptions) + expect(libp2p.pubsub).to.not.exist() + }) + + it('should exist if the module is provided', async () => { + libp2p = await create(subsystemOptions) + expect(libp2p.pubsub).to.exist() + }) + + it('should start and stop by default once libp2p starts', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + + await libp2p.stop() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should not start if disabled once libp2p starts', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should allow a manual start', async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.pubsub.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + }) +}) diff --git a/test/pubsub/implementations.node.js b/test/pubsub/implementations.node.js new file mode 100644 index 00000000..120ee84d --- /dev/null +++ b/test/pubsub/implementations.node.js @@ -0,0 +1,101 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const Floodsub = require('libp2p-floodsub') +const Gossipsub = require('libp2p-gossipsub') +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') +const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { baseOptions } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is able to use different implementations', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await Promise.all([ + PeerInfo.create(), + PeerInfo.create() + ]) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('Floodsub nodes', () => { + return pubsubTest(floodsubMulticodec, Floodsub) + }) + + it('Gossipsub nodes', () => { + return pubsubTest(gossipsubMulticodec, Gossipsub) + }) + + const pubsubTest = async (multicodec, pubsub) => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + libp2p = await create(mergeOptions(baseOptions, { + peerInfo, + modules: { + pubsub: pubsub + } + })) + + remoteLibp2p = await create(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo, + modules: { + pubsub: pubsub + } + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + const libp2pId = libp2p.peerInfo.id.toB58String() + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + + const connection = await libp2p.dialProtocol(remAddr, multicodec) + + await new Promise((resolve) => setTimeout(resolve, 1000)) + expect(connection).to.exist() + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + await new Promise((resolve) => setTimeout(resolve, 1000)) + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + } +}) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js new file mode 100644 index 00000000..b52d6c13 --- /dev/null +++ b/test/pubsub/operation.node.js @@ -0,0 +1,174 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const PeerInfo = require('peer-info') + +const { create } = require('../../src') +const { subsystemOptions, subsystemMulticodecs } = require('./utils') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem operates correctly', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await Promise.all([ + PeerInfo.create(), + PeerInfo.create() + ]) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + describe('pubsub started before connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it('should get notified of connected peers on dial', async () => { + sinon.spy(libp2p.registrar, 'onConnect') + sinon.spy(remoteLibp2p.registrar, 'onConnect') + + const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + expect(connection).to.exist() + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) + + expect(libp2p.registrar.onConnect.callCount).to.equal(1) + expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) + }) + + it('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + const libp2pId = libp2p.peerInfo.id.toB58String() + + await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) + + // TODO: Needs identify push + describe.skip('pubsub started after connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo, + config: { + pubsub: { + enabled: false + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it.skip('should get notified of connected peers after starting', async () => { + const connection = await libp2p.dial(remAddr) + + expect(connection).to.exist() + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0) + + remoteLibp2p.pubsub.start() + + // Wait for + // Validate + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) + }) + + it.skip('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + await libp2p.dial(remAddr) + + remoteLibp2p.pubsub.start() + + // TODO: wait for + + libp2p.pubsub.subscribe(topic) + libp2p.pubsub.once(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + libp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) +}) diff --git a/test/pubsub/utils.js b/test/pubsub/utils.js new file mode 100644 index 00000000..11495c5d --- /dev/null +++ b/test/pubsub/utils.js @@ -0,0 +1,29 @@ +'use strict' + +const Gossipsub = require('libp2p-gossipsub') +const { multicodec } = require('libp2p-gossipsub') +const Crypto = require('../../src/insecure/plaintext') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-tcp') + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +const subsystemOptions = mergeOptions(baseOptions, { + modules: { + pubsub: Gossipsub + } +}) + +module.exports.subsystemOptions = subsystemOptions + +module.exports.subsystemMulticodecs = [multicodec]