feat: discovery modules (#486)

* feat: discovery modules

* chore: address review
This commit is contained in:
Vasco Santos 2019-11-28 22:50:14 +01:00 committed by Jacob Heun
parent 1999606ecc
commit 18a062ed12
4 changed files with 206 additions and 4 deletions

View File

@ -88,13 +88,13 @@
"glob": "^7.1.4", "glob": "^7.1.4",
"interface-datastore": "^0.6.0", "interface-datastore": "^0.6.0",
"it-pair": "^1.0.0", "it-pair": "^1.0.0",
"libp2p-bootstrap": "^0.9.7", "libp2p-bootstrap": "^0.10.3",
"libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "^0.19.0", "libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0", "libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "~0.17.0", "libp2p-kad-dht": "~0.17.0",
"libp2p-mdns": "^0.12.3", "libp2p-mdns": "^0.13.0",
"libp2p-mplex": "^0.9.1", "libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0", "libp2p-pnet": "~0.1.0",
"libp2p-secio": "^0.12.1", "libp2p-secio": "^0.12.1",

View File

@ -168,7 +168,6 @@ class Libp2p extends EventEmitter {
await this.stop() await this.stop()
throw err throw err
} }
this._isStarted = true
} }
/** /**
@ -317,8 +316,13 @@ class Libp2p extends EventEmitter {
this.pubsub && this.pubsub.start() this.pubsub && this.pubsub.start()
} }
// DHT subsystem
if (this._config.dht.enabled) { if (this._config.dht.enabled) {
this._dht && this._dht.start() this._dht && this._dht.start()
// TODO: this should be modified once random-walk is used as
// the other discovery modules
this._dht._dht.on('peer', this._peerDiscovered)
} }
} }
@ -327,6 +331,11 @@ class Libp2p extends EventEmitter {
* @private * @private
*/ */
_onDidStart () { _onDidStart () {
this._isStarted = true
// Peer discovery
this._setupPeerDiscovery()
// Once we start, emit and dial any peers we may have already discovered // Once we start, emit and dial any peers we may have already discovered
for (const peerInfo of this.peerStore.peers.values()) { for (const peerInfo of this.peerStore.peers.values()) {
this.emit('peer:discovery', peerInfo) this.emit('peer:discovery', peerInfo)
@ -385,7 +394,7 @@ class Libp2p extends EventEmitter {
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
_setupPeerDiscovery () { _setupPeerDiscovery () {
for (const DiscoveryService of this._modules.peerDiscovery) { for (const DiscoveryService of this._modules.peerDiscovery || []) {
let config = { let config = {
enabled: true // on by default enabled: true // on by default
} }

View File

@ -0,0 +1,172 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const defer = require('p-defer')
const mergeOptions = require('merge-options')
const Bootstrap = require('libp2p-bootstrap')
const crypto = require('libp2p-crypto')
const KadDht = require('libp2p-kad-dht')
const MulticastDNS = require('libp2p-mdns')
const multiaddr = require('multiaddr')
const Libp2p = require('../../src')
const baseOptions = require('../utils/base-options')
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
describe('peer discovery scenarios', () => {
let peerInfo, remotePeerInfo1, remotePeerInfo2
let libp2p
before(async () => {
[peerInfo, remotePeerInfo1, remotePeerInfo2] = await createPeerInfoFromFixture(3)
peerInfo.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
remotePeerInfo1.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
remotePeerInfo2.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
})
afterEach(async () => {
libp2p && await libp2p.stop()
})
it('bootstrap should discover all peers in the list', async () => {
const deferred = defer()
const bootstrappers = [
...remotePeerInfo1.multiaddrs.toArray().map((ma) => `${ma}/p2p/${remotePeerInfo1.id.toB58String()}`),
...remotePeerInfo2.multiaddrs.toArray().map((ma) => `${ma}/p2p/${remotePeerInfo2.id.toB58String()}`)
]
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo,
modules: {
peerDiscovery: [Bootstrap]
},
config: {
peerDiscovery: {
bootstrap: {
enabled: true,
list: bootstrappers
}
}
}
}))
const expectedPeers = new Set([
remotePeerInfo1.id.toB58String(),
remotePeerInfo2.id.toB58String()
])
libp2p.on('peer:discovery', (peerInfo) => {
expectedPeers.delete(peerInfo.id.toB58String())
if (expectedPeers.size === 0) {
libp2p.removeAllListeners('peer:discovery')
deferred.resolve()
}
})
await libp2p.start()
return deferred.promise
})
it('MulticastDNS should discover all peers on the local network', async () => {
const deferred = defer()
const getConfig = (peerInfo) => mergeOptions(baseOptions, {
peerInfo,
modules: {
peerDiscovery: [MulticastDNS]
},
config: {
peerDiscovery: {
mdns: {
enabled: true,
interval: 200, // discover quickly
// use a random tag to prevent CI collision
serviceTag: crypto.randomBytes(10).toString('hex')
}
}
}
})
libp2p = new Libp2p(getConfig(peerInfo))
const remoteLibp2p1 = new Libp2p(getConfig(remotePeerInfo1))
const remoteLibp2p2 = new Libp2p(getConfig(remotePeerInfo2))
const expectedPeers = new Set([
remotePeerInfo1.id.toB58String(),
remotePeerInfo2.id.toB58String()
])
libp2p.on('peer:discovery', (peerInfo) => {
expectedPeers.delete(peerInfo.id.toB58String())
if (expectedPeers.size === 0) {
libp2p.removeAllListeners('peer:discovery')
deferred.resolve()
}
})
await remoteLibp2p1.start()
await remoteLibp2p2.start()
await libp2p.start()
await deferred.promise
await remoteLibp2p1.stop()
await remoteLibp2p2.stop()
})
it('kad-dht should discover other peers', async () => {
const deferred = defer()
const getConfig = (peerInfo) => mergeOptions(baseOptions, {
peerInfo,
modules: {
dht: KadDht
},
config: {
dht: {
randomWalk: {
enabled: true,
delay: 100,
interval: 200, // start the query sooner
timeout: 3000
},
enabled: true
}
}
})
libp2p = new Libp2p(getConfig(peerInfo))
const remoteLibp2p1 = new Libp2p(getConfig(remotePeerInfo1))
const remoteLibp2p2 = new Libp2p(getConfig(remotePeerInfo2))
libp2p.on('peer:discovery', (peerInfo) => {
if (peerInfo.id.toB58String() === remotePeerInfo2.id.toB58String()) {
libp2p.removeAllListeners('peer:discovery')
deferred.resolve()
}
})
await remoteLibp2p1.start()
await remoteLibp2p2.start()
await libp2p.start()
// Topology:
// A -> B
// C -> B
await Promise.all([
libp2p.dial(remotePeerInfo1),
remoteLibp2p2.dial(remotePeerInfo1)
])
await deferred.promise
await remoteLibp2p1.stop()
await remoteLibp2p2.stop()
})
})

View File

@ -5,7 +5,11 @@ const chai = require('chai')
chai.use(require('dirty-chai')) chai.use(require('dirty-chai'))
const { expect } = chai const { expect } = chai
const sinon = require('sinon') const sinon = require('sinon')
const defer = require('p-defer') const defer = require('p-defer')
const mergeOptions = require('merge-options')
const MulticastDNS = require('libp2p-mdns')
const Libp2p = require('../../src') const Libp2p = require('../../src')
const baseOptions = require('../utils/base-options.browser') const baseOptions = require('../utils/base-options.browser')
@ -22,6 +26,7 @@ describe('peer discovery', () => {
afterEach(async () => { afterEach(async () => {
libp2p && await libp2p.stop() libp2p && await libp2p.stop()
sinon.reset()
}) })
it('should dial know peers on startup', async () => { it('should dial know peers on startup', async () => {
@ -42,4 +47,20 @@ describe('peer discovery', () => {
await deferred.promise await deferred.promise
expect(spy.getCall(0).args).to.eql([remotePeerInfo]) expect(spy.getCall(0).args).to.eql([remotePeerInfo])
}) })
it('should ignore self on discovery', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo,
modules: {
peerDiscovery: [MulticastDNS]
}
}))
await libp2p.start()
const discoverySpy = sinon.spy()
libp2p.on('peer:discovery', discoverySpy)
libp2p._discovery[0].emit('peer', libp2p.peerInfo)
expect(discoverySpy.called).to.eql(false)
})
}) })