mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-24 18:12:14 +00:00
feat: discovery api
This commit is contained in:
parent
894d8e3cac
commit
87da0cd86d
60
doc/API.md
60
doc/API.md
@ -20,6 +20,8 @@
|
||||
* [`contentRouting.put`](#contentroutingput)
|
||||
* [`contentRouting.get`](#contentroutingget)
|
||||
* [`contentRouting.getMany`](#contentroutinggetmany)
|
||||
* [`discovery.advertise`](#discoveryadvertise)
|
||||
* [`discovery.findPeers`](#discoveryfindpeers)
|
||||
* [`peerRouting.findPeer`](#peerroutingfindpeer)
|
||||
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
|
||||
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
|
||||
@ -666,6 +668,64 @@ const key = '/key'
|
||||
const { from, val } = await libp2p.contentRouting.get(key)
|
||||
```
|
||||
|
||||
### discovery.advertise
|
||||
|
||||
Advertise services on the network.
|
||||
This will use content routing modules and rendezvous if enabled.
|
||||
|
||||
`libp2p.discovery.advertise(namespace, options)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| namespace | `string` | namespace of the service to advertise |
|
||||
| [options] | `object` | advertise options |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `Promise<void>` | Promise resolves once advertise messages are sent |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// ...
|
||||
const namespace = '/libp2p/relay'
|
||||
await libp2p.discovery.advertise(namespace)
|
||||
```
|
||||
|
||||
### discovery.findPeers
|
||||
|
||||
Discover peers providing a given service.
|
||||
This will use content routing modules and rendezvous if enabled and will store the peers data in the PeerStore.
|
||||
|
||||
`libp2p.discovery.findPeers(namespace, options)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| namespace | `string` | namespace of the service to find peers |
|
||||
| [options] | `object` | find peers options |
|
||||
| [options.limit] | `number` | number of distinct peers to find |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `AsyncIterable<PeerId>}` | Async iterator for peers |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// Iterate over the peers found for the given namespace
|
||||
for await (const peer of libp2p.discovery.findPeers(namespace)) {
|
||||
console.log(peer)
|
||||
}
|
||||
```
|
||||
|
||||
### peerRouting.findPeer
|
||||
|
||||
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.
|
||||
|
@ -16,6 +16,7 @@ module.exports = (node) => {
|
||||
}
|
||||
|
||||
return {
|
||||
routers,
|
||||
/**
|
||||
* Iterates over all content routers in series to find providers of the given key.
|
||||
* Once a content router succeeds, iteration will stop.
|
||||
@ -31,6 +32,8 @@ module.exports = (node) => {
|
||||
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
|
||||
}
|
||||
|
||||
// TODO: Abortables
|
||||
|
||||
const result = await pAny(
|
||||
routers.map(async (router) => {
|
||||
const provs = await all(router.findProviders(key, options))
|
||||
|
96
src/discovery/index.js
Normal file
96
src/discovery/index.js
Normal file
@ -0,0 +1,96 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:discovery')
|
||||
log.error = debug('libp2p:discovery:error')
|
||||
const errCode = require('err-code')
|
||||
|
||||
// const AbortController = require('abort-controller')
|
||||
const { parallelMerge } = require('streaming-iterables')
|
||||
|
||||
const Rendezvous = require('./rendezvous')
|
||||
const Routing = require('./routing')
|
||||
const { codes } = require('../errors')
|
||||
|
||||
module.exports = (libp2p) => {
|
||||
const addressBook = libp2p.peerStore.addressBook
|
||||
const routing = Routing(libp2p)
|
||||
const rendezvous = Rendezvous(libp2p)
|
||||
|
||||
const getDiscoveryAvailableIts = (namespace, options) => {
|
||||
if (routing.isEnabled && rendezvous.isEnabled) {
|
||||
return parallelMerge(
|
||||
routing.findPeers(namespace, options),
|
||||
rendezvous.findPeers(namespace, options)
|
||||
)
|
||||
} else if (routing.isEnabled) {
|
||||
return routing.findPeers(namespace, options)
|
||||
}
|
||||
return rendezvous.findPeers(namespace, options)
|
||||
}
|
||||
|
||||
return {
|
||||
/**
|
||||
* Advertise services on the network.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async advertise (namespace, options) {
|
||||
if (!routing.isEnabled && !rendezvous.isEnabled) {
|
||||
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
|
||||
}
|
||||
return Promise.all([
|
||||
routing.isEnabled && routing.advertise(namespace, options),
|
||||
rendezvous.isEnabled && rendezvous.advertise(namespace, options)
|
||||
])
|
||||
},
|
||||
|
||||
/**
|
||||
* Discover peers providing a given service.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @param {number} [options.limit] number of distinct peers to find
|
||||
* @param {AsyncIterable<PeerId>}
|
||||
*/
|
||||
async * findPeers (namespace, options = {}) {
|
||||
if (!routing.isEnabled && !rendezvous.isEnabled) {
|
||||
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
|
||||
}
|
||||
|
||||
const discoveredPeers = new Set()
|
||||
// TODO: add abort controller
|
||||
const discAsyncIt = getDiscoveryAvailableIts(namespace, options)
|
||||
|
||||
// Store in the AddressBook: signed record or uncertified
|
||||
for await (const { signedPeerRecord, id, multiaddrs } of discAsyncIt) {
|
||||
if (signedPeerRecord) {
|
||||
const idStr = signedPeerRecord.peerId.toB58String()
|
||||
const isNew = !discoveredPeers.has(idStr)
|
||||
discoveredPeers.add(idStr)
|
||||
|
||||
// Consume peer record and yield if new
|
||||
if (addressBook.consumePeerRecord(signedPeerRecord) && isNew) {
|
||||
yield signedPeerRecord.peerId
|
||||
}
|
||||
} else if (id && multiaddrs) {
|
||||
const idStr = id.toB58String()
|
||||
const isNew = !discoveredPeers.has(idStr)
|
||||
discoveredPeers.add(idStr)
|
||||
|
||||
addressBook.add(id, multiaddrs)
|
||||
|
||||
if (isNew) {
|
||||
yield
|
||||
}
|
||||
}
|
||||
// Abort if enough
|
||||
if (options.limit && options.limit <= discoveredPeers.size) {
|
||||
console.log('abort')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: who handles reprovide??
|
38
src/discovery/rendezvous.js
Normal file
38
src/discovery/rendezvous.js
Normal file
@ -0,0 +1,38 @@
|
||||
'use strict'
|
||||
|
||||
/**
|
||||
* RendezvousDiscovery is an implementation of service discovery
|
||||
* using Rendezvous. Namespaces represent services supported by other nodes.
|
||||
* @param {Libp2p} libp2p
|
||||
*/
|
||||
module.exports = libp2p => {
|
||||
const rendezvous = libp2p.rendezvous
|
||||
const isEnabled = !!rendezvous
|
||||
|
||||
return {
|
||||
isEnabled,
|
||||
/**
|
||||
* Advertise services on the network using the rendezvous module.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @param {object} [options.ttl = 7200e3] registration ttl in ms (minimum 120)
|
||||
* @returns {Promise<number>}
|
||||
*/
|
||||
advertise(namespace, options) {
|
||||
return rendezvous.register(namespace, options)
|
||||
},
|
||||
/**
|
||||
* Discover peers providing a given service.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @param {number} [options.limit] limit of peers to discover
|
||||
* @returns {AsyncIterable<{ signedPeerRecord: Envelope }>}
|
||||
*/
|
||||
async * findPeers(namespace, options) {
|
||||
// TODO: Abortables + options
|
||||
for await (const peer of rendezvous.discover(namespace, options)) {
|
||||
yield peer
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
47
src/discovery/routing.js
Normal file
47
src/discovery/routing.js
Normal file
@ -0,0 +1,47 @@
|
||||
'use strict'
|
||||
|
||||
const { namespaceToCid } = require('./utils')
|
||||
|
||||
/**
|
||||
* RoutingDiscovery is an implementation of service discovery
|
||||
* using ContentRouting. Namespaces represent services supported by other nodes.
|
||||
* @param {Libp2p} libp2p
|
||||
*/
|
||||
module.exports = libp2p => {
|
||||
const contentRouting = libp2p.contentRouting
|
||||
const isEnabled = !!contentRouting.routers.length
|
||||
|
||||
return {
|
||||
isEnabled,
|
||||
/**
|
||||
* Advertise services on the network using content routing modules.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async advertise(namespace, options) {
|
||||
const cid = await namespaceToCid(namespace)
|
||||
|
||||
// TODO: options?
|
||||
await contentRouting.provide(cid)
|
||||
},
|
||||
/**
|
||||
* Discover peers providing a given service.
|
||||
* @param {string} namespace
|
||||
* @param {object} [options]
|
||||
* @param {number} [options.limit] limit of peers to discover
|
||||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
|
||||
*/
|
||||
async * findPeers(namespace, options = {}) {
|
||||
const cid = await namespaceToCid(namespace)
|
||||
const providerOptions = {
|
||||
maxNumProviders: options.limit
|
||||
}
|
||||
|
||||
// TODO: Abortables + options
|
||||
for await (const peer of contentRouting.findProviders(cid, providerOptions)) {
|
||||
yield peer
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
16
src/discovery/utils.js
Normal file
16
src/discovery/utils.js
Normal file
@ -0,0 +1,16 @@
|
||||
'use strict'
|
||||
|
||||
const CID = require('cids')
|
||||
const multihashing = require('multihashing-async')
|
||||
|
||||
/**
|
||||
* Convert a namespace string into a cid.
|
||||
* @param {string} namespace
|
||||
* @return {Promise<CID>}
|
||||
*/
|
||||
module.exports.namespaceToCid = async (namespace) => {
|
||||
const bytes = new TextEncoder('utf8').encode(namespace)
|
||||
const hash = await multihashing(bytes, 'sha2-256')
|
||||
|
||||
return new CID(hash)
|
||||
}
|
@ -16,6 +16,7 @@ exports.codes = {
|
||||
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
||||
ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED',
|
||||
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
||||
ERR_NO_DISCOVERY_IMPLEMENTATIONS: 'ERR_NO_DISCOVERY_IMPLEMENTATIONS',
|
||||
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
||||
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
||||
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
||||
|
Loading…
x
Reference in New Issue
Block a user