chore: refactor connection manager and registrar

This commit is contained in:
Vasco Santos
2020-04-18 17:06:56 +02:00
committed by Jacob Heun
parent d33919d0b3
commit d3a4bf0a3f
18 changed files with 424 additions and 411 deletions

View File

@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
// Get the connection to the destination (stop) peer
const destinationPeer = new PeerId(request.dstPeer.id)
const destinationConnection = circuit._registrar.getConnection(destinationPeer)
const destinationConnection = circuit._connectionManager.get(destinationPeer)
if (!destinationConnection && !circuit._options.hop.active) {
log('HOP request received but we are not connected to the destination peer')
return streamHandler.end({

View File

@ -29,6 +29,7 @@ class Circuit {
constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer
this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader
this._options = libp2p._config.relay
this.addresses = libp2p.addresses
@ -107,7 +108,7 @@ class Circuit {
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(relayPeer)
let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true

View File

@ -6,6 +6,11 @@ const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')
const { EventEmitter } = require('events')
const PeerId = require('peer-id')
const { Connection } = require('libp2p-interfaces/src/connection')
const {
ERR_INVALID_PARAMETERS
} = require('../errors')
@ -22,7 +27,12 @@ const defaultOptions = {
defaultPeerValue: 1
}
class ConnectionManager {
/**
* Responsible for managing known connections.
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
* @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected.
*/
class ConnectionManager extends EventEmitter {
/**
* @constructor
* @param {Libp2p} libp2p
@ -38,9 +48,11 @@ class ConnectionManager {
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
*/
constructor (libp2p, options) {
super()
this._libp2p = libp2p
this._registrar = libp2p.registrar
this._peerId = libp2p.peerId.toB58String()
this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
if (this._options.maxConnections < this._options.minConnections) {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
@ -48,20 +60,38 @@ class ConnectionManager {
debug('options: %j', this._options)
this._metrics = libp2p.metrics
this._libp2p = libp2p
/**
* Map of peer identifiers to their peer value for pruning connections.
* @type {Map<string, number>}
*/
this._peerValues = new Map()
this._connections = new Map()
/**
* Map of connections per peer
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()
this._timer = null
this._checkMetrics = this._checkMetrics.bind(this)
}
/**
* Get current number of open connections.
*/
get size () {
return Array.from(this.connections.values())
.reduce((accumulator, value) => accumulator + value.length, 0)
}
/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
*/
start () {
if (this._metrics) {
if (this._libp2p.metrics) {
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
}
@ -77,13 +107,33 @@ class ConnectionManager {
/**
* Stops the Connection Manager
* @async
*/
stop () {
async stop () {
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
await this._close()
debug('stopped')
}
/**
* Cleans up the connections
* @async
*/
async _close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}
await tasks
this.connections.clear()
}
/**
* Sets the value of the given peer. Peers with lower values
* will be disconnected first.
@ -106,7 +156,7 @@ class ConnectionManager {
* @private
*/
_checkMetrics () {
const movingAverages = this._metrics.global.movingAverages
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
@ -122,12 +172,25 @@ class ConnectionManager {
* @param {Connection} connection
*/
onConnect (connection) {
if (!Connection.isConnection(connection)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}
const peerId = connection.remotePeer.toB58String()
this._connections.set(connection.id, connection)
const storedConn = this.connections.get(peerId)
if (storedConn) {
storedConn.push(connection)
} else {
this.connections.set(peerId, [connection])
this.emit('peer:connect', connection)
}
if (!this._peerValues.has(peerId)) {
this._peerValues.set(peerId, this._options.defaultPeerValue)
}
this._checkLimit('maxConnections', this._connections.size)
this._checkLimit('maxConnections', this.size)
}
/**
@ -135,8 +198,37 @@ class ConnectionManager {
* @param {Connection} connection
*/
onDisconnect (connection) {
this._connections.delete(connection.id)
this._peerValues.delete(connection.remotePeer.toB58String())
const peerId = connection.remotePeer.toB58String()
let storedConn = this.connections.get(peerId)
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn) {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
}
}
/**
* Get a connection with a peer.
* @param {PeerId} peerId
* @returns {Connection}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
const connections = this.connections.get(id)
// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
}
/**
@ -169,7 +261,7 @@ class ConnectionManager {
* @private
*/
_maybeDisconnectOne () {
if (this._options.minConnections < this._connections.size) {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
@ -177,9 +269,9 @@ class ConnectionManager {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
for (const connection of this._connections.values()) {
if (connection.remotePeer.toB58String() === peerId) {
connection.close()
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
break
}
}

View File

@ -46,16 +46,28 @@ class IdentifyService {
/**
* @constructor
* @param {object} options
* @param {Registrar} options.registrar
* @param {PeerStore} options.peerStore
* @param {ConnectionManager} options.connectionManager
* @param {Map<string, handler>} options.protocols A reference to the protocols we support
* @param {PeerId} options.peerId The peer running the identify service
* @param {{ listen: Array<Multiaddr>}} options.addresses The peer addresses
*/
constructor (options) {
/**
* @property {Registrar}
* @property {PeerStore}
*/
this.registrar = options.registrar
this.peerStore = options.peerStore
/**
* @property {ConnectionManager}
*/
this.connectionManager = options.connectionManager
this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer
this.identify(connection, peerId).catch(log.error)
})
/**
* @property {PeerId}
*/
@ -104,7 +116,7 @@ class IdentifyService {
const connections = []
let connection
for (const peer of peerStore.peers.values()) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer.id))) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
connections.push(connection)
}
}
@ -160,8 +172,8 @@ class IdentifyService {
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
// Update peers data in PeerStore
this.registrar.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
this.registrar.peerStore.protoBook.set(id, protocols)
this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
this.peerStore.protoBook.set(id, protocols)
// TODO: Track our observed address so that we can score it
log('received observed address of %s', observedAddr)
@ -245,13 +257,13 @@ class IdentifyService {
// Update peers data in PeerStore
const id = connection.remotePeer
try {
this.registrar.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
} catch (err) {
return log.error('received invalid listen addrs', err)
}
// Update the protocols
this.registrar.peerStore.protoBook.set(id, message.protocols)
this.peerStore.protoBook.set(id, message.protocols)
}
}

View File

@ -54,55 +54,40 @@ class Libp2p extends EventEmitter {
this._transport = [] // Transport instances/references
this._discovery = new Map() // Discovery service instances/references
// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Create Metrics
if (this._options.metrics.enabled) {
this.metrics = new Metrics(this._options.metrics)
this.metrics = new Metrics({
...this._options.metrics,
connectionManager: this.connectionManager
})
}
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerId,
metrics: this.metrics,
onConnection: (connection) => {
const peerId = connection.remotePeer
this.registrar.onConnect(peerId, connection)
this.connectionManager.onConnect(connection)
this.emit('peer:connect', peerId)
// Run identify for every connection
if (this.identifyService) {
this.identifyService.identify(connection, peerId)
.catch(log.error)
}
},
onConnectionEnd: (connection) => {
const peerId = connection.remotePeer
this.registrar.onDisconnect(peerId, connection)
this.connectionManager.onDisconnect(connection)
// If there are no connections to the peer, disconnect
if (!this.registrar.getConnection(peerId)) {
this.emit('peer:disconnect', peerId)
this.metrics && this.metrics.onPeerDisconnected(peerId)
}
}
onConnection: (connection) => this.connectionManager.onConnect(connection),
onConnectionEnd: (connection) => this.connectionManager.onDisconnect(connection)
})
// Create the Registrar
this.registrar = new Registrar({ peerStore: this.peerStore })
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle
// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Setup the transport manager
this.transportManager = new TransportManager({
libp2p: this,
upgrader: this.upgrader
})
// Create the Registrar
this.registrar = new Registrar({
peerStore: this.peerStore,
connectionManager: this.connectionManager
})
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle
// Attach crypto channels
if (this._modules.connEncryption) {
const cryptos = this._modules.connEncryption
@ -138,7 +123,8 @@ class Libp2p extends EventEmitter {
// Add the identify service since we can multiplex
this.identifyService = new IdentifyService({
registrar: this.registrar,
peerStore: this.peerStore,
connectionManager: this.connectionManager,
peerId: this.peerId,
addresses: this.addresses,
protocols: this.upgrader.protocols
@ -242,7 +228,6 @@ class Libp2p extends EventEmitter {
])
await this.transportManager.close()
await this.registrar.close()
ping.unmount(this)
this.dialer.destroy()
@ -294,7 +279,7 @@ class Libp2p extends EventEmitter {
*/
async dialProtocol (peer, protocols, options) {
const { id, multiaddrs } = getPeer(peer, this.peerStore)
let connection = this.registrar.getConnection(id)
let connection = this.connectionManager.get(id)
if (!connection) {
connection = await this.dialer.connectToPeer(peer, options)
@ -318,7 +303,7 @@ class Libp2p extends EventEmitter {
async hangUp (peer) {
const { id } = getPeer(peer)
const connections = this.registrar.connections.get(id.toB58String())
const connections = this.connectionManager.connections.get(id.toB58String())
if (!connections) {
return
@ -452,9 +437,9 @@ class Libp2p extends EventEmitter {
*/
async _maybeConnect (peerId) {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerId)) {
if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) {
const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > this.connectionManager._connections.size) {
if (minPeers > this.connectionManager.size) {
log('connecting to discovered peer %s', peerId.toB58String())
try {
await this.dialer.connectToPeer(peerId)

View File

@ -21,6 +21,7 @@ class Metrics {
/**
*
* @param {object} options
* @param {ConnectionManager} options.connectionManager
* @param {number} options.computeThrottleMaxQueueSize
* @param {number} options.computeThrottleTimeout
* @param {Array<number>} options.movingAverageIntervals
@ -34,6 +35,10 @@ class Metrics {
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
this._running = false
this._onMessage = this._onMessage.bind(this)
this._connectionManager = options.connectionManager
this._connectionManager.on('peer:disconnect', (connection) => {
this.onPeerDisconnected(connection.remotePeer)
})
}
/**

View File

@ -5,13 +5,10 @@ const errcode = require('err-code')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')
const PeerId = require('peer-id')
const {
ERR_INVALID_PARAMETERS
} = require('./errors')
const Topology = require('libp2p-interfaces/src/topology')
const { Connection } = require('libp2p-interfaces/src/connection')
/**
* Responsible for notifying registered protocols of events in the network.
@ -20,18 +17,14 @@ class Registrar {
/**
* @param {Object} props
* @param {PeerStore} props.peerStore
* @param {connectionManager} props.connectionManager
* @constructor
*/
constructor ({ peerStore }) {
constructor ({ peerStore, connectionManager }) {
// Used on topology to listen for protocol changes
this.peerStore = peerStore
/**
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()
this.connectionManager = connectionManager
/**
* Map of topologies
@ -41,6 +34,9 @@ class Registrar {
this.topologies = new Map()
this._handle = undefined
this._onDisconnect = this._onDisconnect.bind(this)
this.connectionManager.on('peer:disconnect', this._onDisconnect)
}
get handle () {
@ -51,93 +47,13 @@ class Registrar {
this._handle = handle
}
/**
* Cleans up the registrar
* @async
*/
async close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}
await tasks
this.connections.clear()
}
/**
* Add a new connected peer to the record
* TODO: this should live in the ConnectionManager
* @param {PeerId} peerId
* @param {Connection} conn
* @returns {void}
*/
onConnect (peerId, conn) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
if (!Connection.isConnection(conn)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
const storedConn = this.connections.get(id)
if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
}
/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerId} peerId
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerId, connection, error) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
let storedConn = this.connections.get(id)
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(id, storedConn)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerId, error)
}
this.connections.delete(id)
}
}
/**
* Get a connection with a peer.
* @param {PeerId} peerId
* @returns {Connection}
*/
getConnection (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const connections = this.connections.get(peerId.toB58String())
// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
return this.connectionManager.get(peerId)
}
/**
@ -169,6 +85,18 @@ class Registrar {
unregister (id) {
return this.topologies.delete(id)
}
/**
* Remove a disconnected peer from the record
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
_onDisconnect (connection, error) {
for (const [, topology] of this.topologies) {
topology.disconnect(connection.remotePeer, error)
}
}
}
module.exports = Registrar