chore: record interface instead of class, transport and stream muxer factory interface and minor pubsub fixes

This commit is contained in:
Vasco Santos 2020-12-03 11:21:56 +01:00
parent ef86c87b40
commit 7597875c32
8 changed files with 66 additions and 66 deletions

View File

@ -232,9 +232,9 @@ class PubsubBaseProtocol extends EventEmitter {
const peerId = connection.remotePeer
const idB58Str = peerId.toB58String()
const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream)
const inboundStream = peer.attachInboundStream(stream)
peer.inboundStream && this._processMessages(idB58Str, peer.inboundStream, peer)
this._processMessages(idB58Str, inboundStream, peer)
}
/**

View File

@ -36,23 +36,25 @@ async function signMessage (peerId, message) {
* @returns {Promise<boolean>}
*/
async function verifySignature (message) {
// Get message sans the signature
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
if (!message.signature) {
throw new Error('Message must contain a signature to be verified')
}
// Get message sans the signature
const bytes = uint8ArrayConcat([
SignPrefix,
Message.encode(Object.assign(baseMessage, {
from: baseMessage.from && PeerId.createFromCID(baseMessage.from).toBytes()
}))
Message.encode({
...message,
from: message.from && PeerId.createFromCID(message.from).toBytes(),
signature: undefined,
key: undefined
})
])
// Get the public key
const pubKey = await messagePublicKey(message)
// verify the base message
// @ts-ignore - may not have signature
return pubKey.verify(bytes, message.signature)
}

View File

@ -104,12 +104,11 @@ class PeerStreams extends EventEmitter {
* @returns {void}
*/
write (data) {
if (!this.isWritable) {
if (!this.outboundStream) {
const id = this.id.toB58String()
throw new Error('No writable connection to ' + id)
}
// @ts-ignore - this.outboundStream could be null
this.outboundStream.push(data)
}
@ -117,7 +116,7 @@ class PeerStreams extends EventEmitter {
* Attach a raw inbound stream and setup a read stream
*
* @param {MuxedStream} stream
* @returns {void}
* @returns {AsyncIterable<Uint8Array>}
*/
attachInboundStream (stream) {
// Create and attach a new inbound stream
@ -135,6 +134,7 @@ class PeerStreams extends EventEmitter {
)
this.emit('stream:inbound')
return this.inboundStream
}
/**
@ -144,8 +144,7 @@ class PeerStreams extends EventEmitter {
* @returns {Promise<void>}
*/
async attachOutboundStream (stream) {
// If an outbound stream already exists,
// gently close it
// If an outbound stream already exists, gently close it
const _prevStream = this.outboundStream
if (this.outboundStream) {
// End the stream without emitting a close event

View File

@ -36,15 +36,30 @@ const fromString = require('uint8arrays/from-string')
const ENVELOPE_DOMAIN_PEER_RECORD = 'libp2p-peer-record'
const ENVELOPE_PAYLOAD_TYPE_PEER_RECORD = fromString('0301', 'hex')
class PeerRecord extends Record {
/**
* @implements {import('libp2p-interfaces/src/record/types').Record}
*/
class PeerRecord {
constructor (peerId, multiaddrs, seqNumber) {
super (ENVELOPE_DOMAIN_PEER_RECORD, ENVELOPE_PAYLOAD_TYPE_PEER_RECORD)
this.domain = ENVELOPE_DOMAIN_PEER_RECORD
this.codec = ENVELOPE_PAYLOAD_TYPE_PEER_RECORD
}
/**
* Marshal a record to be used in an envelope.
*
* @returns {Uint8Array}
*/
marshal () {
// Implement and return using Protobuf
}
/**
* Returns true if `this` record equals the `other`.
*
* @param {PeerRecord} other
* @returns {other is Record}
*/
equals (other) {
// Verify
}
@ -73,4 +88,4 @@ Verifies if the other Record is identical to this one.
- other is a `Record` to compare with the current instance.
**Returns**
- `boolean`
- `other is Record`

View File

@ -1,41 +0,0 @@
'use strict'
const errcode = require('err-code')
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
class Record {
/**
* @class
* @param {string} domain - signature domain
* @param {Uint8Array} codec - identifier of the type of record
*/
constructor (domain, codec) {
this.domain = domain
this.codec = codec
}
// eslint-disable-next-line
/**
* Marshal a record to be used in an envelope.
*
* @returns {Uint8Array}
*/
marshal () {
throw errcode(new Error('marshal must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
// eslint-disable-next-line
/**
* Verifies if the other provided Record is identical to this one.
*
* @param {Record} other
* @returns {boolean}
*/
equals (other) {
throw errcode(new Error('equals must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
}
}
module.exports = Record

21
src/record/types.ts Normal file
View File

@ -0,0 +1,21 @@
/**
* Record is the base implementation of a record that can be used as the payload of a libp2p envelope.
*/
export interface Record {
/**
* signature domain.
*/
domain: string;
/**
* identifier of the type of record
*/
codec: Uint8Array;
/**
* Marshal a record to be used in an envelope.
*/
marshal(): Uint8Array;
/**
* erifies if the other provided Record is identical to this one.
*/
equals(other: any): other is Record
}

View File

@ -1,13 +1,15 @@
import BufferList from 'bl'
export interface MuxerFactory {
new (options: MuxerOptions): Muxer;
multicodec: string;
}
/**
* A libp2p stream muxer
*/
export interface Muxer {
new (options: MuxerOptions): Muxer; // eslint-disable-line
multicodec: string;
readonly streams: Array<MuxedStream>;
prototype: Muxer;
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of th stream will be used.

View File

@ -7,12 +7,14 @@ export type DialOptions = {
signal?: AbortSignal
}
export interface TransportFactory<DialOptions extends { signal?: AbortSignal }> {
new(upgrader: Upgrader): Transport<DialOptions>;
}
/**
* A libp2p transport is understood as something that offers a dial and listen interface to establish connections.
*/
export interface Transport <DialOptions extends { signal?: AbortSignal }> {
new (upgrader: Upgrader, ...others: any): Transport<DialOptions>; // eslint-disable-line
prototype: Transport <DialOptions>;
/**
* Dial a given multiaddr.
*/
@ -20,7 +22,7 @@ export interface Transport <DialOptions extends { signal?: AbortSignal }> {
/**
* Create transport listeners.
*/
createListener(options: any, handler: (Connection) => void): Listener;
createListener(options: unknown, handler?: (connection: Connection) => void): Listener;
/**
* Takes a list of `Multiaddr`s and returns only valid addresses for the transport
*/
@ -66,7 +68,7 @@ export type MultiaddrConnection = {
sink: Sink;
source: () => AsyncIterable<Uint8Array>;
close: (err?: Error) => Promise<void>;
conn: any;
conn: unknown;
remoteAddr: Multiaddr;
localAddr?: Multiaddr;
timeline: MultiaddrConnectionTimeline;