feat!: limit protocol streams per-connection (#1255)

* feat: limit protocol streams per-connection

Uses the `maxInboundStreams` and `maxOutboundStreams` of the `registrar.handle`
opts to limit the number of concurrent streams open on each connection
on a per-protocol basis.

Both values default to 1 so some tuning will be necessary to set
appropriate values for some protocols.

* chore: make error codes consistent

* chore: fix up examples
This commit is contained in:
Alex Potsides 2022-06-17 14:46:31 +01:00 committed by GitHub
parent 5371729646
commit de30c2cec7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 472 additions and 181 deletions

2
.gitignore vendored
View File

@ -10,7 +10,7 @@ test/repo-tests*
logs
*.log
coverage
.coverage
.nyc_output
# Runtime data

View File

@ -389,7 +389,7 @@ await libp2p.hangUp(remotePeerId)
Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support.
`libp2p.handle(protocols, handler)`
`libp2p.handle(protocols, handler, options)`
In the event of a new handler for the same protocol being added, the first one is discarded.
@ -399,6 +399,7 @@ In the event of a new handler for the same protocol being added, the first one i
|------|------|-------------|
| protocols | `Array<string>|string` | protocols to register |
| handler | `function({ connection:*, stream:*, protocol:string })` | handler to call |
| options | `StreamHandlerOptions` | Options including protocol stream limits |
#### Example
@ -409,7 +410,10 @@ const handler = ({ connection, stream, protocol }) => {
// use stream or connection according to the needs
}
libp2p.handle('/echo/1.0.0', handler)
libp2p.handle('/echo/1.0.0', handler, {
maxInboundStreams: 5,
maxOutboundStreams: 5
})
```
### unhandle

View File

@ -32,7 +32,7 @@ async function run () {
// Dial to the remote peer (the "listener")
const listenerMa = new Multiaddr(`/ip4/127.0.0.1/tcp/10333/p2p/${idListener.toString()}`)
const { stream } = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0')
const stream = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0')
console.log('Dialer dialed to listener on protocol: /chat/1.0.0')
console.log('Type a message and see what happens')

View File

@ -40,7 +40,7 @@ const createNode = async () => {
)
})
const { stream } = await node1.dialProtocol(node2.peerId, '/a-protocol')
const stream = await node1.dialProtocol(node2.peerId, '/a-protocol')
await pipe(
[uint8ArrayFromString('This information is sent out encrypted to the other peer')],

View File

@ -8,10 +8,10 @@
"libp2p": "../../",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/kad-dht": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/webrtc-star": "^2.0.0",
"@libp2p/websockets": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/websockets": "^3.0.0",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-scripts": "5.0.0"

View File

@ -37,7 +37,7 @@ async function run() {
// Dial the listener node
console.log('Dialing to peer:', listenerMultiaddr)
const { stream } = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0')
const stream = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0')
console.log('nodeA dialed to nodeB on protocol: /echo/1.0.0')

View File

@ -11,9 +11,9 @@
"dependencies": {
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/webrtc-star": "^2.0.0",
"@libp2p/websockets": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/websockets": "^3.0.0",
"libp2p": "../../"
},
"devDependencies": {

View File

@ -10,7 +10,7 @@
"license": "MIT",
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^6.0.0",
"@libp2p/floodsub": "^2.0.0",
"@libp2p/floodsub": "^3.0.0",
"@nodeutils/defaults-deep": "^1.1.0",
"execa": "^2.1.0",
"fs-extra": "^8.1.0",

View File

@ -43,7 +43,7 @@ generateKey(otherSwarmKey)
)
})
const { stream } = await node1.dialProtocol(node2.peerId, '/private')
const stream = await node1.dialProtocol(node2.peerId, '/private')
await pipe(
[uint8ArrayFromString('This message is sent on a private network')],

View File

@ -60,14 +60,14 @@ const createNode = async () => {
})
*/
const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
await pipe(
[uint8ArrayFromString('my own protocol, wow!')],
stream
)
/*
const { stream } = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
const stream = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
await pipe(
['my own protocol, wow!'],

View File

@ -38,22 +38,25 @@ const createNode = async () => {
console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`)
}
}
)
).finally(() => {
// clean up resources
stream.close()
})
})
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/a'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/a'])
await pipe(
[uint8ArrayFromString('protocol (a)')],
stream1
)
const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream2 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
[uint8ArrayFromString('protocol (b)')],
stream2
)
const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream3 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
[uint8ArrayFromString('another stream on protocol (b)')],
stream3

View File

@ -54,13 +54,13 @@ const createNode = async () => {
)
})
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
[uint8ArrayFromString('from 1 to 2')],
stream1
)
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
[uint8ArrayFromString('from 2 to 1')],
stream2

View File

