From 4c6be9158879161955b178d15e9898fad8592fd7 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 20 Oct 2020 12:52:17 +0200 Subject: [PATCH] fix: ensure streams are closed on connection close --- package.json | 2 +- src/upgrader.js | 12 +++++++--- test/dialing/direct.node.js | 46 +++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 6a963181..84c4bc57 100644 --- a/package.json +++ b/package.json @@ -105,7 +105,7 @@ "libp2p-gossipsub": "^0.6.0", "libp2p-kad-dht": "^0.20.0", "libp2p-mdns": "^0.15.0", - "libp2p-mplex": "^0.10.0", + "libp2p-mplex": "libp2p/js-libp2p-mplex#fix/stream-close", "libp2p-noise": "^2.0.0", "libp2p-secio": "^0.13.1", "libp2p-tcp": "^0.15.1", diff --git a/src/upgrader.js b/src/upgrader.js index b53b6475..d583a8b8 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -5,6 +5,7 @@ const log = debug('libp2p:upgrader') log.error = debug('libp2p:upgrader:error') const Multistream = require('multistream-select') const { Connection } = require('libp2p-interfaces/src/connection') +const ConnectionStatus = require('libp2p-interfaces/src/connection/status') const PeerId = require('peer-id') const pipe = require('it-pipe') const errCode = require('err-code') @@ -268,8 +269,13 @@ class Upgrader { maConn.timeline = new Proxy(_timeline, { set: (...args) => { if (connection && args[1] === 'close' && args[2] && !_timeline.close) { - connection.stat.status = 'closed' - this.onConnectionEnd(connection) + // Wait for close to finish before notifying of the closure + (async () => { + if (connection.stat.status === ConnectionStatus.OPEN) { + await connection.close() + } + this.onConnectionEnd(connection) + })() } return Reflect.set(...args) @@ -295,7 +301,7 @@ class Upgrader { }, newStream: newStream || errConnectionNotMultiplexed, getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed, - close: err => maConn.close(err) + close: (err) => maConn.close(err) }) this.onConnection(connection) diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 401f4eb5..752f462e 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -11,7 +11,9 @@ const PeerId = require('peer-id') const delay = require('delay') const pDefer = require('p-defer') const pSettle = require('p-settle') +const pWaitFor = require('p-wait-for') const pipe = require('it-pipe') +const pushable = require('it-pushable') const AggregateError = require('aggregate-error') const { Connection } = require('libp2p-interfaces/src/connection') const { AbortError } = require('libp2p-interfaces/src/transport/errors') @@ -299,6 +301,50 @@ describe('Dialing (direct, TCP)', () => { expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) }) + it('should close all streams when the connection closes', async () => { + libp2p = new Libp2p({ + peerId, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + + // register some stream handlers to simulate several protocols + libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream)) + libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream)) + remoteLibp2p.handle('/stream-count/3', ({ stream }) => pipe(stream, stream)) + remoteLibp2p.handle('/stream-count/4', ({ stream }) => pipe(stream, stream)) + + libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) + const connection = await libp2p.dial(remotePeerId) + + // Create local to remote streams + const { stream } = await connection.newStream('/echo/1.0.0') + await connection.newStream('/stream-count/3') + await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4') + + // Partially write to the echo stream + const source = pushable() + stream.sink(source) + source.push('hello') + + // Create remote to local streams + await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1') + await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2') + + // Verify stream count + const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId) + expect(connection.streams).to.have.length(5) + expect(remoteConn.streams).to.have.length(5) + + // Close the connection and verify all streams have been closed + await connection.close() + await pWaitFor(() => connection.streams.length === 0) + await pWaitFor(() => remoteConn.streams.length === 0) + }) + it('should be able to use hangup to close connections', async () => { libp2p = new Libp2p({ peerId,