Vasco Santos ba15a48dd9
feat: interface pubsub (#60)
* feat: interface pubsub

* chore: pubsub router tests

* chore: move pubsub abstractions from gossipsub

* chore: address review

* chore: revamp docs

* chore: add emit self tests to interface

* chore: refactor base tests

* chore: publish should only accept one topic per api call

* chore: normalize msg before emit

* chore: do not reset inbound stream

* chore: apply suggestions from code review

Co-authored-by: Jacob Heun <jacobheun@gmail.com>

* chore: address review

* fix: remove subscribe handler

* chore: remove bits from create peerId

Co-authored-by: Jacob Heun <jacobheun@gmail.com>

* chore: remove delay from topic validators tests

* chore: add event emitter information

* fix: topic validator docs

Co-authored-by: Jacob Heun <jacobheun@gmail.com>
2020-08-25 13:05:58 +02:00

655 lines
16 KiB
JavaScript

'use strict'
const debug = require('debug')
const EventEmitter = require('events')
const errcode = require('err-code')
const pipe = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
const message = require('./message')
const PeerStreams = require('./peer-streams')
const utils = require('./utils')
const {
signMessage,
verifySignature
} = require('./message/sign')
/**
* @typedef {Object} InMessage
* @property {string} from
* @property {string} receivedFrom
* @property {string[]} topicIDs
* @property {Uint8Array} data
* @property {Uint8Array} [signature]
* @property {Uint8Array} [key]
*/
/**
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
class PubsubBaseProtocol extends EventEmitter {
/**
* @param {Object} props
* @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract
*/
constructor ({
debugName,
multicodecs,
libp2p,
signMessages = true,
strictSigning = true,
canRelayMessage = false,
emitSelf = false
}) {
if (typeof debugName !== 'string') {
throw new Error('a debugname `string` is required')
}
if (!multicodecs) {
throw new Error('multicodecs are required')
}
if (!libp2p) {
throw new Error('libp2p is required')
}
super()
this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`)
this.multicodecs = utils.ensureArray(multicodecs)
this._libp2p = libp2p
this.registrar = libp2p.registrar
this.peerId = libp2p.peerId
this.started = false
/**
* Map of topics to which peers are subscribed to
*
* @type {Map<string, Set<string>>}
*/
this.topics = new Map()
/**
* List of our subscriptions
* @type {Set<string>}
*/
this.subscriptions = new Set()
/**
* Map of peer streams
*
* @type {Map<string, PeerStreams>}
*/
this.peers = new Map()
// Message signing
this.signMessages = signMessages
/**
* If message signing should be required for incoming messages
* @type {boolean}
*/
this.strictSigning = strictSigning
/**
* If router can relay received messages, even if not subscribed
* @type {boolean}
*/
this.canRelayMessage = canRelayMessage
/**
* if publish should emit to self, if subscribed
* @type {boolean}
*/
this.emitSelf = emitSelf
/**
* Topic validator function
* @typedef {function(string, RPC): boolean} validator
*/
/**
* Topic validator map
*
* Keyed by topic
* Topic validators are functions with the following input:
* @type {Map<string, validator>}
*/
this.topicValidators = new Map()
this._registrarId = undefined
this._onIncomingStream = this._onIncomingStream.bind(this)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
}
// LIFECYCLE METHODS
/**
* Register the pubsub protocol onto the libp2p node.
* @returns {void}
*/
start () {
if (this.started) {
return
}
this.log('starting')
// Incoming streams
// Called after a peer dials us
this.registrar.handle(this.multicodecs, this._onIncomingStream)
// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = new MulticodecTopology({
multicodecs: this.multicodecs,
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
}
})
this._registrarId = this.registrar.register(topology)
this.log('started')
this.started = true
}
/**
* Unregister the pubsub protocol and the streams with other peers will be closed.
* @returns {void}
*/
stop () {
if (!this.started) {
return
}
// unregister protocol and handlers
this.registrar.unregister(this._registrarId)
this.log('stopping')
this.peers.forEach((peerStreams) => peerStreams.close())
this.peers = new Map()
this.subscriptions = new Set()
this.started = false
this.log('stopped')
}
/**
* On an inbound stream opened.
* @private
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexIterableStream} props.stream
* @param {Connection} props.connection connection
*/
_onIncomingStream ({ protocol, stream, connection }) {
const peerId = connection.remotePeer
const idB58Str = peerId.toB58String()
const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream)
this._processMessages(idB58Str, peer.inboundStream, peer)
}
/**
* Registrar notifies an established connection with pubsub protocol.
* @private
* @param {PeerId} peerId remote peer-id
* @param {Connection} conn connection to the peer
*/
async _onPeerConnected (peerId, conn) {
const idB58Str = peerId.toB58String()
this.log('connected', idB58Str)
try {
const { stream, protocol } = await conn.newStream(this.multicodecs)
const peer = this._addPeer(peerId, protocol)
await peer.attachOutboundStream(stream)
} catch (err) {
this.log.err(err)
}
// Immediately send my own subscriptions to the newly established conn
this._sendSubscriptions(idB58Str, Array.from(this.subscriptions), true)
}
/**
* Registrar notifies a closing connection with pubsub protocol.
* @private
* @param {PeerId} peerId peerId
* @param {Error} err error for connection end
*/
_onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String()
this.log('connection ended', idB58Str, err ? err.message : '')
this._removePeer(peerId)
}
/**
* Notifies the router that a peer has been connected
* @private
* @param {PeerId} peerId
* @param {string} protocol
* @returns {PeerStreams}
*/
_addPeer (peerId, protocol) {
const id = peerId.toB58String()
const existing = this.peers.get(id)
// If peer streams already exists, do nothing
if (existing) {
return existing
}
// else create a new peer streams
this.log('new peer', id)
const peerStreams = new PeerStreams({
id: peerId,
protocol
})
this.peers.set(id, peerStreams)
peerStreams.once('close', () => this._removePeer(peerId))
return peerStreams
}
/**
* Notifies the router that a peer has been disconnected.
* @private
* @param {PeerId} peerId
* @returns {PeerStreams | undefined}
*/
_removePeer (peerId) {
if (!peerId) return
const id = peerId.toB58String()
const peerStreams = this.peers.get(id)
if (!peerStreams) return
// close peer streams
peerStreams.removeAllListeners()
peerStreams.close()
// delete peer streams
this.log('delete peer', id)
this.peers.delete(id)
// remove peer from topics map
for (const peers of this.topics.values()) {
peers.delete(id)
}
return peerStreams
}
// MESSAGE METHODS
/**
* Responsible for processing each RPC message received by other peers.
* @param {string} idB58Str peer id string in base58
* @param {DuplexIterableStream} stream inbound stream
* @param {PeerStreams} peerStreams PubSub peer
* @returns {Promise<void>}
*/
async _processMessages (idB58Str, stream, peerStreams) {
try {
await pipe(
stream,
async (source) => {
for await (const data of source) {
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
const rpcMsg = this._decodeRpc(rpcBytes)
this._processRpc(idB58Str, peerStreams, rpcMsg)
}
}
)
} catch (err) {
this._onPeerDisconnected(peerStreams.id, err)
}
}
/**
* Handles an rpc request from a peer
* @param {String} idB58Str
* @param {PeerStreams} peerStreams
* @param {RPC} rpc
* @returns {boolean}
*/
_processRpc (idB58Str, peerStreams, rpc) {
this.log('rpc from', idB58Str)
const subs = rpc.subscriptions
const msgs = rpc.msgs
if (subs.length) {
// update peer subscriptions
subs.forEach((subOpt) => this._processRpcSubOpt(idB58Str, subOpt))
this.emit('pubsub:subscription-change', peerStreams.id, subs)
}
if (!this._acceptFrom(idB58Str)) {
this.log('received message from unacceptable peer %s', idB58Str)
return false
}
if (msgs.length) {
msgs.forEach(message => {
if (!(this.canRelayMessage || message.topicIDs.some((topic) => this.subscriptions.has(topic)))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
const msg = utils.normalizeInRpcMessage(message, idB58Str)
this._processRpcMessage(msg)
})
}
return true
}
/**
* Handles a subscription change from a peer
* @param {string} id
* @param {RPC.SubOpt} subOpt
*/
_processRpcSubOpt (id, subOpt) {
const t = subOpt.topicID
let topicSet = this.topics.get(t)
if (!topicSet) {
topicSet = new Set()
this.topics.set(t, topicSet)
}
if (subOpt.subscribe) {
// subscribe peer to new topic
topicSet.add(id)
} else {
// unsubscribe from existing topic
topicSet.delete(id)
}
}
/**
* Handles an message from a peer
* @param {InMessage} msg
* @returns {Promise<void>}
*/
async _processRpcMessage (msg) {
if (this.peerId.toB58String() === msg.from && !this.emitSelf) {
return
}
// Ensure the message is valid before processing it
try {
await this.validate(msg)
} catch (err) {
this.log('Message is invalid, dropping it. %O', err)
return
}
// Emit to self
this._emitMessage(msg)
this._publish(utils.normalizeOutRpcMessage(msg))
}
/**
* Emit a message from a peer
* @param {InMessage} message
*/
_emitMessage (message) {
message.topicIDs.forEach((topic) => {
if (this.subscriptions.has(topic)) {
this.emit(topic, message)
}
})
}
/**
* The default msgID implementation
* Child class can override this.
* @param {RPC.Message} msg the message object
* @returns {string} message id as string
*/
getMsgId (msg) {
return utils.msgId(msg.from, msg.seqno)
}
/**
* Whether to accept a message from a peer
* Override to create a graylist
* @override
* @param {string} id
* @returns {boolean}
*/
_acceptFrom (id) {
return true
}
/**
* Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf.
* @param {Uint8Array} bytes
* @returns {RPC}
*/
_decodeRpc (bytes) {
return message.rpc.RPC.decode(bytes)
}
/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
* @param {RPC} rpc
* @returns {Uint8Array}
*/
_encodeRpc (rpc) {
return message.rpc.RPC.encode(rpc)
}
/**
* Send an rpc object to a peer
* @param {string} id peer id
* @param {RPC} rpc
* @returns {void}
*/
_sendRpc (id, rpc) {
const peerStreams = this.peers.get(id)
if (!peerStreams || !peerStreams.isWritable) {
const msg = `Cannot send RPC to ${id} as there is no open stream to it available`
this.log.err(msg)
return
}
peerStreams.write(this._encodeRpc(rpc))
}
/**
* Send subscroptions to a peer
* @param {string} id peer id
* @param {string[]} topics
* @param {boolean} subscribe set to false for unsubscriptions
* @returns {void}
*/
_sendSubscriptions (id, topics, subscribe) {
return this._sendRpc(id, {
subscriptions: topics.map(t => ({ topicID: t, subscribe: subscribe }))
})
}
/**
* Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages
* @param {InMessage} message
* @returns {Promise<void>}
*/
async validate (message) { // eslint-disable-line require-await
// If strict signing is on and we have no signature, abort
if (this.strictSigning && !message.signature) {
throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}
// Check the message signature if present
if (message.signature && !(await verifySignature(message))) {
throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
continue
}
await validatorFn(topic, message)
}
}
/**
* Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send.
* @private
* @param {Message} message
* @returns {Promise<Message>}
*/
_buildMessage (message) {
const msg = utils.normalizeOutRpcMessage(message)
if (this.signMessages) {
return signMessage(this.peerId, msg)
} else {
return message
}
}
// API METHODS
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getSubscribers (topic) {
if (!this.started) {
throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET')
}
if (!topic || typeof topic !== 'string') {
throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC')
}
const peersInTopic = this.topics.get(topic)
if (!peersInTopic) {
return []
}
return Array.from(peersInTopic)
}
/**
* Publishes messages to all subscribed peers
* @override
* @param {string} topic
* @param {Buffer} message
* @returns {Promise<void>}
*/
async publish (topic, message) {
if (!this.started) {
throw new Error('Pubsub has not started')
}
this.log('publish', topic, message)
const from = this.peerId.toB58String()
let msgObject = {
receivedFrom: from,
from: from,
data: message,
seqno: utils.randomSeqno(),
topicIDs: [topic]
}
// ensure that any operations performed on the message will include the signature
const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg)
// Emit to self if I'm interested and emitSelf enabled
this.emitSelf && this._emitMessage(msgObject)
// send to all the other peers
await this._publish(msgObject)
}
/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {InMessage} message
* @returns {Promise<void>}
*
*/
_publish (message) {
throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Subscribes to a given topic.
* @abstract
* @param {string} topic
* @returns {void}
*/
subscribe (topic) {
if (!this.started) {
throw new Error('Pubsub has not started')
}
if (!this.subscriptions.has(topic)) {
this.subscriptions.add(topic)
this.peers.forEach((_, id) => this._sendSubscriptions(id, [topic], true))
}
}
/**
* Unsubscribe from the given topic.
* @override
* @param {string} topic
* @returns {void}
*/
unsubscribe (topic) {
if (!this.started) {
throw new Error('Pubsub is not started')
}
if (this.subscriptions.has(topic) && this.listenerCount(topic) === 0) {
this.subscriptions.delete(topic)
this.peers.forEach((_, id) => this._sendSubscriptions(id, [topic], false))
}
}
/**
* Get the list of topics which the peer is subscribed to.
* @override
* @returns {Array<String>}
*/
getTopics () {
if (!this.started) {
throw new Error('Pubsub is not started')
}
return Array.from(this.subscriptions)
}
}
module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils