feat: discover and connect to closest peers (#798)

This commit is contained in:
Vasco Santos 2020-11-25 18:50:23 +01:00 committed by Vasco Santos
parent 4ebcdb085c
commit baedf3fe5a
7 changed files with 401 additions and 27 deletions

View File

@ -20,6 +20,7 @@
* [`contentRouting.get`](#contentroutingget) * [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany) * [`contentRouting.getMany`](#contentroutinggetmany)
* [`peerRouting.findPeer`](#peerroutingfindpeer) * [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerRouting.getClosestPeers`](#peerroutinggetclosestpeers)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
* [`peerStore.addressBook.get`](#peerstoreaddressbookget) * [`peerStore.addressBook.get`](#peerstoreaddressbookget)
@ -100,6 +101,7 @@ Creates an instance of Libp2p.
| [options.keychain] | [`object`](./CONFIGURATION.md#setup-with-keychain) | keychain [configuration](./CONFIGURATION.md#setup-with-keychain) | | [options.keychain] | [`object`](./CONFIGURATION.md#setup-with-keychain) | keychain [configuration](./CONFIGURATION.md#setup-with-keychain) |
| [options.metrics] | [`object`](./CONFIGURATION.md#configuring-metrics) | libp2p Metrics [configuration](./CONFIGURATION.md#configuring-metrics) | | [options.metrics] | [`object`](./CONFIGURATION.md#configuring-metrics) | libp2p Metrics [configuration](./CONFIGURATION.md#configuring-metrics) |
| [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) | | [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) |
| [options.peerRouting] | [`object`](./CONFIGURATION.md#setup-with-content-and-peer-routing) | libp2p Peer routing service [configuration](./CONFIGURATION.md#setup-with-content-and-peer-routing) |
| [options.peerStore] | [`object`](./CONFIGURATION.md#configuring-peerstore) | libp2p PeerStore [configuration](./CONFIGURATION.md#configuring-peerstore) | | [options.peerStore] | [`object`](./CONFIGURATION.md#configuring-peerstore) | libp2p PeerStore [configuration](./CONFIGURATION.md#configuring-peerstore) |
For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md). For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md).
@ -707,6 +709,36 @@ Iterates over all peer routers in series to find the given peer. If the DHT is e
const peer = await libp2p.peerRouting.findPeer(peerId, options) const peer = await libp2p.peerRouting.findPeer(peerId, options)
``` ```
### peerRouting.getClosestPeers
Iterates over all content routers in series to get the closest peers of the given key.
Once a content router succeeds, the iteration will stop. If the DHT is enabled, it will be queried first.
`libp2p.peerRouting.getClosestPeers(cid, options)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| key | `Uint8Array` | A CID like key |
| options | `object` | operation options |
| options.timeout | `number` | How long the query can take (ms). |
#### Returns
| Type | Description |
|------|-------------|
| `AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }` | Async iterator for peer data |
#### Example
```js
// Iterate over the closest peers found for the given key
for await (const peer of libp2p.peerRouting.getClosestPeers(key)) {
console.log(peer.id, peer.multiaddrs)
}
```
### peerStore.addressBook.add ### peerStore.addressBook.add
Adds known `multiaddrs` of a given peer. If the peer is not known, it will be set with the provided multiaddrs. Adds known `multiaddrs` of a given peer. If the peer is not known, it will be set with the provided multiaddrs.

View File

@ -397,7 +397,14 @@ const node = await Libp2p.create({
new DelegatedPeerRouter() new DelegatedPeerRouter()
], ],
}, },
peerId peerId,
peerRouting: { // Peer routing configuration
refreshManager: { // Refresh known and connected closest peers
enabled: true, // Should find the closest peers.
interval: 6e5, // Interval for getting the new for closest peers of 10min
bootDelay: 10e3 // Delay for the initial query for closest peers
}
}
}) })
``` ```

View File

@ -79,6 +79,7 @@
"protons": "^2.0.0", "protons": "^2.0.0",
"retimer": "^2.0.0", "retimer": "^2.0.0",
"sanitize-filename": "^1.6.3", "sanitize-filename": "^1.6.3",
"set-delayed-interval": "^1.0.0",
"streaming-iterables": "^5.0.2", "streaming-iterables": "^5.0.2",
"timeout-abort-controller": "^1.1.1", "timeout-abort-controller": "^1.1.1",
"varint": "^5.0.0", "varint": "^5.0.0",
@ -92,6 +93,7 @@
"chai-string": "^1.5.0", "chai-string": "^1.5.0",
"delay": "^4.3.0", "delay": "^4.3.0",
"interop-libp2p": "^0.3.0", "interop-libp2p": "^0.3.0",
"into-stream": "^6.0.0",
"ipfs-http-client": "^47.0.1", "ipfs-http-client": "^47.0.1",
"it-concat": "^1.0.0", "it-concat": "^1.0.0",
"it-pair": "^1.0.0", "it-pair": "^1.0.0",

View File

@ -41,6 +41,13 @@ const DefaultConfig = {
persistence: false, persistence: false,
threshold: 5 threshold: 5
}, },
peerRouting: {
refreshManager: {
enabled: true,
interval: 6e5,
bootDelay: 10e3
}
},
config: { config: {
dht: { dht: {
enabled: false, enabled: false,

View File

@ -9,7 +9,7 @@ log.error = debug('libp2p:error')
const errCode = require('err-code') const errCode = require('err-code')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const peerRouting = require('./peer-routing') const PeerRouting = require('./peer-routing')
const contentRouting = require('./content-routing') const contentRouting = require('./content-routing')
const getPeer = require('./get-peer') const getPeer = require('./get-peer')
const { validate: validateConfig } = require('./config') const { validate: validateConfig } = require('./config')
@ -193,7 +193,7 @@ class Libp2p extends EventEmitter {
// Attach remaining APIs // Attach remaining APIs
// peer and content routing will automatically get modules from _modules and _dht // peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this) this.peerRouting = new PeerRouting(this)
this.contentRouting = contentRouting(this) this.contentRouting = contentRouting(this)
// Mount default protocols // Mount default protocols
@ -250,8 +250,8 @@ class Libp2p extends EventEmitter {
try { try {
this._isStarted = false this._isStarted = false
// Relay
this.relay && this.relay.stop() this.relay && this.relay.stop()
this.peerRouting.stop()
for (const service of this._discovery.values()) { for (const service of this._discovery.values()) {
service.removeListener('peer', this._onDiscoveryPeer) service.removeListener('peer', this._onDiscoveryPeer)
@ -501,6 +501,8 @@ class Libp2p extends EventEmitter {
// Relay // Relay
this.relay && this.relay.start() this.relay && this.relay.start()
this.peerRouting.start()
} }
/** /**

View File

@ -1,40 +1,126 @@
'use strict' 'use strict'
const errCode = require('err-code') const errCode = require('err-code')
const debug = require('debug')
const log = debug('libp2p:peer-routing')
log.error = debug('libp2p:peer-routing:error')
const all = require('it-all')
const pAny = require('p-any') const pAny = require('p-any')
const {
setDelayedInterval,
clearDelayedInterval
} = require('set-delayed-interval')
module.exports = (node) => { /**
const routers = node._modules.peerRouting || [] * Responsible for managing the usage of the available Peer Routing modules.
*/
class PeerRouting {
/**
* @class
* @param {Libp2p} libp2p
*/
constructor (libp2p) {
this._peerId = libp2p.peerId
this._peerStore = libp2p.peerStore
this._routers = libp2p._modules.peerRouting || []
// If we have the dht, make it first // If we have the dht, make it first
if (node._dht) { if (libp2p._dht) {
routers.unshift(node._dht) this._routers.unshift(libp2p._dht)
}
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager
this._findClosestPeersTask = this._findClosestPeersTask.bind(this)
} }
return { /**
/** * Start peer routing service.
* Iterates over all peer routers in series to find the given peer. */
* start () {
* @param {string} id - The id of the peer to find if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) {
* @param {object} [options] return
* @param {number} [options.timeout] - How long the query should run }
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/ this._timeoutId = setDelayedInterval(
findPeer: async (id, options) => { // eslint-disable-line require-await this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay
if (!routers.length) { )
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') }
/**
* Recurrent task to find closest peers and add their addresses to the Address Book.
*/
async _findClosestPeersTask () {
try {
for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) {
this._peerStore.addressBook.add(id, multiaddrs)
}
} catch (err) {
log.error(err)
}
}
/**
* Stop peer routing service.
*/
stop () {
clearDelayedInterval(this._timeoutId)
}
/**
* Iterates over all peer routers in series to find the given peer.
*
* @param {string} id - The id of the peer to find
* @param {object} [options]
* @param {number} [options.timeout] - How long the query should run
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options) { // eslint-disable-line require-await
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}
return pAny(this._routers.map(async (router) => {
const result = await router.findPeer(id, options)
// If we don't have a result, we need to provide an error to keep trying
if (!result || Object.keys(result).length === 0) {
throw errCode(new Error('not found'), 'NOT_FOUND')
} }
return pAny(routers.map(async (router) => { return result
const result = await router.findPeer(id, options) }))
}
// If we don't have a result, we need to provide an error to keep trying /**
if (!result || Object.keys(result).length === 0) { * Attempt to find the closest peers on the network to the given key.
*
* @param {Uint8Array} key - A CID like key
* @param {Object} [options]
* @param {number} [options.timeout=30e3] - How long the query can take.
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * getClosestPeers (key, options = { timeout: 30e3 }) {
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}
const result = await pAny(
this._routers.map(async (router) => {
const peers = await all(router.getClosestPeers(key, options))
if (!peers || !peers.length) {
throw errCode(new Error('not found'), 'NOT_FOUND') throw errCode(new Error('not found'), 'NOT_FOUND')
} }
return peers
})
)
return result for (const peer of result) {
})) yield peer
} }
} }
} }
module.exports = PeerRouting

View File

@ -4,12 +4,17 @@
const { expect } = require('aegir/utils/chai') const { expect } = require('aegir/utils/chai')
const nock = require('nock') const nock = require('nock')
const sinon = require('sinon') const sinon = require('sinon')
const intoStream = require('into-stream')
const delay = require('delay')
const pDefer = require('p-defer') const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const mergeOptions = require('merge-options') const mergeOptions = require('merge-options')
const ipfsHttpClient = require('ipfs-http-client') const ipfsHttpClient = require('ipfs-http-client')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing') const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const peerUtils = require('../utils/creators/peer') const peerUtils = require('../utils/creators/peer')
const { baseOptions, routingOptions } = require('./utils') const { baseOptions, routingOptions } = require('./utils')
@ -29,6 +34,16 @@ describe('peer-routing', () => {
.to.eventually.be.rejected() .to.eventually.be.rejected()
.and.to.have.property('code', 'NO_ROUTERS_AVAILABLE') .and.to.have.property('code', 'NO_ROUTERS_AVAILABLE')
}) })
it('.getClosestPeers should return an error', async () => {
try {
for await (const _ of node.peerRouting.getClosestPeers('a cid')) { } // eslint-disable-line
throw new Error('.getClosestPeers should return an error')
} catch (err) {
expect(err).to.exist()
expect(err.code).to.equal('NO_ROUTERS_AVAILABLE')
}
})
}) })
describe('via dht router', () => { describe('via dht router', () => {
@ -64,6 +79,19 @@ describe('peer-routing', () => {
nodes[0].peerRouting.findPeer() nodes[0].peerRouting.findPeer()
return deferred.promise return deferred.promise
}) })
it('should use the nodes dht to get the closest peers', async () => {
const deferred = pDefer()
sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(function * () {
deferred.resolve()
yield
})
await nodes[0].peerRouting.getClosestPeers().next()
return deferred.promise
})
}) })
describe('via delegate router', () => { describe('via delegate router', () => {
@ -110,6 +138,19 @@ describe('peer-routing', () => {
return deferred.promise return deferred.promise
}) })
it('should use the delegate router to get the closest peers', async () => {
const deferred = pDefer()
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
deferred.resolve()
yield
})
await node.peerRouting.getClosestPeers().next()
return deferred.promise
})
it('should be able to find a peer', async () => { it('should be able to find a peer', async () => {
const peerKey = 'QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL' const peerKey = 'QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL'
const mockApi = nock('http://0.0.0.0:60197') const mockApi = nock('http://0.0.0.0:60197')
@ -154,6 +195,60 @@ describe('peer-routing', () => {
expect(mockApi.isDone()).to.equal(true) expect(mockApi.isDone()).to.equal(true)
}) })
it('should be able to get the closest peers', async () => {
const peerId = await PeerId.create({ keyType: 'ed25519' })
const closest1 = '12D3KooWLewYMMdGWAtuX852n4rgCWkK7EBn4CWbwwBzhsVoKxk3'
const closest2 = '12D3KooWDtoQbpKhtnWddfj72QmpFvvLDTsBLTFkjvgQm6cde2AK'
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/query')
.query(true)
.reply(200,
() => intoStream([
`{"extra":"","id":"${closest1}","responses":[{"ID":"${closest1}","Addrs":["/ip4/127.0.0.1/tcp/63930","/ip4/127.0.0.1/tcp/63930"]}],"type":1}\n`,
`{"extra":"","id":"${closest2}","responses":[{"ID":"${closest2}","Addrs":["/ip4/127.0.0.1/tcp/63506","/ip4/127.0.0.1/tcp/63506"]}],"type":1}\n`,
`{"Extra":"","ID":"${closest2}","Responses":[],"Type":2}\n`,
`{"Extra":"","ID":"${closest1}","Responses":[],"Type":2}\n`
]),
[
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const closestPeers = []
for await (const peer of node.peerRouting.getClosestPeers(peerId.id, { timeout: 1000 })) {
closestPeers.push(peer)
}
expect(closestPeers).to.have.length(2)
expect(closestPeers[0].id.toB58String()).to.equal(closest2)
expect(closestPeers[0].multiaddrs).to.have.lengthOf(2)
expect(closestPeers[1].id.toB58String()).to.equal(closest1)
expect(closestPeers[1].multiaddrs).to.have.lengthOf(2)
expect(mockApi.isDone()).to.equal(true)
})
it('should handle errors when getting the closest peers', async () => {
const peerId = await PeerId.create({ keyType: 'ed25519' })
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/query')
.query(true)
.reply(502, 'Bad Gateway', [
'X-Chunked-Output', '1'
])
try {
for await (const _ of node.peerRouting.getClosestPeers(peerId.id)) { } // eslint-disable-line
throw new Error('should handle errors when getting the closest peers')
} catch (err) {
expect(err).to.exist()
}
expect(mockApi.isDone()).to.equal(true)
})
}) })
describe('via dht and delegate routers', () => { describe('via dht and delegate routers', () => {
@ -208,5 +303,148 @@ describe('peer-routing', () => {
const peer = await node.peerRouting.findPeer('a peer id') const peer = await node.peerRouting.findPeer('a peer id')
expect(peer).to.eql(results) expect(peer).to.eql(results)
}) })
it('should only use the dht if it gets the closest peers', async () => {
const results = [true]
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield results[0]
})
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () { // eslint-disable-line require-yield
throw new Error('the delegate should not have been called')
})
const closest = []
for await (const peer of node.peerRouting.getClosestPeers('a cid')) {
closest.push(peer)
}
expect(closest).to.have.length.above(0)
expect(closest).to.eql(results)
})
it('should use the delegate if the dht fails to get the closest peer', async () => {
const results = [true]
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () { })
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
yield results[0]
})
const closest = []
for await (const peer of node.peerRouting.getClosestPeers('a cid')) {
closest.push(peer)
}
expect(closest).to.have.length.above(0)
expect(closest).to.eql(results)
})
})
describe('peer routing refresh manager service', () => {
let node
let peerIds
before(async () => {
peerIds = await peerUtils.createPeerId({ number: 2 })
})
afterEach(() => {
sinon.restore()
return node && node.stop()
})
it('should be enabled and start by default', async () => {
const results = [
{ id: peerIds[0], multiaddrs: [multiaddr('/ip4/30.0.0.1/tcp/2000')] },
{ id: peerIds[1], multiaddrs: [multiaddr('/ip4/32.0.0.1/tcp/2000')] }
]
;[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
bootDelay: 100
}
}
}),
started: false
})
sinon.spy(node.peerStore.addressBook, 'add')
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield results[0]
yield results[1]
})
await node.start()
await pWaitFor(() => node._dht.getClosestPeers.callCount === 1)
await pWaitFor(() => node.peerStore.addressBook.add.callCount === results.length)
const call0 = node.peerStore.addressBook.add.getCall(0)
expect(call0.args[0].equals(results[0].id))
call0.args[1].forEach((m, index) => {
expect(m.equals(results[0].multiaddrs[index]))
})
const call1 = node.peerStore.addressBook.add.getCall(1)
expect(call1.args[0].equals(results[1].id))
call0.args[1].forEach((m, index) => {
expect(m.equals(results[1].multiaddrs[index]))
})
})
it('should support being disabled', async () => {
[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
bootDelay: 100,
enabled: false
}
}
}),
started: false
})
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield
throw new Error('should not be called')
})
await node.start()
await delay(100)
expect(node._dht.getClosestPeers.callCount === 0)
})
it('should start and run recurrently on interval', async () => {
[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
interval: 500,
bootDelay: 200
}
}
}),
started: false
})
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield { id: peerIds[0], multiaddrs: [multiaddr('/ip4/30.0.0.1/tcp/2000')] }
})
await node.start()
await delay(300)
expect(node._dht.getClosestPeers.callCount).to.eql(1)
await delay(500)
expect(node._dht.getClosestPeers.callCount).to.eql(2)
})
}) })
}) })