mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-24 06:21:32 +00:00
refactor: async routing (#489)
* feat: async routing * chore: put dht extra api commands under content routing * chore: add default option to createPeerInfo Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * chore: address review * chore: rm dlv
This commit is contained in:
323
test/content-routing/content-routing.node.js
Normal file
323
test/content-routing/content-routing.node.js
Normal file
@ -0,0 +1,323 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
const nock = require('nock')
|
||||
const sinon = require('sinon')
|
||||
|
||||
const pDefer = require('p-defer')
|
||||
const mergeOptions = require('merge-options')
|
||||
|
||||
const CID = require('cids')
|
||||
const DelegatedContentRouter = require('libp2p-delegated-content-routing')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const peerUtils = require('../utils/creators/peer')
|
||||
const { baseOptions, routingOptions } = require('./utils')
|
||||
|
||||
describe('content-routing', () => {
|
||||
describe('no routers', () => {
|
||||
let node
|
||||
|
||||
before(async () => {
|
||||
[node] = await peerUtils.createPeer({
|
||||
config: baseOptions
|
||||
})
|
||||
})
|
||||
|
||||
it('.findProviders should return an error', async () => {
|
||||
try {
|
||||
for await (const _ of node.contentRouting.findProviders('a cid')) {} // eslint-disable-line no-unused-vars
|
||||
throw new Error('.findProviders should return an error')
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
expect(err.code).to.equal('NO_ROUTERS_AVAILABLE')
|
||||
}
|
||||
})
|
||||
|
||||
it('.provide should return an error', async () => {
|
||||
await expect(node.contentRouting.provide('a cid'))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', 'NO_ROUTERS_AVAILABLE')
|
||||
})
|
||||
})
|
||||
|
||||
describe('via dht router', () => {
|
||||
const number = 5
|
||||
let nodes
|
||||
|
||||
before(async () => {
|
||||
nodes = await peerUtils.createPeer({
|
||||
number,
|
||||
config: routingOptions
|
||||
})
|
||||
|
||||
// Ring dial
|
||||
await Promise.all(
|
||||
nodes.map((peer, i) => peer.dial(nodes[(i + 1) % number].peerInfo))
|
||||
)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
after(() => Promise.all(nodes.map((n) => n.stop())))
|
||||
|
||||
it('should use the nodes dht to provide', () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
sinon.stub(nodes[0]._dht, 'provide').callsFake(() => {
|
||||
deferred.resolve()
|
||||
})
|
||||
|
||||
nodes[0].contentRouting.provide()
|
||||
return deferred.promise
|
||||
})
|
||||
|
||||
it('should use the nodes dht to find providers', async () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
sinon.stub(nodes[0]._dht, 'findProviders').callsFake(function * () {
|
||||
deferred.resolve()
|
||||
yield
|
||||
})
|
||||
|
||||
await nodes[0].contentRouting.findProviders().next()
|
||||
|
||||
return deferred.promise
|
||||
})
|
||||
})
|
||||
|
||||
describe('via delegate router', () => {
|
||||
let node
|
||||
let delegate
|
||||
|
||||
beforeEach(async () => {
|
||||
const [peerInfo] = await peerUtils.createPeerInfo({ fixture: false })
|
||||
|
||||
delegate = new DelegatedContentRouter(peerInfo.id, {
|
||||
host: '0.0.0.0',
|
||||
protocol: 'http',
|
||||
port: 60197
|
||||
}, [
|
||||
multiaddr('/ip4/0.0.0.0/tcp/60197')
|
||||
])
|
||||
|
||||
;[node] = await peerUtils.createPeer({
|
||||
config: mergeOptions(baseOptions, {
|
||||
modules: {
|
||||
contentRouting: [delegate]
|
||||
},
|
||||
config: {
|
||||
dht: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
afterEach(() => node.stop())
|
||||
|
||||
it('should use the delegate router to provide', () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
sinon.stub(delegate, 'provide').callsFake(() => {
|
||||
deferred.resolve()
|
||||
})
|
||||
|
||||
node.contentRouting.provide()
|
||||
return deferred.promise
|
||||
})
|
||||
|
||||
it('should use the delegate router to find providers', async () => {
|
||||
const deferred = pDefer()
|
||||
|
||||
sinon.stub(delegate, 'findProviders').callsFake(function * () {
|
||||
deferred.resolve()
|
||||
yield
|
||||
})
|
||||
|
||||
await node.contentRouting.findProviders().next()
|
||||
|
||||
return deferred.promise
|
||||
})
|
||||
|
||||
it('should be able to register as a provider', async () => {
|
||||
const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB')
|
||||
const mockApi = nock('http://0.0.0.0:60197')
|
||||
// mock the refs call
|
||||
.post('/api/v0/refs')
|
||||
.query({
|
||||
recursive: false,
|
||||
arg: cid.toBaseEncodedString()
|
||||
})
|
||||
.reply(200, null, [
|
||||
'Content-Type', 'application/json',
|
||||
'X-Chunked-Output', '1'
|
||||
])
|
||||
|
||||
await node.contentRouting.provide(cid)
|
||||
|
||||
expect(mockApi.isDone()).to.equal(true)
|
||||
})
|
||||
|
||||
it('should handle errors when registering as a provider', async () => {
|
||||
const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB')
|
||||
const mockApi = nock('http://0.0.0.0:60197')
|
||||
// mock the refs call
|
||||
.post('/api/v0/refs')
|
||||
.query({
|
||||
recursive: false,
|
||||
arg: cid.toBaseEncodedString()
|
||||
})
|
||||
.reply(502, 'Bad Gateway', ['Content-Type', 'application/json'])
|
||||
|
||||
await expect(node.contentRouting.provide(cid))
|
||||
.to.eventually.be.rejected()
|
||||
|
||||
expect(mockApi.isDone()).to.equal(true)
|
||||
})
|
||||
|
||||
it('should be able to find providers', async () => {
|
||||
const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB')
|
||||
const provider = 'QmZNgCqZCvTsi3B4Vt7gsSqpkqDpE7M2Y9TDmEhbDb4ceF'
|
||||
|
||||
const mockApi = nock('http://0.0.0.0:60197')
|
||||
.post('/api/v0/dht/findprovs')
|
||||
.query({
|
||||
arg: cid.toBaseEncodedString()
|
||||
})
|
||||
.reply(200, `{"Extra":"","ID":"QmWKqWXCtRXEeCQTo3FoZ7g4AfnGiauYYiczvNxFCHicbB","Responses":[{"Addrs":["/ip4/0.0.0.0/tcp/0"],"ID":"${provider}"}],"Type":4}\n`, [
|
||||
'Content-Type', 'application/json',
|
||||
'X-Chunked-Output', '1'
|
||||
])
|
||||
|
||||
const providers = []
|
||||
for await (const provider of node.contentRouting.findProviders(cid, { timeout: 1000 })) {
|
||||
providers.push(provider)
|
||||
}
|
||||
|
||||
expect(providers).to.have.length(1)
|
||||
expect(providers[0].id.toB58String()).to.equal(provider)
|
||||
expect(mockApi.isDone()).to.equal(true)
|
||||
})
|
||||
|
||||
it('should handle errors when finding providers', async () => {
|
||||
const cid = new CID('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB')
|
||||
const mockApi = nock('http://0.0.0.0:60197')
|
||||
.post('/api/v0/dht/findprovs')
|
||||
.query({
|
||||
arg: cid.toBaseEncodedString()
|
||||
})
|
||||
.reply(502, 'Bad Gateway', [
|
||||
'X-Chunked-Output', '1'
|
||||
])
|
||||
|
||||
try {
|
||||
for await (const _ of node.contentRouting.findProviders(cid)) { } // eslint-disable-line no-unused-vars
|
||||
throw new Error('should handle errors when finding providers')
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
}
|
||||
|
||||
expect(mockApi.isDone()).to.equal(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe('via dht and delegate routers', () => {
|
||||
let node
|
||||
let delegate
|
||||
|
||||
beforeEach(async () => {
|
||||
const [peerInfo] = await peerUtils.createPeerInfo({ fixture: false })
|
||||
|
||||
delegate = new DelegatedContentRouter(peerInfo.id, {
|
||||
host: '0.0.0.0',
|
||||
protocol: 'http',
|
||||
port: 60197
|
||||
}, [
|
||||
multiaddr('/ip4/0.0.0.0/tcp/60197')
|
||||
])
|
||||
|
||||
;[node] = await peerUtils.createPeer({
|
||||
config: mergeOptions(routingOptions, {
|
||||
modules: {
|
||||
contentRouting: [delegate]
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
afterEach(() => node.stop())
|
||||
|
||||
it('should use both the dht and delegate router to provide', async () => {
|
||||
const dhtDeferred = pDefer()
|
||||
const delegatedDeferred = pDefer()
|
||||
|
||||
sinon.stub(node._dht, 'provide').callsFake(() => {
|
||||
dhtDeferred.resolve()
|
||||
})
|
||||
|
||||
sinon.stub(delegate, 'provide').callsFake(() => {
|
||||
delegatedDeferred.resolve()
|
||||
})
|
||||
|
||||
await node.contentRouting.provide()
|
||||
|
||||
await Promise.all([
|
||||
dhtDeferred.promise,
|
||||
delegatedDeferred.promise
|
||||
])
|
||||
})
|
||||
|
||||
it('should only use the dht if it finds providers', async () => {
|
||||
const results = [true]
|
||||
|
||||
sinon.stub(node._dht, 'findProviders').callsFake(function * () {
|
||||
yield results[0]
|
||||
})
|
||||
|
||||
sinon.stub(delegate, 'findProviders').callsFake(function * () { // eslint-disable-line require-yield
|
||||
throw new Error('the delegate should not have been called')
|
||||
})
|
||||
|
||||
const providers = []
|
||||
for await (const prov of node.contentRouting.findProviders('a cid')) {
|
||||
providers.push(prov)
|
||||
}
|
||||
|
||||
expect(providers).to.have.length.above(0)
|
||||
expect(providers).to.eql(results)
|
||||
})
|
||||
|
||||
it('should use the delegate if the dht fails to find providers', async () => {
|
||||
const results = [true]
|
||||
|
||||
sinon.stub(node._dht, 'findProviders').callsFake(function * () {})
|
||||
|
||||
sinon.stub(delegate, 'findProviders').callsFake(function * () {
|
||||
yield results[0]
|
||||
})
|
||||
|
||||
const providers = []
|
||||
for await (const prov of node.contentRouting.findProviders('a cid')) {
|
||||
providers.push(prov)
|
||||
}
|
||||
|
||||
expect(providers).to.have.length.above(0)
|
||||
expect(providers).to.eql(results)
|
||||
})
|
||||
})
|
||||
})
|
92
test/content-routing/dht/configuration.node.js
Normal file
92
test/content-routing/dht/configuration.node.js
Normal file
@ -0,0 +1,92 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
|
||||
const mergeOptions = require('merge-options')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const { create } = require('../../../src')
|
||||
const { baseOptions, subsystemOptions } = require('./utils')
|
||||
const peerUtils = require('../../utils/creators/peer')
|
||||
|
||||
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||
|
||||
describe('DHT subsystem is configurable', () => {
|
||||
let libp2p
|
||||
|
||||
afterEach(async () => {
|
||||
libp2p && await libp2p.stop()
|
||||
})
|
||||
|
||||
it('should not exist if no module is provided', async () => {
|
||||
libp2p = await create(baseOptions)
|
||||
expect(libp2p._dht).to.not.exist()
|
||||
})
|
||||
|
||||
it('should exist if the module is provided', async () => {
|
||||
libp2p = await create(subsystemOptions)
|
||||
expect(libp2p._dht).to.exist()
|
||||
})
|
||||
|
||||
it('should start and stop by default once libp2p starts', async () => {
|
||||
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||
peerInfo.multiaddrs.add(listenAddr)
|
||||
|
||||
const customOptions = mergeOptions(subsystemOptions, {
|
||||
peerInfo
|
||||
})
|
||||
|
||||
libp2p = await create(customOptions)
|
||||
expect(libp2p._dht.isStarted).to.equal(false)
|
||||
|
||||
await libp2p.start()
|
||||
expect(libp2p._dht.isStarted).to.equal(true)
|
||||
|
||||
await libp2p.stop()
|
||||
expect(libp2p._dht.isStarted).to.equal(false)
|
||||
})
|
||||
|
||||
it('should not start if disabled once libp2p starts', async () => {
|
||||
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||
peerInfo.multiaddrs.add(listenAddr)
|
||||
|
||||
const customOptions = mergeOptions(subsystemOptions, {
|
||||
peerInfo,
|
||||
config: {
|
||||
dht: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
libp2p = await create(customOptions)
|
||||
expect(libp2p._dht.isStarted).to.equal(false)
|
||||
|
||||
await libp2p.start()
|
||||
expect(libp2p._dht.isStarted).to.equal(false)
|
||||
})
|
||||
|
||||
it('should allow a manual start', async () => {
|
||||
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||
peerInfo.multiaddrs.add(listenAddr)
|
||||
|
||||
const customOptions = mergeOptions(subsystemOptions, {
|
||||
peerInfo,
|
||||
config: {
|
||||
dht: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
libp2p = await create(customOptions)
|
||||
await libp2p.start()
|
||||
expect(libp2p._dht.isStarted).to.equal(false)
|
||||
|
||||
await libp2p._dht.start()
|
||||
expect(libp2p._dht.isStarted).to.equal(true)
|
||||
})
|
||||
})
|
135
test/content-routing/dht/operation.node.js
Normal file
135
test/content-routing/dht/operation.node.js
Normal file
@ -0,0 +1,135 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
|
||||
const pWaitFor = require('p-wait-for')
|
||||
const mergeOptions = require('merge-options')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const { create } = require('../../../src')
|
||||
const { subsystemOptions, subsystemMulticodecs } = require('./utils')
|
||||
const peerUtils = require('../../utils/creators/peer')
|
||||
|
||||
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/8000')
|
||||
const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/8001')
|
||||
|
||||
describe('DHT subsystem operates correctly', () => {
|
||||
let peerInfo, remotePeerInfo
|
||||
let libp2p, remoteLibp2p
|
||||
let remAddr
|
||||
|
||||
beforeEach(async () => {
|
||||
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfo({ number: 2 })
|
||||
|
||||
peerInfo.multiaddrs.add(listenAddr)
|
||||
remotePeerInfo.multiaddrs.add(remoteListenAddr)
|
||||
})
|
||||
|
||||
describe('dht started before connect', () => {
|
||||
beforeEach(async () => {
|
||||
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||
peerInfo
|
||||
}))
|
||||
|
||||
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||
peerInfo: remotePeerInfo
|
||||
}))
|
||||
|
||||
await Promise.all([
|
||||
libp2p.start(),
|
||||
remoteLibp2p.start()
|
||||
])
|
||||
|
||||
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||
})
|
||||
|
||||
afterEach(() => Promise.all([
|
||||
libp2p && libp2p.stop(),
|
||||
remoteLibp2p && remoteLibp2p.stop()
|
||||
]))
|
||||
|
||||
it('should get notified of connected peers on dial', async () => {
|
||||
const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||
|
||||
expect(connection).to.exist()
|
||||
|
||||
return Promise.all([
|
||||
pWaitFor(() => libp2p._dht.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p._dht.routingTable.size === 1)
|
||||
])
|
||||
})
|
||||
|
||||
it('should put on a peer and get from the other', async () => {
|
||||
const key = Buffer.from('hello')
|
||||
const value = Buffer.from('world')
|
||||
|
||||
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||
|
||||
await Promise.all([
|
||||
pWaitFor(() => libp2p._dht.routingTable.size === 1),
|
||||
pWaitFor(() => remoteLibp2p._dht.routingTable.size === 1)
|
||||
])
|
||||
|
||||
await libp2p.contentRouting.put(key, value)
|
||||
|
||||
const fetchedValue = await remoteLibp2p.contentRouting.get(key)
|
||||
expect(fetchedValue).to.eql(value)
|
||||
})
|
||||
})
|
||||
|
||||
describe('dht started after connect', () => {
|
||||
beforeEach(async () => {
|
||||
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||
peerInfo
|
||||
}))
|
||||
|
||||
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||
peerInfo: remotePeerInfo,
|
||||
config: {
|
||||
dht: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
}))
|
||||
|
||||
await libp2p.start()
|
||||
await remoteLibp2p.start()
|
||||
|
||||
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||
})
|
||||
|
||||
afterEach(() => Promise.all([
|
||||
libp2p && libp2p.stop(),
|
||||
remoteLibp2p && remoteLibp2p.stop()
|
||||
]))
|
||||
|
||||
it('should get notified of connected peers after starting', async () => {
|
||||
const connection = await libp2p.dial(remAddr)
|
||||
|
||||
expect(connection).to.exist()
|
||||
expect(libp2p._dht.routingTable.size).to.be.eql(0)
|
||||
expect(remoteLibp2p._dht.routingTable.size).to.be.eql(0)
|
||||
|
||||
await remoteLibp2p._dht.start()
|
||||
return pWaitFor(() => libp2p._dht.routingTable.size === 1)
|
||||
})
|
||||
|
||||
it('should put on a peer and get from the other', async () => {
|
||||
await libp2p.dial(remAddr)
|
||||
|
||||
const key = Buffer.from('hello')
|
||||
const value = Buffer.from('world')
|
||||
|
||||
await remoteLibp2p._dht.start()
|
||||
await pWaitFor(() => libp2p._dht.routingTable.size === 1)
|
||||
|
||||
await libp2p.contentRouting.put(key, value)
|
||||
|
||||
const fetchedValue = await remoteLibp2p.contentRouting.get(key)
|
||||
expect(fetchedValue).to.eql(value)
|
||||
})
|
||||
})
|
||||
})
|
37
test/content-routing/dht/utils.js
Normal file
37
test/content-routing/dht/utils.js
Normal file
@ -0,0 +1,37 @@
|
||||
'use strict'
|
||||
|
||||
const KadDht = require('libp2p-kad-dht')
|
||||
const { multicodec } = require('libp2p-kad-dht')
|
||||
const Crypto = require('../../../src/insecure/plaintext')
|
||||
const Muxer = require('libp2p-mplex')
|
||||
const Transport = require('libp2p-tcp')
|
||||
|
||||
const mergeOptions = require('merge-options')
|
||||
|
||||
const baseOptions = {
|
||||
modules: {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.baseOptions = baseOptions
|
||||
|
||||
const subsystemOptions = mergeOptions(baseOptions, {
|
||||
modules: {
|
||||
dht: KadDht
|
||||
},
|
||||
config: {
|
||||
dht: {
|
||||
kBucketSize: 20,
|
||||
randomWalk: {
|
||||
enabled: true
|
||||
},
|
||||
enabled: true
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
module.exports.subsystemOptions = subsystemOptions
|
||||
module.exports.subsystemMulticodecs = [multicodec]
|
24
test/content-routing/utils.js
Normal file
24
test/content-routing/utils.js
Normal file
@ -0,0 +1,24 @@
|
||||
'use strict'
|
||||
|
||||
const KadDht = require('libp2p-kad-dht')
|
||||
const mergeOptions = require('merge-options')
|
||||
const baseOptions = require('../utils/base-options')
|
||||
|
||||
module.exports.baseOptions = baseOptions
|
||||
|
||||
const routingOptions = mergeOptions(baseOptions, {
|
||||
modules: {
|
||||
dht: KadDht
|
||||
},
|
||||
config: {
|
||||
dht: {
|
||||
kBucketSize: 20,
|
||||
randomWalk: {
|
||||
enabled: true
|
||||
},
|
||||
enabled: true
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
module.exports.routingOptions = routingOptions
|
Reference in New Issue
Block a user