From dc38263ec4ecf22bfc3b06ef9fff77ea9ca4fd48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Kr=C3=BCger?= Date: Thu, 2 Aug 2018 17:23:27 +0200 Subject: [PATCH] feat: Use pull net --- package.json | 1 + src/get-multiaddr.js | 14 ++++++------ src/index.js | 21 ++---------------- src/listener.js | 53 +++++++++----------------------------------- 4 files changed, 20 insertions(+), 69 deletions(-) diff --git a/package.json b/package.json index e2f5bb6..3e61a56 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "mafmt": "^6.0.0", "multiaddr": "^4.0.0", "once": "^1.4.0", + "pull-net": "github:mkg20001/pull-net", "stream-to-pull-stream": "^1.7.2" }, "contributors": [ diff --git a/src/get-multiaddr.js b/src/get-multiaddr.js index 4be7243..49e8acb 100644 --- a/src/get-multiaddr.js +++ b/src/get-multiaddr.js @@ -9,22 +9,22 @@ module.exports = (socket) => { let ma try { - if (socket.remoteFamily === 'IPv6') { - const addr = new Address6(socket.remoteAddress) + if (socket.remoteAddress.family === 'IPv6') { + const addr = new Address6(socket.remoteAddress.address) if (addr.v4) { const ip4 = addr.to4().correctForm() ma = multiaddr('/ip4/' + ip4 + - '/tcp/' + socket.remotePort + '/tcp/' + socket.remoteAddress.port ) } else { - ma = multiaddr('/ip6/' + socket.remoteAddress + - '/tcp/' + socket.remotePort + ma = multiaddr('/ip6/' + socket.remoteAddress.address + + '/tcp/' + socket.remoteAddress.port ) } } else { - ma = multiaddr('/ip4/' + socket.remoteAddress + - '/tcp/' + socket.remotePort) + ma = multiaddr('/ip4/' + socket.remoteAddress.address + + '/tcp/' + socket.remoteAddress.port) } } catch (err) { log(err) diff --git a/src/index.js b/src/index.js index b04c223..d4281b5 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,6 @@ 'use strict' -const net = require('net') -const toPull = require('stream-to-pull-stream') +const {connect} = require('pull-net') const mafmt = require('mafmt') const withIs = require('class-is') const includes = require('lodash.includes') @@ -27,23 +26,7 @@ class TCP { const cOpts = ma.toOptions() log('Connecting to %s %s', cOpts.port, cOpts.host) - const rawSocket = net.connect(cOpts) - - rawSocket.once('timeout', () => { - log('timeout') - rawSocket.emit('error', new Error('Timeout')) - }) - - rawSocket.once('error', callback) - - rawSocket.once('connect', () => { - rawSocket.removeListener('error', callback) - callback() - }) - - const socket = toPull.duplex(rawSocket) - - const conn = new Connection(socket) + const conn = new Connection(connect(cOpts.port, cOpts.host, callback)) conn.getObservedAddrs = (callback) => { return callback(null, [ma]) diff --git a/src/listener.js b/src/listener.js index 30b3076..d5c48ea 100644 --- a/src/listener.js +++ b/src/listener.js @@ -4,29 +4,23 @@ const multiaddr = require('multiaddr') const Connection = require('interface-connection').Connection const os = require('os') const includes = require('lodash.includes') -const net = require('net') -const toPull = require('stream-to-pull-stream') +const {createServer} = require('pull-net') const EventEmitter = require('events').EventEmitter const debug = require('debug') const log = debug('libp2p:tcp:listen') const getMultiaddr = require('./get-multiaddr') - const IPFS_CODE = 421 -const CLOSE_TIMEOUT = 2000 function noop () {} module.exports = (handler) => { const listener = new EventEmitter() - const server = net.createServer((socket) => { - // Avoid uncaught errors cause by unstable connections - socket.on('error', noop) - - const addr = getMultiaddr(socket) + const server = createServer((stream) => { + const addr = getMultiaddr(stream) if (!addr) { - if (socket.remoteAddress === undefined) { + if (stream.remoteAddress === undefined || stream.remoteAddress.address === 'undefined') { log('connection closed before p2p connection made') } else { log('error interpreting incoming p2p connection') @@ -36,25 +30,16 @@ module.exports = (handler) => { log('new connection', addr.toString()) - const s = toPull.duplex(socket) - - s.getObservedAddrs = (cb) => { + stream.getObservedAddrs = (cb) => { cb(null, [addr]) } - trackSocket(server, socket) - - const conn = new Connection(s) + const conn = new Connection(stream) handler(conn) listener.emit('connection', conn) }) - server.on('listening', () => listener.emit('listening')) - server.on('error', (err) => listener.emit('error', err)) - server.on('close', () => listener.emit('close')) - - // Keep track of open connections to destroy in case of timeout - server.__connections = {} + listener.emit('listening') listener.close = (options, callback) => { if (typeof options === 'function') { @@ -64,18 +49,9 @@ module.exports = (handler) => { callback = callback || noop options = options || {} - const timeout = setTimeout(() => { - log('unable to close graciously, destroying conns') - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() - }) - }, options.timeout || CLOSE_TIMEOUT) - - server.close(callback) - - server.once('close', () => { - clearTimeout(timeout) + server.close((err, ...a) => { + listener.emit('close') + callback(err, ...a) }) } @@ -145,12 +121,3 @@ function getIpfsId (ma) { return tuple[0] === IPFS_CODE })[0][1] } - -function trackSocket (server, socket) { - const key = `${socket.remoteAddress}:${socket.remotePort}` - server.__connections[key] = socket - - socket.on('close', () => { - delete server.__connections[key] - }) -}