From e204517a51dec4027568f9c20840326da38b7093 Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 7 Apr 2017 11:51:56 -0400 Subject: [PATCH] fix: catch errors on incomming sockets --- package.json | 4 ++-- src/index.js | 27 +++++++++++++++------------ src/listener.js | 44 ++++++++++++++++++++++---------------------- 3 files changed, 39 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index f947e41..e5d2a8e 100644 --- a/package.json +++ b/package.json @@ -46,8 +46,8 @@ "ip-address": "^5.8.6", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.8", - "mafmt": "^2.1.7", - "multiaddr": "^2.2.3", + "mafmt": "^2.1.8", + "multiaddr": "^2.3.0", "once": "^1.4.0", "stream-to-pull-stream": "^1.7.2" }, diff --git a/src/index.js b/src/index.js index da47d47..3e8e91d 100644 --- a/src/index.js +++ b/src/index.js @@ -12,40 +12,41 @@ const log = debug('libp2p:tcp:dial') const createListener = require('./listener') -module.exports = class TCP { - dial (ma, options, cb) { +function noop () {} + +class TCP { + dial (ma, options, callback) { if (isFunction(options)) { - cb = options + callback = options options = {} } - if (!cb) { - cb = () => {} - } + callback = callback || noop - cb = once(cb) + callback = once(callback) const cOpts = ma.toOptions() log('Connecting to %s %s', cOpts.port, cOpts.host) const rawSocket = net.connect(cOpts) + rawSocket.once('timeout', () => { log('timeout') rawSocket.emit('error', new Error('Timeout')) }) - rawSocket.once('error', cb) + rawSocket.once('error', callback) rawSocket.once('connect', () => { - rawSocket.removeListener('error', cb) - cb() + rawSocket.removeListener('error', callback) + callback() }) const socket = toPull.duplex(rawSocket) const conn = new Connection(socket) - conn.getObservedAddrs = (cb) => { - return cb(null, [ma]) + conn.getObservedAddrs = (callback) => { + return callback(null, [ma]) } return conn @@ -74,3 +75,5 @@ module.exports = class TCP { }) } } + +module.exports = TCP diff --git a/src/listener.js b/src/listener.js index b8d4859..873ca44 100644 --- a/src/listener.js +++ b/src/listener.js @@ -14,17 +14,22 @@ const getMultiaddr = require('./get-multiaddr') const IPFS_CODE = 421 const CLOSE_TIMEOUT = 2000 +function noop () {} module.exports = (handler) => { const listener = new EventEmitter() const server = net.createServer((socket) => { + // Avoid uncaught errors cause by unstable connections + socket.on('error', noop) + const addr = getMultiaddr(socket) log('new connection', addr.toString()) const s = toPull.duplex(socket) + s.getObservedAddrs = (cb) => { - return cb(null, [addr]) + cb(null, [addr]) } trackSocket(server, socket) @@ -34,36 +39,31 @@ module.exports = (handler) => { listener.emit('connection', conn) }) - server.on('listening', () => { - listener.emit('listening') - }) - - server.on('error', (err) => { - listener.emit('error', err) - }) - - server.on('close', () => { - listener.emit('close') - }) + server.on('listening', () => listener.emit('listening')) + server.on('error', (err) => listener.emit('error', err)) + server.on('close', () => listener.emit('close')) // Keep track of open connections to destroy in case of timeout server.__connections = {} - listener.close = (options, cb) => { + listener.close = (options, callback) => { if (typeof options === 'function') { - cb = options + callback = options options = {} } - cb = cb || (() => {}) + callback = callback || noop options = options || {} let closed = false - server.close(cb) + server.close(callback) + server.once('close', () => { closed = true }) setTimeout(() => { - if (closed) return + if (closed) { + return + } log('unable to close graciously, destroying conns') Object.keys(server.__connections).forEach((key) => { @@ -76,7 +76,7 @@ module.exports = (handler) => { let ipfsId let listeningAddr - listener.listen = (ma, cb) => { + listener.listen = (ma, callback) => { listeningAddr = ma if (includes(ma.protoNames(), 'ipfs')) { ipfsId = getIpfsId(ma) @@ -85,15 +85,15 @@ module.exports = (handler) => { const lOpts = listeningAddr.toOptions() log('Listening on %s %s', lOpts.port, lOpts.host) - return server.listen(lOpts.port, lOpts.host, cb) + return server.listen(lOpts.port, lOpts.host, callback) } - listener.getAddrs = (cb) => { + listener.getAddrs = (callback) => { const multiaddrs = [] const address = server.address() if (!address) { - return cb(new Error('Listener is not ready yet')) + return callback(new Error('Listener is not ready yet')) } // Because TCP will only return the IPv6 version @@ -128,7 +128,7 @@ module.exports = (handler) => { multiaddrs.push(ma) } - cb(null, multiaddrs) + callback(null, multiaddrs) } return listener