diff --git a/src/pubsub/index.js b/src/pubsub/index.js index dcb0c86..9d67332 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -232,9 +232,9 @@ class PubsubBaseProtocol extends EventEmitter { const peerId = connection.remotePeer const idB58Str = peerId.toB58String() const peer = this._addPeer(peerId, protocol) - peer.attachInboundStream(stream) + const inboundStream = peer.attachInboundStream(stream) - peer.inboundStream && this._processMessages(idB58Str, peer.inboundStream, peer) + this._processMessages(idB58Str, inboundStream, peer) } /** diff --git a/src/pubsub/message/sign.js b/src/pubsub/message/sign.js index 5424c55..4820b11 100644 --- a/src/pubsub/message/sign.js +++ b/src/pubsub/message/sign.js @@ -36,23 +36,25 @@ async function signMessage (peerId, message) { * @returns {Promise} */ async function verifySignature (message) { - // Get message sans the signature - const baseMessage = { ...message } - delete baseMessage.signature - delete baseMessage.key + if (!message.signature) { + throw new Error('Message must contain a signature to be verified') + } + // Get message sans the signature const bytes = uint8ArrayConcat([ SignPrefix, - Message.encode(Object.assign(baseMessage, { - from: baseMessage.from && PeerId.createFromCID(baseMessage.from).toBytes() - })) + Message.encode({ + ...message, + from: message.from && PeerId.createFromCID(message.from).toBytes(), + signature: undefined, + key: undefined + }) ]) // Get the public key const pubKey = await messagePublicKey(message) // verify the base message - // @ts-ignore - may not have signature return pubKey.verify(bytes, message.signature) } diff --git a/src/pubsub/peer-streams.js b/src/pubsub/peer-streams.js index 047a359..d238bb5 100644 --- a/src/pubsub/peer-streams.js +++ b/src/pubsub/peer-streams.js @@ -104,12 +104,11 @@ class PeerStreams extends EventEmitter { * @returns {void} */ write (data) { - if (!this.isWritable) { + if (!this.outboundStream) { const id = this.id.toB58String() throw new Error('No writable connection to ' + id) } - // @ts-ignore - this.outboundStream could be null this.outboundStream.push(data) } @@ -117,7 +116,7 @@ class PeerStreams extends EventEmitter { * Attach a raw inbound stream and setup a read stream * * @param {MuxedStream} stream - * @returns {void} + * @returns {AsyncIterable} */ attachInboundStream (stream) { // Create and attach a new inbound stream @@ -135,6 +134,7 @@ class PeerStreams extends EventEmitter { ) this.emit('stream:inbound') + return this.inboundStream } /** @@ -144,8 +144,7 @@ class PeerStreams extends EventEmitter { * @returns {Promise} */ async attachOutboundStream (stream) { - // If an outbound stream already exists, - // gently close it + // If an outbound stream already exists, gently close it const _prevStream = this.outboundStream if (this.outboundStream) { // End the stream without emitting a close event diff --git a/src/record/README.md b/src/record/README.md index ab95019..f838ecf 100644 --- a/src/record/README.md +++ b/src/record/README.md @@ -36,15 +36,30 @@ const fromString = require('uint8arrays/from-string') const ENVELOPE_DOMAIN_PEER_RECORD = 'libp2p-peer-record' const ENVELOPE_PAYLOAD_TYPE_PEER_RECORD = fromString('0301', 'hex') -class PeerRecord extends Record { +/** + * @implements {import('libp2p-interfaces/src/record/types').Record} + */ +class PeerRecord { constructor (peerId, multiaddrs, seqNumber) { - super (ENVELOPE_DOMAIN_PEER_RECORD, ENVELOPE_PAYLOAD_TYPE_PEER_RECORD) + this.domain = ENVELOPE_DOMAIN_PEER_RECORD + this.codec = ENVELOPE_PAYLOAD_TYPE_PEER_RECORD } + /** + * Marshal a record to be used in an envelope. + * + * @returns {Uint8Array} + */ marshal () { // Implement and return using Protobuf } + /** + * Returns true if `this` record equals the `other`. + * + * @param {PeerRecord} other + * @returns {other is Record} + */ equals (other) { // Verify } @@ -73,4 +88,4 @@ Verifies if the other Record is identical to this one. - other is a `Record` to compare with the current instance. **Returns** -- `boolean` +- `other is Record` diff --git a/src/record/index.js b/src/record/index.js index d82526d..e69de29 100644 --- a/src/record/index.js +++ b/src/record/index.js @@ -1,41 +0,0 @@ -'use strict' - -const errcode = require('err-code') - -/** - * Record is the base implementation of a record that can be used as the payload of a libp2p envelope. - */ -class Record { - /** - * @class - * @param {string} domain - signature domain - * @param {Uint8Array} codec - identifier of the type of record - */ - constructor (domain, codec) { - this.domain = domain - this.codec = codec - } - - // eslint-disable-next-line - /** - * Marshal a record to be used in an envelope. - * - * @returns {Uint8Array} - */ - marshal () { - throw errcode(new Error('marshal must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') - } - - // eslint-disable-next-line - /** - * Verifies if the other provided Record is identical to this one. - * - * @param {Record} other - * @returns {boolean} - */ - equals (other) { - throw errcode(new Error('equals must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') - } -} - -module.exports = Record diff --git a/src/record/types.ts b/src/record/types.ts new file mode 100644 index 0000000..2ccae83 --- /dev/null +++ b/src/record/types.ts @@ -0,0 +1,21 @@ +/** + * Record is the base implementation of a record that can be used as the payload of a libp2p envelope. + */ +export interface Record { + /** + * signature domain. + */ + domain: string; + /** + * identifier of the type of record + */ + codec: Uint8Array; + /** + * Marshal a record to be used in an envelope. + */ + marshal(): Uint8Array; + /** + * erifies if the other provided Record is identical to this one. + */ + equals(other: any): other is Record +} diff --git a/src/stream-muxer/types.ts b/src/stream-muxer/types.ts index 0c1884a..6d525a8 100644 --- a/src/stream-muxer/types.ts +++ b/src/stream-muxer/types.ts @@ -1,13 +1,15 @@ import BufferList from 'bl' +export interface MuxerFactory { + new (options: MuxerOptions): Muxer; + multicodec: string; +} + /** * A libp2p stream muxer */ export interface Muxer { - new (options: MuxerOptions): Muxer; // eslint-disable-line - multicodec: string; readonly streams: Array; - prototype: Muxer; /** * Initiate a new stream with the given name. If no name is * provided, the id of th stream will be used. diff --git a/src/transport/types.ts b/src/transport/types.ts index 088fb7c..1087960 100644 --- a/src/transport/types.ts +++ b/src/transport/types.ts @@ -7,12 +7,14 @@ export type DialOptions = { signal?: AbortSignal } +export interface TransportFactory { + new(upgrader: Upgrader): Transport; +} + /** * A libp2p transport is understood as something that offers a dial and listen interface to establish connections. */ export interface Transport { - new (upgrader: Upgrader, ...others: any): Transport; // eslint-disable-line - prototype: Transport ; /** * Dial a given multiaddr. */ @@ -20,7 +22,7 @@ export interface Transport { /** * Create transport listeners. */ - createListener(options: any, handler: (Connection) => void): Listener; + createListener(options: unknown, handler?: (connection: Connection) => void): Listener; /** * Takes a list of `Multiaddr`s and returns only valid addresses for the transport */ @@ -66,7 +68,7 @@ export type MultiaddrConnection = { sink: Sink; source: () => AsyncIterable; close: (err?: Error) => Promise; - conn: any; + conn: unknown; remoteAddr: Multiaddr; localAddr?: Multiaddr; timeline: MultiaddrConnectionTimeline;