feat: auto relay network query for new relays

This commit is contained in:
Vasco Santos 2020-09-11 19:02:56 +02:00
parent 2530b834a1
commit e6b0134299
9 changed files with 458 additions and 184 deletions

View File

@ -45,6 +45,7 @@
"aggregate-error": "^3.0.1",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"cids": "^1.0.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
@ -66,6 +67,7 @@
"moving-average": "^1.0.0",
"multiaddr": "^8.1.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.1",
"multistream-select": "^1.0.0",
"mutable-proxy": "^1.0.0",
"node-forge": "^0.9.1",
@ -89,7 +91,6 @@
"chai-as-promised": "^7.1.1",
"chai-bytes": "^0.1.2",
"chai-string": "^1.5.0",
"cids": "^1.0.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interop-libp2p": "^0.3.0",

View File

@ -11,10 +11,13 @@ const PeerId = require('peer-id')
const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')
const circuitProtoCode = 290
const hopMetadataKey = 'hop_relay'
const hopMetadataValue = 'true'
const { namespaceToCid } = require('./utils')
const {
CIRCUIT_PROTO_CODE,
HOP_METADATA_KEY,
HOP_METADATA_VALUE,
RELAY_RENDEZVOUS_NS
} = require('./constants')
class AutoRelay {
/**
@ -76,7 +79,7 @@ class AutoRelay {
const connection = this._connectionManager.get(peerId)
// Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) {
log(`relayed connection to ${id} will not be used to hop on`)
return
}
@ -84,7 +87,7 @@ class AutoRelay {
const supportsHop = await canHop({ connection })
if (supportsHop) {
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
this._peerStore.metadataBook.set(peerId, HOP_METADATA_KEY, uint8ArrayFromString(HOP_METADATA_VALUE))
await this._addListenRelay(connection, id)
}
} catch (err) {
@ -125,15 +128,21 @@ class AutoRelay {
}
// Create relay listen addr
let listenAddr, remoteMultiaddr
let listenAddr, remoteMultiaddr, remoteAddrs
try {
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
// TODO: HOP Relays should avoid advertising private addresses!
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
} catch (_) {
log.error(`${id} does not have announced certified multiaddrs`)
return
// Attempt first if existing
if (!remoteAddrs || !remoteAddrs.length) {
return
}
remoteMultiaddr = remoteAddrs[0].multiaddr
}
if (!remoteMultiaddr.protoNames().includes('p2p')) {
@ -194,10 +203,10 @@ class AutoRelay {
continue
}
const supportsHop = metadataMap.get(hopMetadataKey)
const supportsHop = metadataMap.get(HOP_METADATA_KEY)
// Continue to next if it does not support Hop
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
if (!supportsHop || uint8ArrayToString(supportsHop) !== HOP_METADATA_VALUE) {
continue
}
@ -229,7 +238,32 @@ class AutoRelay {
}
}
// TODO: Try to find relays to hop on the network
// Try to find relays to hop on the network
try {
const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS)
for await (const provider of this._libp2p.contentRouting.findProviders(cid)) {
if (!provider || !provider.id || !provider.multiaddrs || !provider.multiaddrs.length) {
continue
}
const peerId = provider.id
this._peerStore.addressBook.add(peerId, provider.multiaddrs)
const connection = await this._libp2p.dial(peerId)
await this._addListenRelay(connection, peerId.toB58String())
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}
} catch (err) {
if (err.code !== 'NO_ROUTERS_AVAILABLE') {
throw err
} else {
log('there are no routers configured to find hop relay services')
}
}
}
}

12
src/circuit/constants.js Normal file
View File

@ -0,0 +1,12 @@
'use strict'
const minute = 60 * 1000
module.exports = {
ADVERTISE_BOOT_DELAY: 15 * minute,
ADVERTISE_TTL: 30 * minute,
CIRCUIT_PROTO_CODE: 290,
HOP_METADATA_KEY: 'hop_relay',
HOP_METADATA_VALUE: 'true',
RELAY_RENDEZVOUS_NS: '/libp2p/relay'
}

View File

@ -1,197 +1,76 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
const log = debug('libp2p:relay')
log.error = debug('libp2p:relay:error')
const AutoRelay = require('./auto-relay')
const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
const { handleStop } = require('./circuit/stop')
const StreamHandler = require('./circuit/stream-handler')
const { namespaceToCid } = require('./utils')
const {
ADVERTISE_BOOT_DELAY,
ADVERTISE_TTL,
RELAY_RENDEZVOUS_NS
} = require('./constants')
class Circuit {
class Relay {
/**
* Creates an instance of Circuit.
* Creates an instance of Relay.
*
* @class
* @param {object} options
* @param {Libp2p} options.libp2p
* @param {Upgrader} options.upgrader
* @param {Libp2p} libp2p
*/
constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer
this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader
constructor (libp2p) {
this._options = libp2p._config.relay
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this))
// Create autoRelay if enabled
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
}
async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()
/**
* Start Relay service.
* @returns {void}
*/
start () {
// Advertise service if HOP enabled
const canHop = this._options.hop.enabled
if (!request) {
return
}
const circuit = this
let virtualConnection
switch (request.type) {
case CircuitPB.Type.CAN_HOP: {
log('received CAN_HOP request from %s', connection.remotePeer.toB58String())
await handleCanHop({ circuit, connection, streamHandler })
break
}
case CircuitPB.Type.HOP: {
log('received HOP request from %s', connection.remotePeer.toB58String())
virtualConnection = await handleHop({
connection,
request,
streamHandler,
circuit
})
break
}
case CircuitPB.Type.STOP: {
log('received STOP request from %s', connection.remotePeer.toB58String())
virtualConnection = await handleStop({
connection,
request,
streamHandler,
circuit
})
break
}
default: {
log('Request of type %s not supported', request.type)
}
}
if (virtualConnection) {
const remoteAddr = multiaddr(request.dstPeer.addrs[0])
const localAddr = multiaddr(request.srcPeer.addrs[0])
const maConn = toConnection({
stream: virtualConnection,
remoteAddr,
localAddr
})
const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr)
const conn = await this._upgrader.upgradeInbound(maConn)
log('%s connection %s upgraded', type, maConn.remoteAddr)
this.handler && this.handler(conn)
if (canHop) {
this._timeout = setTimeout(() => {
this._advertiseService()
}, this._options.advertise.bootDelay || ADVERTISE_BOOT_DELAY)
}
}
/**
* Dial a peer over a relay
*
* @param {multiaddr} ma - the multiaddr of the peer to dial
* @param {Object} options - dial options
* @param {AbortSignal} [options.signal] - An optional abort signal
* @returns {Connection} - the connection
* Stop Relay service.
* @returns {void}
*/
async dial (ma, options) {
// Check the multiaddr to see if it contains a relay and a destination peer
const addrs = ma.toString().split('/p2p-circuit')
const relayAddr = multiaddr(addrs[0])
const destinationAddr = multiaddr(addrs[addrs.length - 1])
const relayPeer = PeerId.createFromCID(relayAddr.getPeerId())
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
let disconnectOnFailure = false
let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
}
stop () {
clearTimeout(this._timeout)
}
/**
* Advertise hop relay service in the network.
* @returns {Promise<void>}
*/
async _advertiseService () {
try {
const virtualConnection = await hop({
connection: relayConnection,
circuit: this,
request: {
type: CircuitPB.Type.HOP,
srcPeer: {
id: this.peerId.toBytes(),
addrs: this._libp2p.multiaddrs.map(addr => addr.bytes)
},
dstPeer: {
id: destinationPeer.toBytes(),
addrs: [multiaddr(destinationAddr).bytes]
}
}
})
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toB58String()}`)
const maConn = toConnection({
stream: virtualConnection,
remoteAddr: ma,
localAddr
})
log('new outbound connection %s', maConn.remoteAddr)
return this._upgrader.upgradeOutbound(maConn)
const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS)
await this._libp2p.contentRouting.provide(cid)
} catch (err) {
log.error('Circuit relay dial failed', err)
disconnectOnFailure && await relayConnection.close()
throw err
}
}
/**
* Create a listener
*
* @param {any} options
* @param {Function} handler
* @returns {listener}
*/
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
if (err.code === 'NO_ROUTERS_AVAILABLE') {
log('there are no routers configured to advertise hop relay service')
} else {
log.error(err)
}
}
// Called on successful HOP and STOP requests
this.handler = handler
return createListener(this._libp2p, options)
}
/**
* Filter check for all Multiaddrs that this transport can dial on
*
* @param {Array<Multiaddr>} multiaddrs
* @returns {Array<Multiaddr>}
*/
filter (multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter((ma) => {
return mafmt.Circuit.matches(ma)
})
// Restart timeout
this._timeout = setTimeout(() => {
this._advertiseService()
}, this._options.advertise.ttl || ADVERTISE_TTL)
}
}
/**
* @type {Circuit}
*/
module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' })
module.exports = Relay

194
src/circuit/transport.js Normal file
View File

@ -0,0 +1,194 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
const { handleStop } = require('./circuit/stop')
const StreamHandler = require('./circuit/stream-handler')
class Circuit {
/**
* Creates an instance of the Circuit Transport.
*
* @constructor
* @param {object} options
* @param {Libp2p} options.libp2p
* @param {Upgrader} options.upgrader
*/
constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer
this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader
this._options = libp2p._config.relay
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this))
}
async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read()
if (!request) {
return
}
const circuit = this
let virtualConnection
switch (request.type) {
case CircuitPB.Type.CAN_HOP: {
log('received CAN_HOP request from %s', connection.remotePeer.toB58String())
await handleCanHop({ circuit, connection, streamHandler })
break
}
case CircuitPB.Type.HOP: {
log('received HOP request from %s', connection.remotePeer.toB58String())
virtualConnection = await handleHop({
connection,
request,
streamHandler,
circuit
})
break
}
case CircuitPB.Type.STOP: {
log('received STOP request from %s', connection.remotePeer.toB58String())
virtualConnection = await handleStop({
connection,
request,
streamHandler,
circuit
})
break
}
default: {
log('Request of type %s not supported', request.type)
}
}
if (virtualConnection) {
const remoteAddr = multiaddr(request.dstPeer.addrs[0])
const localAddr = multiaddr(request.srcPeer.addrs[0])
const maConn = toConnection({
stream: virtualConnection,
remoteAddr,
localAddr
})
const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr)
const conn = await this._upgrader.upgradeInbound(maConn)
log('%s connection %s upgraded', type, maConn.remoteAddr)
this.handler && this.handler(conn)
}
}
/**
* Dial a peer over a relay
*
* @param {multiaddr} ma - the multiaddr of the peer to dial
* @param {Object} options - dial options
* @param {AbortSignal} [options.signal] - An optional abort signal
* @returns {Connection} - the connection
*/
async dial (ma, options) {
// Check the multiaddr to see if it contains a relay and a destination peer
const addrs = ma.toString().split('/p2p-circuit')
const relayAddr = multiaddr(addrs[0])
const destinationAddr = multiaddr(addrs[addrs.length - 1])
const relayPeer = PeerId.createFromCID(relayAddr.getPeerId())
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
let disconnectOnFailure = false
let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
}
try {
const virtualConnection = await hop({
connection: relayConnection,
circuit: this,
request: {
type: CircuitPB.Type.HOP,
srcPeer: {
id: this.peerId.toBytes(),
addrs: this._libp2p.multiaddrs.map(addr => addr.bytes)
},
dstPeer: {
id: destinationPeer.toBytes(),
addrs: [multiaddr(destinationAddr).bytes]
}
}
})
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toB58String()}`)
const maConn = toConnection({
stream: virtualConnection,
remoteAddr: ma,
localAddr
})
log('new outbound connection %s', maConn.remoteAddr)
return this._upgrader.upgradeOutbound(maConn)
} catch (err) {
log.error('Circuit relay dial failed', err)
disconnectOnFailure && await relayConnection.close()
throw err
}
}
/**
* Create a listener
*
* @param {any} options
* @param {Function} handler
* @return {listener}
*/
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
// Called on successful HOP and STOP requests
this.handler = handler
return createListener(this._libp2p, options)
}
/**
* Filter check for all Multiaddrs that this transport can dial on
*
* @param {Array<Multiaddr>} multiaddrs
* @returns {Array<Multiaddr>}
*/
filter (multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter((ma) => {
return mafmt.Circuit.matches(ma)
})
}
}
/**
* @type {Circuit}
*/
module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' })

