mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
feat: abort all pending dials on stop
This commit is contained in:
parent
404fa69513
commit
ba02764c5f
@ -38,6 +38,7 @@ class Dialer {
|
|||||||
this.timeout = timeout
|
this.timeout = timeout
|
||||||
this.perPeerLimit = perPeerLimit
|
this.perPeerLimit = perPeerLimit
|
||||||
this.tokens = [...new Array(concurrency)].map((_, index) => index)
|
this.tokens = [...new Array(concurrency)].map((_, index) => index)
|
||||||
|
this.pendingDials = new Set()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -69,6 +70,12 @@ class Dialer {
|
|||||||
const signal = anySignal(signals)
|
const signal = anySignal(signals)
|
||||||
const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout)
|
const timeoutId = setTimeout(() => timeoutController.abort(), this.timeout)
|
||||||
|
|
||||||
|
const dial = {
|
||||||
|
dialRequest,
|
||||||
|
controller: timeoutController
|
||||||
|
}
|
||||||
|
this.pendingDials.add(dial)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const dialResult = await dialRequest.run({ ...options, signal })
|
const dialResult = await dialRequest.run({ ...options, signal })
|
||||||
clearTimeout(timeoutId)
|
clearTimeout(timeoutId)
|
||||||
@ -81,6 +88,8 @@ class Dialer {
|
|||||||
}
|
}
|
||||||
log.error(err)
|
log.error(err)
|
||||||
throw err
|
throw err
|
||||||
|
} finally {
|
||||||
|
this.pendingDials.delete(dial)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
11
src/index.js
11
src/index.js
@ -194,8 +194,15 @@ class Libp2p extends EventEmitter {
|
|||||||
log('libp2p is stopping')
|
log('libp2p is stopping')
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.pubsub && await this.pubsub.stop()
|
await Promise.all([
|
||||||
this._dht && await this._dht.stop()
|
this.pubsub && this.pubsub.stop(),
|
||||||
|
this._dht && this._dht.stop()
|
||||||
|
])
|
||||||
|
|
||||||
|
for (const dial of this.dialer.pendingDials.values()) {
|
||||||
|
dial.abort()
|
||||||
|
}
|
||||||
|
|
||||||
await this.transportManager.close()
|
await this.transportManager.close()
|
||||||
await this.registrar.close()
|
await this.registrar.close()
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
@ -169,6 +169,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
|
|
||||||
// We should have 2 in progress, and 1 waiting
|
// We should have 2 in progress, and 1 waiting
|
||||||
expect(dialer.tokens).to.have.length(0)
|
expect(dialer.tokens).to.have.length(0)
|
||||||
|
expect(dialer.pendingDials.size).to.equal(1) // 1 dial request
|
||||||
|
|
||||||
deferredDial.resolve(await createMockConnection())
|
deferredDial.resolve(await createMockConnection())
|
||||||
|
|
||||||
@ -178,6 +179,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
// Only two dials will be run, as the first two succeeded
|
// Only two dials will be run, as the first two succeeded
|
||||||
expect(localTM.dial.callCount).to.equal(2)
|
expect(localTM.dial.callCount).to.equal(2)
|
||||||
expect(dialer.tokens).to.have.length(2)
|
expect(dialer.tokens).to.have.length(2)
|
||||||
|
expect(dialer.pendingDials.size).to.equal(0)
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('libp2p.dialer', () => {
|
describe('libp2p.dialer', () => {
|
||||||
@ -278,5 +280,23 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
await libp2p.hangUp(connection.remotePeer)
|
await libp2p.hangUp(connection.remotePeer)
|
||||||
expect(connection.stat.timeline.close).to.exist()
|
expect(connection.stat.timeline.close).to.exist()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should abort pending dials on stop', async () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [Crypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
const abort = sinon.stub()
|
||||||
|
const dials = [{ abort }, { abort }, { abort }]
|
||||||
|
sinon.stub(libp2p.dialer, 'pendingDials').value(new Set(dials))
|
||||||
|
|
||||||
|
await libp2p.stop()
|
||||||
|
|
||||||
|
expect(abort).to.have.property('callCount', 3)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -29,7 +29,9 @@ describe('peer discovery scenarios', () => {
|
|||||||
remotePeerInfo2.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
|
remotePeerInfo2.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
|
||||||
})
|
})
|
||||||
|
|
||||||
afterEach(async () => {
|
afterEach(async function () {
|
||||||
|
// Increase timeout until abort support for dht queries is in place
|
||||||
|
this.timeout(10e3)
|
||||||
libp2p && await libp2p.stop()
|
libp2p && await libp2p.stop()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user