Compare commits

..

4 Commits

18 changed files with 176 additions and 290 deletions

View File

@ -1,37 +1,3 @@
<a name="0.7.2"></a>
## [0.7.2](https://github.com/libp2p/js-interfaces/compare/v0.7.1...v0.7.2) (2020-11-11)
<a name="0.7.1"></a>
## [0.7.1](https://github.com/libp2p/js-interfaces/compare/v0.7.0...v0.7.1) (2020-11-03)
### Bug Fixes
* typescript types ([#69](https://github.com/libp2p/js-interfaces/issues/69)) ([269a6f5](https://github.com/libp2p/js-interfaces/commit/269a6f5))
<a name="0.7.0"></a>
# [0.7.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.7.0) (2020-11-03)
### Features
* pubsub: add global signature policy ([#66](https://github.com/libp2p/js-interfaces/issues/66)) ([946b046](https://github.com/libp2p/js-interfaces/commit/946b046))
* update pubsub getMsgId return type to Uint8Array ([#65](https://github.com/libp2p/js-interfaces/issues/65)) ([e148443](https://github.com/libp2p/js-interfaces/commit/e148443))
### BREAKING CHANGES
* `signMessages` and `strictSigning` pubsub configuration options replaced
with a `globalSignaturePolicy` option
* new getMsgId return type is not backwards compatible with prior `string`
return type.
<a name="0.6.0"></a> <a name="0.6.0"></a>
# [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05) # [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05)

View File

@ -1,6 +1,6 @@
{ {
"name": "libp2p-interfaces", "name": "libp2p-interfaces",
"version": "0.7.2", "version": "0.6.0",
"description": "Interfaces for JS Libp2p", "description": "Interfaces for JS Libp2p",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>", "leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js", "main": "src/index.js",
@ -53,10 +53,8 @@
"it-pipe": "^1.1.0", "it-pipe": "^1.1.0",
"it-pushable": "^1.4.0", "it-pushable": "^1.4.0",
"libp2p-crypto": "^0.18.0", "libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0", "multiaddr": "^8.0.0",
"multibase": "^3.0.0", "multibase": "^3.0.0",
"multihashes": "^3.0.1",
"p-defer": "^3.0.0", "p-defer": "^3.0.0",
"p-limit": "^2.3.0", "p-limit": "^2.3.0",
"p-wait-for": "^3.1.0", "p-wait-for": "^3.1.0",

View File

@ -209,6 +209,8 @@ class Connection {
* @return {Promise<void>} * @return {Promise<void>}
*/ */
async close () { async close () {
this.streams.map(s => s.close && s.close())
if (this.stat.status === Status.CLOSED) { if (this.stat.status === Status.CLOSED) {
return return
} }

View File

@ -11,9 +11,6 @@ Table of Contents
* [Extend interface](#extend-interface) * [Extend interface](#extend-interface)
* [Example](#example) * [Example](#example)
* [API](#api) * [API](#api)
* [Constructor](#constructor)
* [new Pubsub(options)](#new-pubsuboptions)
* [Parameters](#parameters)
* [Start](#start) * [Start](#start)
* [pubsub.start()](#pubsubstart) * [pubsub.start()](#pubsubstart)
* [Returns](#returns) * [Returns](#returns)
@ -22,24 +19,24 @@ Table of Contents
* [Returns](#returns-1) * [Returns](#returns-1)
* [Publish](#publish) * [Publish](#publish)
* [pubsub.publish(topics, message)](#pubsubpublishtopics-message) * [pubsub.publish(topics, message)](#pubsubpublishtopics-message)
* [Parameters](#parameters-1) * [Parameters](#parameters)
* [Returns](#returns-2) * [Returns](#returns-2)
* [Subscribe](#subscribe) * [Subscribe](#subscribe)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic) * [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-2) * [Parameters](#parameters-1)
* [Unsubscribe](#unsubscribe) * [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic) * [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-3) * [Parameters](#parameters-2)
* [Get Topics](#get-topics) * [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics) * [pubsub.getTopics()](#pubsubgettopics)
* [Returns](#returns-3) * [Returns](#returns-3)
* [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic) * [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic)
* [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic) * [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic)
* [Parameters](#parameters-4) * [Parameters](#parameters-3)
* [Returns](#returns-4) * [Returns](#returns-4)
* [Validate](#validate) * [Validate](#validate)
* [pubsub.validate(message)](#pubsubvalidatemessage) * [pubsub.validate(message)](#pubsubvalidatemessage)
* [Parameters](#parameters-5) * [Parameters](#parameters-4)
* [Returns](#returns-5) * [Returns](#returns-5)
* [Test suite usage](#test-suite-usage) * [Test suite usage](#test-suite-usage)
@ -52,7 +49,7 @@ You can check the following implementations as examples for building your own pu
## Interface usage ## Interface usage
`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management and the features describe in the libp2p [pubsub specs](https://github.com/libp2p/specs/tree/master/pubsub). This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it. `interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.
### Extend interface ### Extend interface
@ -77,7 +74,7 @@ All the remaining functions **MUST NOT** be overwritten.
The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.
```JavaScript ```JavaScript
const Pubsub = require('libp2p-interfaces/src/pubsub') const Pubsub = require('libp2p-pubsub')
class PubsubImplementation extends Pubsub { class PubsubImplementation extends Pubsub {
constructor({ libp2p, options }) constructor({ libp2p, options })
@ -85,7 +82,8 @@ class PubsubImplementation extends Pubsub {
debugName: 'libp2p:pubsub', debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0', multicodecs: '/pubsub-implementation/1.0.0',
libp2p, libp2p,
globalSigningPolicy: options.globalSigningPolicy signMessages: options.signMessages,
strictSigning: options.strictSigning
}) })
} }
@ -100,23 +98,6 @@ class PubsubImplementation extends Pubsub {
The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message. The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message.
### Constructor
The base class constructor configures the pubsub instance for use with a libp2p instance. It includes settings for logging, signature policies, etc.
#### `new Pubsub({options})`
##### Parameters
| Name | Type | Description | Default |
|------|------|-------------|---------|
| options.libp2p | `Libp2p` | libp2p instance | required, no default |
| options.debugName | `string` | log namespace | required, no default |
| options.multicodecs | `string \| Array<string>` | protocol identifier(s) | required, no default |
| options.globalSignaturePolicy | `'StrictSign' \| 'StrictNoSign'` | signature policy to be globally applied | `'StrictSign'` |
| options.canRelayMessage | `boolean` | if can relay messages if not subscribed | `false` |
| options.emitSelf | `boolean` | if `publish` should emit to self, if subscribed | `false` |
### Start ### Start
Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`. Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.
@ -204,7 +185,7 @@ Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that ar
### Validate ### Validate
Validates a message according to the signature policy and topic-specific validation function. Validates the signature of a message.
#### `pubsub.validate(message)` #### `pubsub.validate(message)`

View File

@ -1,11 +1,4 @@
export namespace codes { export namespace codes {
export const ERR_INVALID_SIGNATURE_POLICY: string;
export const ERR_UNHANDLED_SIGNATURE_POLICY: string;
export const ERR_MISSING_SIGNATURE: string; export const ERR_MISSING_SIGNATURE: string;
export const ERR_MISSING_SEQNO: string;
export const ERR_INVALID_SIGNATURE: string; export const ERR_INVALID_SIGNATURE: string;
export const ERR_UNEXPECTED_FROM: string;
export const ERR_UNEXPECTED_SIGNATURE: string;
export const ERR_UNEXPECTED_KEY: string;
export const ERR_UNEXPECTED_SEQNO: string;
} }

View File

@ -1,46 +1,6 @@
'use strict' 'use strict'
exports.codes = { exports.codes = {
/**
* Signature policy is invalid
*/
ERR_INVALID_SIGNATURE_POLICY: 'ERR_INVALID_SIGNATURE_POLICY',
/**
* Signature policy is unhandled
*/
ERR_UNHANDLED_SIGNATURE_POLICY: 'ERR_UNHANDLED_SIGNATURE_POLICY',
// Strict signing codes
/**
* Message expected to have a `signature`, but doesn't
*/
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE', ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
/** ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE'
* Message expected to have a `seqno`, but doesn't
*/
ERR_MISSING_SEQNO: 'ERR_MISSING_SEQNO',
/**
* Message `signature` is invalid
*/
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE',
// Strict no-signing codes
/**
* Message expected to not have a `from`, but does
*/
ERR_UNEXPECTED_FROM: 'ERR_UNEXPECTED_FROM',
/**
* Message expected to not have a `signature`, but does
*/
ERR_UNEXPECTED_SIGNATURE: 'ERR_UNEXPECTED_SIGNATURE',
/**
* Message expected to not have a `key`, but does
*/
ERR_UNEXPECTED_KEY: 'ERR_UNEXPECTED_KEY',
/**
* Message expected to not have a `seqno`, but does
*/
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
} }

22
src/pubsub/index.d.ts vendored
View File

@ -22,16 +22,18 @@ declare class PubsubBaseProtocol {
* @param {String} props.debugName log namespace * @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect * @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p * @param {Libp2p} props.libp2p
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] defines how signatures should be handled * @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed * @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed * @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract * @abstract
*/ */
constructor({ debugName, multicodecs, libp2p, globalSignaturePolicy, canRelayMessage, emitSelf }: { constructor({ debugName, multicodecs, libp2p, signMessages, strictSigning, canRelayMessage, emitSelf }: {
debugName: string; debugName: string;
multicodecs: string | string[]; multicodecs: string | string[];
libp2p: any; libp2p: any;
globalSignaturePolicy?: any; signMessages?: boolean;
strictSigning?: boolean;
canRelayMessage?: boolean; canRelayMessage?: boolean;
emitSelf?: boolean; emitSelf?: boolean;
}); });
@ -64,12 +66,12 @@ declare class PubsubBaseProtocol {
* @type {Map<string, import('./peer-streams')>} * @type {Map<string, import('./peer-streams')>}
*/ */
peers: Map<string, import('./peer-streams')>; peers: Map<string, import('./peer-streams')>;
signMessages: boolean;
/** /**
* The signature policy to follow by default * If message signing should be required for incoming messages
* * @type {boolean}
* @type {string}
*/ */
globalSignaturePolicy: string; strictSigning: boolean;
/** /**
* If router can relay received messages, even if not subscribed * If router can relay received messages, even if not subscribed
* @type {boolean} * @type {boolean}
@ -282,7 +284,7 @@ declare class PubsubBaseProtocol {
getTopics(): string[]; getTopics(): string[];
} }
declare namespace PubsubBaseProtocol { declare namespace PubsubBaseProtocol {
export { message, utils, SignaturePolicy, InMessage, PeerId }; export { message, utils, InMessage, PeerId };
} }
type PeerId = import("peer-id"); type PeerId = import("peer-id");
/** /**
@ -303,7 +305,3 @@ type InMessage = {
*/ */
declare const message: typeof import('./message'); declare const message: typeof import('./message');
declare const utils: typeof import("./utils"); declare const utils: typeof import("./utils");
declare const SignaturePolicy: {
StrictSign: string;
StrictNoSign: string;
};

View File

@ -13,7 +13,6 @@ const { codes } = require('./errors')
*/ */
const message = require('./message') const message = require('./message')
const PeerStreams = require('./peer-streams') const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils') const utils = require('./utils')
const { const {
@ -45,7 +44,8 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {String} props.debugName log namespace * @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect * @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p * @param {Libp2p} props.libp2p
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] defines how signatures should be handled * @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed * @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed * @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract * @abstract
@ -54,7 +54,8 @@ class PubsubBaseProtocol extends EventEmitter {
debugName, debugName,
multicodecs, multicodecs,
libp2p, libp2p,
globalSignaturePolicy = SignaturePolicy.StrictSign, signMessages = true,
strictSigning = true,
canRelayMessage = false, canRelayMessage = false,
emitSelf = false emitSelf = false
}) { }) {
@ -108,17 +109,14 @@ class PubsubBaseProtocol extends EventEmitter {
*/ */
this.peers = new Map() this.peers = new Map()
// validate signature policy // Message signing
if (!SignaturePolicy[globalSignaturePolicy]) { this.signMessages = signMessages
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGUATURE_POLICY)
}
/** /**
* The signature policy to follow by default * If message signing should be required for incoming messages
* * @type {boolean}
* @type {string}
*/ */
this.globalSignaturePolicy = globalSignaturePolicy this.strictSigning = strictSigning
/** /**
* If router can relay received messages, even if not subscribed * If router can relay received messages, even if not subscribed
@ -442,15 +440,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Uint8Array} message id as bytes * @returns {Uint8Array} message id as bytes
*/ */
getMsgId (msg) { getMsgId (msg) {
const signaturePolicy = this.globalSignaturePolicy return utils.msgId(msg.from, msg.seqno)
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
return utils.noSignMsgId(msg.data)
default:
throw errcode(new Error('Cannot get message id: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
} }
/** /**
@ -521,36 +511,16 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async validate (message) { // eslint-disable-line require-await async validate (message) { // eslint-disable-line require-await
const signaturePolicy = this.globalSignaturePolicy // If strict signing is on and we have no signature, abort
switch (signaturePolicy) { if (this.strictSigning && !message.signature) {
case SignaturePolicy.StrictNoSign: throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
if (message.from) {
throw errcode(new Error('StrictNoSigning: from should not be present'), codes.ERR_UNEXPECTED_FROM)
}
if (message.signature) {
throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
}
if (message.key) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}
if (message.seqno) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case SignaturePolicy.StrictSign:
if (!message.signature) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}
if (!message.seqno) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
}
if (!(await verifySignature(message))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
// Check the message signature if present
if (message.signature && !(await verifySignature(message))) {
throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
for (const topic of message.topicIDs) { for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic) const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) { if (!validatorFn) {
@ -568,16 +538,11 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<Message>} * @returns {Promise<Message>}
*/ */
_buildMessage (message) { _buildMessage (message) {
const signaturePolicy = this.globalSignaturePolicy const msg = utils.normalizeOutRpcMessage(message)
switch (signaturePolicy) { if (this.signMessages) {
case SignaturePolicy.StrictSign: return signMessage(this.peerId, msg)
message.from = this.peerId.toB58String() } else {
message.seqno = utils.randomSeqno() return message
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
} }
@ -621,11 +586,13 @@ class PubsubBaseProtocol extends EventEmitter {
const from = this.peerId.toB58String() const from = this.peerId.toB58String()
let msgObject = { let msgObject = {
receivedFrom: from, receivedFrom: from,
from: from,
data: message, data: message,
seqno: utils.randomSeqno(),
topicIDs: [topic] topicIDs: [topic]
} }
// ensure that the message follows the signature policy // ensure that any operations performed on the message will include the signature
const outMsg = await this._buildMessage(msgObject) const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg) msgObject = utils.normalizeInRpcMessage(outMsg)
@ -699,4 +666,3 @@ class PubsubBaseProtocol extends EventEmitter {
module.exports = PubsubBaseProtocol module.exports = PubsubBaseProtocol
module.exports.message = message module.exports.message = message
module.exports.utils = utils module.exports.utils = utils
module.exports.SignaturePolicy = SignaturePolicy

View File

@ -1,4 +0,0 @@
export namespace SignaturePolicy {
export const StrictSign: string;
export const StrictNoSign: string;
}

View File

@ -1,28 +0,0 @@
'use strict'
/**
* Enum for Signature Policy
* Details how message signatures are produced/consumed
*/
exports.SignaturePolicy = {
/**
* On the producing side:
* * Build messages with the signature, key (from may be enough for certain inlineable public key types), from and seqno fields.
*
* On the consuming side:
* * Enforce the fields to be present, reject otherwise.
* * Propagate only if the fields are valid and signature can be verified, reject otherwise.
*/
StrictSign: 'StrictSign',
/**
* On the producing side:
* * Build messages without the signature, key, from and seqno fields.
* * The corresponding protobuf key-value pairs are absent from the marshalled message, not just empty.
*
* On the consuming side:
* * Enforce the fields to be absent, reject otherwise.
* * Propagate only if the fields are absent, reject otherwise.
* * A message_id function will not be able to use the above fields, and should instead rely on the data field. A commonplace strategy is to calculate a hash.
*/
StrictNoSign: 'StrictNoSign'
}

View File

@ -76,7 +76,7 @@ module.exports = (common) => {
const defer = pDefer() const defer = pDefer()
const handler = (msg) => { const handler = (msg) => {
expect(msg).to.not.eql(undefined) expect(msg).to.exist()
defer.resolve() defer.resolve()
} }

View File

@ -10,7 +10,6 @@ const uint8ArrayFromString = require('uint8arrays/from-string')
const { utils } = require('..') const { utils } = require('..')
const PeerStreams = require('../peer-streams') const PeerStreams = require('../peer-streams')
const { SignaturePolicy } = require('../signature-policy')
const topic = 'foo' const topic = 'foo'
const data = uint8ArrayFromString('bar') const data = uint8ArrayFromString('bar')
@ -32,17 +31,24 @@ module.exports = (common) => {
}) })
it('should emit normalized signed messages on publish', async () => { it('should emit normalized signed messages on publish', async () => {
pubsub.globalSignaturePolicy = SignaturePolicy.StrictSign
sinon.spy(pubsub, '_emitMessage') sinon.spy(pubsub, '_emitMessage')
sinon.spy(utils, 'randomSeqno')
await pubsub.publish(topic, data) await pubsub.publish(topic, data)
expect(pubsub._emitMessage.callCount).to.eql(1) expect(pubsub._emitMessage.callCount).to.eql(1)
const [messageToEmit] = pubsub._emitMessage.getCall(0).args const [messageToEmit] = pubsub._emitMessage.getCall(0).args
expect(messageToEmit.seqno).to.not.eql(undefined) const expected = utils.normalizeInRpcMessage(
expect(messageToEmit.key).to.not.eql(undefined) await pubsub._buildMessage({
expect(messageToEmit.signature).to.not.eql(undefined) receivedFrom: pubsub.peerId.toB58String(),
from: pubsub.peerId.toB58String(),
data,
seqno: utils.randomSeqno.getCall(0).returnValue,
topicIDs: [topic]
}))
expect(messageToEmit).to.eql(expected)
}) })
it('should drop unsigned messages', async () => { it('should drop unsigned messages', async () => {
@ -77,16 +83,18 @@ module.exports = (common) => {
}) })
it('should not drop unsigned messages if strict signing is disabled', async () => { it('should not drop unsigned messages if strict signing is disabled', async () => {
pubsub.globalSignaturePolicy = SignaturePolicy.StrictNoSign
sinon.spy(pubsub, '_emitMessage') sinon.spy(pubsub, '_emitMessage')
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
sinon.spy(pubsub, 'validate') sinon.spy(pubsub, 'validate')
sinon.stub(pubsub, 'strictSigning').value(false)
const peerStream = new PeerStreams({ id: await PeerId.create() }) const peerStream = new PeerStreams({ id: await PeerId.create() })
const rpc = { const rpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peerStream.id.toBytes(),
data, data,
seqno: utils.randomSeqno(),
topicIDs: [topic] topicIDs: [topic]
}] }]
} }

View File

@ -52,20 +52,26 @@ module.exports = (common) => {
await common.teardown() await common.teardown()
}) })
it('subscribe to the topic on node a', async () => { it('subscribe to the topic on node a', () => {
const topic = 'Z' const topic = 'Z'
const defer = pDefer()
psA.subscribe(topic) psA.subscribe(topic)
expectSet(psA.subscriptions, [topic]) expectSet(psA.subscriptions, [topic])
await new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)) psB.once('pubsub:subscription-change', () => {
expect(psB.peers.size).to.equal(2) expect(psB.peers.size).to.equal(2)
const aPeerId = psA.peerId.toB58String() const aPeerId = psA.peerId.toB58String()
expectSet(psB.topics.get(topic), [aPeerId]) expectSet(psB.topics.get(topic), [aPeerId])
expect(psC.peers.size).to.equal(1) expect(psC.peers.size).to.equal(1)
expect(psC.topics.get(topic)).to.eql(undefined) expect(psC.topics.get(topic)).to.not.exist()
defer.resolve()
})
return defer.promise
}) })
it('subscribe to the topic on node b', async () => { it('subscribe to the topic on node b', async () => {

View File

@ -1,6 +1,5 @@
export function randomSeqno(): Uint8Array; export function randomSeqno(): Uint8Array;
export function msgId(from: string, seqno: Uint8Array): Uint8Array; export function msgId(from: string, seqno: Uint8Array): Uint8Array;
export function noSignMsgId(data: Uint8Array): Uint8Array;
export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean; export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean;
export function ensureArray(maybeArray: any): any[]; export function ensureArray(maybeArray: any): any[];
export function normalizeInRpcMessage(message: any, peerId: string): any; export function normalizeInRpcMessage(message: any, peerId: string): any;

View File

@ -4,7 +4,6 @@ const randomBytes = require('libp2p-crypto/src/random-bytes')
const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const multihash = require('multihashes')
exports = module.exports exports = module.exports
/** /**
@ -33,15 +32,6 @@ exports.msgId = (from, seqno) => {
return msgId return msgId
} }
/**
* Generate a message id, based on message `data`.
*
* @param {Uint8Array} data
* @returns {Uint8Array}
* @private
*/
exports.noSignMsgId = (data) => multihash.encode(data, 'sha2-256')
/** /**
* Check if any member of the first set is also a member * Check if any member of the first set is also a member
* of the second set. * of the second set.

View File

@ -2,17 +2,17 @@
/* eslint max-nested-callbacks: ["error", 8] */ /* eslint max-nested-callbacks: ["error", 8] */
'use strict' 'use strict'
const chai = require('chai')
const expect = chai.expect
chai.use(require('dirty-chai'))
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const pipe = require('it-pipe') const pipe = require('it-pipe')
const { consume } = require('streaming-iterables') const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const abortable = require('abortable-iterator') const abortable = require('abortable-iterator')
const AbortController = require('abort-controller') const AbortController = require('abort-controller')
const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayFromString = require('uint8arrays/from-string')
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
function pause (ms) { function pause (ms) {
return new Promise(resolve => setTimeout(resolve, ms)) return new Promise(resolve => setTimeout(resolve, ms))
} }
@ -38,33 +38,31 @@ module.exports = (common) => {
Muxer = await common.setup() Muxer = await common.setup()
}) })
it('closing underlying socket closes streams (tcp)', async () => { it('closing underlying socket closes streams', async () => {
const mockConn = muxer => ({ const mockConn = muxer => ({
newStream: (...args) => muxer.newStream(...args) newStream: (...args) => muxer.newStream(...args)
}) })
const mockUpgrade = () => maConn => { const mockUpgrade = maConn => {
const muxer = new Muxer(stream => pipe(stream, stream)) const muxer = new Muxer(stream => pipe(stream, stream))
pipe(maConn, muxer, maConn) pipe(maConn, muxer, maConn)
return mockConn(muxer) return mockConn(muxer)
} }
const mockUpgrader = () => ({ const [local, remote] = pair()
upgradeInbound: mockUpgrade(), const controller = new AbortController()
upgradeOutbound: mockUpgrade() const abortableRemote = abortable.duplex(remote, controller.signal, {
returnOnAbort: true
}) })
const tcp = new Tcp({ upgrader: mockUpgrader() }) mockUpgrade(abortableRemote)
const tcpListener = tcp.createListener() const dialerConn = mockUpgrade(local)
await tcpListener.listen(mh)
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
const s1 = await dialerConn.newStream() const s1 = await dialerConn.newStream()
const s2 = await dialerConn.newStream() const s2 = await dialerConn.newStream()
// close the listener in a bit // close the remote in a bit
setTimeout(() => tcpListener.close(), 50) setTimeout(() => controller.abort(), 50)
const s1Result = pipe(infiniteRandom, s1, consume) const s1Result = pipe(infiniteRandom, s1, consume)
const s2Result = pipe(infiniteRandom, s2, consume) const s2Result = pipe(infiniteRandom, s2, consume)
@ -115,5 +113,69 @@ module.exports = (common) => {
// These should now all resolve without error // These should now all resolve without error
await Promise.all(streamResults) await Promise.all(streamResults)
}) })
it('can close a stream for writing', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]
const listener = new Muxer(async stream => {
// Immediate close for write
await stream.closeWrite()
const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)
try {
await stream.sink([randomBuffer()])
} catch (err) {
expect(err).to.exist()
return done()
}
expect.fail('should not support writing to closed writer')
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
stream.sink(data)
})
it('can close a stream for reading', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]
const listener = new Muxer(async stream => {
const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)
done()
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
stream.closeRead()
// Source should be done
;(async () => {
expect(await stream.source.next()).to.eql({ done: true })
stream.sink(data)
})()
})
}) })
} }

View File

@ -5,7 +5,7 @@ const { expect } = require('aegir/utils/chai')
const sinon = require('sinon') const sinon = require('sinon')
const PubsubBaseImpl = require('../../src/pubsub') const PubsubBaseImpl = require('../../src/pubsub')
const { SignaturePolicy } = require('../../src/pubsub/signature-policy') const { randomSeqno } = require('../../src/pubsub/utils')
const { const {
createPeerId, createPeerId,
mockRegistrar mockRegistrar
@ -34,7 +34,9 @@ describe('pubsub base messages', () => {
it('_buildMessage normalizes and signs messages', async () => { it('_buildMessage normalizes and signs messages', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
@ -42,46 +44,27 @@ describe('pubsub base messages', () => {
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => { it('validate with strict signing off will validate a present signature', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictSign) sinon.stub(pubsub, 'strictSigning').value(false)
const signedMessage = await pubsub._buildMessage(message)
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.from
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.signature
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.key
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.seqno
await expect(pubsub.validate(signedMessage)).to.not.be.rejected()
})
it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => {
const message = {
receivedFrom: peerId.id,
data: 'hello',
topicIDs: ['test-topic']
}
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
const signedMessage = await pubsub._buildMessage(message) const signedMessage = await pubsub._buildMessage(message)
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with StrictSign requires a signature', async () => { it('validate with strict signing requires a signature', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }

View File

@ -10,8 +10,8 @@ const PeerId = require('peer-id')
const uint8ArrayEquals = require('uint8arrays/equals') const uint8ArrayEquals = require('uint8arrays/equals')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const { utils } = require('../../src/pubsub')
const PeerStreams = require('../../src/pubsub/peer-streams') const PeerStreams = require('../../src/pubsub/peer-streams')
const { SignaturePolicy } = require('../../src/pubsub/signature-policy')
const { const {
createPeerId, createPeerId,
@ -30,8 +30,6 @@ describe('topic validators', () => {
pubsub = new PubsubImplementation(protocol, { pubsub = new PubsubImplementation(protocol, {
peerId: peerId, peerId: peerId,
registrar: mockRegistrar registrar: mockRegistrar
}, {
globalSignaturePolicy: SignaturePolicy.StrictNoSign
}) })
pubsub.start() pubsub.start()
@ -44,6 +42,8 @@ describe('topic validators', () => {
it('should filter messages by topic validator', async () => { it('should filter messages by topic validator', async () => {
// use _publish.callCount() to see if a message is valid or not // use _publish.callCount() to see if a message is valid or not
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
// Disable strict signing
sinon.stub(pubsub, 'strictSigning').value(false)
sinon.stub(pubsub.peers, 'get').returns({}) sinon.stub(pubsub.peers, 'get').returns({})
const filteredTopic = 't' const filteredTopic = 't'
const peer = new PeerStreams({ id: await PeerId.create() }) const peer = new PeerStreams({ id: await PeerId.create() })
@ -59,7 +59,9 @@ describe('topic validators', () => {
const validRpc = { const validRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a message'), data: uint8ArrayFromString('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -74,7 +76,9 @@ describe('topic validators', () => {
const invalidRpc = { const invalidRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -90,7 +94,9 @@ describe('topic validators', () => {
const invalidRpc2 = { const invalidRpc2 = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toB58String(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }