From 7706e39f59f139f951b8c6f8cfacc2097a62bc01 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Thu, 10 Dec 2020 01:12:33 -0800 Subject: [PATCH] fix: remaining type errors --- src/circuit/circuit/hop.js | 19 +++++++-------- src/circuit/circuit/stop.js | 23 ++++++++++-------- src/circuit/circuit/stream-handler.js | 14 ++++++----- src/circuit/circuit/utils.js | 9 ++++--- src/circuit/protocol/index.js | 5 +++- src/circuit/transport.js | 2 +- src/insecure/proto.js | 10 ++++++++ src/record/peer-record/index.js | 1 - src/registrar.js | 1 - src/types.ts | 34 +++++++++++++++++++++------ src/upgrader.js | 4 +++- 11 files changed, 81 insertions(+), 41 deletions(-) diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index 3c1730ce..32ab8b98 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -18,17 +18,16 @@ const { stop } = require('./stop') const multicodec = require('./../multicodec') /** - * @typedef {import('../../types').CircuitRequest} Request + * @typedef {import('../../types').CircuitRequest} CircuitRequest * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection - * @typedef {import('./stream-handler')} StreamHandlerT * @typedef {import('../transport')} Transport */ /** * @typedef {Object} HopRequest * @property {Connection} connection - * @property {Request} request - * @property {StreamHandlerT} streamHandler + * @property {CircuitRequest} request + * @property {StreamHandler} streamHandler * @property {Transport} circuit */ @@ -113,7 +112,7 @@ async function handleHop ({ * * @param {object} options * @param {Connection} options.connection - Connection to the relay - * @param {Request} options.request + * @param {CircuitRequest} options.request * @returns {Promise} */ async function hop ({ @@ -128,14 +127,14 @@ async function hop ({ const response = await streamHandler.read() - if (response.code === CircuitPB.Status.SUCCESS) { + if (response && response.code === CircuitPB.Status.SUCCESS) { log('hop request was successful') return streamHandler.rest() } - log('hop request failed with code %d, closing stream', response.code) + log('hop request failed with code %d, closing stream', response && response.code) streamHandler.close() - throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED) + throw errCode(new Error(`HOP request failed with code ${response && response.code}`), Errors.ERR_HOP_REQUEST_FAILED) } /** @@ -159,7 +158,7 @@ async function canHop ({ const response = await streamHandler.read() await streamHandler.close() - if (response.code !== CircuitPB.Status.SUCCESS) { + if (!response || response.code !== CircuitPB.Status.SUCCESS) { return false } @@ -171,7 +170,7 @@ async function canHop ({ * * @param {Object} options * @param {Connection} options.connection - * @param {StreamHandlerT} options.streamHandler + * @param {StreamHandler} options.streamHandler * @param {Transport} options.circuit * @private */ diff --git a/src/circuit/circuit/stop.js b/src/circuit/circuit/stop.js index 2e41e48c..15ded78e 100644 --- a/src/circuit/circuit/stop.js +++ b/src/circuit/circuit/stop.js @@ -13,8 +13,7 @@ const { validateAddrs } = require('./utils') /** * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream - * @typedef {import('../../types').CircuitRequest} Request - * @typedef {import('./stream-handler')} StreamHandlerT + * @typedef {import('../../types').CircuitRequest} CircuitRequest */ /** @@ -23,8 +22,8 @@ const { validateAddrs } = require('./utils') * @private * @param {Object} options * @param {Connection} options.connection - * @param {Request} options.request - The CircuitRelay protobuf request (unencoded) - * @param {StreamHandlerT} options.streamHandler + * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded) + * @param {StreamHandler} options.streamHandler * @returns {Promise|void} Resolves a duplex iterable */ module.exports.handleStop = function handleStop ({ @@ -54,7 +53,7 @@ module.exports.handleStop = function handleStop ({ * @private * @param {Object} options * @param {Connection} options.connection - * @param {Request} options.request - The CircuitRelay protobuf request (unencoded) + * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded) * @returns {Promise} Resolves a duplex iterable */ module.exports.stop = async function stop ({ @@ -68,11 +67,15 @@ module.exports.stop = async function stop ({ streamHandler.write(request) const response = await streamHandler.read() - if (response.code === CircuitPB.Status.SUCCESS) { - log('stop request to %s was successful', connection.remotePeer.toB58String()) - return streamHandler.rest() - } + if (response) { + if (response.code === CircuitPB.Status.SUCCESS) { + log('stop request to %s was successful', connection.remotePeer.toB58String()) + return streamHandler.rest() + } - log('stop request failed with code %d', response.code) + log('stop request failed with code %d', response.code) + } else { + log('stop request was not received') + } streamHandler.close() } diff --git a/src/circuit/circuit/stream-handler.js b/src/circuit/circuit/stream-handler.js index 5be2c6ed..c61a99f2 100644 --- a/src/circuit/circuit/stream-handler.js +++ b/src/circuit/circuit/stream-handler.js @@ -10,12 +10,11 @@ const handshake = require('it-handshake') const { CircuitRelay: CircuitPB } = require('../protocol') /** + * @typedef {import('../../types').CircuitRequest} CircuitRequest + * @typedef {import('../../types').CircuitMessage} CircuitMessage * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream */ -/** - * @template T - */ class StreamHandler { /** * Create a stream handler for connection @@ -36,14 +35,14 @@ class StreamHandler { * Read and decode message * * @async - * @returns {Promise} + * @returns {Promise} */ async read () { const msg = await this.decoder.next() if (msg.value) { const value = CircuitPB.decode(msg.value.slice()) log('read message type', value.type) - return value + return /** @type {CircuitRequest} */(value) } log('read received no value, closing stream') @@ -54,7 +53,7 @@ class StreamHandler { /** * Encode and write array of buffers * - * @param {CircuitPB} msg - An unencoded CircuitRelay protobuf message + * @param {CircuitMessage} msg - An unencoded CircuitRelay protobuf message * @returns {void} */ write (msg) { @@ -73,6 +72,9 @@ class StreamHandler { return this.shake.stream } + /** + * @param {CircuitMessage} msg + */ end (msg) { this.write(msg) this.close() diff --git a/src/circuit/circuit/utils.js b/src/circuit/circuit/utils.js index 65c5afe4..80ef01c7 100644 --- a/src/circuit/circuit/utils.js +++ b/src/circuit/circuit/utils.js @@ -6,6 +6,8 @@ const { CircuitRelay } = require('../protocol') /** * @typedef {import('./stream-handler')} StreamHandler * @typedef {import('../../types').CircuitStatus} CircuitStatus + * @typedef {import('../../types').CircuitMessage} CircuitMessage + * @typedef {import('../../types').CircuitRequest} CircuitRequest */ /** @@ -24,12 +26,13 @@ function writeResponse (streamHandler, status) { /** * Validate incomming HOP/STOP message * - * @param {*} msg - A CircuitRelay unencoded protobuf message + * @param {CircuitRequest} msg - A CircuitRelay unencoded protobuf message * @param {StreamHandler} streamHandler */ function validateAddrs (msg, streamHandler) { + const { srcPeer, dstPeer } = /** @type {CircuitRequest} */(msg) try { - msg.dstPeer.addrs.forEach((addr) => { + dstPeer.addrs.forEach((addr) => { return multiaddr(addr) }) } catch (err) { @@ -40,7 +43,7 @@ function validateAddrs (msg, streamHandler) { } try { - msg.srcPeer.addrs.forEach((addr) => { + srcPeer.addrs.forEach((addr) => { return multiaddr(addr) }) } catch (err) { diff --git a/src/circuit/protocol/index.js b/src/circuit/protocol/index.js index a9d3e31a..f25c1c38 100644 --- a/src/circuit/protocol/index.js +++ b/src/circuit/protocol/index.js @@ -1,7 +1,10 @@ 'use strict' const protobuf = require('protons') -/** @type {{CircuitRelay: import('../../types').CircuitMessageProto}} */ +/** + * @type {{CircuitRelay: import('../../types').CircuitMessageProto}} + */ + module.exports = protobuf(` message CircuitRelay { diff --git a/src/circuit/transport.js b/src/circuit/transport.js index 650bb97d..eaf031d8 100644 --- a/src/circuit/transport.js +++ b/src/circuit/transport.js @@ -101,7 +101,7 @@ class Circuit { remoteAddr, localAddr }) - const type = request.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' + const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound' log('new %s connection %s', type, maConn.remoteAddr) const conn = await this._upgrader.upgradeInbound(maConn) diff --git a/src/insecure/proto.js b/src/insecure/proto.js index 2c7d7e89..9df71496 100644 --- a/src/insecure/proto.js +++ b/src/insecure/proto.js @@ -2,6 +2,16 @@ const protobuf = require('protons') +/** + * @typedef {Object} Proto + * @property {import('../types').ExchangeProto} Exchange, + * @property {typeof import('../types').KeyType} KeyType + * @property {import('../types').PublicKeyProto} PublicKey + */ + +/** + * @type {Proto} + */ module.exports = protobuf(` message Exchange { optional bytes id = 1; diff --git a/src/record/peer-record/index.js b/src/record/peer-record/index.js index 334e717e..4ded73c4 100644 --- a/src/record/peer-record/index.js +++ b/src/record/peer-record/index.js @@ -11,7 +11,6 @@ const { } = require('./consts') /** - * @typedef {import('peer-id')} PeerId * @typedef {import('multiaddr')} Multiaddr * @typedef {import('libp2p-interfaces/src/record/types').Record} Record */ diff --git a/src/registrar.js b/src/registrar.js index 367f110c..1a977ceb 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -16,7 +16,6 @@ const Topology = require('libp2p-interfaces/src/topology') * @typedef {import('./peer-store')} PeerStore * @typedef {import('./connection-manager')} ConnectionManager * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection - * @typedef {import('libp2p-interfaces/src/topology')} Topology */ /** diff --git a/src/types.ts b/src/types.ts index 3e87d7c8..088c5677 100644 --- a/src/types.ts +++ b/src/types.ts @@ -8,9 +8,9 @@ export enum KeyType { } // Protobufs -export type MessageProto = { - encode(value: any): Uint8Array - decode(bytes: Uint8Array): any +export interface MessageProto { + encode(value: T): Uint8Array + decode(bytes: Uint8Array): T } export type SUCCESS = 100; @@ -50,13 +50,19 @@ export type CircuitPeer = { export type CircuitRequest = { type: CircuitType + code?: CircuitStatus dstPeer: CircuitPeer srcPeer: CircuitPeer } -export type CircuitMessageProto = { - encode(value: any): Uint8Array - decode(bytes: Uint8Array): any +export type CircuitMessage = { + type?: CircuitType + dstPeer?: CircuitPeer + srcPeer?: CircuitPeer + code?: CircuitStatus +} + +export interface CircuitMessageProto extends MessageProto { Status: { SUCCESS: SUCCESS, HOP_SRC_ADDR_TOO_LONG: HOP_SRC_ADDR_TOO_LONG, @@ -74,7 +80,7 @@ export type CircuitMessageProto = { STOP_DST_MULTIADDR_INVALID: STOP_DST_MULTIADDR_INVALID, STOP_RELAY_REFUSED: STOP_RELAY_REFUSED, MALFORMED_MESSAGE: MALFORMED_MESSAGE - }, + } Type: { HOP: HOP, STOP: STOP, @@ -82,3 +88,17 @@ export type CircuitMessageProto = { CAN_HOP: CAN_HOP } } + + +export type Exchange = { + id: Uint8Array + pubkey: PublicKey +} +export type ExchangeProto = MessageProto + +export type PublicKey = { + Type: KeyType, + Data: Uint8Array +} + +export type PublicKeyProto = MessageProto diff --git a/src/upgrader.js b/src/upgrader.js index 14d0a4e8..f6af1e9f 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -6,7 +6,7 @@ const log = Object.assign(debug('libp2p:upgrader'), { }) const errCode = require('err-code') const Multistream = require('multistream-select') -const { Connection } = require('libp2p-interfaces/src/connection') +const Connection = require('libp2p-interfaces/src/connection/connection') const PeerId = require('peer-id') const { pipe } = require('it-pipe') const mutableProxy = require('mutable-proxy') @@ -219,6 +219,7 @@ class Upgrader { let muxer let newStream // eslint-disable-next-line prefer-const + /** @type {Connection} */ let connection if (Muxer) { @@ -231,6 +232,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) + // @ts-ignore - metadata seems to be required connection.addStream(muxedStream, { protocol }) this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol }) } catch (err) {