diff --git a/package.json b/package.json index 05323866..df32523a 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "devDependencies": { "code": "^1.4.1", "lab": "^5.13.0", + "libp2p-spdy": "^0.1.0", "libp2p-tcp": "^0.1.1", "precommit-hook": "^3.0.0", "sinon": "^1.15.4", diff --git a/src/swarm.js b/src/swarm.js index 099917b4..0673c391 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -24,11 +24,13 @@ function Swarm (peerInfo) { // listeners: [] } self.transports = {} + // transportName: listener self.listeners = {} + // protocolName: handlerFunc self.protocols = {} - // muxerName: { muxer: muxer + // muxerName: { Muxer: Muxer // Muxer is a constructor // options: options } self.muxers = {} @@ -62,8 +64,11 @@ function Swarm (peerInfo) { } - self.addStreamMuxer = function (StreamMuxer, options) { - + self.addStreamMuxer = function (name, StreamMuxer, options) { + self.muxers[name] = { + Muxer: StreamMuxer, + options: options + } } self.dial = function (peerInfo, options, protocol, callback) { @@ -82,7 +87,12 @@ function Swarm (peerInfo) { if (self.conns[peerInfo.id.toB58String()]) { if (protocol) { - multistreamHandshake(self.conns[peerInfo.id.toB58String()]) + if (self.muxers['spdy']) { + // TODO upgrade this conn to a muxer + console.log('TODO: upgrade a warm conn to muxer that was added after') + } else { + multistreamHandshake(self.conns[peerInfo.id.toB58String()]) + } self.conns[peerInfo.id.toB58String()] = undefined delete self.conns[peerInfo.id.toB58String()] return @@ -93,7 +103,11 @@ function Swarm (peerInfo) { // check if a stream muxer for this peer is available if (self.muxedConns[peerInfo.id.toB58String()]) { - return openMuxedStream() + if (protocol) { + return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()]) + } else { + return callback() + } } // Creating a new conn with this peer routine @@ -151,24 +165,49 @@ function Swarm (peerInfo) { function done () { // TODO apply upgrades - // TODO apply stream muxer + // apply stream muxer // if no protocol is selected, save it in the pool // if protocol is selected, multistream that protocol if (!conn) { callback(new Error('Unable to open a connection')) } - if (protocol) { - multistreamHandshake(conn) + if (self.muxers['spdy']) { + var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) + spdy.attach(conn, false, function (err, muxer) { + if (err) { + return console.log(err) // TODO Treat error + } + + muxer.on('stream', userProtocolMuxer) + + self.muxedConns[peerInfo.id.toB58String()] = muxer + + if (protocol) { + openMuxedStream(muxer) + } else { + callback() + } + }) } else { - self.conns[peerInfo.id.toB58String()] = conn - callback() + if (protocol) { + multistreamHandshake(conn) + } else { + self.conns[peerInfo.id.toB58String()] = conn + callback() + } } } - function openMuxedStream () { + function openMuxedStream (muxer) { // 1. create a new stream on this muxedConn and pass that to // multistreamHanshake + muxer.dialStream(function (err, conn) { + if (err) { + return console.log(err) // TODO Treat error + } + multistreamHandshake(conn) + }) } function multistreamHandshake (conn) { @@ -213,8 +252,21 @@ function Swarm (peerInfo) { // TODO apply upgrades // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) - // if no stream muxer, then - userProtocolMuxer(conn) + if (self.muxers['spdy']) { + var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) + spdy.attach(conn, true, function (err, muxer) { + if (err) { + return console.log(err) // TODO treat error + } + + // TODO This muxer has to be identified! + + muxer.on('stream', userProtocolMuxer) + }) + } else { + // if no stream muxer, then + userProtocolMuxer(conn) + } } // Handle user given protocols diff --git a/tests/swarm-test.js b/tests/swarm-test.js index a5a13ea0..8fd474a7 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -11,6 +11,7 @@ var Id = require('peer-id') var Peer = require('peer-info') var Swarm = require('../src') var tcp = require('libp2p-tcp') +var Spdy = require('libp2p-spdy') /* TODO experiment('Basics', function () { @@ -220,7 +221,81 @@ experiment('Without a Stream Muxer', function () { }) */ }) -experiment('With a SPDY Stream Muxer', function () {}) +experiment('With a SPDY Stream Muxer', function () { + experiment('tcp', function () { + test('add Stream Muxer', function (done) { + // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p = new Peer(Id.create(), []) + var sw = new Swarm(p) + sw.addStreamMuxer('spdy', Spdy, {}) + + done() + }) + + test('dial a conn on a protocol', function (done) { + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + sw1.addStreamMuxer('spdy', Spdy, {}) + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + sw2.addStreamMuxer('spdy', Spdy, {}) + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + expect(Object.keys(sw2.muxedConns).length).to.equal(0) + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + + done() + } + }) + }) + + var count = 0 + + function ready () { + count++ + if (count < 2) { + return + } + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + } + }) + test('dial two conns (transport reuse)', function (done) { + done() + }) + test('identify', function (done) { done() }) + }) +}) /* OLD experiment('BASICS', function () {