mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-01 19:51:19 +00:00
fix: stop dht before connection manager (#1041)
Stop the dht before the connection manager, otherwise in-flight eviction pings fail and we move on to the next one when we should just abort them all. Also pulls in the fix from #1039 and splits the auto-dialler out from the connection manager as during shutdown it can get into a weird state where it's simultaneously killing and creating connections so stop auto-dialling things before we cause connections to dip below the low watermark by killing existing connections. Fixes: https://github.com/ipfs/js-ipfs/issues/3923
This commit is contained in:
parent
eacd7e8f76
commit
3a9d5f64d9
118
src/connection-manager/auto-dialler.js
Normal file
118
src/connection-manager/auto-dialler.js
Normal file
@ -0,0 +1,118 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const mergeOptions = require('merge-options')
|
||||
// @ts-ignore retimer does not have types
|
||||
const retimer = require('retimer')
|
||||
|
||||
const log = Object.assign(debug('libp2p:connection-manager:auto-dialler'), {
|
||||
error: debug('libp2p:connection-manager:auto-dialler:err')
|
||||
})
|
||||
|
||||
const defaultOptions = {
|
||||
enabled: true,
|
||||
minConnections: 0,
|
||||
autoDialInterval: 10000
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('../index')} Libp2p
|
||||
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} AutoDiallerOptions
|
||||
* @property {boolean} [enabled = true] - Should preemptively guarantee connections are above the low watermark
|
||||
* @property {number} [minConnections = 0] - The minimum number of connections to avoid pruning
|
||||
* @property {number} [autoDialInterval = 10000] - How often, in milliseconds, it should preemptively guarantee connections are above the low watermark
|
||||
*/
|
||||
|
||||
class AutoDialler {
|
||||
/**
|
||||
* Proactively tries to connect to known peers stored in the PeerStore.
|
||||
* It will keep the number of connections below the upper limit and sort
|
||||
* the peers to connect based on wether we know their keys and protocols.
|
||||
*
|
||||
* @class
|
||||
* @param {Libp2p} libp2p
|
||||
* @param {AutoDiallerOptions} options
|
||||
*/
|
||||
constructor (libp2p, options = {}) {
|
||||
this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
|
||||
this._libp2p = libp2p
|
||||
this._running = false
|
||||
this._autoDialTimeout = null
|
||||
this._autoDial = this._autoDial.bind(this)
|
||||
|
||||
log('options: %j', this._options)
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the auto dialer
|
||||
*/
|
||||
start () {
|
||||
if (!this._options.enabled) {
|
||||
log('not enabled')
|
||||
return
|
||||
}
|
||||
|
||||
this._running = true
|
||||
this._autoDial()
|
||||
log('started')
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the auto dialler
|
||||
*/
|
||||
async stop () {
|
||||
if (!this._options.enabled) {
|
||||
log('not enabled')
|
||||
return
|
||||
}
|
||||
|
||||
this._running = false
|
||||
this._autoDialTimeout && this._autoDialTimeout.clear()
|
||||
log('stopped')
|
||||
}
|
||||
|
||||
async _autoDial () {
|
||||
const minConnections = this._options.minConnections
|
||||
|
||||
// Already has enough connections
|
||||
if (this._libp2p.connections.size >= minConnections) {
|
||||
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
|
||||
return
|
||||
}
|
||||
|
||||
// Sort peers on wether we know protocols of public keys for them
|
||||
const peers = Array.from(this._libp2p.peerStore.peers.values())
|
||||
.sort((a, b) => {
|
||||
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
|
||||
return 1
|
||||
} else if (b.id.pubKey && !a.id.pubKey) {
|
||||
return 1
|
||||
}
|
||||
return -1
|
||||
})
|
||||
|
||||
for (let i = 0; this._running && i < peers.length && this._libp2p.connections.size < minConnections; i++) {
|
||||
if (!this._libp2p.connectionManager.get(peers[i].id)) {
|
||||
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
|
||||
try {
|
||||
await this._libp2p.dialer.connectToPeer(peers[i].id)
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('could not connect to peerStore stored peer', err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Connection Manager was stopped
|
||||
if (!this._running) {
|
||||
return
|
||||
}
|
||||
|
||||
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = AutoDialler
|
@ -94,9 +94,7 @@ class ConnectionManager extends EventEmitter {
|
||||
|
||||
this._started = false
|
||||
this._timer = null
|
||||
this._autoDialTimeout = null
|
||||
this._checkMetrics = this._checkMetrics.bind(this)
|
||||
this._autoDial = this._autoDial.bind(this)
|
||||
|
||||
this._latencyMonitor = new LatencyMonitor({
|
||||
latencyCheckIntervalMs: this._options.pollInterval,
|
||||
@ -128,8 +126,6 @@ class ConnectionManager extends EventEmitter {
|
||||
|
||||
this._started = true
|
||||
log('started')
|
||||
|
||||
this._options.autoDial && this._autoDial()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -138,7 +134,6 @@ class ConnectionManager extends EventEmitter {
|
||||
* @async
|
||||
*/
|
||||
async stop () {
|
||||
this._autoDialTimeout && this._autoDialTimeout.clear()
|
||||
this._timer && this._timer.clear()
|
||||
|
||||
this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
|
||||
@ -312,53 +307,6 @@ class ConnectionManager extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Proactively tries to connect to known peers stored in the PeerStore.
|
||||
* It will keep the number of connections below the upper limit and sort
|
||||
* the peers to connect based on wether we know their keys and protocols.
|
||||
*
|
||||
* @async
|
||||
* @private
|
||||
*/
|
||||
async _autoDial () {
|
||||
const minConnections = this._options.minConnections
|
||||
|
||||
// Already has enough connections
|
||||
if (this.size >= minConnections) {
|
||||
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
|
||||
return
|
||||
}
|
||||
|
||||
// Sort peers on wether we know protocols of public keys for them
|
||||
const peers = Array.from(this._libp2p.peerStore.peers.values())
|
||||
.sort((a, b) => {
|
||||
if (b.protocols && b.protocols.length && (!a.protocols || !a.protocols.length)) {
|
||||
return 1
|
||||
} else if (b.id.pubKey && !a.id.pubKey) {
|
||||
return 1
|
||||
}
|
||||
return -1
|
||||
})
|
||||
|
||||
for (let i = 0; i < peers.length && this.size < minConnections; i++) {
|
||||
if (!this.get(peers[i].id)) {
|
||||
log('connecting to a peerStore stored peer %s', peers[i].id.toB58String())
|
||||
try {
|
||||
await this._libp2p.dialer.connectToPeer(peers[i].id)
|
||||
|
||||
// Connection Manager was stopped
|
||||
if (!this._started) {
|
||||
return
|
||||
}
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('could not connect to peerStore stored peer', err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this._autoDialTimeout = retimer(this._autoDial, this._options.autoDialInterval)
|
||||
}
|
||||
|
||||
/**
|
||||
* If we have more connections than our maximum, close a connection
|
||||
* to the lowest valued peer.
|
||||
|
11
src/index.js
11
src/index.js
@ -18,6 +18,7 @@ const { codes, messages } = require('./errors')
|
||||
|
||||
const AddressManager = require('./address-manager')
|
||||
const ConnectionManager = require('./connection-manager')
|
||||
const AutoDialler = require('./connection-manager/auto-dialler')
|
||||
const Circuit = require('./circuit/transport')
|
||||
const Relay = require('./circuit')
|
||||
const Dialer = require('./dialer')
|
||||
@ -193,9 +194,13 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
// Create the Connection Manager
|
||||
this.connectionManager = new ConnectionManager(this, {
|
||||
autoDial: this._config.peerDiscovery.autoDial,
|
||||
...this._options.connectionManager
|
||||
})
|
||||
this._autodialler = new AutoDialler(this, {
|
||||
enabled: this._config.peerDiscovery.autoDial,
|
||||
minConnections: this._options.connectionManager.minConnections,
|
||||
autoDialInterval: this._options.connectionManager.autoDialInterval
|
||||
})
|
||||
|
||||
// Create Metrics
|
||||
if (this._options.metrics.enabled) {
|
||||
@ -380,6 +385,8 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
this.relay && this.relay.stop()
|
||||
this.peerRouting.stop()
|
||||
this._autodialler.stop()
|
||||
await (this._dht && this._dht.stop())
|
||||
|
||||
for (const service of this._discovery.values()) {
|
||||
service.removeListener('peer', this._onDiscoveryPeer)
|
||||
@ -394,7 +401,6 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
await Promise.all([
|
||||
this.pubsub && this.pubsub.stop(),
|
||||
this._dht && this._dht.stop(),
|
||||
this.metrics && this.metrics.stop()
|
||||
])
|
||||
|
||||
@ -650,6 +656,7 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
|
||||
this.connectionManager.start()
|
||||
this._autodialler.start()
|
||||
|
||||
// Peer discovery
|
||||
await this._setupPeerDiscovery()
|
||||
|
Loading…
x
Reference in New Issue
Block a user