clean the code a bit

This commit is contained in:
David Dias 2015-07-09 15:45:03 -07:00
parent 6c82973315
commit 14d11de201
3 changed files with 46 additions and 65 deletions

View File

@ -7,4 +7,10 @@ ipfs-swarm Node.js implementation
# Description # Description
ipfs-swarm is an abstraction for the network layer on IPFS. It offers an API to open streams between peers on a specific protocol.
Ref link (still a WiP) - https://github.com/diasdavid/specs/blob/protocol-spec/protocol/layers.md#network-layer
# Usage # Usage

View File

@ -30,7 +30,7 @@ function Identify (swarm, peerSelf) {
}) })
stream.on('end', function () { stream.on('end', function () {
console.log(JSON.prse(answer)) console.log(JSON.parse(answer))
self.emit('thenews', answer) self.emit('thenews', answer)
}) })

View File

@ -40,83 +40,72 @@ function Swarm () {
ms.handle(socket) ms.handle(socket)
ms.addHandler('/spdy/3.1.0', function (ds) { ms.addHandler('/spdy/3.1.0', function (ds) {
log.info('Negotiated spdy with incoming socket') log.info('Negotiated spdy with incoming socket')
log.info('Buffer should be clean - ', ds.read())
var spdyConnection = spdy.connection.create(ds, { var conn = spdy.connection.create(ds, {
protocol: 'spdy', protocol: 'spdy',
isServer: true isServer: true
}) })
spdyConnection.start(3.1) conn.start(3.1)
self.emit('connection', spdyConnection) self.emit('connection', conn)
// attach multistream handlers to incoming streams // attach multistream handlers to incoming streams
spdyConnection.on('stream', function (spdyStream) { conn.on('stream', registerHandles)
registerHandles(spdyStream)
})
// close the connection when all the streams close // IDENTIFY DOES THAT FOR US
spdyConnection.on('close', function () { // conn.on('close', function () { delete self.connections[conn.peerId] })
delete self.connections[spdyConnection.peerId]
})
}) })
}).listen(self.port, ready) }).listen(self.port, ready)
} }
// interface // interface
// open stream account for connection reuse
self.openStream = function (peer, protocol, cb) { self.openStream = function (peer, protocol, cb) {
// if Connection already open, open a new stream, otherwise, create a new Connection
// then negoatite the protocol and return the opened stream
// If no connection open yet, open it // If no connection open yet, open it
if (!self.connections[peer.id.toB58String()]) { if (!self.connections[peer.id.toB58String()]) {
// Establish a socket with one of the addresses // Establish a socket with one of the addresses
var gotOne = false var socket
async.eachSeries(peer.multiaddrs, function (multiaddr, callback) { async.eachSeries(peer.multiaddrs, function (multiaddr, next) {
if (gotOne) { if (socket) { return next() }
return callback()
} var tmp = tcp.connect(multiaddr.toOptions(), function () {
var socket = tcp.connect(multiaddr.toOptions(), function connected () { socket = tmp
gotSocket(socket) next()
}) })
socket.once('error', function (err) { tmp.once('error', function (err) {
log.warn('Could not connect using one of the address of peer - ', peer.id.toB58String(), err) log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err)
callback() next()
}) })
}, function done () { }, function done () {
if (!gotOne) { if (!socket) {
cb(new Error('Not able to open a scoket with peer - ', peer.id.toB58String())) return cb(new Error('Not able to open a scoket with peer - ',
peer.id.toB58String()))
} }
gotSocket(socket)
}) })
} else { } else {
createStream(peer, protocol, cb) createStream(peer, protocol, cb)
} }
// do the spdy people dance (multistream-select into spdy) // do the spdy people dance (multistream-select into spdy)
function gotSocket (socket) { function gotSocket (socket) {
gotOne = true
var msi = new Interactive() var msi = new Interactive()
msi.handle(socket, function () { msi.handle(socket, function () {
msi.select('/spdy/3.1.0', function (err, ds) { msi.select('/spdy/3.1.0', function (err, ds) {
if (err) { if (err) { cb(err) }
return console.log('err', err)
}
var spdyConnection = spdy.connection.create(ds, {
protocol: 'spdy',
isServer: false
})
spdyConnection.start(3.1)
self.connections[peer.id.toB58String()] = spdyConnection var conn = spdy.connection.create(ds, { protocol: 'spdy', isServer: false })
conn.start(3.1)
conn.on('stream', registerHandles)
self.connections[peer.id.toB58String()] = conn
// attach multistream handlers to incoming streams conn.on('close', function () { delete self.connections[peer.id.toB58String()] })
spdyConnection.on('stream', function (spdyStream) {
registerHandles(spdyStream)
})
createStream(peer, protocol, cb) createStream(peer, protocol, cb)
}) })
@ -124,40 +113,28 @@ function Swarm () {
} }
function createStream (peer, protocol, cb) { function createStream (peer, protocol, cb) {
// 1. to pop a new stream on the connection // spawn new stream
// 2. negotiate the requested protocol through multistream
// 3. return back the stream when that is negotiated
var conn = self.connections[peer.id.toB58String()] var conn = self.connections[peer.id.toB58String()]
conn.request({path: '/', method: 'GET'}, function (err, stream) { conn.request({path: '/', method: 'GET'}, function (err, stream) {
if (err) { if (err) { return cb(err) }
return cb(err)
} // negotiate desired protocol
var msi = new Interactive() var msi = new Interactive()
msi.handle(stream, function () { msi.handle(stream, function () {
msi.select(protocol, function (err, ds) { msi.select(protocol, function (err, ds) {
if (err) { if (err) { return cb(err) }
return cb(err) cb(null, ds) // return the stream
}
cb(null, ds) // wohoo we finally delivered the stream the user wanted
}) })
}) })
}) })
conn.on('close', function () {
// TODO(daviddias) remove it from collections
})
} }
} }
self.registerHandle = function (protocol, cb) { self.registerHandle = function (protocol, handleFunc) {
if (self.handles[protocol]) { if (self.handles[protocol]) {
var err = new Error('Handle for protocol already exists', protocol) throw new Error('Handle for protocol already exists', protocol)
log.error(err)
return cb(err)
} }
self.handles.push({ protocol: protocol, func: cb }) self.handles.push({ protocol: protocol, func: handleFunc })
log.info('Registered handler for protocol:', protocol) log.info('Registered handler for protocol:', protocol)
} }
@ -190,8 +167,6 @@ function Counter (target, callback) {
function count () { function count () {
c += 1 c += 1
if (c === target) { if (c === target) { callback() }
callback()
}
} }
} }