mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-09 23:41:19 +00:00
refactor: dht async/await (#480)
* refactor: core async (#478) * refactor: cleanup core test: auto dial on startup * fix: make hangup work properly * chore: fix lint * chore: apply suggestions from code review Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio> * fix: provide libp2p dialer to the dht * chore: use dht release
This commit is contained in:
parent
f28b09fc0d
commit
c563e06a60
@ -93,7 +93,7 @@
|
|||||||
"libp2p-delegated-peer-routing": "^0.2.2",
|
"libp2p-delegated-peer-routing": "^0.2.2",
|
||||||
"libp2p-floodsub": "^0.19.0",
|
"libp2p-floodsub": "^0.19.0",
|
||||||
"libp2p-gossipsub": "^0.1.0",
|
"libp2p-gossipsub": "^0.1.0",
|
||||||
"libp2p-kad-dht": "^0.15.3",
|
"libp2p-kad-dht": "~0.17.0",
|
||||||
"libp2p-mdns": "^0.12.3",
|
"libp2p-mdns": "^0.12.3",
|
||||||
"libp2p-mplex": "^0.9.1",
|
"libp2p-mplex": "^0.9.1",
|
||||||
"libp2p-pnet": "~0.1.0",
|
"libp2p-pnet": "~0.1.0",
|
||||||
|
81
src/dht.js
81
src/dht.js
@ -1,43 +1,72 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const nextTick = require('async/nextTick')
|
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
const promisify = require('promisify-es6')
|
|
||||||
|
|
||||||
const { messages, codes } = require('./errors')
|
const { messages, codes } = require('./errors')
|
||||||
|
|
||||||
module.exports = (node) => {
|
module.exports = (node, DHT, config) => {
|
||||||
|
const dht = new DHT({
|
||||||
|
dialer: node.dialer,
|
||||||
|
peerInfo: node.peerInfo,
|
||||||
|
peerStore: node.peerStore,
|
||||||
|
registrar: node.registrar,
|
||||||
|
datastore: this.datastore,
|
||||||
|
...config
|
||||||
|
})
|
||||||
|
|
||||||
return {
|
return {
|
||||||
put: promisify((key, value, callback) => {
|
/**
|
||||||
if (!node._dht) {
|
* Store the given key/value pair in the DHT.
|
||||||
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
|
* @param {Buffer} key
|
||||||
|
* @param {Buffer} value
|
||||||
|
* @param {Object} [options] - put options
|
||||||
|
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
|
put: (key, value, options) => {
|
||||||
|
if (!node.isStarted() || !dht.isStarted) {
|
||||||
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
node._dht.put(key, value, callback)
|
return dht.put(key, value, options)
|
||||||
}),
|
},
|
||||||
get: promisify((key, options, callback) => {
|
|
||||||
if (typeof options === 'function') {
|
/**
|
||||||
callback = options
|
* Get the value to the given key.
|
||||||
options = {}
|
* Times out after 1 minute by default.
|
||||||
|
* @param {Buffer} key
|
||||||
|
* @param {Object} [options] - get options
|
||||||
|
* @param {number} [options.timeout] - optional timeout (default: 60000)
|
||||||
|
* @returns {Promise<{from: PeerId, val: Buffer}>}
|
||||||
|
*/
|
||||||
|
get: (key, options) => {
|
||||||
|
if (!node.isStarted() || !dht.isStarted) {
|
||||||
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!node._dht) {
|
return dht.get(key, options)
|
||||||
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the `n` values to the given key without sorting.
|
||||||
|
* @param {Buffer} key
|
||||||
|
* @param {number} nVals
|
||||||
|
* @param {Object} [options] - get options
|
||||||
|
* @param {number} [options.timeout] - optional timeout (default: 60000)
|
||||||
|
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
|
||||||
|
*/
|
||||||
|
getMany: (key, nVals, options) => {
|
||||||
|
if (!node.isStarted() || !dht.isStarted) {
|
||||||
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
node._dht.get(key, options, callback)
|
return dht.getMany(key, nVals, options)
|
||||||
}),
|
},
|
||||||
getMany: promisify((key, nVals, options, callback) => {
|
|
||||||
if (typeof options === 'function') {
|
|
||||||
callback = options
|
|
||||||
options = {}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!node._dht) {
|
_dht: dht,
|
||||||
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
|
|
||||||
}
|
|
||||||
|
|
||||||
node._dht.getMany(key, nVals, options, callback)
|
start: () => dht.start(),
|
||||||
})
|
|
||||||
|
stop: () => dht.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ const AbortController = require('abort-controller')
|
|||||||
const debug = require('debug')
|
const debug = require('debug')
|
||||||
const log = debug('libp2p:dialer')
|
const log = debug('libp2p:dialer')
|
||||||
log.error = debug('libp2p:dialer:error')
|
log.error = debug('libp2p:dialer:error')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
|
||||||
const { codes } = require('./errors')
|
const { codes } = require('./errors')
|
||||||
const {
|
const {
|
||||||
@ -20,15 +21,18 @@ class Dialer {
|
|||||||
* @constructor
|
* @constructor
|
||||||
* @param {object} options
|
* @param {object} options
|
||||||
* @param {TransportManager} options.transportManager
|
* @param {TransportManager} options.transportManager
|
||||||
|
* @param {Peerstore} peerStore
|
||||||
* @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
|
* @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
|
||||||
* @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
|
* @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
|
||||||
*/
|
*/
|
||||||
constructor ({
|
constructor ({
|
||||||
transportManager,
|
transportManager,
|
||||||
|
peerStore,
|
||||||
concurrency = MAX_PARALLEL_DIALS,
|
concurrency = MAX_PARALLEL_DIALS,
|
||||||
timeout = DIAL_TIMEOUT
|
timeout = DIAL_TIMEOUT
|
||||||
}) {
|
}) {
|
||||||
this.transportManager = transportManager
|
this.transportManager = transportManager
|
||||||
|
this.peerStore = peerStore
|
||||||
this.concurrency = concurrency
|
this.concurrency = concurrency
|
||||||
this.timeout = timeout
|
this.timeout = timeout
|
||||||
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
|
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
|
||||||
@ -97,18 +101,22 @@ class Dialer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connects to a given `PeerInfo` by dialing all of its known addresses.
|
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
|
||||||
* The dial to the first address that is successfully able to upgrade a connection
|
* The dial to the first address that is successfully able to upgrade a connection
|
||||||
* will be used.
|
* will be used.
|
||||||
*
|
*
|
||||||
* @async
|
* @async
|
||||||
* @param {PeerInfo} peerInfo The remote peer to dial
|
* @param {PeerInfo|PeerId} peer The remote peer to dial
|
||||||
* @param {object} [options]
|
* @param {object} [options]
|
||||||
* @param {AbortSignal} [options.signal] An AbortController signal
|
* @param {AbortSignal} [options.signal] An AbortController signal
|
||||||
* @returns {Promise<Connection>}
|
* @returns {Promise<Connection>}
|
||||||
*/
|
*/
|
||||||
async connectToPeer (peerInfo, options = {}) {
|
async connectToPeer (peer, options = {}) {
|
||||||
const addrs = peerInfo.multiaddrs.toArray()
|
if (PeerId.isPeerId(peer)) {
|
||||||
|
peer = this.peerStore.get(peer.toB58String())
|
||||||
|
}
|
||||||
|
|
||||||
|
const addrs = peer.multiaddrs.toArray()
|
||||||
for (const addr of addrs) {
|
for (const addr of addrs) {
|
||||||
try {
|
try {
|
||||||
return await this.connectToMultiaddr(addr, options)
|
return await this.connectToMultiaddr(addr, options)
|
||||||
|
@ -8,6 +8,7 @@ exports.messages = {
|
|||||||
exports.codes = {
|
exports.codes = {
|
||||||
DHT_DISABLED: 'ERR_DHT_DISABLED',
|
DHT_DISABLED: 'ERR_DHT_DISABLED',
|
||||||
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
|
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
|
||||||
|
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
|
||||||
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
|
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
|
||||||
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
|
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
|
||||||
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
||||||
|
18
src/index.js
18
src/index.js
@ -91,7 +91,8 @@ class Libp2p extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.dialer = new Dialer({
|
this.dialer = new Dialer({
|
||||||
transportManager: this.transportManager
|
transportManager: this.transportManager,
|
||||||
|
peerStore: this.peerStore
|
||||||
})
|
})
|
||||||
|
|
||||||
// Attach stream multiplexers
|
// Attach stream multiplexers
|
||||||
@ -118,13 +119,8 @@ class Libp2p extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dht provided components (peerRouting, contentRouting, dht)
|
// dht provided components (peerRouting, contentRouting, dht)
|
||||||
if (this._config.dht.enabled) {
|
if (this._modules.dht) {
|
||||||
const DHT = this._modules.dht
|
this._dht = dht(this, this._modules.dht, this._config.dht)
|
||||||
|
|
||||||
this._dht = new DHT(this._switch, {
|
|
||||||
datastore: this.datastore,
|
|
||||||
...this._config.dht
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// start pubsub
|
// start pubsub
|
||||||
@ -136,7 +132,6 @@ class Libp2p extends EventEmitter {
|
|||||||
// peer and content routing will automatically get modules from _modules and _dht
|
// peer and content routing will automatically get modules from _modules and _dht
|
||||||
this.peerRouting = peerRouting(this)
|
this.peerRouting = peerRouting(this)
|
||||||
this.contentRouting = contentRouting(this)
|
this.contentRouting = contentRouting(this)
|
||||||
this.dht = dht(this)
|
|
||||||
|
|
||||||
this._peerDiscovered = this._peerDiscovered.bind(this)
|
this._peerDiscovered = this._peerDiscovered.bind(this)
|
||||||
}
|
}
|
||||||
@ -186,6 +181,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
this.pubsub && await this.pubsub.stop()
|
this.pubsub && await this.pubsub.stop()
|
||||||
|
this._dht && await this._dht.stop()
|
||||||
await this.transportManager.close()
|
await this.transportManager.close()
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
@ -312,6 +308,10 @@ class Libp2p extends EventEmitter {
|
|||||||
if (this._config.pubsub.enabled) {
|
if (this._config.pubsub.enabled) {
|
||||||
this.pubsub && this.pubsub.start()
|
this.pubsub && this.pubsub.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this._config.dht.enabled) {
|
||||||
|
this._dht && this._dht.start()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -37,24 +37,28 @@ class PeerStore extends EventEmitter {
|
|||||||
* Stores the peerInfo of a new peer.
|
* Stores the peerInfo of a new peer.
|
||||||
* If already exist, its info is updated.
|
* If already exist, its info is updated.
|
||||||
* @param {PeerInfo} peerInfo
|
* @param {PeerInfo} peerInfo
|
||||||
|
* @return {PeerInfo}
|
||||||
*/
|
*/
|
||||||
put (peerInfo) {
|
put (peerInfo) {
|
||||||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
|
|
||||||
|
let peer
|
||||||
// Already know the peer?
|
// Already know the peer?
|
||||||
if (this.peers.has(peerInfo.id.toB58String())) {
|
if (this.peers.has(peerInfo.id.toB58String())) {
|
||||||
this.update(peerInfo)
|
peer = this.update(peerInfo)
|
||||||
} else {
|
} else {
|
||||||
this.add(peerInfo)
|
peer = this.add(peerInfo)
|
||||||
|
|
||||||
// Emit the new peer found
|
// Emit the new peer found
|
||||||
this.emit('peer', peerInfo)
|
this.emit('peer', peerInfo)
|
||||||
}
|
}
|
||||||
|
return peer
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a new peer to the store.
|
* Add a new peer to the store.
|
||||||
* @param {PeerInfo} peerInfo
|
* @param {PeerInfo} peerInfo
|
||||||
|
* @return {PeerInfo}
|
||||||
*/
|
*/
|
||||||
add (peerInfo) {
|
add (peerInfo) {
|
||||||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
@ -86,11 +90,13 @@ class PeerStore extends EventEmitter {
|
|||||||
})
|
})
|
||||||
|
|
||||||
this.peers.set(peerInfo.id.toB58String(), peerProxy)
|
this.peers.set(peerInfo.id.toB58String(), peerProxy)
|
||||||
|
return peerProxy
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates an already known peer.
|
* Updates an already known peer.
|
||||||
* @param {PeerInfo} peerInfo
|
* @param {PeerInfo} peerInfo
|
||||||
|
* @return {PeerInfo}
|
||||||
*/
|
*/
|
||||||
update (peerInfo) {
|
update (peerInfo) {
|
||||||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
@ -148,6 +154,8 @@ class PeerStore extends EventEmitter {
|
|||||||
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
|
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
|
||||||
recorded.id.pubKey = peerInfo.id.pubKey
|
recorded.id.pubKey = peerInfo.id.pubKey
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return recorded
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -165,6 +173,15 @@ class PeerStore extends EventEmitter {
|
|||||||
return undefined
|
return undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Has the info to the given id.
|
||||||
|
* @param {string} peerId b58str id
|
||||||
|
* @returns {boolean}
|
||||||
|
*/
|
||||||
|
has (peerId) {
|
||||||
|
return this.peers.has(peerId)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes the Peer with the matching `peerId` from the PeerStore
|
* Removes the Peer with the matching `peerId` from the PeerStore
|
||||||
* @param {string} peerId b58str id
|
* @param {string} peerId b58str id
|
||||||
|
92
test/dht/configuration.node.js
Normal file
92
test/dht/configuration.node.js
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const { create } = require('../../src')
|
||||||
|
const { baseOptions, subsystemOptions } = require('./utils')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
|
||||||
|
describe('DHT subsystem is configurable', () => {
|
||||||
|
let libp2p
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not exist if no module is provided', async () => {
|
||||||
|
libp2p = await create(baseOptions)
|
||||||
|
expect(libp2p._dht).to.not.exist()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should exist if the module is provided', async () => {
|
||||||
|
libp2p = await create(subsystemOptions)
|
||||||
|
expect(libp2p._dht).to.exist()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should start and stop by default once libp2p starts', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(true)
|
||||||
|
|
||||||
|
await libp2p.stop()
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not start if disabled once libp2p starts', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo,
|
||||||
|
config: {
|
||||||
|
dht: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should allow a manual start', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo,
|
||||||
|
config: {
|
||||||
|
dht: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p._dht.start()
|
||||||
|
expect(libp2p._dht._dht.isStarted).to.equal(true)
|
||||||
|
})
|
||||||
|
})
|
135
test/dht/operation.node.js
Normal file
135
test/dht/operation.node.js
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const { create } = require('../../src')
|
||||||
|
const { subsystemOptions, subsystemMulticodecs } = require('./utils')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/8000')
|
||||||
|
const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/8001')
|
||||||
|
|
||||||
|
describe('DHT subsystem operates correctly', () => {
|
||||||
|
let peerInfo, remotePeerInfo
|
||||||
|
let libp2p, remoteLibp2p
|
||||||
|
let remAddr
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
|
||||||
|
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
remotePeerInfo.multiaddrs.add(remoteListenAddr)
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('dht started before connect', () => {
|
||||||
|
beforeEach(async () => {
|
||||||
|
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo: remotePeerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
libp2p.start(),
|
||||||
|
remoteLibp2p.start()
|
||||||
|
])
|
||||||
|
|
||||||
|
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => Promise.all([
|
||||||
|
libp2p && libp2p.stop(),
|
||||||
|
remoteLibp2p && remoteLibp2p.stop()
|
||||||
|
]))
|
||||||
|
|
||||||
|
it('should get notified of connected peers on dial', async () => {
|
||||||
|
const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||||
|
|
||||||
|
expect(connection).to.exist()
|
||||||
|
|
||||||
|
return Promise.all([
|
||||||
|
pWaitFor(() => libp2p._dht._dht.routingTable.size === 1),
|
||||||
|
pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should put on a peer and get from the other', async () => {
|
||||||
|
const key = Buffer.from('hello')
|
||||||
|
const value = Buffer.from('world')
|
||||||
|
|
||||||
|
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => libp2p._dht._dht.routingTable.size === 1),
|
||||||
|
pWaitFor(() => remoteLibp2p._dht._dht.routingTable.size === 1)
|
||||||
|
])
|
||||||
|
|
||||||
|
await libp2p._dht.put(key, value)
|
||||||
|
|
||||||
|
const fetchedValue = await remoteLibp2p._dht.get(key)
|
||||||
|
expect(fetchedValue).to.eql(value)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('dht started after connect', () => {
|
||||||
|
beforeEach(async () => {
|
||||||
|
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo: remotePeerInfo,
|
||||||
|
config: {
|
||||||
|
dht: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
await remoteLibp2p.start()
|
||||||
|
|
||||||
|
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => Promise.all([
|
||||||
|
libp2p && libp2p.stop(),
|
||||||
|
remoteLibp2p && remoteLibp2p.stop()
|
||||||
|
]))
|
||||||
|
|
||||||
|
it('should get notified of connected peers after starting', async () => {
|
||||||
|
const connection = await libp2p.dial(remAddr)
|
||||||
|
|
||||||
|
expect(connection).to.exist()
|
||||||
|
expect(libp2p._dht._dht.routingTable.size).to.be.eql(0)
|
||||||
|
expect(remoteLibp2p._dht._dht.routingTable.size).to.be.eql(0)
|
||||||
|
|
||||||
|
await remoteLibp2p._dht.start()
|
||||||
|
return pWaitFor(() => libp2p._dht._dht.routingTable.size === 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should put on a peer and get from the other', async () => {
|
||||||
|
await libp2p.dial(remAddr)
|
||||||
|
|
||||||
|
const key = Buffer.from('hello')
|
||||||
|
const value = Buffer.from('world')
|
||||||
|
|
||||||
|
await remoteLibp2p._dht.start()
|
||||||
|
await pWaitFor(() => libp2p._dht._dht.routingTable.size === 1)
|
||||||
|
|
||||||
|
await libp2p._dht.put(key, value)
|
||||||
|
|
||||||
|
const fetchedValue = await remoteLibp2p._dht.get(key)
|
||||||
|
expect(fetchedValue).to.eql(value)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
37
test/dht/utils.js
Normal file
37
test/dht/utils.js
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const KadDht = require('libp2p-kad-dht')
|
||||||
|
const { multicodec } = require('libp2p-kad-dht')
|
||||||
|
const Crypto = require('../../src/insecure/plaintext')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const Transport = require('libp2p-tcp')
|
||||||
|
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
|
||||||
|
const baseOptions = {
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [Crypto]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.baseOptions = baseOptions
|
||||||
|
|
||||||
|
const subsystemOptions = mergeOptions(baseOptions, {
|
||||||
|
modules: {
|
||||||
|
dht: KadDht
|
||||||
|
},
|
||||||
|
config: {
|
||||||
|
dht: {
|
||||||
|
kBucketSize: 20,
|
||||||
|
randomWalk: {
|
||||||
|
enabled: true
|
||||||
|
},
|
||||||
|
enabled: true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
module.exports.subsystemOptions = subsystemOptions
|
||||||
|
module.exports.subsystemMulticodecs = [multicodec]
|
@ -17,6 +17,7 @@ const pipe = require('it-pipe')
|
|||||||
|
|
||||||
const Libp2p = require('../../src')
|
const Libp2p = require('../../src')
|
||||||
const Dialer = require('../../src/dialer')
|
const Dialer = require('../../src/dialer')
|
||||||
|
const PeerStore = require('../../src/peer-store')
|
||||||
const TransportManager = require('../../src/transport-manager')
|
const TransportManager = require('../../src/transport-manager')
|
||||||
const { codes: ErrorCodes } = require('../../src/errors')
|
const { codes: ErrorCodes } = require('../../src/errors')
|
||||||
const Protector = require('../../src/pnet')
|
const Protector = require('../../src/pnet')
|
||||||
@ -86,7 +87,7 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
expect.fail('Dial should have failed')
|
expect.fail('Dial should have failed')
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should be able to connect to a given peer', async () => {
|
it('should be able to connect to a given peer info', async () => {
|
||||||
const dialer = new Dialer({ transportManager: localTM })
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
const peerInfo = new PeerInfo(peerId)
|
const peerInfo = new PeerInfo(peerId)
|
||||||
@ -97,6 +98,23 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
await connection.close()
|
await connection.close()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a given peer id', async () => {
|
||||||
|
const peerStore = new PeerStore()
|
||||||
|
const dialer = new Dialer({
|
||||||
|
transportManager: localTM,
|
||||||
|
peerStore
|
||||||
|
})
|
||||||
|
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
const peerInfo = new PeerInfo(peerId)
|
||||||
|
peerInfo.multiaddrs.add(remoteAddr)
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
const connection = await dialer.connectToPeer(peerId)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||||
const dialer = new Dialer({ transportManager: localTM })
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
const peerId = await PeerId.createFromJSON(Peers[0])
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user