refactor(async): add dialer and upgrader (#462)

* chore(deps): update connection and multistream

* feat: add basic dial support for addresses and peers

* test: automatically require all node test files

* fix: dont catch and log in the wrong place

* test: add direct spec test

fix: improve dial error consistency

* feat: add dial timeouts and concurrency

Queue timeouts will result in aborts of the dials

* chore: fix linting

* test: verify dialer defaults

* feat: add initial upgrader

* fix: add more test coverage and fix bugs

* feat: libp2p creates the upgrader

* feat: hook up handle to the upgrader

* feat: hook up the dialer to libp2p

test: add node dialer libp2p tests

* feat: add connection listeners to upgrader

* feat: emit connect and disconnect events

* chore: use libp2p-interfaces

* fix: address review feedback

* fix: correct import

* refactor: dedupe connection creation code
This commit is contained in:
Jacob Heun
2019-10-21 16:53:58 +02:00
parent 10c8553c58
commit af364b070b
20 changed files with 1481 additions and 142 deletions

View File

@ -13,20 +13,22 @@ const nextTick = require('async/nextTick')
const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
const Ping = require('./ping')
const ConnectionManager = require('./connection-manager')
const { emitFirst } = require('./util')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const { getPeerInfoRemote } = require('./get-peer-info')
const validateConfig = require('./config').validate
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const notStarted = (action, state) => {
return errCode(
@ -61,64 +63,49 @@ class Libp2p extends EventEmitter {
// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch.on('error', (...args) => this.emit('error', ...args))
this.stats = this._switch.stats
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.emit('peer:disconnect', peerInfo)
}
})
// Setup the transport manager
this.transportManager = new TransportManager({
libp2p: this,
// TODO: set the actual upgrader
upgrader: {
upgradeInbound: (maConn) => maConn,
upgradeOutbound: (maConn) => maConn
},
// TODO: Route incoming connections to a multiplex protocol router
onConnection: () => {}
upgrader: this.upgrader
})
this._modules.transport.forEach((Transport) => {
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
})
// Attach stream multiplexers
if (this._modules.streamMuxer) {
const muxers = this._modules.streamMuxer
muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer))
// If muxer exists
// we can use Identify
this._switch.connection.reuse()
// we can use Relay for listening/dialing
this._switch.connection.enableCircuitRelay(this._config.relay)
// Received incomming dial and muxer upgrade happened,
// reuse this muxed connection
this._switch.on('peer-mux-established', (peerInfo) => {
this.emit('peer:connect', peerInfo)
})
this._switch.on('peer-mux-closed', (peerInfo) => {
this.emit('peer:disconnect', peerInfo)
})
}
// Events for anytime connections are created/removed
this._switch.on('connection:start', (peerInfo) => {
this.emit('connection:start', peerInfo)
})
this._switch.on('connection:end', (peerInfo) => {
this.emit('connection:end', peerInfo)
})
// Attach crypto channels
if (this._modules.connEncryption) {
const cryptos = this._modules.connEncryption
cryptos.forEach((crypto) => {
this._switch.connection.crypto(crypto.tag, crypto.encrypt)
this.upgrader.cryptos.set(crypto.tag, crypto)
})
}
// Attach stream multiplexers
if (this._modules.streamMuxer) {
const muxers = this._modules.streamMuxer
muxers.forEach((muxer) => {
this.upgrader.muxers.set(muxer.multicodec, muxer)
})
}
this.dialer = new Dialer({
transportManager: this.transportManager
})
// Attach private network protector
if (this._modules.connProtector) {
this._switch.protector = this._modules.connProtector
@ -153,7 +140,8 @@ class Libp2p extends EventEmitter {
this.state = new FSM('STOPPED', {
STOPPED: {
start: 'STARTING',
stop: 'STOPPED'
stop: 'STOPPED',
done: 'STOPPED'
},
STARTING: {
done: 'STARTED',
@ -175,7 +163,6 @@ class Libp2p extends EventEmitter {
})
this.state.on('STOPPING', () => {
log('libp2p is stopping')
this._onStopping()
})
this.state.on('STARTED', () => {
log('libp2p has started')
@ -201,7 +188,7 @@ class Libp2p extends EventEmitter {
this._peerDiscovered = this._peerDiscovered.bind(this)
// promisify all instance methods
;['start', 'stop', 'dial', 'dialProtocol', 'dialFSM', 'hangUp', 'ping'].forEach(method => {
;['start', 'hangUp', 'ping'].forEach(method => {
this[method] = promisify(this[method], { context: this })
})
}
@ -234,13 +221,21 @@ class Libp2p extends EventEmitter {
/**
* Stop the libp2p node by closing its listeners and open connections
*
* @param {function(Error)} callback
* @async
* @returns {void}
*/
stop (callback = () => {}) {
emitFirst(this, ['error', 'stop'], callback)
async stop () {
this.state('stop')
try {
await this.transportManager.close()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
this.state('done')
}
isStarted () {
@ -252,11 +247,12 @@ class Libp2p extends EventEmitter {
* peer will be added to the nodes `PeerBook`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {function(Error)} callback
* @returns {void}
* @param {object} options
* @param {AbortSignal} [options.signal]
* @returns {Promise<Connection>}
*/
dial (peer, callback) {
this.dialProtocol(peer, null, callback)
dial (peer, options) {
return this.dialProtocol(peer, null, options)
}
/**
@ -264,50 +260,28 @@ class Libp2p extends EventEmitter {
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* and the `Connection` will be sent in the callback
*
* @async
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {string} protocol
* @param {function(Error, Connection)} callback
* @returns {void}
* @param {string[]|string} protocols
* @param {object} options
* @param {AbortSignal} [options.signal]
* @returns {Promise<Connection|*>}
*/
dialProtocol (peer, protocol, callback) {
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
async dialProtocol (peer, protocols, options) {
let connection
if (multiaddr.isMultiaddr(peer)) {
connection = await this.dialer.connectToMultiaddr(peer, options)
} else {
peer = await getPeerInfoRemote(peer, this)
connection = await this.dialer.connectToPeer(peer, options)
}
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
}
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.dial(peerInfo, protocol, callback)
}, callback)
}
/**
* Similar to `dial` and `dialProtocol`, but the callback will contain a
* Connection State Machine.
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {string} protocol
* @param {function(Error, ConnectionFSM)} callback
* @returns {void}
*/
dialFSM (peer, protocol, callback) {
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
}
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.dialFSM(peerInfo, protocol, callback)
}, callback)
return connection
}
/**
@ -342,12 +316,28 @@ class Libp2p extends EventEmitter {
}, callback)
}
handle (protocol, handlerFunc, matchFunc) {
this._switch.handle(protocol, handlerFunc, matchFunc)
/**
* Registers the `handler` for each protocol
* @param {string[]|string} protocols
* @param {function({ stream:*, protocol:string })} handler
*/
handle (protocols, handler) {
protocols = Array.isArray(protocols) ? protocols : [protocols]
protocols.forEach(protocol => {
this.upgrader.protocols.set(protocol, handler)
})
}
unhandle (protocol) {
this._switch.unhandle(protocol)
/**
* Removes the handler for each protocol. The protocol
* will no longer be supported on streams.
* @param {string[]|string} protocols
*/
unhandle (protocols) {
protocols = Array.isArray(protocols) ? protocols : [protocols]
protocols.forEach(protocol => {
this.upgrader.protocols.delete(protocol)
})
}
async _onStarting () {
@ -373,21 +363,6 @@ class Libp2p extends EventEmitter {
this.state('done')
}
async _onStopping () {
// Start parallel tasks
try {
await this.transportManager.close()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
// libp2p has stopped
this.state('done')
}
/**
* Handles discovered peers. Each discovered peer will be emitted via
* the `peer:discovery` event. If auto dial is enabled for libp2p