chore: restructure pubsub tests

This commit is contained in:
Vasco Santos 2021-01-20 10:34:47 +01:00 committed by Vasco Santos
parent 2a6a635f13
commit 2c4b567b00
7 changed files with 105 additions and 468 deletions

View File

@ -12,6 +12,7 @@
"fs-extra": "^8.1.0",
"libp2p-pubsub-peer-discovery": "^4.0.0",
"libp2p-relay-server": "^0.2.0",
"libp2p-gossipsub": "^0.8.0",
"p-defer": "^3.0.0",
"which": "^2.0.1"
},

View File

@ -8,6 +8,10 @@ We've seen many interesting use cases appear with this, here are some highlights
- [IPFS PubSub (using libp2p-floodsub) for IoT](https://www.youtube.com/watch?v=qLpM5pBDGiE).
- [Real Time distributed Applications](https://www.youtube.com/watch?v=vQrbxyDPSXg)
## 0. Set up the example
Before moving into the examples, you should run `npm install` on the top level `js-libp2p` folder, in order to install all the dependencies needed for this example. In addition, you will need to install the example related dependencies by doing `cd examples && npm install`. Once the install finishes, you should move into the example folder with `cd pubsub`.
## 1. Setting up a simple PubSub network on top of libp2p
For this example, we will use MulticastDNS for automatic Peer Discovery. This example is based the previous examples found in [Discovery Mechanisms](../discovery-mechanisms). You can find the complete version at [1.js](./1.js).

View File

@ -3,14 +3,13 @@
const { expect } = require('aegir/utils/chai')
const mergeOptions = require('merge-options')
const { Multiaddr } = require('multiaddr')
const pDefer = require('p-defer')
const delay = require('delay')
const { create } = require('../../src')
const { baseOptions, subsystemOptions } = require('./utils')
const { baseOptions, pubsubSubsystemOptions } = require('./utils')
const peerUtils = require('../utils/creators/peer')
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
describe('Pubsub subsystem is configurable', () => {
let libp2p
@ -24,18 +23,15 @@ describe('Pubsub subsystem is configurable', () => {
})
it('should exist if the module is provided', async () => {
libp2p = await create(subsystemOptions)
libp2p = await create(pubsubSubsystemOptions)
expect(libp2p.pubsub).to.exist()
})
it('should start and stop by default once libp2p starts', async () => {
const [peerId] = await peerUtils.createPeerId()
const customOptions = mergeOptions(subsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
}
const customOptions = mergeOptions(pubsubSubsystemOptions, {
peerId
})
libp2p = await create(customOptions)
@ -51,11 +47,8 @@ describe('Pubsub subsystem is configurable', () => {
it('should not start if disabled once libp2p starts', async () => {
const [peerId] = await peerUtils.createPeerId()
const customOptions = mergeOptions(subsystemOptions, {
const customOptions = mergeOptions(pubsubSubsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
},
config: {
pubsub: {
enabled: false
@ -73,11 +66,8 @@ describe('Pubsub subsystem is configurable', () => {
it('should allow a manual start', async () => {
const [peerId] = await peerUtils.createPeerId()
const customOptions = mergeOptions(subsystemOptions, {
const customOptions = mergeOptions(pubsubSubsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
},
config: {
pubsub: {
enabled: false
@ -93,3 +83,43 @@ describe('Pubsub subsystem is configurable', () => {
expect(libp2p.pubsub.started).to.equal(true)
})
})
describe('Pubsub subscription handlers adapter', () => {
let libp2p
beforeEach(async () => {
const [peerId] = await peerUtils.createPeerId()
libp2p = await create(mergeOptions(pubsubSubsystemOptions, {
peerId
}))
await libp2p.start()
})
it('extends pubsub with subscribe handler', async () => {
let countMessages = 0
const topic = 'topic'
const defer = pDefer()
const handler = () => {
countMessages++
if (countMessages > 1) {
throw new Error('only one message should be received')
}
defer.resolve()
}
await libp2p.pubsub.subscribe(topic, handler)
libp2p.pubsub.emit(topic, 'useless-data')
await defer.promise
await libp2p.pubsub.unsubscribe(topic, handler)
libp2p.pubsub.emit(topic, 'useless-data')
// wait to guarantee that the handler is not called twice
await delay(100)
})
})

View File

@ -0,0 +1,52 @@
'use strict'
const Pubsub = require('libp2p-interfaces/src/pubsub')
const { NOISE: Crypto } = require('libp2p-noise')
const Muxer = require('libp2p-mplex')
const Transport = require('libp2p-websockets')
const filters = require('libp2p-websockets/src/filters')
const transportKey = Transport.prototype[Symbol.toStringTag]
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
const relayAddr = MULTIADDRS_WEBSOCKETS[0]
const mergeOptions = require('merge-options')
const baseOptions = {
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
}
module.exports.baseOptions = baseOptions
class MockPubsub extends Pubsub {
constructor (libp2p, options = {}) {
super({
debugName: 'mock-pubsub',
multicodecs: '/mock-pubsub',
libp2p,
...options
})
}
}
const pubsubSubsystemOptions = mergeOptions(baseOptions, {
modules: {
pubsub: MockPubsub
},
addresses: {
listen: [`${relayAddr}/p2p-circuit`]
},
config: {
transport: {
[transportKey]: {
filter: filters.all
}
}
}
})
module.exports.pubsubSubsystemOptions = pubsubSubsystemOptions

View File

@ -1,95 +0,0 @@
'use strict'
/* eslint-env mocha */
const { expect } = require('aegir/utils/chai')
const pWaitFor = require('p-wait-for')
const pDefer = require('p-defer')
const mergeOptions = require('merge-options')
const Floodsub = require('libp2p-floodsub')
const Gossipsub = require('libp2p-gossipsub')
const { multicodec: floodsubMulticodec } = require('libp2p-floodsub')
const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub')
const uint8ArrayToString = require('uint8arrays/to-string')
const { Multiaddr } = require('multiaddr')
const { create } = require('../../src')
const { baseOptions } = require('./utils')
const peerUtils = require('../utils/creators/peer')
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
describe('Pubsub subsystem is able to use different implementations', () => {
let peerId, remotePeerId
let libp2p, remoteLibp2p
beforeEach(async () => {
[peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 })
})
afterEach(() => Promise.all([
libp2p && libp2p.stop(),
remoteLibp2p && remoteLibp2p.stop()
]))
it('Floodsub nodes', () => {
return pubsubTest(floodsubMulticodec, Floodsub)
})
it('Gossipsub nodes', () => {
return pubsubTest(gossipsubMulticodec, Gossipsub)
})
const pubsubTest = async (multicodec, pubsub) => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
libp2p = await create(mergeOptions(baseOptions, {
peerId,
addresses: {
listen: [listenAddr]
},
modules: {
pubsub: pubsub
}
}))
remoteLibp2p = await create(mergeOptions(baseOptions, {
peerId: remotePeerId,
addresses: {
listen: [remoteListenAddr]
},
modules: {
pubsub: pubsub
}
}))
await Promise.all([
libp2p.start(),
remoteLibp2p.start()
])
const libp2pId = libp2p.peerId.toB58String()
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
const connection = await libp2p.dialProtocol(remotePeerId, multicodec)
expect(connection).to.exist()
libp2p.pubsub.subscribe(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer.promise
}
})

View File

@ -1,326 +0,0 @@
'use strict'
/* eslint-env mocha */
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pWaitFor = require('p-wait-for')
const pDefer = require('p-defer')
const mergeOptions = require('merge-options')
const { Multiaddr } = require('multiaddr')
const uint8ArrayToString = require('uint8arrays/to-string')
const { create } = require('../../src')
const { subsystemOptions, subsystemMulticodecs } = require('./utils')
const peerUtils = require('../utils/creators/peer')
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
describe('Pubsub subsystem operates correctly', () => {
let peerId, remotePeerId
let libp2p, remoteLibp2p
beforeEach(async () => {
[peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 })
})
describe('pubsub started before connect', () => {
beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
}
}))
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
peerId: remotePeerId,
addresses: {
listen: [remoteListenAddr]
}
}))
await Promise.all([
libp2p.start(),
remoteLibp2p.start()
])
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
})
afterEach(() => Promise.all([
libp2p && libp2p.stop(),
remoteLibp2p && remoteLibp2p.stop()
]))
afterEach(() => {
sinon.restore()
})
it('should get notified of connected peers on dial', async () => {
const connection = await libp2p.dialProtocol(remotePeerId, subsystemMulticodecs)
expect(connection).to.exist()
return Promise.all([
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
})
it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
const libp2pId = libp2p.peerId.toB58String()
await libp2p.dialProtocol(remotePeerId, subsystemMulticodecs)
let subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.not.include(topic)
libp2p.pubsub.subscribe(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer.promise
})
})
describe('pubsub started after connect', () => {
beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
}
}))
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
peerId: remotePeerId,
addresses: {
listen: [remoteListenAddr]
},
config: {
pubsub: {
enabled: false
}
}
}))
await libp2p.start()
await remoteLibp2p.start()
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
})
afterEach(() => Promise.all([
libp2p && libp2p.stop(),
remoteLibp2p && remoteLibp2p.stop()
]))
afterEach(() => {
sinon.restore()
})
it('should get notified of connected peers after starting', async () => {
const connection = await libp2p.dial(remotePeerId)
expect(connection).to.exist()
expect(libp2p.pubsub.peers.size).to.be.eql(0)
expect(remoteLibp2p.pubsub.peers.size).to.be.eql(0)
remoteLibp2p.pubsub.start()
return Promise.all([
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
})
it('should receive pubsub messages', async function () {
this.timeout(10e3)
const defer = pDefer()
const libp2pId = libp2p.peerId.toB58String()
const topic = 'test-topic'
const data = 'hey!'
await libp2p.dial(remotePeerId)
remoteLibp2p.pubsub.start()
await Promise.all([
pWaitFor(() => libp2p.pubsub.peers.size === 1),
pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1)
])
let subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.not.include(topic)
libp2p.pubsub.subscribe(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer.promise
})
})
describe('pubsub with intermittent connections', () => {
beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, {
peerId,
addresses: {
listen: [listenAddr]
},
config: {
pubsub: {
enabled: true,
emitSelf: false
}
}
}))
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
peerId: remotePeerId,
addresses: {
listen: [remoteListenAddr]
},
config: {
pubsub: {
enabled: true,
emitSelf: false
}
}
}))
await libp2p.start()
await remoteLibp2p.start()
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
})
afterEach(() => Promise.all([
libp2p && libp2p.stop(),
remoteLibp2p && remoteLibp2p.stop()
]))
afterEach(() => {
sinon.restore()
})
it('should receive pubsub messages after a node restart', async () => {
const topic = 'test-topic'
const data = 'hey!'
const libp2pId = libp2p.peerId.toB58String()
let counter = 0
const defer1 = pDefer()
const defer2 = pDefer()
const handler = (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
counter++
counter === 1 ? defer1.resolve() : defer2.resolve()
}
await libp2p.dial(remotePeerId)
let subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.not.include(topic)
libp2p.pubsub.subscribe(topic, handler)
subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer1.promise
await remoteLibp2p.stop()
await remoteLibp2p.start()
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
await libp2p.dial(remotePeerId)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer2.promise
})
it('should handle quick reconnects with a delayed disconnect', async () => {
// Subscribe on both
const handlerSpy = sinon.spy()
const topic = 'reconnect-channel'
await Promise.all([
libp2p.pubsub.subscribe(topic, handlerSpy),
remoteLibp2p.pubsub.subscribe(topic, handlerSpy)
])
// Create two connections to the remote peer
const originalConnection = await libp2p.dialer.connectToPeer(remoteLibp2p.peerId)
// second connection
await libp2p.dialer.connectToPeer(remoteLibp2p.peerId)
expect(libp2p.connections.get(remoteLibp2p.peerId.toB58String())).to.have.length(2)
// Wait for subscriptions to occur
await pWaitFor(() => {
return libp2p.pubsub.getSubscribers(topic).includes(remoteLibp2p.peerId.toB58String()) &&
remoteLibp2p.pubsub.getSubscribers(topic).includes(libp2p.peerId.toB58String())
})
// Verify messages go both ways
libp2p.pubsub.publish(topic, 'message1')
remoteLibp2p.pubsub.publish(topic, 'message2')
await pWaitFor(() => handlerSpy.callCount === 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
// Disconnect the first connection (this acts as a delayed reconnect)
const libp2pConnUpdateSpy = sinon.spy(libp2p.connectionManager.connections, 'set')
const remoteLibp2pConnUpdateSpy = sinon.spy(remoteLibp2p.connectionManager.connections, 'set')
await originalConnection.close()
await pWaitFor(() => libp2pConnUpdateSpy.callCount === 1 && remoteLibp2pConnUpdateSpy.callCount === 1)
// Verify messages go both ways after the disconnect
handlerSpy.resetHistory()
libp2p.pubsub.publish(topic, 'message3')
remoteLibp2p.pubsub.publish(topic, 'message4')
await pWaitFor(() => handlerSpy.callCount === 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
})
})
})

View File

@ -1,29 +0,0 @@
'use strict'
const Gossipsub = require('libp2p-gossipsub')
const { multicodec } = require('libp2p-gossipsub')
const Crypto = require('../../src/insecure/plaintext')
const Muxer = require('libp2p-mplex')
const Transport = require('libp2p-tcp')
const mergeOptions = require('merge-options')
const baseOptions = {
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
}
module.exports.baseOptions = baseOptions
const subsystemOptions = mergeOptions(baseOptions, {
modules: {
pubsub: Gossipsub
}
})
module.exports.subsystemOptions = subsystemOptions
module.exports.subsystemMulticodecs = [multicodec]