feat: integrate gossipsub by default (#365)

BREAKING CHANGE: new configuration for deciding the implementation of pubsub to be used.
In this context, the experimental flags were also removed. See the README for the latest usage.
This commit is contained in:
Vasco Santos
2019-07-31 09:38:14 +02:00
committed by Jacob Heun
parent 65d52857a5
commit 791f39a09b
10 changed files with 211 additions and 86 deletions

View File

@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-kad-dht')
const GossipSub = require('libp2p-gossipsub')
const defaultsDeep = require('@nodeutils/defaults-deep')
const Protector = require('libp2p-pnet')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
@ -154,7 +155,8 @@ class Node extends Libp2p {
peerDiscovery: [
MulticastDNS
],
dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components
dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components
pubsub: GossipSub
},
// libp2p config options (typically found on a config.json)
@ -187,9 +189,8 @@ class Node extends Libp2p {
timeout: 10e3
}
},
// Enable/Disable Experimental features
EXPERIMENTAL: { // Experimental features ("behind a flag")
pubsub: false
pubsub: {
enabled: true
}
}
}

View File

@ -48,7 +48,6 @@
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"libp2p-connection-manager": "^0.1.0",
"libp2p-floodsub": "^0.16.1",
"libp2p-ping": "^0.8.5",
"libp2p-switch": "^0.42.12",
"libp2p-websockets": "^0.12.2",
@ -74,6 +73,8 @@
"libp2p-circuit": "^0.3.7",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-kad-dht": "^0.15.3",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.8.4",
@ -84,6 +85,7 @@
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash.times": "^4.3.2",
"merge-options": "^1.0.1",
"nock": "^10.0.6",
"pull-goodbye": "0.0.2",
"pull-mplex": "^0.1.2",

View File

@ -19,6 +19,7 @@ const modulesSchema = s({
connProtector: s.union(['undefined', s.interface({ protect: 'function' })]),
contentRouting: optional(list(['object'])),
dht: optional(s('null|function|object')),
pubsub: optional(s('null|function|object')),
peerDiscovery: optional(list([s('object|function')])),
peerRouting: optional(list(['object'])),
streamMuxer: optional(list([s('object|function')])),
@ -59,12 +60,10 @@ const configSchema = s({
timeout: 10e3
}
}),
// Experimental config
EXPERIMENTAL: s({
pubsub: 'boolean'
}, {
// Experimental defaults
pubsub: false
// Pubsub config
pubsub: s('object?', {
// DHT defaults
enabled: false
})
}, {})

View File

@ -122,9 +122,9 @@ class Libp2p extends EventEmitter {
})
}
// enable/disable pubsub
if (this._config.EXPERIMENTAL.pubsub) {
this.pubsub = pubsub(this)
// start pubsub
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
this.pubsub = pubsub(this, this._modules.pubsub)
}
// Attach remaining APIs
@ -403,8 +403,8 @@ class Libp2p extends EventEmitter {
}
},
(cb) => {
if (this._floodSub) {
return this._floodSub.start(cb)
if (this.pubsub) {
return this.pubsub.start(cb)
}
cb()
},
@ -442,8 +442,8 @@ class Libp2p extends EventEmitter {
)
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
if (this.pubsub) {
return this.pubsub.stop(cb)
}
cb()
},

View File

