mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-26 11:02:14 +00:00
fix: release tokens as soon as they are available
This commit is contained in:
parent
633b0c291f
commit
2570a1ba30
@ -47,11 +47,12 @@ class DialRequest {
|
|||||||
const tokenHolder = new FIFO()
|
const tokenHolder = new FIFO()
|
||||||
tokens.forEach(token => tokenHolder.push(token))
|
tokens.forEach(token => tokenHolder.push(token))
|
||||||
const dialAbortControllers = this.addrs.map(() => new AbortController())
|
const dialAbortControllers = this.addrs.map(() => new AbortController())
|
||||||
let completedDials = 0
|
let startedDials = 0
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await pAny(this.addrs.map(async (addr, i) => {
|
return await pAny(this.addrs.map(async (addr, i) => {
|
||||||
const token = await tokenHolder.shift() // get token
|
const token = await tokenHolder.shift() // get token
|
||||||
|
startedDials++
|
||||||
let conn
|
let conn
|
||||||
try {
|
try {
|
||||||
const signal = dialAbortControllers[i].signal
|
const signal = dialAbortControllers[i].signal
|
||||||
@ -59,9 +60,8 @@ class DialRequest {
|
|||||||
// Remove the successful AbortController so it is not aborted
|
// Remove the successful AbortController so it is not aborted
|
||||||
dialAbortControllers.splice(i, 1)
|
dialAbortControllers.splice(i, 1)
|
||||||
} finally {
|
} finally {
|
||||||
completedDials++
|
|
||||||
// If we have more dials to make, recycle the token, otherwise release it
|
// If we have more dials to make, recycle the token, otherwise release it
|
||||||
if (completedDials < this.addrs.length) {
|
if (startedDials < this.addrs.length) {
|
||||||
tokenHolder.push(token)
|
tokenHolder.push(token)
|
||||||
} else {
|
} else {
|
||||||
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
|
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
|
||||||
|
@ -11,6 +11,7 @@ const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
|||||||
const AbortController = require('abort-controller')
|
const AbortController = require('abort-controller')
|
||||||
const AggregateError = require('aggregate-error')
|
const AggregateError = require('aggregate-error')
|
||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
|
const delay = require('delay')
|
||||||
|
|
||||||
const { DialRequest } = require('../../src/dialer/dial-request')
|
const { DialRequest } = require('../../src/dialer/dial-request')
|
||||||
const createMockConnection = require('../utils/mockConnection')
|
const createMockConnection = require('../utils/mockConnection')
|
||||||
@ -50,6 +51,54 @@ describe('Dial Request', () => {
|
|||||||
expect(dialer.releaseToken).to.have.property('callCount', tokens.length)
|
expect(dialer.releaseToken).to.have.property('callCount', tokens.length)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should release tokens when all addr dials have started', async () => {
|
||||||
|
const mockConnection = await createMockConnection()
|
||||||
|
const deferred = pDefer()
|
||||||
|
const actions = {
|
||||||
|
1: async () => {
|
||||||
|
await delay(0)
|
||||||
|
return Promise.reject(error)
|
||||||
|
},
|
||||||
|
2: async () => {
|
||||||
|
await delay(0)
|
||||||
|
return Promise.reject(error)
|
||||||
|
},
|
||||||
|
3: () => deferred.promise
|
||||||
|
}
|
||||||
|
const dialAction = (num) => actions[num]()
|
||||||
|
const tokens = ['a', 'b']
|
||||||
|
const controller = new AbortController()
|
||||||
|
const dialer = {
|
||||||
|
getTokens: () => [...tokens],
|
||||||
|
releaseToken: () => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
const dialRequest = new DialRequest({
|
||||||
|
addrs: Object.keys(actions),
|
||||||
|
dialer,
|
||||||
|
dialAction
|
||||||
|
})
|
||||||
|
|
||||||
|
sinon.spy(actions, 1)
|
||||||
|
sinon.spy(actions, 2)
|
||||||
|
sinon.spy(actions, 3)
|
||||||
|
sinon.spy(dialer, 'releaseToken')
|
||||||
|
dialRequest.run({ signal: controller.signal })
|
||||||
|
// Let the first dials run
|
||||||
|
await delay(10)
|
||||||
|
|
||||||
|
// Only 1 dial should remain, so 1 token should have been released
|
||||||
|
expect(actions[1]).to.have.property('callCount', 1)
|
||||||
|
expect(actions[2]).to.have.property('callCount', 1)
|
||||||
|
expect(actions[3]).to.have.property('callCount', 1)
|
||||||
|
expect(dialer.releaseToken).to.have.property('callCount', 1)
|
||||||
|
|
||||||
|
// Finish the dial
|
||||||
|
deferred.resolve(mockConnection)
|
||||||
|
await delay(0)
|
||||||
|
expect(dialer.releaseToken).to.have.property('callCount', 2)
|
||||||
|
})
|
||||||
|
|
||||||
it('should throw an AggregateError if all dials fail', async () => {
|
it('should throw an AggregateError if all dials fail', async () => {
|
||||||
const actions = {
|
const actions = {
|
||||||
1: () => Promise.reject(error),
|
1: () => Promise.reject(error),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user