diff --git a/src/circuit/transport.js b/src/circuit/transport.js index f0e7e18e..4be449d6 100644 --- a/src/circuit/transport.js +++ b/src/circuit/transport.js @@ -14,11 +14,15 @@ const { codes } = require('../errors') const toConnection = require('libp2p-utils/src/stream-to-ma-conn') -const { relayV1: multicodec } = require('./multicodec') +const { relayV1: protocolIDv1, protocolIDv2Hop, protocolIDv2Stop } = require('./multicodec') const createListener = require('./listener') const { handleCanHop, handleHop, hop } = require('./v1/hop') -const { handleStop } = require('./v1/stop') +const { handleStop: handleStopV1 } = require('./v1/stop') const StreamHandler = require('./v1/stream-handler') +const StreamHandlerV2 = require('./v2/stream-handler') +const { handleHopProtocol } = require('./v2/hop') +const { handleStop: handleStopV2 } = require('./v2/stop') +const { Status, HopMessage, StopMessage, Peer } = require('./v2/protocol') const transportSymbol = Symbol.for('@libp2p/js-libp2p-circuit/circuit') @@ -45,7 +49,9 @@ class Circuit { this._libp2p = libp2p this.peerId = libp2p.peerId - this._registrar.handle(multicodec, this._onProtocol.bind(this)) + this._registrar.handle(protocolIDv1, this._onV1Protocol.bind(this)) + this._registrar.handle(protocolIDv2Hop, this._onV2ProtocolHop.bind(this)) + this._registrar.handle(protocolIDv2Stop, this._onV2ProtocolStop.bind(this)) } /** @@ -53,7 +59,7 @@ class Circuit { * @param {Connection} props.connection * @param {MuxedStream} props.stream */ - async _onProtocol ({ connection, stream }) { + async _onV1Protocol ({ connection, stream }) { /** @type {import('./v1/stream-handler')} */ const streamHandler = new StreamHandler({ stream }) const request = await streamHandler.read() @@ -83,7 +89,7 @@ class Circuit { } case CircuitPB.Type.STOP: { log('received STOP request from %s', connection.remotePeer.toB58String()) - virtualConnection = await handleStop({ + virtualConnection = await handleStopV1({ connection, request, streamHandler @@ -114,6 +120,77 @@ class Circuit { } } + /** + * As a relay it handles hop connect and reserve request + * + * @param {Object} props + * @param {Connection} props.connection + * @param {MuxedStream} props.stream + */ + async _onV2ProtocolHop ({ connection, stream }) { + log('received circuit v2 hop protocol stream from %s', connection.remotePeer.toB58String()) + const streamHandler = new StreamHandlerV2({ stream }) + const request = HopMessage.decode(await streamHandler.read()) + + if (!request) { + return + } + + await handleHopProtocol({ + connection, + streamHandler, + circuit: this, + relayPeer: this._libp2p.peerId, + relayAddrs: this._libp2p.multiaddrs, + // TODO: replace with real reservation store + reservationStore: { + reserve: async function () { return { status: Status.OK, expire: (new Date().getTime() / 1000 + 21600) } }, + hasReservation: async function () { return true }, + removeReservation: async function () { } + }, + request, + limit: null, + acl: null + }) + } + + /** + * As a client this is used to + * + * @param {Object} props + * @param {Connection} props.connection + * @param {MuxedStream} props.stream + */ + async _onV2ProtocolStop ({ connection, stream }) { + const streamHandler = new StreamHandlerV2({ stream }) + const request = StopMessage.decode(await streamHandler.read()) + log('received circuit v2 stop protocol request from %s', connection.remotePeer.toB58String()) + if (!request) { + return + } + + const mStream = await handleStopV2({ + connection, + streamHandler, + request + }) + + if (mStream) { + // @ts-ignore dst peer will not be undefined + const remoteAddr = new Multiaddr(request.peer.addrs[0]) + const localAddr = this._libp2p.addressManager.getListenAddrs()[0] + const maConn = toConnection({ + stream: mStream, + remoteAddr, + localAddr + }) + + const conn = await this._upgrader.upgradeInbound(maConn) + log('%s connection %s upgraded', 'inbound', maConn.remoteAddr) + this.handler && this.handler(conn) + } + } + /** * Dial a peer over a relay * @@ -146,9 +223,49 @@ class Circuit { disconnectOnFailure = true } + const stream = await relayConnection.newStream([protocolIDv2Hop, protocolIDv1]) + + switch (stream.protocol) { + case protocolIDv1: return await this.connectV1({ + stream: stream.stream, + connection: relayConnection, + destinationPeer, + destinationAddr, + relayAddr, + ma, + disconnectOnFailure + }) + case protocolIDv2Hop: return await this.connectV2({ + stream: stream.stream, + connection: relayConnection, + destinationPeer, + destinationAddr, + relayAddr, + ma, + disconnectOnFailure + }) + default: + stream.stream.reset() + throw new Error('Unexpected stream protocol') + } + } + + /** + * + * @param {Object} params + * @param {MuxedStream} params.stream + * @param {Connection} params.connection + * @param {PeerId} params.destinationPeer + * @param {Multiaddr} params.destinationAddr + * @param {Multiaddr} params.relayAddr + * @param {Multiaddr} params.ma + * @param {boolean} params.disconnectOnFailure + * @returns {Promise} + */ + async connectV1 ({ stream, connection, destinationPeer, destinationAddr, relayAddr, ma, disconnectOnFailure }) { try { const virtualConnection = await hop({ - connection: relayConnection, + stream, request: { type: CircuitPB.Type.HOP, srcPeer: { @@ -173,7 +290,54 @@ class Circuit { return this._upgrader.upgradeOutbound(maConn) } catch (/** @type {any} */ err) { log.error('Circuit relay dial failed', err) - disconnectOnFailure && await relayConnection.close() + disconnectOnFailure && await connection.close() + throw err + } + } + + /** + * + * @param {Object} params + * @param {MuxedStream} params.stream + * @param {Connection} params.connection + * @param {PeerId} params.destinationPeer + * @param {Multiaddr} params.destinationAddr + * @param {Multiaddr} params.relayAddr + * @param {Multiaddr} params.ma + * @param {boolean} params.disconnectOnFailure + * @returns {Promise} + */ + async connectV2 ({ stream, connection, destinationPeer, destinationAddr, relayAddr, ma, disconnectOnFailure }) { + try { + const streamHandler = new StreamHandlerV2({ stream }) + streamHandler.write(HopMessage.encode({ + type: HopMessage.Type.CONNECT, + peer: { + id: destinationPeer.toBytes(), + addrs: [new Multiaddr(destinationAddr).bytes] + } + }).finish()) + + const status = HopMessage.decode(await streamHandler.read()) + if (status.status !== Status.OK) { + throw new Error('failed to connect via realy with status ' + status.status) + } + + // TODO: do something with limit and transient connection + + let localAddr = connection.localAddr ?? relayAddr + localAddr = localAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toB58String()}`) + const maConn = toConnection({ + stream: streamHandler.rest(), + remoteAddr: ma, + localAddr + }) + log('new outbound connection %s', maConn.remoteAddr) + const conn = await this._upgrader.upgradeOutbound(maConn) + return conn + } catch (/** @type {any} */ err) { + log.error('Circuit relay dial failed', err) + disconnectOnFailure && await connection.close() throw err } } diff --git a/src/circuit/v1/hop.js b/src/circuit/v1/hop.js index 68f12980..321caa62 100644 --- a/src/circuit/v1/hop.js +++ b/src/circuit/v1/hop.js @@ -117,16 +117,14 @@ async function handleHop ({ * peer. A new, virtual, connection will be created between the two via the relay. * * @param {object} options - * @param {Connection} options.connection - Connection to the relay + * @param {MuxedStream} options.stream - Stream to the relay * @param {ICircuitRelay} options.request * @returns {Promise} */ async function hop ({ - connection, + stream, request }) { - // Create a new stream to the relay - const { stream } = await connection.newStream([multicodec.relayV1]) // Send the HOP request const streamHandler = new StreamHandler({ stream }) streamHandler.write(request) diff --git a/src/circuit/v2/hop.js b/src/circuit/v2/hop.js index 4dea71da..2f124d5c 100644 --- a/src/circuit/v2/hop.js +++ b/src/circuit/v2/hop.js @@ -8,9 +8,11 @@ const multicodec = require('../multicodec') const log = Object.assign(debug('libp2p:circuitv2:hop'), { error: debug('libp2p:circuitv2:hop:err') }) -const { HopMessage, Status } = require('./protocol') +const { HopMessage, Status, StopMessage } = require('./protocol') +const { stop } = require('./stop') const { ReservationVoucherRecord } = require('./reservation-voucher') const { validateHopConnectRequest } = require('./validation') +const { Multiaddr } = require('multiaddr') /** * @typedef {import('./protocol').IHopMessage} IHopMessage @@ -36,17 +38,19 @@ const { validateHopConnectRequest } = require('./validation') * @param {ILimit|null} options.limit * @param {Acl?} options.acl * @param {ReservationStore} options.reservationStore + * @returns {Promise} */ module.exports.handleHopProtocol = async function (options) { switch (options.request.type) { case HopMessage.Type.RESERVE: await handleReserve(options); break - case HopMessage.Type.CONNECT: await handleConnect(options); break + case HopMessage.Type.CONNECT: return await handleConnect(options) default: { log.error('invalid hop request type %s via peer %s', options.request.type, options.connection.remotePeer.toB58String()) writeErrorResponse(options.streamHandler, Status.MALFORMED_MESSAGE) options.streamHandler.close() } } + return null } /** @@ -105,6 +109,7 @@ async function handleReserve ({ connection, streamHandler, relayPeer, relayAddrs * @param {StreamHandler} options.streamHandler * @param {Transport} options.circuit * @param {Acl?} options.acl + * @returns {Promise} */ async function handleConnect ({ connection, streamHandler, request, reservationStore, circuit, acl }) { log('hop connect request from %s', connection.remotePeer.toB58String()) @@ -112,9 +117,11 @@ async function handleConnect ({ connection, streamHandler, request, reservationS try { validateHopConnectRequest(request, streamHandler) } catch (/** @type {any} */ err) { - return log.error('invalid hop connect request via peer %s', connection.remotePeer.toB58String(), err) + log.error('invalid hop connect request via peer %s', connection.remotePeer.toB58String(), err) + throw err } + // @ts-ignore peer is defined at this point const dstPeer = new PeerId(request.peer.id) if (acl && acl.allowConnect) { @@ -132,20 +139,37 @@ async function handleConnect ({ connection, streamHandler, request, reservationS const destinationConnection = circuit._connectionManager.get(dstPeer) if (!destinationConnection) { - log('hop connect denied for %s as there is no destincation connection', connection.remotePeer.toB58String()) + log('hop connect denied for %s as there is no destination connection', connection.remotePeer.toB58String()) writeErrorResponse(streamHandler, Status.NO_RESERVATION) } log('hop connect request from %s to %s is valid', connection.remotePeer.toB58String(), dstPeer.toB58String()) + const destinationStream = await stop({ + connection: destinationConnection, + request: { + type: StopMessage.Type.CONNECT, + peer: { + id: connection.remotePeer.id, + addrs: [new Multiaddr(connection.remoteAddr).bytes] + } + } + }) + + if (!destinationStream) { + log.error('failed to open stream to destination peer %s', destinationConnection?.remotePeer.toB58String()) + writeErrorResponse(streamHandler, Status.CONNECTION_FAILED) + throw new Error('failed to open stream to destination peer') + } + writeResponse(streamHandler, { type: HopMessage.Type.STATUS, status: Status.OK }) const sourceStream = streamHandler.rest() - + log('connection to destination established, short circuiting streams...') // Short circuit the two streams to create the relayed connection - pipe([ + return pipe([ sourceStream, - destinationConnection?.newStream([multicodec.protocolIDv2Stop]), + destinationStream, sourceStream ]) } diff --git a/src/circuit/v2/stream-handler.js b/src/circuit/v2/stream-handler.js index a1b9db6c..d6f63e51 100644 --- a/src/circuit/v2/stream-handler.js +++ b/src/circuit/v2/stream-handler.js @@ -86,4 +86,5 @@ class StreamHandler { } } +module.exports.StreamHandler = StreamHandler module.exports = StreamHandler diff --git a/test/relay/relay.node.js b/test/relay/relay.node.js index e9c12078..9b721793 100644 --- a/test/relay/relay.node.js +++ b/test/relay/relay.node.js @@ -50,10 +50,16 @@ describe('Dialing (via relay, TCP)', () => { return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop())) }) - it('should be able to connect to a peer over a relay with active connections', async () => { + it.only('should be able to connect to a peer over a relay with active connections', async () => { const relayAddr = relayLibp2p.transportManager.getAddrs()[0] const relayIdString = relayLibp2p.peerId.toB58String() + console.log({ + source: srcLibp2p.peerId.toB58String(), + relay: relayLibp2p.peerId.toB58String(), + destination: dstLibp2p.peerId.toB58String() + }) + const dialAddr = relayAddr .encapsulate(`/p2p/${relayIdString}`) .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`) @@ -63,7 +69,6 @@ describe('Dialing (via relay, TCP)', () => { await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs()) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) - const connection = await srcLibp2p.dial(dialAddr) expect(connection).to.exist() expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes())