From b291bc06ec13feeb6e010730edfad754a3b2dc1b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 14 Jun 2021 09:19:23 +0200 Subject: [PATCH] fix: dialer leaking resources after stopping (#947) * fix: dialer leaking resources after stopping * chore: add error code to test --- src/connection-manager/index.js | 14 +++-- src/connection-manager/latency-monitor.js | 36 +++++++------ src/dialer/index.js | 34 +++++++++++- test/dialing/direct.spec.js | 64 +++++++++++++++++++++++ 4 files changed, 127 insertions(+), 21 deletions(-) diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 7a865127..d63ebf75 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -97,6 +97,11 @@ class ConnectionManager extends EventEmitter { this._autoDialTimeout = null this._checkMetrics = this._checkMetrics.bind(this) this._autoDial = this._autoDial.bind(this) + + this._latencyMonitor = new LatencyMonitor({ + latencyCheckIntervalMs: this._options.pollInterval, + dataEmitIntervalMs: this._options.pollInterval + }) } /** @@ -117,10 +122,7 @@ class ConnectionManager extends EventEmitter { } // latency monitor - this._latencyMonitor = new LatencyMonitor({ - latencyCheckIntervalMs: this._options.pollInterval, - dataEmitIntervalMs: this._options.pollInterval - }) + this._latencyMonitor.start() this._onLatencyMeasure = this._onLatencyMeasure.bind(this) this._latencyMonitor.on('data', this._onLatencyMeasure) @@ -138,7 +140,9 @@ class ConnectionManager extends EventEmitter { async stop () { this._autoDialTimeout && this._autoDialTimeout.clear() this._timer && this._timer.clear() - this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure) + + this._latencyMonitor.removeListener('data', this._onLatencyMeasure) + this._latencyMonitor.stop() this._started = false await this._close() diff --git a/src/connection-manager/latency-monitor.js b/src/connection-manager/latency-monitor.js index 6c3061b9..374794c8 100644 --- a/src/connection-manager/latency-monitor.js +++ b/src/connection-manager/latency-monitor.js @@ -69,49 +69,55 @@ class LatencyMonitor extends EventEmitter { } that.asyncTestFn = asyncTestFn // If there is no asyncFn, we measure latency + } + start () { // If process: use high resolution timer if (globalThis.process && globalThis.process.hrtime) { // eslint-disable-line no-undef debug('Using process.hrtime for timing') - that.now = globalThis.process.hrtime // eslint-disable-line no-undef - that.getDeltaMS = (startTime) => { - const hrtime = that.now(startTime) + this.now = globalThis.process.hrtime // eslint-disable-line no-undef + this.getDeltaMS = (startTime) => { + const hrtime = this.now(startTime) return (hrtime[0] * 1000) + (hrtime[1] / 1000000) } // Let's try for a timer that only monotonically increases } else if (typeof window !== 'undefined' && window.performance && window.performance.now) { debug('Using performance.now for timing') - that.now = window.performance.now.bind(window.performance) - that.getDeltaMS = (startTime) => Math.round(that.now() - startTime) + this.now = window.performance.now.bind(window.performance) + this.getDeltaMS = (startTime) => Math.round(this.now() - startTime) } else { debug('Using Date.now for timing') - that.now = Date.now - that.getDeltaMS = (startTime) => that.now() - startTime + this.now = Date.now + this.getDeltaMS = (startTime) => this.now() - startTime } - that._latencyData = that._initLatencyData() + this._latencyData = this._initLatencyData() // We check for isBrowser because of browsers set max rates of timeouts when a page is hidden, // so we fall back to another library // See: http://stackoverflow.com/questions/6032429/chrome-timeouts-interval-suspended-in-background-tabs if (isBrowser()) { - that._visibilityChangeEmitter = new VisibilityChangeEmitter() + this._visibilityChangeEmitter = new VisibilityChangeEmitter() - that._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => { + this._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => { if (pageInFocus) { - that._startTimers() + this._startTimers() } else { - that._emitSummary() - that._stopTimers() + this._emitSummary() + this._stopTimers() } }) } - if (!that._visibilityChangeEmitter || that._visibilityChangeEmitter.isVisible()) { - that._startTimers() + if (!this._visibilityChangeEmitter || this._visibilityChangeEmitter.isVisible()) { + this._startTimers() } } + stop () { + this._stopTimers() + } + /** * Start internal timers * diff --git a/src/dialer/index.js b/src/dialer/index.js index 31919a1f..3be5c36b 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -8,6 +8,7 @@ const errCode = require('err-code') const { Multiaddr } = require('multiaddr') // @ts-ignore timeout-abourt-controles does not export types const TimeoutController = require('timeout-abort-controller') +const { AbortError } = require('abortable-iterator') const { anySignal } = require('any-signal') const DialRequest = require('./dial-request') @@ -76,6 +77,7 @@ class Dialer { this.maxDialsPerPeer = maxDialsPerPeer this.tokens = [...new Array(maxParallelDials)].map((_, index) => index) this._pendingDials = new Map() + this._pendingDialTargets = new Map() for (const [key, value] of Object.entries(resolvers)) { Multiaddr.resolvers.set(key, value) @@ -94,6 +96,11 @@ class Dialer { } } this._pendingDials.clear() + + for (const pendingTarget of this._pendingDialTargets.values()) { + pendingTarget.reject(new AbortError('Dialer was destroyed')) + } + this._pendingDialTargets.clear() } /** @@ -107,7 +114,7 @@ class Dialer { * @returns {Promise} */ async connectToPeer (peer, options = {}) { - const dialTarget = await this._createDialTarget(peer) + const dialTarget = await this._createCancellableDialTarget(peer) if (!dialTarget.addrs.length) { throw errCode(new Error('The dial request has no valid addresses'), codes.ERR_NO_VALID_ADDRESSES) @@ -130,6 +137,31 @@ class Dialer { } } + /** + * Connects to a given `peer` by dialing all of its known addresses. + * The dial to the first address that is successfully able to upgrade a connection + * will be used. + * + * @param {PeerId|Multiaddr|string} peer - The peer to dial + * @returns {Promise} + */ + async _createCancellableDialTarget (peer) { + // Make dial target promise cancellable + const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}` + const cancellablePromise = new Promise((resolve, reject) => { + this._pendingDialTargets.set(id, { resolve, reject }) + }) + + const dialTarget = await Promise.race([ + this._createDialTarget(peer), + cancellablePromise + ]) + + this._pendingDialTargets.delete(id) + + return dialTarget + } + /** * Creates a DialTarget. The DialTarget is used to create and track * the DialRequest to a given peer. diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index a074e02f..1dc29544 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -290,6 +290,34 @@ describe('Dialing (direct, WebSockets)', () => { } }) + it('should cancel pending dial targets before proceeding', async () => { + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + addressBook: { + set: () => { } + } + } + }) + + sinon.stub(dialer, '_createDialTarget').callsFake(() => { + const deferredDial = pDefer() + return deferredDial.promise + }) + + // Perform dial + const dialPromise = dialer.connectToPeer(peerId) + + // Let the call stack run + await delay(0) + + dialer.destroy() + + await expect(dialPromise) + .to.eventually.be.rejected() + .and.to.have.property('code', 'ABORT_ERR') + }) + describe('libp2p.dialer', () => { const transportKey = Transport.prototype[Symbol.toStringTag] let libp2p @@ -462,6 +490,42 @@ describe('Dialing (direct, WebSockets)', () => { await libp2p.hangUp(remoteAddr) }) + it('should cancel pending dial targets and stop', async () => { + const [, remotePeerId] = await createPeerId({ number: 2 }) + + libp2p = new Libp2p({ + peerId, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + }, + config: { + transport: { + [transportKey]: { + filter: filters.all + } + } + } + }) + + sinon.stub(libp2p.dialer, '_createDialTarget').callsFake(() => { + const deferredDial = pDefer() + return deferredDial.promise + }) + + // Perform dial + const dialPromise = libp2p.dial(remotePeerId) + + // Let the call stack run + await delay(0) + + await libp2p.stop() + await expect(dialPromise) + .to.eventually.be.rejected() + .and.to.have.property('code', 'ABORT_ERR') + }) + it('should abort pending dials on stop', async () => { libp2p = new Libp2p({ peerId,