diff --git a/.travis.yml b/.travis.yml index c415b39..1898be4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,6 @@ after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codeco jobs: include: - stage: check - os: linux script: - npx aegir build --bundlesize - npx aegir dep-check diff --git a/README.md b/README.md index 9bfd1fe..f2b0803 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') const { collect } = require('streaming-iterables') -const addr = multiaddr('/ip4/127.0.0.1/tcp/9090') +const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') const tcp = new TCP() @@ -56,10 +56,10 @@ const listener = tcp.createListener((socket) => { ) }) -await listener.listen([addr]) +await listener.listen(mh) console.log('listening') -const socket = await tcp.dial(addr) +const socket = await tcp.dial(mh) const values = await pipe( socket, collect diff --git a/package.json b/package.json index 1a05e63..820493c 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", + "interface-transport": "~0.3.6", "lodash.isfunction": "^3.0.9", "pull-stream": "^3.6.9", "sinon": "^7.3.1" @@ -47,17 +48,15 @@ "dependencies": { "abortable-iterator": "^2.0.0", "class-is": "^1.1.0", - "debug": "^4.1.1", + "debug": "^3.1.0", "err-code": "^1.1.2", "interface-connection": "~0.3.3", - "interface-transport": "~0.4.0", + "interface-transport": "libp2p/interface-transport#feat/async-await", "ip-address": "^5.8.9", - "it-pipe": "^1.0.0", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.9", - "mafmt": "^6.0.7", - "multiaddr": "^6.0.6", - "streaming-iterables": "^4.1.0" + "mafmt": "^6.0.2", + "multiaddr": "^5.0.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index 46f47b8..000ac4d 100644 --- a/src/index.js +++ b/src/index.js @@ -80,7 +80,7 @@ class TCP { } handler = handler || noop - return createListener(options, handler) + return createListener(handler) } filter (multiaddrs) { diff --git a/src/listener.js b/src/listener.js index 8093360..b7764b9 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,12 +2,11 @@ const multiaddr = require('multiaddr') const os = require('os') +const includes = require('lodash.includes') const net = require('net') const EventEmitter = require('events').EventEmitter -const { AllListenersFailedError } = require('interface-transport') const debug = require('debug') const log = debug('libp2p:tcp:listen') -log.error = debug('libp2p:tcp:listen:error') const Libp2pSocket = require('./socket') const getMultiaddr = require('./get-multiaddr') @@ -15,187 +14,10 @@ const c = require('./constants') function noop () {} -class Listener extends EventEmitter { - /** - * @constructor - * @param {object} options - * @param {function(Connection)} handler - */ - constructor (options, handler) { - super() - this._options = options - this._connectionHandler = handler - this._servers = new Set() - this.__connections = new Map() - } +module.exports = (handler) => { + const listener = new EventEmitter() - /** - * Whether or not there is currently at least 1 server listening - * @private - * @returns {boolean} - */ - _isListening () { - return [...this._servers].filter(server => server.listening).length > 0 - } - - /** - * Closes all open servers - * @param {object} options - * @param {number} options.timeout how long before closure is forced, defaults to 2000 ms - * @returns {Promise} - */ - close (options = {}) { - if (!this._isListening()) { - return - } - - // Close all running servers in parallel - return Promise.all( - [...this._servers].map(server => { - return new Promise((resolve) => { - const start = Date.now() - - // Attempt to stop the server. If it takes longer than the timeout, - // resolve the promise. Any remaining connections will be destroyed after - const timeout = setTimeout(() => { - log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) - resolve() - }, options.timeout || c.CLOSE_TIMEOUT) - - // Just clear the timeout, cleanup listeners are added on server creation - server.once('close', () => clearTimeout(timeout)) - server.close((err) => { - // log the error and resolve so we don't exit early - err && log.error('an error occurred closing the server', err) - resolve() - }) - }) - }) - ).then(() => { - // Destroy all remaining connections - this.__connections.forEach((connection, key) => { - log('destroying %s', key) - connection.destroy() - }) - this.__connections.clear() - this._servers.clear() - }) - } - - /** - * Creates servers listening on the given `addrs` - * @async - * @param {Array} addrs - */ - async listen (addrs) { - addrs = Array.isArray(addrs) ? addrs : [addrs] - - let listeners = [] - let errors = [] - - // Filter out duplicate ports, unless it's port 0 - addrs = uniqueBy(addrs, (addr) => { - const port = Number(addr.toOptions().port) - return isNaN(port) || port === 0 ? addr.toString() : port - }) - - for (const ma of addrs) { - const lOpts = ma.toOptions() - - listeners.push( - new Promise((resolve) => { - const server = net.createServer(this._onSocket.bind(this)) - this._servers.add(server) - - server.on('listening', () => this.emit('listening')) - server.on('close', () => { - this._removeServer(server) - }) - server.on('error', (err) => this.emit('error', err)) - - server.listen(lOpts.port, lOpts.host, (err) => { - if (err) { - errors.push(err) - return resolve() - } - - log('Listening on %s:%s', lOpts.host, lOpts.port) - resolve() - }) - }) - ) - } - - return Promise.all(listeners) - .then(() => { - errors.forEach((err) => { - log.error('received an error while attempting to listen', err) - }) - - // All servers failed to listen, throw an error - if (errors.length === listeners.length) { - throw new AllListenersFailedError() - } - }) - } - - /** - * Removes the server from tracking and performs cleanup. - * If all servers have been closed, `close` will be emitted by - * the listener. - * @private - * @param {net.Server} server - */ - _removeServer (server) { - // only emit if we're not listening - if (!this._isListening()) { - this.emit('close') - } - this._servers.delete(server) - server.removeAllListeners() - } - - /** - * Return the addresses we are listening on - * @throws - * @returns {Array} - */ - getAddrs () { - const multiaddrs = [] - this._servers.forEach(server => { - const address = server.address() - - if (address.address === '0.0.0.0') { - const netInterfaces = os.networkInterfaces() - Object.keys(netInterfaces).forEach((niKey) => { - netInterfaces[niKey].forEach((ni) => { - if (address.family === ni.family) { - multiaddrs.push( - multiaddr.fromNodeAddress({ - ...address, - address: ni.address - }, 'tcp') - ) - } - }) - }) - } else { - multiaddrs.push(multiaddr.fromNodeAddress(address, 'tcp')) - } - }) - - if (multiaddrs.length === 0) { - throw new Error('Listener is not ready yet') - } - - return multiaddrs - } - - /** - * Handler for new sockets from `net.createServer` - * @param {net.Socket} socket - */ - _onSocket (socket) { + const server = net.createServer((socket) => { // Avoid uncaught errors caused by unstable connections socket.on('error', noop) @@ -212,31 +34,124 @@ class Listener extends EventEmitter { log('new connection', addr.toString()) const s = new Libp2pSocket(socket, addr) + trackSocket(server, socket) - // Track the connection - const key = `${socket.remoteAddress}:${socket.remotePort}` - this.__connections.set(key, socket) - socket.once('close', () => { - this.__connections.delete(key) - socket.removeAllListeners() + handler && handler(s) + listener.emit('connection', s) + }) + + server.on('listening', () => listener.emit('listening')) + server.on('error', (err) => listener.emit('error', err)) + server.on('close', () => listener.emit('close')) + + // Keep track of open connections to destroy in case of timeout + server.__connections = {} + + listener.close = (options = {}) => { + if (!server.listening) { + return + } + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to stop the server. If it takes longer than the timeout, + // destroy all the underlying sockets manually. + const timeout = setTimeout(() => { + log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) + Object.keys(server.__connections).forEach((key) => { + log('destroying %s', key) + server.__connections[key].destroy() + }) + resolve() + }, options.timeout || c.CLOSE_TIMEOUT) + + server.once('close', () => clearTimeout(timeout)) + + server.close((err) => err ? reject(err) : resolve()) }) - - this._connectionHandler(s) - this.emit('connection', s) } + + let ipfsId + let listeningAddr + + listener.listen = (ma) => { + listeningAddr = ma + if (includes(ma.protoNames(), 'ipfs')) { + ipfsId = getIpfsId(ma) + listeningAddr = ma.decapsulate('ipfs') + } + + const lOpts = listeningAddr.toOptions() + return new Promise((resolve, reject) => { + server.listen(lOpts.port, lOpts.host, (err) => { + if (err) { + return reject(err) + } + + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) + }) + } + + listener.getAddrs = () => { + const multiaddrs = [] + const address = server.address() + + if (!address) { + throw new Error('Listener is not ready yet') + } + + // Because TCP will only return the IPv6 version + // we need to capture from the passed multiaddr + if (listeningAddr.toString().indexOf('ip4') !== -1) { + let m = listeningAddr.decapsulate('tcp') + m = m.encapsulate('/tcp/' + address.port) + if (ipfsId) { + m = m.encapsulate('/ipfs/' + ipfsId) + } + + if (m.toString().indexOf('0.0.0.0') !== -1) { + const netInterfaces = os.networkInterfaces() + Object.keys(netInterfaces).forEach((niKey) => { + netInterfaces[niKey].forEach((ni) => { + if (ni.family === 'IPv4') { + multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) + } + }) + }) + } else { + multiaddrs.push(m) + } + } + + if (address.family === 'IPv6') { + let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port) + if (ipfsId) { + ma = ma.encapsulate('/ipfs/' + ipfsId) + } + + multiaddrs.push(ma) + } + + return multiaddrs + } + + return listener } -module.exports = (options, handler) => { - return new Listener(options, handler) +function getIpfsId (ma) { + return ma.stringTuples().filter((tuple) => { + return tuple[0] === c.IPFS_MA_CODE + })[0][1] } -/** - * Get unique values from `arr` using `getValue` to determine - * what is used for uniqueness - * @param {Array} arr The array to get unique values for - * @param {function(value)} getValue The function to determine what is compared - * @returns {Array} - */ -function uniqueBy (arr, getValue) { - return [...new Map(arr.map((i) => [getValue(i), i])).values()] +function trackSocket (server, socket) { + const key = `${socket.remoteAddress}:${socket.remotePort}` + server.__connections[key] = socket + + socket.once('close', () => { + delete server.__connections[key] + }) } diff --git a/test/adapter/listen-dial.spec.js b/test/adapter/listen-dial.spec.js index 72c3cfc..97e6980 100644 --- a/test/adapter/listen-dial.spec.js +++ b/test/adapter/listen-dial.spec.js @@ -114,14 +114,14 @@ describe('listen', () => { }) }) - it('getAddrs does not preserve IPFS Id', (done) => { + it('getAddrs preserves IPFS Id', (done) => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) listener.listen(mh, () => { listener.getAddrs((err, multiaddrs) => { expect(err).to.not.exist() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) + expect(multiaddrs[0]).to.deep.equal(mh) listener.close(done) }) }) diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index 83a1876..e14e759 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -38,7 +38,7 @@ describe('listen', () => { await new Promise((resolve) => { socket1.on('connect', async () => { - await listener.close({ timeout: 100 }) + await listener.close() resolve() }) }) @@ -115,14 +115,14 @@ describe('listen', () => { await listener.close() }) - it('getAddrs does not preserve IPFS Id', async () => { + it('getAddrs preserves IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) + expect(multiaddrs[0]).to.deep.equal(mh) await listener.close() }) @@ -183,7 +183,7 @@ describe('dial', () => { handled = resolve }) - const ma = multiaddr('/ip6/::/tcp/0') + const ma = multiaddr('/ip6/::/tcp/9067') const listener = tcp.createListener(async (conn) => { await pipe( @@ -194,8 +194,7 @@ describe('dial', () => { }) await listener.listen(ma) - const addrs = listener.getAddrs() - await pipe(await tcp.dial(addrs[0])) + await pipe(await tcp.dial(ma)) await handledPromise await listener.close() @@ -211,7 +210,7 @@ describe('dial', () => { handled = resolve }) - const ma = multiaddr('/ip6/::/tcp/0') + const ma = multiaddr('/ip6/::/tcp/9068') const listener = tcp.createListener(async (conn) => { // pull(conn, pull.onEnd(destroyed)) @@ -220,8 +219,7 @@ describe('dial', () => { }) await listener.listen(ma) - const addrs = listener.getAddrs() - await pipe([], await tcp.dial(addrs[0])) + await pipe([], await tcp.dial(ma)) await handledPromise await listener.close()