mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 18:42:15 +00:00
refactor: pubsub (#467)
* feat: peer-store v0 * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * chore: address review * refactor: pubsub subsystem * chore: address review * chore: use topology interface * chore: address review * chore: address review * chore: simplify tests
This commit is contained in:
parent
ced2dbf318
commit
34d57f8989
14
README.md
14
README.md
@ -211,22 +211,18 @@ class Node extends Libp2p {
|
|||||||
|
|
||||||
**IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670.
|
**IMPORTANT NOTE**: All the methods listed in the API section that take a callback are also now Promisified. Libp2p is migrating away from callbacks to async/await, and in a future release (that will be announced in advance), callback support will be removed entirely. You can follow progress of the async/await endeavor at https://github.com/ipfs/js-ipfs/issues/1670.
|
||||||
|
|
||||||
#### Create a Node - `Libp2p.createLibp2p(options, callback)`
|
#### Create a Node - `Libp2p.create(options)`
|
||||||
|
|
||||||
> Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead
|
> Behaves exactly like `new Libp2p(options)`, but doesn't require a PeerInfo. One will be generated instead
|
||||||
|
|
||||||
```js
|
```js
|
||||||
const { createLibp2p } = require('libp2p')
|
const { create } = require('libp2p')
|
||||||
createLibp2p(options, (err, libp2p) => {
|
const libp2p = await create(options)
|
||||||
if (err) throw err
|
|
||||||
libp2p.start((err) => {
|
await libp2p.start()
|
||||||
if (err) throw err
|
|
||||||
})
|
|
||||||
})
|
|
||||||
```
|
```
|
||||||
|
|
||||||
- `options`: Object of libp2p configuration options
|
- `options`: Object of libp2p configuration options
|
||||||
- `callback`: Function with signature `function (Error, Libp2p) {}`
|
|
||||||
|
|
||||||
#### Create a Node alternative - `new Libp2p(options)`
|
#### Create a Node alternative - `new Libp2p(options)`
|
||||||
|
|
||||||
|
@ -55,13 +55,14 @@
|
|||||||
"it-protocol-buffers": "^0.2.0",
|
"it-protocol-buffers": "^0.2.0",
|
||||||
"latency-monitor": "~0.2.1",
|
"latency-monitor": "~0.2.1",
|
||||||
"libp2p-crypto": "^0.17.1",
|
"libp2p-crypto": "^0.17.1",
|
||||||
"libp2p-interfaces": "^0.1.3",
|
"libp2p-interfaces": "^0.1.5",
|
||||||
"mafmt": "^7.0.0",
|
"mafmt": "^7.0.0",
|
||||||
"merge-options": "^1.0.1",
|
"merge-options": "^1.0.1",
|
||||||
"moving-average": "^1.0.0",
|
"moving-average": "^1.0.0",
|
||||||
"multiaddr": "^7.1.0",
|
"multiaddr": "^7.1.0",
|
||||||
"multistream-select": "^0.15.0",
|
"multistream-select": "^0.15.0",
|
||||||
"once": "^1.4.0",
|
"once": "^1.4.0",
|
||||||
|
"p-map": "^3.0.0",
|
||||||
"p-queue": "^6.1.1",
|
"p-queue": "^6.1.1",
|
||||||
"p-settle": "^3.1.0",
|
"p-settle": "^3.1.0",
|
||||||
"peer-id": "^0.13.3",
|
"peer-id": "^0.13.3",
|
||||||
@ -90,8 +91,8 @@
|
|||||||
"libp2p-bootstrap": "^0.9.7",
|
"libp2p-bootstrap": "^0.9.7",
|
||||||
"libp2p-delegated-content-routing": "^0.2.2",
|
"libp2p-delegated-content-routing": "^0.2.2",
|
||||||
"libp2p-delegated-peer-routing": "^0.2.2",
|
"libp2p-delegated-peer-routing": "^0.2.2",
|
||||||
"libp2p-floodsub": "~0.17.0",
|
"libp2p-floodsub": "^0.19.0",
|
||||||
"libp2p-gossipsub": "~0.0.4",
|
"libp2p-gossipsub": "ChainSafe/gossipsub-js#beta/async",
|
||||||
"libp2p-kad-dht": "^0.15.3",
|
"libp2p-kad-dht": "^0.15.3",
|
||||||
"libp2p-mdns": "^0.12.3",
|
"libp2p-mdns": "^0.12.3",
|
||||||
"libp2p-mplex": "^0.9.1",
|
"libp2p-mplex": "^0.9.1",
|
||||||
@ -103,6 +104,7 @@
|
|||||||
"lodash.times": "^4.3.2",
|
"lodash.times": "^4.3.2",
|
||||||
"nock": "^10.0.6",
|
"nock": "^10.0.6",
|
||||||
"p-defer": "^3.0.0",
|
"p-defer": "^3.0.0",
|
||||||
|
"p-wait-for": "^3.1.0",
|
||||||
"portfinder": "^1.0.20",
|
"portfinder": "^1.0.20",
|
||||||
"pull-goodbye": "0.0.2",
|
"pull-goodbye": "0.0.2",
|
||||||
"pull-length-prefixed": "^1.3.3",
|
"pull-length-prefixed": "^1.3.3",
|
||||||
|
@ -1,108 +0,0 @@
|
|||||||
'use strict'
|
|
||||||
|
|
||||||
const assert = require('assert')
|
|
||||||
|
|
||||||
class Topology {
|
|
||||||
/**
|
|
||||||
* @param {Object} props
|
|
||||||
* @param {number} props.min minimum needed connections (default: 0)
|
|
||||||
* @param {number} props.max maximum needed connections (default: Infinity)
|
|
||||||
* @param {Array<string>} props.multicodecs protocol multicodecs
|
|
||||||
* @param {Object} props.handlers
|
|
||||||
* @param {function} props.handlers.onConnect protocol "onConnect" handler
|
|
||||||
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
|
|
||||||
* @constructor
|
|
||||||
*/
|
|
||||||
constructor ({
|
|
||||||
min = 0,
|
|
||||||
max = Infinity,
|
|
||||||
multicodecs,
|
|
||||||
handlers
|
|
||||||
}) {
|
|
||||||
assert(multicodecs, 'one or more multicodec should be provided')
|
|
||||||
assert(handlers, 'the handlers should be provided')
|
|
||||||
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
|
|
||||||
'the \'onConnect\' handler must be provided')
|
|
||||||
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
|
|
||||||
'the \'onDisconnect\' handler must be provided')
|
|
||||||
|
|
||||||
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
|
|
||||||
this.min = min
|
|
||||||
this.max = max
|
|
||||||
|
|
||||||
// Handlers
|
|
||||||
this._onConnect = handlers.onConnect
|
|
||||||
this._onDisconnect = handlers.onDisconnect
|
|
||||||
|
|
||||||
this.peers = new Map()
|
|
||||||
this._registrar = undefined
|
|
||||||
|
|
||||||
this._onProtocolChange = this._onProtocolChange.bind(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
set registrar (registrar) {
|
|
||||||
this._registrar = registrar
|
|
||||||
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
|
|
||||||
|
|
||||||
// Update topology peers
|
|
||||||
this._updatePeers(this._registrar.peerStore.peers.values())
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update topology.
|
|
||||||
* @param {Array<PeerInfo>} peerInfoIterable
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
_updatePeers (peerInfoIterable) {
|
|
||||||
for (const peerInfo of peerInfoIterable) {
|
|
||||||
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
|
|
||||||
// Add the peer regardless of whether or not there is currently a connection
|
|
||||||
this.peers.set(peerInfo.id.toB58String(), peerInfo)
|
|
||||||
// If there is a connection, call _onConnect
|
|
||||||
const connection = this._registrar.getConnection(peerInfo)
|
|
||||||
connection && this._onConnect(peerInfo, connection)
|
|
||||||
} else {
|
|
||||||
// Remove any peers we might be tracking that are no longer of value to us
|
|
||||||
this.peers.delete(peerInfo.id.toB58String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Notify protocol of peer disconnected.
|
|
||||||
* @param {PeerInfo} peerInfo
|
|
||||||
* @param {Error} [error]
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
disconnect (peerInfo, error) {
|
|
||||||
this._onDisconnect(peerInfo, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if a new peer support the multicodecs for this topology.
|
|
||||||
* @param {Object} props
|
|
||||||
* @param {PeerInfo} props.peerInfo
|
|
||||||
* @param {Array<string>} props.protocols
|
|
||||||
*/
|
|
||||||
_onProtocolChange ({ peerInfo, protocols }) {
|
|
||||||
const existingPeer = this.peers.get(peerInfo.id.toB58String())
|
|
||||||
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))
|
|
||||||
|
|
||||||
// Not supporting the protocol anymore?
|
|
||||||
if (existingPeer && hasProtocol.length === 0) {
|
|
||||||
this._onDisconnect({
|
|
||||||
peerInfo
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// New to protocol support
|
|
||||||
for (const protocol of protocols) {
|
|
||||||
if (this.multicodecs.includes(protocol)) {
|
|
||||||
this._updatePeers([peerInfo])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = Topology
|
|
39
src/index.js
39
src/index.js
@ -1,7 +1,7 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const FSM = require('fsm-event')
|
const FSM = require('fsm-event')
|
||||||
const EventEmitter = require('events').EventEmitter
|
const { EventEmitter } = require('events')
|
||||||
const debug = require('debug')
|
const debug = require('debug')
|
||||||
const log = debug('libp2p')
|
const log = debug('libp2p')
|
||||||
log.error = debug('libp2p:error')
|
log.error = debug('libp2p:error')
|
||||||
@ -9,7 +9,6 @@ const errCode = require('err-code')
|
|||||||
const promisify = require('promisify-es6')
|
const promisify = require('promisify-es6')
|
||||||
|
|
||||||
const each = require('async/each')
|
const each = require('async/each')
|
||||||
const nextTick = require('async/nextTick')
|
|
||||||
|
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
@ -66,6 +65,8 @@ class Libp2p extends EventEmitter {
|
|||||||
this._transport = [] // Transport instances/references
|
this._transport = [] // Transport instances/references
|
||||||
this._discovery = [] // Discovery service instances/references
|
this._discovery = [] // Discovery service instances/references
|
||||||
|
|
||||||
|
this.peerStore = new PeerStore()
|
||||||
|
|
||||||
// create the switch, and listen for errors
|
// create the switch, and listen for errors
|
||||||
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
|
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
|
||||||
|
|
||||||
@ -147,7 +148,7 @@ class Libp2p extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start pubsub
|
// start pubsub
|
||||||
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
|
if (this._modules.pubsub) {
|
||||||
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
|
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,6 +252,7 @@ class Libp2p extends EventEmitter {
|
|||||||
this.state('stop')
|
this.state('stop')
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
this.pubsub && await this.pubsub.stop()
|
||||||
await this.transportManager.close()
|
await this.transportManager.close()
|
||||||
await this._switch.stop()
|
await this._switch.stop()
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@ -385,10 +387,16 @@ class Libp2p extends EventEmitter {
|
|||||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||||
|
|
||||||
// Start parallel tasks
|
// Start parallel tasks
|
||||||
|
const tasks = [
|
||||||
|
this.transportManager.listen(multiaddrs)
|
||||||
|
]
|
||||||
|
|
||||||
|
if (this._config.pubsub.enabled) {
|
||||||
|
this.pubsub && this.pubsub.start()
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Promise.all([
|
await Promise.all(tasks)
|
||||||
this.transportManager.listen(multiaddrs)
|
|
||||||
])
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log.error(err)
|
log.error(err)
|
||||||
this.emit('error', err)
|
this.emit('error', err)
|
||||||
@ -483,16 +491,15 @@ module.exports = Libp2p
|
|||||||
* Like `new Libp2p(options)` except it will create a `PeerInfo`
|
* Like `new Libp2p(options)` except it will create a `PeerInfo`
|
||||||
* instance if one is not provided in options.
|
* instance if one is not provided in options.
|
||||||
* @param {object} options Libp2p configuration options
|
* @param {object} options Libp2p configuration options
|
||||||
* @param {function(Error, Libp2p)} callback
|
* @returns {Libp2p}
|
||||||
* @returns {void}
|
|
||||||
*/
|
*/
|
||||||
module.exports.createLibp2p = promisify((options, callback) => {
|
module.exports.create = async (options = {}) => {
|
||||||
if (options.peerInfo) {
|
if (options.peerInfo) {
|
||||||
return nextTick(callback, null, new Libp2p(options))
|
return new Libp2p(options)
|
||||||
}
|
}
|
||||||
PeerInfo.create((err, peerInfo) => {
|
|
||||||
if (err) return callback(err)
|
const peerInfo = await PeerInfo.create()
|
||||||
options.peerInfo = peerInfo
|
|
||||||
callback(null, new Libp2p(options))
|
options.peerInfo = peerInfo
|
||||||
})
|
return new Libp2p(options)
|
||||||
})
|
}
|
||||||
|
141
src/pubsub.js
141
src/pubsub.js
@ -1,52 +1,21 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const nextTick = require('async/nextTick')
|
|
||||||
const { messages, codes } = require('./errors')
|
|
||||||
const promisify = require('promisify-es6')
|
|
||||||
|
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
|
const { messages, codes } = require('./errors')
|
||||||
|
|
||||||
module.exports = (node, Pubsub, config) => {
|
module.exports = (node, Pubsub, config) => {
|
||||||
const pubsub = new Pubsub(node, config)
|
const pubsub = new Pubsub(node.peerInfo, node.registrar, config)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
/**
|
/**
|
||||||
* Subscribe the given handler to a pubsub topic
|
* Subscribe the given handler to a pubsub topic
|
||||||
*
|
|
||||||
* @param {string} topic
|
* @param {string} topic
|
||||||
* @param {function} handler The handler to subscribe
|
* @param {function} handler The handler to subscribe
|
||||||
* @param {object|null} [options]
|
* @returns {void}
|
||||||
* @param {function} [callback] An optional callback
|
|
||||||
*
|
|
||||||
* @returns {Promise|void} A promise is returned if no callback is provided
|
|
||||||
*
|
|
||||||
* @example <caption>Subscribe a handler to a topic</caption>
|
|
||||||
*
|
|
||||||
* // `null` must be passed for options until subscribe is no longer using promisify
|
|
||||||
* const handler = (message) => { }
|
|
||||||
* await libp2p.subscribe(topic, handler, null)
|
|
||||||
*
|
|
||||||
* @example <caption>Use a callback instead of the Promise api</caption>
|
|
||||||
*
|
|
||||||
* // `options` may be passed or omitted when supplying a callback
|
|
||||||
* const handler = (message) => { }
|
|
||||||
* libp2p.subscribe(topic, handler, callback)
|
|
||||||
*/
|
*/
|
||||||
subscribe: (topic, handler, options, callback) => {
|
subscribe: (topic, handler) => {
|
||||||
// can't use promisify because it thinks the handler is a callback
|
|
||||||
if (typeof options === 'function') {
|
|
||||||
callback = options
|
|
||||||
options = {}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!node.isStarted() && !pubsub.started) {
|
if (!node.isStarted() && !pubsub.started) {
|
||||||
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||||
|
|
||||||
if (callback) {
|
|
||||||
return nextTick(() => callback(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.reject(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pubsub.listenerCount(topic) === 0) {
|
if (pubsub.listenerCount(topic) === 0) {
|
||||||
@ -54,46 +23,16 @@ module.exports = (node, Pubsub, config) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pubsub.on(topic, handler)
|
pubsub.on(topic, handler)
|
||||||
|
|
||||||
if (callback) {
|
|
||||||
return nextTick(() => callback())
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.resolve()
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unsubscribes from a pubsub topic
|
* Unsubscribes from a pubsub topic
|
||||||
*
|
|
||||||
* @param {string} topic
|
* @param {string} topic
|
||||||
* @param {function|null} handler The handler to unsubscribe from
|
* @param {function} [handler] The handler to unsubscribe from
|
||||||
* @param {function} [callback] An optional callback
|
|
||||||
*
|
|
||||||
* @returns {Promise|void} A promise is returned if no callback is provided
|
|
||||||
*
|
|
||||||
* @example <caption>Unsubscribe a topic for all handlers</caption>
|
|
||||||
*
|
|
||||||
* // `null` must be passed until unsubscribe is no longer using promisify
|
|
||||||
* await libp2p.unsubscribe(topic, null)
|
|
||||||
*
|
|
||||||
* @example <caption>Unsubscribe a topic for 1 handler</caption>
|
|
||||||
*
|
|
||||||
* await libp2p.unsubscribe(topic, handler)
|
|
||||||
*
|
|
||||||
* @example <caption>Use a callback instead of the Promise api</caption>
|
|
||||||
*
|
|
||||||
* libp2p.unsubscribe(topic, handler, callback)
|
|
||||||
*/
|
*/
|
||||||
unsubscribe: (topic, handler, callback) => {
|
unsubscribe: (topic, handler) => {
|
||||||
// can't use promisify because it thinks the handler is a callback
|
|
||||||
if (!node.isStarted() && !pubsub.started) {
|
if (!node.isStarted() && !pubsub.started) {
|
||||||
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||||
|
|
||||||
if (callback) {
|
|
||||||
return nextTick(() => callback(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.reject(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!handler) {
|
if (!handler) {
|
||||||
@ -105,61 +44,61 @@ module.exports = (node, Pubsub, config) => {
|
|||||||
if (pubsub.listenerCount(topic) === 0) {
|
if (pubsub.listenerCount(topic) === 0) {
|
||||||
pubsub.unsubscribe(topic)
|
pubsub.unsubscribe(topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (callback) {
|
|
||||||
return nextTick(() => callback())
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.resolve()
|
|
||||||
},
|
},
|
||||||
|
|
||||||
publish: promisify((topic, data, callback) => {
|
/**
|
||||||
|
* Publish messages to the given topics.
|
||||||
|
* @param {Array<string>|string} topic
|
||||||
|
* @param {Buffer} data
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
|
publish: (topic, data) => {
|
||||||
if (!node.isStarted() && !pubsub.started) {
|
if (!node.isStarted() && !pubsub.started) {
|
||||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
data = Buffer.from(data)
|
data = Buffer.from(data)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
|
throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')
|
||||||
}
|
}
|
||||||
|
|
||||||
pubsub.publish(topic, data, callback)
|
return pubsub.publish(topic, data)
|
||||||
}),
|
},
|
||||||
|
|
||||||
ls: promisify((callback) => {
|
/**
|
||||||
|
* Get a list of topics the node is subscribed to.
|
||||||
|
* @returns {Array<String>} topics
|
||||||
|
*/
|
||||||
|
getTopics: () => {
|
||||||
if (!node.isStarted() && !pubsub.started) {
|
if (!node.isStarted() && !pubsub.started) {
|
||||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
const subscriptions = Array.from(pubsub.subscriptions)
|
return pubsub.getTopics()
|
||||||
|
},
|
||||||
|
|
||||||
nextTick(() => callback(null, subscriptions))
|
/**
|
||||||
}),
|
* Get a list of the peer-ids that are subscribed to one topic.
|
||||||
|
* @param {string} topic
|
||||||
peers: promisify((topic, callback) => {
|
* @returns {Array<string>}
|
||||||
|
*/
|
||||||
|
getPeersSubscribed: (topic) => {
|
||||||
if (!node.isStarted() && !pubsub.started) {
|
if (!node.isStarted() && !pubsub.started) {
|
||||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof topic === 'function') {
|
return pubsub.getPeersSubscribed(topic)
|
||||||
callback = topic
|
},
|
||||||
topic = null
|
|
||||||
}
|
|
||||||
|
|
||||||
const peers = Array.from(pubsub.peers.values())
|
|
||||||
.filter((peer) => topic ? peer.topics.has(topic) : true)
|
|
||||||
.map((peer) => peer.info.id.toB58String())
|
|
||||||
|
|
||||||
nextTick(() => callback(null, peers))
|
|
||||||
}),
|
|
||||||
|
|
||||||
setMaxListeners (n) {
|
setMaxListeners (n) {
|
||||||
return pubsub.setMaxListeners(n)
|
return pubsub.setMaxListeners(n)
|
||||||
},
|
},
|
||||||
|
|
||||||
start: promisify((cb) => pubsub.start(cb)),
|
_pubsub: pubsub,
|
||||||
|
|
||||||
stop: promisify((cb) => pubsub.stop(cb))
|
start: () => pubsub.start(),
|
||||||
|
|
||||||
|
stop: () => pubsub.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,9 +5,9 @@ const debug = require('debug')
|
|||||||
const log = debug('libp2p:peer-store')
|
const log = debug('libp2p:peer-store')
|
||||||
log.error = debug('libp2p:peer-store:error')
|
log.error = debug('libp2p:peer-store:error')
|
||||||
|
|
||||||
|
const Topology = require('libp2p-interfaces/src/topology')
|
||||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
const Toplogy = require('./connection-manager/topology')
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responsible for notifying registered protocols of events in the network.
|
* Responsible for notifying registered protocols of events in the network.
|
||||||
@ -106,17 +106,16 @@ class Registrar {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Register handlers for a set of multicodecs given
|
* Register handlers for a set of multicodecs given
|
||||||
* @param {Object} topologyProps properties for topology
|
* @param {Topology} topology protocol topology
|
||||||
* @param {Array<string>|string} topologyProps.multicodecs
|
|
||||||
* @param {Object} topologyProps.handlers
|
|
||||||
* @param {function} topologyProps.handlers.onConnect
|
|
||||||
* @param {function} topologyProps.handlers.onDisconnect
|
|
||||||
* @return {string} registrar identifier
|
* @return {string} registrar identifier
|
||||||
*/
|
*/
|
||||||
register (topologyProps) {
|
register (topology) {
|
||||||
// Create multicodec topology
|
assert(
|
||||||
|
Topology.isTopology(topology),
|
||||||
|
'topology must be an instance of interfaces/topology')
|
||||||
|
|
||||||
|
// Create topology
|
||||||
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
|
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
|
||||||
const topology = new Toplogy(topologyProps)
|
|
||||||
|
|
||||||
this.topologies.set(id, topology)
|
this.topologies.set(id, topology)
|
||||||
|
|
||||||
|
92
test/pubsub/configuration.node.js
Normal file
92
test/pubsub/configuration.node.js
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const { create } = require('../../src')
|
||||||
|
const { baseOptions, subsystemOptions } = require('./utils')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
|
||||||
|
describe('Pubsub subsystem is configurable', () => {
|
||||||
|
let libp2p
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not exist if no module is provided', async () => {
|
||||||
|
libp2p = await create(baseOptions)
|
||||||
|
expect(libp2p.pubsub).to.not.exist()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should exist if the module is provided', async () => {
|
||||||
|
libp2p = await create(subsystemOptions)
|
||||||
|
expect(libp2p.pubsub).to.exist()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should start and stop by default once libp2p starts', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(true)
|
||||||
|
|
||||||
|
await libp2p.stop()
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not start if disabled once libp2p starts', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo,
|
||||||
|
config: {
|
||||||
|
pubsub: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should allow a manual start', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
|
||||||
|
const customOptions = mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo,
|
||||||
|
config: {
|
||||||
|
pubsub: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
libp2p = await create(customOptions)
|
||||||
|
await libp2p.start()
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(false)
|
||||||
|
|
||||||
|
await libp2p.pubsub.start()
|
||||||
|
expect(libp2p.pubsub._pubsub.started).to.equal(true)
|
||||||
|
})
|
||||||
|
})
|
95
test/pubsub/implementations.node.js
Normal file
95
test/pubsub/implementations.node.js
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
|
||||||
|
const Floodsub = require('libp2p-floodsub')
|
||||||
|
const Gossipsub = require('libp2p-gossipsub')
|
||||||
|
const { multicodec: floodsubMulticodec } = require('libp2p-floodsub')
|
||||||
|
const { multicodec: gossipsubMulticodec } = require('libp2p-gossipsub')
|
||||||
|
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const { create } = require('../../src')
|
||||||
|
const { baseOptions } = require('./utils')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
|
||||||
|
describe('Pubsub subsystem is able to use different implementations', () => {
|
||||||
|
let peerInfo, remotePeerInfo
|
||||||
|
let libp2p, remoteLibp2p
|
||||||
|
let remAddr
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
|
||||||
|
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
remotePeerInfo.multiaddrs.add(remoteListenAddr)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => Promise.all([
|
||||||
|
libp2p && libp2p.stop(),
|
||||||
|
remoteLibp2p && remoteLibp2p.stop()
|
||||||
|
]))
|
||||||
|
|
||||||
|
it('Floodsub nodes', () => {
|
||||||
|
return pubsubTest(floodsubMulticodec, Floodsub)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Gossipsub nodes', () => {
|
||||||
|
return pubsubTest(gossipsubMulticodec, Gossipsub)
|
||||||
|
})
|
||||||
|
|
||||||
|
const pubsubTest = async (multicodec, pubsub) => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
|
||||||
|
libp2p = await create(mergeOptions(baseOptions, {
|
||||||
|
peerInfo,
|
||||||
|
modules: {
|
||||||
|
pubsub: pubsub
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
remoteLibp2p = await create(mergeOptions(baseOptions, {
|
||||||
|
peerInfo: remotePeerInfo,
|
||||||
|
modules: {
|
||||||
|
pubsub: pubsub
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
libp2p.start(),
|
||||||
|
remoteLibp2p.start()
|
||||||
|
])
|
||||||
|
|
||||||
|
const libp2pId = libp2p.peerInfo.id.toB58String()
|
||||||
|
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
|
||||||
|
const connection = await libp2p.dialProtocol(remAddr, multicodec)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
|
||||||
|
libp2p.pubsub.subscribe(topic, (msg) => {
|
||||||
|
expect(msg.data.toString()).to.equal(data)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait for remoteLibp2p to know about libp2p subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic)
|
||||||
|
return subscribedPeers.includes(libp2pId)
|
||||||
|
})
|
||||||
|
|
||||||
|
remoteLibp2p.pubsub.publish(topic, data)
|
||||||
|
await defer.promise
|
||||||
|
}
|
||||||
|
})
|
184
test/pubsub/operation.node.js
Normal file
184
test/pubsub/operation.node.js
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
const sinon = require('sinon')
|
||||||
|
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const { create } = require('../../src')
|
||||||
|
const { subsystemOptions, subsystemMulticodecs } = require('./utils')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
|
||||||
|
describe('Pubsub subsystem operates correctly', () => {
|
||||||
|
let peerInfo, remotePeerInfo
|
||||||
|
let libp2p, remoteLibp2p
|
||||||
|
let remAddr
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
|
||||||
|
|
||||||
|
peerInfo.multiaddrs.add(listenAddr)
|
||||||
|
remotePeerInfo.multiaddrs.add(remoteListenAddr)
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('pubsub started before connect', () => {
|
||||||
|
beforeEach(async () => {
|
||||||
|
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo: remotePeerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
libp2p.start(),
|
||||||
|
remoteLibp2p.start()
|
||||||
|
])
|
||||||
|
|
||||||
|
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => Promise.all([
|
||||||
|
libp2p && libp2p.stop(),
|
||||||
|
remoteLibp2p && remoteLibp2p.stop()
|
||||||
|
]))
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
sinon.restore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should get notified of connected peers on dial', async () => {
|
||||||
|
const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||||
|
|
||||||
|
expect(connection).to.exist()
|
||||||
|
|
||||||
|
return Promise.all([
|
||||||
|
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
|
||||||
|
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should receive pubsub messages', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
const libp2pId = libp2p.peerInfo.id.toB58String()
|
||||||
|
|
||||||
|
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
|
||||||
|
|
||||||
|
let subscribedTopics = libp2p.pubsub.getTopics()
|
||||||
|
expect(subscribedTopics).to.not.include(topic)
|
||||||
|
|
||||||
|
libp2p.pubsub.subscribe(topic, (msg) => {
|
||||||
|
expect(msg.data.toString()).to.equal(data)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
subscribedTopics = libp2p.pubsub.getTopics()
|
||||||
|
expect(subscribedTopics).to.include(topic)
|
||||||
|
|
||||||
|
// wait for remoteLibp2p to know about libp2p subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic)
|
||||||
|
return subscribedPeers.includes(libp2pId)
|
||||||
|
})
|
||||||
|
remoteLibp2p.pubsub.publish(topic, data)
|
||||||
|
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('pubsub started after connect', () => {
|
||||||
|
beforeEach(async () => {
|
||||||
|
libp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
remoteLibp2p = await create(mergeOptions(subsystemOptions, {
|
||||||
|
peerInfo: remotePeerInfo,
|
||||||
|
config: {
|
||||||
|
pubsub: {
|
||||||
|
enabled: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
await remoteLibp2p.start()
|
||||||
|
|
||||||
|
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => Promise.all([
|
||||||
|
libp2p && libp2p.stop(),
|
||||||
|
remoteLibp2p && remoteLibp2p.stop()
|
||||||
|
]))
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
sinon.restore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should get notified of connected peers after starting', async () => {
|
||||||
|
const connection = await libp2p.dial(remAddr)
|
||||||
|
|
||||||
|
expect(connection).to.exist()
|
||||||
|
expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0)
|
||||||
|
expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0)
|
||||||
|
|
||||||
|
remoteLibp2p.pubsub.start()
|
||||||
|
|
||||||
|
return Promise.all([
|
||||||
|
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
|
||||||
|
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should receive pubsub messages', async function () {
|
||||||
|
this.timeout(10e3)
|
||||||
|
const defer = pDefer()
|
||||||
|
const libp2pId = libp2p.peerInfo.id.toB58String()
|
||||||
|
const topic = 'test-topic'
|
||||||
|
const data = 'hey!'
|
||||||
|
|
||||||
|
await libp2p.dial(remAddr)
|
||||||
|
|
||||||
|
remoteLibp2p.pubsub.start()
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1),
|
||||||
|
pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1)
|
||||||
|
])
|
||||||
|
|
||||||
|
let subscribedTopics = libp2p.pubsub.getTopics()
|
||||||
|
expect(subscribedTopics).to.not.include(topic)
|
||||||
|
|
||||||
|
libp2p.pubsub.subscribe(topic, (msg) => {
|
||||||
|
expect(msg.data.toString()).to.equal(data)
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
subscribedTopics = libp2p.pubsub.getTopics()
|
||||||
|
expect(subscribedTopics).to.include(topic)
|
||||||
|
|
||||||
|
// wait for remoteLibp2p to know about libp2p subscription
|
||||||
|
await pWaitFor(() => {
|
||||||
|
const subscribedPeers = remoteLibp2p.pubsub.getPeersSubscribed(topic)
|
||||||
|
return subscribedPeers.includes(libp2pId)
|
||||||
|
})
|
||||||
|
|
||||||
|
remoteLibp2p.pubsub.publish(topic, data)
|
||||||
|
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
29
test/pubsub/utils.js
Normal file
29
test/pubsub/utils.js
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const Gossipsub = require('libp2p-gossipsub')
|
||||||
|
const { multicodec } = require('libp2p-gossipsub')
|
||||||
|
const Crypto = require('../../src/insecure/plaintext')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const Transport = require('libp2p-tcp')
|
||||||
|
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
|
||||||
|
const baseOptions = {
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [Crypto]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.baseOptions = baseOptions
|
||||||
|
|
||||||
|
const subsystemOptions = mergeOptions(baseOptions, {
|
||||||
|
modules: {
|
||||||
|
pubsub: Gossipsub
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
module.exports.subsystemOptions = subsystemOptions
|
||||||
|
|
||||||
|
module.exports.subsystemMulticodecs = [multicodec]
|
@ -7,6 +7,7 @@ const { expect } = chai
|
|||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
|
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
|
const Topology = require('libp2p-interfaces/src/topology/multicodec-topology')
|
||||||
const PeerStore = require('../../src/peer-store')
|
const PeerStore = require('../../src/peer-store')
|
||||||
const Registrar = require('../../src/registrar')
|
const Registrar = require('../../src/registrar')
|
||||||
const { createMockConnection } = require('./utils')
|
const { createMockConnection } = require('./utils')
|
||||||
@ -32,52 +33,17 @@ describe('registrar', () => {
|
|||||||
throw new Error('should fail to register a protocol if no multicodec is provided')
|
throw new Error('should fail to register a protocol if no multicodec is provided')
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should fail to register a protocol if no handlers are provided', () => {
|
it('should fail to register a protocol if an invalid topology is provided', () => {
|
||||||
const topologyProps = {
|
const fakeTopology = {
|
||||||
multicodecs: multicodec
|
random: 1
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
registrar.register(topologyProps)
|
registrar.register()
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
expect(err).to.exist()
|
expect(err).to.exist(fakeTopology)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
throw new Error('should fail to register a protocol if no handlers are provided')
|
throw new Error('should fail to register a protocol if an invalid topology is provided')
|
||||||
})
|
|
||||||
|
|
||||||
it('should fail to register a protocol if the onConnect handler is not provided', () => {
|
|
||||||
const topologyProps = {
|
|
||||||
multicodecs: multicodec,
|
|
||||||
handlers: {
|
|
||||||
onDisconnect: () => { }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
registrar.register(topologyProps)
|
|
||||||
} catch (err) {
|
|
||||||
expect(err).to.exist()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
throw new Error('should fail to register a protocol if the onConnect handler is not provided')
|
|
||||||
})
|
|
||||||
|
|
||||||
it('should fail to register a protocol if the onDisconnect handler is not provided', () => {
|
|
||||||
const topologyProps = {
|
|
||||||
multicodecs: multicodec,
|
|
||||||
handlers: {
|
|
||||||
onConnect: () => { }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
registrar.register(topologyProps)
|
|
||||||
} catch (err) {
|
|
||||||
expect(err).to.exist()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
throw new Error('should fail to register a protocol if the onDisconnect handler is not provided')
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -88,13 +54,13 @@ describe('registrar', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it('should be able to register a protocol', () => {
|
it('should be able to register a protocol', () => {
|
||||||
const topologyProps = {
|
const topologyProps = new Topology({
|
||||||
|
multicodecs: multicodec,
|
||||||
handlers: {
|
handlers: {
|
||||||
onConnect: () => { },
|
onConnect: () => { },
|
||||||
onDisconnect: () => { }
|
onDisconnect: () => { }
|
||||||
},
|
}
|
||||||
multicodecs: multicodec
|
})
|
||||||
}
|
|
||||||
|
|
||||||
const identifier = registrar.register(topologyProps)
|
const identifier = registrar.register(topologyProps)
|
||||||
|
|
||||||
@ -102,13 +68,13 @@ describe('registrar', () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it('should be able to unregister a protocol', () => {
|
it('should be able to unregister a protocol', () => {
|
||||||
const topologyProps = {
|
const topologyProps = new Topology({
|
||||||
|
multicodecs: multicodec,
|
||||||
handlers: {
|
handlers: {
|
||||||
onConnect: () => { },
|
onConnect: () => { },
|
||||||
onDisconnect: () => { }
|
onDisconnect: () => { }
|
||||||
},
|
}
|
||||||
multicodecs: multicodec
|
})
|
||||||
}
|
|
||||||
|
|
||||||
const identifier = registrar.register(topologyProps)
|
const identifier = registrar.register(topologyProps)
|
||||||
const success = registrar.unregister(identifier)
|
const success = registrar.unregister(identifier)
|
||||||
@ -138,7 +104,7 @@ describe('registrar', () => {
|
|||||||
registrar.onConnect(remotePeerInfo, conn)
|
registrar.onConnect(remotePeerInfo, conn)
|
||||||
expect(registrar.connections.size).to.eql(1)
|
expect(registrar.connections.size).to.eql(1)
|
||||||
|
|
||||||
const topologyProps = {
|
const topologyProps = new Topology({
|
||||||
multicodecs: multicodec,
|
multicodecs: multicodec,
|
||||||
handlers: {
|
handlers: {
|
||||||
onConnect: (peerInfo, connection) => {
|
onConnect: (peerInfo, connection) => {
|
||||||
@ -153,7 +119,7 @@ describe('registrar', () => {
|
|||||||
onDisconnectDefer.resolve()
|
onDisconnectDefer.resolve()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
// Register protocol
|
// Register protocol
|
||||||
const identifier = registrar.register(topologyProps)
|
const identifier = registrar.register(topologyProps)
|
||||||
@ -161,11 +127,9 @@ describe('registrar', () => {
|
|||||||
|
|
||||||
// Topology created
|
// Topology created
|
||||||
expect(topology).to.exist()
|
expect(topology).to.exist()
|
||||||
expect(topology.peers.size).to.eql(1)
|
|
||||||
|
|
||||||
registrar.onDisconnect(remotePeerInfo)
|
registrar.onDisconnect(remotePeerInfo)
|
||||||
expect(registrar.connections.size).to.eql(0)
|
expect(registrar.connections.size).to.eql(0)
|
||||||
expect(topology.peers.size).to.eql(1) // topology should keep the peer
|
|
||||||
|
|
||||||
// Wait for handlers to be called
|
// Wait for handlers to be called
|
||||||
return Promise.all([
|
return Promise.all([
|
||||||
@ -178,7 +142,7 @@ describe('registrar', () => {
|
|||||||
const onConnectDefer = pDefer()
|
const onConnectDefer = pDefer()
|
||||||
const onDisconnectDefer = pDefer()
|
const onDisconnectDefer = pDefer()
|
||||||
|
|
||||||
const topologyProps = {
|
const topologyProps = new Topology({
|
||||||
multicodecs: multicodec,
|
multicodecs: multicodec,
|
||||||
handlers: {
|
handlers: {
|
||||||
onConnect: () => {
|
onConnect: () => {
|
||||||
@ -188,7 +152,7 @@ describe('registrar', () => {
|
|||||||
onDisconnectDefer.resolve()
|
onDisconnectDefer.resolve()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
// Register protocol
|
// Register protocol
|
||||||
const identifier = registrar.register(topologyProps)
|
const identifier = registrar.register(topologyProps)
|
||||||
@ -196,7 +160,6 @@ describe('registrar', () => {
|
|||||||
|
|
||||||
// Topology created
|
// Topology created
|
||||||
expect(topology).to.exist()
|
expect(topology).to.exist()
|
||||||
expect(topology.peers.size).to.eql(0)
|
|
||||||
expect(registrar.connections.size).to.eql(0)
|
expect(registrar.connections.size).to.eql(0)
|
||||||
|
|
||||||
// Setup connections before registrar
|
// Setup connections before registrar
|
||||||
@ -212,7 +175,6 @@ describe('registrar', () => {
|
|||||||
peerStore.put(peerInfo)
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
await onConnectDefer.promise
|
await onConnectDefer.promise
|
||||||
expect(topology.peers.size).to.eql(1)
|
|
||||||
|
|
||||||
// Remove protocol to peer and update it
|
// Remove protocol to peer and update it
|
||||||
peerInfo.protocols.delete(multicodec)
|
peerInfo.protocols.delete(multicodec)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user