feat: resolve multiaddrs before dial (#782)

This commit is contained in:
Vasco Santos 2020-11-04 13:54:50 +01:00 committed by GitHub
parent 61c36f9e09
commit 093c0ea13f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 267 additions and 27 deletions

View File

@ -465,6 +465,7 @@ Dialing in libp2p can be configured to limit the rate of dialing, and how long d
| maxParallelDials | `number` | How many multiaddrs we can dial in parallel. | | maxParallelDials | `number` | How many multiaddrs we can dial in parallel. |
| maxDialsPerPeer | `number` | How many multiaddrs we can dial per peer, in parallel. | | maxDialsPerPeer | `number` | How many multiaddrs we can dial per peer, in parallel. |
| dialTimeout | `number` | Second dial timeout per peer in ms. | | dialTimeout | `number` | Second dial timeout per peer in ms. |
| resolvers | `object` | Dial [Resolvers](https://github.com/multiformats/js-multiaddr/blob/master/src/resolvers/index.js) for resolving multiaddrs |
The below configuration example shows how the dialer should be configured, with the current defaults: The below configuration example shows how the dialer should be configured, with the current defaults:
@ -474,6 +475,8 @@ const TCP = require('libp2p-tcp')
const MPLEX = require('libp2p-mplex') const MPLEX = require('libp2p-mplex')
const { NOISE } = require('libp2p-noise') const { NOISE } = require('libp2p-noise')
const { dnsaddrResolver } = require('multiaddr/src/resolvers')
const node = await Libp2p.create({ const node = await Libp2p.create({
modules: { modules: {
transport: [TCP], transport: [TCP],
@ -483,7 +486,10 @@ const node = await Libp2p.create({
dialer: { dialer: {
maxParallelDials: 100, maxParallelDials: 100,
maxDialsPerPeer: 4, maxDialsPerPeer: 4,
dialTimeout: 30e3 dialTimeout: 30e3,
resolvers: {
dnsaddr: dnsaddrResolver
}
} }
``` ```

View File

@ -204,8 +204,8 @@ const Bootstrap = require('libp2p-bootstrap')
// Known peers addresses // Known peers addresses
const bootstrapMultiaddrs = [ const bootstrapMultiaddrs = [
'/dns4/ams-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd', '/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb',
'/dns4/lon-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3' '/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN'
] ]
const node = await Libp2p.create({ const node = await Libp2p.create({

View File

@ -31,12 +31,11 @@ document.addEventListener('DOMContentLoaded', async () => {
[Bootstrap.tag]: { [Bootstrap.tag]: {
enabled: true, enabled: true,
list: [ list: [
'/dns4/ams-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd', '/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN',
'/dns4/lon-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3', '/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb',
'/dns4/sfo-3.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM', '/dnsaddr/bootstrap.libp2p.io/p2p/QmZa1sAxajnQjVM8WjWXoMbmPd7NsWhfKsPkErzpm9wGkp',
'/dns4/sgp-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu', '/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa',
'/dns4/nyc-1.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLueR4xBeUbY9WZ9xGUUxunbKWcrNFTDAadQJmocnWm', '/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt'
'/dns4/nyc-2.bootstrap.libp2p.io/tcp/443/wss/p2p/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64'
] ]
} }
} }

View File

@ -64,7 +64,7 @@
"mafmt": "^8.0.0", "mafmt": "^8.0.0",
"merge-options": "^2.0.0", "merge-options": "^2.0.0",
"moving-average": "^1.0.0", "moving-average": "^1.0.0",
"multiaddr": "^8.0.0", "multiaddr": "^8.1.0",
"multicodec": "^2.0.0", "multicodec": "^2.0.0",
"multistream-select": "^1.0.0", "multistream-select": "^1.0.0",
"mutable-proxy": "^1.0.0", "mutable-proxy": "^1.0.0",

View File

@ -1,6 +1,8 @@
'use strict' 'use strict'
const mergeOptions = require('merge-options') const mergeOptions = require('merge-options')
const { dnsaddrResolver } = require('multiaddr/src/resolvers')
const Constants = require('./constants') const Constants = require('./constants')
const { FaultTolerance } = require('./transport-manager') const { FaultTolerance } = require('./transport-manager')
@ -20,7 +22,10 @@ const DefaultConfig = {
dialer: { dialer: {
maxParallelDials: Constants.MAX_PARALLEL_DIALS, maxParallelDials: Constants.MAX_PARALLEL_DIALS,
maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS, maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS,
dialTimeout: Constants.DIAL_TIMEOUT dialTimeout: Constants.DIAL_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
}
}, },
metrics: { metrics: {
enabled: false enabled: false

View File

@ -27,13 +27,15 @@ class Dialer {
* @param {number} [options.concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials. * @param {number} [options.concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials.
* @param {number} [options.perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer. * @param {number} [options.perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer.
* @param {number} [options.timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take. * @param {number} [options.timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take.
* @param {object} [options.resolvers = {}] - multiaddr resolvers to use when dialing
*/ */
constructor ({ constructor ({
transportManager, transportManager,
peerStore, peerStore,
concurrency = MAX_PARALLEL_DIALS, concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT, timeout = DIAL_TIMEOUT,
perPeerLimit = MAX_PER_PEER_DIALS perPeerLimit = MAX_PER_PEER_DIALS,
resolvers = {}
}) { }) {
this.transportManager = transportManager this.transportManager = transportManager
this.peerStore = peerStore this.peerStore = peerStore
@ -42,6 +44,10 @@ class Dialer {
this.perPeerLimit = perPeerLimit this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index) this.tokens = [...new Array(concurrency)].map((_, index) => index)
this._pendingDials = new Map() this._pendingDials = new Map()
for (const [key, value] of Object.entries(resolvers)) {
multiaddr.resolvers.set(key, value)
}
} }
/** /**
@ -69,7 +75,7 @@ class Dialer {
* @returns {Promise<Connection>} * @returns {Promise<Connection>}
*/ */
async connectToPeer (peer, options = {}) { async connectToPeer (peer, options = {}) {
const dialTarget = this._createDialTarget(peer) const dialTarget = await this._createDialTarget(peer)
if (!dialTarget.addrs.length) { if (!dialTarget.addrs.length) {
throw errCode(new Error('The dial request has no addresses'), codes.ERR_NO_VALID_ADDRESSES) throw errCode(new Error('The dial request has no addresses'), codes.ERR_NO_VALID_ADDRESSES)
@ -105,22 +111,28 @@ class Dialer {
* *
* @private * @private
* @param {PeerId|Multiaddr|string} peer - A PeerId or Multiaddr * @param {PeerId|Multiaddr|string} peer - A PeerId or Multiaddr
* @returns {DialTarget} * @returns {Promise<DialTarget>}
*/ */
_createDialTarget (peer) { async _createDialTarget (peer) {
const { id, multiaddrs } = getPeer(peer) const { id, multiaddrs } = getPeer(peer)
if (multiaddrs) { if (multiaddrs) {
this.peerStore.addressBook.add(id, multiaddrs) this.peerStore.addressBook.add(id, multiaddrs)
} }
let addrs = this.peerStore.addressBook.getMultiaddrsForPeer(id) || [] let knownAddrs = this.peerStore.addressBook.getMultiaddrsForPeer(id) || []
// If received a multiaddr to dial, it should be the first to use // If received a multiaddr to dial, it should be the first to use
// But, if we know other multiaddrs for the peer, we should try them too. // But, if we know other multiaddrs for the peer, we should try them too.
if (multiaddr.isMultiaddr(peer)) { if (multiaddr.isMultiaddr(peer)) {
addrs = addrs.filter((addr) => !peer.equals(addr)) knownAddrs = knownAddrs.filter((addr) => !peer.equals(addr))
addrs.unshift(peer) knownAddrs.unshift(peer)
}
const addrs = []
for (const a of knownAddrs) {
const resolvedAddrs = await this._resolve(a)
resolvedAddrs.forEach(ra => addrs.push(ra))
} }
return { return {
@ -190,6 +202,52 @@ class Dialer {
log('token %d released', token) log('token %d released', token)
this.tokens.push(token) this.tokens.push(token)
} }
/**
* Resolve multiaddr recursively.
*
* @param {Multiaddr} ma
* @returns {Promise<Array<Multiaddr>>}
*/
async _resolve (ma) {
// TODO: recursive logic should live in multiaddr once dns4/dns6 support is in place
// Now only supporting resolve for dnsaddr
const resolvableProto = ma.protoNames().includes('dnsaddr')
// Multiaddr is not resolvable? End recursion!
if (!resolvableProto) {
return [ma]
}
const resolvedMultiaddrs = await this._resolveRecord(ma)
const recursiveMultiaddrs = await Promise.all(resolvedMultiaddrs.map((nm) => {
return this._resolve(nm)
}))
return recursiveMultiaddrs.flat().reduce((array, newM) => {
if (!array.find(m => m.equals(newM))) {
array.push(newM)
}
return array
}, []) // Unique addresses
}
/**
* Resolve a given multiaddr. If this fails, an empty array will be returned
*
* @param {Multiaddr} ma
* @returns {Promise<Array<Multiaddr>>}
*/
async _resolveRecord (ma) {
try {
ma = multiaddr(ma.toString()) // Use current multiaddr module
const multiaddrs = await ma.resolve()
return multiaddrs
} catch (_) {
log.error(`multiaddr ${ma} could not be resolved`)
return []
}
}
} }
module.exports = Dialer module.exports = Dialer

View File

@ -134,7 +134,8 @@ class Libp2p extends EventEmitter {
peerStore: this.peerStore, peerStore: this.peerStore,
concurrency: this._options.dialer.maxParallelDials, concurrency: this._options.dialer.maxParallelDials,
perPeerLimit: this._options.dialer.maxDialsPerPeer, perPeerLimit: this._options.dialer.maxDialsPerPeer,
timeout: this._options.dialer.dialTimeout timeout: this._options.dialer.dialTimeout,
resolvers: this._options.dialer.resolvers
}) })
this._modules.transport.forEach((Transport) => { this._modules.transport.forEach((Transport) => {

View File

@ -158,9 +158,9 @@ describe('Dialing (direct, TCP)', () => {
it('should dial to the max concurrency', async () => { it('should dial to the max concurrency', async () => {
const addrs = [ const addrs = [
'/ip4/0.0.0.0/tcp/8000', multiaddr('/ip4/0.0.0.0/tcp/8000'),
'/ip4/0.0.0.0/tcp/8001', multiaddr('/ip4/0.0.0.0/tcp/8001'),
'/ip4/0.0.0.0/tcp/8002' multiaddr('/ip4/0.0.0.0/tcp/8002')
] ]
const dialer = new Dialer({ const dialer = new Dialer({
transportManager: localTM, transportManager: localTM,

View File

@ -263,7 +263,6 @@ describe('Dialing (direct, WebSockets)', () => {
describe('libp2p.dialer', () => { describe('libp2p.dialer', () => {
let libp2p let libp2p
let remoteLibp2p
afterEach(async () => { afterEach(async () => {
sinon.restore() sinon.restore()
@ -271,10 +270,6 @@ describe('Dialing (direct, WebSockets)', () => {
libp2p = null libp2p = null
}) })
after(async () => {
remoteLibp2p && await remoteLibp2p.stop()
})
it('should create a dialer', () => { it('should create a dialer', () => {
libp2p = new Libp2p({ libp2p = new Libp2p({
peerId, peerId,

View File

@ -0,0 +1,176 @@
'use strict'
/* eslint-env mocha */
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const multiaddr = require('multiaddr')
const { Resolver } = require('multiaddr/src/resolvers/dns')
const { codes: ErrorCodes } = require('../../src/errors')
const peerUtils = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options.browser')
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
const relayAddr = MULTIADDRS_WEBSOCKETS[0]
const getDnsaddrStub = (peerId) => [
[`dnsaddr=/dnsaddr/ams-1.bootstrap.libp2p.io/p2p/${peerId}`],
[`dnsaddr=/dnsaddr/ams-2.bootstrap.libp2p.io/p2p/${peerId}`],
[`dnsaddr=/dnsaddr/lon-1.bootstrap.libp2p.io/p2p/${peerId}`],
[`dnsaddr=/dnsaddr/nrt-1.bootstrap.libp2p.io/p2p/${peerId}`],
[`dnsaddr=/dnsaddr/nyc-1.bootstrap.libp2p.io/p2p/${peerId}`],
[`dnsaddr=/dnsaddr/sfo-2.bootstrap.libp2p.io/p2p/${peerId}`]
]
const relayedAddr = (peerId) => `${relayAddr}/p2p-circuit/p2p/${peerId}`
const getDnsRelayedAddrStub = (peerId) => [
[`dnsaddr=${relayedAddr(peerId)}`]
]
describe('Dialing (resolvable addresses)', () => {
let libp2p, remoteLibp2p
beforeEach(async () => {
[libp2p, remoteLibp2p] = await peerUtils.createPeer({
number: 2,
config: {
modules: baseOptions.modules,
addresses: {
listen: [multiaddr(`${relayAddr}/p2p-circuit`)]
},
config: {
peerDiscovery: {
autoDial: false
}
}
},
started: true,
populateAddressBooks: false
})
})
afterEach(async () => {
sinon.restore()
await Promise.all([libp2p, remoteLibp2p].map(n => n.stop()))
})
it('resolves dnsaddr to ws local address', async () => {
const remoteId = remoteLibp2p.peerId.toB58String()
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId}`)
const relayedAddrFetched = multiaddr(relayedAddr(remoteId))
// Transport spy
const transport = libp2p.transportManager._transports.get('Circuit')
sinon.spy(transport, 'dial')
// Resolver stub
const stub = sinon.stub(Resolver.prototype, 'resolveTxt')
stub.onCall(0).returns(Promise.resolve(getDnsRelayedAddrStub(remoteId)))
// Dial with address resolve
const connection = await libp2p.dial(dialAddr)
expect(connection).to.exist()
expect(connection.remoteAddr.equals(relayedAddrFetched))
const dialArgs = transport.dial.firstCall.args
expect(dialArgs[0].equals(relayedAddrFetched)).to.eql(true)
})
it('resolves a dnsaddr recursively', async () => {
const remoteId = remoteLibp2p.peerId.toB58String()
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId}`)
const relayedAddrFetched = multiaddr(relayedAddr(remoteId))
// Transport spy
const transport = libp2p.transportManager._transports.get('Circuit')
sinon.spy(transport, 'dial')
// Resolver stub
const stub = sinon.stub(Resolver.prototype, 'resolveTxt')
let firstCall = false
stub.callsFake(() => {
if (!firstCall) {
firstCall = true
// Return an array of dnsaddr
return Promise.resolve(getDnsaddrStub(remoteId))
}
return Promise.resolve(getDnsRelayedAddrStub(remoteId))
})
// Dial with address resolve
const connection = await libp2p.dial(dialAddr)
expect(connection).to.exist()
expect(connection.remoteAddr.equals(relayedAddrFetched))
const dialArgs = transport.dial.firstCall.args
expect(dialArgs[0].equals(relayedAddrFetched)).to.eql(true)
})
// TODO: Temporary solution does not resolve dns4/dns6
// Resolver just returns the received multiaddrs
it('stops recursive resolve if finds dns4/dns6 and dials it', async () => {
const remoteId = remoteLibp2p.peerId.toB58String()
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId}`)
// Stub resolver
const dnsMa = multiaddr(`/dns4/ams-1.remote.libp2p.io/tcp/443/wss/p2p/${remoteId}`)
const stubResolve = sinon.stub(Resolver.prototype, 'resolveTxt')
stubResolve.returns(Promise.resolve([
[`dnsaddr=${dnsMa}`]
]))
// Stub transport
const transport = libp2p.transportManager._transports.get('WebSockets')
const stubTransport = sinon.stub(transport, 'dial')
stubTransport.callsFake((multiaddr) => {
expect(multiaddr.equals(dnsMa)).to.eql(true)
})
await libp2p.dial(dialAddr)
})
it('resolves a dnsaddr recursively not failing if one address fails to resolve', async () => {
const remoteId = remoteLibp2p.peerId.toB58String()
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId}`)
const relayedAddrFetched = multiaddr(relayedAddr(remoteId))
// Transport spy
const transport = libp2p.transportManager._transports.get('Circuit')
sinon.spy(transport, 'dial')
// Resolver stub
const stub = sinon.stub(Resolver.prototype, 'resolveTxt')
stub.onCall(0).callsFake(() => Promise.resolve(getDnsaddrStub(remoteId)))
stub.onCall(1).callsFake(() => Promise.reject(new Error()))
stub.callsFake(() => Promise.resolve(getDnsRelayedAddrStub(remoteId)))
// Dial with address resolve
const connection = await libp2p.dial(dialAddr)
expect(connection).to.exist()
expect(connection.remoteAddr.equals(relayedAddrFetched))
const dialArgs = transport.dial.firstCall.args
expect(dialArgs[0].equals(relayedAddrFetched)).to.eql(true)
})
it('fails to dial if resolve fails and there are no addresses to dial', async () => {
const remoteId = remoteLibp2p.peerId.toB58String()
const dialAddr = multiaddr(`/dnsaddr/remote.libp2p.io/p2p/${remoteId}`)
// Stub resolver
const stubResolve = sinon.stub(Resolver.prototype, 'resolveTxt')
stubResolve.returns(Promise.reject(new Error()))
// Stub transport
const transport = libp2p.transportManager._transports.get('WebSockets')
const spy = sinon.spy(transport, 'dial')
await expect(libp2p.dial(dialAddr))
.to.eventually.be.rejectedWith(Error)
.and.to.have.nested.property('.code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
expect(spy.callCount).to.eql(0)
})
})