mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-27 03:22:15 +00:00
feat: add pubsub to libp2p
This commit is contained in:
parent
beeb36c10c
commit
0c543b7180
@ -38,6 +38,7 @@
|
|||||||
"homepage": "https://github.com/libp2p/js-libp2p",
|
"homepage": "https://github.com/libp2p/js-libp2p",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"async": "^2.6.0",
|
"async": "^2.6.0",
|
||||||
|
"libp2p-floodsub": "^0.14.1",
|
||||||
"libp2p-ping": "~0.6.1",
|
"libp2p-ping": "~0.6.1",
|
||||||
"libp2p-switch": "~0.36.1",
|
"libp2p-switch": "~0.36.1",
|
||||||
"mafmt": "^4.0.0",
|
"mafmt": "^4.0.0",
|
||||||
|
3
src/error-messages.js
Normal file
3
src/error-messages.js
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
exports.NOT_STARTED_YET = 'The libp2p node is not started yet'
|
25
src/index.js
25
src/index.js
@ -14,6 +14,7 @@ const Ping = require('libp2p-ping')
|
|||||||
const peerRouting = require('./peer-routing')
|
const peerRouting = require('./peer-routing')
|
||||||
const contentRouting = require('./content-routing')
|
const contentRouting = require('./content-routing')
|
||||||
const dht = require('./dht')
|
const dht = require('./dht')
|
||||||
|
const pubsub = require('./pubsub')
|
||||||
const getPeerInfo = require('./get-peer-info')
|
const getPeerInfo = require('./get-peer-info')
|
||||||
|
|
||||||
exports = module.exports
|
exports = module.exports
|
||||||
@ -89,6 +90,7 @@ class Node extends EventEmitter {
|
|||||||
this.peerRouting = peerRouting(this)
|
this.peerRouting = peerRouting(this)
|
||||||
this.contentRouting = contentRouting(this)
|
this.contentRouting = contentRouting(this)
|
||||||
this.dht = dht(this)
|
this.dht = dht(this)
|
||||||
|
this.pubsub = pubsub(this)
|
||||||
|
|
||||||
this._getPeerInfo = getPeerInfo(this)
|
this._getPeerInfo = getPeerInfo(this)
|
||||||
|
|
||||||
@ -149,17 +151,29 @@ class Node extends EventEmitter {
|
|||||||
cb()
|
cb()
|
||||||
},
|
},
|
||||||
(cb) => {
|
(cb) => {
|
||||||
// TODO: chicken-and-egg problem:
|
// TODO: chicken-and-egg problem #1:
|
||||||
// have to set started here because DHT requires libp2p is already started
|
// have to set started here because DHT requires libp2p is already started
|
||||||
this._isStarted = true
|
this._isStarted = true
|
||||||
if (this._dht) {
|
if (this._dht) {
|
||||||
return this._dht.start(cb)
|
this._dht.start(cb)
|
||||||
}
|
} else {
|
||||||
cb()
|
cb()
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
(cb) => {
|
||||||
|
// TODO: chicken-and-egg problem #2:
|
||||||
|
// have to set started here because FloodSub requires libp2p is already started
|
||||||
|
if (this._options !== false) {
|
||||||
|
this._floodSub.start(cb)
|
||||||
|
} else {
|
||||||
|
cb()
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
(cb) => {
|
(cb) => {
|
||||||
// detect which multiaddrs we don't have a transport for and remove them
|
// detect which multiaddrs we don't have a transport for and remove them
|
||||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||||
|
|
||||||
transports.forEach((transport) => {
|
transports.forEach((transport) => {
|
||||||
multiaddrs.forEach((multiaddr) => {
|
multiaddrs.forEach((multiaddr) => {
|
||||||
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
|
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
|
||||||
@ -188,6 +202,11 @@ class Node extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
series([
|
series([
|
||||||
|
(cb) => {
|
||||||
|
if (this._floodSub.started) {
|
||||||
|
this._floodSub.stop(cb)
|
||||||
|
}
|
||||||
|
},
|
||||||
(cb) => {
|
(cb) => {
|
||||||
if (this._dht) {
|
if (this._dht) {
|
||||||
return this._dht.stop(cb)
|
return this._dht.stop(cb)
|
||||||
|
89
src/pubsub.js
Normal file
89
src/pubsub.js
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const setImmediate = require('async/setImmediate')
|
||||||
|
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
|
||||||
|
const FloodSub = require('libp2p-floodsub')
|
||||||
|
|
||||||
|
module.exports = (node) => {
|
||||||
|
const floodSub = new FloodSub(node)
|
||||||
|
|
||||||
|
node._floodSub = floodSub
|
||||||
|
|
||||||
|
return {
|
||||||
|
subscribe: (topic, options, handler, callback) => {
|
||||||
|
if (!node.isStarted()) {
|
||||||
|
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof options === 'function') {
|
||||||
|
callback = handler
|
||||||
|
handler = options
|
||||||
|
options = {}
|
||||||
|
}
|
||||||
|
|
||||||
|
function subscribe (cb) {
|
||||||
|
if (floodSub.listenerCount(topic) === 0) {
|
||||||
|
floodSub.subscribe(topic)
|
||||||
|
}
|
||||||
|
|
||||||
|
floodSub.pubsub.on(topic, handler)
|
||||||
|
setImmediate(cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribe(callback)
|
||||||
|
},
|
||||||
|
|
||||||
|
unsubscribe: (topic, handler) => {
|
||||||
|
floodSub.removeListener(topic, handler)
|
||||||
|
|
||||||
|
if (floodSub.listenerCount(topic) === 0) {
|
||||||
|
floodSub.unsubscribe(topic)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
publish: (topic, data, callback) => {
|
||||||
|
if (!node.isStarted()) {
|
||||||
|
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!Buffer.isBuffer(data)) {
|
||||||
|
return setImmediate(() => callback(new Error('data must be a Buffer')))
|
||||||
|
}
|
||||||
|
|
||||||
|
floodSub.publish(topic, data)
|
||||||
|
|
||||||
|
setImmediate(() => callback())
|
||||||
|
},
|
||||||
|
|
||||||
|
ls: (callback) => {
|
||||||
|
if (!node.isStarted()) {
|
||||||
|
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
|
||||||
|
}
|
||||||
|
|
||||||
|
const subscriptions = Array.from(floodSub.subscriptions)
|
||||||
|
|
||||||
|
setImmediate(() => callback(null, subscriptions))
|
||||||
|
},
|
||||||
|
|
||||||
|
peers: (topic, callback) => {
|
||||||
|
if (!node.isStarted()) {
|
||||||
|
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof topic === 'function') {
|
||||||
|
callback = topic
|
||||||
|
topic = null
|
||||||
|
}
|
||||||
|
|
||||||
|
const peers = Array.from(floodSub.peers.values())
|
||||||
|
.filter((peer) => topic ? peer.topics.has(topic) : true)
|
||||||
|
.map((peer) => peer.info.id.toB58String())
|
||||||
|
|
||||||
|
setImmediate(() => callback(null, peers))
|
||||||
|
},
|
||||||
|
|
||||||
|
setMaxListeners (n) {
|
||||||
|
return floodSub.setMaxListeners(n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ require('./base')
|
|||||||
require('./transports.node')
|
require('./transports.node')
|
||||||
require('./stream-muxing.node')
|
require('./stream-muxing.node')
|
||||||
require('./peer-discovery.node')
|
require('./peer-discovery.node')
|
||||||
|
require('./pubsub.node')
|
||||||
require('./peer-routing.node')
|
require('./peer-routing.node')
|
||||||
require('./content-routing.node')
|
require('./content-routing.node')
|
||||||
require('./circuit-relay.node')
|
require('./circuit-relay.node')
|
||||||
|
52
test/pubsub.node.js
Normal file
52
test/pubsub.node.js
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
/* eslint-env mocha */
|
||||||
|
/* eslint max-nested-callbacks: ["error", 8] */
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const expect = chai.expect
|
||||||
|
const parallel = require('async/parallel')
|
||||||
|
const _times = require('lodash.times')
|
||||||
|
const utils = require('./utils/node')
|
||||||
|
const createNode = utils.createNode
|
||||||
|
|
||||||
|
describe('.pubsub', () => {
|
||||||
|
let nodeA
|
||||||
|
let nodeB
|
||||||
|
|
||||||
|
before(function (done) {
|
||||||
|
this.timeout(5 * 1000)
|
||||||
|
|
||||||
|
const tasks = _times(2, () => (cb) => {
|
||||||
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
||||||
|
mdns: false,
|
||||||
|
dht: true
|
||||||
|
}, (err, node) => {
|
||||||
|
expect(err).to.not.exist()
|
||||||
|
node.start((err) => cb(err, node))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
parallel(tasks, (err, nodes) => {
|
||||||
|
expect(err).to.not.exist()
|
||||||
|
nodeA = nodes[0]
|
||||||
|
nodeB = nodes[1]
|
||||||
|
|
||||||
|
nodeA.dial(nodeB.peerInfo, done)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
after((done) => {
|
||||||
|
parallel([
|
||||||
|
(cb) => nodeA.stop(cb),
|
||||||
|
(cb) => nodeB.stop(cb)
|
||||||
|
], done)
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('.pubsub on (default)', () => {
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('.pubsub off', () => {
|
||||||
|
})
|
||||||
|
})
|
Loading…
x
Reference in New Issue
Block a user