chore: address review

This commit is contained in:
Vasco Santos 2020-12-02 16:04:16 +01:00
parent 67a5f51805
commit 6f090617aa
9 changed files with 53 additions and 150 deletions

View File

@ -75,7 +75,6 @@
"protons": "^2.0.0", "protons": "^2.0.0",
"sinon": "^9.0.2", "sinon": "^9.0.2",
"streaming-iterables": "^5.0.2", "streaming-iterables": "^5.0.2",
"typescript": "^4.1.2",
"uint8arrays": "^1.1.0" "uint8arrays": "^1.1.0"
}, },
"devDependencies": { "devDependencies": {

View File

@ -9,9 +9,15 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')
/** /**
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream * @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('./status').Status} Status
*/ */
/** /**
* @typedef {Object} Timeline
* @property {number} open - connection opening timestamp.
* @property {number} [upgraded] - connection upgraded timestamp.
* @property {number} [close]
*
* @typedef {Object} ConectionStat * @typedef {Object} ConectionStat
* @property {string} direction - connection establishment direction ("inbound" or "outbound"). * @property {string} direction - connection establishment direction ("inbound" or "outbound").
* @property {object} timeline - connection relevant events timestamp. * @property {object} timeline - connection relevant events timestamp.
@ -75,7 +81,7 @@ class Connection {
/** /**
* Connection metadata. * Connection metadata.
* *
* @type {Stat & {status: Status}} * @type {ConectionStat & {status: Status}}
*/ */
this._stat = { this._stat = {
...stat, ...stat,
@ -223,21 +229,6 @@ class Connection {
} }
} }
/**
* @typedef {Object} Stat
* @property {string} direction - connection establishment direction ("inbound" or "outbound").
* @property {Timeline} timeline - connection relevant events timestamp.
* @property {string} [multiplexer] - connection multiplexing identifier.
* @property {string} [encryption] - connection encryption method identifier.
*
* @typedef {Object} Timeline
* @property {number} open - connection opening timestamp.
* @property {number} [upgraded] - connection upgraded timestamp.
* @property {number} [close]
*
* @typedef {import('./status').Status} Status
*/
module.exports = Connection module.exports = Connection
function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) { function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {

View File

@ -1,40 +1,24 @@
import PeerId from 'peer-id'
import { MultiaddrConnection } from '../transport/types'
/** /**
* A libp2p crypto module must be compliant to this interface * A libp2p crypto module must be compliant to this interface
* to ensure all exchanged data between two peers is encrypted. * to ensure all exchanged data between two peers is encrypted.
*/ */
export interface CryptoInterface { export interface Crypto {
protocol: string;
/** /**
* Encrypt outgoing data to the remote party. * Encrypt outgoing data to the remote party.
*
* @param {PeerId} localPeer - PeerId of the receiving peer
* @param {MultiaddrConnection} connection - streaming iterable duplex that will be encrypted
* @param {PeerId} remotePeer - PeerId of the remote peer. Used to validate the integrity of the remote peer.
* @returns {Promise<SecureOutbound>}
*/ */
secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>; secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>;
/** /**
* Decrypt incoming data. * Decrypt incoming data.
*
* @param {PeerId} localPeer - PeerId of the receiving peer.
* @param {MultiaddrConnection} connection - streaming iterable duplex that will be encryption.
* @param {PeerId} remotePeer - optional PeerId of the initiating peer, if known. This may only exist during transport upgrades.
* @returns {Promise<SecureOutbound>}
*/ */
secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>; secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>;
} }
export declare class Crypto implements CryptoInterface {
protocol: string;
secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>;
secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>;
}
export type SecureOutbound = { export type SecureOutbound = {
conn: MultiaddrConnection; conn: MultiaddrConnection;
remoteEarlyData: Buffer; remoteEarlyData: Buffer;
remotePeer: PeerId; remotePeer: PeerId;
} }
type PeerId = import('peer-id');
type MultiaddrConnection = import('../transport/types').MultiaddrConnection

View File

@ -216,7 +216,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @private * @private
* @param {Object} props * @param {Object} props
* @param {string} props.protocol * @param {string} props.protocol
* @param {DuplexIterableStream} props.stream * @param {MuxedStream} props.stream
* @param {Connection} props.connection - connection * @param {Connection} props.connection - connection
*/ */
_onIncomingStream ({ protocol, stream, connection }) { _onIncomingStream ({ protocol, stream, connection }) {
@ -225,8 +225,7 @@ class PubsubBaseProtocol extends EventEmitter {
const peer = this._addPeer(peerId, protocol) const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream) peer.attachInboundStream(stream)
// @ts-ignore - peer.inboundStream maybe null peer.inboundStream && this._processMessages(idB58Str, peer.inboundStream, peer)
this._processMessages(idB58Str, peer.inboundStream, peer)
} }
/** /**
@ -243,7 +242,6 @@ class PubsubBaseProtocol extends EventEmitter {
try { try {
const { stream, protocol } = await conn.newStream(this.multicodecs) const { stream, protocol } = await conn.newStream(this.multicodecs)
const peer = this._addPeer(peerId, protocol) const peer = this._addPeer(peerId, protocol)
// @ts-ignore MuxedStream is not DuplexIterableStream
await peer.attachOutboundStream(stream) await peer.attachOutboundStream(stream)
} catch (err) { } catch (err) {
this.log.err(err) this.log.err(err)
@ -333,7 +331,7 @@ class PubsubBaseProtocol extends EventEmitter {
* Responsible for processing each RPC message received by other peers. * Responsible for processing each RPC message received by other peers.
* *
* @param {string} idB58Str - peer id string in base58 * @param {string} idB58Str - peer id string in base58
* @param {DuplexIterableStream} stream - inbound stream * @param {MuxedStream} stream - inbound stream
* @param {PeerStreams} peerStreams - PubSub peer * @param {PeerStreams} peerStreams - PubSub peer
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
@ -342,8 +340,8 @@ class PubsubBaseProtocol extends EventEmitter {
await pipe( await pipe(
stream, stream,
async (source) => { async (source) => {
// @ts-ignore - DuplexIterableStream isn't defined as iterable
for await (const data of source) { for await (const data of source) {
// @ts-ignore data slice from BufferList
const rpcBytes = data instanceof Uint8Array ? data : data.slice() const rpcBytes = data instanceof Uint8Array ? data : data.slice()
const rpcMsg = this._decodeRpc(rpcBytes) const rpcMsg = this._decodeRpc(rpcBytes)
@ -732,7 +730,8 @@ class PubsubBaseProtocol extends EventEmitter {
/** /**
* @typedef {any} Libp2p * @typedef {any} Libp2p
* @typedef {import('./peer-streams').DuplexIterableStream} DuplexIterableStream * @typedef {object} MuxedStream
* @type import('../stream-muxer/types').MuxedStream
* @typedef {import('../connection/connection')} Connection * @typedef {import('../connection/connection')} Connection
* @typedef {import('./message').RPC} RPC * @typedef {import('./message').RPC} RPC
* @typedef {import('./message').SubOpts} RPCSubOpts * @typedef {import('./message').SubOpts} RPCSubOpts
@ -740,7 +739,8 @@ class PubsubBaseProtocol extends EventEmitter {
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType * @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
*/ */
PubsubBaseProtocol.message = message
PubsubBaseProtocol.utils = utils
PubsubBaseProtocol.SignaturePolicy = SignaturePolicy
module.exports = PubsubBaseProtocol module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils
module.exports.SignaturePolicy = SignaturePolicy

View File

@ -40,11 +40,12 @@ async function verifySignature (message) {
const baseMessage = { ...message } const baseMessage = { ...message }
delete baseMessage.signature delete baseMessage.signature
delete baseMessage.key delete baseMessage.key
// @ts-ignore - from is optional
baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes()
const bytes = uint8ArrayConcat([ const bytes = uint8ArrayConcat([
SignPrefix, SignPrefix,
Message.encode(baseMessage) Message.encode(Object.assign(baseMessage, {
from: baseMessage.from && PeerId.createFromCID(baseMessage.from).toBytes()
}))
]) ])
// Get the public key // Get the public key
@ -64,14 +65,14 @@ async function verifySignature (message) {
*/ */
async function messagePublicKey (message) { async function messagePublicKey (message) {
// should be available in the from property of the message (peer id) // should be available in the from property of the message (peer id)
// @ts-ignore - from is optional // @ts-ignore - from type changed
const from = PeerId.createFromCID(message.from) const from = PeerId.createFromCID(message.from)
if (message.key) { if (message.key) {
const keyPeerId = await PeerId.createFromPubKey(message.key) const keyPeerId = await PeerId.createFromPubKey(message.key)
// the key belongs to the sender, return the key // the key belongs to the sender, return the key
if (keyPeerId.isEqual(from)) return keyPeerId.pubKey if (keyPeerId.equals(from)) return keyPeerId.pubKey
// We couldn't validate pubkey is from the originator, error // We couldn't validate pubkey is from the originator, error
throw new Error('Public Key does not match the originator') throw new Error('Public Key does not match the originator')
} else if (from.pubKey) { } else if (from.pubKey) {

View File

@ -20,12 +20,14 @@ log.error = debug('libp2p-pubsub:peer-streams:error')
* @param {Uint8Array} source * @param {Uint8Array} source
* @returns {Promise<Uint8Array>} * @returns {Promise<Uint8Array>}
* *
* @typedef {object} DuplexIterableStream * @typedef {object} MuxedStream
* @property {Sink} sink * @type import('../stream-muxer/types').MuxedStream
* @property {AsyncIterator<Uint8Array>} source
* *
* @typedef PeerId * @typedef PeerId
* @type import('peer-id') * @type import('peer-id')
*
* @typedef PushableStream
* @type import('it-pushable').Pushable<Uint8Array>
*/ */
/** /**
@ -54,33 +56,33 @@ class PeerStreams extends EventEmitter {
* The raw outbound stream, as retrieved from conn.newStream * The raw outbound stream, as retrieved from conn.newStream
* *
* @private * @private
* @type {null|DuplexIterableStream} * @type {null|MuxedStream}
*/ */
this._rawOutboundStream = null this._rawOutboundStream = null
/** /**
* The raw inbound stream, as retrieved from the callback from libp2p.handle * The raw inbound stream, as retrieved from the callback from libp2p.handle
* *
* @private * @private
* @type {null|DuplexIterableStream} * @type {null|MuxedStream}
*/ */
this._rawInboundStream = null this._rawInboundStream = null
/** /**
* An AbortController for controlled shutdown of the inbound stream * An AbortController for controlled shutdown of the inbound stream
* *
* @private * @private
* @type {null|AbortController} * @type {AbortController}
*/ */
this._inboundAbortController = null this._inboundAbortController = new AbortController()
/** /**
* Write stream -- its preferable to use the write method * Write stream -- its preferable to use the write method
* *
* @type {null|import('it-pushable').Pushable<Uint8Array>} * @type {null|PushableStream}
*/ */
this.outboundStream = null this.outboundStream = null
/** /**
* Read stream * Read stream
* *
* @type {null|DuplexIterableStream} * @type {null|MuxedStream}
*/ */
this.inboundStream = null this.inboundStream = null
} }
@ -123,7 +125,7 @@ class PeerStreams extends EventEmitter {
/** /**
* Attach a raw inbound stream and setup a read stream * Attach a raw inbound stream and setup a read stream
* *
* @param {DuplexIterableStream} stream * @param {MuxedStream} stream
* @returns {void} * @returns {void}
*/ */
attachInboundStream (stream) { attachInboundStream (stream) {
@ -131,15 +133,13 @@ class PeerStreams extends EventEmitter {
// The inbound stream is: // The inbound stream is:
// - abortable, set to only return on abort, rather than throw // - abortable, set to only return on abort, rather than throw
// - transformed with length-prefix transform // - transformed with length-prefix transform
this._inboundAbortController = new AbortController()
this._rawInboundStream = stream this._rawInboundStream = stream
// @ts-ignore - abortable returns AsyncIterable and not a DuplexIterableStream // @ts-ignore - abortable returns AsyncIterable and not a MuxedStream
this.inboundStream = abortable( this.inboundStream = abortable(
pipe( pipe(
this._rawInboundStream, this._rawInboundStream,
lp.decode() lp.decode()
), ),
// @ts-ignore - possibly null
this._inboundAbortController.signal, this._inboundAbortController.signal,
{ returnOnAbort: true } { returnOnAbort: true }
) )
@ -150,30 +150,26 @@ class PeerStreams extends EventEmitter {
/** /**
* Attach a raw outbound stream and setup a write stream * Attach a raw outbound stream and setup a write stream
* *
* @param {DuplexIterableStream} stream * @param {MuxedStream} stream
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async attachOutboundStream (stream) { async attachOutboundStream (stream) {
// If an outbound stream already exists, // If an outbound stream already exists,
// gently close it // gently close it
const _prevStream = this.outboundStream const _prevStream = this.outboundStream
if (_prevStream) { if (this.outboundStream) {
// End the stream without emitting a close event // End the stream without emitting a close event
// @ts-ignore - outboundStream may be null await this.outboundStream.end()
await this.outboundStream.end(false)
} }
this._rawOutboundStream = stream this._rawOutboundStream = stream
this.outboundStream = pushable({ this.outboundStream = pushable({
onEnd: (shouldEmit) => { onEnd: (shouldEmit) => {
// close writable side of the stream // close writable side of the stream
// @ts-ignore - DuplexIterableStream does not define reset
this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset() this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset()
this._rawOutboundStream = null this._rawOutboundStream = null
this.outboundStream = null this.outboundStream = null
// @ts-ignore - shouldEmit is `Error | undefined` so condition is if (shouldEmit) {
// always false
if (shouldEmit !== false) {
this.emit('close') this.emit('close')
} }
} }
@ -205,7 +201,6 @@ class PeerStreams extends EventEmitter {
} }
// End the inbound stream // End the inbound stream
if (this.inboundStream) { if (this.inboundStream) {
// @ts-ignore - possibly null
this._inboundAbortController.abort() this._inboundAbortController.abort()
} }

View File

@ -4,9 +4,7 @@ const { expect } = require('chai')
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const { pipe } = require('it-pipe') const { pipe } = require('it-pipe')
/** @type {typeof import('p-limit').default} */ const pLimit = require('p-limit').default
// @ts-ignore - wrong type defs
const pLimit = require('p-limit')
const { collect, tap, consume } = require('streaming-iterables') const { collect, tap, consume } = require('streaming-iterables')
module.exports = async (Muxer, nStreams, nMsg, limit) => { module.exports = async (Muxer, nStreams, nMsg, limit) => {

View File

@ -1,40 +1,26 @@
/** /**
* A libp2p stream muxer * A libp2p stream muxer
*/ */
export interface StreamMuxerInterface { export interface Muxer {
multicodec: string;
readonly streams: Array<MuxedStream>; readonly streams: Array<MuxedStream>;
/** /**
* Initiate a new stream with the given name. If no name is * Initiate a new stream with the given name. If no name is
* provided, the id of th stream will be used. * provided, the id of th stream will be used.
*
* @param {string} [name] - If name is not a string it will be cast to one
* @returns {Stream}
*/ */
newStream (name?: string): MuxedStream; newStream (name?: string): MuxedStream;
/** /**
* A function called when receiving a new stream from the remote. * A function called when receiving a new stream from the remote.
*
* @param {MuxedStream} stream
*/ */
onStream (stream: MuxedStream): void; onStream (stream: MuxedStream): void;
/** /**
* A function called when a stream ends. * A function called when a stream ends.
*
* @param {MuxedStream} stream
*/ */
onStreamEnd (stream: MuxedStream): void; onStreamEnd (stream: MuxedStream): void;
} }
export declare class Muxer implements StreamMuxerInterface {
multicodec: string;
readonly streams: Array<MuxedStream>;
newStream (name?: string): MuxedStream;
onStream(stream: MuxedStream): void;
onStreamEnd(stream: MuxedStream): void;
}
export type MuxedTimeline = { export type MuxedTimeline = {
open: number; open: number;
close?: number; close?: number;
@ -48,6 +34,7 @@ export type MuxedStream = {
source: () => AsyncIterable<Uint8Array>; source: () => AsyncIterable<Uint8Array>;
timeline: MuxedTimeline; timeline: MuxedTimeline;
id: string; id: string;
[Symbol.asyncIterator](): AsyncIterator<Uint8Array>;
} }
type Sink = (source: Uint8Array) => Promise<Uint8Array>; type Sink = (source: Uint8Array) => Promise<Uint8Array>;

View File

@ -1,29 +1,21 @@
import events from 'events' import events from 'events'
import Multiaddr from 'multiaddr'
import Connection from '../connection/connection'
/** /**
* A libp2p transport is understood as something that offers a dial and listen interface to establish connections. * A libp2p transport is understood as something that offers a dial and listen interface to establish connections.
*/ */
export interface Interface { export interface Transport {
/** /**
* Dial a given multiaddr. * Dial a given multiaddr.
*
* @param {Multiaddr} ma
* @param {any} [options]
* @returns {Promise<Connection>}
*/ */
dial(ma: Multiaddr, options?: any): Promise<Connection>; dial(ma: Multiaddr, options?: any): Promise<Connection>;
/** /**
* Create transport listeners. * Create transport listeners.
*
* @param {any} options
* @param {(Connection) => void} handler
*/ */
createListener(options: any, handler: (Connection) => void): Listener; createListener(options: any, handler: (Connection) => void): Listener;
/** /**
* Takes a list of `Multiaddr`s and returns only valid addresses for the transport * Takes a list of `Multiaddr`s and returns only valid addresses for the transport
*
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]}
*/ */
filter(multiaddrs: Multiaddr[]): Multiaddr[]; filter(multiaddrs: Multiaddr[]): Multiaddr[];
} }
@ -31,15 +23,10 @@ export interface Interface {
export interface Listener extends events.EventEmitter { export interface Listener extends events.EventEmitter {
/** /**
* Start a listener * Start a listener
*
* @param {Multiaddr} multiaddr
* @returns {Promise<void>}
*/ */
listen(multiaddr: Multiaddr): Promise<void>; listen(multiaddr: Multiaddr): Promise<void>;
/** /**
* Get listen addresses * Get listen addresses
*
* @returns {Multiaddr[]}
*/ */
getAddrs(): Multiaddr[]; getAddrs(): Multiaddr[];
/** /**
@ -53,51 +40,15 @@ export interface Listener extends events.EventEmitter {
export interface Upgrader { export interface Upgrader {
/** /**
* Upgrades an outbound connection on `transport.dial`. * Upgrades an outbound connection on `transport.dial`.
*
* @param {MultiaddrConnection} maConn
* @returns {Promise<Connection>}
*/ */
upgradeOutbound(maConn: MultiaddrConnection): Promise<Connection>; upgradeOutbound(maConn: MultiaddrConnection): Promise<Connection>;
/** /**
* Upgrades an inbound connection on transport listener. * Upgrades an inbound connection on transport listener.
*
* @param {MultiaddrConnection} maConn
* @returns {Promise<Connection>}
*/ */
upgradeInbound(maConn: MultiaddrConnection): Promise<Connection>; upgradeInbound(maConn: MultiaddrConnection): Promise<Connection>;
} }
export declare class Transport implements Interface {
constructor({ upgrader, ...others }: {
upgrader: Upgrader;
others: any;
});
/**
* Dial a given multiaddr.
*
* @param {Multiaddr} ma
* @param {any} [options]
* @returns {Promise<Connection>}
*/
dial(ma: Multiaddr, options?: any): Promise<Connection>;
/**
* Create transport listeners.
*
* @param {any} options
* @param {(Connection) => void} handler
*/
createListener(options: any, handler: (Connection) => void): Listener;
/**
* Takes a list of `Multiaddr`s and returns only valid addresses for the transport
*
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]}
*/
filter(multiaddrs: Multiaddr[]): Multiaddr[];
}
export type MultiaddrConnectionTimeline = { export type MultiaddrConnectionTimeline = {
open: number; open: number;
upgraded?: number; upgraded?: number;
@ -114,7 +65,4 @@ export type MultiaddrConnection = {
timeline: MultiaddrConnectionTimeline; timeline: MultiaddrConnectionTimeline;
} }
type Sink = (source: Uint8Array) => Promise<Uint8Array>; export type Sink = (source: Uint8Array) => Promise<Uint8Array>;
type Connection = import('../connection/connection')
type Multiaddr = import('multiaddr');