diff --git a/src/identify/index.js b/src/identify/index.js index a7351ecb..a6650d22 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -227,7 +227,7 @@ class IdentifyService { * @param {*} options.stream * @param {Connection} options.connection */ - _handleIdentify ({ connection, stream }) { + async _handleIdentify ({ connection, stream }) { let publicKey = Buffer.alloc(0) if (this.peerInfo.id.pubKey) { publicKey = this.peerInfo.id.pubKey.bytes @@ -242,12 +242,16 @@ class IdentifyService { protocols: Array.from(this._protocols.keys()) }) - pipe( - [message], - lp.encode(), - stream, - consume - ) + try { + await pipe( + [message], + lp.encode(), + stream, + consume + ) + } catch (err) { + log.error('could not respond to identify request', err) + } } /** @@ -258,17 +262,16 @@ class IdentifyService { * @param {Connection} options.connection */ async _handlePush ({ connection, stream }) { - const [data] = await pipe( - [], - stream, - lp.decode(), - take(1), - toBuffer, - collect - ) - let message try { + const [data] = await pipe( + [], + stream, + lp.decode(), + take(1), + toBuffer, + collect + ) message = Message.decode(data) } catch (err) { return log.error('received invalid message', err) diff --git a/src/metrics/index.js b/src/metrics/index.js index e687f87b..36afd765 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -210,7 +210,7 @@ class Metrics { const _sink = stream.sink stream.sink = source => { - pipe( + return pipe( source, tap(chunk => metrics._onMessage({ remotePeer, diff --git a/src/pnet/index.js b/src/pnet/index.js index b1a70ac3..2754e6de 100644 --- a/src/pnet/index.js +++ b/src/pnet/index.js @@ -17,7 +17,7 @@ const handshake = require('it-handshake') const { NONCE_LENGTH } = require('./key-generator') const debug = require('debug') const log = debug('libp2p:pnet') -log.err = debug('libp2p:pnet:err') +log.error = debug('libp2p:pnet:err') /** * Takes a Private Shared Key (psk) and provides a `protect` method @@ -69,7 +69,7 @@ class Protector { // Decrypt all inbound traffic createUnboxStream(remoteNonce, this.psk), external - ) + ).catch(log.error) return internal } diff --git a/src/upgrader.js b/src/upgrader.js index 25cb6d3f..04e503c6 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -258,7 +258,7 @@ class Upgrader { } // Pipe all data through the muxer - pipe(upgradedConn, muxer, upgradedConn) + pipe(upgradedConn, muxer, upgradedConn).catch(log.error) } const _timeline = maConn.timeline diff --git a/test/metrics/index.node.js b/test/metrics/index.node.js index f3fa8c7c..1316340e 100644 --- a/test/metrics/index.node.js +++ b/test/metrics/index.node.js @@ -99,6 +99,33 @@ describe('libp2p.metrics', () => { await remoteLibp2p.stop() }) + it('should record metrics on connections and streams when enabled', async () => { + const config = { + ...baseOptions, + connectionManager: { + movingAverageIntervals: [10] + }, + metrics: { + enabled: true, + computeThrottleMaxQueueSize: 1, // compute after every message + movingAverageIntervals: [10] + } + } + let remoteLibp2p + ;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config }) + + remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) + + const connection = await libp2p.dial(remoteLibp2p.peerInfo) + const { stream } = await connection.newStream('/echo/1.0.0') + + const bytes = randomBytes(512) + const results = stream.sink([bytes]) + + expect(results).to.exist() + await remoteLibp2p.stop() + }) + it('should move disconnected peers to the old peers list', async () => { const config = { ...baseOptions,