chore: address review

This commit is contained in:
Vasco Santos 2019-11-15 15:15:12 +01:00
parent 4cc9736485
commit 340edf53e3
5 changed files with 45 additions and 15 deletions

View File

@ -55,7 +55,7 @@
"it-protocol-buffers": "^0.2.0", "it-protocol-buffers": "^0.2.0",
"latency-monitor": "~0.2.1", "latency-monitor": "~0.2.1",
"libp2p-crypto": "^0.17.1", "libp2p-crypto": "^0.17.1",
"libp2p-interfaces": "^0.1.4", "libp2p-interfaces": "^0.1.5",
"mafmt": "^7.0.0", "mafmt": "^7.0.0",
"merge-options": "^1.0.1", "merge-options": "^1.0.1",
"moving-average": "^1.0.0", "moving-average": "^1.0.0",

View File

@ -5,6 +5,8 @@ const debug = require('debug')
const log = debug('libp2p:peer-store') const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error') log.error = debug('libp2p:peer-store:error')
const Topology = require('libp2p-interfaces/src/topology')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')
const { Connection } = require('libp2p-interfaces/src/connection') const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info') const PeerInfo = require('peer-info')
@ -109,6 +111,11 @@ class Registrar {
* @return {string} registrar identifier * @return {string} registrar identifier
*/ */
register (topology) { register (topology) {
assert(
Topology.isTopology(topology) ||
MulticodecTopology.isMulticodecTopology(topology),
'topology must be an instance of interfaces/topology')
// Create topology // Create topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()

View File

@ -186,7 +186,7 @@ class Upgrader {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol) log('%s: incoming stream opened on %s', direction, protocol)
connection.addStream(stream, protocol) connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol, remotePeer }) this._onStream({ connection, stream, protocol })
} catch (err) { } catch (err) {
log.error(err) log.error(err)
} }
@ -254,9 +254,9 @@ class Upgrader {
* @param {Stream} options.stream * @param {Stream} options.stream
* @param {string} options.protocol * @param {string} options.protocol
*/ */
_onStream ({ connection, stream, protocol, remotePeer }) { _onStream ({ connection, stream, protocol }) {
const handler = this.protocols.get(protocol) const handler = this.protocols.get(protocol)
handler({ connection, stream, protocol, remotePeer }) handler({ connection, stream, protocol })
} }
/** /**

View File

@ -101,8 +101,7 @@ describe('Pubsub subsystem operates correctly', () => {
}) })
}) })
// TODO: Needs identify push describe('pubsub started after connect', () => {
describe.skip('pubsub started after connect', () => {
beforeEach(async () => { beforeEach(async () => {
libp2p = await create(mergeOptions(subsystemOptions, { libp2p = await create(mergeOptions(subsystemOptions, {
peerInfo peerInfo
@ -132,7 +131,7 @@ describe('Pubsub subsystem operates correctly', () => {
sinon.restore() sinon.restore()
}) })
it.skip('should get notified of connected peers after starting', async () => { it('should get notified of connected peers after starting', async () => {
const connection = await libp2p.dial(remAddr) const connection = await libp2p.dial(remAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -141,14 +140,16 @@ describe('Pubsub subsystem operates correctly', () => {
remoteLibp2p.pubsub.start() remoteLibp2p.pubsub.start()
// Wait for await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1)
// Validate
expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1) expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(1)
expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1) expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(1)
}) })
it.skip('should receive pubsub messages', async () => { it('should receive pubsub messages', async function () {
this.timeout(10e3)
const defer = pDefer() const defer = pDefer()
const libp2pId = libp2p.peerInfo.id.toB58String()
const topic = 'test-topic' const topic = 'test-topic'
const data = 'hey!' const data = 'hey!'
@ -156,15 +157,26 @@ describe('Pubsub subsystem operates correctly', () => {
remoteLibp2p.pubsub.start() remoteLibp2p.pubsub.start()
// TODO: wait for await pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1)
libp2p.pubsub.subscribe(topic) let subscribedTopics = libp2p.pubsub.getTopics()
libp2p.pubsub.once(topic, (msg) => { expect(subscribedTopics).to.not.include(topic)
libp2p.pubsub.subscribe(topic, (msg) => {
expect(msg.data.toString()).to.equal(data) expect(msg.data.toString()).to.equal(data)
defer.resolve() defer.resolve()
}) })
libp2p.pubsub.publish(topic, data) subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic)
return subscribedPeers.includes(libp2pId)
})
remoteLibp2p.pubsub.publish(topic, data)
await defer.promise await defer.promise
}) })

View File

@ -33,7 +33,18 @@ describe('registrar', () => {
throw new Error('should fail to register a protocol if no multicodec is provided') throw new Error('should fail to register a protocol if no multicodec is provided')
}) })
// TODO: not valid topology it('should fail to register a protocol if an invalid topology is provided', () => {
const fakeTopology = {
random: 1
}
try {
registrar.register()
} catch (err) {
expect(err).to.exist(fakeTopology)
return
}
throw new Error('should fail to register a protocol if an invalid topology is provided')
})
}) })
describe('registration', () => { describe('registration', () => {