mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-16 17:11:57 +00:00
Compare commits
10 Commits
v0.37.2
...
feat/circu
Author | SHA1 | Date | |
---|---|---|---|
|
6d83603541 | ||
|
f74e4d5d13 | ||
|
04297984d0 | ||
|
29a74c5c36 | ||
|
fa314ebfe0 | ||
|
4866b279e0 | ||
|
84e38d7e95 | ||
|
abe2b22af6 | ||
|
b2be4637e2 | ||
|
7815e44427 |
19
package.json
19
package.json
@@ -78,16 +78,17 @@
|
||||
"scripts": {
|
||||
"lint": "aegir lint",
|
||||
"build": "tsc",
|
||||
"postbuild": "mkdirp dist/src/circuit/pb dist/src/fetch/pb dist/src/identify/pb dist/src/insecure/pb && cp src/circuit/pb/*.js src/circuit/pb/*.d.ts dist/src/circuit/pb && cp src/fetch/pb/*.js src/fetch/pb/*.d.ts dist/src/fetch/pb && cp src/identify/pb/*.js src/identify/pb/*.d.ts dist/src/identify/pb && cp src/insecure/pb/*.js src/insecure/pb/*.d.ts dist/src/insecure/pb",
|
||||
"postbuild": "mkdirp dist/src/circuit/v1/pb dist/src/circuit/v2/pb dist/src/fetch/pb dist/src/identify/pb dist/src/insecure/pb && cp src/circuit/v1/pb/*.js src/circuit/v1/pb/*.d.ts dist/src/circuit/v1/pb && cp src/circuit/v2/pb/*.js src/circuit/v2/pb/*.d.ts dist/src/circuit/v2/pb && cp src/fetch/pb/*.js src/fetch/pb/*.d.ts dist/src/fetch/pb && cp src/identify/pb/*.js src/identify/pb/*.d.ts dist/src/identify/pb && cp src/insecure/pb/*.js src/insecure/pb/*.d.ts dist/src/insecure/pb",
|
||||
"generate": "run-s generate:proto:* generate:proto-types:*",
|
||||
"generate:proto:circuit": "pbjs -t static-module -w es6 -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/protocol/index.js ./src/circuit/protocol/index.proto",
|
||||
"generate:proto:fetch": "pbjs -t static-module -w es6 -r libp2p-fetch --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/fetch/proto.js ./src/fetch/proto.proto",
|
||||
"generate:proto:identify": "pbjs -t static-module -w es6 -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/message.js ./src/identify/message.proto",
|
||||
"generate:proto:plaintext": "pbjs -t static-module -w es6 -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/proto.js ./src/insecure/proto.proto",
|
||||
"generate:proto-types:circuit": "pbts -o src/circuit/protocol/index.d.ts src/circuit/protocol/index.js",
|
||||
"generate:proto-types:fetch": "pbts -o src/fetch/proto.d.ts src/fetch/proto.js",
|
||||
"generate:proto-types:identify": "pbts -o src/identify/message.d.ts src/identify/message.js",
|
||||
"generate:proto-types:plaintext": "pbts -o src/insecure/proto.d.ts src/insecure/proto.js",
|
||||
"generate:proto:fetch": "pbjs -t static-module -w es6 -r libp2p-fetch --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/fetch/pb/proto.js ./src/fetch/pb/proto.proto",
|
||||
"generate:proto:identify": "pbjs -t static-module -w es6 -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/pb/message.js ./src/identify/pb/message.proto",
|
||||
"generate:proto:plaintext": "pbjs -t static-module -w es6 -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/pb/proto.js ./src/insecure/pb/proto.proto",
|
||||
"generate:proto:circuit": "pbjs -t static-module -w es6 -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/v1/pb/index.js ./src/circuit/v1/pb/index.proto",
|
||||
"generate:proto:circuitv2": "pbjs -t static-module -w es6 -r libp2p-circuitv2 --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/v2/pb/index.js ./src/circuit/v2/pb/index.proto && pbts --out src/circuit/v2/pb/index.d.ts src/circuit/v2/pb/index.js",
|
||||
"generate:proto-types:circuit": "pbts -o src/circuit/v1/pb/index.d.ts src/circuit/v1/pb/index.js",
|
||||
"generate:proto-types:fetch": "pbts -o src/fetch/pb/proto.d.ts src/fetch/pb/proto.js",
|
||||
"generate:proto-types:identify": "pbts -o src/identify/pb/message.d.ts src/identify/pb/message.js",
|
||||
"generate:proto-types:plaintext": "pbts -o src/insecure/pb/proto.d.ts src/insecure/pb/proto.js",
|
||||
"pretest": "npm run build",
|
||||
"test": "aegir test",
|
||||
"test:node": "npm run test -- -t node -f \"./dist/test/**/*.{node,spec}.js\" --cov",
|
||||
|
@@ -1,8 +1,8 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { RELAY_CODEC } from './multicodec.js'
|
||||
import { canHop } from './circuit/hop.js'
|
||||
import { RELAY_V1_CODEC } from './multicodec.js'
|
||||
import { canHop } from './v1/hop.js'
|
||||
import { namespaceToCid } from './utils.js'
|
||||
import {
|
||||
CIRCUIT_PROTO_CODE,
|
||||
@@ -68,7 +68,7 @@ export class AutoRelay {
|
||||
const id = peerId.toString()
|
||||
|
||||
// Check if it has the protocol
|
||||
const hasProtocol = protocols.find(protocol => protocol === RELAY_CODEC)
|
||||
const hasProtocol = protocols.find(protocol => protocol === RELAY_V1_CODEC)
|
||||
|
||||
// If no protocol, check if we were keeping the peer before as a listenRelay
|
||||
if (hasProtocol == null) {
|
||||
|
@@ -1,2 +1,4 @@
|
||||
|
||||
export const RELAY_CODEC = '/libp2p/circuit/relay/0.1.0'
|
||||
export const RELAY_V1_CODEC = '/libp2p/circuit/relay/0.1.0'
|
||||
export const protocolIDv2Hop = '/libp2p/circuit/relay/0.2.0/hop'
|
||||
export const protocolIDv2Stop = '/libp2p/circuit/relay/0.2.0/stop'
|
||||
|
@@ -1,33 +1,56 @@
|
||||
import * as CircuitV1 from './v1/pb/index.js'
|
||||
import * as CircuitV2 from './v2/pb/index.js'
|
||||
import { ReservationStore } from './v2/reservation-store.js'
|
||||
import { logger } from '@libp2p/logger'
|
||||
import errCode from 'err-code'
|
||||
import createError from 'err-code'
|
||||
import * as mafmt from '@multiformats/mafmt'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { CircuitRelay as CircuitPB } from './pb/index.js'
|
||||
import { codes } from '../errors.js'
|
||||
import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
|
||||
import { RELAY_CODEC } from './multicodec.js'
|
||||
import { protocolIDv2Hop, RELAY_V1_CODEC } from './multicodec.js'
|
||||
import { createListener } from './listener.js'
|
||||
import { handleCanHop, handleHop, hop } from './circuit/hop.js'
|
||||
import { handleStop } from './circuit/stop.js'
|
||||
import { StreamHandler } from './circuit/stream-handler.js'
|
||||
import { symbol } from '@libp2p/interfaces/transport'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import { Components, Initializable } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { Listener, Transport, CreateListenerOptions, ConnectionHandler } from '@libp2p/interfaces/transport'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { Connection, Stream } from '@libp2p/interfaces/connection'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { StreamHandlerV2 } from './v2/stream-handler.js'
|
||||
import { StreamHandlerV1 } from './v1/stream-handler.js'
|
||||
import * as CircuitV1Handler from './v1/index.js'
|
||||
import * as CircuitV2Handler from './v2/index.js'
|
||||
|
||||
const log = logger('libp2p:circuit')
|
||||
|
||||
export interface CircuitOptions {
|
||||
limit?: number
|
||||
}
|
||||
|
||||
interface ConnectOptions {
|
||||
stream: Stream
|
||||
connection: Connection
|
||||
destinationPeer: PeerId
|
||||
destinationAddr: Multiaddr
|
||||
relayAddr: Multiaddr
|
||||
ma: Multiaddr
|
||||
disconnectOnFailure: boolean
|
||||
}
|
||||
export class Circuit implements Transport, Initializable {
|
||||
private handler?: ConnectionHandler
|
||||
private components: Components = new Components()
|
||||
private readonly reservationStore: ReservationStore
|
||||
|
||||
constructor (options: CircuitOptions) {
|
||||
this.reservationStore = new ReservationStore(options.limit)
|
||||
}
|
||||
|
||||
init (components: Components): void {
|
||||
this.components = components
|
||||
void this.components.getRegistrar().handle(RELAY_CODEC, (data) => {
|
||||
void this._onProtocol(data).catch(err => {
|
||||
|
||||
void this.components.getRegistrar().handle(RELAY_V1_CODEC, (data) => {
|
||||
void this._onProtocolV1(data).catch(err => {
|
||||
log.error(err)
|
||||
})
|
||||
})
|
||||
@@ -52,16 +75,20 @@ export class Circuit implements Transport, Initializable {
|
||||
return this.constructor.name
|
||||
}
|
||||
|
||||
async _onProtocol (data: IncomingStreamData) {
|
||||
getPeerConnection (dstPeer: PeerId) {
|
||||
return this.components.getConnectionManager().getConnection(dstPeer)
|
||||
}
|
||||
|
||||
async _onProtocolV1 (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const streamHandler = new StreamHandlerV1({ stream })
|
||||
const request = await streamHandler.read()
|
||||
|
||||
if (request == null) {
|
||||
log('request was invalid, could not read from stream')
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.MALFORMED_MESSAGE
|
||||
type: CircuitV1.CircuitRelay.Type.STATUS,
|
||||
code: CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE
|
||||
})
|
||||
streamHandler.close()
|
||||
return
|
||||
@@ -70,14 +97,14 @@ export class Circuit implements Transport, Initializable {
|
||||
let virtualConnection
|
||||
|
||||
switch (request.type) {
|
||||
case CircuitPB.Type.CAN_HOP: {
|
||||
case CircuitV1.CircuitRelay.Type.CAN_HOP: {
|
||||
log('received CAN_HOP request from %p', connection.remotePeer)
|
||||
await handleCanHop({ circuit: this, connection, streamHandler })
|
||||
await CircuitV1Handler.handleCanHop({ circuit: this, connection, streamHandler })
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.HOP: {
|
||||
case CircuitV1.CircuitRelay.Type.HOP: {
|
||||
log('received HOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleHop({
|
||||
virtualConnection = await CircuitV1Handler.handleHop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
@@ -86,9 +113,9 @@ export class Circuit implements Transport, Initializable {
|
||||
})
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.STOP: {
|
||||
case CircuitV1.CircuitRelay.Type.STOP: {
|
||||
log('received STOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleStop({
|
||||
virtualConnection = await CircuitV1Handler.handleStop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
@@ -98,8 +125,8 @@ export class Circuit implements Transport, Initializable {
|
||||
default: {
|
||||
log('Request of type %s not supported', request.type)
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.MALFORMED_MESSAGE
|
||||
type: CircuitV1.CircuitRelay.Type.STATUS,
|
||||
code: CircuitV1.CircuitRelay.Status.MALFORMED_MESSAGE
|
||||
})
|
||||
streamHandler.close()
|
||||
return
|
||||
@@ -107,16 +134,14 @@ export class Circuit implements Transport, Initializable {
|
||||
}
|
||||
|
||||
if (virtualConnection != null) {
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const remoteAddr = new Multiaddr(request.dstPeer.addrs[0])
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const localAddr = new Multiaddr(request.srcPeer.addrs[0])
|
||||
const remoteAddr = new Multiaddr(request.dstPeer?.addrs?.[0] ?? '')
|
||||
const localAddr = new Multiaddr(request.srcPeer?.addrs?.[0] ?? '')
|
||||
const maConn = streamToMaConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
|
||||
const type = request.type === CircuitV1.CircuitRelay.Type.HOP ? 'relay' : 'inbound'
|
||||
log('new %s connection %s', type, maConn.remoteAddr)
|
||||
|
||||
const conn = await this.components.getUpgrader().upgradeInbound(maConn)
|
||||
@@ -128,6 +153,55 @@ export class Circuit implements Transport, Initializable {
|
||||
}
|
||||
}
|
||||
|
||||
async _onV2ProtocolHop ({ connection, stream }: IncomingStreamData) {
|
||||
log('received circuit v2 hop protocol stream from %s', connection.remotePeer)
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
const request = CircuitV2.HopMessage.decode(await streamHandler.read())
|
||||
|
||||
if (request?.type === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
await CircuitV2Handler.handleHopProtocol({
|
||||
connection,
|
||||
streamHandler,
|
||||
circuit: this,
|
||||
relayPeer: this.components.getPeerId(),
|
||||
relayAddrs: this.components.getAddressManager().getListenAddrs(),
|
||||
reservationStore: this.reservationStore,
|
||||
request
|
||||
})
|
||||
}
|
||||
|
||||
async _onV2ProtocolStop ({ connection, stream }: IncomingStreamData) {
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
const request = CircuitV2.StopMessage.decode(await streamHandler.read())
|
||||
log('received circuit v2 stop protocol request from %s', connection.remotePeer)
|
||||
if (request?.type === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const mStream = await CircuitV2Handler.handleStop({
|
||||
connection,
|
||||
streamHandler,
|
||||
request
|
||||
})
|
||||
|
||||
if (mStream !== null && mStream !== undefined) {
|
||||
const remoteAddr = new Multiaddr(request.peer?.addrs?.[0])
|
||||
const localAddr = this.components.getTransportManager().getAddrs()[0]
|
||||
const maConn = streamToMaConnection({
|
||||
stream: mStream,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
log('new inbound connection %s', maConn.remoteAddr)
|
||||
const conn = await this.components.getUpgrader().upgradeInbound(maConn)
|
||||
log('%s connection %s upgraded', 'inbound', maConn.remoteAddr)
|
||||
this.handler?.(conn)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*/
|
||||
@@ -142,7 +216,7 @@ export class Circuit implements Transport, Initializable {
|
||||
if (relayId == null || destinationId == null) {
|
||||
const errMsg = 'Circuit relay dial failed as addresses did not have peer id'
|
||||
log.error(errMsg)
|
||||
throw errCode(new Error(errMsg), codes.ERR_RELAYED_DIAL)
|
||||
throw createError(new Error(errMsg), codes.ERR_RELAYED_DIAL)
|
||||
}
|
||||
|
||||
const relayPeer = peerIdFromString(relayId)
|
||||
@@ -156,33 +230,107 @@ export class Circuit implements Transport, Initializable {
|
||||
}
|
||||
|
||||
try {
|
||||
const virtualConnection = await hop({
|
||||
connection: relayConnection,
|
||||
request: {
|
||||
type: CircuitPB.Type.HOP,
|
||||
srcPeer: {
|
||||
id: this.components.getPeerId().toBytes(),
|
||||
addrs: this.components.getAddressManager().getAddresses().map(addr => addr.bytes)
|
||||
},
|
||||
dstPeer: {
|
||||
id: destinationPeer.toBytes(),
|
||||
addrs: [new Multiaddr(destinationAddr).bytes]
|
||||
}
|
||||
}
|
||||
})
|
||||
const stream = await relayConnection.newStream([protocolIDv2Hop, RELAY_V1_CODEC])
|
||||
|
||||
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.components.getPeerId().toString()}`)
|
||||
switch (stream.protocol) {
|
||||
case RELAY_V1_CODEC: return await this.connectV1({
|
||||
stream: stream.stream,
|
||||
connection: relayConnection,
|
||||
destinationPeer,
|
||||
destinationAddr,
|
||||
relayAddr,
|
||||
ma,
|
||||
disconnectOnFailure
|
||||
})
|
||||
case protocolIDv2Hop: return await this.connectV2({
|
||||
stream: stream.stream,
|
||||
connection: relayConnection,
|
||||
destinationPeer,
|
||||
destinationAddr,
|
||||
relayAddr,
|
||||
ma,
|
||||
disconnectOnFailure
|
||||
})
|
||||
default:
|
||||
stream.stream.reset()
|
||||
throw new Error('Unexpected stream protocol')
|
||||
}
|
||||
} catch (err: any) {
|
||||
log.error('Circuit relay dial failed', err)
|
||||
disconnectOnFailure && await relayConnection.close()
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
async connectV1 ({
|
||||
stream, connection, destinationPeer,
|
||||
destinationAddr, relayAddr, ma,
|
||||
disconnectOnFailure
|
||||
}: ConnectOptions
|
||||
) {
|
||||
const virtualConnection = await CircuitV1Handler.hop({
|
||||
stream,
|
||||
request: {
|
||||
type: CircuitV1.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
id: this.components.getPeerId().toBytes(),
|
||||
addrs: this.components.getAddressManager().getListenAddrs().map(addr => addr.bytes)
|
||||
},
|
||||
dstPeer: {
|
||||
id: destinationPeer.toBytes(),
|
||||
addrs: [new Multiaddr(destinationAddr).bytes]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.components.getPeerId().toString()}`)
|
||||
const maConn = streamToMaConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr: ma,
|
||||
localAddr
|
||||
})
|
||||
log('new outbound connection %s', maConn.remoteAddr)
|
||||
|
||||
return await this.components.getUpgrader().upgradeOutbound(maConn)
|
||||
}
|
||||
|
||||
async connectV2 (
|
||||
{
|
||||
stream, connection, destinationPeer,
|
||||
destinationAddr, relayAddr, ma,
|
||||
disconnectOnFailure
|
||||
}: ConnectOptions
|
||||
) {
|
||||
try {
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
streamHandler.write(CircuitV2.HopMessage.encode({
|
||||
type: CircuitV2.HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: destinationPeer.toBytes(),
|
||||
addrs: [new Multiaddr(destinationAddr).bytes]
|
||||
}
|
||||
}).finish())
|
||||
|
||||
const status = CircuitV2.HopMessage.decode(await streamHandler.read())
|
||||
if (status.status !== CircuitV2.Status.OK) {
|
||||
throw createError(new Error('failed to connect via relay with status ' + status.status.toString()), codes.ERR_HOP_REQUEST_FAILED)
|
||||
}
|
||||
|
||||
// TODO: do something with limit and transient connection
|
||||
|
||||
let localAddr = relayAddr
|
||||
localAddr = localAddr.encapsulate(`/p2p-circuit/p2p/${this.components.getPeerId().toString()}`)
|
||||
const maConn = streamToMaConnection({
|
||||
stream: virtualConnection,
|
||||
stream: streamHandler.rest(),
|
||||
remoteAddr: ma,
|
||||
localAddr
|
||||
})
|
||||
log('new outbound connection %s', maConn.remoteAddr)
|
||||
|
||||
return await this.components.getUpgrader().upgradeOutbound(maConn)
|
||||
} catch (err: any) {
|
||||
const conn = await this.components.getUpgrader().upgradeOutbound(maConn)
|
||||
return conn
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('Circuit relay dial failed', err)
|
||||
disconnectOnFailure && await relayConnection.close()
|
||||
disconnectOnFailure && await connection.close()
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
@@ -1,13 +1,13 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import errCode from 'err-code'
|
||||
import { validateAddrs } from './utils.js'
|
||||
import { StreamHandler } from './stream-handler.js'
|
||||
import { CircuitRelay as CircuitPB, ICircuitRelay } from '../pb/index.js'
|
||||
import { StreamHandlerV1 } from './stream-handler.js'
|
||||
import { CircuitRelay as CircuitPB, ICircuitRelay } from './pb/index.js'
|
||||
import { pipe } from 'it-pipe'
|
||||
import { codes as Errors } from '../../errors.js'
|
||||
import { stop } from './stop.js'
|
||||
import { RELAY_CODEC } from '../multicodec.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import { RELAY_V1_CODEC } from '../multicodec.js'
|
||||
import type { Connection, Stream } from '@libp2p/interfaces/connection'
|
||||
import { peerIdFromBytes } from '@libp2p/peer-id'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { Circuit } from '../transport.js'
|
||||
@@ -18,7 +18,7 @@ const log = logger('libp2p:circuit:hop')
|
||||
export interface HopRequest {
|
||||
connection: Connection
|
||||
request: ICircuitRelay
|
||||
streamHandler: StreamHandler
|
||||
streamHandler: StreamHandlerV1
|
||||
circuit: Circuit
|
||||
connectionManager: ConnectionManager
|
||||
}
|
||||
@@ -119,7 +119,7 @@ export async function handleHop (hopRequest: HopRequest) {
|
||||
}
|
||||
|
||||
export interface HopConfig {
|
||||
connection: Connection
|
||||
stream: Stream
|
||||
request: ICircuitRelay
|
||||
}
|
||||
|
||||
@@ -129,14 +129,12 @@ export interface HopConfig {
|
||||
*/
|
||||
export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
|
||||
const {
|
||||
connection,
|
||||
stream,
|
||||
request
|
||||
} = options
|
||||
|
||||
// Create a new stream to the relay
|
||||
const { stream } = await connection.newStream(RELAY_CODEC)
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const streamHandler = new StreamHandlerV1({ stream })
|
||||
streamHandler.write(request)
|
||||
|
||||
const response = await streamHandler.read()
|
||||
@@ -169,10 +167,10 @@ export async function canHop (options: CanHopOptions) {
|
||||
} = options
|
||||
|
||||
// Create a new stream to the relay
|
||||
const { stream } = await connection.newStream(RELAY_CODEC)
|
||||
const { stream } = await connection.newStream(RELAY_V1_CODEC)
|
||||
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const streamHandler = new StreamHandlerV1({ stream })
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.CAN_HOP
|
||||
})
|
||||
@@ -189,7 +187,7 @@ export async function canHop (options: CanHopOptions) {
|
||||
|
||||
export interface HandleCanHopOptions {
|
||||
connection: Connection
|
||||
streamHandler: StreamHandler
|
||||
streamHandler: StreamHandlerV1
|
||||
circuit: Circuit
|
||||
}
|
||||
|
2
src/circuit/v1/index.ts
Normal file
2
src/circuit/v1/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from './hop.js'
|
||||
export * from './stop.js'
|
@@ -1,7 +1,7 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { CircuitRelay as CircuitPB, ICircuitRelay } from '../pb/index.js'
|
||||
import { RELAY_CODEC } from '../multicodec.js'
|
||||
import { StreamHandler } from './stream-handler.js'
|
||||
import { CircuitRelay as CircuitPB, ICircuitRelay } from './pb/index.js'
|
||||
import { RELAY_V1_CODEC } from '../multicodec.js'
|
||||
import { StreamHandlerV1 } from './stream-handler.js'
|
||||
import { validateAddrs } from './utils.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
@@ -11,7 +11,7 @@ const log = logger('libp2p:circuit:stop')
|
||||
export interface HandleStopOptions {
|
||||
connection: Connection
|
||||
request: ICircuitRelay
|
||||
streamHandler: StreamHandler
|
||||
streamHandler: StreamHandlerV1
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -56,9 +56,9 @@ export async function stop (options: StopOptions) {
|
||||
request
|
||||
} = options
|
||||
|
||||
const { stream } = await connection.newStream([RELAY_CODEC])
|
||||
const { stream } = await connection.newStream([RELAY_V1_CODEC])
|
||||
log('starting stop request to %p', connection.remotePeer)
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const streamHandler = new StreamHandlerV1({ stream })
|
||||
|
||||
streamHandler.write(request)
|
||||
const response = await streamHandler.read()
|
@@ -1,11 +1,11 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { Handshake, handshake } from 'it-handshake'
|
||||
import { CircuitRelay, ICircuitRelay } from '../pb/index.js'
|
||||
import { CircuitRelay, ICircuitRelay } from './pb/index.js'
|
||||
import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import type { Source } from 'it-stream-types'
|
||||
|
||||
const log = logger('libp2p:circuit:stream-handler')
|
||||
const log = logger('libp2p:circuitv1:stream-handler')
|
||||
|
||||
export interface StreamHandlerOptions {
|
||||
/**
|
||||
@@ -19,7 +19,7 @@ export interface StreamHandlerOptions {
|
||||
maxLength?: number
|
||||
}
|
||||
|
||||
export class StreamHandler {
|
||||
export class StreamHandlerV1 {
|
||||
private readonly stream: Stream
|
||||
private readonly shake: Handshake
|
||||
private readonly decoder: Source<Uint8Array>
|
@@ -1,11 +1,11 @@
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { CircuitRelay, ICircuitRelay } from '../pb/index.js'
|
||||
import type { StreamHandler } from './stream-handler.js'
|
||||
import { CircuitRelay, ICircuitRelay } from './pb/index.js'
|
||||
import type { StreamHandlerV1 } from './stream-handler.js'
|
||||
|
||||
/**
|
||||
* Write a response
|
||||
*/
|
||||
function writeResponse (streamHandler: StreamHandler, status: CircuitRelay.Status) {
|
||||
function writeResponse (streamHandler: StreamHandlerV1, status: CircuitRelay.Status) {
|
||||
streamHandler.write({
|
||||
type: CircuitRelay.Type.STATUS,
|
||||
code: status
|
||||
@@ -15,7 +15,7 @@ function writeResponse (streamHandler: StreamHandler, status: CircuitRelay.Statu
|
||||
/**
|
||||
* Validate incomming HOP/STOP message
|
||||
*/
|
||||
export function validateAddrs (msg: ICircuitRelay, streamHandler: StreamHandler) {
|
||||
export function validateAddrs (msg: ICircuitRelay, streamHandler: StreamHandlerV1) {
|
||||
try {
|
||||
if (msg.dstPeer?.addrs != null) {
|
||||
msg.dstPeer.addrs.forEach((addr) => {
|
220
src/circuit/v2/hop.ts
Normal file
220
src/circuit/v2/hop.ts
Normal file
@@ -0,0 +1,220 @@
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { RecordEnvelope } from '@libp2p/peer-record'
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { pipe } from 'it-pipe'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import { HopMessage, IHopMessage, IReservation, ILimit, Status, StopMessage } from './pb/index.js'
|
||||
import { StreamHandlerV2 } from './stream-handler.js'
|
||||
import type { Circuit } from '../transport.js'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { Acl, ReservationStore } from './interfaces.js'
|
||||
import { protocolIDv2Hop } from '../multicodec.js'
|
||||
import { validateHopConnectRequest } from './validation.js'
|
||||
import { stop } from './stop.js'
|
||||
import { ReservationVoucherRecord } from './reservation-voucher.js'
|
||||
import { peerIdFromBytes } from '@libp2p/peer-id'
|
||||
|
||||
const log = logger('libp2p:circuitv2:hop')
|
||||
|
||||
export interface HopProtocolOptions {
|
||||
connection: Connection
|
||||
request: IHopMessage
|
||||
streamHandler: StreamHandlerV2
|
||||
circuit: Circuit
|
||||
relayPeer: PeerId
|
||||
relayAddrs: Multiaddr[]
|
||||
limit?: ILimit
|
||||
acl?: Acl
|
||||
reservationStore: ReservationStore
|
||||
}
|
||||
|
||||
export async function handleHopProtocol (options: HopProtocolOptions) {
|
||||
switch (options.request.type) {
|
||||
case HopMessage.Type.RESERVE: await handleReserve(options); break
|
||||
case HopMessage.Type.CONNECT: await handleConnect(options); break
|
||||
default: {
|
||||
log.error('invalid hop request type %s via peer %s', options.request.type, options.connection.remotePeer)
|
||||
writeErrorResponse(options.streamHandler, Status.MALFORMED_MESSAGE)
|
||||
options.streamHandler.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function reserve (connection: Connection) {
|
||||
log('requesting reservation from %s', connection.remotePeer)
|
||||
const { stream } = await connection.newStream([protocolIDv2Hop])
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
streamHandler.write(HopMessage.encode({
|
||||
type: HopMessage.Type.RESERVE
|
||||
}).finish())
|
||||
|
||||
let response
|
||||
try {
|
||||
response = HopMessage.decode(await streamHandler.read())
|
||||
} catch (e: any) {
|
||||
log.error('error passing reserve message response from %s because', connection.remotePeer, e.message)
|
||||
streamHandler.close()
|
||||
throw e
|
||||
}
|
||||
|
||||
if (response.status === Status.OK && response.reservation !== null) {
|
||||
return response.reservation
|
||||
}
|
||||
const errMsg = `reservation failed with status ${response.status}`
|
||||
log.error(errMsg)
|
||||
throw new Error(errMsg)
|
||||
}
|
||||
|
||||
async function handleReserve ({ connection, streamHandler, relayPeer, relayAddrs, limit, acl, reservationStore }: HopProtocolOptions) {
|
||||
log('hop reserve request from %s', connection.remotePeer)
|
||||
|
||||
// TODO: prevent reservation over relay address
|
||||
|
||||
if ((await acl?.allowReserve?.(connection.remotePeer, connection.remoteAddr)) === false) {
|
||||
log.error('acl denied reservation to %s', connection.remotePeer)
|
||||
writeErrorResponse(streamHandler, Status.PERMISSION_DENIED)
|
||||
streamHandler.close()
|
||||
return
|
||||
}
|
||||
|
||||
const result = await reservationStore.reserve(connection.remotePeer, connection.remoteAddr)
|
||||
|
||||
if (result.status !== Status.OK) {
|
||||
writeErrorResponse(streamHandler, result.status)
|
||||
streamHandler.close()
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
writeResponse(
|
||||
streamHandler,
|
||||
{
|
||||
type: HopMessage.Type.STATUS,
|
||||
reservation: await makeReservation(relayAddrs, relayPeer, connection.remotePeer, result.expire ?? 0),
|
||||
limit
|
||||
})
|
||||
log('sent confirmation response to %s', connection.remotePeer)
|
||||
} catch (err) {
|
||||
log.error('failed to send confirmation response to %s', connection.remotePeer)
|
||||
await reservationStore.removeReservation(connection.remotePeer)
|
||||
}
|
||||
// TODO: how to ensure connection manager doesn't close reserved relay conn
|
||||
}
|
||||
|
||||
type HopConnectOptions = Pick<
|
||||
HopProtocolOptions,
|
||||
'connection' | 'streamHandler' | 'request' | 'reservationStore' |'circuit' |'acl'
|
||||
>
|
||||
|
||||
async function handleConnect (options: HopConnectOptions) {
|
||||
const { connection, streamHandler, request, reservationStore, circuit, acl } = options
|
||||
log('hop connect request from %s', connection.remotePeer)
|
||||
// Validate the HOP connect request has the required input
|
||||
try {
|
||||
validateHopConnectRequest(request, streamHandler)
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('invalid hop connect request via peer %s', connection.remotePeer, err)
|
||||
writeErrorResponse(streamHandler, Status.MALFORMED_MESSAGE)
|
||||
return
|
||||
}
|
||||
|
||||
// @ts-expect-error
|
||||
const dstPeer = peerIdFromBytes(request.peer.id)
|
||||
|
||||
if (acl?.allowConnect !== undefined) {
|
||||
const status = await acl.allowConnect(connection.remotePeer, connection.remoteAddr, dstPeer)
|
||||
if (status !== Status.OK) {
|
||||
log.error('hop connect denied for %s with status %s', connection.remotePeer, status)
|
||||
writeErrorResponse(streamHandler, status)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if (!await reservationStore.hasReservation(dstPeer)) {
|
||||
log.error('hop connect denied for %s with status %s', connection.remotePeer, Status.NO_RESERVATION)
|
||||
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
|
||||
return
|
||||
}
|
||||
|
||||
const destinationConnection = circuit.getPeerConnection(dstPeer)
|
||||
if (destinationConnection === undefined || destinationConnection === null) {
|
||||
log('hop connect denied for %s as there is no destination connection', connection.remotePeer)
|
||||
writeErrorResponse(streamHandler, Status.NO_RESERVATION)
|
||||
return
|
||||
}
|
||||
|
||||
log('hop connect request from %s to %s is valid', connection.remotePeer, dstPeer)
|
||||
|
||||
const destinationStream = await stop({
|
||||
connection: destinationConnection,
|
||||
request: {
|
||||
type: StopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: connection.remotePeer.toBytes(),
|
||||
addrs: [new Multiaddr('/p2p/' + connection.remotePeer.toString()).bytes]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if (destinationStream === undefined || destinationStream === null) {
|
||||
log.error('failed to open stream to destination peer %s', destinationConnection?.remotePeer)
|
||||
writeErrorResponse(streamHandler, Status.CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
writeResponse(streamHandler, { type: HopMessage.Type.STATUS, status: Status.OK })
|
||||
|
||||
const sourceStream = streamHandler.rest()
|
||||
log('connection to destination established, short circuiting streams...')
|
||||
// Short circuit the two streams to create the relayed connection
|
||||
return await pipe(
|
||||
sourceStream,
|
||||
destinationStream,
|
||||
sourceStream
|
||||
)
|
||||
}
|
||||
|
||||
async function makeReservation (
|
||||
relayAddrs: Multiaddr[],
|
||||
relayPeerId: PeerId,
|
||||
remotePeer: PeerId,
|
||||
expire: number
|
||||
): Promise<IReservation> {
|
||||
const addrs = []
|
||||
|
||||
for (const relayAddr of relayAddrs) {
|
||||
addrs.push(relayAddr.bytes)
|
||||
}
|
||||
|
||||
const voucher = await RecordEnvelope.seal(new ReservationVoucherRecord({
|
||||
peer: remotePeer,
|
||||
relay: relayPeerId,
|
||||
expiration: expire
|
||||
}), relayPeerId)
|
||||
|
||||
return {
|
||||
addrs,
|
||||
expire,
|
||||
voucher: voucher.marshal()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an error response and closes stream
|
||||
*
|
||||
*/
|
||||
function writeErrorResponse (streamHandler: StreamHandlerV2, status: Status) {
|
||||
writeResponse(streamHandler, {
|
||||
type: HopMessage.Type.STATUS,
|
||||
status
|
||||
})
|
||||
streamHandler.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a response
|
||||
*
|
||||
*/
|
||||
function writeResponse (streamHandler: StreamHandlerV2, msg: IHopMessage) {
|
||||
streamHandler.write(HopMessage.encode(msg).finish())
|
||||
}
|
2
src/circuit/v2/index.ts
Normal file
2
src/circuit/v2/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from './hop.js'
|
||||
export * from './stop.js'
|
21
src/circuit/v2/interfaces.ts
Normal file
21
src/circuit/v2/interfaces.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { Status } from './pb/index.js'
|
||||
|
||||
export type ReservationStatus = Status.OK | Status.PERMISSION_DENIED | Status.RESERVATION_REFUSED
|
||||
|
||||
export interface ReservationStore {
|
||||
reserve: (peer: PeerId, addr: Multiaddr) => Promise<{status: ReservationStatus, expire?: number}>
|
||||
removeReservation: (peer: PeerId) => Promise<void>
|
||||
hasReservation: (dst: PeerId) => Promise<boolean>
|
||||
}
|
||||
|
||||
type AclStatus = Status.OK | Status.RESOURCE_LIMIT_EXCEEDED | Status.PERMISSION_DENIED
|
||||
|
||||
export interface Acl {
|
||||
allowReserve: (peer: PeerId, addr: Multiaddr) => Promise<boolean>
|
||||
/**
|
||||
* Checks if connection should be allowed
|
||||
*/
|
||||
allowConnect: (src: PeerId, addr: Multiaddr, dst: PeerId) => Promise<AclStatus>
|
||||
}
|
450
src/circuit/v2/pb/index.d.ts
vendored
Normal file
450
src/circuit/v2/pb/index.d.ts
vendored
Normal file
@@ -0,0 +1,450 @@
|
||||
import * as $protobuf from "protobufjs";
|
||||
/** Properties of a HopMessage. */
|
||||
export interface IHopMessage {
|
||||
|
||||
/** HopMessage type */
|
||||
type: HopMessage.Type;
|
||||
|
||||
/** HopMessage peer */
|
||||
peer?: (IPeer|null);
|
||||
|
||||
/** HopMessage reservation */
|
||||
reservation?: (IReservation|null);
|
||||
|
||||
/** HopMessage limit */
|
||||
limit?: (ILimit|null);
|
||||
|
||||
/** HopMessage status */
|
||||
status?: (Status|null);
|
||||
}
|
||||
|
||||
/** Represents a HopMessage. */
|
||||
export class HopMessage implements IHopMessage {
|
||||
|
||||
/**
|
||||
* Constructs a new HopMessage.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: IHopMessage);
|
||||
|
||||
/** HopMessage type. */
|
||||
public type: HopMessage.Type;
|
||||
|
||||
/** HopMessage peer. */
|
||||
public peer?: (IPeer|null);
|
||||
|
||||
/** HopMessage reservation. */
|
||||
public reservation?: (IReservation|null);
|
||||
|
||||
/** HopMessage limit. */
|
||||
public limit?: (ILimit|null);
|
||||
|
||||
/** HopMessage status. */
|
||||
public status: Status;
|
||||
|
||||
/**
|
||||
* Encodes the specified HopMessage message. Does not implicitly {@link HopMessage.verify|verify} messages.
|
||||
* @param m HopMessage message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: IHopMessage, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a HopMessage message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns HopMessage
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): HopMessage;
|
||||
|
||||
/**
|
||||
* Creates a HopMessage message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns HopMessage
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): HopMessage;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a HopMessage message. Also converts values to other types if specified.
|
||||
* @param m HopMessage
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: HopMessage, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this HopMessage to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
||||
|
||||
export namespace HopMessage {
|
||||
|
||||
/** Type enum. */
|
||||
enum Type {
|
||||
RESERVE = 0,
|
||||
CONNECT = 1,
|
||||
STATUS = 2
|
||||
}
|
||||
}
|
||||
|
||||
/** Properties of a StopMessage. */
|
||||
export interface IStopMessage {
|
||||
|
||||
/** StopMessage type */
|
||||
type: StopMessage.Type;
|
||||
|
||||
/** StopMessage peer */
|
||||
peer?: (IPeer|null);
|
||||
|
||||
/** StopMessage limit */
|
||||
limit?: (ILimit|null);
|
||||
|
||||
/** StopMessage status */
|
||||
status?: (Status|null);
|
||||
}
|
||||
|
||||
/** Represents a StopMessage. */
|
||||
export class StopMessage implements IStopMessage {
|
||||
|
||||
/**
|
||||
* Constructs a new StopMessage.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: IStopMessage);
|
||||
|
||||
/** StopMessage type. */
|
||||
public type: StopMessage.Type;
|
||||
|
||||
/** StopMessage peer. */
|
||||
public peer?: (IPeer|null);
|
||||
|
||||
/** StopMessage limit. */
|
||||
public limit?: (ILimit|null);
|
||||
|
||||
/** StopMessage status. */
|
||||
public status: Status;
|
||||
|
||||
/**
|
||||
* Encodes the specified StopMessage message. Does not implicitly {@link StopMessage.verify|verify} messages.
|
||||
* @param m StopMessage message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: IStopMessage, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a StopMessage message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns StopMessage
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): StopMessage;
|
||||
|
||||
/**
|
||||
* Creates a StopMessage message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns StopMessage
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): StopMessage;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a StopMessage message. Also converts values to other types if specified.
|
||||
* @param m StopMessage
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: StopMessage, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this StopMessage to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
||||
|
||||
export namespace StopMessage {
|
||||
|
||||
/** Type enum. */
|
||||
enum Type {
|
||||
CONNECT = 0,
|
||||
STATUS = 1
|
||||
}
|
||||
}
|
||||
|
||||
/** Properties of a Peer. */
|
||||
export interface IPeer {
|
||||
|
||||
/** Peer id */
|
||||
id: Uint8Array;
|
||||
|
||||
/** Peer addrs */
|
||||
addrs?: (Uint8Array[]|null);
|
||||
}
|
||||
|
||||
/** Represents a Peer. */
|
||||
export class Peer implements IPeer {
|
||||
|
||||
/**
|
||||
* Constructs a new Peer.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: IPeer);
|
||||
|
||||
/** Peer id. */
|
||||
public id: Uint8Array;
|
||||
|
||||
/** Peer addrs. */
|
||||
public addrs: Uint8Array[];
|
||||
|
||||
/**
|
||||
* Encodes the specified Peer message. Does not implicitly {@link Peer.verify|verify} messages.
|
||||
* @param m Peer message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: IPeer, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a Peer message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns Peer
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): Peer;
|
||||
|
||||
/**
|
||||
* Creates a Peer message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns Peer
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): Peer;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a Peer message. Also converts values to other types if specified.
|
||||
* @param m Peer
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: Peer, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this Peer to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
||||
|
||||
/** Properties of a Reservation. */
|
||||
export interface IReservation {
|
||||
|
||||
/** Reservation expire */
|
||||
expire: number;
|
||||
|
||||
/** Reservation addrs */
|
||||
addrs?: (Uint8Array[]|null);
|
||||
|
||||
/** Reservation voucher */
|
||||
voucher?: (Uint8Array|null);
|
||||
}
|
||||
|
||||
/** Represents a Reservation. */
|
||||
export class Reservation implements IReservation {
|
||||
|
||||
/**
|
||||
* Constructs a new Reservation.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: IReservation);
|
||||
|
||||
/** Reservation expire. */
|
||||
public expire: number;
|
||||
|
||||
/** Reservation addrs. */
|
||||
public addrs: Uint8Array[];
|
||||
|
||||
/** Reservation voucher. */
|
||||
public voucher: Uint8Array;
|
||||
|
||||
/**
|
||||
* Encodes the specified Reservation message. Does not implicitly {@link Reservation.verify|verify} messages.
|
||||
* @param m Reservation message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: IReservation, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a Reservation message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns Reservation
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): Reservation;
|
||||
|
||||
/**
|
||||
* Creates a Reservation message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns Reservation
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): Reservation;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a Reservation message. Also converts values to other types if specified.
|
||||
* @param m Reservation
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: Reservation, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this Reservation to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
||||
|
||||
/** Properties of a Limit. */
|
||||
export interface ILimit {
|
||||
|
||||
/** Limit duration */
|
||||
duration?: (number|null);
|
||||
|
||||
/** Limit data */
|
||||
data?: (number|null);
|
||||
}
|
||||
|
||||
/** Represents a Limit. */
|
||||
export class Limit implements ILimit {
|
||||
|
||||
/**
|
||||
* Constructs a new Limit.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: ILimit);
|
||||
|
||||
/** Limit duration. */
|
||||
public duration: number;
|
||||
|
||||
/** Limit data. */
|
||||
public data: number;
|
||||
|
||||
/**
|
||||
* Encodes the specified Limit message. Does not implicitly {@link Limit.verify|verify} messages.
|
||||
* @param m Limit message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: ILimit, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a Limit message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns Limit
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): Limit;
|
||||
|
||||
/**
|
||||
* Creates a Limit message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns Limit
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): Limit;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a Limit message. Also converts values to other types if specified.
|
||||
* @param m Limit
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: Limit, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this Limit to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
||||
|
||||
/** Status enum. */
|
||||
export enum Status {
|
||||
OK = 100,
|
||||
RESERVATION_REFUSED = 200,
|
||||
RESOURCE_LIMIT_EXCEEDED = 201,
|
||||
PERMISSION_DENIED = 202,
|
||||
CONNECTION_FAILED = 203,
|
||||
NO_RESERVATION = 204,
|
||||
MALFORMED_MESSAGE = 400,
|
||||
UNEXPECTED_MESSAGE = 401
|
||||
}
|
||||
|
||||
/** Represents a ReservationVoucher. */
|
||||
export class ReservationVoucher implements IReservationVoucher {
|
||||
|
||||
/**
|
||||
* Constructs a new ReservationVoucher.
|
||||
* @param [p] Properties to set
|
||||
*/
|
||||
constructor(p?: IReservationVoucher);
|
||||
|
||||
/** ReservationVoucher relay. */
|
||||
public relay: Uint8Array;
|
||||
|
||||
/** ReservationVoucher peer. */
|
||||
public peer: Uint8Array;
|
||||
|
||||
/** ReservationVoucher expiration. */
|
||||
public expiration: number;
|
||||
|
||||
/**
|
||||
* Encodes the specified ReservationVoucher message. Does not implicitly {@link ReservationVoucher.verify|verify} messages.
|
||||
* @param m ReservationVoucher message or plain object to encode
|
||||
* @param [w] Writer to encode to
|
||||
* @returns Writer
|
||||
*/
|
||||
public static encode(m: IReservationVoucher, w?: $protobuf.Writer): $protobuf.Writer;
|
||||
|
||||
/**
|
||||
* Decodes a ReservationVoucher message from the specified reader or buffer.
|
||||
* @param r Reader or buffer to decode from
|
||||
* @param [l] Message length if known beforehand
|
||||
* @returns ReservationVoucher
|
||||
* @throws {Error} If the payload is not a reader or valid buffer
|
||||
* @throws {$protobuf.util.ProtocolError} If required fields are missing
|
||||
*/
|
||||
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): ReservationVoucher;
|
||||
|
||||
/**
|
||||
* Creates a ReservationVoucher message from a plain object. Also converts values to their respective internal types.
|
||||
* @param d Plain object
|
||||
* @returns ReservationVoucher
|
||||
*/
|
||||
public static fromObject(d: { [k: string]: any }): ReservationVoucher;
|
||||
|
||||
/**
|
||||
* Creates a plain object from a ReservationVoucher message. Also converts values to other types if specified.
|
||||
* @param m ReservationVoucher
|
||||
* @param [o] Conversion options
|
||||
* @returns Plain object
|
||||
*/
|
||||
public static toObject(m: ReservationVoucher, o?: $protobuf.IConversionOptions): { [k: string]: any };
|
||||
|
||||
/**
|
||||
* Converts this ReservationVoucher to JSON.
|
||||
* @returns JSON object
|
||||
*/
|
||||
public toJSON(): { [k: string]: any };
|
||||
}
|
1346
src/circuit/v2/pb/index.js
Normal file
1346
src/circuit/v2/pb/index.js
Normal file
File diff suppressed because it is too large
Load Diff
64
src/circuit/v2/pb/index.proto
Normal file
64
src/circuit/v2/pb/index.proto
Normal file
@@ -0,0 +1,64 @@
|
||||
syntax = "proto2";
|
||||
|
||||
message HopMessage {
|
||||
enum Type {
|
||||
RESERVE = 0;
|
||||
CONNECT = 1;
|
||||
STATUS = 2;
|
||||
}
|
||||
|
||||
required Type type = 1;
|
||||
|
||||
optional Peer peer = 2;
|
||||
optional Reservation reservation = 3;
|
||||
optional Limit limit = 4;
|
||||
|
||||
optional Status status = 5;
|
||||
}
|
||||
|
||||
message StopMessage {
|
||||
enum Type {
|
||||
CONNECT = 0;
|
||||
STATUS = 1;
|
||||
}
|
||||
|
||||
required Type type = 1;
|
||||
|
||||
optional Peer peer = 2;
|
||||
optional Limit limit = 3;
|
||||
|
||||
optional Status status = 4;
|
||||
}
|
||||
|
||||
message Peer {
|
||||
required bytes id = 1;
|
||||
repeated bytes addrs = 2;
|
||||
}
|
||||
|
||||
message Reservation {
|
||||
required uint64 expire = 1; // Unix expiration time (UTC)
|
||||
repeated bytes addrs = 2; // relay addrs for reserving peer
|
||||
optional bytes voucher = 3; // reservation voucher
|
||||
}
|
||||
|
||||
message Limit {
|
||||
optional uint32 duration = 1; // seconds
|
||||
optional uint64 data = 2; // bytes
|
||||
}
|
||||
|
||||
enum Status {
|
||||
OK = 100;
|
||||
RESERVATION_REFUSED = 200;
|
||||
RESOURCE_LIMIT_EXCEEDED = 201;
|
||||
PERMISSION_DENIED = 202;
|
||||
CONNECTION_FAILED = 203;
|
||||
NO_RESERVATION = 204;
|
||||
MALFORMED_MESSAGE = 400;
|
||||
UNEXPECTED_MESSAGE = 401;
|
||||
}
|
||||
|
||||
message ReservationVoucher {
|
||||
required bytes relay = 1;
|
||||
required bytes peer = 2;
|
||||
required uint64 expiration = 3;
|
||||
}
|
33
src/circuit/v2/reservation-store.ts
Normal file
33
src/circuit/v2/reservation-store.ts
Normal file
@@ -0,0 +1,33 @@
|
||||
import { Status } from './pb/index.js'
|
||||
import type { ReservationStore as IReservationStore, ReservationStatus } from './interfaces.js'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
|
||||
interface Reservation {
|
||||
addr: Multiaddr
|
||||
expire: Date
|
||||
}
|
||||
|
||||
export class ReservationStore implements IReservationStore {
|
||||
private readonly reservations = new Map<string, Reservation>()
|
||||
|
||||
constructor (private readonly limit = 15) {
|
||||
}
|
||||
|
||||
async reserve (peer: PeerId, addr: Multiaddr): Promise<{status: ReservationStatus, expire?: number}> {
|
||||
if (this.reservations.size >= this.limit && !this.reservations.has(peer.toString())) {
|
||||
return { status: Status.RESERVATION_REFUSED, expire: undefined }
|
||||
}
|
||||
const expire = new Date()
|
||||
this.reservations.set(peer.toString(), { addr, expire })
|
||||
return { status: Status.OK, expire: expire.getTime() }
|
||||
}
|
||||
|
||||
async removeReservation (peer: PeerId) {
|
||||
this.reservations.delete(peer.toString())
|
||||
}
|
||||
|
||||
async hasReservation (dst: PeerId) {
|
||||
return this.reservations.has(dst.toString())
|
||||
}
|
||||
}
|
51
src/circuit/v2/reservation-voucher.ts
Normal file
51
src/circuit/v2/reservation-voucher.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Record } from '@libp2p/interfaces/record'
|
||||
import { ReservationVoucher } from './pb/index.js'
|
||||
|
||||
export interface ReservationVoucherOptions {
|
||||
relay: PeerId
|
||||
peer: PeerId
|
||||
expiration: number
|
||||
}
|
||||
|
||||
export class ReservationVoucherRecord implements Record {
|
||||
public readonly domain = 'libp2p-relay-rsvp'
|
||||
public readonly codec = new Uint8Array([0x03, 0x02])
|
||||
|
||||
private readonly relay: PeerId
|
||||
private readonly peer: PeerId
|
||||
private readonly expiration: number
|
||||
|
||||
constructor ({ relay, peer, expiration }: ReservationVoucherOptions) {
|
||||
this.relay = relay
|
||||
this.peer = peer
|
||||
this.expiration = expiration
|
||||
}
|
||||
|
||||
marshal () {
|
||||
return ReservationVoucher.encode({
|
||||
relay: this.relay.toBytes(),
|
||||
peer: this.peer.toBytes(),
|
||||
expiration: this.expiration
|
||||
}).finish()
|
||||
}
|
||||
|
||||
equals (other: Record) {
|
||||
if (!(other instanceof ReservationVoucherRecord)) {
|
||||
return false
|
||||
}
|
||||
if (!this.peer.equals(other.peer)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (!this.relay.equals(other.relay)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (this.expiration !== other.expiration) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
79
src/circuit/v2/stop.ts
Normal file
79
src/circuit/v2/stop.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
|
||||
import { IStopMessage, Status, StopMessage } from './pb/index.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { StreamHandlerV2 } from './stream-handler.js'
|
||||
import { protocolIDv2Stop } from '../multicodec.js'
|
||||
import { validateStopConnectRequest } from './validation.js'
|
||||
|
||||
const log = logger('libp2p:circuitv2:stop')
|
||||
|
||||
export interface HandleStopOptions {
|
||||
connection: Connection
|
||||
request: IStopMessage
|
||||
streamHandler: StreamHandlerV2
|
||||
}
|
||||
|
||||
export async function handleStop ({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
}: HandleStopOptions) {
|
||||
log('new circuit relay v2 stop stream from %s', connection.remotePeer)
|
||||
// Validate the STOP request has the required input
|
||||
try {
|
||||
validateStopConnectRequest(request, streamHandler)
|
||||
} catch (/** @type {any} */ err) {
|
||||
return log.error('invalid stop connect request via peer %s', connection.remotePeer, err)
|
||||
}
|
||||
log('stop request is valid')
|
||||
|
||||
// TODO: go-libp2p marks connection transient if there is limit field present in request.
|
||||
// Cannot find any reference to transient connections in js-libp2p
|
||||
|
||||
streamHandler.write(StopMessage.encode(
|
||||
{
|
||||
type: StopMessage.Type.STATUS,
|
||||
status: Status.OK
|
||||
}
|
||||
).finish())
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
export interface StopOptions {
|
||||
connection: Connection
|
||||
request: IStopMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a STOP request
|
||||
*
|
||||
*/
|
||||
export async function stop ({
|
||||
connection,
|
||||
request
|
||||
}: StopOptions) {
|
||||
const { stream } = await connection.newStream([protocolIDv2Stop])
|
||||
log('starting circuit relay v2 stop request to %s', connection.remotePeer)
|
||||
const streamHandler = new StreamHandlerV2({ stream })
|
||||
streamHandler.write(StopMessage.encode(request).finish())
|
||||
let response
|
||||
try {
|
||||
response = StopMessage.decode(await streamHandler.read())
|
||||
} catch (/** @type {any} */ err) {
|
||||
log.error('error parsing stop message response from %s', connection.remotePeer)
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
return streamHandler.close()
|
||||
}
|
||||
|
||||
if (response.status === Status.OK) {
|
||||
log('stop request to %s was successful', connection.remotePeer)
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
log('stop request failed with code %d', response.status)
|
||||
streamHandler.close()
|
||||
}
|
81
src/circuit/v2/stream-handler.ts
Normal file
81
src/circuit/v2/stream-handler.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { Handshake, handshake } from 'it-handshake'
|
||||
import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import type { Source } from 'it-stream-types'
|
||||
|
||||
const log = logger('libp2p:circuitv2:stream-handler')
|
||||
|
||||
export interface StreamHandlerOptions {
|
||||
/**
|
||||
* A duplex iterable
|
||||
*/
|
||||
stream: Stream
|
||||
|
||||
/**
|
||||
* max bytes length of message
|
||||
*/
|
||||
maxLength?: number
|
||||
}
|
||||
|
||||
export class StreamHandlerV2 {
|
||||
private readonly stream: Stream
|
||||
private readonly shake: Handshake
|
||||
private readonly decoder: Source<Uint8Array>
|
||||
|
||||
constructor (options: StreamHandlerOptions) {
|
||||
const { stream, maxLength = 4096 } = options
|
||||
|
||||
this.stream = stream
|
||||
this.shake = handshake(this.stream)
|
||||
this.decoder = lp.decode.fromReader(this.shake.reader, { maxDataLength: maxLength })
|
||||
}
|
||||
|
||||
/**
|
||||
* Read and decode message
|
||||
*
|
||||
* @async
|
||||
*/
|
||||
async read () {
|
||||
// @ts-expect-error FIXME is a source, needs to be a generator
|
||||
const msg = await this.decoder.next()
|
||||
if (msg.value !== null) {
|
||||
return msg.value.slice()
|
||||
}
|
||||
|
||||
log('read received no value, closing stream')
|
||||
// End the stream, we didn't get data
|
||||
this.close()
|
||||
}
|
||||
|
||||
write (msg: Uint8Array) {
|
||||
this.shake.write(lp.encode.single(msg).slice())
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the handshake rest stream and invalidate handler
|
||||
*/
|
||||
rest () {
|
||||
this.shake.rest()
|
||||
return this.shake.stream
|
||||
}
|
||||
|
||||
/**
|
||||
* @param msg - An encoded Uint8Array protobuf message
|
||||
*/
|
||||
end (msg: Uint8Array) {
|
||||
this.write(msg)
|
||||
this.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the stream
|
||||
*
|
||||
*/
|
||||
close () {
|
||||
log('closing the stream')
|
||||
void this.rest().sink([]).catch(err => {
|
||||
log.error(err)
|
||||
})
|
||||
}
|
||||
}
|
67
src/circuit/v2/validation.ts
Normal file
67
src/circuit/v2/validation.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { Status, StopMessage, IHopMessage, IStopMessage, HopMessage } from './pb/index.js'
|
||||
import type { StreamHandlerV2 } from './stream-handler.js'
|
||||
|
||||
export function validateStopConnectRequest (request: IStopMessage, streamHandler: StreamHandlerV2) {
|
||||
if (request.type !== StopMessage.Type.CONNECT) {
|
||||
writeStopMessageResponse(streamHandler, Status.UNEXPECTED_MESSAGE)
|
||||
throw new Error('Received unexpected stop status msg')
|
||||
}
|
||||
try {
|
||||
if (request.peer?.addrs !== null && request.peer?.addrs !== undefined) {
|
||||
request.peer.addrs.forEach((addr) => {
|
||||
return new Multiaddr(addr)
|
||||
})
|
||||
} else {
|
||||
throw new Error('Missing peer info in stop request')
|
||||
}
|
||||
} catch (/** @type {any} */ err) {
|
||||
writeStopMessageResponse(streamHandler, Status.MALFORMED_MESSAGE)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
export function validateHopConnectRequest (request: IHopMessage, streamHandler: StreamHandlerV2) {
|
||||
// TODO: check if relay connection
|
||||
|
||||
try {
|
||||
if (request.peer?.addrs !== null && request.peer?.addrs !== undefined) {
|
||||
request.peer.addrs.forEach((addr) => {
|
||||
return new Multiaddr(addr)
|
||||
})
|
||||
} else {
|
||||
throw new Error('Missing peer info in hop connect request')
|
||||
}
|
||||
} catch (/** @type {any} */ err) {
|
||||
writeHopMessageResponse(streamHandler, Status.MALFORMED_MESSAGE)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a response
|
||||
*
|
||||
*/
|
||||
function writeStopMessageResponse (streamHandler: StreamHandlerV2, status: Status) {
|
||||
streamHandler.write(StopMessage.encode(
|
||||
{
|
||||
type: StopMessage.Type.STATUS,
|
||||
status: status
|
||||
}
|
||||
).finish())
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a response
|
||||
*
|
||||
* @param {StreamHandler} streamHandler
|
||||
* @param {import('./pb').Status} status
|
||||
*/
|
||||
function writeHopMessageResponse (streamHandler: StreamHandlerV2, status: Status) {
|
||||
streamHandler.write(HopMessage.encode(
|
||||
{
|
||||
type: HopMessage.Type.STATUS,
|
||||
status: status
|
||||
}
|
||||
).finish())
|
||||
}
|
@@ -66,6 +66,7 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
},
|
||||
relay: {
|
||||
enabled: true,
|
||||
limit: 15,
|
||||
advertise: {
|
||||
bootDelay: RelayConstants.ADVERTISE_BOOT_DELAY,
|
||||
enabled: false,
|
||||
|
@@ -51,6 +51,7 @@ export interface RelayConfig {
|
||||
enabled: boolean
|
||||
advertise: RelayAdvertiseConfig
|
||||
hop: HopConfig
|
||||
limit: number
|
||||
autoRelay: AutoRelayConfig
|
||||
}
|
||||
|
||||
|
@@ -207,7 +207,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
})))
|
||||
|
||||
if (init.relay.enabled) {
|
||||
this.components.getTransportManager().add(this.configureComponent(new Circuit()))
|
||||
this.components.getTransportManager().add(this.configureComponent(new Circuit(init.relay)))
|
||||
|
||||
this.configureComponent(new Relay(this.components, {
|
||||
addressSorter: init.dialer.addressSorter,
|
||||
|
278
test/circuit/v2/hop.spec.ts
Normal file
278
test/circuit/v2/hop.spec.ts
Normal file
@@ -0,0 +1,278 @@
|
||||
import { protocolIDv2Hop } from './../../../src/circuit/multicodec.js'
|
||||
import { mockDuplex, mockConnection, mockMultiaddrConnection, mockStream } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { expect } from 'aegir/utils/chai.js'
|
||||
import * as peerUtils from '../../utils/creators/peer.js'
|
||||
import { handleHopProtocol } from '../../../src/circuit/v2/hop.js'
|
||||
import { StreamHandlerV2 } from '../../../src/circuit/v2/stream-handler.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { Status, HopMessage } from '../../../src/circuit/v2/pb/index.js'
|
||||
import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js'
|
||||
import sinon from 'sinon'
|
||||
import { Circuit } from '../../../src/circuit/transport.js'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { pair } from 'it-pair'
|
||||
|
||||
/* eslint-env mocha */
|
||||
|
||||
describe('Circuit v2 - hop protocol', function () {
|
||||
it('error on unknow message type', async function () {
|
||||
const streamHandler = new StreamHandlerV2({ stream: mockStream(pair<Uint8Array>()) })
|
||||
await handleHopProtocol({
|
||||
connection: mockConnection(mockMultiaddrConnection(mockDuplex(), await peerUtils.createPeerId())),
|
||||
streamHandler,
|
||||
request: {
|
||||
// @ts-expect-error
|
||||
type: 'not_existing'
|
||||
}
|
||||
})
|
||||
const msg = HopMessage.decode(await streamHandler.read())
|
||||
expect(msg.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(msg.status).to.be.equal(Status.MALFORMED_MESSAGE)
|
||||
})
|
||||
|
||||
describe('reserve', function () {
|
||||
let relayPeer: PeerId, conn: Connection, streamHandler: StreamHandlerV2, reservationStore: ReservationStore
|
||||
|
||||
beforeEach(async () => {
|
||||
[, relayPeer] = await peerUtils.createPeerIds(2)
|
||||
conn = await mockConnection(mockMultiaddrConnection(mockDuplex(), relayPeer))
|
||||
streamHandler = new StreamHandlerV2({ stream: mockStream(pair<Uint8Array>()) })
|
||||
reservationStore = new ReservationStore()
|
||||
})
|
||||
|
||||
this.afterEach(async function () {
|
||||
streamHandler.close()
|
||||
await conn.close()
|
||||
})
|
||||
|
||||
it('should reserve slot', async function () {
|
||||
const expire: number = 123
|
||||
const reserveStub = sinon.stub(reservationStore, 'reserve')
|
||||
reserveStub.resolves({ status: Status.OK, expire })
|
||||
await handleHopProtocol({
|
||||
request: {
|
||||
type: HopMessage.Type.RESERVE
|
||||
},
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
relayPeer,
|
||||
circuit: sinon.stub() as any,
|
||||
relayAddrs: [new Multiaddr('/ip4/127.0.0.1/udp/1234')],
|
||||
reservationStore
|
||||
})
|
||||
expect(reserveStub.calledOnceWith(conn.remotePeer, conn.remoteAddr)).to.be.true()
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.limit).to.be.null()
|
||||
expect(response.status).to.be.equal(Status.OK)
|
||||
expect(response.reservation?.expire).to.be.equal(expire)
|
||||
expect(response.reservation?.voucher).to.not.be.null()
|
||||
expect(response.reservation?.addrs?.length).to.be.greaterThan(0)
|
||||
})
|
||||
|
||||
it('should fail to reserve slot - acl denied', async function () {
|
||||
const reserveStub = sinon.stub(reservationStore, 'reserve')
|
||||
await handleHopProtocol({
|
||||
request: {
|
||||
type: HopMessage.Type.RESERVE
|
||||
},
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
relayPeer,
|
||||
circuit: sinon.stub() as any,
|
||||
relayAddrs: [new Multiaddr('/ip4/127.0.0.1/udp/1234')],
|
||||
reservationStore,
|
||||
acl: { allowReserve: async function () { return false }, allowConnect: sinon.stub() as any }
|
||||
})
|
||||
expect(reserveStub.notCalled).to.be.true()
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.limit).to.be.null()
|
||||
expect(response.status).to.be.equal(Status.PERMISSION_DENIED)
|
||||
})
|
||||
|
||||
it('should fail to reserve slot - resource exceeded', async function () {
|
||||
const reserveStub = sinon.stub(reservationStore, 'reserve')
|
||||
reserveStub.resolves({ status: Status.RESERVATION_REFUSED })
|
||||
await handleHopProtocol({
|
||||
request: {
|
||||
type: HopMessage.Type.RESERVE
|
||||
},
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
relayPeer,
|
||||
circuit: sinon.stub() as any,
|
||||
relayAddrs: [new Multiaddr('/ip4/127.0.0.1/udp/1234')],
|
||||
reservationStore
|
||||
})
|
||||
expect(reserveStub.calledOnce).to.be.true()
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.limit).to.be.null()
|
||||
expect(response.status).to.be.equal(Status.RESERVATION_REFUSED)
|
||||
})
|
||||
|
||||
it('should fail to reserve slot - failed to write response', async function () {
|
||||
const reserveStub = sinon.stub(reservationStore, 'reserve')
|
||||
const removeReservationStub = sinon.stub(reservationStore, 'removeReservation')
|
||||
reserveStub.resolves({ status: Status.OK, expire: 123 })
|
||||
removeReservationStub.resolves()
|
||||
const backup = streamHandler.write
|
||||
streamHandler.write = function () { throw new Error('connection reset') }
|
||||
await handleHopProtocol({
|
||||
request: {
|
||||
type: HopMessage.Type.RESERVE
|
||||
},
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
relayPeer,
|
||||
circuit: sinon.stub() as any,
|
||||
relayAddrs: [new Multiaddr('/ip4/127.0.0.1/udp/1234')],
|
||||
reservationStore
|
||||
})
|
||||
expect(reserveStub.calledOnce).to.be.true()
|
||||
expect(removeReservationStub.calledOnce).to.be.true()
|
||||
streamHandler.write = backup
|
||||
})
|
||||
})
|
||||
|
||||
describe('connect', function () {
|
||||
let relayPeer: PeerId, dstPeer: PeerId, conn: Connection, streamHandler: StreamHandlerV2, reservationStore: ReservationStore,
|
||||
circuit: Circuit
|
||||
|
||||
beforeEach(async () => {
|
||||
[, relayPeer, dstPeer] = await peerUtils.createPeerIds(3)
|
||||
conn = await mockConnection(mockMultiaddrConnection(mockDuplex(), relayPeer))
|
||||
streamHandler = new StreamHandlerV2({ stream: mockStream(pair<Uint8Array>()) })
|
||||
reservationStore = new ReservationStore()
|
||||
circuit = new Circuit({})
|
||||
})
|
||||
|
||||
this.afterEach(async function () {
|
||||
streamHandler.close()
|
||||
await conn.close()
|
||||
})
|
||||
|
||||
it('should succeed to connect', async function () {
|
||||
const hasReservationStub = sinon.stub(reservationStore, 'hasReservation')
|
||||
hasReservationStub.resolves(true)
|
||||
const dstConn = await mockConnection(
|
||||
mockMultiaddrConnection(pair<Uint8Array>(), dstPeer)
|
||||
)
|
||||
const streamStub = sinon.stub(dstConn, 'newStream')
|
||||
streamStub.resolves({ protocol: protocolIDv2Hop, stream: mockStream(pair<Uint8Array>()) })
|
||||
const stub = sinon.stub(circuit, 'getPeerConnection')
|
||||
stub.returns(dstConn)
|
||||
await handleHopProtocol({
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
request: {
|
||||
type: HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: dstPeer.toBytes(),
|
||||
addrs: []
|
||||
}
|
||||
},
|
||||
relayPeer: relayPeer,
|
||||
relayAddrs: [],
|
||||
reservationStore,
|
||||
circuit
|
||||
})
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.status).to.be.equal(Status.OK)
|
||||
})
|
||||
|
||||
it('should fail to connect - invalid request', async function () {
|
||||
await handleHopProtocol({
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
request: {
|
||||
type: HopMessage.Type.CONNECT,
|
||||
// @ts-expect-error
|
||||
peer: {
|
||||
}
|
||||
},
|
||||
reservationStore,
|
||||
circuit
|
||||
})
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.status).to.be.equal(Status.MALFORMED_MESSAGE)
|
||||
})
|
||||
|
||||
it('should failed to connect - acl denied', async function () {
|
||||
const acl = {
|
||||
allowConnect: function () { return Status.PERMISSION_DENIED }
|
||||
}
|
||||
await handleHopProtocol({
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
request: {
|
||||
type: HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: dstPeer.toBytes(),
|
||||
addrs: []
|
||||
}
|
||||
},
|
||||
reservationStore,
|
||||
circuit,
|
||||
// @ts-expect-error
|
||||
acl
|
||||
})
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.status).to.be.equal(Status.PERMISSION_DENIED)
|
||||
})
|
||||
|
||||
it('should fail to connect - no reservation', async function () {
|
||||
const hasReservationStub = sinon.stub(reservationStore, 'hasReservation')
|
||||
hasReservationStub.resolves(false)
|
||||
await handleHopProtocol({
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
request: {
|
||||
type: HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: dstPeer.toBytes(),
|
||||
addrs: []
|
||||
}
|
||||
},
|
||||
relayPeer: relayPeer,
|
||||
relayAddrs: [],
|
||||
reservationStore,
|
||||
circuit
|
||||
})
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.status).to.be.equal(Status.NO_RESERVATION)
|
||||
})
|
||||
|
||||
it('should fail to connect - no connection', async function () {
|
||||
const hasReservationStub = sinon.stub(reservationStore, 'hasReservation')
|
||||
hasReservationStub.resolves(true)
|
||||
const stub = sinon.stub(circuit, 'getPeerConnection')
|
||||
stub.returns(undefined)
|
||||
await handleHopProtocol({
|
||||
connection: conn,
|
||||
streamHandler,
|
||||
request: {
|
||||
type: HopMessage.Type.CONNECT,
|
||||
peer: {
|
||||
id: dstPeer.toBytes(),
|
||||
addrs: []
|
||||
}
|
||||
},
|
||||
relayPeer: relayPeer,
|
||||
relayAddrs: [],
|
||||
reservationStore,
|
||||
circuit
|
||||
})
|
||||
const response = HopMessage.decode(await streamHandler.read())
|
||||
expect(response.type).to.be.equal(HopMessage.Type.STATUS)
|
||||
expect(response.status).to.be.equal(Status.NO_RESERVATION)
|
||||
expect(stub.calledOnce).to.be.true()
|
||||
})
|
||||
})
|
||||
})
|
45
test/circuit/v2/reservation-store.spec.ts
Normal file
45
test/circuit/v2/reservation-store.spec.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { expect } from 'aegir/utils/chai.js'
|
||||
import { Status } from '../../../src/circuit/v2/pb/index.js'
|
||||
import { ReservationStore } from '../../../src/circuit/v2/reservation-store.js'
|
||||
import { createPeerId } from '../../utils/creators/peer.js'
|
||||
|
||||
/* eslint-env mocha */
|
||||
|
||||
describe('Circuit v2 - reservation store', function () {
|
||||
it('should add reservation', async function () {
|
||||
const store = new ReservationStore(2)
|
||||
const peer = await createPeerId()
|
||||
const result = await store.reserve(peer, new Multiaddr())
|
||||
expect(result.status).to.equal(Status.OK)
|
||||
expect(result.expire).to.not.be.undefined()
|
||||
expect(await store.hasReservation(peer)).to.be.true()
|
||||
})
|
||||
it('should add reservation if peer already has reservation', async function () {
|
||||
const store = new ReservationStore(1)
|
||||
const peer = await createPeerId()
|
||||
await store.reserve(peer, new Multiaddr())
|
||||
const result = await store.reserve(peer, new Multiaddr())
|
||||
expect(result.status).to.equal(Status.OK)
|
||||
expect(result.expire).to.not.be.undefined()
|
||||
expect(await store.hasReservation(peer)).to.be.true()
|
||||
})
|
||||
|
||||
it('should fail to add reservation on exceeding limit', async function () {
|
||||
const store = new ReservationStore(0)
|
||||
const peer = await createPeerId()
|
||||
const result = await store.reserve(peer, new Multiaddr())
|
||||
expect(result.status).to.equal(Status.RESERVATION_REFUSED)
|
||||
})
|
||||
|
||||
it('should remove reservation', async function () {
|
||||
const store = new ReservationStore(10)
|
||||
const peer = await createPeerId()
|
||||
const result = await store.reserve(peer, new Multiaddr())
|
||||
expect(result.status).to.equal(Status.OK)
|
||||
expect(await store.hasReservation(peer)).to.be.true()
|
||||
await store.removeReservation(peer)
|
||||
expect(await store.hasReservation(peer)).to.be.false()
|
||||
await store.removeReservation(peer)
|
||||
})
|
||||
})
|
69
test/circuit/v2/stop.spec.ts
Normal file
69
test/circuit/v2/stop.spec.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
import { protocolIDv2Stop } from './../../../src/circuit/multicodec.js'
|
||||
import { pair } from 'it-pair'
|
||||
import { StreamHandlerV2 } from './../../../src/circuit/v2/stream-handler.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { createPeerIds } from '../../utils/creators/peer.js'
|
||||
import { mockConnection, mockMultiaddrConnection, mockStream } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { handleStop, stop } from '../../../src/circuit/v2/stop.js'
|
||||
import { Status, StopMessage } from '../../../src/circuit/v2/pb/index.js'
|
||||
import { expect } from 'aegir/utils/chai.js'
|
||||
import sinon from 'sinon'
|
||||
|
||||
/* eslint-env mocha */
|
||||
|
||||
describe('Circuit v2 - stop protocol', function () {
|
||||
let srcPeer: PeerId, relayPeer: PeerId, conn: Connection, streamHandler: StreamHandlerV2
|
||||
|
||||
beforeEach(async () => {
|
||||
[srcPeer, relayPeer] = await createPeerIds(2)
|
||||
conn = await mockConnection(mockMultiaddrConnection(pair<Uint8Array>(), relayPeer))
|
||||
streamHandler = new StreamHandlerV2({ stream: mockStream(pair<Uint8Array>()) })
|
||||
})
|
||||
|
||||
this.afterEach(async function () {
|
||||
streamHandler.close()
|
||||
await conn.close()
|
||||
})
|
||||
|
||||
it('handle stop - success', async function () {
|
||||
await handleStop({ connection: conn, request: { type: StopMessage.Type.CONNECT, peer: { id: srcPeer.toBytes(), addrs: [] } }, streamHandler })
|
||||
const response = StopMessage.decode(await streamHandler.read())
|
||||
expect(response.status).to.be.equal(Status.OK)
|
||||
})
|
||||
|
||||
it('handle stop error - invalid request - wrong type', async function () {
|
||||
await handleStop({ connection: conn, request: { type: StopMessage.Type.STATUS, peer: { id: srcPeer.toBytes(), addrs: [] } }, streamHandler })
|
||||
const response = StopMessage.decode(await streamHandler.read())
|
||||
expect(response.status).to.be.equal(Status.UNEXPECTED_MESSAGE)
|
||||
})
|
||||
|
||||
it('handle stop error - invalid request - missing peer', async function () {
|
||||
await handleStop({ connection: conn, request: { type: StopMessage.Type.CONNECT }, streamHandler })
|
||||
const response = StopMessage.decode(await streamHandler.read())
|
||||
expect(response.status).to.be.equal(Status.MALFORMED_MESSAGE)
|
||||
})
|
||||
|
||||
it('handle stop error - invalid request - invalid peer addr', async function () {
|
||||
await handleStop({ connection: conn, request: { type: StopMessage.Type.CONNECT, peer: { id: srcPeer.toBytes(), addrs: [new Uint8Array(32)] } }, streamHandler })
|
||||
const response = StopMessage.decode(await streamHandler.read())
|
||||
expect(response.status).to.be.equal(Status.MALFORMED_MESSAGE)
|
||||
})
|
||||
|
||||
it('send stop - success', async function () {
|
||||
const streamStub = sinon.stub(conn, 'newStream')
|
||||
streamStub.resolves({ protocol: protocolIDv2Stop, stream: mockStream(pair<Uint8Array>()) })
|
||||
await stop({ connection: conn, request: { type: StopMessage.Type.CONNECT, peer: { id: srcPeer.toBytes(), addrs: [] } } })
|
||||
streamHandler.write(StopMessage.encode({
|
||||
type: StopMessage.Type.STATUS,
|
||||
status: Status.OK
|
||||
}).finish())
|
||||
})
|
||||
|
||||
it('send stop - should not fall apart with invalid status response', async function () {
|
||||
const streamStub = sinon.stub(conn, 'newStream')
|
||||
streamStub.resolves({ protocol: protocolIDv2Stop, stream: mockStream(pair<Uint8Array>()) })
|
||||
await stop({ connection: conn, request: { type: StopMessage.Type.CONNECT, peer: { id: srcPeer.toBytes(), addrs: [] } } })
|
||||
streamHandler.write(new Uint8Array(10))
|
||||
})
|
||||
})
|
@@ -7,7 +7,7 @@ import sinon from 'sinon'
|
||||
import nock from 'nock'
|
||||
import { create as createIpfsHttpClient } from 'ipfs-http-client'
|
||||
import { DelegatedContentRouting } from '@libp2p/delegated-content-routing'
|
||||
import { RELAY_CODEC } from '../../src/circuit/multicodec.js'
|
||||
import { protocolIDv2Hop } from '../../src/circuit/multicodec.js'
|
||||
import { createNode } from '../utils/creators/peer.js'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import type { Options as PWaitForOptions } from 'p-wait-for'
|
||||
@@ -31,14 +31,14 @@ async function usingAsRelay (node: Libp2pNode, relay: Libp2pNode, opts?: PWaitFo
|
||||
async function discoveredRelayConfig (node: Libp2pNode, relay: Libp2pNode) {
|
||||
await pWaitFor(async () => {
|
||||
const peerData = await node.peerStore.get(relay.peerId)
|
||||
const supportsRelay = peerData.protocols.includes(RELAY_CODEC)
|
||||
const supportsRelay = peerData.protocols.includes(protocolIDv2Hop)
|
||||
const supportsHop = peerData.metadata.has('hop_relay')
|
||||
|
||||
return supportsRelay && supportsHop
|
||||
})
|
||||
}
|
||||
|
||||
describe('auto-relay', () => {
|
||||
describe.skip('auto-relay', () => {
|
||||
describe('basics', () => {
|
||||
let libp2p: Libp2pNode
|
||||
let relayLibp2p: Libp2pNode
|
||||
@@ -76,7 +76,7 @@ describe('auto-relay', () => {
|
||||
|
||||
// Peer has relay multicodec
|
||||
const knownProtocols = await libp2p.peerStore.protoBook.get(relayLibp2p.peerId)
|
||||
expect(knownProtocols).to.include(RELAY_CODEC)
|
||||
expect(knownProtocols).to.include(protocolIDv2Hop)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -115,7 +115,7 @@ describe('auto-relay', () => {
|
||||
|
||||
// Peer has relay multicodec
|
||||
const knownProtocols = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
|
||||
expect(knownProtocols).to.include(RELAY_CODEC)
|
||||
expect(knownProtocols).to.include(protocolIDv2Hop)
|
||||
})
|
||||
|
||||
it('should be able to dial a peer from its relayed address previously added', async () => {
|
||||
@@ -144,7 +144,7 @@ describe('auto-relay', () => {
|
||||
|
||||
// Relay2 has relay multicodec
|
||||
const knownProtocols2 = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
|
||||
expect(knownProtocols2).to.include(RELAY_CODEC)
|
||||
expect(knownProtocols2).to.include(protocolIDv2Hop)
|
||||
|
||||
// Discover an extra relay and connect
|
||||
await relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.getMultiaddrs())
|
||||
@@ -158,7 +158,7 @@ describe('auto-relay', () => {
|
||||
|
||||
// Relay2 has relay multicodec
|
||||
const knownProtocols3 = await relayLibp2p1.peerStore.protoBook.get(relayLibp2p3.peerId)
|
||||
expect(knownProtocols3).to.include(RELAY_CODEC)
|
||||
expect(knownProtocols3).to.include(protocolIDv2Hop)
|
||||
})
|
||||
|
||||
it('should not listen on a relayed address we disconnect from peer', async () => {
|
||||
|
179
test/relay/relay.node.js
Normal file
179
test/relay/relay.node.js
Normal file
@@ -0,0 +1,179 @@
|
||||
// 'use strict'
|
||||
// /* eslint-env mocha */
|
||||
|
||||
// const { expect } = require('aegir/utils/chai')
|
||||
// const sinon = require('sinon')
|
||||
|
||||
// const { Multiaddr } = require('multiaddr')
|
||||
// const { collect } = require('streaming-iterables')
|
||||
// const pipe = require('it-pipe')
|
||||
// const AggregateError = require('aggregate-error')
|
||||
// const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string')
|
||||
|
||||
// const { createPeerId } = require('../utils/creators/peer')
|
||||
// const baseOptions = require('../utils/base-options')
|
||||
// const Libp2p = require('../../src')
|
||||
// const { codes: Errors } = require('../../src/errors')
|
||||
|
||||
// const listenAddr = '/ip4/0.0.0.0/tcp/0'
|
||||
|
||||
// describe('Dialing (via relay, TCP)', () => {
|
||||
// let srcLibp2p
|
||||
// let relayLibp2p
|
||||
// let dstLibp2p
|
||||
|
||||
// beforeEach(async () => {
|
||||
// const peerIds = await createPeerId({ number: 3 })
|
||||
// // Create 3 nodes, and turn HOP on for the relay
|
||||
// ;[srcLibp2p, relayLibp2p, dstLibp2p] = peerIds.map((peerId, index) => {
|
||||
// const opts = baseOptions
|
||||
// index === 1 && (opts.config.relay.hop.enabled = true)
|
||||
// return new Libp2p({
|
||||
// ...opts,
|
||||
// addresses: {
|
||||
// listen: [listenAddr]
|
||||
// },
|
||||
// peerId
|
||||
// })
|
||||
// })
|
||||
|
||||
// dstLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
|
||||
// })
|
||||
|
||||
// beforeEach(() => {
|
||||
// // Start each node
|
||||
// return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.start()))
|
||||
// })
|
||||
|
||||
// afterEach(async () => {
|
||||
// // Stop each node
|
||||
// return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop()))
|
||||
// })
|
||||
|
||||
// it('should be able to connect to a peer over a relay with active connections', async () => {
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p/${relayIdString}`)
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId}`)
|
||||
|
||||
// const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||
// sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
||||
// await relayLibp2p.transportManager._transports.get('Circuit')._reservationStore.reserve(dstLibp2p.peerId, new Multiaddr())
|
||||
// await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||
// expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
// const connection = await srcLibp2p.dial(dialAddr)
|
||||
// expect(connection).to.exist()
|
||||
// expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes())
|
||||
// expect(connection.localPeer.toBytes()).to.eql(srcLibp2p.peerId.toBytes())
|
||||
// expect(connection.remoteAddr).to.eql(dialAddr)
|
||||
// expect(connection.localAddr).to.eql(
|
||||
// relayAddr // the relay address
|
||||
// .encapsulate(`/p2p/${relayIdString}`) // with its peer id
|
||||
// .encapsulate('/p2p-circuit') // the local peer is connected over the relay
|
||||
// .encapsulate(`/p2p/${srcLibp2p.peerId.toB58String()}`) // and the local peer id
|
||||
// )
|
||||
|
||||
// const { stream: echoStream } = await connection.newStream('/echo/1.0.0')
|
||||
// const input = uint8ArrayFromString('hello')
|
||||
// const [output] = await pipe(
|
||||
// [input],
|
||||
// echoStream,
|
||||
// collect
|
||||
// )
|
||||
|
||||
// expect(output.slice()).to.eql(input)
|
||||
// })
|
||||
|
||||
// it('should fail to connect without reservation', async () => {
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p/${relayIdString}`)
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId}`)
|
||||
|
||||
// const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||
// sinon.stub(dstLibp2p.addressManager, 'listen').value([new Multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
||||
|
||||
// await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||
// expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
// await expect(srcLibp2p.dial(dialAddr))
|
||||
// .to.eventually.be.rejectedWith(AggregateError)
|
||||
// .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
// })
|
||||
|
||||
// it('should fail to connect to a peer over a relay with inactive connections', async () => {
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
// sinon.stub(relayLibp2p.connectionManager, 'get').withArgs(dstLibp2p.peerId).returns(null)
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p/${relayIdString}`)
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`)
|
||||
|
||||
// await expect(srcLibp2p.dial(dialAddr))
|
||||
// .to.eventually.be.rejectedWith(AggregateError)
|
||||
// .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
// })
|
||||
|
||||
// it('should not stay connected to a relay when not already connected and HOP fails', async () => {
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p/${relayIdString}`)
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`)
|
||||
|
||||
// await expect(srcLibp2p.dial(dialAddr))
|
||||
// .to.eventually.be.rejectedWith(AggregateError)
|
||||
// .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// // We should not be connected to the relay, because we weren't before the dial
|
||||
// const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId)
|
||||
// expect(srcToRelayConn).to.not.exist()
|
||||
// })
|
||||
|
||||
// it('dialer should stay connected to an already connected relay on hop failure', async () => {
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${relayIdString}`)
|
||||
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`)
|
||||
|
||||
// await srcLibp2p.dial(relayAddr)
|
||||
|
||||
// await expect(srcLibp2p.dial(dialAddr))
|
||||
// .to.eventually.be.rejectedWith(AggregateError)
|
||||
// .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId)
|
||||
// expect(srcToRelayConn).to.exist()
|
||||
// expect(srcToRelayConn.stat.status).to.equal('open')
|
||||
// })
|
||||
|
||||
// it('destination peer should stay connected to an already connected relay on hop failure', async () => {
|
||||
// const relayIdString = relayLibp2p.peerId.toB58String()
|
||||
// const relayAddr = relayLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${relayIdString}`)
|
||||
|
||||
// const dialAddr = relayAddr
|
||||
// .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerId.toB58String()}`)
|
||||
|
||||
// // Connect the destination peer and the relay
|
||||
// const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||
// sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([new Multiaddr(`${relayAddr}/p2p-circuit`)])
|
||||
|
||||
// await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||
// expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
|
||||
// sinon.stub(relayLibp2p.connectionManager, 'get').withArgs(dstLibp2p.peerId).returns(null)
|
||||
|
||||
// await expect(srcLibp2p.dial(dialAddr))
|
||||
// .to.eventually.be.rejectedWith(AggregateError)
|
||||
// .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// const dstToRelayConn = dstLibp2p.connectionManager.get(relayLibp2p.peerId)
|
||||
// expect(dstToRelayConn).to.exist()
|
||||
// expect(dstToRelayConn.stat.status).to.equal('open')
|
||||
// })
|
||||
// })
|
@@ -1,5 +1,4 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { StreamHandlerV1 } from './../../src/circuit/v1/stream-handler.js'
|
||||
import { expect } from 'aegir/utils/chai.js'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
@@ -9,10 +8,11 @@ import { createNode } from '../utils/creators/peer.js'
|
||||
import { codes as Errors } from '../../src/errors.js'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import all from 'it-all'
|
||||
import { RELAY_CODEC } from '../../src/circuit/multicodec.js'
|
||||
import { StreamHandler } from '../../src/circuit/circuit/stream-handler.js'
|
||||
import { CircuitRelay } from '../../src/circuit/pb/index.js'
|
||||
import { RELAY_V1_CODEC } from '../../src/circuit/multicodec.js'
|
||||
import { createNodeOptions, createRelayOptions } from './utils.js'
|
||||
import { CircuitRelay } from '../../src/circuit/v1/pb/index.js'
|
||||
|
||||
/* eslint-env mocha */
|
||||
|
||||
describe('Dialing (via relay, TCP)', () => {
|
||||
let srcLibp2p: Libp2pNode
|
||||
@@ -156,8 +156,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
|
||||
// send an invalid relay message from the relay to the destination peer
|
||||
const connections = relayLibp2p.getConnections(dstLibp2p.peerId)
|
||||
const { stream } = await connections[0].newStream(RELAY_CODEC)
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const { stream } = await connections[0].newStream(RELAY_V1_CODEC)
|
||||
const streamHandler = new StreamHandlerV1({ stream })
|
||||
streamHandler.write({
|
||||
type: CircuitRelay.Type.STATUS
|
||||
})
|
||||
|
@@ -5,6 +5,7 @@ import { createEd25519PeerId, createFromJSON, createRSAPeerId } from '@libp2p/pe
|
||||
import { createLibp2pNode, Libp2pNode } from '../../../src/libp2p.js'
|
||||
import type { AddressesConfig, Libp2pOptions } from '../../../src/index.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import pTimes from 'p-times'
|
||||
|
||||
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||
|
||||
@@ -71,10 +72,6 @@ export async function populateAddressBooks (peers: Libp2pNode[]) {
|
||||
}
|
||||
|
||||
export interface CreatePeerIdOptions {
|
||||
/**
|
||||
* number of peers (default: 1)
|
||||
*/
|
||||
number?: number
|
||||
|
||||
/**
|
||||
* fixture index for peer-id generation (default: 0)
|
||||
@@ -91,7 +88,7 @@ export interface CreatePeerIdOptions {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Peer-ids
|
||||
* Create Peer-id
|
||||
*/
|
||||
export async function createPeerId (options: CreatePeerIdOptions = {}): Promise<PeerId> {
|
||||
const opts = options.opts ?? {}
|
||||
@@ -102,3 +99,15 @@ export async function createPeerId (options: CreatePeerIdOptions = {}): Promise<
|
||||
|
||||
return await createFromJSON(Peers[options.fixture])
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Peer-ids
|
||||
*/
|
||||
export async function createPeerIds (count: number, options: Omit<CreatePeerIdOptions, 'fixture'> = {}): Promise<PeerId[]> {
|
||||
const opts = options.opts ?? {}
|
||||
|
||||
return await pTimes(count, async (i) => await createPeerId({
|
||||
...opts,
|
||||
fixture: i
|
||||
}))
|
||||
}
|
||||
|
@@ -11,7 +11,8 @@
|
||||
"test"
|
||||
],
|
||||
"exclude": [
|
||||
"src/circuit/pb/index.js",
|
||||
"src/circuit/v1/pb/index.js",
|
||||
"src/circuit/v2/pb/index.js",
|
||||
"src/fetch/pb/proto.js",
|
||||
"src/identify/pb/message.js",
|
||||
"src/insecure/pb/proto.js"
|
||||
|
Reference in New Issue
Block a user