feat: add token based dialer

This commit is contained in:
Jacob Heun
2019-12-03 10:28:52 +01:00
parent 2329ef3ea3
commit e445a17278
14 changed files with 611 additions and 138 deletions

View File

@ -6,6 +6,7 @@ module.exports = {
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
PER_PEER_LIMIT: 4, // Allowed parallel dials per DialRequest
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20

View File

@ -1,19 +1,20 @@
'use strict'
const nextTick = require('async/nextTick')
const multiaddr = require('multiaddr')
const errCode = require('err-code')
const { default: PQueue } = require('p-queue')
const AbortController = require('abort-controller')
const delay = require('delay')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
const PeerId = require('peer-id')
const { DialRequest } = require('./dialer/dial-request')
const { anySignal } = require('./util')
const { codes } = require('./errors')
const {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS,
DIAL_TIMEOUT
PER_PEER_LIMIT
} = require('./constants')
class Dialer {
@ -29,74 +30,76 @@ class Dialer {
transportManager,
peerStore,
concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT
timeout = DIAL_TIMEOUT,
perPeerLimit = PER_PEER_LIMIT
}) {
this.transportManager = transportManager
this.peerStore = peerStore
this.concurrency = concurrency
this.timeout = timeout
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)
/**
* @property {IdentifyService}
*/
this._identifyService = null
}
set identifyService (service) {
this._identifyService = service
}
/**
* @type {IdentifyService}
*/
get identifyService () {
return this._identifyService
this.releaseToken = this.releaseToken.bind(this)
}
/**
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
* dialed, it will be used for encryption verification.
*
* @async
* @param {Multiaddr} addr The address to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddr (addr, options = {}) {
connectToMultiaddr (addr, options = {}) {
addr = multiaddr(addr)
let conn
let controller
if (!options.signal) {
controller = new AbortController()
options.signal = controller.signal
}
return this.connectToMultiaddrs([addr], options)
}
/**
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
* include the id of the peer being dialed, it will be used for encryption verification.
*
* @param {Array<Multiaddr>} addrs
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddrs (addrs, options = {}) {
const dialAction = (addr, options) => this.transportManager.dial(addr, options)
const dialRequest = new DialRequest({
addrs,
dialAction,
dialer: this
})
// Combine the timeout signal and options.signal, if provided
const timeoutController = new AbortController()
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)
const timeoutPromise = delay.reject(this.timeout, {
value: errCode(new Error('Dial timed out'), codes.ERR_TIMEOUT)
})
try {
conn = await this.queue.add(() => this.transportManager.dial(addr, options))
// Race the dial request and the timeout
const dialResult = await Promise.race([
dialRequest.run({
...options,
signal
}),
timeoutPromise
])
timeoutPromise.clear()
return dialResult
} catch (err) {
if (err.name === 'TimeoutError') {
controller.abort()
err.code = codes.ERR_TIMEOUT
}
log.error('Error dialing address %s,', addr, err)
log.error(err)
timeoutController.abort()
throw err
}
// Perform a delayed Identify handshake
if (this.identifyService) {
nextTick(async () => {
try {
await this.identifyService.identify(conn, conn.remotePeer)
} catch (err) {
log.error(err)
}
})
}
return conn
}
/**
@ -104,31 +107,57 @@ class Dialer {
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @async
* @param {PeerInfo|PeerId} peer The remote peer to dial
* @param {PeerId} peerId The remote peer id to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
if (PeerId.isPeerId(peer)) {
peer = this.peerStore.get(peer.toB58String())
}
connectToPeer (peerId, options = {}) {
const addrs = this.peerStore.multiaddrsForPeer(peerId)
const addrs = peer.multiaddrs.toArray()
for (const addr of addrs) {
try {
return await this.connectToMultiaddr(addr, options)
} catch (_) {
// The error is already logged, just move to the next addr
continue
}
}
// TODO: ensure the peer id is on the multiaddr
const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED)
log.error(err)
throw err
return this.connectToMultiaddrs(addrs, options)
}
getTokens (num) {
const total = Math.min(num, this.perPeerLimit, this.tokens.length)
const tokens = this.tokens.splice(0, total)
log('%d tokens request, returning %d, %d remaining', num, total, this.tokens.length)
return tokens
}
releaseToken (token) {
log('token %d released', token)
this.tokens.push(token)
}
}
module.exports = Dialer
// class ActionLimiter {
// constructor(actions, options = {}) {
// this.actions = actions
// this.limit = options.limit || 4
// this.controller = options.controller || new AbortController()
// }
// async abort () {
// this.controller.abort()
// }
// async run () {
// const limit = pLimit(this.limit)
// let result
// try {
// result = await pAny(this.actions.map(action => limit(action)))
// } catch (err) {
// console.log(err)
// if (!err.code) err.code = codes.ERR_CONNECTION_FAILED
// log.error(err)
// throw err
// } finally {
// console.log('RES', result)
// this.controller.abort()
// }
// return result
// }
// }

197
src/dialer/dial-request.js Normal file
View File

@ -0,0 +1,197 @@
'use strict'
const AbortController = require('abort-controller')
const AggregateError = require('aggregate-error')
const pDefer = require('p-defer')
const debug = require('debug')
const log = debug('libp2p:dialer:request')
log.error = debug('libp2p:dialer:request:error')
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const { anySignal } = require('../util')
const { TokenHolder } = require('./token-holder')
class DialRequest {
/**
*
* @param {object} options
* @param {Multiaddr[]} options.addrs
* @param {TransportManager} options.transportManager
* @param {Dialer} options.dialer
*/
constructor ({
addrs,
dialAction,
dialer
}) {
this.addrs = addrs
this.dialer = dialer
this.dialAction = dialAction
}
/**
* @async
* @param {object} options
* @param {AbortSignal} options.signal An AbortController signal
* @param {number} options.timeout The max dial time for each request
* @returns {Connection}
*/
async run (options) {
// Determine how many tokens we need
const tokensWanted = Math.min(this.addrs.length, this.dialer.perPeerLimit)
// Get the tokens
const tokens = this.dialer.getTokens(tokensWanted)
// If no tokens are available, throw
if (tokens.length < 1) {
throw Object.assign(new Error('No dial tokens available'), { code: 'ERR_NO_DIAL_TOKENS' })
}
// For every token, run a multiaddr dial
// If there are tokens left, release them
// If there are multiaddrs left, wait for tokens to finish
const th = new TokenHolder(tokens, this.dialer.releaseToken)
// Create the dial functions
const dials = this.addrs.map(addr => {
return () => this._abortableDial(addr, options)
})
const dialResolver = new DialResolver()
while (dials.length > 0) {
if (dialResolver.finished) break
// Wait for the next available token
const token = await th.getToken()
const dial = dials.shift()
dialResolver.add(dial, () => th.releaseToken(token))
}
// Start giving back the tokens
th.drain()
// Flush all the dials to get the final response
return dialResolver.flush()
}
/**
* @private
* @param {Multiaddr} addr
* @param {object} options
* @param {AbortSignal} options.signal An AbortController signal
* @param {number} options.timeout The max dial time for each request
* @returns {{abort: function(), promise: Promise<Connection>}} An AbortableDial
*/
_abortableDial (addr, options) {
log('starting dial to %s', addr)
const controller = new AbortController()
const signals = [controller.signal]
options.signal && signals.push(options.signal)
const signal = anySignal([controller.signal, options.signal])
const promise = this.dialAction(addr, { signal, timeout: options.timeout })
return {
abort: () => controller.abort(),
promise
}
}
}
class DialResolver {
constructor () {
this.dials = new Set()
this.errors = []
this.finished = false
this.didFlush = false
this._waiting = null
}
/**
* Adds a dial function to the resolver. The function will be immediately
* executed and its resolution tracked.
* @async
* @param {function()} dial A function that returns an AbortableDial
* @param {function()} [finallyHandler] Called when the dial resolves or rejects
*/
async add (dial, finallyHandler) {
if (this.finished) return
const abortableDial = dial()
this.dials.add(abortableDial)
try {
this._onResolve(await abortableDial.promise)
} catch (err) {
this._onReject(err)
} finally {
this._onFinally(abortableDial)
finallyHandler && finallyHandler()
}
}
/**
* Called when a dial resolves
* @param {Connection} result
*/
_onResolve (result) {
this.result = result
}
/**
* Called when a dial rejects
* @param {Error} err
*/
_onReject (err) {
if (err.code === AbortError.code) return
this.errors.push(err)
}
_onFinally (dial) {
this.dials.delete(dial)
// If we have a result, or all dials have finished
if (this.result || (this._waiting && this.dials.size === 0)) {
this._onFinish()
}
}
/**
* Called when dialing is completed, which means one of:
* 1. One dial succeeded
* 2. All dials failed
* 3. All dials were aborted
* @private
*/
_onFinish () {
this.finished = true
// Abort all remaining dials
for (const abortableDial of this.dials) {
abortableDial.abort()
}
this.dials.clear()
// Flush must be called
if (!this._waiting) return
// If we have a result, or an abort occurred (no errors and no result)
if (this.result || this.errors.length === 0) {
this._waiting.resolve(this.result)
} else {
this._waiting.reject(new AggregateError(this.errors))
}
}
/**
* Flushes any remaining dials and resolves the first
* successful `Connection`. Flush should be called after all
* dials have been added.
* @returns {Promise<Connection>}
*/
flush () {
if (this.finished) {
if (this.result) {
return Promise.resolve(this.result)
} else {
return Promise.reject(new AggregateError(this.errors))
}
}
this._waiting = pDefer()
return this._waiting.promise
}
}
module.exports.DialResolver = DialResolver
module.exports.DialRequest = DialRequest

