mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
fix: pubsub configuration (#404)
* fix: add pubsub default config (#401) License: MIT Signed-off-by: Matthias Knopp <matthias-knopp@gmx.net> * docs: add default pubsub config to README (#401) License: MIT Signed-off-by: Matthias Knopp <matthias-knopp@gmx.net> * fix: pass config to provided PubSub (#401) License: MIT Signed-off-by: Matthias Knopp <matthias-knopp@gmx.net> * docs: adapt pubsub/example for new config (#401) License: MIT Signed-off-by: Matthias Knopp <matthias-knopp@gmx.net> * Update examples/pubsub/README.md Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * test: add pubsub config tests (#401) License: MIT Signed-off-by: Matthias Knopp <matthias-knopp@gmx.net>
This commit is contained in:
parent
b294301456
commit
b0f124b5ff
@ -192,7 +192,10 @@ class Node extends Libp2p {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
pubsub: {
|
pubsub: {
|
||||||
enabled: true
|
enabled: true,
|
||||||
|
emitSelf: true, // whether the node should emit to self on publish, in the event of the topic being subscribed
|
||||||
|
signMessages: true, // if messages should be signed
|
||||||
|
strictSigning: true // if message signing should be required
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@ const Gossipsub = require('libp2p-gossipsub')
|
|||||||
const defaultsDeep = require('@nodeutils/defaults-deep')
|
const defaultsDeep = require('@nodeutils/defaults-deep')
|
||||||
const waterfall = require('async/waterfall')
|
const waterfall = require('async/waterfall')
|
||||||
const parallel = require('async/parallel')
|
const parallel = require('async/parallel')
|
||||||
|
const series = require('async/series')
|
||||||
|
|
||||||
class MyBundle extends libp2p {
|
class MyBundle extends libp2p {
|
||||||
constructor (_options) {
|
constructor (_options) {
|
||||||
@ -28,6 +29,10 @@ class MyBundle extends libp2p {
|
|||||||
interval: 2000,
|
interval: 2000,
|
||||||
enabled: true
|
enabled: true
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
pubsub: {
|
||||||
|
enabled: true,
|
||||||
|
emitSelf: true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -63,19 +68,36 @@ parallel([
|
|||||||
node1.once('peer:connect', (peer) => {
|
node1.once('peer:connect', (peer) => {
|
||||||
console.log('connected to %s', peer.id.toB58String())
|
console.log('connected to %s', peer.id.toB58String())
|
||||||
|
|
||||||
// Subscribe to the topic 'news'
|
series([
|
||||||
node1.pubsub.subscribe('news',
|
// node1 subscribes to "news"
|
||||||
(msg) => console.log(msg.from, msg.data.toString()),
|
(cb) => node1.pubsub.subscribe(
|
||||||
() => {
|
'news',
|
||||||
|
(msg) => console.log(`node1 received: ${msg.data.toString()}`),
|
||||||
|
cb
|
||||||
|
),
|
||||||
|
(cb) => setTimeout(cb, 500),
|
||||||
|
// node2 subscribes to "news"
|
||||||
|
(cb) => node2.pubsub.subscribe(
|
||||||
|
'news',
|
||||||
|
(msg) => console.log(`node2 received: ${msg.data.toString()}`),
|
||||||
|
cb
|
||||||
|
),
|
||||||
|
(cb) => setTimeout(cb, 500),
|
||||||
|
// node2 publishes "news" every second
|
||||||
|
(cb) => {
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
// Publish the message on topic 'news'
|
|
||||||
node2.pubsub.publish(
|
node2.pubsub.publish(
|
||||||
'news',
|
'news',
|
||||||
Buffer.from('Bird bird bird, bird is the word!'),
|
Buffer.from('Bird bird bird, bird is the word!'),
|
||||||
() => {}
|
(err) => {
|
||||||
|
if (err) { throw err }
|
||||||
|
}
|
||||||
)
|
)
|
||||||
}, 1000)
|
}, 1000)
|
||||||
}
|
cb()
|
||||||
)
|
},
|
||||||
|
], (err) => {
|
||||||
|
if (err) { throw err }
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
# Publish Subscribe
|
# Publish Subscribe
|
||||||
|
|
||||||
Publish Subscribe is also included on the stack. Currently, we have on PubSub implementation which we ship by default [libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub), with many more being researched at [research-pubsub](https://github.com/libp2p/research-pubsub).
|
Publish Subscribe is also included on the stack. Currently, we have two PubSub implementation available [libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub) and [libp2p-gossipsub](https://github.com/ChainSafe/gossipsub-js), with many more being researched at [research-pubsub](https://github.com/libp2p/research-pubsub).
|
||||||
|
|
||||||
We've seen many interesting use cases appear with this, here are some highlights:
|
We've seen many interesting use cases appear with this, here are some highlights:
|
||||||
|
|
||||||
@ -12,26 +12,43 @@ We've seen many interesting use cases appear with this, here are some highlights
|
|||||||
|
|
||||||
For this example, we will use MulticastDNS for automatic Peer Discovery. This example is based the previous examples found in [Discovery Mechanisms](../discovery-mechanisms). You can find the complete version at [1.js](./1.js).
|
For this example, we will use MulticastDNS for automatic Peer Discovery. This example is based the previous examples found in [Discovery Mechanisms](../discovery-mechanisms). You can find the complete version at [1.js](./1.js).
|
||||||
|
|
||||||
Using PubSub is super simple, all you have to do is start a libp2p node with `EXPERIMENTAL.pubsub` set to true.
|
Using PubSub is super simple, you only need to provide the implementation of your choice and you are ready to go. No need for extra configuration.
|
||||||
|
|
||||||
```JavaScript
|
```JavaScript
|
||||||
node1.once('peer:connect', (peer) => {
|
node1.once('peer:connect', (peer) => {
|
||||||
console.log('connected to %s', peer.id.toB58String())
|
console.log('connected to %s', peer.id.toB58String())
|
||||||
|
|
||||||
// Subscribe to the topic 'news'
|
series([
|
||||||
node1.pubsub.subscribe('news',
|
// node1 subscribes to "news"
|
||||||
(msg) => console.log(msg.from, msg.data.toString()),
|
(cb) => node1.pubsub.subscribe(
|
||||||
() => {
|
'news',
|
||||||
|
(msg) => console.log(`node1 received: ${msg.data.toString()}`),
|
||||||
|
cb
|
||||||
|
),
|
||||||
|
(cb) => setTimeout(cb, 500),
|
||||||
|
// node2 subscribes to "news"
|
||||||
|
(cb) => node2.pubsub.subscribe(
|
||||||
|
'news',
|
||||||
|
(msg) => console.log(`node2 received: ${msg.data.toString()}`),
|
||||||
|
cb
|
||||||
|
),
|
||||||
|
(cb) => setTimeout(cb, 500),
|
||||||
|
// node2 publishes "news" every second
|
||||||
|
(cb) => {
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
// Publish the message on topic 'news'
|
|
||||||
node2.pubsub.publish(
|
node2.pubsub.publish(
|
||||||
'news',
|
'news',
|
||||||
Buffer.from('Bird bird bird, bird is the word!'),
|
Buffer.from('Bird bird bird, bird is the word!'),
|
||||||
() => {}
|
(err) => {
|
||||||
|
if (err) { throw err }
|
||||||
|
}
|
||||||
)
|
)
|
||||||
}, 1000)
|
}, 1000)
|
||||||
}
|
cb()
|
||||||
)
|
},
|
||||||
|
], (err) => {
|
||||||
|
if (err) { throw err }
|
||||||
|
})
|
||||||
})
|
})
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -40,11 +57,29 @@ The output of the program should look like:
|
|||||||
```
|
```
|
||||||
> node 1.js
|
> node 1.js
|
||||||
connected to QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82
|
connected to QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82
|
||||||
QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 Bird bird bird, bird is the word!
|
node2 received: Bird bird bird, bird is the word!
|
||||||
QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 Bird bird bird, bird is the word!
|
node1 received: Bird bird bird, bird is the word!
|
||||||
QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 Bird bird bird, bird is the word!
|
node2 received: Bird bird bird, bird is the word!
|
||||||
QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 Bird bird bird, bird is the word!
|
node1 received: Bird bird bird, bird is the word!
|
||||||
QmWpvkKm6qHLhoxpWrTswY6UMNWDyn8hN265Qp9ZYvgS82 Bird bird bird, bird is the word!
|
```
|
||||||
|
|
||||||
|
You can change the pubsub `emitSelf` option if you don't want the publishing node to receive its own messages.
|
||||||
|
|
||||||
|
```JavaScript
|
||||||
|
const defaults = {
|
||||||
|
config: {
|
||||||
|
peerDiscovery: {
|
||||||
|
mdns: {
|
||||||
|
interval: 2000,
|
||||||
|
enabled: true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
pubsub: {
|
||||||
|
enabled: true,
|
||||||
|
emitSelf: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## 2. Future work
|
## 2. Future work
|
||||||
|
@ -63,7 +63,10 @@ const configSchema = s({
|
|||||||
// Pubsub config
|
// Pubsub config
|
||||||
pubsub: s('object?', {
|
pubsub: s('object?', {
|
||||||
// Pubsub defaults
|
// Pubsub defaults
|
||||||
enabled: true
|
enabled: true,
|
||||||
|
emitSelf: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
})
|
})
|
||||||
}, {})
|
}, {})
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
// start pubsub
|
// start pubsub
|
||||||
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
|
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
|
||||||
this.pubsub = pubsub(this, this._modules.pubsub)
|
this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attach remaining APIs
|
// Attach remaining APIs
|
||||||
|
@ -6,8 +6,8 @@ const promisify = require('promisify-es6')
|
|||||||
|
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
|
|
||||||
module.exports = (node, Pubsub) => {
|
module.exports = (node, Pubsub, config) => {
|
||||||
const pubsub = new Pubsub(node, { emitSelf: true })
|
const pubsub = new Pubsub(node, config)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
/**
|
/**
|
||||||
|
@ -83,7 +83,10 @@ describe('configuration', () => {
|
|||||||
autoDial: true
|
autoDial: true
|
||||||
},
|
},
|
||||||
pubsub: {
|
pubsub: {
|
||||||
enabled: true
|
enabled: true,
|
||||||
|
emitSelf: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
},
|
},
|
||||||
dht: {
|
dht: {
|
||||||
kBucketSize: 20,
|
kBucketSize: 20,
|
||||||
@ -145,7 +148,10 @@ describe('configuration', () => {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
pubsub: {
|
pubsub: {
|
||||||
enabled: true
|
enabled: true,
|
||||||
|
emitSelf: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
},
|
},
|
||||||
dht: {
|
dht: {
|
||||||
kBucketSize: 20,
|
kBucketSize: 20,
|
||||||
@ -270,7 +276,10 @@ describe('configuration', () => {
|
|||||||
},
|
},
|
||||||
config: {
|
config: {
|
||||||
pubsub: {
|
pubsub: {
|
||||||
enabled: true
|
enabled: true,
|
||||||
|
emitSelf: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
},
|
},
|
||||||
peerDiscovery: {
|
peerDiscovery: {
|
||||||
autoDial: true
|
autoDial: true
|
||||||
|
@ -367,4 +367,70 @@ describe('.pubsub', () => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe('.pubsub config', () => {
|
||||||
|
it('toggle all pubsub options off (except enabled)', done => {
|
||||||
|
expect(3).checks(done)
|
||||||
|
|
||||||
|
class PubSubSpy {
|
||||||
|
constructor (node, config) {
|
||||||
|
expect(config).to.be.eql({
|
||||||
|
enabled: true,
|
||||||
|
selfEmit: false,
|
||||||
|
signMessages: false,
|
||||||
|
strictSigning: false
|
||||||
|
}).mark()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
||||||
|
modules: {
|
||||||
|
pubsub: PubSubSpy
|
||||||
|
},
|
||||||
|
config: {
|
||||||
|
pubsub: {
|
||||||
|
enabled: true,
|
||||||
|
selfEmit: false,
|
||||||
|
signMessages: false,
|
||||||
|
strictSigning: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, (err, node) => {
|
||||||
|
expect(err).to.not.exist().mark()
|
||||||
|
expect(node).to.exist().mark()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('toggle all pubsub options on', done => {
|
||||||
|
expect(3).checks(done)
|
||||||
|
|
||||||
|
class PubSubSpy {
|
||||||
|
constructor (node, config) {
|
||||||
|
expect(config).to.be.eql({
|
||||||
|
enabled: true,
|
||||||
|
selfEmit: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
|
}).mark()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createNode('/ip4/0.0.0.0/tcp/0', {
|
||||||
|
modules: {
|
||||||
|
pubsub: PubSubSpy
|
||||||
|
},
|
||||||
|
config: {
|
||||||
|
pubsub: {
|
||||||
|
enabled: true,
|
||||||
|
selfEmit: true,
|
||||||
|
signMessages: true,
|
||||||
|
strictSigning: true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, (err, node) => {
|
||||||
|
expect(err).to.not.exist().mark()
|
||||||
|
expect(node).to.exist().mark()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user