From 6f090617aa44ca54ee674a8e38e8abd68d8cfc49 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 2 Dec 2020 16:04:16 +0100 Subject: [PATCH] chore: address review --- package.json | 1 - src/connection/connection.js | 23 ++++-------- src/crypto/types.ts | 26 +++----------- src/pubsub/index.js | 20 +++++------ src/pubsub/message/sign.js | 11 +++--- src/pubsub/peer-streams.js | 39 +++++++++----------- src/stream-muxer/tests/spawner.js | 4 +-- src/stream-muxer/types.ts | 19 ++-------- src/transport/types.ts | 60 +++---------------------------- 9 files changed, 53 insertions(+), 150 deletions(-) diff --git a/package.json b/package.json index 1822dce..9b9041b 100644 --- a/package.json +++ b/package.json @@ -75,7 +75,6 @@ "protons": "^2.0.0", "sinon": "^9.0.2", "streaming-iterables": "^5.0.2", - "typescript": "^4.1.2", "uint8arrays": "^1.1.0" }, "devDependencies": { diff --git a/src/connection/connection.js b/src/connection/connection.js index e727bdf..c429624 100644 --- a/src/connection/connection.js +++ b/src/connection/connection.js @@ -9,9 +9,15 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection') /** * @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream + * @typedef {import('./status').Status} Status */ /** + * @typedef {Object} Timeline + * @property {number} open - connection opening timestamp. + * @property {number} [upgraded] - connection upgraded timestamp. + * @property {number} [close] + * * @typedef {Object} ConectionStat * @property {string} direction - connection establishment direction ("inbound" or "outbound"). * @property {object} timeline - connection relevant events timestamp. @@ -75,7 +81,7 @@ class Connection { /** * Connection metadata. * - * @type {Stat & {status: Status}} + * @type {ConectionStat & {status: Status}} */ this._stat = { ...stat, @@ -223,21 +229,6 @@ class Connection { } } -/** - * @typedef {Object} Stat - * @property {string} direction - connection establishment direction ("inbound" or "outbound"). - * @property {Timeline} timeline - connection relevant events timestamp. - * @property {string} [multiplexer] - connection multiplexing identifier. - * @property {string} [encryption] - connection encryption method identifier. - * - * @typedef {Object} Timeline - * @property {number} open - connection opening timestamp. - * @property {number} [upgraded] - connection upgraded timestamp. - * @property {number} [close] - * - * @typedef {import('./status').Status} Status - */ - module.exports = Connection function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { diff --git a/src/crypto/types.ts b/src/crypto/types.ts index faa02cd..56286d5 100644 --- a/src/crypto/types.ts +++ b/src/crypto/types.ts @@ -1,40 +1,24 @@ +import PeerId from 'peer-id' +import { MultiaddrConnection } from '../transport/types' + /** * A libp2p crypto module must be compliant to this interface * to ensure all exchanged data between two peers is encrypted. */ -export interface CryptoInterface { +export interface Crypto { + protocol: string; /** * Encrypt outgoing data to the remote party. - * - * @param {PeerId} localPeer - PeerId of the receiving peer - * @param {MultiaddrConnection} connection - streaming iterable duplex that will be encrypted - * @param {PeerId} remotePeer - PeerId of the remote peer. Used to validate the integrity of the remote peer. - * @returns {Promise} */ secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise; - /** * Decrypt incoming data. - * - * @param {PeerId} localPeer - PeerId of the receiving peer. - * @param {MultiaddrConnection} connection - streaming iterable duplex that will be encryption. - * @param {PeerId} remotePeer - optional PeerId of the initiating peer, if known. This may only exist during transport upgrades. - * @returns {Promise} */ secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise; } -export declare class Crypto implements CryptoInterface { - protocol: string; - secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise; - secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise; -} - export type SecureOutbound = { conn: MultiaddrConnection; remoteEarlyData: Buffer; remotePeer: PeerId; } - -type PeerId = import('peer-id'); -type MultiaddrConnection = import('../transport/types').MultiaddrConnection diff --git a/src/pubsub/index.js b/src/pubsub/index.js index d1905a3..9982db7 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -216,7 +216,7 @@ class PubsubBaseProtocol extends EventEmitter { * @private * @param {Object} props * @param {string} props.protocol - * @param {DuplexIterableStream} props.stream + * @param {MuxedStream} props.stream * @param {Connection} props.connection - connection */ _onIncomingStream ({ protocol, stream, connection }) { @@ -225,8 +225,7 @@ class PubsubBaseProtocol extends EventEmitter { const peer = this._addPeer(peerId, protocol) peer.attachInboundStream(stream) - // @ts-ignore - peer.inboundStream maybe null - this._processMessages(idB58Str, peer.inboundStream, peer) + peer.inboundStream && this._processMessages(idB58Str, peer.inboundStream, peer) } /** @@ -243,7 +242,6 @@ class PubsubBaseProtocol extends EventEmitter { try { const { stream, protocol } = await conn.newStream(this.multicodecs) const peer = this._addPeer(peerId, protocol) - // @ts-ignore MuxedStream is not DuplexIterableStream await peer.attachOutboundStream(stream) } catch (err) { this.log.err(err) @@ -333,7 +331,7 @@ class PubsubBaseProtocol extends EventEmitter { * Responsible for processing each RPC message received by other peers. * * @param {string} idB58Str - peer id string in base58 - * @param {DuplexIterableStream} stream - inbound stream + * @param {MuxedStream} stream - inbound stream * @param {PeerStreams} peerStreams - PubSub peer * @returns {Promise} */ @@ -342,8 +340,8 @@ class PubsubBaseProtocol extends EventEmitter { await pipe( stream, async (source) => { - // @ts-ignore - DuplexIterableStream isn't defined as iterable for await (const data of source) { + // @ts-ignore data slice from BufferList const rpcBytes = data instanceof Uint8Array ? data : data.slice() const rpcMsg = this._decodeRpc(rpcBytes) @@ -732,7 +730,8 @@ class PubsubBaseProtocol extends EventEmitter { /** * @typedef {any} Libp2p - * @typedef {import('./peer-streams').DuplexIterableStream} DuplexIterableStream + * @typedef {object} MuxedStream + * @type import('../stream-muxer/types').MuxedStream * @typedef {import('../connection/connection')} Connection * @typedef {import('./message').RPC} RPC * @typedef {import('./message').SubOpts} RPCSubOpts @@ -740,7 +739,8 @@ class PubsubBaseProtocol extends EventEmitter { * @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType */ +PubsubBaseProtocol.message = message +PubsubBaseProtocol.utils = utils +PubsubBaseProtocol.SignaturePolicy = SignaturePolicy + module.exports = PubsubBaseProtocol -module.exports.message = message -module.exports.utils = utils -module.exports.SignaturePolicy = SignaturePolicy diff --git a/src/pubsub/message/sign.js b/src/pubsub/message/sign.js index fe27c13..4879562 100644 --- a/src/pubsub/message/sign.js +++ b/src/pubsub/message/sign.js @@ -40,11 +40,12 @@ async function verifySignature (message) { const baseMessage = { ...message } delete baseMessage.signature delete baseMessage.key - // @ts-ignore - from is optional - baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes() + const bytes = uint8ArrayConcat([ SignPrefix, - Message.encode(baseMessage) + Message.encode(Object.assign(baseMessage, { + from: baseMessage.from && PeerId.createFromCID(baseMessage.from).toBytes() + })) ]) // Get the public key @@ -64,14 +65,14 @@ async function verifySignature (message) { */ async function messagePublicKey (message) { // should be available in the from property of the message (peer id) - // @ts-ignore - from is optional + // @ts-ignore - from type changed const from = PeerId.createFromCID(message.from) if (message.key) { const keyPeerId = await PeerId.createFromPubKey(message.key) // the key belongs to the sender, return the key - if (keyPeerId.isEqual(from)) return keyPeerId.pubKey + if (keyPeerId.equals(from)) return keyPeerId.pubKey // We couldn't validate pubkey is from the originator, error throw new Error('Public Key does not match the originator') } else if (from.pubKey) { diff --git a/src/pubsub/peer-streams.js b/src/pubsub/peer-streams.js index fd3ce2f..4f35cda 100644 --- a/src/pubsub/peer-streams.js +++ b/src/pubsub/peer-streams.js @@ -20,12 +20,14 @@ log.error = debug('libp2p-pubsub:peer-streams:error') * @param {Uint8Array} source * @returns {Promise} * - * @typedef {object} DuplexIterableStream - * @property {Sink} sink - * @property {AsyncIterator} source + * @typedef {object} MuxedStream + * @type import('../stream-muxer/types').MuxedStream * * @typedef PeerId * @type import('peer-id') + * + * @typedef PushableStream + * @type import('it-pushable').Pushable */ /** @@ -54,33 +56,33 @@ class PeerStreams extends EventEmitter { * The raw outbound stream, as retrieved from conn.newStream * * @private - * @type {null|DuplexIterableStream} + * @type {null|MuxedStream} */ this._rawOutboundStream = null /** * The raw inbound stream, as retrieved from the callback from libp2p.handle * * @private - * @type {null|DuplexIterableStream} + * @type {null|MuxedStream} */ this._rawInboundStream = null /** * An AbortController for controlled shutdown of the inbound stream * * @private - * @type {null|AbortController} + * @type {AbortController} */ - this._inboundAbortController = null + this._inboundAbortController = new AbortController() /** * Write stream -- its preferable to use the write method * - * @type {null|import('it-pushable').Pushable} + * @type {null|PushableStream} */ this.outboundStream = null /** * Read stream * - * @type {null|DuplexIterableStream} + * @type {null|MuxedStream} */ this.inboundStream = null } @@ -123,7 +125,7 @@ class PeerStreams extends EventEmitter { /** * Attach a raw inbound stream and setup a read stream * - * @param {DuplexIterableStream} stream + * @param {MuxedStream} stream * @returns {void} */ attachInboundStream (stream) { @@ -131,15 +133,13 @@ class PeerStreams extends EventEmitter { // The inbound stream is: // - abortable, set to only return on abort, rather than throw // - transformed with length-prefix transform - this._inboundAbortController = new AbortController() this._rawInboundStream = stream - // @ts-ignore - abortable returns AsyncIterable and not a DuplexIterableStream + // @ts-ignore - abortable returns AsyncIterable and not a MuxedStream this.inboundStream = abortable( pipe( this._rawInboundStream, lp.decode() ), - // @ts-ignore - possibly null this._inboundAbortController.signal, { returnOnAbort: true } ) @@ -150,30 +150,26 @@ class PeerStreams extends EventEmitter { /** * Attach a raw outbound stream and setup a write stream * - * @param {DuplexIterableStream} stream + * @param {MuxedStream} stream * @returns {Promise} */ async attachOutboundStream (stream) { // If an outbound stream already exists, // gently close it const _prevStream = this.outboundStream - if (_prevStream) { + if (this.outboundStream) { // End the stream without emitting a close event - // @ts-ignore - outboundStream may be null - await this.outboundStream.end(false) + await this.outboundStream.end() } this._rawOutboundStream = stream this.outboundStream = pushable({ onEnd: (shouldEmit) => { // close writable side of the stream - // @ts-ignore - DuplexIterableStream does not define reset this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset() this._rawOutboundStream = null this.outboundStream = null - // @ts-ignore - shouldEmit is `Error | undefined` so condition is - // always false - if (shouldEmit !== false) { + if (shouldEmit) { this.emit('close') } } @@ -205,7 +201,6 @@ class PeerStreams extends EventEmitter { } // End the inbound stream if (this.inboundStream) { - // @ts-ignore - possibly null this._inboundAbortController.abort() } diff --git a/src/stream-muxer/tests/spawner.js b/src/stream-muxer/tests/spawner.js index 59b3416..28e3fa3 100644 --- a/src/stream-muxer/tests/spawner.js +++ b/src/stream-muxer/tests/spawner.js @@ -4,9 +4,7 @@ const { expect } = require('chai') const pair = require('it-pair/duplex') const { pipe } = require('it-pipe') -/** @type {typeof import('p-limit').default} */ -// @ts-ignore - wrong type defs -const pLimit = require('p-limit') +const pLimit = require('p-limit').default const { collect, tap, consume } = require('streaming-iterables') module.exports = async (Muxer, nStreams, nMsg, limit) => { diff --git a/src/stream-muxer/types.ts b/src/stream-muxer/types.ts index a882a5a..747dc1e 100644 --- a/src/stream-muxer/types.ts +++ b/src/stream-muxer/types.ts @@ -1,40 +1,26 @@ /** * A libp2p stream muxer */ -export interface StreamMuxerInterface { +export interface Muxer { + multicodec: string; readonly streams: Array; /** * Initiate a new stream with the given name. If no name is * provided, the id of th stream will be used. - * - * @param {string} [name] - If name is not a string it will be cast to one - * @returns {Stream} */ newStream (name?: string): MuxedStream; /** * A function called when receiving a new stream from the remote. - * - * @param {MuxedStream} stream */ onStream (stream: MuxedStream): void; /** * A function called when a stream ends. - * - * @param {MuxedStream} stream */ onStreamEnd (stream: MuxedStream): void; } -export declare class Muxer implements StreamMuxerInterface { - multicodec: string; - readonly streams: Array; - newStream (name?: string): MuxedStream; - onStream(stream: MuxedStream): void; - onStreamEnd(stream: MuxedStream): void; -} - export type MuxedTimeline = { open: number; close?: number; @@ -48,6 +34,7 @@ export type MuxedStream = { source: () => AsyncIterable; timeline: MuxedTimeline; id: string; + [Symbol.asyncIterator](): AsyncIterator; } type Sink = (source: Uint8Array) => Promise; diff --git a/src/transport/types.ts b/src/transport/types.ts index 09156dc..c04ea05 100644 --- a/src/transport/types.ts +++ b/src/transport/types.ts @@ -1,29 +1,21 @@ import events from 'events' +import Multiaddr from 'multiaddr' +import Connection from '../connection/connection' /** * A libp2p transport is understood as something that offers a dial and listen interface to establish connections. */ -export interface Interface { +export interface Transport { /** * Dial a given multiaddr. - * - * @param {Multiaddr} ma - * @param {any} [options] - * @returns {Promise} */ dial(ma: Multiaddr, options?: any): Promise; /** * Create transport listeners. - * - * @param {any} options - * @param {(Connection) => void} handler */ createListener(options: any, handler: (Connection) => void): Listener; /** * Takes a list of `Multiaddr`s and returns only valid addresses for the transport - * - * @param {Multiaddr[]} multiaddrs - * @returns {Multiaddr[]} */ filter(multiaddrs: Multiaddr[]): Multiaddr[]; } @@ -31,15 +23,10 @@ export interface Interface { export interface Listener extends events.EventEmitter { /** * Start a listener - * - * @param {Multiaddr} multiaddr - * @returns {Promise} */ listen(multiaddr: Multiaddr): Promise; /** * Get listen addresses - * - * @returns {Multiaddr[]} */ getAddrs(): Multiaddr[]; /** @@ -53,51 +40,15 @@ export interface Listener extends events.EventEmitter { export interface Upgrader { /** * Upgrades an outbound connection on `transport.dial`. - * - * @param {MultiaddrConnection} maConn - * @returns {Promise} */ upgradeOutbound(maConn: MultiaddrConnection): Promise; /** * Upgrades an inbound connection on transport listener. - * - * @param {MultiaddrConnection} maConn - * @returns {Promise} */ upgradeInbound(maConn: MultiaddrConnection): Promise; } -export declare class Transport implements Interface { - constructor({ upgrader, ...others }: { - upgrader: Upgrader; - others: any; - }); - - /** - * Dial a given multiaddr. - * - * @param {Multiaddr} ma - * @param {any} [options] - * @returns {Promise} - */ - dial(ma: Multiaddr, options?: any): Promise; - /** - * Create transport listeners. - * - * @param {any} options - * @param {(Connection) => void} handler - */ - createListener(options: any, handler: (Connection) => void): Listener; - /** - * Takes a list of `Multiaddr`s and returns only valid addresses for the transport - * - * @param {Multiaddr[]} multiaddrs - * @returns {Multiaddr[]} - */ - filter(multiaddrs: Multiaddr[]): Multiaddr[]; -} - export type MultiaddrConnectionTimeline = { open: number; upgraded?: number; @@ -114,7 +65,4 @@ export type MultiaddrConnection = { timeline: MultiaddrConnectionTimeline; } -type Sink = (source: Uint8Array) => Promise; -type Connection = import('../connection/connection') - -type Multiaddr = import('multiaddr'); +export type Sink = (source: Uint8Array) => Promise;