mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-21 13:01:33 +00:00
refactor: add core modules to libp2p (#400)
* refactor: add js-libp2p-connection-manager to repo Co-authored-by: David Dias <daviddias.p@gmail.com> Co-authored-by: Jacob Heun <jacobheun@gmail.com> Co-authored-by: Pedro Teixeira <i@pgte.me> Co-authored-by: Vasco Santos <vasco.santos@ua.pt> * test(conn-mgr): only run in node * refactor: add js-libp2p-identify to repo Co-authored-by: David Dias <daviddias.p@gmail.com> Co-authored-by: Friedel Ziegelmayer <dignifiedquire@gmail.com> Co-authored-by: Hugo Dias <hugomrdias@gmail.com> Co-authored-by: Jacob Heun <jacobheun@gmail.com> Co-authored-by: Maciej Krüger <mkg20001@gmail.com> Co-authored-by: Richard Littauer <richard.littauer@gmail.com> Co-authored-by: Vasco Santos <vasco.santos@moxy.studio> Co-authored-by: Yusef Napora <yusef@protocol.ai> Co-authored-by: ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com> * refactor: add libp2p-pnet to repo Co-authored-by: Jacob Heun <jacobheun@gmail.com> Co-authored-by: Vasco Santos <vasco.santos@moxy.studio> * refactor: add libp2p-ping to repo Co-authored-by: David Dias <daviddias.p@gmail.com> Co-authored-by: Francisco Baio Dias <xicombd@gmail.com> Co-authored-by: Friedel Ziegelmayer <dignifiedquire@gmail.com> Co-authored-by: Hugo Dias <mail@hugodias.me> Co-authored-by: Jacob Heun <jacobheun@gmail.com> Co-authored-by: João Antunes <j.goncalo.antunes@gmail.com> Co-authored-by: Richard Littauer <richard.littauer@gmail.com> Co-authored-by: Vasco Santos <vasco.santos@moxy.studio> Co-authored-by: Vasco Santos <vasco.santos@ua.pt> Co-authored-by: ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com> * refactor: add libp2p-circuit to repo Co-authored-by: David Dias <daviddias.p@gmail.com> Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com> Co-authored-by: Friedel Ziegelmayer <dignifiedquire@gmail.com> Co-authored-by: Hugo Dias <mail@hugodias.me> Co-authored-by: Jacob Heun <jacobheun@gmail.com> Co-authored-by: Maciej Krüger <mkg20001@gmail.com> Co-authored-by: Oli Evans <oli@tableflip.io> Co-authored-by: Pedro Teixeira <i@pgte.me> Co-authored-by: Vasco Santos <vasco.santos@ua.pt> Co-authored-by: Victor Bjelkholm <victorbjelkholm@gmail.com> Co-authored-by: Yusef Napora <yusef@napora.org> Co-authored-by: dirkmc <dirk@mccormick.cx> * test(switch): avoid using instanceof * chore(switch): update bignumber dep * refactor(circuit): clean up tests * refactor(switch): consolidate get peer utils * test(identify): do deep checks of addresses * test(identify): bump timeout for identify test * test(switch): tidy up limit dialer test * refactor(switch): remove redundant circuit tests * chore: add coverage script * refactor(circuit): consolidate get peer info * docs: reference original repositories in each sub readme * docs: fix comment * refactor: clean up sub package.json files and readmes
This commit is contained in:
218
src/connection-manager/index.js
Normal file
218
src/connection-manager/index.js
Normal file
@ -0,0 +1,218 @@
|
||||
'use strict'
|
||||
|
||||
const EventEmitter = require('events')
|
||||
const LatencyMonitor = require('latency-monitor').default
|
||||
const debug = require('debug')('libp2p:connection-manager')
|
||||
|
||||
const defaultOptions = {
|
||||
maxPeers: Infinity,
|
||||
minPeers: 0,
|
||||
maxData: Infinity,
|
||||
maxSentData: Infinity,
|
||||
maxReceivedData: Infinity,
|
||||
maxEventLoopDelay: Infinity,
|
||||
pollInterval: 2000,
|
||||
movingAverageInterval: 60000,
|
||||
defaultPeerValue: 1
|
||||
}
|
||||
|
||||
class ConnectionManager extends EventEmitter {
|
||||
constructor (libp2p, options) {
|
||||
super()
|
||||
this._libp2p = libp2p
|
||||
this._options = Object.assign({}, defaultOptions, options)
|
||||
this._options.maxPeersPerProtocol = fixMaxPeersPerProtocol(this._options.maxPeersPerProtocol)
|
||||
|
||||
debug('options: %j', this._options)
|
||||
|
||||
this._stats = libp2p.stats
|
||||
if (options && !this._stats) {
|
||||
throw new Error('No libp2p.stats')
|
||||
}
|
||||
|
||||
this._peerValues = new Map()
|
||||
this._peers = new Map()
|
||||
this._peerProtocols = new Map()
|
||||
this._peerCountPerProtocol = new Map()
|
||||
this._onStatsUpdate = this._onStatsUpdate.bind(this)
|
||||
this._onPeerConnect = this._onPeerConnect.bind(this)
|
||||
this._onPeerDisconnect = this._onPeerDisconnect.bind(this)
|
||||
|
||||
if (this._libp2p.isStarted()) {
|
||||
this._onceStarted()
|
||||
} else {
|
||||
this._libp2p.once('start', this._onceStarted.bind(this))
|
||||
}
|
||||
}
|
||||
|
||||
start () {
|
||||
this._stats.on('update', this._onStatsUpdate)
|
||||
this._libp2p.on('connection:start', this._onPeerConnect)
|
||||
this._libp2p.on('connection:end', this._onPeerDisconnect)
|
||||
// latency monitor
|
||||
this._latencyMonitor = new LatencyMonitor({
|
||||
dataEmitIntervalMs: this._options.pollInterval
|
||||
})
|
||||
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
|
||||
this._latencyMonitor.on('data', this._onLatencyMeasure)
|
||||
}
|
||||
|
||||
stop () {
|
||||
this._stats.removeListener('update', this._onStatsUpdate)
|
||||
this._libp2p.removeListener('connection:start', this._onPeerConnect)
|
||||
this._libp2p.removeListener('connection:end', this._onPeerDisconnect)
|
||||
this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
|
||||
}
|
||||
|
||||
setPeerValue (peerId, value) {
|
||||
if (value < 0 || value > 1) {
|
||||
throw new Error('value should be a number between 0 and 1')
|
||||
}
|
||||
if (peerId.toB58String) {
|
||||
peerId = peerId.toB58String()
|
||||
}
|
||||
this._peerValues.set(peerId, value)
|
||||
}
|
||||
|
||||
_onceStarted () {
|
||||
this._peerId = this._libp2p.peerInfo.id.toB58String()
|
||||
}
|
||||
|
||||
_onStatsUpdate () {
|
||||
const movingAvgs = this._stats.global.movingAverages
|
||||
const received = movingAvgs.dataReceived[this._options.movingAverageInterval].movingAverage()
|
||||
this._checkLimit('maxReceivedData', received)
|
||||
const sent = movingAvgs.dataSent[this._options.movingAverageInterval].movingAverage()
|
||||
this._checkLimit('maxSentData', sent)
|
||||
const total = received + sent
|
||||
this._checkLimit('maxData', total)
|
||||
debug('stats update', total)
|
||||
}
|
||||
|
||||
_onPeerConnect (peerInfo) {
|
||||
const peerId = peerInfo.id.toB58String()
|
||||
debug('%s: connected to %s', this._peerId, peerId)
|
||||
this._peerValues.set(peerId, this._options.defaultPeerValue)
|
||||
this._peers.set(peerId, peerInfo)
|
||||
this.emit('connected', peerId)
|
||||
this._checkLimit('maxPeers', this._peers.size)
|
||||
|
||||
protocolsFromPeerInfo(peerInfo).forEach((protocolTag) => {
|
||||
const protocol = this._peerCountPerProtocol[protocolTag]
|
||||
if (!protocol) {
|
||||
this._peerCountPerProtocol[protocolTag] = 0
|
||||
}
|
||||
this._peerCountPerProtocol[protocolTag]++
|
||||
|
||||
let peerProtocols = this._peerProtocols[peerId]
|
||||
if (!peerProtocols) {
|
||||
peerProtocols = this._peerProtocols[peerId] = new Set()
|
||||
}
|
||||
peerProtocols.add(protocolTag)
|
||||
this._checkProtocolMaxPeersLimit(protocolTag, this._peerCountPerProtocol[protocolTag])
|
||||
})
|
||||
}
|
||||
|
||||
_onPeerDisconnect (peerInfo) {
|
||||
const peerId = peerInfo.id.toB58String()
|
||||
debug('%s: disconnected from %s', this._peerId, peerId)
|
||||
this._peerValues.delete(peerId)
|
||||
this._peers.delete(peerId)
|
||||
|
||||
const peerProtocols = this._peerProtocols[peerId]
|
||||
if (peerProtocols) {
|
||||
Array.from(peerProtocols).forEach((protocolTag) => {
|
||||
const peerCountForProtocol = this._peerCountPerProtocol[protocolTag]
|
||||
if (peerCountForProtocol) {
|
||||
this._peerCountPerProtocol[protocolTag]--
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
this.emit('disconnected', peerId)
|
||||
}
|
||||
|
||||
_onLatencyMeasure (summary) {
|
||||
this._checkLimit('maxEventLoopDelay', summary.avgMs)
|
||||
}
|
||||
|
||||
_checkLimit (name, value) {
|
||||
const limit = this._options[name]
|
||||
debug('checking limit of %s. current value: %d of %d', name, value, limit)
|
||||
if (value > limit) {
|
||||
debug('%s: limit exceeded: %s, %d', this._peerId, name, value)
|
||||
this.emit('limit:exceeded', name, value)
|
||||
this._maybeDisconnectOne()
|
||||
}
|
||||
}
|
||||
|
||||
_checkProtocolMaxPeersLimit (protocolTag, value) {
|
||||
debug('checking protocol limit. current value of %s is %d', protocolTag, value)
|
||||
const limit = this._options.maxPeersPerProtocol[protocolTag]
|
||||
if (value > limit) {
|
||||
debug('%s: protocol max peers limit exceeded: %s, %d', this._peerId, protocolTag, value)
|
||||
this.emit('limit:exceeded', protocolTag, value)
|
||||
this._maybeDisconnectOne()
|
||||
}
|
||||
}
|
||||
|
||||
_maybeDisconnectOne () {
|
||||
if (this._options.minPeers < this._peerValues.size) {
|
||||
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
|
||||
debug('%s: sorted peer values: %j', this._peerId, peerValues)
|
||||
const disconnectPeer = peerValues[0]
|
||||
if (disconnectPeer) {
|
||||
const peerId = disconnectPeer[0]
|
||||
debug('%s: lowest value peer is %s', this._peerId, peerId)
|
||||
debug('%s: forcing disconnection from %j', this._peerId, peerId)
|
||||
this._disconnectPeer(peerId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_disconnectPeer (peerId) {
|
||||
debug('preemptively disconnecting peer', peerId)
|
||||
this.emit('%s: disconnect:preemptive', this._peerId, peerId)
|
||||
const peer = this._peers.get(peerId)
|
||||
this._libp2p.hangUp(peer, (err) => {
|
||||
if (err) {
|
||||
this.emit('error', err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = ConnectionManager
|
||||
|
||||
function byPeerValue (peerValueEntryA, peerValueEntryB) {
|
||||
return peerValueEntryA[1] - peerValueEntryB[1]
|
||||
}
|
||||
|
||||
function fixMaxPeersPerProtocol (maxPeersPerProtocol) {
|
||||
if (!maxPeersPerProtocol) {
|
||||
maxPeersPerProtocol = {}
|
||||
}
|
||||
|
||||
Object.keys(maxPeersPerProtocol).forEach((transportTag) => {
|
||||
const max = maxPeersPerProtocol[transportTag]
|
||||
delete maxPeersPerProtocol[transportTag]
|
||||
maxPeersPerProtocol[transportTag.toLowerCase()] = max
|
||||
})
|
||||
|
||||
return maxPeersPerProtocol
|
||||
}
|
||||
|
||||
function protocolsFromPeerInfo (peerInfo) {
|
||||
const protocolTags = new Set()
|
||||
peerInfo.multiaddrs.forEach((multiaddr) => {
|
||||
multiaddr.protos().map(protocolToProtocolTag).forEach((protocolTag) => {
|
||||
protocolTags.add(protocolTag)
|
||||
})
|
||||
})
|
||||
|
||||
return Array.from(protocolTags)
|
||||
}
|
||||
|
||||
function protocolToProtocolTag (protocol) {
|
||||
return protocol.name.toLowerCase()
|
||||
}
|
Reference in New Issue
Block a user