mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
chore: fix hop/stop unit tests
This commit is contained in:
parent
b2be4637e2
commit
abe2b22af6
@ -22,7 +22,7 @@ const StreamHandler = require('./v1/stream-handler')
|
||||
const StreamHandlerV2 = require('./v2/stream-handler')
|
||||
const { handleHopProtocol } = require('./v2/hop')
|
||||
const { handleStop: handleStopV2 } = require('./v2/stop')
|
||||
const { Status, HopMessage, StopMessage, Peer } = require('./v2/protocol')
|
||||
const { Status, HopMessage, StopMessage } = require('./v2/protocol')
|
||||
|
||||
const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit')
|
||||
|
||||
|
@ -4,7 +4,6 @@ const debug = require('debug')
|
||||
const { pipe } = require('it-pipe')
|
||||
const PeerId = require('peer-id')
|
||||
const Envelope = require('../../record/envelope')
|
||||
const multicodec = require('../multicodec')
|
||||
const log = Object.assign(debug('libp2p:circuitv2:hop'), {
|
||||
error: debug('libp2p:circuitv2:hop:err')
|
||||
})
|
||||
@ -19,7 +18,6 @@ const { Multiaddr } = require('multiaddr')
|
||||
* @typedef {import('./protocol').IReservation} IReservation
|
||||
* @typedef {import('./protocol').ILimit} ILimit
|
||||
* @typedef {import('./stream-handler')} StreamHandler
|
||||
* @typedef {import('multiaddr').Multiaddr} Multiaddr
|
||||
* @typedef {import('./interfaces').ReservationStore} ReservationStore
|
||||
* @typedef {import('./interfaces').Acl} Acl
|
||||
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
|
||||
@ -38,19 +36,17 @@ const { Multiaddr } = require('multiaddr')
|
||||
* @param {ILimit|null} options.limit
|
||||
* @param {Acl?} options.acl
|
||||
* @param {ReservationStore} options.reservationStore
|
||||
* @returns {Promise<import('../..').MuxedStream|null>}
|
||||
*/
|
||||
module.exports.handleHopProtocol = async function (options) {
|
||||
switch (options.request.type) {
|
||||
case HopMessage.Type.RESERVE: await handleReserve(options); break
|
||||
case HopMessage.Type.CONNECT: return await handleConnect(options)
|
||||
case HopMessage.Type.CONNECT: await handleConnect(options); break
|
||||
default: {
|
||||
log.error('invalid hop request type %s via peer %s', options.request.type, options.connection.remotePeer.toB58String())
|
||||
writeErrorResponse(options.streamHandler, Status.MALFORMED_MESSAGE)
|
||||
options.streamHandler.close()
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
@ -109,7 +105,6 @@ async function handleReserve ({ connection, streamHandler, relayPeer, relayAddrs
|
||||
* @param {StreamHandler} options.streamHandler
|
||||
* @param {Transport} options.circuit
|
||||
* @param {Acl?} options.acl
|
||||
* @returns {Promise<import('../..').MuxedStream>}
|
||||
*/
|
||||
async function handleConnect ({ connection, streamHandler, request, reservationStore, circuit, acl }) {
|
||||
log('hop connect request from %s', connection.remotePeer.toB58String())
|
||||
@ -118,7 +113,8 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
|
||||
validateHopConnectRequest(request, streamHandler)
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('invalid hop connect request via peer %s', connection.remotePeer.toB58String(), err)
|
||||
throw err
|
||||
writeErrorResponse(streamHandler, Status.MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
// @ts-ignore peer is defined at this point
|
||||
@ -129,18 +125,21 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
|
||||
if (status !== Status.OK) {
|
||||
log.error('hop connect denied for %s with status %s', connection.remotePeer.toB58String(), status)
|
||||
writeErrorResponse(streamHandler, status)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if (!reservationStore.hasReservation(request.peer)) {
|
||||
log.error('hop connect denied for %s with status %s', connection.remotePeer.toB58String(), Status.NO_RESERVATION)
|
||||
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
|
||||
return
|
||||
}
|
||||
|
||||
const destinationConnection = circuit._connectionManager.get(dstPeer)
|
||||
if (!destinationConnection) {
|
||||
log('hop connect denied for %s as there is no destination connection', connection.remotePeer.toB58String())
|
||||
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
|
||||
return
|
||||
}
|
||||
|
||||
log('hop connect request from %s to %s is valid', connection.remotePeer.toB58String(), dstPeer.toB58String())
|
||||
@ -159,7 +158,7 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
|
||||
if (!destinationStream) {
|
||||
log.error('failed to open stream to destination peer %s', destinationConnection?.remotePeer.toB58String())
|
||||
writeErrorResponse(streamHandler, Status.CONNECTION_FAILED)
|
||||
throw new Error('failed to open stream to destination peer')
|
||||
return
|
||||
}
|
||||
|
||||
writeResponse(streamHandler, { type: HopMessage.Type.STATUS, status: Status.OK })
|
||||
|
Loading…
x
Reference in New Issue
Block a user