diff --git a/package.json b/package.json index f0db6ef..0b43aa5 100644 --- a/package.json +++ b/package.json @@ -38,19 +38,25 @@ "chai": "^4.2.0", "chai-checkmark": "^1.0.1", "class-is": "^1.1.0", + "debug": "^4.1.1", "delay": "^4.3.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", "err-code": "^2.0.0", "it-goodbye": "^2.0.1", + "it-length-prefixed": "^3.1.0", "it-pair": "^1.0.0", "it-pipe": "^1.1.0", + "it-pushable": "^1.4.0", + "libp2p-crypto": "^0.18.0", "libp2p-tcp": "^0.15.0", "multiaddr": "^8.0.0", + "multibase": "^3.0.0", "p-defer": "^3.0.0", "p-limit": "^2.3.0", "p-wait-for": "^3.1.0", "peer-id": "^0.14.0", + "protons": "^2.0.0", "sinon": "^9.0.2", "streaming-iterables": "^5.0.2", "uint8arrays": "^1.1.0" diff --git a/src/pubsub/README.md b/src/pubsub/README.md new file mode 100644 index 0000000..2e7df62 --- /dev/null +++ b/src/pubsub/README.md @@ -0,0 +1,236 @@ +interface-pubsub +================== + +The `interface-pubsub` contains the base implementation for a libp2p pubsub router implementation. This interface should be used to implement a pubsub router compatible with libp2p. It includes a test suite that pubsub routers should run, in order to ensure compatibility with libp2p. + +Table of Contents +================= + +* [Implementations using this interface](#implementations-using-this-interface) +* [Interface usage](#interface-usage) + * [Extend interface](#extend-interface) + * [Example](#example) +* [API](#api) + * [Start](#start) + * [pubsub.start()](#pubsubstart) + * [Returns](#returns) + * [Stop](#stop) + * [pubsub.stop()](#pubsubstop) + * [Returns](#returns-1) + * [Publish](#publish) + * [pubsub.publish(topics, message)](#pubsubpublishtopics-message) + * [Parameters](#parameters) + * [Returns](#returns-2) + * [Subscribe](#subscribe) + * [pubsub.subscribe(topic)](#pubsubsubscribetopic) + * [Parameters](#parameters-1) + * [Unsubscribe](#unsubscribe) + * [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic) + * [Parameters](#parameters-2) + * [Get Topics](#get-topics) + * [pubsub.getTopics()](#pubsubgettopics) + * [Returns](#returns-3) + * [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic) + * [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic) + * [Parameters](#parameters-3) + * [Returns](#returns-4) + * [Validate](#validate) + * [pubsub.validate(message)](#pubsubvalidatemessage) + * [Parameters](#parameters-4) + * [Returns](#returns-5) +* [Test suite usage](#test-suite-usage) + +## Implementations using this interface + +You can check the following implementations as examples for building your own pubsub router. + +- [libp2p/js-libp2p-floodsub](https://github.com/libp2p/js-libp2p-floodsub) +- [ChainSafe/js-libp2p-gossipsub](https://github.com/ChainSafe/js-libp2p-gossipsub) + +## Interface usage + +`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it. + +### Extend interface + +A pubsub router implementation should start by extending the `interface-pubsub` class and **MUST** override the `_publish` function, according to the router algorithms. This function is responsible for forwarding publish messages to other peers, as well as forwarding received messages if the router provides the `canRelayMessage` option to the base implementation. + +Other functions, such as `start`, `stop`, `subscribe`, `unsubscribe`, `_encodeRpc`, `_decodeRpc`, `_processRpcMessage`, `_addPeer` and `_removePeer` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding these functions **MUST** call `super`. + +The `start` and `stop` functions are responsible for the registration of the pubsub protocol with libp2p. The `stop` function also guarantees that the open streams in the protocol are properly closed. + +The `subscribe` and `unsubscribe` functions take care of the subscription management and its inherent message propagation. + +When using a custom protobuf definition for message marshalling, you should override `_encodeRpc` and `_decodeRpc` to use the new protobuf instead of the default one. + +`_processRpcMessage` is responsible for handling messages received from other peers. This should be extended if further operations/validations are needed by the router. + +The `_addPeer` and `_removePeer` functions are called when new peers running the pubsub router protocol establish a connection with the peer. They are used for tracking the open streams between the peers. + +All the remaining functions **MUST NOT** be overwritten. + +### Example + +The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. + +```JavaScript +const Pubsub = require('libp2p-pubsub') + +class PubsubImplementation extends Pubsub { + constructor({ libp2p, options }) + super({ + debugName: 'libp2p:pubsub', + multicodecs: '/pubsub-implementation/1.0.0', + libp2p, + signMessages: options.signMessages, + strictSigning: options.strictSigning + }) + } + + _publish (message) { + // Required to be implemented by the subclass + // Routing logic for the message + } +} +``` + +## API + +The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message. + +### Start + +Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`. + +#### `pubsub.start()` + +### Stop + +Stops the pubsub subsystem. The protocol will be unregistered from `libp2p`, which will remove all listeners for the protocol and the established connections will be closed. + +#### `pubsub.stop()` + +### Publish + +Publish data message to pubsub topics. + +#### `pubsub.publish(topic, message)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | +| message | `Uint8Array` | message to publish | + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once the message is published to the network | + +### Subscribe + +Subscribe to the given topic. + +#### `pubsub.subscribe(topic)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | + +### Unsubscribe + +Unsubscribe from the given topic. + +#### `pubsub.unsubscribe(topic)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | + +### Get Topics + +Get the list of topics which the peer is subscribed to. + +#### `pubsub.getTopics()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of subscribed topics | + +### Get Peers Subscribed to a topic + +Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that are subscribed to one topic. + +#### `pubsub.getSubscribers(topic)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of base-58 PeerId's | + +### Validate + +Validates the signature of a message. + +#### `pubsub.validate(message)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| message | `Message` | a pubsub message | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves if the message is valid | + +## Test suite usage + +```js +'use strict' + +const tests = require('libp2p-interfaces/src/pubsub/tests') +const YourPubsubRouter = require('../src') + +describe('compliance', () => { + let peers + let pubsubNodes = [] + + tests({ + async setup (number = 1, options = {}) { + // Create number pubsub nodes with libp2p + peers = await createPeers({ number }) + + peers.forEach((peer) => { + const ps = new YourPubsubRouter(peer, options) + + pubsubNodes.push(ps) + }) + + return pubsubNodes + }, + async teardown () { + // Clean up any resources created by setup() + await Promise.all(pubsubNodes.map(ps => ps.stop())) + peers.length && await Promise.all(peers.map(peer => peer.stop())) + } + }) +}) +``` diff --git a/src/pubsub/errors.js b/src/pubsub/errors.js new file mode 100644 index 0000000..24f0bbf --- /dev/null +++ b/src/pubsub/errors.js @@ -0,0 +1,6 @@ +'use strict' + +exports.codes = { + ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE', + ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE' +} diff --git a/src/pubsub/index.js b/src/pubsub/index.js new file mode 100644 index 0000000..db878e9 --- /dev/null +++ b/src/pubsub/index.js @@ -0,0 +1,654 @@ +'use strict' + +const debug = require('debug') +const EventEmitter = require('events') +const errcode = require('err-code') + +const pipe = require('it-pipe') + +const MulticodecTopology = require('../topology/multicodec-topology') +const { codes } = require('./errors') +const message = require('./message') +const PeerStreams = require('./peer-streams') +const utils = require('./utils') +const { + signMessage, + verifySignature +} = require('./message/sign') + +/** + * @typedef {Object} InMessage + * @property {string} from + * @property {string} receivedFrom + * @property {string[]} topicIDs + * @property {Uint8Array} data + * @property {Uint8Array} [signature] + * @property {Uint8Array} [key] + */ + +/** +* PubsubBaseProtocol handles the peers and connections logic for pubsub routers +* and specifies the API that pubsub routers should have. +*/ +class PubsubBaseProtocol extends EventEmitter { + /** + * @param {Object} props + * @param {String} props.debugName log namespace + * @param {Array|string} props.multicodecs protocol identificers to connect + * @param {Libp2p} props.libp2p + * @param {boolean} [props.signMessages = true] if messages should be signed + * @param {boolean} [props.strictSigning = true] if message signing should be required + * @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed + * @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed + * @abstract + */ + constructor ({ + debugName, + multicodecs, + libp2p, + signMessages = true, + strictSigning = true, + canRelayMessage = false, + emitSelf = false + }) { + if (typeof debugName !== 'string') { + throw new Error('a debugname `string` is required') + } + + if (!multicodecs) { + throw new Error('multicodecs are required') + } + + if (!libp2p) { + throw new Error('libp2p is required') + } + + super() + + this.log = debug(debugName) + this.log.err = debug(`${debugName}:error`) + + this.multicodecs = utils.ensureArray(multicodecs) + this._libp2p = libp2p + this.registrar = libp2p.registrar + this.peerId = libp2p.peerId + + this.started = false + + /** + * Map of topics to which peers are subscribed to + * + * @type {Map>} + */ + this.topics = new Map() + + /** + * List of our subscriptions + * @type {Set} + */ + this.subscriptions = new Set() + + /** + * Map of peer streams + * + * @type {Map} + */ + this.peers = new Map() + + // Message signing + this.signMessages = signMessages + + /** + * If message signing should be required for incoming messages + * @type {boolean} + */ + this.strictSigning = strictSigning + + /** + * If router can relay received messages, even if not subscribed + * @type {boolean} + */ + this.canRelayMessage = canRelayMessage + + /** + * if publish should emit to self, if subscribed + * @type {boolean} + */ + this.emitSelf = emitSelf + + /** + * Topic validator function + * @typedef {function(string, RPC): boolean} validator + */ + /** + * Topic validator map + * + * Keyed by topic + * Topic validators are functions with the following input: + * @type {Map} + */ + this.topicValidators = new Map() + + this._registrarId = undefined + this._onIncomingStream = this._onIncomingStream.bind(this) + this._onPeerConnected = this._onPeerConnected.bind(this) + this._onPeerDisconnected = this._onPeerDisconnected.bind(this) + } + + // LIFECYCLE METHODS + + /** + * Register the pubsub protocol onto the libp2p node. + * @returns {void} + */ + start () { + if (this.started) { + return + } + this.log('starting') + + // Incoming streams + // Called after a peer dials us + this.registrar.handle(this.multicodecs, this._onIncomingStream) + + // register protocol with topology + // Topology callbacks called on connection manager changes + const topology = new MulticodecTopology({ + multicodecs: this.multicodecs, + handlers: { + onConnect: this._onPeerConnected, + onDisconnect: this._onPeerDisconnected + } + }) + this._registrarId = this.registrar.register(topology) + + this.log('started') + this.started = true + } + + /** + * Unregister the pubsub protocol and the streams with other peers will be closed. + * @returns {void} + */ + stop () { + if (!this.started) { + return + } + + // unregister protocol and handlers + this.registrar.unregister(this._registrarId) + + this.log('stopping') + this.peers.forEach((peerStreams) => peerStreams.close()) + + this.peers = new Map() + this.subscriptions = new Set() + this.started = false + this.log('stopped') + } + + /** + * On an inbound stream opened. + * @private + * @param {Object} props + * @param {string} props.protocol + * @param {DuplexIterableStream} props.stream + * @param {Connection} props.connection connection + */ + _onIncomingStream ({ protocol, stream, connection }) { + const peerId = connection.remotePeer + const idB58Str = peerId.toB58String() + const peer = this._addPeer(peerId, protocol) + peer.attachInboundStream(stream) + + this._processMessages(idB58Str, peer.inboundStream, peer) + } + + /** + * Registrar notifies an established connection with pubsub protocol. + * @private + * @param {PeerId} peerId remote peer-id + * @param {Connection} conn connection to the peer + */ + async _onPeerConnected (peerId, conn) { + const idB58Str = peerId.toB58String() + this.log('connected', idB58Str) + + try { + const { stream, protocol } = await conn.newStream(this.multicodecs) + const peer = this._addPeer(peerId, protocol) + await peer.attachOutboundStream(stream) + } catch (err) { + this.log.err(err) + } + + // Immediately send my own subscriptions to the newly established conn + this._sendSubscriptions(idB58Str, Array.from(this.subscriptions), true) + } + + /** + * Registrar notifies a closing connection with pubsub protocol. + * @private + * @param {PeerId} peerId peerId + * @param {Error} err error for connection end + */ + _onPeerDisconnected (peerId, err) { + const idB58Str = peerId.toB58String() + + this.log('connection ended', idB58Str, err ? err.message : '') + this._removePeer(peerId) + } + + /** + * Notifies the router that a peer has been connected + * @private + * @param {PeerId} peerId + * @param {string} protocol + * @returns {PeerStreams} + */ + _addPeer (peerId, protocol) { + const id = peerId.toB58String() + const existing = this.peers.get(id) + + // If peer streams already exists, do nothing + if (existing) { + return existing + } + + // else create a new peer streams + this.log('new peer', id) + + const peerStreams = new PeerStreams({ + id: peerId, + protocol + }) + + this.peers.set(id, peerStreams) + peerStreams.once('close', () => this._removePeer(peerId)) + + return peerStreams + } + + /** + * Notifies the router that a peer has been disconnected. + * @private + * @param {PeerId} peerId + * @returns {PeerStreams | undefined} + */ + _removePeer (peerId) { + if (!peerId) return + const id = peerId.toB58String() + const peerStreams = this.peers.get(id) + if (!peerStreams) return + + // close peer streams + peerStreams.removeAllListeners() + peerStreams.close() + + // delete peer streams + this.log('delete peer', id) + this.peers.delete(id) + + // remove peer from topics map + for (const peers of this.topics.values()) { + peers.delete(id) + } + + return peerStreams + } + + // MESSAGE METHODS + + /** + * Responsible for processing each RPC message received by other peers. + * @param {string} idB58Str peer id string in base58 + * @param {DuplexIterableStream} stream inbound stream + * @param {PeerStreams} peerStreams PubSub peer + * @returns {Promise} + */ + async _processMessages (idB58Str, stream, peerStreams) { + try { + await pipe( + stream, + async (source) => { + for await (const data of source) { + const rpcBytes = data instanceof Uint8Array ? data : data.slice() + const rpcMsg = this._decodeRpc(rpcBytes) + + this._processRpc(idB58Str, peerStreams, rpcMsg) + } + } + ) + } catch (err) { + this._onPeerDisconnected(peerStreams.id, err) + } + } + + /** + * Handles an rpc request from a peer + * @param {String} idB58Str + * @param {PeerStreams} peerStreams + * @param {RPC} rpc + * @returns {boolean} + */ + _processRpc (idB58Str, peerStreams, rpc) { + this.log('rpc from', idB58Str) + const subs = rpc.subscriptions + const msgs = rpc.msgs + + if (subs.length) { + // update peer subscriptions + subs.forEach((subOpt) => this._processRpcSubOpt(idB58Str, subOpt)) + this.emit('pubsub:subscription-change', peerStreams.id, subs) + } + + if (!this._acceptFrom(idB58Str)) { + this.log('received message from unacceptable peer %s', idB58Str) + return false + } + + if (msgs.length) { + msgs.forEach(message => { + if (!(this.canRelayMessage || message.topicIDs.some((topic) => this.subscriptions.has(topic)))) { + this.log('received message we didn\'t subscribe to. Dropping.') + return + } + const msg = utils.normalizeInRpcMessage(message, idB58Str) + this._processRpcMessage(msg) + }) + } + return true + } + + /** + * Handles a subscription change from a peer + * @param {string} id + * @param {RPC.SubOpt} subOpt + */ + _processRpcSubOpt (id, subOpt) { + const t = subOpt.topicID + + let topicSet = this.topics.get(t) + if (!topicSet) { + topicSet = new Set() + this.topics.set(t, topicSet) + } + + if (subOpt.subscribe) { + // subscribe peer to new topic + topicSet.add(id) + } else { + // unsubscribe from existing topic + topicSet.delete(id) + } + } + + /** + * Handles an message from a peer + * @param {InMessage} msg + * @returns {Promise} + */ + async _processRpcMessage (msg) { + if (this.peerId.toB58String() === msg.from && !this.emitSelf) { + return + } + + // Ensure the message is valid before processing it + try { + await this.validate(msg) + } catch (err) { + this.log('Message is invalid, dropping it. %O', err) + return + } + + // Emit to self + this._emitMessage(msg) + + this._publish(utils.normalizeOutRpcMessage(msg)) + } + + /** + * Emit a message from a peer + * @param {InMessage} message + */ + _emitMessage (message) { + message.topicIDs.forEach((topic) => { + if (this.subscriptions.has(topic)) { + this.emit(topic, message) + } + }) + } + + /** + * The default msgID implementation + * Child class can override this. + * @param {RPC.Message} msg the message object + * @returns {string} message id as string + */ + getMsgId (msg) { + return utils.msgId(msg.from, msg.seqno) + } + + /** + * Whether to accept a message from a peer + * Override to create a graylist + * @override + * @param {string} id + * @returns {boolean} + */ + _acceptFrom (id) { + return true + } + + /** + * Decode Uint8Array into an RPC object. + * This can be override to use a custom router protobuf. + * @param {Uint8Array} bytes + * @returns {RPC} + */ + _decodeRpc (bytes) { + return message.rpc.RPC.decode(bytes) + } + + /** + * Encode RPC object into a Uint8Array. + * This can be override to use a custom router protobuf. + * @param {RPC} rpc + * @returns {Uint8Array} + */ + _encodeRpc (rpc) { + return message.rpc.RPC.encode(rpc) + } + + /** + * Send an rpc object to a peer + * @param {string} id peer id + * @param {RPC} rpc + * @returns {void} + */ + _sendRpc (id, rpc) { + const peerStreams = this.peers.get(id) + if (!peerStreams || !peerStreams.isWritable) { + const msg = `Cannot send RPC to ${id} as there is no open stream to it available` + + this.log.err(msg) + return + } + peerStreams.write(this._encodeRpc(rpc)) + } + + /** + * Send subscroptions to a peer + * @param {string} id peer id + * @param {string[]} topics + * @param {boolean} subscribe set to false for unsubscriptions + * @returns {void} + */ + _sendSubscriptions (id, topics, subscribe) { + return this._sendRpc(id, { + subscriptions: topics.map(t => ({ topicID: t, subscribe: subscribe })) + }) + } + + /** + * Validates the given message. The signature will be checked for authenticity. + * Throws an error on invalid messages + * @param {InMessage} message + * @returns {Promise} + */ + async validate (message) { // eslint-disable-line require-await + // If strict signing is on and we have no signature, abort + if (this.strictSigning && !message.signature) { + throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE) + } + + // Check the message signature if present + if (message.signature && !(await verifySignature(message))) { + throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE) + } + + for (const topic of message.topicIDs) { + const validatorFn = this.topicValidators.get(topic) + if (!validatorFn) { + continue + } + await validatorFn(topic, message) + } + } + + /** + * Normalizes the message and signs it, if signing is enabled. + * Should be used by the routers to create the message to send. + * @private + * @param {Message} message + * @returns {Promise} + */ + _buildMessage (message) { + const msg = utils.normalizeOutRpcMessage(message) + if (this.signMessages) { + return signMessage(this.peerId, msg) + } else { + return message + } + } + + // API METHODS + + /** + * Get a list of the peer-ids that are subscribed to one topic. + * @param {string} topic + * @returns {Array} + */ + getSubscribers (topic) { + if (!this.started) { + throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') + } + + if (!topic || typeof topic !== 'string') { + throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC') + } + + const peersInTopic = this.topics.get(topic) + if (!peersInTopic) { + return [] + } + return Array.from(peersInTopic) + } + + /** + * Publishes messages to all subscribed peers + * @override + * @param {string} topic + * @param {Buffer} message + * @returns {Promise} + */ + async publish (topic, message) { + if (!this.started) { + throw new Error('Pubsub has not started') + } + + this.log('publish', topic, message) + + const from = this.peerId.toB58String() + let msgObject = { + receivedFrom: from, + from: from, + data: message, + seqno: utils.randomSeqno(), + topicIDs: [topic] + } + + // ensure that any operations performed on the message will include the signature + const outMsg = await this._buildMessage(msgObject) + msgObject = utils.normalizeInRpcMessage(outMsg) + + // Emit to self if I'm interested and emitSelf enabled + this.emitSelf && this._emitMessage(msgObject) + + // send to all the other peers + await this._publish(msgObject) + } + + /** + * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. + * For example, a Floodsub implementation might simply publish each message to each topic for every peer + * @abstract + * @param {InMessage} message + * @returns {Promise} + * + */ + _publish (message) { + throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') + } + + /** + * Subscribes to a given topic. + * @abstract + * @param {string} topic + * @returns {void} + */ + subscribe (topic) { + if (!this.started) { + throw new Error('Pubsub has not started') + } + + if (!this.subscriptions.has(topic)) { + this.subscriptions.add(topic) + this.peers.forEach((_, id) => this._sendSubscriptions(id, [topic], true)) + } + } + + /** + * Unsubscribe from the given topic. + * @override + * @param {string} topic + * @returns {void} + */ + unsubscribe (topic) { + if (!this.started) { + throw new Error('Pubsub is not started') + } + + if (this.subscriptions.has(topic) && this.listenerCount(topic) === 0) { + this.subscriptions.delete(topic) + this.peers.forEach((_, id) => this._sendSubscriptions(id, [topic], false)) + } + } + + /** + * Get the list of topics which the peer is subscribed to. + * @override + * @returns {Array} + */ + getTopics () { + if (!this.started) { + throw new Error('Pubsub is not started') + } + + return Array.from(this.subscriptions) + } +} + +module.exports = PubsubBaseProtocol +module.exports.message = message +module.exports.utils = utils diff --git a/src/pubsub/message/index.js b/src/pubsub/message/index.js new file mode 100644 index 0000000..320ab4c --- /dev/null +++ b/src/pubsub/message/index.js @@ -0,0 +1,14 @@ +'use strict' + +const protons = require('protons') + +const rpcProto = protons(require('./rpc.proto.js')) +const RPC = rpcProto.RPC +const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) + +exports = module.exports +exports.rpc = rpcProto +exports.td = topicDescriptorProto +exports.RPC = RPC +exports.Message = RPC.Message +exports.SubOpts = RPC.SubOpts diff --git a/src/pubsub/message/rpc.proto.js b/src/pubsub/message/rpc.proto.js new file mode 100644 index 0000000..88b1f83 --- /dev/null +++ b/src/pubsub/message/rpc.proto.js @@ -0,0 +1,20 @@ +'use strict' +module.exports = ` +message RPC { + repeated SubOpts subscriptions = 1; + repeated Message msgs = 2; + + message SubOpts { + optional bool subscribe = 1; // subscribe or unsubcribe + optional string topicID = 2; + } + + message Message { + optional bytes from = 1; + optional bytes data = 2; + optional bytes seqno = 3; + repeated string topicIDs = 4; + optional bytes signature = 5; + optional bytes key = 6; + } +}` diff --git a/src/pubsub/message/sign.js b/src/pubsub/message/sign.js new file mode 100644 index 0000000..dbb2392 --- /dev/null +++ b/src/pubsub/message/sign.js @@ -0,0 +1,85 @@ +'use strict' + +const PeerId = require('peer-id') +const { Message } = require('./index') +const uint8ArrayConcat = require('uint8arrays/concat') +const uint8ArrayFromString = require('uint8arrays/from-string') +const SignPrefix = uint8ArrayFromString('libp2p-pubsub:') + +/** + * Signs the provided message with the given `peerId` + * + * @param {PeerId} peerId + * @param {Message} message + * @returns {Promise} + */ +async function signMessage (peerId, message) { + // Get the message in bytes, and prepend with the pubsub prefix + const bytes = uint8ArrayConcat([ + SignPrefix, + Message.encode(message) + ]) + + const signature = await peerId.privKey.sign(bytes) + + return { + ...message, + signature: signature, + key: peerId.pubKey.bytes + } +} + +/** + * Verifies the signature of the given message + * @param {InMessage} message + * @returns {Promise} + */ +async function verifySignature (message) { + // Get message sans the signature + const baseMessage = { ...message } + delete baseMessage.signature + delete baseMessage.key + baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes() + const bytes = uint8ArrayConcat([ + SignPrefix, + Message.encode(baseMessage) + ]) + + // Get the public key + const pubKey = await messagePublicKey(message) + + // verify the base message + return pubKey.verify(bytes, message.signature) +} + +/** + * Returns the PublicKey associated with the given message. + * If no, valid PublicKey can be retrieved an error will be returned. + * + * @param {InMessage} message + * @returns {Promise} + */ +async function messagePublicKey (message) { + // should be available in the from property of the message (peer id) + const from = PeerId.createFromCID(message.from) + + if (message.key) { + const keyPeerId = await PeerId.createFromPubKey(message.key) + + // the key belongs to the sender, return the key + if (keyPeerId.isEqual(from)) return keyPeerId.pubKey + // We couldn't validate pubkey is from the originator, error + throw new Error('Public Key does not match the originator') + } else if (from.pubKey) { + return from.pubKey + } else { + throw new Error('Could not get the public key from the originator id') + } +} + +module.exports = { + messagePublicKey, + signMessage, + SignPrefix, + verifySignature +} diff --git a/src/pubsub/message/topic-descriptor.proto.js b/src/pubsub/message/topic-descriptor.proto.js new file mode 100644 index 0000000..6e829ca --- /dev/null +++ b/src/pubsub/message/topic-descriptor.proto.js @@ -0,0 +1,30 @@ +'use strict' +module.exports = ` +// topicCID = cid(merkledag_protobuf(topicDescriptor)); (not the topic.name) +message TopicDescriptor { + optional string name = 1; + optional AuthOpts auth = 2; + optional EncOpts enc = 2; + + message AuthOpts { + optional AuthMode mode = 1; + repeated bytes keys = 2; // root keys to trust + + enum AuthMode { + NONE = 0; // no authentication, anyone can publish + KEY = 1; // only messages signed by keys in the topic descriptor are accepted + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } + + message EncOpts { + optional EncMode mode = 1; + repeated bytes keyHashes = 2; // the hashes of the shared keys used (salted) + + enum EncMode { + NONE = 0; // no encryption, anyone can read + SHAREDKEY = 1; // messages are encrypted with shared key + WOT = 2; // web of trust, certificates can allow publisher set to grow + } + } +}` diff --git a/src/pubsub/peer-streams.js b/src/pubsub/peer-streams.js new file mode 100644 index 0000000..8877762 --- /dev/null +++ b/src/pubsub/peer-streams.js @@ -0,0 +1,188 @@ +'use strict' + +const EventEmitter = require('events') + +const lp = require('it-length-prefixed') +const pushable = require('it-pushable') +const pipe = require('it-pipe') +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') +const debug = require('debug') + +const log = debug('libp2p-pubsub:peer-streams') +log.error = debug('libp2p-pubsub:peer-streams:error') + +/** + * Thin wrapper around a peer's inbound / outbound pubsub streams + */ +class PeerStreams extends EventEmitter { + /** + * @param {PeerId} id + * @param {string} protocol + */ + constructor ({ id, protocol }) { + super() + + /** + * @type {PeerId} + */ + this.id = id + /** + * Established protocol + * @type {string} + */ + this.protocol = protocol + /** + * The raw outbound stream, as retrieved from conn.newStream + * @private + * @type {DuplexIterableStream} + */ + this._rawOutboundStream = null + /** + * The raw inbound stream, as retrieved from the callback from libp2p.handle + * @private + * @type {DuplexIterableStream} + */ + this._rawInboundStream = null + /** + * An AbortController for controlled shutdown of the inbound stream + * @private + * @type {AbortController} + */ + this._inboundAbortController = null + /** + * Write stream -- its preferable to use the write method + * @type {Pushable} + */ + this.outboundStream = null + /** + * Read stream + * @type {DuplexIterableStream} + */ + this.inboundStream = null + } + + /** + * Do we have a connection to read from? + * + * @type {boolean} + */ + get isReadable () { + return Boolean(this.inboundStream) + } + + /** + * Do we have a connection to write on? + * + * @type {boolean} + */ + get isWritable () { + return Boolean(this.outboundStream) + } + + /** + * Send a message to this peer. + * Throws if there is no `stream` to write to available. + * + * @param {Uint8Array} data + * @returns {undefined} + */ + write (data) { + if (!this.isWritable) { + const id = this.id.toB58String() + throw new Error('No writable connection to ' + id) + } + + this.outboundStream.push(data) + } + + /** + * Attach a raw inbound stream and setup a read stream + * + * @param {DuplexIterableStream} stream + * @returns {void} + */ + attachInboundStream (stream) { + // Create and attach a new inbound stream + // The inbound stream is: + // - abortable, set to only return on abort, rather than throw + // - transformed with length-prefix transform + this._inboundAbortController = new AbortController() + this._rawInboundStream = stream + this.inboundStream = abortable( + pipe( + this._rawInboundStream, + lp.decode() + ), + this._inboundAbortController.signal, + { returnOnAbort: true } + ) + + this.emit('stream:inbound') + } + + /** + * Attach a raw outbound stream and setup a write stream + * + * @param {Stream} stream + * @returns {Promise} + */ + async attachOutboundStream (stream) { + // If an outbound stream already exists, + // gently close it + const _prevStream = this.outboundStream + if (_prevStream) { + // End the stream without emitting a close event + await this.outboundStream.end(false) + } + + this._rawOutboundStream = stream + this.outboundStream = pushable({ + onEnd: (shouldEmit) => { + // close writable side of the stream + this._rawOutboundStream.reset && this._rawOutboundStream.reset() + this._rawOutboundStream = null + this.outboundStream = null + if (shouldEmit !== false) { + this.emit('close') + } + } + }) + + pipe( + this.outboundStream, + lp.encode(), + this._rawOutboundStream + ).catch(err => { + log.error(err) + }) + + // Only emit if the connection is new + if (!_prevStream) { + this.emit('stream:outbound') + } + } + + /** + * Closes the open connection to peer + * @returns {void} + */ + close () { + // End the outbound stream + if (this.outboundStream) { + this.outboundStream.end() + } + // End the inbound stream + if (this.inboundStream) { + this._inboundAbortController.abort() + } + + this._rawOutboundStream = null + this.outboundStream = null + this._rawInboundStream = null + this.inboundStream = null + this.emit('close') + } +} + +module.exports = PeerStreams diff --git a/src/pubsub/tests/api.js b/src/pubsub/tests/api.js new file mode 100644 index 0000000..9e081ef --- /dev/null +++ b/src/pubsub/tests/api.js @@ -0,0 +1,93 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const { expect } = chai +const sinon = require('sinon') + +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayFromString = require('uint8arrays/from-string') + +const topic = 'foo' +const data = uint8ArrayFromString('bar') + +module.exports = (common) => { + describe('pubsub api', () => { + let pubsub + + // Create pubsub router + beforeEach(async () => { + [pubsub] = await common.setup(1) + }) + + afterEach(async () => { + sinon.restore() + pubsub && pubsub.stop() + await common.teardown() + }) + + it('can start correctly', () => { + sinon.spy(pubsub.registrar, '_handle') + sinon.spy(pubsub.registrar, 'register') + + pubsub.start() + + expect(pubsub.started).to.eql(true) + expect(pubsub.registrar._handle.callCount).to.eql(1) + expect(pubsub.registrar.register.callCount).to.eql(1) + }) + + it('can stop correctly', () => { + sinon.spy(pubsub.registrar, 'unregister') + + pubsub.start() + pubsub.stop() + + expect(pubsub.started).to.eql(false) + expect(pubsub.registrar.unregister.callCount).to.eql(1) + }) + + it('can subscribe and unsubscribe correctly', async () => { + const handler = () => { + throw new Error('a message should not be received') + } + + pubsub.start() + pubsub.subscribe(topic) + pubsub.on('topic', handler) + + await pWaitFor(() => { + const topics = pubsub.getTopics() + return topics.length === 1 && topics[0] === topic + }) + + pubsub.unsubscribe(topic) + + await pWaitFor(() => !pubsub.getTopics().length) + + // Publish to guarantee the handler is not called + await pubsub.publish(topic, data) + + pubsub.stop() + }) + + it('can subscribe and publish correctly', async () => { + const defer = pDefer() + + const handler = (msg) => { + expect(msg).to.exist() + defer.resolve() + } + + pubsub.start() + + pubsub.subscribe(topic) + pubsub.on(topic, handler) + await pubsub.publish(topic, data) + await defer.promise + + pubsub.stop() + }) + }) +} diff --git a/src/pubsub/tests/emit-self.js b/src/pubsub/tests/emit-self.js new file mode 100644 index 0000000..856fa78 --- /dev/null +++ b/src/pubsub/tests/emit-self.js @@ -0,0 +1,69 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const { expect } = chai +const sinon = require('sinon') + +const uint8ArrayFromString = require('uint8arrays/from-string') + +const topic = 'foo' +const data = uint8ArrayFromString('bar') +const shouldNotHappen = (_) => expect.fail() + +module.exports = (common) => { + describe('emit self', () => { + let pubsub + + describe('enabled', () => { + before(async () => { + [pubsub] = await common.setup(1, { emitSelf: true }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(async () => { + sinon.restore() + pubsub && pubsub.stop() + await common.teardown() + }) + + it('should emit to self on publish', () => { + const promise = new Promise((resolve) => pubsub.once(topic, resolve)) + + pubsub.publish(topic, data) + + return promise + }) + }) + + describe('disabled', () => { + before(async () => { + [pubsub] = await common.setup(1, { emitSelf: false }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(async () => { + sinon.restore() + pubsub && pubsub.stop() + await common.teardown() + }) + + it('should not emit to self on publish', () => { + pubsub.once(topic, (m) => shouldNotHappen) + + pubsub.publish(topic, data) + + // Wait 1 second to guarantee that self is not noticed + return new Promise((resolve) => setTimeout(() => resolve(), 1000)) + }) + }) + }) +} diff --git a/src/pubsub/tests/index.js b/src/pubsub/tests/index.js new file mode 100644 index 0000000..1b1414b --- /dev/null +++ b/src/pubsub/tests/index.js @@ -0,0 +1,18 @@ +/* eslint-env mocha */ +'use strict' + +const apiTest = require('./api') +const emitSelfTest = require('./emit-self') +const messagesTest = require('./messages') +const twoNodesTest = require('./two-nodes') +const multipleNodesTest = require('./multiple-nodes') + +module.exports = (common) => { + describe('interface-pubsub', () => { + apiTest(common) + emitSelfTest(common) + messagesTest(common) + twoNodesTest(common) + multipleNodesTest(common) + }) +} diff --git a/src/pubsub/tests/messages.js b/src/pubsub/tests/messages.js new file mode 100644 index 0000000..6723937 --- /dev/null +++ b/src/pubsub/tests/messages.js @@ -0,0 +1,116 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const { expect } = chai +const sinon = require('sinon') + +const PeerId = require('peer-id') +const uint8ArrayFromString = require('uint8arrays/from-string') + +const { utils } = require('..') +const PeerStreams = require('../peer-streams') + +const topic = 'foo' +const data = uint8ArrayFromString('bar') + +module.exports = (common) => { + describe('messages', () => { + let pubsub + + // Create pubsub router + beforeEach(async () => { + [pubsub] = await common.setup(1) + pubsub.start() + }) + + afterEach(async () => { + sinon.restore() + pubsub && pubsub.stop() + await common.teardown() + }) + + it('should emit normalized signed messages on publish', async () => { + sinon.spy(pubsub, '_emitMessage') + sinon.spy(utils, 'randomSeqno') + + await pubsub.publish(topic, data) + expect(pubsub._emitMessage.callCount).to.eql(1) + + const [messageToEmit] = pubsub._emitMessage.getCall(0).args + + const expected = utils.normalizeInRpcMessage( + await pubsub._buildMessage({ + receivedFrom: pubsub.peerId.toB58String(), + from: pubsub.peerId.toB58String(), + data, + seqno: utils.randomSeqno.getCall(0).returnValue, + topicIDs: [topic] + })) + + expect(messageToEmit).to.eql(expected) + }) + + it('should drop unsigned messages', async () => { + sinon.spy(pubsub, '_emitMessage') + sinon.spy(pubsub, '_publish') + sinon.spy(pubsub, 'validate') + + const peerStream = new PeerStreams({ id: await PeerId.create() }) + const rpc = { + subscriptions: [], + msgs: [{ + receivedFrom: peerStream.id.toB58String(), + from: peerStream.id.toBytes(), + data, + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + pubsub.subscribe(topic) + pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) + + return new Promise((resolve) => { + setTimeout(() => { + expect(pubsub.validate.callCount).to.eql(1) + expect(pubsub._emitMessage.called).to.eql(false) + expect(pubsub._publish.called).to.eql(false) + + resolve() + }, 50) + }) + }) + + it('should not drop unsigned messages if strict signing is disabled', async () => { + sinon.spy(pubsub, '_emitMessage') + sinon.spy(pubsub, '_publish') + sinon.spy(pubsub, 'validate') + sinon.stub(pubsub, 'strictSigning').value(false) + + const peerStream = new PeerStreams({ id: await PeerId.create() }) + const rpc = { + subscriptions: [], + msgs: [{ + from: peerStream.id.toBytes(), + data, + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + pubsub.subscribe(topic) + pubsub._processRpc(peerStream.id.toB58String(), peerStream, rpc) + + return new Promise((resolve) => { + setTimeout(() => { + expect(pubsub.validate.callCount).to.eql(1) + expect(pubsub._emitMessage.called).to.eql(true) + expect(pubsub._publish.called).to.eql(true) + + resolve() + }, 50) + }) + }) + }) +} diff --git a/src/pubsub/tests/multiple-nodes.js b/src/pubsub/tests/multiple-nodes.js new file mode 100644 index 0000000..faef543 --- /dev/null +++ b/src/pubsub/tests/multiple-nodes.js @@ -0,0 +1,346 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const chai = require('chai') +const { expect } = chai +const sinon = require('sinon') + +const delay = require('delay') +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') + +const { expectSet } = require('./utils') + +module.exports = (common) => { + describe('pubsub with multiple nodes', function () { + this.timeout(10e3) + describe('every peer subscribes to the topic', () => { + describe('line', () => { + // line + // ◉────◉────◉ + // a b c + let psA, psB, psC + + // Create and start pubsub nodes + beforeEach(async () => { + [psA, psB, psC] = await common.setup(3) + + // Start pubsub mpdes + ;[psA, psB, psC].map((p) => p.start()) + }) + + // Connect nodes + beforeEach(async () => { + await psA._libp2p.dial(psB.peerId) + await psB._libp2p.dial(psC.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => + psA.peers.size === 1 && + psC.peers.size === 1 && + psA.peers.size === 1 + ) + }) + + afterEach(async () => { + sinon.restore() + + ;[psA, psB, psC].map((p) => p.stop()) + await common.teardown() + }) + + it('subscribe to the topic on node a', () => { + const topic = 'Z' + const defer = pDefer() + + psA.subscribe(topic) + expectSet(psA.subscriptions, [topic]) + + psB.once('pubsub:subscription-change', () => { + expect(psB.peers.size).to.equal(2) + + const aPeerId = psA.peerId.toB58String() + expectSet(psB.topics.get(topic), [aPeerId]) + + expect(psC.peers.size).to.equal(1) + expect(psC.topics.get(topic)).to.not.exist() + + defer.resolve() + }) + + return defer.promise + }) + + it('subscribe to the topic on node b', async () => { + const topic = 'Z' + psB.subscribe(topic) + expectSet(psB.subscriptions, [topic]) + + await Promise.all([ + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('pubsub:subscription-change', resolve)) + ]) + + expect(psA.peers.size).to.equal(1) + expectSet(psA.topics.get(topic), [psB.peerId.toB58String()]) + + expect(psC.peers.size).to.equal(1) + expectSet(psC.topics.get(topic), [psB.peerId.toB58String()]) + }) + + it('subscribe to the topic on node c', () => { + const topic = 'Z' + const defer = pDefer() + + psC.subscribe(topic) + expectSet(psC.subscriptions, [topic]) + + psB.once('pubsub:subscription-change', () => { + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(2) + expectSet(psB.topics.get(topic), [psC.peerId.toB58String()]) + + defer.resolve() + }) + + return defer.promise + }) + + it('publish on node a', async () => { + const topic = 'Z' + const defer = pDefer() + + psA.subscribe(topic) + psB.subscribe(topic) + psC.subscribe(topic) + + // await subscription change + await Promise.all([ + new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) + ]) + + // await a cycle + await delay(1000) + + let counter = 0 + + psA.on(topic, incMsg) + psB.on(topic, incMsg) + psC.on(topic, incMsg) + + psA.publish(topic, uint8ArrayFromString('hey')) + + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + check() + } + + function check () { + if (++counter === 3) { + psA.removeListener(topic, incMsg) + psB.removeListener(topic, incMsg) + psC.removeListener(topic, incMsg) + defer.resolve() + } + } + + return defer.promise + }) + + // since the topology is the same, just the publish + // gets sent by other peer, we reused the same peers + describe('1 level tree', () => { + // 1 level tree + // ┌◉┐ + // │b│ + // ◉─┘ └─◉ + // a c + + it('publish on node b', async () => { + const topic = 'Z' + const defer = pDefer() + let counter = 0 + + psA.subscribe(topic) + psB.subscribe(topic) + psC.subscribe(topic) + + // await subscription change + await Promise.all([ + new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) + ]) + + psA.on(topic, incMsg) + psB.on(topic, incMsg) + psC.on(topic, incMsg) + + // await a cycle + await delay(1000) + + psB.publish(topic, uint8ArrayFromString('hey')) + + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + check() + } + + function check () { + if (++counter === 3) { + psA.removeListener(topic, incMsg) + psB.removeListener(topic, incMsg) + psC.removeListener(topic, incMsg) + defer.resolve() + } + } + + return defer.promise + }) + }) + }) + + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◉─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◉ + // a + let psA, psB, psC, psD, psE + + // Create and start pubsub nodes + beforeEach(async () => { + [psA, psB, psC, psD, psE] = await common.setup(5) + + // Start pubsub nodes + ;[psA, psB, psC, psD, psE].map((p) => p.start()) + }) + + // connect nodes + beforeEach(async () => { + await psA._libp2p.dial(psB.peerId) + await psB._libp2p.dial(psC.peerId) + await psC._libp2p.dial(psD.peerId) + await psD._libp2p.dial(psE.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => + psA.peers.size === 1 && + psB.peers.size === 2 && + psC.peers.size === 2 && + psD.peers.size === 2 && + psE.peers.size === 1 + ) + }) + + afterEach(async () => { + [psA, psB, psC, psD, psE].map((p) => p.stop()) + await common.teardown() + }) + + it('subscribes', () => { + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) + psD.subscribe('Z') + expectSet(psD.subscriptions, ['Z']) + psE.subscribe('Z') + expectSet(psE.subscriptions, ['Z']) + }) + + it('publishes from c', async function () { + this.timeout(30 * 1000) + const defer = pDefer() + let counter = 0 + + psA.subscribe('Z') + psA.on('Z', incMsg) + psB.subscribe('Z') + psB.on('Z', incMsg) + psC.subscribe('Z') + psC.on('Z', incMsg) + psD.subscribe('Z') + psD.on('Z', incMsg) + psE.subscribe('Z') + psE.on('Z', incMsg) + + await Promise.all([ + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psD.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psE.once('pubsub:subscription-change', resolve)) + ]) + + // await a cycle + await delay(1000) + + psC.publish('Z', uint8ArrayFromString('hey from c')) + + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey from c') + check() + } + + function check () { + if (++counter === 5) { + psA.unsubscribe('Z', incMsg) + psB.unsubscribe('Z', incMsg) + psC.unsubscribe('Z', incMsg) + psD.unsubscribe('Z', incMsg) + psE.unsubscribe('Z', incMsg) + defer.resolve() + } + } + + return defer.promise + }) + }) + }) + + describe('only some nodes subscribe the networks', () => { + describe('line', () => { + // line + // ◉────◎────◉ + // a b c + + before(() => { }) + after(() => { }) + }) + + describe('1 level tree', () => { + // 1 level tree + // ┌◉┐ + // │b│ + // ◎─┘ └─◉ + // a c + + before(() => { }) + after(() => { }) + }) + + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◎─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◎ + // a e + + before(() => { }) + after(() => { }) + }) + }) + }) +} diff --git a/src/pubsub/tests/two-nodes.js b/src/pubsub/tests/two-nodes.js new file mode 100644 index 0000000..028e0aa --- /dev/null +++ b/src/pubsub/tests/two-nodes.js @@ -0,0 +1,228 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const chai = require('chai') +const { expect } = chai +const sinon = require('sinon') + +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') + +const { + first, + expectSet +} = require('./utils') + +const topic = 'foo' + +function shouldNotHappen (_) { + expect.fail() +} + +module.exports = (common) => { + describe('pubsub with two nodes', () => { + describe('fresh nodes', () => { + let psA, psB + + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) + + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + // Start pubsub and connect nodes + psA.start() + psB.start() + + await psA._libp2p.dial(psB.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1) + }) + + after(async () => { + sinon.restore() + + psA && psA.stop() + psB && psB.stop() + + await common.teardown() + }) + + it('Subscribe to a topic in nodeA', () => { + const defer = pDefer() + + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expectSet(psA.subscriptions, [topic]) + expect(psB.peers.size).to.equal(1) + expectSet(psB.topics.get(topic), [psA.peerId.toB58String()]) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + defer.resolve() + }) + psA.subscribe(topic) + + return defer.promise + }) + + it('Publish to a topic in nodeA', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + psB.removeListener(topic, shouldNotHappen) + defer.resolve() + }) + + psB.once(topic, shouldNotHappen) + + psA.publish(topic, uint8ArrayFromString('hey')) + + return defer.promise + }) + + it('Publish to a topic in nodeB', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + psA.once(topic, shouldNotHappen) + expect(uint8ArrayToString(msg.data)).to.equal('banana') + + setTimeout(() => { + psA.removeListener(topic, shouldNotHappen) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + }, 100) + }) + + psB.once(topic, shouldNotHappen) + + psB.publish(topic, uint8ArrayFromString('banana')) + + return defer.promise + }) + + it('Publish 10 msg to a topic in nodeB', () => { + const defer = pDefer() + let counter = 0 + + psB.once(topic, shouldNotHappen) + psA.on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('banana') + expect(msg.from).to.be.eql(psB.peerId.toB58String()) + expect(msg.seqno).to.be.a('Uint8Array') + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + psA.removeListener(topic, receivedMsg) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + } + } + + Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana'))) + + return defer.promise + }) + + it('Unsubscribe from topic in nodeA', () => { + const defer = pDefer() + + psA.unsubscribe(topic) + expect(psA.subscriptions.size).to.equal(0) + + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expect(psB.peers.size).to.equal(1) + expectSet(psB.topics.get(topic), []) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + + defer.resolve() + }) + + return defer.promise + }) + + it('Publish to a topic:Z in nodeA nodeB', () => { + const defer = pDefer() + + psA.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) + + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + defer.resolve() + }, 100) + + psB.publish('Z', uint8ArrayFromString('banana')) + psA.publish('Z', uint8ArrayFromString('banana')) + + return defer.promise + }) + }) + + describe('nodes send state on connection', () => { + let psA, psB + + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) + + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + // Start pubsub and connect nodes + psA.start() + psB.start() + }) + + // Make subscriptions prior to nodes connected + before(() => { + psA.subscribe('Za') + psB.subscribe('Zb') + + expect(psA.peers.size).to.equal(0) + expectSet(psA.subscriptions, ['Za']) + expect(psB.peers.size).to.equal(0) + expectSet(psB.subscriptions, ['Zb']) + }) + + after(async () => { + sinon.restore() + + psA && psA.stop() + psB && psB.stop() + + await common.teardown() + }) + + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(10e3) + + await Promise.all([ + psA._libp2p.dial(psB.peerId), + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)) + ]) + + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + + expectSet(psA.subscriptions, ['Za']) + expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()]) + + expectSet(psB.subscriptions, ['Zb']) + expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()]) + }) + }) + }) +} diff --git a/src/pubsub/tests/utils.js b/src/pubsub/tests/utils.js new file mode 100644 index 0000000..7d16ac4 --- /dev/null +++ b/src/pubsub/tests/utils.js @@ -0,0 +1,9 @@ +'use strict' + +const { expect } = require('chai') + +exports.first = (map) => map.values().next().value + +exports.expectSet = (set, subs) => { + expect(Array.from(set.values())).to.eql(subs) +} diff --git a/src/pubsub/utils.js b/src/pubsub/utils.js new file mode 100644 index 0000000..4d26bef --- /dev/null +++ b/src/pubsub/utils.js @@ -0,0 +1,99 @@ +'use strict' + +const randomBytes = require('libp2p-crypto/src/random-bytes') +const uint8ArrayToString = require('uint8arrays/to-string') +const uint8ArrayFromString = require('uint8arrays/from-string') + +exports = module.exports + +/** + * Generatea random sequence number. + * + * @returns {Uint8Array} + * @private + */ +exports.randomSeqno = () => { + return randomBytes(8) +} + +/** + * Generate a message id, based on the `from` and `seqno`. + * + * @param {string} from + * @param {Uint8Array} seqno + * @returns {string} + * @private + */ +exports.msgId = (from, seqno) => { + return from + uint8ArrayToString(seqno, 'base16') +} + +/** + * Check if any member of the first set is also a member + * of the second set. + * + * @param {Set|Array} a + * @param {Set|Array} b + * @returns {boolean} + * @private + */ +exports.anyMatch = (a, b) => { + let bHas + if (Array.isArray(b)) { + bHas = (val) => b.indexOf(val) > -1 + } else { + bHas = (val) => b.has(val) + } + + for (const val of a) { + if (bHas(val)) { + return true + } + } + + return false +} + +/** + * Make everything an array. + * + * @param {any} maybeArray + * @returns {Array} + * @private + */ +exports.ensureArray = (maybeArray) => { + if (!Array.isArray(maybeArray)) { + return [maybeArray] + } + + return maybeArray +} + +/** + * Ensures `message.from` is base58 encoded + * @param {Object} message + * @param {Uint8Array|String} message.from + * @param {String} peerId + * @return {Object} + */ +exports.normalizeInRpcMessage = (message, peerId) => { + const m = Object.assign({}, message) + if (message.from instanceof Uint8Array) { + m.from = uint8ArrayToString(message.from, 'base58btc') + } + if (peerId) { + m.receivedFrom = peerId + } + return m +} + +exports.normalizeOutRpcMessage = (message) => { + const m = Object.assign({}, message) + if (typeof message.from === 'string' || message.from instanceof String) { + m.from = uint8ArrayFromString(message.from, 'base58btc') + } + if (typeof message.data === 'string' || message.data instanceof String) { + m.data = uint8ArrayFromString(message.data) + } + return m +} diff --git a/test/pubsub/emit-self.spec.js b/test/pubsub/emit-self.spec.js new file mode 100644 index 0000000..f365941 --- /dev/null +++ b/test/pubsub/emit-self.spec.js @@ -0,0 +1,78 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') + +const { + createPeerId, + mockRegistrar, + PubsubImplementation +} = require('./utils') + +const uint8ArrayFromString = require('uint8arrays/from-string') + +const protocol = '/pubsub/1.0.0' +const topic = 'foo' +const data = uint8ArrayFromString('bar') +const shouldNotHappen = (_) => expect.fail() + +describe('emitSelf', () => { + let pubsub + + describe('enabled', () => { + before(async () => { + const peerId = await createPeerId() + + pubsub = new PubsubImplementation(protocol, { + peerId, + registrar: mockRegistrar + }, { emitSelf: true }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(() => { + pubsub.stop() + }) + + it('should emit to self on publish', () => { + const promise = new Promise((resolve) => pubsub.once(topic, resolve)) + + pubsub.publish(topic, data) + + return promise + }) + }) + + describe('disabled', () => { + before(async () => { + const peerId = await createPeerId() + + pubsub = new PubsubImplementation(protocol, { + peerId, + registrar: mockRegistrar + }, { emitSelf: false }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(() => { + pubsub.stop() + }) + + it('should not emit to self on publish', () => { + pubsub.once(topic, (m) => shouldNotHappen) + + pubsub.publish(topic, data) + + // Wait 1 second to guarantee that self is not noticed + return new Promise((resolve) => setTimeout(() => resolve(), 1000)) + }) + }) +}) diff --git a/test/pubsub/instance.spec.js b/test/pubsub/instance.spec.js new file mode 100644 index 0000000..7ec3bc6 --- /dev/null +++ b/test/pubsub/instance.spec.js @@ -0,0 +1,54 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') + +const PubsubBaseImpl = require('../../src/pubsub') +const { + createPeerId, + mockRegistrar +} = require('./utils') + +describe('pubsub instance', () => { + let peerId + + before(async () => { + peerId = await createPeerId() + }) + + it('should throw if no debugName is provided', () => { + expect(() => { + new PubsubBaseImpl() // eslint-disable-line no-new + }).to.throw() + }) + + it('should throw if no multicodec is provided', () => { + expect(() => { + new PubsubBaseImpl({ // eslint-disable-line no-new + debugName: 'pubsub' + }) + }).to.throw() + }) + + it('should throw if no libp2p is provided', () => { + expect(() => { + new PubsubBaseImpl({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0' + }) + }).to.throw() + }) + + it('should accept valid parameters', () => { + expect(() => { + new PubsubBaseImpl({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + libp2p: { + peerId: peerId, + registrar: mockRegistrar + } + }) + }).not.to.throw() + }) +}) diff --git a/test/pubsub/lifesycle.spec.js b/test/pubsub/lifesycle.spec.js new file mode 100644 index 0000000..4da0344 --- /dev/null +++ b/test/pubsub/lifesycle.spec.js @@ -0,0 +1,227 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') + +const PubsubBaseImpl = require('../../src/pubsub') +const { + createPeerId, + createMockRegistrar, + PubsubImplementation, + ConnectionPair +} = require('./utils') + +describe('pubsub base lifecycle', () => { + describe('should start and stop properly', () => { + let pubsub + let sinonMockRegistrar + + beforeEach(async () => { + const peerId = await createPeerId() + sinonMockRegistrar = { + handle: sinon.stub(), + register: sinon.stub(), + unregister: sinon.stub() + } + + pubsub = new PubsubBaseImpl({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + libp2p: { + peerId: peerId, + registrar: sinonMockRegistrar + } + }) + + expect(pubsub.peers.size).to.be.eql(0) + }) + + afterEach(() => { + sinon.restore() + }) + + it('should be able to start and stop', async () => { + await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() + }) + + it('starting should not throw if already started', async () => { + await pubsub.start() + await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() + }) + + it('stopping should not throw if not started', async () => { + await pubsub.stop() + expect(sinonMockRegistrar.register.calledOnce).to.be.false() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.false() + }) + }) + + describe('should be able to register two nodes', () => { + const protocol = '/pubsub/1.0.0' + let pubsubA, pubsubB + let peerIdA, peerIdB + const registrarRecordA = {} + const registrarRecordB = {} + + // mount pubsub + beforeEach(async () => { + peerIdA = await createPeerId() + peerIdB = await createPeerId() + + pubsubA = new PubsubImplementation(protocol, { + peerId: peerIdA, + registrar: createMockRegistrar(registrarRecordA) + }) + pubsubB = new PubsubImplementation(protocol, { + peerId: peerIdB, + registrar: createMockRegistrar(registrarRecordB) + }) + }) + + // start pubsub + beforeEach(() => { + pubsubA.start() + pubsubB.start() + + expect(Object.keys(registrarRecordA)).to.have.lengthOf(1) + expect(Object.keys(registrarRecordB)).to.have.lengthOf(1) + }) + + afterEach(() => { + sinon.restore() + + return Promise.all([ + pubsubA.stop(), + pubsubB.stop() + ]) + }) + + it('should handle onConnect as expected', async () => { + const onConnectA = registrarRecordA[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + + expect(pubsubA.peers.size).to.be.eql(1) + expect(pubsubB.peers.size).to.be.eql(1) + }) + + it('should use the latest connection if onConnect is called more than once', async () => { + const onConnectA = registrarRecordA[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + const [c2] = ConnectionPair() + + sinon.spy(c0, 'newStream') + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + expect(c0.newStream).to.have.property('callCount', 1) + + sinon.spy(pubsubA, '_removePeer') + + sinon.spy(c2, 'newStream') + + await onConnectA(peerIdB, c2) + expect(c2.newStream).to.have.property('callCount', 1) + expect(pubsubA._removePeer).to.have.property('callCount', 0) + + // Verify the first stream was closed + const { stream: firstStream } = await c0.newStream.returnValues[0] + try { + await firstStream.sink(['test']) + } catch (err) { + expect(err).to.exist() + return + } + expect.fail('original stream should have ended') + }) + + it('should handle newStream errors in onConnect', async () => { + const onConnectA = registrarRecordA[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + const error = new Error('new stream error') + sinon.stub(c0, 'newStream').throws(error) + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + + expect(c0.newStream).to.have.property('callCount', 1) + }) + + it('should handle onDisconnect as expected', async () => { + const onConnectA = registrarRecordA[protocol].onConnect + const onDisconnectA = registrarRecordA[protocol].onDisconnect + const handlerB = registrarRecordB[protocol].handler + const onDisconnectB = registrarRecordB[protocol].onDisconnect + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + + // Notice peers of disconnect + onDisconnectA(peerIdB) + onDisconnectB(peerIdA) + + expect(pubsubA.peers.size).to.be.eql(0) + expect(pubsubB.peers.size).to.be.eql(0) + }) + + it('should handle onDisconnect for unknown peers', () => { + const onDisconnectA = registrarRecordA[protocol].onDisconnect + + expect(pubsubA.peers.size).to.be.eql(0) + + // Notice peers of disconnect + onDisconnectA(peerIdB) + + expect(pubsubA.peers.size).to.be.eql(0) + }) + }) +}) diff --git a/test/pubsub/message.spec.js b/test/pubsub/message.spec.js new file mode 100644 index 0000000..e2e189f --- /dev/null +++ b/test/pubsub/message.spec.js @@ -0,0 +1,73 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') + +const PubsubBaseImpl = require('../../src/pubsub') +const { randomSeqno } = require('../../src/pubsub/utils') +const { + createPeerId, + mockRegistrar +} = require('./utils') + +describe('pubsub base messages', () => { + let peerId + let pubsub + + before(async () => { + peerId = await createPeerId() + pubsub = new PubsubBaseImpl({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + libp2p: { + peerId: peerId, + registrar: mockRegistrar + } + }) + }) + + afterEach(() => { + sinon.restore() + }) + + it('_buildMessage normalizes and signs messages', async () => { + const message = { + receivedFrom: peerId.id, + from: peerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const signedMessage = await pubsub._buildMessage(message) + expect(pubsub.validate(signedMessage)).to.not.be.rejected() + }) + + it('validate with strict signing off will validate a present signature', async () => { + const message = { + receivedFrom: peerId.id, + from: peerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + sinon.stub(pubsub, 'strictSigning').value(false) + + const signedMessage = await pubsub._buildMessage(message) + expect(pubsub.validate(signedMessage)).to.not.be.rejected() + }) + + it('validate with strict signing requires a signature', async () => { + const message = { + receivedFrom: peerId.id, + from: peerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + await expect(pubsub.validate(message)).to.be.rejectedWith(Error, 'Signing required and no signature was present') + }) +}) diff --git a/test/pubsub/pubsub.spec.js b/test/pubsub/pubsub.spec.js new file mode 100644 index 0000000..3e3aae3 --- /dev/null +++ b/test/pubsub/pubsub.spec.js @@ -0,0 +1,358 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') +const pWaitFor = require('p-wait-for') + +const uint8ArrayFromString = require('uint8arrays/from-string') + +const PeerStreams = require('../../src/pubsub/peer-streams') +const { + createPeerId, + createMockRegistrar, + ConnectionPair, + mockRegistrar, + PubsubImplementation +} = require('./utils') + +const protocol = '/pubsub/1.0.0' +const topic = 'test-topic' +const message = uint8ArrayFromString('hello') + +describe('pubsub base implementation', () => { + describe('publish', () => { + let pubsub + + beforeEach(async () => { + const peerId = await createPeerId() + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + }) + + afterEach(() => pubsub.stop()) + + it('calls _publish for router to forward messages', async () => { + sinon.spy(pubsub, '_publish') + + pubsub.start() + await pubsub.publish(topic, message) + + expect(pubsub._publish.callCount).to.eql(1) + }) + + it('should sign messages on publish', async () => { + sinon.spy(pubsub, '_publish') + + pubsub.start() + await pubsub.publish(topic, message) + + // Get the first message sent to _publish, and validate it + const signedMessage = pubsub._publish.getCall(0).lastArg + try { + await pubsub.validate(signedMessage) + } catch (e) { + expect.fail('validation should not throw') + } + }) + }) + + describe('subscribe', () => { + describe('basics', () => { + let pubsub + + beforeEach(async () => { + const peerId = await createPeerId() + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + pubsub.start() + }) + + afterEach(() => pubsub.stop()) + + it('should add subscription', () => { + pubsub.subscribe(topic) + + expect(pubsub.subscriptions.size).to.eql(1) + expect(pubsub.subscriptions.has(topic)).to.be.true() + }) + }) + + describe('two nodes', () => { + let pubsubA, pubsubB + let peerIdA, peerIdB + const registrarRecordA = {} + const registrarRecordB = {} + + beforeEach(async () => { + peerIdA = await createPeerId() + peerIdB = await createPeerId() + + pubsubA = new PubsubImplementation(protocol, { + peerId: peerIdA, + registrar: createMockRegistrar(registrarRecordA) + }) + pubsubB = new PubsubImplementation(protocol, { + peerId: peerIdB, + registrar: createMockRegistrar(registrarRecordB) + }) + }) + + // start pubsub and connect nodes + beforeEach(async () => { + pubsubA.start() + pubsubB.start() + + const onConnectA = registrarRecordA[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + }) + + afterEach(() => { + pubsubA.stop() + pubsubB.stop() + }) + + it('should send subscribe message to connected peers', async () => { + sinon.spy(pubsubA, '_sendSubscriptions') + sinon.spy(pubsubB, '_processRpcSubOpt') + + pubsubA.subscribe(topic) + + // Should send subscriptions to a peer + expect(pubsubA._sendSubscriptions.callCount).to.eql(1) + + // Other peer should receive subscription message + await pWaitFor(() => { + const subscribers = pubsubB.getSubscribers(topic) + + return subscribers.length === 1 + }) + expect(pubsubB._processRpcSubOpt.callCount).to.eql(1) + }) + }) + }) + + describe('unsubscribe', () => { + describe('basics', () => { + let pubsub + + beforeEach(async () => { + const peerId = await createPeerId() + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + pubsub.start() + }) + + afterEach(() => pubsub.stop()) + + it('should remove all subscriptions for a topic', () => { + pubsub.subscribe(topic, (msg) => {}) + pubsub.subscribe(topic, (msg) => {}) + + expect(pubsub.subscriptions.size).to.eql(1) + + pubsub.unsubscribe(topic) + + expect(pubsub.subscriptions.size).to.eql(0) + }) + }) + + describe('two nodes', () => { + let pubsubA, pubsubB + let peerIdA, peerIdB + const registrarRecordA = {} + const registrarRecordB = {} + + beforeEach(async () => { + peerIdA = await createPeerId() + peerIdB = await createPeerId() + + pubsubA = new PubsubImplementation(protocol, { + peerId: peerIdA, + registrar: createMockRegistrar(registrarRecordA) + }) + pubsubB = new PubsubImplementation(protocol, { + peerId: peerIdB, + registrar: createMockRegistrar(registrarRecordB) + }) + }) + + // start pubsub and connect nodes + beforeEach(async () => { + pubsubA.start() + pubsubB.start() + + const onConnectA = registrarRecordA[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + + await onConnectA(peerIdB, c0) + await handlerB({ + protocol, + stream: c1.stream, + connection: { + remotePeer: peerIdA + } + }) + }) + + afterEach(() => { + pubsubA.stop() + pubsubB.stop() + }) + + it('should send unsubscribe message to connected peers', async () => { + sinon.spy(pubsubA, '_sendSubscriptions') + sinon.spy(pubsubB, '_processRpcSubOpt') + + pubsubA.subscribe(topic) + // Should send subscriptions to a peer + expect(pubsubA._sendSubscriptions.callCount).to.eql(1) + + // Other peer should receive subscription message + await pWaitFor(() => { + const subscribers = pubsubB.getSubscribers(topic) + + return subscribers.length === 1 + }) + expect(pubsubB._processRpcSubOpt.callCount).to.eql(1) + + // Unsubscribe + pubsubA.unsubscribe(topic) + // Should send subscriptions to a peer + expect(pubsubA._sendSubscriptions.callCount).to.eql(2) + + // Other peer should receive subscription message + await pWaitFor(() => { + const subscribers = pubsubB.getSubscribers(topic) + + return subscribers.length === 0 + }) + expect(pubsubB._processRpcSubOpt.callCount).to.eql(2) + }) + + it('should not send unsubscribe message to connected peers if not subscribed', () => { + sinon.spy(pubsubA, '_sendSubscriptions') + sinon.spy(pubsubB, '_processRpcSubOpt') + + // Unsubscribe + pubsubA.unsubscribe(topic) + + // Should send subscriptions to a peer + expect(pubsubA._sendSubscriptions.callCount).to.eql(0) + }) + }) + }) + + describe('getTopics', () => { + let peerId + let pubsub + + beforeEach(async () => { + peerId = await createPeerId() + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + pubsub.start() + }) + + afterEach(() => pubsub.stop()) + + it('returns the subscribed topics', () => { + let subsTopics = pubsub.getTopics() + expect(subsTopics).to.have.lengthOf(0) + + pubsub.subscribe(topic) + + subsTopics = pubsub.getTopics() + expect(subsTopics).to.have.lengthOf(1) + expect(subsTopics[0]).to.eql(topic) + }) + }) + + describe('getSubscribers', () => { + let peerId + let pubsub + + beforeEach(async () => { + peerId = await createPeerId() + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + }) + + afterEach(() => pubsub.stop()) + + it('should fail if pubsub is not started', () => { + const topic = 'topic-test' + + try { + pubsub.getSubscribers(topic) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql('ERR_NOT_STARTED_YET') + return + } + throw new Error('should fail if pubsub is not started') + }) + + it('should fail if no topic is provided', () => { + // start pubsub + pubsub.start() + + try { + pubsub.getSubscribers() + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql('ERR_NOT_VALID_TOPIC') + return + } + throw new Error('should fail if no topic is provided') + }) + + it('should get peer subscribed to one topic', () => { + const topic = 'topic-test' + + // start pubsub + pubsub.start() + + let peersSubscribed = pubsub.getSubscribers(topic) + expect(peersSubscribed).to.be.empty() + + // Set mock peer subscribed + const peer = new PeerStreams({ id: peerId }) + const id = peer.id.toB58String() + + pubsub.topics.set(topic, new Set([id])) + pubsub.peers.set(id, peer) + + peersSubscribed = pubsub.getSubscribers(topic) + + expect(peersSubscribed).to.not.be.empty() + expect(peersSubscribed[0]).to.eql(id) + }) + }) +}) diff --git a/test/pubsub/sign.spec.js b/test/pubsub/sign.spec.js new file mode 100644 index 0000000..f743933 --- /dev/null +++ b/test/pubsub/sign.spec.js @@ -0,0 +1,93 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const uint8ArrayConcat = require('uint8arrays/concat') +const uint8ArrayFromString = require('uint8arrays/from-string') + +const { Message } = require('../../src/pubsub/message') +const { + signMessage, + SignPrefix, + verifySignature +} = require('../../src/pubsub/message/sign') +const PeerId = require('peer-id') +const { randomSeqno } = require('../../src/pubsub/utils') + +describe('message signing', () => { + let peerId + before(async () => { + peerId = await PeerId.create({ + bits: 1024 + }) + }) + + it('should be able to sign and verify a message', async () => { + const message = { + from: peerId.id, + data: uint8ArrayFromString('hello'), + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) + + const signedMessage = await signMessage(peerId, message) + + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) + + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + }) + + it('should be able to extract the public key from an inlined key', async () => { + const secPeerId = await PeerId.create({ keyType: 'secp256k1' }) + + const message = { + from: secPeerId.id, + data: uint8ArrayFromString('hello'), + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const expectedSignature = await secPeerId.privKey.sign(bytesToSign) + + const signedMessage = await signMessage(secPeerId, message) + + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + signedMessage.key = undefined + + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + }) + + it('should be able to extract the public key from the message', async () => { + const message = { + from: peerId.id, + data: uint8ArrayFromString('hello'), + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } + + const bytesToSign = uint8ArrayConcat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) + + const signedMessage = await signMessage(peerId, message) + + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) + + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + }) +}) diff --git a/test/pubsub/topic-validators.spec.js b/test/pubsub/topic-validators.spec.js new file mode 100644 index 0000000..86e0db8 --- /dev/null +++ b/test/pubsub/topic-validators.spec.js @@ -0,0 +1,110 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') +const pWaitFor = require('p-wait-for') +const errCode = require('err-code') + +const PeerId = require('peer-id') +const uint8ArrayEquals = require('uint8arrays/equals') +const uint8ArrayFromString = require('uint8arrays/from-string') + +const { utils } = require('../../src/pubsub') +const PeerStreams = require('../../src/pubsub/peer-streams') + +const { + createPeerId, + mockRegistrar, + PubsubImplementation +} = require('./utils') + +const protocol = '/pubsub/1.0.0' + +describe('topic validators', () => { + let pubsub + + beforeEach(async () => { + const peerId = await createPeerId() + + pubsub = new PubsubImplementation(protocol, { + peerId: peerId, + registrar: mockRegistrar + }) + + pubsub.start() + }) + + afterEach(() => { + sinon.restore() + }) + + it('should filter messages by topic validator', async () => { + // use _publish.callCount() to see if a message is valid or not + sinon.spy(pubsub, '_publish') + // Disable strict signing + sinon.stub(pubsub, 'strictSigning').value(false) + sinon.stub(pubsub.peers, 'get').returns({}) + const filteredTopic = 't' + const peer = new PeerStreams({ id: await PeerId.create() }) + + // Set a trivial topic validator + pubsub.topicValidators.set(filteredTopic, (topic, message) => { + if (!uint8ArrayEquals(message.data, uint8ArrayFromString('a message'))) { + throw errCode(new Error(), 'ERR_TOPIC_VALIDATOR_REJECT') + } + }) + + // valid case + const validRpc = { + subscriptions: [], + msgs: [{ + from: peer.id.toBytes(), + data: uint8ArrayFromString('a message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process valid message + pubsub.subscribe(filteredTopic) + pubsub._processRpc(peer.id.toB58String(), peer, validRpc) + + await pWaitFor(() => pubsub._publish.callCount === 1) + + // invalid case + const invalidRpc = { + subscriptions: [], + msgs: [{ + from: peer.id.toBytes(), + data: uint8ArrayFromString('a different message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process invalid message + pubsub._processRpc(peer.id.toB58String(), peer, invalidRpc) + expect(pubsub._publish.callCount).to.eql(1) + + // remove topic validator + pubsub.topicValidators.delete(filteredTopic) + + // another invalid case + const invalidRpc2 = { + subscriptions: [], + msgs: [{ + from: peer.id.toB58String(), + data: uint8ArrayFromString('a different message'), + seqno: utils.randomSeqno(), + topicIDs: [filteredTopic] + }] + } + + // process previously invalid message, now is valid + pubsub._processRpc(peer.id.toB58String(), peer, invalidRpc2) + pubsub.unsubscribe(filteredTopic) + + await pWaitFor(() => pubsub._publish.callCount === 2) + }) +}) diff --git a/test/pubsub/utils.spec.js b/test/pubsub/utils.spec.js new file mode 100644 index 0000000..439e0da --- /dev/null +++ b/test/pubsub/utils.spec.js @@ -0,0 +1,82 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const utils = require('../../src/pubsub/utils') +const uint8ArrayFromString = require('uint8arrays/from-string') + +describe('utils', () => { + it('randomSeqno', () => { + const first = utils.randomSeqno() + const second = utils.randomSeqno() + + expect(first).to.have.length(8) + expect(second).to.have.length(8) + expect(first).to.not.eql(second) + }) + + it('msgId', () => { + expect(utils.msgId('hello', uint8ArrayFromString('world'))).to.be.eql('hello776f726c64') + }) + + it('msgId should not generate same ID for two different Uint8Arrays', () => { + const peerId = 'QmPNdSYk5Rfpo5euNqwtyizzmKXMNHdXeLjTQhcN4yfX22' + const msgId0 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfde', 'base16')) + const msgId1 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfe0', 'base16')) + expect(msgId0).to.not.eql(msgId1) + }) + + it('anyMatch', () => { + [ + [[1, 2, 3], [4, 5, 6], false], + [[1, 2], [1, 2], true], + [[1, 2, 3], [4, 5, 1], true], + [[5, 6, 1], [1, 2, 3], true], + [[], [], false], + [[1], [2], false] + ].forEach((test) => { + expect(utils.anyMatch(new Set(test[0]), new Set(test[1]))) + .to.eql(test[2]) + + expect(utils.anyMatch(new Set(test[0]), test[1])) + .to.eql(test[2]) + }) + }) + + it('ensureArray', () => { + expect(utils.ensureArray('hello')).to.be.eql(['hello']) + expect(utils.ensureArray([1, 2])).to.be.eql([1, 2]) + }) + + it('converts an IN msg.from to b58', () => { + const binaryId = uint8ArrayFromString('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'base16') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const m = [ + { from: binaryId }, + { from: stringId } + ] + const expected = [ + { from: stringId }, + { from: stringId } + ] + for (let i = 0; i < m.length; i++) { + expect(utils.normalizeInRpcMessage(m[i])).to.deep.eql(expected[i]) + } + }) + + it('converts an OUT msg.from to binary', () => { + const binaryId = uint8ArrayFromString('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'base16') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const m = [ + { from: binaryId }, + { from: stringId } + ] + const expected = [ + { from: binaryId }, + { from: binaryId } + ] + for (let i = 0; i < m.length; i++) { + expect(utils.normalizeOutRpcMessage(m[i])).to.deep.eql(expected[i]) + } + }) +}) diff --git a/test/pubsub/utils/index.js b/test/pubsub/utils/index.js new file mode 100644 index 0000000..f62f1cb --- /dev/null +++ b/test/pubsub/utils/index.js @@ -0,0 +1,85 @@ +'use strict' + +const DuplexPair = require('it-pair/duplex') + +const PeerId = require('peer-id') + +const PubsubBaseProtocol = require('../../../src/pubsub') +const { message } = require('../../../src/pubsub') + +exports.createPeerId = async () => { + const peerId = await PeerId.create({ bits: 1024 }) + + return peerId +} + +class PubsubImplementation extends PubsubBaseProtocol { + constructor (protocol, libp2p, options = {}) { + super({ + debugName: 'libp2p:pubsub', + multicodecs: protocol, + libp2p, + ...options + }) + } + + _publish (message) { + // ... + } + + _decodeRpc (bytes) { + return message.rpc.RPC.decode(bytes) + } + + _encodeRpc (rpc) { + return message.rpc.RPC.encode(rpc) + } +} + +exports.PubsubImplementation = PubsubImplementation + +exports.mockRegistrar = { + handle: () => {}, + register: () => {}, + unregister: () => {} +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } + }, + register: ({ multicodecs, _onConnect, _onDisconnect }) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + onConnect: _onConnect, + onDisconnect: _onDisconnect + } + + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] + } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] +}