refactor: stats (#501)

* docs: add initial notes on stats

* feat: initial refactor of stats to metrics

* feat: add support for placeholder metrics

This is helpful for tracking metrics prior to knowing the remote peers id

* fix: add metrics tests and fix issues

* fix: always clear the dial timeout timer

* docs: add metrics to api doc

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>

* docs: update metrics docs

* fix: call metrics.onDisconnect

* docs(config): add example headers so they appear in the TOC

* docs(config): add metrics configuration

* docs(relay): fix relay configuration docs
This commit is contained in:
Jacob Heun 2019-12-11 16:05:59 +01:00
parent 17b6a3fd73
commit dabee00127
19 changed files with 939 additions and 191 deletions

View File

@ -21,6 +21,13 @@
* [`pubsub.getTopics`](#pubsub.getTopics)
* [`pubsub.publish`](#pubsub.publish)
* [`pubsub.subscribe`](#pubsub.subscribe)
* [`metrics.global`](#metrics.global)
* [`metrics.peers`](#metrics.peers)
* [`metrics.protocols`](#metrics.protocols)
* [`metrics.forPeer`](#metrics.forPeer)
* [`metrics.forProtocol`](#metrics.forProtocol)
* [Types](#types)
* [`Stats`](#stats)
## Static Functions
@ -117,7 +124,7 @@ unless they are performing a specific action. See [peer discovery and auto dial]
</details>
## Libp2p Instance Methods
## Libp2p Instance Methods
### start
@ -635,3 +642,103 @@ const handler = (msg) => {
libp2p.pubsub.unsubscribe(topic, handler)
```
### metrics.global
A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
#### Example
```js
const peerIdStrings = libp2p.metrics.peers
```
### metrics.peers
An array of `PeerId` strings of each peer currently being tracked.
#### Example
```js
const peerIdStrings = libp2p.metrics.peers
```
### metrics.protocols
An array of protocol strings that are currently being tracked.
#### Example
```js
const protocols = libp2p.metrics.protocols
```
### metrics.forPeer
Returns the [`Stats`](#stats) object for a given `PeerId` if it is being tracked.
`libp2p.metrics.forPeer(peerId)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| peerId | `PeerId` | The peer to get stats for |
#### Returns
| Type | Description |
|------|-------------|
| [`Stats`](#stats) | The bandwidth stats of the peer |
#### Example
```js
const peerStats = libp2p.metrics.forPeer(peerInfo)
console.log(peerStats.toJSON())
```
### metrics.forProtocol
Returns the [`Stats`](#stats) object for a given protocol if it is being tracked.
`libp2p.metrics.forProtocol(protocol)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| protocol | `string` | The protocol to get stats for |
#### Returns
| Type | Description |
|------|-------------|
| [`Stats`](#stats) | The bandwidth stats of the protocol across all peers |
#### Example
```js
const peerStats = libp2p.metrics.forProtocol('/meshsub/1.0.0')
console.log(peerStats.toJSON())
```
## Types
### Stats
- `Stats`
- `toJSON<function()>`: Returns a JSON snapshot of the stats.
- `dataReceived<string>`: The stringified value of total incoming data for this stat.
- `dataSent<string>`: The stringified value of total outgoing data for this stat.
- `movingAverages<object>`: The properties are dependent on the configuration of the moving averages interval. Defaults are listed here.
- `['60000']<Number>`: The calculated moving average at a 1 minute interval.
- `['300000']<Number>`: The calculated moving average at a 5 minute interval.
- `['900000']<Number>`: The calculated moving average at a 15 minute interval.
- `snapshot<object>`: A getter that returns a clone of the raw stats.
- `dataReceived<BigNumber>`: A [`BigNumber`](https://github.com/MikeMcl/bignumber.js/) of the amount of incoming data
- `dataSent<BigNumber>`: A [`BigNumber`](https://github.com/MikeMcl/bignumber.js/) of the amount of outgoing data
- `movingAverages<object>`: A getter that returns a clone of the raw [moving averages](https://www.npmjs.com/package/moving-averages) stats. **Note**: The properties of this are dependent on configuration. The defaults are shown here.
- `['60000']<MovingAverage>`: The [MovingAverage](https://www.npmjs.com/package/moving-averages) at a 1 minute interval.
- `['300000']<MovingAverage>`: The [MovingAverage](https://www.npmjs.com/package/moving-averages) at a 5 minute interval.
- `['900000']<MovingAverage>`: The [MovingAverage](https://www.npmjs.com/package/moving-averages) at a 15 minute interval.

View File

@ -1,18 +1,26 @@
# Configuration
* [Overview](#overview)
* [Modules](#modules)
* [Transport](#transport)
* [Stream Multiplexing](#stream-multiplexing)
* [Connection Encryption](#connection-encryption)
* [Peer Discovery](#peer-discovery)
* [Content Routing](#content-routing)
* [Peer Routing](#peer-routing)
* [DHT](#dht)
* [Pubsub](#pubsub)
* [Customizing Libp2p](#customizing-libp2p)
* [Examples](#examples)
* [Configuration examples](#configuration-examples)
- [Configuration](#configuration)
- [Overview](#overview)
- [Modules](#modules)
- [Transport](#transport)
- [Stream Multiplexing](#stream-multiplexing)
- [Connection Encryption](#connection-encryption)
- [Peer Discovery](#peer-discovery)
- [Content Routing](#content-routing)
- [Peer Routing](#peer-routing)
- [DHT](#dht)
- [Pubsub](#pubsub)
- [Customizing libp2p](#customizing-libp2p)
- [Examples](#examples)
- [Basic setup](#basic-setup)
- [Customizing Peer Discovery](#customizing-peer-discovery)
- [Customizing Pubsub](#customizing-pubsub)
- [Customizing DHT](#customizing-dht)
- [Setup with Content and Peer Routing](#setup-with-content-and-peer-routing)
- [Setup with Relay](#setup-with-relay)
- [Configuring Metrics](#configuring-metrics)
- [Configuration examples](#configuration-examples)
## Overview
@ -45,7 +53,7 @@ Bear in mind that only a **transport** and **connection encryption** are require
Some available transports are:
- [libp2p/js-libp2p-tcp](https://github.com/libp2p/js-libp2p-tcp)
- [libp2p/js-libp2p-tcp](https://github.com/libp2p/js-libp2p-tcp)
- [libp2p/js-libp2p-webrtc-star](https://github.com/libp2p/js-libp2p-webrtc-star)
- [libp2p/js-libp2p-webrtc-direct](https://github.com/libp2p/js-libp2p-webrtc-direct)
- [libp2p/js-libp2p-websockets](https://github.com/libp2p/js-libp2p-websockets)
@ -151,7 +159,7 @@ If you want to know more about libp2p DHT, you should read the following content
- https://docs.libp2p.io/concepts/protocols/#kad-dht
- https://github.com/libp2p/specs/pull/108
#### Pubsub
### Pubsub
> Publish/Subscribe is a system where peers congregate around topics they are interested in. Peers interested in a topic are said to be subscribed to that topic and should receive the data published on it from other peers.
@ -194,7 +202,7 @@ Besides the `modules` and `config`, libp2p allows other internal options and con
### Examples
**1) Basic setup**
#### Basic setup
```js
// Creating a libp2p node with:
@ -229,7 +237,7 @@ const node = await Libp2p.create({
})
```
**2) Customizing Peer Discovery**
#### Customizing Peer Discovery
```js
const Libp2p = require('libp2p')
@ -248,7 +256,7 @@ const node = await Libp2p.create({
config: {
peerDiscovery: {
autoDial: true, // Auto connect to discovered peers (limited by ConnectionManager minPeers)
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
// The `tag` property will be searched when creating the instance of your Peer Discovery service.
// The associated object, will be passed to the service when it is instantiated.
[MulticastDNS.tag]: {
interval: 1000,
@ -260,7 +268,7 @@ const node = await Libp2p.create({
})
```
**3) Customizing Pubsub**
#### Customizing Pubsub
```js
const Libp2p = require('libp2p')
@ -287,7 +295,7 @@ const node = await Libp2p.create({
})
```
**4) Customizing DHT**
#### Customizing DHT
```js
const Libp2p = require('libp2p')
@ -317,7 +325,7 @@ const node = await Libp2p.create({
})
```
**5) Setup with Content and Peer Routing**
#### Setup with Content and Peer Routing
```js
const Libp2p = require('libp2p')
@ -347,7 +355,7 @@ const node = await Libp2p.create({
})
```
**6) Setup with Relay**
#### Setup with Relay
```js
const Libp2p = require('libp2p')
@ -361,13 +369,45 @@ const node = await Libp2p.create({
streamMuxer: [MPLEX],
connEncryption: [SECIO]
},
relay: { // Circuit Relay options (this config is part of libp2p core configurations)
enabled: true,
hop: {
enabled: true,
active: true
config: {
relay: { // Circuit Relay options (this config is part of libp2p core configurations)
enabled: true, // Allows you to dial and accept relayed connections. Does not make you a relay.
hop: {
enabled: true, // Allows you to be a relay for other peers
active: true // You will attempt to dial destination peers if you are not connected to them
}
}
}
})
```
#### Configuring Metrics
Metrics are disabled in libp2p by default. You can enable and configure them as follows. Aside from enabled being `false` by default, the configuration options listed here are the current defaults.
```js
const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const node = await Libp2p.create({
modules: {
transport: [TCP],
streamMuxer: [MPLEX],
connEncryption: [SECIO]
},
metrics: {
enabled: true,
computeThrottleMaxQueueSize: 1000, // How many messages a stat will queue before processing
computeThrottleTimeout: 2000, // Time in milliseconds a stat will wait, after the last item was added, before processing
movingAverageIntervals: [ // The moving averages that will be computed
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
],
maxOldPeersRetention: 50 // How many disconnected peers we will retain stats for
}
})
```

28
doc/METRICS.md Normal file
View File

@ -0,0 +1,28 @@
# Bandwidth Metrics
- Metrics gathering is optional, as there is a performance hit to using it.
- Metrics do NOT currently contain OS level stats, only libp2p application level Metrics. For example, TCP messages (ACK, FIN, etc) are not accounted for.
- See the [API](./API.md) for Metrics usage. Metrics in libp2p do not emit events, as such applications wishing to read Metrics will need to do so actively. This ensures that the system is not unnecessarily firing update notifications.
## Tracking
- When a transport hands off a connection for upgrading, Metrics are hooked up if enabled.
- When a stream is created, Metrics will be tracked on that stream and associated to that streams protocol.
- Tracked Metrics are associated to a specific peer, and count towards global bandwidth Metrics.
### Metrics Processing
- The main Metrics object consists of individual `Stats` objects
- The following categories are tracked:
- Global stats; every byte in and out
- Peer stats; every byte in and out, per peer
- Protocol stats; every byte in and out, per protocol
- When a message goes through Metrics:
- It is added to the global stat
- It is added to the stats for the remote peer
- It is added to the protocol stats if there is one
- When data is pushed onto a `Stat` it is added to a queue
- The queue is processed at the earliest of either (configurable):
- every 2 seconds after the last item was added to the queue
- or once 1000 items have been queued
- When the queue is processed:
- The data length is added to either the `in` or `out` stat
- The moving averages is calculated since the last queue processing (based on most recently processed item timestamp)

View File

@ -63,6 +63,7 @@
"moving-average": "^1.0.0",
"multiaddr": "^7.2.1",
"multistream-select": "^0.15.0",
"mutable-proxy": "^1.0.0",
"p-any": "^2.1.0",
"p-fifo": "^1.0.0",
"p-settle": "^3.1.0",
@ -82,7 +83,9 @@
"cids": "^0.7.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"it-concat": "^1.0.0",
"it-pair": "^1.0.0",
"it-pushable": "^1.4.0",
"libp2p-bootstrap": "^0.10.3",
"libp2p-delegated-content-routing": "^0.4.1",
"libp2p-delegated-peer-routing": "^0.4.0",

View File

@ -6,6 +6,9 @@ const DefaultConfig = {
connectionManager: {
minPeers: 25
},
metrics: {
enabled: false
},
config: {
dht: {
enabled: false,

View File

@ -9,5 +9,15 @@ module.exports = {
MAX_PER_PEER_DIALS: 4, // Allowed parallel dials per DialRequest
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
PRIORITY_LOW: 20,
METRICS: {
computeThrottleMaxQueueSize: 1000,
computeThrottleTimeout: 2000,
movingAverageIntervals: [
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
],
maxOldPeersRetention: 50
}
}

View File

@ -91,7 +91,6 @@ class Dialer {
try {
const dialResult = await dialRequest.run({ ...options, signal })
timeoutController.clear()
log('dial succeeded to %s', dialResult.remoteAddr)
return dialResult
} catch (err) {
@ -102,6 +101,7 @@ class Dialer {
log.error(err)
throw err
} finally {
timeoutController.clear()
this._pendingDials.delete(dial)
}
}

View File

@ -17,6 +17,7 @@ const { codes } = require('./errors')
const Circuit = require('./circuit')
const Dialer = require('./dialer')
const Metrics = require('./metrics')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
@ -51,9 +52,14 @@ class Libp2p extends EventEmitter {
this.peerStore = new PeerStore()
if (this._options.metrics.enabled) {
this.metrics = new Metrics(this._options.metrics)
}
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
metrics: this.metrics,
onConnection: (connection) => {
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
this.registrar.onConnect(peerInfo, connection)
@ -67,9 +73,13 @@ class Libp2p extends EventEmitter {
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.registrar.onDisconnect(peerInfo, connection)
this.emit('peer:disconnect', peerInfo)
// If there are no connections to the peer, disconnect
if (!this.registrar.getConnection(peerInfo)) {
this.emit('peer:disconnect', peerInfo)
this.metrics && this.metrics.onPeerDisconnected(peerInfo.id)
}
}
})
@ -200,15 +210,15 @@ class Libp2p extends EventEmitter {
try {
await Promise.all([
this.pubsub && this.pubsub.stop(),
this._dht && this._dht.stop()
this._dht && this._dht.stop(),
this.metrics && this.metrics.stop()
])
this.dialer.destroy()
await this.transportManager.close()
await this.registrar.close()
ping.unmount(this)
this.dialer.destroy()
} catch (err) {
if (err) {
log.error(err)
@ -356,6 +366,9 @@ class Libp2p extends EventEmitter {
// the other discovery modules
this._dht.on('peer', this._onDiscoveryPeer)
}
// Start metrics if present
this.metrics && this.metrics.start()
}
/**
@ -403,13 +416,13 @@ class Libp2p extends EventEmitter {
*/
async _maybeConnect (peerInfo) {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.connections.get(peerInfo)) {
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerInfo)) {
const minPeers = this._options.connectionManager.minPeers || 0
// TODO: This does not account for multiple connections to a peer
if (minPeers > this.registrar.connections.size) {
log('connecting to discovered peer')
try {
await this.dialer.connectToPeer(peerInfo)
await this.dialer.connectToPeer(peerInfo.id)
} catch (err) {
log.error('could not connect to discovered peer', err)
}

247
src/metrics/index.js Normal file
View File

@ -0,0 +1,247 @@
'use strict'
const mergeOptions = require('merge-options')
const pipe = require('it-pipe')
const { tap } = require('streaming-iterables')
const oldPeerLRU = require('./old-peers')
const { METRICS: defaultOptions } = require('../constants')
const Stats = require('./stats')
const initialCounters = [
'dataReceived',
'dataSent'
]
const directionToEvent = {
in: 'dataReceived',
out: 'dataSent'
}
class Metrics {
/**
*
* @param {object} options
* @param {number} options.computeThrottleMaxQueueSize
* @param {number} options.computeThrottleTimeout
* @param {Array<number>} options.movingAverageIntervals
* @param {number} options.maxOldPeersRetention
*/
constructor (options) {
this._options = mergeOptions(defaultOptions, options)
this._globalStats = new Stats(initialCounters, this._options)
this._peerStats = new Map()
this._protocolStats = new Map()
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
this._running = false
this._onMessage = this._onMessage.bind(this)
}
/**
* Must be called for stats to saved. Any data pushed for tracking
* will be ignored.
*/
start () {
this._running = true
}
/**
* Stops all averages timers and prevents new data from being tracked.
* Once `stop` is called, `start` must be called to resume stats tracking.
*/
stop () {
this._running = false
this._globalStats.stop()
for (const stats of this._peerStats.values()) {
stats.stop()
}
for (const stats of this._protocolStats.values()) {
stats.stop()
}
}
/**
* Gets the global `Stats` object
* @returns {Stats}
*/
get global () {
return this._globalStats
}
/**
* Returns a list of `PeerId` strings currently being tracked
* @returns {Array<string>}
*/
get peers () {
return Array.from(this._peerStats.keys())
}
/**
* Returns the `Stats` object for the given `PeerId` whether it
* is a live peer, or in the disconnected peer LRU cache.
* @param {PeerId} peerId
* @returns {Stats}
*/
forPeer (peerId) {
const idString = peerId.toString()
return this._peerStats.get(idString) || this._oldPeers.get(idString)
}
/**
* Returns a list of all protocol strings currently being tracked.
* @returns {Array<string>}
*/
get protocols () {
return Array.from(this._protocolStats.keys())
}
/**
* Returns the `Stats` object for the given `protocol`.
* @param {string} protocol
* @returns {Stats}
*/
forProtocol (protocol) {
return this._protocolStats.get(protocol)
}
/**
* Should be called when all connections to a given peer
* have closed. The `Stats` collection for the peer will
* be stopped and moved to an LRU for temporary retention.
* @param {PeerId} peerId
*/
onPeerDisconnected (peerId) {
const idString = peerId.toString()
const peerStats = this._peerStats.get(idString)
if (peerStats) {
peerStats.stop()
this._peerStats.delete(idString)
this._oldPeers.set(idString, peerStats)
}
}
/**
* Takes the metadata for a message and tracks it in the
* appropriate categories. If the protocol is present, protocol
* stats will also be tracked.
*
* @private
* @param {object} params
* @param {PeerId} params.remotePeer Remote peer
* @param {string} [params.protocol] Protocol string the stream is running
* @param {string} params.direction One of ['in','out']
* @param {number} params.dataLength Size of the message
* @returns {void}
*/
_onMessage ({ remotePeer, protocol, direction, dataLength }) {
if (!this._running) return
const key = directionToEvent[direction]
let peerStats = this.forPeer(remotePeer)
if (!peerStats) {
peerStats = new Stats(initialCounters, this._options)
this._peerStats.set(remotePeer.toString(), peerStats)
}
// Peer and global stats
peerStats.push(key, dataLength)
this._globalStats.push(key, dataLength)
// Protocol specific stats
if (protocol) {
let protocolStats = this.forProtocol(protocol)
if (!protocolStats) {
protocolStats = new Stats(initialCounters, this._options)
this._protocolStats.set(protocol, protocolStats)
}
protocolStats.push(key, dataLength)
}
}
/**
* Replaces the `PeerId` string with the given `peerId`.
* If stats are already being tracked for the given `peerId`, the
* placeholder stats will be merged with the existing stats.
* @param {string} placeholder A peerId string
* @param {PeerId} peerId
*/
updatePlaceholder (placeholder, peerId) {
if (!this._running) return
const placeholderStats = this.forPeer(placeholder)
const peerIdString = peerId.toString()
const existingStats = this.forPeer(peerId)
let mergedStats = placeholderStats
// If we already have stats, merge the two
if (existingStats) {
// If existing, merge
mergedStats = Metrics.mergeStats(existingStats, mergedStats)
// Attempt to delete from the old peers list just in case it was tracked there
this._oldPeers.delete(peerIdString)
}
this._peerStats.delete(placeholder.toString())
this._peerStats.set(peerIdString, mergedStats)
mergedStats.start()
}
/**
* Tracks data running through a given Duplex Iterable `stream`. If
* the `peerId` is not provided, a placeholder string will be created and
* returned. This allows lazy tracking of a peer when the peer is not yet known.
* When the `PeerId` is known, `Metrics.updatePlaceholder` should be called
* with the placeholder string returned from here, and the known `PeerId`.
*
* @param {Object} options
* @param {{ sink: function(*), source: function() }} options.stream A duplex iterable stream
* @param {PeerId} [options.peerId] The id of the remote peer that's connected
* @param {string} [options.protocol] The protocol the stream is running
* @returns {string} The peerId string or placeholder string
*/
trackStream ({ stream, remotePeer, protocol }) {
const metrics = this
const _source = stream.source
stream.source = tap(chunk => metrics._onMessage({
remotePeer,
protocol,
direction: 'in',
dataLength: chunk.length
}))(_source)
const _sink = stream.sink
stream.sink = source => {
pipe(
source,
tap(chunk => metrics._onMessage({
remotePeer,
protocol,
direction: 'out',
dataLength: chunk.length
})),
_sink
)
}
return stream
}
/**
* Merges `other` into `target`. `target` will be modified
* and returned.
* @param {Stats} target
* @param {Stats} other
* @returns {Stats}
*/
static mergeStats (target, other) {
target.stop()
other.stop()
// Merge queues
target._queue = [...target._queue, ...other._queue]
// TODO: how to merge moving averages?
return target
}
}
module.exports = Metrics

View File

@ -83,6 +83,31 @@ class Stats extends EventEmitter {
return Object.assign({}, this._movingAverages)
}
/**
* Returns a plain JSON object of the stats
*
* @returns {*}
*/
toJSON () {
const snapshot = this.snapshot
const movingAverages = this.movingAverages
const data = {
dataReceived: snapshot.dataReceived.toString(),
dataSent: snapshot.dataSent.toString(),
movingAverages: {}
}
const counters = Object.keys(movingAverages)
for (const key of counters) {
data.movingAverages[key] = {}
for (const interval of Object.keys(movingAverages[key])) {
data.movingAverages[key][interval] = movingAverages[key][interval].movingAverage()
}
}
return data
}
/**
* Pushes the given operation data to the queue, along with the
* current Timestamp, then resets the update timer.
@ -151,7 +176,7 @@ class Stats extends EventEmitter {
}
/**
* For each key in the stats, the frequncy and moving averages
* For each key in the stats, the frequency and moving averages
* will be updated via Stats._updateFrequencyFor based on the time
* difference between calls to this method.
*

View File

@ -100,6 +100,7 @@ class Registrar {
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id === connection.id)
this.connections.set(id, storedConn)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)

View File

@ -1,150 +0,0 @@
'use strict'
const EventEmitter = require('events')
const Stat = require('./stat')
const OldPeers = require('./old-peers')
const defaultOptions = {
computeThrottleMaxQueueSize: 1000,
computeThrottleTimeout: 2000,
movingAverageIntervals: [
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
],
maxOldPeersRetention: 50
}
const initialCounters = [
'dataReceived',
'dataSent'
]
const directionToEvent = {
in: 'dataReceived',
out: 'dataSent'
}
/**
* Binds to message events on the given `observer` to generate stats
* based on the Peer, Protocol and Transport used for the message. Stat
* events will be emitted via the `update` event.
*
* @param {Observer} observer
* @param {any} _options
* @returns {Stats}
*/
module.exports = (observer, _options) => {
const options = Object.assign({}, defaultOptions, _options)
const globalStats = new Stat(initialCounters, options)
const stats = Object.assign(new EventEmitter(), {
start: start,
stop: stop,
global: globalStats,
peers: () => Array.from(peerStats.keys()),
forPeer: (peerId) => {
return peerStats.get(peerId) || oldPeers.get(peerId)
},
transports: () => Array.from(transportStats.keys()),
forTransport: (transport) => transportStats.get(transport),
protocols: () => Array.from(protocolStats.keys()),
forProtocol: (protocol) => protocolStats.get(protocol)
})
globalStats.on('update', propagateChange)
const oldPeers = OldPeers(options.maxOldPeersRetention)
const peerStats = new Map()
const transportStats = new Map()
const protocolStats = new Map()
observer.on('peer:closed', (peerId) => {
const peer = peerStats.get(peerId)
if (peer) {
peer.removeListener('update', propagateChange)
peer.stop()
peerStats.delete(peerId)
oldPeers.set(peerId, peer)
}
})
return stats
function onMessage (peerId, transportTag, protocolTag, direction, bufferLength) {
const event = directionToEvent[direction]
if (transportTag) {
// because it has a transport tag, this message is at the global level, so we account this
// traffic as global.
globalStats.push(event, bufferLength)
// peer stats
let peer = peerStats.get(peerId)
if (!peer) {
peer = oldPeers.get(peerId)
if (peer) {
oldPeers.delete(peerId)
} else {
peer = new Stat(initialCounters, options)
}
peer.on('update', propagateChange)
peer.start()
peerStats.set(peerId, peer)
}
peer.push(event, bufferLength)
}
// transport stats
if (transportTag) {
let transport = transportStats.get(transportTag)
if (!transport) {
transport = new Stat(initialCounters, options)
transport.on('update', propagateChange)
transportStats.set(transportTag, transport)
}
transport.push(event, bufferLength)
}
// protocol stats
if (protocolTag) {
let protocol = protocolStats.get(protocolTag)
if (!protocol) {
protocol = new Stat(initialCounters, options)
protocol.on('update', propagateChange)
protocolStats.set(protocolTag, protocol)
}
protocol.push(event, bufferLength)
}
}
function start () {
observer.on('message', onMessage)
globalStats.start()
for (const peerStat of peerStats.values()) {
peerStat.start()
}
for (const transportStat of transportStats.values()) {
transportStat.start()
}
}
function stop () {
observer.removeListener('message', onMessage)
globalStats.stop()
for (const peerStat of peerStats.values()) {
peerStat.stop()
}
for (const transportStat of transportStats.values()) {
transportStat.stop()
}
}
function propagateChange () {
stats.emit('update')
}
}

View File

@ -8,6 +8,7 @@ const { Connection } = require('libp2p-interfaces/src/connection')
const PeerId = require('peer-id')
const pipe = require('it-pipe')
const errCode = require('err-code')
const mutableProxy = require('mutable-proxy')
const { codes } = require('./errors')
@ -30,6 +31,7 @@ class Upgrader {
/**
* @param {object} options
* @param {PeerId} options.localPeer
* @param {Metrics} options.metrics
* @param {Map<string, Crypto>} options.cryptos
* @param {Map<string, Muxer>} options.muxers
* @param {function(Connection)} options.onConnection Called when a connection is upgraded
@ -37,12 +39,14 @@ class Upgrader {
*/
constructor ({
localPeer,
metrics,
cryptos,
muxers,
onConnectionEnd = () => {},
onConnection = () => {}
}) {
this.localPeer = localPeer
this.metrics = metrics
this.cryptos = cryptos || new Map()
this.muxers = muxers || new Map()
this.protector = null
@ -63,6 +67,15 @@ class Upgrader {
let muxedConnection
let Muxer
let cryptoProtocol
let setPeer
let proxyPeer
if (this.metrics) {
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
setPeer({ toString: () => idString })
maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
}
log('Starting the inbound connection upgrade')
@ -89,6 +102,11 @@ class Upgrader {
throw err
}
if (this.metrics) {
this.metrics.updatePlaceholder(proxyPeer, remotePeer)
setPeer(remotePeer)
}
log('Successfully upgraded inbound connection')
return this._createConnection({
@ -120,6 +138,15 @@ class Upgrader {
let muxedConnection
let cryptoProtocol
let Muxer
let setPeer
let proxyPeer
if (this.metrics) {
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
setPeer({ toString: () => idString })
maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
}
log('Starting the outbound connection upgrade')
@ -145,6 +172,11 @@ class Upgrader {
throw err
}
if (this.metrics) {
this.metrics.updatePlaceholder(proxyPeer, remotePeer)
setPeer(remotePeer)
}
log('Successfully upgraded outbound connection')
return this._createConnection({
@ -185,6 +217,7 @@ class Upgrader {
try {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol })
} catch (err) {
@ -203,6 +236,7 @@ class Upgrader {
const mss = new Multistream.Dialer(muxedStream)
try {
const { stream, protocol } = await mss.select(protocols)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err) {
log.error('could not create new stream', err)

View File

@ -29,7 +29,7 @@ describe('content-routing', () => {
it('.findProviders should return an error', async () => {
try {
for await (const _ of node.contentRouting.findProviders('a cid')) {} // eslint-disable-line no-unused-vars
for await (const _ of node.contentRouting.findProviders('a cid')) {} // eslint-disable-line
throw new Error('.findProviders should return an error')
} catch (err) {
expect(err).to.exist()
@ -222,7 +222,7 @@ describe('content-routing', () => {
])
try {
for await (const _ of node.contentRouting.findProviders(cid)) { } // eslint-disable-line no-unused-vars
for await (const _ of node.contentRouting.findProviders(cid)) { } // eslint-disable-line
throw new Error('should handle errors when finding providers')
} catch (err) {
expect(err).to.exist()

View File

@ -82,6 +82,7 @@ describe('Dialing (via relay, TCP)', () => {
echoStream,
collect
)
expect(output.slice()).to.eql(input)
})

127
test/metrics/index.node.js Normal file
View File

@ -0,0 +1,127 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const { randomBytes } = require('libp2p-crypto')
const pipe = require('it-pipe')
const concat = require('it-concat')
const delay = require('delay')
const { createPeer } = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options')
describe('libp2p.metrics', () => {
let libp2p
afterEach(async () => {
libp2p && await libp2p.stop()
})
it('should disable metrics by default', async () => {
[libp2p] = await createPeer({
config: {
modules: baseOptions.modules
}
})
expect(libp2p.metrics).to.not.exist()
})
it('should start/stop metrics on startup/shutdown when enabled', async () => {
const config = { ...baseOptions }
config.metrics = {
enabled: true
}
;[libp2p] = await createPeer({ started: false, config })
expect(libp2p.metrics).to.exist()
sinon.spy(libp2p.metrics, 'start')
sinon.spy(libp2p.metrics, 'stop')
await libp2p.start()
expect(libp2p.metrics.start).to.have.property('callCount', 1)
await libp2p.stop()
expect(libp2p.metrics.stop).to.have.property('callCount', 1)
})
it('should record metrics on connections and streams when enabled', async () => {
const config = { ...baseOptions }
config.metrics = {
enabled: true,
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10]
}
let remoteLibp2p
;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config })
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
const connection = await libp2p.dial(remoteLibp2p.peerInfo)
const { stream } = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
const result = await pipe(
[bytes],
stream,
concat
)
// Flush the call stack
await delay(0)
expect(result).to.have.length(bytes.length)
// Protocol stats should equal the echo size
const protocolStats = libp2p.metrics.forProtocol('/echo/1.0.0').toJSON()
expect(Number(protocolStats.dataReceived)).to.equal(bytes.length)
expect(Number(protocolStats.dataSent)).to.equal(bytes.length)
// A lot more traffic will be sent over the wire for the peer
const peerStats = libp2p.metrics.forPeer(connection.remotePeer).toJSON()
expect(Number(peerStats.dataReceived)).to.be.at.least(bytes.length)
await remoteLibp2p.stop()
})
it('should move disconnected peers to the old peers list', async () => {
const config = { ...baseOptions }
config.metrics = {
enabled: true,
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10]
}
let remoteLibp2p
;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config })
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
const connection = await libp2p.dial(remoteLibp2p.peerInfo)
const { stream } = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
await pipe(
[bytes],
stream,
concat
)
sinon.spy(libp2p.metrics, 'onPeerDisconnected')
await libp2p.hangUp(connection.remotePeer)
// Flush call stack
await delay(0)
expect(libp2p.metrics.onPeerDisconnected).to.have.property('callCount', 1)
expect(libp2p.metrics.peers).to.have.length(0)
// forPeer should still give us the old peer stats,
// even though its not in the active peer list
const peerStats = libp2p.metrics.forPeer(connection.remotePeer).toJSON()
expect(Number(peerStats.dataReceived)).to.be.at.least(bytes.length)
await remoteLibp2p.stop()
})
})

259
test/metrics/index.spec.js Normal file
View File

@ -0,0 +1,259 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const { randomBytes } = require('libp2p-crypto')
const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe')
const concat = require('it-concat')
const pushable = require('it-pushable')
const { consume } = require('streaming-iterables')
const delay = require('delay')
const Metrics = require('../../src/metrics')
const Stats = require('../../src/metrics/stats')
const { createPeerId } = require('../utils/creators/peer')
describe('Metrics', () => {
let peerId
let peerId2
before(async () => {
[peerId, peerId2] = await createPeerId({ number: 2 })
})
afterEach(() => {
sinon.restore()
})
it('should not track data if not started', async () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
})
metrics.trackStream({
stream: local,
remotePeer: peerId
})
// Echo back
pipe(remote, remote)
const bytes = randomBytes(1024)
const results = await pipe(
[bytes],
local,
concat
)
// Flush the call stack
await delay(0)
expect(results.length).to.eql(bytes.length)
expect(metrics.forPeer(peerId)).to.equal(undefined)
expect(metrics.peers).to.eql([])
const globalStats = metrics.global
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(0)
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(0)
})
it('should be able to track a duplex stream', async () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
})
metrics.trackStream({
stream: local,
remotePeer: peerId
})
metrics.start()
// Echo back
pipe(remote, remote)
const bytes = randomBytes(1024)
const input = (async function * () {
let i = 0
while (i < 10) {
await delay(10)
yield bytes
i++
}
})()
const results = await pipe(
input,
local,
concat
)
// Flush the call stack
await delay(0)
expect(results.length).to.eql(bytes.length * 10)
const stats = metrics.forPeer(peerId)
expect(metrics.peers).to.eql([peerId.toString()])
expect(stats.snapshot.dataReceived.toNumber()).to.equal(results.length)
expect(stats.snapshot.dataSent.toNumber()).to.equal(results.length)
const globalStats = metrics.global
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(results.length)
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(results.length)
})
it('should properly track global stats', async () => {
const [local, remote] = duplexPair()
const [local2, remote2] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
})
const protocol = '/echo/1.0.0'
metrics.start()
// Echo back remotes
pipe(remote, remote)
pipe(remote2, remote2)
metrics.trackStream({
stream: local,
remotePeer: peerId,
protocol
})
metrics.trackStream({
stream: local2,
remotePeer: peerId2,
protocol
})
const bytes = randomBytes(1024)
await Promise.all([
pipe([bytes], local, consume),
pipe([bytes], local2, consume)
])
// Flush the call stack
await delay(0)
expect(metrics.peers).to.eql([peerId.toString(), peerId2.toString()])
// Verify global metrics
const globalStats = metrics.global
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
// Verify individual metrics
for (const peer of [peerId, peerId2]) {
const stats = metrics.forPeer(peer)
expect(stats.snapshot.dataReceived.toNumber()).to.equal(bytes.length)
expect(stats.snapshot.dataSent.toNumber()).to.equal(bytes.length)
}
// Verify protocol metrics
const protocolStats = metrics.forProtocol(protocol)
expect(metrics.protocols).to.eql([protocol])
expect(protocolStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
expect(protocolStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
})
it('should be able to replace an existing peer', async () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
})
metrics.start()
// Echo back remotes
pipe(remote, remote)
const mockPeer = {
toString: () => 'a temporary id'
}
metrics.trackStream({
stream: local,
remotePeer: mockPeer
})
const bytes = randomBytes(1024)
const input = pushable()
const deferredPromise = pipe(input, local, consume)
input.push(bytes)
await delay(0)
metrics.updatePlaceholder(mockPeer.toString(), peerId)
mockPeer.toString = peerId.toString.bind(peerId)
input.push(bytes)
input.end()
await deferredPromise
await delay(0)
expect(metrics.peers).to.eql([peerId.toString()])
// Verify global metrics
const globalStats = metrics.global
expect(globalStats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
expect(globalStats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
// Verify individual metrics
const stats = metrics.forPeer(peerId)
expect(stats.snapshot.dataReceived.toNumber()).to.equal(bytes.length * 2)
expect(stats.snapshot.dataSent.toNumber()).to.equal(bytes.length * 2)
})
it('should only keep track of a set number of disconnected peers', () => {
const spies = []
const trackedPeers = new Map([...new Array(50)].map((_, index) => {
const stat = new Stats([], { movingAverageIntervals: [] })
spies.push(sinon.spy(stat, 'stop'))
return [String(index), stat]
}))
const metrics = new Metrics({
maxOldPeersRetention: 5 // Only keep track of 5
})
// Clone so trackedPeers isn't modified
metrics._peerStats = new Map(trackedPeers)
// Disconnect every peer
for (const id of trackedPeers.keys()) {
metrics.onPeerDisconnected({
toString: () => id
})
}
// Verify only the last 5 have been retained
expect(metrics.peers).to.have.length(0)
const retainedPeers = []
for (const id of trackedPeers.keys()) {
const stat = metrics.forPeer(id)
if (stat) retainedPeers.push(id)
}
expect(retainedPeers).to.eql(['45', '46', '47', '48', '49'])
// Verify all stats were stopped
expect(spies).to.have.length(50)
for (const spy of spies) {
expect(spy).to.have.property('callCount', 1)
}
})
})

View File

@ -18,7 +18,7 @@ const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
* @param {Object} [properties.config]
* @param {number} [properties.number] number of peers (default: 1).
* @param {boolean} [properties.fixture] use fixture for peer-id generation (default: true)
* @param {boolean} [properties.started] nodes should start (defaul: true)
* @param {boolean} [properties.started] nodes should start (default: true)
* @return {Promise<Array<Libp2p>>}
*/
async function createPeer ({ number = 1, fixture = true, started = true, config = defaultOptions } = {}) {