Compare commits

...

34 Commits

Author SHA1 Message Date
Pavel Murygin
96d4a118db fix types 2021-04-09 18:49:09 +03:00
Pavel Murygin
d14c37d14b fix tests 2021-04-09 14:18:10 +03:00
Pavel Murygin
9340ec9d98 fix import issue 2021-04-09 14:12:44 +03:00
Pavel Murygin
d9224d5409 bump dependencies 2021-04-09 13:34:14 +03:00
Vasco Santos
edb8ca60e9 chore: release version v0.9.0 2021-04-07 09:44:27 +02:00
Vasco Santos
810642d8ff chore: update contributors 2021-04-07 09:44:26 +02:00
Vasco Santos
64a478d313
chore: update types (#87) 2021-04-07 09:39:48 +02:00
Vasco Santos
b948972c5b chore: release version v0.8.4 2021-03-22 10:03:40 +01:00
Vasco Santos
edcedf093d chore: update contributors 2021-03-22 10:03:39 +01:00
Alex Potsides
3b960d516f
fix: specify connection direction (#86)
The comment says it can be 'inbound' or 'outbound' so constrain the type to those values.
2021-03-22 09:58:10 +01:00
Vasco Santos
2429e7246f chore: release version v0.8.3 2021-01-26 09:24:52 +01:00
Vasco Santos
22a7f6a9f1 chore: update contributors 2021-01-26 09:24:52 +01:00
Vasco Santos
fb9fce8713
chore: update deps (#82)
* chore: update deps

* chore: add prepare script
2021-01-26 09:11:00 +01:00
Vasco Santos
91dba97125 chore: release version v0.8.2 2021-01-20 10:19:40 +01:00
Vasco Santos
93d712bb0e chore: update contributors 2021-01-20 10:19:40 +01:00
Vasco Santos
ca520775eb
fix: event emitter types with local types (#80) 2021-01-20 10:15:40 +01:00
Vasco Santos
ceb91c672e
chore: add github actions badge (#78) 2020-12-17 11:31:12 +01:00
Vasco Santos
30ffad42c8 chore: release version v0.8.1 2020-12-11 10:06:07 +01:00
Vasco Santos
c365399e4f chore: update contributors 2020-12-11 10:06:07 +01:00
Vasco Santos
5b99e6b56b
fix: pubsub publish message should be uint8array (#77) 2020-12-11 10:01:39 +01:00
Vasco Santos
f4c19c9ef2 chore: release version v0.8.0 2020-12-10 14:29:13 +01:00
Vasco Santos
c36698f488 chore: update contributors 2020-12-10 14:29:13 +01:00
Vasco Santos
3ecbc3e889
chore: remove old pre release script (#76) 2020-12-10 14:23:35 +01:00
Vasco Santos
e2419ea308
feat: add types (#74) 2020-12-10 14:03:17 +01:00
Vasco Santos
04e93d3f08
chore: add pubsub interface to readme (#72) 2020-11-25 18:57:25 +01:00
Vasco Santos
83d7d52d7e chore: release version v0.7.2 2020-11-11 17:20:34 +01:00
Vasco Santos
1a3ea82776 chore: update contributors 2020-11-11 17:20:34 +01:00
Cayman
ad2dfa42dc
chore: pubsub conformance test updates (#70) 2020-11-11 17:16:49 +01:00
Vasco Santos
b75f2cab48 chore: release version v0.7.1 2020-11-03 22:43:57 +01:00
Vasco Santos
8512997e76 chore: update contributors 2020-11-03 22:43:57 +01:00
Cayman
269a6f5e0a
fix: typescript types (#69) 2020-11-03 22:35:18 +01:00
Vasco Santos
14d09970ca chore: release version v0.7.0 2020-11-03 18:26:50 +01:00
Vasco Santos
c98c58e824 chore: update contributors 2020-11-03 18:26:49 +01:00
Cayman
946b046440
feat: pubsub: add global signature policy (#66)
BREAKING CHANGE:
`signMessages` and `strictSigning` pubsub configuration options replaced
with a `globalSignaturePolicy` option
2020-11-03 18:22:03 +01:00
76 changed files with 1239 additions and 1326 deletions

53
.github/workflows/main.yml vendored Normal file
View File

@ -0,0 +1,53 @@
name: ci
on:
push:
branches:
- master
pull_request:
branches:
- master
jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: yarn lint
- uses: gozala/typescript-error-reporter-action@v1.0.4
- run: yarn build
- run: yarn aegir dep-check
- uses: ipfs/aegir/actions/bundle-size@master
name: size
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
test-node:
needs: check
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [windows-latest, ubuntu-latest, macos-latest]
node: [14, 15]
fail-fast: true
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node }}
- run: yarn
- run: npx nyc --reporter=lcov aegir test -t node -- --bail
- uses: codecov/codecov-action@v1
test-chrome:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: npx aegir test -t browser -t webworker --bail
test-firefox:
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: yarn
- run: npx aegir test -t browser -t webworker --bail -- --browsers FirefoxHeadless

View File

@ -1,40 +0,0 @@
language: node_js
cache: npm
stages:
- check
- test
- cov
node_js:
- '10'
- '12'
os:
- linux
- osx
- windows
script: npx nyc -s npm run test:node -- --bail
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov
jobs:
include:
- stage: check
script:
- npx aegir dep-check
- npm run lint
- stage: test
name: chrome
addons:
chrome: stable
script: npx aegir test -t browser -t webworker
- stage: test
name: firefox
addons:
firefox: latest
script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless
notifications:
email: false

View File

@ -1,3 +1,81 @@
# [0.9.0](https://github.com/libp2p/js-interfaces/compare/v0.8.4...v0.9.0) (2021-04-07)
## [0.8.4](https://github.com/libp2p/js-interfaces/compare/v0.8.3...v0.8.4) (2021-03-22)
### Bug Fixes
* specify connection direction ([#86](https://github.com/libp2p/js-interfaces/issues/86)) ([3b960d5](https://github.com/libp2p/js-interfaces/commit/3b960d516f70f7e198574a736cb09000ddd7a94c))
## [0.8.3](https://github.com/libp2p/js-interfaces/compare/v0.8.2...v0.8.3) (2021-01-26)
## [0.8.2](https://github.com/libp2p/js-interfaces/compare/v0.8.1...v0.8.2) (2021-01-20)
### Bug Fixes
* event emitter types with local types ([#80](https://github.com/libp2p/js-interfaces/issues/80)) ([ca52077](https://github.com/libp2p/js-interfaces/commit/ca520775eb26f5ed501375fdb24ba698c9a8c8c8))
## [0.8.1](https://github.com/libp2p/js-interfaces/compare/v0.8.0...v0.8.1) (2020-12-11)
### Bug Fixes
* pubsub publish message should be uint8array ([#77](https://github.com/libp2p/js-interfaces/issues/77)) ([5b99e6b](https://github.com/libp2p/js-interfaces/commit/5b99e6b56b10439a82ee88fb4e31fb95c182264f))
# [0.8.0](https://github.com/libp2p/js-interfaces/compare/v0.7.2...v0.8.0) (2020-12-10)
### Features
* add types ([#74](https://github.com/libp2p/js-interfaces/issues/74)) ([e2419ea](https://github.com/libp2p/js-interfaces/commit/e2419ea308b5db38966850ba6349602c93ce3b0e))
<a name="0.7.2"></a>
## [0.7.2](https://github.com/libp2p/js-interfaces/compare/v0.7.1...v0.7.2) (2020-11-11)
<a name="0.7.1"></a>
## [0.7.1](https://github.com/libp2p/js-interfaces/compare/v0.7.0...v0.7.1) (2020-11-03)
### Bug Fixes
* typescript types ([#69](https://github.com/libp2p/js-interfaces/issues/69)) ([269a6f5](https://github.com/libp2p/js-interfaces/commit/269a6f5))
<a name="0.7.0"></a>
# [0.7.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.7.0) (2020-11-03)
### Features
* pubsub: add global signature policy ([#66](https://github.com/libp2p/js-interfaces/issues/66)) ([946b046](https://github.com/libp2p/js-interfaces/commit/946b046))
* update pubsub getMsgId return type to Uint8Array ([#65](https://github.com/libp2p/js-interfaces/issues/65)) ([e148443](https://github.com/libp2p/js-interfaces/commit/e148443))
### BREAKING CHANGES
* `signMessages` and `strictSigning` pubsub configuration options replaced
with a `globalSignaturePolicy` option
* new getMsgId return type is not backwards compatible with prior `string`
return type.
<a name="0.6.0"></a> <a name="0.6.0"></a>
# [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05) # [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05)

View File

@ -4,6 +4,8 @@
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io) [![](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-interfaces.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-interfaces)
[![GitHub Workflow Status](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/ci?label=ci&style=flat-square)](https://github.com/libp2p/js-libp2p-interfaces/actions?query=branch%3Amaster+workflow%3Aci+)
> Contains test suites and interfaces you can use to implement the various components of libp2p. > Contains test suites and interfaces you can use to implement the various components of libp2p.
@ -18,6 +20,7 @@
- [Crypto](./src/crypto) - [Crypto](./src/crypto)
- [Peer Discovery](./src/peer-discovery) - [Peer Discovery](./src/peer-discovery)
- [Peer Routing](./src/peer-routing) - [Peer Routing](./src/peer-routing)
- [Pubsub](./src/pubsub)
- [Record](./src/record) - [Record](./src/record)
- [Stream Muxer](./src/stream-muxer) - [Stream Muxer](./src/stream-muxer)
- [Topology](./src/topology) - [Topology](./src/topology)
@ -30,6 +33,7 @@ For posterity, here are links to the original repositories for each of the inter
- [Content Routing](https://github.com/libp2p/interface-content-routing) - [Content Routing](https://github.com/libp2p/interface-content-routing)
- [Peer Discovery](https://github.com/libp2p/interface-peer-discovery) - [Peer Discovery](https://github.com/libp2p/interface-peer-discovery)
- [Peer Routing](https://github.com/libp2p/interface-peer-routing) - [Peer Routing](https://github.com/libp2p/interface-peer-routing)
- [Pubsub](https://github.com/libp2p/js-libp2p-pubsub)
- [Stream Muxer](https://github.com/libp2p/interface-stream-muxer) - [Stream Muxer](https://github.com/libp2p/interface-stream-muxer)
- [Transport](https://github.com/libp2p/interface-transport) - [Transport](https://github.com/libp2p/interface-transport)

View File

@ -1,6 +1,6 @@
{ {
"name": "libp2p-interfaces", "name": "libp2p-interfaces",
"version": "0.6.0", "version": "0.9.0",
"description": "Interfaces for JS Libp2p", "description": "Interfaces for JS Libp2p",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>", "leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js", "main": "src/index.js",
@ -9,15 +9,25 @@
"types", "types",
"dist" "dist"
], ],
"types": "dist/src/index.d.ts",
"typesVersions": {
"*": {
"src/*": [
"dist/src/*",
"dist/src/*/index"
]
}
},
"eslintConfig": {
"extends": "ipfs"
},
"scripts": { "scripts": {
"prepare": "aegir build --no-bundle",
"lint": "aegir lint", "lint": "aegir lint",
"build": "aegir build", "build": "aegir build",
"pregenerate:types": "rimraf './src/**/*.d.ts'",
"generate:types": "tsc",
"test": "aegir test", "test": "aegir test",
"test:node": "aegir test --target node", "test:node": "aegir test --target node",
"test:browser": "aegir test --target browser", "test:browser": "aegir test --target browser",
"prepublishOnly": "npm run generate:types",
"release": "aegir release -t node -t browser", "release": "aegir release -t node -t browser",
"release-minor": "aegir release --type minor -t node -t browser", "release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser" "release-major": "aegir release --type major -t node -t browser"
@ -37,39 +47,43 @@
}, },
"homepage": "https://github.com/libp2p/js-interfaces#readme", "homepage": "https://github.com/libp2p/js-interfaces#readme",
"dependencies": { "dependencies": {
"@types/bl": "4.1.0",
"abort-controller": "^3.0.0", "abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0", "abortable-iterator": "^3.0.0",
"chai": "^4.2.0", "chai": "^4.3.4",
"chai-checkmark": "^1.0.1", "chai-checkmark": "^1.0.1",
"class-is": "^1.1.0", "debug": "^4.3.1",
"debug": "^4.1.1", "delay": "^5.0.0",
"delay": "^4.3.0",
"detect-node": "^2.0.4", "detect-node": "^2.0.4",
"dirty-chai": "^2.0.1", "dirty-chai": "^2.0.1",
"err-code": "^2.0.0", "err-code": "^3.0.1",
"it-goodbye": "^2.0.1", "it-goodbye": "^2.0.2",
"it-length-prefixed": "^3.1.0", "it-length-prefixed": "^3.1.0",
"it-pair": "^1.0.0", "it-pair": "^1.0.0",
"it-pipe": "^1.1.0", "it-pipe": "^1.1.0",
"it-pushable": "^1.4.0", "it-pushable": "^1.4.2",
"libp2p-crypto": "^0.18.0", "libp2p-crypto": "fluencelabs/js-libp2p-crypto",
"libp2p-tcp": "^0.15.0", "libp2p-tcp": "fluencelabs/js-libp2p-tcp",
"multiaddr": "^8.0.0", "multiaddr": "^9.0.1",
"multibase": "^3.0.0", "multibase": "^4.0.2",
"multihashes": "^4.0.2",
"p-defer": "^3.0.0", "p-defer": "^3.0.0",
"p-limit": "^2.3.0", "p-limit": "^3.1.0",
"p-wait-for": "^3.1.0", "p-wait-for": "^3.2.0",
"peer-id": "^0.14.0", "peer-id": "fluencelabs/js-peer-id",
"protons": "^2.0.0", "protons": "^2.0.0",
"sinon": "^9.0.2", "sinon": "^10.0.0",
"streaming-iterables": "^5.0.2", "streaming-iterables": "^5.0.4",
"uint8arrays": "^1.1.0" "uint8arrays": "^2.1.3"
}, },
"devDependencies": { "devDependencies": {
"aegir": "^25.0.0", "@types/debug": "^4.1.5",
"it-handshake": "^1.0.1", "aegir": "^32.1.0",
"cids": "^1.1.6",
"events": "^3.3.0",
"it-handshake": "^1.0.2",
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
"typescript": "3.7.5" "util": "^0.12.3"
}, },
"contributors": [ "contributors": [
"Alan Shaw <alan.shaw@protocol.ai>", "Alan Shaw <alan.shaw@protocol.ai>",

View File

@ -1,149 +0,0 @@
declare const _exports: typeof Connection;
export = _exports;
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
declare class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
* @param {string} properties.stat.timeline.open connection opening timestamp.
* @param {string} properties.stat.timeline.upgraded connection upgraded timestamp.
* @param {string} [properties.stat.multiplexer] connection multiplexing identifier.
* @param {string} [properties.stat.encryption] connection encryption method identifier.
*/
constructor({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }: {
localAddr?: import("multiaddr");
remoteAddr?: import("multiaddr");
localPeer: import("peer-id");
remotePeer: import("peer-id");
newStream: Function;
close: Function;
getStreams: () => any[];
stat: {
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
});
/**
* Connection identifier.
*/
id: any;
/**
* Observed multiaddr of the local peer
*/
localAddr: import("multiaddr");
/**
* Observed multiaddr of the remote peer
*/
remoteAddr: import("multiaddr");
/**
* Local peer id.
*/
localPeer: import("peer-id");
/**
* Remote peer id.
*/
remotePeer: import("peer-id");
/**
* Connection metadata.
*/
_stat: {
status: string;
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
/**
* Reference to the new stream function of the multiplexer
*/
_newStream: Function;
/**
* Reference to the close function of the raw connection
*/
_close: Function;
/**
* Reference to the getStreams function of the muxer
*/
_getStreams: () => any[];
/**
* Connection streams registry
*/
registry: Map<any, any>;
/**
* User provided tags
* @type {string[]}
*/
tags: string[];
/**
* Get connection metadata
* @this {Connection}
*/
get stat(): {
status: string;
direction: string;
timeline: {
open: string;
upgraded: string;
};
multiplexer?: string;
encryption?: string;
};
/**
* Get all the streams of the muxer.
* @this {Connection}
*/
get streams(): any[];
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
newStream(protocols: string[]): Promise<{
stream: any;
protocol: string;
}>;
/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
* @return {void}
*/
addStream(muxedStream: any, { protocol, metadata }: {
protocol: string;
metadata: any;
}): void;
/**
* Remove stream registry after it is closed.
* @param {string} id identifier of the stream
*/
removeStream(id: string): void;
/**
* Close the connection.
* @return {Promise<void>}
*/
close(): Promise<void>;
_closing: any;
}

View File

@ -2,12 +2,245 @@
const PeerId = require('peer-id') const PeerId = require('peer-id')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const withIs = require('class-is')
const errCode = require('err-code') const errCode = require('err-code')
const Status = require('./status') const { OPEN, CLOSING, CLOSED } = require('./status')
const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')
/**
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('./status').Status} Status
*/
/**
* @typedef {Object} Timeline
* @property {number} open - connection opening timestamp.
* @property {number} [upgraded] - connection upgraded timestamp.
* @property {number} [close]
*
* @typedef {Object} ConectionStat
* @property {'inbound' | 'outbound'} direction - connection establishment direction
* @property {Timeline} timeline - connection relevant events timestamp.
* @property {string} [multiplexer] - connection multiplexing identifier.
* @property {string} [encryption] - connection encryption method identifier.
*
* @typedef {Object} ConnectionOptions
* @property {multiaddr} [localAddr] - local multiaddr of the connection if known.
* @property {multiaddr} remoteAddr - remote multiaddr of the connection.
* @property {PeerId} localPeer - local peer-id.
* @property {PeerId} remotePeer - remote peer-id.
* @property {(protocols: string|string[]) => Promise<{stream: MuxedStream, protocol: string}>} newStream - new stream muxer function.
* @property {() => Promise<void>} close - close raw connection function.
* @property {() => MuxedStream[]} getStreams - get streams from muxer function.
* @property {ConectionStat} stat - metadata of the connection.
*
* @typedef {Object} StreamData
* @property {string} protocol - the protocol used by the stream
* @property {Object} [metadata] - metadata of the stream
*/
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
class Connection {
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*
* @class
* @param {ConnectionOptions} options
*/
constructor ({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }) {
validateArgs(localAddr, localPeer, remotePeer, newStream, close, getStreams, stat)
/**
* Connection identifier.
*/
this.id = (parseInt(String(Math.random() * 1e9))).toString(36) + Date.now()
/**
* Observed multiaddr of the local peer
*/
this.localAddr = localAddr
/**
* Observed multiaddr of the remote peer
*/
this.remoteAddr = remoteAddr
/**
* Local peer id.
*/
this.localPeer = localPeer
/**
* Remote peer id.
*/
this.remotePeer = remotePeer
/**
* Connection metadata.
*
* @type {ConectionStat & {status: Status}}
*/
this._stat = {
...stat,
status: OPEN
}
/**
* Reference to the new stream function of the multiplexer
*/
this._newStream = newStream
/**
* Reference to the close function of the raw connection
*/
this._close = close
/**
* Reference to the getStreams function of the muxer
*/
this._getStreams = getStreams
/**
* Connection streams registry
*/
this.registry = new Map()
/**
* User provided tags
*
* @type {string[]}
*/
this.tags = []
}
get [Symbol.toStringTag] () {
return 'Connection'
}
get [connectionSymbol] () {
return true
}
/**
* Checks if the given value is a `Connection` instance.
*
* @param {any} other
* @returns {other is Connection}
*/
static isConnection (other) {
return Boolean(other && other[connectionSymbol])
}
/**
* Get connection metadata
*
* @this {Connection}
*/
get stat () {
return this._stat
}
/**
* Get all the streams of the muxer.
*
* @this {Connection}
*/
get streams () {
return this._getStreams()
}
/**
* Create a new stream from this connection
*
* @param {string|string[]} protocols - intended protocol for the stream
* @returns {Promise<{stream: MuxedStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
async newStream (protocols) {
if (this.stat.status === CLOSING) {
throw errCode(new Error('the connection is being closed'), 'ERR_CONNECTION_BEING_CLOSED')
}
if (this.stat.status === CLOSED) {
throw errCode(new Error('the connection is closed'), 'ERR_CONNECTION_CLOSED')
}
if (!Array.isArray(protocols)) protocols = [protocols]
const { stream, protocol } = await this._newStream(protocols)
this.addStream(stream, { protocol })
return {
stream,
protocol
}
}
/**
* Add a stream when it is opened to the registry.
*
* @param {MuxedStream} muxedStream - a muxed stream
* @param {StreamData} data - the stream data to be registered
* @returns {void}
*/
addStream (muxedStream, { protocol, metadata = {} }) {
// Add metadata for the stream
this.registry.set(muxedStream.id, {
protocol,
...metadata
})
}
/**
* Remove stream registry after it is closed.
*
* @param {string} id - identifier of the stream
*/
removeStream (id) {
this.registry.delete(id)
}
/**
* Close the connection.
*
* @returns {Promise<void>}
*/
async close () {
if (this.stat.status === CLOSED) {
return
}
if (this._closing) {
return this._closing
}
this.stat.status = CLOSING
// Close raw connection
this._closing = await this._close()
this._stat.timeline.close = Date.now()
this.stat.status = CLOSED
}
}
module.exports = Connection
/**
* @param {multiaddr|undefined} localAddr
* @param {PeerId} localPeer
* @param {PeerId} remotePeer
* @param {(protocols: string | string[]) => Promise<{ stream: import("../stream-muxer/types").MuxedStream; protocol: string; }>} newStream
* @param {() => Promise<void>} close
* @param {() => import("../stream-muxer/types").MuxedStream[]} getStreams
* @param {{ direction: any; timeline: any; multiplexer?: string | undefined; encryption?: string | undefined; }} stat
*/
function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
if (localAddr && !multiaddr.isMultiaddr(localAddr)) { if (localAddr && !multiaddr.Multiaddr.isMultiaddr(localAddr)) {
throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS') throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS')
} }
@ -51,184 +284,3 @@ function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getSt
throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS') throw errCode(new Error('connection upgraded timestamp must be provided'), 'ERR_INVALID_PARAMETERS')
} }
} }
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} [properties.localAddr] local multiaddr of the connection if known.
* @param {multiaddr} [properties.remoteAddr] remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function(): Stream[]} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
* @param {string} properties.stat.timeline.open connection opening timestamp.
* @param {string} properties.stat.timeline.upgraded connection upgraded timestamp.
* @param {string} [properties.stat.multiplexer] connection multiplexing identifier.
* @param {string} [properties.stat.encryption] connection encryption method identifier.
*/
constructor ({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }) {
validateArgs(localAddr, localPeer, remotePeer, newStream, close, getStreams, stat)
/**
* Connection identifier.
*/
this.id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
/**
* Observed multiaddr of the local peer
*/
this.localAddr = localAddr
/**
* Observed multiaddr of the remote peer
*/
this.remoteAddr = remoteAddr
/**
* Local peer id.
*/
this.localPeer = localPeer
/**
* Remote peer id.
*/
this.remotePeer = remotePeer
/**
* Connection metadata.
*/
this._stat = {
...stat,
status: Status.OPEN
}
/**
* Reference to the new stream function of the multiplexer
*/
this._newStream = newStream
/**
* Reference to the close function of the raw connection
*/
this._close = close
/**
* Reference to the getStreams function of the muxer
*/
this._getStreams = getStreams
/**
* Connection streams registry
*/
this.registry = new Map()
/**
* User provided tags
* @type {string[]}
*/
this.tags = []
}
/**
* Get connection metadata
* @this {Connection}
*/
get stat () {
return this._stat
}
/**
* Get all the streams of the muxer.
* @this {Connection}
*/
get streams () {
return this._getStreams()
}
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<{stream: Stream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
async newStream (protocols) {
if (this.stat.status === Status.CLOSING) {
throw errCode(new Error('the connection is being closed'), 'ERR_CONNECTION_BEING_CLOSED')
}
if (this.stat.status === Status.CLOSED) {
throw errCode(new Error('the connection is closed'), 'ERR_CONNECTION_CLOSED')
}
if (!Array.isArray(protocols)) protocols = [protocols]
const { stream, protocol } = await this._newStream(protocols)
this.addStream(stream, { protocol })
return {
stream,
protocol
}
}
/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
* @return {void}
*/
addStream (muxedStream, { protocol, metadata = {} }) {
// Add metadata for the stream
this.registry.set(muxedStream.id, {
protocol,
...metadata
})
}
/**
* Remove stream registry after it is closed.
* @param {string} id identifier of the stream
*/
removeStream (id) {
this.registry.delete(id)
}
/**
* Close the connection.
* @return {Promise<void>}
*/
async close () {
if (this.stat.status === Status.CLOSED) {
return
}
if (this._closing) {
return this._closing
}
this.stat.status = Status.CLOSING
// Close raw connection
this._closing = await this._close()
this._stat.timeline.close = Date.now()
this.stat.status = Status.CLOSED
}
}
/**
* @module
* @type {typeof Connection}
*/
module.exports = withIs(Connection, { className: 'Connection', symbolName: '@libp2p/interface-connection/connection' })

View File

@ -1 +0,0 @@
export var Connection: typeof import('./connection');

View File

@ -1,7 +1,3 @@
'use strict' 'use strict'
/**
* @module connection/index
* @type {typeof import('./connection')}
*/
exports.Connection = require('./connection') exports.Connection = require('./connection')

View File

@ -1,3 +0,0 @@
export declare const OPEN: string;
export declare const CLOSING: string;
export declare const CLOSED: string;

View File

@ -1,7 +1,12 @@
'use strict' 'use strict'
module.exports = { const STATUS = {
OPEN: 'open', OPEN: /** @type {'open'} */('open'),
CLOSING: 'closing', CLOSING: /** @type {'closing'} */('closing'),
CLOSED: 'closed' CLOSED: /** @type {'closed'} */('closed')
} }
module.exports = STATUS
/**
* @typedef {STATUS[keyof STATUS]} Status
*/

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -74,6 +75,7 @@ module.exports = (test) => {
let timelineProxy let timelineProxy
const proxyHandler = { const proxyHandler = {
set () { set () {
// @ts-ignore - TS fails to infer here
return Reflect.set(...arguments) return Reflect.set(...arguments)
} }
} }
@ -138,7 +140,9 @@ module.exports = (test) => {
expect(connection.stat.timeline.close).to.not.exist() expect(connection.stat.timeline.close).to.not.exist()
await connection.close() await connection.close()
// @ts-ignore - fails to infer callCount
expect(proxyHandler.set.callCount).to.equal(1) expect(proxyHandler.set.callCount).to.equal(1)
// @ts-ignore - fails to infer getCall
const [obj, key, value] = proxyHandler.set.getCall(0).args const [obj, key, value] = proxyHandler.set.getCall(0).args
expect(obj).to.eql(connection.stat.timeline) expect(obj).to.eql(connection.stat.timeline)
expect(key).to.equal('close') expect(key).to.equal('close')

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

11
src/content-routing/types.d.ts vendored Normal file
View File

@ -0,0 +1,11 @@
export = ContentRouting;
import PeerId from 'peer-id'
import { Multiaddr } from 'multiaddr'
import CID from 'cids'
declare class ContentRouting {
constructor (options: Object);
provide (cid: CID): Promise<void>;
findProviders (cid: CID, options: Object): AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>;
}

View File

@ -1,15 +0,0 @@
export class UnexpectedPeerError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}
export class InvalidCryptoExchangeError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}
export class InvalidCryptoTransmissionError extends Error {
static get code(): string;
constructor(message?: string);
code: string;
}

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -6,7 +7,7 @@ const expect = chai.expect
chai.use(require('dirty-chai')) chai.use(require('dirty-chai'))
const duplexPair = require('it-pair/duplex') const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const { collect } = require('streaming-iterables') const { collect } = require('streaming-iterables')
const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayFromString = require('uint8arrays/from-string')

24
src/crypto/types.d.ts vendored Normal file
View File

@ -0,0 +1,24 @@
import PeerId from 'peer-id'
import { MultiaddrConnection } from '../transport/types'
/**
* A libp2p crypto module must be compliant to this interface
* to ensure all exchanged data between two peers is encrypted.
*/
export interface Crypto {
protocol: string;
/**
* Encrypt outgoing data to the remote party.
*/
secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>;
/**
* Decrypt incoming data.
*/
secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>;
}
export type SecureOutbound = {
conn: MultiaddrConnection;
remoteEarlyData: Buffer;
remotePeer: PeerId;
}

0
src/index.d.ts vendored
View File

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -54,7 +55,7 @@ module.exports = (common) => {
expect(PeerId.isPeerId(id)).to.eql(true) expect(PeerId.isPeerId(id)).to.eql(true)
expect(multiaddrs).to.exist() expect(multiaddrs).to.exist()
multiaddrs.forEach((m) => expect(multiaddr.isMultiaddr(m)).to.eql(true)) multiaddrs.forEach((m) => expect(multiaddr.Multiaddr.isMultiaddr(m)).to.eql(true))
defer.resolve() defer.resolve()
}) })

10
src/peer-discovery/types.d.ts vendored Normal file
View File

@ -0,0 +1,10 @@
export = PeerDiscovery;
import events from 'events';
declare class PeerDiscovery extends events.EventEmitter {
constructor (options: Object);
start (): Promise<void>;
stop (): Promise<void>;
tag: string;
}

10
src/peer-routing/types.d.ts vendored Normal file
View File

@ -0,0 +1,10 @@
export = PeerRouting;
import PeerId from 'peer-id'
import { Multiaddr } from 'multiaddr'
declare class PeerRouting {
constructor (options?: Object);
findPeer (peerId: PeerId, options?: Object): Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>;
getClosestPeers(key: Uint8Array, options?: Object): AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>;
}

View File

@ -11,6 +11,9 @@ Table of Contents
* [Extend interface](#extend-interface) * [Extend interface](#extend-interface)
* [Example](#example) * [Example](#example)
* [API](#api) * [API](#api)
* [Constructor](#constructor)
* [new Pubsub(options)](#new-pubsuboptions)
* [Parameters](#parameters)
* [Start](#start) * [Start](#start)
* [pubsub.start()](#pubsubstart) * [pubsub.start()](#pubsubstart)
* [Returns](#returns) * [Returns](#returns)
@ -19,24 +22,24 @@ Table of Contents
* [Returns](#returns-1) * [Returns](#returns-1)
* [Publish](#publish) * [Publish](#publish)
* [pubsub.publish(topics, message)](#pubsubpublishtopics-message) * [pubsub.publish(topics, message)](#pubsubpublishtopics-message)
* [Parameters](#parameters) * [Parameters](#parameters-1)
* [Returns](#returns-2) * [Returns](#returns-2)
* [Subscribe](#subscribe) * [Subscribe](#subscribe)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic) * [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-1) * [Parameters](#parameters-2)
* [Unsubscribe](#unsubscribe) * [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic) * [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-2) * [Parameters](#parameters-3)
* [Get Topics](#get-topics) * [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics) * [pubsub.getTopics()](#pubsubgettopics)
* [Returns](#returns-3) * [Returns](#returns-3)
* [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic) * [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic)
* [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic) * [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic)
* [Parameters](#parameters-3) * [Parameters](#parameters-4)
* [Returns](#returns-4) * [Returns](#returns-4)
* [Validate](#validate) * [Validate](#validate)
* [pubsub.validate(message)](#pubsubvalidatemessage) * [pubsub.validate(message)](#pubsubvalidatemessage)
* [Parameters](#parameters-4) * [Parameters](#parameters-5)
* [Returns](#returns-5) * [Returns](#returns-5)
* [Test suite usage](#test-suite-usage) * [Test suite usage](#test-suite-usage)
@ -49,7 +52,7 @@ You can check the following implementations as examples for building your own pu
## Interface usage ## Interface usage
`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it. `interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management and the features describe in the libp2p [pubsub specs](https://github.com/libp2p/specs/tree/master/pubsub). This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.
### Extend interface ### Extend interface
@ -74,7 +77,7 @@ All the remaining functions **MUST NOT** be overwritten.
The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.
```JavaScript ```JavaScript
const Pubsub = require('libp2p-pubsub') const Pubsub = require('libp2p-interfaces/src/pubsub')
class PubsubImplementation extends Pubsub { class PubsubImplementation extends Pubsub {
constructor({ libp2p, options }) constructor({ libp2p, options })
@ -82,8 +85,7 @@ class PubsubImplementation extends Pubsub {
debugName: 'libp2p:pubsub', debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0', multicodecs: '/pubsub-implementation/1.0.0',
libp2p, libp2p,
signMessages: options.signMessages, globalSigningPolicy: options.globalSigningPolicy
strictSigning: options.strictSigning
}) })
} }
@ -98,6 +100,23 @@ class PubsubImplementation extends Pubsub {
The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message. The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message.
### Constructor
The base class constructor configures the pubsub instance for use with a libp2p instance. It includes settings for logging, signature policies, etc.
#### `new Pubsub({options})`
##### Parameters
| Name | Type | Description | Default |
|------|------|-------------|---------|
| options.libp2p | `Libp2p` | libp2p instance | required, no default |
| options.debugName | `string` | log namespace | required, no default |
| options.multicodecs | `string \| Array<string>` | protocol identifier(s) | required, no default |
| options.globalSignaturePolicy | `'StrictSign' \| 'StrictNoSign'` | signature policy to be globally applied | `'StrictSign'` |
| options.canRelayMessage | `boolean` | if can relay messages if not subscribed | `false` |
| options.emitSelf | `boolean` | if `publish` should emit to self, if subscribed | `false` |
### Start ### Start
Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`. Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.
@ -185,7 +204,7 @@ Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that ar
### Validate ### Validate
Validates the signature of a message. Validates a message according to the signature policy and topic-specific validation function.
#### `pubsub.validate(message)` #### `pubsub.validate(message)`

View File

@ -1,4 +0,0 @@
export namespace codes {
export const ERR_MISSING_SIGNATURE: string;
export const ERR_INVALID_SIGNATURE: string;
}

View File

@ -1,6 +1,46 @@
'use strict' 'use strict'
exports.codes = { exports.codes = {
/**
* Signature policy is invalid
*/
ERR_INVALID_SIGNATURE_POLICY: 'ERR_INVALID_SIGNATURE_POLICY',
/**
* Signature policy is unhandled
*/
ERR_UNHANDLED_SIGNATURE_POLICY: 'ERR_UNHANDLED_SIGNATURE_POLICY',
// Strict signing codes
/**
* Message expected to have a `signature`, but doesn't
*/
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE', ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE' /**
* Message expected to have a `seqno`, but doesn't
*/
ERR_MISSING_SEQNO: 'ERR_MISSING_SEQNO',
/**
* Message `signature` is invalid
*/
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE',
// Strict no-signing codes
/**
* Message expected to not have a `from`, but does
*/
ERR_UNEXPECTED_FROM: 'ERR_UNEXPECTED_FROM',
/**
* Message expected to not have a `signature`, but does
*/
ERR_UNEXPECTED_SIGNATURE: 'ERR_UNEXPECTED_SIGNATURE',
/**
* Message expected to not have a `key`, but does
*/
ERR_UNEXPECTED_KEY: 'ERR_UNEXPECTED_KEY',
/**
* Message expected to not have a `seqno`, but does
*/
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
} }

307
src/pubsub/index.d.ts vendored
View File

@ -1,307 +0,0 @@
export = PubsubBaseProtocol;
/**
* @typedef {Object} InMessage
* @property {string} [from]
* @property {string} receivedFrom
* @property {string[]} topicIDs
* @property {Uint8Array} [seqno]
* @property {Uint8Array} data
* @property {Uint8Array} [signature]
* @property {Uint8Array} [key]
*
* @typedef PeerId
* @type import('peer-id')
*/
/**
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
declare class PubsubBaseProtocol {
/**
* @param {Object} props
* @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract
*/
constructor({ debugName, multicodecs, libp2p, signMessages, strictSigning, canRelayMessage, emitSelf }: {
debugName: string;
multicodecs: string | string[];
libp2p: any;
signMessages?: boolean;
strictSigning?: boolean;
canRelayMessage?: boolean;
emitSelf?: boolean;
});
log: any;
/**
* @type {Array<string>}
*/
multicodecs: Array<string>;
_libp2p: any;
registrar: any;
/**
* @type {PeerId}
*/
peerId: PeerId;
started: boolean;
/**
* Map of topics to which peers are subscribed to
*
* @type {Map<string, Set<string>>}
*/
topics: Map<string, Set<string>>;
/**
* List of our subscriptions
* @type {Set<string>}
*/
subscriptions: Set<string>;
/**
* Map of peer streams
*
* @type {Map<string, import('./peer-streams')>}
*/
peers: Map<string, import('./peer-streams')>;
signMessages: boolean;
/**
* If message signing should be required for incoming messages
* @type {boolean}
*/
strictSigning: boolean;
/**
* If router can relay received messages, even if not subscribed
* @type {boolean}
*/
canRelayMessage: boolean;
/**
* if publish should emit to self, if subscribed
* @type {boolean}
*/
emitSelf: boolean;
/**
* Topic validator function
* @typedef {function(string, InMessage): Promise<void>} validator
*/
/**
* Topic validator map
*
* Keyed by topic
* Topic validators are functions with the following input:
* @type {Map<string, validator>}
*/
topicValidators: Map<string, validator>;
_registrarId: any;
/**
* On an inbound stream opened.
* @private
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexIterableStream} props.stream
* @param {Connection} props.connection connection
*/
_onIncomingStream({ protocol, stream, connection }: {
protocol: string;
stream: any;
connection: any;
}): void;
/**
* Registrar notifies an established connection with pubsub protocol.
* @private
* @param {PeerId} peerId remote peer-id
* @param {Connection} conn connection to the peer
*/
_onPeerConnected(peerId: import("peer-id"), conn: any): Promise<void>;
/**
* Registrar notifies a closing connection with pubsub protocol.
* @private
* @param {PeerId} peerId peerId
* @param {Error} err error for connection end
*/
_onPeerDisconnected(peerId: import("peer-id"), err: Error): void;
/**
* Register the pubsub protocol onto the libp2p node.
* @returns {void}
*/
start(): void;
/**
* Unregister the pubsub protocol and the streams with other peers will be closed.
* @returns {void}
*/
stop(): void;
/**
* Notifies the router that a peer has been connected
* @private
* @param {PeerId} peerId
* @param {string} protocol
* @returns {PeerStreams}
*/
_addPeer(peerId: import("peer-id"), protocol: string): import("./peer-streams");
/**
* Notifies the router that a peer has been disconnected.
* @private
* @param {PeerId} peerId
* @returns {PeerStreams | undefined}
*/
_removePeer(peerId: import("peer-id")): import("./peer-streams");
/**
* Responsible for processing each RPC message received by other peers.
* @param {string} idB58Str peer id string in base58
* @param {DuplexIterableStream} stream inbound stream
* @param {PeerStreams} peerStreams PubSub peer
* @returns {Promise<void>}
*/
_processMessages(idB58Str: string, stream: any, peerStreams: import("./peer-streams")): Promise<void>;
/**
* Handles an rpc request from a peer
* @param {String} idB58Str
* @param {PeerStreams} peerStreams
* @param {RPC} rpc
* @returns {boolean}
*/
_processRpc(idB58Str: string, peerStreams: import("./peer-streams"), rpc: any): boolean;
/**
* Handles a subscription change from a peer
* @param {string} id
* @param {RPC.SubOpt} subOpt
*/
_processRpcSubOpt(id: string, subOpt: any): void;
/**
* Handles an message from a peer
* @param {InMessage} msg
* @returns {Promise<void>}
*/
_processRpcMessage(msg: InMessage): Promise<void>;
/**
* Emit a message from a peer
* @param {InMessage} message
*/
_emitMessage(message: InMessage): void;
/**
* The default msgID implementation
* Child class can override this.
* @param {RPC.Message} msg the message object
* @returns {Uint8Array} message id as bytes
*/
getMsgId(msg: any): Uint8Array;
/**
* Whether to accept a message from a peer
* Override to create a graylist
* @override
* @param {string} id
* @returns {boolean}
*/
_acceptFrom(id: string): boolean;
/**
* Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf.
* @param {Uint8Array} bytes
* @returns {RPC}
*/
_decodeRpc(bytes: Uint8Array): any;
/**
* Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf.
* @param {RPC} rpc
* @returns {Uint8Array}
*/
_encodeRpc(rpc: any): Uint8Array;
/**
* Send an rpc object to a peer
* @param {string} id peer id
* @param {RPC} rpc
* @returns {void}
*/
_sendRpc(id: string, rpc: any): void;
/**
* Send subscroptions to a peer
* @param {string} id peer id
* @param {string[]} topics
* @param {boolean} subscribe set to false for unsubscriptions
* @returns {void}
*/
_sendSubscriptions(id: string, topics: string[], subscribe: boolean): void;
/**
* Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages
* @param {InMessage} message
* @returns {Promise<void>}
*/
validate(message: InMessage): Promise<void>;
/**
* Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send.
* @private
* @param {Message} message
* @returns {Promise<Message>}
*/
_buildMessage(message: any): Promise<any>;
/**
* Get a list of the peer-ids that are subscribed to one topic.
* @param {string} topic
* @returns {Array<string>}
*/
getSubscribers(topic: string): string[];
/**
* Publishes messages to all subscribed peers
* @override
* @param {string} topic
* @param {Buffer} message
* @returns {Promise<void>}
*/
publish(topic: string, message: Buffer): Promise<void>;
/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {InMessage} message
* @returns {Promise<void>}
*
*/
_publish(message: InMessage): Promise<void>;
/**
* Subscribes to a given topic.
* @abstract
* @param {string} topic
* @returns {void}
*/
subscribe(topic: string): void;
/**
* Unsubscribe from the given topic.
* @override
* @param {string} topic
* @returns {void}
*/
unsubscribe(topic: string): void;
/**
* Get the list of topics which the peer is subscribed to.
* @override
* @returns {Array<String>}
*/
getTopics(): string[];
}
declare namespace PubsubBaseProtocol {
export { message, utils, InMessage, PeerId };
}
type PeerId = import("peer-id");
/**
* Topic validator function
*/
type validator = (arg0: string, arg1: InMessage) => Promise<void>;
type InMessage = {
from?: string;
receivedFrom: string;
topicIDs: string[];
seqno?: Uint8Array;
data: Uint8Array;
signature?: Uint8Array;
key?: Uint8Array;
};
/**
* @type {typeof import('./message')}
*/
declare const message: typeof import('./message');
declare const utils: typeof import("./utils");

View File

@ -1,18 +1,19 @@
'use strict' 'use strict'
const debug = require('debug') const debug = require('debug')
/** @typedef {import('../types').EventEmitterFactory} Events */
/** @type Events */
const EventEmitter = require('events') const EventEmitter = require('events')
const errcode = require('err-code') const errcode = require('err-code')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology') const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors') const { codes } = require('./errors')
/**
* @type {typeof import('./message')}
*/
const message = require('./message') const message = require('./message')
const PeerStreams = require('./peer-streams') const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils') const utils = require('./utils')
const { const {
@ -20,6 +21,18 @@ const {
verifySignature verifySignature
} = require('./message/sign') } = require('./message/sign')
/**
* @typedef {any} Libp2p
* @typedef {import('peer-id')} PeerId
* @typedef {import('bl')} BufferList
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('../connection/connection')} Connection
* @typedef {import('./message/types').RPC} RPC
* @typedef {import('./message/types').SubOpts} RPCSubOpts
* @typedef {import('./message/types').Message} RPCMessage
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
*/
/** /**
* @typedef {Object} InMessage * @typedef {Object} InMessage
* @property {string} [from] * @property {string} [from]
@ -30,32 +43,31 @@ const {
* @property {Uint8Array} [signature] * @property {Uint8Array} [signature]
* @property {Uint8Array} [key] * @property {Uint8Array} [key]
* *
* @typedef PeerId * @typedef {Object} PubsubProperties
* @type import('peer-id') * @property {string} debugName - log namespace
* @property {Array<string>|string} multicodecs - protocol identificers to connect
* @property {Libp2p} libp2p
*
* @typedef {Object} PubsubOptions
* @property {SignaturePolicyType} [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @property {boolean} [canRelayMessage = false] - if can relay messages not subscribed
* @property {boolean} [emitSelf = false] - if publish should emit to self, if subscribed
*/ */
/** /**
* PubsubBaseProtocol handles the peers and connections logic for pubsub routers * PubsubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have. * and specifies the API that pubsub routers should have.
*/ */
class PubsubBaseProtocol extends EventEmitter { class PubsubBaseProtocol extends EventEmitter {
/** /**
* @param {Object} props * @param {PubsubProperties & PubsubOptions} props
* @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract * @abstract
*/ */
constructor ({ constructor ({
debugName, debugName,
multicodecs, multicodecs,
libp2p, libp2p,
signMessages = true, globalSignaturePolicy = SignaturePolicy.StrictSign,
strictSigning = true,
canRelayMessage = false, canRelayMessage = false,
emitSelf = false emitSelf = false
}) { }) {
@ -73,8 +85,9 @@ class PubsubBaseProtocol extends EventEmitter {
super() super()
this.log = debug(debugName) this.log = Object.assign(debug(debugName), {
this.log.err = debug(`${debugName}:error`) err: debug(`${debugName}:error`)
})
/** /**
* @type {Array<string>} * @type {Array<string>}
@ -98,6 +111,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* List of our subscriptions * List of our subscriptions
*
* @type {Set<string>} * @type {Set<string>}
*/ */
this.subscriptions = new Set() this.subscriptions = new Set()
@ -109,29 +123,35 @@ class PubsubBaseProtocol extends EventEmitter {
*/ */
this.peers = new Map() this.peers = new Map()
// Message signing // validate signature policy
this.signMessages = signMessages if (!SignaturePolicy[globalSignaturePolicy]) {
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGNATURE_POLICY)
}
/** /**
* If message signing should be required for incoming messages * The signature policy to follow by default
* @type {boolean} *
* @type {string}
*/ */
this.strictSigning = strictSigning this.globalSignaturePolicy = globalSignaturePolicy
/** /**
* If router can relay received messages, even if not subscribed * If router can relay received messages, even if not subscribed
*
* @type {boolean} * @type {boolean}
*/ */
this.canRelayMessage = canRelayMessage this.canRelayMessage = canRelayMessage
/** /**
* if publish should emit to self, if subscribed * if publish should emit to self, if subscribed
*
* @type {boolean} * @type {boolean}
*/ */
this.emitSelf = emitSelf this.emitSelf = emitSelf
/** /**
* Topic validator function * Topic validator function
*
* @typedef {function(string, InMessage): Promise<void>} validator * @typedef {function(string, InMessage): Promise<void>} validator
*/ */
/** /**
@ -139,6 +159,7 @@ class PubsubBaseProtocol extends EventEmitter {
* *
* Keyed by topic * Keyed by topic
* Topic validators are functions with the following input: * Topic validators are functions with the following input:
*
* @type {Map<string, validator>} * @type {Map<string, validator>}
*/ */
this.topicValidators = new Map() this.topicValidators = new Map()
@ -153,6 +174,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Register the pubsub protocol onto the libp2p node. * Register the pubsub protocol onto the libp2p node.
*
* @returns {void} * @returns {void}
*/ */
start () { start () {
@ -182,6 +204,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Unregister the pubsub protocol and the streams with other peers will be closed. * Unregister the pubsub protocol and the streams with other peers will be closed.
*
* @returns {void} * @returns {void}
*/ */
stop () { stop () {
@ -203,26 +226,28 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* On an inbound stream opened. * On an inbound stream opened.
* @private *
* @protected
* @param {Object} props * @param {Object} props
* @param {string} props.protocol * @param {string} props.protocol
* @param {DuplexIterableStream} props.stream * @param {MuxedStream} props.stream
* @param {Connection} props.connection connection * @param {Connection} props.connection - connection
*/ */
_onIncomingStream ({ protocol, stream, connection }) { _onIncomingStream ({ protocol, stream, connection }) {
const peerId = connection.remotePeer const peerId = connection.remotePeer
const idB58Str = peerId.toB58String() const idB58Str = peerId.toB58String()
const peer = this._addPeer(peerId, protocol) const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream) const inboundStream = peer.attachInboundStream(stream)
this._processMessages(idB58Str, peer.inboundStream, peer) this._processMessages(idB58Str, inboundStream, peer)
} }
/** /**
* Registrar notifies an established connection with pubsub protocol. * Registrar notifies an established connection with pubsub protocol.
* @private *
* @param {PeerId} peerId remote peer-id * @protected
* @param {Connection} conn connection to the peer * @param {PeerId} peerId - remote peer-id
* @param {Connection} conn - connection to the peer
*/ */
async _onPeerConnected (peerId, conn) { async _onPeerConnected (peerId, conn) {
const idB58Str = peerId.toB58String() const idB58Str = peerId.toB58String()
@ -242,9 +267,10 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Registrar notifies a closing connection with pubsub protocol. * Registrar notifies a closing connection with pubsub protocol.
* @private *
* @param {PeerId} peerId peerId * @protected
* @param {Error} err error for connection end * @param {PeerId} peerId - peerId
* @param {Error} [err] - error for connection end
*/ */
_onPeerDisconnected (peerId, err) { _onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String() const idB58Str = peerId.toB58String()
@ -255,7 +281,8 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Notifies the router that a peer has been connected * Notifies the router that a peer has been connected
* @private *
* @protected
* @param {PeerId} peerId * @param {PeerId} peerId
* @param {string} protocol * @param {string} protocol
* @returns {PeerStreams} * @returns {PeerStreams}
@ -285,7 +312,8 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Notifies the router that a peer has been disconnected. * Notifies the router that a peer has been disconnected.
* @private *
* @protected
* @param {PeerId} peerId * @param {PeerId} peerId
* @returns {PeerStreams | undefined} * @returns {PeerStreams | undefined}
*/ */
@ -315,9 +343,10 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Responsible for processing each RPC message received by other peers. * Responsible for processing each RPC message received by other peers.
* @param {string} idB58Str peer id string in base58 *
* @param {DuplexIterableStream} stream inbound stream * @param {string} idB58Str - peer id string in base58
* @param {PeerStreams} peerStreams PubSub peer * @param {AsyncIterable<Uint8Array|BufferList>} stream - inbound stream
* @param {PeerStreams} peerStreams - PubSub peer
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async _processMessages (idB58Str, stream, peerStreams) { async _processMessages (idB58Str, stream, peerStreams) {
@ -340,7 +369,8 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Handles an rpc request from a peer * Handles an rpc request from a peer
* @param {String} idB58Str *
* @param {string} idB58Str
* @param {PeerStreams} peerStreams * @param {PeerStreams} peerStreams
* @param {RPC} rpc * @param {RPC} rpc
* @returns {boolean} * @returns {boolean}
@ -352,7 +382,9 @@ class PubsubBaseProtocol extends EventEmitter {
if (subs.length) { if (subs.length) {
// update peer subscriptions // update peer subscriptions
subs.forEach((subOpt) => this._processRpcSubOpt(idB58Str, subOpt)) subs.forEach((/** @type {RPCSubOpts} */ subOpt) => {
this._processRpcSubOpt(idB58Str, subOpt)
})
this.emit('pubsub:subscription-change', peerStreams.id, subs) this.emit('pubsub:subscription-change', peerStreams.id, subs)
} }
@ -362,8 +394,9 @@ class PubsubBaseProtocol extends EventEmitter {
} }
if (msgs.length) { if (msgs.length) {
msgs.forEach(message => { // @ts-ignore RPC message is modified
if (!(this.canRelayMessage || message.topicIDs.some((topic) => this.subscriptions.has(topic)))) { msgs.forEach((message) => {
if (!(this.canRelayMessage || message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic)))) {
this.log('received message we didn\'t subscribe to. Dropping.') this.log('received message we didn\'t subscribe to. Dropping.')
return return
} }
@ -376,8 +409,9 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Handles a subscription change from a peer * Handles a subscription change from a peer
*
* @param {string} id * @param {string} id
* @param {RPC.SubOpt} subOpt * @param {RPCSubOpts} subOpt
*/ */
_processRpcSubOpt (id, subOpt) { _processRpcSubOpt (id, subOpt) {
const t = subOpt.topicID const t = subOpt.topicID
@ -399,6 +433,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Handles an message from a peer * Handles an message from a peer
*
* @param {InMessage} msg * @param {InMessage} msg
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
@ -423,6 +458,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Emit a message from a peer * Emit a message from a peer
*
* @param {InMessage} message * @param {InMessage} message
*/ */
_emitMessage (message) { _emitMessage (message) {
@ -436,16 +472,26 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* The default msgID implementation * The default msgID implementation
* Child class can override this. * Child class can override this.
* @param {RPC.Message} msg the message object *
* @param {RPCMessage} msg - the message object
* @returns {Uint8Array} message id as bytes * @returns {Uint8Array} message id as bytes
*/ */
getMsgId (msg) { getMsgId (msg) {
return utils.msgId(msg.from, msg.seqno) const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
return utils.noSignMsgId(msg.data)
default:
throw errcode(new Error('Cannot get message id: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
} }
/** /**
* Whether to accept a message from a peer * Whether to accept a message from a peer
* Override to create a graylist * Override to create a graylist
*
* @override * @override
* @param {string} id * @param {string} id
* @returns {boolean} * @returns {boolean}
@ -457,6 +503,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Decode Uint8Array into an RPC object. * Decode Uint8Array into an RPC object.
* This can be override to use a custom router protobuf. * This can be override to use a custom router protobuf.
*
* @param {Uint8Array} bytes * @param {Uint8Array} bytes
* @returns {RPC} * @returns {RPC}
*/ */
@ -467,6 +514,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Encode RPC object into a Uint8Array. * Encode RPC object into a Uint8Array.
* This can be override to use a custom router protobuf. * This can be override to use a custom router protobuf.
*
* @param {RPC} rpc * @param {RPC} rpc
* @returns {Uint8Array} * @returns {Uint8Array}
*/ */
@ -476,7 +524,8 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Send an rpc object to a peer * Send an rpc object to a peer
* @param {string} id peer id *
* @param {string} id - peer id
* @param {RPC} rpc * @param {RPC} rpc
* @returns {void} * @returns {void}
*/ */
@ -493,9 +542,10 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Send subscroptions to a peer * Send subscroptions to a peer
* @param {string} id peer id *
* @param {string} id - peer id
* @param {string[]} topics * @param {string[]} topics
* @param {boolean} subscribe set to false for unsubscriptions * @param {boolean} subscribe - set to false for unsubscriptions
* @returns {void} * @returns {void}
*/ */
_sendSubscriptions (id, topics, subscribe) { _sendSubscriptions (id, topics, subscribe) {
@ -507,24 +557,45 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Validates the given message. The signature will be checked for authenticity. * Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages * Throws an error on invalid messages
*
* @param {InMessage} message * @param {InMessage} message
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async validate (message) { // eslint-disable-line require-await async validate (message) { // eslint-disable-line require-await
// If strict signing is on and we have no signature, abort const signaturePolicy = this.globalSignaturePolicy
if (this.strictSigning && !message.signature) { switch (signaturePolicy) {
throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE) case SignaturePolicy.StrictNoSign:
if (message.from) {
throw errcode(new Error('StrictNoSigning: from should not be present'), codes.ERR_UNEXPECTED_FROM)
}
if (message.signature) {
throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
}
if (message.key) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}
if (message.seqno) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case SignaturePolicy.StrictSign:
if (!message.signature) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}
if (!message.seqno) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
}
if (!(await verifySignature(message))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
// Check the message signature if present
if (message.signature && !(await verifySignature(message))) {
throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
for (const topic of message.topicIDs) { for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic) const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) { if (!validatorFn) {
continue continue // eslint-disable-line
} }
await validatorFn(topic, message) await validatorFn(topic, message)
} }
@ -533,16 +604,22 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Normalizes the message and signs it, if signing is enabled. * Normalizes the message and signs it, if signing is enabled.
* Should be used by the routers to create the message to send. * Should be used by the routers to create the message to send.
* @private *
* @param {Message} message * @protected
* @returns {Promise<Message>} * @param {RPCMessage} message
* @returns {Promise<RPCMessage>}
*/ */
_buildMessage (message) { _buildMessage (message) {
const msg = utils.normalizeOutRpcMessage(message) const signaturePolicy = this.globalSignaturePolicy
if (this.signMessages) { switch (signaturePolicy) {
return signMessage(this.peerId, msg) case SignaturePolicy.StrictSign:
} else { message.from = this.peerId.toB58String()
return message message.seqno = utils.randomSeqno()
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
} }
} }
@ -550,6 +627,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Get a list of the peer-ids that are subscribed to one topic. * Get a list of the peer-ids that are subscribed to one topic.
*
* @param {string} topic * @param {string} topic
* @returns {Array<string>} * @returns {Array<string>}
*/ */
@ -571,9 +649,10 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Publishes messages to all subscribed peers * Publishes messages to all subscribed peers
*
* @override * @override
* @param {string} topic * @param {string} topic
* @param {Buffer} message * @param {Uint8Array} message
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async publish (topic, message) { async publish (topic, message) {
@ -586,13 +665,11 @@ class PubsubBaseProtocol extends EventEmitter {
const from = this.peerId.toB58String() const from = this.peerId.toB58String()
let msgObject = { let msgObject = {
receivedFrom: from, receivedFrom: from,
from: from,
data: message, data: message,
seqno: utils.randomSeqno(),
topicIDs: [topic] topicIDs: [topic]
} }
// ensure that any operations performed on the message will include the signature // ensure that the message follows the signature policy
const outMsg = await this._buildMessage(msgObject) const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg) msgObject = utils.normalizeInRpcMessage(outMsg)
@ -606,6 +683,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer * For example, a Floodsub implementation might simply publish each message to each topic for every peer
*
* @abstract * @abstract
* @param {InMessage} message * @param {InMessage} message
* @returns {Promise<void>} * @returns {Promise<void>}
@ -617,6 +695,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Subscribes to a given topic. * Subscribes to a given topic.
*
* @abstract * @abstract
* @param {string} topic * @param {string} topic
* @returns {void} * @returns {void}
@ -634,6 +713,7 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Unsubscribe from the given topic. * Unsubscribe from the given topic.
*
* @override * @override
* @param {string} topic * @param {string} topic
* @returns {void} * @returns {void}
@ -651,8 +731,9 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* Get the list of topics which the peer is subscribed to. * Get the list of topics which the peer is subscribed to.
*
* @override * @override
* @returns {Array<String>} * @returns {Array<string>}
*/ */
getTopics () { getTopics () {
if (!this.started) { if (!this.started) {
@ -663,6 +744,8 @@ class PubsubBaseProtocol extends EventEmitter {
} }
} }
PubsubBaseProtocol.message = message
PubsubBaseProtocol.utils = utils
PubsubBaseProtocol.SignaturePolicy = SignaturePolicy
module.exports = PubsubBaseProtocol module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils

View File

@ -1,5 +0,0 @@
export var rpc: any;
export var td: any;
export var RPC: any;
export var Message: any;
export var SubOpts: any;

View File

@ -1,17 +1,16 @@
'use strict' 'use strict'
// @ts-ignore protons not typed
const protons = require('protons') const protons = require('protons')
const rpcProto = protons(require('./rpc.proto.js')) const rpcProto = protons(require('./rpc.proto.js'))
const RPC = rpcProto.RPC const RPC = rpcProto.RPC
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js')) const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))
/** module.exports = {
* @module pubsub/message/index rpc: rpcProto,
*/ td: topicDescriptorProto,
exports = module.exports RPC,
exports.rpc = rpcProto Message: RPC.Message,
exports.td = topicDescriptorProto SubOpts: RPC.SubOpts
exports.RPC = RPC }
exports.Message = RPC.Message
exports.SubOpts = RPC.SubOpts

View File

@ -1,2 +0,0 @@
declare const _exports: string;
export = _exports;

View File

@ -1,23 +0,0 @@
/**
* Returns the PublicKey associated with the given message.
* If no, valid PublicKey can be retrieved an error will be returned.
*
* @param {InMessage} message
* @returns {Promise<PublicKey>}
*/
export function messagePublicKey(message: any): Promise<any>;
/**
* Signs the provided message with the given `peerId`
*
* @param {PeerId} peerId
* @param {Message} message
* @returns {Promise<Message>}
*/
export function signMessage(peerId: import("peer-id"), message: any): Promise<any>;
export const SignPrefix: any;
/**
* Verifies the signature of the given message
* @param {InMessage} message
* @returns {Promise<Boolean>}
*/
export function verifySignature(message: any): Promise<boolean>;

View File

@ -31,18 +31,24 @@ async function signMessage (peerId, message) {
/** /**
* Verifies the signature of the given message * Verifies the signature of the given message
*
* @param {InMessage} message * @param {InMessage} message
* @returns {Promise<Boolean>} * @returns {Promise<boolean>}
*/ */
async function verifySignature (message) { async function verifySignature (message) {
if (!message.signature) {
throw new Error('Message must contain a signature to be verified')
}
// Get message sans the signature // Get message sans the signature
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes()
const bytes = uint8ArrayConcat([ const bytes = uint8ArrayConcat([
SignPrefix, SignPrefix,
Message.encode(baseMessage) Message.encode({
...message,
from: message.from && PeerId.createFromCID(message.from).toBytes(),
signature: undefined,
key: undefined
})
]) ])
// Get the public key // Get the public key
@ -61,13 +67,17 @@ async function verifySignature (message) {
*/ */
async function messagePublicKey (message) { async function messagePublicKey (message) {
// should be available in the from property of the message (peer id) // should be available in the from property of the message (peer id)
if (!message.from) {
throw new Error('Could not get the public key from the originator id')
}
const from = PeerId.createFromCID(message.from) const from = PeerId.createFromCID(message.from)
if (message.key) { if (message.key) {
const keyPeerId = await PeerId.createFromPubKey(message.key) const keyPeerId = await PeerId.createFromPubKey(message.key)
// the key belongs to the sender, return the key // the key belongs to the sender, return the key
if (keyPeerId.isEqual(from)) return keyPeerId.pubKey if (keyPeerId.equals(from)) return keyPeerId.pubKey
// We couldn't validate pubkey is from the originator, error // We couldn't validate pubkey is from the originator, error
throw new Error('Public Key does not match the originator') throw new Error('Public Key does not match the originator')
} else if (from.pubKey) { } else if (from.pubKey) {
@ -77,6 +87,11 @@ async function messagePublicKey (message) {
} }
} }
/**
* @typedef {import('..').InMessage} InMessage
* @typedef {import('libp2p-crypto').PublicKey} PublicKey
*/
module.exports = { module.exports = {
messagePublicKey, messagePublicKey,
signMessage, signMessage,

View File

@ -1,2 +0,0 @@
declare const _exports: string;
export = _exports;

5
src/pubsub/message/types.d.ts vendored Normal file
View File

@ -0,0 +1,5 @@
import { RPC, Message, SubOpts } from './types'
export type RPC = RPC
export type Message = Message
export type SubOpts = SubOpts

View File

@ -1,113 +0,0 @@
export = PeerStreams;
/**
* @callback Sink
* @param {Uint8Array} source
* @returns {Promise<Uint8Array>}
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
*/
/**
* Thin wrapper around a peer's inbound / outbound pubsub streams
*/
declare class PeerStreams {
/**
* @param {object} properties properties of the PeerStreams.
* @param {PeerId} properties.id
* @param {string} properties.protocol
*/
constructor({ id, protocol }: {
id: import("peer-id");
protocol: string;
});
/**
* @type {import('peer-id')}
*/
id: import('peer-id');
/**
* Established protocol
* @type {string}
*/
protocol: string;
/**
* The raw outbound stream, as retrieved from conn.newStream
* @private
* @type {DuplexIterableStream}
*/
_rawOutboundStream: DuplexIterableStream;
/**
* The raw inbound stream, as retrieved from the callback from libp2p.handle
* @private
* @type {DuplexIterableStream}
*/
_rawInboundStream: DuplexIterableStream;
/**
* An AbortController for controlled shutdown of the inbound stream
* @private
* @type {typeof AbortController}
*/
_inboundAbortController: typeof AbortController;
/**
* Write stream -- its preferable to use the write method
* @type {import('it-pushable').Pushable<Uint8Array>>}
*/
outboundStream: import('it-pushable').Pushable<Uint8Array>;
/**
* Read stream
* @type {DuplexIterableStream}
*/
inboundStream: DuplexIterableStream;
/**
* Do we have a connection to read from?
*
* @type {boolean}
*/
get isReadable(): boolean;
/**
* Do we have a connection to write on?
*
* @type {boolean}
*/
get isWritable(): boolean;
/**
* Send a message to this peer.
* Throws if there is no `stream` to write to available.
*
* @param {Uint8Array} data
* @returns {void}
*/
write(data: Uint8Array): void;
/**
* Attach a raw inbound stream and setup a read stream
*
* @param {DuplexIterableStream} stream
* @returns {void}
*/
attachInboundStream(stream: DuplexIterableStream): void;
/**
* Attach a raw outbound stream and setup a write stream
*
* @param {Stream} stream
* @returns {Promise<void>}
*/
attachOutboundStream(stream: any): Promise<void>;
/**
* Closes the open connection to peer
* @returns {void}
*/
close(): void;
}
declare namespace PeerStreams {
export { Sink, DuplexIterableStream, PeerId };
}
type DuplexIterableStream = {
sink: Sink;
source: () => AsyncIterator<Uint8Array, any, undefined>;
};
declare const AbortController: typeof import("abort-controller");
type Sink = (source: Uint8Array) => Promise<Uint8Array>;
type PeerId = import("peer-id");

View File

@ -1,28 +1,25 @@
'use strict' 'use strict'
const debug = require('debug')
const log = Object.assign(debug('libp2p-pubsub:peer-streams'), {
error: debug('libp2p-pubsub:peer-streams:err')
})
/** @typedef {import('../types').EventEmitterFactory} Events */
/** @type Events */
const EventEmitter = require('events') const EventEmitter = require('events')
const lp = require('it-length-prefixed') const lp = require('it-length-prefixed')
const pushable = require('it-pushable')
const pipe = require('it-pipe')
const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const debug = require('debug')
const log = debug('libp2p-pubsub:peer-streams') const pushable = require('it-pushable')
log.error = debug('libp2p-pubsub:peer-streams:error') const { pipe } = require('it-pipe')
const { source: abortable } = require('abortable-iterator')
const AbortController = require('abort-controller').default
/** /**
* @callback Sink * @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @param {Uint8Array} source * @typedef {import('peer-id')} PeerId
* @returns {Promise<Uint8Array>} * @typedef {import('it-pushable').Pushable<Uint8Array>} PushableStream
*
* @typedef {object} DuplexIterableStream
* @property {Sink} sink
* @property {() AsyncIterator<Uint8Array>} source
*
* @typedef PeerId
* @type import('peer-id')
*/ */
/** /**
@ -30,7 +27,7 @@ log.error = debug('libp2p-pubsub:peer-streams:error')
*/ */
class PeerStreams extends EventEmitter { class PeerStreams extends EventEmitter {
/** /**
* @param {object} properties properties of the PeerStreams. * @param {object} properties - properties of the PeerStreams.
* @param {PeerId} properties.id * @param {PeerId} properties.id
* @param {string} properties.protocol * @param {string} properties.protocol
*/ */
@ -43,35 +40,41 @@ class PeerStreams extends EventEmitter {
this.id = id this.id = id
/** /**
* Established protocol * Established protocol
*
* @type {string} * @type {string}
*/ */
this.protocol = protocol this.protocol = protocol
/** /**
* The raw outbound stream, as retrieved from conn.newStream * The raw outbound stream, as retrieved from conn.newStream
*
* @private * @private
* @type {DuplexIterableStream} * @type {null|MuxedStream}
*/ */
this._rawOutboundStream = null this._rawOutboundStream = null
/** /**
* The raw inbound stream, as retrieved from the callback from libp2p.handle * The raw inbound stream, as retrieved from the callback from libp2p.handle
*
* @private * @private
* @type {DuplexIterableStream} * @type {null|MuxedStream}
*/ */
this._rawInboundStream = null this._rawInboundStream = null
/** /**
* An AbortController for controlled shutdown of the inbound stream * An AbortController for controlled shutdown of the inbound stream
*
* @private * @private
* @type {typeof AbortController} * @type {AbortController}
*/ */
this._inboundAbortController = null this._inboundAbortController = new AbortController()
/** /**
* Write stream -- its preferable to use the write method * Write stream -- its preferable to use the write method
* @type {import('it-pushable').Pushable<Uint8Array>>} *
* @type {null|PushableStream}
*/ */
this.outboundStream = null this.outboundStream = null
/** /**
* Read stream * Read stream
* @type {DuplexIterableStream} *
* @type {null| AsyncIterable<Uint8Array>}
*/ */
this.inboundStream = null this.inboundStream = null
} }
@ -102,7 +105,7 @@ class PeerStreams extends EventEmitter {
* @returns {void} * @returns {void}
*/ */
write (data) { write (data) {
if (!this.isWritable) { if (!this.outboundStream) {
const id = this.id.toB58String() const id = this.id.toB58String()
throw new Error('No writable connection to ' + id) throw new Error('No writable connection to ' + id)
} }
@ -113,15 +116,14 @@ class PeerStreams extends EventEmitter {
/** /**
* Attach a raw inbound stream and setup a read stream * Attach a raw inbound stream and setup a read stream
* *
* @param {DuplexIterableStream} stream * @param {MuxedStream} stream
* @returns {void} * @returns {AsyncIterable<Uint8Array>}
*/ */
attachInboundStream (stream) { attachInboundStream (stream) {
// Create and attach a new inbound stream // Create and attach a new inbound stream
// The inbound stream is: // The inbound stream is:
// - abortable, set to only return on abort, rather than throw // - abortable, set to only return on abort, rather than throw
// - transformed with length-prefix transform // - transformed with length-prefix transform
this._inboundAbortController = new AbortController()
this._rawInboundStream = stream this._rawInboundStream = stream
this.inboundStream = abortable( this.inboundStream = abortable(
pipe( pipe(
@ -133,31 +135,31 @@ class PeerStreams extends EventEmitter {
) )
this.emit('stream:inbound') this.emit('stream:inbound')
return this.inboundStream
} }
/** /**
* Attach a raw outbound stream and setup a write stream * Attach a raw outbound stream and setup a write stream
* *
* @param {Stream} stream * @param {MuxedStream} stream
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async attachOutboundStream (stream) { async attachOutboundStream (stream) {
// If an outbound stream already exists, // If an outbound stream already exists, gently close it
// gently close it
const _prevStream = this.outboundStream const _prevStream = this.outboundStream
if (_prevStream) { if (this.outboundStream) {
// End the stream without emitting a close event // End the stream without emitting a close event
await this.outboundStream.end(false) await this.outboundStream.end()
} }
this._rawOutboundStream = stream this._rawOutboundStream = stream
this.outboundStream = pushable({ this.outboundStream = pushable({
onEnd: (shouldEmit) => { onEnd: (shouldEmit) => {
// close writable side of the stream // close writable side of the stream
this._rawOutboundStream.reset && this._rawOutboundStream.reset() this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset()
this._rawOutboundStream = null this._rawOutboundStream = null
this.outboundStream = null this.outboundStream = null
if (shouldEmit !== false) { if (shouldEmit) {
this.emit('close') this.emit('close')
} }
} }
@ -167,7 +169,7 @@ class PeerStreams extends EventEmitter {
this.outboundStream, this.outboundStream,
lp.encode(), lp.encode(),
this._rawOutboundStream this._rawOutboundStream
).catch(err => { ).catch(/** @param {Error} err */ err => {
log.error(err) log.error(err)
}) })
@ -179,6 +181,7 @@ class PeerStreams extends EventEmitter {
/** /**
* Closes the open connection to peer * Closes the open connection to peer
*
* @returns {void} * @returns {void}
*/ */
close () { close () {

View File

@ -0,0 +1,33 @@
'use strict'
/**
* Enum for Signature Policy
* Details how message signatures are produced/consumed
*/
const SignaturePolicy = {
/**
* On the producing side:
* * Build messages with the signature, key (from may be enough for certain inlineable public key types), from and seqno fields.
*
* On the consuming side:
* * Enforce the fields to be present, reject otherwise.
* * Propagate only if the fields are valid and signature can be verified, reject otherwise.
*/
StrictSign: /** @type {'StrictSign'} */ ('StrictSign'),
/**
* On the producing side:
* * Build messages without the signature, key, from and seqno fields.
* * The corresponding protobuf key-value pairs are absent from the marshalled message, not just empty.
*
* On the consuming side:
* * Enforce the fields to be absent, reject otherwise.
* * Propagate only if the fields are absent, reject otherwise.
* * A message_id function will not be able to use the above fields, and should instead rely on the data field. A commonplace strategy is to calculate a hash.
*/
StrictNoSign: /** @type {'StrictNoSign'} */ ('StrictNoSign')
}
exports.SignaturePolicy = SignaturePolicy
/**
* @typedef {SignaturePolicy[keyof SignaturePolicy]} SignaturePolicyType
*/

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -76,7 +77,7 @@ module.exports = (common) => {
const defer = pDefer() const defer = pDefer()
const handler = (msg) => { const handler = (msg) => {
expect(msg).to.exist() expect(msg).to.not.eql(undefined)
defer.resolve() defer.resolve()
} }

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -62,7 +63,7 @@ module.exports = (common) => {
pubsub.publish(topic, data) pubsub.publish(topic, data)
// Wait 1 second to guarantee that self is not noticed // Wait 1 second to guarantee that self is not noticed
return new Promise((resolve) => setTimeout(() => resolve(), 1000)) return new Promise((resolve) => setTimeout(resolve, 1000))
}) })
}) })
}) })

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -10,6 +11,7 @@ const uint8ArrayFromString = require('uint8arrays/from-string')
const { utils } = require('..') const { utils } = require('..')
const PeerStreams = require('../peer-streams') const PeerStreams = require('../peer-streams')
const { SignaturePolicy } = require('../signature-policy')
const topic = 'foo' const topic = 'foo'
const data = uint8ArrayFromString('bar') const data = uint8ArrayFromString('bar')
@ -31,24 +33,17 @@ module.exports = (common) => {
}) })
it('should emit normalized signed messages on publish', async () => { it('should emit normalized signed messages on publish', async () => {
pubsub.globalSignaturePolicy = SignaturePolicy.StrictSign
sinon.spy(pubsub, '_emitMessage') sinon.spy(pubsub, '_emitMessage')
sinon.spy(utils, 'randomSeqno')
await pubsub.publish(topic, data) await pubsub.publish(topic, data)
expect(pubsub._emitMessage.callCount).to.eql(1) expect(pubsub._emitMessage.callCount).to.eql(1)
const [messageToEmit] = pubsub._emitMessage.getCall(0).args const [messageToEmit] = pubsub._emitMessage.getCall(0).args
const expected = utils.normalizeInRpcMessage( expect(messageToEmit.seqno).to.not.eql(undefined)
await pubsub._buildMessage({ expect(messageToEmit.key).to.not.eql(undefined)
receivedFrom: pubsub.peerId.toB58String(), expect(messageToEmit.signature).to.not.eql(undefined)
from: pubsub.peerId.toB58String(),
data,
seqno: utils.randomSeqno.getCall(0).returnValue,
topicIDs: [topic]
}))
expect(messageToEmit).to.eql(expected)
}) })
it('should drop unsigned messages', async () => { it('should drop unsigned messages', async () => {
@ -56,7 +51,10 @@ module.exports = (common) => {
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
sinon.spy(pubsub, 'validate') sinon.spy(pubsub, 'validate')
const peerStream = new PeerStreams({ id: await PeerId.create() }) const peerStream = new PeerStreams({
id: await PeerId.create(),
protocol: 'test'
})
const rpc = { const rpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
@ -83,18 +81,20 @@ module.exports = (common) => {
}) })
it('should not drop unsigned messages if strict signing is disabled', async () => { it('should not drop unsigned messages if strict signing is disabled', async () => {
pubsub.globalSignaturePolicy = SignaturePolicy.StrictNoSign
sinon.spy(pubsub, '_emitMessage') sinon.spy(pubsub, '_emitMessage')
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
sinon.spy(pubsub, 'validate') sinon.spy(pubsub, 'validate')
sinon.stub(pubsub, 'strictSigning').value(false)
const peerStream = new PeerStreams({ id: await PeerId.create() }) const peerStream = new PeerStreams({
id: await PeerId.create(),
protocol: 'test'
})
const rpc = { const rpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peerStream.id.toBytes(),
data, data,
seqno: utils.randomSeqno(),
topicIDs: [topic] topicIDs: [topic]
}] }]
} }

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */ /* eslint max-nested-callbacks: ["error", 6] */
'use strict' 'use strict'
@ -52,26 +53,20 @@ module.exports = (common) => {
await common.teardown() await common.teardown()
}) })
it('subscribe to the topic on node a', () => { it('subscribe to the topic on node a', async () => {
const topic = 'Z' const topic = 'Z'
const defer = pDefer()
psA.subscribe(topic) psA.subscribe(topic)
expectSet(psA.subscriptions, [topic]) expectSet(psA.subscriptions, [topic])
psB.once('pubsub:subscription-change', () => { await new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
expect(psB.peers.size).to.equal(2) expect(psB.peers.size).to.equal(2)
const aPeerId = psA.peerId.toB58String() const aPeerId = psA.peerId.toB58String()
expectSet(psB.topics.get(topic), [aPeerId]) expectSet(psB.topics.get(topic), [aPeerId])
expect(psC.peers.size).to.equal(1) expect(psC.peers.size).to.equal(1)
expect(psC.topics.get(topic)).to.not.exist() expect(psC.topics.get(topic)).to.eql(undefined)
defer.resolve()
})
return defer.promise
}) })
it('subscribe to the topic on node b', async () => { it('subscribe to the topic on node b', async () => {
@ -119,9 +114,9 @@ module.exports = (common) => {
// await subscription change // await subscription change
await Promise.all([ await Promise.all([
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve(null))),
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve(null))),
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve(null)))
]) ])
// await a cycle // await a cycle
@ -172,9 +167,9 @@ module.exports = (common) => {
// await subscription change // await subscription change
await Promise.all([ await Promise.all([
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve(null))),
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve(null))),
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve(null)))
]) ])
psA.on(topic, incMsg) psA.on(topic, incMsg)

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */ /* eslint max-nested-callbacks: ["error", 6] */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
'use strict' 'use strict'
const { expect } = require('chai') const { expect } = require('chai')

View File

@ -1,6 +0,0 @@
export function randomSeqno(): Uint8Array;
export function msgId(from: string, seqno: Uint8Array): Uint8Array;
export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean;
export function ensureArray(maybeArray: any): any[];
export function normalizeInRpcMessage(message: any, peerId: string): any;
export function normalizeOutRpcMessage(message: any): any;

View File

@ -1,10 +1,11 @@
'use strict' 'use strict'
// @ts-ignore libp2p crypto has no types
const randomBytes = require('libp2p-crypto/src/random-bytes') const randomBytes = require('libp2p-crypto/src/random-bytes')
const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const PeerId = require('peer-id') const PeerId = require('peer-id')
exports = module.exports const multihash = require('multihashes')
/** /**
* Generatea random sequence number. * Generatea random sequence number.
@ -12,7 +13,7 @@ exports = module.exports
* @returns {Uint8Array} * @returns {Uint8Array}
* @private * @private
*/ */
exports.randomSeqno = () => { const randomSeqno = () => {
return randomBytes(8) return randomBytes(8)
} }
@ -24,7 +25,7 @@ exports.randomSeqno = () => {
* @returns {Uint8Array} * @returns {Uint8Array}
* @private * @private
*/ */
exports.msgId = (from, seqno) => { const msgId = (from, seqno) => {
const fromBytes = PeerId.createFromB58String(from).id const fromBytes = PeerId.createFromB58String(from).id
const msgId = new Uint8Array(fromBytes.length + seqno.length) const msgId = new Uint8Array(fromBytes.length + seqno.length)
msgId.set(fromBytes, 0) msgId.set(fromBytes, 0)
@ -32,20 +33,35 @@ exports.msgId = (from, seqno) => {
return msgId return msgId
} }
/**
* Generate a message id, based on message `data`.
*
* @param {Uint8Array} data
* @returns {Uint8Array}
* @private
*/
const noSignMsgId = (data) => multihash.encode(data, 'sha2-256')
/** /**
* Check if any member of the first set is also a member * Check if any member of the first set is also a member
* of the second set. * of the second set.
* *
* @param {Set|Array} a * @param {Set<number>|Array<number>} a
* @param {Set|Array} b * @param {Set<number>|Array<number>} b
* @returns {boolean} * @returns {boolean}
* @private * @private
*/ */
exports.anyMatch = (a, b) => { const anyMatch = (a, b) => {
let bHas let bHas
if (Array.isArray(b)) { if (Array.isArray(b)) {
/**
* @param {number} val
*/
bHas = (val) => b.indexOf(val) > -1 bHas = (val) => b.indexOf(val) > -1
} else { } else {
/**
* @param {number} val
*/
bHas = (val) => b.has(val) bHas = (val) => b.has(val)
} }
@ -61,11 +77,12 @@ exports.anyMatch = (a, b) => {
/** /**
* Make everything an array. * Make everything an array.
* *
* @param {any} maybeArray * @template T
* @returns {Array} * @param {T|T[]} maybeArray
* @returns {T[]}
* @private * @private
*/ */
exports.ensureArray = (maybeArray) => { const ensureArray = (maybeArray) => {
if (!Array.isArray(maybeArray)) { if (!Array.isArray(maybeArray)) {
return [maybeArray] return [maybeArray]
} }
@ -75,11 +92,13 @@ exports.ensureArray = (maybeArray) => {
/** /**
* Ensures `message.from` is base58 encoded * Ensures `message.from` is base58 encoded
* @param {object} message *
* @param {String} peerId * @template {{from?:any}} T
* @return {object} * @param {T & {from?:string, receivedFrom:string}} message
* @param {string} [peerId]
* @returns {T & {from?: string, peerId?: string }}
*/ */
exports.normalizeInRpcMessage = (message, peerId) => { const normalizeInRpcMessage = (message, peerId) => {
const m = Object.assign({}, message) const m = Object.assign({}, message)
if (message.from instanceof Uint8Array) { if (message.from instanceof Uint8Array) {
m.from = uint8ArrayToString(message.from, 'base58btc') m.from = uint8ArrayToString(message.from, 'base58btc')
@ -91,16 +110,28 @@ exports.normalizeInRpcMessage = (message, peerId) => {
} }
/** /**
* @param {object} message * @template {{from?:any, data?:any}} T
* @return {object} *
* @param {T} message
* @returns {T & {from?: Uint8Array, data?: Uint8Array}}
*/ */
exports.normalizeOutRpcMessage = (message) => { const normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message) const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) { if (typeof message.from === 'string') {
m.from = uint8ArrayFromString(message.from, 'base58btc') m.from = uint8ArrayFromString(message.from, 'base58btc')
} }
if (typeof message.data === 'string' || message.data instanceof String) { if (typeof message.data === 'string') {
m.data = uint8ArrayFromString(message.data) m.data = uint8ArrayFromString(message.data)
} }
return m return m
} }
module.exports = {
randomSeqno,
msgId,
noSignMsgId,
anyMatch,
ensureArray,
normalizeInRpcMessage,
normalizeOutRpcMessage
}

View File

@ -36,15 +36,30 @@ const fromString = require('uint8arrays/from-string')
const ENVELOPE_DOMAIN_PEER_RECORD = 'libp2p-peer-record' const ENVELOPE_DOMAIN_PEER_RECORD = 'libp2p-peer-record'
const ENVELOPE_PAYLOAD_TYPE_PEER_RECORD = fromString('0301', 'hex') const ENVELOPE_PAYLOAD_TYPE_PEER_RECORD = fromString('0301', 'hex')
class PeerRecord extends Record { /**
* @implements {import('libp2p-interfaces/src/record/types').Record}
*/
class PeerRecord {
constructor (peerId, multiaddrs, seqNumber) { constructor (peerId, multiaddrs, seqNumber) {
super (ENVELOPE_DOMAIN_PEER_RECORD, ENVELOPE_PAYLOAD_TYPE_PEER_RECORD) this.domain = ENVELOPE_DOMAIN_PEER_RECORD
this.codec = ENVELOPE_PAYLOAD_TYPE_PEER_RECORD
} }
/**
* Marshal a record to be used in an envelope.
*
* @returns {Uint8Array}
*/
marshal () { marshal () {
// Implement and return using Protobuf // Implement and return using Protobuf
} }
/**
* Returns true if `this` record equals the `other`.
*
* @param {PeerRecord} other
* @returns {other is Record}
*/
equals (other) { equals (other) {
// Verify // Verify
} }
@ -73,4 +88,4 @@ Verifies if the other Record is identical to this one.
- other is a `Record` to compare with the current instance. - other is a `Record` to compare with the current instance.
**Returns** **Returns**
- `boolean` - `other is Record`

23
src/record/index.d.ts vendored
View File

@ -1,23 +0,0 @@
export = Record;
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
declare class Record {
/**
* @constructor
* @param {String} domain signature domain
* @param {Uint8Array} codec identifier of the type of record
*/
constructor(domain: string, codec: Uint8Array);
domain: string;
codec: Uint8Array;
/**
* Marshal a record to be used in an envelope.
*/
marshal(): void;
/**
* Verifies if the other provided Record is identical to this one.
* @param {Record} other
*/
equals(other: Record): void;
}

View File

@ -1,35 +0,0 @@
'use strict'
const errcode = require('err-code')
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
class Record {
/**
* @constructor
* @param {String} domain signature domain
* @param {Uint8Array} codec identifier of the type of record
*/
constructor (domain, codec) {
this.domain = domain
this.codec = codec
}
/**
* Marshal a record to be used in an envelope.
*/
marshal () {
throw errcode(new Error('marshal must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
/**
* Verifies if the other provided Record is identical to this one.
* @param {Record} other
*/
equals (other) {
throw errcode(new Error('equals must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
}
module.exports = Record

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

21
src/record/types.d.ts vendored Normal file
View File

@ -0,0 +1,21 @@
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
export interface Record {
/**
* signature domain.
*/
domain: string;
/**
* identifier of the type of record
*/
codec: Uint8Array;
/**
* Marshal a record to be used in an envelope.
*/
marshal(): Uint8Array;
/**
* Verifies if the other provided Record is identical to this one.
*/
equals(other: unknown): boolean
}

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -5,7 +6,7 @@ const chai = require('chai')
chai.use(require('chai-checkmark')) chai.use(require('chai-checkmark'))
const { expect } = chai const { expect } = chai
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const { collect, map, consume } = require('streaming-iterables') const { collect, map, consume } = require('streaming-iterables')
function close (stream) { function close (stream) {
@ -20,8 +21,9 @@ async function closeAndWait (stream) {
/** /**
* A tick is considered valid if it happened between now * A tick is considered valid if it happened between now
* and `ms` milliseconds ago * and `ms` milliseconds ago
* @param {number} date Time in ticks *
* @param {number} ms max milliseconds that should have expired * @param {number} date - Time in ticks
* @param {number} ms - max milliseconds that should have expired
* @returns {boolean} * @returns {boolean}
*/ */
function isValidTick (date, ms = 5000) { function isValidTick (date, ms = 5000) {

View File

@ -1,14 +1,15 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */ /* eslint max-nested-callbacks: ["error", 8] */
'use strict' 'use strict'
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const { consume } = require('streaming-iterables') const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp') const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const abortable = require('abortable-iterator') const { source: abortable } = require('abortable-iterator')
const AbortController = require('abort-controller') const AbortController = require('abort-controller').default
const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayFromString = require('uint8arrays/from-string')
const mh = multiaddr('/ip4/127.0.0.1/tcp/0') const mh = multiaddr('/ip4/127.0.0.1/tcp/0')

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,8 +1,10 @@
// @ts-nocheck interface tests
'use strict' 'use strict'
const { expect } = require('chai') const { expect } = require('chai')
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const pLimit = require('p-limit') const pLimit = require('p-limit')
const { collect, tap, consume } = require('streaming-iterables') const { collect, tap, consume } = require('streaming-iterables')
@ -61,8 +63,11 @@ module.exports = async (Muxer, nStreams, nMsg, limit) => {
} }
function marker (n) { function marker (n) {
/** @type {Function} */
let check let check
let i = 0 let i = 0
/** @type {Promise<void>} */
const done = new Promise((resolve, reject) => { const done = new Promise((resolve, reject) => {
check = err => { check = err => {
i++ i++
@ -78,5 +83,7 @@ function marker (n) {
} }
} }
}) })
// @ts-ignore - TS can't see that assignement occured
return { check, done } return { check, done }
} }

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

51
src/stream-muxer/types.d.ts vendored Normal file
View File

@ -0,0 +1,51 @@
import BufferList from 'bl'
export interface MuxerFactory {
new (options: MuxerOptions): Muxer;
multicodec: string;
}
/**
* A libp2p stream muxer
*/
export interface Muxer {
readonly streams: Array<MuxedStream>;
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of th stream will be used.
*/
newStream (name?: string): MuxedStream;
/**
* A function called when receiving a new stream from the remote.
*/
onStream (stream: MuxedStream): void;
/**
* A function called when a stream ends.
*/
onStreamEnd (stream: MuxedStream): void;
}
export type MuxerOptions = {
onStream: (stream: MuxedStream) => void;
onStreamEnd: (stream: MuxedStream) => void;
maxMsgSize?: number;
}
export type MuxedTimeline = {
open: number;
close?: number;
}
export interface MuxedStream extends AsyncIterable<Uint8Array | BufferList> {
close: () => void;
abort: () => void;
reset: () => void;
sink: Sink;
source: AsyncIterable<Uint8Array | BufferList>;
timeline: MuxedTimeline;
id: string;
}
export type Sink = (source: Uint8Array) => Promise<void>;

View File

@ -1,42 +0,0 @@
declare const _exports: Topology;
export = _exports;
declare class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Object} [props.handlers]
* @param {function} [props.handlers.onConnect] protocol "onConnect" handler
* @param {function} [props.handlers.onDisconnect] protocol "onDisconnect" handler
* @constructor
*/
constructor({ min, max, handlers }: {
min: number;
max: number;
handlers?: {
onConnect?: Function;
onDisconnect?: Function;
};
});
min: number;
max: number;
_onConnect: Function;
_onDisconnect: Function;
/**
* Set of peers that support the protocol.
* @type {Set<string>}
*/
peers: Set<string>;
set registrar(arg: any);
_registrar: any;
/**
* @typedef PeerId
* @type {import('peer-id')}
*/
/**
* Notify about peer disconnected event.
* @param {PeerId} peerId
* @returns {void}
*/
disconnect(peerId: import("peer-id")): void;
}

View File

@ -1,17 +1,28 @@
'use strict' 'use strict'
const withIs = require('class-is')
const noop = () => {} const noop = () => {}
const topologySymbol = Symbol.for('@libp2p/js-interfaces/topology')
/**
* @typedef {import('peer-id')} PeerId
*/
/**
* @typedef {Object} Options
* @property {number} [min=0] - minimum needed connections.
* @property {number} [max=Infinity] - maximum needed connections.
* @property {Handlers} [handlers]
*
* @typedef {Object} Handlers
* @property {(peerId: PeerId, conn: Connection) => void} [onConnect] - protocol "onConnect" handler
* @property {(peerId: PeerId, error?:Error) => void} [onDisconnect] - protocol "onDisconnect" handler
*
* @typedef {import('../connection/connection')} Connection
*/
class Topology { class Topology {
/** /**
* @param {Object} props * @param {Options} options
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Object} [props.handlers]
* @param {function} [props.handlers.onConnect] protocol "onConnect" handler
* @param {function} [props.handlers.onDisconnect] protocol "onDisconnect" handler
* @constructor
*/ */
constructor ({ constructor ({
min = 0, min = 0,
@ -27,22 +38,40 @@ class Topology {
/** /**
* Set of peers that support the protocol. * Set of peers that support the protocol.
*
* @type {Set<string>} * @type {Set<string>}
*/ */
this.peers = new Set() this.peers = new Set()
} }
set registrar (registrar) { get [Symbol.toStringTag] () {
return 'Topology'
}
get [topologySymbol] () {
return true
}
/**
* Checks if the given value is a Topology instance.
*
* @param {any} other
* @returns {other is Topology}
*/
static isTopology (other) {
return Boolean(other && other[topologySymbol])
}
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
this._registrar = registrar this._registrar = registrar
} }
/**
* @typedef PeerId
* @type {import('peer-id')}
*/
/** /**
* Notify about peer disconnected event. * Notify about peer disconnected event.
*
* @param {PeerId} peerId * @param {PeerId} peerId
* @returns {void} * @returns {void}
*/ */
@ -51,8 +80,4 @@ class Topology {
} }
} }
/** module.exports = Topology
* @module
* @type {Topology}
*/
module.exports = withIs(Topology, { className: 'Topology', symbolName: '@libp2p/js-interfaces/topology' })

View File

@ -1,52 +0,0 @@
declare const _exports: MulticodecTopology;
export = _exports;
declare class MulticodecTopology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor({ min, max, multicodecs, handlers }: {
min: number;
max: number;
multicodecs: string[];
handlers: {
onConnect: Function;
onDisconnect: Function;
};
});
multicodecs: string[];
_registrar: any;
/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
*/
_onProtocolChange({ peerId, protocols }: {
peerId: any;
protocols: string[];
}): void;
/**
* Verify if a new connected peer has a topology multicodec and call _onConnect.
* @param {Connection} connection
* @returns {void}
*/
_onPeerConnect(connection: any): void;
set registrar(arg: any);
/**
* Update topology.
* @param {Array<{id: PeerId, multiaddrs: Array<Multiaddr>, protocols: Array<string>}>} peerDataIterable
* @returns {void}
*/
_updatePeers(peerDataIterable: {
id: any;
multiaddrs: any[];
protocols: string[];
}[]): void;
}

View File

@ -1,19 +1,11 @@
'use strict' 'use strict'
const withIs = require('class-is')
const Topology = require('./index') const Topology = require('./index')
const multicodecTopologySymbol = Symbol.for('@libp2p/js-interfaces/topology/multicodec-topology')
class MulticodecTopology extends Topology { class MulticodecTopology extends Topology {
/** /**
* @param {Object} props * @param {TopologyOptions & MulticodecOptions} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/ */
constructor ({ constructor ({
min, min,
@ -46,7 +38,28 @@ class MulticodecTopology extends Topology {
this._onPeerConnect = this._onPeerConnect.bind(this) this._onPeerConnect = this._onPeerConnect.bind(this)
} }
set registrar (registrar) { get [Symbol.toStringTag] () {
return 'Topology'
}
get [multicodecTopologySymbol] () {
return true
}
/**
* Checks if the given value is a `MulticodecTopology` instance.
*
* @param {any} other
* @returns {other is MulticodecTopology}
*/
static isMulticodecTopology (other) {
return Boolean(other && other[multicodecTopologySymbol])
}
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
this._registrar = registrar this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange) this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
this._registrar.connectionManager.on('peer:connect', this._onPeerConnect) this._registrar.connectionManager.on('peer:connect', this._onPeerConnect)
@ -57,6 +70,7 @@ class MulticodecTopology extends Topology {
/** /**
* Update topology. * Update topology.
*
* @param {Array<{id: PeerId, multiaddrs: Array<Multiaddr>, protocols: Array<string>}>} peerDataIterable * @param {Array<{id: PeerId, multiaddrs: Array<Multiaddr>, protocols: Array<string>}>} peerDataIterable
* @returns {void} * @returns {void}
*/ */
@ -77,6 +91,7 @@ class MulticodecTopology extends Topology {
/** /**
* Check if a new peer support the multicodecs for this topology. * Check if a new peer support the multicodecs for this topology.
*
* @param {Object} props * @param {Object} props
* @param {PeerId} props.peerId * @param {PeerId} props.peerId
* @param {Array<string>} props.protocols * @param {Array<string>} props.protocols
@ -102,10 +117,12 @@ class MulticodecTopology extends Topology {
/** /**
* Verify if a new connected peer has a topology multicodec and call _onConnect. * Verify if a new connected peer has a topology multicodec and call _onConnect.
*
* @param {Connection} connection * @param {Connection} connection
* @returns {void} * @returns {void}
*/ */
_onPeerConnect (connection) { _onPeerConnect (connection) {
// @ts-ignore - remotePeer does not existist on Connection
const peerId = connection.remotePeer const peerId = connection.remotePeer
const protocols = this._registrar.peerStore.protoBook.get(peerId) const protocols = this._registrar.peerStore.protoBook.get(peerId)
@ -121,7 +138,13 @@ class MulticodecTopology extends Topology {
} }
/** /**
* @module * @typedef {import('peer-id')} PeerId
* @type {MulticodecTopology} * @typedef {import('multiaddr')} Multiaddr
* @typedef {import('../connection/connection')} Connection
* @typedef {import('.').Options} TopologyOptions
* @typedef {Object} MulticodecOptions
* @property {string[]} multicodecs - protocol multicodecs
* @property {Required<Handlers>} handlers
* @typedef {import('.').Handlers} Handlers
*/ */
module.exports = withIs(MulticodecTopology, { className: 'MulticodecTopology', symbolName: '@libp2p/js-interfaces/topology/multicodec-topology' }) module.exports = MulticodecTopology

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,6 +0,0 @@
export class AbortError extends Error {
static get code(): string;
static get type(): string;
code: string;
type: string;
}

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -9,8 +10,8 @@ chai.use(dirtyChai)
const { isValidTick } = require('./utils') const { isValidTick } = require('./utils')
const goodbye = require('it-goodbye') const goodbye = require('it-goodbye')
const { collect } = require('streaming-iterables') const { collect } = require('streaming-iterables')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const AbortController = require('abort-controller') const AbortController = require('abort-controller').default
const AbortError = require('../errors').AbortError const AbortError = require('../errors').AbortError
const sinon = require('sinon') const sinon = require('sinon')

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'

View File

@ -1,3 +1,4 @@
// @ts-nocheck interface tests
/* eslint max-nested-callbacks: ["error", 8] */ /* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
@ -9,7 +10,7 @@ chai.use(dirtyChai)
const sinon = require('sinon') const sinon = require('sinon')
const pWaitFor = require('p-wait-for') const pWaitFor = require('p-wait-for')
const pipe = require('it-pipe') const { pipe } = require('it-pipe')
const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayFromString = require('uint8arrays/from-string')
const { isValidTick } = require('./utils') const { isValidTick } = require('./utils')

View File

@ -4,8 +4,9 @@ module.exports = {
/** /**
* A tick is considered valid if it happened between now * A tick is considered valid if it happened between now
* and `ms` milliseconds ago * and `ms` milliseconds ago
* @param {number} date Time in ticks *
* @param {number} ms max milliseconds that should have expired * @param {number} date - Time in ticks
* @param {number} ms - max milliseconds that should have expired
* @returns {boolean} * @returns {boolean}
*/ */
isValidTick: function isValidTick (date, ms = 5000) { isValidTick: function isValidTick (date, ms = 5000) {

72
src/transport/types.d.ts vendored Normal file
View File

@ -0,0 +1,72 @@
import BufferList from 'bl'
import events from 'events'
import { Multiaddr } from 'multiaddr'
import Connection from '../connection/connection'
import { Sink } from '../stream-muxer/types'
export interface TransportFactory<DialOptions extends { signal?: AbortSignal }, ListenerOptions> {
new(upgrader: Upgrader): Transport<DialOptions, ListenerOptions>;
}
/**
* A libp2p transport is understood as something that offers a dial and listen interface to establish connections.
*/
export interface Transport <DialOptions extends { signal?: AbortSignal }, ListenerOptions> {
/**
* Dial a given multiaddr.
*/
dial(ma: Multiaddr, options?: DialOptions): Promise<Connection>;
/**
* Create transport listeners.
*/
createListener(options: ListenerOptions, handler?: (connection: Connection) => void): Listener;
/**
* Takes a list of `Multiaddr`s and returns only valid addresses for the transport
*/
filter(multiaddrs: Multiaddr[]): Multiaddr[];
}
export interface Listener extends events.EventEmitter {
/**
* Start a listener
*/
listen(multiaddr: Multiaddr): Promise<void>;
/**
* Get listen addresses
*/
getAddrs(): Multiaddr[];
/**
* Close listener
*
* @returns {Promise<void>}
*/
close(): Promise<void>;
}
export interface Upgrader {
/**
* Upgrades an outbound connection on `transport.dial`.
*/
upgradeOutbound(maConn: MultiaddrConnection): Promise<Connection>;
/**
* Upgrades an inbound connection on transport listener.
*/
upgradeInbound(maConn: MultiaddrConnection): Promise<Connection>;
}
export type MultiaddrConnectionTimeline = {
open: number;
upgraded?: number;
close?: number;
}
export type MultiaddrConnection = {
sink: Sink;
source: AsyncIterable<Uint8Array | BufferList>;
close: (err?: Error) => Promise<void>;
conn: unknown;
remoteAddr: Multiaddr;
localAddr?: Multiaddr;
timeline: MultiaddrConnectionTimeline;
}

18
src/types.d.ts vendored Normal file
View File

@ -0,0 +1,18 @@
export interface EventEmitterFactory {
new(): EventEmitter;
}
export interface EventEmitter {
addListener(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
on(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
once(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
removeListener(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
off(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
removeAllListeners(event?: string | symbol): EventEmitter;
setMaxListeners(n: number): EventEmitter;
getMaxListeners(): number;
listeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types
rawListeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types
emit(event: string | symbol, ...args: any[]): boolean;
listenerCount(event: string | symbol): number;
}

View File

@ -13,11 +13,12 @@ describe('compliance tests', () => {
/** /**
* Test setup. `properties` allows the compliance test to override * Test setup. `properties` allows the compliance test to override
* certain values for testing. * certain values for testing.
*
* @param {*} properties * @param {*} properties
*/ */
async setup (properties) { async setup (properties) {
const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') const localAddr = new multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/8080')
const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') const remoteAddr = new multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/8081')
const [localPeer, remotePeer] = await Promise.all([ const [localPeer, remotePeer] = await Promise.all([
PeerId.createFromJSON(peers[0]), PeerId.createFromJSON(peers[0]),
PeerId.createFromJSON(peers[1]) PeerId.createFromJSON(peers[1])

View File

@ -41,7 +41,7 @@ class MockDiscovery extends EventEmitter {
this._timer = setTimeout(() => { this._timer = setTimeout(() => {
this.emit('peer', { this.emit('peer', {
id: peerId, id: peerId,
multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/8000')] multiaddrs: [new multiaddr.Multiaddr('/ip4/127.0.0.1/tcp/8000')]
}) })
}, this.options.discoveryDelay || 1000) }, this.options.discoveryDelay || 1000)
} }

View File

@ -5,7 +5,7 @@ const { expect } = require('aegir/utils/chai')
const sinon = require('sinon') const sinon = require('sinon')
const PubsubBaseImpl = require('../../src/pubsub') const PubsubBaseImpl = require('../../src/pubsub')
const { randomSeqno } = require('../../src/pubsub/utils') const { SignaturePolicy } = require('../../src/pubsub/signature-policy')
const { const {
createPeerId, createPeerId,
mockRegistrar mockRegistrar
@ -34,9 +34,7 @@ describe('pubsub base messages', () => {
it('_buildMessage normalizes and signs messages', async () => { it('_buildMessage normalizes and signs messages', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
@ -44,27 +42,46 @@ describe('pubsub base messages', () => {
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with strict signing off will validate a present signature', async () => { it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }
sinon.stub(pubsub, 'strictSigning').value(false) sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictSign)
const signedMessage = await pubsub._buildMessage(message)
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.from
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.signature
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.key
await expect(pubsub.validate(signedMessage)).to.be.rejected()
delete signedMessage.seqno
await expect(pubsub.validate(signedMessage)).to.not.be.rejected()
})
it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => {
const message = {
receivedFrom: peerId.id,
data: 'hello',
topicIDs: ['test-topic']
}
sinon.stub(pubsub, 'globalSignaturePolicy').value(SignaturePolicy.StrictNoSign)
const signedMessage = await pubsub._buildMessage(message) const signedMessage = await pubsub._buildMessage(message)
expect(pubsub.validate(signedMessage)).to.not.be.rejected() expect(pubsub.validate(signedMessage)).to.not.be.rejected()
}) })
it('validate with strict signing requires a signature', async () => { it('validate with StrictSign requires a signature', async () => {
const message = { const message = {
receivedFrom: peerId.id, receivedFrom: peerId.id,
from: peerId.id,
data: 'hello', data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic'] topicIDs: ['test-topic']
} }

View File

@ -10,8 +10,8 @@ const PeerId = require('peer-id')
const uint8ArrayEquals = require('uint8arrays/equals') const uint8ArrayEquals = require('uint8arrays/equals')
const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayFromString = require('uint8arrays/from-string')
const { utils } = require('../../src/pubsub')
const PeerStreams = require('../../src/pubsub/peer-streams') const PeerStreams = require('../../src/pubsub/peer-streams')
const { SignaturePolicy } = require('../../src/pubsub/signature-policy')
const { const {
createPeerId, createPeerId,
@ -30,6 +30,8 @@ describe('topic validators', () => {
pubsub = new PubsubImplementation(protocol, { pubsub = new PubsubImplementation(protocol, {
peerId: peerId, peerId: peerId,
registrar: mockRegistrar registrar: mockRegistrar
}, {
globalSignaturePolicy: SignaturePolicy.StrictNoSign
}) })
pubsub.start() pubsub.start()
@ -42,8 +44,6 @@ describe('topic validators', () => {
it('should filter messages by topic validator', async () => { it('should filter messages by topic validator', async () => {
// use _publish.callCount() to see if a message is valid or not // use _publish.callCount() to see if a message is valid or not
sinon.spy(pubsub, '_publish') sinon.spy(pubsub, '_publish')
// Disable strict signing
sinon.stub(pubsub, 'strictSigning').value(false)
sinon.stub(pubsub.peers, 'get').returns({}) sinon.stub(pubsub.peers, 'get').returns({})
const filteredTopic = 't' const filteredTopic = 't'
const peer = new PeerStreams({ id: await PeerId.create() }) const peer = new PeerStreams({ id: await PeerId.create() })
@ -59,9 +59,7 @@ describe('topic validators', () => {
const validRpc = { const validRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a message'), data: uint8ArrayFromString('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -76,9 +74,7 @@ describe('topic validators', () => {
const invalidRpc = { const invalidRpc = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toBytes(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }
@ -94,9 +90,7 @@ describe('topic validators', () => {
const invalidRpc2 = { const invalidRpc2 = {
subscriptions: [], subscriptions: [],
msgs: [{ msgs: [{
from: peer.id.toB58String(),
data: uint8ArrayFromString('a different message'), data: uint8ArrayFromString('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic] topicIDs: [filteredTopic]
}] }]
} }

View File

@ -1,18 +1,9 @@
{ {
"include": ["src/**/*.js"], "extends": "./node_modules/aegir/src/config/tsconfig.aegir.json",
"exclude": ["src/**/tests/*", "src/utils"],
"compilerOptions": { "compilerOptions": {
// Tells TypeScript to read JS files, as "outDir": "dist"
// normally they are ignored as source files },
"allowJs": true, "include": [
// Generate d.ts files "src"
"declaration": true, ]
// This compiler run should
// only output d.ts files
"emitDeclarationOnly": true,
"esModuleInterop": true,
"rootDir": "./src",
"outDir": "./src"
}
} }