mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-04-25 06:12:29 +00:00
feat: add onStreamEnd, muxer.streams and timeline (#56)
BREAKING CHANGE: This adds new validations to the stream muxer, which will cause existing tests to fail.
This commit is contained in:
parent
d908f8afc2
commit
0f608322c8
29
README.md
29
README.md
@ -105,6 +105,14 @@ pipe(conn, muxer, conn) // conn is duplex connection to another peer
|
|||||||
```js
|
```js
|
||||||
new Mplex(stream => { /* ... */ })
|
new Mplex(stream => { /* ... */ })
|
||||||
```
|
```
|
||||||
|
* `onStreamEnd` - A function called when a stream ends.
|
||||||
|
```js
|
||||||
|
// Get notified when a stream has ended
|
||||||
|
const onStreamEnd = stream => {
|
||||||
|
// Manage any tracking changes, etc
|
||||||
|
}
|
||||||
|
const muxer = new Muxer({ onStreamEnd, ... })
|
||||||
|
```
|
||||||
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
|
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
|
||||||
```js
|
```js
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
@ -126,6 +134,16 @@ const muxer = new Muxer()
|
|||||||
muxer.onStream = stream => { /* ... */ }
|
muxer.onStream = stream => { /* ... */ }
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `muxer.onStreamEnd`
|
||||||
|
|
||||||
|
Use this property as an alternative to passing `onStreamEnd` as an option to the `Muxer` constructor.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const muxer = new Muxer()
|
||||||
|
// ...later
|
||||||
|
muxer.onStreamEnd = stream => { /* ... */ }
|
||||||
|
```
|
||||||
|
|
||||||
#### `const stream = muxer.newStream([options])`
|
#### `const stream = muxer.newStream([options])`
|
||||||
|
|
||||||
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
|
Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it).
|
||||||
@ -140,6 +158,17 @@ const stream = muxer.newStream()
|
|||||||
pipe([1, 2, 3], stream, consume)
|
pipe([1, 2, 3], stream, consume)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `const streams = muxer.streams`
|
||||||
|
|
||||||
|
The streams property returns an array of streams the muxer currently has open. Closed streams will not be returned.
|
||||||
|
|
||||||
|
```js
|
||||||
|
muxer.streams.map(stream => {
|
||||||
|
// Log out the stream's id
|
||||||
|
console.log(stream.id)
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
### Go
|
### Go
|
||||||
|
|
||||||
#### Attach muxer to a Connection
|
#### Attach muxer to a Connection
|
||||||
|
@ -8,11 +8,28 @@ const pair = require('it-pair/duplex')
|
|||||||
const pipe = require('it-pipe')
|
const pipe = require('it-pipe')
|
||||||
const { collect, map, consume } = require('streaming-iterables')
|
const { collect, map, consume } = require('streaming-iterables')
|
||||||
|
|
||||||
|
function close (stream) {
|
||||||
|
return pipe([], stream, consume)
|
||||||
|
}
|
||||||
|
|
||||||
async function closeAndWait (stream) {
|
async function closeAndWait (stream) {
|
||||||
await pipe([], stream, consume)
|
await close(stream)
|
||||||
expect(true).to.be.true.mark()
|
expect(true).to.be.true.mark()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A tick is considered valid if it happened between now
|
||||||
|
* and `ms` milliseconds ago
|
||||||
|
* @param {number} date Time in ticks
|
||||||
|
* @param {number} ms max milliseconds that should have expired
|
||||||
|
* @returns {boolean}
|
||||||
|
*/
|
||||||
|
function isValidTick (date, ms = 5000) {
|
||||||
|
const now = Date.now()
|
||||||
|
if (date > now - ms && date <= now) return true
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = (common) => {
|
module.exports = (common) => {
|
||||||
describe('base', () => {
|
describe('base', () => {
|
||||||
let Muxer
|
let Muxer
|
||||||
@ -25,25 +42,44 @@ module.exports = (common) => {
|
|||||||
const p = pair()
|
const p = pair()
|
||||||
const dialer = new Muxer()
|
const dialer = new Muxer()
|
||||||
|
|
||||||
const listener = new Muxer(stream => {
|
const listener = new Muxer({
|
||||||
expect(stream).to.exist.mark()
|
onStream: stream => {
|
||||||
closeAndWait(stream)
|
expect(stream).to.exist.mark() // 1st check
|
||||||
|
expect(isValidTick(stream.timeline.open)).to.equal(true)
|
||||||
|
// Make sure the stream is being tracked
|
||||||
|
expect(listener.streams).to.include(stream)
|
||||||
|
close(stream)
|
||||||
|
},
|
||||||
|
onStreamEnd: stream => {
|
||||||
|
expect(stream).to.exist.mark() // 2nd check
|
||||||
|
expect(listener.streams).to.not.include(stream)
|
||||||
|
// Make sure the stream is removed from tracking
|
||||||
|
expect(isValidTick(stream.timeline.close)).to.equal(true)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
pipe(p[0], dialer, p[0])
|
pipe(p[0], dialer, p[0])
|
||||||
pipe(p[1], listener, p[1])
|
pipe(p[1], listener, p[1])
|
||||||
|
|
||||||
expect(3).checks(done)
|
expect(3).checks(() => {
|
||||||
|
// ensure we have no streams left
|
||||||
|
expect(dialer.streams).to.have.length(0)
|
||||||
|
expect(listener.streams).to.have.length(0)
|
||||||
|
done()
|
||||||
|
})
|
||||||
|
|
||||||
const conn = dialer.newStream()
|
const conn = dialer.newStream()
|
||||||
|
expect(dialer.streams).to.include(conn)
|
||||||
|
expect(isValidTick(conn.timeline.open)).to.equal(true)
|
||||||
|
|
||||||
closeAndWait(conn)
|
closeAndWait(conn) // 3rd check
|
||||||
})
|
})
|
||||||
|
|
||||||
it('Open a stream from the listener', (done) => {
|
it('Open a stream from the listener', (done) => {
|
||||||
const p = pair()
|
const p = pair()
|
||||||
const dialer = new Muxer(stream => {
|
const dialer = new Muxer(stream => {
|
||||||
expect(stream).to.exist.mark()
|
expect(stream).to.exist.mark()
|
||||||
|
expect(isValidTick(stream.timeline.open)).to.equal(true)
|
||||||
closeAndWait(stream)
|
closeAndWait(stream)
|
||||||
})
|
})
|
||||||
const listener = new Muxer()
|
const listener = new Muxer()
|
||||||
@ -54,6 +90,8 @@ module.exports = (common) => {
|
|||||||
expect(3).check(done)
|
expect(3).check(done)
|
||||||
|
|
||||||
const conn = listener.newStream()
|
const conn = listener.newStream()
|
||||||
|
expect(listener.streams).to.include(conn)
|
||||||
|
expect(isValidTick(conn.timeline.open)).to.equal(true)
|
||||||
|
|
||||||
closeAndWait(conn)
|
closeAndWait(conn)
|
||||||
})
|
})
|
||||||
|
Loading…
x
Reference in New Issue
Block a user