fix: report dialer metrics (#1377)

Converts the dialer to a component so it can access metrics
This commit is contained in:
Alex Potsides 2022-09-09 19:00:11 +01:00 committed by GitHub
parent b87632f97f
commit 0218acfae2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 54 additions and 82 deletions

View File

@ -98,12 +98,13 @@
},
"dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/components": "^2.0.3",
"@libp2p/components": "^2.1.0",
"@libp2p/connection": "^4.0.1",
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-address-manager": "^1.0.2",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-connection-encrypter": "^2.0.1",
"@libp2p/interface-connection-manager": "^1.1.0",
"@libp2p/interface-content-routing": "^1.0.2",
"@libp2p/interface-dht": "^1.0.1",
"@libp2p/interface-metrics": "^3.0.0",

View File

@ -7,7 +7,7 @@ import { logger } from '@libp2p/logger'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Connection } from '@libp2p/interface-connection'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Dialer } from './index.js'
import type { Dialer } from '@libp2p/interface-connection-manager'
const log = logger('libp2p:dialer:dial-request')

View File

@ -24,10 +24,11 @@ import type { Startable } from '@libp2p/interfaces/startable'
import type { PeerId } from '@libp2p/interface-peer-id'
import { getPeer } from '../../get-peer.js'
import sort from 'it-sort'
import { Components, Initializable } from '@libp2p/components'
import type { Components } from '@libp2p/components'
import map from 'it-map'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { ComponentMetricsTracker } from '@libp2p/interface-metrics'
import type { Dialer } from '@libp2p/interface-connection-manager'
const log = logger('libp2p:dialer')
@ -85,8 +86,8 @@ export interface DialerInit {
metrics?: ComponentMetricsTracker
}
export class Dialer implements Startable, Initializable {
private components: Components = new Components()
export class DefaultDialer implements Startable, Dialer {
private readonly components: Components
private readonly addressSorter: AddressSorter
private readonly maxAddrsToDial: number
private readonly timeout: number
@ -96,13 +97,14 @@ export class Dialer implements Startable, Initializable {
public pendingDialTargets: Map<string, PendingDialTarget>
private started: boolean
constructor (init: DialerInit = {}) {
constructor (components: Components, init: DialerInit = {}) {
this.started = false
this.addressSorter = init.addressSorter ?? publicAddressesFirst
this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL
this.timeout = init.dialTimeout ?? DIAL_TIMEOUT
this.maxDialsPerPeer = init.maxDialsPerPeer ?? MAX_PER_PEER_DIALS
this.tokens = [...new Array(init.maxParallelDials ?? MAX_PARALLEL_DIALS)].map((_, index) => index)
this.components = components
this.pendingDials = trackedMap({
component: METRICS_COMPONENT,
metric: METRICS_PENDING_DIALS,
@ -111,7 +113,7 @@ export class Dialer implements Startable, Initializable {
this.pendingDialTargets = trackedMap({
component: METRICS_COMPONENT,
metric: METRICS_PENDING_DIAL_TARGETS,
metrics: init.metrics
metrics: components.getMetrics()
})
for (const [key, value] of Object.entries(init.resolvers ?? {})) {
@ -119,10 +121,6 @@ export class Dialer implements Startable, Initializable {
}
}
init (components: Components): void {
this.components = components
}
isStarted () {
return this.started
}

View File

@ -14,7 +14,6 @@ import type { Connection } from '@libp2p/interface-connection'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import { Components, Initializable } from '@libp2p/components'
import * as STATUS from '@libp2p/interface-connection/status'
import { Dialer } from './dialer/index.js'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
@ -144,7 +143,6 @@ export interface ConnectionManagerEvents {
* Responsible for managing known connections.
*/
export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable, Initializable {
public readonly dialer: Dialer
private components = new Components()
private readonly opts: Required<ConnectionManagerInit>
private readonly connections: Map<string, Connection[]>
@ -184,8 +182,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
setMaxListeners?.(Infinity, this)
} catch {}
this.dialer = new Dialer(this.opts)
this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)
@ -196,8 +192,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
init (components: Components): void {
this.components = components
this.dialer.init(components)
// track inbound/outbound connections
this.components.getMetrics()?.updateComponentMetric({
system: METRICS_SYSTEM,
@ -304,7 +298,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.latencyMonitor.start()
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this.latencyMonitor.addEventListener('data', this._onLatencyMeasure)
await this.dialer.start()
this.started = true
log('started')
@ -370,7 +363,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.latencyMonitor.removeEventListener('data', this._onLatencyMeasure)
this.latencyMonitor.stop()
await this.dialer.stop()
this.started = false
await this._close()
@ -526,7 +518,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}
try {
const connection = await this.dialer.dial(peerId, options)
const connection = await this.components.getDialer().dial(peerId, options)
let peerConnections = this.connections.get(peerId.toString())
if (peerConnections == null) {

View File

@ -49,6 +49,7 @@ import type { Metrics } from '@libp2p/interface-metrics'
import { DummyDHT } from './dht/dummy-dht.js'
import { DummyPubSub } from './pubsub/dummy-pubsub.js'
import { PeerSet } from '@libp2p/peer-collections'
import { DefaultDialer } from './connection-manager/dialer/index.js'
const log = logger('libp2p')
@ -128,6 +129,9 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout
}))
// Create the dialer
this.components.setDialer(new DefaultDialer(this.components, init.connectionManager))
// Create the Connection Manager
this.connectionManager = this.components.setConnectionManager(new DefaultConnectionManager(init.connectionManager))
@ -338,7 +342,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
)
await Promise.all(
this.services.map(servce => servce.stop())
this.services.map(service => service.stop())
)
await Promise.all(

View File

@ -9,7 +9,8 @@ import { DialAction, DialRequest } from '../../src/connection-manager/dialer/dia
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { Multiaddr } from '@multiformats/multiaddr'
import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultDialer } from '../../src/connection-manager/dialer/index.js'
import { Components } from '@libp2p/components'
const error = new Error('dial failure')
describe('Dial Request', () => {
@ -23,7 +24,7 @@ describe('Dial Request', () => {
}
const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController()
const dialer = new Dialer({
const dialer = new DefaultDialer(new Components(), {
maxParallelDials: 2
})
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -53,7 +54,7 @@ describe('Dial Request', () => {
}
const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController()
const dialer = new Dialer({
const dialer = new DefaultDialer(new Components(), {
maxParallelDials: 2
})
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -98,7 +99,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]()
const addrs = Object.keys(actions)
const controller = new AbortController()
const dialer = new Dialer({
const dialer = new DefaultDialer(new Components(), {
maxParallelDials: 2
})
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -138,7 +139,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController()
const dialer = new Dialer({
const dialer = new DefaultDialer(new Components(), {
maxParallelDials: 2
})
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -184,7 +185,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]()
const addrs = Object.keys(actions)
const controller = new AbortController()
const dialer = new Dialer({
const dialer = new DefaultDialer(new Components(), {
maxParallelDials: 2
})
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -237,7 +238,7 @@ describe('Dial Request', () => {
const dialRequest = new DialRequest({
addrs: Object.keys(actions).map(str => new Multiaddr(str)),
dialer: new Dialer({
dialer: new DefaultDialer(new Components(), {
maxParallelDials: 3
}),
dialAction: async (ma, opts) => {

View File

@ -17,7 +17,7 @@ import { Connection, isConnection } from '@libp2p/interface-connection'
import { AbortError } from '@libp2p/interfaces/errors'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from 'datastore-core/memory'
import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultDialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js'
@ -95,8 +95,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const connection = await dialer.dial(remoteAddr)
expect(connection).to.exist()
@ -104,8 +103,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
await expect(dialer.dial(unsupportedAddr))
.to.eventually.be.rejectedWith(Error)
@ -113,8 +111,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should fail to connect if peer has no known addresses', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const peerId = await createFromJSON(Peers[1])
await expect(dialer.dial(peerId))
@ -125,8 +122,7 @@ describe('Dialing (direct, TCP)', () => {
it('should be able to connect to a given peer id', async () => {
await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs())
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const connection = await dialer.dial(remoteComponents.getPeerId())
expect(connection).to.exist()
@ -136,8 +132,7 @@ describe('Dialing (direct, TCP)', () => {
it('should fail to connect to a given peer with unsupported addresses', async () => {
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr])
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
await expect(dialer.dial(remoteComponents.getPeerId()))
.to.eventually.be.rejectedWith(Error)
@ -150,8 +145,7 @@ describe('Dialing (direct, TCP)', () => {
const peerId = await createFromJSON(Peers[1])
await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr])
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
sinon.spy(localTM, 'dial')
const connection = await dialer.dial(peerId)
@ -162,10 +156,9 @@ describe('Dialing (direct, TCP)', () => {
})
it('should abort dials on queue task timeout', async () => {
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
dialTimeout: 50
})
dialer.init(localComponents)
sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => {
expect(options.signal).to.exist()
@ -191,10 +184,9 @@ describe('Dialing (direct, TCP)', () => {
await localComponents.getPeerStore().addressBook.add(peerId, addrs)
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
maxParallelDials: 2
})
dialer.init(localComponents)
expect(dialer.tokens).to.have.lengthOf(2)

View File

@ -13,7 +13,7 @@ import { AbortError } from '@libp2p/interfaces/errors'
import { MemoryDatastore } from 'datastore-core/memory'
import { codes as ErrorCodes } from '../../src/errors.js'
import * as Constants from '../../src/constants.js'
import { Dialer, DialTarget } from '../../src/connection-manager/dialer/index.js'
import { DefaultDialer, DialTarget } from '../../src/connection-manager/dialer/index.js'
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js'
@ -72,8 +72,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should limit the number of tokens it provides', () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const maxPerPeer = Constants.MAX_PER_PEER_DIALS
expect(dialer.tokens).to.have.lengthOf(Constants.MAX_PARALLEL_DIALS)
@ -83,10 +82,9 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should not return tokens if none are left', () => {
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
maxDialsPerPeer: Infinity
})
dialer.init(localComponents)
const maxTokens = dialer.tokens.length
@ -97,8 +95,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should NOT be able to return a token twice', () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const tokens = dialer.getTokens(1)
expect(tokens).to.have.length(1)
@ -109,8 +106,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -121,8 +117,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
await expect(dialer.dial(unsupportedAddr.encapsulate(`/p2p/${remoteComponents.getPeerId().toString()}`)))
.to.eventually.be.rejectedWith(Error)
@ -130,8 +125,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should be able to connect to a given peer', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -142,8 +136,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should fail to connect to a given peer with unsupported addresses', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [unsupportedAddr])
@ -154,10 +147,9 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should abort dials on queue task timeout', async () => {
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
dialTimeout: 50
})
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -177,10 +169,9 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should throw when a peer advertises more than the allowed number of peers', async () => {
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
maxAddrsToDial: 10
})
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, Array.from({ length: 11 }, (_, i) => new Multiaddr(`/ip4/127.0.0.1/tcp/1500${i}/ws/p2p/12D3KooWHFKTMzwerBtsVmtz4ZZEQy2heafxzWw6wNn5PPYkBxJ5`)))
@ -200,11 +191,10 @@ describe('Dialing (direct, WebSockets)', () => {
const publicAddressesFirstSpy = sinon.spy(publicAddressesFirst)
const localTMDialStub = sinon.stub(localTM, 'dial').callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), peerIdFromString(ma.getPeerId() ?? ''))))
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
addressSorter: publicAddressesFirstSpy,
maxParallelDials: 3
})
dialer.init(localComponents)
// Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), peerMultiaddrs)
@ -229,10 +219,9 @@ describe('Dialing (direct, WebSockets)', () => {
]
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
maxParallelDials: 2
})
dialer.init(localComponents)
// Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remotePeerId, addrs)
@ -268,10 +257,9 @@ describe('Dialing (direct, WebSockets)', () => {
new Multiaddr('/ip4/0.0.0.0/tcp/8001/ws'),
new Multiaddr('/ip4/0.0.0.0/tcp/8002/ws')
]
const dialer = new Dialer({
const dialer = new DefaultDialer(localComponents, {
maxParallelDials: 2
})
dialer.init(localComponents)
// Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), addrs)
@ -309,8 +297,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should cancel pending dial targets before proceeding', async () => {
const dialer = new Dialer()
dialer.init(localComponents)
const dialer = new DefaultDialer(localComponents)
sinon.stub(dialer, '_createDialTarget').callsFake(async () => {
const deferredDial = pDefer<DialTarget>()
@ -364,8 +351,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
]
})
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialer = connectionManager.dialer
const dialer = libp2p.components.getDialer()
expect(dialer).to.exist()
expect(dialer).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS)
@ -395,8 +381,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
}
libp2p = await createLibp2pNode(config)
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialer = connectionManager.dialer
const dialer = libp2p.components.getDialer()
expect(dialer).to.exist()
expect(dialer).to.have.property('tokens').with.lengthOf(config.connectionManager.maxParallelDials)
@ -420,8 +405,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
]
})
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialerDialSpy = sinon.spy(connectionManager.dialer, 'dial')
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial')
const addressBookAddSpy = sinon.spy(libp2p.components.getPeerStore().addressBook, 'add')
await libp2p.start()
@ -543,8 +527,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
]
})
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
sinon.stub(connectionManager.dialer, '_createDialTarget').callsFake(async () => {
const dialer = libp2p.components.getDialer() as DefaultDialer
sinon.stub(dialer, '_createDialTarget').callsFake(async () => {
const deferredDial = pDefer<DialTarget>()
return await deferredDial.promise
})
@ -582,8 +566,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
await libp2p.start()
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialerDestroyStub = sinon.spy(connectionManager.dialer, 'stop')
const dialer = libp2p.components.getDialer() as DefaultDialer
const dialerDestroyStub = sinon.spy(dialer, 'stop')
await libp2p.stop()