feat: add reservation store

This commit is contained in:
Marin 2022-03-21 19:07:19 +01:00
parent 84e38d7e95
commit 4866b279e0
6 changed files with 134 additions and 9 deletions

View File

@ -24,6 +24,7 @@ const { handleHopProtocol } = require('./v2/hop')
const { handleStop: handleStopV2 } = require('./v2/stop')
const { Status, HopMessage, StopMessage } = require('./v2/protocol')
const createError = require('err-code')
const ReservationStore = require('./v2/reservation-store')
const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit')
@ -49,6 +50,7 @@ class Circuit {
this._options = libp2p._config.relay
this._libp2p = libp2p
this.peerId = libp2p.peerId
this._reservationStore = new ReservationStore(this._options.reservationLimit)
this._registrar.handle(protocolIDv1, this._onV1Protocol.bind(this))
this._registrar.handle(protocolIDv2Hop, this._onV2ProtocolHop.bind(this))
@ -143,12 +145,7 @@ class Circuit {
circuit: this,
relayPeer: this._libp2p.peerId,
relayAddrs: this._libp2p.multiaddrs,
// TODO: replace with real reservation store
reservationStore: {
reserve: async function () { return { status: Status.OK, expire: (new Date().getTime() / 1000 + 21600) } },
hasReservation: async function () { return true },
removeReservation: async function () { }
},
reservationStore: this._reservationStore,
request,
limit: null,
acl: null
@ -321,7 +318,7 @@ class Circuit {
const status = HopMessage.decode(await streamHandler.read())
if (status.status !== Status.OK) {
throw createError(new Error('failed to connect via realy with status ' + status.status), codes.ERR_HOP_REQUEST_FAILED)
throw createError(new Error('failed to connect via relay with status ' + status.status), codes.ERR_HOP_REQUEST_FAILED)
}
// TODO: do something with limit and transient connection

View File

@ -129,7 +129,7 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
}
}
if (!reservationStore.hasReservation(request.peer)) {
if (!await reservationStore.hasReservation(dstPeer)) {
log.error('hop connect denied for %s with status %s', connection.remotePeer.toB58String(), Status.NO_RESERVATION)
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
return

View File

@ -0,0 +1,62 @@
'use strict'
const { Status } = require('./protocol')
/**
* @typedef {import('./interfaces').ReservationStore} IReservationStore
* @typedef {import('./interfaces').ReservationStatus} ReservationStatus
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('peer-id')} PeerId
*/
/**
* @implements IReservationStore
*/
class ReservationStore {
constructor (limit = 15) {
/**
* PeerId =>
*/
this._reservations = new Map()
this._limit = limit
}
/**
* @typedef {Object} Result
* @property {ReservationStatus} status
* @property {number|undefined} expire
*/
/**
*
* @param {PeerId} peer
* @param {Multiaddr} addr
* @returns {Promise<Result>}
*/
async reserve (peer, addr) {
if (this._reservations.size >= this._limit && !this._reservations.has(peer.toB58String())) {
return { status: Status.RESERVATION_REFUSED, expire: undefined }
}
const expire = new Date()
this._reservations.set(peer.toB58String(), { addr, expire })
return { status: Status.OK, expire: expire.getTime() }
}
/**
* @param {PeerId} peer
*/
async removeReservation (peer) {
this._reservations.delete(peer.toB58String())
}
/**
*
* @param {PeerId} dst
* @returns {Promise<boolean>}
*/
async hasReservation (dst) {
return this._reservations.has(dst.toB58String())
}
}
module.exports = ReservationStore

View File

@ -83,6 +83,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {import('./circuit').RelayAdvertiseOptions} [advertise]
* @property {import('./circuit').HopOptions} [hop]
* @property {import('./circuit').AutoRelayOptions} [autoRelay]
* @property {number} [reservationLimit]
*
* @typedef {Object} Libp2pConfig
* @property {DhtOptions} [dht] dht module options

View File

@ -0,0 +1,47 @@
'use strict'
const { expect } = require('aegir/utils/chai')
const { Multiaddr } = require('multiaddr')
const PeerId = require('peer-id')
const { Status } = require('../../../src/circuit/v2/protocol')
const ReservationStore = require('../../../src/circuit/v2/reservation-store')
/* eslint-env mocha */
describe('Circuit v2 - reservation store', function () {
it('should add reservation', async function () {
const store = new ReservationStore(2)
const peer = await PeerId.create()
const result = await store.reserve(peer, new Multiaddr())
expect(result.status).to.equal(Status.OK)
expect(result.expire).to.not.be.undefined()
expect(await store.hasReservation(peer)).to.be.true()
})
it('should add reservation if peer already has reservation', async function () {
const store = new ReservationStore(1)
const peer = await PeerId.create()
await store.reserve(peer, new Multiaddr())
const result = await store.reserve(peer, new Multiaddr())
expect(result.status).to.equal(Status.OK)
expect(result.expire).to.not.be.undefined()
expect(await store.hasReservation(peer)).to.be.true()
})
it('should fail to add reservation on exceeding limit', async function () {
const store = new ReservationStore(0)
const peer = await PeerId.create()
const result = await store.reserve(peer, new Multiaddr())
expect(result.status).to.equal(Status.RESERVATION_REFUSED)
})
it('should remove reservation', async function () {
const store = new ReservationStore(10)
const peer = await PeerId.create()
const result = await store.reserve(peer, new Multiaddr())
expect(result.status).to.equal(Status.OK)
expect(await store.hasReservation(peer)).to.be.true()
await store.removeReservation(peer)
expect(await store.hasReservation(peer)).to.be.false()
await store.removeReservation(peer)
})
})

View File

@ -60,7 +60,7 @@ describe('Dialing (via relay, TCP)', () => {
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
await relayLibp2p.transportManager._transports.get('Circuit')._reservationStore.reserve(dstLibp2p.peerId, new Multiaddr())
await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
const connection = await srcLibp2p.dial(dialAddr)
@ -86,6 +86,24 @@ describe('Dialing (via relay, TCP)', () => {
expect(output.slice()).to.eql(input)
})
it('should fail to connect without reservation', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerId.toB58String()
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId}`)
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
await expect(srcLibp2p.dial(dialAddr))
.to.eventually.be.rejectedWith(AggregateError)
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
})
it('should fail to connect to a peer over a relay with inactive connections', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerId.toB58String()