Compare commits

...

3 Commits

Author SHA1 Message Date
6a88839b8f chore: release version v0.27.9 2020-07-14 00:46:23 +02:00
ae765d7cf7 chore: update contributors 2020-07-14 00:46:23 +02:00
4b88f0e94a fix: back port #678 to catch pipe errors (#708)
* fix(metrics): return the sink wrap

* fix: back port #678
2020-07-14 00:15:16 +02:00
7 changed files with 61 additions and 21 deletions

View File

@ -1,3 +1,13 @@
<a name="0.27.9"></a>
## [0.27.9](https://github.com/libp2p/js-libp2p/compare/v0.27.8...v0.27.9) (2020-07-13)
### Bug Fixes
* back port [#678](https://github.com/libp2p/js-libp2p/issues/678) to catch pipe errors ([#708](https://github.com/libp2p/js-libp2p/issues/708)) ([4b88f0e](https://github.com/libp2p/js-libp2p/commit/4b88f0e))
<a name="0.27.8"></a>
## [0.27.8](https://github.com/libp2p/js-libp2p/compare/v0.27.7...v0.27.8) (2020-05-06)

View File

@ -1,6 +1,6 @@
{
"name": "libp2p",
"version": "0.27.8",
"version": "0.27.9",
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js",

View File

@ -227,7 +227,7 @@ class IdentifyService {
* @param {*} options.stream
* @param {Connection} options.connection
*/
_handleIdentify ({ connection, stream }) {
async _handleIdentify ({ connection, stream }) {
let publicKey = Buffer.alloc(0)
if (this.peerInfo.id.pubKey) {
publicKey = this.peerInfo.id.pubKey.bytes
@ -242,12 +242,16 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys())
})
pipe(
[message],
lp.encode(),
stream,
consume
)
try {
await pipe(
[message],
lp.encode(),
stream,
consume
)
} catch (err) {
log.error('could not respond to identify request', err)
}
}
/**
@ -258,17 +262,16 @@ class IdentifyService {
* @param {Connection} options.connection
*/
async _handlePush ({ connection, stream }) {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
let message
try {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
message = Message.decode(data)
} catch (err) {
return log.error('received invalid message', err)

View File

@ -210,7 +210,7 @@ class Metrics {
const _sink = stream.sink
stream.sink = source => {
pipe(
return pipe(
source,
tap(chunk => metrics._onMessage({
remotePeer,

View File

@ -17,7 +17,7 @@ const handshake = require('it-handshake')
const { NONCE_LENGTH } = require('./key-generator')
const debug = require('debug')
const log = debug('libp2p:pnet')
log.err = debug('libp2p:pnet:err')
log.error = debug('libp2p:pnet:err')
/**
* Takes a Private Shared Key (psk) and provides a `protect` method
@ -69,7 +69,7 @@ class Protector {
// Decrypt all inbound traffic
createUnboxStream(remoteNonce, this.psk),
external
)
).catch(log.error)
return internal
}

View File

@ -258,7 +258,7 @@ class Upgrader {
}
// Pipe all data through the muxer
pipe(upgradedConn, muxer, upgradedConn)
pipe(upgradedConn, muxer, upgradedConn).catch(log.error)
}
const _timeline = maConn.timeline

View File

@ -99,6 +99,33 @@ describe('libp2p.metrics', () => {
await remoteLibp2p.stop()
})
it('should record metrics on connections and streams when enabled', async () => {
const config = {
...baseOptions,
connectionManager: {
movingAverageIntervals: [10]
},
metrics: {
enabled: true,
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10]
}
}
let remoteLibp2p
;[libp2p, remoteLibp2p] = await createPeer({ number: 2, config })
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
const connection = await libp2p.dial(remoteLibp2p.peerInfo)
const { stream } = await connection.newStream('/echo/1.0.0')
const bytes = randomBytes(512)
const results = stream.sink([bytes])
expect(results).to.exist()
await remoteLibp2p.stop()
})
it('should move disconnected peers to the old peers list', async () => {
const config = {
...baseOptions,