test: add pubsub reconnect test (#693)

* test: add pubsub reconnect test

* chore: dep bump

* chore: remove temp pubsub dep

Co-authored-by: Vasco Santos <vasco.santos@moxy.studio>

Co-authored-by: Vasco Santos <vasco.santos@moxy.studio>
This commit is contained in:
Jacob Heun
2020-07-07 18:31:51 +02:00
committed by GitHub
parent 5cd8c19567
commit 51da8874d8
2 changed files with 48 additions and 4 deletions

View File

@ -102,7 +102,7 @@
"libp2p-delegated-content-routing": "^0.5.0", "libp2p-delegated-content-routing": "^0.5.0",
"libp2p-delegated-peer-routing": "^0.5.0", "libp2p-delegated-peer-routing": "^0.5.0",
"libp2p-floodsub": "^0.21.0", "libp2p-floodsub": "^0.21.0",
"libp2p-gossipsub": "^0.4.0", "libp2p-gossipsub": "^0.4.6",
"libp2p-kad-dht": "^0.19.1", "libp2p-kad-dht": "^0.19.1",
"libp2p-mdns": "^0.14.1", "libp2p-mdns": "^0.14.1",
"libp2p-mplex": "^0.9.5", "libp2p-mplex": "^0.9.5",

View File

@ -190,12 +190,18 @@ describe('Pubsub subsystem operates correctly', () => {
}) })
}) })
describe('pubsub started after disconnect and connect', () => { describe('pubsub with intermittent connections', () => {
beforeEach(async () => { beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, { libp2p = await create(mergeOptions(subsystemOptions, {
peerId, peerId,
addresses: { addresses: {
listen: [listenAddr] listen: [listenAddr]
},
config: {
pubsub: {
enabled: true,
emitSelf: false
}
} }
})) }))
@ -206,7 +212,8 @@ describe('Pubsub subsystem operates correctly', () => {
}, },
config: { config: {
pubsub: { pubsub: {
enabled: true enabled: true,
emitSelf: false
} }
} }
})) }))
@ -226,7 +233,7 @@ describe('Pubsub subsystem operates correctly', () => {
sinon.restore() sinon.restore()
}) })
it('should receive pubsub messages', async () => { it('should receive pubsub messages after a node restart', async () => {
const topic = 'test-topic' const topic = 'test-topic'
const data = 'hey!' const data = 'hey!'
const libp2pId = libp2p.peerId.toB58String() const libp2pId = libp2p.peerId.toB58String()
@ -275,5 +282,42 @@ describe('Pubsub subsystem operates correctly', () => {
await defer2.promise await defer2.promise
}) })
it('should handle quick reconnects with a delayed disconnect', async () => {
// Subscribe on both
const handlerSpy = sinon.spy()
const topic = 'reconnect-channel'
await Promise.all([
libp2p.pubsub.subscribe(topic, handlerSpy),
remoteLibp2p.pubsub.subscribe(topic, handlerSpy)
])
// Create two connections to the remote peer
const originalConnection = await libp2p.dialer.connectToPeer(remoteLibp2p.peerId)
// second connection
await libp2p.dialer.connectToPeer(remoteLibp2p.peerId)
expect(libp2p.connections.get(remoteLibp2p.peerId.toB58String())).to.have.length(2)
// Wait for subscriptions to occur
await pWaitFor(() => {
return libp2p.pubsub.getSubscribers(topic).includes(remoteLibp2p.peerId.toB58String()) &&
remoteLibp2p.pubsub.getSubscribers(topic).includes(libp2p.peerId.toB58String())
})
// Verify messages go both ways
libp2p.pubsub.publish(topic, 'message1')
remoteLibp2p.pubsub.publish(topic, 'message2')
await pWaitFor(() => handlerSpy.callCount === 2)
expect(handlerSpy.args.map(([message]) => message.data.toString())).to.include.members(['message1', 'message2'])
// Disconnect the first connection (this acts as a delayed reconnect)
await originalConnection.close()
// Verify messages go both ways after the disconnect
handlerSpy.resetHistory()
libp2p.pubsub.publish(topic, 'message3')
remoteLibp2p.pubsub.publish(topic, 'message4')
await pWaitFor(() => handlerSpy.callCount === 2)
expect(handlerSpy.args.map(([message]) => message.data.toString())).to.include.members(['message3', 'message4'])
})
}) })
}) })