refactor: circuit relay to async (#477)

* refactor: add dialing over relay support

* chore: fix lint

* fix: don't clear listeners on close

* fix: if dial errors already have codes, just rethrow them

* fix: clear the registrar when libp2p stops

* fix: improve connection maintenance with circuit

* chore: address review feedback

* test: use chai as promised

* test(fix): reset multiaddrs on dial test
Jacob Heun
2019-11-29 16:41:08 +01:00
parent 997ee166b0
commit b518391a47
30 changed files with 937 additions and 1169 deletions
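The headline change is support for dialing over a circuit relay. As orientation before the diff, this is roughly how the new relay test below exercises it (a minimal sketch distilled from test/dialing/relay.node.js; srcLibp2p, relayLibp2p and dstLibp2p are three started libp2p nodes with HOP enabled on the relay, exactly as in that test):

// Sketch only – variable names mirror test/dialing/relay.node.js below
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()

// <relay multiaddr>/p2p/<relay id>/p2p-circuit/p2p/<destination id>
const dialAddr = relayAddr
  .encapsulate(`/p2p/${relayIdString}`)
  .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)

// The destination listens on the relay, then the source dials through it
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
const connection = await srcLibp2p.dial(dialAddr)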

View File

@@ -3,6 +3,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const Transport = require('libp2p-tcp')
@@ -77,14 +78,9 @@ describe('Dialing (direct, TCP)', () => {
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
try {
await dialer.connectToMultiaddr(unsupportedAddr)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToMultiaddr(unsupportedAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
})
it('should be able to connect to a given peer info', async () => {
@@ -121,14 +117,9 @@ describe('Dialing (direct, TCP)', () => {
const peerInfo = new PeerInfo(peerId)
peerInfo.multiaddrs.add(unsupportedAddr)
try {
await dialer.connectToPeer(peerInfo)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToPeer(peerInfo))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
})
it('should abort dials on queue task timeout', async () => {
@@ -144,14 +135,9 @@ describe('Dialing (direct, TCP)', () => {
expect(options.signal.aborted).to.equal(true)
})
try {
await dialer.connectToMultiaddr(remoteAddr)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToMultiaddr(remoteAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
})
it('should dial to the max concurrency', async () => {

View File

@@ -3,6 +3,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const pDefer = require('p-defer')
@@ -67,14 +68,9 @@ describe('Dialing (direct, WebSockets)', () => {
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM })
try {
await dialer.connectToMultiaddr(unsupportedAddr)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToMultiaddr(unsupportedAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
})
it('should be able to connect to a given peer', async () => {
@@ -94,14 +90,9 @@ describe('Dialing (direct, WebSockets)', () => {
const peerInfo = new PeerInfo(peerId)
peerInfo.multiaddrs.add(unsupportedAddr)
try {
await dialer.connectToPeer(peerInfo)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToPeer(peerInfo))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
})
it('should abort dials on queue task timeout', async () => {
@@ -117,14 +108,9 @@ describe('Dialing (direct, WebSockets)', () => {
expect(options.signal.aborted).to.equal(true)
})
try {
await dialer.connectToMultiaddr(remoteAddr)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
return
}
expect.fail('Dial should have failed')
await expect(dialer.connectToMultiaddr(remoteAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
})
it('should dial to the max concurrency', async () => {

test/dialing/relay.node.js (new file, 162 additions)
View File

@@ -0,0 +1,162 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const multiaddr = require('multiaddr')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options')
const Libp2p = require('../../src')
const { codes: Errors } = require('../../src/errors')
describe('Dialing (via relay, TCP)', () => {
let srcLibp2p
let relayLibp2p
let dstLibp2p
before(async () => {
const peerInfos = await createPeerInfoFromFixture(3)
// Create 3 nodes, and turn HOP on for the relay
;[srcLibp2p, relayLibp2p, dstLibp2p] = peerInfos.map((peerInfo, index) => {
const opts = baseOptions
index === 1 && (opts.config.relay.hop.enabled = true)
return new Libp2p({
...opts,
peerInfo
})
})
dstLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
})
beforeEach(() => {
// Start each node
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => {
// Reset multiaddrs and start
libp2p.peerInfo.multiaddrs.clear()
libp2p.peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
libp2p.start()
}))
})
afterEach(() => {
// Stop each node
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop()))
})
it('should be able to connect to a peer over a relay with active connections', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
const connection = await srcLibp2p.dial(dialAddr)
expect(connection).to.exist()
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerInfo.id.toBytes())
expect(connection.localPeer.toBytes()).to.eql(srcLibp2p.peerInfo.id.toBytes())
expect(connection.remoteAddr).to.eql(dialAddr)
expect(connection.localAddr).to.eql(
relayAddr // the relay address
.encapsulate(`/p2p/${relayIdString}`) // with its peer id
.encapsulate('/p2p-circuit') // the local peer is connected over the relay
.encapsulate(`/p2p/${srcLibp2p.peerInfo.id.toB58String()}`) // and the local peer id
)
const { stream: echoStream } = await connection.newStream('/echo/1.0.0')
const input = Buffer.from('hello')
const [output] = await pipe(
[input],
echoStream,
collect
)
expect(output.slice()).to.eql(input)
})
it('should fail to connect to a peer over a relay with inactive connections', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
await expect(srcLibp2p.dial(dialAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
})
it('should not stay connected to a relay when not already connected and HOP fails', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
await expect(srcLibp2p.dial(dialAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
// We should not be connected to the relay, because we weren't before the dial
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(srcToRelayConn).to.not.exist()
})
it('dialer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
await srcLibp2p.dial(relayAddr)
await expect(srcLibp2p.dial(dialAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(srcToRelayConn).to.exist()
expect(srcToRelayConn.stat.status).to.equal('open')
})
it('destination peer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toString()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
// Connect the destination peer and the relay
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
// Tamper with our multiaddrs for the circuit message
sinon.stub(srcLibp2p.peerInfo.multiaddrs, 'toArray').returns([{
buffer: Buffer.from('an invalid multiaddr')
}])
await expect(srcLibp2p.dial(dialAddr))
.to.eventually.be.rejected()
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
expect(dstToRelayConn).to.exist()
expect(dstToRelayConn.stat.status).to.equal('open')
})
})

View File

@@ -3,6 +3,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
@@ -98,20 +99,18 @@ describe('Identify', () => {
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY })
// Run identify
try {
await Promise.all([
localIdentify.identify(localConnectionMock, localPeer.id),
remoteIdentify.handleMessage({
connection: remoteConnectionMock,
stream: remote,
protocol: multicodecs.IDENTIFY
})
])
expect.fail('should have thrown')
} catch (err) {
expect(err).to.exist()
expect(err.code).to.eql(Errors.ERR_INVALID_PEER)
}
const identifyPromise = Promise.all([
localIdentify.identify(localConnectionMock, localPeer.id),
remoteIdentify.handleMessage({
connection: remoteConnectionMock,
stream: remote,
protocol: multicodecs.IDENTIFY
})
])
await expect(identifyPromise)
.to.eventually.be.rejected()
.and.to.have.property('code', Errors.ERR_INVALID_PEER)
})
describe('push', () => {
@@ -229,6 +228,7 @@ describe('Identify', () => {
// Wait for identify to finish
await libp2p.dialer.identifyService.identify.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
libp2p.handle('/echo/2.0.0', () => {})
libp2p.unhandle('/echo/2.0.0')

View File

@@ -54,4 +54,19 @@ describe('registrar on dial', () => {
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
expect(remoteConn).to.exist()
})
it('should be closed on libp2p stop', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))
await libp2p.dial(remoteAddr)
expect(libp2p.registrar.connections.size).to.equal(1)
sinon.spy(libp2p.registrar, 'close')
await libp2p.stop()
expect(libp2p.registrar.close.callCount).to.equal(1)
expect(libp2p.registrar.connections.size).to.equal(0)
})
})

View File

@@ -24,26 +24,14 @@ describe('registrar', () => {
})
it('should fail to register a protocol if no multicodec is provided', () => {
try {
registrar.register()
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if no multicodec is provided')
expect(() => registrar.register()).to.throw()
})
it('should fail to register a protocol if an invalid topology is provided', () => {
const fakeTopology = {
random: 1
}
try {
registrar.register()
} catch (err) {
expect(err).to.exist(fakeTopology)
return
}
throw new Error('should fail to register a protocol if an invalid topology is provided')
expect(() => registrar.register(fakeTopology)).to.throw()
})
})

View File

@@ -38,11 +38,12 @@ describe('Transport Manager (TCP)', () => {
it('should be able to listen', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await tm.listen(addrs)
expect(tm._listeners.size).to.equal(1)
expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)
// Ephemeral IP addresses may result in multiple listeners
expect(tm.getAddrs().length).to.equal(addrs.length)
await tm.close()
expect(tm._listeners.size).to.equal(0)
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
})
it('should be able to dial', async () => {

View File

@@ -3,6 +3,7 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
@@ -39,21 +40,25 @@ describe('Transport Manager (WebSockets)', () => {
await tm.remove(Transport.prototype[Symbol.toStringTag])
})
it('should not be able to add a transport without a key', () => {
expect(() => {
it('should not be able to add a transport without a key', async () => {
// Chai as promised conflicts with normal `throws` validation,
// so wrap the call in an async function
await expect((async () => { // eslint-disable-line
tm.add(undefined, Transport)
}).to.throw().that.satisfies((err) => {
return err.code === ErrorCodes.ERR_INVALID_KEY
})
})())
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_INVALID_KEY)
})
it('should not be able to add a transport twice', () => {
it('should not be able to add a transport twice', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
expect(() => {
// Chai as promised conflicts with normal `throws` validation,
// so wrap the call in an async function
await expect((async () => { // eslint-disable-line
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
}).to.throw().that.satisfies((err) => {
return err.code === ErrorCodes.ERR_DUPLICATE_TRANSPORT
})
})())
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_DUPLICATE_TRANSPORT)
})
it('should be able to dial', async () => {
@@ -67,27 +72,18 @@ describe('Transport Manager (WebSockets)', () => {
it('should fail to dial an unsupported address', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
const addr = multiaddr('/ip4/127.0.0.1/tcp/0')
try {
await tm.dial(addr)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
return
}
expect.fail('Dial attempt should have failed')
await expect(tm.dial(addr))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
})
it('should fail to listen with no valid address', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
const addrs = [multiaddr('/ip4/127.0.0.1/tcp/0')]
try {
await tm.listen(addrs)
} catch (err) {
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_NO_VALID_ADDRESSES)
return
}
expect.fail('should have failed')
await expect(tm.listen(addrs))
.to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
})
})
@@ -115,7 +111,8 @@ describe('libp2p.transportManager', () => {
})
expect(libp2p.transportManager).to.exist()
expect(libp2p.transportManager._transports.size).to.equal(1)
// Our transport and circuit relay
expect(libp2p.transportManager._transports.size).to.equal(2)
})
it('starting and stopping libp2p should start and stop TransportManager', async () => {

View File

@@ -9,5 +9,13 @@ module.exports = {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
},
config: {
relay: {
enabled: true,
hop: {
enabled: false
}
}
}
}

View File

@@ -9,5 +9,13 @@ module.exports = {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
},
config: {
relay: {
enabled: true,
hop: {
enabled: false
}
}
}
}
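Both base-options fixtures above enable the relay transport but leave HOP off; the relay test flips hop.enabled to true on the relay node only. For reference, a hop-enabled variant of the same config shape would look roughly like this (a sketch, not part of the commit):

config: {
  relay: {
    enabled: true, // use the circuit relay transport
    hop: {
      enabled: true // additionally relay traffic for other peers (HOP)
    }
  }
}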

View File

@@ -5,7 +5,7 @@ const PeerInfo = require('peer-info')
const Peers = require('../../fixtures/peers')
module.exports.createPeerInfo = async (length) => {
async function createPeerInfo (length) {
const peers = await Promise.all(
Array.from({ length })
.map((_, i) => PeerId.create())
@@ -14,11 +14,19 @@ module.exports.createPeerInfo = async (length) => {
return peers.map((peer) => new PeerInfo(peer))
}
module.exports.createPeerInfoFromFixture = async (length) => {
const peers = await Promise.all(
function createPeerIdsFromFixture (length) {
return Promise.all(
Array.from({ length })
.map((_, i) => PeerId.createFromJSON(Peers[i]))
)
}
async function createPeerInfoFromFixture (length) {
const peers = await createPeerIdsFromFixture(length)
return peers.map((peer) => new PeerInfo(peer))
}
module.exports.createPeerInfo = createPeerInfo
module.exports.createPeerIdsFromFixture = createPeerIdsFromFixture
module.exports.createPeerInfoFromFixture = createPeerInfoFromFixture

View File

@@ -1,10 +1,15 @@
'use strict'
const pipe = require('it-pipe')
const { Connection } = require('libp2p-interfaces/src/connection')
const multiaddr = require('multiaddr')
const Muxer = require('libp2p-mplex')
const Multistream = require('multistream-select')
const pair = require('it-pair')
const errCode = require('err-code')
const { codes } = require('../../src/errors')
const mockMultiaddrConnPair = require('./mockMultiaddrConn')
const peerUtils = require('./creators/peer')
module.exports = async (properties = {}) => {
@@ -48,3 +53,103 @@ module.exports = async (properties = {}) => {
...properties
})
}
/**
* Creates a full connection pair, without the transport or encryption
*
* @param {object} options
* @param {Multiaddr[]} options.addrs Should contain two addresses for the local and remote peer respectively
* @param {PeerId[]} options.peers Should contain two peer ids, for the local and remote peer respectively
* @param {Map<string, function>} options.protocols The protocols the connections should support
* @returns {{inbound:Connection, outbound:Connection}}
*/
module.exports.pair = function connectionPair ({ addrs, peers, protocols }) {
const [localPeer, remotePeer] = peers
const {
inbound: inboundMaConn,
outbound: outboundMaConn
} = mockMultiaddrConnPair({ addrs, remotePeer })
const inbound = createConnection({
direction: 'inbound',
maConn: inboundMaConn,
protocols,
// Inbound connection peers are reversed
localPeer: remotePeer,
remotePeer: localPeer
})
const outbound = createConnection({
direction: 'outbound',
maConn: outboundMaConn,
protocols,
localPeer,
remotePeer
})
return { inbound, outbound }
}
function createConnection ({
direction,
maConn,
localPeer,
remotePeer,
protocols
}) {
// Create the muxer
const muxer = new Muxer({
// Run anytime a remote stream is created
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(protocols.keys()))
connection.addStream(stream, protocol)
// Need to be able to notify a peer of this: this._onStream({ connection, stream, protocol })
const handler = protocols.get(protocol)
handler({ connection, stream, protocol })
} catch (err) {
// Do nothing
}
},
// Run anytime a stream closes
onStreamEnd: muxedStream => {
connection.removeStream(muxedStream.id)
}
})
const newStream = async protocols => {
const muxedStream = muxer.newStream()
const mss = new Multistream.Dialer(muxedStream)
try {
const { stream, protocol } = await mss.select(protocols)
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err) {
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
}
}
// Pipe all data through the muxer
pipe(maConn, muxer, maConn)
maConn.timeline.upgraded = Date.now()
// Create the connection
const connection = new Connection({
localAddr: maConn.localAddr,
remoteAddr: maConn.remoteAddr,
localPeer: localPeer,
remotePeer: remotePeer,
stat: {
direction,
timeline: maConn.timeline,
multiplexer: Muxer.multicodec,
encryption: 'N/A'
},
newStream,
getStreams: () => muxer.streams,
close: err => maConn.close(err)
})
return connection
}
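A rough usage sketch of the pair helper above (the require paths are assumptions; createPeerIdsFromFixture comes from the creators/peer utility changed earlier in this commit):

// Sketch only – paths assumed, to be run inside an async test
const multiaddr = require('multiaddr')
const pipe = require('it-pipe')
const { createPeerIdsFromFixture } = require('./utils/creators/peer')
const { pair: connectionPair } = require('./utils/mockConnection')

const peers = await createPeerIdsFromFixture(2)
const protocols = new Map([['/echo/1.0.0', ({ stream }) => pipe(stream, stream)]])
const { inbound, outbound } = connectionPair({
  addrs: [multiaddr('/ip4/127.0.0.1/tcp/0'), multiaddr('/ip4/127.0.0.1/tcp/1')],
  peers,
  protocols
})
// outbound.newStream(['/echo/1.0.0']) is then answered by the echo handler registered on the inbound side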

View File

@@ -13,10 +13,11 @@ const AbortController = require('abort-controller')
*/
module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) {
const controller = new AbortController()
const [localAddr, remoteAddr] = addrs
const [inbound, outbound] = duplexPair()
outbound.localAddr = addrs[0]
outbound.remoteAddr = addrs[1].encapsulate(`/p2p/${remotePeer.toB58String()}`)
outbound.localAddr = localAddr
outbound.remoteAddr = remoteAddr.encapsulate(`/p2p/${remotePeer.toB58String()}`)
outbound.timeline = {
open: Date.now()
}
@@ -25,8 +26,8 @@ module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) {
controller.abort()
}
inbound.localAddr = addrs[1]
inbound.remoteAddr = addrs[0]
inbound.localAddr = remoteAddr
inbound.remoteAddr = localAddr
inbound.timeline = {
open: Date.now()
}