mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-24 14:31:35 +00:00
feat: add token based dialer
This commit is contained in:
83
test/dialing/dial-resolver.spec.js
Normal file
83
test/dialing/dial-resolver.spec.js
Normal file
@ -0,0 +1,83 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
const pDefer = require('p-defer')
|
||||
const pWaitFor = require('p-wait-for')
|
||||
const AggregateError = require('aggregate-error')
|
||||
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
||||
|
||||
const { DialResolver } = require('../../src/dialer/dial-request')
|
||||
|
||||
const mockAbortableDial = () => {
|
||||
const deferred = pDefer()
|
||||
function dial () {
|
||||
return {
|
||||
promise: deferred.promise,
|
||||
abort: () => deferred.reject(new AbortError())
|
||||
}
|
||||
}
|
||||
dial.reject = deferred.reject
|
||||
dial.resolve = deferred.resolve
|
||||
return dial
|
||||
}
|
||||
|
||||
describe('DialResolver', () => {
|
||||
it('should not run subsequent dials if finished', async () => {
|
||||
const deferred = pDefer()
|
||||
const dial = sinon.stub().callsFake(() => {
|
||||
return deferred
|
||||
})
|
||||
const dialResolver = new DialResolver()
|
||||
dialResolver.add(dial)
|
||||
deferred.resolve(true)
|
||||
|
||||
await pWaitFor(() => dialResolver.finished === true)
|
||||
|
||||
dialResolver.add(dial)
|
||||
expect(dial.callCount).to.equal(1)
|
||||
})
|
||||
|
||||
it('.flush should throw if all dials errored', async () => {
|
||||
const dialResolver = new DialResolver()
|
||||
const dials = [
|
||||
mockAbortableDial(),
|
||||
mockAbortableDial(),
|
||||
mockAbortableDial()
|
||||
]
|
||||
for (const dial of dials) {
|
||||
dialResolver.add(dial)
|
||||
dial.reject(new Error('transport error'))
|
||||
}
|
||||
|
||||
await expect(dialResolver.flush()).to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors.length', 3)
|
||||
})
|
||||
|
||||
it('.flush should resolve the successful dial', async () => {
|
||||
const dialResolver = new DialResolver()
|
||||
const mockConn = {}
|
||||
const dials = [
|
||||
mockAbortableDial(),
|
||||
mockAbortableDial(),
|
||||
mockAbortableDial()
|
||||
]
|
||||
|
||||
// Make the first succeed
|
||||
const successfulDial = dials.shift()
|
||||
dialResolver.add(successfulDial)
|
||||
successfulDial.resolve(mockConn)
|
||||
|
||||
// Error the rest
|
||||
for (const dial of dials) {
|
||||
dialResolver.add(dial)
|
||||
dial.reject(new Error('transport error'))
|
||||
}
|
||||
|
||||
await expect(dialResolver.flush()).to.eventually.be(mockConn)
|
||||
})
|
||||
})
|
@ -15,6 +15,7 @@ const PeerInfo = require('peer-info')
|
||||
const delay = require('delay')
|
||||
const pDefer = require('p-defer')
|
||||
const pipe = require('it-pipe')
|
||||
const AggregateError = require('aggregate-error')
|
||||
|
||||
const Libp2p = require('../../src')
|
||||
const Dialer = require('../../src/dialer')
|
||||
@ -79,15 +80,19 @@ describe('Dialing (direct, TCP)', () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
|
||||
await expect(dialer.connectToMultiaddr(unsupportedAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
})
|
||||
|
||||
it('should be able to connect to a given peer info', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
const dialer = new Dialer({
|
||||
transportManager: localTM,
|
||||
peerStore: {
|
||||
multiaddrsForPeer: () => [remoteAddr]
|
||||
}
|
||||
})
|
||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(remoteAddr)
|
||||
|
||||
const connection = await dialer.connectToPeer(peerInfo)
|
||||
expect(connection).to.exist()
|
||||
@ -112,14 +117,18 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
const dialer = new Dialer({
|
||||
transportManager: localTM,
|
||||
peerStore: {
|
||||
multiaddrsForPeer: () => [unsupportedAddr]
|
||||
}
|
||||
})
|
||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||
|
||||
await expect(dialer.connectToPeer(peerInfo))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
@ -136,7 +145,7 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
await expect(dialer.connectToMultiaddr(remoteAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.to.eventually.be.rejectedWith(Error)
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
|
||||
})
|
||||
|
||||
@ -146,25 +155,21 @@ describe('Dialing (direct, TCP)', () => {
|
||||
concurrency: 2
|
||||
})
|
||||
|
||||
expect(dialer.tokens).to.have.length(2)
|
||||
|
||||
const deferredDial = pDefer()
|
||||
sinon.stub(localTM, 'dial').callsFake(async () => {
|
||||
await deferredDial.promise
|
||||
})
|
||||
|
||||
// Add 3 dials
|
||||
Promise.all([
|
||||
dialer.connectToMultiaddr(remoteAddr),
|
||||
dialer.connectToMultiaddr(remoteAddr),
|
||||
dialer.connectToMultiaddr(remoteAddr)
|
||||
])
|
||||
// Perform 3 multiaddr dials
|
||||
dialer.connectToMultiaddrs([remoteAddr, remoteAddr, remoteAddr])
|
||||
|
||||
// Let the call stack run
|
||||
await delay(0)
|
||||
|
||||
// We should have 2 in progress, and 1 waiting
|
||||
expect(localTM.dial.callCount).to.equal(2)
|
||||
expect(dialer.queue.pending).to.equal(2)
|
||||
expect(dialer.queue.size).to.equal(1)
|
||||
expect(dialer.tokens).to.have.length(0)
|
||||
|
||||
deferredDial.resolve()
|
||||
|
||||
@ -172,8 +177,7 @@ describe('Dialing (direct, TCP)', () => {
|
||||
await delay(0)
|
||||
// All dials should have executed
|
||||
expect(localTM.dial.callCount).to.equal(3)
|
||||
expect(dialer.queue.pending).to.equal(0)
|
||||
expect(dialer.queue.size).to.equal(0)
|
||||
expect(dialer.tokens).to.have.length(2)
|
||||
})
|
||||
|
||||
describe('libp2p.dialer', () => {
|
||||
@ -214,7 +218,7 @@ describe('Dialing (direct, TCP)', () => {
|
||||
|
||||
after(() => remoteLibp2p.stop())
|
||||
|
||||
it('should use the dialer for connecting', async () => {
|
||||
it('should use the dialer for connecting to a multiaddr', async () => {
|
||||
libp2p = new Libp2p({
|
||||
peerInfo,
|
||||
modules: {
|
||||
@ -235,6 +239,29 @@ describe('Dialing (direct, TCP)', () => {
|
||||
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
|
||||
})
|
||||
|
||||
it('should use the dialer for connecting to a peer', async () => {
|
||||
libp2p = new Libp2p({
|
||||
peerInfo,
|
||||
modules: {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
}
|
||||
})
|
||||
|
||||
sinon.spy(libp2p.dialer, 'connectToMultiaddrs')
|
||||
const remotePeer = new PeerInfo(remoteLibp2p.peerInfo.id)
|
||||
remotePeer.multiaddrs.add(remoteAddr)
|
||||
|
||||
const connection = await libp2p.dial(remotePeer)
|
||||
expect(connection).to.exist()
|
||||
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
|
||||
expect(stream).to.exist()
|
||||
expect(protocol).to.equal('/echo/1.0.0')
|
||||
await connection.close()
|
||||
expect(libp2p.dialer.connectToMultiaddrs.callCount).to.equal(1)
|
||||
})
|
||||
|
||||
it('should be able to use hangup to close connections', async () => {
|
||||
libp2p = new Libp2p({
|
||||
peerInfo,
|
||||
|
@ -7,6 +7,7 @@ chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
const pDefer = require('p-defer')
|
||||
const pWaitFor = require('p-wait-for')
|
||||
const delay = require('delay')
|
||||
const Transport = require('libp2p-websockets')
|
||||
const Muxer = require('libp2p-mplex')
|
||||
@ -14,6 +15,7 @@ const Crypto = require('libp2p-secio')
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
const PeerInfo = require('peer-info')
|
||||
const AggregateError = require('aggregate-error')
|
||||
|
||||
const { codes: ErrorCodes } = require('../../src/errors')
|
||||
const Constants = require('../../src/constants')
|
||||
@ -49,6 +51,22 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
expect(dialer.timeout).to.equal(Constants.DIAL_TIMEOUT)
|
||||
})
|
||||
|
||||
it('should limit the number of tokens it provides', () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
const maxPerPeer = Constants.PER_PEER_LIMIT
|
||||
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS)
|
||||
const tokens = dialer.getTokens(maxPerPeer + 1)
|
||||
expect(tokens).to.have.length(maxPerPeer)
|
||||
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS - maxPerPeer)
|
||||
})
|
||||
|
||||
it('should not return tokens if non are left', () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
sinon.stub(dialer, 'tokens').value([])
|
||||
const tokens = dialer.getTokens(1)
|
||||
expect(tokens.length).to.equal(0)
|
||||
})
|
||||
|
||||
it('should be able to connect to a remote node via its multiaddr', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
|
||||
@ -69,30 +87,36 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
|
||||
await expect(dialer.connectToMultiaddr(unsupportedAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
})
|
||||
|
||||
it('should be able to connect to a given peer', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
const dialer = new Dialer({
|
||||
transportManager: localTM,
|
||||
peerStore: {
|
||||
multiaddrsForPeer: () => [remoteAddr]
|
||||
}
|
||||
})
|
||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(remoteAddr)
|
||||
|
||||
const connection = await dialer.connectToPeer(peerInfo)
|
||||
const connection = await dialer.connectToPeer(peerId)
|
||||
expect(connection).to.exist()
|
||||
await connection.close()
|
||||
})
|
||||
|
||||
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
const dialer = new Dialer({
|
||||
transportManager: localTM,
|
||||
peerStore: {
|
||||
multiaddrsForPeer: () => [unsupportedAddr]
|
||||
}
|
||||
})
|
||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||
|
||||
await expect(dialer.connectToPeer(peerInfo))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
await expect(dialer.connectToPeer(peerId))
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
@ -119,25 +143,21 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
concurrency: 2
|
||||
})
|
||||
|
||||
expect(dialer.tokens).to.have.length(2)
|
||||
|
||||
const deferredDial = pDefer()
|
||||
sinon.stub(localTM, 'dial').callsFake(async () => {
|
||||
await deferredDial.promise
|
||||
})
|
||||
|
||||
// Add 3 dials
|
||||
Promise.all([
|
||||
dialer.connectToMultiaddr(remoteAddr),
|
||||
dialer.connectToMultiaddr(remoteAddr),
|
||||
dialer.connectToMultiaddr(remoteAddr)
|
||||
])
|
||||
// Perform 3 multiaddr dials
|
||||
dialer.connectToMultiaddrs([remoteAddr, remoteAddr, remoteAddr])
|
||||
|
||||
// Let the call stack run
|
||||
await delay(0)
|
||||
|
||||
// We should have 2 in progress, and 1 waiting
|
||||
expect(localTM.dial.callCount).to.equal(2)
|
||||
expect(dialer.queue.pending).to.equal(2)
|
||||
expect(dialer.queue.size).to.equal(1)
|
||||
expect(dialer.tokens).to.have.length(0)
|
||||
|
||||
deferredDial.resolve()
|
||||
|
||||
@ -145,8 +165,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
await delay(0)
|
||||
// All dials should have executed
|
||||
expect(localTM.dial.callCount).to.equal(3)
|
||||
expect(dialer.queue.pending).to.equal(0)
|
||||
expect(dialer.queue.size).to.equal(0)
|
||||
expect(dialer.tokens).to.have.length(2)
|
||||
})
|
||||
|
||||
describe('libp2p.dialer', () => {
|
||||
@ -215,16 +234,18 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
}
|
||||
})
|
||||
|
||||
sinon.spy(libp2p.dialer.identifyService, 'identify')
|
||||
sinon.spy(libp2p.identifyService, 'identify')
|
||||
sinon.spy(libp2p.peerStore, 'replace')
|
||||
sinon.spy(libp2p.upgrader, 'onConnection')
|
||||
|
||||
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
// Wait for setImmediate to trigger the identify call
|
||||
await delay(1)
|
||||
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
|
||||
await libp2p.dialer.identifyService.identify.firstCall.returnValue
|
||||
|
||||
// Wait for onConnection to be called
|
||||
await pWaitFor(() => libp2p.upgrader.onConnection.callCount === 1)
|
||||
|
||||
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
||||
await libp2p.identifyService.identify.firstCall.returnValue
|
||||
|
||||
expect(libp2p.peerStore.replace.callCount).to.equal(1)
|
||||
})
|
||||
|
@ -10,6 +10,7 @@ const sinon = require('sinon')
|
||||
const multiaddr = require('multiaddr')
|
||||
const { collect } = require('streaming-iterables')
|
||||
const pipe = require('it-pipe')
|
||||
const AggregateError = require('aggregate-error')
|
||||
const { createPeerInfo } = require('../utils/creators/peer')
|
||||
const baseOptions = require('../utils/base-options')
|
||||
const Libp2p = require('../../src')
|
||||
@ -93,8 +94,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
})
|
||||
|
||||
it('should not stay connected to a relay when not already connected and HOP fails', async () => {
|
||||
@ -106,8 +107,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// We should not be connected to the relay, because we weren't before the dial
|
||||
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
@ -125,8 +126,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
await srcLibp2p.dial(relayAddr)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
expect(srcToRelayConn).to.exist()
|
||||
@ -152,8 +153,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
}])
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
.to.eventually.be.rejectedWith(AggregateError)
|
||||
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
expect(dstToRelayConn).to.exist()
|
||||
|
@ -197,15 +197,15 @@ describe('Identify', () => {
|
||||
peerInfo
|
||||
})
|
||||
|
||||
sinon.spy(libp2p.dialer.identifyService, 'identify')
|
||||
sinon.spy(libp2p.identifyService, 'identify')
|
||||
sinon.spy(libp2p.peerStore, 'replace')
|
||||
|
||||
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
// Wait for nextTick to trigger the identify call
|
||||
await delay(1)
|
||||
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
|
||||
await libp2p.dialer.identifyService.identify.firstCall.returnValue
|
||||
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
||||
await libp2p.identifyService.identify.firstCall.returnValue
|
||||
|
||||
expect(libp2p.peerStore.replace.callCount).to.equal(1)
|
||||
await connection.close()
|
||||
@ -217,8 +217,8 @@ describe('Identify', () => {
|
||||
peerInfo
|
||||
})
|
||||
|
||||
sinon.spy(libp2p.dialer.identifyService, 'identify')
|
||||
sinon.spy(libp2p.dialer.identifyService, 'push')
|
||||
sinon.spy(libp2p.identifyService, 'identify')
|
||||
sinon.spy(libp2p.identifyService, 'push')
|
||||
sinon.spy(libp2p.peerStore, 'update')
|
||||
|
||||
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
|
||||
@ -227,15 +227,15 @@ describe('Identify', () => {
|
||||
await delay(1)
|
||||
|
||||
// Wait for identify to finish
|
||||
await libp2p.dialer.identifyService.identify.firstCall.returnValue
|
||||
await libp2p.identifyService.identify.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
libp2p.handle('/echo/2.0.0', () => {})
|
||||
libp2p.unhandle('/echo/2.0.0')
|
||||
|
||||
// Verify the remote peer is notified of both changes
|
||||
expect(libp2p.dialer.identifyService.push.callCount).to.equal(2)
|
||||
for (const call of libp2p.dialer.identifyService.push.getCalls()) {
|
||||
expect(libp2p.identifyService.push.callCount).to.equal(2)
|
||||
for (const call of libp2p.identifyService.push.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId())
|
||||
|
Reference in New Issue
Block a user