refactor: API changes and switch to async await (#55)

BREAKING CHANGE: the API is now async / await. See https://github.com/libp2p/interface-stream-muxer/pull/55#issue-275014779 for a summary of the changes.
This commit is contained in:
Alan Shaw 2019-09-16 16:48:04 +01:00 committed by Jacob Heun
parent 9b3d3ea623
commit dd837ba326
9 changed files with 339 additions and 384 deletions

1
.gitignore vendored
View File

@ -32,3 +32,4 @@ build
node_modules
dist
package-lock.json

View File

@ -10,7 +10,6 @@ jobs:
include:
- stage: check
script:
- npx aegir commitlint --travis
- npx aegir dep-check
- npm run lint

120
README.md
View File

@ -1,5 +1,4 @@
interface-stream-muxer
=====================
# interface-stream-muxer
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
@ -15,7 +14,7 @@ The primary goal of this module is to enable developers to pick and swap their s
Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite.
The API is presented with both Node.js and Go primitives, however, there is not actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.
The API is presented with both Node.js and Go primitives, however, there is no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through different stacks.
## Lead Maintainer
@ -37,7 +36,7 @@ Include this badge in your readme if you make a new module that uses interface-s
## Usage
### Node.js
### JS
Install `interface-stream-muxer` as one of the dependencies of your project and as a test file. Then, using `mocha` (for JavaScript) or a test runner with compatible API, do:
@ -45,11 +44,11 @@ Install `interface-stream-muxer` as one of the dependencies of your project and
const test = require('interface-stream-muxer')
const common = {
setup (cb) {
cb(null, yourMuxer)
async setup () {
return yourMuxer
},
teardown (cb) {
cb()
async teardown () {
// cleanup
}
}
@ -63,12 +62,91 @@ test(common)
## API
A valid (read: that follows this abstraction) stream muxer, must implement the following API.
### JS
### Attach muxer to a Connection
A valid (one that follows this abstraction) stream muxer, must implement the following API:
- `JavaScript` muxedConn = muxer(conn, isListener)
- `Go` muxedConn, err := muxer.Attach(conn, isListener)
#### `const muxer = new Muxer([options])`
Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications.
e.g.
```js
const Muxer = require('your-muxer-module')
const pipe = require('it-pipe')
// Create a duplex muxer
const muxer = new Muxer()
// Use the muxer in a pipeline
pipe(conn, muxer, conn) // conn is duplex connection to another peer
```
`options` is an optional `Object` that may have the following properties:
* `onStream` - A function called when receiving a new stream from the remote. e.g.
```js
// Receive a new stream on the muxed connection
const onStream = stream => {
// Read from this stream and write back to it (echo server)
pipe(
stream,
source => (async function * () {
for await (const data of source) yield data
})()
stream
)
}
const muxer = new Muxer({ onStream })
// ...
```
**Note:** The `onStream` function can be passed in place of the `options` object. i.e.
```js
new Mplex(stream => { /* ... */ })
```
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
```js
const controller = new AbortController()
const muxer = new Muxer({ signal: controller.signal })
pipe(conn, muxer, conn)
controller.abort()
```
* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB)
#### `muxer.onStream`
Use this property as an alternative to passing `onStream` as an option to the `Muxer` constructor.
```js
const muxer = new Muxer()
// ...later
muxer.onStream = stream => { /* ... */ }
```
#### `const stream = muxer.newStream([options])`
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
e.g.
```js
// Create a new stream on the muxed connection
const stream = muxer.newStream()
// Use this new stream like any other duplex stream:
pipe([1, 2, 3], stream, consume)
```
### Go
#### Attach muxer to a Connection
```go
muxedConn, err := muxer.Attach(conn, isListener)
```
This method attaches our stream muxer to an instance of [Connection](https://github.com/libp2p/interface-connection/blob/master/src/connection.js) defined by [interface-connection](https://github.com/libp2p/interface-connection).
@ -78,22 +156,22 @@ If `err` is passed, no operation should be made in `conn`.
`muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.
### Dial(open/create) a new stream
#### Dial(open/create) a new stream
- `JavaScript` stream = muxedConn.newStream([function (err, stream)])
- `Go` stream, err := muxedConn.newStream()
```go
stream, err := muxedConn.newStream()
```
This method negotiates and opens a new stream with the other endpoint.
If `err` is passed, no operation should be made in `stream`.
`stream` interface our established Stream with the other endpoint, it must implement the [Duplex pull-stream interface](https://pull-stream.github.io) in JavaScript or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go.
`stream` interface our established Stream with the other endpoint, it must implement the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser).
### Listen(wait/accept) a new incoming stream
#### Listen(wait/accept) a new incoming stream
- `JavaScript` muxedConn.on('stream', function (stream) {})
- `Go` stream := muxedConn.Accept()
```go
stream := muxedConn.Accept()
```
Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.
In JavaScript, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called.

View File

@ -33,18 +33,20 @@
},
"homepage": "https://github.com/libp2p/interface-stream-muxer",
"dependencies": {
"async": "^2.6.2",
"abort-controller": "^3.0.0",
"abortable-iterator": "^2.1.0",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"detect-node": "^2.0.4",
"libp2p-tcp": "~0.13.0",
"multiaddr": "^6.0.6",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.6.9"
"it-pair": "^1.0.0",
"it-pipe": "^1.0.1",
"libp2p-tcp": "^0.14.0",
"multiaddr": "^7.1.0",
"p-limit": "^2.2.0",
"streaming-iterables": "^4.1.0"
},
"devDependencies": {
"aegir": "^18.2.2"
"aegir": "^20.0.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",

View File

@ -3,145 +3,113 @@
const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const { expect } = chai
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { collect, map, consume } = require('streaming-iterables')
function closeAndWait (stream) {
pull(
pull.empty(),
stream,
pull.onEnd((err) => {
expect(err).to.not.exist.mark()
})
)
async function closeAndWait (stream) {
await pipe([], stream, consume)
expect(true).to.be.true.mark()
}
module.exports = (common) => {
describe('base', () => {
let muxer
let Muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
beforeEach(async () => {
Muxer = await common.setup()
})
it('Open a stream from the dialer', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
const dialer = new Muxer()
expect(4).checks(done)
listener.on('stream', (stream) => {
const listener = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const conn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(3).checks(done)
const conn = dialer.newStream()
closeAndWait(conn)
})
it('Open a stream from the listener', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
expect(4).check(done)
dialer.on('stream', (stream) => {
const dialer = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const listener = new Muxer()
const conn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(3).check(done)
const conn = listener.newStream()
closeAndWait(conn)
})
it('Open a stream on both sides', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
expect(8).check(done)
dialer.on('stream', (stream) => {
const dialer = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const listener = new Muxer(stream => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})
expect(6).check(done)
const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
const listenerConn = listener.newStream()
const dialerConn = dialer.newStream()
closeAndWait(dialerConn)
closeAndWait(listenerConn)
})
it('Open a stream on one side, write, open a stream in the other side', (done) => {
it('Open a stream on one side, write, open a stream on the other side', (done) => {
const toString = map(c => c.slice().toString())
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
expect(6).check(done)
const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})
listener.on('stream', (stream) => {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer.from('hey')]).mark()
})
)
dialer.on('stream', onDialerStream)
const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
const dialer = new Muxer()
const listener = new Muxer(stream => {
pipe(stream, toString, collect).then(chunks => {
expect(chunks).to.be.eql(['hey']).mark()
})
pull(
pull.values(['hello']),
listenerConn
)
dialer.onStream = onDialerStream
function onDialerStream (stream) {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer.from('hello')]).mark()
})
)
const listenerConn = listener.newStream()
pipe(['hello'], listenerConn)
async function onDialerStream (stream) {
const chunks = await pipe(stream, toString, collect)
expect(chunks).to.be.eql(['hello']).mark()
}
})
pull(
pull.values(['hey']),
dialerConn
)
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
expect(2).check(done)
const dialerConn = dialer.newStream()
pipe(['hey'], dialerConn)
})
})
}

View File

@ -2,157 +2,117 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'
const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const parallel = require('async/parallel')
const series = require('async/series')
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { consume } = require('streaming-iterables')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const mh = multiaddr('/ip4/127.0.0.1/tcp/10000')
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
function closeAndWait (stream, callback) {
pull(
pull.empty(),
stream,
pull.onEnd(callback)
)
function pause (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
function randomBuffer () {
return Buffer.from(Math.random().toString())
}
const infiniteRandom = {
[Symbol.asyncIterator]: async function * () {
while (true) {
yield randomBuffer()
await pause(10)
}
}
}
module.exports = (common) => {
describe('close', () => {
let muxer
let Muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
beforeEach(async () => {
Muxer = await common.setup()
})
it('closing underlying socket closes streams (tcp)', (done) => {
const tcp = new Tcp()
const tcpListener = tcp.createListener((conn) => {
const listener = muxer.listener(conn)
listener.on('stream', (stream) => {
pull(stream, stream)
})
it('closing underlying socket closes streams (tcp)', async () => {
const mockConn = muxer => ({
newStream: (...args) => muxer.newStream(...args)
})
// Wait for the streams to open
expect(2).checks(() => {
// Once everything is closed, we're done
expect(3).checks(done)
tcpListener.close((err) => {
expect(err).to.not.exist.mark()
})
})
tcpListener.listen(mh, () => {
const dialerConn = tcp.dial(mh, () => {
const dialerMuxer = muxer.dialer(dialerConn)
const s1 = dialerMuxer.newStream((err) => {
expect(err).to.not.exist.mark()
pull(
s1,
pull.onEnd((err) => {
expect(err).to.exist.mark()
})
)
})
const s2 = dialerMuxer.newStream((err) => {
expect(err).to.not.exist.mark()
pull(
s2,
pull.onEnd((err) => {
expect(err).to.exist.mark()
})
)
})
})
})
})
it('closing one of the muxed streams doesn\'t close others', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
expect(6).checks(done)
const conns = []
listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
pull(stream, stream)
})
for (let i = 0; i < 5; i++) {
conns.push(dialer.newStream())
const mockUpgrade = () => maConn => {
const muxer = new Muxer(stream => pipe(stream, stream))
pipe(maConn, muxer, maConn)
return mockConn(muxer)
}
conns.forEach((conn, i) => {
if (i === 1) {
closeAndWait(conn, (err) => {
expect(err).to.not.exist.mark()
})
} else {
pull(
conn,
pull.onEnd(() => {
throw new Error('should not end')
})
)
const mockUpgrader = () => ({
upgradeInbound: mockUpgrade(),
upgradeOutbound: mockUpgrade()
})
const tcp = new Tcp({ upgrader: mockUpgrader() })
const tcpListener = tcp.createListener()
await tcpListener.listen(mh)
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
const s1 = await dialerConn.newStream()
const s2 = await dialerConn.newStream()
// close the listener in a bit
setTimeout(() => tcpListener.close(), 50)
const s1Result = pipe(infiniteRandom, s1, consume)
const s2Result = pipe(infiniteRandom, s2, consume)
// test is complete when all muxed streams have closed
await s1Result
await s2Result
})
it('closing one of the muxed streams doesn\'t close others', async () => {
const p = pair()
const dialer = new Muxer()
// Listener is echo server :)
const listener = new Muxer(stream => pipe(stream, stream))
pipe(p[0], dialer, p[0])
pipe(p[1], listener, p[1])
const stream = dialer.newStream()
const streams = Array.from(Array(5), () => dialer.newStream())
let closed = false
const controllers = []
const streamResults = streams.map(async stream => {
const controller = new AbortController()
controllers.push(controller)
try {
const abortableRand = abortable(infiniteRandom, controller.signal, { abortCode: 'ERR_TEST_ABORT' })
await pipe(abortableRand, stream, consume)
} catch (err) {
if (err.code !== 'ERR_TEST_ABORT') throw err
}
if (!closed) throw new Error('stream should not have ended yet!')
})
})
it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
// Pause, and then send some data and close the first stream
await pause(50)
await pipe([randomBuffer()], stream, consume)
closed = true
expect(15).checks(done)
// Abort all the other streams later
await pause(50)
controllers.forEach(c => c.abort())
const conns = []
const count = []
for (let i = 0; i < 5; i++) {
count.push(i)
}
series(count.map((i) => (cb) => {
parallel([
(cb) => listener.once('stream', (stream) => {
expect(stream).to.exist.mark()
pull(stream, stream)
cb()
}),
(cb) => conns.push(dialer.newStream(cb))
], cb)
}), (err) => {
if (err) return done(err)
conns.forEach((conn, i) => {
pull(
pull.values([Buffer.from('hello')]),
pull.asyncMap((val, cb) => {
setTimeout(() => {
cb(null, val)
}, i * 10)
}),
conn,
pull.collect((err, data) => {
expect(err).to.not.exist.mark()
expect(data).to.be.eql([Buffer.from('hello')]).mark()
})
)
})
})
// These should now all resolve without error
await Promise.all(streamResults)
})
})
}

View File

@ -6,18 +6,12 @@ const spawn = require('./spawner')
module.exports = (common) => {
describe.skip('mega stress test', function () {
this.timeout(100 * 200 * 1000)
let muxer
let Muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
beforeEach(async () => {
Muxer = await common.setup()
})
it('10000 messages of 10000 streams', (done) => {
spawn(muxer, 10000, 10000, done, 5000)
})
it('10,000 streams with 10,000 msg', () => spawn(Muxer, 10000, 10000, 5000))
})
}

View File

@ -1,92 +1,82 @@
'use strict'
const expect = require('chai').expect
const { expect } = require('chai')
const pair = require('it-pair/duplex')
const pipe = require('it-pipe')
const pLimit = require('p-limit')
const { collect, tap, consume } = require('streaming-iterables')
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const generate = require('pull-generate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
const setImmediate = require('async/setImmediate')
module.exports = (muxer, nStreams, nMsg, done, limit) => {
const p = pair()
const dialerSocket = p[0]
const listenerSocket = p[1]
const check = marker((6 * nStreams) + (nStreams * nMsg), done)
module.exports = async (Muxer, nStreams, nMsg, limit) => {
const [dialerSocket, listenerSocket] = pair()
const { check, done } = marker((4 * nStreams) + (nStreams * nMsg))
const msg = 'simple msg'
const listener = muxer.listener(listenerSocket)
const dialer = muxer.dialer(dialerSocket)
listener.on('stream', (stream) => {
const listener = new Muxer(async stream => {
expect(stream).to.exist // eslint-disable-line
check()
pull(
await pipe(
stream,
pull.through((chunk) => {
expect(chunk).to.exist // eslint-disable-line
check()
}),
pull.onEnd((err) => {
expect(err).to.not.exist // eslint-disable-line
check()
pull(pull.empty(), stream)
})
tap(chunk => check()),
consume
)
check()
pipe([], stream)
})
const numbers = []
for (let i = 0; i < nStreams; i++) {
numbers.push(i)
const dialer = new Muxer()
pipe(listenerSocket, listener, listenerSocket)
pipe(dialerSocket, dialer, dialerSocket)
const spawnStream = async n => {
const stream = dialer.newStream()
expect(stream).to.exist // eslint-disable-line
check()
const res = await pipe(
(function * () {
for (let i = 0; i < nMsg; i++) {
// console.log('n', n, 'msg', i)
yield new Promise(resolve => resolve(msg))
}
})(),
stream,
collect
)
expect(res).to.be.eql([])
check()
}
const spawnStream = (n, cb) => {
const stream = dialer.newStream((err) => {
expect(err).to.not.exist // eslint-disable-line
check()
expect(stream).to.exist // eslint-disable-line
check()
pull(
generate(0, (s, cb) => {
setImmediate(() => {
cb(s === nMsg ? true : null, msg, s + 1)
})
}),
stream,
pull.collect((err, res) => {
expect(err).to.not.exist // eslint-disable-line
check()
expect(res).to.be.eql([])
check()
cb()
})
)
})
}
const limiter = pLimit(limit || Infinity)
if (limit) {
eachLimit(numbers, limit, spawnStream, () => {})
} else {
each(numbers, spawnStream, () => {})
}
await Promise.all(
Array.from(Array(nStreams), (_, i) => limiter(() => spawnStream(i)))
)
return done
}
function marker (n, done) {
function marker (n) {
let check
let i = 0
return (err) => {
i++
const done = new Promise((resolve, reject) => {
check = err => {
i++
if (err) {
/* eslint-disable-next-line */
console.error('Failed after %s iterations', i)
return done(err)
}
if (err) {
/* eslint-disable-next-line */
console.error('Failed after %s iterations', i)
return reject(err)
}
if (i === n) {
done()
if (i === n) {
resolve()
}
}
}
})
return { check, done }
}

View File

@ -5,63 +5,26 @@ const spawn = require('./spawner')
module.exports = (common) => {
describe('stress test', () => {
let muxer
let Muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
beforeEach(async () => {
Muxer = await common.setup()
})
it('1 stream with 1 msg', (done) => {
spawn(muxer, 1, 1, done)
})
it('1 stream with 10 msg', (done) => {
spawn(muxer, 1, 10, done)
})
it('1 stream with 100 msg', (done) => {
spawn(muxer, 1, 100, done)
})
it('10 streams with 1 msg', (done) => {
spawn(muxer, 10, 1, done)
})
it('10 streams with 10 msg', (done) => {
spawn(muxer, 10, 10, done)
})
it('10 streams with 100 msg', (done) => {
spawn(muxer, 10, 100, done)
})
it('100 streams with 1 msg', (done) => {
spawn(muxer, 100, 1, done)
})
it('100 streams with 10 msg', (done) => {
spawn(muxer, 100, 10, done)
})
it('100 streams with 100 msg', (done) => {
spawn(muxer, 100, 100, done)
})
it('1000 streams with 1 msg', (done) => {
spawn(muxer, 1000, 1, done)
})
it('1000 streams with 10 msg', (done) => {
spawn(muxer, 1000, 10, done)
})
it('1000 streams with 100 msg', function (done) {
this.timeout(80 * 1000)
spawn(muxer, 1000, 100, done)
it('1 stream with 1 msg', () => spawn(Muxer, 1, 1))
it('1 stream with 10 msg', () => spawn(Muxer, 1, 10))
it('1 stream with 100 msg', () => spawn(Muxer, 1, 100))
it('10 streams with 1 msg', () => spawn(Muxer, 10, 1))
it('10 streams with 10 msg', () => spawn(Muxer, 10, 10))
it('10 streams with 100 msg', () => spawn(Muxer, 10, 100))
it('100 streams with 1 msg', () => spawn(Muxer, 100, 1))
it('100 streams with 10 msg', () => spawn(Muxer, 100, 10))
it('100 streams with 100 msg', () => spawn(Muxer, 100, 100))
it('1000 streams with 1 msg', () => spawn(Muxer, 1000, 1))
it('1000 streams with 10 msg', () => spawn(Muxer, 1000, 10))
it('1000 streams with 100 msg', function () {
this.timeout(30 * 1000)
return spawn(Muxer, 1000, 100)
})
})
}