mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-28 00:11:34 +00:00
Initial Commit
This commit is contained in:
161
src/index.js
Normal file
161
src/index.js
Normal file
@ -0,0 +1,161 @@
|
||||
var tcp = require('net')
|
||||
var Select = require('multistream-select').Select
|
||||
var Interactive = require('multistream-select').Interactive
|
||||
var spdy = require('spdy-transport')
|
||||
var identify = require('./identify')
|
||||
var log = require('ipfs-logger').group('swarm')
|
||||
var async = require('async')
|
||||
|
||||
exports = module.exports
|
||||
|
||||
var connections = {}
|
||||
var handles = []
|
||||
|
||||
// set the listener
|
||||
|
||||
exports.listen = function () {
|
||||
|
||||
tcp.createServer(function (socket) {
|
||||
var ms = new Select()
|
||||
ms.handle(socket)
|
||||
ms.addHandler('/spdy/3.1.0', function (ds) {
|
||||
log.info('Negotiated spdy with incoming socket')
|
||||
log.info('Buffer should be clean - ', ds.read())
|
||||
var spdyConnection = spdy.connection.create(ds, {
|
||||
protocol: 'spdy',
|
||||
isServer: true
|
||||
})
|
||||
|
||||
spdyConnection.start(3.1)
|
||||
|
||||
// attach multistream handlers to incoming streams
|
||||
spdyConnection.on('stream', function (spdyStream) {
|
||||
registerHandles(spdyStream)
|
||||
})
|
||||
|
||||
// learn about the other peer Identity
|
||||
/* TODO(daviddias)
|
||||
identify.inquiry(spdyConnection, function (err, spdyConnection, peerIdB58) {
|
||||
if (err) {
|
||||
return log.error(err)
|
||||
}
|
||||
if (connections[peerIdB58]) {
|
||||
return log.error('New connection established with a peer(' + peerIdB58 + ') that we already had a connection')
|
||||
}
|
||||
spdyConnection.peerId = peerIdB58
|
||||
connections[peerIdB58] = spdyConnection
|
||||
})
|
||||
*/
|
||||
|
||||
// close the connection when all the streams close
|
||||
spdyConnection.on('close', function () {
|
||||
delete connections[spdyConnection.peerId]
|
||||
})
|
||||
})
|
||||
}).listen(process.env.IPFS_PORT || 4001)
|
||||
}
|
||||
|
||||
// interface
|
||||
|
||||
exports.openStream = function (peer, protocol, cb) {
|
||||
// if Connection already open, open a new stream, otherwise, create a new Connection
|
||||
// then negoatite the protocol and return the opened stream
|
||||
|
||||
// If no connection open yet, open it
|
||||
if (!connections[peer.id.toB58String()]) {
|
||||
|
||||
// Establish a socket with one of the addresses
|
||||
var gotOne = false
|
||||
async.eachSeries(peer.multiaddrs, function (multiaddr, callback) {
|
||||
if (gotOne) {
|
||||
return callback()
|
||||
}
|
||||
var socket = tcp.connect(multiaddr.toOptions(), function connected () {
|
||||
gotSocket(socket)
|
||||
})
|
||||
|
||||
socket.once('error', function (err) {
|
||||
log.warn('Could not connect using one of the address of peer - ', peer.id.toB58String(), err)
|
||||
callback()
|
||||
})
|
||||
|
||||
}, function done () {
|
||||
if (!gotOne) {
|
||||
cb(new Error('Not able to open a scoket with peer - ', peer.id.toB58String()))
|
||||
}
|
||||
})
|
||||
|
||||
} else {
|
||||
createStream(peer, protocol, cb)
|
||||
}
|
||||
|
||||
// do the spdy people dance (multistream-select into spdy)
|
||||
function gotSocket (socket) {
|
||||
gotOne = true
|
||||
var msi = new Interactive()
|
||||
msi.handle(socket, function () {
|
||||
msi.select('/spdy/3.1.0', function (err, ds) {
|
||||
var spdyConnection = spdy.connection.create(ds, {
|
||||
protocol: 'spdy',
|
||||
isServer: false
|
||||
})
|
||||
spdyConnection.start(3.1)
|
||||
connections[peer.id.toB58String()] = spdyConnection
|
||||
|
||||
// attach multistream handlers to incoming streams
|
||||
spdyConnection.on('stream', function (spdyStream) {
|
||||
registerHandles(spdyStream)
|
||||
})
|
||||
|
||||
createStream(peer, protocol, cb)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function createStream (peer, protocol, cb) {
|
||||
// 1. to pop a new stream on the connection
|
||||
// 2. negotiate the requested protocol through multistream
|
||||
// 3. return back the stream when that is negotiated
|
||||
var conn = connections[peer.id.toB58String()]
|
||||
conn.request({path: '/', method: 'GET'}, function (err, stream) {
|
||||
if (err) {
|
||||
return cb(err)
|
||||
}
|
||||
var msi = new Interactive()
|
||||
msi.handle(stream, function () {
|
||||
msi.select(protocol, function (err, ds) {
|
||||
if (err) {
|
||||
return cb(err)
|
||||
}
|
||||
cb(null, ds) // wohoo we finally delivered the stream the user wanted
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
conn.on('close', function () {
|
||||
// TODO(daviddias) remove it from collections
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
exports.registerHandle = function (protocol, cb) {
|
||||
if (handles[protocol]) {
|
||||
var err = new Error('Handle for protocol already exists', protocol)
|
||||
log.error(err)
|
||||
return cb(err)
|
||||
}
|
||||
handles.push({ protocol: protocol, func: cb })
|
||||
log.info('Registered handler for protocol:', protocol)
|
||||
}
|
||||
|
||||
function registerHandles (spdyStream) {
|
||||
log.info('Preparing stream to handle the registered protocols')
|
||||
var msH = new Select()
|
||||
msH.handle(spdyStream)
|
||||
handles.forEach(function (handle) {
|
||||
msH.addHandler(handle.protocol, handle.func)
|
||||
})
|
||||
}
|
||||
|
Reference in New Issue
Block a user