16
src/circuit/utils.js Normal file
View File

@ -0,0 +1,16 @@
'use strict'
const CID = require('cids')
const multihashing = require('multihashing-async')
/**
* Convert a namespace string into a cid.
* @param {string} namespace
* @return {Promise<CID>}
*/
module.exports.namespaceToCid = async (namespace) => {
const bytes = new TextEncoder('utf8').encode(namespace)
const hash = await multihashing(bytes, 'sha2-256')
return new CID(hash)
}

View File

@ -4,6 +4,7 @@ const mergeOptions = require('merge-options')
const { dnsaddrResolver } = require('multiaddr/src/resolvers')
const Constants = require('./constants')
const RelayConstants = require('./circuit/constants')
const { FaultTolerance } = require('./transport-manager')
@ -56,6 +57,10 @@ const DefaultConfig = {
},
relay: {
enabled: true,
advertise: {
bootDelay: RelayConstants.ADVERTISE_BOOT_DELAY,
ttl: RelayConstants.ADVERTISE_TTL
},
hop: {
enabled: false,
active: false

View File

@ -17,7 +17,8 @@ const { codes, messages } = require('./errors')
const AddressManager = require('./address-manager')
const ConnectionManager = require('./connection-manager')
const Circuit = require('./circuit')
const Circuit = require('./circuit/transport')
const Relay = require('./circuit')
const Dialer = require('./dialer')
const Keychain = require('./keychain')
const Metrics = require('./metrics')
@ -146,6 +147,7 @@ class Libp2p extends EventEmitter {
if (this._config.relay.enabled) {
this.transportManager.add(Circuit.prototype[Symbol.toStringTag], Circuit)
this.relay = new Relay(this)
}
// Attach stream multiplexers
@ -249,6 +251,10 @@ class Libp2p extends EventEmitter {
try {
this._isStarted = false
// Relay
this.relay && this.relay.stop()
for (const service of this._discovery.values()) {
service.removeListener('peer', this._onDiscoveryPeer)
}
@ -503,6 +509,9 @@ class Libp2p extends EventEmitter {
// Peer discovery
await this._setupPeerDiscovery()
// Relay
this.relay && this.relay.start()
}
/**

View File

@ -8,7 +8,10 @@ const { expect } = chai
const delay = require('delay')
const pWaitFor = require('p-wait-for')
const sinon = require('sinon')
const nock = require('nock')
const ipfsHttpClient = require('ipfs-http-client')
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
const multiaddr = require('multiaddr')
const Libp2p = require('../../src')
const { relay: relayMulticodec } = require('../../src/circuit/multicodec')
@ -59,7 +62,7 @@ describe('auto-relay', () => {
})
})
autoRelay = libp2p.transportManager._transports.get('Circuit')._autoRelay
autoRelay = libp2p.relay._autoRelay
expect(autoRelay.maxListeners).to.eql(1)
})
@ -144,7 +147,7 @@ describe('auto-relay', () => {
})
})
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
autoRelay1 = relayLibp2p1.relay._autoRelay
expect(autoRelay1.maxListeners).to.eql(1)
})
@ -412,8 +415,8 @@ describe('auto-relay', () => {
})
})
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
autoRelay2 = relayLibp2p2.transportManager._transports.get('Circuit')._autoRelay
autoRelay1 = relayLibp2p1.relay._autoRelay
autoRelay2 = relayLibp2p2.relay._autoRelay
})
beforeEach(() => {
@ -457,4 +460,125 @@ describe('auto-relay', () => {
expect(autoRelay1._listenRelays.size).to.equal(1)
})
})
describe('discovery', () => {
let libp2p
let libp2p2
let relayLibp2p
beforeEach(async () => {
const peerIds = await createPeerId({ number: 3 })
// Create 2 nodes, and turn HOP on for the relay
;[libp2p, libp2p2, relayLibp2p] = peerIds.map((peerId, index) => {
const delegate = new DelegatedContentRouter(peerId, ipfsHttpClient({
host: '0.0.0.0',
protocol: 'http',
port: 60197
}), [
multiaddr('/ip4/0.0.0.0/tcp/60197')
])
const opts = {
...baseOptions,
config: {
...baseOptions.config,
relay: {
advertise: {
bootDelay: 1000,
ttl: 1000
},
hop: {
enabled: index === 2
},
autoRelay: {
enabled: true,
maxListeners: 1
}
}
}
}
return new Libp2p({
...opts,
modules: {
...opts.modules,
contentRouting: [delegate]
},
addresses: {
listen: [listenAddr]
},
connectionManager: {
autoDial: false
},
peerDiscovery: {
autoDial: false
},
peerId
})
})
sinon.spy(relayLibp2p.contentRouting, 'provide')
})
beforeEach(async () => {
nock('http://0.0.0.0:60197')
// mock the refs call
.post('/api/v0/refs')
.query(true)
.reply(200, null, [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
// Start each node
await Promise.all([libp2p, libp2p2, relayLibp2p].map(libp2p => libp2p.start()))
// Should provide on start
await pWaitFor(() => relayLibp2p.contentRouting.provide.callCount === 1)
const provider = relayLibp2p.peerId.toB58String()
const multiaddrs = relayLibp2p.multiaddrs.map((m) => m.toString())
// Mock findProviders
nock('http://0.0.0.0:60197')
.post('/api/v0/dht/findprovs')
.query(true)
.reply(200, `{"Extra":"","ID":"${provider}","Responses":[{"Addrs":${JSON.stringify(multiaddrs)},"ID":"${provider}"}],"Type":4}\n`, [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
})
afterEach(() => {
// Stop each node
return Promise.all([libp2p, libp2p2, relayLibp2p].map(libp2p => libp2p.stop()))
})
it('should find providers for relay and add it as listen relay', async () => {
const originalMultiaddrsLength = libp2p.multiaddrs.length
// Spy add listen relay
sinon.spy(libp2p.relay._autoRelay, '_addListenRelay')
// Spy Find Providers
sinon.spy(libp2p.contentRouting, 'findProviders')
// Try to listen on Available hop relays
await libp2p.relay._autoRelay._listenOnAvailableHopRelays()
// Should try to find relay service providers
await pWaitFor(() => libp2p.contentRouting.findProviders.callCount === 1)
// Wait for peer added as listen relay
await pWaitFor(() => libp2p.relay._autoRelay._addListenRelay.callCount === 1)
expect(libp2p.relay._autoRelay._listenRelays.size).to.equal(1)
await pWaitFor(() => libp2p.multiaddrs.length === originalMultiaddrsLength + 1)
const relayedAddr = libp2p.multiaddrs[libp2p.multiaddrs.length - 1]
libp2p2.peerStore.addressBook.set(libp2p2.peerId, [relayedAddr])
// Dial from peer 2 through the relayed address
const conn = await libp2p2.dial(libp2p2.peerId)
expect(conn).to.exist()
})
})
})