mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-20 04:26:31 +00:00
fix: store multiaddrs during content and peer routing queries (#865)
* fix: store provider multiaddrs during find providers Changes the behaviour of `libp2p.contentRouting.findProviders` to store the multiaddrs reported by the routers before yielding results to the caller, so when they try to dial the provider, the multiaddrs are already in the peer store's address book. Also dedupes providers reported by routers but keeps all of the addresses reported, even for duplicates. Also, also fixes a performance bug where the previous implementation would wait for any router to completely finish finding providers before sending any results to the caller. It'll now yield results as they come in which makes it much, much faster.
This commit is contained in:
@ -65,10 +65,16 @@
|
|||||||
"ipfs-utils": "^5.0.1",
|
"ipfs-utils": "^5.0.1",
|
||||||
"it-all": "^1.0.1",
|
"it-all": "^1.0.1",
|
||||||
"it-buffer": "^0.1.2",
|
"it-buffer": "^0.1.2",
|
||||||
|
"it-drain": "^1.0.3",
|
||||||
|
"it-filter": "^1.0.1",
|
||||||
|
"it-first": "^1.0.4",
|
||||||
"it-handshake": "^1.0.1",
|
"it-handshake": "^1.0.1",
|
||||||
"it-length-prefixed": "^3.0.1",
|
"it-length-prefixed": "^3.0.1",
|
||||||
|
"it-map": "^1.0.4",
|
||||||
|
"it-merge": "1.0.0",
|
||||||
"it-pipe": "^1.1.0",
|
"it-pipe": "^1.1.0",
|
||||||
"it-protocol-buffers": "^0.2.0",
|
"it-protocol-buffers": "^0.2.0",
|
||||||
|
"it-take": "1.0.0",
|
||||||
"libp2p-crypto": "^0.18.0",
|
"libp2p-crypto": "^0.18.0",
|
||||||
"libp2p-interfaces": "^0.8.1",
|
"libp2p-interfaces": "^0.8.1",
|
||||||
"libp2p-utils": "^0.2.2",
|
"libp2p-utils": "^0.2.2",
|
||||||
|
@ -1,10 +1,16 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
const { messages, codes } = require('./errors')
|
const { messages, codes } = require('../errors')
|
||||||
|
const {
|
||||||
|
storeAddresses,
|
||||||
|
uniquePeers,
|
||||||
|
requirePeers,
|
||||||
|
maybeLimitSource
|
||||||
|
} = require('./utils')
|
||||||
|
|
||||||
const all = require('it-all')
|
const merge = require('it-merge')
|
||||||
const pAny = require('p-any')
|
const { pipe } = require('it-pipe')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('peer-id')} PeerId
|
* @typedef {import('peer-id')} PeerId
|
||||||
@ -21,22 +27,21 @@ const pAny = require('p-any')
|
|||||||
class ContentRouting {
|
class ContentRouting {
|
||||||
/**
|
/**
|
||||||
* @class
|
* @class
|
||||||
* @param {import('./')} libp2p
|
* @param {import('..')} libp2p
|
||||||
*/
|
*/
|
||||||
constructor (libp2p) {
|
constructor (libp2p) {
|
||||||
this.libp2p = libp2p
|
this.libp2p = libp2p
|
||||||
this.routers = libp2p._modules.contentRouting || []
|
this.routers = libp2p._modules.contentRouting || []
|
||||||
this.dht = libp2p._dht
|
this.dht = libp2p._dht
|
||||||
|
|
||||||
// If we have the dht, make it first
|
// If we have the dht, add it to the available content routers
|
||||||
if (this.dht) {
|
if (this.dht) {
|
||||||
this.routers.unshift(this.dht)
|
this.routers.push(this.dht)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterates over all content routers in series to find providers of the given key.
|
* Iterates over all content routers in parallel to find providers of the given key.
|
||||||
* Once a content router succeeds, iteration will stop.
|
|
||||||
*
|
*
|
||||||
* @param {CID} key - The CID key of the content to find
|
* @param {CID} key - The CID key of the content to find
|
||||||
* @param {object} [options]
|
* @param {object} [options]
|
||||||
@ -44,25 +49,20 @@ class ContentRouting {
|
|||||||
* @param {number} [options.maxNumProviders] - maximum number of providers to find
|
* @param {number} [options.maxNumProviders] - maximum number of providers to find
|
||||||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
|
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
|
||||||
*/
|
*/
|
||||||
async * findProviders (key, options) {
|
async * findProviders (key, options = {}) {
|
||||||
if (!this.routers.length) {
|
if (!this.routers.length) {
|
||||||
throw errCode(new Error('No content this.routers available'), 'NO_ROUTERS_AVAILABLE')
|
throw errCode(new Error('No content this.routers available'), 'NO_ROUTERS_AVAILABLE')
|
||||||
}
|
}
|
||||||
|
|
||||||
const result = await pAny(
|
yield * pipe(
|
||||||
this.routers.map(async (router) => {
|
merge(
|
||||||
const provs = await all(router.findProviders(key, options))
|
...this.routers.map(router => router.findProviders(key, options))
|
||||||
|
),
|
||||||
if (!provs || !provs.length) {
|
(source) => storeAddresses(source, this.libp2p.peerStore),
|
||||||
throw errCode(new Error('not found'), 'NOT_FOUND')
|
(source) => uniquePeers(source),
|
||||||
}
|
(source) => maybeLimitSource(source, options.maxNumProviders),
|
||||||
return provs
|
(source) => requirePeers(source)
|
||||||
})
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for (const peer of result) {
|
|
||||||
yield peer
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
89
src/content-routing/utils.js
Normal file
89
src/content-routing/utils.js
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const errCode = require('err-code')
|
||||||
|
const filter = require('it-filter')
|
||||||
|
const map = require('it-map')
|
||||||
|
const take = require('it-take')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef {import('peer-id')} PeerId
|
||||||
|
* @typedef {import('multiaddr')} Multiaddr
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the multiaddrs from every peer in the passed peer store
|
||||||
|
*
|
||||||
|
* @param {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} source
|
||||||
|
* @param {import('../peer-store')} peerStore
|
||||||
|
*/
|
||||||
|
function storeAddresses (source, peerStore) {
|
||||||
|
return map(source, (peer) => {
|
||||||
|
// ensure we have the addresses for a given peer
|
||||||
|
peerStore.addressBook.add(peer.id, peer.multiaddrs)
|
||||||
|
|
||||||
|
return peer
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filter peers by unique peer id
|
||||||
|
*
|
||||||
|
* @param {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} source
|
||||||
|
*/
|
||||||
|
function uniquePeers (source) {
|
||||||
|
/** @type Set<string> */
|
||||||
|
const seen = new Set()
|
||||||
|
|
||||||
|
return filter(source, (peer) => {
|
||||||
|
// dedupe by peer id
|
||||||
|
if (seen.has(peer.id.toString())) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
seen.add(peer.id.toString())
|
||||||
|
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Require at least `min` peers to be yielded from `source`
|
||||||
|
*
|
||||||
|
* @param {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} source
|
||||||
|
* @param {number} min
|
||||||
|
*/
|
||||||
|
async function * requirePeers (source, min = 1) {
|
||||||
|
let seen = 0
|
||||||
|
|
||||||
|
for await (const peer of source) {
|
||||||
|
seen++
|
||||||
|
|
||||||
|
yield peer
|
||||||
|
}
|
||||||
|
|
||||||
|
if (seen < min) {
|
||||||
|
throw errCode(new Error('not found'), 'NOT_FOUND')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If `max` is passed, only take that number of peers from the source
|
||||||
|
* otherwise take all the peers
|
||||||
|
*
|
||||||
|
* @param {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} source
|
||||||
|
* @param {number} [max]
|
||||||
|
*/
|
||||||
|
function maybeLimitSource (source, max) {
|
||||||
|
if (max) {
|
||||||
|
return take(source, max)
|
||||||
|
}
|
||||||
|
|
||||||
|
return source
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
storeAddresses,
|
||||||
|
uniquePeers,
|
||||||
|
requirePeers,
|
||||||
|
maybeLimitSource
|
||||||
|
}
|
@ -5,16 +5,24 @@ const log = Object.assign(debug('libp2p:peer-routing'), {
|
|||||||
error: debug('libp2p:peer-routing:err')
|
error: debug('libp2p:peer-routing:err')
|
||||||
})
|
})
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
|
const {
|
||||||
|
storeAddresses,
|
||||||
|
uniquePeers,
|
||||||
|
requirePeers
|
||||||
|
} = require('./content-routing/utils')
|
||||||
|
|
||||||
const all = require('it-all')
|
const merge = require('it-merge')
|
||||||
const pAny = require('p-any')
|
const { pipe } = require('it-pipe')
|
||||||
|
const first = require('it-first')
|
||||||
|
const drain = require('it-drain')
|
||||||
|
const filter = require('it-filter')
|
||||||
const {
|
const {
|
||||||
setDelayedInterval,
|
setDelayedInterval,
|
||||||
clearDelayedInterval
|
clearDelayedInterval
|
||||||
} = require('set-delayed-interval')
|
} = require('set-delayed-interval')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('peer-id')} PeerId
|
|
||||||
* @typedef {import('multiaddr')} Multiaddr
|
* @typedef {import('multiaddr')} Multiaddr
|
||||||
*/
|
*/
|
||||||
class PeerRouting {
|
class PeerRouting {
|
||||||
@ -27,9 +35,9 @@ class PeerRouting {
|
|||||||
this._peerStore = libp2p.peerStore
|
this._peerStore = libp2p.peerStore
|
||||||
this._routers = libp2p._modules.peerRouting || []
|
this._routers = libp2p._modules.peerRouting || []
|
||||||
|
|
||||||
// If we have the dht, make it first
|
// If we have the dht, add it to the available peer routers
|
||||||
if (libp2p._dht) {
|
if (libp2p._dht) {
|
||||||
this._routers.unshift(libp2p._dht)
|
this._routers.push(libp2p._dht)
|
||||||
}
|
}
|
||||||
|
|
||||||
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager
|
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager
|
||||||
@ -55,9 +63,8 @@ class PeerRouting {
|
|||||||
*/
|
*/
|
||||||
async _findClosestPeersTask () {
|
async _findClosestPeersTask () {
|
||||||
try {
|
try {
|
||||||
for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) {
|
// nb getClosestPeers adds the addresses to the address book
|
||||||
this._peerStore.addressBook.add(id, multiaddrs)
|
await drain(this.getClosestPeers(this._peerId.id))
|
||||||
}
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error(err)
|
log.error(err)
|
||||||
}
|
}
|
||||||
@ -71,7 +78,7 @@ class PeerRouting {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterates over all peer routers in series to find the given peer.
|
* Iterates over all peer routers in parallel to find the given peer.
|
||||||
*
|
*
|
||||||
* @param {PeerId} id - The id of the peer to find
|
* @param {PeerId} id - The id of the peer to find
|
||||||
* @param {object} [options]
|
* @param {object} [options]
|
||||||
@ -83,16 +90,20 @@ class PeerRouting {
|
|||||||
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
|
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
|
||||||
}
|
}
|
||||||
|
|
||||||
return pAny(this._routers.map(async (router) => {
|
const output = await pipe(
|
||||||
const result = await router.findPeer(id, options)
|
merge(
|
||||||
|
...this._routers.map(router => [router.findPeer(id, options)])
|
||||||
|
),
|
||||||
|
(source) => filter(source, Boolean),
|
||||||
|
(source) => storeAddresses(source, this._peerStore),
|
||||||
|
(source) => first(source)
|
||||||
|
)
|
||||||
|
|
||||||
// If we don't have a result, we need to provide an error to keep trying
|
if (output) {
|
||||||
if (!result || Object.keys(result).length === 0) {
|
return output
|
||||||
throw errCode(new Error('not found'), 'NOT_FOUND')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
throw errCode(new Error('not found'), 'NOT_FOUND')
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -108,20 +119,14 @@ class PeerRouting {
|
|||||||
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
|
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
|
||||||
}
|
}
|
||||||
|
|
||||||
const result = await pAny(
|
yield * pipe(
|
||||||
this._routers.map(async (router) => {
|
merge(
|
||||||
const peers = await all(router.getClosestPeers(key, options))
|
...this._routers.map(router => router.getClosestPeers(key, options))
|
||||||
|
),
|
||||||
if (!peers || !peers.length) {
|
(source) => storeAddresses(source, this._peerStore),
|
||||||
throw errCode(new Error('not found'), 'NOT_FOUND')
|
(source) => uniquePeers(source),
|
||||||
}
|
(source) => requirePeers(source)
|
||||||
return peers
|
|
||||||
})
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for (const peer of result) {
|
|
||||||
yield peer
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,6 +12,8 @@ const CID = require('cids')
|
|||||||
const ipfsHttpClient = require('ipfs-http-client')
|
const ipfsHttpClient = require('ipfs-http-client')
|
||||||
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
|
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
|
const drain = require('it-drain')
|
||||||
|
const all = require('it-all')
|
||||||
|
|
||||||
const peerUtils = require('../utils/creators/peer')
|
const peerUtils = require('../utils/creators/peer')
|
||||||
const { baseOptions, routingOptions } = require('./utils')
|
const { baseOptions, routingOptions } = require('./utils')
|
||||||
@ -78,10 +80,14 @@ describe('content-routing', () => {
|
|||||||
|
|
||||||
it('should use the nodes dht to find providers', async () => {
|
it('should use the nodes dht to find providers', async () => {
|
||||||
const deferred = pDefer()
|
const deferred = pDefer()
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
|
||||||
sinon.stub(nodes[0]._dht, 'findProviders').callsFake(function * () {
|
sinon.stub(nodes[0]._dht, 'findProviders').callsFake(function * () {
|
||||||
deferred.resolve()
|
deferred.resolve()
|
||||||
yield
|
yield {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
await nodes[0].contentRouting.findProviders().next()
|
await nodes[0].contentRouting.findProviders().next()
|
||||||
@ -138,10 +144,14 @@ describe('content-routing', () => {
|
|||||||
|
|
||||||
it('should use the delegate router to find providers', async () => {
|
it('should use the delegate router to find providers', async () => {
|
||||||
const deferred = pDefer()
|
const deferred = pDefer()
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
|
||||||
sinon.stub(delegate, 'findProviders').callsFake(function * () {
|
sinon.stub(delegate, 'findProviders').callsFake(function * () {
|
||||||
deferred.resolve()
|
deferred.resolve()
|
||||||
yield
|
yield {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
await node.contentRouting.findProviders().next()
|
await node.contentRouting.findProviders().next()
|
||||||
@ -251,6 +261,110 @@ describe('content-routing', () => {
|
|||||||
|
|
||||||
afterEach(() => node.stop())
|
afterEach(() => node.stop())
|
||||||
|
|
||||||
|
it('should store the multiaddrs of a peer', async () => {
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const result = {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/49320')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findProviders').callsFake(function * () {})
|
||||||
|
sinon.stub(delegate, 'findProviders').callsFake(function * () {
|
||||||
|
yield result
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(node.peerStore.addressBook.get(providerPeerId)).to.not.be.ok()
|
||||||
|
|
||||||
|
await drain(node.contentRouting.findProviders('a cid'))
|
||||||
|
|
||||||
|
expect(node.peerStore.addressBook.get(providerPeerId)).to.deep.include({
|
||||||
|
isCertified: false,
|
||||||
|
multiaddr: result.multiaddrs[0]
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not wait for routing findProviders to finish before returning results', async () => {
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const result = {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/49320')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findProviders').callsFake(async function * () { // eslint-disable-line require-yield
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findProviders').callsFake(async function * () {
|
||||||
|
yield result
|
||||||
|
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
for await (const provider of node.contentRouting.findProviders('a cid')) {
|
||||||
|
expect(provider.id).to.deep.equal(providerPeerId)
|
||||||
|
defer.resolve()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should dedupe results', async () => {
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const result = {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/49320')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findProviders').callsFake(async function * () {
|
||||||
|
yield result
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findProviders').callsFake(async function * () {
|
||||||
|
yield result
|
||||||
|
})
|
||||||
|
|
||||||
|
const results = await all(node.contentRouting.findProviders('a cid'))
|
||||||
|
|
||||||
|
expect(results).to.be.an('array').with.lengthOf(1).that.deep.equals([result])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should combine multiaddrs when different addresses are returned by different content routers', async () => {
|
||||||
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const result1 = {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/49320')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
const result2 = {
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/213.213.213.213/tcp/2344')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findProviders').callsFake(async function * () {
|
||||||
|
yield result1
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findProviders').callsFake(async function * () {
|
||||||
|
yield result2
|
||||||
|
})
|
||||||
|
|
||||||
|
await drain(node.contentRouting.findProviders('a cid'))
|
||||||
|
|
||||||
|
expect(node.peerStore.addressBook.get(providerPeerId)).to.deep.include({
|
||||||
|
isCertified: false,
|
||||||
|
multiaddr: result1.multiaddrs[0]
|
||||||
|
}).and.to.deep.include({
|
||||||
|
isCertified: false,
|
||||||
|
multiaddr: result2.multiaddrs[0]
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
it('should use both the dht and delegate router to provide', async () => {
|
it('should use both the dht and delegate router to provide', async () => {
|
||||||
const dhtDeferred = pDefer()
|
const dhtDeferred = pDefer()
|
||||||
const delegatedDeferred = pDefer()
|
const delegatedDeferred = pDefer()
|
||||||
@ -271,15 +385,18 @@ describe('content-routing', () => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should only use the dht if it finds providers', async () => {
|
it('should use the dht if the delegate fails to find providers', async () => {
|
||||||
const results = [true]
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = [{
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}]
|
||||||
|
|
||||||
sinon.stub(node._dht, 'findProviders').callsFake(function * () {
|
sinon.stub(node._dht, 'findProviders').callsFake(function * () {
|
||||||
yield results[0]
|
yield results[0]
|
||||||
})
|
})
|
||||||
|
|
||||||
sinon.stub(delegate, 'findProviders').callsFake(function * () { // eslint-disable-line require-yield
|
sinon.stub(delegate, 'findProviders').callsFake(function * () { // eslint-disable-line require-yield
|
||||||
throw new Error('the delegate should not have been called')
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const providers = []
|
const providers = []
|
||||||
@ -292,7 +409,11 @@ describe('content-routing', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it('should use the delegate if the dht fails to find providers', async () => {
|
it('should use the delegate if the dht fails to find providers', async () => {
|
||||||
const results = [true]
|
const [providerPeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = [{
|
||||||
|
id: providerPeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}]
|
||||||
|
|
||||||
sinon.stub(node._dht, 'findProviders').callsFake(function * () {})
|
sinon.stub(node._dht, 'findProviders').callsFake(function * () {})
|
||||||
|
|
||||||
|
@ -10,6 +10,8 @@ const delay = require('delay')
|
|||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
const pWaitFor = require('p-wait-for')
|
const pWaitFor = require('p-wait-for')
|
||||||
const mergeOptions = require('merge-options')
|
const mergeOptions = require('merge-options')
|
||||||
|
const drain = require('it-drain')
|
||||||
|
const all = require('it-all')
|
||||||
|
|
||||||
const ipfsHttpClient = require('ipfs-http-client')
|
const ipfsHttpClient = require('ipfs-http-client')
|
||||||
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
|
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
|
||||||
@ -82,10 +84,14 @@ describe('peer-routing', () => {
|
|||||||
|
|
||||||
it('should use the nodes dht to get the closest peers', async () => {
|
it('should use the nodes dht to get the closest peers', async () => {
|
||||||
const deferred = pDefer()
|
const deferred = pDefer()
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
|
||||||
sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(function * () {
|
sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(function * () {
|
||||||
deferred.resolve()
|
deferred.resolve()
|
||||||
yield
|
yield {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
await nodes[0].peerRouting.getClosestPeers().next()
|
await nodes[0].peerRouting.getClosestPeers().next()
|
||||||
@ -128,10 +134,14 @@ describe('peer-routing', () => {
|
|||||||
|
|
||||||
it('should use the delegate router to find peers', async () => {
|
it('should use the delegate router to find peers', async () => {
|
||||||
const deferred = pDefer()
|
const deferred = pDefer()
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
|
||||||
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
||||||
deferred.resolve()
|
deferred.resolve()
|
||||||
return 'fake peer-id'
|
return {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
await node.peerRouting.findPeer()
|
await node.peerRouting.findPeer()
|
||||||
@ -140,10 +150,14 @@ describe('peer-routing', () => {
|
|||||||
|
|
||||||
it('should use the delegate router to get the closest peers', async () => {
|
it('should use the delegate router to get the closest peers', async () => {
|
||||||
const deferred = pDefer()
|
const deferred = pDefer()
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
|
||||||
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
|
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
|
||||||
deferred.resolve()
|
deferred.resolve()
|
||||||
yield
|
yield {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
await node.peerRouting.getClosestPeers().next()
|
await node.peerRouting.getClosestPeers().next()
|
||||||
@ -152,7 +166,7 @@ describe('peer-routing', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it('should be able to find a peer', async () => {
|
it('should be able to find a peer', async () => {
|
||||||
const peerKey = 'QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL'
|
const peerKey = PeerId.createFromB58String('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL')
|
||||||
const mockApi = nock('http://0.0.0.0:60197')
|
const mockApi = nock('http://0.0.0.0:60197')
|
||||||
.post('/api/v0/dht/findpeer')
|
.post('/api/v0/dht/findpeer')
|
||||||
.query(true)
|
.query(true)
|
||||||
@ -277,55 +291,93 @@ describe('peer-routing', () => {
|
|||||||
|
|
||||||
afterEach(() => node.stop())
|
afterEach(() => node.stop())
|
||||||
|
|
||||||
it('should only use the dht if it finds the peer', async () => {
|
|
||||||
const dhtDeferred = pDefer()
|
|
||||||
|
|
||||||
sinon.stub(node._dht, 'findPeer').callsFake(() => {
|
|
||||||
dhtDeferred.resolve()
|
|
||||||
return { id: node.peerId }
|
|
||||||
})
|
|
||||||
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
|
||||||
throw new Error('the delegate should not have been called')
|
|
||||||
})
|
|
||||||
|
|
||||||
await node.peerRouting.findPeer('a peer id')
|
|
||||||
await dhtDeferred.promise
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should use the delegate if the dht fails to find the peer', async () => {
|
it('should use the delegate if the dht fails to find the peer', async () => {
|
||||||
const results = [true]
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
|
|
||||||
sinon.stub(node._dht, 'findPeer').callsFake(() => {})
|
sinon.stub(node._dht, 'findPeer').callsFake(() => {})
|
||||||
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
||||||
return results
|
return results
|
||||||
})
|
})
|
||||||
|
|
||||||
const peer = await node.peerRouting.findPeer('a peer id')
|
const peer = await node.peerRouting.findPeer(remotePeerId)
|
||||||
expect(peer).to.eql(results)
|
expect(peer).to.eql(results)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should only use the dht if it gets the closest peers', async () => {
|
it('should not wait for the dht to return if the delegate does first', async () => {
|
||||||
const results = [true]
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = {
|
||||||
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
|
id: remotePeerId,
|
||||||
yield results[0]
|
multiaddrs: []
|
||||||
})
|
|
||||||
|
|
||||||
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () { // eslint-disable-line require-yield
|
|
||||||
throw new Error('the delegate should not have been called')
|
|
||||||
})
|
|
||||||
|
|
||||||
const closest = []
|
|
||||||
for await (const peer of node.peerRouting.getClosestPeers('a cid')) {
|
|
||||||
closest.push(peer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
expect(closest).to.have.length.above(0)
|
const defer = pDefer()
|
||||||
expect(closest).to.eql(results)
|
|
||||||
|
sinon.stub(node._dht, 'findPeer').callsFake(async () => {
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findPeer').callsFake(() => {
|
||||||
|
return results
|
||||||
|
})
|
||||||
|
|
||||||
|
const peer = await node.peerRouting.findPeer(remotePeerId)
|
||||||
|
expect(peer).to.eql(results)
|
||||||
|
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not wait for the delegate to return if the dht does first', async () => {
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}
|
||||||
|
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findPeer').callsFake(() => {
|
||||||
|
return results
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findPeer').callsFake(async () => {
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
const peer = await node.peerRouting.findPeer(remotePeerId)
|
||||||
|
expect(peer).to.eql(results)
|
||||||
|
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should store the addresses of the found peer', async () => {
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/38982')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
const spy = sinon.spy(node.peerStore.addressBook, 'add')
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'findPeer').callsFake(() => {
|
||||||
|
return results
|
||||||
|
})
|
||||||
|
sinon.stub(delegate, 'findPeer').callsFake(() => {})
|
||||||
|
|
||||||
|
await node.peerRouting.findPeer(remotePeerId)
|
||||||
|
|
||||||
|
expect(spy.calledWith(results.id, results.multiaddrs)).to.be.true()
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should use the delegate if the dht fails to get the closest peer', async () => {
|
it('should use the delegate if the dht fails to get the closest peer', async () => {
|
||||||
const results = [true]
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = [{
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: []
|
||||||
|
}]
|
||||||
|
|
||||||
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { })
|
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { })
|
||||||
|
|
||||||
@ -333,14 +385,55 @@ describe('peer-routing', () => {
|
|||||||
yield results[0]
|
yield results[0]
|
||||||
})
|
})
|
||||||
|
|
||||||
const closest = []
|
const closest = await all(node.peerRouting.getClosestPeers('a cid'))
|
||||||
for await (const peer of node.peerRouting.getClosestPeers('a cid')) {
|
|
||||||
closest.push(peer)
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(closest).to.have.length.above(0)
|
expect(closest).to.have.length.above(0)
|
||||||
expect(closest).to.eql(results)
|
expect(closest).to.eql(results)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should store the addresses of the closest peer', async () => {
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const result = {
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/38982')
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
const spy = sinon.spy(node.peerStore.addressBook, 'add')
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { })
|
||||||
|
|
||||||
|
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
|
||||||
|
yield result
|
||||||
|
})
|
||||||
|
|
||||||
|
await drain(node.peerRouting.getClosestPeers('a cid'))
|
||||||
|
|
||||||
|
expect(spy.calledWith(result.id, result.multiaddrs)).to.be.true()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should dedupe closest peers', async () => {
|
||||||
|
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
|
||||||
|
const results = [{
|
||||||
|
id: remotePeerId,
|
||||||
|
multiaddrs: [
|
||||||
|
multiaddr('/ip4/123.123.123.123/tcp/38982')
|
||||||
|
]
|
||||||
|
}]
|
||||||
|
|
||||||
|
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
|
||||||
|
yield * results
|
||||||
|
})
|
||||||
|
|
||||||
|
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
|
||||||
|
yield * results
|
||||||
|
})
|
||||||
|
|
||||||
|
const peers = await all(node.peerRouting.getClosestPeers('a cid'))
|
||||||
|
|
||||||
|
expect(peers).to.be.an('array').with.a.lengthOf(1).that.deep.equals(results)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('peer routing refresh manager service', () => {
|
describe('peer routing refresh manager service', () => {
|
||||||
|
Reference in New Issue
Block a user