mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-26 07:21:36 +00:00
refactor: pubsub (#467)
* feat: peer-store v0 * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * chore: address review * refactor: pubsub subsystem * chore: address review * chore: use topology interface * chore: address review * chore: address review * chore: simplify tests
This commit is contained in:
@ -1,108 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
|
||||
class Topology {
|
||||
/**
|
||||
* @param {Object} props
|
||||
* @param {number} props.min minimum needed connections (default: 0)
|
||||
* @param {number} props.max maximum needed connections (default: Infinity)
|
||||
* @param {Array<string>} props.multicodecs protocol multicodecs
|
||||
* @param {Object} props.handlers
|
||||
* @param {function} props.handlers.onConnect protocol "onConnect" handler
|
||||
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
|
||||
* @constructor
|
||||
*/
|
||||
constructor ({
|
||||
min = 0,
|
||||
max = Infinity,
|
||||
multicodecs,
|
||||
handlers
|
||||
}) {
|
||||
assert(multicodecs, 'one or more multicodec should be provided')
|
||||
assert(handlers, 'the handlers should be provided')
|
||||
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
|
||||
'the \'onConnect\' handler must be provided')
|
||||
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
|
||||
'the \'onDisconnect\' handler must be provided')
|
||||
|
||||
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
|
||||
this.min = min
|
||||
this.max = max
|
||||
|
||||
// Handlers
|
||||
this._onConnect = handlers.onConnect
|
||||
this._onDisconnect = handlers.onDisconnect
|
||||
|
||||
this.peers = new Map()
|
||||
this._registrar = undefined
|
||||
|
||||
this._onProtocolChange = this._onProtocolChange.bind(this)
|
||||
}
|
||||
|
||||
set registrar (registrar) {
|
||||
this._registrar = registrar
|
||||
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
|
||||
|
||||
// Update topology peers
|
||||
this._updatePeers(this._registrar.peerStore.peers.values())
|
||||
}
|
||||
|
||||
/**
|
||||
* Update topology.
|
||||
* @param {Array<PeerInfo>} peerInfoIterable
|
||||
* @returns {void}
|
||||
*/
|
||||
_updatePeers (peerInfoIterable) {
|
||||
for (const peerInfo of peerInfoIterable) {
|
||||
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
|
||||
// Add the peer regardless of whether or not there is currently a connection
|
||||
this.peers.set(peerInfo.id.toB58String(), peerInfo)
|
||||
// If there is a connection, call _onConnect
|
||||
const connection = this._registrar.getConnection(peerInfo)
|
||||
connection && this._onConnect(peerInfo, connection)
|
||||
} else {
|
||||
// Remove any peers we might be tracking that are no longer of value to us
|
||||
this.peers.delete(peerInfo.id.toB58String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify protocol of peer disconnected.
|
||||
* @param {PeerInfo} peerInfo
|
||||
* @param {Error} [error]
|
||||
* @returns {void}
|
||||
*/
|
||||
disconnect (peerInfo, error) {
|
||||
this._onDisconnect(peerInfo, error)
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a new peer support the multicodecs for this topology.
|
||||
* @param {Object} props
|
||||
* @param {PeerInfo} props.peerInfo
|
||||
* @param {Array<string>} props.protocols
|
||||
*/
|
||||
_onProtocolChange ({ peerInfo, protocols }) {
|
||||
const existingPeer = this.peers.get(peerInfo.id.toB58String())
|
||||
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))
|
||||
|
||||
// Not supporting the protocol anymore?
|
||||
if (existingPeer && hasProtocol.length === 0) {
|
||||
this._onDisconnect({
|
||||
peerInfo
|
||||
})
|
||||
}
|
||||
|
||||
// New to protocol support
|
||||
for (const protocol of protocols) {
|
||||
if (this.multicodecs.includes(protocol)) {
|
||||
this._updatePeers([peerInfo])
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Topology
|
39
src/index.js
39
src/index.js
@ -1,7 +1,7 @@
|
||||
'use strict'
|
||||
|
||||
const FSM = require('fsm-event')
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const { EventEmitter } = require('events')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p')
|
||||
log.error = debug('libp2p:error')
|
||||
@ -9,7 +9,6 @@ const errCode = require('err-code')
|
||||
const promisify = require('promisify-es6')
|
||||
|
||||
const each = require('async/each')
|
||||
const nextTick = require('async/nextTick')
|
||||
|
||||
const PeerInfo = require('peer-info')
|
||||
const multiaddr = require('multiaddr')
|
||||
@ -66,6 +65,8 @@ class Libp2p extends EventEmitter {
|
||||
this._transport = [] // Transport instances/references
|
||||
this._discovery = [] // Discovery service instances/references
|
||||
|
||||
this.peerStore = new PeerStore()
|
||||
|
||||
// create the switch, and listen for errors
|
||||
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
|
||||
|
||||
@ -147,7 +148,7 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
|
||||
// start pubsub
|
||||
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
|
||||
if (this._modules.pubsub) {
|
||||
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
|
||||
}
|
||||
|
||||
@ -251,6 +252,7 @@ class Libp2p extends EventEmitter {
|
||||
this.state('stop')
|
||||
|
||||
try {
|
||||
this.pubsub && await this.pubsub.stop()
|
||||
await this.transportManager.close()
|
||||
await this._switch.stop()
|
||||
} catch (err) {
|
||||
@ -385,10 +387,16 @@ class Libp2p extends EventEmitter {
|
||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||
|
||||
// Start parallel tasks
|
||||
const tasks = [
|
||||
this.transportManager.listen(multiaddrs)
|
||||
]
|
||||
|
||||
if (this._config.pubsub.enabled) {
|
||||
this.pubsub && this.pubsub.start()
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.all([
|
||||
this.transportManager.listen(multiaddrs)
|
||||
])
|
||||
await Promise.all(tasks)
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
@ -483,16 +491,15 @@ module.exports = Libp2p
|
||||
* Like `new Libp2p(options)` except it will create a `PeerInfo`
|
||||
* instance if one is not provided in options.
|
||||
* @param {object} options Libp2p configuration options
|
||||
* @param {function(Error, Libp2p)} callback
|
||||
* @returns {void}
|
||||
* @returns {Libp2p}
|
||||
*/
|
||||
module.exports.createLibp2p = promisify((options, callback) => {
|
||||
module.exports.create = async (options = {}) => {
|
||||
if (options.peerInfo) {
|
||||
return nextTick(callback, null, new Libp2p(options))
|
||||
return new Libp2p(options)
|
||||
}
|
||||
PeerInfo.create((err, peerInfo) => {
|
||||
if (err) return callback(err)
|
||||
options.peerInfo = peerInfo
|
||||
callback(null, new Libp2p(options))
|
||||
})
|
||||
})
|
||||
|
||||
const peerInfo = await PeerInfo.create()
|
||||
|
||||
options.peerInfo = peerInfo
|
||||
return new Libp2p(options)
|
||||
}
|
||||
|
141
src/pubsub.js
141
src/pubsub.js
@ -1,52 +1,21 @@
|
||||
'use strict'
|
||||
|
||||
const nextTick = require('async/nextTick')
|
||||
const { messages, codes } = require('./errors')
|
||||
const promisify = require('promisify-es6')
|
||||
|
||||
const errCode = require('err-code')
|
||||
const { messages, codes } = require('./errors')
|
||||
|
||||
module.exports = (node, Pubsub, config) => {
|
||||
const pubsub = new Pubsub(node, config)
|
||||
const pubsub = new Pubsub(node.peerInfo, node.registrar, config)
|
||||
|
||||
return {
|
||||
/**
|
||||
* Subscribe the given handler to a pubsub topic
|
||||
*
|
||||
* @param {string} topic
|
||||
* @param {function} handler The handler to subscribe
|
||||
* @param {object|null} [options]
|
||||
* @param {function} [callback] An optional callback
|
||||
*
|
||||
* @returns {Promise|void} A promise is returned if no callback is provided
|
||||
*
|
||||
* @example <caption>Subscribe a handler to a topic</caption>
|
||||
*
|
||||
* // `null` must be passed for options until subscribe is no longer using promisify
|
||||
* const handler = (message) => { }
|
||||
* await libp2p.subscribe(topic, handler, null)
|
||||
*
|
||||
* @example <caption>Use a callback instead of the Promise api</caption>
|
||||
*
|
||||
* // `options` may be passed or omitted when supplying a callback
|
||||
* const handler = (message) => { }
|
||||
* libp2p.subscribe(topic, handler, callback)
|
||||
* @returns {void}
|
||||
*/
|
||||
subscribe: (topic, handler, options, callback) => {
|
||||
// can't use promisify because it thinks the handler is a callback
|
||||
if (typeof options === 'function') {
|
||||
callback = options
|
||||
options = {}
|
||||
}
|
||||
|
||||
subscribe: (topic, handler) => {
|
||||
if (!node.isStarted() && !pubsub.started) {
|
||||
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
|
||||
if (callback) {
|
||||
return nextTick(() => callback(err))
|
||||
}
|
||||
|
||||
return Promise.reject(err)
|
||||
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
}
|
||||
|
||||
if (pubsub.listenerCount(topic) === 0) {
|
||||
@ -54,46 +23,16 @@ module.exports = (node, Pubsub, config) => {
|
||||
}
|
||||
|
||||
pubsub.on(topic, handler)
|
||||
|
||||
if (callback) {
|
||||
return nextTick(() => callback())
|
||||
}
|
||||
|
||||
return Promise.resolve()
|
||||
},
|
||||
|
||||
/**
|
||||
* Unsubscribes from a pubsub topic
|
||||
*
|
||||
* @param {string} topic
|
||||
* @param {function|null} handler The handler to unsubscribe from
|
||||
* @param {function} [callback] An optional callback
|
||||
*
|
||||
* @returns {Promise|void} A promise is returned if no callback is provided
|
||||
*
|
||||
* @example <caption>Unsubscribe a topic for all handlers</caption>
|
||||
*
|
||||
* // `null` must be passed until unsubscribe is no longer using promisify
|
||||
* await libp2p.unsubscribe(topic, null)
|
||||
*
|
||||
* @example <caption>Unsubscribe a topic for 1 handler</caption>
|
||||
*
|
||||
* await libp2p.unsubscribe(topic, handler)
|
||||
*
|
||||
* @example <caption>Use a callback instead of the Promise api</caption>
|
||||
*
|
||||
* libp2p.unsubscribe(topic, handler, callback)
|
||||
* @param {function} [handler] The handler to unsubscribe from
|
||||
*/
|
||||
unsubscribe: (topic, handler, callback) => {
|
||||
// can't use promisify because it thinks the handler is a callback
|
||||
unsubscribe: (topic, handler) => {
|
||||
if (!node.isStarted() && !pubsub.started) {
|
||||
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
|
||||
if (callback) {
|
||||
return nextTick(() => callback(err))
|
||||
}
|
||||
|
||||
return Promise.reject(err)
|
||||
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
}
|
||||
|
||||
if (!handler) {
|
||||
@ -105,61 +44,61 @@ module.exports = (node, Pubsub, config) => {
|
||||
if (pubsub.listenerCount(topic) === 0) {
|
||||
pubsub.unsubscribe(topic)
|
||||
}
|
||||
|
||||
if (callback) {
|
||||
return nextTick(() => callback())
|
||||
}
|
||||
|
||||
return Promise.resolve()
|
||||
},
|
||||
|
||||
publish: promisify((topic, data, callback) => {
|
||||
/**
|
||||
* Publish messages to the given topics.
|
||||
* @param {Array<string>|string} topic
|
||||
* @param {Buffer} data
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
publish: (topic, data) => {
|
||||
if (!node.isStarted() && !pubsub.started) {
|
||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
||||
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
}
|
||||
|
||||
try {
|
||||
data = Buffer.from(data)
|
||||
} catch (err) {
|
||||
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
|
||||
throw errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID')
|
||||
}
|
||||
|
||||
pubsub.publish(topic, data, callback)
|
||||
}),
|
||||
return pubsub.publish(topic, data)
|
||||
},
|
||||
|
||||
ls: promisify((callback) => {
|
||||
/**
|
||||
* Get a list of topics the node is subscribed to.
|
||||
* @returns {Array<String>} topics
|
||||
*/
|
||||
getTopics: () => {
|
||||
if (!node.isStarted() && !pubsub.started) {
|
||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
||||
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
}
|
||||
|
||||
const subscriptions = Array.from(pubsub.subscriptions)
|
||||
return pubsub.getTopics()
|
||||
},
|
||||
|
||||
nextTick(() => callback(null, subscriptions))
|
||||
}),
|
||||
|
||||
peers: promisify((topic, callback) => {
|
||||
/**
|
||||
* Get a list of the peer-ids that are subscribed to one topic.
|
||||
* @param {string} topic
|
||||
* @returns {Array<string>}
|
||||
*/
|
||||
getPeersSubscribed: (topic) => {
|
||||
if (!node.isStarted() && !pubsub.started) {
|
||||
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
|
||||
throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
|
||||
}
|
||||
|
||||
if (typeof topic === 'function') {
|
||||
callback = topic
|
||||
topic = null
|
||||
}
|
||||
|
||||
const peers = Array.from(pubsub.peers.values())
|
||||
.filter((peer) => topic ? peer.topics.has(topic) : true)
|
||||
.map((peer) => peer.info.id.toB58String())
|
||||
|
||||
nextTick(() => callback(null, peers))
|
||||
}),
|
||||
return pubsub.getPeersSubscribed(topic)
|
||||
},
|
||||
|
||||
setMaxListeners (n) {
|
||||
return pubsub.setMaxListeners(n)
|
||||
},
|
||||
|
||||
start: promisify((cb) => pubsub.start(cb)),
|
||||
_pubsub: pubsub,
|
||||
|
||||
stop: promisify((cb) => pubsub.stop(cb))
|
||||
start: () => pubsub.start(),
|
||||
|
||||
stop: () => pubsub.stop()
|
||||
}
|
||||
}
|
||||
|
@ -5,9 +5,9 @@ const debug = require('debug')
|
||||
const log = debug('libp2p:peer-store')
|
||||
log.error = debug('libp2p:peer-store:error')
|
||||
|
||||
const Topology = require('libp2p-interfaces/src/topology')
|
||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||
const PeerInfo = require('peer-info')
|
||||
const Toplogy = require('./connection-manager/topology')
|
||||
|
||||
/**
|
||||
* Responsible for notifying registered protocols of events in the network.
|
||||
@ -106,17 +106,16 @@ class Registrar {
|
||||
|
||||
/**
|
||||
* Register handlers for a set of multicodecs given
|
||||
* @param {Object} topologyProps properties for topology
|
||||
* @param {Array<string>|string} topologyProps.multicodecs
|
||||
* @param {Object} topologyProps.handlers
|
||||
* @param {function} topologyProps.handlers.onConnect
|
||||
* @param {function} topologyProps.handlers.onDisconnect
|
||||
* @param {Topology} topology protocol topology
|
||||
* @return {string} registrar identifier
|
||||
*/
|
||||
register (topologyProps) {
|
||||
// Create multicodec topology
|
||||
register (topology) {
|
||||
assert(
|
||||
Topology.isTopology(topology),
|
||||
'topology must be an instance of interfaces/topology')
|
||||
|
||||
// Create topology
|
||||
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
|
||||
const topology = new Toplogy(topologyProps)
|
||||
|
||||
this.topologies.set(id, topology)
|
||||
|
||||
|
Reference in New Issue
Block a user