Merge pull request #5 from diasdavid/0.3.0

0.3.0
This commit is contained in:
David Dias 2016-03-06 22:44:01 +00:00
commit d394b5432e
6 changed files with 99 additions and 140 deletions

View File

@ -1,7 +1,8 @@
interface-stream-muxer interface-stream-muxer
===================== =====================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
> A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs" > A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs"
@ -13,8 +14,8 @@ The API is presented with both Node.js and Go primitives, however, there is not
# Modules that implement the interface # Modules that implement the interface
- [Node.js spdy-stream-muxer](https://github.com/diasdavid/node-spdy-stream-muxer) - stream-muxer abstraction on top of [spdy-transport](https://github.com/indutny/spdy-transport) - [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy)
- [Node.js multiplex-stream-muxer](https://github.com/diasdavid/node-multiplex-stream-muxer) - stream-muxer abstraction on top of [multiplex](https://github.com/maxogden/multiplex) - [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex)
- [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer) - [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer)
Send a PR to add a new one if you happen to find or write one. Send a PR to add a new one if you happen to find or write one.
@ -34,20 +35,18 @@ Install interface-stream-muxer as one of the dependencies of your project and as
``` ```
var tape = require('tape') var tape = require('tape')
var tests = require('interface-stream-muxer/tests') var tests = require('interface-stream-muxer/tests')
var YourStreamMuxer = require('../src') var yourStreamMuxer = require('../src')
var common = { var common = {
setup: function (t, cb) { setup: function (t, cb) {
cb(null, YourStreamMuxer) cb(null, yourStreamMuxer)
}, },
teardown: function (t, cb) { teardown: function (t, cb) {
cb() cb()
} }
} }
var megaTest = false // a really really intensive test case tests(tape, common)
tests(tape, common, megaTest)
``` ```
## Go ## Go
@ -60,8 +59,8 @@ A valid (read: that follows this abstraction) stream muxer, must implement the f
### Attach muxer to a transport ### Attach muxer to a transport
- `Node.js` conn = muxer.attach(transport, isListener) - `Node.js` muxedConn = muxer(transport, isListener)
- `Go` conn, err := muxer.Attach(transport, isListener) - `Go` muxedConn, err := muxer.Attach(transport, isListener)
This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection). This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection).
@ -69,13 +68,13 @@ If `err` is passed, no operation should be made in `conn`.
`isListener` is a bool that tells the side of the socket we are, `isListener = true` for listener/server and `isListener = false` for dialer/client side. `isListener` is a bool that tells the side of the socket we are, `isListener = true` for listener/server and `isListener = false` for dialer/client side.
`conn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests. `muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.
### Dial(open/create) a new stream ### Dial(open/create) a new stream
- `Node.js` stream = conn.dialStream([function (err, stream)]) - `Node.js` stream = muxedConn.newStream([function (err, stream)])
- `Go` stream, err := conn.DialStream() - `Go` stream, err := muxedConn.newStream()
This method negotiates and opens a new stream with the other endpoint. This method negotiates and opens a new stream with the other endpoint.
@ -87,8 +86,8 @@ In the Node.js case, if no callback is passed, stream will emit an 'ready' event
### Listen(wait/accept) a new incoming stream ### Listen(wait/accept) a new incoming stream
- `Node.js` conn.on('stream', function (stream)) - `Node.js` muxedConn.on('stream', function (stream) {})
- `Go` stream := conn.Accept() - `Go` stream := muxedConn.Accept()
Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side. Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.