View File

@ -0,0 +1,63 @@
'use strict'
/**
* @class TokenHolder
* @example
* const th = new TokenHolder(tokens, dialer.releaseToken)
* for (const action of actions) {
* const token = await th.getToken()
* action(token).then(() => th.releaseToken(token))
* }
*
* await th.drain()
*/
class TokenHolder {
/**
* @param {Array<*>} tokens Tokens to track
* @param {function(*)} release Called when releasing control of the tokens
*/
constructor (tokens, release) {
this.originalTokens = tokens
this.tokens = [...tokens]
this._release = release
}
/**
* Resolves a token once once is available. Once the token is no
* longer needed it MUST be release with `releaseToken()`.
* @returns {Promise<*>}
*/
getToken () {
if (this.tokens.length) return Promise.resolve(this.tokens.shift())
return new Promise(resolve => {
const _push = this.tokens.push
this.tokens.push = (token) => {
this.tokens.push = _push
resolve(token)
}
})
}
/**
* Makes the token available via `getToken()`
* @param {*} token
*/
releaseToken (token) {
this.tokens.push(token)
}
/**
* Once tokens are no longer needed for a series of actions,
* drain will release them to the owner via `this._release()`
*/
async drain () {
let drained = 0
while (drained < this.originalTokens.length) {
this._release(await this.getToken())
// Remove the token
drained++
}
}
}
module.exports.TokenHolder = TokenHolder

