From 1e9d90907871570d63109a615a640e43962b9a28 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 19 Jan 2021 15:41:15 +0100 Subject: [PATCH] chore: complete pubsub tests --- package.json | 1 + src/pubsub/tests/api.js | 3 +- src/pubsub/tests/connection-handlers.js | 305 +++++++++++++++++++++++ src/pubsub/tests/emit-self.js | 3 +- src/pubsub/tests/index.js | 2 + src/pubsub/tests/messages.js | 3 +- src/pubsub/tests/multiple-nodes.js | 3 +- src/pubsub/tests/two-nodes.js | 318 ++++++++++-------------- src/pubsub/tests/utils.js | 2 +- 9 files changed, 443 insertions(+), 197 deletions(-) create mode 100644 src/pubsub/tests/connection-handlers.js diff --git a/package.json b/package.json index 8385569..7251faf 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "scripts": { "lint": "aegir lint", "build": "aegir build", + "prepare": "aegir build --no-bundle", "test": "aegir test", "test:node": "aegir test --target node", "test:browser": "aegir test --target browser", diff --git a/src/pubsub/tests/api.js b/src/pubsub/tests/api.js index 2de0cf7..fc0e2a5 100644 --- a/src/pubsub/tests/api.js +++ b/src/pubsub/tests/api.js @@ -1,8 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -const { expect } = chai +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const pDefer = require('p-defer') diff --git a/src/pubsub/tests/connection-handlers.js b/src/pubsub/tests/connection-handlers.js new file mode 100644 index 0000000..747ad9c --- /dev/null +++ b/src/pubsub/tests/connection-handlers.js @@ -0,0 +1,305 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayToString = require('uint8arrays/to-string') + +const { expectSet } = require('./utils') + +module.exports = (common) => { + describe('pubsub connection handlers', () => { + let psA, psB + + describe('nodes send state on connection', () => { + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) + + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + // Start pubsub + psA.start() + psB.start() + }) + + // Make subscriptions prior to nodes connected + before(() => { + psA.subscribe('Za') + psB.subscribe('Zb') + + expect(psA.peers.size).to.equal(0) + expectSet(psA.subscriptions, ['Za']) + expect(psB.peers.size).to.equal(0) + expectSet(psB.subscriptions, ['Zb']) + }) + + after(async () => { + sinon.restore() + await common.teardown() + }) + + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(10e3) + + await Promise.all([ + psA._libp2p.dial(psB.peerId), + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)) + ]) + + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + + expectSet(psA.subscriptions, ['Za']) + expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()]) + + expectSet(psB.subscriptions, ['Zb']) + expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()]) + }) + }) + + describe('pubsub started before connect', () => { + // Create pubsub nodes and start them + beforeEach(async () => { + [psA, psB] = await common.setup(2) + + psA.start() + psB.start() + }) + + afterEach(async () => { + sinon.restore() + + await common.teardown() + }) + + it('should get notified of connected peers on dial', async () => { + const connection = await psA._libp2p.dial(psB.peerId) + expect(connection).to.exist() + + return Promise.all([ + pWaitFor(() => psA.peers.size === 1), + pWaitFor(() => psB.peers.size === 1) + ]) + }) + + it('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + await psA._libp2p.dial(psB.peerId) + + let subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.not.include(topic) + + psA.on(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal(data) + defer.resolve() + }) + psA.subscribe(topic) + + subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for psB to know about psA subscription + await pWaitFor(() => { + const subscribedPeers = psB.getSubscribers(topic) + return subscribedPeers.includes(psA.peerId.toB58String()) + }) + psB.publish(topic, data) + + await defer.promise + }) + }) + + describe('pubsub started after connect', () => { + // Create pubsub nodes + beforeEach(async () => { + [psA, psB] = await common.setup(2) + }) + + afterEach(async () => { + sinon.restore() + + psA && psA.stop() + psB && psB.stop() + + await common.teardown() + }) + + it('should get notified of connected peers after starting', async () => { + const connection = await psA._libp2p.dial(psB.peerId) + expect(connection).to.exist() + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + psA.start() + psB.start() + + return Promise.all([ + pWaitFor(() => psA.peers.size === 1), + pWaitFor(() => psB.peers.size === 1) + ]) + }) + + it('should receive pubsub messages', async () => { + const defer = pDefer() + const topic = 'test-topic' + const data = 'hey!' + + await psA._libp2p.dial(psB.peerId) + + psA.start() + psB.start() + + await Promise.all([ + pWaitFor(() => psA.peers.size === 1), + pWaitFor(() => psB.peers.size === 1) + ]) + + let subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.not.include(topic) + + psA.on(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal(data) + defer.resolve() + }) + psA.subscribe(topic) + + subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for psB to know about psA subscription + await pWaitFor(() => { + const subscribedPeers = psB.getSubscribers(topic) + return subscribedPeers.includes(psA.peerId.toB58String()) + }) + psB.publish(topic, data) + + await defer.promise + }) + }) + + describe('pubsub with intermittent connections', () => { + // Create pubsub nodes and start them + beforeEach(async () => { + [psA, psB] = await common.setup(2) + + psA.start() + psB.start() + }) + + afterEach(async () => { + sinon.restore() + + psA && psA.stop() + psB && psB.stop() + + await common.teardown() + }) + + it('should receive pubsub messages after a node restart', async function () { + this.timeout(10e3) + const topic = 'test-topic' + const data = 'hey!' + const psAid = psA.peerId.toB58String() + + let counter = 0 + const defer1 = pDefer() + const defer2 = pDefer() + + await psA._libp2p.dial(psB.peerId) + + let subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.not.include(topic) + + psA.on(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal(data) + counter++ + counter === 1 ? defer1.resolve() : defer2.resolve() + }) + psA.subscribe(topic) + + subscribedTopics = psA.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for psB to know about psA subscription + await pWaitFor(() => { + const subscribedPeers = psB.getSubscribers(topic) + return subscribedPeers.includes(psAid) + }) + psB.publish(topic, data) + + await defer1.promise + + psB.stop() + await psB._libp2p.stop() + await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId)) + await psB._libp2p.start() + psB.start() + + psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs) + await psA._libp2p.dial(psB.peerId) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = psB.getSubscribers(topic) + return subscribedPeers.includes(psAid) + }) + + psB.publish(topic, data) + + await defer2.promise + }) + + it('should handle quick reconnects with a delayed disconnect', async () => { + // Subscribe on both + const handlerSpy = sinon.spy() + const topic = 'reconnect-channel' + + psA.on(topic, handlerSpy) + psB.on(topic, handlerSpy) + await Promise.all([ + psA.subscribe(topic), + psB.subscribe(topic) + ]) + + // Create two connections to the remote peer + const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId) + // second connection + await psA._libp2p.dialer.connectToPeer(psB.peerId) + expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2) + + // Wait for subscriptions to occur + await pWaitFor(() => { + return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) && + psB.getSubscribers(topic).includes(psA.peerId.toB58String()) + }) + + // Verify messages go both ways + psA.publish(topic, 'message1') + psB.publish(topic, 'message2') + await pWaitFor(() => handlerSpy.callCount >= 2) + expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2']) + + // Disconnect the first connection (this acts as a delayed reconnect) + const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set') + + await originalConnection.close() + await pWaitFor(() => psAConnUpdateSpy.callCount === 1) + + // Verify messages go both ways after the disconnect + handlerSpy.resetHistory() + psA.publish(topic, 'message3') + psB.publish(topic, 'message4') + await pWaitFor(() => handlerSpy.callCount >= 2) + expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4']) + }) + }) + }) +} diff --git a/src/pubsub/tests/emit-self.js b/src/pubsub/tests/emit-self.js index a2a063e..a91e0d3 100644 --- a/src/pubsub/tests/emit-self.js +++ b/src/pubsub/tests/emit-self.js @@ -1,8 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -const { expect } = chai +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const uint8ArrayFromString = require('uint8arrays/from-string') diff --git a/src/pubsub/tests/index.js b/src/pubsub/tests/index.js index 1b1414b..9e42a9c 100644 --- a/src/pubsub/tests/index.js +++ b/src/pubsub/tests/index.js @@ -4,6 +4,7 @@ const apiTest = require('./api') const emitSelfTest = require('./emit-self') const messagesTest = require('./messages') +const connectionHandlersTest = require('./connection-handlers') const twoNodesTest = require('./two-nodes') const multipleNodesTest = require('./multiple-nodes') @@ -12,6 +13,7 @@ module.exports = (common) => { apiTest(common) emitSelfTest(common) messagesTest(common) + connectionHandlersTest(common) twoNodesTest(common) multipleNodesTest(common) }) diff --git a/src/pubsub/tests/messages.js b/src/pubsub/tests/messages.js index f827710..1e3b357 100644 --- a/src/pubsub/tests/messages.js +++ b/src/pubsub/tests/messages.js @@ -1,8 +1,7 @@ /* eslint-env mocha */ 'use strict' -const chai = require('chai') -const { expect } = chai +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const PeerId = require('peer-id') diff --git a/src/pubsub/tests/multiple-nodes.js b/src/pubsub/tests/multiple-nodes.js index 41cc9b4..587ac28 100644 --- a/src/pubsub/tests/multiple-nodes.js +++ b/src/pubsub/tests/multiple-nodes.js @@ -2,8 +2,7 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' -const chai = require('chai') -const { expect } = chai +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const delay = require('delay') diff --git a/src/pubsub/tests/two-nodes.js b/src/pubsub/tests/two-nodes.js index 028e0aa..9c8cb0b 100644 --- a/src/pubsub/tests/two-nodes.js +++ b/src/pubsub/tests/two-nodes.js @@ -2,8 +2,7 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' -const chai = require('chai') -const { expect } = chai +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const pDefer = require('p-defer') @@ -24,205 +23,148 @@ function shouldNotHappen (_) { module.exports = (common) => { describe('pubsub with two nodes', () => { - describe('fresh nodes', () => { - let psA, psB + let psA, psB - // Create pubsub nodes and connect them - before(async () => { - [psA, psB] = await common.setup(2) + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) - // Start pubsub and connect nodes - psA.start() - psB.start() + // Start pubsub and connect nodes + psA.start() + psB.start() - await psA._libp2p.dial(psB.peerId) + await psA._libp2p.dial(psB.peerId) - // Wait for peers to be ready in pubsub - await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1) - }) - - after(async () => { - sinon.restore() - - psA && psA.stop() - psB && psB.stop() - - await common.teardown() - }) - - it('Subscribe to a topic in nodeA', () => { - const defer = pDefer() - - psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { - expectSet(psA.subscriptions, [topic]) - expect(psB.peers.size).to.equal(1) - expectSet(psB.topics.get(topic), [psA.peerId.toB58String()]) - expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) - defer.resolve() - }) - psA.subscribe(topic) - - return defer.promise - }) - - it('Publish to a topic in nodeA', () => { - const defer = pDefer() - - psA.once(topic, (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal('hey') - psB.removeListener(topic, shouldNotHappen) - defer.resolve() - }) - - psB.once(topic, shouldNotHappen) - - psA.publish(topic, uint8ArrayFromString('hey')) - - return defer.promise - }) - - it('Publish to a topic in nodeB', () => { - const defer = pDefer() - - psA.once(topic, (msg) => { - psA.once(topic, shouldNotHappen) - expect(uint8ArrayToString(msg.data)).to.equal('banana') - - setTimeout(() => { - psA.removeListener(topic, shouldNotHappen) - psB.removeListener(topic, shouldNotHappen) - - defer.resolve() - }, 100) - }) - - psB.once(topic, shouldNotHappen) - - psB.publish(topic, uint8ArrayFromString('banana')) - - return defer.promise - }) - - it('Publish 10 msg to a topic in nodeB', () => { - const defer = pDefer() - let counter = 0 - - psB.once(topic, shouldNotHappen) - psA.on(topic, receivedMsg) - - function receivedMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('banana') - expect(msg.from).to.be.eql(psB.peerId.toB58String()) - expect(msg.seqno).to.be.a('Uint8Array') - expect(msg.topicIDs).to.be.eql([topic]) - - if (++counter === 10) { - psA.removeListener(topic, receivedMsg) - psB.removeListener(topic, shouldNotHappen) - - defer.resolve() - } - } - - Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana'))) - - return defer.promise - }) - - it('Unsubscribe from topic in nodeA', () => { - const defer = pDefer() - - psA.unsubscribe(topic) - expect(psA.subscriptions.size).to.equal(0) - - psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { - expect(psB.peers.size).to.equal(1) - expectSet(psB.topics.get(topic), []) - expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) - expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) - - defer.resolve() - }) - - return defer.promise - }) - - it('Publish to a topic:Z in nodeA nodeB', () => { - const defer = pDefer() - - psA.once('Z', shouldNotHappen) - psB.once('Z', shouldNotHappen) - - setTimeout(() => { - psA.removeListener('Z', shouldNotHappen) - psB.removeListener('Z', shouldNotHappen) - defer.resolve() - }, 100) - - psB.publish('Z', uint8ArrayFromString('banana')) - psA.publish('Z', uint8ArrayFromString('banana')) - - return defer.promise - }) + // Wait for peers to be ready in pubsub + await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1) }) - describe('nodes send state on connection', () => { - let psA, psB + after(async () => { + sinon.restore() - // Create pubsub nodes and connect them - before(async () => { - [psA, psB] = await common.setup(2) + psA && psA.stop() + psB && psB.stop() - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) + await common.teardown() + }) - // Start pubsub and connect nodes - psA.start() - psB.start() - }) + it('Subscribe to a topic in nodeA', () => { + const defer = pDefer() - // Make subscriptions prior to nodes connected - before(() => { - psA.subscribe('Za') - psB.subscribe('Zb') - - expect(psA.peers.size).to.equal(0) - expectSet(psA.subscriptions, ['Za']) - expect(psB.peers.size).to.equal(0) - expectSet(psB.subscriptions, ['Zb']) - }) - - after(async () => { - sinon.restore() - - psA && psA.stop() - psB && psB.stop() - - await common.teardown() - }) - - it('existing subscriptions are sent upon peer connection', async function () { - this.timeout(10e3) - - await Promise.all([ - psA._libp2p.dial(psB.peerId), - new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), - new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)) - ]) - - expect(psA.peers.size).to.equal(1) + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expectSet(psA.subscriptions, [topic]) expect(psB.peers.size).to.equal(1) - - expectSet(psA.subscriptions, ['Za']) - expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()]) - - expectSet(psB.subscriptions, ['Zb']) - expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()]) + expectSet(psB.topics.get(topic), [psA.peerId.toB58String()]) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + defer.resolve() }) + psA.subscribe(topic) + + return defer.promise + }) + + it('Publish to a topic in nodeA', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + psB.removeListener(topic, shouldNotHappen) + defer.resolve() + }) + + psB.once(topic, shouldNotHappen) + + psA.publish(topic, uint8ArrayFromString('hey')) + + return defer.promise + }) + + it('Publish to a topic in nodeB', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + psA.once(topic, shouldNotHappen) + expect(uint8ArrayToString(msg.data)).to.equal('banana') + + setTimeout(() => { + psA.removeListener(topic, shouldNotHappen) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + }, 100) + }) + + psB.once(topic, shouldNotHappen) + + psB.publish(topic, uint8ArrayFromString('banana')) + + return defer.promise + }) + + it('Publish 10 msg to a topic in nodeB', () => { + const defer = pDefer() + let counter = 0 + + psB.once(topic, shouldNotHappen) + psA.on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('banana') + expect(msg.from).to.be.eql(psB.peerId.toB58String()) + expect(msg.seqno).to.be.a('Uint8Array') + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + psA.removeListener(topic, receivedMsg) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + } + } + + Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana'))) + + return defer.promise + }) + + it('Unsubscribe from topic in nodeA', () => { + const defer = pDefer() + + psA.unsubscribe(topic) + expect(psA.subscriptions.size).to.equal(0) + + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expect(psB.peers.size).to.equal(1) + expectSet(psB.topics.get(topic), []) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + + defer.resolve() + }) + + return defer.promise + }) + + it('Publish to a topic:Z in nodeA nodeB', () => { + const defer = pDefer() + + psA.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) + + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + defer.resolve() + }, 100) + + psB.publish('Z', uint8ArrayFromString('banana')) + psA.publish('Z', uint8ArrayFromString('banana')) + + return defer.promise }) }) } diff --git a/src/pubsub/tests/utils.js b/src/pubsub/tests/utils.js index 7d16ac4..d32cd18 100644 --- a/src/pubsub/tests/utils.js +++ b/src/pubsub/tests/utils.js @@ -1,6 +1,6 @@ 'use strict' -const { expect } = require('chai') +const { expect } = require('aegir/utils/chai') exports.first = (map) => map.values().next().value