refactor: async routing (#489)

* feat: async routing

* chore: put dht extra api commands under content routing

* chore: add default option to createPeerInfo

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>

* chore: address review

* chore: rm dlv
This commit is contained in:
Vasco Santos
2019-12-01 22:54:59 +01:00
committed by Jacob Heun
parent f77ce39484
commit a020db183a
24 changed files with 805 additions and 228 deletions

View File

@ -1,16 +1,18 @@
'use strict'
const tryEach = require('async/tryEach')
const parallel = require('async/parallel')
const errCode = require('err-code')
const promisify = require('promisify-es6')
const { messages, codes } = require('./errors')
const all = require('async-iterator-all')
const pAny = require('p-any')
module.exports = (node) => {
const routers = node._modules.contentRouting || []
const dht = node._dht
// If we have the dht, make it first
if (node._dht) {
routers.unshift(node._dht)
if (dht) {
routers.unshift(dht)
}
return {
@ -19,66 +21,93 @@ module.exports = (node) => {
* Once a content router succeeds, iteration will stop.
*
* @param {CID} key The CID key of the content to find
* @param {object} options
* @param {number} options.maxTimeout How long the query should run
* @param {number} options.maxNumProviders - maximum number of providers to find
* @param {function(Error, Result<Array>)} callback
* @returns {void}
* @param {object} [options]
* @param {number} [options.timeout] How long the query should run
* @param {number} [options.maxNumProviders] - maximum number of providers to find
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
} else if (typeof options === 'number') { // This can be deprecated in a future release
options = {
maxTimeout: options
}
}
async * findProviders (key, options) {
if (!routers.length) {
return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}
const tasks = routers.map((router) => {
return (cb) => router.findProviders(key, options, (err, results) => {
if (err) {
return cb(err)
}
const result = await pAny(
routers.map(async (router) => {
const provs = await all(router.findProviders(key, options))
// If we don't have any results, we need to provide an error to keep trying
if (!results || Object.keys(results).length === 0) {
return cb(errCode(new Error('not found'), 'NOT_FOUND'), null)
if (!provs || !provs.length) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
cb(null, results)
return provs
})
})
)
tryEach(tasks, (err, results) => {
if (err && err.code !== 'NOT_FOUND') {
return callback(err)
}
results = results || []
callback(null, results)
})
}),
for (const pInfo of result) {
yield pInfo
}
},
/**
* Iterates over all content routers in parallel to notify it is
* a provider of the given key.
*
* @param {CID} key The CID key of the content to find
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
provide: promisify((key, callback) => {
async provide (key) { // eslint-disable-line require-await
if (!routers.length) {
return callback(errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}
parallel(routers.map((router) => {
return (cb) => router.provide(key, cb)
}), callback)
})
return Promise.all(routers.map((router) => router.provide(key)))
},
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
async put (key, value, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.put(key, value, options)
},
/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
async get (key, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.get(key, options)
},
/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
async getMany (key, nVals, options) { // eslint-disable-line require-await
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.getMany(key, nVals, options)
}
}
}

View File

@ -1,72 +0,0 @@
'use strict'
const errCode = require('err-code')
const { messages, codes } = require('./errors')
module.exports = (node, DHT, config) => {
const dht = new DHT({
dialer: node.dialer,
peerInfo: node.peerInfo,
peerStore: node.peerStore,
registrar: node.registrar,
datastore: this.datastore,
...config
})
return {
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put: (key, value, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.put(key, value, options)
},
/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get: (key, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.get(key, options)
},
/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
getMany: (key, nVals, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return dht.getMany(key, nVals, options)
},
_dht: dht,
start: () => dht.start(),
stop: () => dht.stop()
}
}

View File

@ -10,7 +10,6 @@ const multiaddr = require('multiaddr')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
@ -124,7 +123,15 @@ class Libp2p extends EventEmitter {
// dht provided components (peerRouting, contentRouting, dht)
if (this._modules.dht) {
this._dht = dht(this, this._modules.dht, this._config.dht)
const DHT = this._modules.dht
this._dht = new DHT({
dialer: this.dialer,
peerInfo: this.peerInfo,
peerStore: this.peerStore,
registrar: this.registrar,
datastore: this.datastore,
...this._config.dht
})
}
// start pubsub
@ -333,7 +340,7 @@ class Libp2p extends EventEmitter {
// TODO: this should be modified once random-walk is used as
// the other discovery modules
this._dht._dht.on('peer', this._peerDiscovered)
this._dht.on('peer', this._peerDiscovered)
}
}

View File

@ -1,8 +1,7 @@
'use strict'
const tryEach = require('async/tryEach')
const errCode = require('err-code')
const promisify = require('promisify-es6')
const pAny = require('p-any')
module.exports = (node) => {
const routers = node._modules.peerRouting || []
@ -17,43 +16,25 @@ module.exports = (node) => {
* Iterates over all peer routers in series to find the given peer.
*
* @param {String} id The id of the peer to find
* @param {object} options
* @param {number} options.maxTimeout How long the query should run
* @param {function(Error, Result<Array>)} callback
* @returns {void}
* @param {object} [options]
* @param {number} [options.timeout] How long the query should run
* @returns {Promise<PeerInfo>}
*/
findPeer: promisify((id, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
findPeer: async (id, options) => { // eslint-disable-line require-await
if (!routers.length) {
callback(errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE'))
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}
const tasks = routers.map((router) => {
return (cb) => router.findPeer(id, options, (err, result) => {
if (err) {
return cb(err)
}
return pAny(routers.map(async (router) => {
const result = await router.findPeer(id, options)
// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
return cb(errCode(new Error('not found'), 'NOT_FOUND'), null)
}
cb(null, result)
})
})
tryEach(tasks, (err, results) => {
if (err) {
return callback(err)
// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
throw errCode(new Error('not found'), 'NOT_FOUND')
}
results = results || []
callback(null, results)
})
})
return result
}))
}
}
}