feat: auto dial discovered peers (#349)

This commit is contained in:
Jacob Heun
2019-04-11 12:44:58 +02:00
committed by GitHub
parent 8b627797e2
commit 01aa44724e
12 changed files with 430 additions and 153 deletions

View File

@ -45,23 +45,23 @@ class Node extends EventEmitter {
super()
// validateConfig will ensure the config is correct,
// and add default values where appropriate
_options = validateConfig(_options)
this._options = validateConfig(_options)
this.datastore = _options.datastore
this.peerInfo = _options.peerInfo
this.peerBook = _options.peerBook || new PeerBook()
this.datastore = this._options.datastore
this.peerInfo = this._options.peerInfo
this.peerBook = this._options.peerBook || new PeerBook()
this._modules = _options.modules
this._config = _options.config
this._modules = this._options.modules
this._config = this._options.config
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references
// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, _options.switch)
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch.on('error', (...args) => this.emit('error', ...args))
this.stats = this._switch.stats
this.connectionManager = new ConnectionManager(this, _options.connectionManager)
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Attach stream multiplexers
if (this._modules.streamMuxer) {
@ -165,6 +165,16 @@ class Node extends EventEmitter {
log.error(err)
this.emit('error', err)
})
// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
this.peerBook.getAllArray().forEach((peerInfo) => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
})
this._peerDiscovered = this._peerDiscovered.bind(this)
}
/**
@ -352,45 +362,21 @@ class Node extends EventEmitter {
this._switch.transport.add(ws.tag || ws.constructor.name, ws)
}
// all transports need to be setup before discover starts
if (this._modules.peerDiscovery) {
each(this._modules.peerDiscovery, (D, _cb) => {
let config = {}
// detect which multiaddrs we don't have a transport for and remove them
const multiaddrs = this.peerInfo.multiaddrs.toArray()
if (D.tag &&
this._config.peerDiscovery &&
this._config.peerDiscovery[D.tag]) {
config = this._config.peerDiscovery[D.tag]
}
// If not configured to be enabled/disabled then enable by default
const enabled = config.enabled == null ? true : config.enabled
// If enabled then start it
if (enabled) {
let d
if (typeof D === 'function') {
d = new D(Object.assign({}, config, { peerInfo: this.peerInfo }))
} else {
d = D
}
d.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
this._discovery.push(d)
d.start(_cb)
} else {
_cb()
}
}, cb)
} else {
cb()
}
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
!this._transport.find((transport) => transport.filter(multiaddr).length > 0)) {
this.peerInfo.multiaddrs.delete(multiaddr)
}
})
cb()
},
(cb) => {
if (this._dht) {
this._dht.start(() => {
this._dht.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
this._dht.on('peer', this._peerDiscovered)
cb()
})
} else {
@ -403,17 +389,13 @@ class Node extends EventEmitter {
}
cb()
},
// Peer Discovery
(cb) => {
// detect which multiaddrs we don't have a transport for and remove them
const multiaddrs = this.peerInfo.multiaddrs.toArray()
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
!this._transport.find((transport) => transport.filter(multiaddr).length > 0)) {
this.peerInfo.multiaddrs.delete(multiaddr)
}
})
cb()
if (this._modules.peerDiscovery) {
this._setupPeerDiscovery(cb)
} else {
cb()
}
}
], (err) => {
if (err) {
@ -428,16 +410,17 @@ class Node extends EventEmitter {
_onStopping () {
series([
(cb) => {
if (this._modules.peerDiscovery) {
// stop all discoveries before continuing with shutdown
return parallel(
this._discovery.map((d) => {
return (_cb) => d.stop(() => { _cb() })
}),
cb
)
}
cb()
// stop all discoveries before continuing with shutdown
parallel(
this._discovery.map((d) => {
d.removeListener('peer', this._peerDiscovered)
return (_cb) => d.stop((err) => {
log.error('an error occurred stopping the discovery service', err)
_cb()
})
}),
cb
)
},
(cb) => {
if (this._floodSub) {
@ -447,6 +430,7 @@ class Node extends EventEmitter {
},
(cb) => {
if (this._dht) {
this._dht.removeListener('peer', this._peerDiscovered)
return this._dht.stop(cb)
}
cb()
@ -468,6 +452,86 @@ class Node extends EventEmitter {
this.state('done')
})
}
/**
* Handles discovered peers. Each discovered peer will be emitted via
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
*
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
* it would be ideal if only new peers were emitted. Currently, with
* other modules adding peers to the `PeerBook` we have no way of knowing
* if a peer is new or not, so it has to be emitted.
*
* @private
* @param {PeerInfo} peerInfo
*/
_peerDiscovered (peerInfo) {
peerInfo = this.peerBook.put(peerInfo)
if (!this.isStarted()) return
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
}
/**
* Will dial to the given `peerInfo` if the current number of
* connected peers is less than the configured `ConnectionManager`
* minPeers.
* @private
* @param {PeerInfo} peerInfo
*/
_maybeConnect (peerInfo) {
// If auto dialing is on, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !peerInfo.isConnected()) {
const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > Object.keys(this._switch.connection.connections).length) {
log('connecting to discovered peer')
this._switch.dialer.connect(peerInfo, (err) => {
err && log.error('could not connect to discovered peer', err)
})
}
}
}
/**
* Initializes and starts peer discovery services
*
* @private
* @param {function(Error)} callback
*/
_setupPeerDiscovery (callback) {
for (const DiscoveryService of this._modules.peerDiscovery) {
let config = {
enabled: true // on by default
}
if (DiscoveryService.tag &&
this._config.peerDiscovery &&
this._config.peerDiscovery[DiscoveryService.tag]) {
config = { ...config, ...this._config.peerDiscovery[DiscoveryService.tag] }
}
if (config.enabled) {
let discoveryService
if (typeof DiscoveryService === 'function') {
discoveryService = new DiscoveryService(Object.assign({}, config, { peerInfo: this.peerInfo }))
} else {
discoveryService = DiscoveryService
}
discoveryService.on('peer', this._peerDiscovered)
this._discovery.push(discoveryService)
}
}
each(this._discovery, (d, cb) => {
d.start(cb)
}, callback)
}
}
module.exports = Node