diff --git a/package.json b/package.json index f9a307c6..43bebf7a 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,7 @@ "it-pipe": "^1.1.0", "it-protocol-buffers": "^0.2.0", "libp2p-crypto": "^0.17.6", - "libp2p-interfaces": "^0.3.0", + "libp2p-interfaces": "^0.3.1", "libp2p-utils": "^0.1.2", "mafmt": "^7.0.0", "merge-options": "^2.0.0", diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index a29ac953..75ad5542 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -189,4 +189,91 @@ describe('Pubsub subsystem operates correctly', () => { await defer.promise }) }) + + describe('pubsub started after disconnect and connect', () => { + beforeEach(async () => { + libp2p = await create(mergeOptions(subsystemOptions, { + peerId, + addresses: { + listen: [listenAddr] + } + })) + + remoteLibp2p = await create(mergeOptions(subsystemOptions, { + peerId: remotePeerId, + addresses: { + listen: [remoteListenAddr] + }, + config: { + pubsub: { + enabled: true + } + } + })) + + await libp2p.start() + await remoteLibp2p.start() + + libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) + }) + + afterEach(() => Promise.all([ + libp2p && libp2p.stop(), + remoteLibp2p && remoteLibp2p.stop() + ])) + + afterEach(() => { + sinon.restore() + }) + + it('should receive pubsub messages', async () => { + const topic = 'test-topic' + const data = 'hey!' + const libp2pId = libp2p.peerId.toB58String() + + let counter = 0 + const defer1 = pDefer() + const defer2 = pDefer() + const handler = (msg) => { + expect(msg.data.toString()).to.equal(data) + counter++ + counter === 1 ? defer1.resolve() : defer2.resolve() + } + + await libp2p.dial(remotePeerId) + + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, handler) + + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) + return subscribedPeers.includes(libp2pId) + }) + remoteLibp2p.pubsub.publish(topic, data) + + await defer1.promise + + await remoteLibp2p.stop() + await remoteLibp2p.start() + + libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) + await libp2p.dial(remotePeerId) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getSubscribers(topic) + return subscribedPeers.includes(libp2pId) + }) + + remoteLibp2p.pubsub.publish(topic, data) + + await defer2.promise + }) + }) })