mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
fix: reduce identify message size limit (#1230)
Adds a config option to specify a maximum message size we'll accept for an Identify message. The default is 8KB, the same as go-libp2p - previously we fell back to the default `maxMessageLength` option of `it-length-prefixed` which is 4MB. Also adds a default timeout for reading responses to identify requests which is used if an AbortSignal is not passed in. The default timeout also aligns with go-libp2p.
This commit is contained in:
parent
4c0c2c6d3e
commit
824720fb8f
@ -97,7 +97,7 @@ Creates an instance of Libp2p.
|
||||
| options.modules | [`Array<object>`](./CONFIGURATION.md#modules) | libp2p [modules](./CONFIGURATION.md#modules) to use |
|
||||
| [options.addresses] | `{ listen: Array<string>, announce: Array<string>, announceFilter: (ma: Array<multiaddr>) => Array<multiaddr> }` | Addresses for transport listening and to advertise to the network |
|
||||
| [options.config] | `object` | libp2p modules configuration and core configuration |
|
||||
| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number }` | libp2p identify protocol options |
|
||||
| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number, maxIdentifyMessageSize: number }` | libp2p identify protocol options |
|
||||
| [options.ping] | `{ protocolPrefix: string }` | libp2p ping protocol options |
|
||||
| [options.fetch] | `{ protocolPrefix: string }` | libp2p fetch protocol options |
|
||||
| [options.connectionManager] | [`object`](./CONFIGURATION.md#configuring-connection-manager) | libp2p Connection Manager [configuration](./CONFIGURATION.md#configuring-connection-manager) |
|
||||
|
@ -30,16 +30,36 @@ import type { Duplex } from 'it-stream-types'
|
||||
|
||||
const log = logger('libp2p:identify')
|
||||
|
||||
const IDENTIFY_TIMEOUT = 30000
|
||||
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48
|
||||
const IDENTIFY_TIMEOUT = 60000
|
||||
|
||||
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52
|
||||
const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8
|
||||
|
||||
export interface HostProperties {
|
||||
agentVersion: string
|
||||
}
|
||||
|
||||
export interface IdentifyServiceInit {
|
||||
/**
|
||||
* The prefix to use for the protocol (default: 'ipfs')
|
||||
*/
|
||||
protocolPrefix: string
|
||||
|
||||
/**
|
||||
* What details we should send as part of an identify message
|
||||
*/
|
||||
host: HostProperties
|
||||
|
||||
/**
|
||||
* How long we should wait for a remote peer to send their identify response
|
||||
*/
|
||||
timeout?: number
|
||||
|
||||
/**
|
||||
* Identify responses larger than this in bytes will be rejected (default: 8192)
|
||||
*/
|
||||
maxIdentifyMessageSize?: number
|
||||
}
|
||||
|
||||
export class IdentifyService implements Startable {
|
||||
@ -200,17 +220,25 @@ export class IdentifyService implements Startable {
|
||||
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
|
||||
const { stream } = await connection.newStream([this.identifyProtocolStr], options)
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
let timeoutController
|
||||
let signal = options.signal
|
||||
|
||||
// create a timeout if no abort signal passed
|
||||
if (signal == null) {
|
||||
timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
signal = timeoutController.signal
|
||||
}
|
||||
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
source = abortableDuplex(stream, signal)
|
||||
|
||||
try {
|
||||
const data = await pipe(
|
||||
[],
|
||||
source,
|
||||
lp.decode(),
|
||||
lp.decode({
|
||||
maxDataLength: this.init.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE
|
||||
}),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
|
||||
@ -224,6 +252,10 @@ export class IdentifyService implements Startable {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
} finally {
|
||||
if (timeoutController != null) {
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
@ -404,7 +436,9 @@ export class IdentifyService implements Startable {
|
||||
const data = await pipe(
|
||||
[],
|
||||
source,
|
||||
lp.decode(),
|
||||
lp.decode({
|
||||
maxDataLength: this.init.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE
|
||||
}),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
/* eslint-env mocha */
|
||||
/* eslint max-nested-callbacks: ["error", 6] */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
@ -26,6 +27,8 @@ import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
import delay from 'delay'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import pDefer from 'p-defer'
|
||||
|
||||
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
@ -270,4 +273,105 @@ describe('identify', () => {
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
|
||||
it('should limit incoming identify message sizes', async () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, {
|
||||
...defaultInit,
|
||||
maxIdentifyMessageSize: 100
|
||||
})
|
||||
await start(remoteIdentify)
|
||||
|
||||
const identifySpy = sinon.spy(remoteIdentify, 'identify')
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// handle incoming identify requests and send too much data
|
||||
await localComponents.getRegistrar().handle('/ipfs/id/1.0.0', ({ stream }) => {
|
||||
const data = new Uint8Array(1024)
|
||||
|
||||
void Promise.resolve().then(async () => {
|
||||
await pipe(
|
||||
[data],
|
||||
lp.encode(),
|
||||
stream,
|
||||
async (source) => await drain(source)
|
||||
)
|
||||
|
||||
deferred.resolve()
|
||||
})
|
||||
})
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
await deferred.promise
|
||||
await stop(remoteIdentify)
|
||||
|
||||
expect(identifySpy.called).to.be.true()
|
||||
|
||||
await expect(identifySpy.getCall(0).returnValue)
|
||||
.to.eventually.be.rejected.with.property('code', 'ERR_MSG_DATA_TOO_LONG')
|
||||
})
|
||||
|
||||
it('should time out incoming identify messages', async () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, {
|
||||
...defaultInit,
|
||||
timeout: 100
|
||||
})
|
||||
await start(remoteIdentify)
|
||||
|
||||
const identifySpy = sinon.spy(remoteIdentify, 'identify')
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// handle incoming identify requests and don't send anything
|
||||
await localComponents.getRegistrar().handle('/ipfs/id/1.0.0', ({ stream }) => {
|
||||
const data = new Uint8Array(1024)
|
||||
|
||||
void Promise.resolve().then(async () => {
|
||||
await pipe(
|
||||
[data],
|
||||
lp.encode(),
|
||||
async (source) => {
|
||||
await stream.sink(async function * () {
|
||||
for await (const buf of source) {
|
||||
// don't send all of the data, remote will expect another message
|
||||
yield buf.slice(0, buf.length - 100)
|
||||
|
||||
// wait for longer than the timeout without sending any more data or closing the stream
|
||||
await delay(500)
|
||||
}
|
||||
}())
|
||||
}
|
||||
)
|
||||
|
||||
deferred.resolve()
|
||||
})
|
||||
})
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
await deferred.promise
|
||||
await stop(remoteIdentify)
|
||||
|
||||
expect(identifySpy.called).to.be.true()
|
||||
|
||||
await expect(identifySpy.getCall(0).returnValue)
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
})
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user