feat(readme): add pull-streams documentation

This commit is contained in:
David Dias
2016-08-31 06:41:34 -04:00
parent 5e89a2608b
commit d9f65e0b0c
6 changed files with 111 additions and 34 deletions

View File

@ -7,7 +7,7 @@ const contains = require('lodash.contains')
const isFunction = require('lodash.isfunction')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:tcp')
const log = debug('libp2p:tcp:dial')
const createListener = require('./listener')
@ -24,13 +24,23 @@ module.exports = class TCP {
const cOpts = ma.toOptions()
log('Connecting to %s %s', cOpts.port, cOpts.host)
const socket = toPull.duplex(net.connect(cOpts, cb))
socket.getObservedAddrs = (cb) => {
const rawSocket = net.connect(cOpts, cb)
rawSocket.once('timeout', () => {
log('timeout')
rawSocket.emit('error', new Error('Timeout'))
})
const socket = toPull.duplex(rawSocket)
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
return new Connection(socket)
return conn
}
createListener (options, handler) {
@ -39,6 +49,8 @@ module.exports = class TCP {
options = {}
}
handler = handler || (() => {})
return createListener(handler)
}

View File

@ -1,13 +1,14 @@
'use strict'
const multiaddr = require('multiaddr')
const debug = require('debug')
const log = debug('libp2p:tcp')
const Connection = require('interface-connection').Connection
const os = require('os')
const contains = require('lodash.contains')
const net = require('net')
const toPull = require('stream-to-pull-stream')
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const log = debug('libp2p:tcp:listen')
const getMultiaddr = require('./get-multiaddr')
@ -15,28 +16,39 @@ const IPFS_CODE = 421
const CLOSE_TIMEOUT = 2000
module.exports = (handler) => {
const listener = net.createServer((socket) => {
const listener = new EventEmitter()
const server = net.createServer((socket) => {
const addr = getMultiaddr(socket)
log('new connection', addr.toString())
const s = toPull.duplex(socket)
s.getObservedAddrs = (cb) => {
return cb(null, [getMultiaddr(socket)])
return cb(null, [addr])
}
trackSocket(server, socket)
const conn = new Connection(s)
handler(conn)
listener.emit('connection', conn)
})
server.on('listening', () => {
listener.emit('listening')
})
server.on('error', (err) => {
listener.emit('error', err)
})
server.on('close', () => {
listener.emit('close')
})
// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = `${socket.remoteAddress}:${socket.remotePort}`
listener.__connections[key] = socket
server.__connections = {}
socket.on('close', () => {
delete listener.__connections[key]
})
})
listener._close = listener.close
listener.close = (options, cb) => {
if (typeof options === 'function') {
cb = options
@ -46,24 +58,24 @@ module.exports = (handler) => {
options = options || {}
let closed = false
listener._close(cb)
listener.once('close', () => {
server.close(cb)
server.once('close', () => {
closed = true
})
setTimeout(() => {
if (closed) return
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
Object.keys(server.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
server.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}
let ipfsId
let listeningAddr
listener._listen = listener.listen
listener.listen = (ma, cb) => {
listeningAddr = ma
if (contains(ma.protoNames(), 'ipfs')) {
@ -73,12 +85,12 @@ module.exports = (handler) => {
const lOpts = listeningAddr.toOptions()
log('Listening on %s %s', lOpts.port, lOpts.host)
return listener._listen(lOpts.port, lOpts.host, cb)
return server.listen(lOpts.port, lOpts.host, cb)
}
listener.getAddrs = (cb) => {
const multiaddrs = []
const address = listener.address()
const address = server.address()
if (!address) {
return cb(new Error('Listener is not ready yet'))
@ -127,3 +139,12 @@ function getIpfsId (ma) {
return tuple[0] === IPFS_CODE
})[0][1]
}
function trackSocket (server, socket) {
const key = `${socket.remoteAddress}:${socket.remotePort}`
server.__connections[key] = socket
socket.on('close', () => {
delete server.__connections[key]
})
}