mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-06-21 08:51:52 +00:00
fix: type incompatibilities (#75)
This commit is contained in:
committed by
GitHub
parent
bac57b05dc
commit
67a5f51805
@ -1,11 +1,10 @@
|
||||
'use strict'
|
||||
/* eslint-disable valid-jsdoc */
|
||||
|
||||
const debug = require('debug')
|
||||
const EventEmitter = require('events')
|
||||
const { EventEmitter } = require('events')
|
||||
const errcode = require('err-code')
|
||||
|
||||
const pipe = require('it-pipe')
|
||||
const { pipe } = require('it-pipe')
|
||||
|
||||
const MulticodecTopology = require('../topology/multicodec-topology')
|
||||
const { codes } = require('./errors')
|
||||
@ -46,7 +45,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
* @param {string} props.debugName - log namespace
|
||||
* @param {Array<string>|string} props.multicodecs - protocol identificers to connect
|
||||
* @param {Libp2p} props.libp2p
|
||||
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
|
||||
* @param {SignaturePolicyType} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
|
||||
* @param {boolean} [props.canRelayMessage = false] - if can relay messages not subscribed
|
||||
* @param {boolean} [props.emitSelf = false] - if publish should emit to self, if subscribed
|
||||
* @abstract
|
||||
@ -226,6 +225,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
const peer = this._addPeer(peerId, protocol)
|
||||
peer.attachInboundStream(stream)
|
||||
|
||||
// @ts-ignore - peer.inboundStream maybe null
|
||||
this._processMessages(idB58Str, peer.inboundStream, peer)
|
||||
}
|
||||
|
||||
@ -243,6 +243,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
try {
|
||||
const { stream, protocol } = await conn.newStream(this.multicodecs)
|
||||
const peer = this._addPeer(peerId, protocol)
|
||||
// @ts-ignore MuxedStream is not DuplexIterableStream
|
||||
await peer.attachOutboundStream(stream)
|
||||
} catch (err) {
|
||||
this.log.err(err)
|
||||
@ -257,7 +258,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
*
|
||||
* @private
|
||||
* @param {PeerId} peerId - peerId
|
||||
* @param {Error} err - error for connection end
|
||||
* @param {Error} [err] - error for connection end
|
||||
*/
|
||||
_onPeerDisconnected (peerId, err) {
|
||||
const idB58Str = peerId.toB58String()
|
||||
@ -341,6 +342,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
await pipe(
|
||||
stream,
|
||||
async (source) => {
|
||||
// @ts-ignore - DuplexIterableStream isn't defined as iterable
|
||||
for await (const data of source) {
|
||||
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
|
||||
const rpcMsg = this._decodeRpc(rpcBytes)
|
||||
@ -395,7 +397,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
* Handles a subscription change from a peer
|
||||
*
|
||||
* @param {string} id
|
||||
* @param {RPC.SubOpt} subOpt
|
||||
* @param {RPCSubOpts} subOpt
|
||||
*/
|
||||
_processRpcSubOpt (id, subOpt) {
|
||||
const t = subOpt.topicID
|
||||
@ -457,7 +459,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
* The default msgID implementation
|
||||
* Child class can override this.
|
||||
*
|
||||
* @param {RPC.Message} msg - the message object
|
||||
* @param {RPCMessage} msg - the message object
|
||||
* @returns {Uint8Array} message id as bytes
|
||||
*/
|
||||
getMsgId (msg) {
|
||||
@ -590,8 +592,8 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
* Should be used by the routers to create the message to send.
|
||||
*
|
||||
* @private
|
||||
* @param {Message} message
|
||||
* @returns {Promise<Message>}
|
||||
* @param {RPCMessage} message
|
||||
* @returns {Promise<RPCMessage>}
|
||||
*/
|
||||
_buildMessage (message) {
|
||||
const signaturePolicy = this.globalSignaturePolicy
|
||||
@ -728,6 +730,16 @@ class PubsubBaseProtocol extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {any} Libp2p
|
||||
* @typedef {import('./peer-streams').DuplexIterableStream} DuplexIterableStream
|
||||
* @typedef {import('../connection/connection')} Connection
|
||||
* @typedef {import('./message').RPC} RPC
|
||||
* @typedef {import('./message').SubOpts} RPCSubOpts
|
||||
* @typedef {import('./message').Message} RPCMessage
|
||||
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
|
||||
*/
|
||||
|
||||
module.exports = PubsubBaseProtocol
|
||||
module.exports.message = message
|
||||
module.exports.utils = utils
|
||||
|
@ -40,6 +40,7 @@ async function verifySignature (message) {
|
||||
const baseMessage = { ...message }
|
||||
delete baseMessage.signature
|
||||
delete baseMessage.key
|
||||
// @ts-ignore - from is optional
|
||||
baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes()
|
||||
const bytes = uint8ArrayConcat([
|
||||
SignPrefix,
|
||||
@ -50,6 +51,7 @@ async function verifySignature (message) {
|
||||
const pubKey = await messagePublicKey(message)
|
||||
|
||||
// verify the base message
|
||||
// @ts-ignore - may not have signature
|
||||
return pubKey.verify(bytes, message.signature)
|
||||
}
|
||||
|
||||
@ -62,6 +64,7 @@ async function verifySignature (message) {
|
||||
*/
|
||||
async function messagePublicKey (message) {
|
||||
// should be available in the from property of the message (peer id)
|
||||
// @ts-ignore - from is optional
|
||||
const from = PeerId.createFromCID(message.from)
|
||||
|
||||
if (message.key) {
|
||||
@ -78,6 +81,11 @@ async function messagePublicKey (message) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef {import('..').InMessage} InMessage
|
||||
* @typedef {import('libp2p-crypto').PublicKey} PublicKey
|
||||
*/
|
||||
|
||||
module.exports = {
|
||||
messagePublicKey,
|
||||
signMessage,
|
||||
|
@ -1,12 +1,15 @@
|
||||
'use strict'
|
||||
|
||||
const EventEmitter = require('events')
|
||||
const { EventEmitter } = require('events')
|
||||
|
||||
const lp = require('it-length-prefixed')
|
||||
|
||||
/** @type {typeof import('it-pushable').default} */
|
||||
// @ts-ignore
|
||||
const pushable = require('it-pushable')
|
||||
const pipe = require('it-pipe')
|
||||
const abortable = require('abortable-iterator')
|
||||
const AbortController = require('abort-controller')
|
||||
const { pipe } = require('it-pipe')
|
||||
const { source: abortable } = require('abortable-iterator')
|
||||
const AbortController = require('abort-controller').default
|
||||
const debug = require('debug')
|
||||
|
||||
const log = debug('libp2p-pubsub:peer-streams')
|
||||
@ -19,7 +22,7 @@ log.error = debug('libp2p-pubsub:peer-streams:error')
|
||||
*
|
||||
* @typedef {object} DuplexIterableStream
|
||||
* @property {Sink} sink
|
||||
* @property {() AsyncIterator<Uint8Array>} source
|
||||
* @property {AsyncIterator<Uint8Array>} source
|
||||
*
|
||||
* @typedef PeerId
|
||||
* @type import('peer-id')
|
||||
@ -51,33 +54,33 @@ class PeerStreams extends EventEmitter {
|
||||
* The raw outbound stream, as retrieved from conn.newStream
|
||||
*
|
||||
* @private
|
||||
* @type {DuplexIterableStream}
|
||||
* @type {null|DuplexIterableStream}
|
||||
*/
|
||||
this._rawOutboundStream = null
|
||||
/**
|
||||
* The raw inbound stream, as retrieved from the callback from libp2p.handle
|
||||
*
|
||||
* @private
|
||||
* @type {DuplexIterableStream}
|
||||
* @type {null|DuplexIterableStream}
|
||||
*/
|
||||
this._rawInboundStream = null
|
||||
/**
|
||||
* An AbortController for controlled shutdown of the inbound stream
|
||||
*
|
||||
* @private
|
||||
* @type {typeof AbortController}
|
||||
* @type {null|AbortController}
|
||||
*/
|
||||
this._inboundAbortController = null
|
||||
/**
|
||||
* Write stream -- its preferable to use the write method
|
||||
*
|
||||
* @type {import('it-pushable').Pushable<Uint8Array>>}
|
||||
* @type {null|import('it-pushable').Pushable<Uint8Array>}
|
||||
*/
|
||||
this.outboundStream = null
|
||||
/**
|
||||
* Read stream
|
||||
*
|
||||
* @type {DuplexIterableStream}
|
||||
* @type {null|DuplexIterableStream}
|
||||
*/
|
||||
this.inboundStream = null
|
||||
}
|
||||
@ -113,6 +116,7 @@ class PeerStreams extends EventEmitter {
|
||||
throw new Error('No writable connection to ' + id)
|
||||
}
|
||||
|
||||
// @ts-ignore - this.outboundStream could be null
|
||||
this.outboundStream.push(data)
|
||||
}
|
||||
|
||||
@ -129,11 +133,13 @@ class PeerStreams extends EventEmitter {
|
||||
// - transformed with length-prefix transform
|
||||
this._inboundAbortController = new AbortController()
|
||||
this._rawInboundStream = stream
|
||||
// @ts-ignore - abortable returns AsyncIterable and not a DuplexIterableStream
|
||||
this.inboundStream = abortable(
|
||||
pipe(
|
||||
this._rawInboundStream,
|
||||
lp.decode()
|
||||
),
|
||||
// @ts-ignore - possibly null
|
||||
this._inboundAbortController.signal,
|
||||
{ returnOnAbort: true }
|
||||
)
|
||||
@ -144,7 +150,7 @@ class PeerStreams extends EventEmitter {
|
||||
/**
|
||||
* Attach a raw outbound stream and setup a write stream
|
||||
*
|
||||
* @param {Stream} stream
|
||||
* @param {DuplexIterableStream} stream
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async attachOutboundStream (stream) {
|
||||
@ -153,6 +159,7 @@ class PeerStreams extends EventEmitter {
|
||||
const _prevStream = this.outboundStream
|
||||
if (_prevStream) {
|
||||
// End the stream without emitting a close event
|
||||
// @ts-ignore - outboundStream may be null
|
||||
await this.outboundStream.end(false)
|
||||
}
|
||||
|
||||
@ -160,9 +167,12 @@ class PeerStreams extends EventEmitter {
|
||||
this.outboundStream = pushable({
|
||||
onEnd: (shouldEmit) => {
|
||||
// close writable side of the stream
|
||||
this._rawOutboundStream.reset && this._rawOutboundStream.reset()
|
||||
// @ts-ignore - DuplexIterableStream does not define reset
|
||||
this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset()
|
||||
this._rawOutboundStream = null
|
||||
this.outboundStream = null
|
||||
// @ts-ignore - shouldEmit is `Error | undefined` so condition is
|
||||
// always false
|
||||
if (shouldEmit !== false) {
|
||||
this.emit('close')
|
||||
}
|
||||
@ -195,6 +205,7 @@ class PeerStreams extends EventEmitter {
|
||||
}
|
||||
// End the inbound stream
|
||||
if (this.inboundStream) {
|
||||
// @ts-ignore - possibly null
|
||||
this._inboundAbortController.abort()
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
* Enum for Signature Policy
|
||||
* Details how message signatures are produced/consumed
|
||||
*/
|
||||
exports.SignaturePolicy = {
|
||||
const SignaturePolicy = {
|
||||
/**
|
||||
* On the producing side:
|
||||
* * Build messages with the signature, key (from may be enough for certain inlineable public key types), from and seqno fields.
|
||||
@ -24,5 +24,10 @@ exports.SignaturePolicy = {
|
||||
* * Propagate only if the fields are absent, reject otherwise.
|
||||
* * A message_id function will not be able to use the above fields, and should instead rely on the data field. A commonplace strategy is to calculate a hash.
|
||||
*/
|
||||
StrictNoSign: /** @type {'StrictNoSign'} */ 'StrictNoSign'
|
||||
StrictNoSign: /** @type {'StrictNoSign'} */ ('StrictNoSign')
|
||||
}
|
||||
exports.SignaturePolicy = SignaturePolicy
|
||||
|
||||
/**
|
||||
* @typedef {SignaturePolicy[keyof SignaturePolicy]} SignaturePolicyType
|
||||
*/
|
||||
|
@ -62,7 +62,7 @@ module.exports = (common) => {
|
||||
pubsub.publish(topic, data)
|
||||
|
||||
// Wait 1 second to guarantee that self is not noticed
|
||||
return new Promise((resolve) => setTimeout(() => resolve(), 1000))
|
||||
return new Promise((resolve) => setTimeout(resolve, 1000))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -50,7 +50,10 @@ module.exports = (common) => {
|
||||
sinon.spy(pubsub, '_publish')
|
||||
sinon.spy(pubsub, 'validate')
|
||||
|
||||
const peerStream = new PeerStreams({ id: await PeerId.create() })
|
||||
const peerStream = new PeerStreams({
|
||||
id: await PeerId.create(),
|
||||
protocol: 'test'
|
||||
})
|
||||
const rpc = {
|
||||
subscriptions: [],
|
||||
msgs: [{
|
||||
@ -82,7 +85,11 @@ module.exports = (common) => {
|
||||
sinon.spy(pubsub, '_publish')
|
||||
sinon.spy(pubsub, 'validate')
|
||||
|
||||
const peerStream = new PeerStreams({ id: await PeerId.create() })
|
||||
const peerStream = new PeerStreams({
|
||||
id: await PeerId.create(),
|
||||
protocol: 'test'
|
||||
})
|
||||
|
||||
const rpc = {
|
||||
subscriptions: [],
|
||||
msgs: [{
|
||||
|
@ -113,9 +113,9 @@ module.exports = (common) => {
|
||||
|
||||
// await subscription change
|
||||
await Promise.all([
|
||||
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())),
|
||||
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())),
|
||||
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve()))
|
||||
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve(null))),
|
||||
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve(null))),
|
||||
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve(null)))
|
||||
])
|
||||
|
||||
// await a cycle
|
||||
@ -166,9 +166,9 @@ module.exports = (common) => {
|
||||
|
||||
// await subscription change
|
||||
await Promise.all([
|
||||
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())),
|
||||
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())),
|
||||
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve()))
|
||||
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve(null))),
|
||||
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve(null))),
|
||||
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve(null)))
|
||||
])
|
||||
|
||||
psA.on(topic, incMsg)
|
||||
|
@ -1,5 +1,4 @@
|
||||
'use strict'
|
||||
/* eslint-disable valid-jsdoc */
|
||||
|
||||
const randomBytes = require('libp2p-crypto/src/random-bytes')
|
||||
const uint8ArrayToString = require('uint8arrays/to-string')
|
||||
@ -88,8 +87,8 @@ exports.ensureArray = (maybeArray) => {
|
||||
/**
|
||||
* Ensures `message.from` is base58 encoded
|
||||
*
|
||||
* @template {Object} T
|
||||
* @param {T} message
|
||||
* @template {{from?:any}} T
|
||||
* @param {T & {from?:string, receivedFrom:string}} message
|
||||
* @param {string} [peerId]
|
||||
* @returns {T & {from?: string, peerId?: string }}
|
||||
*/
|
||||
@ -105,7 +104,7 @@ exports.normalizeInRpcMessage = (message, peerId) => {
|
||||
}
|
||||
|
||||
/**
|
||||
* @template {Object} T
|
||||
* @template {{from?:any, data?:any}} T
|
||||
*
|
||||
* @param {T} message
|
||||
* @returns {T & {from?: Uint8Array, data?: Uint8Array}}
|
||||
|
Reference in New Issue
Block a user