Compare commits

..

1 Commits

Author SHA1 Message Date
achingbrain
c7e923a812 fix: remove peer routing search-for-self
The peer routing module starts a recurring process that searches for
peers close to our peer id.

This makes the DHT module query the network for peers.  Thing is the
DHT module is already doing this because periodically searching for
peers close to us is in the DHT spec so this ends up making redundant
queries.

This PR removes the recurring task configured by the peer routing module.
2021-12-06 19:15:20 +00:00
41 changed files with 252 additions and 810 deletions

View File

@@ -19,7 +19,7 @@ jobs:
- run: npx aegir lint
- run: npx aegir build
- run: npx aegir dep-check
- uses: ipfs/aegir/actions/bundle-size@master
- uses: ipfs/aegir/actions/bundle-size@v32.1.0
name: size
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
@@ -37,7 +37,7 @@ jobs:
with:
node-version: ${{ matrix.node }}
- run: npm install
- run: npm run test:node -- --cov --bail
- run: npx aegir test -t node --cov --bail
- uses: codecov/codecov-action@v1
test-chrome:
needs: check
@@ -48,7 +48,7 @@ jobs:
with:
node-version: lts/*
- run: npm install
- run: npm run test:browser -- -t browser -t webworker --bail
- run: npx aegir test -t browser -t webworker --bail
test-firefox:
needs: check
runs-on: ubuntu-latest
@@ -58,7 +58,7 @@ jobs:
with:
node-version: lts/*
- run: npm install
- run: npm run test:browser -- -t browser -t webworker --bail -- --browser firefox
- run: npx aegir test -t browser -t webworker --bail -- --browser firefox
test-ts:
needs: check
runs-on: ubuntu-latest

View File

@@ -1,96 +1,3 @@
## [0.35.8](https://github.com/libp2p/js-libp2p/compare/v0.35.7...v0.35.8) (2021-12-29)
### Bug Fixes
* do not wait for autodial start ([#1089](https://github.com/libp2p/js-libp2p/issues/1089)) ([79b3cfc](https://github.com/libp2p/js-libp2p/commit/79b3cfc6ad02ecc76fe23a3c3ff2d0b32a0ae4a8))
* increase listeners on any-signal ([#1084](https://github.com/libp2p/js-libp2p/issues/1084)) ([f18fc80](https://github.com/libp2p/js-libp2p/commit/f18fc80b70bf7b6b26fffa70b0a8d0502a6c4801))
* look for final peer event instead of peer response ([#1092](https://github.com/libp2p/js-libp2p/issues/1092)) ([d2b7ec0](https://github.com/libp2p/js-libp2p/commit/d2b7ec0f6be0ee80f2c963279a8ec2385059a889))
* record tracked map clears ([#1085](https://github.com/libp2p/js-libp2p/issues/1085)) ([b4b4324](https://github.com/libp2p/js-libp2p/commit/b4b432406ebc08ef2fc3a1922c64cde7c9060cae))
## [0.35.7](https://github.com/libp2p/js-libp2p/compare/v0.35.2...v0.35.7) (2021-12-24)
### Bug Fixes
* add tracked map ([#1069](https://github.com/libp2p/js-libp2p/issues/1069)) ([b425fa1](https://github.com/libp2p/js-libp2p/commit/b425fa12304def2a007d43a0aa445c28b766ed02))
* clean up pending dial targets ([#1059](https://github.com/libp2p/js-libp2p/issues/1059)) ([bdc9f16](https://github.com/libp2p/js-libp2p/commit/bdc9f16d0cbe56ccf26822f11068e7795bcef046))
* fix uncaught promise rejection when finding peers ([#1044](https://github.com/libp2p/js-libp2p/issues/1044)) ([3b683e7](https://github.com/libp2p/js-libp2p/commit/3b683e715686163e229b7b5c3a892327dfd4fc63))
* increase the maxlisteners for timeout controllers ([#1065](https://github.com/libp2p/js-libp2p/issues/1065)) ([09a0f94](https://github.com/libp2p/js-libp2p/commit/09a0f940df7fdb4ece34604e85693709df5c213e))
* main ci ([#1079](https://github.com/libp2p/js-libp2p/issues/1079)) ([d1c48dc](https://github.com/libp2p/js-libp2p/commit/d1c48dcbeded828f2dd3044cc9aed3f17f02846d))
* make error codes consistent ([#1054](https://github.com/libp2p/js-libp2p/issues/1054)) ([b25e0fe](https://github.com/libp2p/js-libp2p/commit/b25e0fe5312db58a06c39500ae84c50fed3a93bd))
* type definitions for big dialrequest and persistent peerstore ([#1078](https://github.com/libp2p/js-libp2p/issues/1078)) ([cb0d7d6](https://github.com/libp2p/js-libp2p/commit/cb0d7d6c99d179498f04e76df76e70e4f7d41c4c))
### Features
* allow per-component metrics to be collected ([#1061](https://github.com/libp2p/js-libp2p/issues/1061)) ([2f0b311](https://github.com/libp2p/js-libp2p/commit/2f0b311df7127aa44512c2008142d4ca30268986)), closes [#1060](https://github.com/libp2p/js-libp2p/issues/1060)
## [0.35.6](https://github.com/libp2p/js-libp2p/compare/v0.35.5...v0.35.6) (2021-12-18)
### Bug Fixes
* increase the maxlisteners for timeout controllers ([#1065](https://github.com/libp2p/js-libp2p/issues/1065)) ([09a0f94](https://github.com/libp2p/js-libp2p/commit/09a0f940df7fdb4ece34604e85693709df5c213e))
## [0.35.5](https://github.com/libp2p/js-libp2p/compare/v0.35.4...v0.35.5) (2021-12-15)
## [0.35.4](https://github.com/libp2p/js-libp2p/compare/v0.35.3...v0.35.4) (2021-12-15)
### Features
* allow per-component metrics to be collected ([#1061](https://github.com/libp2p/js-libp2p/issues/1061)) ([2f0b311](https://github.com/libp2p/js-libp2p/commit/2f0b311df7127aa44512c2008142d4ca30268986)), closes [#1060](https://github.com/libp2p/js-libp2p/issues/1060)
## [0.35.3](https://github.com/libp2p/js-libp2p/compare/v0.35.2...v0.35.3) (2021-12-13)
### Bug Fixes
* clean up pending dial targets ([#1059](https://github.com/libp2p/js-libp2p/issues/1059)) ([bdc9f16](https://github.com/libp2p/js-libp2p/commit/bdc9f16d0cbe56ccf26822f11068e7795bcef046))
* fix uncaught promise rejection when finding peers ([#1044](https://github.com/libp2p/js-libp2p/issues/1044)) ([3b683e7](https://github.com/libp2p/js-libp2p/commit/3b683e715686163e229b7b5c3a892327dfd4fc63))
* make error codes consistent ([#1054](https://github.com/libp2p/js-libp2p/issues/1054)) ([b25e0fe](https://github.com/libp2p/js-libp2p/commit/b25e0fe5312db58a06c39500ae84c50fed3a93bd))
## [0.35.2](https://github.com/libp2p/js-libp2p/compare/v0.33.0...v0.35.2) (2021-12-06)
### Bug Fixes
* do not let closest peers run forever ([#1047](https://github.com/libp2p/js-libp2p/issues/1047)) ([91c2ec9](https://github.com/libp2p/js-libp2p/commit/91c2ec9856a3e972b7b2c9c4d9a4eda1d431c7ef))
* increase maxlisteners on event target ([#1050](https://github.com/libp2p/js-libp2p/issues/1050)) ([b70fb43](https://github.com/libp2p/js-libp2p/commit/b70fb43427b47df079b55929ec8956f69cbda966)), closes [#900](https://github.com/libp2p/js-libp2p/issues/900)
* private ip ts compile has no call signatures ([#1020](https://github.com/libp2p/js-libp2p/issues/1020)) ([77d7cb8](https://github.com/libp2p/js-libp2p/commit/77d7cb8f0815f2cdd3bfdfa8b641a7a186fe9520))
* stop dht before connection manager ([#1041](https://github.com/libp2p/js-libp2p/issues/1041)) ([3a9d5f6](https://github.com/libp2p/js-libp2p/commit/3a9d5f64d96719ebb4d3b083c4f5832db4fa0816)), closes [#1039](https://github.com/libp2p/js-libp2p/issues/1039)
### chore
* update peer id and libp2p crypto ([#1042](https://github.com/libp2p/js-libp2p/issues/1042)) ([9cbf36f](https://github.com/libp2p/js-libp2p/commit/9cbf36fcb54099e6fed35ceccc4a2376f0926c1f))
### Features
* update dht ([#1009](https://github.com/libp2p/js-libp2p/issues/1009)) ([2f598eb](https://github.com/libp2p/js-libp2p/commit/2f598eba09cff4301474af08196158065e3602d8))
### BREAKING CHANGES
* requires node 15+
* libp2p-kad-dht has a new event-based API which is exposed as `_dht`
## [0.35.1](https://github.com/libp2p/js-libp2p/compare/v0.35.0...v0.35.1) (2021-12-03)

View File

@@ -23,8 +23,8 @@
<a href="https://david-dm.org/libp2p/js-libp2p"><img src="https://david-dm.org/libp2p/js-libp2p.svg?style=flat-square" /></a>
<a href="https://github.com/feross/standard"><img src="https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square"></a>
<a href="https://github.com/RichardLitt/standard-readme"><img src="https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/npm-%3E%3D7.0.0-orange.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/Node.js-%3E%3D15.0.0-orange.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/npm-%3E%3D6.0.0-orange.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/Node.js-%3E%3D12.0.0-orange.svg?style=flat-square" /></a>
<br>
</p>

View File

@@ -415,14 +415,7 @@ const node = await Libp2p.create({
contentRouting: [delegatedContentRouting],
peerRouting: [delegatedPeerRouting],
},
peerId,
peerRouting: { // Peer routing configuration
refreshManager: { // Refresh known and connected closest peers
enabled: true, // Should find the closest peers.
interval: 6e5, // Interval for getting the new for closest peers of 10min
bootDelay: 10e3 // Delay for the initial query for closest peers
}
}
peerId
})
```
@@ -497,9 +490,9 @@ const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const MPLEX = require('libp2p-mplex')
const { NOISE } = require('libp2p-noise')
const LevelDatastore = require('datastore-level')
const LevelStore = require('datastore-level')
const datastore = new LevelDatastore('path/to/store')
const datastore = new LevelStore('path/to/store')
await datastore.open()
const node = await Libp2p.create({
@@ -672,18 +665,18 @@ const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const MPLEX = require('libp2p-mplex')
const { NOISE } = require('libp2p-noise')
const LevelDatastore = require('datastore-level')
const LevelStore = require('datastore-level')
const datastore = new LevelDatastore('path/to/store')
await datastore.open() // level database must be ready before node boot
const datastore = new LevelStore('path/to/store')
const dsInstant = await datastore.open()
const node = await Libp2p.create({
datastore, // pass the opened datastore
modules: {
transport: [TCP],
streamMuxer: [MPLEX],
connEncryption: [NOISE]
},
datastore: dsInstant,
peerStore: {
persistence: true,
threshold: 5
@@ -806,8 +799,8 @@ protocols: [
As libp2p is designed to be a modular networking library, its usage will vary based on individual project needs. We've included links to some existing project configurations for your reference, in case you wish to replicate their configuration:
- [libp2p-ipfs-nodejs](https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs-core-config/src/libp2p.js) - libp2p configuration used by js-ipfs when running in Node.js
- [libp2p-ipfs-browser](https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs-core-config/src/libp2p.browser.js) - libp2p configuration used by js-ipfs when running in a Browser (that supports WebRTC)
- [libp2p-ipfs-nodejs](https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs/src/core/runtime/libp2p-nodejs.js) - libp2p configuration used by js-ipfs when running in Node.js
- [libp2p-ipfs-browser](https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs/src/core/runtime/libp2p-browser.js) - libp2p configuration used by js-ipfs when running in a Browser (that supports WebRTC)
If you have developed a project using `js-libp2p`, please consider submitting your configuration to this list so that it can be found easily by other users.

View File

@@ -41,13 +41,13 @@ const createNode = async () => {
node1.pubsub.on(topic, (msg) => {
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
})
node1.pubsub.subscribe(topic)
await node1.pubsub.subscribe(topic)
// Will not receive own published messages by default
node2.pubsub.on(topic, (msg) => {
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
})
node2.pubsub.subscribe(topic)
await node2.pubsub.subscribe(topic)
// node2 publishes "news" every second
setInterval(() => {

View File

@@ -1,6 +1,6 @@
{
"name": "libp2p",
"version": "0.35.8",
"version": "0.35.1",
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js",
@@ -69,7 +69,7 @@
"node": ">=15.0.0"
},
"browser": {
"nat-api": false
"@motrix/nat-api": false
},
"eslintConfig": {
"extends": "ipfs",
@@ -80,8 +80,8 @@
]
},
"dependencies": {
"@motrix/nat-api": "^0.3.1",
"@vascosantos/moving-average": "^1.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"aggregate-error": "^3.1.0",
"any-signal": "^2.1.1",
@@ -113,7 +113,6 @@
"multiformats": "^9.0.0",
"multistream-select": "^2.0.0",
"mutable-proxy": "^1.0.0",
"nat-api": "^0.3.1",
"node-forge": "^0.10.0",
"p-any": "^3.0.0",
"p-fifo": "^1.0.0",
@@ -133,10 +132,9 @@
"xsalsa20": "^1.1.0"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^5.0.0",
"@chainsafe/libp2p-noise": "^4.0.0",
"@nodeutils/defaults-deep": "^1.1.0",
"@types/es6-promisify": "^6.0.0",
"@types/node": "^16.0.1",
"@types/node-forge": "^0.10.1",
"@types/varint": "^6.0.0",
"aegir": "^36.0.0",
@@ -151,9 +149,9 @@
"libp2p": ".",
"libp2p-bootstrap": "^0.14.0",
"libp2p-delegated-content-routing": "^0.11.0",
"libp2p-delegated-peer-routing": "^0.11.1",
"libp2p-floodsub": "^0.28.0",
"libp2p-gossipsub": "^0.12.1",
"libp2p-delegated-peer-routing": "^0.11.0",
"libp2p-floodsub": "^0.27.0",
"libp2p-gossipsub": "^0.11.0",
"libp2p-interfaces-compliance-tests": "^2.0.1",
"libp2p-interop": "^0.5.0",
"libp2p-kad-dht": "^0.27.1",
@@ -182,34 +180,33 @@
"Maciej Krüger <mkg20001@gmail.com>",
"Hugo Dias <mail@hugodias.me>",
"dirkmc <dirkmdev@gmail.com>",
"Volker Mische <volker.mische@gmail.com>",
"Chris Dostert <chrisdostert@users.noreply.github.com>",
"Volker Mische <volker.mische@gmail.com>",
"zeim839 <50573884+zeim839@users.noreply.github.com>",
"Robert Kiel <robert.kiel@hoprnet.org>",
"Richard Littauer <richard.littauer@gmail.com>",
"a1300 <matthias-knopp@gmx.net>",
"Ryan Bell <ryan@piing.net>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com>",
"Andrew Nesbitt <andrewnez@gmail.com>",
"Franck Royer <franck@royer.one>",
"Thomas Eizinger <thomas@eizinger.io>",
"Vít Habada <vithabada93@gmail.com>",
"Giovanni T. Parra <fiatjaf@gmail.com>",
"acolytec3 <17355484+acolytec3@users.noreply.github.com>",
"Alan Smithee <ggnore.alan.smithee@gmail.com>",
"Franck Royer <franck@royer.one>",
"Elven <mon.samuel@qq.com>",
"Robert Kiel <robert.kiel@hoprnet.org>",
"Andrew Nesbitt <andrewnez@gmail.com>",
"Samlior <samlior@foxmail.com>",
"Thomas Eizinger <thomas@eizinger.io>",
"Didrik Nordström <didrik.nordstrom@gmail.com>",
"Aditya Bose <13054902+adbose@users.noreply.github.com>",
"Smite Chow <xiaopengyou@live.com>",
"Soeren <nikorpoulsen@gmail.com>",
"Sönke Hahn <soenkehahn@gmail.com>",
"TJKoury <TJKoury@gmail.com>",
"TheStarBoys <41286328+TheStarBoys@users.noreply.github.com>",
"Tiago Alves <alvesjtiago@gmail.com>",
"Tim Daubenschütz <tim@daubenschuetz.de>",
"XiaoZhang <zxinmyth@gmail.com>",
"Yusef Napora <yusef@napora.org>",
"Zane Starr <zcstarr@gmail.com>",
"ebinks <elizabethjbinks@gmail.com>",
"greenSnot <greenSnot@users.noreply.github.com>",
"Aditya Bose <13054902+adbose@users.noreply.github.com>",
"isan_rivkin <isanrivkin@gmail.com>",
"mayerwin <mayerwin@users.noreply.github.com>",
"mcclure <andi.m.mcclure@gmail.com>",
@@ -218,8 +215,8 @@
"robertkiel <robert.kiel@validitylabs.org>",
"shresthagrawal <34920931+shresthagrawal@users.noreply.github.com>",
"swedneck <40505480+swedneck@users.noreply.github.com>",
"tuyennhv <vutuyen2636@gmail.com>",
"Sönke Hahn <soenkehahn@gmail.com>",
"greenSnot <greenSnot@users.noreply.github.com>",
"Alan Smithee <ggnore.alan.smithee@gmail.com>",
"Aleksei <vozhdb@gmail.com>",
"Bernd Strehl <bernd.strehl@gmail.com>",
"Chris Bratlien <chrisbratlien@gmail.com>",
@@ -244,13 +241,10 @@
"Lars Gierth <lgierth@users.noreply.github.com>",
"Leask Wong <i@leaskh.com>",
"Marcin Tojek <mtojek@users.noreply.github.com>",
"Marston Connell <34043723+TheMarstonConnell@users.noreply.github.com>",
"Michael Burns <5170+mburns@users.noreply.github.com>",
"Miguel Mota <miguelmota2@gmail.com>",
"Nuno Nogueira <nunofmn@gmail.com>",
"Philipp Muens <raute1337@gmx.de>",
"RasmusErik Voel Jensen <github@solsort.com>",
"Smite Chow <xiaopengyou@live.com>",
"Soeren <nikorpoulsen@gmail.com>"
"RasmusErik Voel Jensen <github@solsort.com>"
]
}

View File

@@ -4,7 +4,7 @@ const debug = require('debug')
const log = Object.assign(debug('libp2p:relay'), {
error: debug('libp2p:relay:err')
})
const { codes } = require('./../errors')
const {
setDelayedInterval,
clearDelayedInterval
@@ -88,7 +88,7 @@ class Relay {
const cid = await namespaceToCid(RELAY_RENDEZVOUS_NS)
await this._libp2p.contentRouting.provide(cid)
} catch (/** @type {any} */ err) {
if (err.code === codes.ERR_NO_ROUTERS_AVAILABLE) {
if (err.code === 'NO_ROUTERS_AVAILABLE') {
log.error('a content router, such as a DHT, must be provided in order to advertise the relay service', err)
// Stop the advertise
this.stop()

View File

@@ -49,13 +49,6 @@ const DefaultConfig = {
persistence: false,
threshold: 5
},
peerRouting: {
refreshManager: {
enabled: true,
interval: 6e5,
bootDelay: 10e3
}
},
config: {
protocolPrefix: 'ipfs',
dht: {

View File

@@ -57,9 +57,7 @@ class AutoDialler {
}
this._running = true
this._autoDial().catch(err => {
log.error('could start autodial', err)
})
this._autoDial()
log('started')
}

View File

@@ -12,7 +12,7 @@ const LatencyMonitor = require('./latency-monitor')
const retimer = require('retimer')
const { EventEmitter } = require('events')
const trackedMap = require('../metrics/tracked-map')
const PeerId = require('peer-id')
const {
@@ -32,10 +32,6 @@ const defaultOptions = {
defaultPeerValue: 1
}
const METRICS_COMPONENT = 'connection-manager'
const METRICS_PEER_CONNECTIONS = 'peer-connections'
const METRICS_PEER_VALUES = 'peer-values'
/**
* @typedef {import('../')} Libp2p
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
@@ -87,14 +83,14 @@ class ConnectionManager extends EventEmitter {
*
* @type {Map<string, number>}
*/
this._peerValues = trackedMap(METRICS_COMPONENT, METRICS_PEER_VALUES, this._libp2p.metrics)
this._peerValues = new Map()
/**
* Map of connections per peer
*
* @type {Map<string, Connection[]>}
*/
this.connections = trackedMap(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this._libp2p.metrics)
this.connections = new Map()
this._started = false
this._timer = null
@@ -215,7 +211,6 @@ class ConnectionManager extends EventEmitter {
const storedConn = this.connections.get(peerIdStr)
this.emit('peer:connect', connection)
if (storedConn) {
storedConn.push(connection)
} else {
@@ -248,8 +243,6 @@ class ConnectionManager extends EventEmitter {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer)
}
}

View File

@@ -54,7 +54,7 @@ class ContentRouting {
*/
async * findProviders (key, options = {}) {
if (!this.routers.length) {
throw errCode(new Error('No content this.routers available'), codes.ERR_NO_ROUTERS_AVAILABLE)
throw errCode(new Error('No content this.routers available'), 'NO_ROUTERS_AVAILABLE')
}
yield * pipe(
@@ -77,7 +77,7 @@ class ContentRouting {
*/
async provide (key) {
if (!this.routers.length) {
throw errCode(new Error('No content routers available'), codes.ERR_NO_ROUTERS_AVAILABLE)
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
}
await Promise.all(this.routers.map((router) => router.provide(key)))

View File

@@ -5,9 +5,6 @@ const { anySignal } = require('any-signal')
// @ts-ignore p-fifo does not export types
const FIFO = require('p-fifo')
const pAny = require('p-any')
// @ts-expect-error setMaxListeners is missing from the types
const { setMaxListeners } = require('events')
const { codes } = require('../errors')
/**
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
@@ -56,17 +53,12 @@ class DialRequest {
const tokens = this.dialer.getTokens(this.addrs.length)
// If no tokens are available, throw
if (tokens.length < 1) {
throw errCode(new Error('No dial tokens available'), codes.ERR_NO_DIAL_TOKENS)
throw errCode(new Error('No dial tokens available'), 'ERR_NO_DIAL_TOKENS')
}
const tokenHolder = new FIFO()
tokens.forEach(token => tokenHolder.push(token))
const dialAbortControllers = this.addrs.map(() => {
const controller = new AbortController()
setMaxListeners && setMaxListeners(Infinity, controller.signal)
return controller
})
const dialAbortControllers = this.addrs.map(() => new AbortController())
let completedDials = 0
try {

View File

@@ -9,12 +9,11 @@ const { Multiaddr } = require('multiaddr')
const { TimeoutController } = require('timeout-abort-controller')
const { AbortError } = require('abortable-iterator')
const { anySignal } = require('any-signal')
// @ts-expect-error setMaxListeners is missing from the types
const { setMaxListeners } = require('events')
const DialRequest = require('./dial-request')
const { publicAddressesFirst } = require('libp2p-utils/src/address-sort')
const getPeer = require('../get-peer')
const trackedMap = require('../metrics/tracked-map')
const { codes } = require('../errors')
const {
DIAL_TIMEOUT,
@@ -23,10 +22,6 @@ const {
MAX_ADDRS_TO_DIAL
} = require('../constants')
const METRICS_COMPONENT = 'dialler'
const METRICS_PENDING_DIALS = 'pending-dials'
const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'
/**
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('peer-id')} PeerId
@@ -49,15 +44,14 @@ const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'
* @property {number} [maxDialsPerPeer = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer.
* @property {number} [dialTimeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take.
* @property {Record<string, Resolver>} [resolvers = {}] - multiaddr resolvers to use when dialing
* @property {import('../metrics')} [metrics]
*
* @typedef DialTarget
* @property {string} id
* @property {Multiaddr[]} addrs
*
* @typedef PendingDial
* @property {import('./dial-request')} dialRequest
* @property {import('timeout-abort-controller').TimeoutController} controller
* @property {DialRequest} dialRequest
* @property {TimeoutController} controller
* @property {Promise<Connection>} promise
* @property {function():void} destroy
*/
@@ -75,8 +69,7 @@ class Dialer {
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
dialTimeout = DIAL_TIMEOUT,
maxDialsPerPeer = MAX_PER_PEER_DIALS,
resolvers = {},
metrics
resolvers = {}
}) {
this.transportManager = transportManager
this.peerStore = peerStore
@@ -86,12 +79,8 @@ class Dialer {
this.timeout = dialTimeout
this.maxDialsPerPeer = maxDialsPerPeer
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
/** @type {Map<string, PendingDial>} */
this._pendingDials = trackedMap(METRICS_COMPONENT, METRICS_PENDING_DIALS, metrics)
/** @type {Map<string, { resolve: (value: any) => void, reject: (err: Error) => void}>} */
this._pendingDialTargets = trackedMap(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, metrics)
this._pendingDials = new Map()
this._pendingDialTargets = new Map()
for (const [key, value] of Object.entries(resolvers)) {
Multiaddr.resolvers.set(key, value)
@@ -166,16 +155,14 @@ class Dialer {
this._pendingDialTargets.set(id, { resolve, reject })
})
try {
const dialTarget = await Promise.race([
this._createDialTarget(peer),
cancellablePromise
])
const dialTarget = await Promise.race([
this._createDialTarget(peer),
cancellablePromise
])
return dialTarget
} finally {
this._pendingDialTargets.delete(id)
}
this._pendingDialTargets.delete(id)
return dialTarget
}
/**
@@ -252,15 +239,10 @@ class Dialer {
// Combine the timeout signal and options.signal, if provided
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)
// this signal will potentially be used while dialing lots of
// peers so prevent MaxListenersExceededWarning appearing in the console
setMaxListeners && setMaxListeners(Infinity, signal)
const pendingDial = {
dialRequest,
controller: timeoutController,
@@ -271,7 +253,6 @@ class Dialer {
}
}
this._pendingDials.set(dialTarget.id, pendingDial)
return pendingDial
}

View File

@@ -36,29 +36,5 @@ exports.codes = {
ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED',
ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL',
ERR_INVALID_MULTIADDR: 'ERR_INVALID_MULTIADDR',
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID',
ERR_FIND_SELF: 'ERR_FIND_SELF',
ERR_NO_ROUTERS_AVAILABLE: 'ERR_NO_ROUTERS_AVAILABLE',
ERR_CONNECTION_NOT_MULTIPLEXED: 'ERR_CONNECTION_NOT_MULTIPLEXED',
ERR_NO_DIAL_TOKENS: 'ERR_NO_DIAL_TOKENS',
ERR_KEYCHAIN_REQUIRED: 'ERR_KEYCHAIN_REQUIRED',
ERR_INVALID_CMS: 'ERR_INVALID_CMS',
ERR_MISSING_KEYS: 'ERR_MISSING_KEYS',
ERR_NO_KEY: 'ERR_NO_KEY',
ERR_INVALID_KEY_NAME: 'ERR_INVALID_KEY_NAME',
ERR_INVALID_KEY_TYPE: 'ERR_INVALID_KEY_TYPE',
ERR_KEY_ALREADY_EXISTS: 'ERR_KEY_ALREADY_EXISTS',
ERR_INVALID_KEY_SIZE: 'ERR_INVALID_KEY_SIZE',
ERR_KEY_NOT_FOUND: 'ERR_KEY_NOT_FOUND',
ERR_OLD_KEY_NAME_INVALID: 'ERR_OLD_KEY_NAME_INVALID',
ERR_NEW_KEY_NAME_INVALID: 'ERR_NEW_KEY_NAME_INVALID',
ERR_PASSWORD_REQUIRED: 'ERR_PASSWORD_REQUIRED',
ERR_PEM_REQUIRED: 'ERR_PEM_REQUIRED',
ERR_CANNOT_READ_KEY: 'ERR_CANNOT_READ_KEY',
ERR_MISSING_PRIVATE_KEY: 'ERR_MISSING_PRIVATE_KEY',
ERR_INVALID_OLD_PASS_TYPE: 'ERR_INVALID_OLD_PASS_TYPE',
ERR_INVALID_NEW_PASS_TYPE: 'ERR_INVALID_NEW_PASS_TYPE',
ERR_INVALID_PASS_LENGTH: 'ERR_INVALID_PASS_LENGTH',
ERR_NOT_IMPLEMENTED: 'ERR_NOT_IMPLEMENTED',
ERR_WRONG_PING_ACK: 'ERR_WRONG_PING_ACK'
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID'
}

View File

@@ -48,8 +48,6 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @typedef {import('libp2p-interfaces/src/pubsub').PubsubOptions} PubsubOptions
* @typedef {import('interface-datastore').Datastore} Datastore
* @typedef {import('./pnet')} Protector
* @typedef {Object} PersistentPeerStoreOptions
* @property {number} [threshold]
*/
/**
@@ -58,9 +56,16 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {MuxedStream} stream
* @property {string} protocol
*
* @typedef {Object} RandomWalkOptions
* @property {boolean} [enabled = false]
* @property {number} [queriesPerPeriod = 1]
* @property {number} [interval = 300e3]
* @property {number} [timeout = 10e3]
*
* @typedef {Object} DhtOptions
* @property {boolean} [enabled = false]
* @property {number} [kBucketSize = 20]
* @property {RandomWalkOptions} [randomWalk]
* @property {boolean} [clientMode]
* @property {import('libp2p-interfaces/src/types').DhtSelectors} [selectors]
* @property {import('libp2p-interfaces/src/types').DhtValidators} [validators]
@@ -112,7 +117,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {KeychainOptions & import('./keychain/index').KeychainOptions} [keychain]
* @property {MetricsOptions & import('./metrics').MetricsOptions} [metrics]
* @property {import('./peer-routing').PeerRoutingOptions} [peerRouting]
* @property {PeerStoreOptions & PersistentPeerStoreOptions} [peerStore]
* @property {PeerStoreOptions & import('./peer-store/persistent').PersistentPeerStoreOptions} [peerStore]
* @property {import('./transport-manager').TransportManagerOptions} [transportManager]
* @property {Libp2pConfig} [config]
*
@@ -163,15 +168,6 @@ class Libp2p extends EventEmitter {
this.peerId = this._options.peerId
this.datastore = this._options.datastore
// Create Metrics
if (this._options.metrics.enabled) {
const metrics = new Metrics({
...this._options.metrics
})
this.metrics = metrics
}
this.peerStore = (this.datastore && this._options.peerStore.persistence)
? new PersistentPeerStore({
peerId: this.peerId,
@@ -206,6 +202,14 @@ class Libp2p extends EventEmitter {
autoDialInterval: this._options.connectionManager.autoDialInterval
})
// Create Metrics
if (this._options.metrics.enabled) {
this.metrics = new Metrics({
...this._options.metrics,
connectionManager: this.connectionManager
})
}
// Create keychain
if (this._options.keychain && this._options.keychain.datastore) {
log('creating keychain')
@@ -265,7 +269,6 @@ class Libp2p extends EventEmitter {
this.dialer = new Dialer({
transportManager: this.transportManager,
peerStore: this.peerStore,
metrics: this.metrics,
...this._options.dialer
})
@@ -381,7 +384,6 @@ class Libp2p extends EventEmitter {
this._isStarted = false
this.relay && this.relay.stop()
this.peerRouting.stop()
this._autodialler.stop()
await (this._dht && this._dht.stop())
@@ -660,8 +662,6 @@ class Libp2p extends EventEmitter {
// Relay
this.relay && this.relay.start()
this.peerRouting.start()
}
/**

View File

@@ -10,7 +10,6 @@ const { certificateForKey, findAsync } = require('./util')
const errcode = require('err-code')
const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const { codes } = require('../errors')
const privates = new WeakMap()
@@ -32,7 +31,7 @@ class CMS {
*/
constructor (keychain, dek) {
if (!keychain) {
throw errcode(new Error('keychain is required'), codes.ERR_KEYCHAIN_REQUIRED)
throw errcode(new Error('keychain is required'), 'ERR_KEYCHAIN_REQUIRED')
}
this.keychain = keychain
@@ -50,7 +49,7 @@ class CMS {
*/
async encrypt (name, plain) {
if (!(plain instanceof Uint8Array)) {
throw errcode(new Error('Plain data must be a Uint8Array'), codes.ERR_INVALID_PARAMETERS)
throw errcode(new Error('Plain data must be a Uint8Array'), 'ERR_INVALID_PARAMS')
}
const key = await this.keychain.findKeyByName(name)
@@ -82,7 +81,7 @@ class CMS {
*/
async decrypt (cmsData) {
if (!(cmsData instanceof Uint8Array)) {
throw errcode(new Error('CMS data is required'), codes.ERR_INVALID_PARAMETERS)
throw errcode(new Error('CMS data is required'), 'ERR_INVALID_PARAMS')
}
let cms
@@ -92,7 +91,7 @@ class CMS {
// @ts-ignore not defined
cms = forge.pkcs7.messageFromAsn1(obj)
} catch (/** @type {any} */ err) {
throw errcode(new Error('Invalid CMS: ' + err.message), codes.ERR_INVALID_CMS)
throw errcode(new Error('Invalid CMS: ' + err.message), 'ERR_INVALID_CMS')
}
// Find a recipient whose key we hold. We only deal with recipient certs
@@ -124,7 +123,7 @@ class CMS {
if (!r) {
// @ts-ignore cms types not defined
const missingKeys = recipients.map(r => r.keyId)
throw errcode(new Error('Decryption needs one of the key(s): ' + missingKeys.join(', ')), codes.ERR_MISSING_KEYS, {
throw errcode(new Error('Decryption needs one of the key(s): ' + missingKeys.join(', ')), 'ERR_MISSING_KEYS', {
missingKeys
})
}
@@ -132,7 +131,7 @@ class CMS {
const key = await this.keychain.findKeyById(r.keyId)
if (!key) {
throw errcode(new Error('No key available to decrypto'), codes.ERR_NO_KEY)
throw errcode(new Error('No key available to decrypto'), 'ERR_NO_KEY')
}
const pem = await this.keychain._getPrivateKey(key.name)

View File

@@ -10,7 +10,6 @@ const crypto = require('libp2p-crypto')
const { Key } = require('interface-datastore/key')
const CMS = require('./cms')
const errcode = require('err-code')
const { codes } = require('../errors')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string')
@@ -211,21 +210,21 @@ class Keychain {
const self = this
if (!validateKeyName(name) || name === 'self') {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
if (typeof type !== 'string') {
return throwDelayed(errcode(new Error(`Invalid key type '${type}'`), codes.ERR_INVALID_KEY_TYPE))
return throwDelayed(errcode(new Error(`Invalid key type '${type}'`), 'ERR_INVALID_KEY_TYPE'))
}
const dsname = DsName(name)
const exists = await self.store.has(dsname)
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), codes.ERR_KEY_ALREADY_EXISTS))
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), 'ERR_KEY_ALREADY_EXISTS'))
switch (type.toLowerCase()) {
case 'rsa':
if (!Number.isSafeInteger(size) || size < 2048) {
return throwDelayed(errcode(new Error(`Invalid RSA key size ${size}`), codes.ERR_INVALID_KEY_SIZE))
return throwDelayed(errcode(new Error(`Invalid RSA key size ${size}`), 'ERR_INVALID_KEY_SIZE'))
}
break
default:
@@ -298,7 +297,7 @@ class Keychain {
*/
async findKeyByName (name) {
if (!validateKeyName(name)) {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
const dsname = DsInfoName(name)
@@ -306,7 +305,7 @@ class Keychain {
const res = await this.store.get(dsname)
return JSON.parse(uint8ArrayToString(res))
} catch (/** @type {any} */ err) {
return throwDelayed(errcode(new Error(`Key '${name}' does not exist. ${err.message}`), codes.ERR_KEY_NOT_FOUND))
return throwDelayed(errcode(new Error(`Key '${name}' does not exist. ${err.message}`), 'ERR_KEY_NOT_FOUND'))
}
}
@@ -319,7 +318,7 @@ class Keychain {
async removeKey (name) {
const self = this
if (!validateKeyName(name) || name === 'self') {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
const dsname = DsName(name)
const keyInfo = await self.findKeyByName(name)
@@ -340,10 +339,10 @@ class Keychain {
async renameKey (oldName, newName) {
const self = this
if (!validateKeyName(oldName) || oldName === 'self') {
return throwDelayed(errcode(new Error(`Invalid old key name '${oldName}'`), codes.ERR_OLD_KEY_NAME_INVALID))
return throwDelayed(errcode(new Error(`Invalid old key name '${oldName}'`), 'ERR_OLD_KEY_NAME_INVALID'))
}
if (!validateKeyName(newName) || newName === 'self') {
return throwDelayed(errcode(new Error(`Invalid new key name '${newName}'`), codes.ERR_NEW_KEY_NAME_INVALID))
return throwDelayed(errcode(new Error(`Invalid new key name '${newName}'`), 'ERR_NEW_KEY_NAME_INVALID'))
}
const oldDsname = DsName(oldName)
const newDsname = DsName(newName)
@@ -351,7 +350,7 @@ class Keychain {
const newInfoName = DsInfoName(newName)
const exists = await self.store.has(newDsname)
if (exists) return throwDelayed(errcode(new Error(`Key '${newName}' already exists`), codes.ERR_KEY_ALREADY_EXISTS))
if (exists) return throwDelayed(errcode(new Error(`Key '${newName}' already exists`), 'ERR_KEY_ALREADY_EXISTS'))
try {
const pem = await self.store.get(oldDsname)
@@ -380,10 +379,10 @@ class Keychain {
*/
async exportKey (name, password) {
if (!validateKeyName(name)) {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
if (!password) {
return throwDelayed(errcode(new Error('Password is required'), codes.ERR_PASSWORD_REQUIRED))
return throwDelayed(errcode(new Error('Password is required'), 'ERR_PASSWORD_REQUIRED'))
}
const dsname = DsName(name)
@@ -410,20 +409,20 @@ class Keychain {
async importKey (name, pem, password) {
const self = this
if (!validateKeyName(name) || name === 'self') {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
if (!pem) {
return throwDelayed(errcode(new Error('PEM encoded key is required'), codes.ERR_PEM_REQUIRED))
return throwDelayed(errcode(new Error('PEM encoded key is required'), 'ERR_PEM_REQUIRED'))
}
const dsname = DsName(name)
const exists = await self.store.has(dsname)
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), codes.ERR_KEY_ALREADY_EXISTS))
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), 'ERR_KEY_ALREADY_EXISTS'))
let privateKey
try {
privateKey = await crypto.keys.import(pem, password)
} catch (/** @type {any} */ err) {
return throwDelayed(errcode(new Error('Cannot read the key, most likely the password is wrong'), codes.ERR_CANNOT_READ_KEY))
return throwDelayed(errcode(new Error('Cannot read the key, most likely the password is wrong'), 'ERR_CANNOT_READ_KEY'))
}
let kid
@@ -458,16 +457,16 @@ class Keychain {
async importPeer (name, peer) {
const self = this
if (!validateKeyName(name)) {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
if (!peer || !peer.privKey) {
return throwDelayed(errcode(new Error('Peer.privKey is required'), codes.ERR_MISSING_PRIVATE_KEY))
return throwDelayed(errcode(new Error('Peer.privKey is required'), 'ERR_MISSING_PRIVATE_KEY'))
}
const privateKey = peer.privKey
const dsname = DsName(name)
const exists = await self.store.has(dsname)
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), codes.ERR_KEY_ALREADY_EXISTS))
if (exists) return throwDelayed(errcode(new Error(`Key '${name}' already exists`), 'ERR_KEY_ALREADY_EXISTS'))
try {
const kid = await privateKey.id()
@@ -496,7 +495,7 @@ class Keychain {
*/
async _getPrivateKey (name) {
if (!validateKeyName(name)) {
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), codes.ERR_INVALID_KEY_NAME))
return throwDelayed(errcode(new Error(`Invalid key name '${name}'`), 'ERR_INVALID_KEY_NAME'))
}
try {
@@ -504,7 +503,7 @@ class Keychain {
const res = await this.store.get(dsname)
return uint8ArrayToString(res)
} catch (/** @type {any} */ err) {
return throwDelayed(errcode(new Error(`Key '${name}' does not exist. ${err.message}`), codes.ERR_KEY_NOT_FOUND))
return throwDelayed(errcode(new Error(`Key '${name}' does not exist. ${err.message}`), 'ERR_KEY_NOT_FOUND'))
}
}
@@ -516,13 +515,13 @@ class Keychain {
*/
async rotateKeychainPass (oldPass, newPass) {
if (typeof oldPass !== 'string') {
return throwDelayed(errcode(new Error(`Invalid old pass type '${typeof oldPass}'`), codes.ERR_INVALID_OLD_PASS_TYPE))
return throwDelayed(errcode(new Error(`Invalid old pass type '${typeof oldPass}'`), 'ERR_INVALID_OLD_PASS_TYPE'))
}
if (typeof newPass !== 'string') {
return throwDelayed(errcode(new Error(`Invalid new pass type '${typeof newPass}'`), codes.ERR_INVALID_NEW_PASS_TYPE))
return throwDelayed(errcode(new Error(`Invalid new pass type '${typeof newPass}'`), 'ERR_INVALID_NEW_PASS_TYPE'))
}
if (newPass.length < 20) {
return throwDelayed(errcode(new Error(`Invalid pass length ${newPass.length}`), codes.ERR_INVALID_PASS_LENGTH))
return throwDelayed(errcode(new Error(`Invalid pass length ${newPass.length}`), 'ERR_INVALID_PASS_LENGTH'))
}
log('recreating keychain')
const oldDek = privates.get(this).dek

View File

@@ -24,6 +24,9 @@ const directionToEvent = {
*/
/**
* @typedef MetricsProperties
* @property {import('../connection-manager')} connectionManager
*
* @typedef MetricsOptions
* @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize]
* @property {number} [computeThrottleTimeout = defaultOptions.computeThrottleTimeout]
@@ -34,7 +37,7 @@ const directionToEvent = {
class Metrics {
/**
* @class
* @param {MetricsOptions} options
* @param {MetricsProperties & MetricsOptions} options
*/
constructor (options) {
this._options = mergeOptions(defaultOptions, options)
@@ -44,7 +47,10 @@ class Metrics {
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
this._running = false
this._onMessage = this._onMessage.bind(this)
this._componentMetrics = new Map()
this._connectionManager = options.connectionManager
this._connectionManager.on('peer:disconnect', (connection) => {
this.onPeerDisconnected(connection.remotePeer)
})
}
/**
@@ -88,22 +94,6 @@ class Metrics {
return Array.from(this._peerStats.keys())
}
/**
* @returns {Map}
*/
getComponentMetrics () {
return this._componentMetrics
}
updateComponentMetric (component, metric, value) {
if (!this._componentMetrics.has(component)) {
this._componentMetrics.set(component, new Map())
}
const map = this._componentMetrics.get(component)
map.set(metric, value)
}
/**
* Returns the `Stats` object for the given `PeerId` whether it
* is a live peer, or in the disconnected peer LRU cache.

View File

@@ -8,7 +8,6 @@ const retimer = require('retimer')
/**
* @typedef {import('@vascosantos/moving-average').IMovingAverage} IMovingAverage
* @typedef {import('bignumber.js').BigNumber} Big
*/
class Stats extends EventEmitter {

View File

@@ -1,68 +0,0 @@
'use strict'
/**
* @template K
* @template V
*/
class TrackedMap extends Map {
/**
* @param {string} component
* @param {string} name
* @param {import('.')} metrics
*/
constructor (component, name, metrics) {
super()
this._component = component
this._name = name
this._metrics = metrics
this._metrics.updateComponentMetric(this._component, this._name, this.size)
}
/**
* @param {K} key
* @param {V} value
*/
set (key, value) {
super.set(key, value)
this._metrics.updateComponentMetric(this._component, this._name, this.size)
return this
}
/**
* @param {K} key
*/
delete (key) {
const deleted = super.delete(key)
this._metrics.updateComponentMetric(this._component, this._name, this.size)
return deleted
}
clear () {
super.clear()
this._metrics.updateComponentMetric(this._component, this._name, this.size)
}
}
/**
* @template K
* @template V
* @param {string} component
* @param {string} name
* @param {import('.')} [metrics]
* @returns {Map<K, V>}
*/
module.exports = (component, name, metrics) => {
/** @type {Map<K, V>} */
let map
if (metrics) {
map = new TrackedMap(component, name, metrics)
} else {
map = new Map()
}
return map
}

View File

@@ -1,7 +1,7 @@
'use strict'
// @ts-ignore nat-api does not export types
const NatAPI = require('nat-api')
const NatAPI = require('@motrix/nat-api')
const debug = require('debug')
const { promisify } = require('es6-promisify')
const { Multiaddr } = require('multiaddr')

View File

@@ -1,11 +1,6 @@
'use strict'
const debug = require('debug')
const log = Object.assign(debug('libp2p:peer-routing'), {
error: debug('libp2p:peer-routing:err')
})
const errCode = require('err-code')
const errors = require('./errors')
const {
storeAddresses,
uniquePeers,
@@ -16,16 +11,8 @@ const { TimeoutController } = require('timeout-abort-controller')
const merge = require('it-merge')
const { pipe } = require('it-pipe')
const first = require('it-first')
const drain = require('it-drain')
const filter = require('it-filter')
const {
setDelayedInterval,
clearDelayedInterval
// @ts-ignore module with no types
} = require('set-delayed-interval')
const { DHTPeerRouting } = require('./dht/dht-peer-routing')
// @ts-expect-error setMaxListeners is missing from the types
const { setMaxListeners } = require('events')
/**
* @typedef {import('peer-id')} PeerId
@@ -34,14 +21,7 @@ const { setMaxListeners } = require('events')
*/
/**
* @typedef {Object} RefreshManagerOptions
* @property {boolean} [enabled = true] - Whether to enable the Refresh manager
* @property {number} [bootDelay = 6e5] - Boot delay to start the Refresh Manager (in ms)
* @property {number} [interval = 10e3] - Interval between each Refresh Manager run (in ms)
* @property {number} [timeout = 10e3] - How long to let each refresh run (in ms)
*
* @typedef {Object} PeerRoutingOptions
* @property {RefreshManagerOptions} [refreshManager]
*/
class PeerRouting {
@@ -59,42 +39,6 @@ class PeerRouting {
if (libp2p._dht && libp2p._config.dht.enabled) {
this._routers.push(new DHTPeerRouting(libp2p._dht))
}
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager
this._findClosestPeersTask = this._findClosestPeersTask.bind(this)
}
/**
* Start peer routing service.
*/
start () {
if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) {
return
}
this._timeoutId = setDelayedInterval(
this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay
)
}
/**
* Recurrent task to find closest peers and add their addresses to the Address Book.
*/
async _findClosestPeersTask () {
try {
// nb getClosestPeers adds the addresses to the address book
await drain(this.getClosestPeers(this._peerId.id, { timeout: this._refreshManagerOptions.timeout || 10e3 }))
} catch (/** @type {any} */ err) {
log.error(err)
}
}
/**
* Stop peer routing service.
*/
stop () {
clearDelayedInterval(this._timeoutId)
}
/**
@@ -107,24 +51,19 @@ class PeerRouting {
*/
async findPeer (id, options) { // eslint-disable-line require-await
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), errors.codes.ERR_NO_ROUTERS_AVAILABLE)
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}
if (id.toB58String() === this._peerId.toB58String()) {
throw errCode(new Error('Should not try to find self'), errors.codes.ERR_FIND_SELF)
throw errCode(new Error('Should not try to find self'), 'ERR_FIND_SELF')
}
const output = await pipe(
merge(
...this._routers.map(router => (async function * () {
try {
yield await router.findPeer(id, options)
} catch (err) {
log.error(err)
}
})())
...this._routers.map(router => [router.findPeer(id, options)])
),
(source) => filter(source, Boolean),
// @ts-ignore findPeer resolves a Promise
(source) => storeAddresses(source, this._peerStore),
(source) => first(source)
)
@@ -133,7 +72,7 @@ class PeerRouting {
return output
}
throw errCode(new Error(errors.messages.NOT_FOUND), errors.codes.ERR_NOT_FOUND)
throw errCode(new Error('not found'), 'NOT_FOUND')
}
/**
@@ -147,16 +86,11 @@ class PeerRouting {
*/
async * getClosestPeers (key, options = { timeout: 30e3 }) {
if (!this._routers.length) {
throw errCode(new Error('No peer routers available'), errors.codes.ERR_NO_ROUTERS_AVAILABLE)
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE')
}
if (options.timeout) {
const controller = new TimeoutController(options.timeout)
// this controller will potentially be used while dialing lots of
// peers so prevent MaxListenersExceededWarning appearing in the console
setMaxListeners && setMaxListeners(Infinity, controller.signal)
options.signal = controller.signal
options.signal = new TimeoutController(options.timeout).signal
}
yield * pipe(

View File

@@ -2,7 +2,10 @@
const errcode = require('err-code')
const PeerId = require('peer-id')
const { codes } = require('../errors')
const {
codes: { ERR_INVALID_PARAMETERS }
} = require('../errors')
/**
* @param {any} data
@@ -45,7 +48,7 @@ class Book {
* @param {any[]|any} data
*/
set (peerId, data) {
throw errcode(new Error('set must be implemented by the subclass'), codes.ERR_NOT_IMPLEMENTED)
throw errcode(new Error('set must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
@@ -91,7 +94,7 @@ class Book {
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), codes.ERR_INVALID_PARAMETERS)
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const rec = this.data.get(peerId.toB58String())
@@ -108,7 +111,7 @@ class Book {
*/
delete (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), codes.ERR_INVALID_PARAMETERS)
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
if (!this.data.delete(peerId.toB58String())) {

View File

@@ -5,7 +5,7 @@ const log = Object.assign(debug('libp2p:ping'), {
error: debug('libp2p:ping:err')
})
const errCode = require('err-code')
const { codes } = require('../errors')
const crypto = require('libp2p-crypto')
const { pipe } = require('it-pipe')
// @ts-ignore it-buffer has no types exported
@@ -50,7 +50,7 @@ async function ping (node, peer) {
const end = Date.now()
if (!equals(data, result)) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
throw errCode(new Error('Received wrong ping ack'), 'ERR_WRONG_PING_ACK')
}
return end - start

View File

@@ -297,7 +297,7 @@ class Upgrader {
maConn.timeline.upgraded = Date.now()
const errConnectionNotMultiplexed = () => {
throw errCode(new Error('connection is not multiplexed'), codes.ERR_CONNECTION_NOT_MULTIPLEXED)
throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED')
}
// Create the connection

View File

@@ -36,14 +36,14 @@ describe('content-routing', () => {
throw new Error('.findProviders should return an error')
} catch (/** @type {any} */ err) {
expect(err).to.exist()
expect(err.code).to.equal('ERR_NO_ROUTERS_AVAILABLE')
expect(err.code).to.equal('NO_ROUTERS_AVAILABLE')
}
})
it('.provide should return an error', async () => {
await expect(node.contentRouting.provide('a cid'))
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_NO_ROUTERS_AVAILABLE')
.and.to.have.property('code', 'NO_ROUTERS_AVAILABLE')
})
})
@@ -87,11 +87,8 @@ describe('content-routing', () => {
sinon.stub(nodes[0]._dht, 'findProviders').callsFake(function * () {
deferred.resolve()
yield {
name: 'PROVIDER',
providers: [{
id: providerPeerId,
multiaddrs: []
}]
id: providerPeerId,
multiaddrs: []
}
})
@@ -364,12 +361,7 @@ describe('content-routing', () => {
}
sinon.stub(node._dht, 'findProviders').callsFake(async function * () {
yield {
name: 'PROVIDER',
providers: [
result1
]
}
yield result1
})
sinon.stub(delegate, 'findProviders').callsFake(async function * () {
yield result2
@@ -390,8 +382,7 @@ describe('content-routing', () => {
const dhtDeferred = pDefer()
const delegatedDeferred = pDefer()
sinon.stub(node._dht, 'provide').callsFake(async function * () {
yield
sinon.stub(node._dht, 'provide').callsFake(() => {
dhtDeferred.resolve()
})
@@ -415,12 +406,7 @@ describe('content-routing', () => {
}]
sinon.stub(node._dht, 'findProviders').callsFake(function * () {
yield {
name: 'PROVIDER',
providers: [
results[0]
]
}
yield results[0]
})
sinon.stub(delegate, 'findProviders').callsFake(function * () { // eslint-disable-line require-yield

View File

@@ -38,13 +38,13 @@ describe('DHT subsystem is configurable', () => {
})
libp2p = await create(customOptions)
expect(libp2p._dht.isStarted()).to.equal(false)
expect(libp2p._dht.isStarted).to.equal(false)
await libp2p.start()
expect(libp2p._dht.isStarted()).to.equal(true)
expect(libp2p._dht.isStarted).to.equal(true)
await libp2p.stop()
expect(libp2p._dht.isStarted()).to.equal(false)
expect(libp2p._dht.isStarted).to.equal(false)
})
it('should not start if disabled once libp2p starts', async () => {
@@ -63,10 +63,10 @@ describe('DHT subsystem is configurable', () => {
})
libp2p = await create(customOptions)
expect(libp2p._dht.isStarted()).to.equal(false)
expect(libp2p._dht.isStarted).to.equal(false)
await libp2p.start()
expect(libp2p._dht.isStarted()).to.equal(false)
expect(libp2p._dht.isStarted).to.equal(false)
})
it('should allow a manual start', async () => {
@@ -86,9 +86,9 @@ describe('DHT subsystem is configurable', () => {
libp2p = await create(customOptions)
await libp2p.start()
expect(libp2p._dht.isStarted()).to.equal(false)
expect(libp2p._dht.isStarted).to.equal(false)
await libp2p._dht.start()
expect(libp2p._dht.isStarted()).to.equal(true)
expect(libp2p._dht.isStarted).to.equal(true)
})
})

View File

@@ -60,8 +60,8 @@ describe('DHT subsystem operates correctly', () => {
expect(connection).to.exist()
return Promise.all([
pWaitFor(() => libp2p._dht._lan._routingTable.size === 1),
pWaitFor(() => remoteLibp2p._dht._lan._routingTable.size === 1)
pWaitFor(() => libp2p._dht.routingTable.size === 1),
pWaitFor(() => remoteLibp2p._dht.routingTable.size === 1)
])
})
@@ -71,14 +71,14 @@ describe('DHT subsystem operates correctly', () => {
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
await Promise.all([
pWaitFor(() => libp2p._dht._lan._routingTable.size === 1),
pWaitFor(() => remoteLibp2p._dht._lan._routingTable.size === 1)
pWaitFor(() => libp2p._dht.routingTable.size === 1),
pWaitFor(() => remoteLibp2p._dht.routingTable.size === 1)
])
await libp2p.contentRouting.put(key, value)
const fetchedValue = await remoteLibp2p.contentRouting.get(key)
expect(fetchedValue).to.have.property('val').that.equalBytes(value)
expect(fetchedValue).to.eql(value)
})
})
@@ -119,13 +119,11 @@ describe('DHT subsystem operates correctly', () => {
const connection = await libp2p.dial(remAddr)
expect(connection).to.exist()
expect(libp2p._dht._lan._routingTable.size).to.be.eql(0)
expect(libp2p._dht.routingTable.size).to.be.eql(0)
expect(remoteLibp2p._dht.routingTable.size).to.be.eql(0)
await remoteLibp2p._dht.start()
// should be 0 directly after start - TODO this may be susceptible to timing bugs, we should have
// the ability to report stats on the DHT routing table instead of reaching into it's heart like this
expect(remoteLibp2p._dht._lan._routingTable.size).to.be.eql(0)
return pWaitFor(() => libp2p._dht._lan._routingTable.size === 1)
return pWaitFor(() => libp2p._dht.routingTable.size === 1)
})
it('should put on a peer and get from the other', async () => {
@@ -135,12 +133,12 @@ describe('DHT subsystem operates correctly', () => {
const value = uint8ArrayFromString('world')
await remoteLibp2p._dht.start()
await pWaitFor(() => libp2p._dht._lan._routingTable.size === 1)
await pWaitFor(() => libp2p._dht.routingTable.size === 1)
await libp2p.contentRouting.put(key, value)
const fetchedValue = await remoteLibp2p.contentRouting.get(key)
expect(fetchedValue).to.have.property('val').that.equalBytes(value)
expect(fetchedValue).to.eql(value)
})
})
})

View File

@@ -1,6 +1,7 @@
'use strict'
const KadDht = require('libp2p-kad-dht')
const { multicodec } = require('libp2p-kad-dht')
const Crypto = require('../../../src/insecure/plaintext')
const Muxer = require('libp2p-mplex')
const Transport = require('libp2p-tcp')
@@ -24,12 +25,13 @@ const subsystemOptions = mergeOptions(baseOptions, {
config: {
dht: {
kBucketSize: 20,
randomWalk: {
enabled: true
},
enabled: true
}
}
})
module.exports.subsystemOptions = subsystemOptions
module.exports.subsystemMulticodecs = [
'/ipfs/lan/kad/1.0.0'
]
module.exports.subsystemMulticodecs = [multicodec]

View File

@@ -13,6 +13,9 @@ const routingOptions = mergeOptions(baseOptions, {
config: {
dht: {
kBucketSize: 20,
randomWalk: {
enabled: true
},
enabled: true
}
}

View File

@@ -5,7 +5,6 @@ const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const AbortController = require('abort-controller')
const AggregateError = require('aggregate-error')
const pDefer = require('p-defer')
const delay = require('delay')

View File

@@ -296,7 +296,7 @@ describe('keychain', () => {
it('requires plain data as a Uint8Array', async () => {
const err = await ks.cms.encrypt(rsaKeyName, 'plain data').then(fail, err => err)
expect(err).to.exist()
expect(err).to.have.property('code', 'ERR_INVALID_PARAMETERS')
expect(err).to.have.property('code', 'ERR_INVALID_PARAMS')
})
it('encrypts', async () => {
@@ -308,7 +308,7 @@ describe('keychain', () => {
it('is a PKCS #7 message', async () => {
const err = await ks.cms.decrypt('not CMS').then(fail, err => err)
expect(err).to.exist()
expect(err).to.have.property('code', 'ERR_INVALID_PARAMETERS')
expect(err).to.have.property('code', 'ERR_INVALID_PARAMS')
})
it('is a PKCS #7 binary message', async () => {

View File

@@ -3,6 +3,9 @@
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const { EventEmitter } = require('events')
const { randomBytes } = require('libp2p-crypto')
const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe')
@@ -31,7 +34,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
})
metrics.trackStream({
@@ -66,7 +70,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
})
metrics.trackStream({
@@ -114,7 +119,8 @@ describe('Metrics', () => {
const [local2, remote2] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
})
const protocol = '/echo/1.0.0'
metrics.start()
@@ -169,7 +175,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000]
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
})
metrics.start()
@@ -224,7 +231,8 @@ describe('Metrics', () => {
}))
const metrics = new Metrics({
maxOldPeersRetention: 5 // Only keep track of 5
maxOldPeersRetention: 5, // Only keep track of 5
connectionManager: new EventEmitter()
})
// Clone so trackedPeers isn't modified
@@ -254,22 +262,4 @@ describe('Metrics', () => {
expect(spy).to.have.property('callCount', 1)
}
})
it('should allow components to track metrics', () => {
const metrics = new Metrics({
maxOldPeersRetention: 5 // Only keep track of 5
})
expect(metrics.getComponentMetrics()).to.be.empty()
const component = 'my-component'
const metric = 'some-metric'
const value = 1
metrics.updateComponentMetric(component, metric, value)
expect(metrics.getComponentMetrics()).to.have.lengthOf(1)
expect(metrics.getComponentMetrics().get(component)).to.have.lengthOf(1)
expect(metrics.getComponentMetrics().get(component).get(metric)).to.equal(value)
})
})

View File

@@ -244,10 +244,6 @@ describe('Nat Manager (TCP)', () => {
})
it('shuts the nat api down when stopping', async function () {
if (process.env.CI) {
return this.skip('CI environments will not let us map external ports')
}
function findRoutableAddress () {
const interfaces = networkInterfaces()
@@ -265,7 +261,7 @@ describe('Nat Manager (TCP)', () => {
if (!addr) {
// skip test if no non-loopback address is found
return this.skip()
this.skip()
}
const {

View File

@@ -161,13 +161,20 @@ describe('peer discovery scenarios', () => {
autoDial: false
},
dht: {
randomWalk: {
enabled: false,
delay: 1000, // start the first query quickly
interval: 10000,
timeout: 5000
},
enabled: true
}
}
})
const localConfig = getConfig(peerId)
// Only run random walk on our local node
localConfig.config.dht.randomWalk.enabled = true
libp2p = new Libp2p(localConfig)
const remoteLibp2p1 = new Libp2p(getConfig(remotePeerId1))

View File

@@ -6,9 +6,7 @@ const nock = require('nock')
const sinon = require('sinon')
const intoStream = require('into-stream')
const delay = require('delay')
const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const mergeOptions = require('merge-options')
const drain = require('it-drain')
const all = require('it-all')
@@ -36,7 +34,7 @@ describe('peer-routing', () => {
it('.findPeer should return an error', async () => {
await expect(node.peerRouting.findPeer('a cid'))
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_NO_ROUTERS_AVAILABLE')
.and.to.have.property('code', 'NO_ROUTERS_AVAILABLE')
})
it('.getClosestPeers should return an error', async () => {
@@ -45,7 +43,7 @@ describe('peer-routing', () => {
throw new Error('.getClosestPeers should return an error')
} catch (/** @type {any} */ err) {
expect(err).to.exist()
expect(err.code).to.equal('ERR_NO_ROUTERS_AVAILABLE')
expect(err.code).to.equal('NO_ROUTERS_AVAILABLE')
}
})
})
@@ -72,38 +70,33 @@ describe('peer-routing', () => {
after(() => Promise.all(nodes.map((n) => n.stop())))
it('should use the nodes dht', async () => {
sinon.stub(nodes[0]._dht, 'findPeer').callsFake(async function * () {
yield {
name: 'PEER_RESPONSE',
closer: [{
id: nodes[1].peerId,
multiaddrs: []
}]
}
it('should use the nodes dht', () => {
const deferred = pDefer()
sinon.stub(nodes[0]._dht, 'findPeer').callsFake(() => {
deferred.resolve()
return nodes[1].peerId
})
expect(nodes[0]._dht.findPeer.called).to.be.false()
await nodes[0].peerRouting.findPeer(nodes[1].peerId)
expect(nodes[0]._dht.findPeer.called).to.be.true()
nodes[0]._dht.findPeer.restore()
nodes[0].peerRouting.findPeer()
return deferred.promise
})
it('should use the nodes dht to get the closest peers', async () => {
sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(async function * () {
const deferred = pDefer()
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
sinon.stub(nodes[0]._dht, 'getClosestPeers').callsFake(function * () {
deferred.resolve()
yield {
name: 'PEER_RESPONSE',
closer: [{
id: nodes[1].peerId,
multiaddrs: []
}]
id: remotePeerId,
multiaddrs: []
}
})
expect(nodes[0]._dht.getClosestPeers.called).to.be.false()
await drain(nodes[0].peerRouting.getClosestPeers(nodes[1].peerId))
expect(nodes[0]._dht.getClosestPeers.called).to.be.true()
nodes[0]._dht.getClosestPeers.restore()
await nodes[0].peerRouting.getClosestPeers().next()
return deferred.promise
})
it('should error when peer tries to find itself', async () => {
@@ -111,95 +104,6 @@ describe('peer-routing', () => {
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_FIND_SELF')
})
it('should handle error thrown synchronously during find peer', async () => {
const unknownPeers = await peerUtils.createPeerId({ number: 1, fixture: false })
nodes[0].peerRouting._routers = [{
findPeer () {
throw new Error('Thrown sync')
}
}]
await expect(nodes[0].peerRouting.findPeer(unknownPeers[0]))
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_NOT_FOUND')
})
it('should handle error thrown asynchronously during find peer', async () => {
const unknownPeers = await peerUtils.createPeerId({ number: 1, fixture: false })
nodes[0].peerRouting._routers = [{
async findPeer () {
throw new Error('Thrown async')
}
}]
await expect(nodes[0].peerRouting.findPeer(unknownPeers[0]))
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_NOT_FOUND')
})
it('should handle error thrown asynchronously after delay during find peer', async () => {
const unknownPeers = await peerUtils.createPeerId({ number: 1, fixture: false })
nodes[0].peerRouting._routers = [{
async findPeer () {
await delay(100)
throw new Error('Thrown async after delay')
}
}]
await expect(nodes[0].peerRouting.findPeer(unknownPeers[0]))
.to.eventually.be.rejected()
.and.to.have.property('code', 'ERR_NOT_FOUND')
})
it('should return value when one router errors synchronously and another returns a value', async () => {
const [peer] = await peerUtils.createPeerId({ number: 1, fixture: false })
nodes[0].peerRouting._routers = [{
findPeer () {
throw new Error('Thrown sync')
}
}, {
async findPeer () {
return Promise.resolve({
id: peer,
multiaddrs: []
})
}
}]
await expect(nodes[0].peerRouting.findPeer(peer))
.to.eventually.deep.equal({
id: peer,
multiaddrs: []
})
})
it('should return value when one router errors asynchronously and another returns a value', async () => {
const [peer] = await peerUtils.createPeerId({ number: 1, fixture: false })
nodes[0].peerRouting._routers = [{
async findPeer () {
throw new Error('Thrown sync')
}
}, {
async findPeer () {
return Promise.resolve({
id: peer,
multiaddrs: []
})
}
}]
await expect(nodes[0].peerRouting.findPeer(peer))
.to.eventually.deep.equal({
id: peer,
multiaddrs: []
})
})
})
describe('via delegate router', () => {
@@ -239,35 +143,36 @@ describe('peer-routing', () => {
})
it('should use the delegate router to find peers', async () => {
const deferred = pDefer()
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
sinon.stub(delegate, 'findPeer').callsFake(() => {
deferred.resolve()
return {
id: remotePeerId,
multiaddrs: []
}
})
expect(delegate.findPeer.called).to.be.false()
await node.peerRouting.findPeer(remotePeerId)
expect(delegate.findPeer.called).to.be.true()
delegate.findPeer.restore()
await node.peerRouting.findPeer()
return deferred.promise
})
it('should use the delegate router to get the closest peers', async () => {
const deferred = pDefer()
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
sinon.stub(delegate, 'getClosestPeers').callsFake(function * () {
deferred.resolve()
yield {
id: remotePeerId,
multiaddrs: []
}
})
expect(delegate.getClosestPeers.called).to.be.false()
await drain(node.peerRouting.getClosestPeers(remotePeerId))
expect(delegate.getClosestPeers.called).to.be.true()
delegate.getClosestPeers.restore()
await node.peerRouting.getClosestPeers().next()
return deferred.promise
})
it('should be able to find a peer', async () => {
@@ -293,7 +198,7 @@ describe('peer-routing', () => {
})
it('should error when a peer cannot be found', async () => {
const peerId = await PeerId.create({ keyType: 'ed25519' })
const peerKey = 'key of a peer not on the network'
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/findpeer')
.query(true)
@@ -302,20 +207,20 @@ describe('peer-routing', () => {
'X-Chunked-Output', '1'
])
await expect(node.peerRouting.findPeer(peerId))
await expect(node.peerRouting.findPeer(peerKey))
.to.eventually.be.rejected()
expect(mockApi.isDone()).to.equal(true)
})
it('should handle errors from the api', async () => {
const peerId = await PeerId.create({ keyType: 'ed25519' })
const peerKey = 'key of a peer not on the network'
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/findpeer')
.query(true)
.reply(502)
await expect(node.peerRouting.findPeer(peerId))
await expect(node.peerRouting.findPeer(peerKey))
.to.eventually.be.rejected()
expect(mockApi.isDone()).to.equal(true)
@@ -323,6 +228,7 @@ describe('peer-routing', () => {
it('should be able to get the closest peers', async () => {
const peerId = await PeerId.create({ keyType: 'ed25519' })
const closest1 = '12D3KooWLewYMMdGWAtuX852n4rgCWkK7EBn4CWbwwBzhsVoKxk3'
const closest2 = '12D3KooWDtoQbpKhtnWddfj72QmpFvvLDTsBLTFkjvgQm6cde2AK'
@@ -341,12 +247,15 @@ describe('peer-routing', () => {
'X-Chunked-Output', '1'
])
const closestPeers = await all(node.peerRouting.getClosestPeers(peerId.id, { timeout: 1000 }))
const closestPeers = []
for await (const peer of node.peerRouting.getClosestPeers(peerId.id, { timeout: 1000 })) {
closestPeers.push(peer)
}
expect(closestPeers).to.have.length(2)
expect(closestPeers[0].id.toB58String()).to.equal(closest1)
expect(closestPeers[0].id.toB58String()).to.equal(closest2)
expect(closestPeers[0].multiaddrs).to.have.lengthOf(2)
expect(closestPeers[1].id.toB58String()).to.equal(closest2)
expect(closestPeers[1].id.toB58String()).to.equal(closest1)
expect(closestPeers[1].multiaddrs).to.have.lengthOf(2)
expect(mockApi.isDone()).to.equal(true)
})
@@ -405,7 +314,7 @@ describe('peer-routing', () => {
multiaddrs: []
}
sinon.stub(node._dht, 'findPeer').callsFake(async function * () {})
sinon.stub(node._dht, 'findPeer').callsFake(() => {})
sinon.stub(delegate, 'findPeer').callsFake(() => {
return results
})
@@ -423,8 +332,7 @@ describe('peer-routing', () => {
const defer = pDefer()
sinon.stub(node._dht, 'findPeer').callsFake(async function * () {
yield
sinon.stub(node._dht, 'findPeer').callsFake(async () => {
await defer.promise
})
sinon.stub(delegate, 'findPeer').callsFake(() => {
@@ -439,34 +347,29 @@ describe('peer-routing', () => {
it('should not wait for the delegate to return if the dht does first', async () => {
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
const result = {
const results = {
id: remotePeerId,
multiaddrs: []
}
const defer = pDefer()
sinon.stub(node._dht, 'findPeer').callsFake(async function * () {
yield {
name: 'PEER_RESPONSE',
closer: [
result
]
}
sinon.stub(node._dht, 'findPeer').callsFake(() => {
return results
})
sinon.stub(delegate, 'findPeer').callsFake(async () => {
await defer.promise
})
const peer = await node.peerRouting.findPeer(remotePeerId)
expect(peer).to.eql(result)
expect(peer).to.eql(results)
defer.resolve()
})
it('should store the addresses of the found peer', async () => {
const [remotePeerId] = await peerUtils.createPeerId({ fixture: false })
const result = {
const results = {
id: remotePeerId,
multiaddrs: [
new Multiaddr('/ip4/123.123.123.123/tcp/38982')
@@ -475,19 +378,14 @@ describe('peer-routing', () => {
const spy = sinon.spy(node.peerStore.addressBook, 'add')
sinon.stub(node._dht, 'findPeer').callsFake(async function * () {
yield {
name: 'PEER_RESPONSE',
closer: [
result
]
}
sinon.stub(node._dht, 'findPeer').callsFake(() => {
return results
})
sinon.stub(delegate, 'findPeer').callsFake(() => {})
await node.peerRouting.findPeer(remotePeerId)
expect(spy.calledWith(result.id, result.multiaddrs)).to.be.true()
expect(spy.calledWith(results.id, results.multiaddrs)).to.be.true()
})
it('should use the delegate if the dht fails to get the closest peer', async () => {
@@ -553,119 +451,4 @@ describe('peer-routing', () => {
expect(peers).to.be.an('array').with.a.lengthOf(1).that.deep.equals(results)
})
})
describe('peer routing refresh manager service', () => {
let node
let peerIds
before(async () => {
peerIds = await peerUtils.createPeerId({ number: 2 })
})
afterEach(() => {
sinon.restore()
return node && node.stop()
})
it('should be enabled and start by default', async () => {
const results = [
{ id: peerIds[0], multiaddrs: [new Multiaddr('/ip4/30.0.0.1/tcp/2000')] },
{ id: peerIds[1], multiaddrs: [new Multiaddr('/ip4/32.0.0.1/tcp/2000')] }
]
;[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
bootDelay: 100
}
}
}),
started: false
})
sinon.spy(node.peerStore.addressBook, 'add')
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield {
name: 'PEER_RESPONSE',
closer: [
results[0]
]
}
yield {
name: 'PEER_RESPONSE',
closer: [
results[1]
]
}
})
await node.start()
await pWaitFor(() => node._dht.getClosestPeers.callCount === 1)
await pWaitFor(() => node.peerStore.addressBook.add.callCount === results.length)
const call0 = node.peerStore.addressBook.add.getCall(0)
expect(call0.args[0].equals(results[0].id))
call0.args[1].forEach((m, index) => {
expect(m.equals(results[0].multiaddrs[index]))
})
const call1 = node.peerStore.addressBook.add.getCall(1)
expect(call1.args[0].equals(results[1].id))
call0.args[1].forEach((m, index) => {
expect(m.equals(results[1].multiaddrs[index]))
})
})
it('should support being disabled', async () => {
[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
bootDelay: 100,
enabled: false
}
}
}),
started: false
})
sinon.stub(node._dht, 'getClosestPeers').callsFake(async function * () {
yield
throw new Error('should not be called')
})
await node.start()
await delay(100)
expect(node._dht.getClosestPeers.callCount === 0)
})
it('should start and run recurrently on interval', async () => {
[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
peerRouting: {
refreshManager: {
interval: 500,
bootDelay: 200
}
}
}),
started: false
})
sinon.stub(node._dht, 'getClosestPeers').callsFake(function * () {
yield { id: peerIds[0], multiaddrs: [new Multiaddr('/ip4/30.0.0.1/tcp/2000')] }
})
await node.start()
await delay(300)
expect(node._dht.getClosestPeers.callCount).to.eql(1)
await delay(500)
expect(node._dht.getClosestPeers.callCount).to.eql(2)
})
})
})

View File

@@ -13,6 +13,9 @@ const routingOptions = mergeOptions(baseOptions, {
config: {
dht: {
kBucketSize: 20,
randomWalk: {
enabled: true
},
enabled: true
}
}

View File

@@ -13,7 +13,6 @@ const { Multiaddr } = require('multiaddr')
const mockUpgrader = require('../utils/mockUpgrader')
const sinon = require('sinon')
const Peers = require('../fixtures/peers')
const pWaitFor = require('p-wait-for')
const addrs = [
new Multiaddr('/ip4/127.0.0.1/tcp/0'),
new Multiaddr('/ip4/127.0.0.1/tcp/0')
@@ -73,13 +72,9 @@ describe('Transport Manager (TCP)', () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await tm.listen(addrs)
// Should created Self Peer record on new listen address, but it is done async
// with no event so we have to wait a bit
await pWaitFor(async () => {
signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
return signedPeerRecord != null
}, { interval: 100, timeout: 2000 })
// Should created Self Peer record on new listen address
signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
expect(signedPeerRecord).to.exist()
const record = PeerRecord.createFromProtobuf(signedPeerRecord.payload)
expect(record).to.exist()

View File

@@ -7,7 +7,7 @@
"libp2p": "file:../..",
"libp2p-bootstrap": "^0.13.0",
"libp2p-delegated-content-routing": "^0.11.0",
"libp2p-delegated-peer-routing": "^0.11.1",
"libp2p-delegated-peer-routing": "^0.10.0",
"libp2p-gossipsub": "^0.9.0",
"libp2p-interfaces": "^1.0.1",
"libp2p-kad-dht": "^0.26.5",

View File

@@ -62,13 +62,6 @@ async function main() {
contentRouting: [delegatedContentRouting],
peerRouting: [delegatedPeerRouting]
},
peerRouting: {
refreshManager: {
enabled: true,
interval: 1000,
bootDelay: 11111
}
},
dialer: {
maxParallelDials: 100,
maxDialsPerPeer: 4,
@@ -123,6 +116,11 @@ async function main() {
dht: {
enabled: true,
kBucketSize: 20,
randomWalk: {
enabled: true, // Allows to disable discovery (enabled by default)
interval: 300e3,
timeout: 10e3
},
clientMode: true,
validators: {
pk: Libp2pRecord.validator.validators.pk

View File

@@ -2,7 +2,6 @@
const duplexPair = require('it-pair/duplex')
const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
/**
* Returns both sides of a mocked MultiaddrConnection