js-libp2p/src/swarm.js

171 lines
4.5 KiB
JavaScript
Raw Normal View History

2015-07-08 23:01:36 -07:00
var tcp = require('net')
var Select = require('multistream-select').Select
var Interactive = require('multistream-select').Interactive
var spdy = require('spdy-transport')
var log = require('ipfs-logger').group('swarm')
var async = require('async')
2015-07-09 13:53:03 -07:00
var EventEmitter = require('events').EventEmitter
var util = require('util')
2015-07-08 23:01:36 -07:00
// Public API of this module: the Swarm constructor (the local `exports`
// binding is pointed at the same function).
exports = module.exports = Swarm
2015-07-09 13:53:03 -07:00
// Put EventEmitter's prototype methods on Swarm; listen() uses them to
// emit 'connection' for every incoming spdy connection.
util.inherits(Swarm, EventEmitter)
2015-07-08 23:01:36 -07:00
/**
 * Swarm - opens and accepts TCP connections, negotiates spdy on top of them
 * through multistream-select, and multiplexes protocol streams over the
 * resulting spdy connections.
 *
 * Instance state:
 *   port        - TCP port used by listen(); taken from IPFS_SWARM_PORT when
 *                 that parses to a non-zero integer, otherwise 4001
 *   connections - map of peer id (base58 string) -> spdy connection opened
 *                 by openStream()
 *   handles     - list of { protocol, func } registered via registerHandle()
 *
 * @throws {Error} when invoked without `new`
 */
