diff --git a/README.md b/README.md index de08e1b0..1a28d9b9 100644 --- a/README.md +++ b/README.md @@ -211,22 +211,18 @@ class Node extends Libp2p { **IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670. -#### Create a Node - `Libp2p.createLibp2p(options, callback)` +#### Create a Node - `Libp2p.create(options)` > Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead ```js -const { createLibp2p } = require('libp2p') -createLibp2p(options, (err, libp2p) => { - if (err) throw err - libp2p.start((err) => { - if (err) throw err - }) -}) +const { create } = require('libp2p') +const libp2p = await create(options) + +await libp2p.start() ``` - `options`: Object of libp2p configuration options -- `callback`: Function with signature `function (Error, Libp2p) {}` #### Create a Node alternative - `new Libp2p(options)` diff --git a/package.json b/package.json index 16986a97..1ebd91a9 100644 --- a/package.json +++ b/package.json @@ -55,13 +55,14 @@ "it-protocol-buffers": "^0.2.0", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.17.1", - "libp2p-interfaces": "^0.1.3", + "libp2p-interfaces": "^0.1.5", "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", "multiaddr": "^7.1.0", "multistream-select": "^0.15.0", "once": "^1.4.0", + "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", "peer-id": "^0.13.3", @@ -90,8 +91,8 @@ "libp2p-bootstrap": "^0.9.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", - "libp2p-floodsub": "~0.17.0", - "libp2p-gossipsub": "~0.0.4", + "libp2p-floodsub": "^0.19.0", + "libp2p-gossipsub": "ChainSafe/gossipsub-js#beta/async", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", "libp2p-mplex": "^0.9.1", @@ -103,6 +104,7 @@ "lodash.times": "^4.3.2", "nock": "^10.0.6", "p-defer": "^3.0.0", + "p-wait-for": "^3.1.0", "portfinder": "^1.0.20", "pull-goodbye": "0.0.2", "pull-length-prefixed": "^1.3.3", diff --git a/src/connection-manager/topology.js b/src/connection-manager/topology.js deleted file mode 100644 index 2c2a8779..00000000 --- a/src/connection-manager/topology.js +++ /dev/null @@ -1,108 +0,0 @@ -'use strict' - -const assert = require('assert') - -class Topology { - /** - * @param {Object} props - * @param {number} props.min minimum needed connections (default: 0) - * @param {number} props.max maximum needed connections (default: Infinity) - * @param {Array} props.multicodecs protocol multicodecs - * @param {Object} props.handlers - * @param {function} props.handlers.onConnect protocol "onConnect" handler - * @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler - * @constructor - */ - constructor ({ - min = 0, - max = Infinity, - multicodecs, - handlers - }) { - assert(multicodecs, 'one or more multicodec should be provided') - assert(handlers, 'the handlers should be provided') - assert(handlers.onConnect && typeof handlers.onConnect === 'function', - 'the \'onConnect\' handler must be provided') - assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function', - 'the \'onDisconnect\' handler must be provided') - - this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs] - this.min = min - this.max = max - - // Handlers - this._onConnect = handlers.onConnect - this._onDisconnect = handlers.onDisconnect - - this.peers = new Map() - this._registrar = undefined - - this._onProtocolChange = this._onProtocolChange.bind(this) - } - - set registrar (registrar) { - this._registrar = registrar - this._registrar.peerStore.on('change:protocols', this._onProtocolChange) - - // Update topology peers - this._updatePeers(this._registrar.peerStore.peers.values()) - } - - /** - * Update topology. - * @param {Array} peerInfoIterable - * @returns {void} - */ - _updatePeers (peerInfoIterable) { - for (const peerInfo of peerInfoIterable) { - if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) { - // Add the peer regardless of whether or not there is currently a connection - this.peers.set(peerInfo.id.toB58String(), peerInfo) - // If there is a connection, call _onConnect - const connection = this._registrar.getConnection(peerInfo) - connection && this._onConnect(peerInfo, connection) - } else { - // Remove any peers we might be tracking that are no longer of value to us - this.peers.delete(peerInfo.id.toB58String()) - } - } - } - - /** - * Notify protocol of peer disconnected. - * @param {PeerInfo} peerInfo - * @param {Error} [error] - * @returns {void} - */ - disconnect (peerInfo, error) { - this._onDisconnect(peerInfo, error) - } - - /** - * Check if a new peer support the multicodecs for this topology. - * @param {Object} props - * @param {PeerInfo} props.peerInfo - * @param {Array} props.protocols - */ - _onProtocolChange ({ peerInfo, protocols }) { - const existingPeer = this.peers.get(peerInfo.id.toB58String()) - const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) - - // Not supporting the protocol anymore? - if (existingPeer && hasProtocol.length === 0) { - this._onDisconnect({ - peerInfo - }) - } - - // New to protocol support - for (const protocol of protocols) { - if (this.multicodecs.includes(protocol)) { - this._updatePeers([peerInfo]) - return - } - } - } -} - -module.exports = Topology diff --git a/src/index.js b/src/index.js index 45648db0..5efcc8fe 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,7 @@ 'use strict' const FSM = require('fsm-event') -const EventEmitter = require('events').EventEmitter +const { EventEmitter } = require('events') const debug = require('debug') const log = debug('libp2p') log.error = debug('libp2p:error') @@ -9,7 +9,6 @@ const errCode = require('err-code') const promisify = require('promisify-es6') const each = require('async/each') -const nextTick = require('async/nextTick') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') @@ -66,6 +65,8 @@ class Libp2p extends EventEmitter { this._transport = [] // Transport instances/references this._discovery = [] // Discovery service instances/references + this.peerStore = new PeerStore() + // create the switch, and listen for errors this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch) @@ -147,7 +148,7 @@ class Libp2p extends EventEmitter { } // start pubsub - if (this._modules.pubsub && this._config.pubsub.enabled !== false) { + if (this._modules.pubsub) { this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub) } @@ -251,6 +252,7 @@ class Libp2p extends EventEmitter { this.state('stop') try { + this.pubsub && await this.pubsub.stop() await this.transportManager.close() await this._switch.stop() } catch (err) { @@ -385,10 +387,16 @@ class Libp2p extends EventEmitter { const multiaddrs = this.peerInfo.multiaddrs.toArray() // Start parallel tasks + const tasks = [ + this.transportManager.listen(multiaddrs) + ] + + if (this._config.pubsub.enabled) { + this.pubsub && this.pubsub.start() + } + try { - await Promise.all([ - this.transportManager.listen(multiaddrs) - ]) + await Promise.all(tasks) } catch (err) { log.error(err) this.emit('error', err) @@ -483,16 +491,15 @@ module.exports = Libp2p * Like `new Libp2p(options)` except it will create a `PeerInfo` * instance if one is not provided in options. * @param {object} options Libp2p configuration options - * @param {function(Error, Libp2p)} callback - * @returns {void} + * @returns {Libp2p} */ -module.exports.createLibp2p = promisify((options, callback) => { +module.exports.create = async (options = {}) => { if (options.peerInfo) { - return nextTick(callback, null, new Libp2p(options)) + return new Libp2p(options) } - PeerInfo.create((err, peerInfo) => { - if (err) return callback(err) - options.peerInfo = peerInfo - callback(null, new Libp2p(options)) - }) -}) + + const peerInfo = await PeerInfo.create() + + options.peerInfo = peerInfo + return new Libp2p(options) +} diff --git a/src/pubsub.js b/src/pubsub.js index 2246a2d1..2c067d99 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,52 +1,21 @@ 'use strict' -const nextTick = require('async/nextTick') -const { messages, codes } = require('./errors') -const promisify = require('promisify-es6') - const errCode = require('err-code') +const { messages, codes } = require('./errors') module.exports = (node, Pubsub, config) => { - const pubsub = new Pubsub(node, config) + const pubsub = new Pubsub(node.peerInfo, node.registrar, config) return { /** * Subscribe the given handler to a pubsub topic - * * @param {string} topic * @param {function} handler The handler to subscribe - * @param {object|null} [options] - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Subscribe a handler to a topic - * - * // `null` must be passed for options until subscribe is no longer using promisify - * const handler = (message) => { } - * await libp2p.subscribe(topic, handler, null) - * - * @example Use a callback instead of the Promise api - * - * // `options` may be passed or omitted when supplying a callback - * const handler = (message) => { } - * libp2p.subscribe(topic, handler, callback) + * @returns {void} */ - subscribe: (topic, handler, options, callback) => { - // can't use promisify because it thinks the handler is a callback - if (typeof options === 'function') { - callback = options - options = {} - } - + subscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (pubsub.listenerCount(topic) === 0) { @@ -54,46 +23,16 @@ module.exports = (node, Pubsub, config) => { } pubsub.on(topic, handler) - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, /** * Unsubscribes from a pubsub topic - * * @param {string} topic - * @param {function|null} handler The handler to unsubscribe from - * @param {function} [callback] An optional callback - * - * @returns {Promise|void} A promise is returned if no callback is provided - * - * @example Unsubscribe a topic for all handlers - * - * // `null` must be passed until unsubscribe is no longer using promisify - * await libp2p.unsubscribe(topic, null) - * - * @example Unsubscribe a topic for 1 handler - * - * await libp2p.unsubscribe(topic, handler) - * - * @example Use a callback instead of the Promise api - * - * libp2p.unsubscribe(topic, handler, callback) + * @param {function} [handler] The handler to unsubscribe from */ - unsubscribe: (topic, handler, callback) => { - // can't use promisify because it thinks the handler is a callback + unsubscribe: (topic, handler) => { if (!node.isStarted() && !pubsub.started) { - const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - - if (callback) { - return nextTick(() => callback(err)) - } - - return Promise.reject(err) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } if (!handler) { @@ -105,61 +44,61 @@ module.exports = (node, Pubsub, config) => { if (pubsub.listenerCount(topic) === 0) { pubsub.unsubscribe(topic) } - - if (callback) { - return nextTick(() => callback()) - } - - return Promise.resolve() }, - publish: promisify((topic, data, callback) => { + /** + * Publish messages to the given topics. + * @param {Array|string} topic + * @param {Buffer} data + * @returns {Promise} + */ + publish: (topic, data) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } try { data = Buffer.from(data) } catch (err) { - return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')) + throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID') } - pubsub.publish(topic, data, callback) - }), + return pubsub.publish(topic, data) + }, - ls: promisify((callback) => { + /** + * Get a list of topics the node is subscribed to. + * @returns {Array} topics + */ + getTopics: () => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - const subscriptions = Array.from(pubsub.subscriptions) + return pubsub.getTopics() + }, - nextTick(() => callback(null, subscriptions)) - }), - - peers: promisify((topic, callback) => { + /** + * Get a list of the peer-ids that are subscribed to one topic. + * @param {string} topic + * @returns {Array} + */ + getPeersSubscribed: (topic) => { if (!node.isStarted() && !pubsub.started) { - return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) + throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) } - if (typeof topic === 'function') { - callback = topic - topic = null - } - - const peers = Array.from(pubsub.peers.values()) - .filter((peer) => topic ? peer.topics.has(topic) : true) - .map((peer) => peer.info.id.toB58String()) - - nextTick(() => callback(null, peers)) - }), + return pubsub.getPeersSubscribed(topic) + }, setMaxListeners (n) { return pubsub.setMaxListeners(n) }, - start: promisify((cb) => pubsub.start(cb)), + _pubsub: pubsub, - stop: promisify((cb) => pubsub.stop(cb)) + start: () => pubsub.start(), + + stop: () => pubsub.stop() } } diff --git a/src/registrar.js b/src/registrar.js index c6e4439c..c4a11680 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -5,9 +5,9 @@ const debug = require('debug') const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') +const Topology = require('libp2p-interfaces/src/topology') const { Connection } = require('libp2p-interfaces/src/connection') const PeerInfo = require('peer-info') -const Toplogy = require('./connection-manager/topology') /** * Responsible for notifying registered protocols of events in the network. @@ -106,17 +106,16 @@ class Registrar { /** * Register handlers for a set of multicodecs given - * @param {Object} topologyProps properties for topology - * @param {Array|string} topologyProps.multicodecs - * @param {Object} topologyProps.handlers - * @param {function} topologyProps.handlers.onConnect - * @param {function} topologyProps.handlers.onDisconnect + * @param {Topology} topology protocol topology * @return {string} registrar identifier */ - register (topologyProps) { - // Create multicodec topology + register (topology) { + assert( + Topology.isTopology(topology), + 'topology must be an instance of interfaces/topology') + + // Create topology const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() - const topology = new Toplogy(topologyProps) this.topologies.set(id, topology) diff --git a/test/pubsub/configuration.node.js b/test/pubsub/configuration.node.js new file mode 100644 index 00000000..829e303f --- /dev/null +++ b/test/pubsub/configuration.node.js @@ -0,0 +1,92 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { baseOptions, subsystemOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is configurable', () => { + let libp2p + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should not exist if no module is provided', async () => { + libp2p = await create(baseOptions) + expect(libp2p.pubsub).to.not.exist() + }) + + it('should exist if the module is provided', async () => { + libp2p = await create(subsystemOptions) + expect(libp2p.pubsub).to.exist() + }) + + it('should start and stop by default once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + + await libp2p.stop() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should not start if disabled once libp2p starts', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + }) + + it('should allow a manual start', async () => { + const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1) + peerInfo.multiaddrs.add(listenAddr) + + const customOptions = mergeOptions(subsystemOptions, { + peerInfo, + config: { + pubsub: { + enabled: false + } + } + }) + + libp2p = await create(customOptions) + await libp2p.start() + expect(libp2p.pubsub._pubsub.started).to.equal(false) + + await libp2p.pubsub.start() + expect(libp2p.pubsub._pubsub.started).to.equal(true) + }) +}) diff --git a/test/pubsub/implementations.node.js b/test/pubsub/implementations.node.js new file mode 100644 index 00000000..cb2ee9f2 --- /dev/null +++ b/test/pubsub/implementations.node.js @@ -0,0 +1,95 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const Floodsub = require('libp2p-floodsub') +const Gossipsub = require('libp2p-gossipsub') +const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') +const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub') + +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { baseOptions } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem is able to use different implementations', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + it('Floodsub nodes', () => { + return pubsubTest(floodsubMulticodec, Floodsub) + }) + + it('Gossipsub nodes', () => { + return pubsubTest(gossipsubMulticodec, Gossipsub) + }) + + const pubsubTest = async (multicodec, pubsub) => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + libp2p = await create(mergeOptions(baseOptions, { + peerInfo, + modules: { + pubsub: pubsub + } + })) + + remoteLibp2p = await create(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo, + modules: { + pubsub: pubsub + } + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + const libp2pId = libp2p.peerInfo.id.toB58String() + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + + const connection = await libp2p.dialProtocol(remAddr, multicodec) + expect(connection).to.exist() + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + + remoteLibp2p.pubsub.publish(topic, data) + await defer.promise + } +}) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js new file mode 100644 index 00000000..5ffe3f34 --- /dev/null +++ b/test/pubsub/operation.node.js @@ -0,0 +1,184 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const pWaitFor = require('p-wait-for') +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') +const multiaddr = require('multiaddr') + +const { create } = require('../../src') +const { subsystemOptions, subsystemMulticodecs } = require('./utils') +const peerUtils = require('../utils/creators/peer') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('Pubsub subsystem operates correctly', () => { + let peerInfo, remotePeerInfo + let libp2p, remoteLibp2p + let remAddr + + beforeEach(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + + peerInfo.multiaddrs.add(listenAddr) + remotePeerInfo.multiaddrs.add(remoteListenAddr) + }) + + describe('pubsub started before connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo + })) + + await Promise.all([ + libp2p.start(), + remoteLibp2p.start() + ]) + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it('should get notified of connected peers on dial', async () => { + const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + expect(connection).to.exist() + + return Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) + }) + + it('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + const libp2pId = libp2p.peerInfo.id.toB58String() + + await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) + + describe('pubsub started after connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerInfo + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerInfo: remotePeerInfo, + config: { + pubsub: { + enabled: false + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + remAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it('should get notified of connected peers after starting', async () => { + const connection = await libp2p.dial(remAddr) + + expect(connection).to.exist() + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0) + expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0) + + remoteLibp2p.pubsub.start() + + return Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) + }) + + it('should receive pubsub messages', async function () { + this.timeout(10e3) + const defer = pDefer() + const libp2pId = libp2p.peerInfo.id.toB58String() + const topic = 'test-topic' + const data = 'hey!' + + await libp2p.dial(remAddr) + + remoteLibp2p.pubsub.start() + + await Promise.all([ + pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + ]) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, (msg) => { + expect(msg.data.toString()).to.equal(data) + defer.resolve() + }) + + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + + remoteLibp2p.pubsub.publish(topic, data) + + await defer.promise + }) + }) +}) diff --git a/test/pubsub/utils.js b/test/pubsub/utils.js new file mode 100644 index 00000000..11495c5d --- /dev/null +++ b/test/pubsub/utils.js @@ -0,0 +1,29 @@ +'use strict' + +const Gossipsub = require('libp2p-gossipsub') +const { multicodec } = require('libp2p-gossipsub') +const Crypto = require('../../src/insecure/plaintext') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-tcp') + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +const subsystemOptions = mergeOptions(baseOptions, { + modules: { + pubsub: Gossipsub + } +}) + +module.exports.subsystemOptions = subsystemOptions + +module.exports.subsystemMulticodecs = [multicodec] diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index ec7d1b61..9114e035 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -7,6 +7,7 @@ const { expect } = chai const pDefer = require('p-defer') const PeerInfo = require('peer-info') +const Topology = require('libp2p-interfaces/src/topology/multicodec-topology') const PeerStore = require('../../src/peer-store') const Registrar = require('../../src/registrar') const { createMockConnection } = require('./utils') @@ -32,52 +33,17 @@ describe('registrar', () => { throw new Error('should fail to register a protocol if no multicodec is provided') }) - it('should fail to register a protocol if no handlers are provided', () => { - const topologyProps = { - multicodecs: multicodec + it('should fail to register a protocol if an invalid topology is provided', () => { + const fakeTopology = { + random: 1 } - try { - registrar.register(topologyProps) + registrar.register() } catch (err) { - expect(err).to.exist() + expect(err).to.exist(fakeTopology) return } - throw new Error('should fail to register a protocol if no handlers are provided') - }) - - it('should fail to register a protocol if the onConnect handler is not provided', () => { - const topologyProps = { - multicodecs: multicodec, - handlers: { - onDisconnect: () => { } - } - } - - try { - registrar.register(topologyProps) - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if the onConnect handler is not provided') - }) - - it('should fail to register a protocol if the onDisconnect handler is not provided', () => { - const topologyProps = { - multicodecs: multicodec, - handlers: { - onConnect: () => { } - } - } - - try { - registrar.register(topologyProps) - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if the onDisconnect handler is not provided') + throw new Error('should fail to register a protocol if an invalid topology is provided') }) }) @@ -88,13 +54,13 @@ describe('registrar', () => { }) it('should be able to register a protocol', () => { - const topologyProps = { + const topologyProps = new Topology({ + multicodecs: multicodec, handlers: { onConnect: () => { }, onDisconnect: () => { } - }, - multicodecs: multicodec - } + } + }) const identifier = registrar.register(topologyProps) @@ -102,13 +68,13 @@ describe('registrar', () => { }) it('should be able to unregister a protocol', () => { - const topologyProps = { + const topologyProps = new Topology({ + multicodecs: multicodec, handlers: { onConnect: () => { }, onDisconnect: () => { } - }, - multicodecs: multicodec - } + } + }) const identifier = registrar.register(topologyProps) const success = registrar.unregister(identifier) @@ -138,7 +104,7 @@ describe('registrar', () => { registrar.onConnect(remotePeerInfo, conn) expect(registrar.connections.size).to.eql(1) - const topologyProps = { + const topologyProps = new Topology({ multicodecs: multicodec, handlers: { onConnect: (peerInfo, connection) => { @@ -153,7 +119,7 @@ describe('registrar', () => { onDisconnectDefer.resolve() } } - } + }) // Register protocol const identifier = registrar.register(topologyProps) @@ -161,11 +127,9 @@ describe('registrar', () => { // Topology created expect(topology).to.exist() - expect(topology.peers.size).to.eql(1) registrar.onDisconnect(remotePeerInfo) expect(registrar.connections.size).to.eql(0) - expect(topology.peers.size).to.eql(1) // topology should keep the peer // Wait for handlers to be called return Promise.all([ @@ -178,7 +142,7 @@ describe('registrar', () => { const onConnectDefer = pDefer() const onDisconnectDefer = pDefer() - const topologyProps = { + const topologyProps = new Topology({ multicodecs: multicodec, handlers: { onConnect: () => { @@ -188,7 +152,7 @@ describe('registrar', () => { onDisconnectDefer.resolve() } } - } + }) // Register protocol const identifier = registrar.register(topologyProps) @@ -196,7 +160,6 @@ describe('registrar', () => { // Topology created expect(topology).to.exist() - expect(topology.peers.size).to.eql(0) expect(registrar.connections.size).to.eql(0) // Setup connections before registrar @@ -212,7 +175,6 @@ describe('registrar', () => { peerStore.put(peerInfo) await onConnectDefer.promise - expect(topology.peers.size).to.eql(1) // Remove protocol to peer and update it peerInfo.protocols.delete(multicodec)