diff --git a/examples/package.json b/examples/package.json index 7b877b97..23b635b8 100644 --- a/examples/package.json +++ b/examples/package.json @@ -12,6 +12,7 @@ "fs-extra": "^8.1.0", "libp2p-pubsub-peer-discovery": "^4.0.0", "libp2p-relay-server": "^0.2.0", + "libp2p-gossipsub": "^0.8.0", "p-defer": "^3.0.0", "which": "^2.0.1" }, diff --git a/examples/pubsub/README.md b/examples/pubsub/README.md index 2016b2c8..9ea93dc0 100644 --- a/examples/pubsub/README.md +++ b/examples/pubsub/README.md @@ -8,6 +8,10 @@ We've seen many interesting use cases appear with this, here are some highlights - [IPFS PubSub (using libp2p-floodsub) for IoT](https://www.youtube.com/watch?v=qLpM5pBDGiE). - [Real Time distributed Applications](https://www.youtube.com/watch?v=vQrbxyDPSXg) +## 0. Set up the example + +Before moving into the examples, you should run `npm install` on the top level `js-libp2p` folder, in order to install all the dependencies needed for this example. In addition, you will need to install the example related dependencies by doing `cd examples && npm install`. Once the install finishes, you should move into the example folder with `cd pubsub`. + ## 1. Setting up a simple PubSub network on top of libp2p For this example, we will use MulticastDNS for automatic Peer Discovery. This example is based the previous examples found in [Discovery Mechanisms](../discovery-mechanisms). You can find the complete version at [1.js](./1.js). diff --git a/test/pubsub/configuration.node.js b/test/configuration/pubsub.spec.js similarity index 58% rename from test/pubsub/configuration.node.js rename to test/configuration/pubsub.spec.js index 998b886e..6f2393d6 100644 --- a/test/pubsub/configuration.node.js +++ b/test/configuration/pubsub.spec.js @@ -3,14 +3,13 @@ const { expect } = require('aegir/utils/chai') const mergeOptions = require('merge-options') -const { Multiaddr } = require('multiaddr') +const pDefer = require('p-defer') +const delay = require('delay') const { create } = require('../../src') -const { baseOptions, subsystemOptions } = require('./utils') +const { baseOptions, pubsubSubsystemOptions } = require('./utils') const peerUtils = require('../utils/creators/peer') -const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0') - describe('Pubsub subsystem is configurable', () => { let libp2p @@ -24,18 +23,15 @@ describe('Pubsub subsystem is configurable', () => { }) it('should exist if the module is provided', async () => { - libp2p = await create(subsystemOptions) + libp2p = await create(pubsubSubsystemOptions) expect(libp2p.pubsub).to.exist() }) it('should start and stop by default once libp2p starts', async () => { const [peerId] = await peerUtils.createPeerId() - const customOptions = mergeOptions(subsystemOptions, { - peerId, - addresses: { - listen: [listenAddr] - } + const customOptions = mergeOptions(pubsubSubsystemOptions, { + peerId }) libp2p = await create(customOptions) @@ -51,11 +47,8 @@ describe('Pubsub subsystem is configurable', () => { it('should not start if disabled once libp2p starts', async () => { const [peerId] = await peerUtils.createPeerId() - const customOptions = mergeOptions(subsystemOptions, { + const customOptions = mergeOptions(pubsubSubsystemOptions, { peerId, - addresses: { - listen: [listenAddr] - }, config: { pubsub: { enabled: false @@ -73,11 +66,8 @@ describe('Pubsub subsystem is configurable', () => { it('should allow a manual start', async () => { const [peerId] = await peerUtils.createPeerId() - const customOptions = mergeOptions(subsystemOptions, { + const customOptions = mergeOptions(pubsubSubsystemOptions, { peerId, - addresses: { - listen: [listenAddr] - }, config: { pubsub: { enabled: false @@ -93,3 +83,43 @@ describe('Pubsub subsystem is configurable', () => { expect(libp2p.pubsub.started).to.equal(true) }) }) + +describe('Pubsub subscription handlers adapter', () => { + let libp2p + + beforeEach(async () => { + const [peerId] = await peerUtils.createPeerId() + + libp2p = await create(mergeOptions(pubsubSubsystemOptions, { + peerId + })) + + await libp2p.start() + }) + + it('extends pubsub with subscribe handler', async () => { + let countMessages = 0 + const topic = 'topic' + const defer = pDefer() + + const handler = () => { + countMessages++ + if (countMessages > 1) { + throw new Error('only one message should be received') + } + + defer.resolve() + } + + await libp2p.pubsub.subscribe(topic, handler) + + libp2p.pubsub.emit(topic, 'useless-data') + await defer.promise + + await libp2p.pubsub.unsubscribe(topic, handler) + libp2p.pubsub.emit(topic, 'useless-data') + + // wait to guarantee that the handler is not called twice + await delay(100) + }) +}) diff --git a/test/configuration/utils.js b/test/configuration/utils.js new file mode 100644 index 00000000..c1a7aa5c --- /dev/null +++ b/test/configuration/utils.js @@ -0,0 +1,52 @@ +'use strict' + +const Pubsub = require('libp2p-interfaces/src/pubsub') +const { NOISE: Crypto } = require('libp2p-noise') +const Muxer = require('libp2p-mplex') +const Transport = require('libp2p-websockets') +const filters = require('libp2p-websockets/src/filters') +const transportKey = Transport.prototype[Symbol.toStringTag] + +const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') +const relayAddr = MULTIADDRS_WEBSOCKETS[0] + +const mergeOptions = require('merge-options') + +const baseOptions = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +} + +module.exports.baseOptions = baseOptions + +class MockPubsub extends Pubsub { + constructor (libp2p, options = {}) { + super({ + debugName: 'mock-pubsub', + multicodecs: '/mock-pubsub', + libp2p, + ...options + }) + } +} + +const pubsubSubsystemOptions = mergeOptions(baseOptions, { + modules: { + pubsub: MockPubsub + }, + addresses: { + listen: [`${relayAddr}/p2p-circuit`] + }, + config: { + transport: { + [transportKey]: { + filter: filters.all + } + } + } +}) + +module.exports.pubsubSubsystemOptions = pubsubSubsystemOptions diff --git a/test/pubsub/implementations.node.js b/test/pubsub/implementations.node.js deleted file mode 100644 index 165df61b..00000000 --- a/test/pubsub/implementations.node.js +++ /dev/null @@ -1,95 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const { expect } = require('aegir/utils/chai') -const pWaitFor = require('p-wait-for') -const pDefer = require('p-defer') -const mergeOptions = require('merge-options') - -const Floodsub = require('libp2p-floodsub') -const Gossipsub = require('libp2p-gossipsub') -const { multicodec: floodsubMulticodec } = require('libp2p-floodsub') -const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub') -const uint8ArrayToString = require('uint8arrays/to-string') - -const { Multiaddr } = require('multiaddr') - -const { create } = require('../../src') -const { baseOptions } = require('./utils') -const peerUtils = require('../utils/creators/peer') - -const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0') -const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0') - -describe('Pubsub subsystem is able to use different implementations', () => { - let peerId, remotePeerId - let libp2p, remoteLibp2p - - beforeEach(async () => { - [peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 }) - }) - - afterEach(() => Promise.all([ - libp2p && libp2p.stop(), - remoteLibp2p && remoteLibp2p.stop() - ])) - - it('Floodsub nodes', () => { - return pubsubTest(floodsubMulticodec, Floodsub) - }) - - it('Gossipsub nodes', () => { - return pubsubTest(gossipsubMulticodec, Gossipsub) - }) - - const pubsubTest = async (multicodec, pubsub) => { - const defer = pDefer() - const topic = 'test-topic' - const data = 'hey!' - - libp2p = await create(mergeOptions(baseOptions, { - peerId, - addresses: { - listen: [listenAddr] - }, - modules: { - pubsub: pubsub - } - })) - - remoteLibp2p = await create(mergeOptions(baseOptions, { - peerId: remotePeerId, - addresses: { - listen: [remoteListenAddr] - }, - modules: { - pubsub: pubsub - } - })) - - await Promise.all([ - libp2p.start(), - remoteLibp2p.start() - ]) - - const libp2pId = libp2p.peerId.toB58String() - libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) - - const connection = await libp2p.dialProtocol(remotePeerId, multicodec) - expect(connection).to.exist() - - libp2p.pubsub.subscribe(topic, (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal(data) - defer.resolve() - }) - - // wait for remoteLibp2p to know about libp2p subscription - await pWaitFor(() => { - const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) - return subscribedPeers.includes(libp2pId) - }) - - remoteLibp2p.pubsub.publish(topic, data) - await defer.promise - } -}) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js deleted file mode 100644 index 21034383..00000000 --- a/test/pubsub/operation.node.js +++ /dev/null @@ -1,326 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const { expect } = require('aegir/utils/chai') -const sinon = require('sinon') - -const pWaitFor = require('p-wait-for') -const pDefer = require('p-defer') -const mergeOptions = require('merge-options') -const { Multiaddr } = require('multiaddr') -const uint8ArrayToString = require('uint8arrays/to-string') - -const { create } = require('../../src') -const { subsystemOptions, subsystemMulticodecs } = require('./utils') -const peerUtils = require('../utils/creators/peer') - -const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0') -const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0') - -describe('Pubsub subsystem operates correctly', () => { - let peerId, remotePeerId - let libp2p, remoteLibp2p - - beforeEach(async () => { - [peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 }) - }) - - describe('pubsub started before connect', () => { - beforeEach(async () => { - libp2p = await create(mergeOptions(subsystemOptions, { - peerId, - addresses: { - listen: [listenAddr] - } - })) - - remoteLibp2p = await create(mergeOptions(subsystemOptions, { - peerId: remotePeerId, - addresses: { - listen: [remoteListenAddr] - } - })) - - await Promise.all([ - libp2p.start(), - remoteLibp2p.start() - ]) - - libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) - }) - - afterEach(() => Promise.all([ - libp2p && libp2p.stop(), - remoteLibp2p && remoteLibp2p.stop() - ])) - - afterEach(() => { - sinon.restore() - }) - - it('should get notified of connected peers on dial', async () => { - const connection = await libp2p.dialProtocol(remotePeerId, subsystemMulticodecs) - - expect(connection).to.exist() - - return Promise.all([ - pWaitFor(() => libp2p.pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) - ]) - }) - - it('should receive pubsub messages', async () => { - const defer = pDefer() - const topic = 'test-topic' - const data = 'hey!' - const libp2pId = libp2p.peerId.toB58String() - - await libp2p.dialProtocol(remotePeerId, subsystemMulticodecs) - - let subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.not.include(topic) - - libp2p.pubsub.subscribe(topic, (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal(data) - defer.resolve() - }) - - subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.include(topic) - - // wait for remoteLibp2p to know about libp2p subscription - await pWaitFor(() => { - const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) - return subscribedPeers.includes(libp2pId) - }) - remoteLibp2p.pubsub.publish(topic, data) - - await defer.promise - }) - }) - - describe('pubsub started after connect', () => { - beforeEach(async () => { - libp2p = await create(mergeOptions(subsystemOptions, { - peerId, - addresses: { - listen: [listenAddr] - } - })) - - remoteLibp2p = await create(mergeOptions(subsystemOptions, { - peerId: remotePeerId, - addresses: { - listen: [remoteListenAddr] - }, - config: { - pubsub: { - enabled: false - } - } - })) - - await libp2p.start() - await remoteLibp2p.start() - - libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) - }) - - afterEach(() => Promise.all([ - libp2p && libp2p.stop(), - remoteLibp2p && remoteLibp2p.stop() - ])) - - afterEach(() => { - sinon.restore() - }) - - it('should get notified of connected peers after starting', async () => { - const connection = await libp2p.dial(remotePeerId) - - expect(connection).to.exist() - expect(libp2p.pubsub.peers.size).to.be.eql(0) - expect(remoteLibp2p.pubsub.peers.size).to.be.eql(0) - - remoteLibp2p.pubsub.start() - - return Promise.all([ - pWaitFor(() => libp2p.pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) - ]) - }) - - it('should receive pubsub messages', async function () { - this.timeout(10e3) - const defer = pDefer() - const libp2pId = libp2p.peerId.toB58String() - const topic = 'test-topic' - const data = 'hey!' - - await libp2p.dial(remotePeerId) - - remoteLibp2p.pubsub.start() - - await Promise.all([ - pWaitFor(() => libp2p.pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) - ]) - - let subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.not.include(topic) - - libp2p.pubsub.subscribe(topic, (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal(data) - defer.resolve() - }) - - subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.include(topic) - - // wait for remoteLibp2p to know about libp2p subscription - await pWaitFor(() => { - const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) - return subscribedPeers.includes(libp2pId) - }) - - remoteLibp2p.pubsub.publish(topic, data) - - await defer.promise - }) - }) - - describe('pubsub with intermittent connections', () => { - beforeEach(async () => { - libp2p = await create(mergeOptions(subsystemOptions, { - peerId, - addresses: { - listen: [listenAddr] - }, - config: { - pubsub: { - enabled: true, - emitSelf: false - } - } - })) - - remoteLibp2p = await create(mergeOptions(subsystemOptions, { - peerId: remotePeerId, - addresses: { - listen: [remoteListenAddr] - }, - config: { - pubsub: { - enabled: true, - emitSelf: false - } - } - })) - - await libp2p.start() - await remoteLibp2p.start() - - libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) - }) - - afterEach(() => Promise.all([ - libp2p && libp2p.stop(), - remoteLibp2p && remoteLibp2p.stop() - ])) - - afterEach(() => { - sinon.restore() - }) - - it('should receive pubsub messages after a node restart', async () => { - const topic = 'test-topic' - const data = 'hey!' - const libp2pId = libp2p.peerId.toB58String() - - let counter = 0 - const defer1 = pDefer() - const defer2 = pDefer() - const handler = (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal(data) - counter++ - counter === 1 ? defer1.resolve() : defer2.resolve() - } - - await libp2p.dial(remotePeerId) - - let subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.not.include(topic) - - libp2p.pubsub.subscribe(topic, handler) - - subscribedTopics = libp2p.pubsub.getTopics() - expect(subscribedTopics).to.include(topic) - - // wait for remoteLibp2p to know about libp2p subscription - await pWaitFor(() => { - const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) - return subscribedPeers.includes(libp2pId) - }) - remoteLibp2p.pubsub.publish(topic, data) - - await defer1.promise - - await remoteLibp2p.stop() - await remoteLibp2p.start() - - libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) - await libp2p.dial(remotePeerId) - - // wait for remoteLibp2p to know about libp2p subscription - await pWaitFor(() => { - const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) - return subscribedPeers.includes(libp2pId) - }) - - remoteLibp2p.pubsub.publish(topic, data) - - await defer2.promise - }) - - it('should handle quick reconnects with a delayed disconnect', async () => { - // Subscribe on both - const handlerSpy = sinon.spy() - const topic = 'reconnect-channel' - await Promise.all([ - libp2p.pubsub.subscribe(topic, handlerSpy), - remoteLibp2p.pubsub.subscribe(topic, handlerSpy) - ]) - // Create two connections to the remote peer - const originalConnection = await libp2p.dialer.connectToPeer(remoteLibp2p.peerId) - // second connection - await libp2p.dialer.connectToPeer(remoteLibp2p.peerId) - expect(libp2p.connections.get(remoteLibp2p.peerId.toB58String())).to.have.length(2) - - // Wait for subscriptions to occur - await pWaitFor(() => { - return libp2p.pubsub.getSubscribers(topic).includes(remoteLibp2p.peerId.toB58String()) && - remoteLibp2p.pubsub.getSubscribers(topic).includes(libp2p.peerId.toB58String()) - }) - - // Verify messages go both ways - libp2p.pubsub.publish(topic, 'message1') - remoteLibp2p.pubsub.publish(topic, 'message2') - await pWaitFor(() => handlerSpy.callCount === 2) - expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2']) - - // Disconnect the first connection (this acts as a delayed reconnect) - const libp2pConnUpdateSpy = sinon.spy(libp2p.connectionManager.connections, 'set') - const remoteLibp2pConnUpdateSpy = sinon.spy(remoteLibp2p.connectionManager.connections, 'set') - - await originalConnection.close() - await pWaitFor(() => libp2pConnUpdateSpy.callCount === 1 && remoteLibp2pConnUpdateSpy.callCount === 1) - - // Verify messages go both ways after the disconnect - handlerSpy.resetHistory() - libp2p.pubsub.publish(topic, 'message3') - remoteLibp2p.pubsub.publish(topic, 'message4') - await pWaitFor(() => handlerSpy.callCount === 2) - expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4']) - }) - }) -}) diff --git a/test/pubsub/utils.js b/test/pubsub/utils.js deleted file mode 100644 index 11495c5d..00000000 --- a/test/pubsub/utils.js +++ /dev/null @@ -1,29 +0,0 @@ -'use strict' - -const Gossipsub = require('libp2p-gossipsub') -const { multicodec } = require('libp2p-gossipsub') -const Crypto = require('../../src/insecure/plaintext') -const Muxer = require('libp2p-mplex') -const Transport = require('libp2p-tcp') - -const mergeOptions = require('merge-options') - -const baseOptions = { - modules: { - transport: [Transport], - streamMuxer: [Muxer], - connEncryption: [Crypto] - } -} - -module.exports.baseOptions = baseOptions - -const subsystemOptions = mergeOptions(baseOptions, { - modules: { - pubsub: Gossipsub - } -}) - -module.exports.subsystemOptions = subsystemOptions - -module.exports.subsystemMulticodecs = [multicodec]