fix: clean up pending dials abort per feedback

This commit is contained in:
Jacob Heun 2019-12-10 13:48:34 +01:00
parent 43b98e64b6
commit 7c3371bf17
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
4 changed files with 66 additions and 17 deletions

View File

@ -43,7 +43,7 @@
"dependencies": { "dependencies": {
"abort-controller": "^3.0.0", "abort-controller": "^3.0.0",
"aggregate-error": "^3.0.1", "aggregate-error": "^3.0.1",
"any-signal": "^1.0.0", "any-signal": "^1.1.0",
"async": "^2.6.2", "async": "^2.6.2",
"async-iterator-all": "^1.0.0", "async-iterator-all": "^1.0.0",
"bignumber.js": "^9.0.0", "bignumber.js": "^9.0.0",
@ -78,6 +78,7 @@
"pull-handshake": "^1.1.4", "pull-handshake": "^1.1.4",
"pull-stream": "^3.6.9", "pull-stream": "^3.6.9",
"retimer": "^2.0.0", "retimer": "^2.0.0",
"timeout-abort-controller": "^1.0.0",
"xsalsa20": "^1.0.2" "xsalsa20": "^1.0.2"
}, },
"devDependencies": { "devDependencies": {

View File

@ -2,7 +2,7 @@
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const errCode = require('err-code') const errCode = require('err-code')
const AbortController = require('abort-controller') const TimeoutController = require('timeout-abort-controller')
const anySignal = require('any-signal') const anySignal = require('any-signal')
const debug = require('debug') const debug = require('debug')
const log = debug('libp2p:dialer') const log = debug('libp2p:dialer')
@ -38,7 +38,21 @@ class Dialer {
this.timeout = timeout this.timeout = timeout
this.perPeerLimit = perPeerLimit this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index) this.tokens = [...new Array(concurrency)].map((_, index) => index)
this.pendingDials = new Set() this._pendingDials = new Set()
}
/**
* Clears any pending dials
*/
destroy () {
for (const dial of this._pendingDials.values()) {
try {
dial.controller.abort()
} catch (err) {
log.error(err)
}
}
this._pendingDials.clear()
} }
/** /**
@ -64,21 +78,20 @@ class Dialer {
}) })
// Combine the timeout signal and options.signal, if provided // Combine the timeout signal and options.signal, if provided
const timeoutController = new AbortController() const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal] const signals = [timeoutController.signal]
options.signal && signals.push(options.signal) options.signal && signals.push(options.signal)
const signal = anySignal(signals) const signal = anySignal(signals)
const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout)
const dial = { const dial = {
dialRequest, dialRequest,
controller: timeoutController controller: timeoutController
} }
this.pendingDials.add(dial) this._pendingDials.add(dial)
try { try {
const dialResult = await dialRequest.run({ ...options, signal }) const dialResult = await dialRequest.run({ ...options, signal })
clearTimeout(timeoutId) timeoutController.clear()
log('dial succeeded to %s', dialResult.remoteAddr) log('dial succeeded to %s', dialResult.remoteAddr)
return dialResult return dialResult
} catch (err) { } catch (err) {
@ -89,7 +102,7 @@ class Dialer {
log.error(err) log.error(err)
throw err throw err
} finally { } finally {
this.pendingDials.delete(dial) this._pendingDials.delete(dial)
} }
} }

View File

@ -199,9 +199,7 @@ class Libp2p extends EventEmitter {
this._dht && this._dht.stop() this._dht && this._dht.stop()
]) ])
for (const dial of this.dialer.pendingDials.values()) { this.dialer.destroy()
dial.abort()
}
await this.transportManager.close() await this.transportManager.close()
await this.registrar.close() await this.registrar.close()

View File

@ -169,7 +169,7 @@ describe('Dialing (direct, WebSockets)', () => {
// We should have 2 in progress, and 1 waiting // We should have 2 in progress, and 1 waiting
expect(dialer.tokens).to.have.length(0) expect(dialer.tokens).to.have.length(0)
expect(dialer.pendingDials.size).to.equal(1) // 1 dial request expect(dialer._pendingDials.size).to.equal(1) // 1 dial request
deferredDial.resolve(await createMockConnection()) deferredDial.resolve(await createMockConnection())
@ -179,7 +179,45 @@ describe('Dialing (direct, WebSockets)', () => {
// Only two dials will be run, as the first two succeeded // Only two dials will be run, as the first two succeeded
expect(localTM.dial.callCount).to.equal(2) expect(localTM.dial.callCount).to.equal(2)
expect(dialer.tokens).to.have.length(2) expect(dialer.tokens).to.have.length(2)
expect(dialer.pendingDials.size).to.equal(0) expect(dialer._pendingDials.size).to.equal(0)
})
it('.destroy should abort pending dials', async () => {
const dialer = new Dialer({
transportManager: localTM,
concurrency: 2
})
expect(dialer.tokens).to.have.length(2)
sinon.stub(localTM, 'dial').callsFake((_, options) => {
const deferredDial = pDefer()
const onAbort = () => {
options.signal.removeEventListener('abort', onAbort)
deferredDial.reject(new AbortError())
}
options.signal.addEventListener('abort', onAbort)
return deferredDial.promise
})
// Perform 3 multiaddr dials
const dialPromise = dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr])
// Let the call stack run
await delay(0)
// We should have 2 in progress, and 1 waiting
expect(dialer.tokens).to.have.length(0)
expect(dialer._pendingDials.size).to.equal(1) // 1 dial request
try {
dialer.destroy()
await dialPromise
expect.fail('should have failed')
} catch (err) {
expect(err).to.be.an.instanceof(AggregateError)
expect(dialer._pendingDials.size).to.equal(0) // 1 dial request
}
}) })
describe('libp2p.dialer', () => { describe('libp2p.dialer', () => {
@ -290,13 +328,12 @@ describe('Dialing (direct, WebSockets)', () => {
connEncryption: [Crypto] connEncryption: [Crypto]
} }
}) })
const abort = sinon.stub()
const dials = [{ abort }, { abort }, { abort }] sinon.spy(libp2p.dialer, 'destroy')
sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials))
await libp2p.stop() await libp2p.stop()
expect(abort).to.have.property('callCount', 3) expect(libp2p.dialer.destroy).to.have.property('callCount', 1)
}) })
}) })
}) })