function Swarm () {
  var self = this

  if (!(self instanceof Swarm)) {
    throw new Error('Swarm must be called with new')
  }

  self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001

  self.connections = {}
  self.handles = []

  /**
   * Start a TCP server that negotiates spdy with every incoming socket.
   *
   * @param {number|function} [port] - port to listen on (stored in
   *        self.port); may be omitted, in which case a function passed in
   *        this position is treated as `ready`
   * @param {function} [ready] - handed to server.listen() as its callback
   */
  self.listen = function (port, ready) {
    if (!ready) {
      ready = function noop () {}
    }
    if (typeof port === 'function') {
      ready = port
    } else if (port) {
      self.port = port
    }

    tcp.createServer(function (socket) {
      var ms = new Select()
      ms.handle(socket)
      ms.addHandler('/spdy/3.1.0', function (ds) {
        log.info('Negotiated spdy with incoming socket')

        var conn = spdy.connection.create(ds, {
          protocol: 'spdy',
          isServer: true
        })
        conn.start(3.1)

        self.emit('connection', conn)

        // attach multistream handlers to incoming streams
        conn.on('stream', registerHandles)

        // Incoming connections are not stored in self.connections here.
        // IDENTIFY DOES THAT FOR US
        // conn.on('close', function () { delete self.connections[conn.peerId] })
      })
    }).listen(self.port, ready)
  }

  /**
   * Open a stream to `peer` speaking `protocol`, reusing the spdy connection
   * stored for that peer when there is one and dialing otherwise.
   *
   * @param {Object} peer - must expose `id.toB58String()` and `multiaddrs`
   *        (each entry providing `toOptions()` and `toString()`)
   * @param {string} protocol - protocol to select on the new stream
   * @param {function} cb - called as cb(err) or cb(null, stream)
   */
  self.openStream = function (peer, protocol, cb) {
    var peerId = peer.id.toB58String()

    if (self.connections[peerId]) {
      createStream(peer, protocol, cb)
    } else {
      dial()
    }

    // Try the peer's addresses one at a time until one of them connects.
    function dial () {
      var socket

      async.eachSeries(peer.multiaddrs, function (multiaddr, next) {
        if (socket) { return next() }

        // Both the connect callback and the error handler report to `next`;
        // make sure it runs only once per address even if the socket errors
        // after it has connected.
        var settled = false
        function finish () {
          if (settled) { return }
          settled = true
          next()
        }

        var tmp = tcp.connect(multiaddr.toOptions(), function () {
          socket = tmp
          finish()
        })

        tmp.once('error', function (err) {
          log.warn(multiaddr.toString(), 'on', peerId, 'not available', err)
          finish()
        })
      }, function done () {
        if (!socket) {
          // Error() only uses its first argument as the message, so the peer
          // id has to be part of that string.
          return cb(new Error('Not able to open a socket with peer - ' + peerId))
        }
        gotSocket(socket)
      })
    }

    // do the spdy people dance (multistream-select into spdy)
    function gotSocket (socket) {
      var msi = new Interactive()
      msi.handle(socket, function () {
        msi.select('/spdy/3.1.0', function (err, ds) {
          if (err) {
            // Without spdy the socket is of no use: release it and stop here
            // instead of building a connection on an undefined stream.
            socket.destroy()
            return cb(err)
          }

          var conn = spdy.connection.create(ds, { protocol: 'spdy', isServer: false })
          conn.start(3.1)
          conn.on('stream', registerHandles)
          self.connections[peerId] = conn

          conn.on('close', function () { delete self.connections[peerId] })

          createStream(peer, protocol, cb)
        })
      })
    }

    // Spawn a new stream on the peer's stored connection and negotiate
    // `protocol` on it.
    function createStream (peer, protocol, cb) {
      var conn = self.connections[peer.id.toB58String()]
      conn.request({path: '/', method: 'GET'}, function (err, stream) {
        if (err) { return cb(err) }

        // negotiate desired protocol
        var msi = new Interactive()
        msi.handle(stream, function () {
          msi.select(protocol, function (err, ds) {
            if (err) { return cb(err) }
            cb(null, ds) // return the stream
          })
        })
      })
    }
  }

  /**
   * Register the handler to attach to incoming streams for `protocol`.
   *
   * @param {string} protocol
   * @param {function} handleFunc - passed to multistream-select's addHandler
   * @throws {Error} when a handler for `protocol` is already registered
   */
  self.registerHandle = function (protocol, handleFunc) {
    // self.handles is a list of { protocol, func }, so it has to be searched;
    // indexing it by protocol name never finds anything.
    var alreadyRegistered = self.handles.some(function (handle) {
      return handle.protocol === protocol
    })
    if (alreadyRegistered) {
      throw new Error('Handle for protocol already exists: ' + protocol)
    }

    self.handles.push({ protocol: protocol, func: handleFunc })
    log.info('Registered handler for protocol:', protocol)
  }

  /**
   * Call end() on every connection stored in self.connections.
   *
   * @param {function} cb - called exactly once: immediately when there are
   *        no connections, otherwise after end() has been called on the last
   *        one (it does not wait for 'close' events)
   */
  self.close = function (cb) {
    var keys = Object.keys(self.connections)
    var number = keys.length
    if (number === 0) { return cb() }

    var c = new Counter(number, cb)
    keys.forEach(function (key) {
      self.connections[key].end()
      c.hit()
    })
  }

  // Offer every registered protocol on a freshly opened spdy stream.
  function registerHandles (spdyStream) {
    log.info('Preparing stream to handle the registered protocols')
    var msH = new Select()
    msH.handle(spdyStream)
    self.handles.forEach(function (handle) {
      msH.addHandler(handle.protocol, handle.func)
    })
  }
}
2015-07-09 13:53:03 -07:00
/**
 * Counter - invokes `callback` when hit() has been called exactly `target`
 * times.
 *
 * The callback fires only on the call that makes the count equal to
 * `target`: further hits do nothing, and a target of 0 never fires because
 * the count is already past it after the first hit.
 *
 * @param {number} target - number of hits that triggers the callback
 * @param {function} callback - called with no arguments
 */
function Counter (target, callback) {
  var c = 0

  this.hit = count

  function count () {
    c += 1
    if (c === target) { callback() }
  }
}