fix: remaining type errors

This commit is contained in:
Irakli Gozalishvili 2020-12-10 01:12:33 -08:00
parent ce075443b6
commit 7706e39f59
No known key found for this signature in database
GPG Key ID: C80F9B292FB470DE
11 changed files with 81 additions and 41 deletions

View File

@ -18,17 +18,16 @@ const { stop } = require('./stop')
const multicodec = require('./../multicodec') const multicodec = require('./../multicodec')
/** /**
* @typedef {import('../../types').CircuitRequest} Request * @typedef {import('../../types').CircuitRequest} CircuitRequest
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('./stream-handler')<Request>} StreamHandlerT
* @typedef {import('../transport')} Transport * @typedef {import('../transport')} Transport
*/ */
/** /**
* @typedef {Object} HopRequest * @typedef {Object} HopRequest
* @property {Connection} connection * @property {Connection} connection
* @property {Request} request * @property {CircuitRequest} request
* @property {StreamHandlerT} streamHandler * @property {StreamHandler} streamHandler
* @property {Transport} circuit * @property {Transport} circuit
*/ */
@ -113,7 +112,7 @@ async function handleHop ({
* *
* @param {object} options * @param {object} options
* @param {Connection} options.connection - Connection to the relay * @param {Connection} options.connection - Connection to the relay
* @param {Request} options.request * @param {CircuitRequest} options.request
* @returns {Promise<Connection>} * @returns {Promise<Connection>}
*/ */
async function hop ({ async function hop ({
@ -128,14 +127,14 @@ async function hop ({
const response = await streamHandler.read() const response = await streamHandler.read()
if (response.code === CircuitPB.Status.SUCCESS) { if (response && response.code === CircuitPB.Status.SUCCESS) {
log('hop request was successful') log('hop request was successful')
return streamHandler.rest() return streamHandler.rest()
} }
log('hop request failed with code %d, closing stream', response.code) log('hop request failed with code %d, closing stream', response && response.code)
streamHandler.close() streamHandler.close()
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED) throw errCode(new Error(`HOP request failed with code ${response && response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
} }
/** /**
@ -159,7 +158,7 @@ async function canHop ({
const response = await streamHandler.read() const response = await streamHandler.read()
await streamHandler.close() await streamHandler.close()
if (response.code !== CircuitPB.Status.SUCCESS) { if (!response || response.code !== CircuitPB.Status.SUCCESS) {
return false return false
} }
@ -171,7 +170,7 @@ async function canHop ({
* *
* @param {Object} options * @param {Object} options
* @param {Connection} options.connection * @param {Connection} options.connection
* @param {StreamHandlerT} options.streamHandler * @param {StreamHandler} options.streamHandler
* @param {Transport} options.circuit * @param {Transport} options.circuit
* @private * @private
*/ */

View File

@ -13,8 +13,7 @@ const { validateAddrs } = require('./utils')
/** /**
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('../../types').CircuitRequest} Request * @typedef {import('../../types').CircuitRequest} CircuitRequest
* @typedef {import('./stream-handler')<Request>} StreamHandlerT
*/ */
/** /**
@ -23,8 +22,8 @@ const { validateAddrs } = require('./utils')
* @private * @private
* @param {Object} options * @param {Object} options
* @param {Connection} options.connection * @param {Connection} options.connection
* @param {Request} options.request - The CircuitRelay protobuf request (unencoded) * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded)
* @param {StreamHandlerT} options.streamHandler * @param {StreamHandler} options.streamHandler
* @returns {Promise<MuxedStream>|void} Resolves a duplex iterable * @returns {Promise<MuxedStream>|void} Resolves a duplex iterable
*/ */
module.exports.handleStop = function handleStop ({ module.exports.handleStop = function handleStop ({
@ -54,7 +53,7 @@ module.exports.handleStop = function handleStop ({
* @private * @private
* @param {Object} options * @param {Object} options
* @param {Connection} options.connection * @param {Connection} options.connection
* @param {Request} options.request - The CircuitRelay protobuf request (unencoded) * @param {CircuitRequest} options.request - The CircuitRelay protobuf request (unencoded)
* @returns {Promise<MuxedStream|void>} Resolves a duplex iterable * @returns {Promise<MuxedStream|void>} Resolves a duplex iterable
*/ */
module.exports.stop = async function stop ({ module.exports.stop = async function stop ({
@ -68,11 +67,15 @@ module.exports.stop = async function stop ({
streamHandler.write(request) streamHandler.write(request)
const response = await streamHandler.read() const response = await streamHandler.read()
if (response.code === CircuitPB.Status.SUCCESS) { if (response) {
log('stop request to %s was successful', connection.remotePeer.toB58String()) if (response.code === CircuitPB.Status.SUCCESS) {
return streamHandler.rest() log('stop request to %s was successful', connection.remotePeer.toB58String())
} return streamHandler.rest()
}
log('stop request failed with code %d', response.code) log('stop request failed with code %d', response.code)
} else {
log('stop request was not received')
}
streamHandler.close() streamHandler.close()
} }

View File

@ -10,12 +10,11 @@ const handshake = require('it-handshake')
const { CircuitRelay: CircuitPB } = require('../protocol') const { CircuitRelay: CircuitPB } = require('../protocol')
/** /**
* @typedef {import('../../types').CircuitRequest} CircuitRequest
* @typedef {import('../../types').CircuitMessage} CircuitMessage
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
*/ */
/**
* @template T
*/
class StreamHandler { class StreamHandler {
/** /**
* Create a stream handler for connection * Create a stream handler for connection
@ -36,14 +35,14 @@ class StreamHandler {
* Read and decode message * Read and decode message
* *
* @async * @async
* @returns {Promise<T|undefined>} * @returns {Promise<CircuitRequest|undefined>}
*/ */
async read () { async read () {
const msg = await this.decoder.next() const msg = await this.decoder.next()
if (msg.value) { if (msg.value) {
const value = CircuitPB.decode(msg.value.slice()) const value = CircuitPB.decode(msg.value.slice())
log('read message type', value.type) log('read message type', value.type)
return value return /** @type {CircuitRequest} */(value)
} }
log('read received no value, closing stream') log('read received no value, closing stream')
@ -54,7 +53,7 @@ class StreamHandler {
/** /**
* Encode and write array of buffers * Encode and write array of buffers
* *
* @param {CircuitPB} msg - An unencoded CircuitRelay protobuf message * @param {CircuitMessage} msg - An unencoded CircuitRelay protobuf message
* @returns {void} * @returns {void}
*/ */
write (msg) { write (msg) {
@ -73,6 +72,9 @@ class StreamHandler {
return this.shake.stream return this.shake.stream
} }
/**
* @param {CircuitMessage} msg
*/
end (msg) { end (msg) {
this.write(msg) this.write(msg)
this.close() this.close()

View File

@ -6,6 +6,8 @@ const { CircuitRelay } = require('../protocol')
/** /**
* @typedef {import('./stream-handler')} StreamHandler * @typedef {import('./stream-handler')} StreamHandler
* @typedef {import('../../types').CircuitStatus} CircuitStatus * @typedef {import('../../types').CircuitStatus} CircuitStatus
* @typedef {import('../../types').CircuitMessage} CircuitMessage
* @typedef {import('../../types').CircuitRequest} CircuitRequest
*/ */
/** /**
@ -24,12 +26,13 @@ function writeResponse (streamHandler, status) {
/** /**
* Validate incomming HOP/STOP message * Validate incomming HOP/STOP message
* *
* @param {*} msg - A CircuitRelay unencoded protobuf message * @param {CircuitRequest} msg - A CircuitRelay unencoded protobuf message
* @param {StreamHandler} streamHandler * @param {StreamHandler} streamHandler
*/ */
function validateAddrs (msg, streamHandler) { function validateAddrs (msg, streamHandler) {
const { srcPeer, dstPeer } = /** @type {CircuitRequest} */(msg)
try { try {
msg.dstPeer.addrs.forEach((addr) => { dstPeer.addrs.forEach((addr) => {
return multiaddr(addr) return multiaddr(addr)
}) })
} catch (err) { } catch (err) {
@ -40,7 +43,7 @@ function validateAddrs (msg, streamHandler) {
} }
try { try {
msg.srcPeer.addrs.forEach((addr) => { srcPeer.addrs.forEach((addr) => {
return multiaddr(addr) return multiaddr(addr)
}) })
} catch (err) { } catch (err) {

View File

@ -1,7 +1,10 @@
'use strict' 'use strict'
const protobuf = require('protons') const protobuf = require('protons')
/** @type {{CircuitRelay: import('../../types').CircuitMessageProto}} */ /**
* @type {{CircuitRelay: import('../../types').CircuitMessageProto}}
*/
module.exports = protobuf(` module.exports = protobuf(`
message CircuitRelay { message CircuitRelay {

View File

@ -101,7 +101,7 @@ class Circuit {
remoteAddr, remoteAddr,
localAddr localAddr
}) })
const type = request.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' const type = request.type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
log('new %s connection %s', type, maConn.remoteAddr) log('new %s connection %s', type, maConn.remoteAddr)
const conn = await this._upgrader.upgradeInbound(maConn) const conn = await this._upgrader.upgradeInbound(maConn)

View File

@ -2,6 +2,16 @@
const protobuf = require('protons') const protobuf = require('protons')
/**
* @typedef {Object} Proto
* @property {import('../types').ExchangeProto} Exchange,
* @property {typeof import('../types').KeyType} KeyType
* @property {import('../types').PublicKeyProto} PublicKey
*/
/**
* @type {Proto}
*/
module.exports = protobuf(` module.exports = protobuf(`
message Exchange { message Exchange {
optional bytes id = 1; optional bytes id = 1;

View File

@ -11,7 +11,6 @@ const {
} = require('./consts') } = require('./consts')
/** /**
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr')} Multiaddr * @typedef {import('multiaddr')} Multiaddr
* @typedef {import('libp2p-interfaces/src/record/types').Record} Record * @typedef {import('libp2p-interfaces/src/record/types').Record} Record
*/ */

View File

@ -16,7 +16,6 @@ const Topology = require('libp2p-interfaces/src/topology')
* @typedef {import('./peer-store')} PeerStore * @typedef {import('./peer-store')} PeerStore
* @typedef {import('./connection-manager')} ConnectionManager * @typedef {import('./connection-manager')} ConnectionManager
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection * @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('libp2p-interfaces/src/topology')} Topology
*/ */
/** /**

View File

@ -8,9 +8,9 @@ export enum KeyType {
} }
// Protobufs // Protobufs
export type MessageProto = { export interface MessageProto<T> {
encode(value: any): Uint8Array encode(value: T): Uint8Array
decode(bytes: Uint8Array): any decode(bytes: Uint8Array): T
} }
export type SUCCESS = 100; export type SUCCESS = 100;
@ -50,13 +50,19 @@ export type CircuitPeer = {
export type CircuitRequest = { export type CircuitRequest = {
type: CircuitType type: CircuitType
code?: CircuitStatus
dstPeer: CircuitPeer dstPeer: CircuitPeer
srcPeer: CircuitPeer srcPeer: CircuitPeer
} }
export type CircuitMessageProto = { export type CircuitMessage = {
encode(value: any): Uint8Array type?: CircuitType
decode(bytes: Uint8Array): any dstPeer?: CircuitPeer
srcPeer?: CircuitPeer
code?: CircuitStatus
}
export interface CircuitMessageProto extends MessageProto<CircuitMessage> {
Status: { Status: {
SUCCESS: SUCCESS, SUCCESS: SUCCESS,
HOP_SRC_ADDR_TOO_LONG: HOP_SRC_ADDR_TOO_LONG, HOP_SRC_ADDR_TOO_LONG: HOP_SRC_ADDR_TOO_LONG,
@ -74,7 +80,7 @@ export type CircuitMessageProto = {
STOP_DST_MULTIADDR_INVALID: STOP_DST_MULTIADDR_INVALID, STOP_DST_MULTIADDR_INVALID: STOP_DST_MULTIADDR_INVALID,
STOP_RELAY_REFUSED: STOP_RELAY_REFUSED, STOP_RELAY_REFUSED: STOP_RELAY_REFUSED,
MALFORMED_MESSAGE: MALFORMED_MESSAGE MALFORMED_MESSAGE: MALFORMED_MESSAGE
}, }
Type: { Type: {
HOP: HOP, HOP: HOP,
STOP: STOP, STOP: STOP,
@ -82,3 +88,17 @@ export type CircuitMessageProto = {
CAN_HOP: CAN_HOP CAN_HOP: CAN_HOP
} }
} }
export type Exchange = {
id: Uint8Array
pubkey: PublicKey
}
export type ExchangeProto = MessageProto<Exchange>
export type PublicKey = {
Type: KeyType,
Data: Uint8Array
}
export type PublicKeyProto = MessageProto<PublicKey>

View File

@ -6,7 +6,7 @@ const log = Object.assign(debug('libp2p:upgrader'), {
}) })
const errCode = require('err-code') const errCode = require('err-code')
const Multistream = require('multistream-select') const Multistream = require('multistream-select')
const { Connection } = require('libp2p-interfaces/src/connection') const Connection = require('libp2p-interfaces/src/connection/connection')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const { pipe } = require('it-pipe') const { pipe } = require('it-pipe')
const mutableProxy = require('mutable-proxy') const mutableProxy = require('mutable-proxy')
@ -219,6 +219,7 @@ class Upgrader {
let muxer let muxer
let newStream let newStream
// eslint-disable-next-line prefer-const // eslint-disable-next-line prefer-const
/** @type {Connection} */
let connection let connection
if (Muxer) { if (Muxer) {
@ -231,6 +232,7 @@ class Upgrader {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol) log('%s: incoming stream opened on %s', direction, protocol)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
// @ts-ignore - metadata seems to be required
connection.addStream(muxedStream, { protocol }) connection.addStream(muxedStream, { protocol })
this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol }) this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol })
} catch (err) { } catch (err) {