mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-26 23:41:34 +00:00
feat: auto relay (#723)
* feat: auto relay * fix: leverage protoBook events to ask relay peers if they support hop * chore: refactor disconnect * chore: do not listen on a relayed conn * chore: tweaks * chore: improve _listenOnAvailableHopRelays logic * chore: default value of 1 to maxListeners on auto-relay
This commit is contained in:
committed by
Vasco Santos
parent
48656712ea
commit
caf66ea143
231
src/circuit/auto-relay.js
Normal file
231
src/circuit/auto-relay.js
Normal file
@ -0,0 +1,231 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:auto-relay')
|
||||
log.error = debug('libp2p:auto-relay:error')
|
||||
|
||||
const uint8ArrayFromString = require('uint8arrays/from-string')
|
||||
const uint8ArrayToString = require('uint8arrays/to-string')
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const { relay: multicodec } = require('./multicodec')
|
||||
const { canHop } = require('./circuit/hop')
|
||||
|
||||
const circuitProtoCode = 290
|
||||
const hopMetadataKey = 'hop_relay'
|
||||
const hopMetadataValue = 'true'
|
||||
|
||||
class AutoRelay {
|
||||
/**
|
||||
* Creates an instance of AutoRelay.
|
||||
* @constructor
|
||||
* @param {object} props
|
||||
* @param {Libp2p} props.libp2p
|
||||
* @param {number} [props.maxListeners = 1] maximum number of relays to listen.
|
||||
*/
|
||||
constructor ({ libp2p, maxListeners = 1 }) {
|
||||
this._libp2p = libp2p
|
||||
this._peerId = libp2p.peerId
|
||||
this._peerStore = libp2p.peerStore
|
||||
this._connectionManager = libp2p.connectionManager
|
||||
this._transportManager = libp2p.transportManager
|
||||
|
||||
this.maxListeners = maxListeners
|
||||
|
||||
/**
|
||||
* @type {Set<string>}
|
||||
*/
|
||||
this._listenRelays = new Set()
|
||||
|
||||
this._onProtocolChange = this._onProtocolChange.bind(this)
|
||||
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
|
||||
|
||||
this._peerStore.on('change:protocols', this._onProtocolChange)
|
||||
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a peer supports the relay protocol.
|
||||
* If the protocol is not supported, check if it was supported before and remove it as a listen relay.
|
||||
* If the protocol is supported, check if the peer supports **HOP** and add it as a listener if
|
||||
* inside the threshold.
|
||||
* @param {Object} props
|
||||
* @param {PeerId} props.peerId
|
||||
* @param {Array<string>} props.protocols
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async _onProtocolChange ({ peerId, protocols }) {
|
||||
const id = peerId.toB58String()
|
||||
|
||||
// Check if it has the protocol
|
||||
const hasProtocol = protocols.find(protocol => protocol === multicodec)
|
||||
|
||||
// If no protocol, check if we were keeping the peer before as a listenRelay
|
||||
if (!hasProtocol && this._listenRelays.has(id)) {
|
||||
this._removeListenRelay(id)
|
||||
return
|
||||
} else if (!hasProtocol || this._listenRelays.has(id)) {
|
||||
return
|
||||
}
|
||||
|
||||
// If protocol, check if can hop, store info in the metadataBook and listen on it
|
||||
try {
|
||||
const connection = this._connectionManager.get(peerId)
|
||||
|
||||
// Do not hop on a relayed connection
|
||||
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
|
||||
log(`relayed connection to ${id} will not be used to hop on`)
|
||||
return
|
||||
}
|
||||
|
||||
const supportsHop = await canHop({ connection })
|
||||
|
||||
if (supportsHop) {
|
||||
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
|
||||
await this._addListenRelay(connection, id)
|
||||
}
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Peer disconnects.
|
||||
* @param {Connection} connection connection to the peer
|
||||
* @return {void}
|
||||
*/
|
||||
_onPeerDisconnected (connection) {
|
||||
const peerId = connection.remotePeer
|
||||
const id = peerId.toB58String()
|
||||
|
||||
// Not listening on this relay
|
||||
if (!this._listenRelays.has(id)) {
|
||||
return
|
||||
}
|
||||
|
||||
this._removeListenRelay(id)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to listen on the given relay connection.
|
||||
* @private
|
||||
* @param {Connection} connection connection to the peer
|
||||
* @param {string} id peer identifier string
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async _addListenRelay (connection, id) {
|
||||
// Check if already listening on enough relays
|
||||
if (this._listenRelays.size >= this.maxListeners) {
|
||||
return
|
||||
}
|
||||
|
||||
// Create relay listen addr
|
||||
let listenAddr, remoteMultiaddr
|
||||
|
||||
try {
|
||||
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
|
||||
// TODO: HOP Relays should avoid advertising private addresses!
|
||||
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
|
||||
} catch (_) {
|
||||
log.error(`${id} does not have announced certified multiaddrs`)
|
||||
return
|
||||
}
|
||||
|
||||
if (!remoteMultiaddr.protoNames().includes('p2p')) {
|
||||
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
|
||||
} else {
|
||||
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
|
||||
}
|
||||
|
||||
// Attempt to listen on relay
|
||||
this._listenRelays.add(id)
|
||||
|
||||
try {
|
||||
await this._transportManager.listen([multiaddr(listenAddr)])
|
||||
// TODO: push announce multiaddrs update
|
||||
// await this._libp2p.identifyService.pushToPeerStore()
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
this._listenRelays.delete(id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove listen relay.
|
||||
* @private
|
||||
* @param {string} id peer identifier string.
|
||||
* @return {void}
|
||||
*/
|
||||
_removeListenRelay (id) {
|
||||
if (this._listenRelays.delete(id)) {
|
||||
// TODO: this should be responsibility of the connMgr
|
||||
this._listenOnAvailableHopRelays([id])
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to listen on available hop relay connections.
|
||||
* The following order will happen while we do not have enough relays.
|
||||
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected.
|
||||
* 2. Dial and try to listen on the peers we know that support hop but are not connected.
|
||||
* 3. Search the network.
|
||||
* @param {Array<string>} [peersToIgnore]
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async _listenOnAvailableHopRelays (peersToIgnore = []) {
|
||||
// TODO: The peer redial issue on disconnect should be handled by connection gating
|
||||
// Check if already listening on enough relays
|
||||
if (this._listenRelays.size >= this.maxListeners) {
|
||||
return
|
||||
}
|
||||
|
||||
const knownHopsToDial = []
|
||||
|
||||
// Check if we have known hop peers to use and attempt to listen on the already connected
|
||||
for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) {
|
||||
// Continue to next if listening on this or peer to ignore
|
||||
if (this._listenRelays.has(id) || peersToIgnore.includes(id)) {
|
||||
continue
|
||||
}
|
||||
|
||||
const supportsHop = metadataMap.get(hopMetadataKey)
|
||||
|
||||
// Continue to next if it does not support Hop
|
||||
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
|
||||
continue
|
||||
}
|
||||
|
||||
const peerId = PeerId.createFromCID(id)
|
||||
const connection = this._connectionManager.get(peerId)
|
||||
|
||||
// If not connected, store for possible later use.
|
||||
if (!connection) {
|
||||
knownHopsToDial.push(peerId)
|
||||
continue
|
||||
}
|
||||
|
||||
await this._addListenRelay(connection, id)
|
||||
|
||||
// Check if already listening on enough relays
|
||||
if (this._listenRelays.size >= this.maxListeners) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Try to listen on known peers that are not connected
|
||||
for (const peerId of knownHopsToDial) {
|
||||
const connection = await this._libp2p.dial(peerId)
|
||||
await this._addListenRelay(connection, peerId.toB58String())
|
||||
|
||||
// Check if already listening on enough relays
|
||||
if (this._listenRelays.size >= this.maxListeners) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Try to find relays to hop on the network
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = AutoRelay
|
@ -116,6 +116,33 @@ module.exports.hop = async function hop ({
|
||||
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
|
||||
* @param {object} options
|
||||
* @param {Connection} options.connection Connection to the relay
|
||||
* @returns {Promise<boolean>}
|
||||
*/
|
||||
module.exports.canHop = async function canHop ({
|
||||
connection
|
||||
}) {
|
||||
// Create a new stream to the relay
|
||||
const { stream } = await connection.newStream([multicodec.relay])
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.CAN_HOP
|
||||
})
|
||||
|
||||
const response = await streamHandler.read()
|
||||
await streamHandler.close()
|
||||
|
||||
if (response.code !== CircuitPB.Status.SUCCESS) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an unencoded CAN_HOP response based on the Circuits configuration
|
||||
*
|
||||
|
@ -1,16 +1,18 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit')
|
||||
log.error = debug('libp2p:circuit:error')
|
||||
|
||||
const mafmt = require('mafmt')
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
const withIs = require('class-is')
|
||||
const { CircuitRelay: CircuitPB } = require('./protocol')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit')
|
||||
log.error = debug('libp2p:circuit:error')
|
||||
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
|
||||
|
||||
const AutoRelay = require('./auto-relay')
|
||||
const { relay: multicodec } = require('./multicodec')
|
||||
const createListener = require('./listener')
|
||||
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
|
||||
@ -35,11 +37,19 @@ class Circuit {
|
||||
this._libp2p = libp2p
|
||||
this.peerId = libp2p.peerId
|
||||
this._registrar.handle(multicodec, this._onProtocol.bind(this))
|
||||
|
||||
// Create autoRelay if enabled
|
||||
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
|
||||
}
|
||||
|
||||
async _onProtocol ({ connection, stream, protocol }) {
|
||||
async _onProtocol ({ connection, stream }) {
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const request = await streamHandler.read()
|
||||
|
||||
if (!request) {
|
||||
return
|
||||
}
|
||||
|
||||
const circuit = this
|
||||
let virtualConnection
|
||||
|
||||
@ -163,7 +173,7 @@ class Circuit {
|
||||
// Called on successful HOP and STOP requests
|
||||
this.handler = handler
|
||||
|
||||
return createListener(this, options)
|
||||
return createListener(this._libp2p, options)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener')
|
||||
log.err = debug('libp2p:circuit:error:listener')
|
||||
|
||||
/**
|
||||
* @param {*} circuit
|
||||
* @param {Libp2p} libp2p
|
||||
* @returns {Listener} a transport listener
|
||||
*/
|
||||
module.exports = (circuit) => {
|
||||
module.exports = (libp2p) => {
|
||||
const listener = new EventEmitter()
|
||||
const listeningAddrs = new Map()
|
||||
|
||||
// Remove listeningAddrs when a peer disconnects
|
||||
libp2p.connectionManager.on('peer:disconnect', (connection) => {
|
||||
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())
|
||||
|
||||
if (deleted) {
|
||||
// TODO push announce multiaddrs update
|
||||
// libp2p.identifyService.pushToPeerStore()
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Add swarm handler and listen for incoming connections
|
||||
*
|
||||
@ -24,7 +34,7 @@ module.exports = (circuit) => {
|
||||
listener.listen = async (addr) => {
|
||||
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')
|
||||
|
||||
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
|
||||
const relayConn = await libp2p.dial(multiaddr(addrString))
|
||||
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
|
||||
|
||||
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
|
||||
|
@ -63,6 +63,10 @@ const DefaultConfig = {
|
||||
hop: {
|
||||
enabled: false,
|
||||
active: false
|
||||
},
|
||||
autoRelay: {
|
||||
enabled: false,
|
||||
maxListeners: 2
|
||||
}
|
||||
},
|
||||
transport: {}
|
||||
|
@ -131,13 +131,12 @@ class IdentifyService {
|
||||
|
||||
/**
|
||||
* Calls `push` for all peers in the `peerStore` that are connected
|
||||
*
|
||||
* @param {PeerStore} peerStore
|
||||
* @returns {void}
|
||||
*/
|
||||
pushToPeerStore (peerStore) {
|
||||
pushToPeerStore () {
|
||||
const connections = []
|
||||
let connection
|
||||
for (const peer of peerStore.peers.values()) {
|
||||
for (const peer of this.peerStore.peers.values()) {
|
||||
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
|
||||
connections.push(connection)
|
||||
}
|
||||
|
@ -438,7 +438,7 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted() && this.identifyService) {
|
||||
this.identifyService.pushToPeerStore(this.peerStore)
|
||||
this.identifyService.pushToPeerStore()
|
||||
}
|
||||
}
|
||||
|
||||
@ -456,13 +456,14 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted() && this.identifyService) {
|
||||
this.identifyService.pushToPeerStore(this.peerStore)
|
||||
this.identifyService.pushToPeerStore()
|
||||
}
|
||||
}
|
||||
|
||||
async _onStarting () {
|
||||
// Listen on the provided transports
|
||||
await this.transportManager.listen()
|
||||
// Listen on the provided transports for the provided addresses
|
||||
const addrs = this.addressManager.getListenAddrs()
|
||||
await this.transportManager.listen(addrs)
|
||||
|
||||
// Start PeerStore
|
||||
await this.peerStore.start()
|
||||
|
@ -270,7 +270,7 @@ class AddressBook extends Book {
|
||||
*
|
||||
* @override
|
||||
* @param {PeerId} peerId
|
||||
* @returns {Array<data>}
|
||||
* @returns {Array<Address>|undefined}
|
||||
*/
|
||||
get (peerId) {
|
||||
if (!PeerId.isPeerId(peerId)) {
|
||||
|
@ -139,11 +139,10 @@ class TransportManager {
|
||||
* Starts listeners for each listen Multiaddr.
|
||||
*
|
||||
* @async
|
||||
* @param {Array<Multiaddr>} addrs addresses to attempt to listen on
|
||||
*/
|
||||
async listen () {
|
||||
const addrs = this.libp2p.addressManager.getListenAddrs()
|
||||
|
||||
if (addrs.length === 0) {
|
||||
async listen (addrs) {
|
||||
if (!addrs || addrs.length === 0) {
|
||||
log('no addresses were provided for listening, this node is dial only')
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user