From d9f65e0b0c244164c8cf53e9385eeb125daf653b Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 31 Aug 2016 06:41:34 -0400 Subject: [PATCH] feat(readme): add pull-streams documentation --- README.md | 32 +++++++++++++++++---- package.json | 1 + src/index.js | 20 ++++++++++--- src/listener.js | 63 +++++++++++++++++++++++++++-------------- test/compliance.spec.js | 23 +++++++++++++++ test/index.spec.js | 6 ++-- 6 files changed, 111 insertions(+), 34 deletions(-) create mode 100644 test/compliance.spec.js diff --git a/README.md b/README.md index 98c0055..84bcbe0 100644 --- a/README.md +++ b/README.md @@ -10,15 +10,11 @@ ![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png) ![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png) -> Node.js implementation of the TCP module that libp2p uses, which implements -> the [interface-connection](https://github.com/libp2p/interface-connection) -> interface for dial/listen. +> Node.js implementation of the TCP module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/interface-connection) interface for dial/listen. ## Description -`libp2p-tcp` in Node.js is a very thin shim that adds support for dialing to a -`multiaddr`. This small shim will enable libp2p to use other different -transports. +`libp2p-tcp` in Node.js is a very thin shim that adds support for dialing to a `multiaddr`. This small shim will enable libp2p to use other different transports. **Note:** This module uses [pull-streams](https://pull-stream.github.io) for all stream based interfaces. @@ -71,6 +67,30 @@ hello > npm i libp2p-tcp ``` +## This module uses `pull-streams` + +We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about what took us to make this migration, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). + +You can learn more about pull-streams at: + +- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) +- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) +- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) +- [pull-streams documentation](https://pull-stream.github.io/) + +### Converting `pull-streams` to Node.js Streams + +If you are a Node.js streams user, you can convert a pull-stream to Node.js Stream using the module `pull-stream-to-stream`, giving you an instance of a Node.js stream that is linked to the pull-stream. Example: + +``` +const pullToStream = require('pull-stream-to-stream') + +const nodeStreamInstance = pullToStream(pullStreamInstance) +// nodeStreamInstance is an instance of a Node.js Stream +``` + +To learn more about his utility, visit https://pull-stream.github.io/#pull-stream-to-stream + ## API [![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)](https://github.com/diasdavid/interface-transport) diff --git a/package.json b/package.json index 7bbfa81..3e534ef 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "devDependencies": { "aegir": "^6.0.0", "chai": "^3.5.0", + "interface-transport": "^0.2.0", "lodash.isfunction": "^3.0.8", "pre-commit": "^1.1.2" }, diff --git a/src/index.js b/src/index.js index bbb9fa7..0d459a2 100644 --- a/src/index.js +++ b/src/index.js @@ -7,7 +7,7 @@ const contains = require('lodash.contains') const isFunction = require('lodash.isfunction') const Connection = require('interface-connection').Connection const debug = require('debug') -const log = debug('libp2p:tcp') +const log = debug('libp2p:tcp:dial') const createListener = require('./listener') @@ -24,13 +24,23 @@ module.exports = class TCP { const cOpts = ma.toOptions() log('Connecting to %s %s', cOpts.port, cOpts.host) - const socket = toPull.duplex(net.connect(cOpts, cb)) - socket.getObservedAddrs = (cb) => { + const rawSocket = net.connect(cOpts, cb) + + rawSocket.once('timeout', () => { + log('timeout') + rawSocket.emit('error', new Error('Timeout')) + }) + + const socket = toPull.duplex(rawSocket) + + const conn = new Connection(socket) + + conn.getObservedAddrs = (cb) => { return cb(null, [ma]) } - return new Connection(socket) + return conn } createListener (options, handler) { @@ -39,6 +49,8 @@ module.exports = class TCP { options = {} } + handler = handler || (() => {}) + return createListener(handler) } diff --git a/src/listener.js b/src/listener.js index b945afd..e4232e8 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,13 +1,14 @@ 'use strict' const multiaddr = require('multiaddr') -const debug = require('debug') -const log = debug('libp2p:tcp') const Connection = require('interface-connection').Connection const os = require('os') const contains = require('lodash.contains') const net = require('net') const toPull = require('stream-to-pull-stream') +const EventEmitter = require('events').EventEmitter +const debug = require('debug') +const log = debug('libp2p:tcp:listen') const getMultiaddr = require('./get-multiaddr') @@ -15,28 +16,39 @@ const IPFS_CODE = 421 const CLOSE_TIMEOUT = 2000 module.exports = (handler) => { - const listener = net.createServer((socket) => { + const listener = new EventEmitter() + + const server = net.createServer((socket) => { + const addr = getMultiaddr(socket) + log('new connection', addr.toString()) + const s = toPull.duplex(socket) s.getObservedAddrs = (cb) => { - return cb(null, [getMultiaddr(socket)]) + return cb(null, [addr]) } + trackSocket(server, socket) + const conn = new Connection(s) handler(conn) + listener.emit('connection', conn) + }) + + server.on('listening', () => { + listener.emit('listening') + }) + + server.on('error', (err) => { + listener.emit('error', err) + }) + + server.on('close', () => { + listener.emit('close') }) // Keep track of open connections to destroy in case of timeout - listener.__connections = {} - listener.on('connection', (socket) => { - const key = `${socket.remoteAddress}:${socket.remotePort}` - listener.__connections[key] = socket + server.__connections = {} - socket.on('close', () => { - delete listener.__connections[key] - }) - }) - - listener._close = listener.close listener.close = (options, cb) => { if (typeof options === 'function') { cb = options @@ -46,24 +58,24 @@ module.exports = (handler) => { options = options || {} let closed = false - listener._close(cb) - listener.once('close', () => { + server.close(cb) + server.once('close', () => { closed = true }) setTimeout(() => { if (closed) return log('unable to close graciously, destroying conns') - Object.keys(listener.__connections).forEach((key) => { + Object.keys(server.__connections).forEach((key) => { log('destroying %s', key) - listener.__connections[key].destroy() + server.__connections[key].destroy() }) }, options.timeout || CLOSE_TIMEOUT) } + let ipfsId let listeningAddr - listener._listen = listener.listen listener.listen = (ma, cb) => { listeningAddr = ma if (contains(ma.protoNames(), 'ipfs')) { @@ -73,12 +85,12 @@ module.exports = (handler) => { const lOpts = listeningAddr.toOptions() log('Listening on %s %s', lOpts.port, lOpts.host) - return listener._listen(lOpts.port, lOpts.host, cb) + return server.listen(lOpts.port, lOpts.host, cb) } listener.getAddrs = (cb) => { const multiaddrs = [] - const address = listener.address() + const address = server.address() if (!address) { return cb(new Error('Listener is not ready yet')) @@ -127,3 +139,12 @@ function getIpfsId (ma) { return tuple[0] === IPFS_CODE })[0][1] } + +function trackSocket (server, socket) { + const key = `${socket.remoteAddress}:${socket.remotePort}` + server.__connections[key] = socket + + socket.on('close', () => { + delete server.__connections[key] + }) +} diff --git a/test/compliance.spec.js b/test/compliance.spec.js new file mode 100644 index 0000000..efd8c04 --- /dev/null +++ b/test/compliance.spec.js @@ -0,0 +1,23 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') +const Tcp = require('../src') + +describe('compliance', () => { + tests({ + setup (cb) { + let tcp = new Tcp() + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/9091'), + multiaddr('/ip4/127.0.0.1/tcp/9092'), + multiaddr('/ip4/127.0.0.1/tcp/9093') + ] + cb(null, tcp, addrs) + }, + teardown (cb) { + cb() + } + }) +}) diff --git a/test/index.spec.js b/test/index.spec.js index 952e39d..7f4dc32 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -23,14 +23,14 @@ describe('listen', () => { }) it('close listener with connections, through timeout', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9091/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => { pull(conn, conn) }) listener.listen(mh, () => { - const socket1 = net.connect(9091) - const socket2 = net.connect(9091) + const socket1 = net.connect(9090) + const socket2 = net.connect(9090) socket1.write('Some data that is never handled') socket1.end()