diff --git a/package.json b/package.json index f5513c92..badda309 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "libp2p-delegated-content-routing": "^0.5.0", "libp2p-delegated-peer-routing": "^0.5.0", "libp2p-floodsub": "^0.21.0", - "libp2p-gossipsub": "^0.4.0", + "libp2p-gossipsub": "^0.4.6", "libp2p-kad-dht": "^0.19.1", "libp2p-mdns": "^0.14.1", "libp2p-mplex": "^0.9.5", diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index 75ad5542..64a3d4e0 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -190,12 +190,18 @@ describe('Pubsub subsystem operates correctly', () => { }) }) - describe('pubsub started after disconnect and connect', () => { + describe('pubsub with intermittent connections', () => { beforeEach(async () => { libp2p = await create(mergeOptions(subsystemOptions, { peerId, addresses: { listen: [listenAddr] + }, + config: { + pubsub: { + enabled: true, + emitSelf: false + } } })) @@ -206,7 +212,8 @@ describe('Pubsub subsystem operates correctly', () => { }, config: { pubsub: { - enabled: true + enabled: true, + emitSelf: false } } })) @@ -226,7 +233,7 @@ describe('Pubsub subsystem operates correctly', () => { sinon.restore() }) - it('should receive pubsub messages', async () => { + it('should receive pubsub messages after a node restart', async () => { const topic = 'test-topic' const data = 'hey!' const libp2pId = libp2p.peerId.toB58String() @@ -275,5 +282,42 @@ describe('Pubsub subsystem operates correctly', () => { await defer2.promise }) + + it('should handle quick reconnects with a delayed disconnect', async () => { + // Subscribe on both + const handlerSpy = sinon.spy() + const topic = 'reconnect-channel' + await Promise.all([ + libp2p.pubsub.subscribe(topic, handlerSpy), + remoteLibp2p.pubsub.subscribe(topic, handlerSpy) + ]) + // Create two connections to the remote peer + const originalConnection = await libp2p.dialer.connectToPeer(remoteLibp2p.peerId) + // second connection + await libp2p.dialer.connectToPeer(remoteLibp2p.peerId) + expect(libp2p.connections.get(remoteLibp2p.peerId.toB58String())).to.have.length(2) + + // Wait for subscriptions to occur + await pWaitFor(() => { + return libp2p.pubsub.getSubscribers(topic).includes(remoteLibp2p.peerId.toB58String()) && + remoteLibp2p.pubsub.getSubscribers(topic).includes(libp2p.peerId.toB58String()) + }) + + // Verify messages go both ways + libp2p.pubsub.publish(topic, 'message1') + remoteLibp2p.pubsub.publish(topic, 'message2') + await pWaitFor(() => handlerSpy.callCount === 2) + expect(handlerSpy.args.map(([message]) => message.data.toString())).to.include.members(['message1', 'message2']) + + // Disconnect the first connection (this acts as a delayed reconnect) + await originalConnection.close() + + // Verify messages go both ways after the disconnect + handlerSpy.resetHistory() + libp2p.pubsub.publish(topic, 'message3') + remoteLibp2p.pubsub.publish(topic, 'message4') + await pWaitFor(() => handlerSpy.callCount === 2) + expect(handlerSpy.args.map(([message]) => message.data.toString())).to.include.members(['message3', 'message4']) + }) }) })