refactor: move stream-muxer

This commit is contained in:
Jacob Heun
2019-10-18 13:48:14 +02:00
parent 07d71de456
commit 827197585e
17 changed files with 0 additions and 0 deletions

View File

@@ -1,153 +0,0 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
chai.use(require('chai-checkmark'))
const { expect } = chai
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { collect, map, consume } = require('streaming-iterables')
function close (stream) {
return pipe([], stream, consume)
}
async function closeAndWait (stream) {
await close(stream)
expect(true).to.be.true.mark()
}
/**
* A tick is considered valid if it happened between now
* and `ms` milliseconds ago
* @param {number} date Time in ticks
* @param {number} ms max milliseconds that should have expired
* @returns {boolean}
*/
function isValidTick (date, ms = 5000) {
const now = Date.now()
if (date > now - ms && date <= now) return true
return false
}
module.exports = (common) => {
describe('base', () => {
let Muxer
beforeEach(async () => {
Muxer = await common.setup()
})
it('Open a stream from the dialer', (done) => {
const p = pair()
const dialer = new Muxer()
const listener = new Muxer({
onStream: stream => {
expect(stream).to.exist.mark() // 1st check
expect(isValidTick(stream.timeline.open)).to.equal(true)
// Make sure the stream is being tracked
expect(listener.streams).to.include(stream)
close(stream)
},
onStreamEnd: stream => {
expect(stream).to.exist.mark() // 2nd check
expect(listener.streams).to.not.include(stream)
// Make sure the stream is removed from tracking
expect(isValidTick(stream.timeline.close)).to.equal(true)
}
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(3).checks(() => {
// ensure we have no streams left
expect(dialer.streams).to.have.length(0)
expect(listener.streams).to.have.length(0)
done()
})
const conn = dialer.newStream()
expect(dialer.streams).to.include(conn)
expect(isValidTick(conn.timeline.open)).to.equal(true)
closeAndWait(conn) // 3rd check
})
it('Open a stream from the listener', (done) => {
const p = pair()
const dialer = new Muxer(stream => {
expect(stream).to.exist.mark()
expect(isValidTick(stream.timeline.open)).to.equal(true)
closeAndWait(stream)
})
const listener = new Muxer()
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(3).check(done)
const conn = listener.newStream()
expect(listener.streams).to.include(conn)
expect(isValidTick(conn.timeline.open)).to.equal(true)
closeAndWait(conn)
})
it('Open a stream on both sides', (done) => {
const p = pair()
const dialer = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const listener = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(6).check(done)
const listenerConn = listener.newStream()
const dialerConn = dialer.newStream()
closeAndWait(dialerConn)
closeAndWait(listenerConn)
})
it('Open a stream on one side, write, open a stream on the other side', (done) => {
const toString = map(c => c.slice().toString())
const p = pair()
const dialer = new Muxer()
const listener = new Muxer(stream => {
pipe(stream, toString, collect).then(chunks => {
expect(chunks).to.be.eql(['hey']).mark()
})
dialer.onStream = onDialerStream
const listenerConn = listener.newStream()
pipe(['hello'], listenerConn)
async function onDialerStream (stream) {
const chunks = await pipe(stream, toString, collect)
expect(chunks).to.be.eql(['hello']).mark()
}
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(2).check(done)
const dialerConn = dialer.newStream()
pipe(['hey'], dialerConn)
})
})
}

View File

@@ -1,118 +0,0 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
function pause (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
function randomBuffer () {
return Buffer.from(Math.random().toString())
}
const infiniteRandom = {
[Symbol.asyncIterator]: async function * () {
while (true) {
yield randomBuffer()
await pause(10)
}
}
}
module.exports = (common) => {
describe('close', () => {
let Muxer
beforeEach(async () => {
Muxer = await common.setup()
})
it('closing underlying socket closes streams (tcp)', async () => {
const mockConn = muxer => ({
newStream: (...args) => muxer.newStream(...args)
})
const mockUpgrade = () => maConn => {
const muxer = new Muxer(stream => pipe(stream, stream))
pipe(maConn, muxer, maConn)
return mockConn(muxer)
}
const mockUpgrader = () => ({
upgradeInbound: mockUpgrade(),
upgradeOutbound: mockUpgrade()
})
const tcp = new Tcp({ upgrader: mockUpgrader() })
const tcpListener = tcp.createListener()
await tcpListener.listen(mh)
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
const s1 = await dialerConn.newStream()
const s2 = await dialerConn.newStream()
// close the listener in a bit
setTimeout(() => tcpListener.close(), 50)
const s1Result = pipe(infiniteRandom, s1, consume)
const s2Result = pipe(infiniteRandom, s2, consume)
// test is complete when all muxed streams have closed
await s1Result
await s2Result
})
it('closing one of the muxed streams doesn\'t close others', async () => {
const p = pair()
const dialer = new Muxer()
// Listener is echo server :)
const listener = new Muxer(stream => pipe(stream, stream))
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
const streams = Array.from(Array(5), () => dialer.newStream())
let closed = false
const controllers = []
const streamResults = streams.map(async stream => {
const controller = new AbortController()
controllers.push(controller)
try {
const abortableRand = abortable(infiniteRandom, controller.signal, { abortCode: 'ERR_TEST_ABORT' })
await pipe(abortableRand, stream, consume)
} catch (err) {
if (err.code !== 'ERR_TEST_ABORT') throw err
}
if (!closed) throw new Error('stream should not have ended yet!')
})
// Pause, and then send some data and close the first stream
await pause(50)
await pipe([randomBuffer()], stream, consume)
closed = true
// Abort all the other streams later
await pause(50)
controllers.forEach(c => c.abort())
// These should now all resolve without error
await Promise.all(streamResults)
})
})
}

View File

@@ -1,19 +0,0 @@
/* eslint-env mocha */
'use strict'
const baseTest = require('./base-test')
const stressTest = require('./stress-test')
const megaStressTest = require('./mega-stress-test')
const isNode = require('detect-node')
module.exports = (common) => {
describe('interface-stream-muxer', () => {
baseTest(common)
if (isNode) {
const closeTest = require('./close-test')
closeTest(common)
}
stressTest(common)
megaStressTest(common)
})
}

View File

@@ -1,17 +0,0 @@
/* eslint-env mocha */
'use strict'
const spawn = require('./spawner')
module.exports = (common) => {
describe.skip('mega stress test', function () {
this.timeout(100 * 200 * 1000)
let Muxer
beforeEach(async () => {
Muxer = await common.setup()
})
it('10,000 streams with 10,000 msg', () => spawn(Muxer, 10000, 10000, 5000))
})
}

View File

@@ -1,82 +0,0 @@
'use strict'
const { expect } = require('chai')
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const pLimit = require('p-limit')
const { collect, tap, consume } = require('streaming-iterables')
module.exports = async (Muxer, nStreams, nMsg, limit) => {
const [dialerSocket, listenerSocket] = pair()
const { check, done } = marker((4 * nStreams) + (nStreams * nMsg))
const msg = 'simple msg'
const listener = new Muxer(async stream => {
expect(stream).to.exist // eslint-disable-line
check()
await pipe(
stream,
tap(chunk => check()),
consume
)
check()
pipe([], stream)
})
const dialer = new Muxer()
pipe(listenerSocket, listener, listenerSocket)
pipe(dialerSocket, dialer, dialerSocket)
const spawnStream = async n => {
const stream = dialer.newStream()
expect(stream).to.exist // eslint-disable-line
check()
const res = await pipe(
(function * () {
for (let i = 0; i < nMsg; i++) {
// console.log('n', n, 'msg', i)
yield new Promise(resolve => resolve(msg))
}
})(),
stream,
collect
)
expect(res).to.be.eql([])
check()
}
const limiter = pLimit(limit || Infinity)
await Promise.all(
Array.from(Array(nStreams), (_, i) => limiter(() => spawnStream(i)))
)
return done
}
function marker (n) {
let check
let i = 0
const done = new Promise((resolve, reject) => {
check = err => {
i++
if (err) {
/* eslint-disable-next-line */
console.error('Failed after %s iterations', i)
return reject(err)
}
if (i === n) {
resolve()
}
}
})
return { check, done }
}

View File

@@ -1,30 +0,0 @@
/* eslint-env mocha */
'use strict'
const spawn = require('./spawner')
module.exports = (common) => {
describe('stress test', () => {
let Muxer
beforeEach(async () => {
Muxer = await common.setup()
})
it('1 stream with 1 msg', () => spawn(Muxer, 1, 1))
it('1 stream with 10 msg', () => spawn(Muxer, 1, 10))
it('1 stream with 100 msg', () => spawn(Muxer, 1, 100))
it('10 streams with 1 msg', () => spawn(Muxer, 10, 1))
it('10 streams with 10 msg', () => spawn(Muxer, 10, 10))
it('10 streams with 100 msg', () => spawn(Muxer, 10, 100))
it('100 streams with 1 msg', () => spawn(Muxer, 100, 1))
it('100 streams with 10 msg', () => spawn(Muxer, 100, 10))
it('100 streams with 100 msg', () => spawn(Muxer, 100, 100))
it('1000 streams with 1 msg', () => spawn(Muxer, 1000, 1))
it('1000 streams with 10 msg', () => spawn(Muxer, 1000, 10))
it('1000 streams with 100 msg', function () {
this.timeout(30 * 1000)
return spawn(Muxer, 1000, 100)
})
})
}