mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-04-24 10:02:18 +00:00
done
This commit is contained in:
parent
4093c5762b
commit
1aaff80f53
29
README.md
29
README.md
@ -1,7 +1,8 @@
|
||||
interface-stream-muxer
|
||||
=====================
|
||||
|
||||
[](http://ipn.io) [](http://webchat.freenode.net/?channels=%23ipfs)
|
||||
[](http://ipn.io)
|
||||
[](http://webchat.freenode.net/?channels=%23ipfs)
|
||||
|
||||
> A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs"
|
||||
|
||||
@ -13,8 +14,8 @@ The API is presented with both Node.js and Go primitives, however, there is not
|
||||
|
||||
# Modules that implement the interface
|
||||
|
||||
- [Node.js spdy-stream-muxer](https://github.com/diasdavid/node-spdy-stream-muxer) - stream-muxer abstraction on top of [spdy-transport](https://github.com/indutny/spdy-transport)
|
||||
- [Node.js multiplex-stream-muxer](https://github.com/diasdavid/node-multiplex-stream-muxer) - stream-muxer abstraction on top of [multiplex](https://github.com/maxogden/multiplex)
|
||||
- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy)
|
||||
- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex)
|
||||
- [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer)
|
||||
|
||||
Send a PR to add a new one if you happen to find or write one.
|
||||
@ -34,20 +35,18 @@ Install interface-stream-muxer as one of the dependencies of your project and as
|
||||
```
|
||||
var tape = require('tape')
|
||||
var tests = require('interface-stream-muxer/tests')
|
||||
var YourStreamMuxer = require('../src')
|
||||
var yourStreamMuxer = require('../src')
|
||||
|
||||
var common = {
|
||||
setup: function (t, cb) {
|
||||
cb(null, YourStreamMuxer)
|
||||
cb(null, yourStreamMuxer)
|
||||
},
|
||||
teardown: function (t, cb) {
|
||||
cb()
|
||||
}
|
||||
}
|
||||
|
||||
var megaTest = false // a really really intensive test case
|
||||
|
||||
tests(tape, common, megaTest)
|
||||
tests(tape, common)
|
||||
```
|
||||
|
||||
## Go
|
||||
@ -60,8 +59,8 @@ A valid (read: that follows this abstraction) stream muxer, must implement the f
|
||||
|
||||
### Attach muxer to a transport
|
||||
|
||||
- `Node.js` conn = muxer.attach(transport, isListener)
|
||||
- `Go` conn, err := muxer.Attach(transport, isListener)
|
||||
- `Node.js` muxedConn = muxer(transport, isListener)
|
||||
- `Go` muxedConn, err := muxer.Attach(transport, isListener)
|
||||
|
||||
This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection).
|
||||
|
||||
@ -69,13 +68,13 @@ If `err` is passed, no operation should be made in `conn`.
|
||||
|
||||
`isListener` is a bool that tells the side of the socket we are, `isListener = true` for listener/server and `isListener = false` for dialer/client side.
|
||||
|
||||
`conn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.
|
||||
`muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.
|
||||
|
||||
### Dial(open/create) a new stream
|
||||
|
||||
|
||||
- `Node.js` stream = conn.dialStream([function (err, stream)])
|
||||
- `Go` stream, err := conn.DialStream()
|
||||
- `Node.js` stream = muxedConn.newStream([function (err, stream)])
|
||||
- `Go` stream, err := muxedConn.newStream()
|
||||
|
||||
This method negotiates and opens a new stream with the other endpoint.
|
||||
|
||||
@ -87,8 +86,8 @@ In the Node.js case, if no callback is passed, stream will emit an 'ready' event
|
||||
|
||||
### Listen(wait/accept) a new incoming stream
|
||||
|
||||
- `Node.js` conn.on('stream', function (stream))
|
||||
- `Go` stream := conn.Accept()
|
||||
- `Node.js` muxedConn.on('stream', function (stream) {})
|
||||
- `Go` stream := muxedConn.Accept()
|
||||
|
||||
Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.
|
||||
|
||||
|
BIN
tests/.DS_Store
vendored
Normal file
BIN
tests/.DS_Store
vendored
Normal file
Binary file not shown.
@ -1,121 +1,105 @@
|
||||
var streamPair = require('stream-pair')
|
||||
|
||||
module.exports.all = function (test, common) {
|
||||
|
||||
test('Open a stream from the dealer', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
test('Open a stream from the dialer', function (t) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(4)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
|
||||
connDialer.dialStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream')
|
||||
listener.on('stream', (stream) => {
|
||||
t.pass('got stream')
|
||||
})
|
||||
|
||||
connListener.on('stream', function (stream) {
|
||||
t.pass('got stream')
|
||||
dialer.newStream((err, stream) => {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Open a stream from the listener', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(4)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
|
||||
connListener.dialStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream')
|
||||
dialer.on('stream', (stream) => {
|
||||
t.pass('got stream')
|
||||
})
|
||||
|
||||
connDialer.on('stream', function (stream) {
|
||||
t.pass('got stream')
|
||||
listener.newStream((err, stream) => {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Open a stream on both sides', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(7)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
dialer.on('stream', (stream) => {
|
||||
t.pass('got stream')
|
||||
})
|
||||
|
||||
connDialer.dialStream(function (err, stream) {
|
||||
listener.newStream((err, stream) => {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream from dialer')
|
||||
t.pass('dialed stream')
|
||||
})
|
||||
|
||||
connListener.on('stream', function (stream) {
|
||||
t.pass('listener got stream')
|
||||
listener.on('stream', (stream) => {
|
||||
t.pass('got stream')
|
||||
})
|
||||
|
||||
connListener.dialStream(function (err, stream) {
|
||||
dialer.newStream((err, stream) => {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream from listener')
|
||||
})
|
||||
|
||||
connDialer.on('stream', function (stream) {
|
||||
t.pass('dialer got stream')
|
||||
t.pass('dialed stream')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Open a stream on one side, write, open a stream in the other side', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(9)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
|
||||
connDialer.dialStream(function (err, stream) {
|
||||
dialer.newStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream from dialer')
|
||||
|
||||
stream.write('hey')
|
||||
})
|
||||
|
||||
connListener.on('stream', function (stream) {
|
||||
listener.on('stream', function (stream) {
|
||||
t.pass('listener got stream')
|
||||
|
||||
stream.on('data', function (chunk) {
|
||||
t.equal(chunk.toString(), 'hey')
|
||||
})
|
||||
|
||||
connListener.dialStream(function (err, stream) {
|
||||
listener.newStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('dialed stream from listener')
|
||||
|
||||
stream.write('hello')
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
connDialer.on('stream', function (stream) {
|
||||
dialer.on('stream', function (stream) {
|
||||
t.pass('dialer got stream')
|
||||
|
||||
stream.on('data', function (chunk) {
|
||||
@ -126,18 +110,15 @@ module.exports.all = function (test, common) {
|
||||
})
|
||||
|
||||
test('Open a stream using the net.connect pattern', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
t.plan(3)
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(2)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
|
||||
var stream = connListener.dialStream()
|
||||
var stream = dialer.newStream()
|
||||
|
||||
stream.on('ready', function () {
|
||||
t.pass('dialed stream')
|
||||
@ -147,25 +128,22 @@ module.exports.all = function (test, common) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
})
|
||||
|
||||
connDialer.on('stream', function (stream) {
|
||||
listener.on('stream', function (stream) {
|
||||
t.pass('got stream')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Buffer writes Open a stream using the net.connect pattern', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
t.plan(4)
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.plan(3)
|
||||
t.ifError(err, 'Should not throw')
|
||||
|
||||
var pair = streamPair.create()
|
||||
var dialer = new Muxer()
|
||||
var listener = new Muxer()
|
||||
var dialer = muxer(pair, false)
|
||||
var listener = muxer(pair.other, true)
|
||||
|
||||
var connDialer = dialer.attach(pair, false)
|
||||
var connListener = listener.attach(pair.other, true)
|
||||
|
||||
var stream = connListener.dialStream()
|
||||
var stream = dialer.newStream()
|
||||
|
||||
stream.write('buffer this')
|
||||
|
||||
@ -177,7 +155,7 @@ module.exports.all = function (test, common) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
})
|
||||
|
||||
connDialer.on('stream', function (stream) {
|
||||
listener.on('stream', function (stream) {
|
||||
t.pass('got stream')
|
||||
|
||||
stream.on('data', function (chunk) {
|
||||
@ -186,5 +164,4 @@ module.exports.all = function (test, common) {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
@ -4,7 +4,5 @@ module.exports = function (test, common, mega) {
|
||||
test = timed(test)
|
||||
require('./base-test.js').all(test, common)
|
||||
require('./stress-test.js').all(test, common)
|
||||
if (mega) {
|
||||
require('./mega-stress-test.js').all(test, common)
|
||||
}
|
||||
// require('./mega-stress-test.js').all(test, common)
|
||||
}
|
||||
|
@ -1,30 +1,25 @@
|
||||
var streamPair = require('stream-pair')
|
||||
|
||||
module.exports.all = function (test, common) {
|
||||
|
||||
test('10000 messages of 10000 streams', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 10000, 10000)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 10000, 10000)
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
|
||||
function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
|
||||
t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
|
||||
|
||||
var msg = !size ? 'simple msg' : 'make the msg bigger'
|
||||
|
||||
var listenerMuxer = new Muxer()
|
||||
var dialerMuxer = new Muxer()
|
||||
var listener = muxer(listenerSocket, true)
|
||||
var dialer = muxer(dialerSocket, false)
|
||||
|
||||
var listenerConn = listenerMuxer.attach(listenerSocket, true)
|
||||
var dialerConn = dialerMuxer.attach(dialerSocket, false)
|
||||
|
||||
listenerConn.on('stream', function (stream) {
|
||||
listener.on('stream', function (stream) {
|
||||
t.pass('Incoming stream')
|
||||
|
||||
stream.on('data', function (chunk) {
|
||||
@ -35,11 +30,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
|
||||
t.pass('Stream ended on Listener')
|
||||
stream.end()
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
for (var i = 0; i < nStreams; i++) {
|
||||
dialerConn.dialStream(function (err, stream) {
|
||||
dialer.newStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('Dialed stream')
|
||||
|
||||
@ -58,5 +52,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
|
||||
stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,131 +1,125 @@
|
||||
var streamPair = require('stream-pair')
|
||||
|
||||
module.exports.all = function (test, common) {
|
||||
|
||||
test('1 stream with 1 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1, 1)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1, 1)
|
||||
})
|
||||
})
|
||||
|
||||
test('1 stream with 10 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('1 stream with 100 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1, 100)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1, 100)
|
||||
})
|
||||
})
|
||||
|
||||
test('10 stream with 1 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 10, 1)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 10, 1)
|
||||
})
|
||||
})
|
||||
|
||||
test('10 stream with 10 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 10, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 10, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('10 stream with 100 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 10, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 10, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('100 stream with 1 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 100, 1)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 100, 1)
|
||||
})
|
||||
})
|
||||
|
||||
test('100 stream with 10 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 100, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 100, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('100 stream with 100 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 100, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 100, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('1000 stream with 1 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1000, 1)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1000, 1)
|
||||
})
|
||||
})
|
||||
|
||||
test('1000 stream with 10 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1000, 10)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1000, 10)
|
||||
})
|
||||
})
|
||||
|
||||
test('1000 stream with 100 msg', function (t) {
|
||||
common.setup(test, function (err, Muxer) {
|
||||
common.setup(test, function (err, muxer) {
|
||||
t.ifError(err, 'should not throw')
|
||||
var pair = streamPair.create()
|
||||
|
||||
spawnGeneration(t, Muxer, pair, pair.other, 1000, 100)
|
||||
spawnGeneration(t, muxer, pair, pair.other, 1000, 100)
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
|
||||
function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
|
||||
t.plan(1 + (5 * nStreams) + (nStreams * nMsg))
|
||||
|
||||
var msg = !size ? 'simple msg' : 'make the msg bigger'
|
||||
|
||||
var listenerMuxer = new Muxer()
|
||||
var dialerMuxer = new Muxer()
|
||||
var listener = muxer(listenerSocket, true)
|
||||
var dialer = muxer(dialerSocket, false)
|
||||
|
||||
var listenerConn = listenerMuxer.attach(listenerSocket, true)
|
||||
var dialerConn = dialerMuxer.attach(dialerSocket, false)
|
||||
|
||||
listenerConn.on('stream', function (stream) {
|
||||
listener.on('stream', function (stream) {
|
||||
t.pass('Incoming stream')
|
||||
|
||||
stream.on('data', function (chunk) {
|
||||
t.pass('Received message')
|
||||
})
|
||||
@ -134,11 +128,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
|
||||
t.pass('Stream ended on Listener')
|
||||
stream.end()
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
for (var i = 0; i < nStreams; i++) {
|
||||
dialerConn.dialStream(function (err, stream) {
|
||||
dialer.newStream(function (err, stream) {
|
||||
t.ifError(err, 'Should not throw')
|
||||
t.pass('Dialed stream')
|
||||
|
||||
@ -157,5 +150,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
|
||||
stream.end()
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user