feat: custom dialer addr sorter (#792)

* feat: custom dialer addr sorter

* chore: use libp2p utils sorter via addressBook getMultiaddrsForPeer

* chore: use new libp2p utils

* chore: apply suggestions from code review

Co-authored-by: Jacob Heun <jacobheun@gmail.com>

Co-authored-by: Jacob Heun <jacobheun@gmail.com>
This commit is contained in:
Vasco Santos 2020-11-20 15:16:40 +01:00 committed by Vasco Santos
parent e50c6abcf2
commit 585ad52b4c
9 changed files with 76 additions and 57 deletions

View File

@ -499,6 +499,7 @@ Dialing in libp2p can be configured to limit the rate of dialing, and how long d
| maxDialsPerPeer | `number` | How many multiaddrs we can dial per peer, in parallel. | | maxDialsPerPeer | `number` | How many multiaddrs we can dial per peer, in parallel. |
| dialTimeout | `number` | Second dial timeout per peer in ms. | | dialTimeout | `number` | Second dial timeout per peer in ms. |
| resolvers | `object` | Dial [Resolvers](https://github.com/multiformats/js-multiaddr/blob/master/src/resolvers/index.js) for resolving multiaddrs | | resolvers | `object` | Dial [Resolvers](https://github.com/multiformats/js-multiaddr/blob/master/src/resolvers/index.js) for resolving multiaddrs |
| addressSorter | `(Array<Address>) => Array<Address>` | Sort the known addresses of a peer before trying to dial. |
The below configuration example shows how the dialer should be configured, with the current defaults: The below configuration example shows how the dialer should be configured, with the current defaults:
@ -509,6 +510,7 @@ const MPLEX = require('libp2p-mplex')
const { NOISE } = require('libp2p-noise') const { NOISE } = require('libp2p-noise')
const { dnsaddrResolver } = require('multiaddr/src/resolvers') const { dnsaddrResolver } = require('multiaddr/src/resolvers')
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
const node = await Libp2p.create({ const node = await Libp2p.create({
modules: { modules: {
@ -522,7 +524,8 @@ const node = await Libp2p.create({
dialTimeout: 30e3, dialTimeout: 30e3,
resolvers: { resolvers: {
dnsaddr: dnsaddrResolver dnsaddr: dnsaddrResolver
} },
addressSorter: publicAddressesFirst
} }
``` ```

View File

@ -61,7 +61,7 @@
"it-protocol-buffers": "^0.2.0", "it-protocol-buffers": "^0.2.0",
"libp2p-crypto": "^0.18.0", "libp2p-crypto": "^0.18.0",
"libp2p-interfaces": "^0.7.2", "libp2p-interfaces": "^0.7.2",
"libp2p-utils": "^0.2.1", "libp2p-utils": "^0.2.2",
"mafmt": "^8.0.0", "mafmt": "^8.0.0",
"merge-options": "^2.0.0", "merge-options": "^2.0.0",
"moving-average": "^1.0.0", "moving-average": "^1.0.0",

View File

@ -4,8 +4,6 @@ const debug = require('debug')
const log = debug('libp2p:auto-relay') const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error') log.error = debug('libp2p:auto-relay:error')
const isPrivate = require('libp2p-utils/src/multiaddr/is-private')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayToString = require('uint8arrays/to-string')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
@ -36,6 +34,7 @@ class AutoRelay {
this._peerStore = libp2p.peerStore this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager this._connectionManager = libp2p.connectionManager
this._transportManager = libp2p.transportManager this._transportManager = libp2p.transportManager
this._addressSorter = libp2p.dialer.addressSorter
this.maxListeners = maxListeners this.maxListeners = maxListeners
@ -129,37 +128,19 @@ class AutoRelay {
return return
} }
// Create relay listen addr // Get peer known addresses and sort them per public addresses first
let listenAddr, remoteMultiaddr, remoteAddrs const remoteAddrs = this._peerStore.addressBook.getMultiaddrsForPeer(
connection.remotePeer, this._addressSorter
)
try { if (!remoteAddrs || !remoteAddrs.length) {
// Get peer known addresses and sort them per public addresses first return
remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
// TODO: This sort should be customizable in the config (dialer addr sort)
remoteAddrs.sort(multiaddrsCompareFunction)
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
// TODO: HOP Relays should avoid advertising private addresses!
} catch (_) {
log.error(`${id} does not have announced certified multiaddrs`)
// Attempt first if existing
if (!remoteAddrs || !remoteAddrs.length) {
return
}
remoteMultiaddr = remoteAddrs[0].multiaddr
} }
if (!remoteMultiaddr.protoNames().includes('p2p')) { const listenAddr = `${remoteAddrs[0].toString()}/p2p-circuit`
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
}
// Attempt to listen on relay
this._listenRelays.add(id) this._listenRelays.add(id)
// Attempt to listen on relay
try { try {
await this._transportManager.listen([multiaddr(listenAddr)]) await this._transportManager.listen([multiaddr(listenAddr)])
// Announce multiaddrs will update on listen success by TransportManager event being triggered // Announce multiaddrs will update on listen success by TransportManager event being triggered
@ -269,24 +250,4 @@ class AutoRelay {
} }
} }
/**
* Compare function for array.sort().
* This sort aims to move the private adresses to the end of the array.
*
* @param {Address} a
* @param {Address} b
* @returns {number}
*/
function multiaddrsCompareFunction (a, b) {
const isAPrivate = isPrivate(a.multiaddr)
const isBPrivate = isPrivate(b.multiaddr)
if (isAPrivate && !isBPrivate) {
return 1
} else if (!isAPrivate && isBPrivate) {
return -1
}
return 0
}
module.exports = AutoRelay module.exports = AutoRelay

View File

@ -7,6 +7,7 @@ const Constants = require('./constants')
const { AGENT_VERSION } = require('./identify/consts') const { AGENT_VERSION } = require('./identify/consts')
const RelayConstants = require('./circuit/constants') const RelayConstants = require('./circuit/constants')
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
const { FaultTolerance } = require('./transport-manager') const { FaultTolerance } = require('./transport-manager')
const DefaultConfig = { const DefaultConfig = {
@ -27,7 +28,8 @@ const DefaultConfig = {
dialTimeout: Constants.DIAL_TIMEOUT, dialTimeout: Constants.DIAL_TIMEOUT,
resolvers: { resolvers: {
dnsaddr: dnsaddrResolver dnsaddr: dnsaddrResolver
} },
addressSorter: publicAddressesFirst
}, },
host: { host: {
agentVersion: AGENT_VERSION agentVersion: AGENT_VERSION

View File

@ -9,6 +9,7 @@ const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error') log.error = debug('libp2p:dialer:error')
const { DialRequest } = require('./dial-request') const { DialRequest } = require('./dial-request')
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
const getPeer = require('../get-peer') const getPeer = require('../get-peer')
const { codes } = require('../errors') const { codes } = require('../errors')
@ -24,6 +25,7 @@ class Dialer {
* @param {object} options * @param {object} options
* @param {TransportManager} options.transportManager * @param {TransportManager} options.transportManager
* @param {Peerstore} options.peerStore * @param {Peerstore} options.peerStore
* @param {(addresses: Array<Address) => Array<Address>} [options.addressSorter = publicAddressesFirst] - Sort the known addresses of a peer before trying to dial.
* @param {number} [options.concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials. * @param {number} [options.concurrency = MAX_PARALLEL_DIALS] - Number of max concurrent dials.
* @param {number} [options.perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer. * @param {number} [options.perPeerLimit = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer.
* @param {number} [options.timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take. * @param {number} [options.timeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take.
@ -32,6 +34,7 @@ class Dialer {
constructor ({ constructor ({
transportManager, transportManager,
peerStore, peerStore,
addressSorter = publicAddressesFirst,
concurrency = MAX_PARALLEL_DIALS, concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT, timeout = DIAL_TIMEOUT,
perPeerLimit = MAX_PER_PEER_DIALS, perPeerLimit = MAX_PER_PEER_DIALS,
@ -39,6 +42,7 @@ class Dialer {
}) { }) {
this.transportManager = transportManager this.transportManager = transportManager
this.peerStore = peerStore this.peerStore = peerStore
this.addressSorter = addressSorter
this.concurrency = concurrency this.concurrency = concurrency
this.timeout = timeout this.timeout = timeout
this.perPeerLimit = perPeerLimit this.perPeerLimit = perPeerLimit
@ -120,7 +124,7 @@ class Dialer {
this.peerStore.addressBook.add(id, multiaddrs) this.peerStore.addressBook.add(id, multiaddrs)
} }
let knownAddrs = this.peerStore.addressBook.getMultiaddrsForPeer(id) || [] let knownAddrs = this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter) || []
// If received a multiaddr to dial, it should be the first to use // If received a multiaddr to dial, it should be the first to use
// But, if we know other multiaddrs for the peer, we should try them too. // But, if we know other multiaddrs for the peer, we should try them too.

View File

@ -136,7 +136,8 @@ class Libp2p extends EventEmitter {
concurrency: this._options.dialer.maxParallelDials, concurrency: this._options.dialer.maxParallelDials,
perPeerLimit: this._options.dialer.maxDialsPerPeer, perPeerLimit: this._options.dialer.maxDialsPerPeer,
timeout: this._options.dialer.dialTimeout, timeout: this._options.dialer.dialTimeout,
resolvers: this._options.dialer.resolvers resolvers: this._options.dialer.resolvers,
addressSorter: this._options.dialer.addressSorter
}) })
this._modules.transport.forEach((Transport) => { this._modules.transport.forEach((Transport) => {

View File

@ -319,20 +319,22 @@ class AddressBook extends Book {
* Returns `undefined` if there are no known multiaddrs for the given peer. * Returns `undefined` if there are no known multiaddrs for the given peer.
* *
* @param {PeerId} peerId * @param {PeerId} peerId
* @param {(addresses: Array<Address) => Array<Address>} [addressSorter]
* @returns {Array<Multiaddr>|undefined} * @returns {Array<Multiaddr>|undefined}
*/ */
getMultiaddrsForPeer (peerId) { getMultiaddrsForPeer (peerId, addressSorter = (ms) => ms) {
if (!PeerId.isPeerId(peerId)) { if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
} }
const entry = this.data.get(peerId.toB58String()) const entry = this.data.get(peerId.toB58String())
if (!entry || !entry.addresses) { if (!entry || !entry.addresses) {
return undefined return undefined
} }
return entry.addresses.map((address) => { return addressSorter(
entry.addresses || []
).map((address) => {
const multiaddr = address.multiaddr const multiaddr = address.multiaddr
const idString = multiaddr.getPeerId() const idString = multiaddr.getPeerId()

View File

@ -16,6 +16,7 @@ const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const { codes: ErrorCodes } = require('../../src/errors') const { codes: ErrorCodes } = require('../../src/errors')
const Constants = require('../../src/constants') const Constants = require('../../src/constants')
const Dialer = require('../../src/dialer') const Dialer = require('../../src/dialer')
const addressSort = require('libp2p-utils/src/address-sort')
const PeerStore = require('../../src/peer-store') const PeerStore = require('../../src/peer-store')
const TransportManager = require('../../src/transport-manager') const TransportManager = require('../../src/transport-manager')
const Libp2p = require('../../src') const Libp2p = require('../../src')
@ -44,6 +45,7 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
afterEach(() => { afterEach(() => {
peerStore.delete(peerId)
sinon.restore() sinon.restore()
}) })
@ -176,6 +178,37 @@ describe('Dialing (direct, WebSockets)', () => {
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
}) })
it('should sort addresses on dial', async () => {
const peerMultiaddrs = [
multiaddr('/ip4/127.0.0.1/tcp/15001/ws'),
multiaddr('/ip4/20.0.0.1/tcp/15001/ws'),
multiaddr('/ip4/30.0.0.1/tcp/15001/ws')
]
sinon.spy(addressSort, 'publicAddressesFirst')
sinon.stub(localTM, 'dial').callsFake(createMockConnection)
const dialer = new Dialer({
transportManager: localTM,
addressSorter: addressSort.publicAddressesFirst,
concurrency: 3,
peerStore
})
// Inject data in the AddressBook
peerStore.addressBook.add(peerId, peerMultiaddrs)
// Perform 3 multiaddr dials
await dialer.connectToPeer(peerId)
expect(addressSort.publicAddressesFirst.callCount).to.eql(1)
const sortedAddresses = addressSort.publicAddressesFirst(peerMultiaddrs.map((m) => ({ multiaddr: m })))
expect(localTM.dial.getCall(0).args[0].equals(sortedAddresses[0].multiaddr))
expect(localTM.dial.getCall(1).args[0].equals(sortedAddresses[1].multiaddr))
expect(localTM.dial.getCall(2).args[0].equals(sortedAddresses[2].multiaddr))
})
it('should dial to the max concurrency', async () => { it('should dial to the max concurrency', async () => {
const dialer = new Dialer({ const dialer = new Dialer({
transportManager: localTM, transportManager: localTM,

View File

@ -6,6 +6,7 @@ const { expect } = require('aegir/utils/chai')
const { Buffer } = require('buffer') const { Buffer } = require('buffer')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const arrayEquals = require('libp2p-utils/src/array-equals') const arrayEquals = require('libp2p-utils/src/array-equals')
const addressSort = require('libp2p-utils/src/address-sort')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const pDefer = require('p-defer') const pDefer = require('p-defer')
@ -19,7 +20,7 @@ const {
} = require('../../src/errors') } = require('../../src/errors')
const addr1 = multiaddr('/ip4/127.0.0.1/tcp/8000') const addr1 = multiaddr('/ip4/127.0.0.1/tcp/8000')
const addr2 = multiaddr('/ip4/127.0.0.1/tcp/8001') const addr2 = multiaddr('/ip4/20.0.0.1/tcp/8001')
const addr3 = multiaddr('/ip4/127.0.0.1/tcp/8002') const addr3 = multiaddr('/ip4/127.0.0.1/tcp/8002')
describe('addressBook', () => { describe('addressBook', () => {
@ -340,6 +341,18 @@ describe('addressBook', () => {
expect(m.getPeerId()).to.equal(peerId.toB58String()) expect(m.getPeerId()).to.equal(peerId.toB58String())
}) })
}) })
it('can sort multiaddrs providing a sorter', () => {
const supportedMultiaddrs = [addr1, addr2]
ab.set(peerId, supportedMultiaddrs)
const multiaddrs = ab.getMultiaddrsForPeer(peerId, addressSort.publicAddressesFirst)
const sortedAddresses = addressSort.publicAddressesFirst(supportedMultiaddrs.map((m) => ({ multiaddr: m })))
multiaddrs.forEach((m, index) => {
expect(m.equals(sortedAddresses[index].multiaddr))
})
})
}) })
describe('addressBook.delete', () => { describe('addressBook.delete', () => {