Mirror of https://github.com/fluencelabs/js-libp2p (synced 2025-07-10 14:21:33 +00:00)

Compare commits (9 commits)
893f8c280f
824720fb8f
4c0c2c6d3e
a1220d22f5
5934b13cce
b09eb8fc53
35f9c0c793
d5386df684
1f5d5c2de1
CHANGELOG.md
@@ -10,6 +10,23 @@

### [0.37.2](https://www.github.com/libp2p/js-libp2p/compare/v0.37.1...v0.37.2) (2022-05-31)


### Bug Fixes

* reduce identify message size limit ([#1230](https://www.github.com/libp2p/js-libp2p/issues/1230)) ([824720f](https://www.github.com/libp2p/js-libp2p/commit/824720fb8f21f868ed88e881fbc3ce6b9459600d))

### [0.37.1](https://www.github.com/libp2p/js-libp2p/compare/v0.37.0...v0.37.1) (2022-05-25)


### Bug Fixes

* do upnp hole punch after startup ([#1217](https://www.github.com/libp2p/js-libp2p/issues/1217)) ([d5386df](https://www.github.com/libp2p/js-libp2p/commit/d5386df68478a71ac269acb2d00d36a7a5c9ebc5))
* explicitly close streams when connections close ([#1221](https://www.github.com/libp2p/js-libp2p/issues/1221)) ([b09eb8f](https://www.github.com/libp2p/js-libp2p/commit/b09eb8fc53ec1d8f6280d681c9ca6a467ec259b5))
* fix unintended aborts in dialer ([#1185](https://www.github.com/libp2p/js-libp2p/issues/1185)) ([35f9c0c](https://www.github.com/libp2p/js-libp2p/commit/35f9c0c79387232465848b450a47cafe841405e7))
* time out slow reads ([#1227](https://www.github.com/libp2p/js-libp2p/issues/1227)) ([a1220d2](https://www.github.com/libp2p/js-libp2p/commit/a1220d22f5affb64e64dec0cd6a92cd8241b26df))

## [0.37.0](https://www.github.com/libp2p/js-libp2p/compare/v0.36.2...v0.37.0) (2022-05-16)
@@ -97,7 +97,9 @@ Creates an instance of Libp2p.

| options.modules | [`Array<object>`](./CONFIGURATION.md#modules) | libp2p [modules](./CONFIGURATION.md#modules) to use |
| [options.addresses] | `{ listen: Array<string>, announce: Array<string>, announceFilter: (ma: Array<multiaddr>) => Array<multiaddr> }` | Addresses for transport listening and to advertise to the network |
| [options.config] | `object` | libp2p modules configuration and core configuration |
| [options.host] | `{ agentVersion: string }` | libp2p host options |
| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number, maxIdentifyMessageSize: number }` | libp2p identify protocol options |
| [options.ping] | `{ protocolPrefix: string }` | libp2p ping protocol options |
| [options.fetch] | `{ protocolPrefix: string }` | libp2p fetch protocol options |
| [options.connectionManager] | [`object`](./CONFIGURATION.md#configuring-connection-manager) | libp2p Connection Manager [configuration](./CONFIGURATION.md#configuring-connection-manager) |
| [options.transportManager] | [`object`](./CONFIGURATION.md#configuring-transport-manager) | libp2p transport manager [configuration](./CONFIGURATION.md#configuring-transport-manager) |
| [options.datastore] | `object` | must implement [ipfs/interface-datastore](https://github.com/ipfs/interface-datastore) (in memory datastore will be used if not provided) |
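For a rough sense of how the protocol-related options above fit together, here is a minimal sketch of creating a node with them. It assumes the `createLibp2p` entry point and made-up values for illustration; transports, stream muxers and connection encryption are omitted for brevity, so a real node would need those configured as well.

```js
import { createLibp2p } from 'libp2p'

// Illustrative values only – substitute your own settings.
const node = await createLibp2p({
  addresses: {
    listen: ['/ip4/0.0.0.0/tcp/0']
  },
  identify: {
    protocolPrefix: 'ipfs',
    host: {
      agentVersion: 'my-app/1.0.0' // reported to peers during identify
    },
    timeout: 30000,               // abort identify exchanges that take too long
    maxIdentifyMessageSize: 8192  // reject oversized identify responses
  },
  ping: {
    protocolPrefix: 'ipfs'
  },
  fetch: {
    protocolPrefix: 'libp2p'
  }
})
```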

@@ -885,7 +885,12 @@ Changing the protocol name prefix can isolate default public network (IPFS) for

```js
const node = await createLibp2p({
  protocolPrefix: 'ipfs' // default
  identify: {
    protocolPrefix: 'ipfs' // default
  },
  ping: {
    protocolPrefix: 'ipfs' // default
  }
})
/*
protocols: [

@@ -10,6 +10,7 @@ A migration guide for refactoring your application code from libp2p v0.36.x to v

- [Config](#config)
- [Bundled modules](#bundled-modules)
- [Events](#events)
- [Pubsub](#pubsub)

## ESM

@@ -36,6 +37,7 @@ The following changes have been made to the configuration object:

3. Use of the `enabled` flag has been removed - if you don't want a particular feature enabled, don't pass a module implementing that feature
4. Some keys have been renamed: `transport` -> `transports`, `streamMuxer` -> `streamMuxers`, `connEncryption` -> `connectionEncryption`, etc.
5. Keys from `config.dialer` have been moved to `config.connectionManager` as the connection manager is now responsible for managing connections
6. The `protocolPrefix` configuration option is now passed on a per-protocol basis for `identify`, `fetch` and `ping`

**Before**
@@ -70,6 +72,7 @@ const node = await Libp2p.create({
      MulticastDNS
    ]
  },
  protocolPrefix: 'ipfs',
  config: {
    peerDiscovery: {
      autoDial: true,
@@ -135,7 +138,10 @@ const node = await createLibp2p({
    new MulticastDNS({
      interval: 1000
    })
  ]
  ],
  identify: {
    protocolPrefix: 'ipfs'
  }
})
```
@@ -202,7 +208,7 @@ libp2p.addEventListener('peer:discovery', handler)
libp2p.removeEventListener('peer:discovery', handler)
```

## Pubsub
## Pubsub

Similar to the events refactor above, pubsub is now driven by the standard [EventTarget](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget) API.
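As a rough sketch of what that looks like in application code (assuming an EventTarget-based pubsub implementation such as floodsub is configured, and that message events expose their payload on `event.detail`; the topic name and decoding below are illustrative):

```js
// Listen for pubsub messages via the EventTarget API
libp2p.pubsub.addEventListener('message', (event) => {
  const { topic, data } = event.detail
  console.log(`received on ${topic}: ${new TextDecoder().decode(data)}`)
})

// Subscribe to a topic; unsubscribe later with libp2p.pubsub.unsubscribe(...)
libp2p.pubsub.subscribe('example-topic')
```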
@@ -3,7 +3,7 @@
  "version": "0.1.0",
  "private": true,
  "dependencies": {
    "@chainsafe/libp2p-noise": "^6.1.1",
    "@chainsafe/libp2p-noise": "^6.2.0",
    "ipfs-core": "^0.14.1",
    "libp2p": "../../",
    "@libp2p/delegated-content-routing": "^1.0.1",
@@ -9,7 +9,7 @@
  },
  "license": "ISC",
  "dependencies": {
    "@chainsafe/libp2p-noise": "^6.1.1",
    "@chainsafe/libp2p-noise": "^6.2.0",
    "@libp2p/bootstrap": "^1.0.4",
    "@libp2p/mplex": "^1.0.4",
    "@libp2p/webrtc-star": "^1.0.8",
@@ -11,6 +11,7 @@
  "dependencies": {
    "@libp2p/pubsub-peer-discovery": "^5.0.2",
    "@libp2p/floodsub": "^1.0.6",
    "@nodeutils/defaults-deep": "^1.1.0",
    "execa": "^2.1.0",
    "fs-extra": "^8.1.0",
    "libp2p": "../",
@@ -167,10 +167,105 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t

With the aid of both mechanisms, we can reuse an incoming connection to dial streams out too. This is especially useful when you are behind a tricky NAT or firewall, or when running in a browser, where you can't have listening addresses but can still dial out. By dialing out, you enable other peers to talk to you on whatever protocols they want, simply by opening a new multiplexed stream.

You can see this working on example [3.js](./3.js). The result should look like the following:
You can see this working on example [3.js](./3.js).

As we've seen earlier, we can create our node with this `createNode` function.

```js
const createNode = async () => {
  const node = await Libp2p.create({
    addresses: {
      listen: ['/ip4/0.0.0.0/tcp/0']
    },
    modules: {
      transport: [TCP],
      streamMuxer: [MPLEX],
      connEncryption: [NOISE]
    }
  })

  await node.start()

  return node
}
```

We can now create our two nodes for this example.

```js
const [node1, node2] = await Promise.all([
  createNode(),
  createNode()
])
```

Since we want to connect `node1` and `node2`, we add `node2`'s multiaddrs to `node1`'s peer store.

```js
await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
```

You may notice that we only add `node2`'s addresses to `node1`'s peer store. This works because we are going to establish a single connection from `node1` to `node2` and then reuse it in both directions.

Finally, let's create protocols for `node1` and `node2` and dial those protocols.

```js
node1.handle('/node-1', ({ stream }) => {
  pipe(
    stream,
    async function (source) {
      for await (const msg of source) {
        console.log(msg.toString())
      }
    }
  )
})

node2.handle('/node-2', ({ stream }) => {
  pipe(
    stream,
    async function (source) {
      for await (const msg of source) {
        console.log(msg.toString())
      }
    }
  )
})

// Dialing node2 from node1
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
  ['from 1 to 2'],
  stream1
)

// Dialing node1 from node2
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
  ['from 2 to 1'],
  stream2
)
```

If we run this code, the result should look like the following:

```Bash
> node 3.js
from 1 to 2
from 2 to 1
```

So, we have successfully set up a bidirectional connection with protocol muxing. Be aware, though, that we were only able to dial from `node2` to `node1` without adding `node1`'s peerId to `node2`'s address book because `node1` dialed `node2` first; `node2` then simply dialed back over the existing connection. If we dial from `node2` to `node1` before dialing from `node1` to `node2`, we will get an error.

The code below will result in an error: `the dial address is not valid`.

```js
// Dialing from node2 to node1
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
  ['from 2 to 1'],
  stream2
)

// Dialing from node1 to node2
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
await pipe(
  ['from 1 to 2'],
  stream1
)
```
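If you need `node2` to be able to dial first, one possible approach (a sketch using the same APIs as this example, not part of 3.js) is to also tell `node2` about `node1`'s addresses before dialing, so its dial no longer depends on an existing inbound connection:

```js
// Hypothetical fix: give node2 node1's listen addresses up front
await node2.peerStore.addressBook.set(node1.peerId, node1.multiaddrs)

// Now node2 can open the first connection itself
const { stream } = await node2.dialProtocol(node1.peerId, ['/node-1'])
await pipe(
  ['from 2 to 1'],
  stream
)
```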
@@ -10,7 +10,7 @@
  "license": "ISC",
  "dependencies": {
    "@libp2p/webrtc-direct": "^1.0.1",
    "@chainsafe/libp2p-noise": "^6.1.1",
    "@chainsafe/libp2p-noise": "^6.2.0",
    "@libp2p/bootstrap": "^1.0.4",
    "@libp2p/mplex": "^1.0.4",
    "libp2p": "../../",
package.json
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "libp2p",
|
||||
"version": "0.37.0",
|
||||
"version": "0.37.2",
|
||||
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"homepage": "https://github.com/libp2p/js-libp2p#readme",
|
||||
@ -92,10 +92,10 @@
|
||||
"test:interop": "aegir test -t node -f dist/test/interop.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@achingbrain/nat-port-mapper": "^1.0.0",
|
||||
"@libp2p/connection": "^1.1.5",
|
||||
"@achingbrain/nat-port-mapper": "^1.0.3",
|
||||
"@libp2p/connection": "^2.0.2",
|
||||
"@libp2p/crypto": "^0.22.11",
|
||||
"@libp2p/interfaces": "^1.3.31",
|
||||
"@libp2p/interfaces": "^2.0.2",
|
||||
"@libp2p/logger": "^1.1.4",
|
||||
"@libp2p/multistream-select": "^1.0.4",
|
||||
"@libp2p/peer-collections": "^1.0.2",
|
||||
@ -127,7 +127,6 @@
|
||||
"it-pipe": "^2.0.3",
|
||||
"it-sort": "^1.0.1",
|
||||
"it-stream-types": "^1.0.4",
|
||||
"it-take": "^1.0.2",
|
||||
"merge-options": "^3.0.4",
|
||||
"multiformats": "^9.6.3",
|
||||
"mutable-proxy": "^1.0.0",
|
||||
@ -146,31 +145,28 @@
|
||||
"xsalsa20": "^1.1.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@chainsafe/libp2p-noise": "^6.1.1",
|
||||
"@chainsafe/libp2p-noise": "^6.2.0",
|
||||
"@libp2p/bootstrap": "^1.0.4",
|
||||
"@libp2p/daemon-client": "^1.0.2",
|
||||
"@libp2p/daemon-server": "^1.0.2",
|
||||
"@libp2p/delegated-content-routing": "^1.0.2",
|
||||
"@libp2p/delegated-peer-routing": "^1.0.2",
|
||||
"@libp2p/floodsub": "^1.0.6",
|
||||
"@libp2p/interface-compliance-tests": "^1.1.32",
|
||||
"@libp2p/interface-compliance-tests": "^2.0.3",
|
||||
"@libp2p/interop": "^1.0.3",
|
||||
"@libp2p/kad-dht": "^1.0.9",
|
||||
"@libp2p/mdns": "^1.0.5",
|
||||
"@libp2p/mplex": "^1.0.4",
|
||||
"@libp2p/mplex": "^1.1.0",
|
||||
"@libp2p/pubsub": "^1.2.18",
|
||||
"@libp2p/tcp": "^1.0.9",
|
||||
"@libp2p/topology": "^1.1.7",
|
||||
"@libp2p/webrtc-star": "^1.0.8",
|
||||
"@libp2p/websockets": "^1.0.7",
|
||||
"@nodeutils/defaults-deep": "^1.1.0",
|
||||
"@types/node": "^16.11.26",
|
||||
"@types/node-forge": "^1.0.0",
|
||||
"@types/p-fifo": "^1.0.0",
|
||||
"@types/varint": "^6.0.0",
|
||||
"@types/xsalsa20": "^1.1.0",
|
||||
"aegir": "^37.0.9",
|
||||
"buffer": "^6.0.3",
|
||||
"cborg": "^1.8.1",
|
||||
"delay": "^5.0.0",
|
||||
"execa": "^6.1.0",
|
||||
@ -187,7 +183,7 @@
|
||||
"p-wait-for": "^4.1.0",
|
||||
"protons": "^3.0.4",
|
||||
"rimraf": "^3.0.2",
|
||||
"sinon": "^13.0.1",
|
||||
"sinon": "^14.0.0",
|
||||
"ts-sinon": "^2.0.2"
|
||||
},
|
||||
"browser": {
|
||||
|
@ -35,9 +35,6 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
transportManager: {
|
||||
faultTolerance: FaultTolerance.FATAL_ALL
|
||||
},
|
||||
host: {
|
||||
agentVersion: AGENT_VERSION
|
||||
},
|
||||
metrics: {
|
||||
enabled: false,
|
||||
computeThrottleMaxQueueSize: 1000,
|
||||
@ -56,7 +53,6 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
bootDelay: 10e3
|
||||
}
|
||||
},
|
||||
protocolPrefix: 'ipfs',
|
||||
nat: {
|
||||
enabled: true,
|
||||
ttl: 7200,
|
||||
@ -77,6 +73,19 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
enabled: false,
|
||||
maxListeners: 2
|
||||
}
|
||||
},
|
||||
identify: {
|
||||
protocolPrefix: 'ipfs',
|
||||
host: {
|
||||
agentVersion: AGENT_VERSION
|
||||
},
|
||||
timeout: 30000
|
||||
},
|
||||
ping: {
|
||||
protocolPrefix: 'ipfs'
|
||||
},
|
||||
fetch: {
|
||||
protocolPrefix: 'libp2p'
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
import errCode from 'err-code'
|
||||
import { anySignal } from 'any-signal'
|
||||
import FIFO from 'p-fifo'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import { codes } from '../../errors.js'
|
||||
import { logger } from '@libp2p/logger'
|
||||
@ -62,7 +61,7 @@ export class DialRequest {
|
||||
})
|
||||
}
|
||||
|
||||
const dialAbortControllers = this.addrs.map(() => {
|
||||
const dialAbortControllers: Array<(AbortController | undefined)> = this.addrs.map(() => {
|
||||
const controller = new AbortController()
|
||||
try {
|
||||
// fails on node < 15.4
|
||||
@ -80,16 +79,27 @@ export class DialRequest {
|
||||
}
|
||||
|
||||
let completedDials = 0
|
||||
let done = false
|
||||
|
||||
try {
|
||||
return await Promise.any(this.addrs.map(async (addr, i) => {
|
||||
const token = await tokenHolder.shift() // get token
|
||||
// End attempt once another attempt succeeded
|
||||
if (done) {
|
||||
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
|
||||
throw errCode(new Error('dialAction already succeeded'), codes.ERR_ALREADY_SUCCEEDED)
|
||||
}
|
||||
|
||||
const controller = dialAbortControllers[i]
|
||||
if (controller == null) {
|
||||
throw errCode(new Error('dialAction did not come with an AbortController'), codes.ERR_INVALID_PARAMETERS)
|
||||
}
|
||||
let conn
|
||||
try {
|
||||
const signal = dialAbortControllers[i].signal
|
||||
const signal = controller.signal
|
||||
conn = await this.dialAction(addr, { ...options, signal: (options.signal != null) ? anySignal([signal, options.signal]) : signal })
|
||||
// Remove the successful AbortController so it is not aborted
|
||||
dialAbortControllers.splice(i, 1)
|
||||
dialAbortControllers[i] = undefined
|
||||
} finally {
|
||||
completedDials++
|
||||
// If we have more or equal dials remaining than tokens, recycle the token, otherwise release it
|
||||
@ -102,10 +112,25 @@ export class DialRequest {
|
||||
}
|
||||
}
|
||||
|
||||
if (conn == null) {
|
||||
// Notify Promise.any that attempt was not successful
|
||||
// to prevent from returning undefined despite there
|
||||
// were successful dial attempts
|
||||
throw errCode(new Error('dialAction led to empty object'), codes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
} else {
|
||||
// This dial succeeded, don't attempt anything else
|
||||
done = true
|
||||
}
|
||||
|
||||
return conn
|
||||
}))
|
||||
} finally {
|
||||
dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else
|
||||
// success/failure happened, abort everything else
|
||||
dialAbortControllers.forEach(c => {
|
||||
if (c !== undefined) {
|
||||
c.abort()
|
||||
}
|
||||
})
|
||||
tokens.forEach(token => this.dialer.releaseToken(token)) // release tokens back to the dialer
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import { Multiaddr, Resolver } from '@multiformats/multiaddr'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import { AbortError } from '@libp2p/interfaces/errors'
|
||||
import { anySignal } from 'any-signal'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import { DialAction, DialRequest } from './dial-request.js'
|
||||
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
|
||||
|
@ -10,7 +10,6 @@ import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import { trackedMap } from '@libp2p/tracked-map'
|
||||
import { codes } from '../errors.js'
|
||||
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
@ -254,10 +253,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
*/
|
||||
async _close () {
|
||||
// Close all connections we're tracking
|
||||
const tasks = []
|
||||
const tasks: Array<Promise<void>> = []
|
||||
for (const connectionList of this.connections.values()) {
|
||||
for (const connection of connectionList) {
|
||||
tasks.push(connection.close())
|
||||
tasks.push((async () => {
|
||||
try {
|
||||
await connection.close()
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
})())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,5 +69,6 @@ export enum codes {
|
||||
ERR_INVALID_PASS_LENGTH = 'ERR_INVALID_PASS_LENGTH',
|
||||
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
|
||||
ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK',
|
||||
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD'
|
||||
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD',
|
||||
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED'
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
|
||||
// https://github.com/libp2p/specs/tree/master/fetch#wire-protocol
|
||||
export const PROTOCOL = '/libp2p/fetch/0.0.1'
|
||||
export const PROTOCOL_VERSION = '0.0.1'
|
||||
export const PROTOCOL_NAME = 'fetch'
|
||||
|
@ -4,16 +4,19 @@ import { codes } from '../errors.js'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { FetchRequest, FetchResponse } from './pb/proto.js'
|
||||
import { handshake } from 'it-handshake'
|
||||
import { PROTOCOL } from './constants.js'
|
||||
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
|
||||
const log = logger('libp2p:fetch')
|
||||
|
||||
export interface FetchInit {
|
||||
export interface FetchServiceInit {
|
||||
protocolPrefix: string
|
||||
}
|
||||
|
||||
@ -33,15 +36,15 @@ export interface LookupFunction {
|
||||
* by a fixed prefix that all keys that should be routed to that lookup function will start with.
|
||||
*/
|
||||
export class FetchService implements Startable {
|
||||
public readonly protocol: string
|
||||
private readonly components: Components
|
||||
private readonly lookupFunctions: Map<string, LookupFunction>
|
||||
private readonly protocol: string
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: FetchInit) {
|
||||
constructor (components: Components, init: FetchServiceInit) {
|
||||
this.started = false
|
||||
this.components = components
|
||||
this.protocol = PROTOCOL
|
||||
this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
|
||||
this.lookupFunctions = new Map() // Maps key prefix to value lookup function
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
}
|
||||
@ -67,12 +70,19 @@ export class FetchService implements Startable {
|
||||
/**
|
||||
* Sends a request to fetch the value associated with the given key from the given peer
|
||||
*/
|
||||
async fetch (peer: PeerId, key: string): Promise<Uint8Array | null> {
|
||||
async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
|
||||
log('dialing %s to %p', this.protocol, peer)
|
||||
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer)
|
||||
const { stream } = await connection.newStream([this.protocol])
|
||||
const shake = handshake(stream)
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
const { stream } = await connection.newStream([this.protocol], options)
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
|
||||
const shake = handshake(source)
|
||||
|
||||
// send message
|
||||
shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice())
|
||||
|
@ -2,8 +2,6 @@ import { logger } from '@libp2p/logger'
|
||||
import errCode from 'err-code'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { pipe } from 'it-pipe'
|
||||
import all from 'it-all'
|
||||
import take from 'it-take'
|
||||
import drain from 'it-drain'
|
||||
import first from 'it-first'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
@ -21,20 +19,47 @@ import {
|
||||
} from './consts.js'
|
||||
import { codes } from '../errors.js'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { Connection, Stream } from '@libp2p/interfaces/connection'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import { peerIdFromKeys } from '@libp2p/peer-id'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
|
||||
const log = logger('libp2p:identify')
|
||||
|
||||
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L48
|
||||
const IDENTIFY_TIMEOUT = 60000
|
||||
|
||||
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52
|
||||
const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8
|
||||
|
||||
export interface HostProperties {
|
||||
agentVersion: string
|
||||
}
|
||||
|
||||
export interface IdentifyServiceInit {
|
||||
/**
|
||||
* The prefix to use for the protocol (default: 'ipfs')
|
||||
*/
|
||||
protocolPrefix: string
|
||||
|
||||
/**
|
||||
* What details we should send as part of an identify message
|
||||
*/
|
||||
host: HostProperties
|
||||
|
||||
/**
|
||||
* How long we should wait for a remote peer to send their identify response
|
||||
*/
|
||||
timeout?: number
|
||||
|
||||
/**
|
||||
* Identify responses larger than this in bytes will be rejected (default: 8192)
|
||||
*/
|
||||
maxIdentifyMessageSize?: number
|
||||
}
|
||||
|
||||
export class IdentifyService implements Startable {
|
||||
@ -46,11 +71,13 @@ export class IdentifyService implements Startable {
|
||||
agentVersion: string
|
||||
}
|
||||
|
||||
private readonly init: IdentifyServiceInit
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: IdentifyServiceInit) {
|
||||
this.components = components
|
||||
this.started = false
|
||||
this.init = init
|
||||
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
|
||||
@ -128,8 +155,17 @@ export class IdentifyService implements Startable {
|
||||
const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId())
|
||||
|
||||
const pushes = connections.map(async connection => {
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
let stream: Stream | undefined
|
||||
|
||||
try {
|
||||
const { stream } = await connection.newStream([this.identifyPushProtocolStr])
|
||||
const data = await connection.newStream([this.identifyPushProtocolStr], {
|
||||
signal: timeoutController.signal
|
||||
})
|
||||
stream = data.stream
|
||||
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
await pipe(
|
||||
[Identify.encode({
|
||||
@ -138,12 +174,18 @@ export class IdentifyService implements Startable {
|
||||
protocols
|
||||
})],
|
||||
lp.encode(),
|
||||
stream,
|
||||
source,
|
||||
drain
|
||||
)
|
||||
} catch (err: any) {
|
||||
// Just log errors
|
||||
log.error('could not push identify update to peer', err)
|
||||
} finally {
|
||||
if (stream != null) {
|
||||
stream.close()
|
||||
}
|
||||
|
||||
timeoutController.clear()
|
||||
}
|
||||
})
|
||||
|
||||
@ -175,31 +217,56 @@ export class IdentifyService implements Startable {
|
||||
await this.push(connections)
|
||||
}
|
||||
|
||||
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
|
||||
const { stream } = await connection.newStream([this.identifyProtocolStr], options)
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
let timeoutController
|
||||
let signal = options.signal
|
||||
|
||||
// create a timeout if no abort signal passed
|
||||
if (signal == null) {
|
||||
timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
signal = timeoutController.signal
|
||||
}
|
||||
|
||||
// make stream abortable if AbortSignal passed
|
||||
source = abortableDuplex(stream, signal)
|
||||
|
||||
try {
|
||||
const data = await pipe(
|
||||
[],
|
||||
source,
|
||||
lp.decode({
|
||||
maxDataLength: this.init.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE
|
||||
}),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
|
||||
if (data == null) {
|
||||
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
|
||||
}
|
||||
|
||||
try {
|
||||
return Identify.decode(data)
|
||||
} catch (err: any) {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
} finally {
|
||||
if (timeoutController != null) {
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests the `Identify` message from peer associated with the given `connection`.
|
||||
* If the identified peer does not match the `PeerId` associated with the connection,
|
||||
* an error will be thrown.
|
||||
*/
|
||||
async identify (connection: Connection): Promise<void> {
|
||||
const { stream } = await connection.newStream([this.identifyProtocolStr])
|
||||
const [data] = await pipe(
|
||||
[],
|
||||
stream,
|
||||
lp.decode(),
|
||||
(source) => take(source, 1),
|
||||
async (source) => await all(source)
|
||||
)
|
||||
|
||||
if (data == null) {
|
||||
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
|
||||
}
|
||||
|
||||
let message: Identify
|
||||
try {
|
||||
message = Identify.decode(data)
|
||||
} catch (err: any) {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
async identify (connection: Connection, options: AbortOptions = {}): Promise<void> {
|
||||
const message = await this._identify(connection, options)
|
||||
|
||||
const {
|
||||
publicKey,
|
||||
@ -308,6 +375,8 @@ export class IdentifyService implements Startable {
|
||||
*/
|
||||
async _handleIdentify (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
|
||||
try {
|
||||
const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0)
|
||||
const peerData = await this.components.getPeerStore().get(this.components.getPeerId())
|
||||
@ -335,14 +404,20 @@ export class IdentifyService implements Startable {
|
||||
protocols: peerData.protocols
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
await pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
source,
|
||||
drain
|
||||
)
|
||||
} catch (err: any) {
|
||||
log.error('could not respond to identify request', err)
|
||||
} finally {
|
||||
stream.close()
|
||||
timeoutController.clear()
|
||||
}
|
||||
}
|
||||
|
||||
@ -351,13 +426,19 @@ export class IdentifyService implements Startable {
|
||||
*/
|
||||
async _handlePush (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
|
||||
let message: Identify | undefined
|
||||
try {
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
const data = await pipe(
|
||||
[],
|
||||
stream,
|
||||
lp.decode(),
|
||||
source,
|
||||
lp.decode({
|
||||
maxDataLength: this.init.maxIdentifyMessageSize ?? MAX_IDENTIFY_MESSAGE_SIZE
|
||||
}),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
|
||||
@ -366,6 +447,9 @@ export class IdentifyService implements Startable {
|
||||
}
|
||||
} catch (err: any) {
|
||||
return log.error('received invalid message', err)
|
||||
} finally {
|
||||
stream.close()
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
if (message == null) {
|
||||
|
src/index.ts
@ -4,7 +4,7 @@ import type { EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { FaultTolerance } from './transport-manager.js'
|
||||
import type { HostProperties } from './identify/index.js'
|
||||
import type { IdentifyServiceInit } from './identify/index.js'
|
||||
import type { DualDHT } from '@libp2p/interfaces/dht'
|
||||
import type { Datastore } from 'interface-datastore'
|
||||
import type { PeerStore, PeerStoreInit } from '@libp2p/interfaces/peer-store'
|
||||
@ -24,6 +24,8 @@ import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics'
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
import type { KeyChain } from './keychain/index.js'
|
||||
import type { ConnectionManagerInit } from './connection-manager/index.js'
|
||||
import type { PingServiceInit } from './ping/index.js'
|
||||
import type { FetchServiceInit } from './fetch/index.js'
|
||||
|
||||
export interface PersistentPeerStoreOptions {
|
||||
threshold?: number
|
||||
@ -95,7 +97,6 @@ export interface RefreshManagerConfig {
|
||||
|
||||
export interface Libp2pInit {
|
||||
peerId: PeerId
|
||||
host: HostProperties
|
||||
addresses: AddressesConfig
|
||||
connectionManager: ConnectionManagerInit
|
||||
connectionGater: Partial<ConnectionGater>
|
||||
@ -105,9 +106,11 @@ export interface Libp2pInit {
|
||||
peerStore: PeerStoreInit
|
||||
peerRouting: PeerRoutingConfig
|
||||
keychain: KeychainConfig
|
||||
protocolPrefix: string
|
||||
nat: NatManagerConfig
|
||||
relay: RelayConfig
|
||||
identify: IdentifyServiceInit
|
||||
ping: PingServiceInit
|
||||
fetch: FetchServiceInit
|
||||
|
||||
transports: Transport[]
|
||||
streamMuxers?: StreamMuxerFactory[]
|
||||
@ -195,12 +198,12 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
|
||||
/**
|
||||
* Pings the given peer in order to obtain the operation latency
|
||||
*/
|
||||
ping: (peer: Multiaddr |PeerId) => Promise<number>
|
||||
ping: (peer: Multiaddr | PeerId, options?: AbortOptions) => Promise<number>
|
||||
|
||||
/**
|
||||
* Sends a request to fetch the value associated with the given key from the given peer.
|
||||
*/
|
||||
fetch: (peer: PeerId | Multiaddr | string, key: string) => Promise<Uint8Array | null>
|
||||
fetch: (peer: PeerId | Multiaddr | string, key: string, options?: AbortOptions) => Promise<Uint8Array | null>
|
||||
|
||||
/**
|
||||
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA' type
|
||||
|
@ -166,10 +166,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
if (init.streamMuxers != null && init.streamMuxers.length > 0) {
|
||||
// Add the identify service since we can multiplex
|
||||
this.identifyService = new IdentifyService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix,
|
||||
host: {
|
||||
agentVersion: init.host.agentVersion
|
||||
}
|
||||
...init.identify
|
||||
})
|
||||
this.configureComponent(this.identifyService)
|
||||
}
|
||||
@ -229,11 +226,11 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
}
|
||||
|
||||
this.fetchService = this.configureComponent(new FetchService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix
|
||||
...init.fetch
|
||||
}))
|
||||
|
||||
this.pingService = this.configureComponent(new PingService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix
|
||||
...init.ping
|
||||
}))
|
||||
|
||||
const autoDialer = this.configureComponent(new AutoDialer(this.components, {
|
||||
@ -419,9 +416,9 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
|
||||
}
|
||||
|
||||
const connection = await this.dial(peer)
|
||||
const connection = await this.dial(peer, options)
|
||||
|
||||
return await connection.newStream(protocols)
|
||||
return await connection.newStream(protocols, options)
|
||||
}
|
||||
|
||||
getMultiaddrs (): Multiaddr[] {
|
||||
@ -473,24 +470,24 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
throw errCode(new Error(`Node not responding with its public key: ${peer.toString()}`), codes.ERR_INVALID_RECORD)
|
||||
}
|
||||
|
||||
async fetch (peer: PeerId | Multiaddr | string, key: string): Promise<Uint8Array | null> {
|
||||
async fetch (peer: PeerId | Multiaddr | string, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
|
||||
const { id, multiaddrs } = getPeer(peer)
|
||||
|
||||
if (multiaddrs != null) {
|
||||
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
|
||||
}
|
||||
|
||||
return await this.fetchService.fetch(id, key)
|
||||
return await this.fetchService.fetch(id, key, options)
|
||||
}
|
||||
|
||||
async ping (peer: PeerId | Multiaddr | string): Promise<number> {
|
||||
async ping (peer: PeerId | Multiaddr | string, options: AbortOptions = {}): Promise<number> {
|
||||
const { id, multiaddrs } = getPeer(peer)
|
||||
|
||||
if (multiaddrs.length > 0) {
|
||||
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
|
||||
}
|
||||
|
||||
return await this.pingService.ping(id)
|
||||
return await this.pingService.ping(id, options)
|
||||
}
|
||||
|
||||
async handle (protocols: string | string[], handler: StreamHandler): Promise<void> {
|
||||
|
@ -94,10 +94,14 @@ export class NatManager implements Startable {
|
||||
return this.started
|
||||
}
|
||||
|
||||
start () {}
|
||||
|
||||
/**
|
||||
* Starts the NAT manager
|
||||
* Attempt to use uPnP to configure port mapping using the current gateway.
|
||||
*
|
||||
* Run after start to ensure the transport manager has all addresses configured.
|
||||
*/
|
||||
start () {
|
||||
afterStart () {
|
||||
if (isBrowser || !this.enabled || this.started) {
|
||||
return
|
||||
}
|
||||
@ -105,7 +109,7 @@ export class NatManager implements Startable {
|
||||
this.started = true
|
||||
|
||||
// done async to not slow down startup
|
||||
this._start().catch((err) => {
|
||||
void this._start().catch((err) => {
|
||||
// hole punching errors are non-fatal
|
||||
log.error(err)
|
||||
})
|
||||
|
@ -17,7 +17,6 @@ import {
|
||||
clearDelayedInterval
|
||||
// @ts-expect-error module with no types
|
||||
} from 'set-delayed-interval'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
|
||||
|
@ -10,6 +10,9 @@ import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
|
||||
const log = logger('libp2p:ping')
|
||||
|
||||
@ -18,8 +21,8 @@ export interface PingServiceInit {
|
||||
}
|
||||
|
||||
export class PingService implements Startable {
|
||||
public readonly protocol: string
|
||||
private readonly components: Components
|
||||
private readonly protocol: string
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: PingServiceInit) {
|
||||
@ -60,25 +63,36 @@ export class PingService implements Startable {
|
||||
* @param {PeerId|Multiaddr} peer
|
||||
* @returns {Promise<number>}
|
||||
*/
|
||||
async ping (peer: PeerId): Promise<number> {
|
||||
async ping (peer: PeerId, options: AbortOptions = {}): Promise<number> {
|
||||
log('dialing %s to %p', this.protocol, peer)
|
||||
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer)
|
||||
const { stream } = await connection.newStream([this.protocol])
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
const { stream } = await connection.newStream([this.protocol], options)
|
||||
const start = Date.now()
|
||||
const data = randomBytes(PING_LENGTH)
|
||||
|
||||
const result = await pipe(
|
||||
[data],
|
||||
stream,
|
||||
async (source) => await first(source)
|
||||
)
|
||||
const end = Date.now()
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
|
||||
if (result == null || !uint8ArrayEquals(data, result)) {
|
||||
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
|
||||
return end - start
|
||||
try {
|
||||
const result = await pipe(
|
||||
[data],
|
||||
source,
|
||||
async (source) => await first(source)
|
||||
)
|
||||
const end = Date.now()
|
||||
|
||||
if (result == null || !uint8ArrayEquals(data, result)) {
|
||||
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
|
||||
}
|
||||
|
||||
return end - start
|
||||
} finally {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { MultiaddrConnection, Upgrader, UpgraderEvents } from '@libp2p/interfaces/transport'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
|
||||
const log = logger('libp2p:upgrader')
|
||||
|
||||
@ -266,7 +267,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
} = opts
|
||||
|
||||
let muxer: StreamMuxer | undefined
|
||||
let newStream: ((multicodecs: string[]) => Promise<ProtocolStream>) | undefined
|
||||
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<ProtocolStream>) | undefined
|
||||
let connection: Connection // eslint-disable-line prefer-const
|
||||
|
||||
if (muxerFactory != null) {
|
||||
@ -308,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
}
|
||||
})
|
||||
|
||||
newStream = async (protocols: string[]): Promise<ProtocolStream> => {
|
||||
newStream = async (protocols: string[], options: AbortOptions = {}): Promise<ProtocolStream> => {
|
||||
if (muxer == null) {
|
||||
throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE)
|
||||
}
|
||||
@ -319,7 +320,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
const metrics = this.components.getMetrics()
|
||||
|
||||
try {
|
||||
let { stream, protocol } = await mss.select(protocols)
|
||||
let { stream, protocol } = await mss.select(protocols, options)
|
||||
|
||||
if (metrics != null) {
|
||||
stream = metrics.trackStream({ stream, remotePeer, protocol })
|
||||
@ -328,6 +329,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
return { stream: { ...muxedStream, ...stream }, protocol }
|
||||
} catch (err: any) {
|
||||
log.error('could not create new stream', err)
|
||||
|
||||
if (err.code != null) {
|
||||
throw err
|
||||
}
|
||||
|
||||
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||
}
|
||||
}
|
||||
@ -382,9 +388,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
getStreams: () => muxer != null ? muxer.streams : errConnectionNotMultiplexed(),
|
||||
close: async () => {
|
||||
await maConn.close()
|
||||
// Ensure remaining streams are aborted
|
||||
// Ensure remaining streams are closed
|
||||
if (muxer != null) {
|
||||
muxer.streams.map(stream => stream.abort())
|
||||
await Promise.all(muxer.streams.map(async stream => {
|
||||
await stream.close()
|
||||
}))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -18,13 +18,21 @@ describe('Protocol prefix is configurable', () => {
|
||||
it('protocolPrefix is provided', async () => {
|
||||
const testProtocol = 'test-protocol'
|
||||
libp2p = await createLibp2pNode(mergeOptions(baseOptions, {
|
||||
protocolPrefix: testProtocol
|
||||
identify: {
|
||||
protocolPrefix: testProtocol
|
||||
},
|
||||
ping: {
|
||||
protocolPrefix: testProtocol
|
||||
},
|
||||
fetch: {
|
||||
protocolPrefix: testProtocol
|
||||
}
|
||||
}))
|
||||
await libp2p.start()
|
||||
|
||||
const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId)
|
||||
expect(protocols).to.include.members([
|
||||
'/libp2p/fetch/0.0.1',
|
||||
`/${testProtocol}/fetch/0.0.1`,
|
||||
'/libp2p/circuit/relay/0.1.0',
|
||||
`/${testProtocol}/id/1.0.0`,
|
||||
`/${testProtocol}/id/push/1.0.0`,
|
||||
@ -41,7 +49,8 @@ describe('Protocol prefix is configurable', () => {
|
||||
'/libp2p/circuit/relay/0.1.0',
|
||||
'/ipfs/id/1.0.0',
|
||||
'/ipfs/id/push/1.0.0',
|
||||
'/ipfs/ping/1.0.0'
|
||||
'/ipfs/ping/1.0.0',
|
||||
'/libp2p/fetch/0.0.1'
|
||||
])
|
||||
})
|
||||
})
|
||||
|
@ -15,10 +15,11 @@ const error = new Error('dial failure')
|
||||
describe('Dial Request', () => {
|
||||
it('should end when a single multiaddr dials succeeds', async () => {
|
||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||
const deferredConn = pDefer()
|
||||
const actions: Record<string, () => Promise<any>> = {
|
||||
'/ip4/127.0.0.1/tcp/1231': async () => await Promise.reject(error),
|
||||
'/ip4/127.0.0.1/tcp/1232': async () => await Promise.resolve(connection),
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => await Promise.reject(error)
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => await deferredConn.promise
|
||||
}
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const controller = new AbortController()
|
||||
@ -32,15 +33,12 @@ describe('Dial Request', () => {
|
||||
dialAction
|
||||
})
|
||||
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1231')
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1232')
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1233')
|
||||
// Make sure that dial attempt comes back before terminating last dial action
|
||||
expect(await dialRequest.run({ signal: controller.signal })).to.equal(connection)
|
||||
|
||||
// End third dial attempt
|
||||
deferredConn.resolve()
|
||||
|
||||
const result = await dialRequest.run({ signal: controller.signal })
|
||||
expect(result).to.equal(connection)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||
})
|
||||
|
||||
@ -73,14 +71,16 @@ describe('Dial Request', () => {
|
||||
// Let the first dials run
|
||||
await delay(0)
|
||||
|
||||
// Finish the first 2 dials
|
||||
firstDials.reject(error)
|
||||
await delay(0)
|
||||
|
||||
// Only 1 dial should remain, so 1 token should have been released
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
||||
|
||||
// Finish the first 2 dials
|
||||
firstDials.reject(error)
|
||||
|
||||
await delay(0)
|
||||
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(1)
|
||||
|
||||
// Finish the dial and release the 2nd token
|
||||
@ -214,4 +214,45 @@ describe('Dial Request', () => {
|
||||
expect(dialerGetTokensSpy.calledWith(addrs.length)).to.equal(true)
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||
})
|
||||
|
||||
it('should abort other dials when one succeeds', async () => {
|
||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||
const actions: Record<string, () => Promise<any>> = {
|
||||
'/ip4/127.0.0.1/tcp/1231': async () => {
|
||||
await delay(100)
|
||||
},
|
||||
'/ip4/127.0.0.1/tcp/1232': async () => {
|
||||
// Successful dial takes longer to establish
|
||||
await delay(1000)
|
||||
|
||||
return connection
|
||||
},
|
||||
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => {
|
||||
await delay(100)
|
||||
}
|
||||
}
|
||||
|
||||
const signals: Record<string, AbortSignal | undefined> = {}
|
||||
|
||||
const dialRequest = new DialRequest({
|
||||
addrs: Object.keys(actions).map(str => new Multiaddr(str)),
|
||||
dialer: new Dialer({
|
||||
maxParallelDials: 3
|
||||
}),
|
||||
dialAction: async (ma, opts) => {
|
||||
signals[ma.toString()] = opts.signal
|
||||
return await actions[ma.toString()]()
|
||||
}
|
||||
})
|
||||
|
||||
await expect(dialRequest.run()).to.eventually.equal(connection)
|
||||
|
||||
// Dial attempt finished without connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', false)
|
||||
// Dial attempt led to connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
|
||||
// Dial attempt finished without connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', false)
|
||||
})
|
||||
})
|
||||
|
test/fetch/index.spec.ts (new file)
@ -0,0 +1,133 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { FetchService } from '../../src/fetch/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import { pipe } from 'it-pipe'
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs'
|
||||
}
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('fetch', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to fetch from another peer', async () => {
|
||||
const key = 'key'
|
||||
const value = Uint8Array.from([0, 1, 2, 3, 4])
|
||||
const localFetch = new FetchService(localComponents, defaultInit)
|
||||
const remoteFetch = new FetchService(remoteComponents, defaultInit)
|
||||
|
||||
remoteFetch.registerLookupFunction(key, async (identifier) => {
|
||||
expect(identifier).to.equal(key)
|
||||
|
||||
return value
|
||||
})
|
||||
|
||||
await start(localFetch)
|
||||
await start(remoteFetch)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// Run fetch
|
||||
const result = await localFetch.fetch(remoteComponents.getPeerId(), key)
|
||||
|
||||
expect(result).to.equalBytes(value)
|
||||
})
|
||||
|
||||
it('should time out fetching from another peer when waiting for the record', async () => {
|
||||
const key = 'key'
|
||||
const localFetch = new FetchService(localComponents, defaultInit)
|
||||
const remoteFetch = new FetchService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localFetch)
|
||||
await start(remoteFetch)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(remoteFetch.protocol)
|
||||
await remoteComponents.getRegistrar().handle(remoteFetch.protocol, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
for await (const chunk of source) {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// Run fetch, should time out
|
||||
await expect(localFetch.fetch(remoteComponents.getPeerId(), key, {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
})
|
@ -1,19 +1,16 @@
|
||||
/* eslint-env mocha */
|
||||
/* eslint max-nested-callbacks: ["error", 6] */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { codes } from '../../src/errors.js'
|
||||
import { IdentifyService, Message } from '../../src/identify/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { createLibp2pNode } from '../../src/libp2p.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { createBaseOptions } from '../utils/base-options.browser.js'
|
||||
import { DefaultAddressManager } from '../../src/address-manager/index.js'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import drain from 'it-drain'
|
||||
import { pipe } from 'it-pipe'
|
||||
@ -27,14 +24,11 @@ import {
|
||||
} from '../../src/identify/consts.js'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import delay from 'delay'
|
||||
import pWaitFor from 'p-wait-for'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import pDefer from 'p-defer'
|
||||
|
||||
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
@ -75,18 +69,16 @@ async function createComponents (index: number) {
|
||||
return components
|
||||
}
|
||||
|
||||
describe('Identify', () => {
|
||||
describe('identify', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
let localPeerRecordUpdater: PeerRecordUpdater
|
||||
let remotePeerRecordUpdater: PeerRecordUpdater
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
|
||||
remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents)
|
||||
|
||||
await Promise.all([
|
||||
@ -238,355 +230,148 @@ describe('Identify', () => {
|
||||
await stop(localIdentify)
|
||||
})
|
||||
|
||||
[The `describe('push')` tests and the `describe('libp2p.dialer.identifyService')` tests removed by this hunk were moved to test/identify/push.spec.ts and test/identify/service.spec.ts below; in their place the hunk adds the timeout and message-size tests that follow.]

it('should time out during identify', async () => {
  const localIdentify = new IdentifyService(localComponents, defaultInit)
  const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)

  await start(localIdentify)
  await start(remoteIdentify)

  const [localToRemote] = connectionPair(localComponents, remoteComponents)

  // replace existing handler with a really slow one
  await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY)
  await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY, ({ stream }) => {
    void pipe(
      stream,
      async function * (source) {
        // we receive no data in the identify protocol, we just send our data
        await drain(source)

        // longer than the timeout
        await delay(1000)

        yield new Uint8Array()
      },
      stream
    )
  })

  const newStreamSpy = sinon.spy(localToRemote, 'newStream')

  // 10 ms timeout
  const timeoutController = new TimeoutController(10)

  // Run identify
  await expect(localIdentify.identify(localToRemote, {
    signal: timeoutController.signal
  }))
    .to.eventually.be.rejected.with.property('code', 'ABORT_ERR')

  // should have closed stream
  expect(newStreamSpy).to.have.property('callCount', 1)
  const { stream } = await newStreamSpy.getCall(0).returnValue
  expect(stream).to.have.nested.property('timeline.close')
})

it('should limit incoming identify message sizes', async () => {
  const deferred = pDefer()

  const remoteIdentify = new IdentifyService(remoteComponents, {
    ...defaultInit,
    maxIdentifyMessageSize: 100
  })
  await start(remoteIdentify)

  const identifySpy = sinon.spy(remoteIdentify, 'identify')

  const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)

  // handle incoming identify requests and send too much data
  await localComponents.getRegistrar().handle('/ipfs/id/1.0.0', ({ stream }) => {
    const data = new Uint8Array(1024)

    void Promise.resolve().then(async () => {
      await pipe(
        [data],
        lp.encode(),
        stream,
        async (source) => await drain(source)
      )

      deferred.resolve()
    })
  })

  // ensure connections are registered by connection manager
  localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
    detail: localToRemote
  }))
  remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
    detail: remoteToLocal
  }))

  await deferred.promise
  await stop(remoteIdentify)

  expect(identifySpy.called).to.be.true()

  await expect(identifySpy.getCall(0).returnValue)
    .to.eventually.be.rejected.with.property('code', 'ERR_MSG_DATA_TOO_LONG')
})

it('should time out incoming identify messages', async () => {
  const deferred = pDefer()

  const remoteIdentify = new IdentifyService(remoteComponents, {
    ...defaultInit,
    timeout: 100
  })
  await start(remoteIdentify)

  const identifySpy = sinon.spy(remoteIdentify, 'identify')

  const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)

  // handle incoming identify requests and don't send anything
  await localComponents.getRegistrar().handle('/ipfs/id/1.0.0', ({ stream }) => {
    const data = new Uint8Array(1024)

    void Promise.resolve().then(async () => {
      await pipe(
        [data],
        lp.encode(),
        async (source) => {
          await stream.sink(async function * () {
            for await (const buf of source) {
              // don't send all of the data, remote will expect another message
              yield buf.slice(0, buf.length - 100)

              // wait for longer than the timeout without sending any more data or closing the stream
              await delay(500)
            }
          }())
        }
      )

      deferred.resolve()
    })
  })

  // ensure connections are registered by connection manager
  localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
    detail: localToRemote
  }))
  remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
    detail: remoteToLocal
  }))

  await deferred.promise
  await stop(remoteIdentify)

  expect(identifySpy.called).to.be.true()

  await expect(identifySpy.getCall(0).returnValue)
    .to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
})
})
|
||||
|
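The new tests above bound `identify` with an `AbortSignal` from `timeout-abort-controller`. As a rough usage sketch (not part of the diff), a caller that wants the same bound in application code could wrap the call and always release the timer; the `identifyWithTimeout` helper, the `Connection` type import path and the 5000 ms budget are illustrative assumptions.

```ts
import { TimeoutController } from 'timeout-abort-controller'
import type { Connection } from '@libp2p/interfaces/connection'
import type { IdentifyService } from '../../src/identify/index.js'

// Hypothetical helper: abort identify if it takes longer than `ms` and
// always free the underlying timer, even when the call succeeds
async function identifyWithTimeout (identify: IdentifyService, connection: Connection, ms = 5000): Promise<void> {
  const timeoutController = new TimeoutController(ms)

  try {
    await identify.identify(connection, { signal: timeoutController.signal })
  } finally {
    timeoutController.clear()
  }
}
```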
test/identify/push.spec.ts (new file, 296 lines)
@ -0,0 +1,296 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { IdentifyService } from '../../src/identify/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DefaultAddressManager } from '../../src/address-manager/index.js'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import drain from 'it-drain'
|
||||
import { pipe } from 'it-pipe'
|
||||
import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { PeerRecordUpdater } from '../../src/peer-record-updater.js'
|
||||
import {
|
||||
MULTICODEC_IDENTIFY,
|
||||
MULTICODEC_IDENTIFY_PUSH
|
||||
} from '../../src/identify/consts.js'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import delay from 'delay'
|
||||
import { pEvent } from 'p-event'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
|
||||
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs',
|
||||
host: {
|
||||
agentVersion: 'v1.0.0'
|
||||
}
|
||||
}
|
||||
|
||||
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
datastore: new MemoryDatastore(),
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater(),
|
||||
peerStore: new PersistentPeerStore(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
components.setAddressManager(new DefaultAddressManager(components, {
|
||||
announce: listenMaddrs.map(ma => ma.toString())
|
||||
}))
|
||||
|
||||
const transportManager = new DefaultTransportManager(components)
|
||||
components.setTransportManager(transportManager)
|
||||
|
||||
await components.getPeerStore().protoBook.set(peerId, protocols)
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('identify (push)', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
let localPeerRecordUpdater: PeerRecordUpdater
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to push identify updates to another peer', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to update the peer record and send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// ensure sequence number of peer record we are about to create is different
|
||||
await delay(1000)
|
||||
|
||||
// make sure we have a peer record to send
|
||||
await localPeerRecordUpdater.update()
|
||||
|
||||
// wait for the remote peer store to notice the changes
|
||||
const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs')
|
||||
|
||||
// push updated peer record to connections
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await eventPromise
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: true
|
||||
}])
|
||||
|
||||
await stop(localIdentify)
|
||||
await stop(remoteIdentify)
|
||||
})
|
||||
|
||||
it('should time out during push identify', async () => {
|
||||
let streamEnded = false
|
||||
const localIdentify = new IdentifyService(localComponents, {
|
||||
...defaultInit,
|
||||
timeout: 10
|
||||
})
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY_PUSH)
|
||||
await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY_PUSH, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
// ignore the sent data
|
||||
await drain(source)
|
||||
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
// the delay should have caused the local push to time out so this should
|
||||
// occur after the local push method invocation has completed
|
||||
streamEnded = true
|
||||
|
||||
yield new Uint8Array()
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// push updated peer record to remote
|
||||
await localIdentify.push([localToRemote])
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
|
||||
// method should have returned before the remote handler completes as we timed
|
||||
// out so we ignore the return value
|
||||
expect(streamEnded).to.be.false()
|
||||
})
|
||||
|
||||
// LEGACY
|
||||
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// wait until remote peer store notices protocol list update
|
||||
const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols')
|
||||
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await waitForUpdate
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: false
|
||||
}])
|
||||
|
||||
await stop(localIdentify)
|
||||
await stop(remoteIdentify)
|
||||
})
|
||||
})
|
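Outside the tests, a push is normally triggered automatically when the local protocol list or addresses change, but it can also be forced. A minimal sketch, assuming a started node that exposes the service as `libp2p.identifyService` (as in the service tests below); the `announceChanges` helper itself is not part of the codebase.

```ts
import type { Libp2pNode } from '../../src/libp2p.js'

// Sketch: push the current protocol list and peer record to every open connection
async function announceChanges (libp2p: Libp2pNode): Promise<void> {
  if (libp2p.identifyService == null) {
    throw new Error('Identify service was not configured')
  }

  await libp2p.identifyService.pushToPeerStore()
}
```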
test/identify/service.spec.ts (new file, 216 lines)
@ -0,0 +1,216 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { createLibp2pNode } from '../../src/libp2p.js'
|
||||
import { createBaseOptions } from '../utils/base-options.browser.js'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import pWaitFor from 'p-wait-for'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
|
||||
describe('libp2p.dialer.identifyService', () => {
|
||||
let peerId: PeerId
|
||||
let libp2p: Libp2pNode
|
||||
let remoteLibp2p: Libp2pNode
|
||||
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||
|
||||
before(async () => {
|
||||
peerId = await createFromJSON(Peers[0])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
if (libp2p != null) {
|
||||
await libp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
if (remoteLibp2p != null) {
|
||||
await remoteLibp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
it('should run identify automatically after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
})
|
||||
|
||||
it('should store remote agent and protocol versions in metadataBook after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
|
||||
const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion')
|
||||
|
||||
expect(storedAgentVersion).to.exist()
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push protocol updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
// Wait for connection event to be emitted
|
||||
await connectionPromise
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.handle('/echo/2.0.0', () => {})
|
||||
await libp2p.unhandle('/echo/2.0.0')
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 2)
|
||||
|
||||
// Verify the remote peer is notified of both changes
|
||||
expect(identityServicePushSpy.callCount).to.equal(2)
|
||||
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
|
||||
it('should store host data and protocol version into metadataBook', async () => {
|
||||
const agentVersion = 'js-project/1.0.0'
|
||||
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId,
|
||||
identify: {
|
||||
host: {
|
||||
agentVersion
|
||||
}
|
||||
}
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion')
|
||||
|
||||
expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array()))
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push multiaddr updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
// Wait for connection event to be emitted
|
||||
await connectionPromise
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 1)
|
||||
|
||||
// Verify the remote peer is notified of change
|
||||
expect(identityServicePushSpy.callCount).to.equal(1)
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
})
|
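The metadata assertions above only check that the values exist; the metadata book stores raw bytes, so a consumer would decode them before use. A small sketch reusing the uint8arrays helper already imported by the test file; the `readAgentVersion` function is illustrative, not part of the diff.

```ts
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Libp2pNode } from '../../src/libp2p.js'

// metadataBook values are Uint8Array (or undefined when unset), so decode before displaying
async function readAgentVersion (libp2p: Libp2pNode, remotePeer: PeerId): Promise<string | undefined> {
  const raw = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
  return raw == null ? undefined : uint8ArrayToString(raw)
}
```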
@ -13,6 +13,7 @@ import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import type { NatAPI } from '@achingbrain/nat-port-mapper'
|
||||
import { StubbedInstance, stubInterface } from 'ts-sinon'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
|
||||
const DEFAULT_ADDRESSES = [
|
||||
'/ip4/127.0.0.1/tcp/0',
|
||||
@ -49,7 +50,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
await components.getTransportManager().listen(components.getAddressManager().getListenAddrs())
|
||||
|
||||
teardown.push(async () => {
|
||||
await natManager.stop()
|
||||
await stop(natManager)
|
||||
await components.getTransportManager().removeAll()
|
||||
})
|
||||
|
||||
@ -78,7 +79,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.not.be.empty()
|
||||
@ -127,7 +128,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
enabled: false
|
||||
})
|
||||
|
||||
natManager.start()
|
||||
await start(natManager)
|
||||
|
||||
await delay(100)
|
||||
|
||||
@ -146,7 +147,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@ -163,7 +164,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@ -180,7 +181,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@ -197,7 +198,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@ -214,7 +215,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
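The NAT manager hunks above replace direct `natManager.start()` / `natManager._start()` / `natManager.stop()` calls with the shared `start`/`stop` helpers. A minimal sketch of that lifecycle pattern under the same assumption; the `withStarted` helper is illustrative and not part of the codebase.

```ts
import { start, stop } from '@libp2p/interfaces/startable'

// Hypothetical helper: start a component through the shared helper, run some
// work against it, then always stop it again
async function withStarted (component: unknown, fn: () => Promise<void>): Promise<void> {
  await start(component)

  try {
    await fn()
  } finally {
    await stop(component)
  }
}
```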
test/ping/index.spec.ts (new file, 122 lines)
@ -0,0 +1,122 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { PingService } from '../../src/ping/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import { pipe } from 'it-pipe'
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs'
|
||||
}
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('ping', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to ping another peer', async () => {
|
||||
const localPing = new PingService(localComponents, defaultInit)
|
||||
const remotePing = new PingService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localPing)
|
||||
await start(remotePing)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// Run ping
|
||||
await expect(localPing.ping(remoteComponents.getPeerId())).to.eventually.be.gte(0)
|
||||
})
|
||||
|
||||
it('should time out pinging another peer when waiting for a pong', async () => {
|
||||
const localPing = new PingService(localComponents, defaultInit)
|
||||
const remotePing = new PingService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localPing)
|
||||
await start(remotePing)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(remotePing.protocol)
|
||||
await remoteComponents.getRegistrar().handle(remotePing.protocol, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
for await (const chunk of source) {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// Run ping, should time out
|
||||
await expect(localPing.ping(remoteComponents.getPeerId(), {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
})
|
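One usage note on the first test above: the assertion `to.eventually.be.gte(0)` works because `ping` resolves to the measured round-trip time in milliseconds. A tiny sketch of reading it directly, assuming the same `localPing`/`remoteComponents` wiring as in that test.

```ts
// ping resolves to the round-trip time in milliseconds
const latency = await localPing.ping(remoteComponents.getPeerId())
console.info(`pong received after ${latency} ms`)
```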
@ -14,7 +14,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import swarmKey from '../fixtures/swarm.key.js'
|
||||
import { DefaultUpgrader } from '../../src/upgrader.js'
|
||||
import { codes } from '../../src/errors.js'
|
||||
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import type { Upgrader } from '@libp2p/interfaces/transport'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
@ -27,6 +27,9 @@ import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import pDefer from 'p-defer'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import drain from 'it-drain'
|
||||
|
||||
const addrs = [
|
||||
new Multiaddr('/ip4/127.0.0.1/tcp/0'),
|
||||
@ -35,6 +38,7 @@ const addrs = [
|
||||
|
||||
describe('Upgrader', () => {
|
||||
let localUpgrader: Upgrader
|
||||
let localMuxerFactory: StreamMuxerFactory
|
||||
let remoteUpgrader: Upgrader
|
||||
let localPeer: PeerId
|
||||
let remotePeer: PeerId
|
||||
@ -55,12 +59,13 @@ describe('Upgrader', () => {
|
||||
connectionGater: mockConnectionGater(),
|
||||
registrar: mockRegistrar()
|
||||
})
|
||||
localMuxerFactory = new Mplex()
|
||||
localUpgrader = new DefaultUpgrader(localComponents, {
|
||||
connectionEncryption: [
|
||||
new Plaintext()
|
||||
],
|
||||
muxers: [
|
||||
new Mplex()
|
||||
localMuxerFactory
|
||||
]
|
||||
})
|
||||
|
||||
@ -366,6 +371,40 @@ describe('Upgrader', () => {
|
||||
expect(result).to.have.nested.property('reason.code', codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||
})
|
||||
})
|
||||
|
||||
it('should abort protocol selection for slow streams', async () => {
|
||||
const createStreamMuxerSpy = sinon.spy(localMuxerFactory, 'createStreamMuxer')
|
||||
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||
|
||||
const connections = await Promise.all([
|
||||
localUpgrader.upgradeOutbound(outbound),
|
||||
remoteUpgrader.upgradeInbound(inbound)
|
||||
])
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// should have created muxer for connection
|
||||
expect(createStreamMuxerSpy).to.have.property('callCount', 1)
|
||||
|
||||
// create mock muxed stream that never sends data
|
||||
const muxer = createStreamMuxerSpy.getCall(0).returnValue
|
||||
muxer.newStream = () => {
|
||||
return mockStream({
|
||||
source: (async function * () {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
yield new Uint8Array()
|
||||
}()),
|
||||
sink: drain
|
||||
})
|
||||
}
|
||||
|
||||
await expect(connections[0].newStream('/echo/1.0.0', {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
})
|
||||
})
|
||||
|
||||
describe('libp2p.upgrader', () => {
|
||||
|