refactor: core async (#478)

* refactor: cleanup core

test: auto dial on startup

* fix: make hangup work properly

* chore: fix lint

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>
This commit is contained in:
Jacob Heun
2019-11-19 17:44:45 -06:00
parent 3c79d33db9
commit 86b275a0d3
7 changed files with 162 additions and 179 deletions

View File

@ -1,21 +1,13 @@
'use strict'
const FSM = require('fsm-event')
const { EventEmitter } = require('events')
const debug = require('debug')
const log = debug('libp2p')
log.error = debug('libp2p:error')
const errCode = require('err-code')
const promisify = require('promisify-es6')
const each = require('async/each')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
const Ping = require('./ping')
const { emitFirst } = require('./util')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
@ -34,20 +26,11 @@ const {
multicodecs: IDENTIFY_PROTOCOLS
} = require('./identify')
const notStarted = (action, state) => {
return errCode(
new Error(`libp2p cannot ${action} when not started; state is ${state}`),
codes.ERR_NODE_NOT_STARTED
)
}
/**
* @fires Libp2p#error Emitted when an error occurs
* @fires Libp2p#peer:connect Emitted when a peer is connected to this node
* @fires Libp2p#peer:disconnect Emitted when a peer disconnects from this node
* @fires Libp2p#peer:discovery Emitted when a peer is discovered
* @fires Libp2p#start Emitted when the node and its services has started
* @fires Libp2p#stop Emitted when the node and its services has stopped
*/
class Libp2p extends EventEmitter {
constructor (_options) {
@ -67,9 +50,6 @@ class Libp2p extends EventEmitter {
this.peerStore = new PeerStore()
// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
@ -158,63 +138,7 @@ class Libp2p extends EventEmitter {
this.contentRouting = contentRouting(this)
this.dht = dht(this)
// Mount default protocols
Ping.mount(this._switch)
this.state = new FSM('STOPPED', {
STOPPED: {
start: 'STARTING',
stop: 'STOPPED',
done: 'STOPPED'
},
STARTING: {
done: 'STARTED',
abort: 'STOPPED',
stop: 'STOPPING'
},
STARTED: {
stop: 'STOPPING',
start: 'STARTED'
},
STOPPING: {
stop: 'STOPPING',
done: 'STOPPED'
}
})
this.state.on('STARTING', () => {
log('libp2p is starting')
this._onStarting()
})
this.state.on('STOPPING', () => {
log('libp2p is stopping')
})
this.state.on('STARTED', () => {
log('libp2p has started')
this.emit('start')
})
this.state.on('STOPPED', () => {
log('libp2p has stopped')
this.emit('stop')
})
this.state.on('error', (err) => {
log.error(err)
this.emit('error', err)
})
// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
for (const peerInfo of this.peerStore.peers) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
}
})
this._peerDiscovered = this._peerDiscovered.bind(this)
// promisify all instance methods
;['start', 'hangUp', 'ping'].forEach(method => {
this[method] = promisify(this[method], { context: this })
})
}
/**
@ -233,14 +157,23 @@ class Libp2p extends EventEmitter {
}
/**
* Starts the libp2p node and all sub services
* Starts the libp2p node and all its subsystems
*
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
start (callback = () => {}) {
emitFirst(this, ['error', 'start'], callback)
this.state('start')
async start () {
log('libp2p is starting')
try {
await this._onStarting()
await this._onDidStart()
log('libp2p has started')
} catch (err) {
this.emit('error', err)
log.error('An error occurred starting libp2p', err)
await this.stop()
throw err
}
this._isStarted = true
}
/**
@ -249,23 +182,22 @@ class Libp2p extends EventEmitter {
* @returns {void}
*/
async stop () {
this.state('stop')
log('libp2p is stopping')
try {
this.pubsub && await this.pubsub.stop()
await this.transportManager.close()
await this._switch.stop()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
this.state('done')
log('libp2p has stopped')
}
isStarted () {
return this.state ? this.state._state === 'STARTED' : false
return this._isStarted
}
/**
@ -319,36 +251,30 @@ class Libp2p extends EventEmitter {
}
/**
* Disconnects from the given peer
* Disconnects all connections to the given `peer`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
* @param {function(Error)} callback
* @returns {void}
* @param {PeerId} peer The PeerId to close connections to
* @returns {Promise<void>}
*/
hangUp (peer, callback) {
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.hangUp(peerInfo, callback)
}, callback)
hangUp (peer) {
return Promise.all(
this.registrar.connections.get(peer.toB58String()).map(connection => {
return connection.close()
})
)
}
/**
* Pings the provided peer
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
* @param {function(Error, Ping)} callback
* @returns {void}
*/
ping (peer, callback) {
if (!this.isStarted()) {
return callback(notStarted('ping', this.state._state))
}
getPeerInfoRemote(peer, this)
.then(peerInfo => {
callback(null, new Ping(this._switch, peerInfo))
}, callback)
}
// TODO: Update ping
// /**
// * Pings the provided peer
// *
// * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
// * @returns {Promise<Ping>}
// */
// ping (peer) {
// const peerInfo = await getPeerInfoRemote(peer, this)
// return new Ping(this._switch, peerInfo)
// }
/**
* Registers the `handler` for each protocol
@ -379,32 +305,25 @@ class Libp2p extends EventEmitter {
}
async _onStarting () {
if (!this._modules.transport) {
this.emit('error', new Error('no transports were present'))
return this.state('abort')
}
const multiaddrs = this.peerInfo.multiaddrs.toArray()
// Start parallel tasks
const tasks = [
this.transportManager.listen(multiaddrs)
]
await this.transportManager.listen(multiaddrs)
if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}
}
try {
await Promise.all(tasks)
} catch (err) {
log.error(err)
this.emit('error', err)
return this.state('stop')
/**
* Called when libp2p has started and before it returns
* @private
*/
_onDidStart () {
// Once we start, emit and dial any peers we may have already discovered
for (const peerInfo of this.peerStore.peers.values()) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
}
// libp2p has started
this.state('done')
}
/**
@ -435,15 +354,18 @@ class Libp2p extends EventEmitter {
* @private
* @param {PeerInfo} peerInfo
*/
_maybeConnect (peerInfo) {
// If auto dialing is on, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !peerInfo.isConnected()) {
async _maybeConnect (peerInfo) {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.connections.get(peerInfo)) {
const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > Object.keys(this._switch.connection.connections).length) {
// TODO: This does not account for multiple connections to a peer
if (minPeers > this.registrar.connections.size) {
log('connecting to discovered peer')
this._switch.dialer.connect(peerInfo, (err) => {
err && log.error('could not connect to discovered peer', err)
})
try {
await this.dialer.connectToPeer(peerInfo)
} catch (err) {
log.error('could not connect to discovered peer', err)
}
}
}
}
@ -452,9 +374,9 @@ class Libp2p extends EventEmitter {
* Initializes and starts peer discovery services
*
* @private
* @param {function(Error)} callback
* @returns {Promise<void>}
*/
_setupPeerDiscovery (callback) {
_setupPeerDiscovery () {
for (const DiscoveryService of this._modules.peerDiscovery) {
let config = {
enabled: true // on by default
@ -480,9 +402,7 @@ class Libp2p extends EventEmitter {
}
}
each(this._discovery, (d, cb) => {
d.start(cb)
}, callback)
return this._discovery.map(d => d.start())
}
}