mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-04-24 17:52:21 +00:00
chore: complete pubsub tests
This commit is contained in:
parent
91dba97125
commit
1e9d909078
@ -24,6 +24,7 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"lint": "aegir lint",
|
"lint": "aegir lint",
|
||||||
"build": "aegir build",
|
"build": "aegir build",
|
||||||
|
"prepare": "aegir build --no-bundle",
|
||||||
"test": "aegir test",
|
"test": "aegir test",
|
||||||
"test:node": "aegir test --target node",
|
"test:node": "aegir test --target node",
|
||||||
"test:browser": "aegir test --target browser",
|
"test:browser": "aegir test --target browser",
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
/* eslint-env mocha */
|
/* eslint-env mocha */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const chai = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
|
305
src/pubsub/tests/connection-handlers.js
Normal file
305
src/pubsub/tests/connection-handlers.js
Normal file
@ -0,0 +1,305 @@
|
|||||||
|
/* eslint-env mocha */
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { expect } = require('aegir/utils/chai')
|
||||||
|
const sinon = require('sinon')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
|
const uint8ArrayToString = require('uint8arrays/to-string')
|
||||||
|
|
||||||
|
const { expectSet } = require('./utils')
|
||||||
|
|
||||||
|
module.exports = (common) => {
|
||||||
|
describe('pubsub connection handlers', () => {
|
||||||
|
let psA, psB
|
||||||
|
|
||||||
|
describe('nodes send state on connection', () => {
|
||||||
|
// Create pubsub nodes and connect them
|
||||||
|
before(async () => {
|
||||||
|
[psA, psB] = await common.setup(2)
|
||||||
|
|
||||||
|
expect(psA.peers.size).to.be.eql(0)
|
||||||
|
expect(psB.peers.size).to.be.eql(0)
|
||||||
|
|
||||||
|
// Start pubsub
|
||||||
|
psA.start()
|
||||||
|
psB.start()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Make subscriptions prior to nodes connected
|
||||||
|
before(() => {
|
||||||
|
psA.subscribe('Za')
|
||||||
|
psB.subscribe('Zb')
|
||||||
|
|
||||||
|
expect(psA.peers.size).to.equal(0)
|
||||||
|
expectSet(psA.subscriptions, ['Za'])
|
||||||
|
expect(psB.peers.size).to.equal(0)
|
||||||
|
expectSet(psB.subscriptions, ['Zb'])
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
await common.teardown()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('existing subscriptions are sent upon peer connection', async function () {
|
||||||
|
this.timeout(10e3)
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
psA._libp2p.dial(psB.peerId),
|
||||||
|
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
|
||||||
|
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(psA.peers.size).to.equal(1)
|
||||||
|
expect(psB.peers.size).to.equal(1)
|
||||||
|
|
||||||
|
expectSet(psA.subscriptions, ['Za'])
|
||||||
|
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
|
||||||
|
|
||||||
|
expectSet(psB.subscriptions, ['Zb'])
|
||||||
|
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('pubsub started before connect', () => {
|
||||||
|
// Create pubsub nodes and start them
|
||||||
|
beforeEach(async () => {
|
||||||
|
[psA, psB] = await common.setup(2)
|
||||||
|
|
||||||
|
psA.start()
|
||||||
|
psB.start()
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
|
||||||
|
await common.teardown()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should get notified of connected peers on dial', async () => {
|
||||||
|
const connection = await psA._libp2p.dial(psB.peerId)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
|
||||||
|
return Promise.all([
|
||||||
|
pWaitFor(() => psA.peers.size === 1),
|
||||||
|
pWaitFor(() => psB.peers.size === 1)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should receive pubsub messages', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
|
||||||
|
await psA._libp2p.dial(psB.peerId)
|
||||||
|
|
||||||
|
let subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.not.include(topic)
|
||||||
|
|
||||||
|
psA.on(topic, (msg) => {
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal(data)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
psA.subscribe(topic)
|
||||||
|
|
||||||
|
subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.include(topic)
|
||||||
|
|
||||||
|
// wait for psB to know about psA subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = psB.getSubscribers(topic)
|
||||||
|
return subscribedPeers.includes(psA.peerId.toB58String())
|
||||||
|
})
|
||||||
|
psB.publish(topic, data)
|
||||||
|
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('pubsub started after connect', () => {
|
||||||
|
// Create pubsub nodes
|
||||||
|
beforeEach(async () => {
|
||||||
|
[psA, psB] = await common.setup(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
|
||||||
|
psA && psA.stop()
|
||||||
|
psB && psB.stop()
|
||||||
|
|
||||||
|
await common.teardown()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should get notified of connected peers after starting', async () => {
|
||||||
|
const connection = await psA._libp2p.dial(psB.peerId)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
expect(psA.peers.size).to.be.eql(0)
|
||||||
|
expect(psB.peers.size).to.be.eql(0)
|
||||||
|
|
||||||
|
psA.start()
|
||||||
|
psB.start()
|
||||||
|
|
||||||
|
return Promise.all([
|
||||||
|
pWaitFor(() => psA.peers.size === 1),
|
||||||
|
pWaitFor(() => psB.peers.size === 1)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should receive pubsub messages', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
|
||||||
|
await psA._libp2p.dial(psB.peerId)
|
||||||
|
|
||||||
|
psA.start()
|
||||||
|
psB.start()
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => psA.peers.size === 1),
|
||||||
|
pWaitFor(() => psB.peers.size === 1)
|
||||||
|
])
|
||||||
|
|
||||||
|
let subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.not.include(topic)
|
||||||
|
|
||||||
|
psA.on(topic, (msg) => {
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal(data)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
psA.subscribe(topic)
|
||||||
|
|
||||||
|
subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.include(topic)
|
||||||
|
|
||||||
|
// wait for psB to know about psA subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = psB.getSubscribers(topic)
|
||||||
|
return subscribedPeers.includes(psA.peerId.toB58String())
|
||||||
|
})
|
||||||
|
psB.publish(topic, data)
|
||||||
|
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('pubsub with intermittent connections', () => {
|
||||||
|
// Create pubsub nodes and start them
|
||||||
|
beforeEach(async () => {
|
||||||
|
[psA, psB] = await common.setup(2)
|
||||||
|
|
||||||
|
psA.start()
|
||||||
|
psB.start()
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
|
||||||
|
psA && psA.stop()
|
||||||
|
psB && psB.stop()
|
||||||
|
|
||||||
|
await common.teardown()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should receive pubsub messages after a node restart', async function () {
|
||||||
|
this.timeout(10e3)
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
const psAid = psA.peerId.toB58String()
|
||||||
|
|
||||||
|
let counter = 0
|
||||||
|
const defer1 = pDefer()
|
||||||
|
const defer2 = pDefer()
|
||||||
|
|
||||||
|
await psA._libp2p.dial(psB.peerId)
|
||||||
|
|
||||||
|
let subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.not.include(topic)
|
||||||
|
|
||||||
|
psA.on(topic, (msg) => {
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal(data)
|
||||||
|
counter++
|
||||||
|
counter === 1 ? defer1.resolve() : defer2.resolve()
|
||||||
|
})
|
||||||
|
psA.subscribe(topic)
|
||||||
|
|
||||||
|
subscribedTopics = psA.getTopics()
|
||||||
|
expect(subscribedTopics).to.include(topic)
|
||||||
|
|
||||||
|
// wait for psB to know about psA subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = psB.getSubscribers(topic)
|
||||||
|
return subscribedPeers.includes(psAid)
|
||||||
|
})
|
||||||
|
psB.publish(topic, data)
|
||||||
|
|
||||||
|
await defer1.promise
|
||||||
|
|
||||||
|
psB.stop()
|
||||||
|
await psB._libp2p.stop()
|
||||||
|
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
|
||||||
|
await psB._libp2p.start()
|
||||||
|
psB.start()
|
||||||
|
|
||||||
|
psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
|
||||||
|
await psA._libp2p.dial(psB.peerId)
|
||||||
|
|
||||||
|
// wait for remoteLibp2p to know about libp2p subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = psB.getSubscribers(topic)
|
||||||
|
return subscribedPeers.includes(psAid)
|
||||||
|
})
|
||||||
|
|
||||||
|
psB.publish(topic, data)
|
||||||
|
|
||||||
|
await defer2.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should handle quick reconnects with a delayed disconnect', async () => {
|
||||||
|
// Subscribe on both
|
||||||
|
const handlerSpy = sinon.spy()
|
||||||
|
const topic = 'reconnect-channel'
|
||||||
|
|
||||||
|
psA.on(topic, handlerSpy)
|
||||||
|
psB.on(topic, handlerSpy)
|
||||||
|
await Promise.all([
|
||||||
|
psA.subscribe(topic),
|
||||||
|
psB.subscribe(topic)
|
||||||
|
])
|
||||||
|
|
||||||
|
// Create two connections to the remote peer
|
||||||
|
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
|
||||||
|
// second connection
|
||||||
|
await psA._libp2p.dialer.connectToPeer(psB.peerId)
|
||||||
|
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)
|
||||||
|
|
||||||
|
// Wait for subscriptions to occur
|
||||||
|
await pWaitFor(() => {
|
||||||
|
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
|
||||||
|
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
|
||||||
|
})
|
||||||
|
|
||||||
|
// Verify messages go both ways
|
||||||
|
psA.publish(topic, 'message1')
|
||||||
|
psB.publish(topic, 'message2')
|
||||||
|
await pWaitFor(() => handlerSpy.callCount >= 2)
|
||||||
|
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
|
||||||
|
|
||||||
|
// Disconnect the first connection (this acts as a delayed reconnect)
|
||||||
|
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')
|
||||||
|
|
||||||
|
await originalConnection.close()
|
||||||
|
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)
|
||||||
|
|
||||||
|
// Verify messages go both ways after the disconnect
|
||||||
|
handlerSpy.resetHistory()
|
||||||
|
psA.publish(topic, 'message3')
|
||||||
|
psB.publish(topic, 'message4')
|
||||||
|
await pWaitFor(() => handlerSpy.callCount >= 2)
|
||||||
|
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
@ -1,8 +1,7 @@
|
|||||||
/* eslint-env mocha */
|
/* eslint-env mocha */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const chai = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const uint8ArrayFromString = require('uint8arrays/from-string')
|
const uint8ArrayFromString = require('uint8arrays/from-string')
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
const apiTest = require('./api')
|
const apiTest = require('./api')
|
||||||
const emitSelfTest = require('./emit-self')
|
const emitSelfTest = require('./emit-self')
|
||||||
const messagesTest = require('./messages')
|
const messagesTest = require('./messages')
|
||||||
|
const connectionHandlersTest = require('./connection-handlers')
|
||||||
const twoNodesTest = require('./two-nodes')
|
const twoNodesTest = require('./two-nodes')
|
||||||
const multipleNodesTest = require('./multiple-nodes')
|
const multipleNodesTest = require('./multiple-nodes')
|
||||||
|
|
||||||
@ -12,6 +13,7 @@ module.exports = (common) => {
|
|||||||
apiTest(common)
|
apiTest(common)
|
||||||
emitSelfTest(common)
|
emitSelfTest(common)
|
||||||
messagesTest(common)
|
messagesTest(common)
|
||||||
|
connectionHandlersTest(common)
|
||||||
twoNodesTest(common)
|
twoNodesTest(common)
|
||||||
multipleNodesTest(common)
|
multipleNodesTest(common)
|
||||||
})
|
})
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
/* eslint-env mocha */
|
/* eslint-env mocha */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const chai = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const PeerId = require('peer-id')
|
const PeerId = require('peer-id')
|
||||||
|
@ -2,8 +2,7 @@
|
|||||||
/* eslint max-nested-callbacks: ["error", 6] */
|
/* eslint max-nested-callbacks: ["error", 6] */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const chai = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const delay = require('delay')
|
const delay = require('delay')
|
||||||
|
@ -2,8 +2,7 @@
|
|||||||
/* eslint max-nested-callbacks: ["error", 6] */
|
/* eslint max-nested-callbacks: ["error", 6] */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const chai = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const { expect } = chai
|
|
||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
@ -24,205 +23,148 @@ function shouldNotHappen (_) {
|
|||||||
|
|
||||||
module.exports = (common) => {
|
module.exports = (common) => {
|
||||||
describe('pubsub with two nodes', () => {
|
describe('pubsub with two nodes', () => {
|
||||||
describe('fresh nodes', () => {
|
let psA, psB
|
||||||
let psA, psB
|
|
||||||
|
|
||||||
// Create pubsub nodes and connect them
|
// Create pubsub nodes and connect them
|
||||||
before(async () => {
|
before(async () => {
|
||||||
[psA, psB] = await common.setup(2)
|
[psA, psB] = await common.setup(2)
|
||||||
|
|
||||||
expect(psA.peers.size).to.be.eql(0)
|
expect(psA.peers.size).to.be.eql(0)
|
||||||
expect(psB.peers.size).to.be.eql(0)
|
expect(psB.peers.size).to.be.eql(0)
|
||||||
|
|
||||||
// Start pubsub and connect nodes
|
// Start pubsub and connect nodes
|
||||||
psA.start()
|
psA.start()
|
||||||
psB.start()
|
psB.start()
|
||||||
|
|
||||||
await psA._libp2p.dial(psB.peerId)
|
await psA._libp2p.dial(psB.peerId)
|
||||||
|
|
||||||
// Wait for peers to be ready in pubsub
|
// Wait for peers to be ready in pubsub
|
||||||
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
|
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
|
||||||
})
|
|
||||||
|
|
||||||
after(async () => {
|
|
||||||
sinon.restore()
|
|
||||||
|
|
||||||
psA && psA.stop()
|
|
||||||
psB && psB.stop()
|
|
||||||
|
|
||||||
await common.teardown()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Subscribe to a topic in nodeA', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
|
|
||||||
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
|
|
||||||
expectSet(psA.subscriptions, [topic])
|
|
||||||
expect(psB.peers.size).to.equal(1)
|
|
||||||
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
|
|
||||||
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
|
|
||||||
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
|
|
||||||
defer.resolve()
|
|
||||||
})
|
|
||||||
psA.subscribe(topic)
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Publish to a topic in nodeA', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
|
|
||||||
psA.once(topic, (msg) => {
|
|
||||||
expect(uint8ArrayToString(msg.data)).to.equal('hey')
|
|
||||||
psB.removeListener(topic, shouldNotHappen)
|
|
||||||
defer.resolve()
|
|
||||||
})
|
|
||||||
|
|
||||||
psB.once(topic, shouldNotHappen)
|
|
||||||
|
|
||||||
psA.publish(topic, uint8ArrayFromString('hey'))
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Publish to a topic in nodeB', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
|
|
||||||
psA.once(topic, (msg) => {
|
|
||||||
psA.once(topic, shouldNotHappen)
|
|
||||||
expect(uint8ArrayToString(msg.data)).to.equal('banana')
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
psA.removeListener(topic, shouldNotHappen)
|
|
||||||
psB.removeListener(topic, shouldNotHappen)
|
|
||||||
|
|
||||||
defer.resolve()
|
|
||||||
}, 100)
|
|
||||||
})
|
|
||||||
|
|
||||||
psB.once(topic, shouldNotHappen)
|
|
||||||
|
|
||||||
psB.publish(topic, uint8ArrayFromString('banana'))
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Publish 10 msg to a topic in nodeB', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
let counter = 0
|
|
||||||
|
|
||||||
psB.once(topic, shouldNotHappen)
|
|
||||||
psA.on(topic, receivedMsg)
|
|
||||||
|
|
||||||
function receivedMsg (msg) {
|
|
||||||
expect(uint8ArrayToString(msg.data)).to.equal('banana')
|
|
||||||
expect(msg.from).to.be.eql(psB.peerId.toB58String())
|
|
||||||
expect(msg.seqno).to.be.a('Uint8Array')
|
|
||||||
expect(msg.topicIDs).to.be.eql([topic])
|
|
||||||
|
|
||||||
if (++counter === 10) {
|
|
||||||
psA.removeListener(topic, receivedMsg)
|
|
||||||
psB.removeListener(topic, shouldNotHappen)
|
|
||||||
|
|
||||||
defer.resolve()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Unsubscribe from topic in nodeA', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
|
|
||||||
psA.unsubscribe(topic)
|
|
||||||
expect(psA.subscriptions.size).to.equal(0)
|
|
||||||
|
|
||||||
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
|
|
||||||
expect(psB.peers.size).to.equal(1)
|
|
||||||
expectSet(psB.topics.get(topic), [])
|
|
||||||
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
|
|
||||||
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
|
|
||||||
|
|
||||||
defer.resolve()
|
|
||||||
})
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('Publish to a topic:Z in nodeA nodeB', () => {
|
|
||||||
const defer = pDefer()
|
|
||||||
|
|
||||||
psA.once('Z', shouldNotHappen)
|
|
||||||
psB.once('Z', shouldNotHappen)
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
psA.removeListener('Z', shouldNotHappen)
|
|
||||||
psB.removeListener('Z', shouldNotHappen)
|
|
||||||
defer.resolve()
|
|
||||||
}, 100)
|
|
||||||
|
|
||||||
psB.publish('Z', uint8ArrayFromString('banana'))
|
|
||||||
psA.publish('Z', uint8ArrayFromString('banana'))
|
|
||||||
|
|
||||||
return defer.promise
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('nodes send state on connection', () => {
|
after(async () => {
|
||||||
let psA, psB
|
sinon.restore()
|
||||||
|
|
||||||
// Create pubsub nodes and connect them
|
psA && psA.stop()
|
||||||
before(async () => {
|
psB && psB.stop()
|
||||||
[psA, psB] = await common.setup(2)
|
|
||||||
|
|
||||||
expect(psA.peers.size).to.be.eql(0)
|
await common.teardown()
|
||||||
expect(psB.peers.size).to.be.eql(0)
|
})
|
||||||
|
|
||||||
// Start pubsub and connect nodes
|
it('Subscribe to a topic in nodeA', () => {
|
||||||
psA.start()
|
const defer = pDefer()
|
||||||
psB.start()
|
|
||||||
})
|
|
||||||
|
|
||||||
// Make subscriptions prior to nodes connected
|
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
|
||||||
before(() => {
|
expectSet(psA.subscriptions, [topic])
|
||||||
psA.subscribe('Za')
|
|
||||||
psB.subscribe('Zb')
|
|
||||||
|
|
||||||
expect(psA.peers.size).to.equal(0)
|
|
||||||
expectSet(psA.subscriptions, ['Za'])
|
|
||||||
expect(psB.peers.size).to.equal(0)
|
|
||||||
expectSet(psB.subscriptions, ['Zb'])
|
|
||||||
})
|
|
||||||
|
|
||||||
after(async () => {
|
|
||||||
sinon.restore()
|
|
||||||
|
|
||||||
psA && psA.stop()
|
|
||||||
psB && psB.stop()
|
|
||||||
|
|
||||||
await common.teardown()
|
|
||||||
})
|
|
||||||
|
|
||||||
it('existing subscriptions are sent upon peer connection', async function () {
|
|
||||||
this.timeout(10e3)
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
psA._libp2p.dial(psB.peerId),
|
|
||||||
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
|
|
||||||
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
|
|
||||||
])
|
|
||||||
|
|
||||||
expect(psA.peers.size).to.equal(1)
|
|
||||||
expect(psB.peers.size).to.equal(1)
|
expect(psB.peers.size).to.equal(1)
|
||||||
|
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
|
||||||
expectSet(psA.subscriptions, ['Za'])
|
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
|
||||||
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
|
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
|
||||||
|
defer.resolve()
|
||||||
expectSet(psB.subscriptions, ['Zb'])
|
|
||||||
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
|
|
||||||
})
|
})
|
||||||
|
psA.subscribe(topic)
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Publish to a topic in nodeA', () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
psA.once(topic, (msg) => {
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal('hey')
|
||||||
|
psB.removeListener(topic, shouldNotHappen)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
psB.once(topic, shouldNotHappen)
|
||||||
|
|
||||||
|
psA.publish(topic, uint8ArrayFromString('hey'))
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Publish to a topic in nodeB', () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
psA.once(topic, (msg) => {
|
||||||
|
psA.once(topic, shouldNotHappen)
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal('banana')
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
psA.removeListener(topic, shouldNotHappen)
|
||||||
|
psB.removeListener(topic, shouldNotHappen)
|
||||||
|
|
||||||
|
defer.resolve()
|
||||||
|
}, 100)
|
||||||
|
})
|
||||||
|
|
||||||
|
psB.once(topic, shouldNotHappen)
|
||||||
|
|
||||||
|
psB.publish(topic, uint8ArrayFromString('banana'))
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Publish 10 msg to a topic in nodeB', () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
let counter = 0
|
||||||
|
|
||||||
|
psB.once(topic, shouldNotHappen)
|
||||||
|
psA.on(topic, receivedMsg)
|
||||||
|
|
||||||
|
function receivedMsg (msg) {
|
||||||
|
expect(uint8ArrayToString(msg.data)).to.equal('banana')
|
||||||
|
expect(msg.from).to.be.eql(psB.peerId.toB58String())
|
||||||
|
expect(msg.seqno).to.be.a('Uint8Array')
|
||||||
|
expect(msg.topicIDs).to.be.eql([topic])
|
||||||
|
|
||||||
|
if (++counter === 10) {
|
||||||
|
psA.removeListener(topic, receivedMsg)
|
||||||
|
psB.removeListener(topic, shouldNotHappen)
|
||||||
|
|
||||||
|
defer.resolve()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Unsubscribe from topic in nodeA', () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
psA.unsubscribe(topic)
|
||||||
|
expect(psA.subscriptions.size).to.equal(0)
|
||||||
|
|
||||||
|
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
|
||||||
|
expect(psB.peers.size).to.equal(1)
|
||||||
|
expectSet(psB.topics.get(topic), [])
|
||||||
|
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
|
||||||
|
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
|
||||||
|
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Publish to a topic:Z in nodeA nodeB', () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
psA.once('Z', shouldNotHappen)
|
||||||
|
psB.once('Z', shouldNotHappen)
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
psA.removeListener('Z', shouldNotHappen)
|
||||||
|
psB.removeListener('Z', shouldNotHappen)
|
||||||
|
defer.resolve()
|
||||||
|
}, 100)
|
||||||
|
|
||||||
|
psB.publish('Z', uint8ArrayFromString('banana'))
|
||||||
|
psA.publish('Z', uint8ArrayFromString('banana'))
|
||||||
|
|
||||||
|
return defer.promise
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const { expect } = require('chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
|
|
||||||
exports.first = (map) => map.values().next().value
|
exports.first = (map) => map.values().next().value
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user