fix: back port #678 to catch pipe errors (#708)

* fix(metrics): return the sink wrap

* fix: back port #678
This commit is contained in:
Jacob Heun 2020-07-14 00:15:16 +02:00 committed by GitHub
parent aadeb73c94
commit 4b88f0e94a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 20 deletions

View File

@ -227,7 +227,7 @@ class IdentifyService {
* @param {*} options.stream * @param {*} options.stream
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
_handleIdentify ({ connection, stream }) { async _handleIdentify ({ connection, stream }) {
let publicKey = Buffer.alloc(0) let publicKey = Buffer.alloc(0)
if (this.peerInfo.id.pubKey) { if (this.peerInfo.id.pubKey) {
publicKey = this.peerInfo.id.pubKey.bytes publicKey = this.peerInfo.id.pubKey.bytes
@ -242,12 +242,16 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys()) protocols: Array.from(this._protocols.keys())
}) })
pipe( try {
[message], await pipe(
lp.encode(), [message],
stream, lp.encode(),
consume stream,
) consume
)
} catch (err) {
log.error('could not respond to identify request', err)
}
} }
/** /**
@ -258,17 +262,16 @@ class IdentifyService {
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
async _handlePush ({ connection, stream }) { async _handlePush ({ connection, stream }) {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
let message let message
try { try {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
message = Message.decode(data) message = Message.decode(data)
} catch (err) { } catch (err) {
return log.error('received invalid message', err) return log.error('received invalid message', err)

View File

@ -210,7 +210,7 @@ class Metrics {
const _sink = stream.sink const _sink = stream.sink
stream.sink = source => { stream.sink = source => {
pipe( return pipe(
source, source,
tap(chunk => metrics._onMessage({ tap(chunk => metrics._onMessage({
remotePeer, remotePeer,

View File

@ -17,7 +17,7 @@ const handshake = require('it-handshake')
const { NONCE_LENGTH } = require('./key-generator') const { NONCE_LENGTH } = require('./key-generator')
const debug = require('debug') const debug = require('debug')
const log = debug('libp2p:pnet') const log = debug('libp2p:pnet')
log.err = debug('libp2p:pnet:err') log.error = debug('libp2p:pnet:err')
/** /**
* Takes a Private Shared Key (psk) and provides a `protect` method * Takes a Private Shared Key (psk) and provides a `protect` method
@ -69,7 +69,7 @@ class Protector {
// Decrypt all inbound traffic // Decrypt all inbound traffic
createUnboxStream(remoteNonce, this.psk), createUnboxStream(remoteNonce, this.psk),
external external
) ).catch(log.error)
return internal return internal
} }

View File

@ -258,7 +258,7 @@ class Upgrader {
} }
// Pipe all data through the muxer // Pipe all data through the muxer
pipe(upgradedConn, muxer, upgradedConn) pipe(upgradedConn, muxer, upgradedConn).catch(log.error)
} }
const _timeline = maConn.timeline const _timeline = maConn.timeline

View File

@ -99,6 +99,33 @@ describe('libp2p.metrics', () => {
await remoteLibp2p.stop() await remoteLibp2p.stop()
}) })
it('should record metrics on connections and streams when enabled', async () => {
const config = {
...baseOptions,
connectionManager: {
movingAverageIntervals: [10]
},
metrics: {
enabled: true,
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10]
}
}
let remoteLibp2p
;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config })
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
const connection = await libp2p.dial(remoteLibp2p.peerInfo)
const { stream } = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
const results = stream.sink([bytes])
expect(results).to.exist()
await remoteLibp2p.stop()
})
it('should move disconnected peers to the old peers list', async () => { it('should move disconnected peers to the old peers list', async () => {
const config = { const config = {
...baseOptions, ...baseOptions,