From f8e14e4ddf050eb6626decc1146b7bde9e045648 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 7 Mar 2016 12:47:11 +0000 Subject: [PATCH] stream multiplexing done, starting on identify refactor --- package.json | 2 +- src/identify.js | 59 ++++---- src/index.js | 64 ++++++-- tests/swarm-old.js | 354 -------------------------------------------- tests/swarm-test.js | 136 ++++++++++++++--- 5 files changed, 206 insertions(+), 409 deletions(-) delete mode 100644 tests/swarm-old.js diff --git a/package.json b/package.json index 70983e97..8c133589 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "bl": "^1.1.2", "chai": "^3.5.0", "istanbul": "^0.4.2", - "libp2p-spdy": "^0.1.0", + "libp2p-spdy": "^0.2.3", "libp2p-tcp": "^0.2.1", "mocha": "^2.4.5", "multiaddr": "^1.1.1", diff --git a/src/identify.js b/src/identify.js index bf78cd89..7bad7eb7 100644 --- a/src/identify.js +++ b/src/identify.js @@ -1,30 +1,47 @@ /* - * Identify is one of the protocols swarms speaks in order to broadcast and learn - * about the ip:port pairs a specific peer is available through + * Identify is one of the protocols swarms speaks in order to + * broadcast and learn about the ip:port pairs a specific peer + * is available through */ -var Interactive = require('multistream-select').Interactive -var protobufs = require('protocol-buffers-stream') -var fs = require('fs') -var path = require('path') -var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -var Address6 = require('ip-address').Address6 -var Id = require('peer-id') -var multiaddr = require('multiaddr') +// var multistream = require('multistream-select') +// var protobufs = require('protocol-buffers-stream') +// var fs = require('fs') +// var path = require('path') +// var protobufs = require('protocol-buffers-stream') +// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) +// var Address6 = require('ip-address').Address6 +// var Id = require('peer-id') +// var multiaddr = require('multiaddr') -exports = module.exports = identify +exports = module.exports -var protoId = '/ipfs/identify/1.0.0' +exports.multicodec = '/ipfs/identify/1.0.0' -exports.protoId = protoId -var createProtoStream = protobufs(schema) +exports.exec = (muxedConn, callback) => { + // TODO + // 1. open a stream + // 2. multistream into identify + // 3. send what I see from this other peer + // 4. receive what the other peer sees from me + // 4. callback with (err, peerInfo) +} +exports.handler = (peerInfo) => { + return function (conn) { + // TODO + // 1. receive incoming observed info about me + // 2. send back what I see from the other + } +} + +/* function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { var msi = new Interactive() msi.handle(conn, function () { msi.select(protoId, function (err, ds) { if (err) { - return console.log(err) // TODO Treat error + return console.log(err) } var ps = createProtoStream() @@ -39,16 +56,6 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { socket: socket } - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - // }) - }) - var mh = getMultiaddr(socket) ps.identify({ @@ -156,4 +163,4 @@ function updateSelf (peerSelf, observedAddr) { peerSelf.multiaddrs.push(omh) } } -} +}*/ diff --git a/src/index.js b/src/index.js index a4658713..4e3f3f6d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,6 @@ const multistream = require('multistream-select') // const async = require('async') -// const identify = require('./identify') +const identify = require('./identify') const PassThrough = require('stream').PassThrough exports = module.exports = Swarm @@ -130,16 +130,28 @@ function Swarm (peerInfo) { // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } this.muxers = {} this.connection.addStreamMuxer = (muxer) => { - // TODO - // .handle(protocol, () => { - // after attaching the stream muxer, check if identify is enabled - // }) - // TODO add to the list of muxers available + // for dialing + this.muxers[muxer.multicodec] = muxer + + // for listening + this.handle(muxer.multicodec, (conn) => { + const muxedConn = muxer(conn, true) + muxedConn.on('stream', connHandler) + + if (this.identify) { + identify.exec(muxedConn, (err, pi) => { + if (err) {} + // TODO muxedConns[pi.id.toB58String()].muxer = muxedConn + }) + } + }) } // enable the Identify protocol + this.identify = false this.connection.reuse = () => { - // TODO identify + this.identify = true + this.handle(identify.multicodec, identify.handler(peerInfo)) } const self = this // couldn't get rid of this @@ -224,14 +236,40 @@ function Swarm (peerInfo) { } function attemptMuxerUpgrade (conn, cb) { - if (Object.keys(self.muxers).length === 0) { + const muxers = Object.keys(self.muxers) + if (muxers.length === 0) { return cb(new Error('no muxers available')) } - // TODO add muxer to the muxedConns object for the peerId - // TODO if it succeeds, add incomming open coons to connHandler + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + + nextMuxer(muxers.shift()) + + function nextMuxer (key) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + cb(new Error('could not upgrade to stream muxing')) + } else { + nextMuxer(muxers.shift()) + } + } + + const muxedConn = self.muxers[key](conn, false) + self.muxedConns[b58Id] = {} + self.muxedConns[b58Id].muxer = muxedConn + cb(null, muxedConn) + }) + }) + } } function openConnInMuxedConn (muxer, cb) { - // TODO open a conn in this muxer + cb(muxer.newStream()) } function protocolHandshake (conn, protocol, cb) { @@ -255,6 +293,10 @@ function Swarm (peerInfo) { this.close = (callback) => { var count = 0 + Object.keys(this.muxedConns).forEach((key) => { + this.muxedConns[key].muxer.end() + }) + Object.keys(this.transports).forEach((key) => { this.transports[key].close(() => { if (++count === Object.keys(this.transports).length) { diff --git a/tests/swarm-old.js b/tests/swarm-old.js deleted file mode 100644 index 0acd7654..00000000 --- a/tests/swarm-old.js +++ /dev/null @@ -1,354 +0,0 @@ -/* eslint-env mocha */ - -var async = require('async') -var expect = require('chai').expect - -var multiaddr = require('multiaddr') -var Id = require('peer-id') -var Peer = require('peer-info') -var Swarm = require('../src') -var tcp = require('libp2p-tcp') -var Spdy = require('libp2p-spdy') - -// because of Travis-CI -process.on('uncaughtException', function (err) { - console.log('Caught exception: ' + err) -}) - -describe('Basics', function () { - it('enforces creation with new', function (done) { - expect(function () { - Swarm() - }).to.throw() - done() - }) - - it('it throws an exception without peerSelf', function (done) { - expect(function () { - var sw = new Swarm() - sw.close() - }).to.throw(Error) - done() - }) -}) - -describe('When dialing', function () { - describe('if the swarm does add any of the peer transports', function () { - it('it returns an error', function (done) { - var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')]) - var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')]) - var swarm = new Swarm(peerOne) - - swarm.dial(peerTwo, {}, function (err) { - expect(err).to.exist - done() - }) - }) - }) -}) - -describe('Without a Stream Muxer', function () { - describe('and one swarm over tcp', function () { - it('add the transport', function (done) { - var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - - sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) - - function ready () { - expect(sw.transports['tcp'].options).to.deep.equal({}) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - - sw.close(done) - } - }) - }) - - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - async.parallel([sw1.close, sw2.close], done) - }) - - it('dial a conn', function (done) { - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - done() - }) - }) - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial a protocol on a previous created conn', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - // it('add an upgrade', function (done) { done() }) - // it('dial a conn on top of a upgrade', function (done) { done() }) - // it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) - - /* TODO - describe('udp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - - /* TODO - describe('udt', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - -/* TODO -describe('utp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) -}) */ -}) - -describe('With a SPDY Stream Muxer', function () { - describe('and one swarm over tcp', function () { - // TODO: What is the it here? - it('add Stream Muxer', function (done) { - // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - sw.addStreamMuxer('spdy', Spdy, {}) - - done() - }) - }) - - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - function afterEach (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - - done() - } - } - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial two conns (transport reuse)', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formality so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - // TODO Improve clarity - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - describe('and two identity enabled swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - sw1.enableIdentify() - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.enableIdentify() - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - // give time for identify to finish - setTimeout(function () { - expect(Object.keys(sw2.muxedConns).length).to.equal(1) - done() - }, 500) - } - }) - - it('identify', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - done() - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - }) -}) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 02e752cd..06e11612 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -9,7 +9,7 @@ const Peer = require('peer-info') const Swarm = require('../src') const TCP = require('libp2p-tcp') const bl = require('bl') -// var SPDY = require('libp2p-spdy') +const spdy = require('libp2p-spdy') describe('basics', () => { it('throws on missing peerInfo', (done) => { @@ -196,7 +196,7 @@ describe('transport - websockets', function () { }) describe('high level API - 1st without stream multiplexing (on TCP)', function () { - this.timeout(10000) + this.timeout(20000) var swarmA var peerA @@ -278,7 +278,7 @@ describe('high level API - 1st without stream multiplexing (on TCP)', function ( }) describe('stream muxing (on TCP)', function () { - this.timeout(10000) + this.timeout(20000) describe('multiplex', () => { before((done) => { done() }) @@ -291,31 +291,131 @@ describe('stream muxing (on TCP)', function () { it.skip('enable identify to reuse incomming muxed conn', (done) => {}) }) describe('spdy', () => { - before((done) => { done() }) - after((done) => { done() }) + var swarmA + var peerA + var swarmB + var peerB + var swarmC + var peerC - it.skip('add', (done) => {}) - it.skip('handle + dial on protocol', (done) => {}) - it.skip('dial to warm conn', (done) => {}) - it.skip('dial on protocol, reuse warmed conn', (done) => {}) - it.skip('enable identify to reuse incomming muxed conn', (done) => {}) + before((done) => { + peerA = new Peer() + peerB = new Peer() + peerC = new Peer() + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + swarmC = new Swarm(peerC) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + swarmC.transport.add('tcp', new TCP()) + swarmC.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 3) { + done() + } + } + }) + + after((done) => { + var counter = 0 + + swarmA.close(closed) + swarmB.close(closed) + swarmC.close(closed) + + function closed () { + if (++counter === 3) { + done() + } + } + }) + + it('add', (done) => { + swarmA.connection.addStreamMuxer(spdy) + swarmB.connection.addStreamMuxer(spdy) + swarmC.connection.addStreamMuxer(spdy) + done() + }) + + it('handle + dial on protocol', (done) => { + swarmB.handle('/abacaxi/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmA.muxedConns).length).to.equal(1) + conn.end() + conn.on('end', done) + }) + }) + + it('dial to warm conn', (done) => { + swarmB.dial(peerA, (err) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + done() + }) + }) + + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.handle('/papaia/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + conn.end() + conn.on('end', done) + }) + }) + + it.skip('enable identify to reuse incomming muxed conn', (done) => { + swarmA.connection.reuse() + swarmC.connection.reuse() + + swarmC.dial(peerA, (err) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(swarmC.muxedConns).length).to.equal(1) + expect(Object.keys(swarmA.muxedConns).length).to.equal(2) + }, 100) + }) + }) }) }) +/* describe('conn upgrades', function () { - this.timeout(10000) + this.timeout(20000) describe('secio on tcp', () => { - before((done) => { done() }) - after((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) it.skip('tls on a muxed stream (not the full conn)', (done) => {}) }) describe('tls on tcp', () => { - before((done) => { done() }) - after((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -324,12 +424,14 @@ describe('conn upgrades', function () { }) describe('high level API - with everything mixed all together!', function () { - this.timeout(10000) + this.timeout(20000) - before((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add tcp', (done) => {}) it.skip('add utp', (done) => {}) it.skip('add websockets', (done) => {}) it.skip('dial', (done) => {}) }) +*/