diff --git a/src/connection-manager/auto-dialler.js b/src/connection-manager/auto-dialler.js new file mode 100644 index 00000000..977b13f7 --- /dev/null +++ b/src/connection-manager/auto-dialler.js @@ -0,0 +1,118 @@ +'use strict' + +const debug = require('debug') +const mergeOptions = require('merge-options') +// @ts-ignore retimer does not have types +const retimer = require('retimer') + +const log = Object.assign(debug('libp2p:connection-manager:auto-dialler'), { + error: debug('libp2p:connection-manager:auto-dialler:err') +}) + +const defaultOptions = { + enabled: true, + minConnections: 0, + autoDialInterval: 10000 +} + +/** + * @typedef {import('../index')} Libp2p + * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection + */ + +/** + * @typedef {Object} AutoDiallerOptions + * @property {boolean} [enabled = true] - Should preemptively guarantee connections are above the low watermark + * @property {number} [minConnections = 0] - The minimum number of connections to avoid pruning + * @property {number} [autoDialInterval = 10000] - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark + */ + +class AutoDialler { + /** + * Proactively tries to connect to known peers stored in the PeerStore. + * It will keep the number of connections below the upper limit and sort + * the peers to connect based on wether we know their keys and protocols. + * + * @class + * @param {Libp2p} libp2p + * @param {AutoDiallerOptions} options + */ + constructor (libp2p, options = {}) { + this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options) + this._libp2p = libp2p + this._running = false + this._autoDialTimeout = null + this._autoDial = this._autoDial.bind(this) + + log('options: %j', this._options) + } + + /** + * Starts the auto dialer + */ + start () { + if (!this._options.enabled) { + log('not enabled') + return + } + + this._running = true + this._autoDial() + log('started') + } + + /** + * Stops the auto dialler + */ + async stop () { + if (!this._options.enabled) { + log('not enabled') + return + } + + this._running = false + this._autoDialTimeout && this._autoDialTimeout.clear() + log('stopped') + } + + async _autoDial () { + const minConnections = this._options.minConnections + + // Already has enough connections + if (this._libp2p.connections.size >= minConnections) { + this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval) + return + } + + // Sort peers on wether we know protocols of public keys for them + const peers = Array.from(this._libp2p.peerStore.peers.values()) + .sort((a, b) => { + if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) { + return 1 + } else if (b.id.pubKey && !a.id.pubKey) { + return 1 + } + return -1 + }) + + for (let i = 0; this._running && i < peers.length && this._libp2p.connections.size < minConnections; i++) { + if (!this._libp2p.connectionManager.get(peers[i].id)) { + log('connecting to a peerStore stored peer %s', peers[i].id.toB58String()) + try { + await this._libp2p.dialer.connectToPeer(peers[i].id) + } catch (/** @type {any} */ err) { + log.error('could not connect to peerStore stored peer', err) + } + } + } + + // Connection Manager was stopped + if (!this._running) { + return + } + + this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval) + } +} + +module.exports = AutoDialler diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 802c9fe6..84711272 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -94,9 +94,7 @@ class ConnectionManager extends EventEmitter { this._started = false this._timer = null - this._autoDialTimeout = null this._checkMetrics = this._checkMetrics.bind(this) - this._autoDial = this._autoDial.bind(this) this._latencyMonitor = new LatencyMonitor({ latencyCheckIntervalMs: this._options.pollInterval, @@ -128,8 +126,6 @@ class ConnectionManager extends EventEmitter { this._started = true log('started') - - this._options.autoDial && this._autoDial() } /** @@ -138,7 +134,6 @@ class ConnectionManager extends EventEmitter { * @async */ async stop () { - this._autoDialTimeout && this._autoDialTimeout.clear() this._timer && this._timer.clear() this._latencyMonitor.removeListener('data', this._onLatencyMeasure) @@ -312,53 +307,6 @@ class ConnectionManager extends EventEmitter { } } - /** - * Proactively tries to connect to known peers stored in the PeerStore. - * It will keep the number of connections below the upper limit and sort - * the peers to connect based on wether we know their keys and protocols. - * - * @async - * @private - */ - async _autoDial () { - const minConnections = this._options.minConnections - - // Already has enough connections - if (this.size >= minConnections) { - this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval) - return - } - - // Sort peers on wether we know protocols of public keys for them - const peers = Array.from(this._libp2p.peerStore.peers.values()) - .sort((a, b) => { - if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) { - return 1 - } else if (b.id.pubKey && !a.id.pubKey) { - return 1 - } - return -1 - }) - - for (let i = 0; i < peers.length && this.size < minConnections; i++) { - if (!this.get(peers[i].id)) { - log('connecting to a peerStore stored peer %s', peers[i].id.toB58String()) - try { - await this._libp2p.dialer.connectToPeer(peers[i].id) - - // Connection Manager was stopped - if (!this._started) { - return - } - } catch (/** @type {any} */ err) { - log.error('could not connect to peerStore stored peer', err) - } - } - } - - this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval) - } - /** * If we have more connections than our maximum, close a connection * to the lowest valued peer. diff --git a/src/index.js b/src/index.js index 63edbf69..2b428223 100644 --- a/src/index.js +++ b/src/index.js @@ -18,6 +18,7 @@ const { codes, messages } = require('./errors') const AddressManager = require('./address-manager') const ConnectionManager = require('./connection-manager') +const AutoDialler = require('./connection-manager/auto-dialler') const Circuit = require('./circuit/transport') const Relay = require('./circuit') const Dialer = require('./dialer') @@ -193,9 +194,13 @@ class Libp2p extends EventEmitter { // Create the Connection Manager this.connectionManager = new ConnectionManager(this, { - autoDial: this._config.peerDiscovery.autoDial, ...this._options.connectionManager }) + this._autodialler = new AutoDialler(this, { + enabled: this._config.peerDiscovery.autoDial, + minConnections: this._options.connectionManager.minConnections, + autoDialInterval: this._options.connectionManager.autoDialInterval + }) // Create Metrics if (this._options.metrics.enabled) { @@ -380,6 +385,8 @@ class Libp2p extends EventEmitter { this.relay && this.relay.stop() this.peerRouting.stop() + this._autodialler.stop() + await (this._dht && this._dht.stop()) for (const service of this._discovery.values()) { service.removeListener('peer', this._onDiscoveryPeer) @@ -394,7 +401,6 @@ class Libp2p extends EventEmitter { await Promise.all([ this.pubsub && this.pubsub.stop(), - this._dht && this._dht.stop(), this.metrics && this.metrics.stop() ]) @@ -650,6 +656,7 @@ class Libp2p extends EventEmitter { } this.connectionManager.start() + this._autodialler.start() // Peer discovery await this._setupPeerDiscovery()