mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
fix: add tracked map (#1069)
Small refactor of the component stats - adds a `TrackedMap` which encapsulates updating the metrics and means we don't need to null guard on `this._metrics` everywhere. If metrics are not enabled a regular `Map` is used.
This commit is contained in:
parent
0a485d07b3
commit
b425fa1230
@ -12,7 +12,7 @@ const LatencyMonitor = require('./latency-monitor')
|
|||||||
const retimer = require('retimer')
|
const retimer = require('retimer')
|
||||||
|
|
||||||
const { EventEmitter } = require('events')
|
const { EventEmitter } = require('events')
|
||||||
|
const trackedMap = require('../metrics/tracked-map')
|
||||||
const PeerId = require('peer-id')
|
const PeerId = require('peer-id')
|
||||||
|
|
||||||
const {
|
const {
|
||||||
@ -34,7 +34,7 @@ const defaultOptions = {
|
|||||||
|
|
||||||
const METRICS_COMPONENT = 'connection-manager'
|
const METRICS_COMPONENT = 'connection-manager'
|
||||||
const METRICS_PEER_CONNECTIONS = 'peer-connections'
|
const METRICS_PEER_CONNECTIONS = 'peer-connections'
|
||||||
const METRICS_ALL_CONNECTIONS = 'all-connections'
|
const METRICS_PEER_VALUES = 'peer-values'
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('../')} Libp2p
|
* @typedef {import('../')} Libp2p
|
||||||
@ -87,14 +87,14 @@ class ConnectionManager extends EventEmitter {
|
|||||||
*
|
*
|
||||||
* @type {Map<string, number>}
|
* @type {Map<string, number>}
|
||||||
*/
|
*/
|
||||||
this._peerValues = new Map()
|
this._peerValues = trackedMap(METRICS_COMPONENT, METRICS_PEER_VALUES, this._libp2p.metrics)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map of connections per peer
|
* Map of connections per peer
|
||||||
*
|
*
|
||||||
* @type {Map<string, Connection[]>}
|
* @type {Map<string, Connection[]>}
|
||||||
*/
|
*/
|
||||||
this.connections = new Map()
|
this.connections = trackedMap(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this._libp2p.metrics)
|
||||||
|
|
||||||
this._started = false
|
this._started = false
|
||||||
this._timer = null
|
this._timer = null
|
||||||
@ -164,8 +164,6 @@ class ConnectionManager extends EventEmitter {
|
|||||||
|
|
||||||
await Promise.all(tasks)
|
await Promise.all(tasks)
|
||||||
this.connections.clear()
|
this.connections.clear()
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0)
|
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -222,8 +220,6 @@ class ConnectionManager extends EventEmitter {
|
|||||||
storedConn.push(connection)
|
storedConn.push(connection)
|
||||||
} else {
|
} else {
|
||||||
this.connections.set(peerIdStr, [connection])
|
this.connections.set(peerIdStr, [connection])
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
|
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey)
|
this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey)
|
||||||
@ -255,9 +251,6 @@ class ConnectionManager extends EventEmitter {
|
|||||||
|
|
||||||
this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer)
|
this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
|
|
||||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -14,7 +14,7 @@ const { setMaxListeners } = require('events')
|
|||||||
const DialRequest = require('./dial-request')
|
const DialRequest = require('./dial-request')
|
||||||
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
|
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
|
||||||
const getPeer = require('../get-peer')
|
const getPeer = require('../get-peer')
|
||||||
|
const trackedMap = require('../metrics/tracked-map')
|
||||||
const { codes } = require('../errors')
|
const { codes } = require('../errors')
|
||||||
const {
|
const {
|
||||||
DIAL_TIMEOUT,
|
DIAL_TIMEOUT,
|
||||||
@ -86,9 +86,12 @@ class Dialer {
|
|||||||
this.timeout = dialTimeout
|
this.timeout = dialTimeout
|
||||||
this.maxDialsPerPeer = maxDialsPerPeer
|
this.maxDialsPerPeer = maxDialsPerPeer
|
||||||
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
|
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
|
||||||
this._pendingDials = new Map()
|
|
||||||
this._pendingDialTargets = new Map()
|
/** @type {Map<string, PendingDial>} */
|
||||||
this._metrics = metrics
|
this._pendingDials = trackedMap(METRICS_COMPONENT, METRICS_PENDING_DIALS, metrics)
|
||||||
|
|
||||||
|
/** @type {Map<string, { resolve: (value: any) => void, reject: (err: Error) => void}>} */
|
||||||
|
this._pendingDialTargets = trackedMap(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, metrics)
|
||||||
|
|
||||||
for (const [key, value] of Object.entries(resolvers)) {
|
for (const [key, value] of Object.entries(resolvers)) {
|
||||||
Multiaddr.resolvers.set(key, value)
|
Multiaddr.resolvers.set(key, value)
|
||||||
@ -112,9 +115,6 @@ class Dialer {
|
|||||||
pendingTarget.reject(new AbortError('Dialer was destroyed'))
|
pendingTarget.reject(new AbortError('Dialer was destroyed'))
|
||||||
}
|
}
|
||||||
this._pendingDialTargets.clear()
|
this._pendingDialTargets.clear()
|
||||||
|
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0)
|
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -164,7 +164,6 @@ class Dialer {
|
|||||||
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
|
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
|
||||||
const cancellablePromise = new Promise((resolve, reject) => {
|
const cancellablePromise = new Promise((resolve, reject) => {
|
||||||
this._pendingDialTargets.set(id, { resolve, reject })
|
this._pendingDialTargets.set(id, { resolve, reject })
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -176,7 +175,6 @@ class Dialer {
|
|||||||
return dialTarget
|
return dialTarget
|
||||||
} finally {
|
} finally {
|
||||||
this._pendingDialTargets.delete(id)
|
this._pendingDialTargets.delete(id)
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -269,13 +267,10 @@ class Dialer {
|
|||||||
destroy: () => {
|
destroy: () => {
|
||||||
timeoutController.clear()
|
timeoutController.clear()
|
||||||
this._pendingDials.delete(dialTarget.id)
|
this._pendingDials.delete(dialTarget.id)
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this._pendingDials.set(dialTarget.id, pendingDial)
|
this._pendingDials.set(dialTarget.id, pendingDial)
|
||||||
|
|
||||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)
|
|
||||||
|
|
||||||
return pendingDial
|
return pendingDial
|
||||||
}
|
}
|
||||||
|
|
||||||
|
18
src/index.js
18
src/index.js
@ -161,6 +161,15 @@ class Libp2p extends EventEmitter {
|
|||||||
this.peerId = this._options.peerId
|
this.peerId = this._options.peerId
|
||||||
this.datastore = this._options.datastore
|
this.datastore = this._options.datastore
|
||||||
|
|
||||||
|
// Create Metrics
|
||||||
|
if (this._options.metrics.enabled) {
|
||||||
|
const metrics = new Metrics({
|
||||||
|
...this._options.metrics
|
||||||
|
})
|
||||||
|
|
||||||
|
this.metrics = metrics
|
||||||
|
}
|
||||||
|
|
||||||
this.peerStore = (this.datastore && this._options.peerStore.persistence)
|
this.peerStore = (this.datastore && this._options.peerStore.persistence)
|
||||||
? new PersistentPeerStore({
|
? new PersistentPeerStore({
|
||||||
peerId: this.peerId,
|
peerId: this.peerId,
|
||||||
@ -195,15 +204,6 @@ class Libp2p extends EventEmitter {
|
|||||||
autoDialInterval: this._options.connectionManager.autoDialInterval
|
autoDialInterval: this._options.connectionManager.autoDialInterval
|
||||||
})
|
})
|
||||||
|
|
||||||
// Create Metrics
|
|
||||||
if (this._options.metrics.enabled) {
|
|
||||||
const metrics = new Metrics({
|
|
||||||
...this._options.metrics
|
|
||||||
})
|
|
||||||
|
|
||||||
this.metrics = metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create keychain
|
// Create keychain
|
||||||
if (this._options.keychain && this._options.keychain.datastore) {
|
if (this._options.keychain && this._options.keychain.datastore) {
|
||||||
log('creating keychain')
|
log('creating keychain')
|
||||||
|
62
src/metrics/tracked-map.js
Normal file
62
src/metrics/tracked-map.js
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @template K
|
||||||
|
* @template V
|
||||||
|
*/
|
||||||
|
class TrackedMap extends Map {
|
||||||
|
/**
|
||||||
|
* @param {string} component
|
||||||
|
* @param {string} name
|
||||||
|
* @param {import('.')} metrics
|
||||||
|
*/
|
||||||
|
constructor (component, name, metrics) {
|
||||||
|
super()
|
||||||
|
|
||||||
|
this._component = component
|
||||||
|
this._name = name
|
||||||
|
this._metrics = metrics
|
||||||
|
|
||||||
|
this._metrics.updateComponentMetric(this._component, this._name, this.size)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {K} key
|
||||||
|
* @param {V} value
|
||||||
|
*/
|
||||||
|
set (key, value) {
|
||||||
|
super.set(key, value)
|
||||||
|
this._metrics.updateComponentMetric(this._component, this._name, this.size)
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {K} key
|
||||||
|
*/
|
||||||
|
delete (key) {
|
||||||
|
const deleted = super.delete(key)
|
||||||
|
this._metrics.updateComponentMetric(this._component, this._name, this.size)
|
||||||
|
return deleted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @template K
|
||||||
|
* @template V
|
||||||
|
* @param {string} component
|
||||||
|
* @param {string} name
|
||||||
|
* @param {import('.')} [metrics]
|
||||||
|
* @returns {Map<K, V>}
|
||||||
|
*/
|
||||||
|
module.exports = (component, name, metrics) => {
|
||||||
|
/** @type {Map<K, V>} */
|
||||||
|
let map
|
||||||
|
|
||||||
|
if (metrics) {
|
||||||
|
map = new TrackedMap(component, name, metrics)
|
||||||
|
} else {
|
||||||
|
map = new Map()
|
||||||
|
}
|
||||||
|
|
||||||
|
return map
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user