mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-04-24 22:52:35 +00:00
feat(tests): add closing tests, make sure errors are propagated
This commit is contained in:
parent
5069679163
commit
c06da3b925
@ -2,7 +2,7 @@
|
||||
"name": "interface-stream-muxer",
|
||||
"version": "0.3.1",
|
||||
"description": "A test suite and interface you can use to implement a stream muxer.",
|
||||
"main": "lib/index.js",
|
||||
"main": "src/index.js",
|
||||
"jsnext:main": "src/index.js",
|
||||
"scripts": {
|
||||
"test": "exit(0)",
|
||||
@ -34,9 +34,13 @@
|
||||
"async": "^2.0.1",
|
||||
"chai": "^3.5.0",
|
||||
"chai-checkmark": "^1.0.1",
|
||||
"libp2p-tcp": "^0.8.1",
|
||||
"multiaddr": "^2.0.2",
|
||||
"pull-generate": "^2.2.0",
|
||||
"pull-pair": "^1.1.0",
|
||||
"pull-stream": "^3.4.3"
|
||||
"pull-stream": "^3.4.3",
|
||||
"run-parallel": "^1.1.6",
|
||||
"run-series": "^1.1.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"aegir": "^6.0.1"
|
||||
|
161
src/close-test.js
Normal file
161
src/close-test.js
Normal file
@ -0,0 +1,161 @@
|
||||
/* eslint-env mocha */
|
||||
'use strict'
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('chai-checkmark'))
|
||||
const expect = chai.expect
|
||||
const pair = require('pull-pair/duplex')
|
||||
const pull = require('pull-stream')
|
||||
const parallel = require('run-parallel')
|
||||
const series = require('run-series')
|
||||
const Tcp = require('libp2p-tcp')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090')
|
||||
|
||||
function closeAndWait (stream) {
|
||||
pull(
|
||||
pull.empty(),
|
||||
stream,
|
||||
pull.onEnd((err) => {
|
||||
expect(err).to.not.exist.mark()
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
module.exports = (common) => {
|
||||
describe.only('close', () => {
|
||||
let muxer
|
||||
|
||||
beforeEach((done) => {
|
||||
common.setup((err, _muxer) => {
|
||||
if (err) return done(err)
|
||||
muxer = _muxer
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('closing underlying closes streams (tcp)', (done) => {
|
||||
expect(2).checks(done)
|
||||
|
||||
const tcp = new Tcp()
|
||||
const tcpListener = tcp.createListener((socket) => {
|
||||
const listener = muxer.listen(socket)
|
||||
listener.on('stream', (stream) => {
|
||||
pull(stream, stream)
|
||||
})
|
||||
})
|
||||
|
||||
tcpListener.listen(mh, () => {
|
||||
const dialer = muxer.dial(tcp.dial(mh, () => {
|
||||
tcpListener.close()
|
||||
}))
|
||||
|
||||
const s1 = dialer.newStream(() => {
|
||||
pull(
|
||||
s1,
|
||||
pull.onEnd((err) => {
|
||||
expect(err).to.exist.mark()
|
||||
})
|
||||
)
|
||||
|
||||
const s2 = dialer.newStream(() => {
|
||||
pull(
|
||||
s2,
|
||||
pull.onEnd((err) => {
|
||||
expect(err).to.exist.mark()
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('closing one of the muxed streams doesn\'t close others', (done) => {
|
||||
const p = pair()
|
||||
const dialer = muxer.dial(p[0])
|
||||
const listener = muxer.listen(p[1])
|
||||
|
||||
expect(6).checks(done)
|
||||
|
||||
const conns = []
|
||||
|
||||
listener.on('stream', (stream) => {
|
||||
expect(stream).to.exist.mark()
|
||||
pull(stream, stream)
|
||||
})
|
||||
|
||||
for (let i = 0; i < 5; i++) {
|
||||
conns.push(dialer.newStream())
|
||||
}
|
||||
|
||||
conns.forEach((conn, i) => {
|
||||
if (i === 2) {
|
||||
closeAndWait(conn)
|
||||
} else {
|
||||
pull(
|
||||
conn,
|
||||
pull.onEnd(() => {
|
||||
throw new Error('should not end')
|
||||
})
|
||||
)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => {
|
||||
const p = pair()
|
||||
const dialer = muxer.dial(p[0])
|
||||
const listener = muxer.listen(p[1])
|
||||
|
||||
expect(15).checks(done)
|
||||
|
||||
const conns = []
|
||||
const count = []
|
||||
for (let i = 0; i < 5; i++) {
|
||||
count.push(i)
|
||||
}
|
||||
|
||||
series(count.map((i) => (cb) => {
|
||||
parallel([
|
||||
(cb) => listener.once('stream', (stream) => {
|
||||
console.log('pipe')
|
||||
expect(stream).to.exist.mark()
|
||||
pull(stream, stream)
|
||||
cb()
|
||||
}),
|
||||
(cb) => conns.push(dialer.newStream(cb))
|
||||
], cb)
|
||||
}), (err) => {
|
||||
if (err) return done(err)
|
||||
|
||||
conns.forEach((conn, i) => {
|
||||
pull(
|
||||
pull.values([Buffer('hello')]),
|
||||
pull.asyncMap((val, cb) => {
|
||||
setTimeout(() => {
|
||||
cb(null, val)
|
||||
}, i * 10)
|
||||
}),
|
||||
pull.through((val) => console.log('send', val)),
|
||||
conn,
|
||||
pull.through((val) => console.log('recv', val)),
|
||||
pull.collect((err, data) => {
|
||||
console.log('end', i)
|
||||
expect(err).to.not.exist.mark()
|
||||
expect(data).to.be.eql([Buffer('hello')]).mark()
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
listener.on('close', () => {
|
||||
console.log('closed listener')
|
||||
})
|
||||
|
||||
dialer.end(() => {
|
||||
console.log('CLOSED')
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
@ -2,12 +2,14 @@
|
||||
'use strict'
|
||||
|
||||
const baseTest = require('./base-test')
|
||||
const closeTest = require('./close-test')
|
||||
const stressTest = require('./stress-test')
|
||||
const megaStressTest = require('./mega-stress-test')
|
||||
|
||||
module.exports = (common) => {
|
||||
describe('interface-stream-muxer', () => {
|
||||
baseTest(common)
|
||||
closeTest(common)
|
||||
stressTest(common)
|
||||
megaStressTest(common)
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user