From 9c062ffeeb38bc8b5df38fa656abfae0f45c73fd Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 8 Jul 2015 23:01:36 -0700 Subject: [PATCH] have at least one test --- examples/network-c.js | 2 +- examples/network-s.js | 2 +- package.json | 3 +- src/identify.js | 2 +- src/index.js | 160 +------------------------------------ src/swarm.js | 178 ++++++++++++++++++++++++++++++++++++++++++ tests/swarm-test.js | 71 +++++++++++++++++ 7 files changed, 257 insertions(+), 161 deletions(-) create mode 100644 src/swarm.js diff --git a/examples/network-c.js b/examples/network-c.js index 4e6835d1..6fcad796 100644 --- a/examples/network-c.js +++ b/examples/network-c.js @@ -1,4 +1,4 @@ -var swarm = require('./../src') +var swarm = require('./../src').singleton var Peer = require('ipfs-peer') var Id = require('ipfs-peer-id') var multiaddr = require('multiaddr') diff --git a/examples/network-s.js b/examples/network-s.js index de2f7b35..666c5bfd 100644 --- a/examples/network-s.js +++ b/examples/network-s.js @@ -1,4 +1,4 @@ -var swarm = require('./../src') +var swarm = require('./../src').singleton swarm.listen() diff --git a/package.json b/package.json index 8ff55552..825b7770 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,8 @@ "code": "^1.4.1", "lab": "^5.13.0", "precommit-hook": "^3.0.0", - "standard": "^4.5.2" + "standard": "^4.5.2", + "stream-pair": "^1.0.2" }, "dependencies": { "async": "^1.3.0", diff --git a/src/identify.js b/src/identify.js index 7e4fab8a..9695af33 100644 --- a/src/identify.js +++ b/src/identify.js @@ -3,7 +3,7 @@ * pairs a specific peer is available through */ -var swarm = require('./') +var swarm = require('./').singleton var Interactive = require('multistream-select').Interactive exports = module.exports diff --git a/src/index.js b/src/index.js index c12267f2..b825469a 100644 --- a/src/index.js +++ b/src/index.js @@ -1,158 +1,4 @@ -var tcp = require('net') -var Select = require('multistream-select').Select -var Interactive = require('multistream-select').Interactive -var spdy = require('spdy-transport') -var identify = require('./identify') -var log = require('ipfs-logger').group('swarm') -var async = require('async') +var Swarm = require('./swarm') -exports = module.exports - -var connections = {} -var handles = [] - -// set the listener - -exports.listen = function () { - tcp.createServer(function (socket) { - var ms = new Select() - ms.handle(socket) - ms.addHandler('/spdy/3.1.0', function (ds) { - log.info('Negotiated spdy with incoming socket') - log.info('Buffer should be clean - ', ds.read()) - var spdyConnection = spdy.connection.create(ds, { - protocol: 'spdy', - isServer: true - }) - - spdyConnection.start(3.1) - - // attach multistream handlers to incoming streams - spdyConnection.on('stream', function (spdyStream) { - registerHandles(spdyStream) - }) - - // learn about the other peer Identity - /* TODO(daviddias) - identify.inquiry(spdyConnection, function (err, spdyConnection, peerIdB58) { - if (err) { - return log.error(err) - } - if (connections[peerIdB58]) { - return log.error('New connection established with a peer(' + peerIdB58 + ') that we already had a connection') - } - spdyConnection.peerId = peerIdB58 - connections[peerIdB58] = spdyConnection - }) - */ - - // close the connection when all the streams close - spdyConnection.on('close', function () { - delete connections[spdyConnection.peerId] - }) - }) - }).listen(process.env.IPFS_PORT || 4001) -} - -// interface - -exports.openStream = function (peer, protocol, cb) { - // if Connection already open, open a new stream, otherwise, create a new Connection - // then negoatite the protocol and return the opened stream - - // If no connection open yet, open it - if (!connections[peer.id.toB58String()]) { - // Establish a socket with one of the addresses - var gotOne = false - async.eachSeries(peer.multiaddrs, function (multiaddr, callback) { - if (gotOne) { - return callback() - } - var socket = tcp.connect(multiaddr.toOptions(), function connected () { - gotSocket(socket) - }) - - socket.once('error', function (err) { - log.warn('Could not connect using one of the address of peer - ', peer.id.toB58String(), err) - callback() - }) - - }, function done () { - if (!gotOne) { - cb(new Error('Not able to open a scoket with peer - ', peer.id.toB58String())) - } - }) - - } else { - createStream(peer, protocol, cb) - } - - // do the spdy people dance (multistream-select into spdy) - function gotSocket (socket) { - gotOne = true - var msi = new Interactive() - msi.handle(socket, function () { - msi.select('/spdy/3.1.0', function (err, ds) { - var spdyConnection = spdy.connection.create(ds, { - protocol: 'spdy', - isServer: false - }) - spdyConnection.start(3.1) - connections[peer.id.toB58String()] = spdyConnection - - // attach multistream handlers to incoming streams - spdyConnection.on('stream', function (spdyStream) { - registerHandles(spdyStream) - }) - - createStream(peer, protocol, cb) - }) - }) - } - - function createStream (peer, protocol, cb) { - // 1. to pop a new stream on the connection - // 2. negotiate the requested protocol through multistream - // 3. return back the stream when that is negotiated - var conn = connections[peer.id.toB58String()] - conn.request({path: '/', method: 'GET'}, function (err, stream) { - if (err) { - return cb(err) - } - var msi = new Interactive() - msi.handle(stream, function () { - msi.select(protocol, function (err, ds) { - if (err) { - return cb(err) - } - cb(null, ds) // wohoo we finally delivered the stream the user wanted - }) - }) - }) - - conn.on('close', function () { - // TODO(daviddias) remove it from collections - }) - - } - -} - -exports.registerHandle = function (protocol, cb) { - if (handles[protocol]) { - var err = new Error('Handle for protocol already exists', protocol) - log.error(err) - return cb(err) - } - handles.push({ protocol: protocol, func: cb }) - log.info('Registered handler for protocol:', protocol) -} - -function registerHandles (spdyStream) { - log.info('Preparing stream to handle the registered protocols') - var msH = new Select() - msH.handle(spdyStream) - handles.forEach(function (handle) { - msH.addHandler(handle.protocol, handle.func) - }) -} +exports = module.exports = Swarm +exports.singleton = new Swarm() diff --git a/src/swarm.js b/src/swarm.js new file mode 100644 index 00000000..98ae85a3 --- /dev/null +++ b/src/swarm.js @@ -0,0 +1,178 @@ +var tcp = require('net') +var Select = require('multistream-select').Select +var Interactive = require('multistream-select').Interactive +var spdy = require('spdy-transport') +// var identify = require('./identify') +var log = require('ipfs-logger').group('swarm') +var async = require('async') + +exports = module.exports = Swarm + +function Swarm () { + var self = this + + if (!(self instanceof Swarm)) { + throw new Error('Swarm must be called with new') + } + + self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 + self.connections = {} + self.handles = [] + + // set the listener + + self.listen = function () { + console.log('GOING TO LISTEN ON: ', self.port) + tcp.createServer(function (socket) { + console.log('GOT INCOMING CONNECTION') + + var ms = new Select() + ms.handle(socket) + ms.addHandler('/spdy/3.1.0', function (ds) { + console.log('GOT SPDY HANDLER REQUEST') + log.info('Negotiated spdy with incoming socket') + log.info('Buffer should be clean - ', ds.read()) + var spdyConnection = spdy.connection.create(ds, { + protocol: 'spdy', + isServer: true + }) + + spdyConnection.start(3.1) + + // attach multistream handlers to incoming streams + spdyConnection.on('stream', function (spdyStream) { + registerHandles(spdyStream) + }) + + // learn about the other peer Identity + /* TODO(daviddias) + identify.inquiry(spdyConnection, function (err, spdyConnection, peerIdB58) { + if (err) { + return log.error(err) + } + if (self.connections[peerIdB58]) { + return log.error('New connection established with a peer(' + peerIdB58 + ') that we already had a connection') + } + spdyConnection.peerId = peerIdB58 + self.connections[peerIdB58] = spdyConnection + }) + */ + + // close the connection when all the streams close + spdyConnection.on('close', function () { + delete self.connections[spdyConnection.peerId] + }) + }) + }).listen(self.port) + } + + // interface + + self.openStream = function (peer, protocol, cb) { + // if Connection already open, open a new stream, otherwise, create a new Connection + // then negoatite the protocol and return the opened stream + + // If no connection open yet, open it + if (!self.connections[peer.id.toB58String()]) { + // Establish a socket with one of the addresses + var gotOne = false + async.eachSeries(peer.multiaddrs, function (multiaddr, callback) { + if (gotOne) { + return callback() + } + var socket = tcp.connect(multiaddr.toOptions(), function connected () { + console.log('CONNECTED TO: ', multiaddr.toString()) + gotSocket(socket) + }) + + socket.once('error', function (err) { + log.warn('Could not connect using one of the address of peer - ', peer.id.toB58String(), err) + callback() + }) + + }, function done () { + if (!gotOne) { + cb(new Error('Not able to open a scoket with peer - ', peer.id.toB58String())) + } + }) + + } else { + createStream(peer, protocol, cb) + } + + // do the spdy people dance (multistream-select into spdy) + function gotSocket (socket) { + console.log('GOT SOCKET') + gotOne = true + var msi = new Interactive() + msi.handle(socket, function () { + console.log('GOING TO NEGOTIATE SPDY') + msi.select('/spdy/3.1.0', function (err, ds) { + if (err) { + return console.log('err', err) + } + var spdyConnection = spdy.connection.create(ds, { + protocol: 'spdy', + isServer: false + }) + spdyConnection.start(3.1) + self.connections[peer.id.toB58String()] = spdyConnection + + // attach multistream handlers to incoming streams + spdyConnection.on('stream', function (spdyStream) { + registerHandles(spdyStream) + }) + + createStream(peer, protocol, cb) + }) + }) + } + + function createStream (peer, protocol, cb) { + // 1. to pop a new stream on the connection + // 2. negotiate the requested protocol through multistream + // 3. return back the stream when that is negotiated + var conn = self.connections[peer.id.toB58String()] + conn.request({path: '/', method: 'GET'}, function (err, stream) { + if (err) { + return cb(err) + } + var msi = new Interactive() + msi.handle(stream, function () { + msi.select(protocol, function (err, ds) { + if (err) { + return cb(err) + } + cb(null, ds) // wohoo we finally delivered the stream the user wanted + }) + }) + }) + + conn.on('close', function () { + // TODO(daviddias) remove it from collections + }) + + } + + } + + self.registerHandle = function (protocol, cb) { + if (self.handles[protocol]) { + var err = new Error('Handle for protocol already exists', protocol) + log.error(err) + return cb(err) + } + self.handles.push({ protocol: protocol, func: cb }) + log.info('Registered handler for protocol:', protocol) + } + + function registerHandles (spdyStream) { + log.info('Preparing stream to handle the registered protocols') + var msH = new Select() + msH.handle(spdyStream) + self.handles.forEach(function (handle) { + msH.addHandler(handle.protocol, handle.func) + }) + } + +} diff --git a/tests/swarm-test.js b/tests/swarm-test.js index e69de29b..57ecb357 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -0,0 +1,71 @@ +var Lab = require('lab') +var Code = require('code') +var lab = exports.lab = Lab.script() + +var experiment = lab.experiment +var test = lab.test +var beforeEach = lab.beforeEach +var afterEach = lab.afterEach +var expect = Code.expect + +var multiaddr = require('multiaddr') +var Id = require('ipfs-peer-id') +var Peer = require('ipfs-peer') +var Swarm = require('../src/index.js') + +experiment(': ', function () { + var a + var b + var peerA + var peerB + + beforeEach(function (done) { + a = new Swarm() + a.port = 4000 + a.listen() + peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)]) + + b = new Swarm() + b.port = 4001 + b.listen() + peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)]) + + setTimeout(done, 1000) + }) + + afterEach(function (done) { + // a.close() + // b.close() + done() + }) + + test('Open a stream', {timeout: false}, function (done) { + var protocol = '/sparkles/3.3.3' + var c = new Counter(2, done) + + b.registerHandle(protocol, function (stream) { + console.log('bim') + c.unamas() + }) + + a.openStream(peerB, protocol, function (err, stream) { + console.log('pim') + expect(err).to.not.be.instanceof(Error) + c.unamas() + }) + }) + + function Counter (target, callback) { + var c = 0 + + this.unamas = count + + function count () { + c += 1 + if (c === target) { + callback() + } + } + } + +})