// js-libp2p/src/dialer/index.js (193 lines, 5.4 KiB, JavaScript)

'use strict'

// Third-party dependencies.
const multiaddr = require('multiaddr')
const errCode = require('err-code')
const TimeoutController = require('timeout-abort-controller')
const anySignal = require('any-signal')
const debug = require('debug')

// Namespaced loggers: `log` for regular output, `log.error` for failures.
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')

// Project-local modules.
const { DialRequest } = require('./dial-request')
const getPeer = require('../get-peer')
const { codes } = require('../errors')
const {
  DIAL_TIMEOUT,
  MAX_PARALLEL_DIALS,
  MAX_PER_PEER_DIALS
} = require('../constants')
/**
 * Coordinates outgoing dials: de-duplicates concurrent dials to the same peer,
 * applies a per-dial timeout and hands out a bounded pool of dial tokens.
 */
class Dialer {
  /**
   * @constructor
   * @param {object} options
   * @param {TransportManager} options.transportManager
   * @param {Peerstore} options.peerStore
   * @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
   * @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
   * @param {number} options.perPeerLimit Max number of tokens handed out per `getTokens` call. Defaults to `MAX_PER_PEER_DIALS`
   */
  constructor ({
    transportManager,
    peerStore,
    concurrency = MAX_PARALLEL_DIALS,
    timeout = DIAL_TIMEOUT,
    perPeerLimit = MAX_PER_PEER_DIALS
  }) {
    this.transportManager = transportManager
    this.peerStore = peerStore
    this.concurrency = concurrency
    this.timeout = timeout
    this.perPeerLimit = perPeerLimit
    // Token pool: one numeric token (0 .. concurrency - 1) per allowed parallel dial.
    this.tokens = Array.from({ length: concurrency }, (_, index) => index)
    // Pending dials keyed by the base58 peer id string of their target.
    this._pendingDials = new Map()
  }

  /**
   * Clears any pending dials by aborting their controllers and emptying
   * the pending-dial registry. A failing abort is logged and does not
   * prevent the remaining dials from being aborted.
   */
  destroy () {
    for (const dial of this._pendingDials.values()) {
      try {
        dial.controller.abort()
      } catch (err) {
        log.error(err)
      }
    }
    this._pendingDials.clear()
  }

  /**
   * Connects to a given `peer` by dialing all of its known addresses.
   * The dial to the first address that is successfully able to upgrade a connection
   * will be used.
   *
   * If a dial to the same peer id is already pending, its promise is awaited
   * instead of starting a new dial; in that case `options` are not applied.
   *
   * @param {PeerId|Multiaddr|string} peer The peer to dial
   * @param {object} [options]
   * @param {AbortSignal} [options.signal] An AbortController signal
   * @returns {Promise<Connection>}
   * @throws {Error} with code `ERR_NO_VALID_ADDRESSES` when no address is known for the peer
   * @throws {Error} the dial error; its code is set to `ERR_TIMEOUT` when the
   * dial's own controller (timeout, or `destroy()`) was aborted
   */
  async connectToPeer (peer, options = {}) {
    const dialTarget = this._createDialTarget(peer)
    if (!dialTarget.addrs.length) {
      throw errCode(new Error('The dial request has no addresses'), codes.ERR_NO_VALID_ADDRESSES)
    }

    const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options)

    try {
      const connection = await pendingDial.promise
      log('dial succeeded to %s', dialTarget.id)
      return connection
    } catch (err) {
      // Error is a timeout
      if (pendingDial.controller.signal.aborted) {
        err.code = codes.ERR_TIMEOUT
      }
      log.error(err)
      throw err
    } finally {
      // Always stop the timeout timer and unregister the dial, success or failure.
      pendingDial.destroy()
    }
  }

  /**
   * @typedef DialTarget
   * @property {string} id
   * @property {Multiaddr[]} addrs
   */

  /**
   * Creates a DialTarget. The DialTarget is used to create and track
   * the DialRequest to a given peer.
   * If a multiaddr is received it should be the first address attempted.
   * @private
   * @param {PeerId|Multiaddr|string} peer A PeerId or Multiaddr
   * @returns {DialTarget}
   */
  _createDialTarget (peer) {
    const { id, multiaddrs } = getPeer(peer)

    if (multiaddrs) {
      this.peerStore.addressBook.add(id, multiaddrs)
    }

    // Fall back to an empty list so an unknown peer yields a DialTarget with
    // no addresses (reported by `connectToPeer` as ERR_NO_VALID_ADDRESSES)
    // rather than a TypeError on `.length`.
    // NOTE(review): presumably the address book returns `undefined` for
    // unknown peers - confirm against the AddressBook implementation.
    let addrs = this.peerStore.addressBook.getMultiaddrsForPeer(id) || []

    // If received a multiaddr to dial, it should be the first to use
    // But, if we know other multiaddrs for the peer, we should try them too.
    if (multiaddr.isMultiaddr(peer)) {
      addrs = addrs.filter((addr) => !peer.equals(addr))
      addrs.unshift(peer)
    }

    return {
      id: id.toB58String(),
      addrs
    }
  }

  /**
   * @typedef PendingDial
   * @property {DialRequest} dialRequest
   * @property {TimeoutController} controller
   * @property {Promise} promise
   * @property {function():void} destroy
   */

  /**
   * Creates a PendingDial that wraps the underlying DialRequest, starts it
   * and registers it under the target's peer id.
   * @private
   * @param {DialTarget} dialTarget
   * @param {object} [options]
   * @param {AbortSignal} [options.signal] An AbortController signal
   * @returns {PendingDial}
   */
  _createPendingDial (dialTarget, options = {}) {
    // Invoked per address; refuses to start a dial whose signal has already fired.
    const dialAction = (addr, dialOptions) => {
      if (dialOptions.signal.aborted) throw errCode(new Error('already aborted'), codes.ERR_ALREADY_ABORTED)
      return this.transportManager.dial(addr, dialOptions)
    }

    const dialRequest = new DialRequest({
      addrs: dialTarget.addrs,
      dialAction,
      dialer: this
    })

    // Combine the timeout signal and options.signal, if provided
    const timeoutController = new TimeoutController(this.timeout)
    const signals = [timeoutController.signal]
    if (options.signal) {
      signals.push(options.signal)
    }
    const signal = anySignal(signals)

    const pendingDial = {
      dialRequest,
      controller: timeoutController,
      promise: dialRequest.run({ ...options, signal }),
      destroy: () => {
        timeoutController.clear()
        this._pendingDials.delete(dialTarget.id)
      }
    }
    this._pendingDials.set(dialTarget.id, pendingDial)
    return pendingDial
  }

  /**
   * Removes tokens from the pool and returns them. The number handed out is
   * the smallest of `num`, the per-peer limit and the tokens still available.
   * @param {number} num The number of tokens requested
   * @returns {number[]} The granted tokens (possibly empty)
   */
  getTokens (num) {
    const total = Math.min(num, this.perPeerLimit, this.tokens.length)
    const tokens = this.tokens.splice(0, total)
    log('%d tokens request, returning %d, %d remaining', num, total, this.tokens.length)
    return tokens
  }

  /**
   * Returns a token to the pool. Releasing a token that is already in the
   * pool is a no-op.
   * @param {number} token The token to release
   */
  releaseToken (token) {
    // Guard against duplicate releases
    if (this.tokens.includes(token)) return
    log('token %d released', token)
    this.tokens.push(token)
  }
}
// Expose the Dialer class as this module's export.
module.exports = Dialer