fix: dialer leaking resources after stopping (#947)

* fix: dialer leaking resources after stopping

* chore: add error code to test
This commit is contained in:
Vasco Santos 2021-06-14 09:19:23 +02:00 committed by GitHub
parent 755eb909f2
commit b291bc06ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 21 deletions

View File

@ -97,6 +97,11 @@ class ConnectionManager extends EventEmitter {
this._autoDialTimeout = null
this._checkMetrics = this._checkMetrics.bind(this)
this._autoDial = this._autoDial.bind(this)
this._latencyMonitor = new LatencyMonitor({
latencyCheckIntervalMs: this._options.pollInterval,
dataEmitIntervalMs: this._options.pollInterval
})
}
/**
@ -117,10 +122,7 @@ class ConnectionManager extends EventEmitter {
}
// latency monitor
this._latencyMonitor = new LatencyMonitor({
latencyCheckIntervalMs: this._options.pollInterval,
dataEmitIntervalMs: this._options.pollInterval
})
this._latencyMonitor.start()
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this._latencyMonitor.on('data', this._onLatencyMeasure)
@ -138,7 +140,9 @@ class ConnectionManager extends EventEmitter {
async stop () {
this._autoDialTimeout && this._autoDialTimeout.clear()
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
this._latencyMonitor.stop()
this._started = false
await this._close()

View File

@ -69,49 +69,55 @@ class LatencyMonitor extends EventEmitter {
}
that.asyncTestFn = asyncTestFn // If there is no asyncFn, we measure latency
}
start () {
// If process: use high resolution timer
if (globalThis.process && globalThis.process.hrtime) { // eslint-disable-line no-undef
debug('Using process.hrtime for timing')
that.now = globalThis.process.hrtime // eslint-disable-line no-undef
that.getDeltaMS = (startTime) => {
const hrtime = that.now(startTime)
this.now = globalThis.process.hrtime // eslint-disable-line no-undef
this.getDeltaMS = (startTime) => {
const hrtime = this.now(startTime)
return (hrtime[0] * 1000) + (hrtime[1] / 1000000)
}
// Let's try for a timer that only monotonically increases
} else if (typeof window !== 'undefined' && window.performance && window.performance.now) {
debug('Using performance.now for timing')
that.now = window.performance.now.bind(window.performance)
that.getDeltaMS = (startTime) => Math.round(that.now() - startTime)
this.now = window.performance.now.bind(window.performance)
this.getDeltaMS = (startTime) => Math.round(this.now() - startTime)
} else {
debug('Using Date.now for timing')
that.now = Date.now
that.getDeltaMS = (startTime) => that.now() - startTime
this.now = Date.now
this.getDeltaMS = (startTime) => this.now() - startTime
}
that._latencyData = that._initLatencyData()
this._latencyData = this._initLatencyData()
// We check for isBrowser because of browsers set max rates of timeouts when a page is hidden,
// so we fall back to another library
// See: http://stackoverflow.com/questions/6032429/chrome-timeouts-interval-suspended-in-background-tabs
if (isBrowser()) {
that._visibilityChangeEmitter = new VisibilityChangeEmitter()
this._visibilityChangeEmitter = new VisibilityChangeEmitter()
that._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => {
this._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => {
if (pageInFocus) {
that._startTimers()
this._startTimers()
} else {
that._emitSummary()
that._stopTimers()
this._emitSummary()
this._stopTimers()
}
})
}
if (!that._visibilityChangeEmitter || that._visibilityChangeEmitter.isVisible()) {
that._startTimers()
if (!this._visibilityChangeEmitter || this._visibilityChangeEmitter.isVisible()) {
this._startTimers()
}
}
stop () {
this._stopTimers()
}
/**
* Start internal timers
*

View File

@ -8,6 +8,7 @@ const errCode = require('err-code')
const { Multiaddr } = require('multiaddr')
// @ts-ignore timeout-abourt-controles does not export types
const TimeoutController = require('timeout-abort-controller')
const { AbortError } = require('abortable-iterator')
const { anySignal } = require('any-signal')
const DialRequest = require('./dial-request')
@ -76,6 +77,7 @@ class Dialer {
this.maxDialsPerPeer = maxDialsPerPeer
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
this._pendingDials = new Map()
this._pendingDialTargets = new Map()
for (const [key, value] of Object.entries(resolvers)) {
Multiaddr.resolvers.set(key, value)
@ -94,6 +96,11 @@ class Dialer {
}
}
this._pendingDials.clear()
for (const pendingTarget of this._pendingDialTargets.values()) {
pendingTarget.reject(new AbortError('Dialer was destroyed'))
}
this._pendingDialTargets.clear()
}
/**
@ -107,7 +114,7 @@ class Dialer {
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
const dialTarget = await this._createDialTarget(peer)
const dialTarget = await this._createCancellableDialTarget(peer)
if (!dialTarget.addrs.length) {
throw errCode(new Error('The dial request has no valid addresses'), codes.ERR_NO_VALID_ADDRESSES)
@ -130,6 +137,31 @@ class Dialer {
}
}
/**
* Connects to a given `peer` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @param {PeerId|Multiaddr|string} peer - The peer to dial
* @returns {Promise<DialTarget>}
*/
async _createCancellableDialTarget (peer) {
// Make dial target promise cancellable
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
const cancellablePromise = new Promise((resolve, reject) => {
this._pendingDialTargets.set(id, { resolve, reject })
})
const dialTarget = await Promise.race([
this._createDialTarget(peer),
cancellablePromise
])
this._pendingDialTargets.delete(id)
return dialTarget
}
/**
* Creates a DialTarget. The DialTarget is used to create and track
* the DialRequest to a given peer.

View File

@ -290,6 +290,34 @@ describe('Dialing (direct, WebSockets)', () => {
}
})
it('should cancel pending dial targets before proceeding', async () => {
const dialer = new Dialer({
transportManager: localTM,
peerStore: {
addressBook: {
set: () => { }
}
}
})
sinon.stub(dialer, '_createDialTarget').callsFake(() => {
const deferredDial = pDefer()
return deferredDial.promise
})
// Perform dial
const dialPromise = dialer.connectToPeer(peerId)
// Let the call stack run
await delay(0)
dialer.destroy()
await expect(dialPromise)
.to.eventually.be.rejected()
.and.to.have.property('code', 'ABORT_ERR')
})
describe('libp2p.dialer', () => {
const transportKey = Transport.prototype[Symbol.toStringTag]
let libp2p
@ -462,6 +490,42 @@ describe('Dialing (direct, WebSockets)', () => {
await libp2p.hangUp(remoteAddr)
})
it('should cancel pending dial targets and stop', async () => {
const [, remotePeerId] = await createPeerId({ number: 2 })
libp2p = new Libp2p({
peerId,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
},
config: {
transport: {
[transportKey]: {
filter: filters.all
}
}
}
})
sinon.stub(libp2p.dialer, '_createDialTarget').callsFake(() => {
const deferredDial = pDefer()
return deferredDial.promise
})
// Perform dial
const dialPromise = libp2p.dial(remotePeerId)
// Let the call stack run
await delay(0)
await libp2p.stop()
await expect(dialPromise)
.to.eventually.be.rejected()
.and.to.have.property('code', 'ABORT_ERR')
})
it('should abort pending dials on stop', async () => {
libp2p = new Libp2p({
peerId,