BIN
tests/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -1,121 +1,105 @@
var streamPair = require('stream-pair') var streamPair = require('stream-pair')
module.exports.all = function (test, common) { module.exports.all = function (test, common) {
test('Open a stream from the dialer', function (t) {
test('Open a stream from the dealer', function (t) { common.setup(test, function (err, muxer) {
common.setup(test, function (err, Muxer) {
t.plan(4) t.plan(4)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) listener.on('stream', (stream) => {
var connListener = listener.attach(pair.other, true) t.pass('got stream')
connDialer.dialStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
}) })
connListener.on('stream', function (stream) { dialer.newStream((err, stream) => {
t.pass('got stream') t.ifError(err, 'Should not throw')
t.pass('dialed stream')
}) })
}) })
}) })
test('Open a stream from the listener', function (t) { test('Open a stream from the listener', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.plan(4) t.plan(4)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) dialer.on('stream', (stream) => {
var connListener = listener.attach(pair.other, true) t.pass('got stream')
connListener.dialStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
}) })
connDialer.on('stream', function (stream) { listener.newStream((err, stream) => {
t.pass('got stream') t.ifError(err, 'Should not throw')
t.pass('dialed stream')
}) })
}) })
}) })
test('Open a stream on both sides', function (t) { test('Open a stream on both sides', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.plan(7) t.plan(7)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) dialer.on('stream', (stream) => {
var connListener = listener.attach(pair.other, true) t.pass('got stream')
})
connDialer.dialStream(function (err, stream) { listener.newStream((err, stream) => {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('dialed stream from dialer') t.pass('dialed stream')
}) })
connListener.on('stream', function (stream) { listener.on('stream', (stream) => {
t.pass('listener got stream') t.pass('got stream')
}) })
connListener.dialStream(function (err, stream) { dialer.newStream((err, stream) => {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('dialed stream from listener') t.pass('dialed stream')
})
connDialer.on('stream', function (stream) {
t.pass('dialer got stream')
}) })
}) })
}) })
test('Open a stream on one side, write, open a stream in the other side', function (t) { test('Open a stream on one side, write, open a stream in the other side', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.plan(9) t.plan(9)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) dialer.newStream(function (err, stream) {
var connListener = listener.attach(pair.other, true)
connDialer.dialStream(function (err, stream) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('dialed stream from dialer') t.pass('dialed stream from dialer')
stream.write('hey') stream.write('hey')
}) })
connListener.on('stream', function (stream) { listener.on('stream', function (stream) {
t.pass('listener got stream') t.pass('listener got stream')
stream.on('data', function (chunk) { stream.on('data', function (chunk) {
t.equal(chunk.toString(), 'hey') t.equal(chunk.toString(), 'hey')
}) })
connListener.dialStream(function (err, stream) { listener.newStream(function (err, stream) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('dialed stream from listener') t.pass('dialed stream from listener')
stream.write('hello') stream.write('hello')
}) })
}) })
connDialer.on('stream', function (stream) { dialer.on('stream', function (stream) {
t.pass('dialer got stream') t.pass('dialer got stream')
stream.on('data', function (chunk) { stream.on('data', function (chunk) {
@ -126,18 +110,15 @@ module.exports.all = function (test, common) {
}) })
test('Open a stream using the net.connect pattern', function (t) { test('Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.plan(3) t.plan(2)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) var stream = dialer.newStream()
var connListener = listener.attach(pair.other, true)
var stream = connListener.dialStream()
stream.on('ready', function () { stream.on('ready', function () {
t.pass('dialed stream') t.pass('dialed stream')
@ -147,25 +128,22 @@ module.exports.all = function (test, common) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
}) })
connDialer.on('stream', function (stream) { listener.on('stream', function (stream) {
t.pass('got stream') t.pass('got stream')
}) })
}) })
}) })
test('Buffer writes Open a stream using the net.connect pattern', function (t) { test('Buffer writes Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.plan(4) t.plan(3)
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
var pair = streamPair.create() var pair = streamPair.create()
var dialer = new Muxer() var dialer = muxer(pair, false)
var listener = new Muxer() var listener = muxer(pair.other, true)
var connDialer = dialer.attach(pair, false) var stream = dialer.newStream()
var connListener = listener.attach(pair.other, true)
var stream = connListener.dialStream()
stream.write('buffer this') stream.write('buffer this')
@ -177,7 +155,7 @@ module.exports.all = function (test, common) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
}) })
connDialer.on('stream', function (stream) { listener.on('stream', function (stream) {
t.pass('got stream') t.pass('got stream')
stream.on('data', function (chunk) { stream.on('data', function (chunk) {
@ -186,5 +164,4 @@ module.exports.all = function (test, common) {
}) })
}) })
}) })
} }

View File

@ -4,7 +4,5 @@ module.exports = function (test, common, mega) {
test = timed(test) test = timed(test)
require('./base-test.js').all(test, common) require('./base-test.js').all(test, common)
require('./stress-test.js').all(test, common) require('./stress-test.js').all(test, common)
if (mega) { // require('./mega-stress-test.js').all(test, common)
require('./mega-stress-test.js').all(test, common)
}
} }

View File

