feat: coalescing dial support (#518)

* docs: fix spelling in api

* fix: dont create peerstore twice

* feat: add support for dial coalescing

* doc(fix): add setPeerValue to API TOC

* docs: add more jsdocs to dialer

* chore: remove old comment

* fix: ensure connections are closed

* fix: registrar.getConnections returns first open conn

* fix: directly set the closed status

* chore: remove unneeded log

* refactor: peerStore.put takes an options object
This commit is contained in:
Jacob Heun
2019-12-15 17:33:16 +01:00
parent 4384d139d2
commit 15f7c2a974
14 changed files with 325 additions and 172 deletions

View File

@@ -14,8 +14,10 @@ const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const delay = require('delay')
const pDefer = require('p-defer')
const pSettle = require('p-settle')
const pipe = require('it-pipe')
const AggregateError = require('aggregate-error')
const { Connection } = require('libp2p-interfaces/src/connection')
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const Libp2p = require('../../src')
@@ -29,6 +31,7 @@ const swarmKeyBuffer = Buffer.from(require('../fixtures/swarm.key'))
const mockUpgrader = require('../utils/mockUpgrader')
const createMockConnection = require('../utils/mockConnection')
const Peers = require('../fixtures/peers')
const { createPeerInfo } = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
@@ -65,7 +68,7 @@ describe('Dialing (direct, TCP)', () => {
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
const connection = await dialer.connectToMultiaddr(remoteAddr)
const connection = await dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
await connection.close()
})
@@ -73,7 +76,8 @@ describe('Dialing (direct, TCP)', () => {
it('should be able to connect to a remote node via its stringified multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
const connection = await dialer.connectToMultiaddr(remoteAddr.toString())
const dialable = Dialer.getDialable(remoteAddr.toString())
const connection = await dialer.connectToPeer(dialable)
expect(connection).to.exist()
await connection.close()
})
@@ -81,7 +85,7 @@ describe('Dialing (direct, TCP)', () => {
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
await expect(dialer.connectToMultiaddr(unsupportedAddr))
await expect(dialer.connectToPeer(unsupportedAddr))
.to.eventually.be.rejectedWith(AggregateError)
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
})
@@ -113,7 +117,7 @@ describe('Dialing (direct, TCP)', () => {
peerInfo.multiaddrs.add(remoteAddr)
peerStore.put(peerInfo)
const connection = await dialer.connectToPeer(peerId)
const connection = await dialer.connectToPeer(peerInfo)
expect(connection).to.exist()
await connection.close()
})
@@ -147,15 +151,23 @@ describe('Dialing (direct, TCP)', () => {
throw new AbortError()
})
await expect(dialer.connectToMultiaddr(remoteAddr))
await expect(dialer.connectToPeer(remoteAddr))
.to.eventually.be.rejectedWith(Error)
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
})
it('should dial to the max concurrency', async () => {
const addrs = [
'/ip4/0.0.0.0/tcp/8000',
'/ip4/0.0.0.0/tcp/8001',
'/ip4/0.0.0.0/tcp/8002'
]
const dialer = new Dialer({
transportManager: localTM,
concurrency: 2
concurrency: 2,
peerStore: {
multiaddrsForPeer: () => addrs
}
})
expect(dialer.tokens).to.have.length(2)
@@ -163,8 +175,10 @@ describe('Dialing (direct, TCP)', () => {
const deferredDial = pDefer()
sinon.stub(localTM, 'dial').callsFake(() => deferredDial.promise)
const [peerInfo] = await createPeerInfo()
// Perform 3 multiaddr dials
dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr])
dialer.connectToPeer(peerInfo)
// Let the call stack run
await delay(0)
@@ -206,9 +220,10 @@ describe('Dialing (direct, TCP)', () => {
connEncryption: [Crypto]
}
})
remoteLibp2p.peerInfo.multiaddrs.add(listenAddr)
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
await remoteLibp2p.transportManager.listen([listenAddr])
await remoteLibp2p.start()
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
@@ -230,7 +245,7 @@ describe('Dialing (direct, TCP)', () => {
}
})
sinon.spy(libp2p.dialer, 'connectToMultiaddr')
sinon.spy(libp2p.dialer, 'connectToPeer')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
@@ -238,7 +253,7 @@ describe('Dialing (direct, TCP)', () => {
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
await connection.close()
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
})
it('should use the dialer for connecting to a peer', async () => {
@@ -251,7 +266,7 @@ describe('Dialing (direct, TCP)', () => {
}
})
sinon.spy(libp2p.dialer, 'connectToMultiaddr')
sinon.spy(libp2p.dialer, 'connectToPeer')
const remotePeer = new PeerInfo(remoteLibp2p.peerInfo.id)
remotePeer.multiaddrs.add(remoteAddr)
@@ -261,7 +276,7 @@ describe('Dialing (direct, TCP)', () => {
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
await connection.close()
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
})
it('should be able to use hangup to close connections', async () => {
@@ -296,7 +311,7 @@ describe('Dialing (direct, TCP)', () => {
sinon.spy(libp2p.upgrader.protector, 'protect')
sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer))
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist()
@@ -304,5 +319,95 @@ describe('Dialing (direct, TCP)', () => {
await connection.close()
expect(libp2p.upgrader.protector.protect.callCount).to.equal(1)
})
it('should coalesce parallel dials to the same peer (no id in multiaddr)', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const dials = 10
const dialResults = await Promise.all([...new Array(dials)].map((_, index) => {
if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo)
return libp2p.dial(remoteLibp2p.peerInfo.multiaddrs.toArray()[0])
}))
// All should succeed and we should have ten results
expect(dialResults).to.have.length(10)
for (const connection of dialResults) {
expect(Connection.isConnection(connection)).to.equal(true)
}
// We will have two connections, since the multiaddr dial doesn't have a peer id
expect(libp2p.connectionManager._connections.size).to.equal(2)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(2)
})
it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const dials = 10
const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerInfo.id.toString()}`)
const dialResults = await Promise.all([...new Array(dials)].map((_, index) => {
if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo)
return libp2p.dial(fullAddress)
}))
// All should succeed and we should have ten results
expect(dialResults).to.have.length(10)
for (const connection of dialResults) {
expect(Connection.isConnection(connection)).to.equal(true)
}
// 1 connection, because we know the peer in the multiaddr
expect(libp2p.connectionManager._connections.size).to.equal(1)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(1)
})
it('should coalesce parallel dials to the same error on failure', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const dials = 10
const error = new Error('Boom')
sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error))
const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerInfo.id.toString()}`)
const dialResults = await pSettle([...new Array(dials)].map((_, index) => {
if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo)
return libp2p.dial(fullAddress)
}))
// All should succeed and we should have ten results
expect(dialResults).to.have.length(10)
for (const result of dialResults) {
expect(result).to.have.property('isRejected', true)
expect(result.reason).to.be.an.instanceof(AggregateError)
// All errors should be the exact same as `error`
for (const err of result.reason) {
expect(err).to.equal(error)
}
}
// 1 connection, because we know the peer in the multiaddr
expect(libp2p.connectionManager._connections.size).to.equal(0)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(0)
})
})
})

