mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-03 04:31:19 +00:00
fix: ensure streams are closed on connection close
This commit is contained in:
parent
5f50054d94
commit
4c6be91588
@ -105,7 +105,7 @@
|
|||||||
"libp2p-gossipsub": "^0.6.0",
|
"libp2p-gossipsub": "^0.6.0",
|
||||||
"libp2p-kad-dht": "^0.20.0",
|
"libp2p-kad-dht": "^0.20.0",
|
||||||
"libp2p-mdns": "^0.15.0",
|
"libp2p-mdns": "^0.15.0",
|
||||||
"libp2p-mplex": "^0.10.0",
|
"libp2p-mplex": "libp2p/js-libp2p-mplex#fix/stream-close",
|
||||||
"libp2p-noise": "^2.0.0",
|
"libp2p-noise": "^2.0.0",
|
||||||
"libp2p-secio": "^0.13.1",
|
"libp2p-secio": "^0.13.1",
|
||||||
"libp2p-tcp": "^0.15.1",
|
"libp2p-tcp": "^0.15.1",
|
||||||
|
@ -5,6 +5,7 @@ const log = debug('libp2p:upgrader')
|
|||||||
log.error = debug('libp2p:upgrader:error')
|
log.error = debug('libp2p:upgrader:error')
|
||||||
const Multistream = require('multistream-select')
|
const Multistream = require('multistream-select')
|
||||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
|
const ConnectionStatus = require('libp2p-interfaces/src/connection/status')
|
||||||
const PeerId = require('peer-id')
|
const PeerId = require('peer-id')
|
||||||
const pipe = require('it-pipe')
|
const pipe = require('it-pipe')
|
||||||
const errCode = require('err-code')
|
const errCode = require('err-code')
|
||||||
@ -268,8 +269,13 @@ class Upgrader {
|
|||||||
maConn.timeline = new Proxy(_timeline, {
|
maConn.timeline = new Proxy(_timeline, {
|
||||||
set: (...args) => {
|
set: (...args) => {
|
||||||
if (connection && args[1] === 'close' && args[2] && !_timeline.close) {
|
if (connection && args[1] === 'close' && args[2] && !_timeline.close) {
|
||||||
connection.stat.status = 'closed'
|
// Wait for close to finish before notifying of the closure
|
||||||
this.onConnectionEnd(connection)
|
(async () => {
|
||||||
|
if (connection.stat.status === ConnectionStatus.OPEN) {
|
||||||
|
await connection.close()
|
||||||
|
}
|
||||||
|
this.onConnectionEnd(connection)
|
||||||
|
})()
|
||||||
}
|
}
|
||||||
|
|
||||||
return Reflect.set(...args)
|
return Reflect.set(...args)
|
||||||
@ -295,7 +301,7 @@ class Upgrader {
|
|||||||
},
|
},
|
||||||
newStream: newStream || errConnectionNotMultiplexed,
|
newStream: newStream || errConnectionNotMultiplexed,
|
||||||
getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed,
|
getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed,
|
||||||
close: err => maConn.close(err)
|
close: (err) => maConn.close(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
this.onConnection(connection)
|
this.onConnection(connection)
|
||||||
|
@ -11,7 +11,9 @@ const PeerId = require('peer-id')
|
|||||||
const delay = require('delay')
|
const delay = require('delay')
|
||||||
const pDefer = require('p-defer')
|
const pDefer = require('p-defer')
|
||||||
const pSettle = require('p-settle')
|
const pSettle = require('p-settle')
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
const pipe = require('it-pipe')
|
const pipe = require('it-pipe')
|
||||||
|
const pushable = require('it-pushable')
|
||||||
const AggregateError = require('aggregate-error')
|
const AggregateError = require('aggregate-error')
|
||||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
const { AbortError } = require('libp2p-interfaces/src/transport/errors')
|
||||||
@ -299,6 +301,50 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
|
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should close all streams when the connection closes', async () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerId,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [Crypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// register some stream handlers to simulate several protocols
|
||||||
|
libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream))
|
||||||
|
libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream))
|
||||||
|
remoteLibp2p.handle('/stream-count/3', ({ stream }) => pipe(stream, stream))
|
||||||
|
remoteLibp2p.handle('/stream-count/4', ({ stream }) => pipe(stream, stream))
|
||||||
|
|
||||||
|
libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
|
||||||
|
const connection = await libp2p.dial(remotePeerId)
|
||||||
|
|
||||||
|
// Create local to remote streams
|
||||||
|
const { stream } = await connection.newStream('/echo/1.0.0')
|
||||||
|
await connection.newStream('/stream-count/3')
|
||||||
|
await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')
|
||||||
|
|
||||||
|
// Partially write to the echo stream
|
||||||
|
const source = pushable()
|
||||||
|
stream.sink(source)
|
||||||
|
source.push('hello')
|
||||||
|
|
||||||
|
// Create remote to local streams
|
||||||
|
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1')
|
||||||
|
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2')
|
||||||
|
|
||||||
|
// Verify stream count
|
||||||
|
const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId)
|
||||||
|
expect(connection.streams).to.have.length(5)
|
||||||
|
expect(remoteConn.streams).to.have.length(5)
|
||||||
|
|
||||||
|
// Close the connection and verify all streams have been closed
|
||||||
|
await connection.close()
|
||||||
|
await pWaitFor(() => connection.streams.length === 0)
|
||||||
|
await pWaitFor(() => remoteConn.streams.length === 0)
|
||||||
|
})
|
||||||
|
|
||||||
it('should be able to use hangup to close connections', async () => {
|
it('should be able to use hangup to close connections', async () => {
|
||||||
libp2p = new Libp2p({
|
libp2p = new Libp2p({
|
||||||
peerId,
|
peerId,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user