fix: ensure identify streams are closed (#551)

* fix: ensure identify streams are closed

* fix: call connection.addStream properly

* chore: simplify stream closure

* test: improve durability of identify push test
This commit is contained in:
Jacob Heun 2020-02-05 17:35:27 +01:00 committed by GitHub
parent 5608178247
commit f662fdcf36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 10 deletions

View File

@ -4,7 +4,7 @@ const debug = require('debug')
const pb = require('it-protocol-buffers')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')
const { collect, take } = require('streaming-iterables')
const { collect, take, consume } = require('streaming-iterables')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
@ -114,7 +114,8 @@ class IdentifyService {
protocols: Array.from(this._protocols.keys())
}],
pb.encode(Message),
stream
stream,
consume
)
} catch (err) {
// Just log errors
@ -153,6 +154,7 @@ class IdentifyService {
async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),
@ -242,7 +244,8 @@ class IdentifyService {
pipe(
[message],
lp.encode(),
stream
stream,
consume
)
}
@ -255,6 +258,7 @@ class IdentifyService {
*/
async _handlePush ({ connection, stream }) {
const [data] = await pipe(
[],
stream,
lp.decode(),
take(1),

View File

@ -231,7 +231,7 @@ class Upgrader {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
connection.addStream(stream, protocol)
connection.addStream(muxedStream, { protocol })
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)

View File

@ -12,6 +12,7 @@ const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const duplexPair = require('it-pair/duplex')
const multiaddr = require('multiaddr')
const pWaitFor = require('p-wait-for')
const { codes: Errors } = require('../../src/errors')
const { IdentifyService, multicodecs } = require('../../src/identify')
@ -203,16 +204,17 @@ describe('Identify', () => {
})
sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'replace')
const peerStoreSpy = sinon.spy(libp2p.peerStore, 'replace')
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)
expect(libp2p.identifyService.identify.callCount).to.equal(1)
await libp2p.identifyService.identify.firstCall.returnValue
expect(libp2p.peerStore.replace.callCount).to.equal(1)
// Wait for peer store to be updated
await pWaitFor(() => peerStoreSpy.callCount === 1)
expect(libp2p.identifyService.identify.callCount).to.equal(1)
// The connection should have no open streams
expect(connection.streams).to.have.length(0)
await connection.close()
})
@ -247,6 +249,9 @@ describe('Identify', () => {
const results = await call.returnValue
expect(results.length).to.equal(1)
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
})
})