Compare commits

...

12 Commits

Author SHA1 Message Date
b75f2cab48 chore: release version v0.7.1 2020-11-03 22:43:57 +01:00
8512997e76 chore: update contributors 2020-11-03 22:43:57 +01:00
269a6f5e0a fix: typescript types (#69) 2020-11-03 22:35:18 +01:00
14d09970ca chore: release version v0.7.0 2020-11-03 18:26:50 +01:00
c98c58e824 chore: update contributors 2020-11-03 18:26:49 +01:00
946b046440 feat: pubsub: add global signature policy (#66)
BREAKING CHANGE:
`signMessages` and `strictSigning` pubsub configuration options replaced
with a `globalSignaturePolicy` option
2020-11-03 18:22:03 +01:00
d168c7d531 chore: release version v0.6.0 2020-10-05 16:40:42 +02:00
349c1174db chore: update contributors 2020-10-05 16:40:41 +02:00
e14844315b feat: update pubsub getMsgId return type to Uint8Array (#65)
BREAKING CHANGE:
new getMsgId return type is not backwards compatible with prior `string`
return type.
2020-10-05 16:36:29 +02:00
ff3bd10704 chore: release version v0.5.2 2020-09-30 11:27:32 +02:00
4ecd7d3c83 chore: update contributors 2020-09-30 11:27:31 +02:00
eacdc246da chore: add type generation from jsdoc (#64) 2020-09-30 11:21:11 +02:00
35 changed files with 1151 additions and 86 deletions

View File

@ -1,3 +1,79 @@
<a name="0.7.1"></a>
## [0.7.1](https://github.com/libp2p/js-interfaces/compare/v0.7.0...v0.7.1) (2020-11-03)
### Bug Fixes
* typescript types ([#69](https://github.com/libp2p/js-interfaces/issues/69)) ([269a6f5](https://github.com/libp2p/js-interfaces/commit/269a6f5))
<a name="0.7.0"></a>
# [0.7.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.7.0) (2020-11-03)
### Features
* pubsub: add global signature policy ([#66](https://github.com/libp2p/js-interfaces/issues/66)) ([946b046](https://github.com/libp2p/js-interfaces/commit/946b046))
* update pubsub getMsgId return type to Uint8Array ([#65](https://github.com/libp2p/js-interfaces/issues/65)) ([e148443](https://github.com/libp2p/js-interfaces/commit/e148443))
### BREAKING CHANGES
* `signMessages` and `strictSigning` pubsub configuration options replaced
with a `globalSignaturePolicy` option
* new getMsgId return type is not backwards compatible with prior `string`
return type.
<a name="0.6.0"></a>
# [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05)
### Features
* update pubsub getMsgId return type to Uint8Array ([#65](https://github.com/libp2p/js-interfaces/issues/65)) ([e148443](https://github.com/libp2p/js-interfaces/commit/e148443))
### BREAKING CHANGES
* new getMsgId return type is not backwards compatible with prior `string`
return type.
<a name="0.5.2"></a>
## [0.5.2](https://github.com/libp2p/js-interfaces/compare/v0.3.1...v0.5.2) (2020-09-30)
### Bug Fixes
* replace remaining Buffer usage with Uint8Array ([#62](https://github.com/libp2p/js-interfaces/issues/62)) ([4130e7f](https://github.com/libp2p/js-interfaces/commit/4130e7f))
### Chores
* update deps ([#57](https://github.com/libp2p/js-interfaces/issues/57)) ([75f6777](https://github.com/libp2p/js-interfaces/commit/75f6777))
### Features
* interface pubsub ([#60](https://github.com/libp2p/js-interfaces/issues/60)) ([ba15a48](https://github.com/libp2p/js-interfaces/commit/ba15a48))
* record interface ([#52](https://github.com/libp2p/js-interfaces/issues/52)) ([1cc943e](https://github.com/libp2p/js-interfaces/commit/1cc943e))
### BREAKING CHANGES
* records now marshal as Uint8Array instead of Buffer
* fix: refactor remaining Buffer usage to Uint8Array
* - The peer id dep of this module has replaced node Buffers with Uint8Arrays
* chore: update gh deps
<a name="0.5.1"></a> <a name="0.5.1"></a>
## [0.5.1](https://github.com/libp2p/js-interfaces/compare/v0.5.0...v0.5.1) (2020-08-25) ## [0.5.1](https://github.com/libp2p/js-interfaces/compare/v0.5.0...v0.5.1) (2020-08-25)

View File

@ -1,19 +1,23 @@
{ {
"name": "libp2p-interfaces", "name": "libp2p-interfaces",
"version": "0.5.1", "version": "0.7.1",
"description": "Interfaces for JS Libp2p", "description": "Interfaces for JS Libp2p",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>", "leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js", "main": "src/index.js",
"files": [ "files": [
"src", "src",
"types",
"dist" "dist"
], ],
"scripts": { "scripts": {
"lint": "aegir lint", "lint": "aegir lint",
"build": "aegir build", "build": "aegir build",
"pregenerate:types": "rimraf './src/**/*.d.ts'",
"generate:types": "tsc",
"test": "aegir test", "test": "aegir test",
"test:node": "aegir test --target node", "test:node": "aegir test --target node",
"test:browser": "aegir test --target browser", "test:browser": "aegir test --target browser",
"prepublishOnly": "npm run generate:types",
"release": "aegir release -t node -t browser", "release": "aegir release -t node -t browser",
"release-minor": "aegir release --type minor -t node -t browser", "release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser" "release-major": "aegir release --type major -t node -t browser"
@ -52,6 +56,7 @@
"libp2p-tcp": "^0.15.0", "libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0", "multiaddr": "^8.0.0",
"multibase": "^3.0.0", "multibase": "^3.0.0",
"multihashes": "^3.0.1",
"p-defer": "^3.0.0", "p-defer": "^3.0.0",
"p-limit": "^2.3.0", "p-limit": "^2.3.0",
"p-wait-for": "^3.1.0", "p-wait-for": "^3.1.0",
@ -63,7 +68,9 @@
}, },
"devDependencies": { "devDependencies": {
"aegir": "^25.0.0", "aegir": "^25.0.0",
"it-handshake": "^1.0.1" "it-handshake": "^1.0.1",
"rimraf": "^3.0.2",
"typescript": "3.7.5"
}, },
"contributors": [ "contributors": [
"Alan Shaw <alan.shaw@protocol.ai>", "Alan Shaw <alan.shaw@protocol.ai>",

149
src/connection/connection.d.ts vendored Normal file
View File

@ -0,0 +1,149 @@
declare const _exports: typeof Connection;
export = _exports;
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
declare class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
* @param {string} properties.stat.timeline.open connection opening timestamp.
* @param {string} properties.stat.timeline.upgraded connection upgraded timestamp.
* @param {string} [properties.stat.multiplexer] connection multiplexing identifier.
* @param {string} [properties.stat.encryption] connection encryption method identifier.
*/
constructor({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }: {
localAddr?: import("multiaddr");
remoteAddr?: import("multiaddr");
localPeer: import("peer-id");
remotePeer: import("peer-id");
newStream: Function;
close: Function;
getStreams: () => any[];
stat: {
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
});
/**
* Connection identifier.
*/
id: any;
/**
* Observed multiaddr of the local peer
*/
localAddr: import("multiaddr");
/**
* Observed multiaddr of the remote peer
*/
remoteAddr: import("multiaddr");
/**
* Local peer id.
*/
localPeer: import("peer-id");
/**
* Remote peer id.
*/
remotePeer: import("peer-id");
/**
* Connection metadata.
*/
_stat: {
status: string;
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
/**
* Reference to the new stream function of the multiplexer
*/
_newStream: Function;
/**
* Reference to the close function of the raw connection
*/
_close: Function;
/**
* Reference to the getStreams function of the muxer
*/
_getStreams: () => any[];
/**
* Connection streams registry
*/
registry: Map<any, any>;
/**
* User provided tags
* @type {string[]}
*/
tags: string[];
/**
* Get connection metadata
* @this {Connection}
*/
get stat(): {
status: string;
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
/**
* Get all the streams of the muxer.
* @this {Connection}
*/
get streams(): any[];
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
newStream(protocols: string[]): Promise<{
stream: any;
protocol: string;
}>;
/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
* @return {void}
*/
addStream(muxedStream: any, { protocol, metadata }: {
protocol: string;
metadata: any;
}): void;
/**
* Remove stream registry after it is closed.
* @param {string} id identifier of the stream
*/
removeStream(id: string): void;
/**
* Close the connection.
* @return {Promise<void>}
*/
close(): Promise<void>;
_closing: any;
}

View File

@ -66,7 +66,7 @@ class Connection {
* @param {PeerId} properties.remotePeer remote peer-id. * @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function. * @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function. * @param {function} properties.close close raw connection function.
* @param {function} properties.getStreams get streams from muxer function. * @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection. * @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound"). * @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp. * @param {object} properties.stat.timeline connection relevant events timestamp.
@ -133,13 +133,14 @@ class Connection {
/** /**
* User provided tags * User provided tags
* @type {string[]}
*/ */
this.tags = [] this.tags = []
} }
/** /**
* Get connection metadata * Get connection metadata
* @return {Object} * @this {Connection}
*/ */
get stat () { get stat () {
return this._stat return this._stat
@ -147,7 +148,7 @@ class Connection {
/** /**
* Get all the streams of the muxer. * Get all the streams of the muxer.
* @return {Array<*>} * @this {Connection}
*/ */
get streams () { get streams () {
return this._getStreams() return this._getStreams()
@ -156,7 +157,7 @@ class Connection {
/** /**
* Create a new stream from this connection * Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream * @param {string[]} protocols intended protocol for the stream
* @return {Promise<object>} with muxed+multistream-selected stream and selected protocol * @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/ */
async newStream (protocols) { async newStream (protocols) {
if (this.stat.status === Status.CLOSING) { if (this.stat.status === Status.CLOSING) {
@ -205,7 +206,7 @@ class Connection {
/** /**
* Close the connection. * Close the connection.
* @return {Promise} * @return {Promise<void>}
*/ */
async close () { async close () {
if (this.stat.status === Status.CLOSED) { if (this.stat.status === Status.CLOSED) {
@ -226,4 +227,8 @@ class Connection {
} }
} }
/**
* @module
* @type {typeof Connection}
*/
module.exports = withIs(Connection, { className: 'Connection', symbolName: '@libp2p/interface-connection/connection' }) module.exports = withIs(Connection, { className: 'Connection', symbolName: '@libp2p/interface-connection/connection' })

1
src/connection/index.d.ts vendored Normal file
View File

@ -0,0 +1 @@
export var Connection: typeof import('./connection');

View File

@ -1,3 +1,7 @@
'use strict' 'use strict'
/**
* @module connection/index
* @type {typeof import('./connection')}
*/
exports.Connection = require('./connection') exports.Connection = require('./connection')

3
src/connection/status.d.ts vendored Normal file
View File

@ -0,0 +1,3 @@
export declare const OPEN: string;
export declare const CLOSING: string;
export declare const CLOSED: string;

15
src/crypto/errors.d.ts vendored Normal file
View File

@ -0,0 +1,15 @@
export class UnexpectedPeerError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}
export class InvalidCryptoExchangeError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}
export class InvalidCryptoTransmissionError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}

0
src/index.d.ts vendored Normal file
View File

View File

@ -11,6 +11,9 @@ Table of Contents
* [Extend interface](#extend-interface) * [Extend interface](#extend-interface)
* [Example](#example) * [Example](#example)
* [API](#api) * [API](#api)
* [Constructor](#constructor)
* [new Pubsub(options)](#new-pubsuboptions)
* [Parameters](#parameters)
* [Start](#start) * [Start](#start)
* [pubsub.start()](#pubsubstart) * [pubsub.start()](#pubsubstart)
* [Returns](#returns) * [Returns](#returns)
@ -19,24 +22,24 @@ Table of Contents
* [Returns](#returns-1) * [Returns](#returns-1)
* [Publish](#publish) * [Publish](#publish)
* [pubsub.publish(topics, message)](#pubsubpublishtopics-message) * [pubsub.publish(topics, message)](#pubsubpublishtopics-message)
* [Parameters](#parameters) * [Parameters](#parameters-1)
* [Returns](#returns-2) * [Returns](#returns-2)
* [Subscribe](#subscribe) * [Subscribe](#subscribe)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic) * [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-1) * [Parameters](#parameters-2)
* [Unsubscribe](#unsubscribe) * [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic) * [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-2) * [Parameters](#parameters-3)
* [Get Topics](#get-topics) * [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics) * [pubsub.getTopics()](#pubsubgettopics)
* [Returns](#returns-3) * [Returns](#returns-3)
* [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic) * [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic)
* [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic) * [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic)
* [Parameters](#parameters-3) * [Parameters](#parameters-4)
* [Returns](#returns-4) * [Returns](#returns-4)
* [Validate](#validate) * [Validate](#validate)
* [pubsub.validate(message)](#pubsubvalidatemessage) * [pubsub.validate(message)](#pubsubvalidatemessage)
* [Parameters](#parameters-4) * [Parameters](#parameters-5)
* [Returns](#returns-5) * [Returns](#returns-5)
* [Test suite usage](#test-suite-usage) * [Test suite usage](#test-suite-usage)
@ -49,7 +52,7 @@ You can check the following implementations as examples for building your own pu
## Interface usage ## Interface usage
`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it. `interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management and the features describe in the libp2p [pubsub specs](https://github.com/libp2p/specs/tree/master/pubsub). This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.
### Extend interface ### Extend interface
@ -74,7 +77,7 @@ All the remaining functions **MUST NOT** be overwritten.
The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.
```JavaScript ```JavaScript
const Pubsub = require('libp2p-pubsub') const Pubsub = require('libp2p-interfaces/src/pubsub')
class PubsubImplementation extends Pubsub { class PubsubImplementation extends Pubsub {
constructor({ libp2p, options }) constructor({ libp2p, options })
@ -82,8 +85,7 @@ class PubsubImplementation extends Pubsub {
debugName: 'libp2p:pubsub', debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0', multicodecs: '/pubsub-implementation/1.0.0',
libp2p, libp2p,
signMessages: options.signMessages, globalSigningPolicy: options.globalSigningPolicy
strictSigning: options.strictSigning
}) })
} }
@ -98,6 +100,23 @@ class PubsubImplementation extends Pubsub {
The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message. The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message.
### Constructor
The base class constructor configures the pubsub instance for use with a libp2p instance. It includes settings for logging, signature policies, etc.
#### `new Pubsub({options})`
##### Parameters
| Name | Type | Description | Default |
|------|------|-------------|---------|
| options.libp2p | `Libp2p` | libp2p instance | required, no default |
| options.debugName | `string` | log namespace | required, no default |
| options.multicodecs | `string \| Array<string>` | protocol identifier(s) | required, no default |
| options.globalSignaturePolicy | `'StrictSign' \| 'StrictNoSign'` | signature policy to be globally applied | `'StrictSign'` |
| options.canRelayMessage | `boolean` | if can relay messages if not subscribed | `false` |
| options.emitSelf | `boolean` | if `publish` should emit to self, if subscribed | `false` |
### Start ### Start
Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`. Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.
@ -185,7 +204,7 @@ Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that ar
### Validate ### Validate
Validates the signature of a message. Validates a message according to the signature policy and topic-specific validation function.
#### `pubsub.validate(message)` #### `pubsub.validate(message)`

11
src/pubsub/errors.d.ts vendored Normal file
View File

@ -0,0 +1,11 @@
export namespace codes {
export const ERR_INVALID_SIGNATURE_POLICY: string;
export const ERR_UNHANDLED_SIGNATURE_POLICY: string;
export const ERR_MISSING_SIGNATURE: string;
export const ERR_MISSING_SEQNO: string;
export const ERR_INVALID_SIGNATURE: string;
export const ERR_UNEXPECTED_FROM: string;
export const ERR_UNEXPECTED_SIGNATURE: string;
export const ERR_UNEXPECTED_KEY: string;
export const ERR_UNEXPECTED_SEQNO: string;
}

View File

@ -1,6 +1,46 @@
'use strict' 'use strict'
exports.codes = { exports.codes = {
/**
* Signature policy is invalid
*/
ERR_INVALID_SIGNATURE_POLICY: 'ERR_INVALID_SIGNATURE_POLICY',
/**
* Signature policy is unhandled
*/
ERR_UNHANDLED_SIGNATURE_POLICY: 'ERR_UNHANDLED_SIGNATURE_POLICY',
// Strict signing codes
/**
* Message expected to have a `signature`, but doesn't
*/
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE', ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE' /**
* Message expected to have a `seqno`, but doesn't
*/
ERR_MISSING_SEQNO: 'ERR_MISSING_SEQNO',
/**
* Message `signature` is invalid
*/
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE',
// Strict no-signing codes
/**
* Message expected to not have a `from`, but does
*/
ERR_UNEXPECTED_FROM: 'ERR_UNEXPECTED_FROM',
/**
* Message expected to not have a `signature`, but does
*/
ERR_UNEXPECTED_SIGNATURE: 'ERR_UNEXPECTED_SIGNATURE',
/**
* Message expected to not have a `key`, but does
*/
ERR_UNEXPECTED_KEY: 'ERR_UNEXPECTED_KEY',
/**
* Message expected to not have a `seqno`, but does
*/
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
} }

309
src/pubsub/index.d.ts vendored Normal file
View File

@ -0,0 +1,309 @@
export = PubsubBaseProtocol;
/**
* @typedef {Object} InMessage
* @property {string} [from]
* @property {string} receivedFrom
* @property {string[]} topicIDs
* @property {Uint8Array} [seqno]
* @property {Uint8Array} data
* @property {Uint8Array} [signature]
* @property {Uint8Array} [key]
*
* @typedef PeerId
* @type import('peer-id')
*/
/**
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
declare class PubsubBaseProtocol {
/**
* @param {Object} props
* @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] defines how signatures should be handled
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract
*/
constructor({ debugName, multicodecs, libp2p, globalSignaturePolicy, canRelayMessage, emitSelf }: {
debugName: string;
multicodecs: string | string[];
libp2p: any;
globalSignaturePolicy?: any;
canRelayMessage?: boolean;
emitSelf?: boolean;
});
log: any;
/**
* @type {Array<string>}
*/
multicodecs: Array<string>;
_libp2p: any;
registrar: any;
/**
* @type {PeerId}
*/
peerId: PeerId;
started: boolean;
/**
* Map of topics to which peers are subscribed to
*
* @type {Map<string, Set<string>>}
*/
topics: Map<string, Set<string>>;
/**
* List of our subscriptions
* @type {Set<string>}
*/
subscriptions: Set<string>;
/**
* Map of peer streams
*
* @type {Map<string, import('./peer-streams')>}
*/
peers: Map<string, import('./peer-streams')>;
/**
* The signature policy to follow by default
*
* @type {string}
*/
globalSignaturePolicy: string;
/**
* If router can relay received messages, even if not subscribed
* @type {boolean}
*/
canRelayMessage: boolean;
/**
* if publish should emit to self, if subscribed
* @type {boolean}
*/
emitSelf: boolean;
/**
* Topic validator function
* @typedef {function(string, InMessage): Promise<void>} validator
*/
/**
* Topic validator map
*
* Keyed by topic
* Topic validators are functions with the following input:
* @type {Map<string, validator>}
*/
topicValidators: Map<string, validator>;
_registrarId: any;
/**
* On an inbound stream opened.
* @private
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexIterableStream} props.stream
* @param {Connection} props.connection connection
*/
_onIncomingStream({ protocol, stream, connection }: {
protocol: string;
stream: any;
connection: any;
}): void;
/**
* Registrar notifies an established connection with pubsub protocol.
* @private
* @param {PeerId} peerId remote peer-id
* @param {Connection} conn connection to the peer
*/
_onPeerConnected(peerId: import("peer-id"), conn: any): Promise<void>;
/**
* Registrar notifies a closing connection with pubsub protocol.
* @private
* @param {PeerId} peerId peerId
* @param {Error} err error for connection end
*/
_onPeerDisconnected(peerId: import("peer-id"), err: Error): void;
/**
* Register the pubsub protocol onto the libp2p node.
* @returns {void}
*/
start(): void;
/**
* Unregister the pubsub protocol and the streams with other peers will be closed.
* @returns {void}
*/
stop(): void;
/**
* Notifies the router that a peer has been connected
* @private
* @param {PeerId} peerId
* @param {string} protocol
* @returns {PeerStreams}
*/
_addPeer(peerId: import("peer-id"), protocol: string): import("./peer-streams");
/**
* Notifies the router that a peer has been disconnected.
* @private
* @param {PeerId} peerId
* @returns {PeerStreams | undefined}
*/
_removePeer(peerId: import("peer-id")): import("./peer-streams");
/**
* Responsible for processing each RPC message received by other peers.
* @param {string} idB58Str peer id string in base58
* @param {DuplexIterableStream} stream inbound stream
* @param {PeerStreams} peerStreams PubSub peer
* @returns {Promise<void>}
*/
_processMessages(idB58Str: string, stream: any, peerStreams: import("./peer-streams")): Promise<void>;
/**
* Handles an rpc request from a peer
* @param {String} idB58Str
* @param {PeerStreams} peerStreams
* @param {RPC} rpc
* @returns {boolean}
*/
_processRpc(idB58Str: string, peerStreams: import("./peer-streams"), rpc: any): boolean;
/**
* Handles a subscription change from a peer
* @param {string} id
* @param {RPC.SubOpt} subOpt
*/
_processRpcSubOpt(id: string, subOpt: any): void;
/**
* Handles an message from a peer
* @param {InMessage} msg
* @returns {Promise<void>}
*/
_processRpcMessage(msg: InMessage): Promise<void>;
/**
* Emit a message from a peer
* @param {InMessage} message
*/
_emitMessage(message: InMessage): void;
/**
* The default msgID implementation
* Child class can override this.
* @param {RPC.Message} msg the message object
* @returns {Uint8Array} message id as bytes
*/
getMsgId(msg: any): Uint8Array;
/**
* Whether to accept a message from a peer
* Override to create a graylist
* @override
* @param {string} id
* @returns {boolean}
*/
_acceptFrom(id: string): boolean;
/**
* Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf.
* @param {Uint8Array} bytes
* @returns {RPC}
*/
_decodeRpc(bytes: Uint8Array): any;
/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
* @param {RPC} rpc
* @returns {Uint8Array}
*/
_encodeRpc(rpc: any): Uint8Array;
/**
* Send an rpc object to a peer
* @param {string} id peer id
* @param {RPC} rpc
* @returns {void}
*/
_sendRpc(id: string, rpc: any): void;
/**
* Send subscroptions to a peer
* @param {string} id peer id
* @param {string[]} topics
* @param {boolean} subscribe set to false for unsubscriptions
* @returns {void}
*/
_sendSubscriptions(id: string, topics: string[], subscribe: boolean): void;
/**
* Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages
* @param {InMessage} message
* @returns {Promise<void>}
*/
validate(message: InMessage): Promise<void>;
/**
* Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send.
* @private
* @param {Message} message
* @returns {Promise<Message>}
*/
_buildMessage(message: any): Promise<any>;
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getSubscribers(topic: string): string[];
/**
* Publishes messages to all subscribed peers
* @override
* @param {string} topic
* @param {Buffer} message
* @returns {Promise<void>}
*/
publish(topic: string, message: Buffer): Promise<void>;
/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {InMessage} message
* @returns {Promise<void>}
*
*/
_publish(message: InMessage): Promise<void>;
/**
* Subscribes to a given topic.
* @abstract
* @param {string} topic
* @returns {void}
*/
subscribe(topic: string): void;
/**
* Unsubscribe from the given topic.
* @override
* @param {string} topic
* @returns {void}
*/
unsubscribe(topic: string): void;
/**
* Get the list of topics which the peer is subscribed to.
* @override
* @returns {Array<String>}
*/
getTopics(): string[];
}
declare namespace PubsubBaseProtocol {
export { message, utils, SignaturePolicy, InMessage, PeerId };
}
type PeerId = import("peer-id");
/**
* Topic validator function
*/
type validator = (arg0: string, arg1: InMessage) => Promise<void>;
type InMessage = {
from?: string;
receivedFrom: string;
topicIDs: string[];
seqno?: Uint8Array;
data: Uint8Array;
signature?: Uint8Array;
key?: Uint8Array;
};
/**
* @type {typeof import('./message')}
*/
declare const message: typeof import('./message');
declare const utils: typeof import("./utils");
declare const SignaturePolicy: {
StrictSign: string;
StrictNoSign: string;
};

View File

@ -8,9 +8,14 @@ const pipe = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology') const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors') const { codes } = require('./errors')
/**
* @type {typeof import('./message')}
*/
const message = require('./message') const message = require('./message')
const PeerStreams = require('./peer-streams') const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils') const utils = require('./utils')
const { const {
signMessage, signMessage,
verifySignature verifySignature
@ -18,12 +23,16 @@ const {
/** /**
* @typedef {Object} InMessage * @typedef {Object} InMessage
* @property {string} from * @property {string} [from]
* @property {string} receivedFrom * @property {string} receivedFrom
* @property {string[]} topicIDs * @property {string[]} topicIDs
* @property {Uint8Array} [seqno]
* @property {Uint8Array} data * @property {Uint8Array} data
* @property {Uint8Array} [signature] * @property {Uint8Array} [signature]
* @property {Uint8Array} [key] * @property {Uint8Array} [key]
*
* @typedef PeerId
* @type import('peer-id')
*/ */
/** /**
@ -36,8 +45,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {String} props.debugName log namespace * @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect * @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p * @param {Libp2p} props.libp2p
* @param {boolean} [props.signMessages = true] if messages should be signed * @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] defines how signatures should be handled
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed * @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed * @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract * @abstract
@ -46,8 +54,7 @@ class PubsubBaseProtocol extends EventEmitter {
debugName, debugName,
multicodecs, multicodecs,
libp2p, libp2p,
signMessages = true, globalSignaturePolicy = SignaturePolicy.StrictSign,
strictSigning = true,
canRelayMessage = false, canRelayMessage = false,
emitSelf = false emitSelf = false
}) { }) {
@ -68,9 +75,15 @@ class PubsubBaseProtocol extends EventEmitter {
this.log = debug(debugName) this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`) this.log.err = debug(`${debugName}:error`)
/**
* @type {Array<string>}
*/
this.multicodecs = utils.ensureArray(multicodecs) this.multicodecs = utils.ensureArray(multicodecs)
this._libp2p = libp2p this._libp2p = libp2p
this.registrar = libp2p.registrar this.registrar = libp2p.registrar
/**
* @type {PeerId}
*/
this.peerId = libp2p.peerId this.peerId = libp2p.peerId
this.started = false this.started = false
@ -91,18 +104,21 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Map of peer streams * Map of peer streams
* *
* @type {Map<string, PeerStreams>} * @type {Map<string, import('./peer-streams')>}
*/ */
this.peers = new Map() this.peers = new Map()
// Message signing // validate signature policy
this.signMessages = signMessages if (!SignaturePolicy[globalSignaturePolicy]) {
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGUATURE_POLICY)
}
/** /**
* If message signing should be required for incoming messages * The signature policy to follow by default
* @type {boolean} *
* @type {string}
*/ */
this.strictSigning = strictSigning this.globalSignaturePolicy = globalSignaturePolicy
/** /**
* If router can relay received messages, even if not subscribed * If router can relay received messages, even if not subscribed
@ -118,7 +134,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Topic validator function * Topic validator function
* @typedef {function(string, RPC): boolean} validator * @typedef {function(string, InMessage): Promise<void>} validator
*/ */
/** /**
* Topic validator map * Topic validator map
@ -423,10 +439,18 @@ class PubsubBaseProtocol extends EventEmitter {
* The default msgID implementation * The default msgID implementation
* Child class can override this. * Child class can override this.
* @param {RPC.Message} msg the message object * @param {RPC.Message} msg the message object
* @returns {string} message id as string * @returns {Uint8Array} message id as bytes
*/ */
getMsgId (msg) { getMsgId (msg) {
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
return utils.msgId(msg.from, msg.seqno) return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
return utils.noSignMsgId(msg.data)
default:
throw errcode(new Error('Cannot get message id: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
} }
/** /**
@ -497,16 +521,36 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async validate (message) { // eslint-disable-line require-await async validate (message) { // eslint-disable-line require-await
// If strict signing is on and we have no signature, abort const signaturePolicy = this.globalSignaturePolicy
if (this.strictSigning && !message.signature) { switch (signaturePolicy) {
throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE) case SignaturePolicy.StrictNoSign:
if (message.from) {
throw errcode(new Error('StrictNoSigning: from should not be present'), codes.ERR_UNEXPECTED_FROM)
} }
if (message.signature) {
// Check the message signature if present throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
if (message.signature && !(await verifySignature(message))) { }
throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE) if (message.key) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}
if (message.seqno) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case SignaturePolicy.StrictSign:
if (!message.signature) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}
if (!message.seqno) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
}
if (!(await verifySignature(message))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
for (const topic of message.topicIDs) { for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic) const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) { if (!validatorFn) {
@ -524,11 +568,16 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<Message>} * @returns {Promise<Message>}
*/ */
_buildMessage (message) { _buildMessage (message) {
const msg = utils.normalizeOutRpcMessage(message) const signaturePolicy = this.globalSignaturePolicy
if (this.signMessages) { switch (signaturePolicy) {
return signMessage(this.peerId, msg) case SignaturePolicy.StrictSign:
} else { message.from = this.peerId.toB58String()
message.seqno = utils.randomSeqno()
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message return message
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
} }
@ -572,13 +621,11 @@ class PubsubBaseProtocol extends EventEmitter {
const from = this.peerId.toB58String() const from = this.peerId.toB58String()
let msgObject = { let msgObject = {
receivedFrom: from, receivedFrom: from,
from: from,
data: message, data: message,
seqno: utils.randomSeqno(),
topicIDs: [topic] topicIDs: [topic]
} }
// ensure that any operations performed on the message will include the signature // ensure that the message follows the signature policy
const outMsg = await this._buildMessage(msgObject) const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg) msgObject = utils.normalizeInRpcMessage(outMsg)
@ -652,3 +699,4 @@ class PubsubBaseProtocol extends EventEmitter {
module.exports = PubsubBaseProtocol module.exports = PubsubBaseProtocol
module.exports.message = message module.exports.message = message
module.exports.utils = utils module.exports.utils = utils
module.exports.SignaturePolicy = SignaturePolicy

5
src/pubsub/message/index.d.ts vendored Normal file
View File

@ -0,0 +1,5 @@
export var rpc: any;
export var td: any;
export var RPC: any;
export var Message: any;
export var SubOpts: any;

View File

@ -6,6 +6,9 @@ const rpcProto = protons(require('./rpc.proto.js'))
const RPC = rpcProto.RPC const RPC = rpcProto.RPC
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))
/**
* @module pubsub/message/index
*/
exports = module.exports exports = module.exports
exports.rpc = rpcProto exports.rpc = rpcProto
exports.td = topicDescriptorProto exports.td = topicDescriptorProto

2
src/pubsub/message/rpc.proto.d.ts vendored Normal file
View File

@ -0,0 +1,2 @@
declare const _exports: string;
export = _exports;

23
src/pubsub/message/sign.d.ts vendored Normal file
View File

@ -0,0 +1,23 @@
/**
* Returns the PublicKey associated with the given message.
* If no, valid PublicKey can be retrieved an error will be returned.
*
* @param {InMessage} message
* @returns {Promise<PublicKey>}
*/
export function messagePublicKey(message: any): Promise<any>;
/**
* Signs the provided message with the given `peerId`
*
* @param {PeerId} peerId
* @param {Message} message
* @returns {Promise<Message>}
*/
export function signMessage(peerId: import("peer-id"), message: any): Promise<any>;
export const SignPrefix: any;
/**
* Verifies the signature of the given message
* @param {InMessage} message
* @returns {Promise<Boolean>}
*/
export function verifySignature(message: any): Promise<boolean>;

View File

@ -0,0 +1,2 @@
declare const _exports: string;
export = _exports;

113
src/pubsub/peer-streams.d.ts vendored Normal file
View File

@ -0,0 +1,113 @@
export = PeerStreams;
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
*/
/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
*/
declare class PeerStreams {
/**
* @param {object} properties properties of the PeerStreams.
* @param {PeerId} properties.id
* @param {string} properties.protocol
*/
constructor({ id, protocol }: {
id: import("peer-id");
protocol: string;
});
/**
* @type {import('peer-id')}
*/
id: import('peer-id');
/**
* Established protocol
* @type {string}
*/
protocol: string;
/**
* The raw outbound stream, as retrieved from conn.newStream
* @private
* @type {DuplexIterableStream}
*/
_rawOutboundStream: DuplexIterableStream;
/**
* The raw inbound stream, as retrieved from the callback from libp2p.handle
* @private
* @type {DuplexIterableStream}
*/
_rawInboundStream: DuplexIterableStream;
/**
* An AbortController for controlled shutdown of the inbound stream
* @private
* @type {typeof AbortController}
*/
_inboundAbortController: typeof AbortController;
/**
* Write stream -- its preferable to use the write method
* @type {import('it-pushable').Pushable<Uint8Array>>}
*/
outboundStream: import('it-pushable').Pushable<Uint8Array>;
/**
* Read stream
* @type {DuplexIterableStream}
*/
inboundStream: DuplexIterableStream;
/**
* Do we have a connection to read from?
*
* @type {boolean}
*/
get isReadable(): boolean;
/**
* Do we have a connection to write on?
*
* @type {boolean}
*/
get isWritable(): boolean;
/**
* Send a message to this peer.
* Throws if there is no `stream` to write to available.
*
* @param {Uint8Array} data
* @returns {void}
*/
write(data: Uint8Array): void;
/**
* Attach a raw inbound stream and setup a read stream
*
* @param {DuplexIterableStream} stream
* @returns {void}
*/
attachInboundStream(stream: DuplexIterableStream): void;
/**
* Attach a raw outbound stream and setup a write stream
*
* @param {Stream} stream
* @returns {Promise<void>}
*/
attachOutboundStream(stream: any): Promise<void>;
/**
* Closes the open connection to peer
* @returns {void}
*/
close(): void;
}
declare namespace PeerStreams {
export { Sink, DuplexIterableStream, PeerId };
}
type DuplexIterableStream = {
sink: Sink;
source: () => AsyncIterator<Uint8Array, any, undefined>;
};
declare const AbortController: typeof import("abort-controller");
type Sink = (source: Uint8Array) => Promise<Uint8Array>;
type PeerId = import("peer-id");

View File

@ -12,19 +12,33 @@ const debug = require('debug')
const log = debug('libp2p-pubsub:peer-streams') const log = debug('libp2p-pubsub:peer-streams')
log.error = debug('libp2p-pubsub:peer-streams:error') log.error = debug('libp2p-pubsub:peer-streams:error')
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
*/
/** /**
* Thin wrapper around a peer's inbound / outbound pubsub streams * Thin wrapper around a peer's inbound / outbound pubsub streams
*/ */
class PeerStreams extends EventEmitter { class PeerStreams extends EventEmitter {
/** /**
* @param {PeerId} id * @param {object} properties properties of the PeerStreams.
* @param {string} protocol * @param {PeerId} properties.id
* @param {string} properties.protocol
*/ */
constructor ({ id, protocol }) { constructor ({ id, protocol }) {
super() super()
/** /**
* @type {PeerId} * @type {import('peer-id')}
*/ */
this.id = id this.id = id
/** /**
@ -47,12 +61,12 @@ class PeerStreams extends EventEmitter {
/** /**
* An AbortController for controlled shutdown of the inbound stream * An AbortController for controlled shutdown of the inbound stream
* @private * @private
* @type {AbortController} * @type {typeof AbortController}
*/ */
this._inboundAbortController = null this._inboundAbortController = null
/** /**
* Write stream -- its preferable to use the write method * Write stream -- its preferable to use the write method
* @type {Pushable} * @type {import('it-pushable').Pushable<Uint8Array>>}
*/ */
this.outboundStream = null this.outboundStream = null
/** /**
@ -85,7 +99,7 @@ class PeerStreams extends EventEmitter {
* Throws if there is no `stream` to write to available. * Throws if there is no `stream` to write to available.
* *
* @param {Uint8Array} data * @param {Uint8Array} data
* @returns {undefined} * @returns {void}
*/ */
write (data) { write (data) {
if (!this.isWritable) { if (!this.isWritable) {

4
src/pubsub/signature-policy.d.ts vendored Normal file
View File

@ -0,0 +1,4 @@
export namespace SignaturePolicy {
export const StrictSign: string;
export const StrictNoSign: string;
}

View File

@ -0,0 +1,28 @@
'use strict'
/**
* Enum for Signature Policy
* Details how message signatures are produced/consumed
*/
exports.SignaturePolicy = {
/**
* On the producing side:
* * Build messages with the signature, key (from may be enough for certain inlineable public key types), from and seqno fields.
*
* On the consuming side:
* * Enforce the fields to be present, reject otherwise.
* * Propagate only if the fields are valid and signature can be verified, reject otherwise.
*/
StrictSign: 'StrictSign',
/**
* On the producing side:
* * Build messages without the signature, key, from and seqno fields.
* * The corresponding protobuf key-value pairs are absent from the marshalled message, not just empty.
*
* On the consuming side:
* * Enforce the fields to be absent, reject otherwise.
* * Propagate only if the fields are absent, reject otherwise.
* * A message_id function will not be able to use the above fields, and should instead rely on the data field. A commonplace strategy is to calculate a hash.
*/
StrictNoSign: 'StrictNoSign'
}

7
src/pubsub/utils.d.ts vendored Normal file
View File

@ -0,0 +1,7 @@
export function randomSeqno(): Uint8Array;
export function msgId(from: string, seqno: Uint8Array): Uint8Array;
export function noSignMsgId(data: Uint8Array): Uint8Array;
export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean;
export function ensureArray(maybeArray: any): any[];
export function normalizeInRpcMessage(message: any, peerId: string): any;
export function normalizeOutRpcMessage(message: any): any;

View File

@ -3,7 +3,8 @@
const randomBytes = require('libp2p-crypto/src/random-bytes') const randomBytes = require('libp2p-crypto/src/random-bytes')
const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const PeerId = require('peer-id')
const multihash = require('multihashes')
exports = module.exports exports = module.exports
/** /**
@ -21,13 +22,26 @@ exports.randomSeqno = () => {
* *
* @param {string} from * @param {string} from
* @param {Uint8Array} seqno * @param {Uint8Array} seqno
* @returns {string} * @returns {Uint8Array}
* @private * @private
*/ */
exports.msgId = (from, seqno) => { exports.msgId = (from, seqno) => {
return from + uint8ArrayToString(seqno, 'base16') const fromBytes = PeerId.createFromB58String(from).id
const msgId = new Uint8Array(fromBytes.length + seqno.length)
msgId.set(fromBytes, 0)
msgId.set(seqno, fromBytes.length)
return msgId
} }
/**
* Generate a message id, based on message `data`.
*
* @param {Uint8Array} data
* @returns {Uint8Array}
* @private
*/
exports.noSignMsgId = (data) => multihash.encode(data, 'sha2')
/** /**
* Check if any member of the first set is also a member * Check if any member of the first set is also a member
* of the second set. * of the second set.
@ -71,10 +85,9 @@ exports.ensureArray = (maybeArray) => {
/** /**
* Ensures `message.from` is base58 encoded * Ensures `message.from` is base58 encoded
* @param {Object} message * @param {object} message
* @param {Uint8Array|String} message.from
* @param {String} peerId * @param {String} peerId
* @return {Object} * @return {object}
*/ */
exports.normalizeInRpcMessage = (message, peerId) => { exports.normalizeInRpcMessage = (message, peerId) => {
const m = Object.assign({}, message) const m = Object.assign({}, message)
@ -87,6 +100,10 @@ exports.normalizeInRpcMessage = (message, peerId) => {
return m return m
} }
/**
* @param {object} message
* @return {object}
*/
exports.normalizeOutRpcMessage = (message) => { exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message) const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) { if (typeof message.from === 'string' || message.from instanceof String) {

23
src/record/index.d.ts vendored Normal file
View File

@ -0,0 +1,23 @@
export = Record;
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
declare class Record {
/**
* @constructor
* @param {String} domain signature domain
* @param {Uint8Array} codec identifier of the type of record
*/
constructor(domain: string, codec: Uint8Array);
domain: string;
codec: Uint8Array;
/**
* Marshal a record to be used in an envelope.
*/
marshal(): void;
/**
* Verifies if the other provided Record is identical to this one.
* @param {Record} other
*/
equals(other: Record): void;
}

42
src/topology/index.d.ts vendored Normal file
View File

@ -0,0 +1,42 @@
declare const _exports: Topology;
export = _exports;
declare class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Object} [props.handlers]
* @param {function} [props.handlers.onConnect] protocol "onConnect" handler
* @param {function} [props.handlers.onDisconnect] protocol "onDisconnect" handler
* @constructor
*/
constructor({ min, max, handlers }: {
min: number;
max: number;
handlers?: {
onConnect?: Function;
onDisconnect?: Function;
};
});
min: number;
max: number;
_onConnect: Function;
_onDisconnect: Function;
/**
* Set of peers that support the protocol.
* @type {Set<string>}
*/
peers: Set<string>;
set registrar(arg: any);
_registrar: any;
/**
* @typedef PeerId
* @type {import('peer-id')}
*/
/**
* Notify about peer disconnected event.
* @param {PeerId} peerId
* @returns {void}
*/
disconnect(peerId: import("peer-id")): void;
}

View File

@ -1,7 +1,6 @@
'use strict' 'use strict'
const withIs = require('class-is') const withIs = require('class-is')
const noop = () => {} const noop = () => {}
class Topology { class Topology {
@ -37,6 +36,11 @@ class Topology {
this._registrar = registrar this._registrar = registrar
} }
/**
* @typedef PeerId
* @type {import('peer-id')}
*/
/** /**
* Notify about peer disconnected event. * Notify about peer disconnected event.
* @param {PeerId} peerId * @param {PeerId} peerId
@ -47,4 +51,8 @@ class Topology {
} }
} }
/**
* @module
* @type {Topology}
*/
module.exports = withIs(Topology, { className: 'Topology', symbolName: '@libp2p/js-interfaces/topology' }) module.exports = withIs(Topology, { className: 'Topology', symbolName: '@libp2p/js-interfaces/topology' })

52
src/topology/multicodec-topology.d.ts vendored Normal file
View File

@ -0,0 +1,52 @@
declare const _exports: MulticodecTopology;
export = _exports;
declare class MulticodecTopology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor({ min, max, multicodecs, handlers }: {
min: number;
max: number;
multicodecs: string[];
handlers: {
onConnect: Function;
onDisconnect: Function;
};
});
multicodecs: string[];
_registrar: any;
/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
*/
_onProtocolChange({ peerId, protocols }: {
peerId: any;
protocols: string[];
}): void;
/**
* Verify if a new connected peer has a topology multicodec and call _onConnect.
* @param {Connection} connection
* @returns {void}
*/
_onPeerConnect(connection: any): void;
set registrar(arg: any);
/**
* Update topology.
* @param {Array<{id: PeerId, multiaddrs: Array<Multiaddr>, protocols: Array<string>}>} peerDataIterable
* @returns {void}
*/
_updatePeers(peerDataIterable: {
id: any;
multiaddrs: any[];
protocols: string[];
}[]): void;
}

View File

@ -120,4 +120,8 @@ class MulticodecTopology extends Topology {
} }
} }
/**
* @module
* @type {MulticodecTopology}
*/
module.exports = withIs(MulticodecTopology, { className: 'MulticodecTopology', symbolName: '@libp2p/js-interfaces/topology/multicodec-topology' }) module.exports = withIs(MulticodecTopology, { className: 'MulticodecTopology', symbolName: '@libp2p/js-interfaces/topology/multicodec-topology' })

6
src/transport/errors.d.ts vendored Normal file
View File

@ -0,0 +1,6 @@
export class AbortError extends Error {
static get code(): string;
static get type(): string;
code: string;
type: string;
}

View File

@ -5,7 +5,7 @@ const { expect } = require('aegir/utils/chai')
const sinon = require('sinon') const sinon = require('sinon')
const PubsubBaseImpl = require('../../src/pubsub') const PubsubBaseImpl = require('../../src/pubsub')
const { randomSeqno } = require('../../src/pubsub/utils') const { SignaturePolicy } = require('../../src/pubsub/signature-policy')
const { const {
createPeerId, createPeerId,
mockRegistrar mockRegistrar
@ -34,9 +34,7 @@ describe('pubsub base messages', () => {
it('_buildMessage normalizes and signs messages', async () => { it('_buildMessage normalizes and signs messages', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
@ -44,27 +42,46 @@ describe('pubsub base messages', () => {
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with strict signing off will validate a present signature', async () => { it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
sinon.stub(pubsub, 'strictSigning').value(false) sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictSign)
const signedMessage = await pubsub._buildMessage(message)
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.from
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.signature
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.key
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.seqno
await expect(pubsub.validate(signedMessage)).to.not.be.rejected()
})
it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => {
const message = {
receivedFrom: peerId.id,
data: 'hello',
topicIDs: ['test-topic']
}
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
const signedMessage = await pubsub._buildMessage(message) const signedMessage = await pubsub._buildMessage(message)
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with strict signing requires a signature', async () => { it('validate with StrictSign requires a signature', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }

View File

@ -10,8 +10,8 @@ const PeerId = require('peer-id')
const uint8ArrayEquals = require('uint8arrays/equals') const uint8ArrayEquals = require('uint8arrays/equals')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const { utils } = require('../../src/pubsub')
const PeerStreams = require('../../src/pubsub/peer-streams') const PeerStreams = require('../../src/pubsub/peer-streams')
const { SignaturePolicy } = require('../../src/pubsub/signature-policy')
const { const {
createPeerId, createPeerId,
@ -30,6 +30,8 @@ describe('topic validators', () => {
pubsub = new PubsubImplementation(protocol, { pubsub = new PubsubImplementation(protocol, {
peerId: peerId, peerId: peerId,
registrar: mockRegistrar registrar: mockRegistrar
}, {
globalSignaturePolicy: SignaturePolicy.StrictNoSign
}) })
pubsub.start() pubsub.start()
@ -42,8 +44,6 @@ describe('topic validators', () => {
it('should filter messages by topic validator', async () => { it('should filter messages by topic validator', async () => {
// use _publish.callCount() to see if a message is valid or not // use _publish.callCount() to see if a message is valid or not
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
// Disable strict signing
sinon.stub(pubsub, 'strictSigning').value(false)
sinon.stub(pubsub.peers, 'get').returns({}) sinon.stub(pubsub.peers, 'get').returns({})
const filteredTopic = 't' const filteredTopic = 't'
const peer = new PeerStreams({ id: await PeerId.create() }) const peer = new PeerStreams({ id: await PeerId.create() })
@ -59,9 +59,7 @@ describe('topic validators', () => {
const validRpc = { const validRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a message'), data: uint8ArrayFromString('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -76,9 +74,7 @@ describe('topic validators', () => {
const invalidRpc = { const invalidRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -94,9 +90,7 @@ describe('topic validators', () => {
const invalidRpc2 = { const invalidRpc2 = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toB58String(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }

View File

@ -15,15 +15,11 @@ describe('utils', () => {
expect(first).to.not.eql(second) expect(first).to.not.eql(second)
}) })
it('msgId', () => {
expect(utils.msgId('hello', uint8ArrayFromString('world'))).to.be.eql('hello776f726c64')
})
it('msgId should not generate same ID for two different Uint8Arrays', () => { it('msgId should not generate same ID for two different Uint8Arrays', () => {
const peerId = 'QmPNdSYk5Rfpo5euNqwtyizzmKXMNHdXeLjTQhcN4yfX22' const peerId = 'QmPNdSYk5Rfpo5euNqwtyizzmKXMNHdXeLjTQhcN4yfX22'
const msgId0 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfde', 'base16')) const msgId0 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfde', 'base16'))
const msgId1 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfe0', 'base16')) const msgId1 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfe0', 'base16'))
expect(msgId0).to.not.eql(msgId1) expect(msgId0).to.not.deep.equal(msgId1)
}) })
it('anyMatch', () => { it('anyMatch', () => {

18
tsconfig.json Normal file
View File

@ -0,0 +1,18 @@
{
"include": ["src/**/*.js"],
"exclude": ["src/**/tests/*", "src/utils"],
"compilerOptions": {
// Tells TypeScript to read JS files, as
// normally they are ignored as source files
"allowJs": true,
// Generate d.ts files
"declaration": true,
// This compiler run should
// only output d.ts files
"emitDeclarationOnly": true,
"esModuleInterop": true,
"rootDir": "./src",
"outDir": "./src"
}
}