mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
feat: rendezvous client
This commit is contained in:
parent
42b51d8f01
commit
a747e5b495
@ -161,6 +161,8 @@ List of packages currently in existence for libp2p
|
||||
| **peer routing** |
|
||||
| [`libp2p-delegated-peer-routing`](//github.com/libp2p/js-libp2p-delegated-peer-routing) | [](//github.com/libp2p/js-libp2p-delegated-peer-routing/releases) | [](https://david-dm.org/libp2p/js-libp2p-delegated-peer-routing) | [](https://travis-ci.com/libp2p/js-libp2p-delegated-peer-routing) | [](https://codecov.io/gh/libp2p/js-libp2p-delegated-peer-routing) | [Jacob Heun](mailto:jacobheun@gmail.com) |
|
||||
| [`libp2p-kad-dht`](//github.com/libp2p/js-libp2p-kad-dht) | [](//github.com/libp2p/js-libp2p-kad-dht/releases) | [](https://david-dm.org/libp2p/js-libp2p-kad-dht) | [](https://travis-ci.com/libp2p/js-libp2p-kad-dht) | [](https://codecov.io/gh/libp2p/js-libp2p-kad-dht) | [Vasco Santos](mailto:vasco.santos@moxy.studio) |
|
||||
| **service discovery** |
|
||||
| [`libp2p-rendezvous`](//github.com/libp2p/js-libp2p-rendezvous) | [](//github.com/libp2p/js-libp2p-rendezvous/releases) | [](https://david-dm.org/libp2p/js-libp2p-rendezvous) | [](https://travis-ci.com/libp2p/js-libp2p-rendezvous) | [](https://codecov.io/gh/libp2p/js-libp2p-rendezvous) | [Vasco Santos](mailto:santos.vasco10@gmail.com) |
|
||||
| **utilities** |
|
||||
| [`libp2p-crypto`](//github.com/libp2p/js-libp2p-crypto) | [](//github.com/libp2p/js-libp2p-crypto/releases) | [](https://david-dm.org/libp2p/js-libp2p-crypto) | [](https://travis-ci.com/libp2p/js-libp2p-crypto) | [](https://codecov.io/gh/libp2p/js-libp2p-crypto) | [Jacob Heun](mailto:jacobheun@gmail.com) |
|
||||
| [`libp2p-crypto-secp256k1`](//github.com/libp2p/js-libp2p-crypto-secp256k1) | [](//github.com/libp2p/js-libp2p-crypto-secp256k1/releases) | [](https://david-dm.org/libp2p/js-libp2p-crypto-secp256k1) | [](https://travis-ci.com/libp2p/js-libp2p-crypto-secp256k1) | [](https://codecov.io/gh/libp2p/js-libp2p-crypto-secp256k1) | [Friedel Ziegelmayer](mailto:dignifiedquire@gmail.com) |
|
||||
|
85
doc/API.md
85
doc/API.md
@ -51,6 +51,9 @@
|
||||
* [`pubsub.removeListener`](#pubsubremovelistener)
|
||||
* [`pubsub.topicValidators.set`](#pubsubtopicvalidatorsset)
|
||||
* [`pubsub.topicValidators.delete`](#pubsubtopicvalidatorsdelete)
|
||||
* [`rendezvous.discover`](#rendezvousdiscover)
|
||||
* [`rendezvous.register`](#rendezvousregister)
|
||||
* [`rendezvous.unregister`](#rendezvousunregister)
|
||||
* [`connectionManager.get`](#connectionmanagerget)
|
||||
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
|
||||
* [`connectionManager.size`](#connectionmanagersize)
|
||||
@ -1600,6 +1603,88 @@ Get a connection with a given peer, if it exists.
|
||||
libp2p.connectionManager.get(peerId)
|
||||
```
|
||||
|
||||
### rendezvous.register
|
||||
|
||||
Registers the peer in a given namespace.
|
||||
|
||||
`rendezvous.register(namespace, [options])`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| namespace | `string` | namespace to register |
|
||||
| [options] | `Object` | rendezvous registrations options |
|
||||
| [options.ttl=7.2e6] | `number` | registration ttl in ms |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `Promise<number>` | Remaining ttl value |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// ...
|
||||
const ttl = await rendezvous.register(namespace)
|
||||
```
|
||||
|
||||
### rendezvous.unregister
|
||||
|
||||
Unregisters the peer from a given namespace.
|
||||
|
||||
`rendezvous.unregister(namespace)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| namespace | `string` | namespace to unregister |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `Promise<void>` | Operation resolved |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// ...
|
||||
await rendezvous.register(namespace)
|
||||
await rendezvous.unregister(namespace)
|
||||
```
|
||||
|
||||
### rendezvous.discover
|
||||
|
||||
Discovers peers registered under a given namespace.
|
||||
|
||||
`rendezvous.discover(namespace, [limit])`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| namespace | `string` | namespace to discover |
|
||||
| limit | `number` | limit of peers to discover |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `AsyncIterable<{ signedPeerRecord: Uint8Array, ns: string, ttl: number }>` | Async Iterable with registrations |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// ...
|
||||
await rendezvous.register(namespace)
|
||||
for await (const reg of rendezvous.discover(namespace)) {
|
||||
console.log(reg.signedPeerRecord, reg.ns, reg.ttl)
|
||||
}
|
||||
```
|
||||
|
||||
### connectionManager.setPeerValue
|
||||
|
||||
Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
|
||||
|
@ -21,6 +21,7 @@
|
||||
- [Setup with Content and Peer Routing](#setup-with-content-and-peer-routing)
|
||||
- [Setup with Relay](#setup-with-relay)
|
||||
- [Setup with Auto Relay](#setup-with-auto-relay)
|
||||
- [Setup with Rendezvous](#setup-with-rendezvous)
|
||||
- [Setup with Keychain](#setup-with-keychain)
|
||||
- [Configuring Dialing](#configuring-dialing)
|
||||
- [Configuring Connection Manager](#configuring-connection-manager)
|
||||
@ -465,6 +466,34 @@ const node = await Libp2p.create({
|
||||
})
|
||||
```
|
||||
|
||||
#### Setup with Rendezvous
|
||||
|
||||
You will need to setup a rendezvous server, which will be used by rendezvous client nodes. You can see how to setup a rendezvous server in [libp2p/js-libp2p-rendezvous](https://github.com/libp2p/js-libp2p-rendezvous)
|
||||
|
||||
For setting up libp2p with a rendezvous client, you should configure libp2p as follows:
|
||||
|
||||
```js
|
||||
const Libp2p = require('libp2p')
|
||||
const TCP = require('libp2p-tcp')
|
||||
const MPLEX = require('libp2p-mplex')
|
||||
const { NOISE } = require('libp2p-noise')
|
||||
const Rendezvous = require('libp2p-rendezvous')
|
||||
const Bootstrap = require('libp2p-bootstrap')
|
||||
const node = await Libp2p.create({
|
||||
modules: {
|
||||
transport: [TCP],
|
||||
streamMuxer: [MPLEX],
|
||||
connEncryption: [NOISE],
|
||||
peerDiscovery: [Bootstrap]
|
||||
},
|
||||
rendezvous: {
|
||||
enabled: true,
|
||||
// Insert your rendezvous servers multiaddrs
|
||||
rendezvousPoints: ['/dnsaddr/rendezvous.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJP']
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
#### Setup with Keychain
|
||||
|
||||
Libp2p allows you to setup a secure keychain to manage your keys. The keychain configuration object should have the following properties:
|
||||
|
@ -42,6 +42,9 @@
|
||||
["libp2p/js-libp2p-delegated-peer-routing", "libp2p-delegated-peer-routing"],
|
||||
["libp2p/js-libp2p-kad-dht", "libp2p-kad-dht"],
|
||||
|
||||
"service discovery",
|
||||
["libp2p/js-libp2p-rendezvous", "libp2p-rendezvous"],
|
||||
|
||||
"utilities",
|
||||
["libp2p/js-libp2p-crypto", "libp2p-crypto"],
|
||||
["libp2p/js-libp2p-crypto-secp256k1", "libp2p-crypto-secp256k1"],
|
||||
|
@ -117,6 +117,7 @@
|
||||
"libp2p-mdns": "^0.15.0",
|
||||
"libp2p-mplex": "^0.10.1",
|
||||
"libp2p-noise": "^2.0.0",
|
||||
"libp2p-rendezvous": "libp2p/js-libp2p-rendezvous#feat/rendezvous-protocol-full-implementation",
|
||||
"libp2p-secio": "^0.13.1",
|
||||
"libp2p-tcp": "^0.15.1",
|
||||
"libp2p-webrtc-star": "^0.20.0",
|
||||
|
@ -48,6 +48,10 @@ const DefaultConfig = {
|
||||
bootDelay: 10e3
|
||||
}
|
||||
},
|
||||
rendezvous: {
|
||||
enabled: false,
|
||||
rendezvousPoints: []
|
||||
},
|
||||
config: {
|
||||
dht: {
|
||||
enabled: false,
|
||||
|
13
src/index.js
13
src/index.js
@ -28,6 +28,7 @@ const Upgrader = require('./upgrader')
|
||||
const PeerStore = require('./peer-store')
|
||||
const PubsubAdapter = require('./pubsub-adapter')
|
||||
const PersistentPeerStore = require('./peer-store/persistent')
|
||||
const Rendezvous = require('./rendezvous')
|
||||
const Registrar = require('./registrar')
|
||||
const ping = require('./ping')
|
||||
const IdentifyService = require('./identify')
|
||||
@ -267,6 +268,14 @@ class Libp2p extends EventEmitter {
|
||||
this.pubsub = PubsubAdapter(Pubsub, this, this._config.pubsub)
|
||||
}
|
||||
|
||||
// Create rendezvous client if enabled
|
||||
if (this._options.rendezvous.enabled) {
|
||||
this.rendezvous = new Rendezvous({
|
||||
libp2p: this,
|
||||
rendezvousPoints: this._options.rendezvous.rendezvousPoints
|
||||
})
|
||||
}
|
||||
|
||||
// Attach remaining APIs
|
||||
// peer and content routing will automatically get modules from _modules and _dht
|
||||
this.peerRouting = new PeerRouting(this)
|
||||
@ -329,6 +338,7 @@ class Libp2p extends EventEmitter {
|
||||
try {
|
||||
this._isStarted = false
|
||||
|
||||
this.rendezvous && this.rendezvous.start()
|
||||
this.relay && this.relay.stop()
|
||||
this.peerRouting.stop()
|
||||
|
||||
@ -582,6 +592,9 @@ class Libp2p extends EventEmitter {
|
||||
// Peer discovery
|
||||
await this._setupPeerDiscovery()
|
||||
|
||||
// Rendezvous
|
||||
this.rendezvous && this.rendezvous.start()
|
||||
|
||||
// Relay
|
||||
this.relay && this.relay.start()
|
||||
|
||||
|
32
src/rendezvous/README.md
Normal file
32
src/rendezvous/README.md
Normal file
@ -0,0 +1,32 @@
|
||||
# Rendezvous Protocol in js-libp2p
|
||||
|
||||
The rendezvous protocol can be used in different contexts across libp2p. For using it, the libp2p network needs to have well known libp2p nodes acting as rendezvous servers. These nodes will have an extra role in the network. They will collect and maintain a list of registrations per rendezvous namespace. Other peers in the network will act as rendezvous clients and will register themselves on given namespaces by messaging a rendezvous server node. Taking into account these registrations, a rendezvous client is able to discover other peers in a given namespace by querying a server. A registration should have a `ttl`, in order to avoid having invalid registrations.
|
||||
|
||||
An example of a namespace could be a relay namespace, so that undialable nodes can register themselves as reachable through that relay.
|
||||
|
||||
## Usage
|
||||
|
||||
`js-libp2p` supports the usage of the rendezvous protocol through its configuration. It allows the rendezvous protocol to be enabled and customized. You will need to setup a rendezvous server, which will be used by rendezvous client nodes. You can see how to setup a rendezvous server in [libp2p/js-libp2p-rendezvous](https://github.com/libp2p/js-libp2p-rendezvous)
|
||||
|
||||
You can configure it through libp2p as follows:
|
||||
|
||||
```js
|
||||
const Libp2p = require('libp2p')
|
||||
const node = await Libp2p.create({
|
||||
rendezvous: {
|
||||
enabled: true,
|
||||
rendezvousPoints: ['/dnsaddr/rendezvous.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJP']
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
## Libp2p Flow
|
||||
|
||||
Every time a rendezvous operation is performed through libpsp's API, libp2p will attempt to connect to the given rendezvous servers and exchange the appropriate rendezvous protocol per the [spec](https://github.com/libp2p/specs/tree/master/rendezvous).
|
||||
|
||||
## Future Work
|
||||
|
||||
- Libp2p can handle re-registers when properly configured
|
||||
- Libp2p automatically unregisters previously registered namespaces on stop.
|
||||
- Rendezvous client should be able to register namespaces given in configuration on startup
|
||||
- Not supported at the moment, as we would need to deal with re-register over time
|
6
src/rendezvous/constants.js
Normal file
6
src/rendezvous/constants.js
Normal file
@ -0,0 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
module.exports = {
|
||||
PROTOCOL_MULTICODEC: '/rendezvous/1.0.0',
|
||||
MAX_DISCOVER_LIMIT: 50
|
||||
}
|
6
src/rendezvous/errors.js
Normal file
6
src/rendezvous/errors.js
Normal file
@ -0,0 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
exports.codes = {
|
||||
INVALID_NAMESPACE: 'ERR_INVALID_NAMESPACE',
|
||||
NO_CONNECTED_RENDEZVOUS_SERVERS: 'ERR_NO_CONNECTED_RENDEZVOUS_SERVERS'
|
||||
}
|
304
src/rendezvous/index.js
Normal file
304
src/rendezvous/index.js
Normal file
@ -0,0 +1,304 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const log = Object.assign(debug('libp2p:rendezvous'), {
|
||||
error: debug('libp2p:rendezvous:err')
|
||||
})
|
||||
|
||||
const errCode = require('err-code')
|
||||
const { pipe } = require('it-pipe')
|
||||
const lp = require('it-length-prefixed')
|
||||
const { collect } = require('streaming-iterables')
|
||||
const { toBuffer } = require('it-buffer')
|
||||
const fromString = require('uint8arrays/from-string')
|
||||
const toString = require('uint8arrays/to-string')
|
||||
|
||||
const { codes: errCodes } = require('./errors')
|
||||
const {
|
||||
MAX_DISCOVER_LIMIT,
|
||||
PROTOCOL_MULTICODEC
|
||||
} = require('./constants')
|
||||
// @ts-ignore TODO: needs release for types
|
||||
const { Message } = require('libp2p-rendezvous/src/proto')
|
||||
const MESSAGE_TYPE = Message.MessageType
|
||||
|
||||
/**
|
||||
* @typedef {import('..')} Libp2p
|
||||
* @typedef {import('multiaddr')} Multiaddr
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef {Object} RendezvousProperties
|
||||
* @property {Libp2p} libp2p
|
||||
*
|
||||
* @typedef {Object} RendezvousOptions
|
||||
* @property {Multiaddr[]} rendezvousPoints
|
||||
*/
|
||||
|
||||
class Rendezvous {
|
||||
/**
|
||||
* Libp2p Rendezvous. A lightweight mechanism for generalized peer discovery.
|
||||
*
|
||||
* @class
|
||||
* @param {RendezvousProperties & RendezvousOptions} params
|
||||
*/
|
||||
constructor ({ libp2p, rendezvousPoints }) {
|
||||
this._libp2p = libp2p
|
||||
this._peerId = libp2p.peerId
|
||||
this._peerStore = libp2p.peerStore
|
||||
this._connectionManager = libp2p.connectionManager
|
||||
this._rendezvousPoints = rendezvousPoints
|
||||
|
||||
this._isStarted = false
|
||||
|
||||
/**
|
||||
* Map namespaces to a map of rendezvous point identifier to cookie.
|
||||
*
|
||||
* @type {Map<string, Map<string, string>>}
|
||||
*/
|
||||
this._cookies = new Map()
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the rendezvous client in the libp2p node.
|
||||
*
|
||||
* @returns {void}
|
||||
*/
|
||||
start () {
|
||||
if (this._isStarted) {
|
||||
return
|
||||
}
|
||||
|
||||
this._isStarted = true
|
||||
log('started')
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the rendezvous state and unregister from namespaces.
|
||||
*
|
||||
* @returns {void}
|
||||
*/
|
||||
stop () {
|
||||
if (!this._isStarted) {
|
||||
return
|
||||
}
|
||||
|
||||
this._isStarted = false
|
||||
this._cookies.clear()
|
||||
log('stopped')
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the peer in a given namespace
|
||||
*
|
||||
* @param {string} ns
|
||||
* @param {object} [options]
|
||||
* @param {number} [options.ttl = 7.2e6] - registration ttl in ms
|
||||
* @returns {Promise<number>} rendezvous register ttl.
|
||||
*/
|
||||
async register (ns, { ttl = 7.2e6 } = {}) {
|
||||
if (!ns) {
|
||||
throw errCode(new Error('a namespace must be provided'), errCodes.INVALID_NAMESPACE)
|
||||
}
|
||||
|
||||
// Are there available rendezvous servers?
|
||||
if (!this._rendezvousPoints || !this._rendezvousPoints.length) {
|
||||
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
}
|
||||
|
||||
const message = Message.encode({
|
||||
type: MESSAGE_TYPE.REGISTER,
|
||||
register: {
|
||||
signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(this._peerId),
|
||||
ns,
|
||||
ttl: ttl * 1e-3 // Convert to seconds
|
||||
}
|
||||
})
|
||||
|
||||
const registerTasks = []
|
||||
|
||||
/**
|
||||
* @param {Multiaddr} m
|
||||
* @returns {Promise<number>}
|
||||
*/
|
||||
const taskFn = async (m) => {
|
||||
const connection = await this._libp2p.dial(m)
|
||||
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
|
||||
|
||||
const [response] = await pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
|
||||
// Close connection if not any other open streams
|
||||
if (!connection.streams.length) {
|
||||
await connection.close()
|
||||
}
|
||||
|
||||
const recMessage = Message.decode(response)
|
||||
|
||||
if (!recMessage.type === MESSAGE_TYPE.REGISTER_RESPONSE) {
|
||||
throw new Error('unexpected message received')
|
||||
}
|
||||
|
||||
if (recMessage.registerResponse.status !== Message.ResponseStatus.OK) {
|
||||
throw errCode(new Error(recMessage.registerResponse.statusText), recMessage.registerResponse.status)
|
||||
}
|
||||
|
||||
return recMessage.registerResponse.ttl * 1e3 // convert to ms
|
||||
}
|
||||
|
||||
for (const m of this._rendezvousPoints) {
|
||||
registerTasks.push(taskFn(m))
|
||||
}
|
||||
|
||||
// Return first ttl
|
||||
// TODO: consider pAny instead of Promise.all?
|
||||
const [returnTtl] = await Promise.all(registerTasks)
|
||||
|
||||
return returnTtl
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister peer from the nampesapce.
|
||||
*
|
||||
* @param {string} ns
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async unregister (ns) {
|
||||
if (!ns) {
|
||||
throw errCode(new Error('a namespace must be provided'), errCodes.INVALID_NAMESPACE)
|
||||
}
|
||||
|
||||
// Are there available rendezvous servers?
|
||||
if (!this._rendezvousPoints || !this._rendezvousPoints.length) {
|
||||
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
}
|
||||
|
||||
const message = Message.encode({
|
||||
type: MESSAGE_TYPE.UNREGISTER,
|
||||
unregister: {
|
||||
id: this._peerId.toBytes(),
|
||||
ns
|
||||
}
|
||||
})
|
||||
|
||||
const unregisterTasks = []
|
||||
/**
|
||||
* @param {Multiaddr} m
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
const taskFn = async (m) => {
|
||||
const connection = await this._libp2p.dial(m)
|
||||
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
|
||||
|
||||
await pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
async (source) => {
|
||||
for await (const _ of source) { } // eslint-disable-line
|
||||
}
|
||||
)
|
||||
|
||||
// Close connection if not any other open streams
|
||||
if (!connection.streams.length) {
|
||||
await connection.close()
|
||||
}
|
||||
}
|
||||
|
||||
for (const m of this._rendezvousPoints) {
|
||||
unregisterTasks.push(taskFn(m))
|
||||
}
|
||||
|
||||
await Promise.all(unregisterTasks)
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover peers registered under a given namespace
|
||||
*
|
||||
* @param {string} ns
|
||||
* @param {number} [limit = MAX_DISCOVER_LIMIT]
|
||||
* @returns {AsyncIterable<{ signedPeerRecord: Uint8Array, ns: string, ttl: number }>}
|
||||
*/
|
||||
async * discover (ns, limit = MAX_DISCOVER_LIMIT) {
|
||||
// TODO: consider opening the envelope in the dicover
|
||||
// This would store the addresses in the AddressBook
|
||||
|
||||
// Are there available rendezvous servers?
|
||||
if (!this._rendezvousPoints || !this._rendezvousPoints.length) {
|
||||
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
}
|
||||
|
||||
const registrationTransformer = (r) => ({
|
||||
signedPeerRecord: r.signedPeerRecord,
|
||||
ns: r.ns,
|
||||
ttl: r.ttl * 1e3 // convert to ms
|
||||
})
|
||||
|
||||
// Iterate over all rendezvous points
|
||||
for (const m of this._rendezvousPoints) {
|
||||
const namespaseCookies = this._cookies.get(ns) || new Map()
|
||||
|
||||
// Check if we have a cookie and encode discover message
|
||||
const cookie = namespaseCookies.get(m.toString())
|
||||
const message = Message.encode({
|
||||
type: MESSAGE_TYPE.DISCOVER,
|
||||
discover: {
|
||||
ns,
|
||||
limit,
|
||||
cookie: cookie ? fromString(cookie) : undefined
|
||||
}
|
||||
})
|
||||
|
||||
// Send discover message and wait for response
|
||||
const connection = await this._libp2p.dial(m)
|
||||
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
|
||||
const [response] = await pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
|
||||
if (!connection.streams.length) {
|
||||
await connection.close()
|
||||
}
|
||||
|
||||
const recMessage = Message.decode(response)
|
||||
|
||||
if (!recMessage.type === MESSAGE_TYPE.DISCOVER_RESPONSE) {
|
||||
throw new Error('unexpected message received')
|
||||
} else if (recMessage.discoverResponse.status !== Message.ResponseStatus.OK) {
|
||||
throw errCode(new Error(recMessage.discoverResponse.statusText), recMessage.discoverResponse.status)
|
||||
}
|
||||
|
||||
// Iterate over registrations response
|
||||
for (const r of recMessage.discoverResponse.registrations) {
|
||||
// track registrations
|
||||
yield registrationTransformer(r)
|
||||
|
||||
limit--
|
||||
if (limit === 0) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Store cookie
|
||||
const c = recMessage.discoverResponse.cookie
|
||||
if (c && c.length) {
|
||||
const nsCookies = this._cookies.get(ns) || new Map()
|
||||
nsCookies.set(m.toString(), toString(c))
|
||||
this._cookies.set(ns, nsCookies)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Rendezvous
|
318
test/rendezvous/index.spec.js
Normal file
318
test/rendezvous/index.spec.js
Normal file
@ -0,0 +1,318 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const { expect } = require('aegir/utils/chai')
|
||||
const sinon = require('sinon')
|
||||
const pWaitFor = require('p-wait-for')
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const { Message } = require('libp2p-rendezvous/src/proto')
|
||||
const RESPONSE_STATUS = Message.ResponseStatus
|
||||
|
||||
const Envelope = require('../../src/record/envelope')
|
||||
const PeerRecord = require('../../src/record/peer-record')
|
||||
const { codes: errCodes } = require('../../src/rendezvous/errors')
|
||||
|
||||
const {
|
||||
getSubsystemOptions,
|
||||
createRendezvousServer,
|
||||
createSignedPeerRecord
|
||||
} = require('./utils')
|
||||
|
||||
const {
|
||||
createPeer
|
||||
} = require('../utils/creators/peer')
|
||||
|
||||
const namespace = 'ns'
|
||||
|
||||
describe('rendezvous', () => {
|
||||
describe('no rendezvous server', () => {
|
||||
let clients
|
||||
|
||||
// Create and start Libp2p nodes
|
||||
beforeEach(async () => {
|
||||
clients = await createPeer({ number: 2, config: getSubsystemOptions([]) })
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await Promise.all(clients.map((c) => c.stop()))
|
||||
})
|
||||
|
||||
it('register throws error if no rendezvous servers', async () => {
|
||||
await expect(clients[0].rendezvous.register(namespace))
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
})
|
||||
|
||||
it('unregister throws error if no rendezvous servers', async () => {
|
||||
await expect(clients[0].rendezvous.unregister(namespace))
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
})
|
||||
|
||||
it('discover throws error if no rendezvous servers', async () => {
|
||||
try {
|
||||
for await (const _ of clients[0].rendezvous.discover()) { } // eslint-disable-line
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
expect(err.code).to.eql(errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
|
||||
return
|
||||
}
|
||||
throw new Error('discover should throw error if no rendezvous servers')
|
||||
})
|
||||
})
|
||||
|
||||
describe('one rendezvous server', () => {
|
||||
let rendezvousServer
|
||||
let clients
|
||||
|
||||
// Create and start Libp2p
|
||||
beforeEach(async function () {
|
||||
this.timeout(10e3)
|
||||
|
||||
// Create Rendezvous Server
|
||||
rendezvousServer = await createRendezvousServer()
|
||||
await pWaitFor(() => rendezvousServer.multiaddrs.length > 0)
|
||||
const rendezvousServerMultiaddr = `${rendezvousServer.multiaddrs[0]}/p2p/${rendezvousServer.peerId.toB58String()}`
|
||||
|
||||
clients = await createPeer({ number: 2, config: getSubsystemOptions([rendezvousServerMultiaddr]) })
|
||||
})
|
||||
|
||||
afterEach(async function () {
|
||||
this.timeout(10e3)
|
||||
sinon.restore()
|
||||
await rendezvousServer.rendezvousDatastore.reset()
|
||||
await rendezvousServer.stop()
|
||||
|
||||
await Promise.all(clients.map((c) => c.stop()))
|
||||
})
|
||||
|
||||
it('register throws error if a namespace is not provided', async () => {
|
||||
await expect(clients[0].rendezvous.register())
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', errCodes.INVALID_NAMESPACE)
|
||||
})
|
||||
|
||||
it('register throws an error with an invalid namespace', async () => {
|
||||
const badNamespace = 'x'.repeat(300)
|
||||
|
||||
await expect(clients[0].rendezvous.register(badNamespace))
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', RESPONSE_STATUS.E_INVALID_NAMESPACE)
|
||||
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
|
||||
it('register throws an error with an invalid ttl', async () => {
|
||||
const badTtl = 5e10
|
||||
|
||||
await expect(clients[0].rendezvous.register(namespace, { ttl: badTtl }))
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', RESPONSE_STATUS.E_INVALID_TTL)
|
||||
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
|
||||
it('register throws an error with an invalid peerId', async () => {
|
||||
const badSignedPeerRecord = await createSignedPeerRecord(clients[1].peerId, [multiaddr('/ip4/127.0.0.1/tcp/100')])
|
||||
|
||||
const stub = sinon.stub(clients[0].peerStore.addressBook, 'getRawEnvelope')
|
||||
stub.onCall(0).returns(badSignedPeerRecord.marshal())
|
||||
|
||||
await expect(clients[0].rendezvous.register(namespace))
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', RESPONSE_STATUS.E_NOT_AUTHORIZED)
|
||||
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
|
||||
it('registers with an available rendezvous server node', async () => {
|
||||
const registers = []
|
||||
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
|
||||
await clients[0].rendezvous.register(namespace)
|
||||
|
||||
// Peer2 discovers Peer0 registered in Peer1
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) {
|
||||
registers.push(reg)
|
||||
}
|
||||
|
||||
expect(registers).to.have.lengthOf(1)
|
||||
expect(registers[0].ns).to.eql(namespace)
|
||||
})
|
||||
|
||||
it('unregister throws if a namespace is not provided', async () => {
|
||||
await expect(clients[0].rendezvous.unregister())
|
||||
.to.eventually.rejected()
|
||||
.and.have.property('code', errCodes.INVALID_NAMESPACE)
|
||||
})
|
||||
|
||||
it('unregisters with an available rendezvous server node', async () => {
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
|
||||
// Register
|
||||
await clients[0].rendezvous.register(namespace)
|
||||
|
||||
// Unregister
|
||||
await clients[0].rendezvous.unregister(namespace)
|
||||
|
||||
// other client does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
|
||||
it('unregister not fails if not registered', async () => {
|
||||
await clients[0].rendezvous.unregister(namespace)
|
||||
})
|
||||
|
||||
it('discover throws error if a namespace is invalid', async () => {
|
||||
const badNamespace = 'x'.repeat(300)
|
||||
|
||||
try {
|
||||
for await (const _ of clients[0].rendezvous.discover(badNamespace)) { } // eslint-disable-line
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
expect(err.code).to.eql(RESPONSE_STATUS.E_INVALID_NAMESPACE)
|
||||
return
|
||||
}
|
||||
|
||||
throw new Error('discover should throw error if a namespace is not provided')
|
||||
})
|
||||
|
||||
it('discover does not find any register if there is none', async () => {
|
||||
for await (const reg of clients[0].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
|
||||
it('discover finds registered peer for namespace', async () => {
|
||||
const registers = []
|
||||
|
||||
// Peer2 does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
|
||||
// Peer0 register itself on namespace (connected to Peer1)
|
||||
await clients[0].rendezvous.register(namespace)
|
||||
|
||||
// Peer2 discovers Peer0 registered in Peer1
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) {
|
||||
registers.push(reg)
|
||||
}
|
||||
|
||||
expect(registers).to.have.lengthOf(1)
|
||||
expect(registers[0].signedPeerRecord).to.exist()
|
||||
expect(registers[0].ns).to.eql(namespace)
|
||||
expect(registers[0].ttl).to.exist()
|
||||
|
||||
// Validate envelope
|
||||
const envelope = await Envelope.openAndCertify(registers[0].signedPeerRecord, PeerRecord.DOMAIN)
|
||||
const rec = PeerRecord.createFromProtobuf(envelope.payload)
|
||||
|
||||
expect(rec.multiaddrs).to.eql(clients[0].multiaddrs)
|
||||
})
|
||||
|
||||
it('discover finds registered peer for namespace once (cookie)', async () => {
|
||||
const registers = []
|
||||
|
||||
// Peer2 does not discovery any peer registered
|
||||
for await (const _ of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
|
||||
// Peer0 register itself on namespace (connected to Peer1)
|
||||
await clients[0].rendezvous.register(namespace)
|
||||
|
||||
// Peer2 discovers Peer0 registered in Peer1
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) {
|
||||
registers.push(reg)
|
||||
}
|
||||
|
||||
expect(registers).to.have.lengthOf(1)
|
||||
expect(registers[0].signedPeerRecord).to.exist()
|
||||
expect(registers[0].ns).to.eql(namespace)
|
||||
expect(registers[0].ttl).to.exist()
|
||||
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) {
|
||||
registers.push(reg)
|
||||
}
|
||||
|
||||
expect(registers).to.have.lengthOf(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe('multiple rendezvous servers available', () => {
|
||||
let rendezvousServers = []
|
||||
let clients
|
||||
|
||||
// Create and start Libp2p
|
||||
beforeEach(async function () {
|
||||
this.timeout(10e3)
|
||||
|
||||
// Create Rendezvous Server
|
||||
rendezvousServers = await Promise.all([
|
||||
createRendezvousServer(),
|
||||
createRendezvousServer()
|
||||
])
|
||||
await pWaitFor(() => rendezvousServers[0].multiaddrs.length > 0 && rendezvousServers[1].multiaddrs.length > 0)
|
||||
const rendezvousServerMultiaddrs = rendezvousServers.map((rendezvousServer) => `${rendezvousServer.multiaddrs[0]}/p2p/${rendezvousServer.peerId.toB58String()}`)
|
||||
|
||||
clients = await createPeer({ number: 2, config: getSubsystemOptions(rendezvousServerMultiaddrs) })
|
||||
})
|
||||
|
||||
afterEach(async function () {
|
||||
this.timeout(10e3)
|
||||
await Promise.all(rendezvousServers.map((s) => s.rendezvousDatastore.reset()))
|
||||
await Promise.all(rendezvousServers.map((s) => s.stop()))
|
||||
|
||||
await Promise.all(clients.map((c) => c.stop()))
|
||||
})
|
||||
|
||||
it('discover find registered peer for namespace only when registered ', async () => {
|
||||
const registers = []
|
||||
|
||||
// Client 1 does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
|
||||
// Client 0 register itself on namespace (connected to Peer1)
|
||||
await clients[0].rendezvous.register(namespace)
|
||||
|
||||
// Client1 discovers Client0
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) {
|
||||
registers.push(reg)
|
||||
}
|
||||
|
||||
expect(registers[0].signedPeerRecord).to.exist()
|
||||
expect(registers[0].ns).to.eql(namespace)
|
||||
expect(registers[0].ttl).to.exist()
|
||||
|
||||
// Client0 unregister itself on namespace
|
||||
await clients[0].rendezvous.unregister(namespace)
|
||||
|
||||
// Peer2 does not discovery any peer registered
|
||||
for await (const reg of clients[1].rendezvous.discover(namespace)) { // eslint-disable-line
|
||||
throw new Error('no registers should exist')
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
60
test/rendezvous/utils.js
Normal file
60
test/rendezvous/utils.js
Normal file
@ -0,0 +1,60 @@
|
||||
'use strict'
|
||||
|
||||
const mergeOptions = require('merge-options')
|
||||
|
||||
const RendezvousServer = require('libp2p-rendezvous')
|
||||
const Datastore = require('libp2p-rendezvous/src/datastores/memory')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const Envelope = require('../../src/record/envelope')
|
||||
const PeerRecord = require('../../src/record/peer-record')
|
||||
|
||||
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
|
||||
const relayAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||
const baseOptions = require('../utils/base-options.browser')
|
||||
|
||||
const getSubsystemOptions = (multiaddrs) => mergeOptions(baseOptions, {
|
||||
addresses: {
|
||||
listen: [`${relayAddr}/p2p-circuit`]
|
||||
},
|
||||
rendezvous: {
|
||||
enabled: true,
|
||||
rendezvousPoints: multiaddrs
|
||||
}
|
||||
})
|
||||
|
||||
async function createRendezvousServer ({ config = {}, started = true } = {}) {
|
||||
const datastore = new Datastore()
|
||||
|
||||
const peerId = await PeerId.create()
|
||||
const rendezvous = new RendezvousServer(mergeOptions(baseOptions, {
|
||||
addresses: {
|
||||
listen: [`${relayAddr}/p2p-circuit`]
|
||||
},
|
||||
peerId,
|
||||
...config
|
||||
}), { datastore })
|
||||
|
||||
if (started) {
|
||||
await rendezvous.start()
|
||||
}
|
||||
|
||||
return rendezvous
|
||||
}
|
||||
|
||||
async function createSignedPeerRecord (peerId, multiaddrs) {
|
||||
const pr = new PeerRecord({
|
||||
peerId,
|
||||
multiaddrs
|
||||
})
|
||||
|
||||
const envelope = await Envelope.seal(pr, peerId)
|
||||
|
||||
return envelope
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
createRendezvousServer,
|
||||
getSubsystemOptions,
|
||||
createSignedPeerRecord
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user