mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-16 17:11:57 +00:00
Compare commits
17 Commits
fix/load-k
...
v0.37.1
Author | SHA1 | Date | |
---|---|---|---|
|
4c0c2c6d3e | ||
|
a1220d22f5 | ||
|
5934b13cce | ||
|
b09eb8fc53 | ||
|
35f9c0c793 | ||
|
d5386df684 | ||
|
1f5d5c2de1 | ||
|
7678156cf3 | ||
|
f2fd4e30ff | ||
|
31480603f3 | ||
|
4837430d8b | ||
|
da3d19b309 | ||
|
a15254fdd4 | ||
|
d16817ca44 | ||
|
fab4f1385c | ||
|
5397137c65 | ||
|
147304449e |
@@ -11,6 +11,7 @@ export default {
|
||||
},
|
||||
test: {
|
||||
before: async () => {
|
||||
// use dynamic import because we only want to reference these files during the test run, e.g. after building
|
||||
const { createLibp2p } = await import('./dist/src/index.js')
|
||||
const { MULTIADDRS_WEBSOCKETS } = await import('./dist/test/fixtures/browser.js')
|
||||
const { Plaintext } = await import('./dist/src/insecure/index.js')
|
||||
|
36
CHANGELOG.md
36
CHANGELOG.md
@@ -10,6 +10,42 @@
|
||||
|
||||
|
||||
|
||||
### [0.37.1](https://www.github.com/libp2p/js-libp2p/compare/v0.37.0...v0.37.1) (2022-05-25)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* do upnp hole punch after startup ([#1217](https://www.github.com/libp2p/js-libp2p/issues/1217)) ([d5386df](https://www.github.com/libp2p/js-libp2p/commit/d5386df68478a71ac269acb2d00d36a7a5c9ebc5))
|
||||
* explicitly close streams when connnections close ([#1221](https://www.github.com/libp2p/js-libp2p/issues/1221)) ([b09eb8f](https://www.github.com/libp2p/js-libp2p/commit/b09eb8fc53ec1d8f6280d681c9ca6a467ec259b5))
|
||||
* fix unintended aborts in dialer ([#1185](https://www.github.com/libp2p/js-libp2p/issues/1185)) ([35f9c0c](https://www.github.com/libp2p/js-libp2p/commit/35f9c0c79387232465848b450a47cafe841405e7))
|
||||
* time out slow reads ([#1227](https://www.github.com/libp2p/js-libp2p/issues/1227)) ([a1220d2](https://www.github.com/libp2p/js-libp2p/commit/a1220d22f5affb64e64dec0cd6a92cd8241b26df))
|
||||
|
||||
## [0.37.0](https://www.github.com/libp2p/js-libp2p/compare/v0.36.2...v0.37.0) (2022-05-16)
|
||||
|
||||
|
||||
### ⚠ BREAKING CHANGES
|
||||
|
||||
* types are no longer hand crafted, this module is now ESM only
|
||||
|
||||
### Features
|
||||
|
||||
* convert to typescript ([#1172](https://www.github.com/libp2p/js-libp2p/issues/1172)) ([199395d](https://www.github.com/libp2p/js-libp2p/commit/199395de4d8139cc77d0b408626f37c9b8520d28))
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* add transport manager to exports map and fix docs ([#1182](https://www.github.com/libp2p/js-libp2p/issues/1182)) ([cc60cfd](https://www.github.com/libp2p/js-libp2p/commit/cc60cfde1a0907ca68f658f6de5362a708189222))
|
||||
* emit peer:connect after all ([#1171](https://www.github.com/libp2p/js-libp2p/issues/1171)) ([d16817c](https://www.github.com/libp2p/js-libp2p/commit/d16817ca443443e88803ee8096d45debb14af91b))
|
||||
* encode enums correctly ([#1210](https://www.github.com/libp2p/js-libp2p/issues/1210)) ([4837430](https://www.github.com/libp2p/js-libp2p/commit/4837430d8bcdbee0865eeba6fe694bc71fc6c9bb))
|
||||
* expose getPublicKey ([#1188](https://www.github.com/libp2p/js-libp2p/issues/1188)) ([1473044](https://www.github.com/libp2p/js-libp2p/commit/147304449e5f8d3acb8b00bdd9588b56830667c6))
|
||||
* expose metrics and registrar, use dht for peer discovery ([#1183](https://www.github.com/libp2p/js-libp2p/issues/1183)) ([64bfcee](https://www.github.com/libp2p/js-libp2p/commit/64bfcee5093b368df0b381f78afc2ddff3d339a9))
|
||||
* simplify pnet exports ([#1213](https://www.github.com/libp2p/js-libp2p/issues/1213)) ([3148060](https://www.github.com/libp2p/js-libp2p/commit/31480603f3e17d838d2685573995218a1e678e7a))
|
||||
* update deps ([#1181](https://www.github.com/libp2p/js-libp2p/issues/1181)) ([8cca8e4](https://www.github.com/libp2p/js-libp2p/commit/8cca8e4bfc6a339e58b5a5efa8a84fd891aa08ee))
|
||||
* update interfaces ([#1207](https://www.github.com/libp2p/js-libp2p/issues/1207)) ([da3d19b](https://www.github.com/libp2p/js-libp2p/commit/da3d19b30977fd2c7e77d92aa8914b13e3179aaa))
|
||||
* update pubsub interfaces ([#1194](https://www.github.com/libp2p/js-libp2p/issues/1194)) ([fab4f13](https://www.github.com/libp2p/js-libp2p/commit/fab4f1385cf61b7b16719b9aacdfe03146a3f260))
|
||||
* update to new interfaces ([#1206](https://www.github.com/libp2p/js-libp2p/issues/1206)) ([a15254f](https://www.github.com/libp2p/js-libp2p/commit/a15254fdd478a336edf1e1196b721dc56888b2ea))
|
||||
* use placeholder dht/pubsub ([#1193](https://www.github.com/libp2p/js-libp2p/issues/1193)) ([5397137](https://www.github.com/libp2p/js-libp2p/commit/5397137c654dfdec431e0c9ba4b1ff9dee19abf1))
|
||||
|
||||
### [0.36.2](https://www.github.com/libp2p/js-libp2p/compare/v0.36.1...v0.36.2) (2022-01-26)
|
||||
|
||||
|
||||
|
@@ -97,7 +97,9 @@ Creates an instance of Libp2p.
|
||||
| options.modules | [`Array<object>`](./CONFIGURATION.md#modules) | libp2p [modules](./CONFIGURATION.md#modules) to use |
|
||||
| [options.addresses] | `{ listen: Array<string>, announce: Array<string>, announceFilter: (ma: Array<multiaddr>) => Array<multiaddr> }` | Addresses for transport listening and to advertise to the network |
|
||||
| [options.config] | `object` | libp2p modules configuration and core configuration |
|
||||
| [options.host] | `{ agentVersion: string }` | libp2p host options |
|
||||
| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number }` | libp2p identify protocol options |
|
||||
| [options.ping] | `{ protocolPrefix: string }` | libp2p ping protocol options |
|
||||
| [options.fetch] | `{ protocolPrefix: string }` | libp2p fetch protocol options |
|
||||
| [options.connectionManager] | [`object`](./CONFIGURATION.md#configuring-connection-manager) | libp2p Connection Manager [configuration](./CONFIGURATION.md#configuring-connection-manager) |
|
||||
| [options.transportManager] | [`object`](./CONFIGURATION.md#configuring-transport-manager) | libp2p transport manager [configuration](./CONFIGURATION.md#configuring-transport-manager) |
|
||||
| [options.datastore] | `object` | must implement [ipfs/interface-datastore](https://github.com/ipfs/interface-datastore) (in memory datastore will be used if not provided) |
|
||||
|
@@ -885,7 +885,12 @@ Changing the protocol name prefix can isolate default public network (IPFS) for
|
||||
|
||||
```js
|
||||
const node = await createLibp2p({
|
||||
protocolPrefix: 'ipfs' // default
|
||||
identify: {
|
||||
protocolPrefix: 'ipfs' // default
|
||||
},
|
||||
ping: {
|
||||
protocolPrefix: 'ipfs' // default
|
||||
}
|
||||
})
|
||||
/*
|
||||
protocols: [
|
||||
|
262
doc/migrations/v0.36-v.037.md
Normal file
262
doc/migrations/v0.36-v.037.md
Normal file
@@ -0,0 +1,262 @@
|
||||
<!--Specify versions for migration below-->
|
||||
# Migrating to libp2p@37 <!-- omit in toc -->
|
||||
|
||||
A migration guide for refactoring your application code from libp2p v0.36.x to v0.37.0.
|
||||
|
||||
## Table of Contents <!-- omit in toc -->
|
||||
|
||||
- [ESM](#esm)
|
||||
- [TypeScript](#typescript)
|
||||
- [Config](#config)
|
||||
- [Bundled modules](#bundled-modules)
|
||||
- [Events](#events)
|
||||
- [Pubsub](#pubsub)
|
||||
|
||||
## ESM
|
||||
|
||||
The biggest change to `libp2p@0.37.0` is that the module is now [ESM-only](https://gist.github.com/sindresorhus/a39789f98801d908bbc7ff3ecc99d99c).
|
||||
|
||||
ESM is the module system for JavaScript, it allows us to structure our code in separate files without polluting a global namespace.
|
||||
|
||||
Other systems have tried to fill this gap, notably CommonJS, AMD, RequireJS and others, but ESM is [the official standard format](https://tc39.es/ecma262/#sec-modules) to package JavaScript code for reuse.
|
||||
|
||||
## TypeScript
|
||||
|
||||
The core `libp2p` module and all supporting modules have now been ported to TypeScript in a complete ground-up rewrite. This will not have a huge impact on most application code, but those that are type-aware, either by being written in TypeScript themselves or using JSDoc comments will notice full type completion and better error message when coding against the libp2p API.
|
||||
|
||||
To reflect the updated nature of these modules, all ecosystem modules have been moved to the `@libp2p` org on npm, so `libp2p-tcp` has become `@libp2p/tcp`, `libp2p-mplex` has become `@libp2p/mplex` and so on. `@chainsafe/libp2p-noise` and `libp2p-gossipsub` are unaffected.
|
||||
|
||||
## Config
|
||||
|
||||
Because libp2p is now fully typed it was necessary to refactor the configuration object passed to the libp2p constructor. The reason being, it previously accepted config objects to pass to the constructors of the various modules - to type those we'd need to know the types of all possible modules in advance which isn't possible.
|
||||
|
||||
The following changes have been made to the configuration object:
|
||||
|
||||
1. It now takes instances of modules rather than their classes
|
||||
2. Keys from the `config` and `modules` objects have been migrated to the root of the object
|
||||
3. Use of the `enabled` flag has been removed - if you don't want a particular feature enabled, don't pass a module implementing that feature
|
||||
4. Some keys have been renamed = `transport` -> `transports`, `streamMuxer` -> `streamMuxers`, `connEncryption` -> `connectionEncryption`, etc
|
||||
5. Keys from `config.dialer` have been moved to `config.connectionManager` as the connection manager is now responsible for managing connections
|
||||
6. The `protocolPrefix` configuration option is now passed on a per-protocol basis for `identify`, `fetch` and `ping`
|
||||
|
||||
**Before**
|
||||
|
||||
```js
|
||||
import Libp2p from 'libp2p'
|
||||
import TCP from 'libp2p-tcp'
|
||||
import Mplex from 'libp2p-mplex'
|
||||
import { NOISE } from '@chainsafe/libp2p-noise'
|
||||
import Gossipsub from 'libp2p-gossipsub'
|
||||
import KadDHT from 'libp2p-kad-dht'
|
||||
import Bootstrap from 'libp2p-bootstrap'
|
||||
import MulticastDNS from 'libp2p-mdns'
|
||||
|
||||
const node = await Libp2p.create({
|
||||
addresses: {
|
||||
listen: ['/ip4/127.0.0.1/tcp/8000']
|
||||
},
|
||||
modules: {
|
||||
transport: [
|
||||
TCP
|
||||
],
|
||||
streamMuxer: [
|
||||
Mplex
|
||||
],
|
||||
connEncryption: [
|
||||
NOISE
|
||||
],
|
||||
dht: KadDHT,
|
||||
pubsub: Gossipsub,
|
||||
peerDiscovery: [
|
||||
Bootstrap,
|
||||
MulticastDNS
|
||||
]
|
||||
},
|
||||
protocolPrefix: 'ipfs',
|
||||
config: {
|
||||
peerDiscovery: {
|
||||
autoDial: true,
|
||||
[MulticastDNS.tag]: {
|
||||
interval: 1000,
|
||||
enabled: true
|
||||
},
|
||||
[Bootstrap.tag]: {
|
||||
list: [
|
||||
// .. multiaddrs here
|
||||
],
|
||||
interval: 2000,
|
||||
enabled: true
|
||||
}
|
||||
},
|
||||
dialer: {
|
||||
dialTimeout: 60000
|
||||
}
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
**After**
|
||||
|
||||
```js
|
||||
import { createLibp2p } from 'libp2p'
|
||||
import { TCP } from '@libp2p/tcp'
|
||||
import { Mplex } from '@libp2p/mplex'
|
||||
import { Noise } from '@chainsafe/libp2p-noise'
|
||||
import Gossipsub from '@chainsafe/libp2p-gossipsub'
|
||||
import { KadDHT } from '@libp2p/kad-dht'
|
||||
import { Bootstrap } from '@libp2p/bootstrap'
|
||||
import { MulticastDNS } from '@libp2p/mdns'
|
||||
|
||||
const node = await createLibp2p({
|
||||
addresses: {
|
||||
listen: ['/ip4/127.0.0.1/tcp/8000']
|
||||
},
|
||||
addressManager: {
|
||||
autoDial: true
|
||||
},
|
||||
connectionManager: {
|
||||
dialTimeout: 60000
|
||||
},
|
||||
transports: [
|
||||
new TCP()
|
||||
],
|
||||
streamMuxers: [
|
||||
new Mplex()
|
||||
],
|
||||
connectionEncryption: [
|
||||
new Noise()
|
||||
],
|
||||
dht: new KadDHT(),
|
||||
pubsub: new Gossipsub(),
|
||||
peerDiscovery: [
|
||||
new Bootstrap({
|
||||
list: [
|
||||
// .. multiaddrs here
|
||||
],
|
||||
interval: 2000
|
||||
}),
|
||||
new MulticastDNS({
|
||||
interval: 1000
|
||||
})
|
||||
],
|
||||
identify: {
|
||||
protocolPrefix: 'ipfs'
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
## Bundled modules
|
||||
|
||||
Previously you'd have to use deep import paths to get at bundled modules such as the private network module.
|
||||
|
||||
Access to these modules is now controlled by the package.json export map so your import paths will need to be updated:
|
||||
|
||||
**Before**
|
||||
|
||||
```js
|
||||
import plaintext from 'libp2p/src/insecure/plaintext.js'
|
||||
import Protector from 'libp2p/src/pnet/index.js'
|
||||
import generateKey from 'libp2p/src/pnet/key-generator.js'
|
||||
import TransportManager from 'libp2p/src/transport-manager.js'
|
||||
```
|
||||
|
||||
**After**
|
||||
|
||||
```js
|
||||
import { Plaintext } from 'libp2p/insecure'
|
||||
import { PreSharedKeyConnectionProtector, generateKey } from 'libp2p/pnet'
|
||||
import { TransportManager } from 'libp2p/transport-manager'
|
||||
```
|
||||
|
||||
## Events
|
||||
|
||||
To reduce our dependency on Node.js internals, use of [EventEmitter](https://nodejs.org/api/events.html#class-eventemitter) has been replaced with the standard [EventTarget](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget).
|
||||
|
||||
The EventTarget API is very similar to [HTML DOM Events](https://developer.mozilla.org/en-US/docs/Web/API/Event) used by the browser.
|
||||
|
||||
All events are instances of the [CustomEvent](https://developer.mozilla.org/en-US/docs/Web/API/CustomEvent) class. Event-specific information can be accessed via the `.detail` property of the passed event.
|
||||
|
||||
They type of event emitted can be inferred from the types for each event emitter.
|
||||
|
||||
**Before**
|
||||
|
||||
```js
|
||||
const handler = (peerInfo) => {
|
||||
//...
|
||||
}
|
||||
|
||||
// listen for event
|
||||
libp2p.on('peer:discovery', handler)
|
||||
|
||||
// stop listening for event
|
||||
libp2p.removeListener('peer:discovery', handler)
|
||||
libp2p.off('peer:discovery', handler)
|
||||
```
|
||||
|
||||
**After**
|
||||
|
||||
```js
|
||||
const handler = (event) => {
|
||||
const peerInfo = event.detail
|
||||
//...
|
||||
}
|
||||
|
||||
// listen for event
|
||||
libp2p.addEventListener('peer:discovery', handler)
|
||||
|
||||
// stop listening for event
|
||||
libp2p.removeEventListener('peer:discovery', handler)
|
||||
```
|
||||
|
||||
## Pubsub
|
||||
|
||||
Similar to the events refactor above, pubsub is now driven by the standard [EventTarget](https://developer.mozilla.org/en-US/docs/Web/API/EventTarget) API.
|
||||
|
||||
You can still subscribe to events without a listener with `.subscribe` but all other uses now use the standard API.
|
||||
|
||||
Similar to the other events emitted by libp2p the event type is [CustomEvent](https://developer.mozilla.org/en-US/docs/Web/API/CustomEvent). This is part of the js language but at the time of writing Node.js [does not support](https://github.com/nodejs/node/issues/40678) `CustomEvent`, so a polyfill is supplied as part of the `@libp2p/interfaces`
|
||||
|
||||
**Before**
|
||||
|
||||
```js
|
||||
const handler = (message: Message) => {
|
||||
const topic = message.topic
|
||||
|
||||
//...
|
||||
}
|
||||
|
||||
// listen for event
|
||||
libp2p.pubsub.subscribe('my-topic')
|
||||
libp2p.pubsub.on('my-topic', handler)
|
||||
|
||||
// send event
|
||||
libp2p.pubsub.emit('my-topic', Uint8Array.from([0, 1, 2, 3]))
|
||||
|
||||
// stop listening for event
|
||||
libp2p.unsubscribe('my-topic', handler)
|
||||
libp2p.pubsub.off('my-topic', handler)
|
||||
```
|
||||
|
||||
**After**
|
||||
|
||||
```js
|
||||
import type { Message } from '@libp2p/interfaces/pubsub'
|
||||
|
||||
const handler = (event: CustomEvent<Message>) => {
|
||||
const message = event.detail
|
||||
const topic = message.topic
|
||||
|
||||
//...
|
||||
}
|
||||
|
||||
// listen for event
|
||||
libp2p.pubsub.subscribe('my-topic')
|
||||
libp2p.pubsub.addEventListener('message', handler)
|
||||
|
||||
// send event
|
||||
libp2p.pubsub.publish('my-topic', Uint8Array.from([0, 1, 2, 3]))
|
||||
|
||||
// stop listening for event
|
||||
libp2p.pubsub.unsubscribe('my-topic')
|
||||
libp2p.pubsub.removeEventListener('message', handler)
|
||||
```
|
@@ -3,15 +3,15 @@
|
||||
"version": "0.1.0",
|
||||
"private": true,
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-noise": "^6.0.1",
|
||||
"@chainsafe/libp2p-noise": "^6.2.0",
|
||||
"ipfs-core": "^0.14.1",
|
||||
"libp2p": "../../",
|
||||
"@libp2p/delegated-content-routing": "^1.0.1",
|
||||
"@libp2p/delegated-peer-routing": "^1.0.1",
|
||||
"@libp2p/kad-dht": "^1.0.1",
|
||||
"@libp2p/mplex": "^1.0.2",
|
||||
"@libp2p/webrtc-star": "^1.0.6",
|
||||
"@libp2p/websockets": "^1.0.3",
|
||||
"@libp2p/kad-dht": "^1.0.9",
|
||||
"@libp2p/mplex": "^1.0.4",
|
||||
"@libp2p/webrtc-star": "^1.0.8",
|
||||
"@libp2p/websockets": "^1.0.7",
|
||||
"react": "^17.0.2",
|
||||
"react-dom": "^17.0.2",
|
||||
"react-scripts": "5.0.0"
|
||||
|
@@ -4,7 +4,7 @@ import { createLibp2p } from 'libp2p'
|
||||
import { TCP } from '@libp2p/tcp'
|
||||
import { Mplex } from '@libp2p/mplex'
|
||||
import { Noise } from '@chainsafe/libp2p-noise'
|
||||
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
|
||||
import { FloodSub } from '@libp2p/floodsub'
|
||||
import { Bootstrap } from '@libp2p/bootstrap'
|
||||
import { PubSubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
|
||||
|
||||
@@ -16,7 +16,7 @@ const createNode = async (bootstrappers) => {
|
||||
transports: [new TCP()],
|
||||
streamMuxers: [new Mplex()],
|
||||
connectionEncryption: [new Noise()],
|
||||
pubsub: new Gossipsub(),
|
||||
pubsub: new FloodSub(),
|
||||
peerDiscovery: [
|
||||
new Bootstrap({
|
||||
list: bootstrappers
|
||||
@@ -40,7 +40,7 @@ const createNode = async (bootstrappers) => {
|
||||
transports: [new TCP()],
|
||||
streamMuxers: [new Mplex()],
|
||||
connectionEncryption: [new Noise()],
|
||||
pubsub: new Gossipsub(),
|
||||
pubsub: new FloodSub(),
|
||||
peerDiscovery: [
|
||||
new PubSubPeerDiscovery({
|
||||
interval: 1000
|
||||
|
@@ -9,11 +9,11 @@
|
||||
},
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-noise": "^6.0.1",
|
||||
"@libp2p/bootstrap": "^1.0.1",
|
||||
"@libp2p/mplex": "^1.0.2",
|
||||
"@libp2p/webrtc-star": "^1.0.6",
|
||||
"@libp2p/websockets": "^1.0.3",
|
||||
"@chainsafe/libp2p-noise": "^6.2.0",
|
||||
"@libp2p/bootstrap": "^1.0.4",
|
||||
"@libp2p/mplex": "^1.0.4",
|
||||
"@libp2p/webrtc-star": "^1.0.8",
|
||||
"@libp2p/websockets": "^1.0.7",
|
||||
"libp2p": "../../"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@@ -9,8 +9,9 @@
|
||||
},
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@achingbrain/libp2p-gossipsub": "^0.13.5",
|
||||
"@libp2p/pubsub-peer-discovery": "^5.0.1",
|
||||
"@libp2p/pubsub-peer-discovery": "^5.0.2",
|
||||
"@libp2p/floodsub": "^1.0.6",
|
||||
"@nodeutils/defaults-deep": "^1.1.0",
|
||||
"execa": "^2.1.0",
|
||||
"fs-extra": "^8.1.0",
|
||||
"libp2p": "../",
|
||||
|
@@ -1,6 +1,6 @@
|
||||
/* eslint no-console: ["off"] */
|
||||
|
||||
import { generate } from 'libp2p/pnet/generate'
|
||||
import { generateKey } from 'libp2p/pnet'
|
||||
import { privateLibp2pNode } from './libp2p-node.js'
|
||||
import { pipe } from 'it-pipe'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
@@ -8,11 +8,11 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
|
||||
// Create a Uint8Array and write the swarm key to it
|
||||
const swarmKey = new Uint8Array(95)
|
||||
generate(swarmKey)
|
||||
generateKey(swarmKey)
|
||||
|
||||
// This key is for testing a different key not working
|
||||
const otherSwarmKey = new Uint8Array(95)
|
||||
generate(otherSwarmKey)
|
||||
generateKey(otherSwarmKey)
|
||||
|
||||
;(async () => {
|
||||
const node1 = await privateLibp2pNode(swarmKey)
|
||||
|
@@ -167,10 +167,105 @@ There is one last trick on _protocol and stream multiplexing_ that libp2p uses t
|
||||
|
||||
With the aid of both mechanisms, we can reuse an incomming connection to dial streams out too, this is specially useful when you are behind tricky NAT, firewalls or if you are running in a browser, where you can't have listening addrs, but you can dial out. By dialing out, you enable other peers to talk with you in Protocols that they want, simply by opening a new multiplexed stream.
|
||||
|
||||
You can see this working on example [3.js](./3.js). The result should look like the following:
|
||||
You can see this working on example [3.js](./3.js).
|
||||
|
||||
As we've seen earlier, we can create our node with this createNode function.
|
||||
```js
|
||||
const createNode = async () => {
|
||||
const node = await Libp2p.create({
|
||||
addresses: {
|
||||
listen: ['/ip4/0.0.0.0/tcp/0']
|
||||
},
|
||||
modules: {
|
||||
transport: [TCP],
|
||||
streamMuxer: [MPLEX],
|
||||
connEncryption: [NOISE]
|
||||
}
|
||||
})
|
||||
|
||||
await node.start()
|
||||
|
||||
return node
|
||||
}
|
||||
```
|
||||
|
||||
We can now create our two nodes for this example.
|
||||
```js
|
||||
const [node1, node2] = await Promise.all([
|
||||
createNode(),
|
||||
createNode()
|
||||
])
|
||||
```
|
||||
|
||||
Since, we want to connect these nodes `node1` & `node2`, we add our `node2` multiaddr in key-value pair in `node1` peer store.
|
||||
```js
|
||||
await node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs)
|
||||
```
|
||||
|
||||
You may notice that we are only adding `node2` to `node1` peer store. This is because we want to dial up a bidirectional connection between these two nodes.
|
||||
|
||||
Finally, let's create protocols for `node1` & `node2` and dial those protocols.
|
||||
```js
|
||||
node1.handle('/node-1', ({ stream }) => {
|
||||
pipe(
|
||||
stream,
|
||||
async function (source) {
|
||||
for await (const msg of source) {
|
||||
console.log(msg.toString())
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
node2.handle('/node-2', ({ stream }) => {
|
||||
pipe(
|
||||
stream,
|
||||
async function (source) {
|
||||
for await (const msg of source) {
|
||||
console.log(msg.toString())
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
// Dialing node2 from node1
|
||||
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
|
||||
await pipe(
|
||||
['from 1 to 2'],
|
||||
stream1
|
||||
)
|
||||
|
||||
// Dialing node1 from node2
|
||||
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
|
||||
await pipe(
|
||||
['from 2 to 1'],
|
||||
stream2
|
||||
)
|
||||
```
|
||||
|
||||
If we run this code, the result should look like the following:
|
||||
|
||||
```Bash
|
||||
> node 3.js
|
||||
from 1 to 2
|
||||
from 2 to 1
|
||||
```
|
||||
|
||||
So, we have successfully set up a bidirectional connection with protocol muxing. But you should be aware that we were able to dial from `node2` to `node1` even we haven't added the `node1` peerId to node2 address book is because we dialed node2 from node1 first. Then, we just dialed back our stream out from `node2` to `node1`. So, if we dial from `node2` to `node1` before dialing from `node1` to `node2` we will get an error.
|
||||
|
||||
The code below will result into an error as `the dial address is not valid`.
|
||||
```js
|
||||
// Dialing from node2 to node1
|
||||
const { stream: stream2 } = await node2.dialProtocol(node1.peerId, ['/node-1'])
|
||||
await pipe(
|
||||
['from 2 to 1'],
|
||||
stream2
|
||||
)
|
||||
|
||||
// Dialing from node1 to node2
|
||||
const { stream: stream1 } = await node1.dialProtocol(node2.peerId, ['/node-2'])
|
||||
await pipe(
|
||||
['from 1 to 2'],
|
||||
stream1
|
||||
)
|
||||
```
|
@@ -4,10 +4,9 @@ import { createLibp2p } from 'libp2p'
|
||||
import { TCP } from '@libp2p/tcp'
|
||||
import { Mplex } from '@libp2p/mplex'
|
||||
import { Noise } from '@chainsafe/libp2p-noise'
|
||||
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
|
||||
import { FloodSub } from '@libp2p/floodsub'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
|
||||
const createNode = async () => {
|
||||
const node = await createLibp2p({
|
||||
@@ -17,7 +16,7 @@ const createNode = async () => {
|
||||
transports: [new TCP()],
|
||||
streamMuxers: [new Mplex()],
|
||||
connectionEncryption: [new Noise()],
|
||||
pubsub: new Gossipsub()
|
||||
pubsub: new FloodSub()
|
||||
})
|
||||
|
||||
await node.start()
|
||||
@@ -36,17 +35,21 @@ const createNode = async () => {
|
||||
await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs())
|
||||
await node1.dial(node2.peerId)
|
||||
|
||||
node1.pubsub.addEventListener(topic, (evt) => {
|
||||
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
|
||||
node1.pubsub.subscribe(topic)
|
||||
node1.pubsub.addEventListener('message', (evt) => {
|
||||
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
|
||||
})
|
||||
|
||||
// Will not receive own published messages by default
|
||||
node2.pubsub.addEventListener(topic, (evt) => {
|
||||
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
|
||||
node2.pubsub.subscribe(topic)
|
||||
node2.pubsub.addEventListener('message', (evt) => {
|
||||
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
|
||||
})
|
||||
|
||||
// node2 publishes "news" every second
|
||||
setInterval(() => {
|
||||
node2.pubsub.dispatchEvent(new CustomEvent(topic, { detail: uint8ArrayFromString('Bird bird bird, bird is the word!') }))
|
||||
node2.pubsub.publish(topic, uint8ArrayFromString('Bird bird bird, bird is the word!')).catch(err => {
|
||||
console.error(err)
|
||||
})
|
||||
}, 1000)
|
||||
})()
|
||||
|
@@ -69,7 +69,9 @@ await node2.pubsub.subscribe(topic)
|
||||
|
||||
// node2 publishes "news" every second
|
||||
setInterval(() => {
|
||||
node2.pubsub.publish(topic, fromString('Bird bird bird, bird is the word!'))
|
||||
node2.pubsub.publish(topic, fromString('Bird bird bird, bird is the word!')).catch(err => {
|
||||
console.error(err)
|
||||
})
|
||||
}, 1000)
|
||||
```
|
||||
|
||||
|
@@ -4,10 +4,9 @@ import { createLibp2p } from 'libp2p'
|
||||
import { TCP } from '@libp2p/tcp'
|
||||
import { Mplex } from '@libp2p/mplex'
|
||||
import { Noise } from '@chainsafe/libp2p-noise'
|
||||
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
|
||||
import { FloodSub } from '@libp2p/floodsub'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
|
||||
const createNode = async () => {
|
||||
const node = await createLibp2p({
|
||||
@@ -17,7 +16,7 @@ const createNode = async () => {
|
||||
transports: [new TCP()],
|
||||
streamMuxers: [new Mplex()],
|
||||
connectionEncryption: [new Noise()],
|
||||
pubsub: new Gossipsub()
|
||||
pubsub: new FloodSub()
|
||||
})
|
||||
|
||||
await node.start()
|
||||
@@ -45,7 +44,7 @@ const createNode = async () => {
|
||||
// Will not receive own published messages by default
|
||||
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
|
||||
})
|
||||
await node1.pubsub.subscribe(topic)
|
||||
node1.pubsub.subscribe(topic)
|
||||
|
||||
node2.pubsub.addEventListener(topic, (evt) => {
|
||||
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
|
||||
@@ -75,7 +74,9 @@ const createNode = async () => {
|
||||
// car is not a fruit !
|
||||
setInterval(() => {
|
||||
console.log('############## fruit ' + myFruits[count] + ' ##############')
|
||||
node1.pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, { detail: uint8ArrayFromString(myFruits[count]) }))
|
||||
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count])).catch(err => {
|
||||
console.info(err)
|
||||
})
|
||||
count++
|
||||
if (count == myFruits.length) {
|
||||
count = 0
|
||||
|
@@ -88,7 +88,9 @@ const myFruits = ['banana', 'apple', 'car', 'orange'];
|
||||
|
||||
setInterval(() => {
|
||||
console.log('############## fruit ' + myFruits[count] + ' ##############')
|
||||
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count]))
|
||||
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count])).catch(err => {
|
||||
console.error(err)
|
||||
})
|
||||
count++
|
||||
if (count == myFruits.length) {
|
||||
count = 0
|
||||
|
@@ -9,10 +9,10 @@
|
||||
},
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@libp2p/webrtc-direct": "^1.0.0",
|
||||
"@chainsafe/libp2p-noise": "^6.0.1",
|
||||
"@libp2p/bootstrap": "^1.0.1",
|
||||
"@libp2p/mplex": "^1.0.2",
|
||||
"@libp2p/webrtc-direct": "^1.0.1",
|
||||
"@chainsafe/libp2p-noise": "^6.2.0",
|
||||
"@libp2p/bootstrap": "^1.0.4",
|
||||
"@libp2p/mplex": "^1.0.4",
|
||||
"libp2p": "../../",
|
||||
"wrtc": "^0.4.7"
|
||||
},
|
||||
|
76
package.json
76
package.json
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "libp2p",
|
||||
"version": "0.36.2",
|
||||
"version": "0.37.1",
|
||||
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"homepage": "https://github.com/libp2p/js-libp2p#readme",
|
||||
@@ -57,9 +57,6 @@
|
||||
"./pnet": {
|
||||
"import": "./dist/src/pnet/index.js"
|
||||
},
|
||||
"./pnet/generate": {
|
||||
"import": "./dist/src/pnet/key-generator.js"
|
||||
},
|
||||
"./transport-manager": {
|
||||
"import": "./dist/src/transport-manager.js"
|
||||
}
|
||||
@@ -95,26 +92,24 @@
|
||||
"test:interop": "aegir test -t node -f dist/test/interop.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"@achingbrain/nat-port-mapper": "^1.0.0",
|
||||
"@libp2p/connection": "^1.1.4",
|
||||
"@libp2p/crypto": "^0.22.9",
|
||||
"@libp2p/interfaces": "^1.3.21",
|
||||
"@libp2p/logger": "^1.1.3",
|
||||
"@libp2p/multistream-select": "^1.0.3",
|
||||
"@libp2p/peer-id": "^1.1.8",
|
||||
"@libp2p/peer-id-factory": "^1.0.8",
|
||||
"@achingbrain/nat-port-mapper": "^1.0.3",
|
||||
"@libp2p/connection": "^2.0.2",
|
||||
"@libp2p/crypto": "^0.22.11",
|
||||
"@libp2p/interfaces": "^2.0.2",
|
||||
"@libp2p/logger": "^1.1.4",
|
||||
"@libp2p/multistream-select": "^1.0.4",
|
||||
"@libp2p/peer-collections": "^1.0.2",
|
||||
"@libp2p/peer-id": "^1.1.10",
|
||||
"@libp2p/peer-id-factory": "^1.0.9",
|
||||
"@libp2p/peer-record": "^1.0.8",
|
||||
"@libp2p/peer-store": "^1.0.6",
|
||||
"@libp2p/utils": "^1.0.9",
|
||||
"@libp2p/peer-store": "^1.0.10",
|
||||
"@libp2p/tracked-map": "^1.0.5",
|
||||
"@libp2p/utils": "^1.0.10",
|
||||
"@multiformats/mafmt": "^11.0.2",
|
||||
"@multiformats/multiaddr": "^10.1.8",
|
||||
"abortable-iterator": "^4.0.2",
|
||||
"aggregate-error": "^4.0.0",
|
||||
"any-signal": "^3.0.0",
|
||||
"bignumber.js": "^9.0.1",
|
||||
"class-is": "^1.1.0",
|
||||
"datastore-core": "^7.0.0",
|
||||
"debug": "^4.3.3",
|
||||
"err-code": "^3.0.1",
|
||||
"events": "^3.3.0",
|
||||
"hashlru": "^2.3.0",
|
||||
@@ -128,13 +123,11 @@
|
||||
"it-length-prefixed": "^7.0.1",
|
||||
"it-map": "^1.0.6",
|
||||
"it-merge": "^1.0.3",
|
||||
"it-pair": "^2.0.2",
|
||||
"it-pipe": "^2.0.3",
|
||||
"it-sort": "^1.0.1",
|
||||
"it-stream-types": "^1.0.4",
|
||||
"it-take": "^1.0.2",
|
||||
"it-to-buffer": "^2.0.2",
|
||||
"merge-options": "^3.0.4",
|
||||
"mortice": "^3.0.0",
|
||||
"multiformats": "^9.6.3",
|
||||
"mutable-proxy": "^1.0.0",
|
||||
"node-forge": "^1.2.1",
|
||||
@@ -142,62 +135,55 @@
|
||||
"p-retry": "^5.0.0",
|
||||
"p-settle": "^5.0.0",
|
||||
"private-ip": "^2.3.3",
|
||||
"protons-runtime": "^1.0.2",
|
||||
"protons-runtime": "^1.0.4",
|
||||
"retimer": "^3.0.0",
|
||||
"sanitize-filename": "^1.6.3",
|
||||
"set-delayed-interval": "^1.0.0",
|
||||
"streaming-iterables": "^6.0.0",
|
||||
"timeout-abort-controller": "^3.0.0",
|
||||
"uint8arrays": "^3.0.0",
|
||||
"varint": "^6.0.0",
|
||||
"wherearewe": "^1.0.0",
|
||||
"xsalsa20": "^1.1.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@achingbrain/libp2p-gossipsub": "^0.13.5",
|
||||
"@chainsafe/libp2p-noise": "^6.0.1",
|
||||
"@libp2p/bootstrap": "^1.0.2",
|
||||
"@libp2p/daemon-client": "^1.0.0",
|
||||
"@libp2p/daemon-server": "^1.0.0",
|
||||
"@chainsafe/libp2p-noise": "^6.2.0",
|
||||
"@libp2p/bootstrap": "^1.0.4",
|
||||
"@libp2p/daemon-client": "^1.0.2",
|
||||
"@libp2p/daemon-server": "^1.0.2",
|
||||
"@libp2p/delegated-content-routing": "^1.0.2",
|
||||
"@libp2p/delegated-peer-routing": "^1.0.2",
|
||||
"@libp2p/floodsub": "^1.0.2",
|
||||
"@libp2p/interface-compliance-tests": "^1.1.20",
|
||||
"@libp2p/floodsub": "^1.0.6",
|
||||
"@libp2p/interface-compliance-tests": "^2.0.3",
|
||||
"@libp2p/interop": "^1.0.3",
|
||||
"@libp2p/kad-dht": "^1.0.5",
|
||||
"@libp2p/mdns": "^1.0.3",
|
||||
"@libp2p/mplex": "^1.0.1",
|
||||
"@libp2p/pubsub": "^1.2.14",
|
||||
"@libp2p/tcp": "^1.0.6",
|
||||
"@libp2p/kad-dht": "^1.0.9",
|
||||
"@libp2p/mdns": "^1.0.5",
|
||||
"@libp2p/mplex": "^1.1.0",
|
||||
"@libp2p/pubsub": "^1.2.18",
|
||||
"@libp2p/tcp": "^1.0.9",
|
||||
"@libp2p/topology": "^1.1.7",
|
||||
"@libp2p/tracked-map": "^1.0.4",
|
||||
"@libp2p/webrtc-star": "^1.0.3",
|
||||
"@libp2p/websockets": "^1.0.3",
|
||||
"@nodeutils/defaults-deep": "^1.1.0",
|
||||
"@types/node": "^16.11.26",
|
||||
"@libp2p/webrtc-star": "^1.0.8",
|
||||
"@libp2p/websockets": "^1.0.7",
|
||||
"@types/node-forge": "^1.0.0",
|
||||
"@types/p-fifo": "^1.0.0",
|
||||
"@types/varint": "^6.0.0",
|
||||
"@types/xsalsa20": "^1.1.0",
|
||||
"aegir": "^37.0.9",
|
||||
"buffer": "^6.0.3",
|
||||
"cborg": "^1.8.1",
|
||||
"delay": "^5.0.0",
|
||||
"execa": "^6.1.0",
|
||||
"go-libp2p": "^0.0.6",
|
||||
"into-stream": "^7.0.0",
|
||||
"ipfs-http-client": "^56.0.1",
|
||||
"it-pair": "^2.0.2",
|
||||
"it-pushable": "^2.0.1",
|
||||
"it-to-buffer": "^2.0.2",
|
||||
"nock": "^13.0.3",
|
||||
"npm-run-all": "^4.1.5",
|
||||
"p-defer": "^4.0.0",
|
||||
"p-event": "^5.0.1",
|
||||
"p-times": "^4.0.0",
|
||||
"p-wait-for": "^4.1.0",
|
||||
"protons": "^3.0.2",
|
||||
"protons": "^3.0.4",
|
||||
"rimraf": "^3.0.2",
|
||||
"sinon": "^13.0.1",
|
||||
"sinon": "^14.0.0",
|
||||
"ts-sinon": "^2.0.2"
|
||||
},
|
||||
"browser": {
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import { AddressManagerEvents, CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import type { AddressManagerEvents } from '@libp2p/interfaces/address-manager'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
@@ -85,12 +85,14 @@ export class AutoRelay {
|
||||
|
||||
// If protocol, check if can hop, store info in the metadataBook and listen on it
|
||||
try {
|
||||
const connection = this.components.getConnectionManager().getConnection(peerId)
|
||||
const connections = this.components.getConnectionManager().getConnections(peerId)
|
||||
|
||||
if (connection == null) {
|
||||
if (connections.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const connection = connections[0]
|
||||
|
||||
// Do not hop on a relayed connection
|
||||
if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) {
|
||||
log(`relayed connection to ${id} will not be used to hop on`)
|
||||
@@ -223,15 +225,15 @@ export class AutoRelay {
|
||||
continue
|
||||
}
|
||||
|
||||
const connection = this.components.getConnectionManager().getConnection(id)
|
||||
const connections = this.components.getConnectionManager().getConnections(id)
|
||||
|
||||
// If not connected, store for possible later use.
|
||||
if (connection == null) {
|
||||
if (connections.length === 0) {
|
||||
knownHopsToDial.push(id)
|
||||
continue
|
||||
}
|
||||
|
||||
await this._addListenRelay(connection, idStr)
|
||||
await this._addListenRelay(connections[0], idStr)
|
||||
|
||||
// Check if already listening on enough relays
|
||||
if (this.listenRelays.size >= this.maxListeners) {
|
||||
@@ -274,7 +276,7 @@ export class AutoRelay {
|
||||
|
||||
async _tryToListenOnRelay (peerId: PeerId) {
|
||||
try {
|
||||
const connection = await this.components.getDialer().dial(peerId)
|
||||
const connection = await this.components.getConnectionManager().openConnection(peerId)
|
||||
await this._addListenRelay(connection, peerId.toString())
|
||||
} catch (err: any) {
|
||||
log.error('Could not use %p as relay', peerId, err)
|
||||
|
@@ -11,7 +11,7 @@ import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import { peerIdFromBytes } from '@libp2p/peer-id'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { Circuit } from '../transport.js'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/registrar'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
|
||||
const log = logger('libp2p:circuit:hop')
|
||||
|
||||
@@ -58,8 +58,8 @@ export async function handleHop (hopRequest: HopRequest) {
|
||||
// Get the connection to the destination (stop) peer
|
||||
const destinationPeer = peerIdFromBytes(request.dstPeer.id)
|
||||
|
||||
const destinationConnection = connectionManager.getConnection(destinationPeer)
|
||||
if (destinationConnection == null && !circuit.hopActive()) {
|
||||
const destinationConnections = connectionManager.getConnections(destinationPeer)
|
||||
if (destinationConnections.length === 0 && !circuit.hopActive()) {
|
||||
log('HOP request received but we are not connected to the destination peer')
|
||||
return streamHandler.end({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
@@ -68,7 +68,7 @@ export async function handleHop (hopRequest: HopRequest) {
|
||||
}
|
||||
|
||||
// TODO: Handle being an active relay
|
||||
if (destinationConnection == null) {
|
||||
if (destinationConnections.length === 0) {
|
||||
log('did not have connection to remote peer')
|
||||
return streamHandler.end({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
@@ -87,7 +87,7 @@ export async function handleHop (hopRequest: HopRequest) {
|
||||
try {
|
||||
log('performing STOP request')
|
||||
const result = await stop({
|
||||
connection: destinationConnection,
|
||||
connection: destinationConnections[0],
|
||||
request: stopRequest
|
||||
})
|
||||
|
||||
|
@@ -11,7 +11,7 @@ import {
|
||||
RELAY_RENDEZVOUS_NS
|
||||
} from './constants.js'
|
||||
import type { AddressSorter } from '@libp2p/interfaces/peer-store'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
||||
const log = logger('libp2p:relay')
|
||||
|
@@ -1,11 +1,12 @@
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/registrar'
|
||||
import type { Dialer } from '@libp2p/interfaces/dialer'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
import type { PeerStore } from '@libp2p/interfaces/peer-store'
|
||||
import type { Listener } from '@libp2p/interfaces/transport'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
|
||||
export interface ListenerOptions {
|
||||
dialer: Dialer
|
||||
peerStore: PeerStore
|
||||
connectionManager: ConnectionManager
|
||||
}
|
||||
|
||||
@@ -17,7 +18,19 @@ export function createListener (options: ListenerOptions): Listener {
|
||||
*/
|
||||
async function listen (addr: Multiaddr): Promise<void> {
|
||||
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
|
||||
const relayConn = await options.dialer.dial(new Multiaddr(addrString))
|
||||
const ma = new Multiaddr(addrString)
|
||||
|
||||
const relayPeerStr = ma.getPeerId()
|
||||
|
||||
if (relayPeerStr == null) {
|
||||
throw new Error('Could not determine relay peer from multiaddr')
|
||||
}
|
||||
|
||||
const relayPeerId = peerIdFromString(relayPeerStr)
|
||||
|
||||
await options.peerStore.addressBook.add(relayPeerId, [ma])
|
||||
|
||||
const relayConn = await options.connectionManager.openConnection(relayPeerId)
|
||||
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
|
||||
|
||||
listeningAddrs.set(relayConn.remotePeer.toString(), relayedAddr)
|
||||
|
@@ -2,6 +2,7 @@
|
||||
/* eslint-disable @typescript-eslint/no-namespace */
|
||||
|
||||
import { enumeration, encodeMessage, decodeMessage, message, bytes } from 'protons-runtime'
|
||||
import type { Codec } from 'protons-runtime'
|
||||
|
||||
export interface CircuitRelay {
|
||||
type?: CircuitRelay.Type
|
||||
@@ -30,11 +31,31 @@ export namespace CircuitRelay {
|
||||
MALFORMED_MESSAGE = 'MALFORMED_MESSAGE'
|
||||
}
|
||||
|
||||
enum __StatusValues {
|
||||
SUCCESS = 100,
|
||||
HOP_SRC_ADDR_TOO_LONG = 220,
|
||||
HOP_DST_ADDR_TOO_LONG = 221,
|
||||
HOP_SRC_MULTIADDR_INVALID = 250,
|
||||
HOP_DST_MULTIADDR_INVALID = 251,
|
||||
HOP_NO_CONN_TO_DST = 260,
|
||||
HOP_CANT_DIAL_DST = 261,
|
||||
HOP_CANT_OPEN_DST_STREAM = 262,
|
||||
HOP_CANT_SPEAK_RELAY = 270,
|
||||
HOP_CANT_RELAY_TO_SELF = 280,
|
||||
STOP_SRC_ADDR_TOO_LONG = 320,
|
||||
STOP_DST_ADDR_TOO_LONG = 321,
|
||||
STOP_SRC_MULTIADDR_INVALID = 350,
|
||||
STOP_DST_MULTIADDR_INVALID = 351,
|
||||
STOP_RELAY_REFUSED = 390,
|
||||
MALFORMED_MESSAGE = 400
|
||||
}
|
||||
|
||||
export namespace Status {
|
||||
export const codec = () => {
|
||||
return enumeration<typeof Status>(Status)
|
||||
return enumeration<typeof Status>(__StatusValues)
|
||||
}
|
||||
}
|
||||
|
||||
export enum Type {
|
||||
HOP = 'HOP',
|
||||
STOP = 'STOP',
|
||||
@@ -42,18 +63,26 @@ export namespace CircuitRelay {
|
||||
CAN_HOP = 'CAN_HOP'
|
||||
}
|
||||
|
||||
enum __TypeValues {
|
||||
HOP = 1,
|
||||
STOP = 2,
|
||||
STATUS = 3,
|
||||
CAN_HOP = 4
|
||||
}
|
||||
|
||||
export namespace Type {
|
||||
export const codec = () => {
|
||||
return enumeration<typeof Type>(Type)
|
||||
return enumeration<typeof Type>(__TypeValues)
|
||||
}
|
||||
}
|
||||
|
||||
export interface Peer {
|
||||
id: Uint8Array
|
||||
addrs: Uint8Array[]
|
||||
}
|
||||
|
||||
export namespace Peer {
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<Peer> => {
|
||||
return message<Peer>({
|
||||
1: { name: 'id', codec: bytes },
|
||||
2: { name: 'addrs', codec: bytes, repeats: true }
|
||||
@@ -69,7 +98,7 @@ export namespace CircuitRelay {
|
||||
}
|
||||
}
|
||||
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<CircuitRelay> => {
|
||||
return message<CircuitRelay>({
|
||||
1: { name: 'type', codec: CircuitRelay.Type.codec(), optional: true },
|
||||
2: { name: 'srcPeer', codec: CircuitRelay.Peer.codec(), optional: true },
|
||||
|
@@ -49,7 +49,7 @@ export class Circuit implements Transport, Initializable {
|
||||
}
|
||||
|
||||
get [Symbol.toStringTag] () {
|
||||
return this.constructor.name
|
||||
return 'libp2p/circuit-relay-v1'
|
||||
}
|
||||
|
||||
async _onProtocol (data: IncomingStreamData) {
|
||||
@@ -149,9 +149,12 @@ export class Circuit implements Transport, Initializable {
|
||||
const destinationPeer = peerIdFromString(destinationId)
|
||||
|
||||
let disconnectOnFailure = false
|
||||
let relayConnection = this.components.getConnectionManager().getConnection(relayPeer)
|
||||
const relayConnections = this.components.getConnectionManager().getConnections(relayPeer)
|
||||
let relayConnection = relayConnections[0]
|
||||
|
||||
if (relayConnection == null) {
|
||||
relayConnection = await this.components.getDialer().dial(relayAddr, options)
|
||||
await this.components.getPeerStore().addressBook.add(relayPeer, [relayAddr])
|
||||
relayConnection = await this.components.getConnectionManager().openConnection(relayPeer, options)
|
||||
disconnectOnFailure = true
|
||||
}
|
||||
|
||||
@@ -195,8 +198,8 @@ export class Circuit implements Transport, Initializable {
|
||||
this.handler = options.handler
|
||||
|
||||
return createListener({
|
||||
dialer: this.components.getDialer(),
|
||||
connectionManager: this.components.getConnectionManager()
|
||||
connectionManager: this.components.getConnectionManager(),
|
||||
peerStore: this.components.getPeerStore()
|
||||
})
|
||||
}
|
||||
|
||||
|
@@ -21,14 +21,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
connectionManager: {
|
||||
maxConnections: 300,
|
||||
minConnections: 50,
|
||||
autoDial: true,
|
||||
autoDialInterval: 10000,
|
||||
autoDial: true
|
||||
},
|
||||
connectionGater: {},
|
||||
transportManager: {
|
||||
faultTolerance: FaultTolerance.FATAL_ALL
|
||||
},
|
||||
dialer: {
|
||||
maxParallelDials: Constants.MAX_PARALLEL_DIALS,
|
||||
maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS,
|
||||
dialTimeout: Constants.DIAL_TIMEOUT,
|
||||
@@ -37,8 +31,9 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
},
|
||||
addressSorter: publicAddressesFirst
|
||||
},
|
||||
host: {
|
||||
agentVersion: AGENT_VERSION
|
||||
connectionGater: {},
|
||||
transportManager: {
|
||||
faultTolerance: FaultTolerance.FATAL_ALL
|
||||
},
|
||||
metrics: {
|
||||
enabled: false,
|
||||
@@ -58,7 +53,6 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
bootDelay: 10e3
|
||||
}
|
||||
},
|
||||
protocolPrefix: 'ipfs',
|
||||
nat: {
|
||||
enabled: true,
|
||||
ttl: 7200,
|
||||
@@ -79,6 +73,19 @@ const DefaultConfig: Partial<Libp2pInit> = {
|
||||
enabled: false,
|
||||
maxListeners: 2
|
||||
}
|
||||
},
|
||||
identify: {
|
||||
protocolPrefix: 'ipfs',
|
||||
host: {
|
||||
agentVersion: AGENT_VERSION
|
||||
},
|
||||
timeout: 30000
|
||||
},
|
||||
ping: {
|
||||
protocolPrefix: 'ipfs'
|
||||
},
|
||||
fetch: {
|
||||
protocolPrefix: 'libp2p'
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -6,7 +6,7 @@ import all from 'it-all'
|
||||
import { pipe } from 'it-pipe'
|
||||
import filter from 'it-filter'
|
||||
import sort from 'it-sort'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
||||
const log = logger('libp2p:connection-manager:auto-dialler')
|
||||
@@ -102,7 +102,7 @@ export class AutoDialler implements Startable {
|
||||
const minConnections = this.options.minConnections
|
||||
|
||||
// Already has enough connections
|
||||
if (this.components.getConnectionManager().getConnectionList().length >= minConnections) {
|
||||
if (this.components.getConnectionManager().getConnections().length >= minConnections) {
|
||||
this.autoDialTimeout = retimer(this._autoDial, this.options.autoDialInterval)
|
||||
|
||||
return
|
||||
@@ -126,7 +126,7 @@ export class AutoDialler implements Startable {
|
||||
async (source) => await all(source)
|
||||
)
|
||||
|
||||
for (let i = 0; this.running && i < peers.length && this.components.getConnectionManager().getConnectionList().length < minConnections; i++) {
|
||||
for (let i = 0; this.running && i < peers.length && this.components.getConnectionManager().getConnections().length < minConnections; i++) {
|
||||
// Connection Manager was stopped during async dial
|
||||
if (!this.running) {
|
||||
return
|
||||
@@ -134,10 +134,10 @@ export class AutoDialler implements Startable {
|
||||
|
||||
const peer = peers[i]
|
||||
|
||||
if (this.components.getConnectionManager().getConnection(peer.id) == null) {
|
||||
if (this.components.getConnectionManager().getConnections(peer.id).length === 0) {
|
||||
log('connecting to a peerStore stored peer %p', peer.id)
|
||||
try {
|
||||
await this.components.getDialer().dial(peer.id)
|
||||
await this.components.getConnectionManager().openConnection(peer.id)
|
||||
} catch (err: any) {
|
||||
log.error('could not connect to peerStore stored peer', err)
|
||||
}
|
||||
|
@@ -1,39 +1,58 @@
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
import { logger } from '@libp2p/logger'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
|
||||
const log = logger('libp2p:dialer:auto-dialer')
|
||||
|
||||
export interface AutoDialerInit {
|
||||
enabled: boolean
|
||||
minConnections: number
|
||||
dialTimeout: number
|
||||
}
|
||||
|
||||
export class AutoDialer {
|
||||
private readonly components: Components
|
||||
private readonly enabled: boolean
|
||||
private readonly minConnections: number
|
||||
private readonly dialTimeout: number
|
||||
|
||||
constructor (components: Components, init: AutoDialerInit) {
|
||||
this.components = components
|
||||
this.enabled = init.enabled
|
||||
this.minConnections = init.minConnections
|
||||
this.dialTimeout = init.dialTimeout
|
||||
}
|
||||
|
||||
public handle (evt: CustomEvent<PeerInfo>) {
|
||||
const { detail: peer } = evt
|
||||
|
||||
if (!this.enabled) {
|
||||
return
|
||||
}
|
||||
|
||||
const connections = this.components.getConnectionManager().getConnections(peer.id)
|
||||
|
||||
// If auto dialing is on and we have no connection to the peer, check if we should dial
|
||||
if (this.enabled && this.components.getConnectionManager().getConnection(peer.id) == null) {
|
||||
if (connections.length === 0) {
|
||||
const minConnections = this.minConnections ?? 0
|
||||
|
||||
if (minConnections > this.components.getConnectionManager().getConnectionList().length) {
|
||||
log('auto-dialing discovered peer %p', peer.id)
|
||||
const allConnections = this.components.getConnectionManager().getConnections()
|
||||
|
||||
void this.components.getDialer().dial(peer.id)
|
||||
if (minConnections > allConnections.length) {
|
||||
log('auto-dialing discovered peer %p with timeout %d', peer.id, this.dialTimeout)
|
||||
|
||||
const controller = new TimeoutController(this.dialTimeout)
|
||||
|
||||
void this.components.getConnectionManager().openConnection(peer.id, {
|
||||
signal: controller.signal
|
||||
})
|
||||
.catch(err => {
|
||||
log.error('could not connect to discovered peer %p with %o', peer.id, err)
|
||||
})
|
||||
.finally(() => {
|
||||
controller.clear()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,14 +1,13 @@
|
||||
import errCode from 'err-code'
|
||||
import { anySignal } from 'any-signal'
|
||||
import FIFO from 'p-fifo'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import { codes } from '../errors.js'
|
||||
import { codes } from '../../errors.js'
|
||||
import { logger } from '@libp2p/logger'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { DefaultDialer } from './index.js'
|
||||
import type { Dialer } from './index.js'
|
||||
|
||||
const log = logger('libp2p:dialer:dial-request')
|
||||
|
||||
@@ -19,12 +18,12 @@ export interface DialAction {
|
||||
export interface DialRequestOptions {
|
||||
addrs: Multiaddr[]
|
||||
dialAction: DialAction
|
||||
dialer: DefaultDialer
|
||||
dialer: Dialer
|
||||
}
|
||||
|
||||
export class DialRequest {
|
||||
private readonly addrs: Multiaddr[]
|
||||
private readonly dialer: DefaultDialer
|
||||
private readonly dialer: Dialer
|
||||
private readonly dialAction: DialAction
|
||||
|
||||
/**
|
||||
@@ -62,7 +61,7 @@ export class DialRequest {
|
||||
})
|
||||
}
|
||||
|
||||
const dialAbortControllers = this.addrs.map(() => {
|
||||
const dialAbortControllers: Array<(AbortController | undefined)> = this.addrs.map(() => {
|
||||
const controller = new AbortController()
|
||||
try {
|
||||
// fails on node < 15.4
|
||||
@@ -80,16 +79,27 @@ export class DialRequest {
|
||||
}
|
||||
|
||||
let completedDials = 0
|
||||
let done = false
|
||||
|
||||
try {
|
||||
return await Promise.any(this.addrs.map(async (addr, i) => {
|
||||
const token = await tokenHolder.shift() // get token
|
||||
// End attempt once another attempt succeeded
|
||||
if (done) {
|
||||
this.dialer.releaseToken(tokens.splice(tokens.indexOf(token), 1)[0])
|
||||
throw errCode(new Error('dialAction already succeeded'), codes.ERR_ALREADY_SUCCEEDED)
|
||||
}
|
||||
|
||||
const controller = dialAbortControllers[i]
|
||||
if (controller == null) {
|
||||
throw errCode(new Error('dialAction did not come with an AbortController'), codes.ERR_INVALID_PARAMETERS)
|
||||
}
|
||||
let conn
|
||||
try {
|
||||
const signal = dialAbortControllers[i].signal
|
||||
const signal = controller.signal
|
||||
conn = await this.dialAction(addr, { ...options, signal: (options.signal != null) ? anySignal([signal, options.signal]) : signal })
|
||||
// Remove the successful AbortController so it is not aborted
|
||||
dialAbortControllers.splice(i, 1)
|
||||
dialAbortControllers[i] = undefined
|
||||
} finally {
|
||||
completedDials++
|
||||
// If we have more or equal dials remaining than tokens, recycle the token, otherwise release it
|
||||
@@ -102,10 +112,25 @@ export class DialRequest {
|
||||
}
|
||||
}
|
||||
|
||||
if (conn == null) {
|
||||
// Notify Promise.any that attempt was not successful
|
||||
// to prevent from returning undefined despite there
|
||||
// were successful dial attempts
|
||||
throw errCode(new Error('dialAction led to empty object'), codes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
} else {
|
||||
// This dial succeeded, don't attempt anything else
|
||||
done = true
|
||||
}
|
||||
|
||||
return conn
|
||||
}))
|
||||
} finally {
|
||||
dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else
|
||||
// success/failure happened, abort everything else
|
||||
dialAbortControllers.forEach(c => {
|
||||
if (c !== undefined) {
|
||||
c.abort()
|
||||
}
|
||||
})
|
||||
tokens.forEach(token => this.dialer.releaseToken(token)) // release tokens back to the dialer
|
||||
}
|
||||
}
|
@@ -3,31 +3,31 @@ import all from 'it-all'
|
||||
import filter from 'it-filter'
|
||||
import { pipe } from 'it-pipe'
|
||||
import errCode from 'err-code'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { Multiaddr, Resolver } from '@multiformats/multiaddr'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import { AbortError } from '@libp2p/interfaces/errors'
|
||||
import { anySignal } from 'any-signal'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import { DialAction, DialRequest } from './dial-request.js'
|
||||
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
|
||||
import { trackedMap } from '@libp2p/tracked-map'
|
||||
import { codes } from '../errors.js'
|
||||
import { codes } from '../../errors.js'
|
||||
import {
|
||||
DIAL_TIMEOUT,
|
||||
MAX_PARALLEL_DIALS,
|
||||
MAX_PER_PEER_DIALS,
|
||||
MAX_ADDRS_TO_DIAL
|
||||
} from '../constants.js'
|
||||
} from '../../constants.js'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { AbortOptions, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { getPeer } from '../get-peer.js'
|
||||
import { getPeer } from '../../get-peer.js'
|
||||
import sort from 'it-sort'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { Dialer, DialerInit } from '@libp2p/interfaces/dialer'
|
||||
import { Components, Initializable } from '@libp2p/interfaces/components'
|
||||
import map from 'it-map'
|
||||
import type { AddressSorter } from '@libp2p/interfaces/peer-store'
|
||||
import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics'
|
||||
|
||||
const log = logger('libp2p:dialer')
|
||||
|
||||
@@ -52,8 +52,41 @@ export interface PendingDialTarget {
|
||||
reject: (err: Error) => void
|
||||
}
|
||||
|
||||
export class DefaultDialer implements Dialer, Startable {
|
||||
private readonly components: Components
|
||||
export interface DialerInit {
|
||||
/**
|
||||
* Sort the known addresses of a peer before trying to dial
|
||||
*/
|
||||
addressSorter?: AddressSorter
|
||||
|
||||
/**
|
||||
* Number of max concurrent dials
|
||||
*/
|
||||
maxParallelDials?: number
|
||||
|
||||
/**
|
||||
* Number of max addresses to dial for a given peer
|
||||
*/
|
||||
maxAddrsToDial?: number
|
||||
|
||||
/**
|
||||
* How long a dial attempt is allowed to take
|
||||
*/
|
||||
dialTimeout?: number
|
||||
|
||||
/**
|
||||
* Number of max concurrent dials per peer
|
||||
*/
|
||||
maxDialsPerPeer?: number
|
||||
|
||||
/**
|
||||
* Multiaddr resolvers to use when dialing
|
||||
*/
|
||||
resolvers?: Record<string, Resolver>
|
||||
metrics?: ComponentMetricsTracker
|
||||
}
|
||||
|
||||
export class Dialer implements Startable, Initializable {
|
||||
private components: Components = new Components()
|
||||
private readonly addressSorter: AddressSorter
|
||||
private readonly maxAddrsToDial: number
|
||||
private readonly timeout: number
|
||||
@@ -63,8 +96,7 @@ export class DefaultDialer implements Dialer, Startable {
|
||||
public pendingDialTargets: Map<string, PendingDialTarget>
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: DialerInit = {}) {
|
||||
this.components = components
|
||||
constructor (init: DialerInit = {}) {
|
||||
this.started = false
|
||||
this.addressSorter = init.addressSorter ?? publicAddressesFirst
|
||||
this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL
|
||||
@@ -87,6 +119,10 @@ export class DefaultDialer implements Dialer, Startable {
|
||||
}
|
||||
}
|
||||
|
||||
init (components: Components): void {
|
||||
this.components = components
|
||||
}
|
||||
|
||||
isStarted () {
|
||||
return this.started
|
||||
}
|
||||
@@ -139,16 +175,6 @@ export class DefaultDialer implements Dialer, Startable {
|
||||
throw errCode(new Error('The dial request is blocked by gater.allowDialPeer'), codes.ERR_PEER_DIAL_INTERCEPTED)
|
||||
}
|
||||
|
||||
log('dial to %p', id)
|
||||
|
||||
const existingConnection = this.components.getConnectionManager().getConnection(id)
|
||||
|
||||
if (existingConnection != null) {
|
||||
log('had an existing connection to %p', id)
|
||||
|
||||
return existingConnection
|
||||
}
|
||||
|
||||
log('creating dial target for %p', id)
|
||||
|
||||
const dialTarget = await this._createCancellableDialTarget(id)
|
||||
@@ -176,22 +202,6 @@ export class DefaultDialer implements Dialer, Startable {
|
||||
}
|
||||
}
|
||||
|
||||
async dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options: AbortOptions = {}) {
|
||||
if (protocols == null) {
|
||||
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
|
||||
}
|
||||
|
||||
protocols = Array.isArray(protocols) ? protocols : [protocols]
|
||||
|
||||
if (protocols.length === 0) {
|
||||
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
|
||||
}
|
||||
|
||||
const connection = await this.dial(peer, options)
|
||||
|
||||
return await connection.newStream(protocols)
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to a given `peer` by dialing all of its known addresses.
|
||||
* The dial to the first address that is successfully able to upgrade a connection
|
||||
@@ -244,9 +254,6 @@ export class DefaultDialer implements Dialer, Startable {
|
||||
const addrs: Multiaddr[] = []
|
||||
for (const a of knownAddrs) {
|
||||
const resolvedAddrs = await this._resolve(a)
|
||||
|
||||
log('resolved %s to %s', a, resolvedAddrs)
|
||||
|
||||
resolvedAddrs.forEach(ra => addrs.push(ra))
|
||||
}
|
||||
|
@@ -4,16 +4,20 @@ import mergeOptions from 'merge-options'
|
||||
import { LatencyMonitor, SummaryObject } from './latency-monitor.js'
|
||||
// @ts-expect-error retimer does not have types
|
||||
import retimer from 'retimer'
|
||||
import { CustomEvent, EventEmitter, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import { trackedMap } from '@libp2p/tracked-map'
|
||||
import { codes } from '../errors.js'
|
||||
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/registrar'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
import { Components, Initializable } from '@libp2p/interfaces/components'
|
||||
import * as STATUS from '@libp2p/interfaces/connection/status'
|
||||
import { Dialer } from './dialer/index.js'
|
||||
import type { AddressSorter } from '@libp2p/interfaces/peer-store'
|
||||
import type { Resolver } from '@multiformats/multiaddr'
|
||||
|
||||
const log = logger('libp2p:connection-manager')
|
||||
|
||||
@@ -34,21 +38,16 @@ const METRICS_COMPONENT = 'connection-manager'
|
||||
const METRICS_PEER_CONNECTIONS = 'peer-connections'
|
||||
const METRICS_PEER_VALUES = 'peer-values'
|
||||
|
||||
export interface ConnectionManagerEvents {
|
||||
'peer:connect': CustomEvent<PeerId>
|
||||
'peer:disconnect': CustomEvent<PeerId>
|
||||
}
|
||||
|
||||
export interface ConnectionManagerInit {
|
||||
/**
|
||||
* The maximum number of connections allowed
|
||||
* The maximum number of connections to keep open
|
||||
*/
|
||||
maxConnections?: number
|
||||
maxConnections: number
|
||||
|
||||
/**
|
||||
* The minimum number of connections to avoid pruning
|
||||
* The minimum number of connections to keep open
|
||||
*/
|
||||
minConnections?: number
|
||||
minConnections: number
|
||||
|
||||
/**
|
||||
* The max data (in and out), per average interval to allow
|
||||
@@ -86,39 +85,75 @@ export interface ConnectionManagerInit {
|
||||
defaultPeerValue?: number
|
||||
|
||||
/**
|
||||
* Should preemptively guarantee connections are above the low watermark
|
||||
* If true, try to connect to all discovered peers up to the connection manager limit
|
||||
*/
|
||||
autoDial?: boolean
|
||||
|
||||
/**
|
||||
* How often, in milliseconds, it should preemptively guarantee connections are above the low watermark
|
||||
* How long to wait between attempting to keep our number of concurrent connections
|
||||
* above minConnections
|
||||
*/
|
||||
autoDialInterval?: number
|
||||
autoDialInterval: number
|
||||
|
||||
/**
|
||||
* Sort the known addresses of a peer before trying to dial
|
||||
*/
|
||||
addressSorter?: AddressSorter
|
||||
|
||||
/**
|
||||
* Number of max concurrent dials
|
||||
*/
|
||||
maxParallelDials?: number
|
||||
|
||||
/**
|
||||
* Number of max addresses to dial for a given peer
|
||||
*/
|
||||
maxAddrsToDial?: number
|
||||
|
||||
/**
|
||||
* How long a dial attempt is allowed to take
|
||||
*/
|
||||
dialTimeout?: number
|
||||
|
||||
/**
|
||||
* Number of max concurrent dials per peer
|
||||
*/
|
||||
maxDialsPerPeer?: number
|
||||
|
||||
/**
|
||||
* Multiaddr resolvers to use when dialing
|
||||
*/
|
||||
resolvers?: Record<string, Resolver>
|
||||
}
|
||||
|
||||
export interface ConnectionManagerEvents {
|
||||
'peer:connect': CustomEvent<PeerId>
|
||||
'peer:disconnect': CustomEvent<PeerId>
|
||||
}
|
||||
|
||||
/**
|
||||
* Responsible for managing known connections.
|
||||
*/
|
||||
export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable {
|
||||
private readonly components: Components
|
||||
private readonly init: Required<ConnectionManagerInit>
|
||||
export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable, Initializable {
|
||||
public readonly dialer: Dialer
|
||||
private components = new Components()
|
||||
private readonly opts: Required<ConnectionManagerInit>
|
||||
private readonly peerValues: Map<string, number>
|
||||
private readonly connections: Map<string, Connection[]>
|
||||
private started: boolean
|
||||
private timer?: ReturnType<retimer>
|
||||
private readonly latencyMonitor: LatencyMonitor
|
||||
|
||||
constructor (components: Components, init: ConnectionManagerInit = {}) {
|
||||
constructor (init: ConnectionManagerInit) {
|
||||
super()
|
||||
|
||||
this.components = components
|
||||
this.init = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)
|
||||
this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)
|
||||
|
||||
if (this.init.maxConnections < this.init.minConnections) {
|
||||
if (this.opts.maxConnections < this.opts.minConnections) {
|
||||
throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS)
|
||||
}
|
||||
|
||||
log('options: %o', this.init)
|
||||
log('options: %o', this.opts)
|
||||
|
||||
/**
|
||||
* Map of peer identifiers to their peer value for pruning connections.
|
||||
@@ -153,12 +188,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
setMaxListeners?.(Infinity, this)
|
||||
} catch {}
|
||||
|
||||
this.components.getUpgrader().addEventListener('connection', (evt) => {
|
||||
void this.onConnect(evt).catch(err => {
|
||||
log.error(err)
|
||||
})
|
||||
})
|
||||
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect.bind(this))
|
||||
this.dialer = new Dialer(this.opts)
|
||||
|
||||
this.onConnect = this.onConnect.bind(this)
|
||||
this.onDisconnect = this.onDisconnect.bind(this)
|
||||
}
|
||||
|
||||
init (components: Components): void {
|
||||
this.components = components
|
||||
|
||||
this.dialer.init(components)
|
||||
}
|
||||
|
||||
isStarted () {
|
||||
@@ -171,18 +210,29 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
*/
|
||||
async start () {
|
||||
if (this.components.getMetrics() != null) {
|
||||
this.timer = this.timer ?? retimer(this._checkMetrics, this.init.pollInterval)
|
||||
this.timer = this.timer ?? retimer(this._checkMetrics, this.opts.pollInterval)
|
||||
}
|
||||
|
||||
// latency monitor
|
||||
this.latencyMonitor.start()
|
||||
this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
|
||||
this.latencyMonitor.addEventListener('data', this._onLatencyMeasure)
|
||||
await this.dialer.start()
|
||||
|
||||
this.started = true
|
||||
log('started')
|
||||
}
|
||||
|
||||
async afterStart () {
|
||||
this.components.getUpgrader().addEventListener('connection', this.onConnect)
|
||||
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect)
|
||||
}
|
||||
|
||||
async beforeStop () {
|
||||
this.components.getUpgrader().removeEventListener('connection', this.onConnect)
|
||||
this.components.getUpgrader().removeEventListener('connectionEnd', this.onDisconnect)
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the Connection Manager
|
||||
*/
|
||||
@@ -191,6 +241,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
|
||||
this.latencyMonitor.removeEventListener('data', this._onLatencyMeasure)
|
||||
this.latencyMonitor.stop()
|
||||
await this.dialer.stop()
|
||||
|
||||
this.started = false
|
||||
await this._close()
|
||||
@@ -202,10 +253,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
*/
|
||||
async _close () {
|
||||
// Close all connections we're tracking
|
||||
const tasks = []
|
||||
const tasks: Array<Promise<void>> = []
|
||||
for (const connectionList of this.connections.values()) {
|
||||
for (const connection of connectionList) {
|
||||
tasks.push(connection.close())
|
||||
tasks.push((async () => {
|
||||
try {
|
||||
await connection.close()
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
})())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,23 +295,29 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
if (metrics != null) {
|
||||
try {
|
||||
const movingAverages = metrics.getGlobal().getMovingAverages()
|
||||
const received = movingAverages.dataReceived[this.init.movingAverageInterval].movingAverage
|
||||
const received = movingAverages.dataReceived[this.opts.movingAverageInterval].movingAverage
|
||||
await this._checkMaxLimit('maxReceivedData', received)
|
||||
const sent = movingAverages.dataSent[this.init.movingAverageInterval].movingAverage
|
||||
const sent = movingAverages.dataSent[this.opts.movingAverageInterval].movingAverage
|
||||
await this._checkMaxLimit('maxSentData', sent)
|
||||
const total = received + sent
|
||||
await this._checkMaxLimit('maxData', total)
|
||||
log('metrics update', total)
|
||||
log.trace('metrics update', total)
|
||||
} finally {
|
||||
this.timer = retimer(this._checkMetrics, this.init.pollInterval)
|
||||
this.timer = retimer(this._checkMetrics, this.opts.pollInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onConnect (evt: CustomEvent<Connection>) {
|
||||
void this._onConnect(evt).catch(err => {
|
||||
log.error(err)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks the incoming connection and check the connection limit
|
||||
*/
|
||||
async onConnect (evt: CustomEvent<Connection>) {
|
||||
async _onConnect (evt: CustomEvent<Connection>) {
|
||||
const { detail: connection } = evt
|
||||
|
||||
if (!this.started) {
|
||||
@@ -267,8 +330,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
const peerIdStr = peerId.toString()
|
||||
const storedConns = this.connections.get(peerIdStr)
|
||||
|
||||
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
|
||||
|
||||
if (storedConns != null) {
|
||||
storedConns.push(connection)
|
||||
} else {
|
||||
@@ -280,10 +341,11 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
}
|
||||
|
||||
if (!this.peerValues.has(peerIdStr)) {
|
||||
this.peerValues.set(peerIdStr, this.init.defaultPeerValue)
|
||||
this.peerValues.set(peerIdStr, this.opts.defaultPeerValue)
|
||||
}
|
||||
|
||||
await this._checkMaxLimit('maxConnections', this.getConnectionList().length)
|
||||
await this._checkMaxLimit('maxConnections', this.getConnections().length)
|
||||
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -312,35 +374,64 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
}
|
||||
}
|
||||
|
||||
getConnectionMap (): Map<string, Connection[]> {
|
||||
return this.connections
|
||||
}
|
||||
|
||||
getConnectionList (): Connection[] {
|
||||
let output: Connection[] = []
|
||||
|
||||
for (const connections of this.connections.values()) {
|
||||
output = output.concat(connections)
|
||||
getConnections (peerId?: PeerId): Connection[] {
|
||||
if (peerId != null) {
|
||||
return this.connections.get(peerId.toString()) ?? []
|
||||
}
|
||||
|
||||
return output
|
||||
}
|
||||
let conns: Connection[] = []
|
||||
|
||||
getConnections (peerId: PeerId): Connection[] {
|
||||
return this.connections.get(peerId.toString()) ?? []
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a connection with a peer
|
||||
*/
|
||||
getConnection (peerId: PeerId): Connection | undefined {
|
||||
const connections = this.getAll(peerId)
|
||||
|
||||
if (connections.length > 0) {
|
||||
return connections[0]
|
||||
for (const c of this.connections.values()) {
|
||||
conns = conns.concat(c)
|
||||
}
|
||||
|
||||
return undefined
|
||||
return conns
|
||||
}
|
||||
|
||||
async openConnection (peerId: PeerId, options?: AbortOptions): Promise<Connection> {
|
||||
log('dial to %p', peerId)
|
||||
const existingConnections = this.getConnections(peerId)
|
||||
|
||||
if (existingConnections.length > 0) {
|
||||
log('had an existing connection to %p', peerId)
|
||||
|
||||
return existingConnections[0]
|
||||
}
|
||||
|
||||
const connection = await this.dialer.dial(peerId, options)
|
||||
let peerConnections = this.connections.get(peerId.toString())
|
||||
|
||||
if (peerConnections == null) {
|
||||
peerConnections = []
|
||||
this.connections.set(peerId.toString(), peerConnections)
|
||||
}
|
||||
|
||||
// we get notified of connections via the Upgrader emitting "connection"
|
||||
// events, double check we aren't already tracking this connection before
|
||||
// storing it
|
||||
let trackedConnection = false
|
||||
|
||||
for (const conn of peerConnections) {
|
||||
if (conn.id === connection.id) {
|
||||
trackedConnection = true
|
||||
}
|
||||
}
|
||||
|
||||
if (!trackedConnection) {
|
||||
peerConnections.push(connection)
|
||||
}
|
||||
|
||||
return connection
|
||||
}
|
||||
|
||||
async closeConnections (peerId: PeerId): Promise<void> {
|
||||
const connections = this.connections.get(peerId.toString()) ?? []
|
||||
|
||||
await Promise.all(
|
||||
connections.map(async connection => {
|
||||
return await connection.close()
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -378,7 +469,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
* If the `value` of `name` has exceeded its limit, maybe close a connection
|
||||
*/
|
||||
async _checkMaxLimit (name: keyof ConnectionManagerInit, value: number) {
|
||||
const limit = this.init[name]
|
||||
const limit = this.opts[name]
|
||||
log.trace('checking limit of %s. current value: %d of %d', name, value, limit)
|
||||
if (value > limit) {
|
||||
log('%s: limit exceeded: %p, %d', this.components.getPeerId(), name, value)
|
||||
@@ -391,7 +482,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
|
||||
* to the lowest valued peer.
|
||||
*/
|
||||
async _maybeDisconnectOne () {
|
||||
if (this.init.minConnections < this.connections.size) {
|
||||
if (this.opts.minConnections < this.connections.size) {
|
||||
const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1])))
|
||||
|
||||
log('%p: sorted peer values: %j', this.components.getPeerId(), peerValues)
|
||||
|
@@ -2,7 +2,7 @@
|
||||
* This code is based on `latency-monitor` (https://github.com/mlucool/latency-monitor) by `mlucool` (https://github.com/mlucool), available under Apache License 2.0 (https://github.com/mlucool/latency-monitor/blob/master/LICENSE)
|
||||
*/
|
||||
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { VisibilityChangeEmitter } from './visibility-change-emitter.js'
|
||||
import { logger } from '@libp2p/logger'
|
||||
|
||||
|
@@ -2,7 +2,7 @@
|
||||
* This code is based on `latency-monitor` (https://github.com/mlucool/latency-monitor) by `mlucool` (https://github.com/mlucool), available under Apache License 2.0 (https://github.com/mlucool/latency-monitor/blob/master/LICENSE)
|
||||
*/
|
||||
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { logger } from '@libp2p/logger'
|
||||
|
||||
const log = logger('libp2p:connection-manager:latency-monitor:visibility-change-emitter')
|
||||
|
@@ -9,7 +9,8 @@ import drain from 'it-drain'
|
||||
import merge from 'it-merge'
|
||||
import { pipe } from 'it-pipe'
|
||||
import type { ContentRouting } from '@libp2p/interfaces/content-routing'
|
||||
import type { AbortOptions, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { CID } from 'multiformats/cid'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
||||
|
60
src/dht/dummy-dht.ts
Normal file
60
src/dht/dummy-dht.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import type { DualDHT, QueryEvent, SingleDHT } from '@libp2p/interfaces/dht'
|
||||
import type { PeerDiscoveryEvents } from '@libp2p/interfaces/peer-discovery'
|
||||
import errCode from 'err-code'
|
||||
import { messages, codes } from '../errors.js'
|
||||
import { EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { symbol } from '@libp2p/interfaces/peer-discovery'
|
||||
|
||||
export class DummyDHT extends EventEmitter<PeerDiscoveryEvents> implements DualDHT {
|
||||
get [symbol] (): true {
|
||||
return true
|
||||
}
|
||||
|
||||
get [Symbol.toStringTag] () {
|
||||
return '@libp2p/dummy-dht'
|
||||
}
|
||||
|
||||
get wan (): SingleDHT {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
get lan (): SingleDHT {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
get (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
findProviders (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
findPeer (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
getClosestPeers (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
provide (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
put (): AsyncIterable<QueryEvent> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
async getMode (): Promise<'client' | 'server'> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
async setMode (): Promise<void> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
|
||||
async refreshRoutingTable (): Promise<void> {
|
||||
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
|
||||
}
|
||||
}
|
@@ -1,6 +1,7 @@
|
||||
export enum messages {
|
||||
NOT_STARTED_YET = 'The libp2p node is not started yet',
|
||||
DHT_DISABLED = 'DHT is not available',
|
||||
PUBSUB_DISABLED = 'PubSub is not available',
|
||||
CONN_ENCRYPTION_REQUIRED = 'At least one connection encryption module is required',
|
||||
ERR_TRANSPORTS_REQUIRED = 'At least one transport module is required',
|
||||
ERR_PROTECTOR_REQUIRED = 'Private network is enforced, but no protector was provided',
|
||||
@@ -9,6 +10,7 @@ export enum messages {
|
||||
|
||||
export enum codes {
|
||||
DHT_DISABLED = 'ERR_DHT_DISABLED',
|
||||
ERR_PUBSUB_DISABLED = 'ERR_PUBSUB_DISABLED',
|
||||
PUBSUB_NOT_STARTED = 'ERR_PUBSUB_NOT_STARTED',
|
||||
DHT_NOT_STARTED = 'ERR_DHT_NOT_STARTED',
|
||||
CONN_ENCRYPTION_REQUIRED = 'ERR_CONN_ENCRYPTION_REQUIRED',
|
||||
@@ -67,5 +69,6 @@ export enum codes {
|
||||
ERR_INVALID_PASS_LENGTH = 'ERR_INVALID_PASS_LENGTH',
|
||||
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
|
||||
ERR_WRONG_PING_ACK = 'ERR_WRONG_PING_ACK',
|
||||
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD'
|
||||
ERR_INVALID_RECORD = 'ERR_INVALID_RECORD',
|
||||
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED'
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
|
||||
// https://github.com/libp2p/specs/tree/master/fetch#wire-protocol
|
||||
export const PROTOCOL = '/libp2p/fetch/0.0.1'
|
||||
export const PROTOCOL_VERSION = '0.0.1'
|
||||
export const PROTOCOL_NAME = 'fetch'
|
||||
|
@@ -4,16 +4,19 @@ import { codes } from '../errors.js'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { FetchRequest, FetchResponse } from './pb/proto.js'
|
||||
import { handshake } from 'it-handshake'
|
||||
import { PROTOCOL } from './constants.js'
|
||||
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
|
||||
const log = logger('libp2p:fetch')
|
||||
|
||||
export interface FetchInit {
|
||||
export interface FetchServiceInit {
|
||||
protocolPrefix: string
|
||||
}
|
||||
|
||||
@@ -33,15 +36,15 @@ export interface LookupFunction {
|
||||
* by a fixed prefix that all keys that should be routed to that lookup function will start with.
|
||||
*/
|
||||
export class FetchService implements Startable {
|
||||
public readonly protocol: string
|
||||
private readonly components: Components
|
||||
private readonly lookupFunctions: Map<string, LookupFunction>
|
||||
private readonly protocol: string
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: FetchInit) {
|
||||
constructor (components: Components, init: FetchServiceInit) {
|
||||
this.started = false
|
||||
this.components = components
|
||||
this.protocol = PROTOCOL
|
||||
this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
|
||||
this.lookupFunctions = new Map() // Maps key prefix to value lookup function
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
}
|
||||
@@ -67,12 +70,19 @@ export class FetchService implements Startable {
|
||||
/**
|
||||
* Sends a request to fetch the value associated with the given key from the given peer
|
||||
*/
|
||||
async fetch (peer: PeerId, key: string): Promise<Uint8Array | null> {
|
||||
async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
|
||||
log('dialing %s to %p', this.protocol, peer)
|
||||
|
||||
const connection = await this.components.getDialer().dial(peer)
|
||||
const { stream } = await connection.newStream([this.protocol])
|
||||
const shake = handshake(stream)
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
const { stream } = await connection.newStream([this.protocol], options)
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
|
||||
const shake = handshake(source)
|
||||
|
||||
// send message
|
||||
shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice())
|
||||
|
@@ -2,13 +2,14 @@
|
||||
/* eslint-disable @typescript-eslint/no-namespace */
|
||||
|
||||
import { encodeMessage, decodeMessage, message, string, enumeration, bytes } from 'protons-runtime'
|
||||
import type { Codec } from 'protons-runtime'
|
||||
|
||||
export interface FetchRequest {
|
||||
identifier: string
|
||||
}
|
||||
|
||||
export namespace FetchRequest {
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<FetchRequest> => {
|
||||
return message<FetchRequest>({
|
||||
1: { name: 'identifier', codec: string }
|
||||
})
|
||||
@@ -35,13 +36,19 @@ export namespace FetchResponse {
|
||||
ERROR = 'ERROR'
|
||||
}
|
||||
|
||||
enum __StatusCodeValues {
|
||||
OK = 0,
|
||||
NOT_FOUND = 1,
|
||||
ERROR = 2
|
||||
}
|
||||
|
||||
export namespace StatusCode {
|
||||
export const codec = () => {
|
||||
return enumeration<typeof StatusCode>(StatusCode)
|
||||
return enumeration<typeof StatusCode>(__StatusCodeValues)
|
||||
}
|
||||
}
|
||||
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<FetchResponse> => {
|
||||
return message<FetchResponse>({
|
||||
1: { name: 'status', codec: FetchResponse.StatusCode.codec() },
|
||||
2: { name: 'data', codec: bytes }
|
||||
|
@@ -2,8 +2,6 @@ import { logger } from '@libp2p/logger'
|
||||
import errCode from 'err-code'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import { pipe } from 'it-pipe'
|
||||
import all from 'it-all'
|
||||
import take from 'it-take'
|
||||
import drain from 'it-drain'
|
||||
import first from 'it-first'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
@@ -21,13 +19,19 @@ import {
|
||||
} from './consts.js'
|
||||
import { codes } from '../errors.js'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import { peerIdFromKeys, peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { Connection, Stream } from '@libp2p/interfaces/connection'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import { peerIdFromKeys } from '@libp2p/peer-id'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
|
||||
const log = logger('libp2p:identify')
|
||||
|
||||
const IDENTIFY_TIMEOUT = 30000
|
||||
|
||||
export interface HostProperties {
|
||||
agentVersion: string
|
||||
}
|
||||
@@ -35,6 +39,7 @@ export interface HostProperties {
|
||||
export interface IdentifyServiceInit {
|
||||
protocolPrefix: string
|
||||
host: HostProperties
|
||||
timeout?: number
|
||||
}
|
||||
|
||||
export class IdentifyService implements Startable {
|
||||
@@ -46,11 +51,13 @@ export class IdentifyService implements Startable {
|
||||
agentVersion: string
|
||||
}
|
||||
|
||||
private readonly init: IdentifyServiceInit
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: IdentifyServiceInit) {
|
||||
this.components = components
|
||||
this.started = false
|
||||
this.init = init
|
||||
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
|
||||
@@ -128,8 +135,17 @@ export class IdentifyService implements Startable {
|
||||
const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId())
|
||||
|
||||
const pushes = connections.map(async connection => {
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
let stream: Stream | undefined
|
||||
|
||||
try {
|
||||
const { stream } = await connection.newStream([this.identifyPushProtocolStr])
|
||||
const data = await connection.newStream([this.identifyPushProtocolStr], {
|
||||
signal: timeoutController.signal
|
||||
})
|
||||
stream = data.stream
|
||||
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
await pipe(
|
||||
[Identify.encode({
|
||||
@@ -138,12 +154,18 @@ export class IdentifyService implements Startable {
|
||||
protocols
|
||||
})],
|
||||
lp.encode(),
|
||||
stream,
|
||||
source,
|
||||
drain
|
||||
)
|
||||
} catch (err: any) {
|
||||
// Just log errors
|
||||
log.error('could not push identify update to peer', err)
|
||||
} finally {
|
||||
if (stream != null) {
|
||||
stream.close()
|
||||
}
|
||||
|
||||
timeoutController.clear()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -161,45 +183,58 @@ export class IdentifyService implements Startable {
|
||||
|
||||
const connections: Connection[] = []
|
||||
|
||||
for (const [peerIdStr, conns] of this.components.getConnectionManager().getConnectionMap().entries()) {
|
||||
const peerId = peerIdFromString(peerIdStr)
|
||||
for (const conn of this.components.getConnectionManager().getConnections()) {
|
||||
const peerId = conn.remotePeer
|
||||
const peer = await this.components.getPeerStore().get(peerId)
|
||||
|
||||
if (!peer.protocols.includes(this.identifyPushProtocolStr)) {
|
||||
continue
|
||||
}
|
||||
|
||||
connections.push(...conns)
|
||||
connections.push(conn)
|
||||
}
|
||||
|
||||
await this.push(connections)
|
||||
}
|
||||
|
||||
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
|
||||
const { stream } = await connection.newStream([this.identifyProtocolStr], options)
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
|
||||
try {
|
||||
const data = await pipe(
|
||||
[],
|
||||
source,
|
||||
lp.decode(),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
|
||||
if (data == null) {
|
||||
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
|
||||
}
|
||||
|
||||
try {
|
||||
return Identify.decode(data)
|
||||
} catch (err: any) {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
} finally {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests the `Identify` message from peer associated with the given `connection`.
|
||||
* If the identified peer does not match the `PeerId` associated with the connection,
|
||||
* an error will be thrown.
|
||||
*/
|
||||
async identify (connection: Connection): Promise<void> {
|
||||
const { stream } = await connection.newStream([this.identifyProtocolStr])
|
||||
const [data] = await pipe(
|
||||
[],
|
||||
stream,
|
||||
lp.decode(),
|
||||
(source) => take(source, 1),
|
||||
async (source) => await all(source)
|
||||
)
|
||||
|
||||
if (data == null) {
|
||||
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
|
||||
}
|
||||
|
||||
let message: Identify
|
||||
try {
|
||||
message = Identify.decode(data)
|
||||
} catch (err: any) {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
async identify (connection: Connection, options: AbortOptions = {}): Promise<void> {
|
||||
const message = await this._identify(connection, options)
|
||||
|
||||
const {
|
||||
publicKey,
|
||||
@@ -308,6 +343,8 @@ export class IdentifyService implements Startable {
|
||||
*/
|
||||
async _handleIdentify (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
|
||||
try {
|
||||
const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0)
|
||||
const peerData = await this.components.getPeerStore().get(this.components.getPeerId())
|
||||
@@ -335,14 +372,20 @@ export class IdentifyService implements Startable {
|
||||
protocols: peerData.protocols
|
||||
})
|
||||
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
await pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
source,
|
||||
drain
|
||||
)
|
||||
} catch (err: any) {
|
||||
log.error('could not respond to identify request', err)
|
||||
} finally {
|
||||
stream.close()
|
||||
timeoutController.clear()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,12 +394,16 @@ export class IdentifyService implements Startable {
|
||||
*/
|
||||
async _handlePush (data: IncomingStreamData) {
|
||||
const { connection, stream } = data
|
||||
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
|
||||
|
||||
let message: Identify | undefined
|
||||
try {
|
||||
// make stream abortable
|
||||
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
|
||||
|
||||
const data = await pipe(
|
||||
[],
|
||||
stream,
|
||||
source,
|
||||
lp.decode(),
|
||||
async (source) => await first(source)
|
||||
)
|
||||
@@ -366,6 +413,9 @@ export class IdentifyService implements Startable {
|
||||
}
|
||||
} catch (err: any) {
|
||||
return log.error('received invalid message', err)
|
||||
} finally {
|
||||
stream.close()
|
||||
timeoutController.clear()
|
||||
}
|
||||
|
||||
if (message == null) {
|
||||
|
@@ -2,6 +2,7 @@
|
||||
/* eslint-disable @typescript-eslint/no-namespace */
|
||||
|
||||
import { encodeMessage, decodeMessage, message, string, bytes } from 'protons-runtime'
|
||||
import type { Codec } from 'protons-runtime'
|
||||
|
||||
export interface Identify {
|
||||
protocolVersion?: string
|
||||
@@ -14,7 +15,7 @@ export interface Identify {
|
||||
}
|
||||
|
||||
export namespace Identify {
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<Identify> => {
|
||||
return message<Identify>({
|
||||
5: { name: 'protocolVersion', codec: string, optional: true },
|
||||
6: { name: 'agentVersion', codec: string, optional: true },
|
||||
|
59
src/index.ts
59
src/index.ts
@@ -1,8 +1,10 @@
|
||||
import { createLibp2pNode } from './libp2p.js'
|
||||
import type { AbortOptions, EventEmitter, RecursivePartial, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions, RecursivePartial } from '@libp2p/interfaces'
|
||||
import type { EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { FaultTolerance } from './transport-manager.js'
|
||||
import type { HostProperties } from './identify/index.js'
|
||||
import type { IdentifyServiceInit } from './identify/index.js'
|
||||
import type { DualDHT } from '@libp2p/interfaces/dht'
|
||||
import type { Datastore } from 'interface-datastore'
|
||||
import type { PeerStore, PeerStoreInit } from '@libp2p/interfaces/peer-store'
|
||||
@@ -16,11 +18,14 @@ import type { ConnectionEncrypter } from '@libp2p/interfaces/connection-encrypte
|
||||
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
|
||||
import type { ContentRouting } from '@libp2p/interfaces/content-routing'
|
||||
import type { PubSub } from '@libp2p/interfaces/pubsub'
|
||||
import type { ConnectionManager, Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
|
||||
import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics'
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
import type { DialerInit } from '@libp2p/interfaces/dialer'
|
||||
import type { KeyChain } from './keychain/index.js'
|
||||
import type { ConnectionManagerInit } from './connection-manager/index.js'
|
||||
import type { PingServiceInit } from './ping/index.js'
|
||||
import type { FetchServiceInit } from './fetch/index.js'
|
||||
|
||||
export interface PersistentPeerStoreOptions {
|
||||
threshold?: number
|
||||
@@ -71,29 +76,6 @@ export interface AddressesConfig {
|
||||
announceFilter: (multiaddrs: Multiaddr[]) => Multiaddr[]
|
||||
}
|
||||
|
||||
export interface ConnectionManagerConfig {
|
||||
/**
|
||||
* If true, try to connect to all discovered peers up to the connection manager limit
|
||||
*/
|
||||
autoDial?: boolean
|
||||
|
||||
/**
|
||||
* The maximum number of connections to keep open
|
||||
*/
|
||||
maxConnections: number
|
||||
|
||||
/**
|
||||
* The minimum number of connections to keep open
|
||||
*/
|
||||
minConnections: number
|
||||
|
||||
/**
|
||||
* How long to wait between attempting to keep our number of concurrent connections
|
||||
* above minConnections
|
||||
*/
|
||||
autoDialInterval: number
|
||||
}
|
||||
|
||||
export interface TransportManagerConfig {
|
||||
faultTolerance?: FaultTolerance
|
||||
}
|
||||
@@ -115,20 +97,20 @@ export interface RefreshManagerConfig {
|
||||
|
||||
export interface Libp2pInit {
|
||||
peerId: PeerId
|
||||
host: HostProperties
|
||||
addresses: AddressesConfig
|
||||
connectionManager: ConnectionManagerConfig
|
||||
connectionManager: ConnectionManagerInit
|
||||
connectionGater: Partial<ConnectionGater>
|
||||
transportManager: TransportManagerConfig
|
||||
datastore: Datastore
|
||||
dialer: DialerInit
|
||||
metrics: MetricsInit
|
||||
peerStore: PeerStoreInit
|
||||
peerRouting: PeerRoutingConfig
|
||||
keychain: KeychainConfig
|
||||
protocolPrefix: string
|
||||
nat: NatManagerConfig
|
||||
relay: RelayConfig
|
||||
identify: IdentifyServiceInit
|
||||
ping: PingServiceInit
|
||||
fetch: FetchServiceInit
|
||||
|
||||
transports: Transport[]
|
||||
streamMuxers?: StreamMuxerFactory[]
|
||||
@@ -154,9 +136,8 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
|
||||
connectionManager: ConnectionManager
|
||||
registrar: Registrar
|
||||
metrics?: Metrics
|
||||
|
||||
pubsub?: PubSub
|
||||
dht?: DualDHT
|
||||
pubsub: PubSub
|
||||
dht: DualDHT
|
||||
|
||||
/**
|
||||
* Load keychain keys from the datastore.
|
||||
@@ -217,12 +198,18 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
|
||||
/**
|
||||
* Pings the given peer in order to obtain the operation latency
|
||||
*/
|
||||
ping: (peer: Multiaddr |PeerId) => Promise<number>
|
||||
ping: (peer: Multiaddr | PeerId, options?: AbortOptions) => Promise<number>
|
||||
|
||||
/**
|
||||
* Sends a request to fetch the value associated with the given key from the given peer.
|
||||
*/
|
||||
fetch: (peer: PeerId | Multiaddr | string, key: string) => Promise<Uint8Array | null>
|
||||
fetch: (peer: PeerId | Multiaddr | string, key: string, options?: AbortOptions) => Promise<Uint8Array | null>
|
||||
|
||||
/**
|
||||
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA' type
|
||||
* this may mean searching the DHT if the key is not present in the KeyStore.
|
||||
*/
|
||||
getPublicKey: (peer: PeerId, options?: AbortOptions) => Promise<Uint8Array>
|
||||
}
|
||||
|
||||
export type Libp2pOptions = RecursivePartial<Libp2pInit>
|
||||
|
@@ -2,6 +2,7 @@
|
||||
/* eslint-disable @typescript-eslint/no-namespace */
|
||||
|
||||
import { encodeMessage, decodeMessage, message, bytes, enumeration } from 'protons-runtime'
|
||||
import type { Codec } from 'protons-runtime'
|
||||
|
||||
export interface Exchange {
|
||||
id?: Uint8Array
|
||||
@@ -9,7 +10,7 @@ export interface Exchange {
|
||||
}
|
||||
|
||||
export namespace Exchange {
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<Exchange> => {
|
||||
return message<Exchange>({
|
||||
1: { name: 'id', codec: bytes, optional: true },
|
||||
2: { name: 'pubkey', codec: PublicKey.codec(), optional: true }
|
||||
@@ -32,19 +33,25 @@ export enum KeyType {
|
||||
ECDSA = 'ECDSA'
|
||||
}
|
||||
|
||||
export namespace KeyType {
|
||||
export const codec = () => {
|
||||
return enumeration<typeof KeyType>(KeyType)
|
||||
}
|
||||
enum __KeyTypeValues {
|
||||
RSA = 0,
|
||||
Ed25519 = 1,
|
||||
Secp256k1 = 2,
|
||||
ECDSA = 3
|
||||
}
|
||||
|
||||
export namespace KeyType {
|
||||
export const codec = () => {
|
||||
return enumeration<typeof KeyType>(__KeyTypeValues)
|
||||
}
|
||||
}
|
||||
export interface PublicKey {
|
||||
Type: KeyType
|
||||
Data: Uint8Array
|
||||
}
|
||||
|
||||
export namespace PublicKey {
|
||||
export const codec = () => {
|
||||
export const codec = (): Codec<PublicKey> => {
|
||||
return message<PublicKey>({
|
||||
1: { name: 'Type', codec: KeyType.codec() },
|
||||
2: { name: 'Data', codec: bytes }
|
||||
|
@@ -9,7 +9,6 @@ import errCode from 'err-code'
|
||||
import { codes } from '../errors.js'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import 'node-forge/lib/sha512.js'
|
||||
import { generateKeyPair, importKey, unmarshalPrivateKey } from '@libp2p/crypto/keys'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
156
src/libp2p.ts
156
src/libp2p.ts
@@ -1,5 +1,7 @@
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { AbortOptions, EventEmitter, Startable, CustomEvent, isStartable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events'
|
||||
import { Startable, isStartable } from '@libp2p/interfaces/startable'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { DefaultPeerRouting } from './peer-routing.js'
|
||||
@@ -11,7 +13,6 @@ import { DefaultConnectionManager } from './connection-manager/index.js'
|
||||
import { AutoDialler } from './connection-manager/auto-dialler.js'
|
||||
import { Circuit } from './circuit/transport.js'
|
||||
import { Relay } from './circuit/index.js'
|
||||
import { DefaultDialer } from './dialer/index.js'
|
||||
import { KeyChain } from './keychain/index.js'
|
||||
import { DefaultMetrics } from './metrics/index.js'
|
||||
import { DefaultTransportManager } from './transport-manager.js'
|
||||
@@ -25,14 +26,15 @@ import { PeerRecordUpdater } from './peer-record-updater.js'
|
||||
import { DHTPeerRouting } from './dht/dht-peer-routing.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DHTContentRouting } from './dht/dht-content-routing.js'
|
||||
import { AutoDialer } from './dialer/auto-dialer.js'
|
||||
import { AutoDialer } from './connection-manager/dialer/auto-dialer.js'
|
||||
import { Initializable, Components, isInitializable } from '@libp2p/interfaces/components'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
|
||||
import type { ContentRouting } from '@libp2p/interfaces/content-routing'
|
||||
import type { PubSub } from '@libp2p/interfaces/pubsub'
|
||||
import type { ConnectionManager, Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
|
||||
import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js'
|
||||
import { validateConfig } from './config.js'
|
||||
@@ -44,13 +46,16 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import errCode from 'err-code'
|
||||
import { unmarshalPublicKey } from '@libp2p/crypto/keys'
|
||||
import type { Metrics } from '@libp2p/interfaces/metrics'
|
||||
import { DummyDHT } from './dht/dummy-dht.js'
|
||||
import { DummyPubSub } from './pubsub/dummy-pubsub.js'
|
||||
import { PeerSet } from '@libp2p/peer-collections'
|
||||
|
||||
const log = logger('libp2p')
|
||||
|
||||
export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
public peerId: PeerId
|
||||
public dht?: DualDHT
|
||||
public pubsub?: PubSub
|
||||
public dht: DualDHT
|
||||
public pubsub: PubSub
|
||||
public identifyService?: IdentifyService
|
||||
public fetchService: FetchService
|
||||
public pingService: PingService
|
||||
@@ -70,34 +75,40 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
constructor (init: Libp2pInit) {
|
||||
super()
|
||||
|
||||
this.services = []
|
||||
this.initializables = []
|
||||
this.started = false
|
||||
this.peerId = init.peerId
|
||||
this.components = new Components({
|
||||
peerId: init.peerId,
|
||||
datastore: init.datastore ?? new MemoryDatastore()
|
||||
datastore: init.datastore ?? new MemoryDatastore(),
|
||||
connectionGater: {
|
||||
denyDialPeer: async () => await Promise.resolve(false),
|
||||
denyDialMultiaddr: async () => await Promise.resolve(false),
|
||||
denyInboundConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundConnection: async () => await Promise.resolve(false),
|
||||
denyInboundEncryptedConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundEncryptedConnection: async () => await Promise.resolve(false),
|
||||
denyInboundUpgradedConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundUpgradedConnection: async () => await Promise.resolve(false),
|
||||
filterMultiaddrForPeer: async () => await Promise.resolve(true),
|
||||
...init.connectionGater
|
||||
}
|
||||
})
|
||||
this.components.setPeerStore(new PersistentPeerStore({
|
||||
addressFilter: this.components.getConnectionGater().filterMultiaddrForPeer,
|
||||
...init.peerStore
|
||||
}))
|
||||
|
||||
this.services = [
|
||||
this.components
|
||||
]
|
||||
|
||||
// Create Metrics
|
||||
if (init.metrics.enabled) {
|
||||
this.metrics = this.components.setMetrics(this.configureComponent(new DefaultMetrics(init.metrics)))
|
||||
this.metrics = this.components.setMetrics(new DefaultMetrics(init.metrics))
|
||||
}
|
||||
|
||||
this.components.setConnectionGater(this.configureComponent({
|
||||
denyDialPeer: async () => await Promise.resolve(false),
|
||||
denyDialMultiaddr: async () => await Promise.resolve(false),
|
||||
denyInboundConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundConnection: async () => await Promise.resolve(false),
|
||||
denyInboundEncryptedConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundEncryptedConnection: async () => await Promise.resolve(false),
|
||||
denyInboundUpgradedConnection: async () => await Promise.resolve(false),
|
||||
denyOutboundUpgradedConnection: async () => await Promise.resolve(false),
|
||||
filterMultiaddrForPeer: async () => await Promise.resolve(true),
|
||||
...init.connectionGater
|
||||
}))
|
||||
|
||||
this.peerStore = this.components.setPeerStore(this.configureComponent(new PersistentPeerStore(this.components, init.peerStore)))
|
||||
this.peerStore = this.components.getPeerStore()
|
||||
|
||||
this.peerStore.addEventListener('peer', evt => {
|
||||
const { detail: peerData } = evt
|
||||
@@ -107,32 +118,30 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
|
||||
// Set up connection protector if configured
|
||||
if (init.connectionProtector != null) {
|
||||
this.components.setConnectionProtector(this.configureComponent(init.connectionProtector))
|
||||
this.components.setConnectionProtector(init.connectionProtector)
|
||||
}
|
||||
|
||||
// Set up the Upgrader
|
||||
this.components.setUpgrader(this.configureComponent(new DefaultUpgrader(this.components, {
|
||||
this.components.setUpgrader(new DefaultUpgrader(this.components, {
|
||||
connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)),
|
||||
muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component))
|
||||
})))
|
||||
}))
|
||||
|
||||
// Create the Connection Manager
|
||||
this.connectionManager = this.components.setConnectionManager(this.configureComponent(new DefaultConnectionManager(this.components, init.connectionManager)))
|
||||
this.connectionManager = this.components.setConnectionManager(new DefaultConnectionManager(init.connectionManager))
|
||||
|
||||
// Create the Registrar
|
||||
this.registrar = this.components.setRegistrar(this.configureComponent(new DefaultRegistrar(this.components)))
|
||||
this.registrar = this.components.setRegistrar(new DefaultRegistrar(this.components))
|
||||
|
||||
// Setup the transport manager
|
||||
this.components.setTransportManager(this.configureComponent(new DefaultTransportManager(this.components, init.transportManager)))
|
||||
this.components.setTransportManager(new DefaultTransportManager(this.components, init.transportManager))
|
||||
|
||||
// Addresses {listen, announce, noAnnounce}
|
||||
this.components.setAddressManager(this.configureComponent(new DefaultAddressManager(this.components, init.addresses)))
|
||||
this.components.setAddressManager(new DefaultAddressManager(this.components, init.addresses))
|
||||
|
||||
// update our peer record when addresses change
|
||||
this.configureComponent(new PeerRecordUpdater(this.components))
|
||||
|
||||
this.components.setDialer(this.configureComponent(new DefaultDialer(this.components, init.dialer)))
|
||||
|
||||
this.configureComponent(new AutoDialler(this.components, {
|
||||
enabled: init.connectionManager.autoDial,
|
||||
minConnections: init.connectionManager.minConnections,
|
||||
@@ -157,22 +166,23 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
if (init.streamMuxers != null && init.streamMuxers.length > 0) {
|
||||
// Add the identify service since we can multiplex
|
||||
this.identifyService = new IdentifyService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix,
|
||||
host: {
|
||||
agentVersion: init.host.agentVersion
|
||||
}
|
||||
...init.identify
|
||||
})
|
||||
this.configureComponent(this.identifyService)
|
||||
}
|
||||
|
||||
// dht provided components (peerRouting, contentRouting, dht)
|
||||
if (init.dht != null) {
|
||||
this.dht = this.components.setDHT(this.configureComponent(init.dht))
|
||||
this.dht = this.components.setDHT(init.dht)
|
||||
} else {
|
||||
this.dht = new DummyDHT()
|
||||
}
|
||||
|
||||
// Create pubsub if provided
|
||||
if (init.pubsub != null) {
|
||||
this.pubsub = this.components.setPubSub(this.configureComponent(init.pubsub))
|
||||
this.pubsub = this.components.setPubSub(init.pubsub)
|
||||
} else {
|
||||
this.pubsub = new DummyPubSub()
|
||||
}
|
||||
|
||||
// Attach remaining APIs
|
||||
@@ -180,7 +190,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
|
||||
const peerRouters: PeerRouting[] = (init.peerRouters ?? []).map(component => this.configureComponent(component))
|
||||
|
||||
if (this.dht != null) {
|
||||
if (init.dht != null) {
|
||||
// add dht to routers
|
||||
peerRouters.push(this.configureComponent(new DHTPeerRouting(this.dht)))
|
||||
|
||||
@@ -197,7 +207,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
|
||||
const contentRouters: ContentRouting[] = (init.contentRouters ?? []).map(component => this.configureComponent(component))
|
||||
|
||||
if (this.dht != null) {
|
||||
if (init.dht != null) {
|
||||
// add dht to routers
|
||||
contentRouters.push(this.configureComponent(new DHTContentRouting(this.dht)))
|
||||
}
|
||||
@@ -210,22 +220,23 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
this.components.getTransportManager().add(this.configureComponent(new Circuit()))
|
||||
|
||||
this.configureComponent(new Relay(this.components, {
|
||||
addressSorter: init.dialer.addressSorter,
|
||||
addressSorter: init.connectionManager.addressSorter,
|
||||
...init.relay
|
||||
}))
|
||||
}
|
||||
|
||||
this.fetchService = this.configureComponent(new FetchService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix
|
||||
...init.fetch
|
||||
}))
|
||||
|
||||
this.pingService = this.configureComponent(new PingService(this.components, {
|
||||
protocolPrefix: init.protocolPrefix
|
||||
...init.ping
|
||||
}))
|
||||
|
||||
const autoDialer = this.configureComponent(new AutoDialer(this.components, {
|
||||
enabled: init.connectionManager.autoDial !== false,
|
||||
minConnections: init.connectionManager.minConnections ?? Infinity
|
||||
minConnections: init.connectionManager.minConnections,
|
||||
dialTimeout: init.connectionManager.dialTimeout ?? 30000
|
||||
}))
|
||||
|
||||
this.addEventListener('peer:discovery', evt => {
|
||||
@@ -373,24 +384,41 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
}
|
||||
|
||||
getConnections (peerId?: PeerId): Connection[] {
|
||||
if (peerId == null) {
|
||||
return this.components.getConnectionManager().getConnectionList()
|
||||
}
|
||||
|
||||
return this.components.getConnectionManager().getConnections(peerId)
|
||||
}
|
||||
|
||||
getPeers (): PeerId[] {
|
||||
return this.components.getConnectionManager().getConnectionList()
|
||||
.map(conn => conn.remotePeer)
|
||||
const peerSet = new PeerSet()
|
||||
|
||||
for (const conn of this.components.getConnectionManager().getConnections()) {
|
||||
peerSet.add(conn.remotePeer)
|
||||
}
|
||||
|
||||
return Array.from(peerSet)
|
||||
}
|
||||
|
||||
async dial (peer: PeerId | Multiaddr, options: AbortOptions = {}): Promise<Connection> {
|
||||
return await this.components.getDialer().dial(peer, options)
|
||||
const { id, multiaddrs } = getPeer(peer)
|
||||
|
||||
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
|
||||
|
||||
return await this.components.getConnectionManager().openConnection(id, options)
|
||||
}
|
||||
|
||||
async dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options: AbortOptions = {}) {
|
||||
return await this.components.getDialer().dialProtocol(peer, protocols, options)
|
||||
if (protocols == null) {
|
||||
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
|
||||
}
|
||||
|
||||
protocols = Array.isArray(protocols) ? protocols : [protocols]
|
||||
|
||||
if (protocols.length === 0) {
|
||||
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
|
||||
}
|
||||
|
||||
const connection = await this.dial(peer, options)
|
||||
|
||||
return await connection.newStream(protocols, options)
|
||||
}
|
||||
|
||||
getMultiaddrs (): Multiaddr[] {
|
||||
@@ -400,21 +428,19 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
async hangUp (peer: PeerId | Multiaddr | string): Promise<void> {
|
||||
const { id } = getPeer(peer)
|
||||
|
||||
const connections = this.components.getConnectionManager().getConnections(id)
|
||||
|
||||
await Promise.all(
|
||||
connections.map(async connection => {
|
||||
return await connection.close()
|
||||
})
|
||||
)
|
||||
await this.components.getConnectionManager().closeConnections(id)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the public key for the given peer id
|
||||
*/
|
||||
async getPublicKey (peer: PeerId, options: AbortOptions = {}) {
|
||||
async getPublicKey (peer: PeerId, options: AbortOptions = {}): Promise<Uint8Array> {
|
||||
log('getPublicKey %p', peer)
|
||||
|
||||
if (peer.publicKey != null) {
|
||||
return peer.publicKey
|
||||
}
|
||||
|
||||
const peerInfo = await this.peerStore.get(peer)
|
||||
|
||||
if (peerInfo.pubKey != null) {
|
||||
@@ -437,31 +463,31 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
|
||||
|
||||
await this.peerStore.keyBook.set(peer, event.value)
|
||||
|
||||
return key
|
||||
return key.bytes
|
||||
}
|
||||
}
|
||||
|
||||
throw errCode(new Error(`Node not responding with its public key: ${peer.toString()}`), codes.ERR_INVALID_RECORD)
|
||||
}
|
||||
|
||||
async fetch (peer: PeerId | Multiaddr | string, key: string): Promise<Uint8Array | null> {
|
||||
async fetch (peer: PeerId | Multiaddr | string, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
|
||||
const { id, multiaddrs } = getPeer(peer)
|
||||
|
||||
if (multiaddrs != null) {
|
||||
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
|
||||
}
|
||||
|
||||
return await this.fetchService.fetch(id, key)
|
||||
return await this.fetchService.fetch(id, key, options)
|
||||
}
|
||||
|
||||
async ping (peer: PeerId | Multiaddr | string): Promise<number> {
|
||||
async ping (peer: PeerId | Multiaddr | string, options: AbortOptions = {}): Promise<number> {
|
||||
const { id, multiaddrs } = getPeer(peer)
|
||||
|
||||
if (multiaddrs.length > 0) {
|
||||
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
|
||||
}
|
||||
|
||||
return await this.pingService.ping(id)
|
||||
return await this.pingService.ping(id, options)
|
||||
}
|
||||
|
||||
async handle (protocols: string | string[], handler: StreamHandler): Promise<void> {
|
||||
|
@@ -5,7 +5,7 @@ import { METRICS as defaultOptions } from '../constants.js'
|
||||
import { DefaultStats, StatsInit } from './stats.js'
|
||||
import type { ComponentMetricsUpdate, Metrics, Stats, TrackStreamOptions } from '@libp2p/interfaces/metrics'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
|
||||
const initialCounters: ['dataReceived', 'dataSent'] = [
|
||||
|
@@ -1,4 +1,4 @@
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { createMovingAverage } from './moving-average.js'
|
||||
// @ts-expect-error no types
|
||||
import retimer from 'retimer'
|
||||
|
@@ -7,7 +7,7 @@ import * as pkg from './version.js'
|
||||
import errCode from 'err-code'
|
||||
import { codes } from './errors.js'
|
||||
import { isLoopback } from '@libp2p/utils/multiaddr/is-loopback'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
||||
const log = logger('libp2p:nat')
|
||||
@@ -94,10 +94,14 @@ export class NatManager implements Startable {
|
||||
return this.started
|
||||
}
|
||||
|
||||
start () {}
|
||||
|
||||
/**
|
||||
* Starts the NAT manager
|
||||
* Attempt to use uPnP to configure port mapping using the current gateway.
|
||||
*
|
||||
* Run after start to ensure the transport manager has all addresses configured.
|
||||
*/
|
||||
start () {
|
||||
afterStart () {
|
||||
if (isBrowser || !this.enabled || this.started) {
|
||||
return
|
||||
}
|
||||
@@ -105,7 +109,7 @@ export class NatManager implements Startable {
|
||||
this.started = true
|
||||
|
||||
// done async to not slow down startup
|
||||
this._start().catch((err) => {
|
||||
void this._start().catch((err) => {
|
||||
// hole punching errors are non-fatal
|
||||
log.error(err)
|
||||
})
|
||||
|
@@ -1,6 +1,6 @@
|
||||
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import { logger } from '@libp2p/logger'
|
||||
import { protocols } from '@multiformats/multiaddr'
|
||||
|
||||
|
@@ -17,11 +17,11 @@ import {
|
||||
clearDelayedInterval
|
||||
// @ts-expect-error module with no types
|
||||
} from 'set-delayed-interval'
|
||||
// @ts-expect-error setMaxListeners is missing from the node 16 types
|
||||
import { setMaxListeners } from 'events'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
|
||||
import type { AbortOptions, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
|
||||
|
@@ -8,8 +8,11 @@ import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
|
||||
import { PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION } from './constants.js'
|
||||
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Startable } from '@libp2p/interfaces'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import { abortableDuplex } from 'abortable-iterator'
|
||||
|
||||
const log = logger('libp2p:ping')
|
||||
|
||||
@@ -18,8 +21,8 @@ export interface PingServiceInit {
|
||||
}
|
||||
|
||||
export class PingService implements Startable {
|
||||
public readonly protocol: string
|
||||
private readonly components: Components
|
||||
private readonly protocol: string
|
||||
private started: boolean
|
||||
|
||||
constructor (components: Components, init: PingServiceInit) {
|
||||
@@ -60,24 +63,36 @@ export class PingService implements Startable {
|
||||
* @param {PeerId|Multiaddr} peer
|
||||
* @returns {Promise<number>}
|
||||
*/
|
||||
async ping (peer: PeerId): Promise<number> {
|
||||
async ping (peer: PeerId, options: AbortOptions = {}): Promise<number> {
|
||||
log('dialing %s to %p', this.protocol, peer)
|
||||
|
||||
const { stream } = await this.components.getDialer().dialProtocol(peer, this.protocol)
|
||||
const connection = await this.components.getConnectionManager().openConnection(peer, options)
|
||||
const { stream } = await connection.newStream([this.protocol], options)
|
||||
const start = Date.now()
|
||||
const data = randomBytes(PING_LENGTH)
|
||||
|
||||
const result = await pipe(
|
||||
[data],
|
||||
stream,
|
||||
async (source) => await first(source)
|
||||
)
|
||||
const end = Date.now()
|
||||
let source: Duplex<Uint8Array> = stream
|
||||
|
||||
if (result == null || !uint8ArrayEquals(data, result)) {
|
||||
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
|
||||
// make stream abortable if AbortSignal passed
|
||||
if (options.signal != null) {
|
||||
source = abortableDuplex(stream, options.signal)
|
||||
}
|
||||
|
||||
return end - start
|
||||
try {
|
||||
const result = await pipe(
|
||||
[data],
|
||||
source,
|
||||
async (source) => await first(source)
|
||||
)
|
||||
const end = Date.now()
|
||||
|
||||
if (result == null || !uint8ArrayEquals(data, result)) {
|
||||
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
|
||||
}
|
||||
|
||||
return end - start
|
||||
} finally {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -17,6 +17,8 @@ import type { ConnectionProtector } from '@libp2p/interfaces/connection'
|
||||
|
||||
const log = logger('libp2p:pnet')
|
||||
|
||||
export { generateKey } from './key-generator.js'
|
||||
|
||||
export interface ProtectorInit {
|
||||
enabled?: boolean
|
||||
psk: Uint8Array
|
||||
|
@@ -8,7 +8,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
* @param {Uint8Array} bytes - An object to write the psk into
|
||||
* @returns {void}
|
||||
*/
|
||||
export function generate (bytes: Uint8Array) {
|
||||
export function generateKey (bytes: Uint8Array) {
|
||||
const psk = uint8ArrayToString(randomBytes(KEY_LENGTH), 'base16')
|
||||
const key = uint8ArrayFromString('/key/swarm/psk/1.0.0/\n/base16/\n' + psk)
|
||||
|
||||
|
51
src/pubsub/dummy-pubsub.ts
Normal file
51
src/pubsub/dummy-pubsub.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interfaces/pubsub'
|
||||
import errCode from 'err-code'
|
||||
import { messages, codes } from '../errors.js'
|
||||
|
||||
export class DummyPubSub extends EventEmitter<PubSubEvents> implements PubSub {
|
||||
isStarted (): boolean {
|
||||
return false
|
||||
}
|
||||
|
||||
start (): void | Promise<void> {
|
||||
|
||||
}
|
||||
|
||||
stop (): void | Promise<void> {
|
||||
|
||||
}
|
||||
|
||||
get globalSignaturePolicy (): typeof StrictSign | typeof StrictNoSign {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
get multicodecs (): string[] {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
getPeers (): PeerId[] {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
getTopics (): string[] {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
subscribe (): void {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
unsubscribe (): void {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
getSubscribers (): PeerId[] {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
|
||||
async publish (): Promise<PublishResult> {
|
||||
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
|
||||
}
|
||||
}
|
@@ -33,11 +33,11 @@ export class DefaultRegistrar implements Registrar {
|
||||
this.components = components
|
||||
|
||||
this._onDisconnect = this._onDisconnect.bind(this)
|
||||
this._onConnect = this._onConnect.bind(this)
|
||||
this._onProtocolChange = this._onProtocolChange.bind(this)
|
||||
|
||||
this.components.getConnectionManager().addEventListener('peer:disconnect', this._onDisconnect)
|
||||
this.components.getConnectionManager().addEventListener('peer:connect', this._onConnect)
|
||||
|
||||
// happens after identify
|
||||
this.components.getPeerStore().addEventListener('change:protocols', this._onProtocolChange)
|
||||
}
|
||||
|
||||
@@ -159,22 +159,6 @@ export class DefaultRegistrar implements Registrar {
|
||||
})
|
||||
}
|
||||
|
||||
_onConnect (evt: CustomEvent<Connection>) {
|
||||
const connection = evt.detail
|
||||
|
||||
void this.components.getPeerStore().protoBook.get(connection.remotePeer)
|
||||
.then(peerProtocols => {
|
||||
for (const { topology, protocols } of this.topologies.values()) {
|
||||
if (supportsProtocol(peerProtocols, protocols)) {
|
||||
topology.onConnect(connection.remotePeer, connection)
|
||||
}
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
log.error(err)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a new peer support the multicodecs for this topology
|
||||
*/
|
||||
@@ -192,7 +176,7 @@ export class DefaultRegistrar implements Registrar {
|
||||
|
||||
for (const { topology, protocols } of this.topologies.values()) {
|
||||
if (supportsProtocol(added, protocols)) {
|
||||
const connection = this.components.getConnectionManager().getConnection(peerId)
|
||||
const connection = this.components.getConnectionManager().getConnections(peerId)[0]
|
||||
|
||||
if (connection == null) {
|
||||
continue
|
||||
|
@@ -5,7 +5,9 @@ import errCode from 'err-code'
|
||||
import type { Listener, Transport, TransportManager, TransportManagerEvents } from '@libp2p/interfaces/transport'
|
||||
import type { Multiaddr } from '@multiformats/multiaddr'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import { AbortOptions, CustomEvent, EventEmitter, Startable } from '@libp2p/interfaces'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import { trackedMap } from '@libp2p/tracked-map'
|
||||
|
||||
|
@@ -6,7 +6,7 @@ import { pipe } from 'it-pipe'
|
||||
import mutableProxy from 'mutable-proxy'
|
||||
import { codes } from './errors.js'
|
||||
import { createConnection } from '@libp2p/connection'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
|
||||
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { Connection, ProtocolStream, Stream } from '@libp2p/interfaces/connection'
|
||||
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interfaces/connection-encrypter'
|
||||
@@ -15,6 +15,7 @@ import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { MultiaddrConnection, Upgrader, UpgraderEvents } from '@libp2p/interfaces/transport'
|
||||
import type { Duplex } from 'it-stream-types'
|
||||
import type { Components } from '@libp2p/interfaces/components'
|
||||
import type { AbortOptions } from '@libp2p/interfaces'
|
||||
|
||||
const log = logger('libp2p:upgrader')
|
||||
|
||||
@@ -266,7 +267,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
} = opts
|
||||
|
||||
let muxer: StreamMuxer | undefined
|
||||
let newStream: ((multicodecs: string[]) => Promise<ProtocolStream>) | undefined
|
||||
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<ProtocolStream>) | undefined
|
||||
let connection: Connection // eslint-disable-line prefer-const
|
||||
|
||||
if (muxerFactory != null) {
|
||||
@@ -308,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
}
|
||||
})
|
||||
|
||||
newStream = async (protocols: string[]): Promise<ProtocolStream> => {
|
||||
newStream = async (protocols: string[], options: AbortOptions = {}): Promise<ProtocolStream> => {
|
||||
if (muxer == null) {
|
||||
throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE)
|
||||
}
|
||||
@@ -319,7 +320,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
const metrics = this.components.getMetrics()
|
||||
|
||||
try {
|
||||
let { stream, protocol } = await mss.select(protocols)
|
||||
let { stream, protocol } = await mss.select(protocols, options)
|
||||
|
||||
if (metrics != null) {
|
||||
stream = metrics.trackStream({ stream, remotePeer, protocol })
|
||||
@@ -328,6 +329,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
return { stream: { ...muxedStream, ...stream }, protocol }
|
||||
} catch (err: any) {
|
||||
log.error('could not create new stream', err)
|
||||
|
||||
if (err.code != null) {
|
||||
throw err
|
||||
}
|
||||
|
||||
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||
}
|
||||
}
|
||||
@@ -382,9 +388,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
|
||||
getStreams: () => muxer != null ? muxer.streams : errConnectionNotMultiplexed(),
|
||||
close: async () => {
|
||||
await maConn.close()
|
||||
// Ensure remaining streams are aborted
|
||||
// Ensure remaining streams are closed
|
||||
if (muxer != null) {
|
||||
muxer.streams.map(stream => stream.abort())
|
||||
await Promise.all(muxer.streams.map(async stream => {
|
||||
await stream.close()
|
||||
}))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@@ -18,13 +18,21 @@ describe('Protocol prefix is configurable', () => {
|
||||
it('protocolPrefix is provided', async () => {
|
||||
const testProtocol = 'test-protocol'
|
||||
libp2p = await createLibp2pNode(mergeOptions(baseOptions, {
|
||||
protocolPrefix: testProtocol
|
||||
identify: {
|
||||
protocolPrefix: testProtocol
|
||||
},
|
||||
ping: {
|
||||
protocolPrefix: testProtocol
|
||||
},
|
||||
fetch: {
|
||||
protocolPrefix: testProtocol
|
||||
}
|
||||
}))
|
||||
await libp2p.start()
|
||||
|
||||
const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId)
|
||||
expect(protocols).to.include.members([
|
||||
'/libp2p/fetch/0.0.1',
|
||||
`/${testProtocol}/fetch/0.0.1`,
|
||||
'/libp2p/circuit/relay/0.1.0',
|
||||
`/${testProtocol}/id/1.0.0`,
|
||||
`/${testProtocol}/id/push/1.0.0`,
|
||||
@@ -41,7 +49,8 @@ describe('Protocol prefix is configurable', () => {
|
||||
'/libp2p/circuit/relay/0.1.0',
|
||||
'/ipfs/id/1.0.0',
|
||||
'/ipfs/id/push/1.0.0',
|
||||
'/ipfs/ping/1.0.0'
|
||||
'/ipfs/ping/1.0.0',
|
||||
'/libp2p/fetch/0.0.1'
|
||||
])
|
||||
})
|
||||
})
|
||||
|
@@ -7,7 +7,6 @@ import delay from 'delay'
|
||||
import { createLibp2p, Libp2p } from '../../src/index.js'
|
||||
import { baseOptions, pubsubSubsystemOptions } from './utils.js'
|
||||
import { createPeerId } from '../utils/creators/peer.js'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { FloodSub } from '@libp2p/floodsub'
|
||||
import type { PubSub } from '@libp2p/interfaces/pubsub'
|
||||
@@ -21,14 +20,16 @@ describe('Pubsub subsystem is configurable', () => {
|
||||
}
|
||||
})
|
||||
|
||||
it('should not exist if no module is provided', async () => {
|
||||
it('should throw if no module is provided', async () => {
|
||||
libp2p = await createLibp2p(baseOptions)
|
||||
expect(libp2p.pubsub).to.not.exist()
|
||||
await libp2p.start()
|
||||
expect(() => libp2p.pubsub.getTopics()).to.throw()
|
||||
})
|
||||
|
||||
it('should exist if the module is provided', async () => {
|
||||
it('should not throw if the module is provided', async () => {
|
||||
libp2p = await createLibp2p(pubsubSubsystemOptions)
|
||||
expect(libp2p.pubsub).to.exist()
|
||||
await libp2p.start()
|
||||
expect(libp2p.pubsub.getTopics()).to.be.empty()
|
||||
})
|
||||
|
||||
it('should start and stop by default once libp2p starts', async () => {
|
||||
@@ -39,13 +40,16 @@ describe('Pubsub subsystem is configurable', () => {
|
||||
})
|
||||
|
||||
libp2p = await createLibp2p(customOptions)
|
||||
expect(libp2p.pubsub?.isStarted()).to.equal(false)
|
||||
// @ts-expect-error not part of interface
|
||||
expect(libp2p.pubsub.isStarted()).to.equal(false)
|
||||
|
||||
await libp2p.start()
|
||||
expect(libp2p.pubsub?.isStarted()).to.equal(true)
|
||||
// @ts-expect-error not part of interface
|
||||
expect(libp2p.pubsub.isStarted()).to.equal(true)
|
||||
|
||||
await libp2p.stop()
|
||||
expect(libp2p.pubsub?.isStarted()).to.equal(false)
|
||||
// @ts-expect-error not part of interface
|
||||
expect(libp2p.pubsub.isStarted()).to.equal(false)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -87,16 +91,14 @@ describe('Pubsub subscription handlers adapter', () => {
|
||||
throw new Error('Pubsub was not enabled')
|
||||
}
|
||||
|
||||
pubsub.addEventListener(topic, handler)
|
||||
pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, {
|
||||
detail: uint8ArrayFromString('useless-data')
|
||||
}))
|
||||
pubsub.subscribe(topic)
|
||||
pubsub.addEventListener('message', handler)
|
||||
await pubsub.publish(topic, uint8ArrayFromString('useless-data'))
|
||||
await defer.promise
|
||||
|
||||
pubsub.removeEventListener(topic, handler)
|
||||
pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, {
|
||||
detail: uint8ArrayFromString('useless-data')
|
||||
}))
|
||||
pubsub.unsubscribe(topic)
|
||||
pubsub.removeEventListener('message', handler)
|
||||
await pubsub.publish(topic, uint8ArrayFromString('useless-data'))
|
||||
|
||||
// wait to guarantee that the handler is not called twice
|
||||
await delay(100)
|
||||
|
@@ -5,7 +5,7 @@ import { WebSockets } from '@libp2p/websockets'
|
||||
import * as filters from '@libp2p/websockets/filters'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import mergeOptions from 'merge-options'
|
||||
import type { Message, PubSubInit, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
|
||||
import type { Message, PublishResult, PubSubInit, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
|
||||
import type { Libp2pInit, Libp2pOptions } from '../../src/index.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import * as cborg from 'cborg'
|
||||
@@ -44,11 +44,12 @@ class MockPubSub extends PubSubBaseProtocol {
|
||||
return cborg.encode(rpc)
|
||||
}
|
||||
|
||||
async publishMessage (from: PeerId, message: Message): Promise<void> {
|
||||
async publishMessage (from: PeerId, message: Message): Promise<PublishResult> {
|
||||
const peers = this.getSubscribers(message.topic)
|
||||
const recipients: PeerId[] = []
|
||||
|
||||
if (peers == null || peers.length === 0) {
|
||||
return
|
||||
return { recipients }
|
||||
}
|
||||
|
||||
peers.forEach(id => {
|
||||
@@ -60,8 +61,11 @@ class MockPubSub extends PubSubBaseProtocol {
|
||||
return
|
||||
}
|
||||
|
||||
recipients.push(id)
|
||||
this.send(id, { messages: [message] })
|
||||
})
|
||||
|
||||
return { recipients }
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -7,9 +7,8 @@ import delay from 'delay'
|
||||
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { stubInterface } from 'ts-sinon'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/registrar'
|
||||
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
|
||||
import type { PeerStore, Peer } from '@libp2p/interfaces/peer-store'
|
||||
import type { Dialer } from '@libp2p/interfaces/dialer'
|
||||
|
||||
describe('Auto-dialler', () => {
|
||||
it('should not dial self', async () => {
|
||||
@@ -36,26 +35,24 @@ describe('Auto-dialler', () => {
|
||||
]))
|
||||
|
||||
const connectionManager = stubInterface<ConnectionManager>()
|
||||
connectionManager.getConnectionList.returns([])
|
||||
const dialer = stubInterface<Dialer>()
|
||||
connectionManager.getConnections.returns([])
|
||||
|
||||
const autoDialler = new AutoDialler(new Components({
|
||||
peerId: self.id,
|
||||
peerStore,
|
||||
connectionManager,
|
||||
dialer
|
||||
connectionManager
|
||||
}), {
|
||||
minConnections: 10
|
||||
})
|
||||
|
||||
await autoDialler.start()
|
||||
|
||||
await pWaitFor(() => dialer.dial.callCount === 1)
|
||||
await pWaitFor(() => connectionManager.openConnection.callCount === 1)
|
||||
await delay(1000)
|
||||
|
||||
await autoDialler.stop()
|
||||
|
||||
expect(dialer.dial.callCount).to.equal(1)
|
||||
expect(dialer.dial.calledWith(self.id)).to.be.false()
|
||||
expect(connectionManager.openConnection.callCount).to.equal(1)
|
||||
expect(connectionManager.openConnection.calledWith(self.id)).to.be.false()
|
||||
})
|
||||
})
|
||||
|
@@ -8,7 +8,7 @@ import type { Libp2p } from '../../src/index.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import * as STATUS from '@libp2p/interfaces/connection/status'
|
||||
import { stubInterface } from 'ts-sinon'
|
||||
import type { KeyBook, PeerStore } from '@libp2p/interfaces/peer-store'
|
||||
@@ -18,6 +18,7 @@ import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import delay from 'delay'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import { codes } from '../../src/errors.js'
|
||||
import { start } from '@libp2p/interfaces/startable'
|
||||
|
||||
describe('Connection Manager', () => {
|
||||
let libp2p: Libp2p
|
||||
@@ -50,9 +51,14 @@ describe('Connection Manager', () => {
|
||||
const peerStore = stubInterface<PeerStore>()
|
||||
peerStore.keyBook = stubInterface<KeyBook>()
|
||||
|
||||
const connectionManager = new DefaultConnectionManager(new Components({ upgrader, peerStore }))
|
||||
const connectionManager = new DefaultConnectionManager({
|
||||
maxConnections: 1000,
|
||||
minConnections: 50,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
connectionManager.init(new Components({ upgrader, peerStore }))
|
||||
|
||||
await connectionManager.start()
|
||||
await start(connectionManager)
|
||||
|
||||
const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
|
||||
const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
|
||||
@@ -80,9 +86,14 @@ describe('Connection Manager', () => {
|
||||
const peerStore = stubInterface<PeerStore>()
|
||||
peerStore.keyBook = stubInterface<KeyBook>()
|
||||
|
||||
const connectionManager = new DefaultConnectionManager(new Components({ upgrader, peerStore }))
|
||||
const connectionManager = new DefaultConnectionManager({
|
||||
maxConnections: 1000,
|
||||
minConnections: 50,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
connectionManager.init(new Components({ upgrader, peerStore }))
|
||||
|
||||
await connectionManager.start()
|
||||
await start(connectionManager)
|
||||
|
||||
const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
|
||||
const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
|
||||
@@ -224,11 +235,11 @@ describe('libp2p.connections', () => {
|
||||
await libp2p.start()
|
||||
|
||||
// Wait for peer to connect
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === minConnections)
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === minConnections)
|
||||
|
||||
// Wait more time to guarantee no other connection happened
|
||||
await delay(200)
|
||||
expect(libp2p.components.getConnectionManager().getConnectionMap().size).to.eql(minConnections)
|
||||
expect(libp2p.components.getConnectionManager().getConnections().length).to.eql(minConnections)
|
||||
|
||||
await libp2p.stop()
|
||||
})
|
||||
@@ -257,11 +268,11 @@ describe('libp2p.connections', () => {
|
||||
await libp2p.start()
|
||||
|
||||
// Wait for peer to connect
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === minConnections)
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === minConnections)
|
||||
|
||||
// Should have connected to the peer with protocols
|
||||
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.not.exist()
|
||||
expect(libp2p.components.getConnectionManager().getConnection(nodes[1].peerId)).to.exist()
|
||||
expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.be.empty()
|
||||
expect(libp2p.components.getConnectionManager().getConnections(nodes[1].peerId)).to.not.be.empty()
|
||||
|
||||
await libp2p.stop()
|
||||
})
|
||||
@@ -287,15 +298,15 @@ describe('libp2p.connections', () => {
|
||||
|
||||
// Wait for peer to connect
|
||||
const conn = await libp2p.dial(nodes[0].peerId)
|
||||
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.exist()
|
||||
expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.not.be.empty()
|
||||
|
||||
await conn.close()
|
||||
// Closed
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === 0)
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === 0)
|
||||
// Connected
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === 1)
|
||||
await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === 1)
|
||||
|
||||
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.exist()
|
||||
expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.not.be.empty()
|
||||
|
||||
await libp2p.stop()
|
||||
})
|
||||
@@ -321,9 +332,7 @@ describe('libp2p.connections', () => {
|
||||
await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs())
|
||||
await libp2p.dial(remoteLibp2p.peerId)
|
||||
|
||||
const totalConns = Array.from(libp2p.components.getConnectionManager().getConnectionMap().values())
|
||||
expect(totalConns.length).to.eql(1)
|
||||
const conns = totalConns[0]
|
||||
const conns = libp2p.components.getConnectionManager().getConnections()
|
||||
expect(conns.length).to.eql(1)
|
||||
const conn = conns[0]
|
||||
|
||||
@@ -394,7 +403,7 @@ describe('libp2p.connections', () => {
|
||||
})
|
||||
})
|
||||
await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs())
|
||||
await libp2p.components.getDialer().dial(remoteLibp2p.peerId)
|
||||
await libp2p.components.getConnectionManager().openConnection(remoteLibp2p.peerId)
|
||||
|
||||
for (const multiaddr of remoteLibp2p.getMultiaddrs()) {
|
||||
expect(denyDialMultiaddr.calledWith(remoteLibp2p.peerId, multiaddr)).to.be.true()
|
||||
@@ -418,7 +427,7 @@ describe('libp2p.connections', () => {
|
||||
|
||||
const fullMultiaddr = remoteLibp2p.getMultiaddrs()[0]
|
||||
|
||||
await libp2p.components.getDialer().dial(fullMultiaddr)
|
||||
await libp2p.dial(fullMultiaddr)
|
||||
|
||||
expect(filterMultiaddrForPeer.callCount).to.equal(2)
|
||||
|
||||
|
@@ -8,7 +8,7 @@ import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import type { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
|
||||
describe('Connection Manager', () => {
|
||||
let libp2p: Libp2pNode
|
||||
@@ -79,7 +79,7 @@ describe('Connection Manager', () => {
|
||||
const value = Math.random()
|
||||
spies.set(value, spy)
|
||||
connectionManager.setPeerValue(connection.remotePeer, value)
|
||||
await connectionManager.onConnect(new CustomEvent('connection', { detail: connection }))
|
||||
await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
|
||||
}))
|
||||
|
||||
// get the lowest value
|
||||
@@ -122,7 +122,7 @@ describe('Connection Manager', () => {
|
||||
await Promise.all([...new Array(max + 1)].map(async () => {
|
||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||
sinon.stub(connection, 'close').callsFake(async () => spy()) // eslint-disable-line
|
||||
await connectionManager.onConnect(new CustomEvent('connection', { detail: connection }))
|
||||
await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
|
||||
}))
|
||||
|
||||
expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1)
|
||||
|
@@ -13,15 +13,17 @@ describe('DHT subsystem is configurable', () => {
|
||||
}
|
||||
})
|
||||
|
||||
it('should not exist if no module is provided', async () => {
|
||||
it('should throw if no module is provided', async () => {
|
||||
libp2p = await createLibp2p(createSubsystemOptions({
|
||||
dht: undefined
|
||||
}))
|
||||
expect(libp2p.dht).to.not.exist()
|
||||
await libp2p.start()
|
||||
await expect(libp2p.dht.getMode()).to.eventually.be.rejected()
|
||||
})
|
||||
|
||||
it('should exist if the module is provided', async () => {
|
||||
it('should not throw if the module is provided', async () => {
|
||||
libp2p = await createLibp2p(createSubsystemOptions())
|
||||
expect(libp2p.dht).to.exist()
|
||||
await libp2p.start()
|
||||
await expect(libp2p.dht.getMode()).to.eventually.equal('client')
|
||||
})
|
||||
})
|
||||
|
@@ -8,7 +8,7 @@ import { subsystemMulticodecs, createSubsystemOptions } from './utils.js'
|
||||
import { createPeerId } from '../../utils/creators/peer.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../../src/libp2p.js'
|
||||
import { isStartable } from '@libp2p/interfaces'
|
||||
import { start } from '@libp2p/interfaces/startable'
|
||||
|
||||
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8000')
|
||||
const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8001')
|
||||
@@ -78,8 +78,8 @@ describe('DHT subsystem operates correctly', () => {
|
||||
expect(connection).to.exist()
|
||||
|
||||
return await Promise.all([
|
||||
pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p.dht?.lan.routingTable.size === 1)
|
||||
pWaitFor(() => libp2p.dht.lan.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p.dht.lan.routingTable.size === 1)
|
||||
])
|
||||
})
|
||||
|
||||
@@ -89,8 +89,8 @@ describe('DHT subsystem operates correctly', () => {
|
||||
|
||||
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||
await Promise.all([
|
||||
pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p.dht?.lan.routingTable.size === 1)
|
||||
pWaitFor(() => libp2p.dht.lan.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p.dht.lan.routingTable.size === 1)
|
||||
])
|
||||
|
||||
await libp2p.components.getContentRouting().put(key, value)
|
||||
@@ -141,19 +141,17 @@ describe('DHT subsystem operates correctly', () => {
|
||||
const connection = await libp2p.dial(remAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
expect(libp2p.dht?.lan.routingTable).to.be.empty()
|
||||
expect(libp2p.dht.lan.routingTable).to.be.empty()
|
||||
|
||||
const dht = remoteLibp2p.dht
|
||||
|
||||
if (isStartable(dht)) {
|
||||
await dht.start()
|
||||
}
|
||||
await start(dht)
|
||||
|
||||
// should be 0 directly after start - TODO this may be susceptible to timing bugs, we should have
|
||||
// the ability to report stats on the DHT routing table instead of reaching into it's heart like this
|
||||
expect(remoteLibp2p.dht?.lan.routingTable).to.be.empty()
|
||||
expect(remoteLibp2p.dht.lan.routingTable).to.be.empty()
|
||||
|
||||
return await pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1)
|
||||
return await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1)
|
||||
})
|
||||
|
||||
it('should put on a peer and get from the other', async () => {
|
||||
@@ -164,11 +162,9 @@ describe('DHT subsystem operates correctly', () => {
|
||||
|
||||
const dht = remoteLibp2p.dht
|
||||
|
||||
if (isStartable(dht)) {
|
||||
await dht.start()
|
||||
}
|
||||
await start(dht)
|
||||
|
||||
await pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1)
|
||||
await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1)
|
||||
await libp2p.components.getContentRouting().put(key, value)
|
||||
|
||||
const fetchedValue = await remoteLibp2p.components.getContentRouting().get(key)
|
||||
|
78
test/core/get-public-key.spec.ts
Normal file
78
test/core/get-public-key.spec.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import { WebSockets } from '@libp2p/websockets'
|
||||
import { NOISE } from '@chainsafe/libp2p-noise'
|
||||
import { createPeerId } from '../utils/creators/peer.js'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import type { Libp2pOptions } from '../../src/index.js'
|
||||
import sinon from 'sinon'
|
||||
import { KadDHT } from '@libp2p/kad-dht'
|
||||
|
||||
describe('getPublicKey', () => {
|
||||
let libp2p: Libp2pNode
|
||||
|
||||
beforeEach(async () => {
|
||||
const peerId = await createPeerId()
|
||||
const config: Libp2pOptions = {
|
||||
peerId,
|
||||
transports: [
|
||||
new WebSockets()
|
||||
],
|
||||
connectionEncryption: [
|
||||
NOISE
|
||||
],
|
||||
dht: new KadDHT()
|
||||
}
|
||||
libp2p = await createLibp2pNode(config)
|
||||
|
||||
await libp2p.start()
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await libp2p.stop()
|
||||
})
|
||||
|
||||
it('should extract embedded public key', async () => {
|
||||
const otherPeer = await createPeerId()
|
||||
|
||||
const key = await libp2p.getPublicKey(otherPeer)
|
||||
|
||||
expect(otherPeer.publicKey).to.equalBytes(key)
|
||||
})
|
||||
|
||||
it('should get key from the keystore', async () => {
|
||||
const otherPeer = await createPeerId({ opts: { type: 'rsa' } })
|
||||
|
||||
if (otherPeer.publicKey == null) {
|
||||
throw new Error('Public key was missing')
|
||||
}
|
||||
|
||||
await libp2p.peerStore.keyBook.set(otherPeer, otherPeer.publicKey)
|
||||
|
||||
const key = await libp2p.getPublicKey(otherPeer)
|
||||
|
||||
expect(otherPeer.publicKey).to.equalBytes(key)
|
||||
})
|
||||
|
||||
it('should query the DHT when the key is not in the keystore', async () => {
|
||||
const otherPeer = await createPeerId({ opts: { type: 'rsa' } })
|
||||
|
||||
if (otherPeer.publicKey == null) {
|
||||
throw new Error('Public key was missing')
|
||||
}
|
||||
|
||||
if (libp2p.dht == null) {
|
||||
throw new Error('DHT was not configured')
|
||||
}
|
||||
|
||||
libp2p.dht.get = sinon.stub().returns([{
|
||||
name: 'VALUE',
|
||||
value: otherPeer.publicKey
|
||||
}])
|
||||
|
||||
const key = await libp2p.getPublicKey(otherPeer)
|
||||
|
||||
expect(otherPeer.publicKey).to.equalBytes(key)
|
||||
})
|
||||
})
|
@@ -5,25 +5,25 @@ import sinon from 'sinon'
|
||||
import { AbortError } from '@libp2p/interfaces/errors'
|
||||
import pDefer from 'p-defer'
|
||||
import delay from 'delay'
|
||||
import { DialAction, DialRequest } from '../../src/dialer/dial-request.js'
|
||||
import { DialAction, DialRequest } from '../../src/connection-manager/dialer/dial-request.js'
|
||||
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { DefaultDialer } from '../../src/dialer/index.js'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { Dialer } from '../../src/connection-manager/dialer/index.js'
|
||||
const error = new Error('dial failure')
|
||||
|
||||
describe('Dial Request', () => {
|
||||
it('should end when a single multiaddr dials succeeds', async () => {
|
||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||
const deferredConn = pDefer()
|
||||
const actions: Record<string, () => Promise<any>> = {
|
||||
'/ip4/127.0.0.1/tcp/1231': async () => await Promise.reject(error),
|
||||
'/ip4/127.0.0.1/tcp/1232': async () => await Promise.resolve(connection),
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => await Promise.reject(error)
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => await deferredConn.promise
|
||||
}
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const controller = new AbortController()
|
||||
const dialer = new DefaultDialer(new Components(), {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
|
||||
@@ -33,15 +33,12 @@ describe('Dial Request', () => {
|
||||
dialAction
|
||||
})
|
||||
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1231')
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1232')
|
||||
sinon.spy(actions, '/ip4/127.0.0.1/tcp/1233')
|
||||
// Make sure that dial attempt comes back before terminating last dial action
|
||||
expect(await dialRequest.run({ signal: controller.signal })).to.equal(connection)
|
||||
|
||||
// End third dial attempt
|
||||
deferredConn.resolve()
|
||||
|
||||
const result = await dialRequest.run({ signal: controller.signal })
|
||||
expect(result).to.equal(connection)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||
})
|
||||
|
||||
@@ -56,7 +53,7 @@ describe('Dial Request', () => {
|
||||
}
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const controller = new AbortController()
|
||||
const dialer = new DefaultDialer(new Components(), {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
|
||||
@@ -74,14 +71,16 @@ describe('Dial Request', () => {
|
||||
// Let the first dials run
|
||||
await delay(0)
|
||||
|
||||
// Finish the first 2 dials
|
||||
firstDials.reject(error)
|
||||
await delay(0)
|
||||
|
||||
// Only 1 dial should remain, so 1 token should have been released
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1231']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1232']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 1)
|
||||
expect(actions['/ip4/127.0.0.1/tcp/1233']).to.have.property('callCount', 0)
|
||||
|
||||
// Finish the first 2 dials
|
||||
firstDials.reject(error)
|
||||
|
||||
await delay(0)
|
||||
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(1)
|
||||
|
||||
// Finish the dial and release the 2nd token
|
||||
@@ -99,7 +98,7 @@ describe('Dial Request', () => {
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const addrs = Object.keys(actions)
|
||||
const controller = new AbortController()
|
||||
const dialer = new DefaultDialer(new Components(), {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
|
||||
@@ -139,7 +138,7 @@ describe('Dial Request', () => {
|
||||
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const controller = new AbortController()
|
||||
const dialer = new DefaultDialer(new Components(), {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
|
||||
@@ -185,7 +184,7 @@ describe('Dial Request', () => {
|
||||
const dialAction: DialAction = async (num) => await actions[num.toString()]()
|
||||
const addrs = Object.keys(actions)
|
||||
const controller = new AbortController()
|
||||
const dialer = new DefaultDialer(new Components(), {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
|
||||
@@ -215,4 +214,45 @@ describe('Dial Request', () => {
|
||||
expect(dialerGetTokensSpy.calledWith(addrs.length)).to.equal(true)
|
||||
expect(dialerReleaseTokenSpy.callCount).to.equal(2)
|
||||
})
|
||||
|
||||
it('should abort other dials when one succeeds', async () => {
|
||||
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
|
||||
const actions: Record<string, () => Promise<any>> = {
|
||||
'/ip4/127.0.0.1/tcp/1231': async () => {
|
||||
await delay(100)
|
||||
},
|
||||
'/ip4/127.0.0.1/tcp/1232': async () => {
|
||||
// Successful dial takes longer to establish
|
||||
await delay(1000)
|
||||
|
||||
return connection
|
||||
},
|
||||
|
||||
'/ip4/127.0.0.1/tcp/1233': async () => {
|
||||
await delay(100)
|
||||
}
|
||||
}
|
||||
|
||||
const signals: Record<string, AbortSignal | undefined> = {}
|
||||
|
||||
const dialRequest = new DialRequest({
|
||||
addrs: Object.keys(actions).map(str => new Multiaddr(str)),
|
||||
dialer: new Dialer({
|
||||
maxParallelDials: 3
|
||||
}),
|
||||
dialAction: async (ma, opts) => {
|
||||
signals[ma.toString()] = opts.signal
|
||||
return await actions[ma.toString()]()
|
||||
}
|
||||
})
|
||||
|
||||
await expect(dialRequest.run()).to.eventually.equal(connection)
|
||||
|
||||
// Dial attempt finished without connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', false)
|
||||
// Dial attempt led to connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
|
||||
// Dial attempt finished without connection
|
||||
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', false)
|
||||
})
|
||||
})
|
||||
|
@@ -17,7 +17,7 @@ import { Connection, isConnection } from '@libp2p/interfaces/connection'
|
||||
import { AbortError } from '@libp2p/interfaces/errors'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { DefaultDialer } from '../../src/dialer/index.js'
|
||||
import { Dialer } from '../../src/connection-manager/dialer/index.js'
|
||||
import { DefaultAddressManager } from '../../src/address-manager/index.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
@@ -25,7 +25,6 @@ import { codes as ErrorCodes } from '../../src/errors.js'
|
||||
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import type { PeerStore } from '@libp2p/interfaces/peer-store'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
@@ -40,7 +39,6 @@ const unsupportedAddr = new Multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1
|
||||
describe('Dialing (direct, TCP)', () => {
|
||||
let remoteTM: DefaultTransportManager
|
||||
let localTM: DefaultTransportManager
|
||||
let peerStore: PeerStore
|
||||
let remoteAddr: Multiaddr
|
||||
let remoteComponents: Components
|
||||
let localComponents: Components
|
||||
@@ -55,17 +53,14 @@ describe('Dialing (direct, TCP)', () => {
|
||||
peerId: remotePeerId,
|
||||
datastore: new MemoryDatastore(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater()
|
||||
connectionGater: mockConnectionGater(),
|
||||
peerStore: new PersistentPeerStore()
|
||||
})
|
||||
remoteComponents.setAddressManager(new DefaultAddressManager(remoteComponents, {
|
||||
listen: [
|
||||
listenAddr.toString()
|
||||
]
|
||||
}))
|
||||
peerStore = new PersistentPeerStore(remoteComponents, {
|
||||
addressFilter: remoteComponents.getConnectionGater().filterMultiaddrForPeer
|
||||
})
|
||||
remoteComponents.setPeerStore(peerStore)
|
||||
remoteTM = new DefaultTransportManager(remoteComponents)
|
||||
remoteTM.add(new TCP())
|
||||
|
||||
@@ -75,10 +70,12 @@ describe('Dialing (direct, TCP)', () => {
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater()
|
||||
})
|
||||
localComponents.setPeerStore(new PersistentPeerStore(localComponents, {
|
||||
addressFilter: localComponents.getConnectionGater().filterMultiaddrForPeer
|
||||
localComponents.setPeerStore(new PersistentPeerStore())
|
||||
localComponents.setConnectionManager(new DefaultConnectionManager({
|
||||
maxConnections: 100,
|
||||
minConnections: 50,
|
||||
autoDialInterval: 1000
|
||||
}))
|
||||
localComponents.setConnectionManager(new DefaultConnectionManager(localComponents))
|
||||
|
||||
localTM = new DefaultTransportManager(localComponents)
|
||||
localTM.add(new TCP())
|
||||
@@ -97,7 +94,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
it('should be able to connect to a remote node via its multiaddr', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const connection = await dialer.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
@@ -105,7 +103,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
await expect(dialer.dial(unsupportedAddr))
|
||||
.to.eventually.be.rejectedWith(Error)
|
||||
@@ -113,7 +112,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
it('should fail to connect if peer has no known addresses', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
const peerId = await createFromJSON(Peers[1])
|
||||
|
||||
await expect(dialer.dial(peerId))
|
||||
@@ -124,7 +124,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
it('should be able to connect to a given peer id', async () => {
|
||||
await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs())
|
||||
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const connection = await dialer.dial(remoteComponents.getPeerId())
|
||||
expect(connection).to.exist()
|
||||
@@ -134,7 +135,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr])
|
||||
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
await expect(dialer.dial(remoteComponents.getPeerId()))
|
||||
.to.eventually.be.rejectedWith(Error)
|
||||
@@ -147,7 +149,8 @@ describe('Dialing (direct, TCP)', () => {
|
||||
const peerId = await createFromJSON(Peers[1])
|
||||
await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr])
|
||||
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
sinon.spy(localTM, 'dial')
|
||||
const connection = await dialer.dial(peerId)
|
||||
@@ -158,9 +161,10 @@ describe('Dialing (direct, TCP)', () => {
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
dialTimeout: 50
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => {
|
||||
expect(options.signal).to.exist()
|
||||
@@ -186,9 +190,10 @@ describe('Dialing (direct, TCP)', () => {
|
||||
|
||||
await localComponents.getPeerStore().addressBook.add(peerId, addrs)
|
||||
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
expect(dialer.tokens).to.have.lengthOf(2)
|
||||
|
||||
@@ -298,7 +303,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial')
|
||||
const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
@@ -325,7 +330,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial')
|
||||
const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
|
||||
|
||||
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())
|
||||
|
||||
|
@@ -13,7 +13,7 @@ import { AbortError } from '@libp2p/interfaces/errors'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { codes as ErrorCodes } from '../../src/errors.js'
|
||||
import * as Constants from '../../src/constants.js'
|
||||
import { DefaultDialer, DialTarget } from '../../src/dialer/index.js'
|
||||
import { Dialer, DialTarget } from '../../src/connection-manager/dialer/index.js'
|
||||
import { publicAddressesFirst } from '@libp2p/utils/address-sort'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
@@ -29,6 +29,7 @@ import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { pEvent } from 'p-event'
|
||||
|
||||
const unsupportedAddr = new Multiaddr('/ip4/127.0.0.1/tcp/9999')
|
||||
|
||||
@@ -45,10 +46,14 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater()
|
||||
})
|
||||
localComponents.setPeerStore(new PersistentPeerStore(localComponents, {
|
||||
localComponents.setPeerStore(new PersistentPeerStore({
|
||||
addressFilter: localComponents.getConnectionGater().filterMultiaddrForPeer
|
||||
}))
|
||||
localComponents.setConnectionManager(new DefaultConnectionManager(localComponents))
|
||||
localComponents.setConnectionManager(new DefaultConnectionManager({
|
||||
maxConnections: 100,
|
||||
minConnections: 50,
|
||||
autoDialInterval: 1000
|
||||
}))
|
||||
|
||||
localTM = new DefaultTransportManager(localComponents)
|
||||
localTM.add(new WebSockets({ filter: filters.all }))
|
||||
@@ -66,7 +71,8 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should limit the number of tokens it provides', () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const maxPerPeer = Constants.MAX_PER_PEER_DIALS
|
||||
expect(dialer.tokens).to.have.lengthOf(Constants.MAX_PARALLEL_DIALS)
|
||||
@@ -76,9 +82,10 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should not return tokens if none are left', () => {
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
maxDialsPerPeer: Infinity
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
const maxTokens = dialer.tokens.length
|
||||
|
||||
@@ -89,7 +96,8 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should NOT be able to return a token twice', () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const tokens = dialer.getTokens(1)
|
||||
expect(tokens).to.have.length(1)
|
||||
@@ -100,7 +108,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should be able to connect to a remote node via its multiaddr', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
|
||||
|
||||
@@ -110,7 +120,8 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
await expect(dialer.dial(unsupportedAddr.encapsulate(`/p2p/${remoteComponents.getPeerId().toString()}`)))
|
||||
.to.eventually.be.rejectedWith(Error)
|
||||
@@ -118,7 +129,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should be able to connect to a given peer', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
|
||||
|
||||
@@ -128,7 +141,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
await localComponents.getPeerStore().addressBook.set(remotePeerId, [unsupportedAddr])
|
||||
|
||||
@@ -138,9 +153,11 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
dialTimeout: 50
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
|
||||
|
||||
@@ -159,9 +176,11 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should throw when a peer advertises more than the allowed number of peers', async () => {
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
maxAddrsToDial: 10
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
await localComponents.getPeerStore().addressBook.set(remotePeerId, Array.from({ length: 11 }, (_, i) => new Multiaddr(`/ip4/127.0.0.1/tcp/1500${i}/ws/p2p/12D3KooWHFKTMzwerBtsVmtz4ZZEQy2heafxzWw6wNn5PPYkBxJ5`)))
|
||||
|
||||
@@ -180,10 +199,11 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
const publicAddressesFirstSpy = sinon.spy(publicAddressesFirst)
|
||||
const localTMDialStub = sinon.stub(localTM, 'dial').callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), peerIdFromString(ma.getPeerId() ?? ''))))
|
||||
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
addressSorter: publicAddressesFirstSpy,
|
||||
maxParallelDials: 3
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
// Inject data in the AddressBook
|
||||
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), peerMultiaddrs)
|
||||
@@ -208,9 +228,10 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
]
|
||||
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
// Inject data in the AddressBook
|
||||
await localComponents.getPeerStore().addressBook.add(remotePeerId, addrs)
|
||||
@@ -246,9 +267,10 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
new Multiaddr('/ip4/0.0.0.0/tcp/8001/ws'),
|
||||
new Multiaddr('/ip4/0.0.0.0/tcp/8002/ws')
|
||||
]
|
||||
const dialer = new DefaultDialer(localComponents, {
|
||||
const dialer = new Dialer({
|
||||
maxParallelDials: 2
|
||||
})
|
||||
dialer.init(localComponents)
|
||||
|
||||
// Inject data in the AddressBook
|
||||
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), addrs)
|
||||
@@ -286,7 +308,8 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
it('should cancel pending dial targets before proceeding', async () => {
|
||||
const dialer = new DefaultDialer(localComponents)
|
||||
const dialer = new Dialer()
|
||||
dialer.init(localComponents)
|
||||
|
||||
sinon.stub(dialer, '_createDialTarget').callsFake(async () => {
|
||||
const deferredDial = pDefer<DialTarget>()
|
||||
@@ -308,7 +331,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
})
|
||||
|
||||
describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
const connectionGater = mockConnectionGater()
|
||||
// const connectionGater = mockConnectionGater()
|
||||
let libp2p: Libp2pNode
|
||||
let peerId: PeerId
|
||||
|
||||
@@ -337,14 +360,16 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
],
|
||||
connectionEncryption: [
|
||||
NOISE
|
||||
],
|
||||
connectionGater
|
||||
]
|
||||
})
|
||||
|
||||
expect(libp2p.components.getDialer()).to.exist()
|
||||
expect(libp2p.components.getDialer()).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS)
|
||||
expect(libp2p.components.getDialer()).to.have.property('maxDialsPerPeer', Constants.MAX_PER_PEER_DIALS)
|
||||
expect(libp2p.components.getDialer()).to.have.property('timeout', Constants.DIAL_TIMEOUT)
|
||||
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
|
||||
const dialer = connectionManager.dialer
|
||||
|
||||
expect(dialer).to.exist()
|
||||
expect(dialer).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS)
|
||||
expect(dialer).to.have.property('maxDialsPerPeer', Constants.MAX_PER_PEER_DIALS)
|
||||
expect(dialer).to.have.property('timeout', Constants.DIAL_TIMEOUT)
|
||||
})
|
||||
|
||||
it('should be able to override dialer options', async () => {
|
||||
@@ -361,7 +386,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
connectionEncryption: [
|
||||
NOISE
|
||||
],
|
||||
dialer: {
|
||||
connectionManager: {
|
||||
maxParallelDials: 10,
|
||||
maxDialsPerPeer: 1,
|
||||
dialTimeout: 1e3 // 30 second dial timeout per peer
|
||||
@@ -369,10 +394,13 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
}
|
||||
libp2p = await createLibp2pNode(config)
|
||||
|
||||
expect(libp2p.components.getDialer()).to.exist()
|
||||
expect(libp2p.components.getDialer()).to.have.property('tokens').with.lengthOf(config.dialer.maxParallelDials)
|
||||
expect(libp2p.components.getDialer()).to.have.property('maxDialsPerPeer', config.dialer.maxDialsPerPeer)
|
||||
expect(libp2p.components.getDialer()).to.have.property('timeout', config.dialer.dialTimeout)
|
||||
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
|
||||
const dialer = connectionManager.dialer
|
||||
|
||||
expect(dialer).to.exist()
|
||||
expect(dialer).to.have.property('tokens').with.lengthOf(config.connectionManager.maxParallelDials)
|
||||
expect(dialer).to.have.property('maxDialsPerPeer', config.connectionManager.maxDialsPerPeer)
|
||||
expect(dialer).to.have.property('timeout', config.connectionManager.dialTimeout)
|
||||
})
|
||||
|
||||
it('should use the dialer for connecting', async () => {
|
||||
@@ -391,7 +419,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
]
|
||||
})
|
||||
|
||||
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial')
|
||||
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
|
||||
const dialerDialSpy = sinon.spy(connectionManager.dialer, 'dial')
|
||||
const addressBookAddSpy = sinon.spy(libp2p.components.getPeerStore().addressBook, 'add')
|
||||
|
||||
await libp2p.start()
|
||||
@@ -430,21 +459,15 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
|
||||
const identifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const protobookSetSpy = sinon.spy(libp2p.components.getPeerStore().protoBook, 'set')
|
||||
const connectionPromise = pDefer()
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
libp2p.components.getUpgrader().addEventListener('connection', () => {
|
||||
connectionPromise.resolve()
|
||||
}, {
|
||||
once: true
|
||||
})
|
||||
|
||||
const connection = await libp2p.dial(MULTIADDRS_WEBSOCKETS[0])
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for connection event to be emitted
|
||||
await connectionPromise.promise
|
||||
await connectionPromise
|
||||
|
||||
expect(identifySpy.callCount).to.equal(1)
|
||||
await identifySpy.firstCall.returnValue
|
||||
@@ -519,7 +542,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
]
|
||||
})
|
||||
|
||||
sinon.stub(libp2p.components.getDialer() as DefaultDialer, '_createDialTarget').callsFake(async () => {
|
||||
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
|
||||
sinon.stub(connectionManager.dialer, '_createDialTarget').callsFake(async () => {
|
||||
const deferredDial = pDefer<DialTarget>()
|
||||
return await deferredDial.promise
|
||||
})
|
||||
@@ -557,7 +581,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
const dialerDestroyStub = sinon.spy(libp2p.components.getDialer() as DefaultDialer, 'stop')
|
||||
const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
|
||||
const dialerDestroyStub = sinon.spy(connectionManager.dialer, 'stop')
|
||||
|
||||
await libp2p.stop()
|
||||
|
||||
|
@@ -46,18 +46,16 @@ describe('Dialing (resolvable addresses)', () => {
|
||||
listen: [`${relayAddr.toString()}/p2p-circuit`]
|
||||
},
|
||||
connectionManager: {
|
||||
autoDial: false
|
||||
autoDial: false,
|
||||
resolvers: {
|
||||
dnsaddr: resolver
|
||||
}
|
||||
},
|
||||
relay: {
|
||||
enabled: true,
|
||||
hop: {
|
||||
enabled: false
|
||||
}
|
||||
},
|
||||
dialer: {
|
||||
resolvers: {
|
||||
dnsaddr: resolver
|
||||
}
|
||||
}
|
||||
}),
|
||||
started: true
|
||||
@@ -68,18 +66,16 @@ describe('Dialing (resolvable addresses)', () => {
|
||||
listen: [`${relayAddr.toString()}/p2p-circuit`]
|
||||
},
|
||||
connectionManager: {
|
||||
autoDial: false
|
||||
autoDial: false,
|
||||
resolvers: {
|
||||
dnsaddr: resolver
|
||||
}
|
||||
},
|
||||
relay: {
|
||||
enabled: true,
|
||||
hop: {
|
||||
enabled: false
|
||||
}
|
||||
},
|
||||
dialer: {
|
||||
resolvers: {
|
||||
dnsaddr: resolver
|
||||
}
|
||||
}
|
||||
}),
|
||||
started: true
|
||||
|
133
test/fetch/index.spec.ts
Normal file
133
test/fetch/index.spec.ts
Normal file
@@ -0,0 +1,133 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { FetchService } from '../../src/fetch/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import { pipe } from 'it-pipe'
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs'
|
||||
}
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('fetch', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to fetch from another peer', async () => {
|
||||
const key = 'key'
|
||||
const value = Uint8Array.from([0, 1, 2, 3, 4])
|
||||
const localFetch = new FetchService(localComponents, defaultInit)
|
||||
const remoteFetch = new FetchService(remoteComponents, defaultInit)
|
||||
|
||||
remoteFetch.registerLookupFunction(key, async (identifier) => {
|
||||
expect(identifier).to.equal(key)
|
||||
|
||||
return value
|
||||
})
|
||||
|
||||
await start(localFetch)
|
||||
await start(remoteFetch)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// Run fetch
|
||||
const result = await localFetch.fetch(remoteComponents.getPeerId(), key)
|
||||
|
||||
expect(result).to.equalBytes(value)
|
||||
})
|
||||
|
||||
it('should time out fetching from another peer when waiting for the record', async () => {
|
||||
const key = 'key'
|
||||
const localFetch = new FetchService(localComponents, defaultInit)
|
||||
const remoteFetch = new FetchService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localFetch)
|
||||
await start(remoteFetch)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(remoteFetch.protocol)
|
||||
await remoteComponents.getRegistrar().handle(remoteFetch.protocol, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
for await (const chunk of source) {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// Run fetch, should time out
|
||||
await expect(localFetch.fetch(remoteComponents.getPeerId(), key, {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
})
|
@@ -3,17 +3,13 @@
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { codes } from '../../src/errors.js'
|
||||
import { IdentifyService, Message } from '../../src/identify/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { createLibp2pNode } from '../../src/libp2p.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { createBaseOptions } from '../utils/base-options.browser.js'
|
||||
import { DefaultAddressManager } from '../../src/address-manager/index.js'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import * as lp from 'it-length-prefixed'
|
||||
import drain from 'it-drain'
|
||||
import { pipe } from 'it-pipe'
|
||||
@@ -27,13 +23,9 @@ import {
|
||||
} from '../../src/identify/consts.js'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
import { CustomEvent, Startable } from '@libp2p/interfaces'
|
||||
import delay from 'delay'
|
||||
import pWaitFor from 'p-wait-for'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
|
||||
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
@@ -46,7 +38,7 @@ const defaultInit = {
|
||||
|
||||
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
|
||||
|
||||
async function createComponents (index: number, services: Startable[]) {
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
@@ -54,73 +46,61 @@ async function createComponents (index: number, services: Startable[]) {
|
||||
datastore: new MemoryDatastore(),
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater()
|
||||
connectionGater: mockConnectionGater(),
|
||||
peerStore: new PersistentPeerStore(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
const peerStore = new PersistentPeerStore(components, {
|
||||
addressFilter: components.getConnectionGater().filterMultiaddrForPeer
|
||||
})
|
||||
components.setPeerStore(peerStore)
|
||||
components.setAddressManager(new DefaultAddressManager(components, {
|
||||
announce: listenMaddrs.map(ma => ma.toString())
|
||||
}))
|
||||
|
||||
const connectionManager = new DefaultConnectionManager(components)
|
||||
services.push(connectionManager)
|
||||
components.setConnectionManager(connectionManager)
|
||||
|
||||
const transportManager = new DefaultTransportManager(components)
|
||||
services.push(transportManager)
|
||||
components.setTransportManager(transportManager)
|
||||
|
||||
await peerStore.protoBook.set(peerId, protocols)
|
||||
await components.getPeerStore().protoBook.set(peerId, protocols)
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('Identify', () => {
|
||||
describe('identify', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
let localPeerRecordUpdater: PeerRecordUpdater
|
||||
let remotePeerRecordUpdater: PeerRecordUpdater
|
||||
let services: Startable[]
|
||||
|
||||
beforeEach(async () => {
|
||||
services = []
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
localComponents = await createComponents(0, services)
|
||||
remoteComponents = await createComponents(1, services)
|
||||
|
||||
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
|
||||
remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents)
|
||||
|
||||
await Promise.all(
|
||||
services.map(s => s.start())
|
||||
)
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all(
|
||||
services.map(s => s.stop())
|
||||
)
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to identify another peer', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await localIdentify.start()
|
||||
await remoteIdentify.start()
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote] = connectionPair({
|
||||
peerId: localComponents.getPeerId(),
|
||||
registrar: localComponents.getRegistrar()
|
||||
}, {
|
||||
peerId: remoteComponents.getPeerId(),
|
||||
registrar: remoteComponents.getRegistrar()
|
||||
})
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
const localAddressBookConsumePeerRecordSpy = sinon.spy(localComponents.getPeerStore().addressBook, 'consumePeerRecord')
|
||||
const localProtoBookSetSpy = sinon.spy(localComponents.getPeerStore().protoBook, 'set')
|
||||
@@ -152,22 +132,16 @@ describe('Identify', () => {
|
||||
agentVersion: agentVersion
|
||||
}
|
||||
})
|
||||
await localIdentify.start()
|
||||
await start(localIdentify)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, {
|
||||
protocolPrefix: 'ipfs',
|
||||
host: {
|
||||
agentVersion: agentVersion
|
||||
}
|
||||
})
|
||||
await remoteIdentify.start()
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote] = connectionPair({
|
||||
peerId: localComponents.getPeerId(),
|
||||
registrar: localComponents.getRegistrar()
|
||||
}, {
|
||||
peerId: remoteComponents.getPeerId(),
|
||||
registrar: remoteComponents.getRegistrar()
|
||||
})
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
sinon.stub(localComponents.getPeerStore().addressBook, 'consumePeerRecord').throws()
|
||||
|
||||
@@ -191,16 +165,10 @@ describe('Identify', () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await localIdentify.start()
|
||||
await remoteIdentify.start()
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote] = connectionPair({
|
||||
peerId: localComponents.getPeerId(),
|
||||
registrar: localComponents.getRegistrar()
|
||||
}, {
|
||||
peerId: remoteComponents.getPeerId(),
|
||||
registrar: remoteComponents.getRegistrar()
|
||||
})
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// send an invalid message
|
||||
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY)
|
||||
@@ -249,371 +217,57 @@ describe('Identify', () => {
|
||||
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion'))
|
||||
.to.eventually.be.undefined()
|
||||
|
||||
await localIdentify.start()
|
||||
await start(localIdentify)
|
||||
|
||||
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'AgentVersion'))
|
||||
.to.eventually.deep.equal(uint8ArrayFromString(agentVersion))
|
||||
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion'))
|
||||
.to.eventually.be.ok()
|
||||
|
||||
await localIdentify.stop()
|
||||
await stop(localIdentify)
|
||||
})
|
||||
|
||||
describe('push', () => {
|
||||
it('should be able to push identify updates to another peer', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
it('should time out during identify', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await localIdentify.start()
|
||||
await remoteIdentify.start()
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair({
|
||||
peerId: localComponents.getPeerId(),
|
||||
registrar: localComponents.getRegistrar()
|
||||
}, {
|
||||
peerId: remoteComponents.getPeerId(),
|
||||
registrar: remoteComponents.getRegistrar()
|
||||
})
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY)
|
||||
await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
// we receive no data in the identify protocol, we just send our data
|
||||
await drain(source)
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to update the peer record and send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// ensure sequence number of peer record we are about to create is different
|
||||
await delay(1000)
|
||||
|
||||
// make sure we have a peer record to send
|
||||
await localPeerRecordUpdater.update()
|
||||
|
||||
// wait for the remote peer store to notice the changes
|
||||
const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs')
|
||||
|
||||
// push updated peer record to connections
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await eventPromise
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: true
|
||||
}])
|
||||
|
||||
await localIdentify.stop()
|
||||
await remoteIdentify.stop()
|
||||
yield new Uint8Array()
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
// LEGACY
|
||||
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
await localIdentify.start()
|
||||
await remoteIdentify.start()
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair({
|
||||
peerId: localComponents.getPeerId(),
|
||||
registrar: localComponents.getRegistrar()
|
||||
}, {
|
||||
peerId: remoteComponents.getPeerId(),
|
||||
registrar: remoteComponents.getRegistrar()
|
||||
})
|
||||
// Run identify
|
||||
await expect(localIdentify.identify(localToRemote, {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// wait until remote peer store notices protocol list update
|
||||
const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols')
|
||||
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await waitForUpdate
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: false
|
||||
}])
|
||||
|
||||
await localIdentify.stop()
|
||||
await remoteIdentify.stop()
|
||||
})
|
||||
})
|
||||
|
||||
describe('libp2p.dialer.identifyService', () => {
|
||||
let peerId: PeerId
|
||||
let libp2p: Libp2pNode
|
||||
let remoteLibp2p: Libp2pNode
|
||||
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||
|
||||
before(async () => {
|
||||
peerId = await createFromJSON(Peers[0])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
if (libp2p != null) {
|
||||
await libp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
if (remoteLibp2p != null) {
|
||||
await remoteLibp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
it('should run identify automatically after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
})
|
||||
|
||||
it('should store remote agent and protocol versions in metadataBook after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
|
||||
const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion')
|
||||
|
||||
expect(storedAgentVersion).to.exist()
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push protocol updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.handle('/echo/2.0.0', () => {})
|
||||
await libp2p.unhandle('/echo/2.0.0')
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 2)
|
||||
|
||||
// Verify the remote peer is notified of both changes
|
||||
expect(identityServicePushSpy.callCount).to.equal(2)
|
||||
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
|
||||
it('should store host data and protocol version into metadataBook', async () => {
|
||||
const agentVersion = 'js-project/1.0.0'
|
||||
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId,
|
||||
host: {
|
||||
agentVersion
|
||||
}
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion')
|
||||
|
||||
expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array()))
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push multiaddr updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 1)
|
||||
|
||||
// Verify the remote peer is notified of change
|
||||
expect(identityServicePushSpy.callCount).to.equal(1)
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
})
|
||||
|
296
test/identify/push.spec.ts
Normal file
296
test/identify/push.spec.ts
Normal file
@@ -0,0 +1,296 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { IdentifyService } from '../../src/identify/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DefaultAddressManager } from '../../src/address-manager/index.js'
|
||||
import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import drain from 'it-drain'
|
||||
import { pipe } from 'it-pipe'
|
||||
import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { PeerRecordUpdater } from '../../src/peer-record-updater.js'
|
||||
import {
|
||||
MULTICODEC_IDENTIFY,
|
||||
MULTICODEC_IDENTIFY_PUSH
|
||||
} from '../../src/identify/consts.js'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { DefaultTransportManager } from '../../src/transport-manager.js'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import delay from 'delay'
|
||||
import { pEvent } from 'p-event'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
|
||||
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs',
|
||||
host: {
|
||||
agentVersion: 'v1.0.0'
|
||||
}
|
||||
}
|
||||
|
||||
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
datastore: new MemoryDatastore(),
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionGater: mockConnectionGater(),
|
||||
peerStore: new PersistentPeerStore(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
components.setAddressManager(new DefaultAddressManager(components, {
|
||||
announce: listenMaddrs.map(ma => ma.toString())
|
||||
}))
|
||||
|
||||
const transportManager = new DefaultTransportManager(components)
|
||||
components.setTransportManager(transportManager)
|
||||
|
||||
await components.getPeerStore().protoBook.set(peerId, protocols)
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('identify (push)', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
let localPeerRecordUpdater: PeerRecordUpdater
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to push identify updates to another peer', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to update the peer record and send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// ensure sequence number of peer record we are about to create is different
|
||||
await delay(1000)
|
||||
|
||||
// make sure we have a peer record to send
|
||||
await localPeerRecordUpdater.update()
|
||||
|
||||
// wait for the remote peer store to notice the changes
|
||||
const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs')
|
||||
|
||||
// push updated peer record to connections
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await eventPromise
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: true
|
||||
}])
|
||||
|
||||
await stop(localIdentify)
|
||||
await stop(remoteIdentify)
|
||||
})
|
||||
|
||||
it('should time out during push identify', async () => {
|
||||
let streamEnded = false
|
||||
const localIdentify = new IdentifyService(localComponents, {
|
||||
...defaultInit,
|
||||
timeout: 10
|
||||
})
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY_PUSH)
|
||||
await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY_PUSH, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
// ignore the sent data
|
||||
await drain(source)
|
||||
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
// the delay should have caused the local push to time out so this should
|
||||
// occur after the local push method invocation has completed
|
||||
streamEnded = true
|
||||
|
||||
yield new Uint8Array()
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// push updated peer record to remote
|
||||
await localIdentify.push([localToRemote])
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
|
||||
// method should have returned before the remote handler completes as we timed
|
||||
// out so we ignore the return value
|
||||
expect(streamEnded).to.be.false()
|
||||
})
|
||||
|
||||
// LEGACY
|
||||
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
|
||||
const localIdentify = new IdentifyService(localComponents, defaultInit)
|
||||
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localIdentify)
|
||||
await start(remoteIdentify)
|
||||
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
|
||||
// ensure connections are registered by connection manager
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: localToRemote
|
||||
}))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
|
||||
detail: remoteToLocal
|
||||
}))
|
||||
|
||||
// identify both ways
|
||||
await localIdentify.identify(localToRemote)
|
||||
await remoteIdentify.identify(remoteToLocal)
|
||||
|
||||
const updatedProtocol = '/special-new-protocol/1.0.0'
|
||||
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
|
||||
|
||||
// should have protocols but not our new one
|
||||
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(identifiedProtocols).to.not.be.empty()
|
||||
expect(identifiedProtocols).to.not.include(updatedProtocol)
|
||||
|
||||
// should have addresses but not our new one
|
||||
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(identifiedAddresses).to.not.be.empty()
|
||||
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
|
||||
|
||||
// update local data - change event will trigger push
|
||||
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
|
||||
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
|
||||
|
||||
// needed to send our supported addresses
|
||||
const addressManager = localComponents.getAddressManager()
|
||||
addressManager.getAddresses = () => {
|
||||
return [updatedAddress]
|
||||
}
|
||||
|
||||
// wait until remote peer store notices protocol list update
|
||||
const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols')
|
||||
|
||||
await localIdentify.pushToPeerStore()
|
||||
|
||||
await waitForUpdate
|
||||
|
||||
// should have new protocol
|
||||
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
|
||||
expect(updatedProtocols).to.not.be.empty()
|
||||
expect(updatedProtocols).to.include(updatedProtocol)
|
||||
|
||||
// should have new address
|
||||
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
|
||||
expect(updatedAddresses.map(a => {
|
||||
return {
|
||||
multiaddr: a.multiaddr.toString(),
|
||||
isCertified: a.isCertified
|
||||
}
|
||||
})).to.deep.equal([{
|
||||
multiaddr: updatedAddress.toString(),
|
||||
isCertified: false
|
||||
}])
|
||||
|
||||
await stop(localIdentify)
|
||||
await stop(remoteIdentify)
|
||||
})
|
||||
})
|
216
test/identify/service.spec.ts
Normal file
216
test/identify/service.spec.ts
Normal file
@@ -0,0 +1,216 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { createLibp2pNode } from '../../src/libp2p.js'
|
||||
import { createBaseOptions } from '../utils/base-options.browser.js'
|
||||
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import pWaitFor from 'p-wait-for'
|
||||
import { peerIdFromString } from '@libp2p/peer-id'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import type { Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
|
||||
describe('libp2p.dialer.identifyService', () => {
|
||||
let peerId: PeerId
|
||||
let libp2p: Libp2pNode
|
||||
let remoteLibp2p: Libp2pNode
|
||||
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||
|
||||
before(async () => {
|
||||
peerId = await createFromJSON(Peers[0])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
if (libp2p != null) {
|
||||
await libp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
if (remoteLibp2p != null) {
|
||||
await remoteLibp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
it('should run identify automatically after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
})
|
||||
|
||||
it('should store remote agent and protocol versions in metadataBook after connecting', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
||||
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
|
||||
// Wait for peer store to be updated
|
||||
// Dialer._createDialTarget (add), Identify (consume)
|
||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||
expect(identityServiceIdentifySpy.callCount).to.equal(1)
|
||||
|
||||
// The connection should have no open streams
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
await connection.close()
|
||||
|
||||
const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '')
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion')
|
||||
|
||||
expect(storedAgentVersion).to.exist()
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push protocol updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
// Wait for connection event to be emitted
|
||||
await connectionPromise
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.handle('/echo/2.0.0', () => {})
|
||||
await libp2p.unhandle('/echo/2.0.0')
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 2)
|
||||
|
||||
// Verify the remote peer is notified of both changes
|
||||
expect(identityServicePushSpy.callCount).to.equal(2)
|
||||
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
|
||||
it('should store host data and protocol version into metadataBook', async () => {
|
||||
const agentVersion = 'js-project/1.0.0'
|
||||
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId,
|
||||
identify: {
|
||||
host: {
|
||||
agentVersion
|
||||
}
|
||||
}
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion')
|
||||
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion')
|
||||
|
||||
expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array()))
|
||||
expect(storedProtocolVersion).to.exist()
|
||||
})
|
||||
|
||||
it('should push multiaddr updates to an already connected peer', async () => {
|
||||
libp2p = await createLibp2pNode(createBaseOptions({
|
||||
peerId
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
if (libp2p.identifyService == null) {
|
||||
throw new Error('Identity service was not configured')
|
||||
}
|
||||
|
||||
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
|
||||
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
// Wait for connection event to be emitted
|
||||
await connectionPromise
|
||||
|
||||
// Wait for identify to finish
|
||||
await identityServiceIdentifySpy.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
|
||||
|
||||
// the protocol change event listener in the identity service is async
|
||||
await pWaitFor(() => identityServicePushSpy.callCount === 1)
|
||||
|
||||
// Verify the remote peer is notified of change
|
||||
expect(identityServicePushSpy.callCount).to.equal(1)
|
||||
for (const call of identityServicePushSpy.getCalls()) {
|
||||
const [connections] = call.args
|
||||
expect(connections.length).to.equal(1)
|
||||
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
|
||||
await call.returnValue
|
||||
}
|
||||
|
||||
// Verify the streams close
|
||||
await pWaitFor(() => connection.streams.length === 0)
|
||||
})
|
||||
})
|
@@ -17,7 +17,6 @@ import { unmarshalPrivateKey } from '@libp2p/crypto/keys'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { peerIdFromKeys } from '@libp2p/peer-id'
|
||||
import { FloodSub } from '@libp2p/floodsub'
|
||||
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
|
||||
|
||||
// IPFS_LOGGING=debug DEBUG=libp2p*,go-libp2p:* npm run test:interop
|
||||
|
||||
@@ -122,7 +121,7 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> {
|
||||
if (options.pubsubRouter === 'floodsub') {
|
||||
opts.pubsub = new FloodSub()
|
||||
} else {
|
||||
opts.pubsub = new Gossipsub()
|
||||
opts.pubsub = new FloodSub()
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -14,11 +14,16 @@ import type { DefaultMetrics } from '../../src/metrics/index.js'
|
||||
|
||||
describe('libp2p.metrics', () => {
|
||||
let libp2p: Libp2pNode
|
||||
let remoteLibp2p: Libp2pNode
|
||||
|
||||
afterEach(async () => {
|
||||
if (libp2p != null) {
|
||||
await libp2p.stop()
|
||||
}
|
||||
|
||||
if (remoteLibp2p != null) {
|
||||
await remoteLibp2p.stop()
|
||||
}
|
||||
})
|
||||
|
||||
it('should disable metrics by default', async () => {
|
||||
@@ -56,8 +61,7 @@ describe('libp2p.metrics', () => {
|
||||
})
|
||||
|
||||
it('should record metrics on connections and streams when enabled', async () => {
|
||||
let remoteLibp2p: Libp2pNode
|
||||
;[libp2p, remoteLibp2p] = await Promise.all([
|
||||
[libp2p, remoteLibp2p] = await Promise.all([
|
||||
createNode({
|
||||
config: createBaseOptions({
|
||||
metrics: {
|
||||
@@ -117,8 +121,7 @@ describe('libp2p.metrics', () => {
|
||||
})
|
||||
|
||||
it('should move disconnected peers to the old peers list', async () => {
|
||||
let remoteLibp2p
|
||||
;[libp2p, remoteLibp2p] = await Promise.all([
|
||||
[libp2p, remoteLibp2p] = await Promise.all([
|
||||
createNode({
|
||||
config: createBaseOptions({
|
||||
metrics: {
|
||||
|
@@ -13,6 +13,7 @@ import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import type { NatAPI } from '@achingbrain/nat-port-mapper'
|
||||
import { StubbedInstance, stubInterface } from 'ts-sinon'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
|
||||
const DEFAULT_ADDRESSES = [
|
||||
'/ip4/127.0.0.1/tcp/0',
|
||||
@@ -49,7 +50,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
await components.getTransportManager().listen(components.getAddressManager().getListenAddrs())
|
||||
|
||||
teardown.push(async () => {
|
||||
await natManager.stop()
|
||||
await stop(natManager)
|
||||
await components.getTransportManager().removeAll()
|
||||
})
|
||||
|
||||
@@ -78,7 +79,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.not.be.empty()
|
||||
@@ -127,7 +128,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
enabled: false
|
||||
})
|
||||
|
||||
natManager.start()
|
||||
await start(natManager)
|
||||
|
||||
await delay(100)
|
||||
|
||||
@@ -146,7 +147,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@@ -163,7 +164,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@@ -180,7 +181,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@@ -197,7 +198,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
@@ -214,7 +215,7 @@ describe('Nat Manager (TCP)', () => {
|
||||
let observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
||||
await natManager._start()
|
||||
await start(natManager)
|
||||
|
||||
observed = components.getAddressManager().getObservedAddrs().map(ma => ma.toString())
|
||||
expect(observed).to.be.empty()
|
||||
|
@@ -13,7 +13,7 @@ import { createBaseOptions } from '../utils/base-options.js'
|
||||
import { createPeerId } from '../utils/creators/peer.js'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
|
||||
|
||||
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||
|
@@ -9,6 +9,7 @@ import { createPeerId } from '../utils/creators/peer.js'
|
||||
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import type { Startable } from '@libp2p/interfaces/startable'
|
||||
|
||||
describe('peer discovery', () => {
|
||||
describe('basic functions', () => {
|
||||
@@ -42,7 +43,7 @@ describe('peer discovery', () => {
|
||||
await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/165.1.1.1/tcp/80')])
|
||||
|
||||
const deferred = defer()
|
||||
sinon.stub(libp2p.components.getDialer(), 'dial').callsFake(async (id) => {
|
||||
sinon.stub(libp2p.components.getConnectionManager(), 'openConnection').callsFake(async (id) => {
|
||||
if (!isPeerId(id)) {
|
||||
throw new Error('Tried to dial something that was not a peer ID')
|
||||
}
|
||||
@@ -69,13 +70,22 @@ describe('peer discovery', () => {
|
||||
let started = 0
|
||||
let stopped = 0
|
||||
|
||||
class MockDiscovery {
|
||||
class MockDiscovery implements Startable {
|
||||
static tag = 'mock'
|
||||
|
||||
started = false
|
||||
|
||||
isStarted () {
|
||||
return this.started
|
||||
}
|
||||
|
||||
start () {
|
||||
this.started = true
|
||||
started++
|
||||
}
|
||||
|
||||
stop () {
|
||||
this.started = false
|
||||
stopped++
|
||||
}
|
||||
|
||||
|
122
test/ping/index.spec.ts
Normal file
122
test/ping/index.spec.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
import { expect } from 'aegir/chai'
|
||||
import sinon from 'sinon'
|
||||
import { PingService } from '../../src/ping/index.js'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createFromJSON } from '@libp2p/peer-id-factory'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { start, stop } from '@libp2p/interfaces/startable'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import { pipe } from 'it-pipe'
|
||||
|
||||
const defaultInit = {
|
||||
protocolPrefix: 'ipfs'
|
||||
}
|
||||
|
||||
async function createComponents (index: number) {
|
||||
const peerId = await createFromJSON(Peers[index])
|
||||
|
||||
const components = new Components({
|
||||
peerId,
|
||||
registrar: mockRegistrar(),
|
||||
upgrader: mockUpgrader(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
|
||||
return components
|
||||
}
|
||||
|
||||
describe('ping', () => {
|
||||
let localComponents: Components
|
||||
let remoteComponents: Components
|
||||
|
||||
beforeEach(async () => {
|
||||
localComponents = await createComponents(0)
|
||||
remoteComponents = await createComponents(1)
|
||||
|
||||
await Promise.all([
|
||||
start(localComponents),
|
||||
start(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
sinon.restore()
|
||||
|
||||
await Promise.all([
|
||||
stop(localComponents),
|
||||
stop(remoteComponents)
|
||||
])
|
||||
})
|
||||
|
||||
it('should be able to ping another peer', async () => {
|
||||
const localPing = new PingService(localComponents, defaultInit)
|
||||
const remotePing = new PingService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localPing)
|
||||
await start(remotePing)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// Run ping
|
||||
await expect(localPing.ping(remoteComponents.getPeerId())).to.eventually.be.gte(0)
|
||||
})
|
||||
|
||||
it('should time out pinging another peer when waiting for a pong', async () => {
|
||||
const localPing = new PingService(localComponents, defaultInit)
|
||||
const remotePing = new PingService(remoteComponents, defaultInit)
|
||||
|
||||
await start(localPing)
|
||||
await start(remotePing)
|
||||
|
||||
// simulate connection between nodes
|
||||
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
|
||||
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
|
||||
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
|
||||
|
||||
// replace existing handler with a really slow one
|
||||
await remoteComponents.getRegistrar().unhandle(remotePing.protocol)
|
||||
await remoteComponents.getRegistrar().handle(remotePing.protocol, ({ stream }) => {
|
||||
void pipe(
|
||||
stream,
|
||||
async function * (source) {
|
||||
for await (const chunk of source) {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
|
||||
yield chunk
|
||||
}
|
||||
},
|
||||
stream
|
||||
)
|
||||
})
|
||||
|
||||
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// Run ping, should time out
|
||||
await expect(localPing.ping(remoteComponents.getPeerId(), {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
|
||||
// should have closed stream
|
||||
expect(newStreamSpy).to.have.property('callCount', 1)
|
||||
const { stream } = await newStreamSpy.getCall(0).returnValue
|
||||
expect(stream).to.have.nested.property('timeline.close')
|
||||
})
|
||||
})
|
@@ -3,8 +3,7 @@ import { expect } from 'aegir/chai'
|
||||
import { pipe } from 'it-pipe'
|
||||
import all from 'it-all'
|
||||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js'
|
||||
import { generate } from '../../src/pnet/key-generator.js'
|
||||
import { PreSharedKeyConnectionProtector, generateKey } from '../../src/pnet/index.js'
|
||||
import { INVALID_PSK } from '../../src/pnet/errors.js'
|
||||
import { mockMultiaddrConnPair } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
@@ -14,8 +13,8 @@ const swarmKeyBuffer = new Uint8Array(95)
|
||||
const wrongSwarmKeyBuffer = new Uint8Array(95)
|
||||
|
||||
// Write new psk files to the buffers
|
||||
generate(swarmKeyBuffer)
|
||||
generate(wrongSwarmKeyBuffer)
|
||||
generateKey(swarmKeyBuffer)
|
||||
generateKey(wrongSwarmKeyBuffer)
|
||||
|
||||
describe('private network', () => {
|
||||
it('should accept a valid psk buffer', () => {
|
||||
|
@@ -6,7 +6,7 @@ import { MemoryDatastore } from 'datastore-core/memory'
|
||||
import { createTopology } from '@libp2p/topology'
|
||||
import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { DefaultRegistrar } from '../../src/registrar.js'
|
||||
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { createPeerId, createNode } from '../utils/creators/peer.js'
|
||||
import { createBaseOptions } from '../utils/base-options.browser.js'
|
||||
import type { Registrar } from '@libp2p/interfaces/registrar'
|
||||
@@ -14,17 +14,17 @@ import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
import { Components } from '@libp2p/interfaces/components'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
|
||||
import { CustomEvent } from '@libp2p/interfaces'
|
||||
import { CustomEvent } from '@libp2p/interfaces/events'
|
||||
import type { Connection } from '@libp2p/interfaces/connection'
|
||||
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
|
||||
import { Plaintext } from '../../src/insecure/index.js'
|
||||
import { WebSockets } from '@libp2p/websockets'
|
||||
import { Mplex } from '@libp2p/mplex'
|
||||
import type { PeerProtocolsChangeData } from '@libp2p/interfaces/peer-store'
|
||||
|
||||
const protocol = '/test/1.0.0'
|
||||
|
||||
describe('registrar', () => {
|
||||
const connectionGater = mockConnectionGater()
|
||||
let components: Components
|
||||
let registrar: Registrar
|
||||
let peerId: PeerId
|
||||
@@ -38,12 +38,14 @@ describe('registrar', () => {
|
||||
components = new Components({
|
||||
peerId,
|
||||
datastore: new MemoryDatastore(),
|
||||
upgrader: mockUpgrader()
|
||||
upgrader: mockUpgrader(),
|
||||
peerStore: new PersistentPeerStore(),
|
||||
connectionManager: new DefaultConnectionManager({
|
||||
minConnections: 50,
|
||||
maxConnections: 1000,
|
||||
autoDialInterval: 1000
|
||||
})
|
||||
})
|
||||
components.setPeerStore(new PersistentPeerStore(components, {
|
||||
addressFilter: connectionGater.filterMultiaddrForPeer
|
||||
}))
|
||||
components.setConnectionManager(new DefaultConnectionManager(components))
|
||||
registrar = new DefaultRegistrar(components)
|
||||
})
|
||||
|
||||
@@ -144,6 +146,15 @@ describe('registrar', () => {
|
||||
detail: conn
|
||||
}))
|
||||
|
||||
// identify completes
|
||||
await libp2p.components.getPeerStore().dispatchEvent(new CustomEvent<PeerProtocolsChangeData>('change:protocols', {
|
||||
detail: {
|
||||
peerId: conn.remotePeer,
|
||||
protocols: [protocol],
|
||||
oldProtocols: []
|
||||
}
|
||||
}))
|
||||
|
||||
// remote peer disconnects
|
||||
await conn.close()
|
||||
await libp2p.components.getUpgrader().dispatchEvent(new CustomEvent<Connection>('connectionEnd', {
|
||||
@@ -185,10 +196,20 @@ describe('registrar', () => {
|
||||
// Add protocol to peer and update it
|
||||
await libp2p.peerStore.protoBook.add(remotePeerId, [protocol])
|
||||
|
||||
// remote peer connects
|
||||
await libp2p.components.getUpgrader().dispatchEvent(new CustomEvent<Connection>('connection', {
|
||||
detail: conn
|
||||
}))
|
||||
|
||||
// identify completes
|
||||
await libp2p.components.getPeerStore().dispatchEvent(new CustomEvent<PeerProtocolsChangeData>('change:protocols', {
|
||||
detail: {
|
||||
peerId: conn.remotePeer,
|
||||
protocols: [protocol],
|
||||
oldProtocols: []
|
||||
}
|
||||
}))
|
||||
|
||||
await onConnectDefer.promise
|
||||
|
||||
// Peer no longer supports the protocol our topology is registered for
|
||||
|
@@ -263,7 +263,7 @@ describe('auto-relay', () => {
|
||||
await relayLibp2p1.hangUp(relayLibp2p3.peerId)
|
||||
|
||||
// Stub dial
|
||||
sinon.stub(relayLibp2p1.components.getDialer(), 'dial').callsFake(async () => {
|
||||
sinon.stub(relayLibp2p1.components.getConnectionManager(), 'openConnection').callsFake(async () => {
|
||||
deferred.resolve()
|
||||
return await Promise.reject(new Error('failed to dial'))
|
||||
})
|
||||
|
@@ -118,8 +118,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
.and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// We should not be connected to the relay, because we weren't before the dial
|
||||
const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId)
|
||||
expect(srcToRelayConn).to.not.exist()
|
||||
const srcToRelayConns = srcLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
|
||||
expect(srcToRelayConns).to.be.empty()
|
||||
})
|
||||
|
||||
it('dialer should stay connected to an already connected relay on hop failure', async () => {
|
||||
@@ -135,9 +135,9 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId)
|
||||
expect(srcToRelayConn).to.exist()
|
||||
expect(srcToRelayConn?.stat.status).to.equal('OPEN')
|
||||
const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
|
||||
expect(srcToRelayConn).to.have.lengthOf(1)
|
||||
expect(srcToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN')
|
||||
})
|
||||
|
||||
it('destination peer should stay connected to an already connected relay on hop failure', async () => {
|
||||
@@ -166,8 +166,8 @@ describe('Dialing (via relay, TCP)', () => {
|
||||
streamHandler.close()
|
||||
|
||||
// should still be connected
|
||||
const dstToRelayConn = dstLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId)
|
||||
expect(dstToRelayConn).to.exist()
|
||||
expect(dstToRelayConn?.stat.status).to.equal('OPEN')
|
||||
const dstToRelayConn = dstLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
|
||||
expect(dstToRelayConn).to.have.lengthOf(1)
|
||||
expect(dstToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN')
|
||||
})
|
||||
})
|
||||
|
@@ -8,7 +8,7 @@ import { PersistentPeerStore } from '@libp2p/peer-store'
|
||||
import { PeerRecord } from '@libp2p/peer-record'
|
||||
import { TCP } from '@libp2p/tcp'
|
||||
import { Multiaddr } from '@multiformats/multiaddr'
|
||||
import { mockUpgrader, mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import sinon from 'sinon'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import pWaitFor from 'p-wait-for'
|
||||
@@ -23,7 +23,6 @@ const addrs = [
|
||||
]
|
||||
|
||||
describe('Transport Manager (TCP)', () => {
|
||||
const connectionGater = mockConnectionGater()
|
||||
let tm: DefaultTransportManager
|
||||
let localPeer: PeerId
|
||||
let components: Components
|
||||
@@ -39,9 +38,7 @@ describe('Transport Manager (TCP)', () => {
|
||||
upgrader: mockUpgrader()
|
||||
})
|
||||
components.setAddressManager(new DefaultAddressManager(components, { listen: addrs.map(addr => addr.toString()) }))
|
||||
components.setPeerStore(new PersistentPeerStore(components, {
|
||||
addressFilter: connectionGater.filterMultiaddrForPeer
|
||||
}))
|
||||
components.setPeerStore(new PersistentPeerStore())
|
||||
|
||||
tm = new DefaultTransportManager(components)
|
||||
|
||||
|
@@ -47,7 +47,7 @@ describe('Transport Manager (WebSockets)', () => {
|
||||
tm.add(transport)
|
||||
|
||||
expect(tm.getTransports()).to.have.lengthOf(1)
|
||||
await tm.remove(transport.constructor.name)
|
||||
await tm.remove(transport[Symbol.toStringTag])
|
||||
expect(tm.getTransports()).to.have.lengthOf(0)
|
||||
})
|
||||
|
||||
|
@@ -14,7 +14,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
|
||||
import swarmKey from '../fixtures/swarm.key.js'
|
||||
import { DefaultUpgrader } from '../../src/upgrader.js'
|
||||
import { codes } from '../../src/errors.js'
|
||||
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-compliance-tests/mocks'
|
||||
import Peers from '../fixtures/peers.js'
|
||||
import type { Upgrader } from '@libp2p/interfaces/transport'
|
||||
import type { PeerId } from '@libp2p/interfaces/peer-id'
|
||||
@@ -26,6 +26,10 @@ import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/i
|
||||
import type { Stream } from '@libp2p/interfaces/connection'
|
||||
import pDefer from 'p-defer'
|
||||
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
|
||||
import { pEvent } from 'p-event'
|
||||
import { TimeoutController } from 'timeout-abort-controller'
|
||||
import delay from 'delay'
|
||||
import drain from 'it-drain'
|
||||
|
||||
const addrs = [
|
||||
new Multiaddr('/ip4/127.0.0.1/tcp/0'),
|
||||
@@ -34,6 +38,7 @@ const addrs = [
|
||||
|
||||
describe('Upgrader', () => {
|
||||
let localUpgrader: Upgrader
|
||||
let localMuxerFactory: StreamMuxerFactory
|
||||
let remoteUpgrader: Upgrader
|
||||
let localPeer: PeerId
|
||||
let remotePeer: PeerId
|
||||
@@ -54,12 +59,13 @@ describe('Upgrader', () => {
|
||||
connectionGater: mockConnectionGater(),
|
||||
registrar: mockRegistrar()
|
||||
})
|
||||
localMuxerFactory = new Mplex()
|
||||
localUpgrader = new DefaultUpgrader(localComponents, {
|
||||
connectionEncryption: [
|
||||
new Plaintext()
|
||||
],
|
||||
muxers: [
|
||||
new Mplex()
|
||||
localMuxerFactory
|
||||
]
|
||||
})
|
||||
|
||||
@@ -365,6 +371,40 @@ describe('Upgrader', () => {
|
||||
expect(result).to.have.nested.property('reason.code', codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||
})
|
||||
})
|
||||
|
||||
it('should abort protocol selection for slow streams', async () => {
|
||||
const createStreamMuxerSpy = sinon.spy(localMuxerFactory, 'createStreamMuxer')
|
||||
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||
|
||||
const connections = await Promise.all([
|
||||
localUpgrader.upgradeOutbound(outbound),
|
||||
remoteUpgrader.upgradeInbound(inbound)
|
||||
])
|
||||
|
||||
// 10 ms timeout
|
||||
const timeoutController = new TimeoutController(10)
|
||||
|
||||
// should have created muxer for connection
|
||||
expect(createStreamMuxerSpy).to.have.property('callCount', 1)
|
||||
|
||||
// create mock muxed stream that never sends data
|
||||
const muxer = createStreamMuxerSpy.getCall(0).returnValue
|
||||
muxer.newStream = () => {
|
||||
return mockStream({
|
||||
source: (async function * () {
|
||||
// longer than the timeout
|
||||
await delay(1000)
|
||||
yield new Uint8Array()
|
||||
}()),
|
||||
sink: drain
|
||||
})
|
||||
}
|
||||
|
||||
await expect(connections[0].newStream('/echo/1.0.0', {
|
||||
signal: timeoutController.signal
|
||||
}))
|
||||
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
|
||||
})
|
||||
})
|
||||
|
||||
describe('libp2p.upgrader', () => {
|
||||
@@ -495,10 +535,12 @@ describe('libp2p.upgrader', () => {
|
||||
const connectionManagerDispatchEventSpy = sinon.spy(libp2p.components.getConnectionManager(), 'dispatchEvent')
|
||||
|
||||
// Upgrade and check the connect event
|
||||
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
|
||||
const connections = await Promise.all([
|
||||
libp2p.components.getUpgrader().upgradeOutbound(outbound),
|
||||
remoteLibp2p.components.getUpgrader().upgradeInbound(inbound)
|
||||
])
|
||||
await connectionPromise
|
||||
expect(connectionManagerDispatchEventSpy.callCount).to.equal(1)
|
||||
|
||||
let [event] = connectionManagerDispatchEventSpy.getCall(0).args
|
||||
|
Reference in New Issue
Block a user