From c8f2fdd0775129971033e0067090be7890c577fa Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 3 Mar 2016 16:35:05 +0000 Subject: [PATCH 1/6] internal transport interface + libp2p-tcp tests --- README.md | 13 +- package.json | 14 +- src/index.js | 162 ++++++++++++- tests/swarm-old.js | 354 +++++++++++++++++++++++++++++ tests/swarm-test.js | 536 ++++++++++++++++++-------------------------- 5 files changed, 739 insertions(+), 340 deletions(-) create mode 100644 tests/swarm-old.js diff --git a/README.md b/README.md index dc4200c1..3b8225f2 100644 --- a/README.md +++ b/README.md @@ -18,18 +18,23 @@ libp2p-swarm is used by libp2p but it can be also used as a standalone module. libp2p-swarm is available on npm and so, like any other npm module, just: ```bash -$ npm install libp2p-swarm --save +> npm install libp2p-swarm --save ``` And use it in your Node.js code as: ```JavaScript -var Swarm = require('libp2p-swarm') +const Swarm = require('libp2p-swarm') -var sw = new Swarm(peerInfoSelf) +const sw = new Swarm(peerInfo) ``` -peerInfoSelf is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. +peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. + + +---------- +BELOW NEEDS AN UPDATE + ### Support a transport diff --git a/package.json b/package.json index 180bb5bc..f6987119 100644 --- a/package.json +++ b/package.json @@ -26,28 +26,26 @@ "test" ], "engines": { - "node": "^4.0.0" + "node": "^4.3.0" }, "devDependencies": { + "bl": "^1.1.2", "chai": "^3.5.0", "istanbul": "^0.4.2", "libp2p-spdy": "^0.1.0", - "libp2p-tcp": "^0.1.1", + "libp2p-tcp": "^0.2.1", "mocha": "^2.4.5", + "multiaddr": "^1.1.1", + "peer-id": "^0.5.1", + "peer-info": "^0.5.2", "pre-commit": "^1.1.2", - "sinon": "^1.15.4", "standard": "^6.0.7", "stream-pair": "^1.0.3" }, "dependencies": { "async": "^1.3.0", "ip-address": "^5.0.2", - "ipfs-logger": "^0.1.0", - "multiaddr": "^1.0.0", - "multiplex-stream-muxer": "^0.2.0", "multistream-select": "^0.6.1", - "peer-id": "^0.3.3", - "peer-info": "^0.3.2", "protocol-buffers-stream": "^1.2.0" } } diff --git a/src/index.js b/src/index.js index 54c25426..f933418c 100644 --- a/src/index.js +++ b/src/index.js @@ -1,14 +1,161 @@ -var multistream = require('multistream-select') -var async = require('async') -var identify = require('./identify') +// const multistream = require('multistream-select') +// const async = require('async') +// const identify = require('./identify') +const PassThrough = require('stream').PassThrough exports = module.exports = Swarm function Swarm (peerInfo) { - var self = this + if (!(this instanceof Swarm)) { + return new Swarm(peerInfo) + } - if (!(self instanceof Swarm)) { - throw new Error('Swarm must be called with new') + if (!peerInfo) { + throw new Error('You must provide a value for `peerInfo`') + } + + // transports -- + + // { key: transport }; e.g { tcp: } + this.transports = {} + + this.transport = {} + + this.transport.add = (key, transport, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + if (!callback) { callback = noop } + + if (this.transports[key]) { + throw new Error('There is already a transport with this key') + } + this.transports[key] = transport + callback() + } + + this.transport.dial = (key, multiaddrs, callback) => { + const t = this.transports[key] + + // a) if multiaddrs.length = 1, return the conn from the + // transport, otherwise, create a passthrough + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } + if (multiaddrs.length === 1) { + const conn = t.dial(multiaddrs.shift(), {ready: () => { + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, conn) + }}) + conn.once('error', () => { + callback(new Error('failed to connect to every multiaddr')) + }) + return conn + } + + // b) multiaddrs should already be a filtered list + // specific for the transport we are using + const pt = new PassThrough() + + next(multiaddrs.shift()) + return pt + function next (multiaddr) { + const conn = t.dial(multiaddr, {ready: () => { + pt.pipe(conn).pipe(pt) + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, pt) + }}) + + conn.once('error', () => { + if (multiaddrs.length === 0) { + return callback(new Error('failed to connect to every multiaddr')) + } + next(multiaddrs.shift()) + }) + } + } + + this.transport.listen = (key, options, handler, callback) => { + // if no callback is passed, we pass conns to connHandler + if (!handler) { handler = connHandler } + + const multiaddrs = peerInfo.multiaddrs.filter((m) => { + if (m.toString().indexOf('tcp') !== -1) { + return m + } + }) + + this.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => { + if (err) { + return callback(err) + } + if (maUpdate) { + // because we can listen on port 0... + peerInfo.multiaddr.replace(multiaddrs, maUpdate) + } + + callback() + }) + } + + this.transport.close = (key, callback) => { + this.transports[key].close(callback) + } + + // connections -- + + // { peerIdB58: { conn: }} + this.conns = {} + + // { + // peerIdB58: { + // muxerName: { + // muxer: + // rawSocket: socket // to abstract info required for the Identify Protocol + // } + // } + this.muxedConns = {} + + this.connection = {} + this.connection.addUpgrade = () => {} + this.connection.addStreamMuxer = () => {} + + // enable the Identify protocol + this.connection.reuse = () => {} + + // main API - higher level abstractions -- + + this.dial = () => { + // TODO + } + this.handle = (protocol, callback) => { + // TODO + } + this.close = (callback) => { + var count = 0 + + Object.keys(this.transports).forEach((key) => { + this.transports[key].close(() => { + if (++count === Object.keys(this.transports).length) { + callback() + } + }) + }) + } + + function connHandler (conn) { + // do all the multistream select handshakes (this should be probably done recursively + } +} + +/* +function Swarm (peerInfo) { + var self = this + if (!(this instanceof Swarm)) { + return new Swarm(peerInfo) } if (!peerInfo) { @@ -341,3 +488,6 @@ function Swarm (peerInfo) { }) } } +*/ + +function noop () {} diff --git a/tests/swarm-old.js b/tests/swarm-old.js new file mode 100644 index 00000000..0acd7654 --- /dev/null +++ b/tests/swarm-old.js @@ -0,0 +1,354 @@ +/* eslint-env mocha */ + +var async = require('async') +var expect = require('chai').expect + +var multiaddr = require('multiaddr') +var Id = require('peer-id') +var Peer = require('peer-info') +var Swarm = require('../src') +var tcp = require('libp2p-tcp') +var Spdy = require('libp2p-spdy') + +// because of Travis-CI +process.on('uncaughtException', function (err) { + console.log('Caught exception: ' + err) +}) + +describe('Basics', function () { + it('enforces creation with new', function (done) { + expect(function () { + Swarm() + }).to.throw() + done() + }) + + it('it throws an exception without peerSelf', function (done) { + expect(function () { + var sw = new Swarm() + sw.close() + }).to.throw(Error) + done() + }) +}) + +describe('When dialing', function () { + describe('if the swarm does add any of the peer transports', function () { + it('it returns an error', function (done) { + var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')]) + var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')]) + var swarm = new Swarm(peerOne) + + swarm.dial(peerTwo, {}, function (err) { + expect(err).to.exist + done() + }) + }) + }) +}) + +describe('Without a Stream Muxer', function () { + describe('and one swarm over tcp', function () { + it('add the transport', function (done) { + var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p = new Peer(Id.create(), []) + var sw = new Swarm(p) + + sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) + + function ready () { + expect(sw.transports['tcp'].options).to.deep.equal({}) + expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) + expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) + expect(sw.transports['tcp'].transport).to.deep.equal(tcp) + + sw.close(done) + } + }) + }) + + describe('and two swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + afterEach(function (done) { + async.parallel([sw1.close, sw2.close], done) + }) + + it('dial a conn', function (done) { + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) + done() + }) + }) + + it('dial a conn on a protocol', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + conn.end() + conn.on('end', done) + }) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + }) + + it('dial a protocol on a previous created conn', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + conn.end() + conn.on('end', done) + }) + + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + + conn.end() + }) + }) + }) + + // it('add an upgrade', function (done) { done() }) + // it('dial a conn on top of a upgrade', function (done) { done() }) + // it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) + + /* TODO + describe('udp', function () { + it('add the transport', function (done) { done() }) + it('dial a conn', function (done) { done() }) + it('dial a conn on a protocol', function (done) { done() }) + it('add an upgrade', function (done) { done() }) + it('dial a conn on top of a upgrade', function (done) { done() }) + it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) */ + + /* TODO + describe('udt', function () { + it('add the transport', function (done) { done() }) + it('dial a conn', function (done) { done() }) + it('dial a conn on a protocol', function (done) { done() }) + it('add an upgrade', function (done) { done() }) + it('dial a conn on top of a upgrade', function (done) { done() }) + it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) */ + +/* TODO +describe('utp', function () { + it('add the transport', function (done) { done() }) + it('dial a conn', function (done) { done() }) + it('dial a conn on a protocol', function (done) { done() }) + it('add an upgrade', function (done) { done() }) + it('dial a conn on top of a upgrade', function (done) { done() }) + it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) +}) */ +}) + +describe('With a SPDY Stream Muxer', function () { + describe('and one swarm over tcp', function () { + // TODO: What is the it here? + it('add Stream Muxer', function (done) { + // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p = new Peer(Id.create(), []) + var sw = new Swarm(p) + sw.addStreamMuxer('spdy', Spdy, {}) + + done() + }) + }) + + describe('and two swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + sw1.addStreamMuxer('spdy', Spdy, {}) + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + sw2.addStreamMuxer('spdy', Spdy, {}) + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + function afterEach (done) { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + + done() + } + } + + it('dial a conn on a protocol', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + expect(Object.keys(sw2.muxedConns).length).to.equal(0) + afterEach(done) + }) + }) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + }) + + it('dial two conns (transport reuse)', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formality so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + expect(Object.keys(sw2.muxedConns).length).to.equal(0) + + afterEach(done) + }) + }) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + // TODO Improve clarity + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + + conn.end() + }) + + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + + conn.end() + }) + }) + }) + + describe('and two identity enabled swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + sw1.addStreamMuxer('spdy', Spdy, {}) + sw1.enableIdentify() + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + sw2.addStreamMuxer('spdy', Spdy, {}) + sw2.enableIdentify() + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + afterEach(function (done) { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + // give time for identify to finish + setTimeout(function () { + expect(Object.keys(sw2.muxedConns).length).to.equal(1) + done() + }, 500) + } + }) + + it('identify', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + done() + }) + }) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + }) + }) +}) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 0acd7654..a9ecfb26 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -1,354 +1,246 @@ /* eslint-env mocha */ -var async = require('async') -var expect = require('chai').expect +const expect = require('chai').expect +// const async = require('async') -var multiaddr = require('multiaddr') -var Id = require('peer-id') -var Peer = require('peer-info') -var Swarm = require('../src') -var tcp = require('libp2p-tcp') -var Spdy = require('libp2p-spdy') +const multiaddr = require('multiaddr') +// const Id = require('peer-id') +const Peer = require('peer-info') +const Swarm = require('../src') +const TCP = require('libp2p-tcp') +const bl = require('bl') +// var SPDY = require('libp2p-spdy') -// because of Travis-CI -process.on('uncaughtException', function (err) { - console.log('Caught exception: ' + err) -}) - -describe('Basics', function () { - it('enforces creation with new', function (done) { - expect(function () { - Swarm() - }).to.throw() - done() - }) - - it('it throws an exception without peerSelf', function (done) { - expect(function () { - var sw = new Swarm() - sw.close() - }).to.throw(Error) +describe('basics', () => { + it('throws on missing peerInfo', (done) => { + expect(Swarm).to.throw(Error) done() }) }) -describe('When dialing', function () { - describe('if the swarm does add any of the peer transports', function () { - it('it returns an error', function (done) { - var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')]) - var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')]) - var swarm = new Swarm(peerOne) +describe('transport - tcp', function () { + this.timeout(10000) - swarm.dial(peerTwo, {}, function (err) { - expect(err).to.exist - done() - }) - }) - }) -}) + var swarmA + var swarmB + var peerA = new Peer() + var peerB = new Peer() -describe('Without a Stream Muxer', function () { - describe('and one swarm over tcp', function () { - it('add the transport', function (done) { - var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - - sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) - - function ready () { - expect(sw.transports['tcp'].options).to.deep.equal({}) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - - sw.close(done) - } - }) + before((done) => { + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + done() }) - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - async.parallel([sw1.close, sw2.close], done) - }) - - it('dial a conn', function (done) { - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - done() - }) - }) - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial a protocol on a previous created conn', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - // it('add an upgrade', function (done) { done() }) - // it('dial a conn on top of a upgrade', function (done) { done() }) - // it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) - - /* TODO - describe('udp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - - /* TODO - describe('udt', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - -/* TODO -describe('utp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) -}) */ -}) - -describe('With a SPDY Stream Muxer', function () { - describe('and one swarm over tcp', function () { - // TODO: What is the it here? - it('add Stream Muxer', function (done) { - // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - sw.addStreamMuxer('spdy', Spdy, {}) - + it('add', (done) => { + swarmA.transport.add('tcp', new TCP()) + expect(Object.keys(swarmA.transports).length).to.equal(1) + swarmB.transport.add('tcp', new TCP(), () => { + expect(Object.keys(swarmB.transports).length).to.equal(1) done() }) }) - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - function afterEach (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } + it('listen', (done) => { + var count = 0 + swarmA.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + swarmB.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + function ready () { + if (++count === 2) { + expect(peerA.multiaddrs.length).to.equal(1) + expect(peerA.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9888')) + expect(peerB.multiaddrs.length).to.equal(1) + expect(peerB.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9999')) done() } } - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial two conns (transport reuse)', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formality so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - // TODO Improve clarity - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) }) - describe('and two identity enabled swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - sw1.enableIdentify() - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.enableIdentify() - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) + it('dial to a multiaddr', (done) => { + const conn = swarmA.transport.dial('tcp', multiaddr('/ip4/127.0.0.1/tcp/9999'), (err, conn) => { + expect(err).to.not.exist }) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + done() + })) + conn.write('hey') + conn.end() + }) - afterEach(function (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) + it('dial to set of multiaddr, only one is available', (done) => { + const conn = swarmA.transport.dial('tcp', [ + multiaddr('/ip4/127.0.0.1/tcp/9910'), + multiaddr('/ip4/127.0.0.1/tcp/9999'), + multiaddr('/ip4/127.0.0.1/tcp/9309') + ], (err, conn) => { + expect(err).to.not.exist + }) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + done() + })) + conn.write('hey') + conn.end() + }) - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) + it('close', (done) => { + var count = 0 + swarmA.transport.close('tcp', closed) + swarmB.transport.close('tcp', closed) - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - // give time for identify to finish - setTimeout(function () { - expect(Object.keys(sw2.muxedConns).length).to.equal(1) - done() - }, 500) + function closed () { + if (++count === 2) { + done() } - }) + } + }) - it('identify', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) + it('support port 0', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - done() - }) - }) + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/0')) + swarm.close(done) + } + }) - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) + it('support addr /ip4/0.0.0.0/tcp/9050', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/9050')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/9050')) + swarm.close(done) + } + }) + + it('support addr /ip4/0.0.0.0/tcp/0', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/0')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0')) + swarm.close(done) + } + }) + + it('listen in several addrs', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(3) + swarm.close(done) + } }) }) + +describe('transport - udt', () => { + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('listen', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('close', (done) => {}) +}) + +describe('transport - websockets', () => { + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('listen', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('close', (done) => {}) +}) + +describe('high level API - 1st stage, without stream multiplexing (on TCP)', () => { + it.skip('handle a protocol', (done) => {}) + it.skip('dial on protocol', (done) => {}) + it.skip('dial to warm conn', (done) => {}) + it.skip('dial on protocol, reuse warmed conn', (done) => {}) + it.skip('close', (done) => {}) +}) + +describe('stream muxing (on TCP)', () => { + describe('multiplex', () => { + before((done) => { done() }) + it.skip('add', (done) => {}) + it.skip('handle + dial on protocol', (done) => {}) + it.skip('dial to warm conn', (done) => {}) + it.skip('dial on protocol, reuse warmed conn', (done) => {}) + it.skip('enable identify to reuse incomming muxed conn', (done) => {}) + it.skip('close', (done) => {}) + }) + describe('spdy', () => { + it.skip('add', (done) => {}) + it.skip('handle + dial on protocol', (done) => {}) + it.skip('dial to warm conn', (done) => {}) + it.skip('dial on protocol, reuse warmed conn', (done) => {}) + it.skip('enable identify to reuse incomming muxed conn', (done) => {}) + it.skip('close', (done) => {}) + }) +}) + +describe('conn upgrades', () => { + describe('secio on tcp', () => { + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('tls on a muxed stream (not the full conn)', (done) => {}) + }) + describe('tls on tcp', () => { + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('tls on a muxed stream (not the full conn)', (done) => {}) + }) +}) + +describe('high level API = 2nd stage, with everything all together!', () => { + before((done) => { done() }) + + it.skip('add tcp', (done) => {}) + it.skip('add utp', (done) => {}) + it.skip('add websockets', (done) => {}) + it.skip('dial', (done) => {}) +}) From 9d8ee67c61d2c57b6a68cd04fabc8e4b8afdfeea Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 6 Mar 2016 08:40:49 +0000 Subject: [PATCH 2/6] high level API working + tests --- package.json | 2 +- src/index.js | 495 ++++++++++++-------------------------------- tests/swarm-test.js | 115 ++++++++-- 3 files changed, 238 insertions(+), 374 deletions(-) diff --git a/package.json b/package.json index f6987119..70983e97 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "libp2p-tcp": "^0.2.1", "mocha": "^2.4.5", "multiaddr": "^1.1.1", - "peer-id": "^0.5.1", + "peer-id": "^0.5.3", "peer-info": "^0.5.2", "pre-commit": "^1.1.2", "standard": "^6.0.7", diff --git a/src/index.js b/src/index.js index f933418c..a4658713 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,4 @@ -// const multistream = require('multistream-select') +const multistream = require('multistream-select') // const async = require('async') // const identify = require('./identify') const PassThrough = require('stream').PassThrough @@ -38,11 +38,14 @@ function Swarm (peerInfo) { this.transport.dial = (key, multiaddrs, callback) => { const t = this.transports[key] - // a) if multiaddrs.length = 1, return the conn from the - // transport, otherwise, create a passthrough if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] } + + // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) + + // b) if multiaddrs.length = 1, return the conn from the + // transport, otherwise, create a passthrough if (multiaddrs.length === 1) { const conn = t.dial(multiaddrs.shift(), {ready: () => { const cb = callback @@ -55,7 +58,7 @@ function Swarm (peerInfo) { return conn } - // b) multiaddrs should already be a filtered list + // c) multiaddrs should already be a filtered list // specific for the transport we are using const pt = new PassThrough() @@ -112,28 +115,143 @@ function Swarm (peerInfo) { // { // peerIdB58: { - // muxerName: { - // muxer: - // rawSocket: socket // to abstract info required for the Identify Protocol - // } + // muxer: + // rawSocket: socket // to abstract info required for the Identify Protocol // } + // } this.muxedConns = {} + // { protocol: handler } + this.protocols = {} + this.connection = {} this.connection.addUpgrade = () => {} - this.connection.addStreamMuxer = () => {} + + // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } + this.muxers = {} + this.connection.addStreamMuxer = (muxer) => { + // TODO + // .handle(protocol, () => { + // after attaching the stream muxer, check if identify is enabled + // }) + // TODO add to the list of muxers available + } // enable the Identify protocol - this.connection.reuse = () => {} - - // main API - higher level abstractions -- - - this.dial = () => { - // TODO + this.connection.reuse = () => { + // TODO identify } - this.handle = (protocol, callback) => { - // TODO + + const self = this // couldn't get rid of this + + // incomming connection handler + function connHandler (conn) { + var msS = new multistream.Select() + msS.handle(conn) + Object.keys(self.protocols).forEach(function (protocol) { + msS.addHandler(protocol, self.protocols[protocol]) + }) } + + // higher level (public) API + this.dial = (pi, protocol, callback) => { + var pt = null + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } else { + pt = new PassThrough() + } + + const b58Id = pi.id.toB58String() + if (!this.muxedConns[b58Id]) { + if (!this.conns[b58Id]) { + attemptDial(pi, (err, conn) => { + if (err) { + return callback(err) + } + gotWarmedUpConn(conn) + }) + } else { + const conn = this.conns[b58Id] + this.conns[b58Id] = undefined + gotWarmedUpConn(conn) + } + } else { + gotMuxer(this.muxedConns[b58Id].muxer) + } + + function gotWarmedUpConn (conn) { + attemptMuxerUpgrade(conn, (err, muxer) => { + if (!protocol) { + if (err) { + self.conns[b58Id] = conn + } + return callback() + } + + if (err) { + // couldn't upgrade to Muxer, it is ok + protocolHandshake(conn, protocol, callback) + } else { + gotMuxer(muxer) + } + }) + } + + function gotMuxer (muxer) { + openConnInMuxedConn(muxer, (conn) => { + protocolHandshake(conn, protocol, callback) + }) + } + + function attemptDial (pi, cb) { + const tKeys = Object.keys(self.transports) + nextTransport(tKeys.shift()) + + function nextTransport (key) { + const multiaddrs = pi.multiaddrs.slice() + self.transport.dial(key, multiaddrs, (err, conn) => { + if (err) { + if (tKeys.length === 0) { + return cb(new Error('Could not dial in any of the transports')) + } + return nextTransport(tKeys.shift()) + } + cb(null, conn) + }) + } + } + + function attemptMuxerUpgrade (conn, cb) { + if (Object.keys(self.muxers).length === 0) { + return cb(new Error('no muxers available')) + } + // TODO add muxer to the muxedConns object for the peerId + // TODO if it succeeds, add incomming open coons to connHandler + } + function openConnInMuxedConn (muxer, cb) { + // TODO open a conn in this muxer + } + + function protocolHandshake (conn, protocol, cb) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(protocol, (err, conn) => { + if (err) { + return callback(err) + } + pt.pipe(conn).pipe(pt) + callback(null, pt) + }) + }) + } + } + + this.handle = (protocol, handler) => { + this.protocols[protocol] = handler + } + this.close = (callback) => { var count = 0 @@ -145,349 +263,6 @@ function Swarm (peerInfo) { }) }) } - - function connHandler (conn) { - // do all the multistream select handshakes (this should be probably done recursively - } } -/* -function Swarm (peerInfo) { - var self = this - if (!(this instanceof Swarm)) { - return new Swarm(peerInfo) - } - - if (!peerInfo) { - throw new Error('You must provide a value for `peerInfo`') - } - - self.peerInfo = peerInfo - - // peerIdB58: { conn: } - self.conns = {} - - // peerIdB58: { - // muxer: , - // socket: socket // so we can extract the info we need for identify - // } - self.muxedConns = {} - - // transportName: { transport: transport, - // dialOptions: dialOptions, - // listenOptions: listenOptions, - // listeners: [] } - self.transports = {} - - // transportName: listener - self.listeners = {} - - // protocolName: handlerFunc - self.protocols = {} - - // muxerName: { Muxer: Muxer // Muxer is a constructor - // options: options } - self.muxers = {} - - // for connection reuse - self.identify = false - - // public interface - - self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { - // set up the transport and add the list of incoming streams - // add transport to the list of transports - - var multiaddr = options.multiaddr - if (multiaddr) { - // no need to pass that to the transports - delete options.multiaddr - } - - var listener = transport.createListener(options, listen) - - listener.listen(listenOptions, function ready () { - self.transports[name] = { - transport: transport, - options: options, - dialOptions: dialOptions, - listenOptions: listenOptions, - listener: listener - } - - // If a known multiaddr is passed, then add to our list of multiaddrs - if (multiaddr) { - self.peerInfo.multiaddrs.push(multiaddr) - } - - callback() - }) - } - - self.addUpgrade = function (ConnUpgrade, options) {} - - self.addStreamMuxer = function (name, StreamMuxer, options) { - self.muxers[name] = { - Muxer: StreamMuxer, - options: options - } - } - - self.dial = function (peerInfo, options, protocol, callback) { - // 1. check if we have transports we support - // 2. check if we have a conn waiting - // 3. check if we have a stream muxer available - - if (typeof protocol === 'function') { - callback = protocol - protocol = undefined - } - - // check if a conn is waiting - // if it is and protocol was selected, jump into multistreamHandshake - // if it is and no protocol was selected, do nothing and call and empty callback - - if (self.conns[peerInfo.id.toB58String()]) { - if (protocol) { - if (self.muxers['spdy']) { - // TODO upgrade this conn to a muxer - console.log('TODO: upgrade a warm conn to muxer that was added after') - } else { - multistreamHandshake(self.conns[peerInfo.id.toB58String()]) - } - self.conns[peerInfo.id.toB58String()] = undefined - delete self.conns[peerInfo.id.toB58String()] - return - } else { - return callback() - } - } - - // check if a stream muxer for this peer is available - if (self.muxedConns[peerInfo.id.toB58String()]) { - if (protocol) { - return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer) - } else { - return callback() - } - } - - // Creating a new conn with this peer routine - - // TODO - check if there is a preference in protocol to use on - // options.protocol - var supportedTransports = Object.keys(self.transports) - var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) { - return multiaddr.protoNames().some(function (proto) { - return supportedTransports.indexOf(proto) >= 0 - }) - }) - - if (!multiaddrs.length) { - callback(new Error("The swarm doesn't support any of the peer transports")) - return - } - - var conn - - async.eachSeries(multiaddrs, function (multiaddr, next) { - if (conn) { - return next() - } - - var transportName = getTransportNameForMultiaddr(multiaddr) - var transport = self.transports[transportName] - var dialOptions = clone(transport.dialOptions) - dialOptions.ready = connected - - var connTry = transport.transport.dial(multiaddr, dialOptions) - - connTry.once('error', function (err) { - if (err) { - return console.log(err) // TODO handle error - } - next() // try next multiaddr - }) - - function connected () { - conn = connTry - next() - } - - function getTransportNameForMultiaddr (multiaddr) { - // this works for all those ip + transport + port tripplets - return multiaddr.protoNames()[1] - } - - function clone (obj) { - var target = {} - for (var i in obj) { - if (obj.hasOwnProperty(i)) { - target[i] = obj[i] - } - } - return target - } - }, done) - - function done () { - // TODO apply upgrades - // apply stream muxer - // if no protocol is selected, save it in the pool - // if protocol is selected, multistream that protocol - if (!conn) { - callback(new Error('Unable to open a connection')) - return - } - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, false, function (err, muxer) { - if (err) { - return console.log(err) // TODO Treat error - } - - muxer.on('stream', userProtocolMuxer) - - self.muxedConns[peerInfo.id.toB58String()] = { - muxer: muxer, - socket: conn - } - - if (protocol) { - openMuxedStream(muxer) - } else { - callback() - } - }) - } else { - if (protocol) { - multistreamHandshake(conn) - } else { - self.conns[peerInfo.id.toB58String()] = conn - callback() - } - } - } - - function openMuxedStream (muxer) { - // 1. create a new stream on this muxedConn and pass that to - // multistreamHanshake - muxer.dialStream(function (err, conn) { - if (err) { - return console.log(err) // TODO Treat error - } - multistreamHandshake(conn) - }) - } - - function multistreamHandshake (conn) { - var msI = new multistream.Interactive() - msI.handle(conn, function () { - msI.select(protocol, callback) - }) - } - } - - self.closeListener = function (transportName, callback) { - self.transports[transportName].listener.close(closed) - - // only gets called when all the streams on this transport are closed too - function closed () { - delete self.transports[transportName] - callback() - } - } - - // Iterates all the listeners closing them - // one by one. It calls back once all are closed. - self.closeAllListeners = function (callback) { - var transportNames = Object.keys(self.transports) - - async.each(transportNames, self.closeListener, callback) - } - - self.closeConns = function (callback) { - // close warmed up cons so that the listener can gracefully exit - Object.keys(self.conns).forEach(function (conn) { - self.conns[conn].destroy() - }) - self.conns = {} - - callback() - } - - // Closes both transport listeners and - // connections. It calls back once everything - // is closed - self.close = function (callback) { - async.parallel([ - self.closeAllListeners, - self.closeConns - ], callback) - } - - self.enableIdentify = function () { - // set flag to true - // add identify to the list of handled protocols - self.identify = true - - // we pass muxedConns so that identify can access the socket of the other - // peer - self.handleProtocol(identify.protoId, - identify.getHandlerFunction(self.peerInfo, self.muxedConns)) - } - - self.handleProtocol = function (protocol, handlerFunction) { - self.protocols[protocol] = handlerFunction - } - - // internals - - function listen (conn) { - // TODO apply upgrades - // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, true, function (err, muxer) { - if (err) { - return console.log(err) // TODO treat error - } - - // TODO This muxer has to be identified! - // pass to identify a reference of - // our muxedConn list - // ourselves (peerInfo) - // the conn, which is the socket - // and a stream it can send stuff - if (self.identify) { - muxer.dialStream(function (err, stream) { - if (err) { - return console.log(err) // TODO Treat error - } - // conn === socket at this point - identify(self.muxedConns, self.peerInfo, conn, stream, muxer) - }) - } - - muxer.on('stream', userProtocolMuxer) - }) - } else { - // if no stream muxer, then - userProtocolMuxer(conn) - } - } - - // Handle user given protocols - function userProtocolMuxer (conn) { - var msS = new multistream.Select() - msS.handle(conn) - Object.keys(self.protocols).forEach(function (protocol) { - msS.addHandler(protocol, self.protocols[protocol]) - }) - } -} -*/ - function noop () {} diff --git a/tests/swarm-test.js b/tests/swarm-test.js index a9ecfb26..02e752cd 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -173,7 +173,9 @@ describe('transport - tcp', function () { }) }) -describe('transport - udt', () => { +describe('transport - udt', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add', (done) => {}) @@ -182,7 +184,9 @@ describe('transport - udt', () => { it.skip('close', (done) => {}) }) -describe('transport - websockets', () => { +describe('transport - websockets', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add', (done) => {}) @@ -191,37 +195,119 @@ describe('transport - websockets', () => { it.skip('close', (done) => {}) }) -describe('high level API - 1st stage, without stream multiplexing (on TCP)', () => { - it.skip('handle a protocol', (done) => {}) - it.skip('dial on protocol', (done) => {}) - it.skip('dial to warm conn', (done) => {}) - it.skip('dial on protocol, reuse warmed conn', (done) => {}) - it.skip('close', (done) => {}) +describe('high level API - 1st without stream multiplexing (on TCP)', function () { + this.timeout(10000) + + var swarmA + var peerA + var swarmB + var peerB + + before((done) => { + peerA = new Peer() + peerB = new Peer() + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 2) { + done() + } + } + }) + + after((done) => { + var counter = 0 + + swarmA.close(closed) + swarmB.close(closed) + + function closed () { + if (++counter === 2) { + done() + } + } + }) + + it('handle a protocol', (done) => { + swarmB.handle('/bananas/1.0.0', (conn) => { + conn.pipe(conn) + }) + expect(Object.keys(swarmB.protocols).length).to.equal(1) + done() + }) + + it('dial on protocol', (done) => { + swarmB.handle('/pineapple/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmA.dial(peerB, '/pineapple/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) + }) + }) + + it('dial to warm a conn', (done) => { + swarmA.dial(peerB, (err) => { + expect(err).to.not.exist + done() + }) + }) + + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.dial(peerB, '/bananas/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) + }) + }) }) -describe('stream muxing (on TCP)', () => { +describe('stream muxing (on TCP)', function () { + this.timeout(10000) + describe('multiplex', () => { before((done) => { done() }) + after((done) => { done() }) + it.skip('add', (done) => {}) it.skip('handle + dial on protocol', (done) => {}) it.skip('dial to warm conn', (done) => {}) it.skip('dial on protocol, reuse warmed conn', (done) => {}) it.skip('enable identify to reuse incomming muxed conn', (done) => {}) - it.skip('close', (done) => {}) }) describe('spdy', () => { + before((done) => { done() }) + after((done) => { done() }) + it.skip('add', (done) => {}) it.skip('handle + dial on protocol', (done) => {}) it.skip('dial to warm conn', (done) => {}) it.skip('dial on protocol, reuse warmed conn', (done) => {}) it.skip('enable identify to reuse incomming muxed conn', (done) => {}) - it.skip('close', (done) => {}) }) }) -describe('conn upgrades', () => { +describe('conn upgrades', function () { + this.timeout(10000) + describe('secio on tcp', () => { before((done) => { done() }) + after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -229,6 +315,7 @@ describe('conn upgrades', () => { }) describe('tls on tcp', () => { before((done) => { done() }) + after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -236,7 +323,9 @@ describe('conn upgrades', () => { }) }) -describe('high level API = 2nd stage, with everything all together!', () => { +describe('high level API - with everything mixed all together!', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add tcp', (done) => {}) From f8e14e4ddf050eb6626decc1146b7bde9e045648 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 7 Mar 2016 12:47:11 +0000 Subject: [PATCH 3/6] stream multiplexing done, starting on identify refactor --- package.json | 2 +- src/identify.js | 59 ++++---- src/index.js | 64 ++++++-- tests/swarm-old.js | 354 -------------------------------------------- tests/swarm-test.js | 136 ++++++++++++++--- 5 files changed, 206 insertions(+), 409 deletions(-) delete mode 100644 tests/swarm-old.js diff --git a/package.json b/package.json index 70983e97..8c133589 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "bl": "^1.1.2", "chai": "^3.5.0", "istanbul": "^0.4.2", - "libp2p-spdy": "^0.1.0", + "libp2p-spdy": "^0.2.3", "libp2p-tcp": "^0.2.1", "mocha": "^2.4.5", "multiaddr": "^1.1.1", diff --git a/src/identify.js b/src/identify.js index bf78cd89..7bad7eb7 100644 --- a/src/identify.js +++ b/src/identify.js @@ -1,30 +1,47 @@ /* - * Identify is one of the protocols swarms speaks in order to broadcast and learn - * about the ip:port pairs a specific peer is available through + * Identify is one of the protocols swarms speaks in order to + * broadcast and learn about the ip:port pairs a specific peer + * is available through */ -var Interactive = require('multistream-select').Interactive -var protobufs = require('protocol-buffers-stream') -var fs = require('fs') -var path = require('path') -var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -var Address6 = require('ip-address').Address6 -var Id = require('peer-id') -var multiaddr = require('multiaddr') +// var multistream = require('multistream-select') +// var protobufs = require('protocol-buffers-stream') +// var fs = require('fs') +// var path = require('path') +// var protobufs = require('protocol-buffers-stream') +// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) +// var Address6 = require('ip-address').Address6 +// var Id = require('peer-id') +// var multiaddr = require('multiaddr') -exports = module.exports = identify +exports = module.exports -var protoId = '/ipfs/identify/1.0.0' +exports.multicodec = '/ipfs/identify/1.0.0' -exports.protoId = protoId -var createProtoStream = protobufs(schema) +exports.exec = (muxedConn, callback) => { + // TODO + // 1. open a stream + // 2. multistream into identify + // 3. send what I see from this other peer + // 4. receive what the other peer sees from me + // 4. callback with (err, peerInfo) +} +exports.handler = (peerInfo) => { + return function (conn) { + // TODO + // 1. receive incoming observed info about me + // 2. send back what I see from the other + } +} + +/* function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { var msi = new Interactive() msi.handle(conn, function () { msi.select(protoId, function (err, ds) { if (err) { - return console.log(err) // TODO Treat error + return console.log(err) } var ps = createProtoStream() @@ -39,16 +56,6 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { socket: socket } - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - // }) - }) - var mh = getMultiaddr(socket) ps.identify({ @@ -156,4 +163,4 @@ function updateSelf (peerSelf, observedAddr) { peerSelf.multiaddrs.push(omh) } } -} +}*/ diff --git a/src/index.js b/src/index.js index a4658713..4e3f3f6d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,6 @@ const multistream = require('multistream-select') // const async = require('async') -// const identify = require('./identify') +const identify = require('./identify') const PassThrough = require('stream').PassThrough exports = module.exports = Swarm @@ -130,16 +130,28 @@ function Swarm (peerInfo) { // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } this.muxers = {} this.connection.addStreamMuxer = (muxer) => { - // TODO - // .handle(protocol, () => { - // after attaching the stream muxer, check if identify is enabled - // }) - // TODO add to the list of muxers available + // for dialing + this.muxers[muxer.multicodec] = muxer + + // for listening + this.handle(muxer.multicodec, (conn) => { + const muxedConn = muxer(conn, true) + muxedConn.on('stream', connHandler) + + if (this.identify) { + identify.exec(muxedConn, (err, pi) => { + if (err) {} + // TODO muxedConns[pi.id.toB58String()].muxer = muxedConn + }) + } + }) } // enable the Identify protocol + this.identify = false this.connection.reuse = () => { - // TODO identify + this.identify = true + this.handle(identify.multicodec, identify.handler(peerInfo)) } const self = this // couldn't get rid of this @@ -224,14 +236,40 @@ function Swarm (peerInfo) { } function attemptMuxerUpgrade (conn, cb) { - if (Object.keys(self.muxers).length === 0) { + const muxers = Object.keys(self.muxers) + if (muxers.length === 0) { return cb(new Error('no muxers available')) } - // TODO add muxer to the muxedConns object for the peerId - // TODO if it succeeds, add incomming open coons to connHandler + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + + nextMuxer(muxers.shift()) + + function nextMuxer (key) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + cb(new Error('could not upgrade to stream muxing')) + } else { + nextMuxer(muxers.shift()) + } + } + + const muxedConn = self.muxers[key](conn, false) + self.muxedConns[b58Id] = {} + self.muxedConns[b58Id].muxer = muxedConn + cb(null, muxedConn) + }) + }) + } } function openConnInMuxedConn (muxer, cb) { - // TODO open a conn in this muxer + cb(muxer.newStream()) } function protocolHandshake (conn, protocol, cb) { @@ -255,6 +293,10 @@ function Swarm (peerInfo) { this.close = (callback) => { var count = 0 + Object.keys(this.muxedConns).forEach((key) => { + this.muxedConns[key].muxer.end() + }) + Object.keys(this.transports).forEach((key) => { this.transports[key].close(() => { if (++count === Object.keys(this.transports).length) { diff --git a/tests/swarm-old.js b/tests/swarm-old.js deleted file mode 100644 index 0acd7654..00000000 --- a/tests/swarm-old.js +++ /dev/null @@ -1,354 +0,0 @@ -/* eslint-env mocha */ - -var async = require('async') -var expect = require('chai').expect - -var multiaddr = require('multiaddr') -var Id = require('peer-id') -var Peer = require('peer-info') -var Swarm = require('../src') -var tcp = require('libp2p-tcp') -var Spdy = require('libp2p-spdy') - -// because of Travis-CI -process.on('uncaughtException', function (err) { - console.log('Caught exception: ' + err) -}) - -describe('Basics', function () { - it('enforces creation with new', function (done) { - expect(function () { - Swarm() - }).to.throw() - done() - }) - - it('it throws an exception without peerSelf', function (done) { - expect(function () { - var sw = new Swarm() - sw.close() - }).to.throw(Error) - done() - }) -}) - -describe('When dialing', function () { - describe('if the swarm does add any of the peer transports', function () { - it('it returns an error', function (done) { - var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')]) - var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')]) - var swarm = new Swarm(peerOne) - - swarm.dial(peerTwo, {}, function (err) { - expect(err).to.exist - done() - }) - }) - }) -}) - -describe('Without a Stream Muxer', function () { - describe('and one swarm over tcp', function () { - it('add the transport', function (done) { - var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - - sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) - - function ready () { - expect(sw.transports['tcp'].options).to.deep.equal({}) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - - sw.close(done) - } - }) - }) - - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - async.parallel([sw1.close, sw2.close], done) - }) - - it('dial a conn', function (done) { - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - done() - }) - }) - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial a protocol on a previous created conn', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - // it('add an upgrade', function (done) { done() }) - // it('dial a conn on top of a upgrade', function (done) { done() }) - // it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) - - /* TODO - describe('udp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - - /* TODO - describe('udt', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - -/* TODO -describe('utp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) -}) */ -}) - -describe('With a SPDY Stream Muxer', function () { - describe('and one swarm over tcp', function () { - // TODO: What is the it here? - it('add Stream Muxer', function (done) { - // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - sw.addStreamMuxer('spdy', Spdy, {}) - - done() - }) - }) - - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - function afterEach (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - - done() - } - } - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial two conns (transport reuse)', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formality so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - // TODO Improve clarity - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - describe('and two identity enabled swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - sw1.enableIdentify() - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.enableIdentify() - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - // give time for identify to finish - setTimeout(function () { - expect(Object.keys(sw2.muxedConns).length).to.equal(1) - done() - }, 500) - } - }) - - it('identify', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - done() - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - }) -}) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 02e752cd..06e11612 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -9,7 +9,7 @@ const Peer = require('peer-info') const Swarm = require('../src') const TCP = require('libp2p-tcp') const bl = require('bl') -// var SPDY = require('libp2p-spdy') +const spdy = require('libp2p-spdy') describe('basics', () => { it('throws on missing peerInfo', (done) => { @@ -196,7 +196,7 @@ describe('transport - websockets', function () { }) describe('high level API - 1st without stream multiplexing (on TCP)', function () { - this.timeout(10000) + this.timeout(20000) var swarmA var peerA @@ -278,7 +278,7 @@ describe('high level API - 1st without stream multiplexing (on TCP)', function ( }) describe('stream muxing (on TCP)', function () { - this.timeout(10000) + this.timeout(20000) describe('multiplex', () => { before((done) => { done() }) @@ -291,31 +291,131 @@ describe('stream muxing (on TCP)', function () { it.skip('enable identify to reuse incomming muxed conn', (done) => {}) }) describe('spdy', () => { - before((done) => { done() }) - after((done) => { done() }) + var swarmA + var peerA + var swarmB + var peerB + var swarmC + var peerC - it.skip('add', (done) => {}) - it.skip('handle + dial on protocol', (done) => {}) - it.skip('dial to warm conn', (done) => {}) - it.skip('dial on protocol, reuse warmed conn', (done) => {}) - it.skip('enable identify to reuse incomming muxed conn', (done) => {}) + before((done) => { + peerA = new Peer() + peerB = new Peer() + peerC = new Peer() + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + swarmC = new Swarm(peerC) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + swarmC.transport.add('tcp', new TCP()) + swarmC.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 3) { + done() + } + } + }) + + after((done) => { + var counter = 0 + + swarmA.close(closed) + swarmB.close(closed) + swarmC.close(closed) + + function closed () { + if (++counter === 3) { + done() + } + } + }) + + it('add', (done) => { + swarmA.connection.addStreamMuxer(spdy) + swarmB.connection.addStreamMuxer(spdy) + swarmC.connection.addStreamMuxer(spdy) + done() + }) + + it('handle + dial on protocol', (done) => { + swarmB.handle('/abacaxi/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmA.muxedConns).length).to.equal(1) + conn.end() + conn.on('end', done) + }) + }) + + it('dial to warm conn', (done) => { + swarmB.dial(peerA, (err) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + done() + }) + }) + + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.handle('/papaia/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + conn.end() + conn.on('end', done) + }) + }) + + it.skip('enable identify to reuse incomming muxed conn', (done) => { + swarmA.connection.reuse() + swarmC.connection.reuse() + + swarmC.dial(peerA, (err) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(swarmC.muxedConns).length).to.equal(1) + expect(Object.keys(swarmA.muxedConns).length).to.equal(2) + }, 100) + }) + }) }) }) +/* describe('conn upgrades', function () { - this.timeout(10000) + this.timeout(20000) describe('secio on tcp', () => { - before((done) => { done() }) - after((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) it.skip('tls on a muxed stream (not the full conn)', (done) => {}) }) describe('tls on tcp', () => { - before((done) => { done() }) - after((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -324,12 +424,14 @@ describe('conn upgrades', function () { }) describe('high level API - with everything mixed all together!', function () { - this.timeout(10000) + this.timeout(20000) - before((done) => { done() }) + // before((done) => { done() }) + // after((done) => { done() }) it.skip('add tcp', (done) => {}) it.skip('add utp', (done) => {}) it.skip('add websockets', (done) => {}) it.skip('dial', (done) => {}) }) +*/ From e8de55bc2875f86a754cbd05734da367a2f95195 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 10 Mar 2016 14:38:22 +0000 Subject: [PATCH 4/6] update the docs --- README.md | 101 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 3b8225f2..fd4df148 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ libp2p-swarm is used by libp2p but it can be also used as a standalone module. # Usage -### Install and create a Swarm +## Install libp2p-swarm is available on npm and so, like any other npm module, just: @@ -21,6 +21,10 @@ libp2p-swarm is available on npm and so, like any other npm module, just: > npm install libp2p-swarm --save ``` +## API + +#### Create a libp2p Swarm + And use it in your Node.js code as: ```JavaScript @@ -31,68 +35,77 @@ const sw = new Swarm(peerInfo) peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. +### Transports ----------- -BELOW NEEDS AN UPDATE +##### `swarm.transport.add(key, transport, options, callback)` +libp2p-swarm expects transports that implement [interface-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/js-libp2p-tcp). -### Support a transport +- `key` - the transport identifier +- `transport` - +- `options` +- `callback` -libp2p-swarm expects transports that implement [abstract-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/js-libp2p-tcp), a simple shim on top of the `net` module to make it work with swarm expectations. +##### `swarm.transport.dial(key, multiaddrs, callback)` -```JavaScript -sw.addTransport(transport, [options, dialOptions, listenOptions]) -``` +Dial to a peer on a specific transport. -### Add a connection upgrade +- `key` +- `multiaddrs` +- `callback` -A connection upgrade must be able to receive and return something that implements the [abstract-connection](https://github.com/diasdavid/abstract-connection) interface. +##### `swarm.transport.listen(key, options, handler, callback)` -```JavaScript -sw.addUpgrade(connUpgrade, [options]) -``` +Set a transport to start listening mode. -Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer](https://github.com/diasdavid/abstract-stream-muxer) interface. +- `key` +- `options` +- `handler` +- `callback` -```JavaScript -sw.addStreamMuxer(streamMuxer, [options]) -``` +##### `swarm.transport.close(key, callback)` -### Dial to another peer +Close the listeners of a given transport. -```JavaScript -sw.dial(PeerInfo, options, protocol, callback) -sw.dial(PeerInfo, options, callback) -``` +- `key` +- `callback` + +### Connection + +##### `swarm.connection.addUpgrade()` + +A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/diasdavid/interface-connection) specification. + +> **WIP** + +##### `swarm.connection.addStreamMuxer(muxer)` + +Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/diasdavid/interface-stream-muxer) spec. + +- `muxer` + +##### `swarm.connection.reuse()` + +Enable the identify protocol + +### `swarm.dial(pi, protocol, callback)` dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing. -### Accept requests on a specific protocol +- `pi` - peer info project +- `protocol` +- `callback` -```JavaScript -sw.handleProtocol(protocol, handlerFunction) -``` +### `swarm.handle(protocol, handler)` -### Cleaning up before exiting +handle a new protocol. -Each time you add a transport or dial you create connections. Be sure to close -them up before exiting. To do so you can: +- `protocol` +- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}` -Close a transport listener: +### `swarm.close(callback)` -```js -sw.closeListener(transportName, callback) -sw.closeAllListeners(callback) -``` +close all the listeners and muxers. -Close all open connections +- `callback` -```js -sw.closeConns(callback) -``` - -Close everything! - -```js -sw.close(callback) -``` From 366b6ef382ac40172b3e133b86acbb41b01801a7 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 10 Mar 2016 15:17:07 +0000 Subject: [PATCH 5/6] design notes --- README.md | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/README.md b/README.md index fd4df148..9a7b56b8 100644 --- a/README.md +++ b/README.md @@ -109,3 +109,44 @@ close all the listeners and muxers. - `callback` +# Design + +## Multitransport + +libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through. + +In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec. + +## Connection upgrades + +Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports. + +We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it. + +Types of upgrades to a connection: + +- encrypted channel (with TLS for e.g) +- congestion flow (some transports don't have it by default) +- multipath (open several connections and abstract it as a single connection) +- simulcast (still really thinking this one through, it might be interesting to send a packet through different connections under some hard network circumstances) +- stream-muxer - this a special case, because once we upgrade a connection to a stream-muxer, we can open more streams (multiplex them) on a single stream, also enabling us to reuse the underlying dialed transport + +We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol) + +## Identify + +Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g: + +- a) peer A dials a conn to peer B +- b) that conn gets upgraded to a stream multiplexer that both peers agree +- c) peer B executes de identify protocol +- d) peer B now can open streams to peer A, knowing which is the identity of peer A + +In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies. + +## Notes + +To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as: + +- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex +- transport - something that as a dial/listen interface and return objs that implement a connection interface From 990111980b2773b921538c663706cd9f15cdc00e Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 10 Mar 2016 20:28:58 +0000 Subject: [PATCH 6/6] woot --- examples/peerA.js | 25 ------ examples/peerB.js | 14 ---- package.json | 6 +- src/identify.js | 183 +++++++++++++------------------------------- src/index.js | 34 +++++--- tests/swarm-test.js | 9 ++- 6 files changed, 89 insertions(+), 182 deletions(-) delete mode 100644 examples/peerA.js delete mode 100644 examples/peerB.js diff --git a/examples/peerA.js b/examples/peerA.js deleted file mode 100644 index 9bba4793..00000000 --- a/examples/peerA.js +++ /dev/null @@ -1,25 +0,0 @@ -// var Identify = require('./../src/identify') -var Swarm = require('./../src') -var Peer = require('ipfs-peer') -var Id = require('ipfs-peer-id') -var multiaddr = require('multiaddr') - -var a = new Swarm() -a.port = 4000 -// a.listen() -// var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)]) - -// attention, peerB Id isn't going to match, but whateves -var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')]) - -// var i = new Identify(a, peerA) -// i.on('thenews', function (news) { -// console.log('such news') -// }) - -a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) { - if (err) { - return console.log(err) - } - console.log('WoHoo, dialed a stream') -}) diff --git a/examples/peerB.js b/examples/peerB.js deleted file mode 100644 index 95ca3ff6..00000000 --- a/examples/peerB.js +++ /dev/null @@ -1,14 +0,0 @@ -var Swarm = require('./../src') - -var Peer = require('peer-info') -var Id = require('peer-id') -var multiaddr = require('multiaddr') -var tcp = require('libp2p-tcp') - -var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') -var p = new Peer(Id.create(), []) -var sw = new Swarm(p) - -sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () { - console.log('transport added') -}) diff --git a/package.json b/package.json index 8c133589..e2e81a6c 100644 --- a/package.json +++ b/package.json @@ -33,11 +33,11 @@ "chai": "^3.5.0", "istanbul": "^0.4.2", "libp2p-spdy": "^0.2.3", - "libp2p-tcp": "^0.2.1", + "libp2p-tcp": "^0.3.0", "mocha": "^2.4.5", "multiaddr": "^1.1.1", - "peer-id": "^0.5.3", - "peer-info": "^0.5.2", + "peer-id": "^0.6.0", + "peer-info": "^0.6.0", "pre-commit": "^1.1.2", "standard": "^6.0.7", "stream-pair": "^1.0.3" diff --git a/src/identify.js b/src/identify.js index 7bad7eb7..0ef04ae9 100644 --- a/src/identify.js +++ b/src/identify.js @@ -1,166 +1,93 @@ /* * Identify is one of the protocols swarms speaks in order to * broadcast and learn about the ip:port pairs a specific peer - * is available through + * is available through and to know when a new stream muxer is + * established, so a conn can be reused */ -// var multistream = require('multistream-select') -// var protobufs = require('protocol-buffers-stream') -// var fs = require('fs') -// var path = require('path') -// var protobufs = require('protocol-buffers-stream') -// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -// var Address6 = require('ip-address').Address6 -// var Id = require('peer-id') -// var multiaddr = require('multiaddr') +const multistream = require('multistream-select') +const fs = require('fs') +const path = require('path') +const pbStream = require('protocol-buffers-stream')( + fs.readFileSync(path.join(__dirname, 'identify.proto'))) +const Info = require('peer-info') +const Id = require('peer-id') +const multiaddr = require('multiaddr') exports = module.exports - exports.multicodec = '/ipfs/identify/1.0.0' -exports.exec = (muxedConn, callback) => { - // TODO +exports.exec = (rawConn, muxer, peerInfo, callback) => { // 1. open a stream // 2. multistream into identify - // 3. send what I see from this other peer + // 3. send what I see from this other peer (extract fro conn) // 4. receive what the other peer sees from me // 4. callback with (err, peerInfo) -} -exports.handler = (peerInfo) => { - return function (conn) { - // TODO - // 1. receive incoming observed info about me - // 2. send back what I see from the other - } -} + const conn = muxer.newStream() -/* -function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { - var msi = new Interactive() - msi.handle(conn, function () { - msi.select(protoId, function (err, ds) { + var msI = new multistream.Interactive() + msI.handle(conn, () => { + msI.select(exports.multicodec, (err, ds) => { if (err) { - return console.log(err) + return callback(err) } - var ps = createProtoStream() + var pbs = pbStream() - ps.on('identify', function (msg) { - var peerId = Id.createFromPubKey(msg.publicKey) + pbs.on('identify', (msg) => { + peerInfo.multiaddr.addSafe(msg.observedAddr) - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const otherPeerInfo = new Info(peerId) + msg.listenAddrs.forEach((ma) => { + otherPeerInfo.multiaddr.add(multiaddr(ma)) + }) - muxedConns[peerId.toB58String()] = { - muxer: muxer, - socket: socket - } - - var mh = getMultiaddr(socket) - - ps.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer - }), - observedAddr: mh.buffer + callback(null, otherPeerInfo) }) - ps.pipe(ds).pipe(ps) - ps.finalize() + const obsMultiaddr = rawConn.getObservedAddrs()[0] + + pbs.identify({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map((mh) => { return mh.buffer }), + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null + }) + + pbs.pipe(ds).pipe(pbs) + pbs.finalize() }) }) } -exports.getHandlerFunction = function (peerInfoSelf, muxedConns) { +exports.handler = (peerInfo, swarm) => { return function (conn) { - // wait for the other peer to identify itself - // update our multiaddr with observed addr list - // then get the socket from our list of muxedConns and send the reply back + // 1. receive incoming observed info about me + // 2. update my own information (on peerInfo) + // 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs() + var pbs = pbStream() - var ps = createProtoStream() + pbs.on('identify', function (msg) { + peerInfo.multiaddr.addSafe(msg.observedAddr) - ps.on('identify', function (msg) { - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const conn = swarm.muxedConns[peerId.toB58String()].conn + const obsMultiaddr = conn.getObservedAddrs()[0] - var peerId = Id.createFromPubKey(msg.publicKey) - - var socket = muxedConns[peerId.toB58String()].socket - - var mh = getMultiaddr(socket) - - ps.identify({ + pbs.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map(function (ma) { + return ma.buffer }), - observedAddr: mh.buffer + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null }) - - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) { - // return multiaddr(mhb) - // }) - // }) - - ps.finalize() + pbs.finalize() }) - ps.pipe(conn).pipe(ps) + pbs.pipe(conn).pipe(pbs) } } - -function getMultiaddr (socket) { - var mh - if (socket.remoteFamily === 'IPv6') { - var addr = new Address6(socket.remoteAddress) - if (addr.v4) { - var ip4 = addr.to4().correctForm() - mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort) - } else { - mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - } else { - mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - return mh -} - -function updateSelf (peerSelf, observedAddr) { - var omh = multiaddr(observedAddr) - - if (!peerSelf.previousObservedAddrs) { - peerSelf.previousObservedAddrs = [] - } - - for (var i = 0; i < peerSelf.previousObservedAddrs.length; i++) { - if (peerSelf.previousObservedAddrs[i].toString() === omh.toString()) { - peerSelf.previousObservedAddrs.splice(i, 1) - addToSelf() - return - } - } - - peerSelf.previousObservedAddrs.push(omh) - - function addToSelf () { - var isIn = false - peerSelf.multiaddrs.forEach(function (mh) { - if (mh.toString() === omh.toString()) { - isIn = true - } - }) - - if (!isIn) { - peerSelf.multiaddrs.push(omh) - } - } -}*/ diff --git a/src/index.js b/src/index.js index 4e3f3f6d..34427f55 100644 --- a/src/index.js +++ b/src/index.js @@ -42,7 +42,7 @@ function Swarm (peerInfo) { multiaddrs = [multiaddrs] } - // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) + // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) // b) if multiaddrs.length = 1, return the conn from the // transport, otherwise, create a passthrough @@ -116,7 +116,7 @@ function Swarm (peerInfo) { // { // peerIdB58: { // muxer: - // rawSocket: socket // to abstract info required for the Identify Protocol + // conn: // to extract info required for the Identify Protocol // } // } this.muxedConns = {} @@ -136,12 +136,19 @@ function Swarm (peerInfo) { // for listening this.handle(muxer.multicodec, (conn) => { const muxedConn = muxer(conn, true) - muxedConn.on('stream', connHandler) + muxedConn.on('stream', (conn) => { + connHandler(conn) + }) + // if identify is enabled, attempt to do it for muxer reuse if (this.identify) { - identify.exec(muxedConn, (err, pi) => { - if (err) {} - // TODO muxedConns[pi.id.toB58String()].muxer = muxedConn + identify.exec(conn, muxedConn, peerInfo, (err, pi) => { + if (err) { + return console.log('Identify exec failed', err) + } + this.muxedConns[pi.id.toB58String()] = {} + this.muxedConns[pi.id.toB58String()].muxer = muxedConn + this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs }) } }) @@ -151,18 +158,19 @@ function Swarm (peerInfo) { this.identify = false this.connection.reuse = () => { this.identify = true - this.handle(identify.multicodec, identify.handler(peerInfo)) + this.handle(identify.multicodec, identify.handler(peerInfo, this)) } - const self = this // couldn't get rid of this + const self = this // prefered this to bind // incomming connection handler function connHandler (conn) { var msS = new multistream.Select() - msS.handle(conn) - Object.keys(self.protocols).forEach(function (protocol) { + Object.keys(self.protocols).forEach((protocol) => { + if (!protocol) { return } msS.addHandler(protocol, self.protocols[protocol]) }) + msS.handle(conn) } // higher level (public) API @@ -258,11 +266,17 @@ function Swarm (peerInfo) { } else { nextMuxer(muxers.shift()) } + return } const muxedConn = self.muxers[key](conn, false) self.muxedConns[b58Id] = {} self.muxedConns[b58Id].muxer = muxedConn + self.muxedConns[b58Id].conn = conn + + // in case identify is on + muxedConn.on('stream', connHandler) + cb(null, muxedConn) }) }) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 06e11612..13c28894 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -303,6 +303,10 @@ describe('stream muxing (on TCP)', function () { peerB = new Peer() peerC = new Peer() + // console.log('peer A', peerA.id.toB58String()) + // console.log('peer B', peerB.id.toB58String()) + // console.log('peer C', peerC.id.toB58String()) + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) @@ -386,7 +390,7 @@ describe('stream muxing (on TCP)', function () { }) }) - it.skip('enable identify to reuse incomming muxed conn', (done) => { + it('enable identify to reuse incomming muxed conn', (done) => { swarmA.connection.reuse() swarmC.connection.reuse() @@ -395,7 +399,8 @@ describe('stream muxing (on TCP)', function () { setTimeout(() => { expect(Object.keys(swarmC.muxedConns).length).to.equal(1) expect(Object.keys(swarmA.muxedConns).length).to.equal(2) - }, 100) + done() + }, 500) }) }) })