@ -2,15 +2,12 @@
const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')
const promisify = require('promisify-es6')
const errCode = require('err-code')
module.exports = (node) => {
const floodSub = new FloodSub(node)
node._floodSub = floodSub
module.exports = (node, Pubsub) => {
const pubsub = new Pubsub(node, { emitSelf: true })
return {
/**
@ -41,16 +38,16 @@ module.exports = (node) => {
options = {}
}
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}
floodSub.on(topic, handler)
pubsub.on(topic, handler)
nextTick(cb)
}
@ -80,18 +77,18 @@ module.exports = (node) => {
* libp2p.unsubscribe(topic, handler, callback)
*/
unsubscribe: promisify((topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
if (!handler) {
floodSub.removeAllListeners(topic)
pubsub.removeAllListeners(topic)
} else {
floodSub.removeListener(topic, handler)
pubsub.removeListener(topic, handler)
}
if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
}
if (typeof callback === 'function') {
@ -102,29 +99,31 @@ module.exports = (node) => {
}),
publish: promisify((topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
if (!Buffer.isBuffer(data)) {
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
try {
data = Buffer.from(data)
} catch (err) {
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
}
floodSub.publish(topic, data, callback)
pubsub.publish(topic, data, callback)
}),
ls: promisify((callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
const subscriptions = Array.from(floodSub.subscriptions)
const subscriptions = Array.from(pubsub.subscriptions)
nextTick(() => callback(null, subscriptions))
}),
peers: promisify((topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
@ -133,7 +132,7 @@ module.exports = (node) => {
topic = null
}
const peers = Array.from(floodSub.peers.values())
const peers = Array.from(pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())
@ -141,7 +140,11 @@ module.exports = (node) => {
}),
setMaxListeners (n) {
return floodSub.setMaxListeners(n)
}
return pubsub.setMaxListeners(n)
},
start: (cb) => pubsub.start(cb),
stop: (cb) => pubsub.stop(cb)
}
}

View File

@ -82,8 +82,8 @@ describe('configuration', () => {
peerDiscovery: {
autoDial: true
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
@ -144,8 +144,8 @@ describe('configuration', () => {
enabled: true
}
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
@ -269,8 +269,8 @@ describe('configuration', () => {
dht: DHT
},
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
peerDiscovery: {
autoDial: true

View File

@ -19,8 +19,8 @@ describe('libp2p creation', () => {
it('should be able to start and stop successfully', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
},
dht: {
enabled: true
@ -32,7 +32,7 @@ describe('libp2p creation', () => {
const sw = node._switch
const cm = node.connectionManager
const dht = node._dht
const pub = node._floodSub
const pub = node.pubsub
sinon.spy(sw, 'start')
sinon.spy(cm, 'start')
@ -77,13 +77,13 @@ describe('libp2p creation', () => {
it('should not create disabled modules', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}, (err, node) => {
expect(err).to.not.exist()
expect(node._floodSub).to.not.exist()
expect(node._pubsub).to.not.exist()
done()
})
})

View File

@ -11,23 +11,31 @@ const parallel = require('async/parallel')
const series = require('async/series')
const _times = require('lodash.times')
const Floodsub = require('libp2p-floodsub')
const mergeOptions = require('merge-options')
const { codes } = require('../src/errors')
const createNode = require('./utils/create-node')
function startTwo (callback) {
function startTwo (options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
const tasks = _times(2, () => (cb) => {
createNode('/ip4/0.0.0.0/tcp/0', {
createNode('/ip4/0.0.0.0/tcp/0', mergeOptions({
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
}
}
}, (err, node) => {
}, options), (err, node) => {
expect(err).to.not.exist()
node.start((err) => cb(err, node))
})
@ -47,22 +55,17 @@ function stopTwo (nodes, callback) {
], callback)
}
// There is a vast test suite on PubSub through js-ipfs
// https://github.com/ipfs/interface-ipfs-core/blob/master/js/src/pubsub.js
// and libp2p-floodsub itself
// https://github.com/libp2p/js-libp2p-floodsub/tree/master/test
// TODO: consider if all or some of those should come here
describe('.pubsub', () => {
describe('.pubsub on (default)', (done) => {
describe('.pubsub on (default)', () => {
it('start two nodes and send one message, then unsubscribe', (done) => {
// Check the final series error, and the publish handler
expect(2).checks(done)
let nodes
const data = Buffer.from('test')
const data = 'test'
const handler = (msg) => {
// verify the data is correct and mark the expect
expect(msg.data).to.eql(data).mark()
expect(msg.data.toString()).to.eql(data).mark()
}
series([
@ -77,10 +80,6 @@ describe('.pubsub', () => {
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// ls subscripts
(cb) => nodes[1].pubsub.ls(cb),
// get subscribed peers
(cb) => nodes[1].pubsub.peers('pubsub', cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe on the first
@ -115,6 +114,123 @@ describe('.pubsub', () => {
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// ls subscripts
(cb) => nodes[1].pubsub.ls(cb),
// get subscribed peers
(cb) => nodes[1].pubsub.peers('pubsub', cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe from all
(cb) => nodes[0].pubsub.unsubscribe('pubsub', null, cb),
// Verify unsubscribed
(cb) => {
nodes[0].pubsub.ls((err, topics) => {
expect(topics.length).to.eql(0).mark()
cb(err)
})
},
// Stop both nodes
(cb) => stopTwo(nodes, cb)
], (err) => {
// Verify there was no error, and mark the expect
expect(err).to.not.exist().mark()
})
})
it('publish should fail if data is not a buffer nor a string', (done) => {
createNode('/ip4/0.0.0.0/tcp/0', {
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
pubsub: {
enabled: true
}
}
}, (err, node) => {
expect(err).to.not.exist()
node.start((err) => {
expect(err).to.not.exist()
node.pubsub.publish('pubsub', 10, (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DATA_IS_NOT_VALID')
done()
})
})
})
})
})
describe('.pubsub on using floodsub', () => {
it('start two nodes and send one message, then unsubscribe', (done) => {
// Check the final series error, and the publish handler
expect(2).checks(done)
let nodes
const data = Buffer.from('test')
const handler = (msg) => {
// verify the data is correct and mark the expect
expect(msg.data).to.eql(data).mark()
}
series([
// Start the nodes
(cb) => startTwo({
modules: {
pubsub: Floodsub
}
}, (err, _nodes) => {
nodes = _nodes
cb(err)
}),
// subscribe on the first
(cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
// Wait a moment before publishing
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe on the first
(cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb),
// Stop both nodes
(cb) => stopTwo(nodes, cb)
], (err) => {
// Verify there was no error, and mark the expect
expect(err).to.not.exist().mark()
})
})
it('start two nodes and send one message, then unsubscribe without handler', (done) => {
// Check the final series error, and the publish handler
expect(3).checks(done)
let nodes
const data = Buffer.from('test')
const handler = (msg) => {
// verify the data is correct and mark the expect
expect(msg.data).to.eql(data).mark()
}
series([
// Start the nodes
(cb) => startTwo({
modules: {
pubsub: Floodsub
}
}, (err, _nodes) => {
nodes = _nodes
cb(err)
}),
// subscribe on the first
(cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb),
// Wait a moment before publishing
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe from all
@ -141,9 +257,12 @@ describe('.pubsub', () => {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
}
},
modules: {
pubsub: Floodsub
}
}, (err, node) => {
expect(err).to.not.exist()
@ -151,9 +270,9 @@ describe('.pubsub', () => {
node.start((err) => {
expect(err).to.not.exist()
node.pubsub.publish('pubsub', 'datastr', (err) => {
node.pubsub.publish('pubsub', 10, (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER')
expect(err.code).to.equal('ERR_DATA_IS_NOT_VALID')
done()
})
@ -170,9 +289,6 @@ describe('.pubsub', () => {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: false
}
}
}, (err, node) => {
@ -194,8 +310,8 @@ describe('.pubsub', () => {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
}
}
}, (err, node) => {

View File

@ -8,6 +8,7 @@ const SPDY = require('libp2p-spdy')
const MPLEX = require('libp2p-mplex')
const PULLMPLEX = require('pull-mplex')
const KadDHT = require('libp2p-kad-dht')
const GossipSub = require('libp2p-gossipsub')
const SECIO = require('libp2p-secio')
const defaultsDeep = require('@nodeutils/defaults-deep')
const libp2p = require('../..')
@ -57,7 +58,8 @@ class Node extends libp2p {
wsStar.discovery,
Bootstrap
],
dht: KadDHT
dht: KadDHT,
pubsub: GossipSub
},
config: {
peerDiscovery: {
@ -88,8 +90,8 @@ class Node extends libp2p {
},
enabled: false
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}

View File

@ -6,6 +6,7 @@ const WS = require('libp2p-websockets')
const Bootstrap = require('libp2p-bootstrap')
const SPDY = require('libp2p-spdy')
const KadDHT = require('libp2p-kad-dht')
const GossipSub = require('libp2p-gossipsub')
const MPLEX = require('libp2p-mplex')
const PULLMPLEX = require('pull-mplex')
const SECIO = require('libp2p-secio')
@ -52,7 +53,8 @@ class Node extends libp2p {
MulticastDNS,
Bootstrap
],
dht: KadDHT
dht: KadDHT,
pubsub: GossipSub
},
config: {
peerDiscovery: {
@ -81,8 +83,8 @@ class Node extends libp2p {
},
enabled: true
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}