Compare commits

...

3 Commits

Author SHA1 Message Date
6a88839b8f chore: release version v0.27.9 2020-07-14 00:46:23 +02:00
ae765d7cf7 chore: update contributors 2020-07-14 00:46:23 +02:00
4b88f0e94a fix: back port #678 to catch pipe errors (#708)
* fix(metrics): return the sink wrap

* fix: back port #678
2020-07-14 00:15:16 +02:00
7 changed files with 61 additions and 21 deletions

View File

@ -1,3 +1,13 @@
<a name="0.27.9"></a>
## [0.27.9](https://github.com/libp2p/js-libp2p/compare/v0.27.8...v0.27.9) (2020-07-13)
### Bug Fixes
* back port [#678](https://github.com/libp2p/js-libp2p/issues/678) to catch pipe errors ([#708](https://github.com/libp2p/js-libp2p/issues/708)) ([4b88f0e](https://github.com/libp2p/js-libp2p/commit/4b88f0e))
<a name="0.27.8"></a> <a name="0.27.8"></a>
## [0.27.8](https://github.com/libp2p/js-libp2p/compare/v0.27.7...v0.27.8) (2020-05-06) ## [0.27.8](https://github.com/libp2p/js-libp2p/compare/v0.27.7...v0.27.8) (2020-05-06)

View File

@ -1,6 +1,6 @@
{ {
"name": "libp2p", "name": "libp2p",
"version": "0.27.8", "version": "0.27.9",
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack", "description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>", "leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js", "main": "src/index.js",

View File

@ -227,7 +227,7 @@ class IdentifyService {
* @param {*} options.stream * @param {*} options.stream
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
_handleIdentify ({ connection, stream }) { async _handleIdentify ({ connection, stream }) {
let publicKey = Buffer.alloc(0) let publicKey = Buffer.alloc(0)
if (this.peerInfo.id.pubKey) { if (this.peerInfo.id.pubKey) {
publicKey = this.peerInfo.id.pubKey.bytes publicKey = this.peerInfo.id.pubKey.bytes
@ -242,12 +242,16 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys()) protocols: Array.from(this._protocols.keys())
}) })
pipe( try {
[message], await pipe(
lp.encode(), [message],
stream, lp.encode(),
consume stream,
) consume
)
} catch (err) {
log.error('could not respond to identify request', err)
}
} }
/** /**
@ -258,17 +262,16 @@ class IdentifyService {
* @param {Connection} options.connection * @param {Connection} options.connection
*/ */
async _handlePush ({ connection, stream }) { async _handlePush ({ connection, stream }) {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
let message let message
try { try {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
message = Message.decode(data) message = Message.decode(data)
} catch (err) { } catch (err) {
return log.error('received invalid message', err) return log.error('received invalid message', err)

View File

@ -210,7 +210,7 @@ class Metrics {
const _sink = stream.sink const _sink = stream.sink
stream.sink = source => { stream.sink = source => {
pipe( return pipe(
source, source,
tap(chunk => metrics._onMessage({ tap(chunk => metrics._onMessage({
remotePeer, remotePeer,

View File

@ -17,7 +17,7 @@ const handshake = require('it-handshake')
const { NONCE_LENGTH } = require('./key-generator') const { NONCE_LENGTH } = require('./key-generator')
const debug = require('debug') const debug = require('debug')
const log = debug('libp2p:pnet') const log = debug('libp2p:pnet')
log.err = debug('libp2p:pnet:err') log.error = debug('libp2p:pnet:err')
/** /**
* Takes a Private Shared Key (psk) and provides a `protect` method * Takes a Private Shared Key (psk) and provides a `protect` method
@ -69,7 +69,7 @@ class Protector {
// Decrypt all inbound traffic // Decrypt all inbound traffic
createUnboxStream(remoteNonce, this.psk), createUnboxStream(remoteNonce, this.psk),
external external
) ).catch(log.error)
return internal return internal
} }

View File

@ -258,7 +258,7 @@ class Upgrader {
} }
// Pipe all data through the muxer // Pipe all data through the muxer
pipe(upgradedConn, muxer, upgradedConn) pipe(upgradedConn, muxer, upgradedConn).catch(log.error)
} }
const _timeline = maConn.timeline const _timeline = maConn.timeline

View File

@ -99,6 +99,33 @@ describe('libp2p.metrics', () => {
await remoteLibp2p.stop() await remoteLibp2p.stop()
}) })
it('should record metrics on connections and streams when enabled', async () => {
const config = {
...baseOptions,
connectionManager: {
movingAverageIntervals: [10]
},
metrics: {
enabled: true,
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10]
}
}
let remoteLibp2p
;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config })
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
const connection = await libp2p.dial(remoteLibp2p.peerInfo)
const { stream } = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
const results = stream.sink([bytes])
expect(results).to.exist()
await remoteLibp2p.stop()
})
it('should move disconnected peers to the old peers list', async () => { it('should move disconnected peers to the old peers list', async () => {
const config = { const config = {
...baseOptions, ...baseOptions,