@ -40,7 +40,7 @@ node2.handle('/your-protocol', ({ stream }) => {
After the protocol is _handled_, now we can dial to it.
```JavaScript
const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol'])
await pipe(
['my own protocol, wow!'],
@ -62,7 +62,7 @@ node2.handle('/another-protocol/1.0.1', ({ stream }) => {
)
})
// ...
const { stream } = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
const stream = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0'])
await pipe(
['my own protocol, wow!'],
@ -133,19 +133,19 @@ node2.handle(['/a', '/b'], ({ protocol, stream }) => {
)
})
const { stream } = await node1.dialProtocol(node2.peerId, ['/a'])
const stream = await node1.dialProtocol(node2.peerId, ['/a'])
await pipe(
['protocol (a)'],
stream
)
const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream2 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
['protocol (b)'],
stream2
)
const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b'])
const stream3 = await node1.dialProtocol(node2.peerId, ['/b'])
await pipe(
['another stream on protocol (b)'],
stream3
@ -167,7 +167,7 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t
With the aid of both mechanisms, we can reuse an incomming connection to dial streams out too, this is specially useful when you are behind tricky NAT, firewalls or if you are running in a browser, where you can't have listening addrs, but you can dial out. By dialing out, you enable other peers to talk with you in Protocols that they want, simply by opening a new multiplexed stream.
You can see this working on example [3.js](./3.js).
You can see this working on example [3.js](./3.js).
As we've seen earlier, we can create our node with this createNode function.
```js
@ -229,14 +229,14 @@ node2.handle('/node-2', ({ stream }) => {
})
// Dialing node2 from node1
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
['from 1 to 2'],
stream1
)
// Dialing node1 from node2
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
['from 2 to 1'],
stream2
@ -256,14 +256,14 @@ So, we have successfully set up a bidirectional connection with protocol muxing.
The code below will result into an error as `the dial address is not valid`.
```js
// Dialing from node2 to node1
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
['from 2 to 1'],
stream2
)
// Dialing from node1 to node2
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
['from 1 to 2'],
stream1

View File

@ -48,7 +48,7 @@ function printAddrs (node, number) {
})
await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs())
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
['Hello', ' ', 'p2p', ' ', 'world', '!'].map(str => uint8ArrayFromString(str)),

View File

@ -63,14 +63,14 @@ function print ({ stream }) {
await node3.peerStore.addressBook.set(node1.peerId, node1.getMultiaddrs())
// node 1 (TCP) dials to node 2 (TCP+WebSockets)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
[uint8ArrayFromString('node 1 dialed to node 2 successfully')],
stream
)
// node 2 (TCP+WebSockets) dials to node 2 (WebSockets)
const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print')
const stream2 = await node2.dialProtocol(node3.peerId, '/print')
await pipe(
[uint8ArrayFromString('node 2 dialed to node 3 successfully')],
stream2

View File

@ -78,7 +78,7 @@ function print ({ stream }) {
const targetAddr = node1.getMultiaddrs()[0];
// node 2 (Secure WebSockets) dials to node 1 (Secure Websockets)
const { stream } = await node2.dialProtocol(targetAddr, '/print')
const stream = await node2.dialProtocol(targetAddr, '/print')
await pipe(
[uint8ArrayFromString('node 2 dialed to node 1 successfully')],
stream

View File

@ -139,7 +139,7 @@ Then add,
})
await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
['Hello', ' ', 'p2p', ' ', 'world', '!'],
@ -225,14 +225,14 @@ await node2.peerStore.addressBook.set(node3.peerId, node3.multiaddrs)
await node3.peerStore.addressBook.set(node1.peerId, node1.multiaddrs)
// node 1 (TCP) dials to node 2 (TCP+WebSockets)
const { stream } = await node1.dialProtocol(node2.peerId, '/print')
const stream = await node1.dialProtocol(node2.peerId, '/print')
await pipe(
['node 1 dialed to node 2 successfully'],
stream
)
// node 2 (TCP+WebSockets) dials to node 2 (WebSockets)
const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print')
const stream2 = await node2.dialProtocol(node3.peerId, '/print')
await pipe(
['node 2 dialed to node 3 successfully'],
stream2

View File

@ -1,5 +1,5 @@
import { createLibp2p } from 'libp2p'
import { WebRTCDirect } from '@achingbrain/webrtc-direct'
import { WebRTCDirect } from '@libp2p/webrtc-direct'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { Bootstrap } from '@libp2p/bootstrap'

View File

@ -1,5 +1,5 @@
import { createLibp2p } from 'libp2p'
import { WebRTCDirect } from '@achingbrain/webrtc-direct'
import { WebRTCDirect } from '@libp2p/webrtc-direct'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { createFromJSON } from '@libp2p/peer-id-factory'

View File

@ -12,7 +12,7 @@
"@libp2p/webrtc-direct": "^2.0.0",
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"libp2p": "../../",
"wrtc": "^0.4.7"
},

View File

@ -97,11 +97,11 @@
},
"dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/components": "^1.0.0",
"@libp2p/connection": "^3.0.0",
"@libp2p/components": "^2.0.0",
"@libp2p/connection": "^4.0.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-address-manager": "^1.0.1",
"@libp2p/interface-connection": "^1.0.1",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-connection-encrypter": "^1.0.2",
"@libp2p/interface-content-routing": "^1.0.1",
"@libp2p/interface-dht": "^1.0.0",
@ -111,18 +111,18 @@
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-routing": "^1.0.0",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interface-pubsub": "^1.0.1",
"@libp2p/interface-registrar": "^1.0.0",
"@libp2p/interface-pubsub": "^1.0.3",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-stream-muxer": "^1.0.1",
"@libp2p/interface-transport": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^2.0.0",
"@libp2p/multistream-select": "^2.0.1",
"@libp2p/peer-collections": "^1.0.2",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/peer-record": "^2.0.0",
"@libp2p/peer-store": "^2.0.0",
"@libp2p/peer-store": "^3.0.0",
"@libp2p/tracked-map": "^1.0.5",
"@libp2p/utils": "^2.0.0",
"@multiformats/mafmt": "^11.0.2",
@ -171,24 +171,24 @@
"@libp2p/daemon-server": "^2.0.0",
"@libp2p/delegated-content-routing": "^2.0.0",
"@libp2p/delegated-peer-routing": "^2.0.0",
"@libp2p/floodsub": "^2.0.0",
"@libp2p/floodsub": "^3.0.0",
"@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0",
"@libp2p/interface-mocks": "^1.0.1",
"@libp2p/interface-mocks": "^2.0.0",
"@libp2p/interop": "^2.0.0",
"@libp2p/kad-dht": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mdns": "^2.0.0",
"@libp2p/mplex": "^2.0.0",
"@libp2p/pubsub": "^2.0.0",
"@libp2p/tcp": "^2.0.0",
"@libp2p/topology": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/pubsub": "^3.0.1",
"@libp2p/tcp": "^3.0.0",
"@libp2p/topology": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.0",
"@libp2p/websockets": "^2.0.0",
"@libp2p/websockets": "^3.0.0",
"@types/node-forge": "^1.0.0",
"@types/p-fifo": "^1.0.0",
"@types/varint": "^6.0.0",
"@types/xsalsa20": "^1.1.0",
"aegir": "^37.0.9",
"aegir": "^37.3.0",
"cborg": "^1.8.1",
"delay": "^5.0.0",
"execa": "^6.1.0",

View File

@ -134,7 +134,7 @@ export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
} = options
// Create a new stream to the relay
const { stream } = await connection.newStream(RELAY_CODEC)
const stream = await connection.newStream(RELAY_CODEC)
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write(request)
@ -169,7 +169,7 @@ export async function canHop (options: CanHopOptions) {
} = options
// Create a new stream to the relay
const { stream } = await connection.newStream(RELAY_CODEC)
const stream = await connection.newStream(RELAY_CODEC)
// Send the HOP request
const streamHandler = new StreamHandler({ stream })

View File

@ -56,7 +56,7 @@ export async function stop (options: StopOptions) {
request
} = options
const { stream } = await connection.newStream([RELAY_CODEC])
const stream = await connection.newStream([RELAY_CODEC])
log('starting stop request to %p', connection.remotePeer)
const streamHandler = new StreamHandler({ stream })

View File

@ -79,13 +79,21 @@ const DefaultConfig: Partial<Libp2pInit> = {
host: {
agentVersion: AGENT_VERSION
},
timeout: 30000
timeout: 30000,
maxInboundStreams: 1,
maxOutboundStreams: 1,
maxPushIncomingStreams: 1,
maxPushOutgoingStreams: 1
},
ping: {
protocolPrefix: 'ipfs'
protocolPrefix: 'ipfs',
maxInboundStreams: 1,
maxOutboundStreams: 1
},
fetch: {
protocolPrefix: 'libp2p'
protocolPrefix: 'libp2p',
maxInboundStreams: 1,
maxOutboundStreams: 1
}
}

View File

@ -70,5 +70,8 @@ export enum codes {
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK',
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD',
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED'
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED',
ERR_NO_HANDLER_FOR_PROTOCOL = 'ERR_NO_HANDLER_FOR_PROTOCOL',
ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS',
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS'
}

View File

@ -3,7 +3,6 @@ import errCode from 'err-code'
import { codes } from '../errors.js'
import * as lp from 'it-length-prefixed'
import { FetchRequest, FetchResponse } from './pb/proto.js'
import { handshake } from 'it-handshake'
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
@ -13,11 +12,15 @@ import type { Components } from '@libp2p/components'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Duplex } from 'it-stream-types'
import { abortableDuplex } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import first from 'it-first'
const log = logger('libp2p:fetch')
export interface FetchServiceInit {
protocolPrefix: string
maxInboundStreams: number
maxOutboundStreams: number
}
export interface HandleMessageOptions {
@ -40,6 +43,7 @@ export class FetchService implements Startable {
private readonly components: Components
private readonly lookupFunctions: Map<string, LookupFunction>
private started: boolean
private readonly init: FetchServiceInit
constructor (components: Components, init: FetchServiceInit) {
this.started = false
@ -47,13 +51,21 @@ export class FetchService implements Startable {
this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
this.lookupFunctions = new Map() // Maps key prefix to value lookup function
this.handleMessage = this.handleMessage.bind(this)
this.init = init
}
async start () {
await this.components.getRegistrar().handle(this.protocol, (data) => {
void this.handleMessage(data).catch(err => {
log.error(err)
})
void this.handleMessage(data)
.catch(err => {
log.error(err)
})
.finally(() => {
data.stream.close()
})
}, {
maxInboundStreams: this.init.maxInboundStreams,
maxOutboundStreams: this.init.maxOutboundStreams
})
this.started = true
}
@ -74,7 +86,7 @@ export class FetchService implements Startable {
log('dialing %s to %p', this.protocol, peer)
const connection = await this.components.getConnectionManager().openConnection(peer, options)
const { stream } = await connection.newStream([this.protocol], options)
const stream = await connection.newStream([this.protocol], options)
let source: Duplex<Uint8Array> = stream
// make stream abortable if AbortSignal passed
@ -82,28 +94,42 @@ export class FetchService implements Startable {
source = abortableDuplex(stream, options.signal)
}
const shake = handshake(source)
try {
const result = await pipe(
[FetchRequest.encode({ identifier: key })],
lp.encode(),
source,
lp.decode(),
async function (source) {
const buf = await first(source)
// send message
shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice())
if (buf == null) {
throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE)
}
// read response
// @ts-expect-error fromReader returns a Source which has no .next method
const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice())
switch (response.status) {
case (FetchResponse.StatusCode.OK): {
return response.data
}
case (FetchResponse.StatusCode.NOT_FOUND): {
return null
}
case (FetchResponse.StatusCode.ERROR): {
const errmsg = (new TextDecoder()).decode(response.data)
throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS)
}
default: {
throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE)
}
const response = FetchResponse.decode(buf)
switch (response.status) {
case (FetchResponse.StatusCode.OK): {
return response.data
}
case (FetchResponse.StatusCode.NOT_FOUND): {
return null
}
case (FetchResponse.StatusCode.ERROR): {
const errmsg = (new TextDecoder()).decode(response.data)
throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS)
}
default: {
throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE)
}
}
}
)
return result ?? null
} finally {
stream.close()
}
}
@ -114,25 +140,40 @@ export class FetchService implements Startable {
*/
async handleMessage (data: IncomingStreamData) {
const { stream } = data
const shake = handshake(stream)
// @ts-expect-error fromReader returns a Source which has no .next method
const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice())
const self = this
let response: FetchResponse
const lookup = this._getLookupFunction(request.identifier)
if (lookup != null) {
const data = await lookup(request.identifier)
if (data != null) {
response = { status: FetchResponse.StatusCode.OK, data }
} else {
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
}
} else {
const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier)
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
}
await pipe(
stream,
lp.decode(),
async function * (source) {
const buf = await first(source)
shake.write(lp.encode.single(FetchResponse.encode(response)).slice())
if (buf == null) {
throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE)
}
// for await (const buf of source) {
const request = FetchRequest.decode(buf)
let response: FetchResponse
const lookup = self._getLookupFunction(request.identifier)
if (lookup != null) {
const data = await lookup(request.identifier)
if (data != null) {
response = { status: FetchResponse.StatusCode.OK, data }
} else {
response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) }
}
} else {
const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier)
response = { status: FetchResponse.StatusCode.ERROR, data: errmsg }
}
yield FetchResponse.encode(response)
},
lp.encode(),
stream
)
}
/**

View File

@ -60,6 +60,12 @@ export interface IdentifyServiceInit {
* Identify responses larger than this in bytes will be rejected (default: 8192)
*/
maxIdentifyMessageSize?: number
maxInboundStreams: number
maxOutboundStreams: number
maxPushIncomingStreams: number
maxPushOutgoingStreams: number
}
export class IdentifyService implements Startable {
@ -129,11 +135,17 @@ export class IdentifyService implements Startable {
void this._handleIdentify(data).catch(err => {
log.error(err)
})
}, {
maxInboundStreams: this.init.maxInboundStreams,
maxOutboundStreams: this.init.maxOutboundStreams
})
await this.components.getRegistrar().handle(this.identifyPushProtocolStr, (data) => {
void this._handlePush(data).catch(err => {
log.error(err)
})
}, {
maxInboundStreams: this.init.maxPushIncomingStreams,
maxOutboundStreams: this.init.maxPushOutgoingStreams
})
this.started = true
@ -159,10 +171,9 @@ export class IdentifyService implements Startable {
let stream: Stream | undefined
try {
const data = await connection.newStream([this.identifyPushProtocolStr], {
stream = await connection.newStream([this.identifyPushProtocolStr], {
signal: timeoutController.signal
})
stream = data.stream
// make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
@ -218,7 +229,7 @@ export class IdentifyService implements Startable {
}
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
const { stream } = await connection.newStream([this.identifyProtocolStr], options)
const stream = await connection.newStream([this.identifyProtocolStr], options)
let source: Duplex<Uint8Array> = stream
let timeoutController
let signal = options.signal

View File

@ -11,14 +11,14 @@ import type { PeerStore, PeerStoreInit } from '@libp2p/interface-peer-store'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { AutoRelayConfig, RelayAdvertiseConfig } from './circuit/index.js'
import type { PeerDiscovery } from '@libp2p/interface-peer-discovery'
import type { Connection, ConnectionGater, ConnectionProtector, ProtocolStream } from '@libp2p/interface-connection'
import type { Connection, ConnectionGater, ConnectionProtector, Stream } from '@libp2p/interface-connection'
import type { Transport } from '@libp2p/interface-transport'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter'
import type { PeerRouting } from '@libp2p/interface-peer-routing'
import type { ContentRouting } from '@libp2p/interface-content-routing'
import type { PubSub } from '@libp2p/interface-pubsub'
import type { Registrar, StreamHandler } from '@libp2p/interface-registrar'
import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { Metrics, MetricsInit } from '@libp2p/interface-metrics'
import type { PeerInfo } from '@libp2p/interface-peer-info'
@ -177,7 +177,7 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
* If successful, the known metadata of the peer will be added to the nodes `peerStore`,
* and the `MuxedStream` will be returned together with the successful negotiated protocol.
*/
dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise<ProtocolStream>
dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise<Stream>
/**
* Disconnects all connections to the given `peer`
@ -187,7 +187,7 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
/**
* Registers the `handler` for each protocol
*/
handle: (protocol: string | string[], handler: StreamHandler) => Promise<void>
handle: (protocol: string | string[], handler: StreamHandler, options?: StreamHandlerOptions) => Promise<void>
/**
* Removes the handler for each protocol. The protocol

View File

@ -33,7 +33,7 @@ import type { Connection } from '@libp2p/interface-connection'
import type { PeerRouting } from '@libp2p/interface-peer-routing'
import type { ContentRouting } from '@libp2p/interface-content-routing'
import type { PubSub } from '@libp2p/interface-pubsub'
import type { Registrar, StreamHandler } from '@libp2p/interface-registrar'
import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js'
@ -490,14 +490,14 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
return await this.pingService.ping(id, options)
}
async handle (protocols: string | string[], handler: StreamHandler): Promise<void> {
async handle (protocols: string | string[], handler: StreamHandler, options?: StreamHandlerOptions): Promise<void> {
if (!Array.isArray(protocols)) {
protocols = [protocols]
}
await Promise.all(
protocols.map(async protocol => {
await this.components.getRegistrar().handle(protocol, handler)
await this.components.getRegistrar().handle(protocol, handler, options)
})
)
}

View File

@ -18,21 +18,28 @@ const log = logger('libp2p:ping')
export interface PingServiceInit {
protocolPrefix: string
maxInboundStreams: number
maxOutboundStreams: number
}
export class PingService implements Startable {
public readonly protocol: string
private readonly components: Components
private started: boolean
private readonly init: PingServiceInit
constructor (components: Components, init: PingServiceInit) {
this.components = components
this.started = false
this.protocol = `/${init.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
this.init = init
}
async start () {
await this.components.getRegistrar().handle(this.protocol, this.handleMessage)
await this.components.getRegistrar().handle(this.protocol, this.handleMessage, {
maxInboundStreams: this.init.maxInboundStreams,
maxOutboundStreams: this.init.maxOutboundStreams
})
this.started = true
}
@ -67,7 +74,7 @@ export class PingService implements Startable {
log('dialing %s to %p', this.protocol, peer)
const connection = await this.components.getConnectionManager().openConnection(peer, options)
const { stream } = await connection.newStream([this.protocol], options)
const stream = await connection.newStream([this.protocol], options)
const start = Date.now()
const data = randomBytes(PING_LENGTH)

View File

@ -10,8 +10,8 @@ import type { Components } from '@libp2p/components'
const log = logger('libp2p:registrar')
const DEFAULT_MAX_INCOMING_STREAMS = 1
const DEFAULT_MAX_OUTGOING_STREAMS = 1
export const DEFAULT_MAX_INBOUND_STREAMS = 1
export const DEFAULT_MAX_OUTBOUND_STREAMS = 1
/**
* Responsible for notifying registered protocols of events in the network.
@ -46,7 +46,7 @@ export class DefaultRegistrar implements Registrar {
const handler = this.handlers.get(protocol)
if (handler == null) {
throw new Error(`No handler registered for protocol ${protocol}`)
throw errCode(new Error(`No handler registered for protocol ${protocol}`), codes.ERR_NO_HANDLER_FOR_PROTOCOL)
}
return handler
@ -72,9 +72,9 @@ export class DefaultRegistrar implements Registrar {
throw errCode(new Error(`Handler already registered for protocol ${protocol}`), codes.ERR_PROTOCOL_HANDLER_ALREADY_REGISTERED)
}
const options = merge({
maxIncomingStreams: DEFAULT_MAX_INCOMING_STREAMS,
maxOutgoingStreams: DEFAULT_MAX_OUTGOING_STREAMS
const options = merge.bind({ ignoreUndefined: true })({
maxInboundStreams: DEFAULT_MAX_INBOUND_STREAMS,
maxOutboundStreams: DEFAULT_MAX_OUTBOUND_STREAMS
}, opts)
this.handlers.set(protocol, {

View File

@ -8,7 +8,7 @@ import { codes } from './errors.js'
import { createConnection } from '@libp2p/connection'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import { peerIdFromString } from '@libp2p/peer-id'
import type { MultiaddrConnection, Connection, ProtocolStream, Stream } from '@libp2p/interface-connection'
import type { MultiaddrConnection, Connection, Stream } from '@libp2p/interface-connection'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { PeerId } from '@libp2p/interface-peer-id'
@ -16,6 +16,8 @@ import type { Upgrader, UpgraderEvents } from '@libp2p/interface-transport'
import type { Duplex } from 'it-stream-types'
import { Components, isInitializable } from '@libp2p/components'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Registrar } from '@libp2p/interface-registrar'
import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js'
const log = logger('libp2p:upgrader')
@ -43,6 +45,46 @@ export interface UpgraderInit {
muxers: StreamMuxerFactory[]
}
function findIncomingStreamLimit (protocol: string, registrar: Registrar) {
try {
const { options } = registrar.getHandler(protocol)
return options.maxInboundStreams
} catch (err: any) {
if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) {
throw err
}
}
return DEFAULT_MAX_INBOUND_STREAMS
}
function findOutgoingStreamLimit (protocol: string, registrar: Registrar) {
try {
const { options } = registrar.getHandler(protocol)
return options.maxOutboundStreams
} catch (err: any) {
if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) {
throw err
}
}
return DEFAULT_MAX_OUTBOUND_STREAMS
}
function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection) {
let streamCount = 0
connection.streams.forEach(stream => {
if (stream.stat.direction === direction && stream.stat.protocol === protocol) {
streamCount++
}
})
return streamCount
}
export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upgrader {
private readonly components: Components
private readonly connectionEncryption: Map<string, ConnectionEncrypter>
@ -267,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
} = opts
let muxer: StreamMuxer | undefined
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<ProtocolStream>) | undefined
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<Stream>) | undefined
let connection: Connection // eslint-disable-line prefer-const
if (muxerFactory != null) {
@ -296,13 +338,22 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
return
}
connection.addStream(muxedStream, { protocol })
const incomingLimit = findIncomingStreamLimit(protocol, this.components.getRegistrar())
const streamCount = countStreams(protocol, 'inbound', connection)
if (streamCount === incomingLimit) {
throw errCode(new Error('Too many incoming protocol streams'), codes.ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS)
}
muxedStream.stat.protocol = protocol
connection.addStream(muxedStream)
this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol })
})
.catch(err => {
log.error(err)
if (muxedStream.timeline.close == null) {
if (muxedStream.stat.timeline.close == null) {
muxedStream.close()
}
})
@ -317,7 +368,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
muxer.init(this.components)
}
newStream = async (protocols: string[], options: AbortOptions = {}): Promise<ProtocolStream> => {
newStream = async (protocols: string[], options: AbortOptions = {}): Promise<Stream> => {
if (muxer == null) {
throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE)
}
@ -334,11 +385,27 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
stream = metrics.trackStream({ stream, remotePeer, protocol })
}
return { stream: { ...muxedStream, ...stream }, protocol }
const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar())
const streamCount = countStreams(protocol, 'outbound', connection)
if (streamCount === outgoingLimit) {
throw errCode(new Error('Too many outgoing protocol streams'), codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
}
muxedStream.stat.protocol = protocol
return {
...muxedStream,
...stream,
stat: {
...muxedStream.stat,
protocol
}
}
} catch (err: any) {
log.error('could not create new stream', err)
if (muxedStream.timeline.close == null) {
if (muxedStream.stat.timeline.close == null) {
muxedStream.close()
}
@ -402,9 +469,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
await maConn.close()
// Ensure remaining streams are closed
if (muxer != null) {
await Promise.all(muxer.streams.map(async stream => {
await stream.close()
}))
muxer.streams.forEach(s => s.close())
}
}
})
@ -422,7 +487,8 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
_onStream (opts: OnStreamOptions): void {
const { connection, stream, protocol } = opts
const { handler } = this.components.getRegistrar().getHandler(protocol)
handler({ connection, stream, protocol })
handler({ connection, stream })
}
/**

View File

@ -73,9 +73,9 @@ describe('DHT subsystem operates correctly', () => {
})
it('should get notified of connected peers on dial', async () => {
const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
const stream = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
expect(connection).to.exist()
expect(stream).to.exist()
return await Promise.all([
pWaitFor(() => libp2p.dht.lan.routingTable.size === 1),

View File

@ -307,9 +307,9 @@ describe('libp2p.dialer (direct, TCP)', () => {
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream(['/echo/1.0.0'])
const stream = await connection.newStream(['/echo/1.0.0'])
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
expect(dialerDialSpy.callCount).to.be.greaterThan(0)
await connection.close()
})
@ -336,9 +336,9 @@ describe('libp2p.dialer (direct, TCP)', () => {
const connection = await libp2p.dial(remotePeerId)
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
expect(dialerDialSpy.callCount).to.be.greaterThan(0)
})
@ -377,7 +377,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
const connection = await libp2p.dial(remotePeerId)
// Create local to remote streams
const { stream } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
await connection.newStream('/stream-count/3')
await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')
@ -487,9 +487,9 @@ describe('libp2p.dialer (direct, TCP)', () => {
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
expect(protectorProtectSpy.callCount).to.equal(1)
})

View File

@ -427,9 +427,9 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
const connection = await libp2p.dial(MULTIADDRS_WEBSOCKETS[0])
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
await connection.close()
expect(dialerDialSpy.callCount).to.be.at.least(1)
expect(addressBookAddSpy.callCount).to.be.at.least(1)

View File

@ -2,7 +2,7 @@
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { FetchService } from '../../src/fetch/index.js'
import { FetchService, FetchServiceInit } from '../../src/fetch/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
const defaultInit = {
protocolPrefix: 'ipfs'
const defaultInit: FetchServiceInit = {
protocolPrefix: 'ipfs',
maxInboundStreams: 1,
maxOutboundStreams: 1
}
async function createComponents (index: number) {
@ -127,7 +129,7 @@ describe('fetch', () => {
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
const stream = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('stat.timeline.close')
})
})

View File

@ -6,7 +6,7 @@ import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../../src/errors.js'
import { IdentifyService, Message } from '../../src/identify/index.js'
import { IdentifyService, IdentifyServiceInit, Message } from '../../src/identify/index.js'
import Peers from '../fixtures/peers.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
@ -32,11 +32,15 @@ import pDefer from 'p-defer'
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
const defaultInit = {
const defaultInit: IdentifyServiceInit = {
protocolPrefix: 'ipfs',
host: {
agentVersion: 'v1.0.0'
}
},
maxInboundStreams: 1,
maxOutboundStreams: 1,
maxPushIncomingStreams: 1,
maxPushOutgoingStreams: 1
}
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
@ -130,6 +134,7 @@ describe('identify', () => {
it('should be able to identify another peer with no certified peer records support', async () => {
const agentVersion = 'js-libp2p/5.0.0'
const localIdentify = new IdentifyService(localComponents, {
...defaultInit,
protocolPrefix: 'ipfs',
host: {
agentVersion: agentVersion
@ -137,6 +142,7 @@ describe('identify', () => {
})
await start(localIdentify)
const remoteIdentify = new IdentifyService(remoteComponents, {
...defaultInit,
protocolPrefix: 'ipfs',
host: {
agentVersion: agentVersion
@ -209,6 +215,7 @@ describe('identify', () => {
it('should store own host data and protocol version into metadataBook on start', async () => {
const agentVersion = 'js-project/1.0.0'
const localIdentify = new IdentifyService(localComponents, {
...defaultInit,
protocolPrefix: 'ipfs',
host: {
agentVersion
@ -270,8 +277,8 @@ describe('identify', () => {
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
const stream = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('stat.timeline.close')
})
it('should limit incoming identify message sizes', async () => {

View File

@ -3,7 +3,7 @@
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr'
import { IdentifyService } from '../../src/identify/index.js'
import { IdentifyService, IdentifyServiceInit } from '../../src/identify/index.js'
import Peers from '../fixtures/peers.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
@ -27,11 +27,15 @@ import { start, stop } from '@libp2p/interfaces/startable'
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
const defaultInit = {
const defaultInit: IdentifyServiceInit = {
protocolPrefix: 'ipfs',
host: {
agentVersion: 'v1.0.0'
}
},
maxInboundStreams: 1,
maxOutboundStreams: 1,
maxPushIncomingStreams: 1,
maxPushOutgoingStreams: 1
}
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
@ -213,8 +217,8 @@ describe('identify (push)', () => {
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
const stream = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('stat.timeline.close')
// method should have returned before the remote handler completes as we timed
// out so we ignore the return value

View File

@ -91,7 +91,7 @@ describe('libp2p.metrics', () => {
})
const connection = await libp2p.dial(remoteLibp2p.peerId)
const { stream } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
const result = await pipe(
@ -156,7 +156,7 @@ describe('libp2p.metrics', () => {
})
const connection = await libp2p.dial(remoteLibp2p.peerId)
const { stream } = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
await pipe(

View File

@ -2,7 +2,7 @@
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { PingService } from '../../src/ping/index.js'
import { PingService, PingServiceInit } from '../../src/ping/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
const defaultInit = {
protocolPrefix: 'ipfs'
const defaultInit: PingServiceInit = {
protocolPrefix: 'ipfs',
maxInboundStreams: 1,
maxOutboundStreams: 1
}
async function createComponents (index: number) {
@ -116,7 +118,7 @@ describe('ping', () => {
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
const stream = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('stat.timeline.close')
})
})

View File

@ -1,7 +1,6 @@
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import pTimes from 'p-times'
import { pipe } from 'it-pipe'
import { createNode, populateAddressBooks } from '../utils/creators/peer.js'
import { createBaseOptions } from '../utils/base-options.js'
@ -41,7 +40,11 @@ describe('ping', () => {
})
it('ping several times for getting an average', async () => {
const latencies = await pTimes(5, async () => await nodes[1].ping(nodes[0].peerId))
const latencies = []
for (let i = 0; i < 5; i++) {
latencies.push(await nodes[1].ping(nodes[0].peerId))
}
const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length
expect(averageLatency).to.be.a('Number')

View File

@ -80,7 +80,7 @@ describe('Dialing (via relay, TCP)', () => {
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes())
expect(connection.remoteAddr).to.eql(dialAddr)
const { stream: echoStream } = await connection.newStream('/echo/1.0.0')
const echoStream = await connection.newStream('/echo/1.0.0')
const input = uint8ArrayFromString('hello')
const [output] = await pipe(
@ -156,7 +156,7 @@ describe('Dialing (via relay, TCP)', () => {
// send an invalid relay message from the relay to the destination peer
const connections = relayLibp2p.getConnections(dstLibp2p.peerId)
const { stream } = await connections[0].newStream(RELAY_CODEC)
const stream = await connections[0].newStream(RELAY_CODEC)
const streamHandler = new StreamHandler({ stream })
streamHandler.write({
type: CircuitRelay.Type.STATUS

View File

@ -85,9 +85,15 @@ describe('Upgrader', () => {
await localComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => {
void pipe(stream, stream)
}, {
maxInboundStreams: 10,
maxOutboundStreams: 10
})
await remoteComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => {
void pipe(stream, stream)
}, {
maxInboundStreams: 10,
maxOutboundStreams: 10
})
})
@ -105,8 +111,8 @@ describe('Upgrader', () => {
expect(connections).to.have.length(2)
const { stream, protocol } = await connections[0].newStream('/echo/1.0.0')
expect(protocol).to.equal('/echo/1.0.0')
const stream = await connections[0].newStream('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
const hello = uint8ArrayFromString('hello there!')
const result = await pipe(
@ -175,8 +181,8 @@ describe('Upgrader', () => {
expect(connections).to.have.length(2)
const { stream, protocol } = await connections[0].newStream('/echo/1.0.0')
expect(protocol).to.equal('/echo/1.0.0')
const stream = await connections[0].newStream('/echo/1.0.0')
expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0')
const hello = uint8ArrayFromString('hello there!')
const result = await pipe(
@ -515,11 +521,11 @@ describe('libp2p.upgrader', () => {
])
const remoteLibp2pUpgraderOnStreamSpy = sinon.spy(remoteLibp2p.components.getUpgrader() as DefaultUpgrader, '_onStream')
const { stream } = await localConnection.newStream(['/echo/1.0.0'])
expect(stream).to.include.keys(['id', 'close', 'reset', 'timeline'])
const stream = await localConnection.newStream(['/echo/1.0.0'])
expect(stream).to.include.keys(['id', 'close', 'reset', 'stat'])
const [arg0] = remoteLibp2pUpgraderOnStreamSpy.getCall(0).args
expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'timeline'])
expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'stat'])
})
it('should emit connect and disconnect events', async () => {
@ -579,4 +585,128 @@ describe('libp2p.upgrader', () => {
// @ts-expect-error detail is only on CustomEvent type
expect(remotePeer.equals(event.detail.remotePeer)).to.equal(true)
})
it('should limit the number of incoming streams that can be opened using a protocol', async () => {
const protocol = '/a-test-protocol/1.0.0'
const remotePeer = peers[1]
libp2p = await createLibp2pNode({
peerId: peers[0],
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
NOISE
]
})
await libp2p.start()
remoteLibp2p = await createLibp2pNode({
peerId: remotePeer,
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
NOISE
]
})
await remoteLibp2p.start()
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const [localToRemote] = await Promise.all([
libp2p.components.getUpgrader().upgradeOutbound(outbound),
remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
])
let streamCount = 0
await libp2p.handle(protocol, (data) => {}, {
maxInboundStreams: 10,
maxOutboundStreams: 10
})
await remoteLibp2p.handle(protocol, (data) => {
streamCount++
}, {
maxInboundStreams: 1,
maxOutboundStreams: 1
})
expect(streamCount).to.equal(0)
await localToRemote.newStream(protocol)
expect(streamCount).to.equal(1)
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', 'ERR_UNDER_READ')
})
it('should limit the number of outgoing streams that can be opened using a protocol', async () => {
const protocol = '/a-test-protocol/1.0.0'
const remotePeer = peers[1]
libp2p = await createLibp2pNode({
peerId: peers[0],
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
NOISE
]
})
await libp2p.start()
remoteLibp2p = await createLibp2pNode({
peerId: remotePeer,
transports: [
new WebSockets()
],
streamMuxers: [
new Mplex()
],
connectionEncryption: [
NOISE
]
})
await remoteLibp2p.start()
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const [localToRemote] = await Promise.all([
libp2p.components.getUpgrader().upgradeOutbound(outbound),
remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
])
let streamCount = 0
await libp2p.handle(protocol, (data) => {}, {
maxInboundStreams: 1,
maxOutboundStreams: 1
})
await remoteLibp2p.handle(protocol, (data) => {
streamCount++
}, {
maxInboundStreams: 10,
maxOutboundStreams: 10
})
expect(streamCount).to.equal(0)
await localToRemote.newStream(protocol)
expect(streamCount).to.equal(1)
await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
})
})