View File

@ -57,6 +57,12 @@ class Libp2p extends EventEmitter {
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
// Run identify for every connection
if (this.identifyService) {
this.identifyService.identify(connection, connection.remotePeer)
.catch(log.error)
}
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
@ -104,12 +110,12 @@ class Libp2p extends EventEmitter {
})
// Add the identify service since we can multiplex
this.dialer.identifyService = new IdentifyService({
this.identifyService = new IdentifyService({
registrar: this.registrar,
peerInfo: this.peerInfo,
protocols: this.upgrader.protocols
})
this.handle(Object.values(IDENTIFY_PROTOCOLS), this.dialer.identifyService.handleMessage)
this.handle(Object.values(IDENTIFY_PROTOCOLS), this.identifyService.handleMessage)
}
// Attach private network protector
@ -236,7 +242,7 @@ class Libp2p extends EventEmitter {
connection = await this.dialer.connectToMultiaddr(peer, options)
} else {
peer = await getPeerInfoRemote(peer, this)
connection = await this.dialer.connectToPeer(peer, options)
connection = await this.dialer.connectToPeer(peer.id, options)
}
const peerInfo = getPeerInfo(connection.remotePeer)
@ -293,8 +299,8 @@ class Libp2p extends EventEmitter {
})
// Only push if libp2p is running
if (this.isStarted()) {
this.dialer.identifyService.pushToPeerStore(this.peerStore)
if (this.isStarted() && this.identifyService) {
this.identifyService.pushToPeerStore(this.peerStore)
}
}
@ -310,8 +316,8 @@ class Libp2p extends EventEmitter {
})
// Only push if libp2p is running
if (this.isStarted()) {
this.dialer.identifyService.pushToPeerStore(this.peerStore)
if (this.isStarted() && this.identifyService) {
this.identifyService.pushToPeerStore(this.peerStore)
}
}

View File

@ -217,6 +217,16 @@ class PeerStore extends EventEmitter {
protocols: Array.from(peerInfo.protocols)
})
}
/**
* Returns the known multiaddrs for a given `PeerId`
* @param {PeerId} peerId
* @returns {Array<Multiaddr>}
*/
multiaddrsForPeer (peerId) {
const peerInfo = this.get(peerId.toB58String())
return peerInfo.multiaddrs.toArray()
}
}
module.exports = PeerStore

View File

@ -1,5 +1,7 @@
'use strict'
const AbortController = require('abort-controller')
/**
* Converts BufferList messages to Buffers
* @param {*} source
@ -13,4 +15,34 @@ function toBuffer (source) {
})()
}
/**
* Takes an array of AbortSignals and returns a single signal.
* If any signals are aborted, the returned signal will be aborted.
* @param {Array<AbortSignal>} signals
* @returns {AbortSignal}
*/
function anySignal (signals) {
const controller = new AbortController()
function onAbort () {
controller.abort()
// Cleanup
for (const signal of signals) {
signal.removeEventListener('abort', onAbort)
}
}
for (const signal of signals) {
if (signal.aborted) {
onAbort()
break
}
signal.addEventListener('abort', onAbort)
}
return controller.signal
}
module.exports.toBuffer = toBuffer
module.exports.anySignal = anySignal