diff --git a/.gitignore b/.gitignore index 69f5439f..7e4d369a 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ test/repo-tests* logs *.log -coverage +.coverage .nyc_output # Runtime data diff --git a/doc/API.md b/doc/API.md index d8ba8fd8..c26a8c71 100644 --- a/doc/API.md +++ b/doc/API.md @@ -389,7 +389,7 @@ await libp2p.hangUp(remotePeerId) Sets up [multistream-select routing](https://github.com/multiformats/multistream-select) of protocols to their application handlers. Whenever a stream is opened on one of the provided protocols, the handler will be called. `handle` must be called in order to register a handler and support for a given protocol. This also informs other peers of the protocols you support. -`libp2p.handle(protocols, handler)` +`libp2p.handle(protocols, handler, options)` In the event of a new handler for the same protocol being added, the first one is discarded. @@ -399,6 +399,7 @@ In the event of a new handler for the same protocol being added, the first one i |------|------|-------------| | protocols | `Array|string` | protocols to register | | handler | `function({ connection:*, stream:*, protocol:string })` | handler to call | +| options | `StreamHandlerOptions` | Options including protocol stream limits | #### Example @@ -409,7 +410,10 @@ const handler = ({ connection, stream, protocol }) => { // use stream or connection according to the needs } -libp2p.handle('/echo/1.0.0', handler) +libp2p.handle('/echo/1.0.0', handler, { + maxInboundStreams: 5, + maxOutboundStreams: 5 +}) ``` ### unhandle diff --git a/examples/chat/src/dialer.js b/examples/chat/src/dialer.js index e9d59262..8f37e755 100644 --- a/examples/chat/src/dialer.js +++ b/examples/chat/src/dialer.js @@ -32,7 +32,7 @@ async function run () { // Dial to the remote peer (the "listener") const listenerMa = new Multiaddr(`/ip4/127.0.0.1/tcp/10333/p2p/${idListener.toString()}`) - const { stream } = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0') + const stream = await nodeDialer.dialProtocol(listenerMa, '/chat/1.0.0') console.log('Dialer dialed to listener on protocol: /chat/1.0.0') console.log('Type a message and see what happens') diff --git a/examples/connection-encryption/1.js b/examples/connection-encryption/1.js index eafb4ff7..0e774fcb 100644 --- a/examples/connection-encryption/1.js +++ b/examples/connection-encryption/1.js @@ -40,7 +40,7 @@ const createNode = async () => { ) }) - const { stream } = await node1.dialProtocol(node2.peerId, '/a-protocol') + const stream = await node1.dialProtocol(node2.peerId, '/a-protocol') await pipe( [uint8ArrayFromString('This information is sent out encrypted to the other peer')], diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json index 399cc0af..3892af93 100644 --- a/examples/delegated-routing/package.json +++ b/examples/delegated-routing/package.json @@ -8,10 +8,10 @@ "libp2p": "../../", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", - "@libp2p/kad-dht": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/kad-dht": "^3.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/webrtc-star": "^2.0.1", + "@libp2p/websockets": "^3.0.0", "react": "^17.0.2", "react-dom": "^17.0.2", "react-scripts": "5.0.0" diff --git a/examples/echo/src/dialer.js b/examples/echo/src/dialer.js index 74a65087..435a9ea2 100644 --- a/examples/echo/src/dialer.js +++ b/examples/echo/src/dialer.js @@ -37,7 +37,7 @@ async function run() { // Dial the listener node console.log('Dialing to peer:', listenerMultiaddr) - const { stream } = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0') + const stream = await dialerNode.dialProtocol(listenerMultiaddr, '/echo/1.0.0') console.log('nodeA dialed to nodeB on protocol: /echo/1.0.0') diff --git a/examples/libp2p-in-the-browser/package.json b/examples/libp2p-in-the-browser/package.json index 6dbc8396..4736108c 100644 --- a/examples/libp2p-in-the-browser/package.json +++ b/examples/libp2p-in-the-browser/package.json @@ -11,9 +11,9 @@ "dependencies": { "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/webrtc-star": "^2.0.1", + "@libp2p/websockets": "^3.0.0", "libp2p": "../../" }, "devDependencies": { diff --git a/examples/package.json b/examples/package.json index 70984696..a699ca30 100644 --- a/examples/package.json +++ b/examples/package.json @@ -10,7 +10,7 @@ "license": "MIT", "dependencies": { "@libp2p/pubsub-peer-discovery": "^6.0.0", - "@libp2p/floodsub": "^2.0.0", + "@libp2p/floodsub": "^3.0.0", "@nodeutils/defaults-deep": "^1.1.0", "execa": "^2.1.0", "fs-extra": "^8.1.0", diff --git a/examples/pnet/index.js b/examples/pnet/index.js index 59adbaa8..fa8268df 100644 --- a/examples/pnet/index.js +++ b/examples/pnet/index.js @@ -43,7 +43,7 @@ generateKey(otherSwarmKey) ) }) - const { stream } = await node1.dialProtocol(node2.peerId, '/private') + const stream = await node1.dialProtocol(node2.peerId, '/private') await pipe( [uint8ArrayFromString('This message is sent on a private network')], diff --git a/examples/protocol-and-stream-muxing/1.js b/examples/protocol-and-stream-muxing/1.js index 364bffdb..f1ab2f20 100644 --- a/examples/protocol-and-stream-muxing/1.js +++ b/examples/protocol-and-stream-muxing/1.js @@ -60,14 +60,14 @@ const createNode = async () => { }) */ - const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol']) + const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol']) await pipe( [uint8ArrayFromString('my own protocol, wow!')], stream ) /* - const { stream } = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) + const stream = node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) await pipe( ['my own protocol, wow!'], diff --git a/examples/protocol-and-stream-muxing/2.js b/examples/protocol-and-stream-muxing/2.js index 44b0b1eb..2605938d 100644 --- a/examples/protocol-and-stream-muxing/2.js +++ b/examples/protocol-and-stream-muxing/2.js @@ -38,22 +38,25 @@ const createNode = async () => { console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`) } } - ) + ).finally(() => { + // clean up resources + stream.close() + }) }) - const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/a']) + const stream1 = await node1.dialProtocol(node2.peerId, ['/a']) await pipe( [uint8ArrayFromString('protocol (a)')], stream1 ) - const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b']) + const stream2 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( [uint8ArrayFromString('protocol (b)')], stream2 ) - const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b']) + const stream3 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( [uint8ArrayFromString('another stream on protocol (b)')], stream3 diff --git a/examples/protocol-and-stream-muxing/3.js b/examples/protocol-and-stream-muxing/3.js index a3687435..af63bdea 100644 --- a/examples/protocol-and-stream-muxing/3.js +++ b/examples/protocol-and-stream-muxing/3.js @@ -54,13 +54,13 @@ const createNode = async () => { ) }) - const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) + const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( [uint8ArrayFromString('from 1 to 2')], stream1 ) - const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) + const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( [uint8ArrayFromString('from 2 to 1')], stream2 diff --git a/examples/protocol-and-stream-muxing/README.md b/examples/protocol-and-stream-muxing/README.md index cb34a65d..3e76b396 100644 --- a/examples/protocol-and-stream-muxing/README.md +++ b/examples/protocol-and-stream-muxing/README.md @@ -40,7 +40,7 @@ node2.handle('/your-protocol', ({ stream }) => { After the protocol is _handled_, now we can dial to it. ```JavaScript -const { stream } = await node1.dialProtocol(node2.peerId, ['/your-protocol']) +const stream = await node1.dialProtocol(node2.peerId, ['/your-protocol']) await pipe( ['my own protocol, wow!'], @@ -62,7 +62,7 @@ node2.handle('/another-protocol/1.0.1', ({ stream }) => { ) }) // ... -const { stream } = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) +const stream = await node1.dialProtocol(node2.peerId, ['/another-protocol/1.0.0']) await pipe( ['my own protocol, wow!'], @@ -133,19 +133,19 @@ node2.handle(['/a', '/b'], ({ protocol, stream }) => { ) }) -const { stream } = await node1.dialProtocol(node2.peerId, ['/a']) +const stream = await node1.dialProtocol(node2.peerId, ['/a']) await pipe( ['protocol (a)'], stream ) -const { stream: stream2 } = await node1.dialProtocol(node2.peerId, ['/b']) +const stream2 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( ['protocol (b)'], stream2 ) -const { stream: stream3 } = await node1.dialProtocol(node2.peerId, ['/b']) +const stream3 = await node1.dialProtocol(node2.peerId, ['/b']) await pipe( ['another stream on protocol (b)'], stream3 @@ -167,7 +167,7 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t With the aid of both mechanisms, we can reuse an incomming connection to dial streams out too, this is specially useful when you are behind tricky NAT, firewalls or if you are running in a browser, where you can't have listening addrs, but you can dial out. By dialing out, you enable other peers to talk with you in Protocols that they want, simply by opening a new multiplexed stream. -You can see this working on example [3.js](./3.js). +You can see this working on example [3.js](./3.js). As we've seen earlier, we can create our node with this createNode function. ```js @@ -229,14 +229,14 @@ node2.handle('/node-2', ({ stream }) => { }) // Dialing node2 from node1 -const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) +const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( ['from 1 to 2'], stream1 ) // Dialing node1 from node2 -const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) +const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( ['from 2 to 1'], stream2 @@ -256,14 +256,14 @@ So, we have successfully set up a bidirectional connection with protocol muxing. The code below will result into an error as `the dial address is not valid`. ```js // Dialing from node2 to node1 -const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1']) +const stream2 = await node2.dialProtocol(node1.peerId, ['/node-1']) await pipe( ['from 2 to 1'], stream2 ) // Dialing from node1 to node2 -const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2']) +const stream1 = await node1.dialProtocol(node2.peerId, ['/node-2']) await pipe( ['from 1 to 2'], stream1 diff --git a/examples/transports/2.js b/examples/transports/2.js index d157da11..9dee8878 100644 --- a/examples/transports/2.js +++ b/examples/transports/2.js @@ -48,7 +48,7 @@ function printAddrs (node, number) { }) await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs()) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['Hello', ' ', 'p2p', ' ', 'world', '!'].map(str => uint8ArrayFromString(str)), diff --git a/examples/transports/3.js b/examples/transports/3.js index b1a233f8..0bc9fa70 100644 --- a/examples/transports/3.js +++ b/examples/transports/3.js @@ -63,14 +63,14 @@ function print ({ stream }) { await node3.peerStore.addressBook.set(node1.peerId, node1.getMultiaddrs()) // node 1 (TCP) dials to node 2 (TCP+WebSockets) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( [uint8ArrayFromString('node 1 dialed to node 2 successfully')], stream ) // node 2 (TCP+WebSockets) dials to node 2 (WebSockets) - const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print') + const stream2 = await node2.dialProtocol(node3.peerId, '/print') await pipe( [uint8ArrayFromString('node 2 dialed to node 3 successfully')], stream2 diff --git a/examples/transports/4.js b/examples/transports/4.js index 389217aa..0e13c569 100644 --- a/examples/transports/4.js +++ b/examples/transports/4.js @@ -78,7 +78,7 @@ function print ({ stream }) { const targetAddr = node1.getMultiaddrs()[0]; // node 2 (Secure WebSockets) dials to node 1 (Secure Websockets) - const { stream } = await node2.dialProtocol(targetAddr, '/print') + const stream = await node2.dialProtocol(targetAddr, '/print') await pipe( [uint8ArrayFromString('node 2 dialed to node 1 successfully')], stream diff --git a/examples/transports/README.md b/examples/transports/README.md index 1d3f5d4f..ca951834 100644 --- a/examples/transports/README.md +++ b/examples/transports/README.md @@ -139,7 +139,7 @@ Then add, }) await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs) - const { stream } = await node1.dialProtocol(node2.peerId, '/print') + const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['Hello', ' ', 'p2p', ' ', 'world', '!'], @@ -225,14 +225,14 @@ await node2.peerStore.addressBook.set(node3.peerId, node3.multiaddrs) await node3.peerStore.addressBook.set(node1.peerId, node1.multiaddrs) // node 1 (TCP) dials to node 2 (TCP+WebSockets) -const { stream } = await node1.dialProtocol(node2.peerId, '/print') +const stream = await node1.dialProtocol(node2.peerId, '/print') await pipe( ['node 1 dialed to node 2 successfully'], stream ) // node 2 (TCP+WebSockets) dials to node 2 (WebSockets) -const { stream: stream2 } = await node2.dialProtocol(node3.peerId, '/print') +const stream2 = await node2.dialProtocol(node3.peerId, '/print') await pipe( ['node 2 dialed to node 3 successfully'], stream2 diff --git a/examples/webrtc-direct/dialer.js b/examples/webrtc-direct/dialer.js index d2357dfa..1663054c 100644 --- a/examples/webrtc-direct/dialer.js +++ b/examples/webrtc-direct/dialer.js @@ -1,5 +1,5 @@ import { createLibp2p } from 'libp2p' -import { WebRTCDirect } from '@achingbrain/webrtc-direct' +import { WebRTCDirect } from '@libp2p/webrtc-direct' import { Mplex } from '@libp2p/mplex' import { Noise } from '@chainsafe/libp2p-noise' import { Bootstrap } from '@libp2p/bootstrap' diff --git a/examples/webrtc-direct/listener.js b/examples/webrtc-direct/listener.js index e27cb66b..8a822d0b 100644 --- a/examples/webrtc-direct/listener.js +++ b/examples/webrtc-direct/listener.js @@ -1,5 +1,5 @@ import { createLibp2p } from 'libp2p' -import { WebRTCDirect } from '@achingbrain/webrtc-direct' +import { WebRTCDirect } from '@libp2p/webrtc-direct' import { Mplex } from '@libp2p/mplex' import { Noise } from '@chainsafe/libp2p-noise' import { createFromJSON } from '@libp2p/peer-id-factory' diff --git a/examples/webrtc-direct/package.json b/examples/webrtc-direct/package.json index c1d34ffe..57d33ee0 100644 --- a/examples/webrtc-direct/package.json +++ b/examples/webrtc-direct/package.json @@ -12,7 +12,7 @@ "@libp2p/webrtc-direct": "^2.0.0", "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^2.0.0", + "@libp2p/mplex": "^3.0.0", "libp2p": "../../", "wrtc": "^0.4.7" }, diff --git a/package.json b/package.json index 997908ac..37e572e9 100644 --- a/package.json +++ b/package.json @@ -97,11 +97,11 @@ }, "dependencies": { "@achingbrain/nat-port-mapper": "^1.0.3", - "@libp2p/components": "^1.0.0", - "@libp2p/connection": "^3.0.0", + "@libp2p/components": "^2.0.0", + "@libp2p/connection": "^4.0.0", "@libp2p/crypto": "^1.0.0", "@libp2p/interface-address-manager": "^1.0.1", - "@libp2p/interface-connection": "^1.0.1", + "@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-connection-encrypter": "^1.0.2", "@libp2p/interface-content-routing": "^1.0.1", "@libp2p/interface-dht": "^1.0.0", @@ -111,18 +111,18 @@ "@libp2p/interface-peer-info": "^1.0.1", "@libp2p/interface-peer-routing": "^1.0.0", "@libp2p/interface-peer-store": "^1.0.0", - "@libp2p/interface-pubsub": "^1.0.1", - "@libp2p/interface-registrar": "^1.0.0", + "@libp2p/interface-pubsub": "^1.0.3", + "@libp2p/interface-registrar": "^2.0.0", "@libp2p/interface-stream-muxer": "^1.0.1", "@libp2p/interface-transport": "^1.0.0", "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^2.0.0", + "@libp2p/multistream-select": "^2.0.1", "@libp2p/peer-collections": "^1.0.2", "@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id-factory": "^1.0.9", "@libp2p/peer-record": "^2.0.0", - "@libp2p/peer-store": "^2.0.0", + "@libp2p/peer-store": "^3.0.0", "@libp2p/tracked-map": "^1.0.5", "@libp2p/utils": "^2.0.0", "@multiformats/mafmt": "^11.0.2", @@ -171,24 +171,24 @@ "@libp2p/daemon-server": "^2.0.0", "@libp2p/delegated-content-routing": "^2.0.0", "@libp2p/delegated-peer-routing": "^2.0.0", - "@libp2p/floodsub": "^2.0.0", + "@libp2p/floodsub": "^3.0.0", "@libp2p/interface-compliance-tests": "^3.0.1", "@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0", - "@libp2p/interface-mocks": "^1.0.1", + "@libp2p/interface-mocks": "^2.0.0", "@libp2p/interop": "^2.0.0", - "@libp2p/kad-dht": "^2.0.0", + "@libp2p/kad-dht": "^3.0.0", "@libp2p/mdns": "^2.0.0", - "@libp2p/mplex": "^2.0.0", - "@libp2p/pubsub": "^2.0.0", - "@libp2p/tcp": "^2.0.0", - "@libp2p/topology": "^2.0.0", + "@libp2p/mplex": "^3.0.0", + "@libp2p/pubsub": "^3.0.1", + "@libp2p/tcp": "^3.0.0", + "@libp2p/topology": "^3.0.0", "@libp2p/webrtc-star": "^2.0.0", - "@libp2p/websockets": "^2.0.0", + "@libp2p/websockets": "^3.0.0", "@types/node-forge": "^1.0.0", "@types/p-fifo": "^1.0.0", "@types/varint": "^6.0.0", "@types/xsalsa20": "^1.1.0", - "aegir": "^37.0.9", + "aegir": "^37.3.0", "cborg": "^1.8.1", "delay": "^5.0.0", "execa": "^6.1.0", diff --git a/src/circuit/circuit/hop.ts b/src/circuit/circuit/hop.ts index 4f5474d1..d8bbe03e 100644 --- a/src/circuit/circuit/hop.ts +++ b/src/circuit/circuit/hop.ts @@ -134,7 +134,7 @@ export async function hop (options: HopConfig): Promise> { } = options // Create a new stream to the relay - const { stream } = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC) // Send the HOP request const streamHandler = new StreamHandler({ stream }) streamHandler.write(request) @@ -169,7 +169,7 @@ export async function canHop (options: CanHopOptions) { } = options // Create a new stream to the relay - const { stream } = await connection.newStream(RELAY_CODEC) + const stream = await connection.newStream(RELAY_CODEC) // Send the HOP request const streamHandler = new StreamHandler({ stream }) diff --git a/src/circuit/circuit/stop.ts b/src/circuit/circuit/stop.ts index c953ce56..75c97f66 100644 --- a/src/circuit/circuit/stop.ts +++ b/src/circuit/circuit/stop.ts @@ -56,7 +56,7 @@ export async function stop (options: StopOptions) { request } = options - const { stream } = await connection.newStream([RELAY_CODEC]) + const stream = await connection.newStream([RELAY_CODEC]) log('starting stop request to %p', connection.remotePeer) const streamHandler = new StreamHandler({ stream }) diff --git a/src/config.ts b/src/config.ts index 432d7e9a..f2619142 100644 --- a/src/config.ts +++ b/src/config.ts @@ -79,13 +79,21 @@ const DefaultConfig: Partial = { host: { agentVersion: AGENT_VERSION }, - timeout: 30000 + timeout: 30000, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 }, ping: { - protocolPrefix: 'ipfs' + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 }, fetch: { - protocolPrefix: 'libp2p' + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1 } } diff --git a/src/errors.ts b/src/errors.ts index ec02ed60..50e2441d 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -70,5 +70,8 @@ export enum codes { ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED', ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK', ERR_INVALID_RECORD = 'ERR_INVALID_RECORD', - ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED' + ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED', + ERR_NO_HANDLER_FOR_PROTOCOL = 'ERR_NO_HANDLER_FOR_PROTOCOL', + ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS', + ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS' } diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 218b182f..04d7efc8 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -3,7 +3,6 @@ import errCode from 'err-code' import { codes } from '../errors.js' import * as lp from 'it-length-prefixed' import { FetchRequest, FetchResponse } from './pb/proto.js' -import { handshake } from 'it-handshake' import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' @@ -13,11 +12,15 @@ import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import type { Duplex } from 'it-stream-types' import { abortableDuplex } from 'abortable-iterator' +import { pipe } from 'it-pipe' +import first from 'it-first' const log = logger('libp2p:fetch') export interface FetchServiceInit { protocolPrefix: string + maxInboundStreams: number + maxOutboundStreams: number } export interface HandleMessageOptions { @@ -40,6 +43,7 @@ export class FetchService implements Startable { private readonly components: Components private readonly lookupFunctions: Map private started: boolean + private readonly init: FetchServiceInit constructor (components: Components, init: FetchServiceInit) { this.started = false @@ -47,13 +51,21 @@ export class FetchService implements Startable { this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` this.lookupFunctions = new Map() // Maps key prefix to value lookup function this.handleMessage = this.handleMessage.bind(this) + this.init = init } async start () { await this.components.getRegistrar().handle(this.protocol, (data) => { - void this.handleMessage(data).catch(err => { - log.error(err) - }) + void this.handleMessage(data) + .catch(err => { + log.error(err) + }) + .finally(() => { + data.stream.close() + }) + }, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams }) this.started = true } @@ -74,7 +86,7 @@ export class FetchService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const { stream } = await connection.newStream([this.protocol], options) + const stream = await connection.newStream([this.protocol], options) let source: Duplex = stream // make stream abortable if AbortSignal passed @@ -82,28 +94,42 @@ export class FetchService implements Startable { source = abortableDuplex(stream, options.signal) } - const shake = handshake(source) + try { + const result = await pipe( + [FetchRequest.encode({ identifier: key })], + lp.encode(), + source, + lp.decode(), + async function (source) { + const buf = await first(source) - // send message - shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice()) + if (buf == null) { + throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE) + } - // read response - // @ts-expect-error fromReader returns a Source which has no .next method - const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) - switch (response.status) { - case (FetchResponse.StatusCode.OK): { - return response.data - } - case (FetchResponse.StatusCode.NOT_FOUND): { - return null - } - case (FetchResponse.StatusCode.ERROR): { - const errmsg = (new TextDecoder()).decode(response.data) - throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) - } - default: { - throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) - } + const response = FetchResponse.decode(buf) + + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + return null + } + case (FetchResponse.StatusCode.ERROR): { + const errmsg = (new TextDecoder()).decode(response.data) + throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS) + } + default: { + throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE) + } + } + } + ) + + return result ?? null + } finally { + stream.close() } } @@ -114,25 +140,40 @@ export class FetchService implements Startable { */ async handleMessage (data: IncomingStreamData) { const { stream } = data - const shake = handshake(stream) - // @ts-expect-error fromReader returns a Source which has no .next method - const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice()) + const self = this - let response: FetchResponse - const lookup = this._getLookupFunction(request.identifier) - if (lookup != null) { - const data = await lookup(request.identifier) - if (data != null) { - response = { status: FetchResponse.StatusCode.OK, data } - } else { - response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } - } - } else { - const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) - response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } - } + await pipe( + stream, + lp.decode(), + async function * (source) { + const buf = await first(source) - shake.write(lp.encode.single(FetchResponse.encode(response)).slice()) + if (buf == null) { + throw errCode(new Error('No data received'), codes.ERR_INVALID_MESSAGE) + } + + // for await (const buf of source) { + const request = FetchRequest.decode(buf) + + let response: FetchResponse + const lookup = self._getLookupFunction(request.identifier) + if (lookup != null) { + const data = await lookup(request.identifier) + if (data != null) { + response = { status: FetchResponse.StatusCode.OK, data } + } else { + response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } + } + } else { + const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier) + response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } + } + + yield FetchResponse.encode(response) + }, + lp.encode(), + stream + ) } /** diff --git a/src/identify/index.ts b/src/identify/index.ts index b3fe0e9e..e921308d 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -60,6 +60,12 @@ export interface IdentifyServiceInit { * Identify responses larger than this in bytes will be rejected (default: 8192) */ maxIdentifyMessageSize?: number + + maxInboundStreams: number + maxOutboundStreams: number + + maxPushIncomingStreams: number + maxPushOutgoingStreams: number } export class IdentifyService implements Startable { @@ -129,11 +135,17 @@ export class IdentifyService implements Startable { void this._handleIdentify(data).catch(err => { log.error(err) }) + }, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams }) await this.components.getRegistrar().handle(this.identifyPushProtocolStr, (data) => { void this._handlePush(data).catch(err => { log.error(err) }) + }, { + maxInboundStreams: this.init.maxPushIncomingStreams, + maxOutboundStreams: this.init.maxPushOutgoingStreams }) this.started = true @@ -159,10 +171,9 @@ export class IdentifyService implements Startable { let stream: Stream | undefined try { - const data = await connection.newStream([this.identifyPushProtocolStr], { + stream = await connection.newStream([this.identifyPushProtocolStr], { signal: timeoutController.signal }) - stream = data.stream // make stream abortable const source: Duplex = abortableDuplex(stream, timeoutController.signal) @@ -218,7 +229,7 @@ export class IdentifyService implements Startable { } async _identify (connection: Connection, options: AbortOptions = {}): Promise { - const { stream } = await connection.newStream([this.identifyProtocolStr], options) + const stream = await connection.newStream([this.identifyProtocolStr], options) let source: Duplex = stream let timeoutController let signal = options.signal diff --git a/src/index.ts b/src/index.ts index 62a300e6..b7aa6287 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,14 +11,14 @@ import type { PeerStore, PeerStoreInit } from '@libp2p/interface-peer-store' import type { PeerId } from '@libp2p/interface-peer-id' import type { AutoRelayConfig, RelayAdvertiseConfig } from './circuit/index.js' import type { PeerDiscovery } from '@libp2p/interface-peer-discovery' -import type { Connection, ConnectionGater, ConnectionProtector, ProtocolStream } from '@libp2p/interface-connection' +import type { Connection, ConnectionGater, ConnectionProtector, Stream } from '@libp2p/interface-connection' import type { Transport } from '@libp2p/interface-transport' import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter' import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { PubSub } from '@libp2p/interface-pubsub' -import type { Registrar, StreamHandler } from '@libp2p/interface-registrar' +import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { Metrics, MetricsInit } from '@libp2p/interface-metrics' import type { PeerInfo } from '@libp2p/interface-peer-info' @@ -177,7 +177,7 @@ export interface Libp2p extends Startable, EventEmitter { * If successful, the known metadata of the peer will be added to the nodes `peerStore`, * and the `MuxedStream` will be returned together with the successful negotiated protocol. */ - dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise + dialProtocol: (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions) => Promise /** * Disconnects all connections to the given `peer` @@ -187,7 +187,7 @@ export interface Libp2p extends Startable, EventEmitter { /** * Registers the `handler` for each protocol */ - handle: (protocol: string | string[], handler: StreamHandler) => Promise + handle: (protocol: string | string[], handler: StreamHandler, options?: StreamHandlerOptions) => Promise /** * Removes the handler for each protocol. The protocol diff --git a/src/libp2p.ts b/src/libp2p.ts index b7ff6b4a..32676c48 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -33,7 +33,7 @@ import type { Connection } from '@libp2p/interface-connection' import type { PeerRouting } from '@libp2p/interface-peer-routing' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { PubSub } from '@libp2p/interface-pubsub' -import type { Registrar, StreamHandler } from '@libp2p/interface-registrar' +import type { Registrar, StreamHandler, StreamHandlerOptions } from '@libp2p/interface-registrar' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js' @@ -490,14 +490,14 @@ export class Libp2pNode extends EventEmitter implements Libp2p { return await this.pingService.ping(id, options) } - async handle (protocols: string | string[], handler: StreamHandler): Promise { + async handle (protocols: string | string[], handler: StreamHandler, options?: StreamHandlerOptions): Promise { if (!Array.isArray(protocols)) { protocols = [protocols] } await Promise.all( protocols.map(async protocol => { - await this.components.getRegistrar().handle(protocol, handler) + await this.components.getRegistrar().handle(protocol, handler, options) }) ) } diff --git a/src/ping/index.ts b/src/ping/index.ts index c7766ed3..f755bd51 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -18,21 +18,28 @@ const log = logger('libp2p:ping') export interface PingServiceInit { protocolPrefix: string + maxInboundStreams: number + maxOutboundStreams: number } export class PingService implements Startable { public readonly protocol: string private readonly components: Components private started: boolean + private readonly init: PingServiceInit constructor (components: Components, init: PingServiceInit) { this.components = components this.started = false this.protocol = `/${init.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + this.init = init } async start () { - await this.components.getRegistrar().handle(this.protocol, this.handleMessage) + await this.components.getRegistrar().handle(this.protocol, this.handleMessage, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams + }) this.started = true } @@ -67,7 +74,7 @@ export class PingService implements Startable { log('dialing %s to %p', this.protocol, peer) const connection = await this.components.getConnectionManager().openConnection(peer, options) - const { stream } = await connection.newStream([this.protocol], options) + const stream = await connection.newStream([this.protocol], options) const start = Date.now() const data = randomBytes(PING_LENGTH) diff --git a/src/registrar.ts b/src/registrar.ts index 5cc8a7e7..d9304d25 100644 --- a/src/registrar.ts +++ b/src/registrar.ts @@ -10,8 +10,8 @@ import type { Components } from '@libp2p/components' const log = logger('libp2p:registrar') -const DEFAULT_MAX_INCOMING_STREAMS = 1 -const DEFAULT_MAX_OUTGOING_STREAMS = 1 +export const DEFAULT_MAX_INBOUND_STREAMS = 1 +export const DEFAULT_MAX_OUTBOUND_STREAMS = 1 /** * Responsible for notifying registered protocols of events in the network. @@ -46,7 +46,7 @@ export class DefaultRegistrar implements Registrar { const handler = this.handlers.get(protocol) if (handler == null) { - throw new Error(`No handler registered for protocol ${protocol}`) + throw errCode(new Error(`No handler registered for protocol ${protocol}`), codes.ERR_NO_HANDLER_FOR_PROTOCOL) } return handler @@ -72,9 +72,9 @@ export class DefaultRegistrar implements Registrar { throw errCode(new Error(`Handler already registered for protocol ${protocol}`), codes.ERR_PROTOCOL_HANDLER_ALREADY_REGISTERED) } - const options = merge({ - maxIncomingStreams: DEFAULT_MAX_INCOMING_STREAMS, - maxOutgoingStreams: DEFAULT_MAX_OUTGOING_STREAMS + const options = merge.bind({ ignoreUndefined: true })({ + maxInboundStreams: DEFAULT_MAX_INBOUND_STREAMS, + maxOutboundStreams: DEFAULT_MAX_OUTBOUND_STREAMS }, opts) this.handlers.set(protocol, { diff --git a/src/upgrader.ts b/src/upgrader.ts index 961fd10d..cb344dff 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -8,7 +8,7 @@ import { codes } from './errors.js' import { createConnection } from '@libp2p/connection' import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' import { peerIdFromString } from '@libp2p/peer-id' -import type { MultiaddrConnection, Connection, ProtocolStream, Stream } from '@libp2p/interface-connection' +import type { MultiaddrConnection, Connection, Stream } from '@libp2p/interface-connection' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { PeerId } from '@libp2p/interface-peer-id' @@ -16,6 +16,8 @@ import type { Upgrader, UpgraderEvents } from '@libp2p/interface-transport' import type { Duplex } from 'it-stream-types' import { Components, isInitializable } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' +import type { Registrar } from '@libp2p/interface-registrar' +import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' const log = logger('libp2p:upgrader') @@ -43,6 +45,46 @@ export interface UpgraderInit { muxers: StreamMuxerFactory[] } +function findIncomingStreamLimit (protocol: string, registrar: Registrar) { + try { + const { options } = registrar.getHandler(protocol) + + return options.maxInboundStreams + } catch (err: any) { + if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) { + throw err + } + } + + return DEFAULT_MAX_INBOUND_STREAMS +} + +function findOutgoingStreamLimit (protocol: string, registrar: Registrar) { + try { + const { options } = registrar.getHandler(protocol) + + return options.maxOutboundStreams + } catch (err: any) { + if (err.code !== codes.ERR_NO_HANDLER_FOR_PROTOCOL) { + throw err + } + } + + return DEFAULT_MAX_OUTBOUND_STREAMS +} + +function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection) { + let streamCount = 0 + + connection.streams.forEach(stream => { + if (stream.stat.direction === direction && stream.stat.protocol === protocol) { + streamCount++ + } + }) + + return streamCount +} + export class DefaultUpgrader extends EventEmitter implements Upgrader { private readonly components: Components private readonly connectionEncryption: Map @@ -267,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg } = opts let muxer: StreamMuxer | undefined - let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise) | undefined + let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise) | undefined let connection: Connection // eslint-disable-line prefer-const if (muxerFactory != null) { @@ -296,13 +338,22 @@ export class DefaultUpgrader extends EventEmitter implements Upg return } - connection.addStream(muxedStream, { protocol }) + const incomingLimit = findIncomingStreamLimit(protocol, this.components.getRegistrar()) + const streamCount = countStreams(protocol, 'inbound', connection) + + if (streamCount === incomingLimit) { + throw errCode(new Error('Too many incoming protocol streams'), codes.ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS) + } + + muxedStream.stat.protocol = protocol + + connection.addStream(muxedStream) this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol }) }) .catch(err => { log.error(err) - if (muxedStream.timeline.close == null) { + if (muxedStream.stat.timeline.close == null) { muxedStream.close() } }) @@ -317,7 +368,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg muxer.init(this.components) } - newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { + newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { if (muxer == null) { throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE) } @@ -334,11 +385,27 @@ export class DefaultUpgrader extends EventEmitter implements Upg stream = metrics.trackStream({ stream, remotePeer, protocol }) } - return { stream: { ...muxedStream, ...stream }, protocol } + const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar()) + const streamCount = countStreams(protocol, 'outbound', connection) + + if (streamCount === outgoingLimit) { + throw errCode(new Error('Too many outgoing protocol streams'), codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + } + + muxedStream.stat.protocol = protocol + + return { + ...muxedStream, + ...stream, + stat: { + ...muxedStream.stat, + protocol + } + } } catch (err: any) { log.error('could not create new stream', err) - if (muxedStream.timeline.close == null) { + if (muxedStream.stat.timeline.close == null) { muxedStream.close() } @@ -402,9 +469,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg await maConn.close() // Ensure remaining streams are closed if (muxer != null) { - await Promise.all(muxer.streams.map(async stream => { - await stream.close() - })) + muxer.streams.forEach(s => s.close()) } } }) @@ -422,7 +487,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg _onStream (opts: OnStreamOptions): void { const { connection, stream, protocol } = opts const { handler } = this.components.getRegistrar().getHandler(protocol) - handler({ connection, stream, protocol }) + + handler({ connection, stream }) } /** diff --git a/test/content-routing/dht/operation.node.ts b/test/content-routing/dht/operation.node.ts index 412ae765..82b03e67 100644 --- a/test/content-routing/dht/operation.node.ts +++ b/test/content-routing/dht/operation.node.ts @@ -73,9 +73,9 @@ describe('DHT subsystem operates correctly', () => { }) it('should get notified of connected peers on dial', async () => { - const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) + const stream = await libp2p.dialProtocol(remAddr, subsystemMulticodecs) - expect(connection).to.exist() + expect(stream).to.exist() return await Promise.all([ pWaitFor(() => libp2p.dht.lan.routingTable.size === 1), diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index c5e3b4f5..428aba06 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -307,9 +307,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream(['/echo/1.0.0']) + const stream = await connection.newStream(['/echo/1.0.0']) expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') expect(dialerDialSpy.callCount).to.be.greaterThan(0) await connection.close() }) @@ -336,9 +336,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remotePeerId) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(dialerDialSpy.callCount).to.be.greaterThan(0) }) @@ -377,7 +377,7 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remotePeerId) // Create local to remote streams - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') await connection.newStream('/stream-count/3') await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4') @@ -487,9 +487,9 @@ describe('libp2p.dialer (direct, TCP)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(protectorProtectSpy.callCount).to.equal(1) }) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index e4112c97..1af0e7c6 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -427,9 +427,9 @@ describe('libp2p.dialer (direct, WebSockets)', () => { const connection = await libp2p.dial(MULTIADDRS_WEBSOCKETS[0]) expect(connection).to.exist() - const { stream, protocol } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() - expect(protocol).to.equal('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') await connection.close() expect(dialerDialSpy.callCount).to.be.at.least(1) expect(addressBookAddSpy.callCount).to.be.at.least(1) diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts index f776d777..eadf4f29 100644 --- a/test/fetch/index.spec.ts +++ b/test/fetch/index.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' -import { FetchService } from '../../src/fetch/index.js' +import { FetchService, FetchServiceInit } from '../../src/fetch/index.js' import Peers from '../fixtures/peers.js' import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import { pipe } from 'it-pipe' -const defaultInit = { - protocolPrefix: 'ipfs' +const defaultInit: FetchServiceInit = { + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 } async function createComponents (index: number) { @@ -127,7 +129,7 @@ describe('fetch', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) }) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index b59bbe07..3d6623a0 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -6,7 +6,7 @@ import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { codes } from '../../src/errors.js' -import { IdentifyService, Message } from '../../src/identify/index.js' +import { IdentifyService, IdentifyServiceInit, Message } from '../../src/identify/index.js' import Peers from '../fixtures/peers.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultAddressManager } from '../../src/address-manager/index.js' @@ -32,11 +32,15 @@ import pDefer from 'p-defer' const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] -const defaultInit = { +const defaultInit: IdentifyServiceInit = { protocolPrefix: 'ipfs', host: { agentVersion: 'v1.0.0' - } + }, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -130,6 +134,7 @@ describe('identify', () => { it('should be able to identify another peer with no certified peer records support', async () => { const agentVersion = 'js-libp2p/5.0.0' const localIdentify = new IdentifyService(localComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion: agentVersion @@ -137,6 +142,7 @@ describe('identify', () => { }) await start(localIdentify) const remoteIdentify = new IdentifyService(remoteComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion: agentVersion @@ -209,6 +215,7 @@ describe('identify', () => { it('should store own host data and protocol version into metadataBook on start', async () => { const agentVersion = 'js-project/1.0.0' const localIdentify = new IdentifyService(localComponents, { + ...defaultInit, protocolPrefix: 'ipfs', host: { agentVersion @@ -270,8 +277,8 @@ describe('identify', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) it('should limit incoming identify message sizes', async () => { diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts index e9b5a7df..9dbf95c8 100644 --- a/test/identify/push.spec.ts +++ b/test/identify/push.spec.ts @@ -3,7 +3,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' -import { IdentifyService } from '../../src/identify/index.js' +import { IdentifyService, IdentifyServiceInit } from '../../src/identify/index.js' import Peers from '../fixtures/peers.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultAddressManager } from '../../src/address-manager/index.js' @@ -27,11 +27,15 @@ import { start, stop } from '@libp2p/interfaces/startable' const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] -const defaultInit = { +const defaultInit: IdentifyServiceInit = { protocolPrefix: 'ipfs', host: { agentVersion: 'v1.0.0' - } + }, + maxInboundStreams: 1, + maxOutboundStreams: 1, + maxPushIncomingStreams: 1, + maxPushOutgoingStreams: 1 } const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] @@ -213,8 +217,8 @@ describe('identify (push)', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') // method should have returned before the remote handler completes as we timed // out so we ignore the return value diff --git a/test/metrics/index.node.ts b/test/metrics/index.node.ts index 1be7558d..89296b1e 100644 --- a/test/metrics/index.node.ts +++ b/test/metrics/index.node.ts @@ -91,7 +91,7 @@ describe('libp2p.metrics', () => { }) const connection = await libp2p.dial(remoteLibp2p.peerId) - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') const bytes = randomBytes(512) const result = await pipe( @@ -156,7 +156,7 @@ describe('libp2p.metrics', () => { }) const connection = await libp2p.dial(remoteLibp2p.peerId) - const { stream } = await connection.newStream('/echo/1.0.0') + const stream = await connection.newStream('/echo/1.0.0') const bytes = randomBytes(512) await pipe( diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts index b8be7e64..1acf352e 100644 --- a/test/ping/index.spec.ts +++ b/test/ping/index.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' -import { PingService } from '../../src/ping/index.js' +import { PingService, PingServiceInit } from '../../src/ping/index.js' import Peers from '../fixtures/peers.js' import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-mocks' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -14,8 +14,10 @@ import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import { pipe } from 'it-pipe' -const defaultInit = { - protocolPrefix: 'ipfs' +const defaultInit: PingServiceInit = { + protocolPrefix: 'ipfs', + maxInboundStreams: 1, + maxOutboundStreams: 1 } async function createComponents (index: number) { @@ -116,7 +118,7 @@ describe('ping', () => { // should have closed stream expect(newStreamSpy).to.have.property('callCount', 1) - const { stream } = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') + const stream = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('stat.timeline.close') }) }) diff --git a/test/ping/ping.node.ts b/test/ping/ping.node.ts index 8b64ffba..5ee6a13a 100644 --- a/test/ping/ping.node.ts +++ b/test/ping/ping.node.ts @@ -1,7 +1,6 @@ /* eslint-env mocha */ import { expect } from 'aegir/chai' -import pTimes from 'p-times' import { pipe } from 'it-pipe' import { createNode, populateAddressBooks } from '../utils/creators/peer.js' import { createBaseOptions } from '../utils/base-options.js' @@ -41,7 +40,11 @@ describe('ping', () => { }) it('ping several times for getting an average', async () => { - const latencies = await pTimes(5, async () => await nodes[1].ping(nodes[0].peerId)) + const latencies = [] + + for (let i = 0; i < 5; i++) { + latencies.push(await nodes[1].ping(nodes[0].peerId)) + } const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length expect(averageLatency).to.be.a('Number') diff --git a/test/relay/relay.node.ts b/test/relay/relay.node.ts index 8b33ff07..60e1bf5f 100644 --- a/test/relay/relay.node.ts +++ b/test/relay/relay.node.ts @@ -80,7 +80,7 @@ describe('Dialing (via relay, TCP)', () => { expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerId.toBytes()) expect(connection.remoteAddr).to.eql(dialAddr) - const { stream: echoStream } = await connection.newStream('/echo/1.0.0') + const echoStream = await connection.newStream('/echo/1.0.0') const input = uint8ArrayFromString('hello') const [output] = await pipe( @@ -156,7 +156,7 @@ describe('Dialing (via relay, TCP)', () => { // send an invalid relay message from the relay to the destination peer const connections = relayLibp2p.getConnections(dstLibp2p.peerId) - const { stream } = await connections[0].newStream(RELAY_CODEC) + const stream = await connections[0].newStream(RELAY_CODEC) const streamHandler = new StreamHandler({ stream }) streamHandler.write({ type: CircuitRelay.Type.STATUS diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 216343d5..ac7806ba 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -85,9 +85,15 @@ describe('Upgrader', () => { await localComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { void pipe(stream, stream) + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 }) await remoteComponents.getRegistrar().handle('/echo/1.0.0', ({ stream }) => { void pipe(stream, stream) + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 }) }) @@ -105,8 +111,8 @@ describe('Upgrader', () => { expect(connections).to.have.length(2) - const { stream, protocol } = await connections[0].newStream('/echo/1.0.0') - expect(protocol).to.equal('/echo/1.0.0') + const stream = await connections[0].newStream('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') const hello = uint8ArrayFromString('hello there!') const result = await pipe( @@ -175,8 +181,8 @@ describe('Upgrader', () => { expect(connections).to.have.length(2) - const { stream, protocol } = await connections[0].newStream('/echo/1.0.0') - expect(protocol).to.equal('/echo/1.0.0') + const stream = await connections[0].newStream('/echo/1.0.0') + expect(stream).to.have.nested.property('stat.protocol', '/echo/1.0.0') const hello = uint8ArrayFromString('hello there!') const result = await pipe( @@ -515,11 +521,11 @@ describe('libp2p.upgrader', () => { ]) const remoteLibp2pUpgraderOnStreamSpy = sinon.spy(remoteLibp2p.components.getUpgrader() as DefaultUpgrader, '_onStream') - const { stream } = await localConnection.newStream(['/echo/1.0.0']) - expect(stream).to.include.keys(['id', 'close', 'reset', 'timeline']) + const stream = await localConnection.newStream(['/echo/1.0.0']) + expect(stream).to.include.keys(['id', 'close', 'reset', 'stat']) const [arg0] = remoteLibp2pUpgraderOnStreamSpy.getCall(0).args - expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'timeline']) + expect(arg0.stream).to.include.keys(['id', 'close', 'reset', 'stat']) }) it('should emit connect and disconnect events', async () => { @@ -579,4 +585,128 @@ describe('libp2p.upgrader', () => { // @ts-expect-error detail is only on CustomEvent type expect(remotePeer.equals(event.detail.remotePeer)).to.equal(true) }) + + it('should limit the number of incoming streams that can be opened using a protocol', async () => { + const protocol = '/a-test-protocol/1.0.0' + const remotePeer = peers[1] + libp2p = await createLibp2pNode({ + peerId: peers[0], + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await libp2p.start() + + remoteLibp2p = await createLibp2pNode({ + peerId: remotePeer, + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await remoteLibp2p.start() + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const [localToRemote] = await Promise.all([ + libp2p.components.getUpgrader().upgradeOutbound(outbound), + remoteLibp2p.components.getUpgrader().upgradeInbound(inbound) + ]) + + let streamCount = 0 + + await libp2p.handle(protocol, (data) => {}, { + maxInboundStreams: 10, + maxOutboundStreams: 10 + }) + + await remoteLibp2p.handle(protocol, (data) => { + streamCount++ + }, { + maxInboundStreams: 1, + maxOutboundStreams: 1 + }) + + expect(streamCount).to.equal(0) + + await localToRemote.newStream(protocol) + + expect(streamCount).to.equal(1) + + await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() + .with.property('code', 'ERR_UNDER_READ') + }) + + it('should limit the number of outgoing streams that can be opened using a protocol', async () => { + const protocol = '/a-test-protocol/1.0.0' + const remotePeer = peers[1] + libp2p = await createLibp2pNode({ + peerId: peers[0], + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await libp2p.start() + + remoteLibp2p = await createLibp2pNode({ + peerId: remotePeer, + transports: [ + new WebSockets() + ], + streamMuxers: [ + new Mplex() + ], + connectionEncryption: [ + NOISE + ] + }) + await remoteLibp2p.start() + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const [localToRemote] = await Promise.all([ + libp2p.components.getUpgrader().upgradeOutbound(outbound), + remoteLibp2p.components.getUpgrader().upgradeInbound(inbound) + ]) + + let streamCount = 0 + + await libp2p.handle(protocol, (data) => {}, { + maxInboundStreams: 1, + maxOutboundStreams: 1 + }) + + await remoteLibp2p.handle(protocol, (data) => { + streamCount++ + }, { + maxInboundStreams: 10, + maxOutboundStreams: 10 + }) + + expect(streamCount).to.equal(0) + + await localToRemote.newStream(protocol) + + expect(streamCount).to.equal(1) + + await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected() + .with.property('code', codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS) + }) })