|
|
|
@ -2,17 +2,17 @@
|
|
|
|
|
/* eslint max-nested-callbacks: ["error", 8] */
|
|
|
|
|
'use strict'
|
|
|
|
|
|
|
|
|
|
const chai = require('chai')
|
|
|
|
|
const expect = chai.expect
|
|
|
|
|
chai.use(require('dirty-chai'))
|
|
|
|
|
|
|
|
|
|
const pair = require('it-pair/duplex')
|
|
|
|
|
const pipe = require('it-pipe')
|
|
|
|
|
const { consume } = require('streaming-iterables')
|
|
|
|
|
const Tcp = require('libp2p-tcp')
|
|
|
|
|
const multiaddr = require('multiaddr')
|
|
|
|
|
const abortable = require('abortable-iterator')
|
|
|
|
|
const AbortController = require('abort-controller')
|
|
|
|
|
const uint8arrayFromString = require('uint8arrays/from-string')
|
|
|
|
|
|
|
|
|
|
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
|
|
|
|
|
|
|
|
|
|
function pause (ms) {
|
|
|
|
|
return new Promise(resolve => setTimeout(resolve, ms))
|
|
|
|
|
}
|
|
|
|
@ -38,33 +38,31 @@ module.exports = (common) => {
|
|
|
|
|
Muxer = await common.setup()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
it('closing underlying socket closes streams (tcp)', async () => {
|
|
|
|
|
it('closing underlying socket closes streams', async () => {
|
|
|
|
|
const mockConn = muxer => ({
|
|
|
|
|
newStream: (...args) => muxer.newStream(...args)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
const mockUpgrade = () => maConn => {
|
|
|
|
|
const mockUpgrade = maConn => {
|
|
|
|
|
const muxer = new Muxer(stream => pipe(stream, stream))
|
|
|
|
|
pipe(maConn, muxer, maConn)
|
|
|
|
|
return mockConn(muxer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const mockUpgrader = () => ({
|
|
|
|
|
upgradeInbound: mockUpgrade(),
|
|
|
|
|
upgradeOutbound: mockUpgrade()
|
|
|
|
|
const [local, remote] = pair()
|
|
|
|
|
const controller = new AbortController()
|
|
|
|
|
const abortableRemote = abortable.duplex(remote, controller.signal, {
|
|
|
|
|
returnOnAbort: true
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
const tcp = new Tcp({ upgrader: mockUpgrader() })
|
|
|
|
|
const tcpListener = tcp.createListener()
|
|
|
|
|
|
|
|
|
|
await tcpListener.listen(mh)
|
|
|
|
|
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
|
|
|
|
|
mockUpgrade(abortableRemote)
|
|
|
|
|
const dialerConn = mockUpgrade(local)
|
|
|
|
|
|
|
|
|
|
const s1 = await dialerConn.newStream()
|
|
|
|
|
const s2 = await dialerConn.newStream()
|
|
|
|
|
|
|
|
|
|
// close the listener in a bit
|
|
|
|
|
setTimeout(() => tcpListener.close(), 50)
|
|
|
|
|
// close the remote in a bit
|
|
|
|
|
setTimeout(() => controller.abort(), 50)
|
|
|
|
|
|
|
|
|
|
const s1Result = pipe(infiniteRandom, s1, consume)
|
|
|
|
|
const s2Result = pipe(infiniteRandom, s2, consume)
|
|
|
|
@ -115,5 +113,69 @@ module.exports = (common) => {
|
|
|
|
|
// These should now all resolve without error
|
|
|
|
|
await Promise.all(streamResults)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
it('can close a stream for writing', (done) => {
|
|
|
|
|
const p = pair()
|
|
|
|
|
const dialer = new Muxer()
|
|
|
|
|
const data = [randomBuffer(), randomBuffer()]
|
|
|
|
|
|
|
|
|
|
const listener = new Muxer(async stream => {
|
|
|
|
|
// Immediate close for write
|
|
|
|
|
await stream.closeWrite()
|
|
|
|
|
|
|
|
|
|
const results = await pipe(stream, async (source) => {
|
|
|
|
|
const data = []
|
|
|
|
|
for await (const chunk of source) {
|
|
|
|
|
data.push(chunk.slice())
|
|
|
|
|
}
|
|
|
|
|
return data
|
|
|
|
|
})
|
|
|
|
|
expect(results).to.eql(data)
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await stream.sink([randomBuffer()])
|
|
|
|
|
} catch (err) {
|
|
|
|
|
expect(err).to.exist()
|
|
|
|
|
return done()
|
|
|
|
|
}
|
|
|
|
|
expect.fail('should not support writing to closed writer')
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
pipe(p[0], dialer, p[0])
|
|
|
|
|
pipe(p[1], listener, p[1])
|
|
|
|
|
|
|
|
|
|
const stream = dialer.newStream()
|
|
|
|
|
stream.sink(data)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
it('can close a stream for reading', (done) => {
|
|
|
|
|
const p = pair()
|
|
|
|
|
const dialer = new Muxer()
|
|
|
|
|
const data = [randomBuffer(), randomBuffer()]
|
|
|
|
|
|
|
|
|
|
const listener = new Muxer(async stream => {
|
|
|
|
|
const results = await pipe(stream, async (source) => {
|
|
|
|
|
const data = []
|
|
|
|
|
for await (const chunk of source) {
|
|
|
|
|
data.push(chunk.slice())
|
|
|
|
|
}
|
|
|
|
|
return data
|
|
|
|
|
})
|
|
|
|
|
expect(results).to.eql(data)
|
|
|
|
|
done()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
pipe(p[0], dialer, p[0])
|
|
|
|
|
pipe(p[1], listener, p[1])
|
|
|
|
|
|
|
|
|
|
const stream = dialer.newStream()
|
|
|
|
|
stream.closeRead()
|
|
|
|
|
|
|
|
|
|
// Source should be done
|
|
|
|
|
;(async () => {
|
|
|
|
|
expect(await stream.source.next()).to.eql({ done: true })
|
|
|
|
|
stream.sink(data)
|
|
|
|
|
})()
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|