mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-24 18:12:14 +00:00
fix: add timeout for circuit relay (#1294)
Make sure we don't potentially wait forever during incoming circuit relay handshakes. Adds a timeout option to the hop config to control how long we will wait.
This commit is contained in:
parent
0bb1b802c8
commit
ba56c64662
@ -12,6 +12,7 @@ import { peerIdFromBytes } from '@libp2p/peer-id'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { Circuit } from '../transport.js'
|
||||
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
|
||||
const log = logger('libp2p:circuit:hop')
|
||||
|
||||
@ -118,7 +119,7 @@ export async function handleHop (hopRequest: HopRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
export interface HopConfig {
|
||||
export interface HopConfig extends AbortOptions {
|
||||
connection: Connection
|
||||
request: CircuitPB
|
||||
}
|
||||
@ -130,11 +131,14 @@ export interface HopConfig {
|
||||
export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
|
||||
const {
|
||||
connection,
|
||||
request
|
||||
request,
|
||||
signal
|
||||
} = options
|
||||
|
||||
// Create a new stream to the relay
|
||||
const stream = await connection.newStream(RELAY_CODEC)
|
||||
const stream = await connection.newStream(RELAY_CODEC, {
|
||||
signal
|
||||
})
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
streamHandler.write(request)
|
||||
@ -156,7 +160,7 @@ export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
|
||||
throw errCode(new Error(`HOP request failed with code "${response.code ?? 'unknown'}"`), Errors.ERR_HOP_REQUEST_FAILED)
|
||||
}
|
||||
|
||||
export interface CanHopOptions {
|
||||
export interface CanHopOptions extends AbortOptions {
|
||||
connection: Connection
|
||||
}
|
||||
|
||||
@ -165,11 +169,14 @@ export interface CanHopOptions {
|
||||
*/
|
||||
export async function canHop (options: CanHopOptions) {
|
||||
const {
|
||||
connection
|
||||
connection,
|
||||
signal
|
||||
} = options
|
||||
|
||||
// Create a new stream to the relay
|
||||
const stream = await connection.newStream(RELAY_CODEC)
|
||||
const stream = await connection.newStream(RELAY_CODEC, {
|
||||
signal
|
||||
})
|
||||
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
|
@ -5,6 +5,7 @@ import { StreamHandler } from './stream-handler.js'
|
||||
import { validateAddrs } from './utils.js'
|
||||
import type { Connection } from '@libp2p/interface-connection'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
|
||||
const log = logger('libp2p:circuit:stop')
|
||||
|
||||
@ -42,7 +43,7 @@ export function handleStop (options: HandleStopOptions): Duplex<Uint8Array> | un
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
export interface StopOptions {
|
||||
export interface StopOptions extends AbortOptions {
|
||||
connection: Connection
|
||||
request: CircuitPB
|
||||
}
|
||||
@ -53,10 +54,13 @@ export interface StopOptions {
|
||||
export async function stop (options: StopOptions) {
|
||||
const {
|
||||
connection,
|
||||
request
|
||||
request,
|
||||
signal
|
||||
} = options
|
||||
|
||||
const stream = await connection.newStream([RELAY_CODEC])
|
||||
const stream = await connection.newStream(RELAY_CODEC, {
|
||||
signal
|
||||
})
|
||||
log('starting stop request to %p', connection.remotePeer)
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
|
||||
|
@ -13,6 +13,7 @@ import {
|
||||
import type { AddressSorter } from '@libp2p/interface-peer-store'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/components'
|
||||
import type { RelayConfig } from '../index.js'
|
||||
|
||||
const log = logger('libp2p:relay')
|
||||
|
||||
@ -22,11 +23,6 @@ export interface RelayAdvertiseConfig {
|
||||
ttl?: number
|
||||
}
|
||||
|
||||
export interface HopConfig {
|
||||
enabled?: boolean
|
||||
active?: boolean
|
||||
}
|
||||
|
||||
export interface AutoRelayConfig {
|
||||
enabled?: boolean
|
||||
|
||||
@ -36,13 +32,8 @@ export interface AutoRelayConfig {
|
||||
maxListeners: number
|
||||
}
|
||||
|
||||
export interface RelayInit {
|
||||
export interface RelayInit extends RelayConfig {
|
||||
addressSorter?: AddressSorter
|
||||
maxListeners?: number
|
||||
onError?: (error: Error, msg?: string) => void
|
||||
hop: HopConfig
|
||||
advertise: RelayAdvertiseConfig
|
||||
autoRelay: AutoRelayConfig
|
||||
}
|
||||
|
||||
export class Relay implements Startable {
|
||||
|
@ -17,12 +17,20 @@ import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { IncomingStreamData } from '@libp2p/interface-registrar'
|
||||
import type { Listener, Transport, CreateListenerOptions, ConnectionHandler } from '@libp2p/interface-transport'
|
||||
import type { Connection } from '@libp2p/interface-connection'
|
||||
import type { RelayConfig } from '../index.js'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
|
||||
const log = logger('libp2p:circuit')
|
||||
|
||||
export class Circuit implements Transport, Initializable {
|
||||
private handler?: ConnectionHandler
|
||||
private components: Components = new Components()
|
||||
private readonly _init: RelayConfig
|
||||
|
||||
constructor (init: RelayConfig) {
|
||||
this._init = init
|
||||
}
|
||||
|
||||
init (components: Components): void {
|
||||
this.components = components
|
||||
@ -54,49 +62,20 @@ export class Circuit implements Transport, Initializable {
|
||||
|
||||
async _onProtocol (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const request = await streamHandler.read()
|
||||
const controller = new TimeoutController(this._init.hop.timeout)
|
||||
|
||||
if (request == null) {
|
||||
log('request was invalid, could not read from stream')
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.MALFORMED_MESSAGE
|
||||
try {
|
||||
const source = abortableDuplex(stream, controller.signal)
|
||||
const streamHandler = new StreamHandler({
|
||||
stream: {
|
||||
...stream,
|
||||
...source
|
||||
}
|
||||
})
|
||||
streamHandler.close()
|
||||
return
|
||||
}
|
||||
const request = await streamHandler.read()
|
||||
|
||||
let virtualConnection
|
||||
|
||||
switch (request.type) {
|
||||
case CircuitPB.Type.CAN_HOP: {
|
||||
log('received CAN_HOP request from %p', connection.remotePeer)
|
||||
await handleCanHop({ circuit: this, connection, streamHandler })
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.HOP: {
|
||||
log('received HOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleHop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
circuit: this,
|
||||
connectionManager: this.components.getConnectionManager()
|
||||
})
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.STOP: {
|
||||
log('received STOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleStop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
})
|
||||
break
|
||||
}
|
||||
default: {
|
||||
log('Request of type %s not supported', request.type)
|
||||
if (request == null) {
|
||||
log('request was invalid, could not read from stream')
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.MALFORMED_MESSAGE
|
||||
@ -104,27 +83,68 @@ export class Circuit implements Transport, Initializable {
|
||||
streamHandler.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if (virtualConnection != null) {
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const remoteAddr = new Multiaddr(request.dstPeer.addrs[0])
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const localAddr = new Multiaddr(request.srcPeer.addrs[0])
|
||||
const maConn = streamToMaConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
|
||||
log('new %s connection %s', type, maConn.remoteAddr)
|
||||
let virtualConnection
|
||||
|
||||
const conn = await this.components.getUpgrader().upgradeInbound(maConn)
|
||||
log('%s connection %s upgraded', type, maConn.remoteAddr)
|
||||
|
||||
if (this.handler != null) {
|
||||
this.handler(conn)
|
||||
switch (request.type) {
|
||||
case CircuitPB.Type.CAN_HOP: {
|
||||
log('received CAN_HOP request from %p', connection.remotePeer)
|
||||
await handleCanHop({ circuit: this, connection, streamHandler })
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.HOP: {
|
||||
log('received HOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleHop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
circuit: this,
|
||||
connectionManager: this.components.getConnectionManager()
|
||||
})
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.STOP: {
|
||||
log('received STOP request from %p', connection.remotePeer)
|
||||
virtualConnection = await handleStop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
})
|
||||
break
|
||||
}
|
||||
default: {
|
||||
log('Request of type %s not supported', request.type)
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.MALFORMED_MESSAGE
|
||||
})
|
||||
streamHandler.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if (virtualConnection != null) {
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const remoteAddr = new Multiaddr(request.dstPeer.addrs[0])
|
||||
// @ts-expect-error dst peer will not be undefined
|
||||
const localAddr = new Multiaddr(request.srcPeer.addrs[0])
|
||||
const maConn = streamToMaConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
|
||||
log('new %s connection %s', type, maConn.remoteAddr)
|
||||
|
||||
const conn = await this.components.getUpgrader().upgradeInbound(maConn)
|
||||
log('%s connection %s upgraded', type, maConn.remoteAddr)
|
||||
|
||||
if (this.handler != null) {
|
||||
this.handler(conn)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
controller.clear()
|
||||
}
|
||||
}
|
||||
|
||||
@ -160,6 +180,7 @@ export class Circuit implements Transport, Initializable {
|
||||
|
||||
try {
|
||||
const virtualConnection = await hop({
|
||||
...options,
|
||||
connection: relayConnection,
|
||||
request: {
|
||||
type: CircuitPB.Type.HOP,
|
||||
|
@ -69,7 +69,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
},
|
||||
hop: {
|
||||
enabled: false,
|
||||
active: false
|
||||
active: false,
|
||||
timeout: 30000
|
||||
},
|
||||
autoRelay: {
|
||||
enabled: false,
|
||||
|
@ -93,6 +93,7 @@ export class FetchService implements Startable {
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
let timeoutController
|
||||
let signal = options.signal
|
||||
let stream: Stream | undefined
|
||||
|
||||
// create a timeout if no abort signal passed
|
||||
if (signal == null) {
|
||||
@ -100,14 +101,14 @@ export class FetchService implements Startable {
|
||||
signal = timeoutController.signal
|
||||
}
|
||||
|
||||
const stream = await connection.newStream([this.protocol], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
try {
|
||||
stream = await connection.newStream([this.protocol], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
const result = await pipe(
|
||||
[FetchRequest.encode({ identifier: key })],
|
||||
lp.encode(),
|
||||
@ -146,7 +147,9 @@ export class FetchService implements Startable {
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
stream.close()
|
||||
if (stream != null) {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -228,6 +228,7 @@ export class IdentifyService implements Startable {
|
||||
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
|
||||
let timeoutController
|
||||
let signal = options.signal
|
||||
let stream: Stream | undefined
|
||||
|
||||
// create a timeout if no abort signal passed
|
||||
if (signal == null) {
|
||||
@ -235,14 +236,14 @@ export class IdentifyService implements Startable {
|
||||
signal = timeoutController.signal
|
||||
}
|
||||
|
||||
const stream = await connection.newStream([this.identifyProtocolStr], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
try {
|
||||
stream = await connection.newStream([this.identifyProtocolStr], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
const data = await pipe(
|
||||
[],
|
||||
source,
|
||||
@ -266,7 +267,9 @@ export class IdentifyService implements Startable {
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
stream.close()
|
||||
if (stream != null) {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,7 @@ export interface MetricsConfig {
|
||||
export interface HopConfig {
|
||||
enabled?: boolean
|
||||
active?: boolean
|
||||
timeout: number
|
||||
}
|
||||
|
||||
export interface RelayConfig {
|
||||
|
@ -218,7 +218,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
})))
|
||||
|
||||
if (init.relay.enabled) {
|
||||
this.components.getTransportManager().add(this.configureComponent(new Circuit()))
|
||||
this.components.getTransportManager().add(this.configureComponent(new Circuit(init.relay)))
|
||||
|
||||
this.configureComponent(new Relay(this.components, {
|
||||
addressSorter: init.connectionManager.addressSorter,
|
||||
|
@ -13,6 +13,7 @@ import type { Components } from '@libp2p/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import type { Stream } from '@libp2p/interface-connection'
|
||||
|
||||
const log = logger('libp2p:ping')
|
||||
|
||||
@ -83,6 +84,7 @@ export class PingService implements Startable {
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
let timeoutController
|
||||
let signal = options.signal
|
||||
let stream: Stream | undefined
|
||||
|
||||
// create a timeout if no abort signal passed
|
||||
if (signal == null) {
|
||||
@ -90,14 +92,14 @@ export class PingService implements Startable {
|
||||
signal = timeoutController.signal
|
||||
}
|
||||
|
||||
const stream = await connection.newStream([this.protocol], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
try {
|
||||
stream = await connection.newStream([this.protocol], {
|
||||
signal
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source = abortableDuplex(stream, signal)
|
||||
|
||||
const result = await pipe(
|
||||
[data],
|
||||
source,
|
||||
@ -115,7 +117,9 @@ export class PingService implements Startable {
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
stream.close()
|
||||
if (stream != null) {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ import { RELAY_CODEC } from '../../src/circuit/multicodec.js'
|
||||
import { StreamHandler } from '../../src/circuit/circuit/stream-handler.js'
|
||||
import { CircuitRelay } from '../../src/circuit/pb/index.js'
|
||||
import { createNodeOptions, createRelayOptions } from './utils.js'
|
||||
import delay from 'delay'
|
||||
|
||||
describe('Dialing (via relay, TCP)', () => {
|
||||
let srcLibp2p: Libp2pNode
|
||||
@ -170,4 +171,37 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
expect(dstToRelayConn).to.have.lengthOf(1)
|
||||
expect(dstToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN')
|
||||
})
|
||||
|
||||
it('should time out when establishing a relay connection', async () => {
|
||||
await relayLibp2p.stop()
|
||||
relayLibp2p = await createNode({
|
||||
config: createRelayOptions({
|
||||
relay: {
|
||||
autoRelay: {
|
||||
enabled: false
|
||||
},
|
||||
hop: {
|
||||
// very short timeout
|
||||
timeout: 10
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
const relayAddr = relayLibp2p.components.getTransportManager().getAddrs()[0]
|
||||
const dialAddr = relayAddr.encapsulate(`/p2p/${relayLibp2p.peerId.toString()}`)
|
||||
|
||||
const connection = await srcLibp2p.dial(dialAddr)
|
||||
const stream = await connection.newStream('/libp2p/circuit/relay/0.1.0')
|
||||
|
||||
await stream.sink(async function * () {
|
||||
// delay for longer than the timeout
|
||||
await delay(1000)
|
||||
yield Uint8Array.from([0])
|
||||
}())
|
||||
|
||||
// because we timed out, the remote should have reset the stream
|
||||
await expect(all(stream.source)).to.eventually.be.rejected
|
||||
.with.property('code', 'ERR_MPLEX_STREAM_RESET')
|
||||
})
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user