refactor: connection manager (#511)

* refactor: initial refactor of the connection manager

* fix: start/stop issues

* fix: add tests and resolve pruning issues

* chore: fix lint

* test: move conn manager tests to node only for now

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>

* fix: assert min max connection options

* test: fix assertion check for browser

* docs: add api and config docs for conn manager
This commit is contained in:
Jacob Heun
2019-12-12 10:19:57 +01:00
parent f1eb373235
commit 14a1955a78
9 changed files with 370 additions and 252 deletions

View File

@ -1,12 +1,14 @@
'use strict'
const EventEmitter = require('events')
const assert = require('assert')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('latency-monitor').default
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')
const defaultOptions = {
maxPeers: Infinity,
minPeers: 0,
maxConnections: Infinity,
minConnections: 0,
maxData: Infinity,
maxSentData: Infinity,
maxReceivedData: Infinity,
@ -16,170 +18,171 @@ const defaultOptions = {
defaultPeerValue: 1
}
class ConnectionManager extends EventEmitter {
class ConnectionManager {
/**
* @constructor
* @param {Libp2p} libp2p
* @param {object} options
* @param {Number} options.maxConnections The maximum number of connections allowed. Default=Infinity
* @param {Number} options.minConnections The minimum number of connections to avoid pruning. Default=0
* @param {Number} options.maxData The max data (in and out), per average interval to allow. Default=Infinity
* @param {Number} options.maxSentData The max outgoing data, per average interval to allow. Default=Infinity
* @param {Number} options.maxReceivedData The max incoming data, per average interval to allow.. Default=Infinity
* @param {Number} options.maxEventLoopDelay The upper limit the event loop can take to run. Default=Infinity
* @param {Number} options.pollInterval How often, in milliseconds, metrics and latency should be checked. Default=2000
* @param {Number} options.movingAverageInterval How often, in milliseconds, to compute averages. Default=60000
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
*/
constructor (libp2p, options) {
super()
this._libp2p = libp2p
this._options = Object.assign({}, defaultOptions, options)
this._options.maxPeersPerProtocol = fixMaxPeersPerProtocol(this._options.maxPeersPerProtocol)
this._registrar = libp2p.registrar
this._peerId = libp2p.peerInfo.id.toString()
this._options = mergeOptions(defaultOptions, options)
assert(
this._options.maxConnections > this._options.minConnections,
'Connection Manager maxConnections must be greater than minConnections'
)
debug('options: %j', this._options)
this._stats = libp2p.stats
if (options && !this._stats) {
throw new Error('No libp2p.stats')
}
this._metrics = libp2p.metrics
this._peerValues = new Map()
this._peers = new Map()
this._peerProtocols = new Map()
this._peerCountPerProtocol = new Map()
this._onStatsUpdate = this._onStatsUpdate.bind(this)
this._onPeerConnect = this._onPeerConnect.bind(this)
this._onPeerDisconnect = this._onPeerDisconnect.bind(this)
if (this._libp2p.isStarted()) {
this._onceStarted()
} else {
this._libp2p.once('start', this._onceStarted.bind(this))
}
this._connections = new Map()
this._timer = null
this._checkMetrics = this._checkMetrics.bind(this)
}
/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
*/
start () {
this._stats.on('update', this._onStatsUpdate)
this._libp2p.on('connection:start', this._onPeerConnect)
this._libp2p.on('connection:end', this._onPeerDisconnect)
if (this._metrics) {
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
}
// latency monitor
this._latencyMonitor = new LatencyMonitor({
latencyCheckIntervalMs: this._options.pollInterval,
dataEmitIntervalMs: this._options.pollInterval
})
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
debug('started')
}
/**
* Stops the Connection Manager
*/
stop () {
this._stats.removeListener('update', this._onStatsUpdate)
this._libp2p.removeListener('connection:start', this._onPeerConnect)
this._libp2p.removeListener('connection:end', this._onPeerDisconnect)
this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
debug('stopped')
}
/**
* Sets the value of the given peer. Peers with lower values
* will be disconnected first.
* @param {PeerId} peerId
* @param {number} value A number between 0 and 1
*/
setPeerValue (peerId, value) {
if (value < 0 || value > 1) {
throw new Error('value should be a number between 0 and 1')
}
if (peerId.toB58String) {
peerId = peerId.toB58String()
if (peerId.toString) {
peerId = peerId.toString()
}
this._peerValues.set(peerId, value)
}
_onceStarted () {
this._peerId = this._libp2p.peerInfo.id.toB58String()
}
_onStatsUpdate () {
const movingAvgs = this._stats.global.movingAverages
const received = movingAvgs.dataReceived[this._options.movingAverageInterval].movingAverage()
/**
* Checks the libp2p metrics to determine if any values have exceeded
* the configured maximums.
* @private
*/
_checkMetrics () {
const movingAverages = this._metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
const sent = movingAvgs.dataSent[this._options.movingAverageInterval].movingAverage()
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxSentData', sent)
const total = received + sent
this._checkLimit('maxData', total)
debug('stats update', total)
debug('metrics update', total)
this._timer.reschedule(this._options.pollInterval)
}
_onPeerConnect (peerInfo) {
const peerId = peerInfo.id.toB58String()
debug('%s: connected to %s', this._peerId, peerId)
this._peerValues.set(peerId, this._options.defaultPeerValue)
this._peers.set(peerId, peerInfo)
this.emit('connected', peerId)
this._checkLimit('maxPeers', this._peers.size)
protocolsFromPeerInfo(peerInfo).forEach((protocolTag) => {
const protocol = this._peerCountPerProtocol[protocolTag]
if (!protocol) {
this._peerCountPerProtocol[protocolTag] = 0
}
this._peerCountPerProtocol[protocolTag]++
let peerProtocols = this._peerProtocols[peerId]
if (!peerProtocols) {
peerProtocols = this._peerProtocols[peerId] = new Set()
}
peerProtocols.add(protocolTag)
this._checkProtocolMaxPeersLimit(protocolTag, this._peerCountPerProtocol[protocolTag])
})
}
_onPeerDisconnect (peerInfo) {
const peerId = peerInfo.id.toB58String()
debug('%s: disconnected from %s', this._peerId, peerId)
this._peerValues.delete(peerId)
this._peers.delete(peerId)
const peerProtocols = this._peerProtocols[peerId]
if (peerProtocols) {
Array.from(peerProtocols).forEach((protocolTag) => {
const peerCountForProtocol = this._peerCountPerProtocol[protocolTag]
if (peerCountForProtocol) {
this._peerCountPerProtocol[protocolTag]--
}
})
/**
* Tracks the incoming connection and check the connection limit
* @param {Connection} connection
*/
onConnect (connection) {
const peerId = connection.remotePeer.toString()
this._connections.set(connection.id, connection)
if (!this._peerValues.has(peerId)) {
this._peerValues.set(peerId, this._options.defaultPeerValue)
}
this.emit('disconnected', peerId)
this._checkLimit('maxConnections', this._connections.size)
}
/**
* Removes the connection from tracking
* @param {Connection} connection
*/
onDisconnect (connection) {
this._connections.delete(connection.id)
this._peerValues.delete(connection.remotePeer.toString())
}
/**
* If the event loop is slow, maybe close a connection
* @private
* @param {*} summary The LatencyMonitor summary
*/
_onLatencyMeasure (summary) {
this._checkLimit('maxEventLoopDelay', summary.avgMs)
}
/**
* If the `value` of `name` has exceeded its limit, maybe close a connection
* @private
* @param {string} name The name of the field to check limits for
* @param {number} value The current value of the field
*/
_checkLimit (name, value) {
const limit = this._options[name]
debug('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
this.emit('limit:exceeded', name, value)
this._maybeDisconnectOne()
}
}
_checkProtocolMaxPeersLimit (protocolTag, value) {
debug('checking protocol limit. current value of %s is %d', protocolTag, value)
const limit = this._options.maxPeersPerProtocol[protocolTag]
if (value > limit) {
debug('%s: protocol max peers limit exceeded: %s, %d', this._peerId, protocolTag, value)
this.emit('limit:exceeded', protocolTag, value)
this._maybeDisconnectOne()
}
}
/**
* If we have more connections than our maximum, close a connection
* to the lowest valued peer.
* @private
*/
_maybeDisconnectOne () {
if (this._options.minPeers < this._peerValues.size) {
if (this._options.minConnections < this._connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: forcing disconnection from %j', this._peerId, peerId)
this._disconnectPeer(peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
for (const connection of this._connections.values()) {
if (connection.remotePeer.toString() === peerId) {
connection.close()
break
}
}
}
}
}
_disconnectPeer (peerId) {
debug('preemptively disconnecting peer', peerId)
this.emit('%s: disconnect:preemptive', this._peerId, peerId)
const peer = this._peers.get(peerId)
this._libp2p.hangUp(peer, (err) => {
if (err) {
this.emit('error', err)
}
})
}
}
module.exports = ConnectionManager
@ -187,32 +190,3 @@ module.exports = ConnectionManager
function byPeerValue (peerValueEntryA, peerValueEntryB) {
return peerValueEntryA[1] - peerValueEntryB[1]
}
function fixMaxPeersPerProtocol (maxPeersPerProtocol) {
if (!maxPeersPerProtocol) {
maxPeersPerProtocol = {}
}
Object.keys(maxPeersPerProtocol).forEach((transportTag) => {
const max = maxPeersPerProtocol[transportTag]
delete maxPeersPerProtocol[transportTag]
maxPeersPerProtocol[transportTag.toLowerCase()] = max
})
return maxPeersPerProtocol
}
function protocolsFromPeerInfo (peerInfo) {
const protocolTags = new Set()
peerInfo.multiaddrs.forEach((multiaddr) => {
multiaddr.protos().map(protocolToProtocolTag).forEach((protocolTag) => {
protocolTags.add(protocolTag)
})
})
return Array.from(protocolTags)
}
function protocolToProtocolTag (protocol) {
return protocol.name.toLowerCase()
}