diff --git a/doc/API.md b/doc/API.md index 4b412dd9..59cfa15a 100644 --- a/doc/API.md +++ b/doc/API.md @@ -20,6 +20,8 @@ * [`contentRouting.put`](#contentroutingput) * [`contentRouting.get`](#contentroutingget) * [`contentRouting.getMany`](#contentroutinggetmany) + * [`discovery.advertise`](#discoveryadvertise) + * [`discovery.findPeers`](#discoveryfindpeers) * [`peerRouting.findPeer`](#peerroutingfindpeer) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) @@ -666,6 +668,64 @@ const key = '/key' const { from, val } = await libp2p.contentRouting.get(key) ``` +### discovery.advertise + +Advertise services on the network. +This will use content routing modules and rendezvous if enabled. + +`libp2p.discovery.advertise(namespace, options)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| namespace | `string` | namespace of the service to advertise | +| [options] | `object` | advertise options | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | Promise resolves once advertise messages are sent | + +#### Example + +```js +// ... +const namespace = '/libp2p/relay' +await libp2p.discovery.advertise(namespace) +``` + +### discovery.findPeers + +Discover peers providing a given service. +This will use content routing modules and rendezvous if enabled and will store the peers data in the PeerStore. + +`libp2p.discovery.findPeers(namespace, options)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| namespace | `string` | namespace of the service to find peers | +| [options] | `object` | find peers options | +| [options.limit] | `number` | number of distinct peers to find | + +#### Returns + +| Type | Description | +|------|-------------| +| `AsyncIterable}` | Async iterator for peers | + +#### Example + +```js +// Iterate over the peers found for the given namespace +for await (const peer of libp2p.discovery.findPeers(namespace)) { + console.log(peer) +} +``` + ### peerRouting.findPeer Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first. diff --git a/src/content-routing.js b/src/content-routing.js index dcf549e7..340fc328 100644 --- a/src/content-routing.js +++ b/src/content-routing.js @@ -16,6 +16,7 @@ module.exports = (node) => { } return { + routers, /** * Iterates over all content routers in series to find providers of the given key. * Once a content router succeeds, iteration will stop. @@ -31,6 +32,8 @@ module.exports = (node) => { throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE') } + // TODO: Abortables + const result = await pAny( routers.map(async (router) => { const provs = await all(router.findProviders(key, options)) diff --git a/src/discovery/index.js b/src/discovery/index.js new file mode 100644 index 00000000..bf2b6626 --- /dev/null +++ b/src/discovery/index.js @@ -0,0 +1,96 @@ +'use strict' + +const debug = require('debug') +const log = debug('libp2p:discovery') +log.error = debug('libp2p:discovery:error') +const errCode = require('err-code') + +// const AbortController = require('abort-controller') +const { parallelMerge } = require('streaming-iterables') + +const Rendezvous = require('./rendezvous') +const Routing = require('./routing') +const { codes } = require('../errors') + +module.exports = (libp2p) => { + const addressBook = libp2p.peerStore.addressBook + const routing = Routing(libp2p) + const rendezvous = Rendezvous(libp2p) + + const getDiscoveryAvailableIts = (namespace, options) => { + if (routing.isEnabled && rendezvous.isEnabled) { + return parallelMerge( + routing.findPeers(namespace, options), + rendezvous.findPeers(namespace, options) + ) + } else if (routing.isEnabled) { + return routing.findPeers(namespace, options) + } + return rendezvous.findPeers(namespace, options) + } + + return { + /** + * Advertise services on the network. + * @param {string} namespace + * @param {object} [options] + * @returns {Promise} + */ + async advertise (namespace, options) { + if (!routing.isEnabled && !rendezvous.isEnabled) { + throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS) + } + return Promise.all([ + routing.isEnabled && routing.advertise(namespace, options), + rendezvous.isEnabled && rendezvous.advertise(namespace, options) + ]) + }, + + /** + * Discover peers providing a given service. + * @param {string} namespace + * @param {object} [options] + * @param {number} [options.limit] number of distinct peers to find + * @param {AsyncIterable} + */ + async * findPeers (namespace, options = {}) { + if (!routing.isEnabled && !rendezvous.isEnabled) { + throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS) + } + + const discoveredPeers = new Set() + // TODO: add abort controller + const discAsyncIt = getDiscoveryAvailableIts(namespace, options) + + // Store in the AddressBook: signed record or uncertified + for await (const { signedPeerRecord, id, multiaddrs } of discAsyncIt) { + if (signedPeerRecord) { + const idStr = signedPeerRecord.peerId.toB58String() + const isNew = !discoveredPeers.has(idStr) + discoveredPeers.add(idStr) + + // Consume peer record and yield if new + if (addressBook.consumePeerRecord(signedPeerRecord) && isNew) { + yield signedPeerRecord.peerId + } + } else if (id && multiaddrs) { + const idStr = id.toB58String() + const isNew = !discoveredPeers.has(idStr) + discoveredPeers.add(idStr) + + addressBook.add(id, multiaddrs) + + if (isNew) { + yield + } + } + // Abort if enough + if (options.limit && options.limit <= discoveredPeers.size) { + console.log('abort') + } + } + } + } +} + +// TODO: who handles reprovide?? diff --git a/src/discovery/rendezvous.js b/src/discovery/rendezvous.js new file mode 100644 index 00000000..2fae0a85 --- /dev/null +++ b/src/discovery/rendezvous.js @@ -0,0 +1,38 @@ +'use strict' + +/** + * RendezvousDiscovery is an implementation of service discovery + * using Rendezvous. Namespaces represent services supported by other nodes. + * @param {Libp2p} libp2p + */ +module.exports = libp2p => { + const rendezvous = libp2p.rendezvous + const isEnabled = !!rendezvous + + return { + isEnabled, + /** + * Advertise services on the network using the rendezvous module. + * @param {string} namespace + * @param {object} [options] + * @param {object} [options.ttl = 7200e3] registration ttl in ms (minimum 120) + * @returns {Promise} + */ + advertise(namespace, options) { + return rendezvous.register(namespace, options) + }, + /** + * Discover peers providing a given service. + * @param {string} namespace + * @param {object} [options] + * @param {number} [options.limit] limit of peers to discover + * @returns {AsyncIterable<{ signedPeerRecord: Envelope }>} + */ + async * findPeers(namespace, options) { + // TODO: Abortables + options + for await (const peer of rendezvous.discover(namespace, options)) { + yield peer + } + } + } +} diff --git a/src/discovery/routing.js b/src/discovery/routing.js new file mode 100644 index 00000000..bf9b66ea --- /dev/null +++ b/src/discovery/routing.js @@ -0,0 +1,47 @@ +'use strict' + +const { namespaceToCid } = require('./utils') + +/** + * RoutingDiscovery is an implementation of service discovery + * using ContentRouting. Namespaces represent services supported by other nodes. + * @param {Libp2p} libp2p + */ +module.exports = libp2p => { + const contentRouting = libp2p.contentRouting + const isEnabled = !!contentRouting.routers.length + + return { + isEnabled, + /** + * Advertise services on the network using content routing modules. + * @param {string} namespace + * @param {object} [options] + * @returns {Promise} + */ + async advertise(namespace, options) { + const cid = await namespaceToCid(namespace) + + // TODO: options? + await contentRouting.provide(cid) + }, + /** + * Discover peers providing a given service. + * @param {string} namespace + * @param {object} [options] + * @param {number} [options.limit] limit of peers to discover + * @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} + */ + async * findPeers(namespace, options = {}) { + const cid = await namespaceToCid(namespace) + const providerOptions = { + maxNumProviders: options.limit + } + + // TODO: Abortables + options + for await (const peer of contentRouting.findProviders(cid, providerOptions)) { + yield peer + } + } + } +} diff --git a/src/discovery/utils.js b/src/discovery/utils.js new file mode 100644 index 00000000..7426271c --- /dev/null +++ b/src/discovery/utils.js @@ -0,0 +1,16 @@ +'use strict' + +const CID = require('cids') +const multihashing = require('multihashing-async') + +/** + * Convert a namespace string into a cid. + * @param {string} namespace + * @return {Promise} + */ +module.exports.namespaceToCid = async (namespace) => { + const bytes = new TextEncoder('utf8').encode(namespace) + const hash = await multihashing(bytes, 'sha2-256') + + return new CID(hash) +} diff --git a/src/errors.js b/src/errors.js index 18e600c6..cf3f78de 100644 --- a/src/errors.js +++ b/src/errors.js @@ -16,6 +16,7 @@ exports.codes = { ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED', ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES', + ERR_NO_DISCOVERY_IMPLEMENTATIONS: 'ERR_NO_DISCOVERY_IMPLEMENTATIONS', ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF', ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',