mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-18 11:41:21 +00:00
fix: fix unintended aborts in dialer (#1185)
Fix a bug where `DialRequest` can abort wrong dial attempts. Co-authored-by: Robert Kiel <robert.kiel@hoprnet.io>
This commit is contained in:
@ -62,7 +62,7 @@ export class DialRequest {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
const dialAbortControllers = this.addrs.map(() => {
|
const dialAbortControllers: Array<(AbortController | undefined)> = this.addrs.map(() => {
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
try {
|
try {
|
||||||
// fails on node < 15.4
|
// fails on node < 15.4
|
||||||
@ -80,16 +80,27 @@ export class DialRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let completedDials = 0
|
let completedDials = 0
|
||||||
|
let done = false
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await Promise.any(this.addrs.map(async (addr, i) => {
|
return await Promise.any(this.addrs.map(async (addr, i) => {
|
||||||
const token = await tokenHolder.shift() // get token
|
const token = await tokenHolder.shift() // get token
|
||||||
|
// End attempt once another attempt succeeded
|
||||||
|
if (done) {
|
||||||
|
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
|
||||||
|
throw errCode(new Error('dialAction already succeeded'), codes.ERR_ALREADY_SUCCEEDED)
|
||||||
|
}
|
||||||
|
|
||||||
|
const controller = dialAbortControllers[i]
|
||||||
|
if (controller == null) {
|
||||||
|
throw errCode(new Error('dialAction did not come with an AbortController'), codes.ERR_INVALID_PARAMETERS)
|
||||||
|
}
|
||||||
let conn
|
let conn
|
||||||
try {
|
try {
|
||||||
const signal = dialAbortControllers[i].signal
|
const signal = controller.signal
|
||||||
conn = await this.dialAction(addr, { ...options, signal: (options.signal != null) ? anySignal([signal, options.signal]) : signal })
|
conn = await this.dialAction(addr, { ...options, signal: (options.signal != null) ? anySignal([signal, options.signal]) : signal })
|
||||||
// Remove the successful AbortController so it is not aborted
|
// Remove the successful AbortController so it is not aborted
|
||||||
dialAbortControllers.splice(i, 1)
|
dialAbortControllers[i] = undefined
|
||||||
} finally {
|
} finally {
|
||||||
completedDials++
|
completedDials++
|
||||||
// If we have more or equal dials remaining than tokens, recycle the token, otherwise release it
|
// If we have more or equal dials remaining than tokens, recycle the token, otherwise release it
|
||||||
@ -102,10 +113,25 @@ export class DialRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (conn == null) {
|
||||||
|
// Notify Promise.any that attempt was not successful
|
||||||
|
// to prevent from returning undefined despite there
|
||||||
|
// were successful dial attempts
|
||||||
|
throw errCode(new Error('dialAction led to empty object'), codes.ERR_TRANSPORT_DIAL_FAILED)
|
||||||
|
} else {
|
||||||
|
// This dial succeeded, don't attempt anything else
|
||||||
|
done = true
|
||||||
|
}
|
||||||
|
|
||||||
return conn
|
return conn
|
||||||
}))
|
}))
|
||||||
} finally {
|
} finally {
|
||||||
dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else
|
// success/failure happened, abort everything else
|
||||||
|
dialAbortControllers.forEach(c => {
|
||||||
|
if (c !== undefined) {
|
||||||
|
c.abort()
|
||||||
|
}
|
||||||
|
})
|
||||||
tokens.forEach(token => this.dialer.releaseToken(token)) // release tokens back to the dialer
|
tokens.forEach(token => this.dialer.releaseToken(token)) // release tokens back to the dialer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,5 +69,6 @@ export enum codes {
|
|||||||
ERR_INVALID_PASS_LENGTH = 'ERR_INVALID_PASS_LENGTH',
|
ERR_INVALID_PASS_LENGTH = 'ERR_INVALID_PASS_LENGTH',
|
||||||
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
|
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
|
||||||
ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK',
|
ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK',
|
||||||
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD'
|
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD',
|
||||||
|
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED'
|
||||||
}
|
}
|
||||||
|
@ -15,10 +15,11 @@ const error = new Error('dial failure')
|
|||||||
describe('Dial Request', () => {
|
describe('Dial Request', () => {
|
||||||
it('should end when a single multiaddr dials succeeds', async () => {
|
it('should end when a single multiaddr dials succeeds', async () => {
|
||||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||||
|
const deferredConn = pDefer()
|
||||||
const actions: Record<string, () => Promise<any>> = {
|
const actions: Record<string, () => Promise<any>> = {
|
||||||
'/ip4/127.0.0.1/tcp/1231': async () => await Promise.reject(error),
|
'/ip4/127.0.0.1/tcp/1231': async () => await Promise.reject(error),
|
||||||
'/ip4/127.0.0.1/tcp/1232': async () => await Promise.resolve(connection),
|
'/ip4/127.0.0.1/tcp/1232': async () => await Promise.resolve(connection),
|
||||||
'/ip4/127.0.0.1/tcp/1233': async () => await Promise.reject(error)
|
'/ip4/127.0.0.1/tcp/1233': async () => await deferredConn.promise
|
||||||
}
|
}
|
||||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
@ -32,15 +33,12 @@ describe('Dial Request', () => {
|
|||||||
dialAction
|
dialAction
|
||||||
})
|
})
|
||||||
|
|
||||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1231')
|
// Make sure that dial attempt comes back before terminating last dial action
|
||||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1232')
|
expect(await dialRequest.run({ signal: controller.signal })).to.equal(connection)
|
||||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1233')
|
|
||||||
|
// End third dial attempt
|
||||||
|
deferredConn.resolve()
|
||||||
|
|
||||||
const result = await dialRequest.run({ signal: controller.signal })
|
|
||||||
expect(result).to.equal(connection)
|
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
|
||||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -73,14 +71,16 @@ describe('Dial Request', () => {
|
|||||||
// Let the first dials run
|
// Let the first dials run
|
||||||
await delay(0)
|
await delay(0)
|
||||||
|
|
||||||
// Finish the first 2 dials
|
|
||||||
firstDials.reject(error)
|
|
||||||
await delay(0)
|
|
||||||
|
|
||||||
// Only 1 dial should remain, so 1 token should have been released
|
// Only 1 dial should remain, so 1 token should have been released
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
||||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 1)
|
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
||||||
|
|
||||||
|
// Finish the first 2 dials
|
||||||
|
firstDials.reject(error)
|
||||||
|
|
||||||
|
await delay(0)
|
||||||
|
|
||||||
expect(dialerReleaseTokenSpy.callCount).to.equal(1)
|
expect(dialerReleaseTokenSpy.callCount).to.equal(1)
|
||||||
|
|
||||||
// Finish the dial and release the 2nd token
|
// Finish the dial and release the 2nd token
|
||||||
@ -214,4 +214,45 @@ describe('Dial Request', () => {
|
|||||||
expect(dialerGetTokensSpy.calledWith(addrs.length)).to.equal(true)
|
expect(dialerGetTokensSpy.calledWith(addrs.length)).to.equal(true)
|
||||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should abort other dials when one succeeds', async () => {
|
||||||
|
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||||
|
const actions: Record<string, () => Promise<any>> = {
|
||||||
|
'/ip4/127.0.0.1/tcp/1231': async () => {
|
||||||
|
await delay(100)
|
||||||
|
},
|
||||||
|
'/ip4/127.0.0.1/tcp/1232': async () => {
|
||||||
|
// Successful dial takes longer to establish
|
||||||
|
await delay(1000)
|
||||||
|
|
||||||
|
return connection
|
||||||
|
},
|
||||||
|
|
||||||
|
'/ip4/127.0.0.1/tcp/1233': async () => {
|
||||||
|
await delay(100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const signals: Record<string, AbortSignal | undefined> = {}
|
||||||
|
|
||||||
|
const dialRequest = new DialRequest({
|
||||||
|
addrs: Object.keys(actions).map(str => new Multiaddr(str)),
|
||||||
|
dialer: new Dialer({
|
||||||
|
maxParallelDials: 3
|
||||||
|
}),
|
||||||
|
dialAction: async (ma, opts) => {
|
||||||
|
signals[ma.toString()] = opts.signal
|
||||||
|
return await actions[ma.toString()]()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
await expect(dialRequest.run()).to.eventually.equal(connection)
|
||||||
|
|
||||||
|
// Dial attempt finished without connection
|
||||||
|
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', false)
|
||||||
|
// Dial attempt led to connection
|
||||||
|
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
|
||||||
|
// Dial attempt finished without connection
|
||||||
|
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', false)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
Reference in New Issue
Block a user