From 16a6b55960846a3e78eb4666f1aee41e8647909c Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 18 Apr 2019 09:52:49 +0100 Subject: [PATCH] feat: abortable dials License: MIT Signed-off-by: Alan Shaw --- package.json | 2 ++ src/index.js | 50 ++++++++++++++++++++++++++++++++++++++++++++++---- test/node.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 4c25573..17b5fb7 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "abortable-iterator": "^1.0.4", "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", @@ -49,6 +50,7 @@ "multiaddr-to-uri": "^4.0.1" }, "devDependencies": { + "abort-controller": "^3.0.0", "aegir": "^18.0.3", "chai": "^4.1.2", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index ab4bbf9..16110e3 100644 --- a/src/index.js +++ b/src/index.js @@ -5,17 +5,59 @@ const mafmt = require('mafmt') const withIs = require('class-is') const toUri = require('multiaddr-to-uri') const log = require('debug')('libp2p:websockets:transport') - +const abortable = require('abortable-iterator') const createListener = require('./listener') +const { AbortError } = abortable class WebSockets { async dial (ma, options) { + options = options || {} log('dialing %s', ma) + const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) - await socket.connected() - socket.getObservedAddrs = () => [ma] + const getObservedAddrs = () => [ma] + + if (!options.signal) { + socket.getObservedAddrs = getObservedAddrs + await socket.connected() + log('connected %s', ma) + return socket + } + + // Allow abort via signal during connect + let onAbort + const abort = new Promise((resolve, reject) => { + onAbort = () => { + socket.close() + reject(new AbortError('connection aborted')) + } + + // Already aborted? + if (options.signal.aborted) return onAbort() + options.signal.addEventListener('abort', onAbort) + }) + + try { + await Promise.race([abort, socket.connected()]) + } finally { + options.signal.removeEventListener('abort', onAbort) + } + log('connected %s', ma) - return socket + return { + sink: async source => { + try { + await socket.sink(abortable(source, options.signal)) + } catch (err) { + // Re-throw non-aborted errors + if (err.type !== 'aborted') throw err + // Otherwise, this is fine... + await socket.close() + } + }, + source: abortable(socket.source, options.signal), + getObservedAddrs + } } createListener (options, handler) { diff --git a/test/node.js b/test/node.js index b7fe695..14574a0 100644 --- a/test/node.js +++ b/test/node.js @@ -10,6 +10,7 @@ const multiaddr = require('multiaddr') const goodbye = require('it-goodbye') const { collect, consume } = require('streaming-iterables') const pipe = require('it-pipe') +const AbortController = require('abort-controller') const WS = require('../src') @@ -207,6 +208,49 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) + + it('should be abortable after connect', async () => { + const controller = new AbortController() + const conn = await ws.dial(ma, { signal: controller.signal }) + const s = goodbye({ + source: { + [Symbol.asyncIterator] () { + return this + }, + next () { + return new Promise(resolve => { + setTimeout(() => resolve(Math.random()), 1000) + }) + } + }, + sink: consume + }) + + setTimeout(() => controller.abort(), 500) + + try { + await pipe(s, conn, s) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) + + it('should be abortable before connect', async () => { + const controller = new AbortController() + controller.abort() // Abort before connect + + try { + await ws.dial(ma, { signal: controller.signal }) + } catch (err) { + expect(err.type).to.equal('aborted') + return + } + + throw new Error('connection was not aborted') + }) }) describe('ip6', () => {