2020-08-11 15:18:43 +02:00

205 lines
6.4 KiB
JavaScript

/* eslint-env mocha */
'use strict'
const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const { isValidTick } = require('./utils')
const goodbye = require('it-goodbye')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')
const AbortError = require('../errors').AbortError
const sinon = require('sinon')
module.exports = (common) => {
const upgrader = {
_upgrade (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn', 'timeline', 'close'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
expect(isValidTick(multiaddrConnection.timeline.open)).to.equal(true)
return multiaddrConnection
},
upgradeOutbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
},
upgradeInbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
}
}
describe('dial', function () {
this.timeout(20 * 1000)
let addrs
let transport
let connector
let listener
before(async () => {
({ addrs, transport, connector } = await common.setup({ upgrader }))
})
after(() => common.teardown && common.teardown())
beforeEach(() => {
listener = transport.createListener((conn) => pipe(conn, conn))
return listener.listen(addrs[0])
})
afterEach(() => {
sinon.restore()
return listener.close()
})
it('simple', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0])
const s = goodbye({ source: ['hey'], sink: collect })
const result = await pipe(s, conn, s)
expect(upgradeSpy.callCount).to.equal(1)
expect(upgradeSpy.returned(conn)).to.equal(true)
expect(result.length).to.equal(1)
expect(result[0].toString()).to.equal('hey')
})
it('can close connections', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0])
expect(upgradeSpy.callCount).to.equal(1)
expect(upgradeSpy.returned(conn)).to.equal(true)
await conn.close()
expect(isValidTick(conn.timeline.close)).to.equal(true)
})
it('to non existent listener', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
try {
await transport.dial(addrs[1])
} catch (_) {
expect(upgradeSpy.callCount).to.equal(0)
// Success: expected an error to be throw
return
}
expect.fail('Did not throw error attempting to connect to non-existent listener')
})
it('abort before dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const controller = new AbortController()
controller.abort()
const socket = transport.dial(addrs[0], { signal: controller.signal })
try {
await socket
} catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it('abort while dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
// Add a delay to connect() so that we can abort while the dial is in
// progress
connector.delay(100)
const controller = new AbortController()
const socket = transport.dial(addrs[0], { signal: controller.signal })
setTimeout(() => controller.abort(), 50)
try {
await socket
} catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
} finally {
connector.restore()
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it.skip('abort while reading throws AbortError', async () => {
// Add a delay to the response from the server
async function * delayedResponse (source) {
for await (const val of source) {
await new Promise((resolve) => setTimeout(resolve, 1000))
yield val
}
}
const delayedListener = transport.createListener(async (conn) => {
await pipe(conn, delayedResponse, conn)
})
await delayedListener.listen(addrs[1])
// Create an abort signal and dial the socket
const controller = new AbortController()
const socket = await transport.dial(addrs[1], { signal: controller.signal })
try {
// Set a timeout to abort before the server responds
setTimeout(() => controller.abort(), 100)
// An AbortError should be thrown before the pipe completes
const s = goodbye({ source: ['hey'], sink: collect })
await pipe(s, socket, s)
} catch (err) {
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
} finally {
await delayedListener.close()
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it('abort while writing does not throw AbortError', async () => {
// Record values received by the listener
const recorded = []
async function * recorderTransform (source) {
for await (const val of source) {
recorded.push(val)
yield val
}
}
const recordListener = transport.createListener(async (conn) => {
await pipe(conn, recorderTransform, conn)
})
await recordListener.listen(addrs[1])
// Create an abort signal and dial the socket
const controller = new AbortController()
const socket = await transport.dial(addrs[1], { signal: controller.signal })
// Set a timeout to abort before writing has completed
setTimeout(() => controller.abort(), 100)
try {
// The pipe should write to the socket until aborted
await pipe(
async function * () {
yield 'hey'
await new Promise((resolve) => setTimeout(resolve, 200))
yield 'there'
},
socket)
expect(recorded.length).to.eql(1)
expect(recorded[0].toString()).to.eql('hey')
} finally {
await recordListener.close()
}
})
})
}