mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-18 01:51:57 +00:00
Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
0a485d07b3 | ||
|
0c3ed0a4ac | ||
|
09a0f940df | ||
|
a642ad2a03 | ||
|
8ce2f08589 | ||
|
fe0d9828bb | ||
|
c8e1b08c19 | ||
|
faf1f89d9e | ||
|
76f4ea5e8a | ||
|
2f0b311df7 |
2
.github/workflows/main.yml
vendored
2
.github/workflows/main.yml
vendored
@@ -19,7 +19,7 @@ jobs:
|
||||
- run: npx aegir lint
|
||||
- run: npx aegir build
|
||||
- run: npx aegir dep-check
|
||||
- uses: ipfs/aegir/actions/bundle-size@v32.1.0
|
||||
- uses: ipfs/aegir/actions/bundle-size
|
||||
name: size
|
||||
with:
|
||||
github_token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
22
CHANGELOG.md
22
CHANGELOG.md
@@ -1,3 +1,25 @@
|
||||
## [0.35.6](https://github.com/libp2p/js-libp2p/compare/v0.35.5...v0.35.6) (2021-12-18)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* increase the maxlisteners for timeout controllers ([#1065](https://github.com/libp2p/js-libp2p/issues/1065)) ([09a0f94](https://github.com/libp2p/js-libp2p/commit/09a0f940df7fdb4ece34604e85693709df5c213e))
|
||||
|
||||
|
||||
|
||||
## [0.35.5](https://github.com/libp2p/js-libp2p/compare/v0.35.4...v0.35.5) (2021-12-15)
|
||||
|
||||
|
||||
|
||||
## [0.35.4](https://github.com/libp2p/js-libp2p/compare/v0.35.3...v0.35.4) (2021-12-15)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* allow per-component metrics to be collected ([#1061](https://github.com/libp2p/js-libp2p/issues/1061)) ([2f0b311](https://github.com/libp2p/js-libp2p/commit/2f0b311df7127aa44512c2008142d4ca30268986)), closes [#1060](https://github.com/libp2p/js-libp2p/issues/1060)
|
||||
|
||||
|
||||
|
||||
## [0.35.3](https://github.com/libp2p/js-libp2p/compare/v0.35.2...v0.35.3) (2021-12-13)
|
||||
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "libp2p",
|
||||
"version": "0.35.3",
|
||||
"version": "0.35.6",
|
||||
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
|
||||
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
|
||||
"main": "src/index.js",
|
||||
@@ -152,7 +152,7 @@
|
||||
"libp2p-bootstrap": "^0.14.0",
|
||||
"libp2p-delegated-content-routing": "^0.11.0",
|
||||
"libp2p-delegated-peer-routing": "^0.11.1",
|
||||
"libp2p-floodsub": "^0.27.0",
|
||||
"libp2p-floodsub": "^0.28.0",
|
||||
"libp2p-gossipsub": "^0.12.1",
|
||||
"libp2p-interfaces-compliance-tests": "^2.0.1",
|
||||
"libp2p-interop": "^0.5.0",
|
||||
|
@@ -32,6 +32,10 @@ const defaultOptions = {
|
||||
defaultPeerValue: 1
|
||||
}
|
||||
|
||||
const METRICS_COMPONENT = 'connection-manager'
|
||||
const METRICS_PEER_CONNECTIONS = 'peer-connections'
|
||||
const METRICS_ALL_CONNECTIONS = 'all-connections'
|
||||
|
||||
/**
|
||||
* @typedef {import('../')} Libp2p
|
||||
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
|
||||
@@ -160,6 +164,8 @@ class ConnectionManager extends EventEmitter {
|
||||
|
||||
await Promise.all(tasks)
|
||||
this.connections.clear()
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0)
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -211,10 +217,13 @@ class ConnectionManager extends EventEmitter {
|
||||
const storedConn = this.connections.get(peerIdStr)
|
||||
|
||||
this.emit('peer:connect', connection)
|
||||
|
||||
if (storedConn) {
|
||||
storedConn.push(connection)
|
||||
} else {
|
||||
this.connections.set(peerIdStr, [connection])
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
|
||||
}
|
||||
|
||||
this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey)
|
||||
@@ -243,7 +252,12 @@ class ConnectionManager extends EventEmitter {
|
||||
this.connections.delete(peerId)
|
||||
this._peerValues.delete(connection.remotePeer.toB58String())
|
||||
this.emit('peer:disconnect', connection)
|
||||
|
||||
this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer)
|
||||
}
|
||||
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
|
||||
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -9,7 +9,8 @@ const { Multiaddr } = require('multiaddr')
|
||||
const { TimeoutController } = require('timeout-abort-controller')
|
||||
const { AbortError } = require('abortable-iterator')
|
||||
const { anySignal } = require('any-signal')
|
||||
|
||||
// @ts-expect-error setMaxListeners is missing from the types
|
||||
const { setMaxListeners } = require('events')
|
||||
const DialRequest = require('./dial-request')
|
||||
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
|
||||
const getPeer = require('../get-peer')
|
||||
@@ -22,6 +23,10 @@ const {
|
||||
MAX_ADDRS_TO_DIAL
|
||||
} = require('../constants')
|
||||
|
||||
const METRICS_COMPONENT = 'dialler'
|
||||
const METRICS_PENDING_DIALS = 'pending-dials'
|
||||
const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'
|
||||
|
||||
/**
|
||||
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
|
||||
* @typedef {import('peer-id')} PeerId
|
||||
@@ -44,6 +49,7 @@ const {
|
||||
* @property {number} [maxDialsPerPeer = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer.
|
||||
* @property {number} [dialTimeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take.
|
||||
* @property {Record<string, Resolver>} [resolvers = {}] - multiaddr resolvers to use when dialing
|
||||
* @property {import('../metrics')} [metrics]
|
||||
*
|
||||
* @typedef DialTarget
|
||||
* @property {string} id
|
||||
@@ -69,7 +75,8 @@ class Dialer {
|
||||
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
|
||||
dialTimeout = DIAL_TIMEOUT,
|
||||
maxDialsPerPeer = MAX_PER_PEER_DIALS,
|
||||
resolvers = {}
|
||||
resolvers = {},
|
||||
metrics
|
||||
}) {
|
||||
this.transportManager = transportManager
|
||||
this.peerStore = peerStore
|
||||
@@ -81,6 +88,7 @@ class Dialer {
|
||||
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
|
||||
this._pendingDials = new Map()
|
||||
this._pendingDialTargets = new Map()
|
||||
this._metrics = metrics
|
||||
|
||||
for (const [key, value] of Object.entries(resolvers)) {
|
||||
Multiaddr.resolvers.set(key, value)
|
||||
@@ -104,6 +112,9 @@ class Dialer {
|
||||
pendingTarget.reject(new AbortError('Dialer was destroyed'))
|
||||
}
|
||||
this._pendingDialTargets.clear()
|
||||
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0)
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -153,6 +164,7 @@ class Dialer {
|
||||
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
|
||||
const cancellablePromise = new Promise((resolve, reject) => {
|
||||
this._pendingDialTargets.set(id, { resolve, reject })
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
|
||||
})
|
||||
|
||||
try {
|
||||
@@ -164,6 +176,7 @@ class Dialer {
|
||||
return dialTarget
|
||||
} finally {
|
||||
this._pendingDialTargets.delete(id)
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,6 +254,10 @@ class Dialer {
|
||||
|
||||
// Combine the timeout signal and options.signal, if provided
|
||||
const timeoutController = new TimeoutController(this.timeout)
|
||||
// this controller will potentially be used while dialing lots of
|
||||
// peers so prevent MaxListenersExceededWarning appearing in the console
|
||||
setMaxListeners && setMaxListeners(Infinity, timeoutController.signal)
|
||||
|
||||
const signals = [timeoutController.signal]
|
||||
options.signal && signals.push(options.signal)
|
||||
const signal = anySignal(signals)
|
||||
@@ -252,9 +269,13 @@ class Dialer {
|
||||
destroy: () => {
|
||||
timeoutController.clear()
|
||||
this._pendingDials.delete(dialTarget.id)
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)
|
||||
}
|
||||
}
|
||||
this._pendingDials.set(dialTarget.id, pendingDial)
|
||||
|
||||
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)
|
||||
|
||||
return pendingDial
|
||||
}
|
||||
|
||||
|
@@ -197,10 +197,11 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
// Create Metrics
|
||||
if (this._options.metrics.enabled) {
|
||||
this.metrics = new Metrics({
|
||||
...this._options.metrics,
|
||||
connectionManager: this.connectionManager
|
||||
const metrics = new Metrics({
|
||||
...this._options.metrics
|
||||
})
|
||||
|
||||
this.metrics = metrics
|
||||
}
|
||||
|
||||
// Create keychain
|
||||
@@ -262,6 +263,7 @@ class Libp2p extends EventEmitter {
|
||||
this.dialer = new Dialer({
|
||||
transportManager: this.transportManager,
|
||||
peerStore: this.peerStore,
|
||||
metrics: this.metrics,
|
||||
...this._options.dialer
|
||||
})
|
||||
|
||||
|
@@ -24,9 +24,6 @@ const directionToEvent = {
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef MetricsProperties
|
||||
* @property {import('../connection-manager')} connectionManager
|
||||
*
|
||||
* @typedef MetricsOptions
|
||||
* @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize]
|
||||
* @property {number} [computeThrottleTimeout = defaultOptions.computeThrottleTimeout]
|
||||
@@ -37,7 +34,7 @@ const directionToEvent = {
|
||||
class Metrics {
|
||||
/**
|
||||
* @class
|
||||
* @param {MetricsProperties & MetricsOptions} options
|
||||
* @param {MetricsOptions} options
|
||||
*/
|
||||
constructor (options) {
|
||||
this._options = mergeOptions(defaultOptions, options)
|
||||
@@ -47,10 +44,7 @@ class Metrics {
|
||||
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
|
||||
this._running = false
|
||||
this._onMessage = this._onMessage.bind(this)
|
||||
this._connectionManager = options.connectionManager
|
||||
this._connectionManager.on('peer:disconnect', (connection) => {
|
||||
this.onPeerDisconnected(connection.remotePeer)
|
||||
})
|
||||
this._componentMetrics = new Map()
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -94,6 +88,22 @@ class Metrics {
|
||||
return Array.from(this._peerStats.keys())
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {Map}
|
||||
*/
|
||||
getComponentMetrics () {
|
||||
return this._componentMetrics
|
||||
}
|
||||
|
||||
updateComponentMetric (component, metric, value) {
|
||||
if (!this._componentMetrics.has(component)) {
|
||||
this._componentMetrics.set(component, new Map())
|
||||
}
|
||||
|
||||
const map = this._componentMetrics.get(component)
|
||||
map.set(metric, value)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the `Stats` object for the given `PeerId` whether it
|
||||
* is a live peer, or in the disconnected peer LRU cache.
|
||||
|
@@ -24,6 +24,8 @@ const {
|
||||
// @ts-ignore module with no types
|
||||
} = require('set-delayed-interval')
|
||||
const { DHTPeerRouting } = require('./dht/dht-peer-routing')
|
||||
// @ts-expect-error setMaxListeners is missing from the types
|
||||
const { setMaxListeners } = require('events')
|
||||
|
||||
/**
|
||||
* @typedef {import('peer-id')} PeerId
|
||||
@@ -149,7 +151,12 @@ class PeerRouting {
|
||||
}
|
||||
|
||||
if (options.timeout) {
|
||||
options.signal = new TimeoutController(options.timeout).signal
|
||||
const controller = new TimeoutController(options.timeout)
|
||||
// this controller will potentially be used while dialing lots of
|
||||
// peers so prevent MaxListenersExceededWarning appearing in the console
|
||||
setMaxListeners && setMaxListeners(Infinity, controller.signal)
|
||||
|
||||
options.signal = controller.signal
|
||||
}
|
||||
|
||||
yield * pipe(
|
||||
|
@@ -3,9 +3,6 @@
|
||||
|
||||
const { expect } = require('aegir/utils/chai')
|
||||
const sinon = require('sinon')
|
||||
|
||||
const { EventEmitter } = require('events')
|
||||
|
||||
const { randomBytes } = require('libp2p-crypto')
|
||||
const duplexPair = require('it-pair/duplex')
|
||||
const pipe = require('it-pipe')
|
||||
@@ -34,8 +31,7 @@ describe('Metrics', () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000],
|
||||
connectionManager: new EventEmitter()
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
|
||||
metrics.trackStream({
|
||||
@@ -70,8 +66,7 @@ describe('Metrics', () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000],
|
||||
connectionManager: new EventEmitter()
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
|
||||
metrics.trackStream({
|
||||
@@ -119,8 +114,7 @@ describe('Metrics', () => {
|
||||
const [local2, remote2] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000],
|
||||
connectionManager: new EventEmitter()
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
const protocol = '/echo/1.0.0'
|
||||
metrics.start()
|
||||
@@ -175,8 +169,7 @@ describe('Metrics', () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000],
|
||||
connectionManager: new EventEmitter()
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
metrics.start()
|
||||
|
||||
@@ -231,8 +224,7 @@ describe('Metrics', () => {
|
||||
}))
|
||||
|
||||
const metrics = new Metrics({
|
||||
maxOldPeersRetention: 5, // Only keep track of 5
|
||||
connectionManager: new EventEmitter()
|
||||
maxOldPeersRetention: 5 // Only keep track of 5
|
||||
})
|
||||
|
||||
// Clone so trackedPeers isn't modified
|
||||
@@ -262,4 +254,22 @@ describe('Metrics', () => {
|
||||
expect(spy).to.have.property('callCount', 1)
|
||||
}
|
||||
})
|
||||
|
||||
it('should allow components to track metrics', () => {
|
||||
const metrics = new Metrics({
|
||||
maxOldPeersRetention: 5 // Only keep track of 5
|
||||
})
|
||||
|
||||
expect(metrics.getComponentMetrics()).to.be.empty()
|
||||
|
||||
const component = 'my-component'
|
||||
const metric = 'some-metric'
|
||||
const value = 1
|
||||
|
||||
metrics.updateComponentMetric(component, metric, value)
|
||||
|
||||
expect(metrics.getComponentMetrics()).to.have.lengthOf(1)
|
||||
expect(metrics.getComponentMetrics().get(component)).to.have.lengthOf(1)
|
||||
expect(metrics.getComponentMetrics().get(component).get(metric)).to.equal(value)
|
||||
})
|
||||
})
|
||||
|
Reference in New Issue
Block a user