fix: catch pipe errors (#678)

* fix: catch pipe errors

There were some pipe errors not being caught. This can result in unhandled exceptions being thrown

* fix: catch pipe errors in identify push handler
This commit is contained in:
Jacob Heun
2020-06-18 15:33:08 +02:00
committed by GitHub
parent a1590acc8b
commit a8219e61a0
4 changed files with 23 additions and 20 deletions

View File

@ -208,7 +208,7 @@ class IdentifyService {
* @param {*} options.stream * @param {*} options.stream
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
_handleIdentify ({ connection, stream }) { async _handleIdentify ({ connection, stream }) {
let publicKey = Buffer.alloc(0) let publicKey = Buffer.alloc(0)
if (this.peerId.pubKey) { if (this.peerId.pubKey) {
publicKey = this.peerId.pubKey.bytes publicKey = this.peerId.pubKey.bytes
@ -223,12 +223,16 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys()) protocols: Array.from(this._protocols.keys())
}) })
pipe( try {
await pipe(
[message], [message],
lp.encode(), lp.encode(),
stream, stream,
consume consume
) )
} catch (err) {
log.error('could not respond to identify request', err)
}
} }
/** /**
@ -239,6 +243,8 @@ class IdentifyService {
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
async _handlePush ({ connection, stream }) { async _handlePush ({ connection, stream }) {
let message
try {
const [data] = await pipe( const [data] = await pipe(
[], [],
stream, stream,
@ -247,9 +253,6 @@ class IdentifyService {
toBuffer, toBuffer,
collect collect
) )
let message
try {
message = Message.decode(data) message = Message.decode(data)
} catch (err) { } catch (err) {
return log.error('received invalid message', err) return log.error('received invalid message', err)

View File

@ -215,7 +215,7 @@ class Metrics {
const _sink = stream.sink const _sink = stream.sink
stream.sink = source => { stream.sink = source => {
pipe( return pipe(
source, source,
tap(chunk => metrics._onMessage({ tap(chunk => metrics._onMessage({
remotePeer, remotePeer,

View File

@ -17,7 +17,7 @@ const handshake = require('it-handshake')
const { NONCE_LENGTH } = require('./key-generator') const { NONCE_LENGTH } = require('./key-generator')
const debug = require('debug') const debug = require('debug')
const log = debug('libp2p:pnet') const log = debug('libp2p:pnet')
log.err = debug('libp2p:pnet:err') log.error = debug('libp2p:pnet:err')
/** /**
* Takes a Private Shared Key (psk) and provides a `protect` method * Takes a Private Shared Key (psk) and provides a `protect` method
@ -69,7 +69,7 @@ class Protector {
// Decrypt all inbound traffic // Decrypt all inbound traffic
createUnboxStream(remoteNonce, this.psk), createUnboxStream(remoteNonce, this.psk),
external external
) ).catch(log.error)
return internal return internal
} }

View File

@ -258,7 +258,7 @@ class Upgrader {
} }
// Pipe all data through the muxer // Pipe all data through the muxer
pipe(upgradedConn, muxer, upgradedConn) pipe(upgradedConn, muxer, upgradedConn).catch(log.error)
} }
const _timeline = maConn.timeline const _timeline = maConn.timeline