diff --git a/README.md b/README.md index c85fba1..2340b56 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,8 @@ interface-stream-muxer ===================== -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) > A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs" @@ -13,8 +14,8 @@ The API is presented with both Node.js and Go primitives, however, there is not # Modules that implement the interface -- [Node.js spdy-stream-muxer](https://github.com/diasdavid/node-spdy-stream-muxer) - stream-muxer abstraction on top of [spdy-transport](https://github.com/indutny/spdy-transport) -- [Node.js multiplex-stream-muxer](https://github.com/diasdavid/node-multiplex-stream-muxer) - stream-muxer abstraction on top of [multiplex](https://github.com/maxogden/multiplex) +- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy) +- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex) - [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer) Send a PR to add a new one if you happen to find or write one. @@ -34,20 +35,18 @@ Install interface-stream-muxer as one of the dependencies of your project and as ``` var tape = require('tape') var tests = require('interface-stream-muxer/tests') -var YourStreamMuxer = require('../src') +var yourStreamMuxer = require('../src') var common = { setup: function (t, cb) { - cb(null, YourStreamMuxer) + cb(null, yourStreamMuxer) }, teardown: function (t, cb) { cb() } } -var megaTest = false // a really really intensive test case - -tests(tape, common, megaTest) +tests(tape, common) ``` ## Go @@ -60,8 +59,8 @@ A valid (read: that follows this abstraction) stream muxer, must implement the f ### Attach muxer to a transport -- `Node.js` conn = muxer.attach(transport, isListener) -- `Go` conn, err := muxer.Attach(transport, isListener) +- `Node.js` muxedConn = muxer(transport, isListener) +- `Go` muxedConn, err := muxer.Attach(transport, isListener) This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection). @@ -69,13 +68,13 @@ If `err` is passed, no operation should be made in `conn`. `isListener` is a bool that tells the side of the socket we are, `isListener = true` for listener/server and `isListener = false` for dialer/client side. -`conn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests. +`muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests. ### Dial(open/create) a new stream -- `Node.js` stream = conn.dialStream([function (err, stream)]) -- `Go` stream, err := conn.DialStream() +- `Node.js` stream = muxedConn.newStream([function (err, stream)]) +- `Go` stream, err := muxedConn.newStream() This method negotiates and opens a new stream with the other endpoint. @@ -87,8 +86,8 @@ In the Node.js case, if no callback is passed, stream will emit an 'ready' event ### Listen(wait/accept) a new incoming stream -- `Node.js` conn.on('stream', function (stream)) -- `Go` stream := conn.Accept() +- `Node.js` muxedConn.on('stream', function (stream) {}) +- `Go` stream := muxedConn.Accept() Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side. diff --git a/tests/.DS_Store b/tests/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/tests/.DS_Store differ diff --git a/tests/base-test.js b/tests/base-test.js index c91b2d5..a36373b 100644 --- a/tests/base-test.js +++ b/tests/base-test.js @@ -1,121 +1,105 @@ var streamPair = require('stream-pair') module.exports.all = function (test, common) { - - test('Open a stream from the dealer', function (t) { - common.setup(test, function (err, Muxer) { + test('Open a stream from the dialer', function (t) { + common.setup(test, function (err, muxer) { t.plan(4) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) - - connDialer.dialStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') + listener.on('stream', (stream) => { + t.pass('got stream') }) - connListener.on('stream', function (stream) { - t.pass('got stream') + dialer.newStream((err, stream) => { + t.ifError(err, 'Should not throw') + t.pass('dialed stream') }) }) }) - test('Open a stream from the listener', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.plan(4) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) - - connListener.dialStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') + dialer.on('stream', (stream) => { + t.pass('got stream') }) - connDialer.on('stream', function (stream) { - t.pass('got stream') + listener.newStream((err, stream) => { + t.ifError(err, 'Should not throw') + t.pass('dialed stream') }) }) }) test('Open a stream on both sides', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.plan(7) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) + dialer.on('stream', (stream) => { + t.pass('got stream') + }) - connDialer.dialStream(function (err, stream) { + listener.newStream((err, stream) => { t.ifError(err, 'Should not throw') - t.pass('dialed stream from dialer') + t.pass('dialed stream') }) - connListener.on('stream', function (stream) { - t.pass('listener got stream') + listener.on('stream', (stream) => { + t.pass('got stream') }) - connListener.dialStream(function (err, stream) { + dialer.newStream((err, stream) => { t.ifError(err, 'Should not throw') - t.pass('dialed stream from listener') - }) - - connDialer.on('stream', function (stream) { - t.pass('dialer got stream') + t.pass('dialed stream') }) }) }) test('Open a stream on one side, write, open a stream in the other side', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.plan(9) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) - - connDialer.dialStream(function (err, stream) { + dialer.newStream(function (err, stream) { t.ifError(err, 'Should not throw') t.pass('dialed stream from dialer') - stream.write('hey') }) - connListener.on('stream', function (stream) { + listener.on('stream', function (stream) { t.pass('listener got stream') stream.on('data', function (chunk) { t.equal(chunk.toString(), 'hey') }) - connListener.dialStream(function (err, stream) { + listener.newStream(function (err, stream) { t.ifError(err, 'Should not throw') t.pass('dialed stream from listener') stream.write('hello') }) - }) - connDialer.on('stream', function (stream) { + dialer.on('stream', function (stream) { t.pass('dialer got stream') stream.on('data', function (chunk) { @@ -126,18 +110,15 @@ module.exports.all = function (test, common) { }) test('Open a stream using the net.connect pattern', function (t) { - common.setup(test, function (err, Muxer) { - t.plan(3) + common.setup(test, function (err, muxer) { + t.plan(2) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) - - var stream = connListener.dialStream() + var stream = dialer.newStream() stream.on('ready', function () { t.pass('dialed stream') @@ -147,25 +128,22 @@ module.exports.all = function (test, common) { t.ifError(err, 'Should not throw') }) - connDialer.on('stream', function (stream) { + listener.on('stream', function (stream) { t.pass('got stream') }) }) }) test('Buffer writes Open a stream using the net.connect pattern', function (t) { - common.setup(test, function (err, Muxer) { - t.plan(4) + common.setup(test, function (err, muxer) { + t.plan(3) t.ifError(err, 'Should not throw') var pair = streamPair.create() - var dialer = new Muxer() - var listener = new Muxer() + var dialer = muxer(pair, false) + var listener = muxer(pair.other, true) - var connDialer = dialer.attach(pair, false) - var connListener = listener.attach(pair.other, true) - - var stream = connListener.dialStream() + var stream = dialer.newStream() stream.write('buffer this') @@ -177,7 +155,7 @@ module.exports.all = function (test, common) { t.ifError(err, 'Should not throw') }) - connDialer.on('stream', function (stream) { + listener.on('stream', function (stream) { t.pass('got stream') stream.on('data', function (chunk) { @@ -186,5 +164,4 @@ module.exports.all = function (test, common) { }) }) }) - } diff --git a/tests/index.js b/tests/index.js index 1a1d43b..dddff1c 100644 --- a/tests/index.js +++ b/tests/index.js @@ -4,7 +4,5 @@ module.exports = function (test, common, mega) { test = timed(test) require('./base-test.js').all(test, common) require('./stress-test.js').all(test, common) - if (mega) { - require('./mega-stress-test.js').all(test, common) - } + // require('./mega-stress-test.js').all(test, common) } diff --git a/tests/mega-stress-test.js b/tests/mega-stress-test.js index 94fe101..a073985 100644 --- a/tests/mega-stress-test.js +++ b/tests/mega-stress-test.js @@ -1,30 +1,25 @@ var streamPair = require('stream-pair') module.exports.all = function (test, common) { - test('10000 messages of 10000 streams', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 10000, 10000) + spawnGeneration(t, muxer, pair, pair.other, 10000, 10000) }) }) - } -function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { +function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) var msg = !size ? 'simple msg' : 'make the msg bigger' - var listenerMuxer = new Muxer() - var dialerMuxer = new Muxer() + var listener = muxer(listenerSocket, true) + var dialer = muxer(dialerSocket, false) - var listenerConn = listenerMuxer.attach(listenerSocket, true) - var dialerConn = dialerMuxer.attach(dialerSocket, false) - - listenerConn.on('stream', function (stream) { + listener.on('stream', function (stream) { t.pass('Incoming stream') stream.on('data', function (chunk) { @@ -35,11 +30,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg t.pass('Stream ended on Listener') stream.end() }) - }) for (var i = 0; i < nStreams; i++) { - dialerConn.dialStream(function (err, stream) { + dialer.newStream(function (err, stream) { t.ifError(err, 'Should not throw') t.pass('Dialed stream') @@ -58,5 +52,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg stream.end() }) } - } diff --git a/tests/stress-test.js b/tests/stress-test.js index a70c199..c96e57d 100644 --- a/tests/stress-test.js +++ b/tests/stress-test.js @@ -1,131 +1,125 @@ var streamPair = require('stream-pair') module.exports.all = function (test, common) { - test('1 stream with 1 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1, 1) + spawnGeneration(t, muxer, pair, pair.other, 1, 1) }) }) test('1 stream with 10 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1, 10) + spawnGeneration(t, muxer, pair, pair.other, 1, 10) }) }) test('1 stream with 100 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1, 100) + spawnGeneration(t, muxer, pair, pair.other, 1, 100) }) }) test('10 stream with 1 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 10, 1) + spawnGeneration(t, muxer, pair, pair.other, 10, 1) }) }) test('10 stream with 10 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 10, 10) + spawnGeneration(t, muxer, pair, pair.other, 10, 10) }) }) test('10 stream with 100 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 10, 10) + spawnGeneration(t, muxer, pair, pair.other, 10, 10) }) }) test('100 stream with 1 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 100, 1) + spawnGeneration(t, muxer, pair, pair.other, 100, 1) }) }) test('100 stream with 10 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 100, 10) + spawnGeneration(t, muxer, pair, pair.other, 100, 10) }) }) test('100 stream with 100 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 100, 10) + spawnGeneration(t, muxer, pair, pair.other, 100, 10) }) }) test('1000 stream with 1 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1000, 1) + spawnGeneration(t, muxer, pair, pair.other, 1000, 1) }) }) test('1000 stream with 10 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1000, 10) + spawnGeneration(t, muxer, pair, pair.other, 1000, 10) }) }) test('1000 stream with 100 msg', function (t) { - common.setup(test, function (err, Muxer) { + common.setup(test, function (err, muxer) { t.ifError(err, 'should not throw') var pair = streamPair.create() - spawnGeneration(t, Muxer, pair, pair.other, 1000, 100) + spawnGeneration(t, muxer, pair, pair.other, 1000, 100) }) }) - } -function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { +function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) var msg = !size ? 'simple msg' : 'make the msg bigger' - var listenerMuxer = new Muxer() - var dialerMuxer = new Muxer() + var listener = muxer(listenerSocket, true) + var dialer = muxer(dialerSocket, false) - var listenerConn = listenerMuxer.attach(listenerSocket, true) - var dialerConn = dialerMuxer.attach(dialerSocket, false) - - listenerConn.on('stream', function (stream) { + listener.on('stream', function (stream) { t.pass('Incoming stream') - stream.on('data', function (chunk) { t.pass('Received message') }) @@ -134,11 +128,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg t.pass('Stream ended on Listener') stream.end() }) - }) for (var i = 0; i < nStreams; i++) { - dialerConn.dialStream(function (err, stream) { + dialer.newStream(function (err, stream) { t.ifError(err, 'Should not throw') t.pass('Dialed stream') @@ -157,5 +150,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg stream.end() }) } - }