diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index c2284f7e..21fe90ca 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -10,8 +10,7 @@ const { toString: uint8ArrayToString } = require('uint8arrays/to-string') const { Multiaddr } = require('multiaddr') const all = require('it-all') -const { relayV1: multicodec } = require('./multicodec') -const { canHop } = require('./v1/hop') +const { protocolIDv2Hop } = require('./multicodec') const { namespaceToCid } = require('./utils') const { CIRCUIT_PROTO_CODE, @@ -19,10 +18,12 @@ const { HOP_METADATA_VALUE, RELAY_RENDEZVOUS_NS } = require('./constants') +const { reserve } = require('./v2/hop') /** * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('../peer-store/types').Address} Address + * @typedef {import('./v2/protocol').IReservation} Reservation * @typedef {import('peer-id')} PeerId */ @@ -53,9 +54,11 @@ class AutoRelay { this.maxListeners = maxListeners /** - * @type {Set} + * id => Reservation + * + * @type {Map} */ - this._listenRelays = new Set() + this._listenRelays = new Map() this._onProtocolChange = this._onProtocolChange.bind(this) this._onPeerDisconnected = this._onPeerDisconnected.bind(this) @@ -88,7 +91,7 @@ class AutoRelay { const id = peerId.toB58String() // Check if it has the protocol - const hasProtocol = protocols.find(protocol => protocol === multicodec) + const hasProtocol = protocols.find(protocol => protocol === protocolIDv2Hop) // If no protocol, check if we were keeping the peer before as a listenRelay if (!hasProtocol && this._listenRelays.has(id)) { @@ -107,16 +110,21 @@ class AutoRelay { // Do not hop on a relayed connection if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) { - log(`relayed connection to ${id} will not be used to hop on`) + log(`relayed connection to ${id} will not be used to make reservation on`) return } - const supportsHop = await canHop({ connection }) + await this._peerStore.metadataBook.setValue(peerId, HOP_METADATA_KEY, uint8ArrayFromString(HOP_METADATA_VALUE)) - if (supportsHop) { - await this._peerStore.metadataBook.setValue(peerId, HOP_METADATA_KEY, uint8ArrayFromString(HOP_METADATA_VALUE)) - await this._addListenRelay(connection, id) + // Check if already listening on enough relays + if (this._listenRelays.size >= this.maxListeners) { + log('skip reservation as we created max reservations already') + return } + const reservation = await reserve(connection) + + await this._addListenRelay(connection, id, reservation) + log('added listen relay %s', connection.remotePeer.toB58String()) } catch (/** @type {any} */ err) { this._onError(err) } @@ -147,15 +155,11 @@ class AutoRelay { * @private * @param {Connection} connection - connection to the peer * @param {string} id - peer identifier string + * @param {Reservation} reservation * @returns {Promise} */ - async _addListenRelay (connection, id) { + async _addListenRelay (connection, id, reservation) { try { - // Check if already listening on enough relays - if (this._listenRelays.size >= this.maxListeners) { - return - } - // Get peer known addresses and sort them per public addresses first const remoteAddrs = await this._peerStore.addressBook.getMultiaddrsForPeer( connection.remotePeer, this._addressSorter @@ -177,7 +181,7 @@ class AutoRelay { ) if (result.includes(true)) { - this._listenRelays.add(id) + this._listenRelays.set(id, reservation) } } catch (/** @type {any} */ err) { this._onError(err) @@ -245,12 +249,19 @@ class AutoRelay { continue } - await this._addListenRelay(connection, idStr) - // Check if already listening on enough relays if (this._listenRelays.size >= this.maxListeners) { return } + + let reservation + try { + reservation = await reserve(connection) + } catch (e) { + continue + } + + await this._addListenRelay(connection, idStr, reservation) } // Try to listen on known peers that are not connected @@ -292,9 +303,12 @@ class AutoRelay { async _tryToListenOnRelay (peerId) { try { const connection = await this._libp2p.dial(peerId) - await this._addListenRelay(connection, peerId.toB58String()) + + const reservation = await reserve(connection) + + await this._addListenRelay(connection, peerId.toB58String(), reservation) } catch (/** @type {any} */ err) { - this._onError(err, `could not connect and listen on known hop relay ${peerId.toB58String()}`) + this._onError(err, `could not connect and make reservation on known hop relay ${peerId.toB58String()}`) } } } diff --git a/src/circuit/v2/hop.js b/src/circuit/v2/hop.js index f78c66bc..8e658c58 100644 --- a/src/circuit/v2/hop.js +++ b/src/circuit/v2/hop.js @@ -8,10 +8,12 @@ const log = Object.assign(debug('libp2p:circuitv2:hop'), { error: debug('libp2p:circuitv2:hop:err') }) const { HopMessage, Status, StopMessage } = require('./protocol') +const { protocolIDv2Hop } = require('../multicodec') const { stop } = require('./stop') const { ReservationVoucherRecord } = require('./reservation-voucher') const { validateHopConnectRequest } = require('./validation') const { Multiaddr } = require('multiaddr') +const StreamHandler = require('./stream-handler') /** * @typedef {import('./protocol').IHopMessage} IHopMessage @@ -49,6 +51,36 @@ module.exports.handleHopProtocol = async function (options) { } } +/** + * + * @param {Connection} connection + * @returns + */ +module.exports.reserve = async function (connection) { + log('requesting reservation from %s', connection.remotePeer.toB58String()) + const { stream } = await connection.newStream([protocolIDv2Hop]) + const streamHandler = new StreamHandler({ stream }) + streamHandler.write(HopMessage.encode({ + type: HopMessage.Type.RESERVE + }).finish()) + + let response + try { + response = HopMessage.decode(await streamHandler.read()) + } catch (e) { + log.error('error passing reserve message response from %s because', connection.remotePeer.toB58String(), e.message) + streamHandler.close() + throw e + } + + if (response.status === Status.OK && response.reservation) { + return response.reservation + } + const errMsg = `reservation failed with status ${response.status}` + log.error(errMsg) + throw new Error(errMsg) +} + /** * * @param {Object} options diff --git a/test/circuit/v2/hop.spec.js b/test/circuit/v2/hop.spec.js index 455e5679..d6d9f3ce 100644 --- a/test/circuit/v2/hop.spec.js +++ b/test/circuit/v2/hop.spec.js @@ -67,7 +67,7 @@ describe('Circuit v2 - hop protocol', function () { expect(response.type).to.be.equal(HopMessage.Type.STATUS) expect(response.limit).to.be.null() expect(response.status).to.be.equal(Status.OK) - expect(response.reservation.expire.toNumber()).to.be.equal(expire) + expect(response.reservation.expire).to.be.equal(expire) expect(response.reservation.voucher).to.not.be.null() expect(response.reservation.addrs.length).to.be.greaterThan(0) }) @@ -113,7 +113,6 @@ describe('Circuit v2 - hop protocol', function () { it('should fail to reserve slot - failed to write response', async function () { reservationStore.reserve.resolves({ status: Status.OK, expire: 123 }) reservationStore.removeReservation.resolves() - // TODO: seems like closing stream or connection doesn't trigger error const backup = streamHandler.write streamHandler.write = function () { throw new Error('connection reset') } await handleHopProtocol({ diff --git a/test/dialing/resolver.spec.js b/test/dialing/resolver.spec.js index ab36881f..d6cce247 100644 --- a/test/dialing/resolver.spec.js +++ b/test/dialing/resolver.spec.js @@ -30,7 +30,7 @@ const getDnsRelayedAddrStub = (peerId) => [ [`dnsaddr=${relayedAddr(peerId)}`] ] -describe('Dialing (resolvable addresses)', () => { +describe.skip('Dialing (resolvable addresses)', () => { let libp2p, remoteLibp2p beforeEach(async () => { @@ -44,7 +44,17 @@ describe('Dialing (resolvable addresses)', () => { config: { ...baseOptions.config, peerDiscovery: { - autoDial: false + autoDial: true + }, + relay: { + enabled: true, + autorelay: { + enabled: true + }, + hop: { + enabled: true, + active: false + } } } }, diff --git a/test/relay/auto-relay.node.js b/test/relay/auto-relay.node.js index 92608ec1..4b2062ad 100644 --- a/test/relay/auto-relay.node.js +++ b/test/relay/auto-relay.node.js @@ -11,7 +11,7 @@ const ipfsHttpClient = require('ipfs-http-client') const DelegatedContentRouter = require('libp2p-delegated-content-routing') const { Multiaddr } = require('multiaddr') const Libp2p = require('../../src') -const { relayV1: relayMulticodec } = require('../../src/circuit/multicodec') +const { protocolIDv2Hop } = require('../../src/circuit/multicodec') const { createPeerId } = require('../utils/creators/peer') const baseOptions = require('../utils/base-options') @@ -34,7 +34,7 @@ async function usingAsRelay (node, relay, opts) { async function discoveredRelayConfig (node, relay) { await pWaitFor(async () => { const protos = await node.peerStore.protoBook.get(relay.peerId) - const supportsRelay = protos.includes('/libp2p/circuit/relay/0.1.0') + const supportsRelay = protos.includes(protocolIDv2Hop) const metadata = await node.peerStore.metadataBook.get(relay.peerId) const supportsHop = metadata.has('hop_relay') @@ -43,8 +43,7 @@ async function discoveredRelayConfig (node, relay) { }) } -// TODO: replace with circuit v2 stuff -describe.skip('auto-relay', () => { +describe('auto-relay', () => { describe('basics', () => { let libp2p let relayLibp2p @@ -108,7 +107,7 @@ describe.skip('auto-relay', () => { // Peer has relay multicodec const knownProtocols = await libp2p.peerStore.protoBook.get(relayLibp2p.peerId) - expect(knownProtocols).to.include(relayMulticodec) + expect(knownProtocols).to.include(protocolIDv2Hop) }) }) @@ -179,7 +178,7 @@ describe.skip('auto-relay', () => { // Peer has relay multicodec const knownProtocols = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId) - expect(knownProtocols).to.include(relayMulticodec) + expect(knownProtocols).to.include(protocolIDv2Hop) }) it('should be able to dial a peer from its relayed address previously added', async () => { @@ -208,7 +207,7 @@ describe.skip('auto-relay', () => { // Relay2 has relay multicodec const knownProtocols2 = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId) - expect(knownProtocols2).to.include(relayMulticodec) + expect(knownProtocols2).to.include(protocolIDv2Hop) // Discover an extra relay and connect await relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs) @@ -222,7 +221,7 @@ describe.skip('auto-relay', () => { // Relay2 has relay multicodec const knownProtocols3 = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p3.peerId) - expect(knownProtocols3).to.include(relayMulticodec) + expect(knownProtocols3).to.include(protocolIDv2Hop) }) it('should not listen on a relayed address we disconnect from peer', async () => {