feat: abortable dials

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
This commit is contained in:
Alan Shaw
2019-04-18 09:52:49 +01:00
parent 24d0a6132a
commit 16a6b55960
3 changed files with 92 additions and 4 deletions

View File

@ -40,6 +40,7 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"abortable-iterator": "^1.0.4",
"async-iterator-to-pull-stream": "^1.3.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
@ -49,6 +50,7 @@
"multiaddr-to-uri": "^4.0.1"
},
"devDependencies": {
"abort-controller": "^3.0.0",
"aegir": "^18.0.3",
"chai": "^4.1.2",
"dirty-chai": "^2.0.1",

View File

@ -5,17 +5,59 @@ const mafmt = require('mafmt')
const withIs = require('class-is')
const toUri = require('multiaddr-to-uri')
const log = require('debug')('libp2p:websockets:transport')
const abortable = require('abortable-iterator')
const createListener = require('./listener')
const { AbortError } = abortable
class WebSockets {
async dial (ma, options) {
options = options || {}
log('dialing %s', ma)
const socket = connect(toUri(ma), Object.assign({ binary: true }, options))
await socket.connected()
socket.getObservedAddrs = () => [ma]
const getObservedAddrs = () => [ma]
if (!options.signal) {
socket.getObservedAddrs = getObservedAddrs
await socket.connected()
log('connected %s', ma)
return socket
}
// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
socket.close()
reject(new AbortError('connection aborted'))
}
// Already aborted?
if (options.signal.aborted) return onAbort()
options.signal.addEventListener('abort', onAbort)
})
try {
await Promise.race([abort, socket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}
log('connected %s', ma)
return socket
return {
sink: async source => {
try {
await socket.sink(abortable(source, options.signal))
} catch (err) {
// Re-throw non-aborted errors
if (err.type !== 'aborted') throw err
// Otherwise, this is fine...
await socket.close()
}
},
source: abortable(socket.source, options.signal),
getObservedAddrs
}
}
createListener (options, handler) {

View File

@ -10,6 +10,7 @@ const multiaddr = require('multiaddr')
const goodbye = require('it-goodbye')
const { collect, consume } = require('streaming-iterables')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')
const WS = require('../src')
@ -207,6 +208,49 @@ describe('dial', () => {
expect(result).to.be.eql([Buffer.from('hey')])
})
it('should be abortable after connect', async () => {
const controller = new AbortController()
const conn = await ws.dial(ma, { signal: controller.signal })
const s = goodbye({
source: {
[Symbol.asyncIterator] () {
return this
},
next () {
return new Promise(resolve => {
setTimeout(() => resolve(Math.random()), 1000)
})
}
},
sink: consume
})
setTimeout(() => controller.abort(), 500)
try {
await pipe(s, conn, s)
} catch (err) {
expect(err.type).to.equal('aborted')
return
}
throw new Error('connection was not aborted')
})
it('should be abortable before connect', async () => {
const controller = new AbortController()
controller.abort() // Abort before connect
try {
await ws.dial(ma, { signal: controller.signal })
} catch (err) {
expect(err.type).to.equal('aborted')
return
}
throw new Error('connection was not aborted')
})
})
describe('ip6', () => {