feat: support delegated value store in content-routing module

This commit is contained in:
Yusef Napora 2021-10-05 17:00:07 -04:00 committed by Yusef Napora
parent a335fda852
commit f9e5db5b2a
7 changed files with 297 additions and 26 deletions

View File

@ -190,6 +190,23 @@ If you want to know more about libp2p pubsub, you should read the following cont
- https://docs.libp2p.io/concepts/publish-subscribe
- https://github.com/libp2p/specs/tree/master/pubsub
### Value Storage
Some libp2p components are able to provide Key/Value storage capabilities that can be used by other libp2p components. A Value Storage module implements the [ValueStore interface](https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interfaces/src/value-store/types.d.ts), which provides `put`
and `get` methods for storing arbitrary binary data.
Some available peer routing modules are:
- [js-libp2p-kad-dht](https://github.com/libp2p/js-libp2p-kad-dht)
- [js-libp2p-delegated-content-routing](https://github.com/libp2p/js-libp2p-delegated-content-routing)
- via `DelgatedValueStore`
The current [DHT](#dht) implementation implements the `ValueStore` interface, and if the DHT is enabled
there is no need to separately enable the value storage capability.
Other implementations of value storage may be enabled by including a `ValueStore` implementation in
the `modules.valueStorage` configuration as shown below.
## Customizing libp2p
When [creating a libp2p node](./API.md#create), the modules needed should be specified as follows:
@ -202,6 +219,7 @@ const modules = {
contentRouting: [],
peerRouting: [],
peerDiscovery: [],
valueStorage: [],
dht: dhtImplementation,
pubsub: pubsubImplementation
}
@ -394,6 +412,7 @@ const { NOISE } = require('libp2p-noise')
const ipfsHttpClient = require('ipfs-http-client')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
const DelegatedValueStore = require('libp2p-delegated-content-routing/value-store')
const PeerId = require('peer-id')
// create a peerId
@ -411,6 +430,12 @@ const delegatedContentRouting = new DelegatedContentRouter(peerId, ipfsHttpClien
port: 443
}))
const delegatedValueStore = new DelegatedValueStore(ipfsHttpClient.create({
host: 'node0.delegate.ipfs.io', // In production you should setup your own delegates
protocol: 'https',
port: 443
}))
const node = await Libp2p.create({
modules: {
transport: [TCP],
@ -418,6 +443,7 @@ const node = await Libp2p.create({
connEncryption: [NOISE],
contentRouting: [delegatedContentRouting],
peerRouting: [delegatedPeerRouting],
valueStorage: [delegatedValueStore],
},
peerId,
peerRouting: { // Peer routing configuration

View File

@ -78,10 +78,10 @@
]
},
"dependencies": {
"abortable-iterator": "^3.0.0",
"@motrix/nat-api": "^0.3.1",
"@vascosantos/moving-average": "^1.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"aggregate-error": "^3.1.0",
"any-signal": "^2.1.1",
"bignumber.js": "^9.0.1",
@ -104,7 +104,6 @@
"it-pipe": "^1.1.0",
"it-take": "^1.0.0",
"libp2p-crypto": "^0.19.4",
"libp2p-interfaces": "^1.0.0",
"libp2p-utils": "^0.4.0",
"mafmt": "^10.0.0",
"merge-options": "^3.0.4",
@ -149,7 +148,7 @@
"it-pushable": "^1.4.0",
"libp2p": ".",
"libp2p-bootstrap": "^0.13.0",
"libp2p-delegated-content-routing": "^0.11.0",
"libp2p-delegated-content-routing": "git+ssh://git@github.com/libp2p/js-libp2p-delegated-content-routing#362cd00988e717f6fc49b0a1f2fa7bbaabbfcc53",
"libp2p-delegated-peer-routing": "^0.10.0",
"libp2p-floodsub": "^0.27.0",
"libp2p-gossipsub": "^0.11.0",

View File

@ -6,7 +6,8 @@ const {
storeAddresses,
uniquePeers,
requirePeers,
maybeLimitSource
maybeLimitSource,
raceToSuccess
} = require('./utils')
const merge = require('it-merge')
@ -17,12 +18,8 @@ const { pipe } = require('it-pipe')
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('multiformats/cid').CID} CID
* @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule
*/
/**
* @typedef {Object} GetData
* @property {PeerId} from
* @property {Uint8Array} val
* @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule
* @typedef {import('libp2p-interfaces/src/value-store/types').GetValueResult} GetData
*/
class ContentRouting {
@ -34,11 +31,16 @@ class ContentRouting {
this.libp2p = libp2p
/** @type {ContentRoutingModule[]} */
this.routers = libp2p._modules.contentRouting || []
/** @type {ValueStoreModule[]} */
this.valueStores = libp2p._modules.valueStorage || []
this.dht = libp2p._dht
// If we have the dht, add it to the available content routers
// If we have the dht, add it to the available content routers and value stores
if (this.dht && libp2p._config.dht.enabled) {
this.routers.push(this.dht)
if (!this.valueStores.includes(this.dht)) {
this.valueStores.push(this.dht)
}
}
}
@ -83,7 +85,7 @@ class ContentRouting {
}
/**
* Store the given key/value pair in the DHT.
* Store the given key/value pair in the DHT and/or configured ValueStore.
*
* @param {Uint8Array} key
* @param {Uint8Array} value
@ -91,12 +93,25 @@ class ContentRouting {
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put (key, value, options) {
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
async put (key, value, options) {
if (!this.libp2p.isStarted()) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED)
}
if (this.libp2p._config.dht.enabled && !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return this.dht.put(key, value, options)
if (this.valueStores.length === 0) {
throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE)
}
const promises = []
for (const store of this.valueStores) {
promises.push(store.put(key, value, options))
}
await Promise.all(promises)
}
/**
@ -109,11 +124,24 @@ class ContentRouting {
* @returns {Promise<GetData>}
*/
get (key, options) {
if (!this.libp2p.isStarted() || !this.dht.isStarted) {
if (!this.libp2p.isStarted()) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.ERR_NODE_NOT_STARTED)
}
if (this.libp2p._config.dht.enabled && !this.dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}
return this.dht.get(key, options)
if (this.valueStores.length === 0) {
throw errCode(new Error(messages.VALUE_STORE_REQUIRED), codes.ERR_VALUE_STORE_UNAVAILABLE)
}
const promises = []
for (const store of this.valueStores) {
promises.push(store.get(key, options))
}
return raceToSuccess(promises)
}
/**

View File

@ -81,9 +81,39 @@ function maybeLimitSource (source, max) {
return source
}
/**
* Like Promise.race, but only fails if all input promises fail.
*
* Returns a promise that will resolve with the value of the first promise
* to resolve, but will only fail if all promises fail.
*
* @template {any} T
* @param {Promise<T>[]} promises - an array of promises.
* @returns {Promise<T>} the resolved value of the first promise that succeeded, or an Error if _all_ promises fail.
*/
function raceToSuccess (promises) {
const combineErrors = (/** @type Error[] */ errors) => {
if (errors.length === 1) {
return errors[0]
}
return new Error(`${errors.length} operations failed: ` + errors.map(e => e.message).join(', '))
}
return Promise.all(promises.map(p => {
return p.then(
val => Promise.reject(val),
err => Promise.resolve(err)
)
})).then(
errors => Promise.reject(combineErrors(errors)),
val => Promise.resolve(val)
)
}
module.exports = {
storeAddresses,
uniquePeers,
requirePeers,
maybeLimitSource
maybeLimitSource,
raceToSuccess
}

View File

@ -3,7 +3,8 @@
exports.messages = {
NOT_STARTED_YET: 'The libp2p node is not started yet',
DHT_DISABLED: 'DHT is not available',
CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required'
CONN_ENCRYPTION_REQUIRED: 'At least one connection encryption module is required',
VALUE_STORE_REQUIRED: 'At least one value storage module is required for this operation if the DHT is not enabled.'
}
exports.codes = {
@ -34,5 +35,6 @@ exports.codes = {
ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED',
ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL',
ERR_INVALID_MULTIADDR: 'ERR_INVALID_MULTIADDR',
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID'
ERR_SIGNATURE_NOT_VALID: 'ERR_SIGNATURE_NOT_VALID',
ERR_VALUE_STORE_UNAVAILABLE: 'ERR_VALUE_STORE_UNAVAILABLE'
}

View File

@ -40,6 +40,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @typedef {import('libp2p-interfaces/src/transport/types').TransportFactory<any, any>} TransportFactory
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxerFactory} MuxerFactory
* @typedef {import('libp2p-interfaces/src/content-routing/types').ContentRouting} ContentRoutingModule
* @typedef {import('libp2p-interfaces/src/value-store/types').ValueStore} ValueStoreModule
* @typedef {import('libp2p-interfaces/src/peer-discovery/types').PeerDiscoveryFactory} PeerDiscoveryFactory
* @typedef {import('libp2p-interfaces/src/peer-routing/types').PeerRouting} PeerRoutingModule
* @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto
@ -102,6 +103,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {PeerDiscoveryFactory[]} [peerDiscovery]
* @property {PeerRoutingModule[]} [peerRouting]
* @property {ContentRoutingModule[]} [contentRouting]
* @property {ValueStoreModule[]} [valueStorage]
* @property {Object} [dht]
* @property {{new(...args: any[]): Pubsub}} [pubsub]
* @property {Protector} [connProtector]

View File

@ -11,12 +11,14 @@ const mergeOptions = require('merge-options')
const { CID } = require('multiformats/cid')
const ipfsHttpClient = require('ipfs-http-client')
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
const DelegatedValueStore = require('libp2p-delegated-content-routing/src/value-store')
const { Multiaddr } = require('multiaddr')
const drain = require('it-drain')
const all = require('it-all')
const peerUtils = require('../utils/creators/peer')
const { baseOptions, routingOptions } = require('./utils')
const uint8arrays = require('uint8arrays')
describe('content-routing', () => {
describe('no routers', () => {
@ -96,25 +98,58 @@ describe('content-routing', () => {
return deferred.promise
})
it('should put a key/value pair to the DHT', async () => {
const deferred = pDefer()
sinon.stub(nodes[0]._dht, 'put').callsFake(async () => {
deferred.resolve()
})
const key = new TextEncoder().encode('/foo/bar')
const val = new TextEncoder().encode('hello-world')
await nodes[0].contentRouting.put(key, val)
return deferred.promise
})
it('should get a value by key from the DHT', async () => {
const deferred = pDefer()
sinon.stub(nodes[0]._dht, 'get').callsFake(async () => {
const val = new TextEncoder().encode('hello-world')
deferred.resolve(val)
return { from: nodes[0].id, val }
})
const key = new TextEncoder().encode('/foo/bar')
const res = await nodes[0].contentRouting.get(key)
expect(res.from).to.equal(nodes[0].id)
return deferred.promise
})
})
describe('via delegate router', () => {
let node
let delegate
let valueStore
beforeEach(async () => {
const [peerId] = await peerUtils.createPeerId({ fixture: true })
const [delegateId] = await peerUtils.createPeerId({ fixture: true })
delegate = new DelegatedContentRouter(peerId, ipfsHttpClient.create({
const ipfsClient = ipfsHttpClient.create({
host: '0.0.0.0',
protocol: 'http',
port: 60197
}))
})
delegate = new DelegatedContentRouter(peerId, ipfsClient)
valueStore = new DelegatedValueStore(delegateId, ipfsClient)
;[node] = await peerUtils.createPeer({
config: mergeOptions(baseOptions, {
modules: {
contentRouting: [delegate]
contentRouting: [delegate],
valueStorage: [valueStore]
},
config: {
dht: {
@ -244,25 +279,67 @@ describe('content-routing', () => {
expect(mockApi.isDone()).to.equal(true)
})
it('should put a key/value pair using the delegated node', async () => {
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/put')
.query(true)
.reply(200, '', [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
const val = new TextEncoder().encode('a-value')
await node.contentRouting.put(key, val)
expect(mockApi.isDone()).to.equal(true)
})
it('should get a value by key using the delegated node', async () => {
const val = new TextEncoder().encode('hello-world')
const valueBase64 = uint8arrays.toString(val, 'base64pad')
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/get')
.query(true)
.reply(200, `{"Extra":"${valueBase64}","Type":5}`, [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
await node.contentRouting.get(key)
expect(mockApi.isDone()).to.equal(true)
})
})
describe('via dht and delegate routers', () => {
let node
let nodeId
let delegate
let delegateId
let valueStore
beforeEach(async () => {
const [peerId] = await peerUtils.createPeerId({ fixture: true })
const [delegatePeerId] = await peerUtils.createPeerId({ fixture: true })
nodeId = peerId
delegateId = delegatePeerId
delegate = new DelegatedContentRouter(peerId, ipfsHttpClient.create({
const ipfsClient = ipfsHttpClient.create({
host: '0.0.0.0',
protocol: 'http',
port: 60197
}))
})
delegate = new DelegatedContentRouter(peerId, ipfsClient)
valueStore = new DelegatedValueStore(delegateId, ipfsClient)
;[node] = await peerUtils.createPeer({
config: mergeOptions(routingOptions, {
modules: {
contentRouting: [delegate]
contentRouting: [delegate],
valueStorage: [valueStore]
}
})
})
@ -442,5 +519,112 @@ describe('content-routing', () => {
expect(providers).to.have.length.above(0)
expect(providers).to.eql(results)
})
it('should put values to the DHT and delegated node', async () => {
const deferredDHT = pDefer()
sinon.stub(node._dht, 'put').callsFake(async () => {
deferredDHT.resolve()
})
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/put')
.query(true)
.reply(200, '', [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
const val = new TextEncoder().encode('hello-world')
await node.contentRouting.put(key, val)
expect(mockApi.isDone()).to.equal(true)
return deferredDHT.promise
})
it('should try to get values by key from both DHT and delegated node', async () => {
const deferred = pDefer()
sinon.stub(node._dht, 'get').callsFake(async () => {
// small delay to allow delegate call to go through before dht promise resolves
await new Promise(resolve => setTimeout(resolve, 10))
const val = new TextEncoder().encode('hello-world')
deferred.resolve(val)
const from = nodeId
return { from, val }
})
const val = new TextEncoder().encode('hello-world')
const valueBase64 = uint8arrays.toString(val, 'base64pad')
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/get')
.query(true)
.reply(200, `{"Extra":"${valueBase64}","Type":5}`, [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
await node.contentRouting.get(key)
expect(mockApi.isDone()).to.equal(true)
return deferred.promise
})
it('should return a value for a key from the delegate node if the DHT fails', async () => {
const deferred = pDefer()
sinon.stub(node._dht, 'get').callsFake(async () => {
deferred.resolve()
throw new Error('bang!')
})
const val = new TextEncoder().encode('hello-world')
const valueBase64 = uint8arrays.toString(val, 'base64pad')
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/get')
.query(true)
.reply(200, `{"Extra":"${valueBase64}","Type":5}`, [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
const res = await node.contentRouting.get(key)
const returnedValue = new TextDecoder().decode(res.val)
expect(mockApi.isDone()).to.equal(true)
expect(res.from).to.equal(delegateId)
expect(returnedValue).to.equal('hello-world')
return deferred.promise
})
it('should return a value for key from the DHT if the delegate node fails', async () => {
const deferred = pDefer()
sinon.stub(node._dht, 'get').callsFake(async () => {
// small delay to allow delegate call to go through before dht promise resolves
await new Promise(resolve => setTimeout(resolve, 10))
const val = new TextEncoder().encode('hello-world')
deferred.resolve(val)
const from = nodeId
return { from, val }
})
const mockApi = nock('http://0.0.0.0:60197')
.post('/api/v0/dht/get')
.query(true)
.reply(503, 'No soup for you!', [
'Content-Type', 'application/json',
'X-Chunked-Output', '1'
])
const key = new TextEncoder().encode('/foo/bar')
const res = await node.contentRouting.get(key)
const valueString = new TextDecoder().decode(res.val)
expect(mockApi.isDone()).to.equal(true)
expect(res.from).to.deep.equal(nodeId)
expect(valueString).to.equal('hello-world')
return deferred.promise
})
})
})