diff --git a/package.json b/package.json index 47a12a5..f555656 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ }, "dependencies": { "abort-controller": "^3.0.0", + "abortable-iterator": "^2.0.0", "class-is": "^1.1.0", "debug": "^4.1.1", "err-code": "^1.1.2", diff --git a/src/index.js b/src/index.js index 288c41e..46f47b8 100644 --- a/src/index.js +++ b/src/index.js @@ -18,7 +18,7 @@ function noop () {} class TCP { async dial (ma, options) { const cOpts = ma.toOptions() - log('Connecting to %s:%s', cOpts.host, cOpts.port) + log('Dialing %s:%s', cOpts.host, cOpts.port) const rawSocket = await this._connect(cOpts, options) return new Libp2pSocket(rawSocket, ma, options) @@ -34,13 +34,14 @@ class TCP { const rawSocket = net.connect(cOpts) const onError = (err) => { - const msg = `Error connecting to ${cOpts.host}:${cOpts.port}: ${err.message}` + const msg = `Error dialing ${cOpts.host}:${cOpts.port}: ${err.message}` done(errcode(msg, err.code)) } const onTimeout = () => { - log('Timeout connecting to %s:%s', cOpts.host, cOpts.port) + log('Timeout dialing %s:%s', cOpts.host, cOpts.port) const err = errcode(`Timeout after ${Date.now() - start}ms`, 'ETIMEDOUT') + // Note: this will result in onError() being called rawSocket.emit('error', err) } @@ -50,7 +51,7 @@ class TCP { } const onAbort = () => { - log('Connect to %s:%s aborted', cOpts.host, cOpts.port) + log('Dial to %s:%s aborted', cOpts.host, cOpts.port) rawSocket.destroy() done(new AbortError()) } @@ -59,7 +60,8 @@ class TCP { rawSocket.removeListener('error', onError) rawSocket.removeListener('timeout', onTimeout) rawSocket.removeListener('connect', onConnect) - options.signal && options.signal.removeEventListener(onAbort) + + options.signal && options.signal.removeEventListener('abort', onAbort) err ? reject(err) : resolve(res) } diff --git a/src/listener.js b/src/listener.js index ec40998..8093360 100644 --- a/src/listener.js +++ b/src/listener.js @@ -119,7 +119,7 @@ class Listener extends EventEmitter { return resolve() } - log('Listening on %s %s', lOpts.port, lOpts.host) + log('Listening on %s:%s', lOpts.host, lOpts.port) resolve() }) }) diff --git a/src/socket.js b/src/socket.js index 345484e..c8b1507 100644 --- a/src/socket.js +++ b/src/socket.js @@ -1,27 +1,48 @@ 'use strict' +const abortable = require('abortable-iterator') const debug = require('debug') const log = debug('libp2p:tcp:socket') const c = require('./constants') class Libp2pSocket { - constructor (rawSocket, ma, opts) { + constructor (rawSocket, ma, opts = {}) { this._rawSocket = rawSocket this._ma = ma this.sink = this._sink(opts) - this.source = rawSocket + this.source = opts.signal ? abortable(rawSocket, opts.signal) : rawSocket } - _sink (opts = {}) { + _sink (opts) { // By default, close when the source is exhausted const closeOnEnd = opts.closeOnEnd !== false - return (source) => this._write(source, closeOnEnd) + + return async (source) => { + try { + const src = opts.signal ? abortable(source, opts.signal) : source + await this._write(src, closeOnEnd) + } catch (err) { + // If the connection is aborted just close the socket + if (err.type === 'aborted') { + return this.close() + } + + throw err + } + } } async _write (source, closeOnEnd) { for await (const data of source) { + if (this._rawSocket.destroyed) { + const cOpts = this._ma.toOptions() + log('Cannot write %d bytes to destroyed socket %s:%s', + data.length, cOpts.host, cOpts.port) + return + } + const flushed = this._rawSocket.write(data) if (!flushed) { await new Promise((resolve) => this._rawSocket.once('drain', resolve))