From e14ce40b097da6a7d0ad5e40307ba0dd29f6d270 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 25 Aug 2020 16:48:04 +0200 Subject: [PATCH] feat: gossipsub 1.1 (#733) * feat: gossipsub 1.1 BREAKING CHANGE: pubsub implementation is now directly exposed and its API was updated according to the new pubsub interface in js-libp2p-interfaces repo * chore: use gossipsub branch with src added * fix: add pubsub handlers adapter * chore: fix deps * chore: update pubsub docs and examples * chore: apply suggestions from code review Co-authored-by: Jacob Heun * chore: use new floodsub * chore: change validator doc set Co-authored-by: Jacob Heun * chore: add new gossipsub src Co-authored-by: Jacob Heun --- .aegir.js | 2 +- doc/API.md | 141 ++++++++++++++++++-- examples/pubsub/1.js | 11 +- examples/pubsub/README.md | 10 +- examples/pubsub/message-filtering/1.js | 32 +++-- examples/pubsub/message-filtering/README.md | 31 +++-- package.json | 9 +- src/index.js | 8 +- src/pubsub-adapter.js | 40 ++++++ src/pubsub.js | 109 --------------- test/pubsub/configuration.node.js | 14 +- test/pubsub/operation.node.js | 16 +-- 12 files changed, 248 insertions(+), 175 deletions(-) create mode 100644 src/pubsub-adapter.js delete mode 100644 src/pubsub.js diff --git a/.aegir.js b/.aegir.js index 7ba3e1fb..d57cba6a 100644 --- a/.aegir.js +++ b/.aegir.js @@ -45,7 +45,7 @@ const after = async () => { } module.exports = { - bundlesize: { maxSize: '205kB' }, + bundlesize: { maxSize: '225kB' }, hooks: { pre: before, post: after diff --git a/doc/API.md b/doc/API.md index a401af63..79e788aa 100644 --- a/doc/API.md +++ b/doc/API.md @@ -46,6 +46,8 @@ * [`pubsub.publish`](#pubsubpublish) * [`pubsub.subscribe`](#pubsubsubscribe) * [`pubsub.unsubscribe`](#pubsubunsubscribe) + * [`pubsub.on`](#pubsubon) + * [`pubsub.removeListener`](#pubsubremovelistener) * [`connectionManager.get`](#connectionmanagerget) * [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue) * [`connectionManager.size`](#connectionmanagersize) @@ -1327,15 +1329,75 @@ await libp2p.pubsub.publish(topic, data) ### pubsub.subscribe -Subscribes the given handler to a pubsub topic. +Subscribes to a pubsub topic. -`libp2p.pubsub.subscribe(topic, handler)` +`libp2p.pubsub.subscribe(topic)` #### Parameters | Name | Type | Description | |------|------|-------------| | topic | `string` | topic to subscribe | + +#### Returns + +| Type | Description | +|------|-------------| +| `void` | | + +#### Example + +```js +const topic = 'topic' +const handler = (msg) => { + // msg.data - pubsub data received +} + +libp2p.pubsub.on(topic, handler) +libp2p.pubsub.subscribe(topic) +``` + +### pubsub.unsubscribe + +Unsubscribes from a pubsub topic. + +`libp2p.pubsub.unsubscribe(topic)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | topic to unsubscribe | + +#### Returns + +| Type | Description | +|------|-------------| +| `void` | | + +#### Example + +```js +const topic = 'topic' +const handler = (msg) => { + // msg.data - pubsub data received +} + +libp2p.pubsub.removeListener(topic handler) +libp2p.pubsub.unsubscribe(topic) +``` + +## pubsub.on + +A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers. + +`libp2p.pubsub.on(topic, handler)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | topic to listen | | handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic | #### Returns @@ -1352,21 +1414,22 @@ const handler = (msg) => { // msg.data - pubsub data received } -libp2p.pubsub.subscribe(topic, handler) +libp2p.pubsub.on(topic, handler) +libp2p.pubsub.subscribe(topic) ``` -### pubsub.unsubscribe +## pubsub.removeListener -Unsubscribes the given handler from a pubsub topic. If no handler is provided, all handlers for the topic are removed. +A Pubsub router is an [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter) and uses its events for pubsub message handlers. -`libp2p.pubsub.unsubscribe(topic, handler)` +`libp2p.pubsub.removeListener(topic, handler)` #### Parameters | Name | Type | Description | |------|------|-------------| -| topic | `string` | topic to unsubscribe | -| handler | `function()` | handler subscribed | +| topic | `string` | topic to remove listener | +| handler | `function({ from: string, data: Uint8Array, seqno: Uint8Array, topicIDs: Array, signature: Uint8Array, key: Uint8Array })` | handler for new data on topic | #### Returns @@ -1382,7 +1445,67 @@ const handler = (msg) => { // msg.data - pubsub data received } -libp2p.pubsub.unsubscribe(topic, handler) +libp2p.pubsub.removeListener(topic handler) +libp2p.pubsub.unsubscribe(topic) +``` + +## pubsub.topicValidators.set + +Pubsub routers support message validators per topic, which will validate the message before its propagations. Set is used to specify a validator for a topic. + +`libp2p.pubsub.topicValidators.set(topic, validator)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | topic to bind a validator | +| handler | `function({ topic: string, msg: RPC })` | validator for new data on topic | + +#### Returns + +| Type | Description | +|------|-------------| +| `Map` | The `Map` object | + +#### Example + +```js +const topic = 'topic' +const validateMessage = (msgTopic, msg) => { + const input = uint8ArrayToString(msg.data) + const validInputs = ['a', 'b', 'c'] + + if (!validInputs.includes(input)) { + throw new Error('no valid input received') + } +} +libp2p.pubsub.topicValidators.set(topic, validateMessage) +``` + +## pubsub.topicValidators.delete + +Pubsub routers support message validators per topic, which will validate the message before its propagations. Delete is used to remove a validator for a topic. + +`libp2p.pubsub.topicValidators.delete(topic)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | topic to remove a validator | + +#### Returns + +| Type | Description | +|------|-------------| +| `boolean` | `true` if an element in the Map object existed and has been removed, or `false` if the element does not exist. | + +#### Example + +```js +const topic = 'topic' +libp2p.pubsub.topicValidators.delete(topic) ``` ### connectionManager.get diff --git a/examples/pubsub/1.js b/examples/pubsub/1.js index 2c223715..62cf622d 100644 --- a/examples/pubsub/1.js +++ b/examples/pubsub/1.js @@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise') const SECIO = require('libp2p-secio') const Gossipsub = require('libp2p-gossipsub') const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') const createNode = async () => { const node = await Libp2p.create({ @@ -38,13 +39,15 @@ const createNode = async () => { node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs) await node1.dial(node2.peerId) - await node1.pubsub.subscribe(topic, (msg) => { - console.log(`node1 received: ${msg.data.toString()}`) + node1.pubsub.on(topic, (msg) => { + console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) + await node1.pubsub.subscribe(topic) - await node2.pubsub.subscribe(topic, (msg) => { - console.log(`node2 received: ${msg.data.toString()}`) + node2.pubsub.on(topic, (msg) => { + console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) + await node2.pubsub.subscribe(topic) // node2 publishes "news" every second setInterval(() => { diff --git a/examples/pubsub/README.md b/examples/pubsub/README.md index 0ed99c52..96e7892a 100644 --- a/examples/pubsub/README.md +++ b/examples/pubsub/README.md @@ -47,13 +47,15 @@ node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs) await node1.dial(node2.peerId) -await node1.pubsub.subscribe(topic, (msg) => { - console.log(`node1 received: ${msg.data.toString()}`) +node1.pubsub.on(topic, (msg) => { + console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) +await node1.pubsub.subscribe(topic) -await node2.pubsub.subscribe(topic, (msg) => { - console.log(`node2 received: ${msg.data.toString()}`) +node2.pubsub.on(topic, (msg) => { + console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) +await node2.pubsub.subscribe(topic) // node2 publishes "news" every second setInterval(() => { diff --git a/examples/pubsub/message-filtering/1.js b/examples/pubsub/message-filtering/1.js index a6f63fe1..fffcd2e7 100644 --- a/examples/pubsub/message-filtering/1.js +++ b/examples/pubsub/message-filtering/1.js @@ -8,6 +8,7 @@ const { NOISE } = require('libp2p-noise') const SECIO = require('libp2p-secio') const Gossipsub = require('libp2p-gossipsub') const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') const createNode = async () => { const node = await Libp2p.create({ @@ -43,29 +44,34 @@ const createNode = async () => { await node2.dial(node3.peerId) //subscribe - await node1.pubsub.subscribe(topic, (msg) => { - console.log(`node1 received: ${msg.data.toString()}`) + node1.pubsub.on(topic, (msg) => { + console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) + await node1.pubsub.subscribe(topic) - await node2.pubsub.subscribe(topic, (msg) => { - console.log(`node2 received: ${msg.data.toString()}`) + node2.pubsub.on(topic, (msg) => { + console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) + await node2.pubsub.subscribe(topic) - await node3.pubsub.subscribe(topic, (msg) => { - console.log(`node3 received: ${msg.data.toString()}`) + node3.pubsub.on(topic, (msg) => { + console.log(`node3 received: ${uint8ArrayToString(msg.data)}`) }) + await node3.pubsub.subscribe(topic) - const validateFruit = (msgTopic, peer, msg) => { - const fruit = msg.data.toString(); + const validateFruit = (msgTopic, msg) => { + const fruit = uint8ArrayToString(msg.data) const validFruit = ['banana', 'apple', 'orange'] - const valid = validFruit.includes(fruit); - return valid; + + if (!validFruit.includes(fruit)) { + throw new Error('no valid fruit received') + } } //validate fruit - node1.pubsub._pubsub.topicValidators.set(topic, validateFruit); - node2.pubsub._pubsub.topicValidators.set(topic, validateFruit); - node3.pubsub._pubsub.topicValidators.set(topic, validateFruit); + node1.pubsub.topicValidators.set(topic, validateFruit) + node2.pubsub.topicValidators.set(topic, validateFruit) + node3.pubsub.topicValidators.set(topic, validateFruit) // node1 publishes "fruits" every five seconds var count = 0; diff --git a/examples/pubsub/message-filtering/README.md b/examples/pubsub/message-filtering/README.md index b59b26a0..bff85c90 100644 --- a/examples/pubsub/message-filtering/README.md +++ b/examples/pubsub/message-filtering/README.md @@ -44,31 +44,36 @@ Now we' can subscribe to the fruit topic and log incoming messages. ```JavaScript const topic = 'fruit' -await node1.pubsub.subscribe(topic, (msg) => { - console.log(`node1 received: ${msg.data.toString()}`) +node1.pubsub.on(topic, (msg) => { + console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) }) +await node1.pubsub.subscribe(topic) -await node2.pubsub.subscribe(topic, (msg) => { - console.log(`node2 received: ${msg.data.toString()}`) +node2.pubsub.on(topic, (msg) => { + console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) }) +await node2.pubsub.subscribe(topic) -await node3.pubsub.subscribe(topic, (msg) => { - console.log(`node3 received: ${msg.data.toString()}`) +node3.pubsub.on(topic, (msg) => { + console.log(`node3 received: ${uint8ArrayToString(msg.data)}`) }) +await node3.pubsub.subscribe(topic) ``` Finally, let's define the additional filter in the fruit topic. ```JavaScript -const validateFruit = (msgTopic, peer, msg) => { - const fruit = msg.data.toString(); +const validateFruit = (msgTopic, msg) => { + const fruit = uint8ArrayToString(msg.data) const validFruit = ['banana', 'apple', 'orange'] - const valid = validFruit.includes(fruit); - return valid; + + if (!validFruit.includes(fruit)) { + throw new Error('no valid fruit received') + } } -node1.pubsub._pubsub.topicValidators.set(topic, validateFruit); -node2.pubsub._pubsub.topicValidators.set(topic, validateFruit); -node3.pubsub._pubsub.topicValidators.set(topic, validateFruit); +node1.pubsub.topicValidators.set(topic, validateFruit) +node2.pubsub.topicValidators.set(topic, validateFruit) +node3.pubsub.topicValidators.set(topic, validateFruit) ``` In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared. diff --git a/package.json b/package.json index 9d0bea10..783fc614 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,7 @@ "it-pipe": "^1.1.0", "it-protocol-buffers": "^0.2.0", "libp2p-crypto": "^0.18.0", - "libp2p-interfaces": "^0.5.0", + "libp2p-interfaces": "^0.5.1", "libp2p-utils": "^0.2.0", "mafmt": "^8.0.0", "merge-options": "^2.0.0", @@ -92,16 +92,17 @@ "cids": "^1.0.0", "delay": "^4.3.0", "dirty-chai": "^2.0.1", - "interop-libp2p": "^0.2.0", + "interop-libp2p": "libp2p/interop#chore/gossipsub-1.1", "ipfs-http-client": "^46.0.0", "it-concat": "^1.0.0", "it-pair": "^1.0.0", "it-pushable": "^1.4.0", + "libp2p": ".", "libp2p-bootstrap": "^0.12.0", "libp2p-delegated-content-routing": "^0.6.0", "libp2p-delegated-peer-routing": "^0.6.0", - "libp2p-floodsub": "^0.22.0", - "libp2p-gossipsub": "^0.5.0", + "libp2p-floodsub": "^0.23.0", + "libp2p-gossipsub": "ChainSafe/js-libp2p-gossipsub#chore/use-libp2p-interfaces0.5.1-with-src", "libp2p-kad-dht": "^0.20.0", "libp2p-mdns": "^0.15.0", "libp2p-mplex": "^0.10.0", diff --git a/src/index.js b/src/index.js index febbc485..8ff4e0f7 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,6 @@ const PeerId = require('peer-id') const peerRouting = require('./peer-routing') const contentRouting = require('./content-routing') -const pubsub = require('./pubsub') const getPeer = require('./get-peer') const { validate: validateConfig } = require('./config') const { codes, messages } = require('./errors') @@ -25,6 +24,7 @@ const Metrics = require('./metrics') const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') const PeerStore = require('./peer-store') +const PubsubAdapter = require('./pubsub-adapter') const PersistentPeerStore = require('./peer-store/persistent') const Registrar = require('./registrar') const ping = require('./ping') @@ -185,9 +185,11 @@ class Libp2p extends EventEmitter { }) } - // start pubsub + // Create pubsub if provided if (this._modules.pubsub) { - this.pubsub = pubsub(this, this._modules.pubsub, this._config.pubsub) + const Pubsub = this._modules.pubsub + // using pubsub adapter with *DEPRECATED* handlers functionality + this.pubsub = PubsubAdapter(Pubsub, this, this._config.pubsub) } // Attach remaining APIs diff --git a/src/pubsub-adapter.js b/src/pubsub-adapter.js new file mode 100644 index 00000000..6927178c --- /dev/null +++ b/src/pubsub-adapter.js @@ -0,0 +1,40 @@ +'use strict' + +// Pubsub adapter to keep API with handlers while not removed. +module.exports = (PubsubRouter, libp2p, options) => { + class Pubsub extends PubsubRouter { + /** + * Subscribes to a given topic. + * @override + * @param {string} topic + * @param {function(msg: InMessage)} [handler] + * @returns {void} + */ + subscribe (topic, handler) { + // Bind provided handler + handler && this.on(topic, handler) + super.subscribe(topic) + } + + /** + * Unsubscribe from the given topic. + * @override + * @param {string} topic + * @param {function(msg: InMessage)} [handler] + * @returns {void} + */ + unsubscribe (topic, handler) { + if (!handler) { + this.removeAllListeners(topic) + } else { + this.removeListener(topic, handler) + } + + if (this.listenerCount(topic) === 0) { + super.unsubscribe(topic) + } + } + } + + return new Pubsub(libp2p, options) +} diff --git a/src/pubsub.js b/src/pubsub.js deleted file mode 100644 index f602408d..00000000 --- a/src/pubsub.js +++ /dev/null @@ -1,109 +0,0 @@ -'use strict' - -const errCode = require('err-code') -const { messages, codes } = require('./errors') -const uint8ArrayFromString = require('uint8arrays/from-string') - -module.exports = (node, Pubsub, config) => { - const pubsub = new Pubsub(node.peerId, node.registrar, config) - - return { - /** - * Subscribe the given handler to a pubsub topic - * @param {string} topic - * @param {function} handler The handler to subscribe - * @returns {void} - */ - subscribe: (topic, handler) => { - if (!node.isStarted() && !pubsub.started) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - } - - if (pubsub.listenerCount(topic) === 0) { - pubsub.subscribe(topic) - } - - pubsub.on(topic, handler) - }, - - /** - * Unsubscribes from a pubsub topic - * @param {string} topic - * @param {function} [handler] The handler to unsubscribe from - */ - unsubscribe: (topic, handler) => { - if (!node.isStarted() && !pubsub.started) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - } - - if (!handler) { - pubsub.removeAllListeners(topic) - } else { - pubsub.removeListener(topic, handler) - } - - if (pubsub.listenerCount(topic) === 0) { - pubsub.unsubscribe(topic) - } - }, - - /** - * Publish messages to the given topics. - * @param {Array|string} topic - * @param {Uint8Array} data - * @returns {Promise} - */ - publish: (topic, data) => { - if (!node.isStarted() && !pubsub.started) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - } - - if (typeof data === 'string' || data instanceof String) { - data = uint8ArrayFromString(data) - } - - try { - data = Uint8Array.from(data) - } catch (err) { - throw errCode(new Error('data must be convertible to a Uint8Array'), 'ERR_DATA_IS_NOT_VALID') - } - - return pubsub.publish(topic, data) - }, - - /** - * Get a list of topics the node is subscribed to. - * @returns {Array} topics - */ - getTopics: () => { - if (!node.isStarted() && !pubsub.started) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - } - - return pubsub.getTopics() - }, - - /** - * Get a list of the peer-ids that are subscribed to one topic. - * @param {string} topic - * @returns {Array} - */ - getSubscribers: (topic) => { - if (!node.isStarted() && !pubsub.started) { - throw errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED) - } - - return pubsub.getSubscribers(topic) - }, - - setMaxListeners (n) { - return pubsub.setMaxListeners(n) - }, - - _pubsub: pubsub, - - start: () => pubsub.start(), - - stop: () => pubsub.stop() - } -} diff --git a/test/pubsub/configuration.node.js b/test/pubsub/configuration.node.js index aeb49f28..ed8cd90c 100644 --- a/test/pubsub/configuration.node.js +++ b/test/pubsub/configuration.node.js @@ -42,13 +42,13 @@ describe('Pubsub subsystem is configurable', () => { }) libp2p = await create(customOptions) - expect(libp2p.pubsub._pubsub.started).to.equal(false) + expect(libp2p.pubsub.started).to.equal(false) await libp2p.start() - expect(libp2p.pubsub._pubsub.started).to.equal(true) + expect(libp2p.pubsub.started).to.equal(true) await libp2p.stop() - expect(libp2p.pubsub._pubsub.started).to.equal(false) + expect(libp2p.pubsub.started).to.equal(false) }) it('should not start if disabled once libp2p starts', async () => { @@ -67,10 +67,10 @@ describe('Pubsub subsystem is configurable', () => { }) libp2p = await create(customOptions) - expect(libp2p.pubsub._pubsub.started).to.equal(false) + expect(libp2p.pubsub.started).to.equal(false) await libp2p.start() - expect(libp2p.pubsub._pubsub.started).to.equal(false) + expect(libp2p.pubsub.started).to.equal(false) }) it('should allow a manual start', async () => { @@ -90,9 +90,9 @@ describe('Pubsub subsystem is configurable', () => { libp2p = await create(customOptions) await libp2p.start() - expect(libp2p.pubsub._pubsub.started).to.equal(false) + expect(libp2p.pubsub.started).to.equal(false) await libp2p.pubsub.start() - expect(libp2p.pubsub._pubsub.started).to.equal(true) + expect(libp2p.pubsub.started).to.equal(true) }) }) diff --git a/test/pubsub/operation.node.js b/test/pubsub/operation.node.js index bf169cb3..450f614b 100644 --- a/test/pubsub/operation.node.js +++ b/test/pubsub/operation.node.js @@ -66,8 +66,8 @@ describe('Pubsub subsystem operates correctly', () => { expect(connection).to.exist() return Promise.all([ - pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + pWaitFor(() => libp2p.pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) ]) }) @@ -141,14 +141,14 @@ describe('Pubsub subsystem operates correctly', () => { const connection = await libp2p.dial(remotePeerId) expect(connection).to.exist() - expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0) - expect(remoteLibp2p.pubsub._pubsub.peers.size).to.be.eql(0) + expect(libp2p.pubsub.peers.size).to.be.eql(0) + expect(remoteLibp2p.pubsub.peers.size).to.be.eql(0) remoteLibp2p.pubsub.start() return Promise.all([ - pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + pWaitFor(() => libp2p.pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) ]) }) @@ -164,8 +164,8 @@ describe('Pubsub subsystem operates correctly', () => { remoteLibp2p.pubsub.start() await Promise.all([ - pWaitFor(() => libp2p.pubsub._pubsub.peers.size === 1), - pWaitFor(() => remoteLibp2p.pubsub._pubsub.peers.size === 1) + pWaitFor(() => libp2p.pubsub.peers.size === 1), + pWaitFor(() => remoteLibp2p.pubsub.peers.size === 1) ]) let subscribedTopics = libp2p.pubsub.getTopics()