mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-28 08:21:33 +00:00
refactor: stats (#501)
* docs: add initial notes on stats * feat: initial refactor of stats to metrics * feat: add support for placeholder metrics This is helpful for tracking metrics prior to knowing the remote peers id * fix: add metrics tests and fix issues * fix: always clear the dial timeout timer * docs: add metrics to api doc * chore: apply suggestions from code review Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio> * docs: update metrics docs * fix: call metrics.onDisconnect * docs(config): add example headers so they appear in the TOC * docs(config): add metrics configuration * docs(relay): fix relay configuration docs
This commit is contained in:
259
test/metrics/index.spec.js
Normal file
259
test/metrics/index.spec.js
Normal file
@ -0,0 +1,259 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
const { randomBytes } = require('libp2p-crypto')
|
||||
const duplexPair = require('it-pair/duplex')
|
||||
const pipe = require('it-pipe')
|
||||
const concat = require('it-concat')
|
||||
const pushable = require('it-pushable')
|
||||
const { consume } = require('streaming-iterables')
|
||||
const delay = require('delay')
|
||||
|
||||
const Metrics = require('../../src/metrics')
|
||||
const Stats = require('../../src/metrics/stats')
|
||||
const { createPeerId } = require('../utils/creators/peer')
|
||||
|
||||
describe('Metrics', () => {
|
||||
let peerId
|
||||
let peerId2
|
||||
|
||||
before(async () => {
|
||||
[peerId, peerId2] = await createPeerId({ number: 2 })
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
it('should not track data if not started', async () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
|
||||
metrics.trackStream({
|
||||
stream: local,
|
||||
remotePeer: peerId
|
||||
})
|
||||
|
||||
// Echo back
|
||||
pipe(remote, remote)
|
||||
|
||||
const bytes = randomBytes(1024)
|
||||
|
||||
const results = await pipe(
|
||||
[bytes],
|
||||
local,
|
||||
concat
|
||||
)
|
||||
|
||||
// Flush the call stack
|
||||
await delay(0)
|
||||
|
||||
expect(results.length).to.eql(bytes.length)
|
||||
|
||||
expect(metrics.forPeer(peerId)).to.equal(undefined)
|
||||
expect(metrics.peers).to.eql([])
|
||||
const globalStats = metrics.global
|
||||
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(0)
|
||||
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(0)
|
||||
})
|
||||
|
||||
it('should be able to track a duplex stream', async () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
|
||||
metrics.trackStream({
|
||||
stream: local,
|
||||
remotePeer: peerId
|
||||
})
|
||||
metrics.start()
|
||||
|
||||
// Echo back
|
||||
pipe(remote, remote)
|
||||
|
||||
const bytes = randomBytes(1024)
|
||||
const input = (async function * () {
|
||||
let i = 0
|
||||
while (i < 10) {
|
||||
await delay(10)
|
||||
yield bytes
|
||||
i++
|
||||
}
|
||||
})()
|
||||
|
||||
const results = await pipe(
|
||||
input,
|
||||
local,
|
||||
concat
|
||||
)
|
||||
|
||||
// Flush the call stack
|
||||
await delay(0)
|
||||
|
||||
expect(results.length).to.eql(bytes.length * 10)
|
||||
|
||||
const stats = metrics.forPeer(peerId)
|
||||
expect(metrics.peers).to.eql([peerId.toString()])
|
||||
expect(stats.snapshot.dataReceived.toNumber()).to.equal(results.length)
|
||||
expect(stats.snapshot.dataSent.toNumber()).to.equal(results.length)
|
||||
|
||||
const globalStats = metrics.global
|
||||
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(results.length)
|
||||
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(results.length)
|
||||
})
|
||||
|
||||
it('should properly track global stats', async () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const [local2, remote2] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
const protocol = '/echo/1.0.0'
|
||||
metrics.start()
|
||||
|
||||
// Echo back remotes
|
||||
pipe(remote, remote)
|
||||
pipe(remote2, remote2)
|
||||
|
||||
metrics.trackStream({
|
||||
stream: local,
|
||||
remotePeer: peerId,
|
||||
protocol
|
||||
})
|
||||
metrics.trackStream({
|
||||
stream: local2,
|
||||
remotePeer: peerId2,
|
||||
protocol
|
||||
})
|
||||
|
||||
const bytes = randomBytes(1024)
|
||||
|
||||
await Promise.all([
|
||||
pipe([bytes], local, consume),
|
||||
pipe([bytes], local2, consume)
|
||||
])
|
||||
|
||||
// Flush the call stack
|
||||
await delay(0)
|
||||
|
||||
expect(metrics.peers).to.eql([peerId.toString(), peerId2.toString()])
|
||||
// Verify global metrics
|
||||
const globalStats = metrics.global
|
||||
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
|
||||
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
|
||||
|
||||
// Verify individual metrics
|
||||
for (const peer of [peerId, peerId2]) {
|
||||
const stats = metrics.forPeer(peer)
|
||||
|
||||
expect(stats.snapshot.dataReceived.toNumber()).to.equal(bytes.length)
|
||||
expect(stats.snapshot.dataSent.toNumber()).to.equal(bytes.length)
|
||||
}
|
||||
|
||||
// Verify protocol metrics
|
||||
const protocolStats = metrics.forProtocol(protocol)
|
||||
expect(metrics.protocols).to.eql([protocol])
|
||||
expect(protocolStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
|
||||
expect(protocolStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
|
||||
})
|
||||
|
||||
it('should be able to replace an existing peer', async () => {
|
||||
const [local, remote] = duplexPair()
|
||||
const metrics = new Metrics({
|
||||
computeThrottleMaxQueueSize: 1, // compute after every message
|
||||
movingAverageIntervals: [10, 100, 1000]
|
||||
})
|
||||
metrics.start()
|
||||
|
||||
// Echo back remotes
|
||||
pipe(remote, remote)
|
||||
|
||||
const mockPeer = {
|
||||
toString: () => 'a temporary id'
|
||||
}
|
||||
metrics.trackStream({
|
||||
stream: local,
|
||||
remotePeer: mockPeer
|
||||
})
|
||||
|
||||
const bytes = randomBytes(1024)
|
||||
const input = pushable()
|
||||
|
||||
const deferredPromise = pipe(input, local, consume)
|
||||
|
||||
input.push(bytes)
|
||||
|
||||
await delay(0)
|
||||
|
||||
metrics.updatePlaceholder(mockPeer.toString(), peerId)
|
||||
mockPeer.toString = peerId.toString.bind(peerId)
|
||||
|
||||
input.push(bytes)
|
||||
input.end()
|
||||
|
||||
await deferredPromise
|
||||
await delay(0)
|
||||
|
||||
expect(metrics.peers).to.eql([peerId.toString()])
|
||||
// Verify global metrics
|
||||
const globalStats = metrics.global
|
||||
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
|
||||
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
|
||||
|
||||
// Verify individual metrics
|
||||
const stats = metrics.forPeer(peerId)
|
||||
|
||||
expect(stats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
|
||||
expect(stats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
|
||||
})
|
||||
|
||||
it('should only keep track of a set number of disconnected peers', () => {
|
||||
const spies = []
|
||||
const trackedPeers = new Map([...new Array(50)].map((_, index) => {
|
||||
const stat = new Stats([], { movingAverageIntervals: [] })
|
||||
spies.push(sinon.spy(stat, 'stop'))
|
||||
return [String(index), stat]
|
||||
}))
|
||||
|
||||
const metrics = new Metrics({
|
||||
maxOldPeersRetention: 5 // Only keep track of 5
|
||||
})
|
||||
|
||||
// Clone so trackedPeers isn't modified
|
||||
metrics._peerStats = new Map(trackedPeers)
|
||||
|
||||
// Disconnect every peer
|
||||
for (const id of trackedPeers.keys()) {
|
||||
metrics.onPeerDisconnected({
|
||||
toString: () => id
|
||||
})
|
||||
}
|
||||
|
||||
// Verify only the last 5 have been retained
|
||||
expect(metrics.peers).to.have.length(0)
|
||||
const retainedPeers = []
|
||||
for (const id of trackedPeers.keys()) {
|
||||
const stat = metrics.forPeer(id)
|
||||
if (stat) retainedPeers.push(id)
|
||||
}
|
||||
expect(retainedPeers).to.eql(['45', '46', '47', '48', '49'])
|
||||
|
||||
// Verify all stats were stopped
|
||||
expect(spies).to.have.length(50)
|
||||
for (const spy of spies) {
|
||||
expect(spy).to.have.property('callCount', 1)
|
||||
}
|
||||
})
|
||||
})
|
Reference in New Issue
Block a user