feat: abort after connect

This commit is contained in:
Dirk McCormick 2019-04-23 15:48:45 +08:00 committed by Jacob Heun
parent d5be5ba7ed
commit bba2084476
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
4 changed files with 34 additions and 10 deletions

View File

@ -46,6 +46,7 @@
}, },
"dependencies": { "dependencies": {
"abort-controller": "^3.0.0", "abort-controller": "^3.0.0",
"abortable-iterator": "^2.0.0",
"class-is": "^1.1.0", "class-is": "^1.1.0",
"debug": "^4.1.1", "debug": "^4.1.1",
"err-code": "^1.1.2", "err-code": "^1.1.2",

View File

@ -18,7 +18,7 @@ function noop () {}
class TCP { class TCP {
async dial (ma, options) { async dial (ma, options) {
const cOpts = ma.toOptions() const cOpts = ma.toOptions()
log('Connecting to %s:%s', cOpts.host, cOpts.port) log('Dialing %s:%s', cOpts.host, cOpts.port)
const rawSocket = await this._connect(cOpts, options) const rawSocket = await this._connect(cOpts, options)
return new Libp2pSocket(rawSocket, ma, options) return new Libp2pSocket(rawSocket, ma, options)
@ -34,13 +34,14 @@ class TCP {
const rawSocket = net.connect(cOpts) const rawSocket = net.connect(cOpts)
const onError = (err) => { const onError = (err) => {
const msg = `Error connecting to ${cOpts.host}:${cOpts.port}: ${err.message}` const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}`
done(errcode(msg, err.code)) done(errcode(msg, err.code))
} }
const onTimeout = () => { const onTimeout = () => {
log('Timeout connecting to %s:%s', cOpts.host, cOpts.port) log('Timeout dialing %s:%s', cOpts.host, cOpts.port)
const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT') const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT')
// Note: this will result in onError() being called
rawSocket.emit('error', err) rawSocket.emit('error', err)
} }
@ -50,7 +51,7 @@ class TCP {
} }
const onAbort = () => { const onAbort = () => {
log('Connect to %s:%s aborted', cOpts.host, cOpts.port) log('Dial to %s:%s aborted', cOpts.host, cOpts.port)
rawSocket.destroy() rawSocket.destroy()
done(new AbortError()) done(new AbortError())
} }
@ -59,7 +60,8 @@ class TCP {
rawSocket.removeListener('error', onError) rawSocket.removeListener('error', onError)
rawSocket.removeListener('timeout', onTimeout) rawSocket.removeListener('timeout', onTimeout)
rawSocket.removeListener('connect', onConnect) rawSocket.removeListener('connect', onConnect)
options.signal && options.signal.removeEventListener(onAbort)
options.signal && options.signal.removeEventListener('abort', onAbort)
err ? reject(err) : resolve(res) err ? reject(err) : resolve(res)
} }

View File

@ -119,7 +119,7 @@ class Listener extends EventEmitter {
return resolve() return resolve()
} }
log('Listening on %s %s', lOpts.port, lOpts.host) log('Listening on %s:%s', lOpts.host, lOpts.port)
resolve() resolve()
}) })
}) })

View File

@ -1,27 +1,48 @@
'use strict' 'use strict'
const abortable = require('abortable-iterator')
const debug = require('debug') const debug = require('debug')
const log = debug('libp2p:tcp:socket') const log = debug('libp2p:tcp:socket')
const c = require('./constants') const c = require('./constants')
class Libp2pSocket { class Libp2pSocket {
constructor (rawSocket, ma, opts) { constructor (rawSocket, ma, opts = {}) {
this._rawSocket = rawSocket this._rawSocket = rawSocket
this._ma = ma this._ma = ma
this.sink = this._sink(opts) this.sink = this._sink(opts)
this.source = rawSocket this.source = opts.signal ? abortable(rawSocket, opts.signal) : rawSocket
} }
_sink (opts = {}) { _sink (opts) {
// By default, close when the source is exhausted // By default, close when the source is exhausted
const closeOnEnd = opts.closeOnEnd !== false const closeOnEnd = opts.closeOnEnd !== false
return (source) => this._write(source, closeOnEnd)
return async (source) => {
try {
const src = opts.signal ? abortable(source, opts.signal) : source
await this._write(src, closeOnEnd)
} catch (err) {
// If the connection is aborted just close the socket
if (err.type === 'aborted') {
return this.close()
}
throw err
}
}
} }
async _write (source, closeOnEnd) { async _write (source, closeOnEnd) {
for await (const data of source) { for await (const data of source) {
if (this._rawSocket.destroyed) {
const cOpts = this._ma.toOptions()
log('Cannot write %d bytes to destroyed socket %s:%s',
data.length, cOpts.host, cOpts.port)
return
}
const flushed = this._rawSocket.write(data) const flushed = this._rawSocket.write(data)
if (!flushed) { if (!flushed) {
await new Promise((resolve) => this._rawSocket.once('drain', resolve)) await new Promise((resolve) => this._rawSocket.once('drain', resolve))