From 4866b279e07855e3daa8e46b15ad98bb677ae0b5 Mon Sep 17 00:00:00 2001 From: Marin Date: Mon, 21 Mar 2022 19:07:19 +0100 Subject: [PATCH] feat: add reservation store --- src/circuit/transport.js | 11 ++-- src/circuit/v2/hop.js | 2 +- src/circuit/v2/reservation-store.js | 62 +++++++++++++++++++++++ src/index.js | 1 + test/circuit/v2/reservation-store.spec.js | 47 +++++++++++++++++ test/relay/relay.node.js | 20 +++++++- 6 files changed, 134 insertions(+), 9 deletions(-) create mode 100644 src/circuit/v2/reservation-store.js create mode 100644 test/circuit/v2/reservation-store.spec.js diff --git a/src/circuit/transport.js b/src/circuit/transport.js index afc18f29..e56ede12 100644 --- a/src/circuit/transport.js +++ b/src/circuit/transport.js @@ -24,6 +24,7 @@ const { handleHopProtocol } = require('./v2/hop') const { handleStop: handleStopV2 } = require('./v2/stop') const { Status, HopMessage, StopMessage } = require('./v2/protocol') const createError = require('err-code') +const ReservationStore = require('./v2/reservation-store') const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit') @@ -49,6 +50,7 @@ class Circuit { this._options = libp2p._config.relay this._libp2p = libp2p this.peerId = libp2p.peerId + this._reservationStore = new ReservationStore(this._options.reservationLimit) this._registrar.handle(protocolIDv1, this._onV1Protocol.bind(this)) this._registrar.handle(protocolIDv2Hop, this._onV2ProtocolHop.bind(this)) @@ -143,12 +145,7 @@ class Circuit { circuit: this, relayPeer: this._libp2p.peerId, relayAddrs: this._libp2p.multiaddrs, - // TODO: replace with real reservation store - reservationStore: { - reserve: async function () { return { status: Status.OK, expire: (new Date().getTime() / 1000 + 21600) } }, - hasReservation: async function () { return true }, - removeReservation: async function () { } - }, + reservationStore: this._reservationStore, request, limit: null, acl: null @@ -321,7 +318,7 @@ class Circuit { const status = HopMessage.decode(await streamHandler.read()) if (status.status !== Status.OK) { - throw createError(new Error('failed to connect via realy with status ' + status.status), codes.ERR_HOP_REQUEST_FAILED) + throw createError(new Error('failed to connect via relay with status ' + status.status), codes.ERR_HOP_REQUEST_FAILED) } // TODO: do something with limit and transient connection diff --git a/src/circuit/v2/hop.js b/src/circuit/v2/hop.js index 80318bf3..f78c66bc 100644 --- a/src/circuit/v2/hop.js +++ b/src/circuit/v2/hop.js @@ -129,7 +129,7 @@ async function handleConnect ({ connection, streamHandler, request, reservationS } } - if (!reservationStore.hasReservation(request.peer)) { + if (!await reservationStore.hasReservation(dstPeer)) { log.error('hop connect denied for %s with status %s', connection.remotePeer.toB58String(), Status.NO_RESERVATION) writeErrorResponse(streamHandler, Status.NO_RESERVATION) return diff --git a/src/circuit/v2/reservation-store.js b/src/circuit/v2/reservation-store.js new file mode 100644 index 00000000..a88de3ce --- /dev/null +++ b/src/circuit/v2/reservation-store.js @@ -0,0 +1,62 @@ +'use strict' + +const { Status } = require('./protocol') + +/** + * @typedef {import('./interfaces').ReservationStore} IReservationStore + * @typedef {import('./interfaces').ReservationStatus} ReservationStatus + * @typedef {import('multiaddr').Multiaddr} Multiaddr + * @typedef {import('peer-id')} PeerId + */ + +/** + * @implements IReservationStore + */ +class ReservationStore { + constructor (limit = 15) { + /** + * PeerId => + */ + this._reservations = new Map() + this._limit = limit + } + + /** + * @typedef {Object} Result + * @property {ReservationStatus} status + * @property {number|undefined} expire + */ + + /** + * + * @param {PeerId} peer + * @param {Multiaddr} addr + * @returns {Promise} + */ + async reserve (peer, addr) { + if (this._reservations.size >= this._limit && !this._reservations.has(peer.toB58String())) { + return { status: Status.RESERVATION_REFUSED, expire: undefined } + } + const expire = new Date() + this._reservations.set(peer.toB58String(), { addr, expire }) + return { status: Status.OK, expire: expire.getTime() } + } + + /** + * @param {PeerId} peer + */ + async removeReservation (peer) { + this._reservations.delete(peer.toB58String()) + } + + /** + * + * @param {PeerId} dst + * @returns {Promise} + */ + async hasReservation (dst) { + return this._reservations.has(dst.toB58String()) + } +} + +module.exports = ReservationStore diff --git a/src/index.js b/src/index.js index f5e0f6c0..6921bbed 100644 --- a/src/index.js +++ b/src/index.js @@ -83,6 +83,7 @@ const { updateSelfPeerRecord } = require('./record/utils') * @property {import('./circuit').RelayAdvertiseOptions} [advertise] * @property {import('./circuit').HopOptions} [hop] * @property {import('./circuit').AutoRelayOptions} [autoRelay] + * @property {number} [reservationLimit] * * @typedef {Object} Libp2pConfig * @property {DhtOptions} [dht] dht module options diff --git a/test/circuit/v2/reservation-store.spec.js b/test/circuit/v2/reservation-store.spec.js new file mode 100644 index 00000000..4d4f6fb7 --- /dev/null +++ b/test/circuit/v2/reservation-store.spec.js @@ -0,0 +1,47 @@ +'use strict' + +const { expect } = require('aegir/utils/chai') +const { Multiaddr } = require('multiaddr') +const PeerId = require('peer-id') +const { Status } = require('../../../src/circuit/v2/protocol') +const ReservationStore = require('../../../src/circuit/v2/reservation-store') + +/* eslint-env mocha */ + +describe('Circuit v2 - reservation store', function () { + it('should add reservation', async function () { + const store = new ReservationStore(2) + const peer = await PeerId.create() + const result = await store.reserve(peer, new Multiaddr()) + expect(result.status).to.equal(Status.OK) + expect(result.expire).to.not.be.undefined() + expect(await store.hasReservation(peer)).to.be.true() + }) + it('should add reservation if peer already has reservation', async function () { + const store = new ReservationStore(1) + const peer = await PeerId.create() + await store.reserve(peer, new Multiaddr()) + const result = await store.reserve(peer, new Multiaddr()) + expect(result.status).to.equal(Status.OK) + expect(result.expire).to.not.be.undefined() + expect(await store.hasReservation(peer)).to.be.true() + }) + + it('should fail to add reservation on exceeding limit', async function () { + const store = new ReservationStore(0) + const peer = await PeerId.create() + const result = await store.reserve(peer, new Multiaddr()) + expect(result.status).to.equal(Status.RESERVATION_REFUSED) + }) + + it('should remove reservation', async function () { + const store = new ReservationStore(10) + const peer = await PeerId.create() + const result = await store.reserve(peer, new Multiaddr()) + expect(result.status).to.equal(Status.OK) + expect(await store.hasReservation(peer)).to.be.true() + await store.removeReservation(peer) + expect(await store.hasReservation(peer)).to.be.false() + await store.removeReservation(peer) + }) +}) diff --git a/test/relay/relay.node.js b/test/relay/relay.node.js index ca8aa40b..d6b99b38 100644 --- a/test/relay/relay.node.js +++ b/test/relay/relay.node.js @@ -60,7 +60,7 @@ describe('Dialing (via relay, TCP)', () => { const tcpAddrs = dstLibp2p.transportManager.getAddrs() sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) - + await relayLibp2p.transportManager._transports.get('Circuit')._reservationStore.reserve(dstLibp2p.peerId, new Multiaddr()) await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs()) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) const connection = await srcLibp2p.dial(dialAddr) @@ -86,6 +86,24 @@ describe('Dialing (via relay, TCP)', () => { expect(output.slice()).to.eql(input) }) + it('should fail to connect without reservation', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerId.toB58String() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId}`) + + const tcpAddrs = dstLibp2p.transportManager.getAddrs() + sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) + + await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs()) + expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) + await expect(srcLibp2p.dial(dialAddr)) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) + }) + it('should fail to connect to a peer over a relay with inactive connections', async () => { const relayAddr = relayLibp2p.transportManager.getAddrs()[0] const relayIdString = relayLibp2p.peerId.toB58String()