@ -1,30 +1,25 @@
var streamPair = require('stream-pair') var streamPair = require('stream-pair')
module.exports.all = function (test, common) { module.exports.all = function (test, common) {
test('10000 messages of 10000 streams', function (t) { test('10000 messages of 10000 streams', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 10000, 10000) spawnGeneration(t, muxer, pair, pair.other, 10000, 10000)
}) })
}) })
} }
function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
var msg = !size ? 'simple msg' : 'make the msg bigger' var msg = !size ? 'simple msg' : 'make the msg bigger'
var listenerMuxer = new Muxer() var listener = muxer(listenerSocket, true)
var dialerMuxer = new Muxer() var dialer = muxer(dialerSocket, false)
var listenerConn = listenerMuxer.attach(listenerSocket, true) listener.on('stream', function (stream) {
var dialerConn = dialerMuxer.attach(dialerSocket, false)
listenerConn.on('stream', function (stream) {
t.pass('Incoming stream') t.pass('Incoming stream')
stream.on('data', function (chunk) { stream.on('data', function (chunk) {
@ -35,11 +30,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
t.pass('Stream ended on Listener') t.pass('Stream ended on Listener')
stream.end() stream.end()
}) })
}) })
for (var i = 0; i < nStreams; i++) { for (var i = 0; i < nStreams; i++) {
dialerConn.dialStream(function (err, stream) { dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('Dialed stream') t.pass('Dialed stream')
@ -58,5 +52,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
stream.end() stream.end()
}) })
} }
} }

View File

@ -1,131 +1,125 @@
var streamPair = require('stream-pair') var streamPair = require('stream-pair')
module.exports.all = function (test, common) { module.exports.all = function (test, common) {
test('1 stream with 1 msg', function (t) { test('1 stream with 1 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1, 1) spawnGeneration(t, muxer, pair, pair.other, 1, 1)
}) })
}) })
test('1 stream with 10 msg', function (t) { test('1 stream with 10 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1, 10) spawnGeneration(t, muxer, pair, pair.other, 1, 10)
}) })
}) })
test('1 stream with 100 msg', function (t) { test('1 stream with 100 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1, 100) spawnGeneration(t, muxer, pair, pair.other, 1, 100)
}) })
}) })
test('10 stream with 1 msg', function (t) { test('10 stream with 1 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 10, 1) spawnGeneration(t, muxer, pair, pair.other, 10, 1)
}) })
}) })
test('10 stream with 10 msg', function (t) { test('10 stream with 10 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 10, 10) spawnGeneration(t, muxer, pair, pair.other, 10, 10)
}) })
}) })
test('10 stream with 100 msg', function (t) { test('10 stream with 100 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 10, 10) spawnGeneration(t, muxer, pair, pair.other, 10, 10)
}) })
}) })
test('100 stream with 1 msg', function (t) { test('100 stream with 1 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 100, 1) spawnGeneration(t, muxer, pair, pair.other, 100, 1)
}) })
}) })
test('100 stream with 10 msg', function (t) { test('100 stream with 10 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 100, 10) spawnGeneration(t, muxer, pair, pair.other, 100, 10)
}) })
}) })
test('100 stream with 100 msg', function (t) { test('100 stream with 100 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 100, 10) spawnGeneration(t, muxer, pair, pair.other, 100, 10)
}) })
}) })
test('1000 stream with 1 msg', function (t) { test('1000 stream with 1 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1000, 1) spawnGeneration(t, muxer, pair, pair.other, 1000, 1)
}) })
}) })
test('1000 stream with 10 msg', function (t) { test('1000 stream with 10 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1000, 10) spawnGeneration(t, muxer, pair, pair.other, 1000, 10)
}) })
}) })
test('1000 stream with 100 msg', function (t) { test('1000 stream with 100 msg', function (t) {
common.setup(test, function (err, Muxer) { common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw') t.ifError(err, 'should not throw')
var pair = streamPair.create() var pair = streamPair.create()
spawnGeneration(t, Muxer, pair, pair.other, 1000, 100) spawnGeneration(t, muxer, pair, pair.other, 1000, 100)
}) })
}) })
} }
function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
var msg = !size ? 'simple msg' : 'make the msg bigger' var msg = !size ? 'simple msg' : 'make the msg bigger'
var listenerMuxer = new Muxer() var listener = muxer(listenerSocket, true)
var dialerMuxer = new Muxer() var dialer = muxer(dialerSocket, false)
var listenerConn = listenerMuxer.attach(listenerSocket, true) listener.on('stream', function (stream) {
var dialerConn = dialerMuxer.attach(dialerSocket, false)
listenerConn.on('stream', function (stream) {
t.pass('Incoming stream') t.pass('Incoming stream')
stream.on('data', function (chunk) { stream.on('data', function (chunk) {
t.pass('Received message') t.pass('Received message')
}) })
@ -134,11 +128,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
t.pass('Stream ended on Listener') t.pass('Stream ended on Listener')
stream.end() stream.end()
}) })
}) })
for (var i = 0; i < nStreams; i++) { for (var i = 0; i < nStreams; i++) {
dialerConn.dialStream(function (err, stream) { dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw') t.ifError(err, 'Should not throw')
t.pass('Dialed stream') t.pass('Dialed stream')
@ -157,5 +150,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
stream.end() stream.end()
}) })
} }
} }