mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
chore: integrate circuit v2 in transport
This commit is contained in:
parent
7815e44427
commit
b2be4637e2
@ -14,11 +14,15 @@ const { codes } = require('../errors')
|
||||
|
||||
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
|
||||
|
||||
const { relayV1: multicodec } = require('./multicodec')
|
||||
const { relayV1: protocolIDv1, protocolIDv2Hop, protocolIDv2Stop } = require('./multicodec')
|
||||
const createListener = require('./listener')
|
||||
const { handleCanHop, handleHop, hop } = require('./v1/hop')
|
||||
const { handleStop } = require('./v1/stop')
|
||||
const { handleStop: handleStopV1 } = require('./v1/stop')
|
||||
const StreamHandler = require('./v1/stream-handler')
|
||||
const StreamHandlerV2 = require('./v2/stream-handler')
|
||||
const { handleHopProtocol } = require('./v2/hop')
|
||||
const { handleStop: handleStopV2 } = require('./v2/stop')
|
||||
const { Status, HopMessage, StopMessage, Peer } = require('./v2/protocol')
|
||||
|
||||
const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit')
|
||||
|
||||
@ -45,7 +49,9 @@ class Circuit {
|
||||
this._libp2p = libp2p
|
||||
this.peerId = libp2p.peerId
|
||||
|
||||
this._registrar.handle(multicodec, this._onProtocol.bind(this))
|
||||
this._registrar.handle(protocolIDv1, this._onV1Protocol.bind(this))
|
||||
this._registrar.handle(protocolIDv2Hop, this._onV2ProtocolHop.bind(this))
|
||||
this._registrar.handle(protocolIDv2Stop, this._onV2ProtocolStop.bind(this))
|
||||
}
|
||||
|
||||
/**
|
||||
@ -53,7 +59,7 @@ class Circuit {
|
||||
* @param {Connection} props.connection
|
||||
* @param {MuxedStream} props.stream
|
||||
*/
|
||||
async _onProtocol ({ connection, stream }) {
|
||||
async _onV1Protocol ({ connection, stream }) {
|
||||
/** @type {import('./v1/stream-handler')} */
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const request = await streamHandler.read()
|
||||
@ -83,7 +89,7 @@ class Circuit {
|
||||
}
|
||||
case CircuitPB.Type.STOP: {
|
||||
log('received STOP request from %s', connection.remotePeer.toB58String())
|
||||
virtualConnection = await handleStop({
|
||||
virtualConnection = await handleStopV1({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
@ -114,6 +120,77 @@ class Circuit {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As a relay it handles hop connect and reserve request
|
||||
*
|
||||
* @param {Object} props
|
||||
* @param {Connection} props.connection
|
||||
* @param {MuxedStream} props.stream
|
||||
*/
|
||||
async _onV2ProtocolHop ({ connection, stream }) {
|
||||
log('received circuit v2 hop protocol stream from %s', connection.remotePeer.toB58String())
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
const request = HopMessage.decode(await streamHandler.read())
|
||||
|
||||
if (!request) {
|
||||
return
|
||||
}
|
||||
|
||||
await handleHopProtocol({
|
||||
connection,
|
||||
streamHandler,
|
||||
circuit: this,
|
||||
relayPeer: this._libp2p.peerId,
|
||||
relayAddrs: this._libp2p.multiaddrs,
|
||||
// TODO: replace with real reservation store
|
||||
reservationStore: {
|
||||
reserve: async function () { return { status: Status.OK, expire: (new Date().getTime() / 1000 + 21600) } },
|
||||
hasReservation: async function () { return true },
|
||||
removeReservation: async function () { }
|
||||
},
|
||||
request,
|
||||
limit: null,
|
||||
acl: null
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* As a client this is used to
|
||||
*
|
||||
* @param {Object} props
|
||||
* @param {Connection} props.connection
|
||||
* @param {MuxedStream} props.stream
|
||||
*/
|
||||
async _onV2ProtocolStop ({ connection, stream }) {
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
const request = StopMessage.decode(await streamHandler.read())
|
||||
log('received circuit v2 stop protocol request from %s', connection.remotePeer.toB58String())
|
||||
if (!request) {
|
||||
return
|
||||
}
|
||||
|
||||
const mStream = await handleStopV2({
|
||||
connection,
|
||||
streamHandler,
|
||||
request
|
||||
})
|
||||
|
||||
if (mStream) {
|
||||
// @ts-ignore dst peer will not be undefined
|
||||
const remoteAddr = new Multiaddr(request.peer.addrs[0])
|
||||
const localAddr = this._libp2p.addressManager.getListenAddrs()[0]
|
||||
const maConn = toConnection({
|
||||
stream: mStream,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
|
||||
const conn = await this._upgrader.upgradeInbound(maConn)
|
||||
log('%s connection %s upgraded', 'inbound', maConn.remoteAddr)
|
||||
this.handler && this.handler(conn)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*
|
||||
@ -146,9 +223,49 @@ class Circuit {
|
||||
disconnectOnFailure = true
|
||||
}
|
||||
|
||||
const stream = await relayConnection.newStream([protocolIDv2Hop, protocolIDv1])
|
||||
|
||||
switch (stream.protocol) {
|
||||
case protocolIDv1: return await this.connectV1({
|
||||
stream: stream.stream,
|
||||
connection: relayConnection,
|
||||
destinationPeer,
|
||||
destinationAddr,
|
||||
relayAddr,
|
||||
ma,
|
||||
disconnectOnFailure
|
||||
})
|
||||
case protocolIDv2Hop: return await this.connectV2({
|
||||
stream: stream.stream,
|
||||
connection: relayConnection,
|
||||
destinationPeer,
|
||||
destinationAddr,
|
||||
relayAddr,
|
||||
ma,
|
||||
disconnectOnFailure
|
||||
})
|
||||
default:
|
||||
stream.stream.reset()
|
||||
throw new Error('Unexpected stream protocol')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Object} params
|
||||
* @param {MuxedStream} params.stream
|
||||
* @param {Connection} params.connection
|
||||
* @param {PeerId} params.destinationPeer
|
||||
* @param {Multiaddr} params.destinationAddr
|
||||
* @param {Multiaddr} params.relayAddr
|
||||
* @param {Multiaddr} params.ma
|
||||
* @param {boolean} params.disconnectOnFailure
|
||||
* @returns {Promise<Connection>}
|
||||
*/
|
||||
async connectV1 ({ stream, connection, destinationPeer, destinationAddr, relayAddr, ma, disconnectOnFailure }) {
|
||||
try {
|
||||
const virtualConnection = await hop({
|
||||
connection: relayConnection,
|
||||
stream,
|
||||
request: {
|
||||
type: CircuitPB.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -173,7 +290,54 @@ class Circuit {
|
||||
return this._upgrader.upgradeOutbound(maConn)
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('Circuit relay dial failed', err)
|
||||
disconnectOnFailure && await relayConnection.close()
|
||||
disconnectOnFailure && await connection.close()
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Object} params
|
||||
* @param {MuxedStream} params.stream
|
||||
* @param {Connection} params.connection
|
||||
* @param {PeerId} params.destinationPeer
|
||||
* @param {Multiaddr} params.destinationAddr
|
||||
* @param {Multiaddr} params.relayAddr
|
||||
* @param {Multiaddr} params.ma
|
||||
* @param {boolean} params.disconnectOnFailure
|
||||
* @returns {Promise<Connection>}
|
||||
*/
|
||||
async connectV2 ({ stream, connection, destinationPeer, destinationAddr, relayAddr, ma, disconnectOnFailure }) {
|
||||
try {
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
streamHandler.write(HopMessage.encode({
|
||||
type: HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: destinationPeer.toBytes(),
|
||||
addrs: [new Multiaddr(destinationAddr).bytes]
|
||||
}
|
||||
}).finish())
|
||||
|
||||
const status = HopMessage.decode(await streamHandler.read())
|
||||
if (status.status !== Status.OK) {
|
||||
throw new Error('failed to connect via realy with status ' + status.status)
|
||||
}
|
||||
|
||||
// TODO: do something with limit and transient connection
|
||||
|
||||
let localAddr = connection.localAddr ?? relayAddr
|
||||
localAddr = localAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toB58String()}`)
|
||||
const maConn = toConnection({
|
||||
stream: streamHandler.rest(),
|
||||
remoteAddr: ma,
|
||||
localAddr
|
||||
})
|
||||
log('new outbound connection %s', maConn.remoteAddr)
|
||||
const conn = await this._upgrader.upgradeOutbound(maConn)
|
||||
return conn
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('Circuit relay dial failed', err)
|
||||
disconnectOnFailure && await connection.close()
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
@ -117,16 +117,14 @@ async function handleHop ({
|
||||
* peer. A new, virtual, connection will be created between the two via the relay.
|
||||
*
|
||||
* @param {object} options
|
||||
* @param {Connection} options.connection - Connection to the relay
|
||||
* @param {MuxedStream} options.stream - Stream to the relay
|
||||
* @param {ICircuitRelay} options.request
|
||||
* @returns {Promise<MuxedStream>}
|
||||
*/
|
||||
async function hop ({
|
||||
connection,
|
||||
stream,
|
||||
request
|
||||
}) {
|
||||
// Create a new stream to the relay
|
||||
const { stream } = await connection.newStream([multicodec.relayV1])
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
streamHandler.write(request)
|
||||
|
@ -8,9 +8,11 @@ const multicodec = require('../multicodec')
|
||||
const log = Object.assign(debug('libp2p:circuitv2:hop'), {
|
||||
error: debug('libp2p:circuitv2:hop:err')
|
||||
})
|
||||
const { HopMessage, Status } = require('./protocol')
|
||||
const { HopMessage, Status, StopMessage } = require('./protocol')
|
||||
const { stop } = require('./stop')
|
||||
const { ReservationVoucherRecord } = require('./reservation-voucher')
|
||||
const { validateHopConnectRequest } = require('./validation')
|
||||
const { Multiaddr } = require('multiaddr')
|
||||
|
||||
/**
|
||||
* @typedef {import('./protocol').IHopMessage} IHopMessage
|
||||
@ -36,17 +38,19 @@ const { validateHopConnectRequest } = require('./validation')
|
||||
* @param {ILimit|null} options.limit
|
||||
* @param {Acl?} options.acl
|
||||
* @param {ReservationStore} options.reservationStore
|
||||
* @returns {Promise<import('../..').MuxedStream|null>}
|
||||
*/
|
||||
module.exports.handleHopProtocol = async function (options) {
|
||||
switch (options.request.type) {
|
||||
case HopMessage.Type.RESERVE: await handleReserve(options); break
|
||||
case HopMessage.Type.CONNECT: await handleConnect(options); break
|
||||
case HopMessage.Type.CONNECT: return await handleConnect(options)
|
||||
default: {
|
||||
log.error('invalid hop request type %s via peer %s', options.request.type, options.connection.remotePeer.toB58String())
|
||||
writeErrorResponse(options.streamHandler, Status.MALFORMED_MESSAGE)
|
||||
options.streamHandler.close()
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
@ -105,6 +109,7 @@ async function handleReserve ({ connection, streamHandler, relayPeer, relayAddrs
|
||||
* @param {StreamHandler} options.streamHandler
|
||||
* @param {Transport} options.circuit
|
||||
* @param {Acl?} options.acl
|
||||
* @returns {Promise<import('../..').MuxedStream>}
|
||||
*/
|
||||
async function handleConnect ({ connection, streamHandler, request, reservationStore, circuit, acl }) {
|
||||
log('hop connect request from %s', connection.remotePeer.toB58String())
|
||||
@ -112,9 +117,11 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
|
||||
try {
|
||||
validateHopConnectRequest(request, streamHandler)
|
||||
} catch (/** @type {any} */ err) {
|
||||
return log.error('invalid hop connect request via peer %s', connection.remotePeer.toB58String(), err)
|
||||
log.error('invalid hop connect request via peer %s', connection.remotePeer.toB58String(), err)
|
||||
throw err
|
||||
}
|
||||
|
||||
// @ts-ignore peer is defined at this point
|
||||
const dstPeer = new PeerId(request.peer.id)
|
||||
|
||||
if (acl && acl.allowConnect) {
|
||||
@ -132,20 +139,37 @@ async function handleConnect ({ connection, streamHandler, request, reservationS
|
||||
|
||||
const destinationConnection = circuit._connectionManager.get(dstPeer)
|
||||
if (!destinationConnection) {
|
||||
log('hop connect denied for %s as there is no destincation connection', connection.remotePeer.toB58String())
|
||||
log('hop connect denied for %s as there is no destination connection', connection.remotePeer.toB58String())
|
||||
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
|
||||
}
|
||||
|
||||
log('hop connect request from %s to %s is valid', connection.remotePeer.toB58String(), dstPeer.toB58String())
|
||||
|
||||
const destinationStream = await stop({
|
||||
connection: destinationConnection,
|
||||
request: {
|
||||
type: StopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: connection.remotePeer.id,
|
||||
addrs: [new Multiaddr(connection.remoteAddr).bytes]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if (!destinationStream) {
|
||||
log.error('failed to open stream to destination peer %s', destinationConnection?.remotePeer.toB58String())
|
||||
writeErrorResponse(streamHandler, Status.CONNECTION_FAILED)
|
||||
throw new Error('failed to open stream to destination peer')
|
||||
}
|
||||
|
||||
writeResponse(streamHandler, { type: HopMessage.Type.STATUS, status: Status.OK })
|
||||
|
||||
const sourceStream = streamHandler.rest()
|
||||
|
||||
log('connection to destination established, short circuiting streams...')
|
||||
// Short circuit the two streams to create the relayed connection
|
||||
pipe([
|
||||
return pipe([
|
||||
sourceStream,
|
||||
destinationConnection?.newStream([multicodec.protocolIDv2Stop]),
|
||||
destinationStream,
|
||||
sourceStream
|
||||
])
|
||||
}
|
||||
|
@ -86,4 +86,5 @@ class StreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.StreamHandler = StreamHandler
|
||||
module.exports = StreamHandler
|
||||
|
@ -50,10 +50,16 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop()))
|
||||
})
|
||||
|
||||
it('should be able to connect to a peer over a relay with active connections', async () => {
|
||||
it.only('should be able to connect to a peer over a relay with active connections', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
|
||||
console.log({
|
||||
source: srcLibp2p.peerId.toB58String(),
|
||||
relay: relayLibp2p.peerId.toB58String(),
|
||||
destination: dstLibp2p.peerId.toB58String()
|
||||
})
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`)
|
||||
@ -63,7 +69,6 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
|
||||
await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
|
||||
const connection = await srcLibp2p.dial(dialAddr)
|
||||
expect(connection).to.exist()
|
||||
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes())
|
||||
|
Loading…
x
Reference in New Issue
Block a user