From 340edf53e3fba30e5df98aaf5be212983e776ba1 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 15 Nov 2019 15:15:12 +0100 Subject: [PATCH] chore: address review --- package.json | 2 +- src/registrar.js | 7 +++++++ src/upgrader.js | 6 +++--- test/pubsub/operation.node.js | 32 ++++++++++++++++++++++---------- test/registrar/registrar.spec.js | 13 ++++++++++++- 5 files changed, 45 insertions(+), 15 deletions(-) diff --git a/package.json b/package.json index 42bb04b7..6a1d81a6 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "it-protocol-buffers": "^0.2.0", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.17.1", - "libp2p-interfaces": "^0.1.4", + "libp2p-interfaces": "^0.1.5", "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", diff --git a/src/registrar.js b/src/registrar.js index 98d2f978..d777458e 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -5,6 +5,8 @@ const debug = require('debug') const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') +const Topology = require('libp2p-interfaces/src/topology') +const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const { Connection } = require('libp2p-interfaces/src/connection') const PeerInfo = require('peer-info') @@ -109,6 +111,11 @@ class Registrar { * @return {string} registrar identifier */ register (topology) { + assert( + Topology.isTopology(topology) || + MulticodecTopology.isMulticodecTopology(topology), + 'topology must be an instance of interfaces/topology') + // Create topology const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() diff --git a/src/upgrader.js b/src/upgrader.js index 6890c6f7..1699451a 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -186,7 +186,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) connection.addStream(stream, protocol) - this._onStream({ connection, stream, protocol, remotePeer }) + this._onStream({ connection, stream, protocol }) } catch (err) { log.error(err) } @@ -254,9 +254,9 @@ class Upgrader { * @param {Stream} options.stream * @param {string} options.protocol */ - _onStream ({ connection, stream, protocol, remotePeer }) { + _onStream ({ connection, stream, protocol }) { const handler = this.protocols.get(protocol) - handler({ connection, stream, protocol, remotePeer }) + handler({ connection, stream, protocol }) } /** diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index b7e5a77b..bec41a30 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -101,8 +101,7 @@ describe('Pubsub subsystem operates correctly', () => { }) }) - // TODO: Needs identify push - describe.skip('pubsub started after connect', () => { + describe('pubsub started after connect', () => { beforeEach(async () => { libp2p = await create(mergeOptions(subsystemOptions, { peerInfo @@ -132,7 +131,7 @@ describe('Pubsub subsystem operates correctly', () => { sinon.restore() }) - it.skip('should get notified of connected peers after starting', async () => { + it('should get notified of connected peers after starting', async () => { const connection = await libp2p.dial(remAddr) expect(connection).to.exist() @@ -141,14 +140,16 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - // Wait for - // Validate + await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) + expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) }) - it.skip('should receive pubsub messages', async () => { + it('should receive pubsub messages', async function () { + this.timeout(10e3) const defer = pDefer() + const libp2pId = libp2p.peerInfo.id.toB58String() const topic = 'test-topic' const data = 'hey!' @@ -156,15 +157,26 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() - // TODO: wait for + await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1) - libp2p.pubsub.subscribe(topic) - libp2p.pubsub.once(topic, (msg) => { + let subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.not.include(topic) + + libp2p.pubsub.subscribe(topic, (msg) => { expect(msg.data.toString()).to.equal(data) defer.resolve() }) - libp2p.pubsub.publish(topic, data) + subscribedTopics = libp2p.pubsub.getTopics() + expect(subscribedTopics).to.include(topic) + + // wait for remoteLibp2p to know about libp2p subscription + await pWaitFor(() => { + const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic) + return subscribedPeers.includes(libp2pId) + }) + + remoteLibp2p.pubsub.publish(topic, data) await defer.promise }) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index e1540cfd..9114e035 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -33,7 +33,18 @@ describe('registrar', () => { throw new Error('should fail to register a protocol if no multicodec is provided') }) - // TODO: not valid topology + it('should fail to register a protocol if an invalid topology is provided', () => { + const fakeTopology = { + random: 1 + } + try { + registrar.register() + } catch (err) { + expect(err).to.exist(fakeTopology) + return + } + throw new Error('should fail to register a protocol if an invalid topology is provided') + }) }) describe('registration', () => {