diff --git a/package.json b/package.json index baf84d2..982b5f3 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "aegir": "^18.1.1", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.6", "lodash.isfunction": "^3.0.9", "pull-stream": "^3.6.9", "sinon": "^7.3.1" @@ -48,15 +47,14 @@ "dependencies": { "abort-controller": "^3.0.0", "class-is": "^1.1.0", - "debug": "^3.1.0", "err-code": "^1.1.2", "interface-connection": "~0.3.3", - "interface-transport": "libp2p/interface-transport#feat/async-await", + "interface-transport": "libp2p/interface-transport#feat/listen-array", "ip-address": "^5.8.9", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.9", - "mafmt": "^6.0.2", - "multiaddr": "^5.0.0" + "mafmt": "^6.0.7", + "multiaddr": "^6.0.6" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index e59da74..288c41e 100644 --- a/src/index.js +++ b/src/index.js @@ -78,7 +78,7 @@ class TCP { } handler = handler || noop - return createListener(handler) + return createListener(options, handler) } filter (multiaddrs) { diff --git a/src/listener.js b/src/listener.js index b7764b9..bb27c34 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,11 +2,12 @@ const multiaddr = require('multiaddr') const os = require('os') -const includes = require('lodash.includes') const net = require('net') const EventEmitter = require('events').EventEmitter +const { AllListenersFailedError } = require('interface-transport') const debug = require('debug') const log = debug('libp2p:tcp:listen') +log.error = debug('libp2p:tcp:listen:error') const Libp2pSocket = require('./socket') const getMultiaddr = require('./get-multiaddr') @@ -14,10 +15,143 @@ const c = require('./constants') function noop () {} -module.exports = (handler) => { - const listener = new EventEmitter() +class Listener extends EventEmitter { + /** + * + * @param {object} options + * @param {function(Connection)} handler + */ + constructor (options, handler) { + super() + this._options = options + this._connectionHandler = handler + this._servers = new Set() + this.__connections = new Map() + } - const server = net.createServer((socket) => { + close (options = {}) { + if ([...this._servers].filter(server => server.listening).length === 0) { + return + } + + // Close all running servers in parallel + return Promise.all( + [...this._servers].map(server => { + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to stop the server. If it takes longer than the timeout, + // destroy all the underlying sockets manually. + const timeout = setTimeout(() => { + log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) + resolve() + }, options.timeout || c.CLOSE_TIMEOUT) + + server.once('close', () => { + clearTimeout(timeout) + this._servers.delete(server) + }) + + server.close((err) => err ? reject(err) : resolve()) + }) + }) + ).then(() => { + this.__connections.forEach((connection, key) => { + log('destroying %s', key) + connection.destroy() + }) + this.__connections.clear() + this._servers.clear() + }) + } + + async listen (addrs) { + addrs = Array.isArray(addrs) ? addrs : [addrs] + + let listeners = [] + let errors = [] + + // Filter out duplicate ports, unless it's port 0 + addrs = uniqueBy(addrs, (addr) => { + const port = Number(addr.toOptions().port) + return isNaN(port) || port === 0 ? addr.toString() : port + }) + + for (const ma of addrs) { + const lOpts = ma.toOptions() + + listeners.push( + new Promise((resolve) => { + const server = net.createServer(this._onSocket.bind(this)) + this._servers.add(server) + // TODO: clean these up + server.on('listening', () => this.emit('listening')) + server.on('close', () => this.emit('close')) + server.on('error', (err) => this.emit('error', err)) + + server.listen(lOpts.port, lOpts.host, (err) => { + if (err) { + errors.push(err) + return resolve() + } + + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) + }) + ) + } + + return Promise.all(listeners) + .then(() => { + errors.forEach((err) => { + log.error('received an error while attempting to listen', err) + }) + + // All servers failed to listen, throw an error + if (errors.length === listeners.length) { + throw new AllListenersFailedError() + } + }) + } + + getAddrs () { + const multiaddrs = [] + this._servers.forEach(server => { + const address = server.address() + + if (address.address === '0.0.0.0') { + const netInterfaces = os.networkInterfaces() + Object.keys(netInterfaces).forEach((niKey) => { + netInterfaces[niKey].forEach((ni) => { + if (ni.internal === false && ni.family === address.family) { + multiaddrs.push( + multiaddr.fromNodeAddress({ + ...address, + address: ni.address + }, 'tcp') + ) + } + }) + }) + // TODO: handle IPv6 wildcard + } else { + multiaddrs.push(multiaddr.fromNodeAddress(address, 'tcp')) + } + }) + + if (multiaddrs.length === 0) { + throw new Error('Listener is not ready yet') + } + + return multiaddrs + } + + /** + * Handler for new sockets from `net.createServer` + * @param {net.Socket} socket + */ + _onSocket (socket) { // Avoid uncaught errors caused by unstable connections socket.on('error', noop) @@ -34,124 +168,30 @@ module.exports = (handler) => { log('new connection', addr.toString()) const s = new Libp2pSocket(socket, addr) - trackSocket(server, socket) - handler && handler(s) - listener.emit('connection', s) - }) - - server.on('listening', () => listener.emit('listening')) - server.on('error', (err) => listener.emit('error', err)) - server.on('close', () => listener.emit('close')) - - // Keep track of open connections to destroy in case of timeout - server.__connections = {} - - listener.close = (options = {}) => { - if (!server.listening) { - return - } - - return new Promise((resolve, reject) => { - const start = Date.now() - - // Attempt to stop the server. If it takes longer than the timeout, - // destroy all the underlying sockets manually. - const timeout = setTimeout(() => { - log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() - }) - resolve() - }, options.timeout || c.CLOSE_TIMEOUT) - - server.once('close', () => clearTimeout(timeout)) - - server.close((err) => err ? reject(err) : resolve()) + // Track the connection + const key = `${socket.remoteAddress}:${socket.remotePort}` + this.__connections.set(key, socket) + socket.once('close', () => { + this.__connections.delete(key) }) + + this._connectionHandler(s) + this.emit('connection', s) } - - let ipfsId - let listeningAddr - - listener.listen = (ma) => { - listeningAddr = ma - if (includes(ma.protoNames(), 'ipfs')) { - ipfsId = getIpfsId(ma) - listeningAddr = ma.decapsulate('ipfs') - } - - const lOpts = listeningAddr.toOptions() - return new Promise((resolve, reject) => { - server.listen(lOpts.port, lOpts.host, (err) => { - if (err) { - return reject(err) - } - - log('Listening on %s %s', lOpts.port, lOpts.host) - resolve() - }) - }) - } - - listener.getAddrs = () => { - const multiaddrs = [] - const address = server.address() - - if (!address) { - throw new Error('Listener is not ready yet') - } - - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (listeningAddr.toString().indexOf('ip4') !== -1) { - let m = listeningAddr.decapsulate('tcp') - m = m.encapsulate('/tcp/' + address.port) - if (ipfsId) { - m = m.encapsulate('/ipfs/' + ipfsId) - } - - if (m.toString().indexOf('0.0.0.0') !== -1) { - const netInterfaces = os.networkInterfaces() - Object.keys(netInterfaces).forEach((niKey) => { - netInterfaces[niKey].forEach((ni) => { - if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) - } - }) - }) - } else { - multiaddrs.push(m) - } - } - - if (address.family === 'IPv6') { - let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port) - if (ipfsId) { - ma = ma.encapsulate('/ipfs/' + ipfsId) - } - - multiaddrs.push(ma) - } - - return multiaddrs - } - - return listener } -function getIpfsId (ma) { - return ma.stringTuples().filter((tuple) => { - return tuple[0] === c.IPFS_MA_CODE - })[0][1] +module.exports = (options, handler) => { + return new Listener(options, handler) } -function trackSocket (server, socket) { - const key = `${socket.remoteAddress}:${socket.remotePort}` - server.__connections[key] = socket - - socket.once('close', () => { - delete server.__connections[key] - }) +/** + * Get unique values from `arr` using `getValue` to determine + * what is used for uniqueness + * @param {Array} arr The array to get unique values for + * @param {function(value)} getValue The function to determine what is compared + * @returns {Array} + */ +function uniqueBy (arr, getValue) { + return [...new Map(arr.map((i) => [getValue(i), i])).values()] } diff --git a/test/adapter/listen-dial.spec.js b/test/adapter/listen-dial.spec.js index 97e6980..72c3cfc 100644 --- a/test/adapter/listen-dial.spec.js +++ b/test/adapter/listen-dial.spec.js @@ -114,14 +114,14 @@ describe('listen', () => { }) }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs does not preserve IPFS Id', (done) => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) listener.listen(mh, () => { listener.getAddrs((err, multiaddrs) => { expect(err).to.not.exist() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) + expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) listener.close(done) }) }) diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index e14e759..f05be66 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -115,14 +115,14 @@ describe('listen', () => { await listener.close() }) - it('getAddrs preserves IPFS Id', async () => { + it('getAddrs does not preserve IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) + expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) await listener.close() })