diff --git a/src/identify/index.js b/src/identify/index.js index 27694a0d..d8643611 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -4,7 +4,7 @@ const debug = require('debug') const pb = require('it-protocol-buffers') const lp = require('it-length-prefixed') const pipe = require('it-pipe') -const { collect, take } = require('streaming-iterables') +const { collect, take, consume } = require('streaming-iterables') const PeerInfo = require('peer-info') const PeerId = require('peer-id') @@ -114,7 +114,8 @@ class IdentifyService { protocols: Array.from(this._protocols.keys()) }], pb.encode(Message), - stream + stream, + consume ) } catch (err) { // Just log errors @@ -153,6 +154,7 @@ class IdentifyService { async identify (connection) { const { stream } = await connection.newStream(MULTICODEC_IDENTIFY) const [data] = await pipe( + [], stream, lp.decode(), take(1), @@ -242,7 +244,8 @@ class IdentifyService { pipe( [message], lp.encode(), - stream + stream, + consume ) } @@ -255,6 +258,7 @@ class IdentifyService { */ async _handlePush ({ connection, stream }) { const [data] = await pipe( + [], stream, lp.decode(), take(1), diff --git a/src/upgrader.js b/src/upgrader.js index b5699288..25cb6d3f 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -231,7 +231,7 @@ class Upgrader { const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) log('%s: incoming stream opened on %s', direction, protocol) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) - connection.addStream(stream, protocol) + connection.addStream(muxedStream, { protocol }) this._onStream({ connection, stream, protocol }) } catch (err) { log.error(err) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 29e4ce40..ca32e023 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -12,6 +12,7 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const duplexPair = require('it-pair/duplex') const multiaddr = require('multiaddr') +const pWaitFor = require('p-wait-for') const { codes: Errors } = require('../../src/errors') const { IdentifyService, multicodecs } = require('../../src/identify') @@ -203,16 +204,17 @@ describe('Identify', () => { }) sinon.spy(libp2p.identifyService, 'identify') - sinon.spy(libp2p.peerStore, 'replace') + const peerStoreSpy = sinon.spy(libp2p.peerStore, 'replace') const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() - // Wait for nextTick to trigger the identify call - await delay(1) - expect(libp2p.identifyService.identify.callCount).to.equal(1) - await libp2p.identifyService.identify.firstCall.returnValue - expect(libp2p.peerStore.replace.callCount).to.equal(1) + // Wait for peer store to be updated + await pWaitFor(() => peerStoreSpy.callCount === 1) + expect(libp2p.identifyService.identify.callCount).to.equal(1) + + // The connection should have no open streams + expect(connection.streams).to.have.length(0) await connection.close() }) @@ -247,6 +249,9 @@ describe('Identify', () => { const results = await call.returnValue expect(results.length).to.equal(1) } + + // Verify the streams close + await pWaitFor(() => connection.streams.length === 0) }) }) })