Compare commits

...

4 Commits

3 changed files with 80 additions and 17 deletions

View File

@ -53,7 +53,6 @@
"it-pipe": "^1.1.0", "it-pipe": "^1.1.0",
"it-pushable": "^1.4.0", "it-pushable": "^1.4.0",
"libp2p-crypto": "^0.18.0", "libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0", "multiaddr": "^8.0.0",
"multibase": "^3.0.0", "multibase": "^3.0.0",
"p-defer": "^3.0.0", "p-defer": "^3.0.0",

View File

@ -209,6 +209,8 @@ class Connection {
* @return {Promise<void>} * @return {Promise<void>}
*/ */
async close () { async close () {
this.streams.map(s => s.close && s.close())
if (this.stat.status === Status.CLOSED) { if (this.stat.status === Status.CLOSED) {
return return
} }

View File

@ -2,17 +2,17 @@
/* eslint max-nested-callbacks: ["error", 8] */ /* eslint max-nested-callbacks: ["error", 8] */
'use strict' 'use strict'
const chai = require('chai')
const expect = chai.expect
chai.use(require('dirty-chai'))
const pair = require('it-pair/duplex') const pair = require('it-pair/duplex')
const pipe = require('it-pipe') const pipe = require('it-pipe')
const { consume } = require('streaming-iterables') const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const abortable = require('abortable-iterator') const abortable = require('abortable-iterator')
const AbortController = require('abort-controller') const AbortController = require('abort-controller')
const uint8arrayFromString = require('uint8arrays/from-string') const uint8arrayFromString = require('uint8arrays/from-string')
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
function pause (ms) { function pause (ms) {
return new Promise(resolve => setTimeout(resolve, ms)) return new Promise(resolve => setTimeout(resolve, ms))
} }
@ -38,33 +38,31 @@ module.exports = (common) => {
Muxer = await common.setup() Muxer = await common.setup()
}) })
it('closing underlying socket closes streams (tcp)', async () => { it('closing underlying socket closes streams', async () => {
const mockConn = muxer => ({ const mockConn = muxer => ({
newStream: (...args) => muxer.newStream(...args) newStream: (...args) => muxer.newStream(...args)
}) })
const mockUpgrade = () => maConn => { const mockUpgrade = maConn => {
const muxer = new Muxer(stream => pipe(stream, stream)) const muxer = new Muxer(stream => pipe(stream, stream))
pipe(maConn, muxer, maConn) pipe(maConn, muxer, maConn)
return mockConn(muxer) return mockConn(muxer)
} }
const mockUpgrader = () => ({ const [local, remote] = pair()
upgradeInbound: mockUpgrade(), const controller = new AbortController()
upgradeOutbound: mockUpgrade() const abortableRemote = abortable.duplex(remote, controller.signal, {
returnOnAbort: true
}) })
const tcp = new Tcp({ upgrader: mockUpgrader() }) mockUpgrade(abortableRemote)
const tcpListener = tcp.createListener() const dialerConn = mockUpgrade(local)
await tcpListener.listen(mh)
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
const s1 = await dialerConn.newStream() const s1 = await dialerConn.newStream()
const s2 = await dialerConn.newStream() const s2 = await dialerConn.newStream()
// close the listener in a bit // close the remote in a bit
setTimeout(() => tcpListener.close(), 50) setTimeout(() => controller.abort(), 50)
const s1Result = pipe(infiniteRandom, s1, consume) const s1Result = pipe(infiniteRandom, s1, consume)
const s2Result = pipe(infiniteRandom, s2, consume) const s2Result = pipe(infiniteRandom, s2, consume)
@ -115,5 +113,69 @@ module.exports = (common) => {
// These should now all resolve without error // These should now all resolve without error
await Promise.all(streamResults) await Promise.all(streamResults)
}) })
it('can close a stream for writing', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]
const listener = new Muxer(async stream => {
// Immediate close for write
await stream.closeWrite()
const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)
try {
await stream.sink([randomBuffer()])
} catch (err) {
expect(err).to.exist()
return done()
}
expect.fail('should not support writing to closed writer')
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
stream.sink(data)
})
it('can close a stream for reading', (done) => {
const p = pair()
const dialer = new Muxer()
const data = [randomBuffer(), randomBuffer()]
const listener = new Muxer(async stream => {
const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)
done()
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
stream.closeRead()
// Source should be done
;(async () => {
expect(await stream.source.next()).to.eql({ done: true })
stream.sink(data)
})()
})
}) })
} }