View File

@@ -28,6 +28,7 @@ const Peers = require('../fixtures/peers')
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
const mockUpgrader = require('../utils/mockUpgrader')
const createMockConnection = require('../utils/mockConnection')
const { createPeerId } = require('../utils/creators/peer')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
@@ -80,17 +81,27 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
const dialer = new Dialer({
transportManager: localTM,
peerStore: {
multiaddrsForPeer: () => [remoteAddr]
}
})
const connection = await dialer.connectToMultiaddr(remoteAddr)
const connection = await dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
await connection.close()
})
it('should be able to connect to a remote node via its stringified multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
const dialer = new Dialer({
transportManager: localTM,
peerStore: {
multiaddrsForPeer: () => [remoteAddr]
}
})
const connection = await dialer.connectToMultiaddr(remoteAddr.toString())
const connection = await dialer.connectToPeer(remoteAddr.toString())
expect(connection).to.exist()
await connection.close()
})
@@ -98,7 +109,7 @@ describe('Dialing (direct, WebSockets)', () => {
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
await expect(dialer.connectToMultiaddr(unsupportedAddr))
await expect(dialer.connectToPeer(unsupportedAddr))
.to.eventually.be.rejectedWith(AggregateError)
.and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
})
@@ -134,7 +145,10 @@ describe('Dialing (direct, WebSockets)', () => {
it('should abort dials on queue task timeout', async () => {
const dialer = new Dialer({
transportManager: localTM,
timeout: 50
timeout: 50,
peerStore: {
multiaddrsForPeer: () => [remoteAddr]
}
})
sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
expect(options.signal).to.exist()
@@ -145,7 +159,7 @@ describe('Dialing (direct, WebSockets)', () => {
throw new AbortError()
})
await expect(dialer.connectToMultiaddr(remoteAddr))
await expect(dialer.connectToPeer(remoteAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
})
@@ -153,7 +167,10 @@ describe('Dialing (direct, WebSockets)', () => {
it('should dial to the max concurrency', async () => {
const dialer = new Dialer({
transportManager: localTM,
concurrency: 2
concurrency: 2,
peerStore: {
multiaddrsForPeer: () => [remoteAddr, remoteAddr, remoteAddr]
}
})
expect(dialer.tokens).to.have.length(2)
@@ -161,8 +178,9 @@ describe('Dialing (direct, WebSockets)', () => {
const deferredDial = pDefer()
sinon.stub(localTM, 'dial').callsFake(() => deferredDial.promise)
const [peerId] = await createPeerId()
// Perform 3 multiaddr dials
dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr])
dialer.connectToPeer(peerId)
// Let the call stack run
await delay(0)
@@ -185,7 +203,10 @@ describe('Dialing (direct, WebSockets)', () => {
it('.destroy should abort pending dials', async () => {
const dialer = new Dialer({
transportManager: localTM,
concurrency: 2
concurrency: 2,
peerStore: {
multiaddrsForPeer: () => [remoteAddr, remoteAddr, remoteAddr]
}
})
expect(dialer.tokens).to.have.length(2)
@@ -201,7 +222,8 @@ describe('Dialing (direct, WebSockets)', () => {
})
// Perform 3 multiaddr dials
const dialPromise = dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr])
const [peerId] = await createPeerId()
const dialPromise = dialer.connectToPeer(peerId)
// Let the call stack run
await delay(0)
@@ -265,7 +287,8 @@ describe('Dialing (direct, WebSockets)', () => {
}
})
sinon.spy(libp2p.dialer, 'connectToMultiaddr')
sinon.spy(libp2p.dialer, 'connectToPeer')
sinon.spy(libp2p.peerStore, 'put')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
@@ -273,7 +296,8 @@ describe('Dialing (direct, WebSockets)', () => {
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
await connection.close()
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
expect(libp2p.peerStore.put.callCount).to.be.at.least(1)
})
it('should run identify automatically after connecting', async () => {
@@ -290,7 +314,7 @@ describe('Dialing (direct, WebSockets)', () => {
sinon.spy(libp2p.peerStore, 'replace')
sinon.spy(libp2p.upgrader, 'onConnection')
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for onConnection to be called

View File

@@ -42,13 +42,19 @@ describe('Dialing (via relay, TCP)', () => {
// Reset multiaddrs and start
libp2p.peerInfo.multiaddrs.clear()
libp2p.peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
libp2p.start()
return libp2p.start()
}))
})
afterEach(() => {
// Stop each node
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop()))
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(async libp2p => {
await libp2p.stop()
// Clear the peer stores
for (const peerId of libp2p.peerStore.peers.keys()) {
libp2p.peerStore.remove(peerId)
}
}))
})
it('should be able to connect to a peer over a relay with active connections', async () => {

View File

@@ -200,7 +200,7 @@ describe('Identify', () => {
sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'replace')
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)
@@ -221,7 +221,7 @@ describe('Identify', () => {
sinon.spy(libp2p.identifyService, 'push')
sinon.spy(libp2p.peerStore, 'update')
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)

View File

@@ -7,18 +7,12 @@ const { expect } = chai
const sinon = require('sinon')
const pDefer = require('p-defer')
const mergeOptions = require('merge-options')
const Libp2p = require('../../src')
const PeerStore = require('../../src/peer-store')
const multiaddr = require('multiaddr')
const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const mockConnection = require('../utils/mockConnection')
const addr = multiaddr('/ip4/127.0.0.1/tcp/8000')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
describe('peer-store', () => {
let peerStore
@@ -168,51 +162,6 @@ describe('peer-store', () => {
})
})
describe('peer-store on dial', () => {
let peerInfo
let remotePeerInfo
let libp2p
let remoteLibp2p
before(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfo({ number: 2 })
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo: remotePeerInfo
}))
})
after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})
it('should put the remote peerInfo after dial and emit event', async () => {
const remoteId = remotePeerInfo.id.toB58String()
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))
sinon.spy(libp2p.peerStore, 'put')
sinon.spy(libp2p.peerStore, 'add')
sinon.spy(libp2p.peerStore, 'update')
sinon.stub(libp2p.dialer, 'connectToMultiaddr').returns(mockConnection({
remotePeer: remotePeerInfo.id
}))
const connection = await libp2p.dial(listenAddr)
await connection.close()
expect(libp2p.peerStore.put.callCount).to.equal(1)
expect(libp2p.peerStore.add.callCount).to.equal(1)
expect(libp2p.peerStore.update.callCount).to.equal(0)
const storedPeer = libp2p.peerStore.get(remoteId)
expect(storedPeer).to.exist()
})
})
describe('peer-store on discovery', () => {
// TODO: implement with discovery
})