From 9d8ee67c61d2c57b6a68cd04fabc8e4b8afdfeea Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 6 Mar 2016 08:40:49 +0000 Subject: [PATCH] high level API working + tests --- package.json | 2 +- src/index.js | 495 ++++++++++++-------------------------------- tests/swarm-test.js | 115 ++++++++-- 3 files changed, 238 insertions(+), 374 deletions(-) diff --git a/package.json b/package.json index f6987119..70983e97 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "libp2p-tcp": "^0.2.1", "mocha": "^2.4.5", "multiaddr": "^1.1.1", - "peer-id": "^0.5.1", + "peer-id": "^0.5.3", "peer-info": "^0.5.2", "pre-commit": "^1.1.2", "standard": "^6.0.7", diff --git a/src/index.js b/src/index.js index f933418c..a4658713 100644 --- a/src/index.js +++ b/src/index.js @@ -1,4 +1,4 @@ -// const multistream = require('multistream-select') +const multistream = require('multistream-select') // const async = require('async') // const identify = require('./identify') const PassThrough = require('stream').PassThrough @@ -38,11 +38,14 @@ function Swarm (peerInfo) { this.transport.dial = (key, multiaddrs, callback) => { const t = this.transports[key] - // a) if multiaddrs.length = 1, return the conn from the - // transport, otherwise, create a passthrough if (!Array.isArray(multiaddrs)) { multiaddrs = [multiaddrs] } + + // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) + + // b) if multiaddrs.length = 1, return the conn from the + // transport, otherwise, create a passthrough if (multiaddrs.length === 1) { const conn = t.dial(multiaddrs.shift(), {ready: () => { const cb = callback @@ -55,7 +58,7 @@ function Swarm (peerInfo) { return conn } - // b) multiaddrs should already be a filtered list + // c) multiaddrs should already be a filtered list // specific for the transport we are using const pt = new PassThrough() @@ -112,28 +115,143 @@ function Swarm (peerInfo) { // { // peerIdB58: { - // muxerName: { - // muxer: - // rawSocket: socket // to abstract info required for the Identify Protocol - // } + // muxer: + // rawSocket: socket // to abstract info required for the Identify Protocol // } + // } this.muxedConns = {} + // { protocol: handler } + this.protocols = {} + this.connection = {} this.connection.addUpgrade = () => {} - this.connection.addStreamMuxer = () => {} + + // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } + this.muxers = {} + this.connection.addStreamMuxer = (muxer) => { + // TODO + // .handle(protocol, () => { + // after attaching the stream muxer, check if identify is enabled + // }) + // TODO add to the list of muxers available + } // enable the Identify protocol - this.connection.reuse = () => {} - - // main API - higher level abstractions -- - - this.dial = () => { - // TODO + this.connection.reuse = () => { + // TODO identify } - this.handle = (protocol, callback) => { - // TODO + + const self = this // couldn't get rid of this + + // incomming connection handler + function connHandler (conn) { + var msS = new multistream.Select() + msS.handle(conn) + Object.keys(self.protocols).forEach(function (protocol) { + msS.addHandler(protocol, self.protocols[protocol]) + }) } + + // higher level (public) API + this.dial = (pi, protocol, callback) => { + var pt = null + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } else { + pt = new PassThrough() + } + + const b58Id = pi.id.toB58String() + if (!this.muxedConns[b58Id]) { + if (!this.conns[b58Id]) { + attemptDial(pi, (err, conn) => { + if (err) { + return callback(err) + } + gotWarmedUpConn(conn) + }) + } else { + const conn = this.conns[b58Id] + this.conns[b58Id] = undefined + gotWarmedUpConn(conn) + } + } else { + gotMuxer(this.muxedConns[b58Id].muxer) + } + + function gotWarmedUpConn (conn) { + attemptMuxerUpgrade(conn, (err, muxer) => { + if (!protocol) { + if (err) { + self.conns[b58Id] = conn + } + return callback() + } + + if (err) { + // couldn't upgrade to Muxer, it is ok + protocolHandshake(conn, protocol, callback) + } else { + gotMuxer(muxer) + } + }) + } + + function gotMuxer (muxer) { + openConnInMuxedConn(muxer, (conn) => { + protocolHandshake(conn, protocol, callback) + }) + } + + function attemptDial (pi, cb) { + const tKeys = Object.keys(self.transports) + nextTransport(tKeys.shift()) + + function nextTransport (key) { + const multiaddrs = pi.multiaddrs.slice() + self.transport.dial(key, multiaddrs, (err, conn) => { + if (err) { + if (tKeys.length === 0) { + return cb(new Error('Could not dial in any of the transports')) + } + return nextTransport(tKeys.shift()) + } + cb(null, conn) + }) + } + } + + function attemptMuxerUpgrade (conn, cb) { + if (Object.keys(self.muxers).length === 0) { + return cb(new Error('no muxers available')) + } + // TODO add muxer to the muxedConns object for the peerId + // TODO if it succeeds, add incomming open coons to connHandler + } + function openConnInMuxedConn (muxer, cb) { + // TODO open a conn in this muxer + } + + function protocolHandshake (conn, protocol, cb) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(protocol, (err, conn) => { + if (err) { + return callback(err) + } + pt.pipe(conn).pipe(pt) + callback(null, pt) + }) + }) + } + } + + this.handle = (protocol, handler) => { + this.protocols[protocol] = handler + } + this.close = (callback) => { var count = 0 @@ -145,349 +263,6 @@ function Swarm (peerInfo) { }) }) } - - function connHandler (conn) { - // do all the multistream select handshakes (this should be probably done recursively - } } -/* -function Swarm (peerInfo) { - var self = this - if (!(this instanceof Swarm)) { - return new Swarm(peerInfo) - } - - if (!peerInfo) { - throw new Error('You must provide a value for `peerInfo`') - } - - self.peerInfo = peerInfo - - // peerIdB58: { conn: } - self.conns = {} - - // peerIdB58: { - // muxer: , - // socket: socket // so we can extract the info we need for identify - // } - self.muxedConns = {} - - // transportName: { transport: transport, - // dialOptions: dialOptions, - // listenOptions: listenOptions, - // listeners: [] } - self.transports = {} - - // transportName: listener - self.listeners = {} - - // protocolName: handlerFunc - self.protocols = {} - - // muxerName: { Muxer: Muxer // Muxer is a constructor - // options: options } - self.muxers = {} - - // for connection reuse - self.identify = false - - // public interface - - self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { - // set up the transport and add the list of incoming streams - // add transport to the list of transports - - var multiaddr = options.multiaddr - if (multiaddr) { - // no need to pass that to the transports - delete options.multiaddr - } - - var listener = transport.createListener(options, listen) - - listener.listen(listenOptions, function ready () { - self.transports[name] = { - transport: transport, - options: options, - dialOptions: dialOptions, - listenOptions: listenOptions, - listener: listener - } - - // If a known multiaddr is passed, then add to our list of multiaddrs - if (multiaddr) { - self.peerInfo.multiaddrs.push(multiaddr) - } - - callback() - }) - } - - self.addUpgrade = function (ConnUpgrade, options) {} - - self.addStreamMuxer = function (name, StreamMuxer, options) { - self.muxers[name] = { - Muxer: StreamMuxer, - options: options - } - } - - self.dial = function (peerInfo, options, protocol, callback) { - // 1. check if we have transports we support - // 2. check if we have a conn waiting - // 3. check if we have a stream muxer available - - if (typeof protocol === 'function') { - callback = protocol - protocol = undefined - } - - // check if a conn is waiting - // if it is and protocol was selected, jump into multistreamHandshake - // if it is and no protocol was selected, do nothing and call and empty callback - - if (self.conns[peerInfo.id.toB58String()]) { - if (protocol) { - if (self.muxers['spdy']) { - // TODO upgrade this conn to a muxer - console.log('TODO: upgrade a warm conn to muxer that was added after') - } else { - multistreamHandshake(self.conns[peerInfo.id.toB58String()]) - } - self.conns[peerInfo.id.toB58String()] = undefined - delete self.conns[peerInfo.id.toB58String()] - return - } else { - return callback() - } - } - - // check if a stream muxer for this peer is available - if (self.muxedConns[peerInfo.id.toB58String()]) { - if (protocol) { - return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer) - } else { - return callback() - } - } - - // Creating a new conn with this peer routine - - // TODO - check if there is a preference in protocol to use on - // options.protocol - var supportedTransports = Object.keys(self.transports) - var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) { - return multiaddr.protoNames().some(function (proto) { - return supportedTransports.indexOf(proto) >= 0 - }) - }) - - if (!multiaddrs.length) { - callback(new Error("The swarm doesn't support any of the peer transports")) - return - } - - var conn - - async.eachSeries(multiaddrs, function (multiaddr, next) { - if (conn) { - return next() - } - - var transportName = getTransportNameForMultiaddr(multiaddr) - var transport = self.transports[transportName] - var dialOptions = clone(transport.dialOptions) - dialOptions.ready = connected - - var connTry = transport.transport.dial(multiaddr, dialOptions) - - connTry.once('error', function (err) { - if (err) { - return console.log(err) // TODO handle error - } - next() // try next multiaddr - }) - - function connected () { - conn = connTry - next() - } - - function getTransportNameForMultiaddr (multiaddr) { - // this works for all those ip + transport + port tripplets - return multiaddr.protoNames()[1] - } - - function clone (obj) { - var target = {} - for (var i in obj) { - if (obj.hasOwnProperty(i)) { - target[i] = obj[i] - } - } - return target - } - }, done) - - function done () { - // TODO apply upgrades - // apply stream muxer - // if no protocol is selected, save it in the pool - // if protocol is selected, multistream that protocol - if (!conn) { - callback(new Error('Unable to open a connection')) - return - } - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, false, function (err, muxer) { - if (err) { - return console.log(err) // TODO Treat error - } - - muxer.on('stream', userProtocolMuxer) - - self.muxedConns[peerInfo.id.toB58String()] = { - muxer: muxer, - socket: conn - } - - if (protocol) { - openMuxedStream(muxer) - } else { - callback() - } - }) - } else { - if (protocol) { - multistreamHandshake(conn) - } else { - self.conns[peerInfo.id.toB58String()] = conn - callback() - } - } - } - - function openMuxedStream (muxer) { - // 1. create a new stream on this muxedConn and pass that to - // multistreamHanshake - muxer.dialStream(function (err, conn) { - if (err) { - return console.log(err) // TODO Treat error - } - multistreamHandshake(conn) - }) - } - - function multistreamHandshake (conn) { - var msI = new multistream.Interactive() - msI.handle(conn, function () { - msI.select(protocol, callback) - }) - } - } - - self.closeListener = function (transportName, callback) { - self.transports[transportName].listener.close(closed) - - // only gets called when all the streams on this transport are closed too - function closed () { - delete self.transports[transportName] - callback() - } - } - - // Iterates all the listeners closing them - // one by one. It calls back once all are closed. - self.closeAllListeners = function (callback) { - var transportNames = Object.keys(self.transports) - - async.each(transportNames, self.closeListener, callback) - } - - self.closeConns = function (callback) { - // close warmed up cons so that the listener can gracefully exit - Object.keys(self.conns).forEach(function (conn) { - self.conns[conn].destroy() - }) - self.conns = {} - - callback() - } - - // Closes both transport listeners and - // connections. It calls back once everything - // is closed - self.close = function (callback) { - async.parallel([ - self.closeAllListeners, - self.closeConns - ], callback) - } - - self.enableIdentify = function () { - // set flag to true - // add identify to the list of handled protocols - self.identify = true - - // we pass muxedConns so that identify can access the socket of the other - // peer - self.handleProtocol(identify.protoId, - identify.getHandlerFunction(self.peerInfo, self.muxedConns)) - } - - self.handleProtocol = function (protocol, handlerFunction) { - self.protocols[protocol] = handlerFunction - } - - // internals - - function listen (conn) { - // TODO apply upgrades - // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, true, function (err, muxer) { - if (err) { - return console.log(err) // TODO treat error - } - - // TODO This muxer has to be identified! - // pass to identify a reference of - // our muxedConn list - // ourselves (peerInfo) - // the conn, which is the socket - // and a stream it can send stuff - if (self.identify) { - muxer.dialStream(function (err, stream) { - if (err) { - return console.log(err) // TODO Treat error - } - // conn === socket at this point - identify(self.muxedConns, self.peerInfo, conn, stream, muxer) - }) - } - - muxer.on('stream', userProtocolMuxer) - }) - } else { - // if no stream muxer, then - userProtocolMuxer(conn) - } - } - - // Handle user given protocols - function userProtocolMuxer (conn) { - var msS = new multistream.Select() - msS.handle(conn) - Object.keys(self.protocols).forEach(function (protocol) { - msS.addHandler(protocol, self.protocols[protocol]) - }) - } -} -*/ - function noop () {} diff --git a/tests/swarm-test.js b/tests/swarm-test.js index a9ecfb26..02e752cd 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -173,7 +173,9 @@ describe('transport - tcp', function () { }) }) -describe('transport - udt', () => { +describe('transport - udt', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add', (done) => {}) @@ -182,7 +184,9 @@ describe('transport - udt', () => { it.skip('close', (done) => {}) }) -describe('transport - websockets', () => { +describe('transport - websockets', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add', (done) => {}) @@ -191,37 +195,119 @@ describe('transport - websockets', () => { it.skip('close', (done) => {}) }) -describe('high level API - 1st stage, without stream multiplexing (on TCP)', () => { - it.skip('handle a protocol', (done) => {}) - it.skip('dial on protocol', (done) => {}) - it.skip('dial to warm conn', (done) => {}) - it.skip('dial on protocol, reuse warmed conn', (done) => {}) - it.skip('close', (done) => {}) +describe('high level API - 1st without stream multiplexing (on TCP)', function () { + this.timeout(10000) + + var swarmA + var peerA + var swarmB + var peerB + + before((done) => { + peerA = new Peer() + peerB = new Peer() + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 2) { + done() + } + } + }) + + after((done) => { + var counter = 0 + + swarmA.close(closed) + swarmB.close(closed) + + function closed () { + if (++counter === 2) { + done() + } + } + }) + + it('handle a protocol', (done) => { + swarmB.handle('/bananas/1.0.0', (conn) => { + conn.pipe(conn) + }) + expect(Object.keys(swarmB.protocols).length).to.equal(1) + done() + }) + + it('dial on protocol', (done) => { + swarmB.handle('/pineapple/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmA.dial(peerB, '/pineapple/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) + }) + }) + + it('dial to warm a conn', (done) => { + swarmA.dial(peerB, (err) => { + expect(err).to.not.exist + done() + }) + }) + + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.dial(peerB, '/bananas/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) + }) + }) }) -describe('stream muxing (on TCP)', () => { +describe('stream muxing (on TCP)', function () { + this.timeout(10000) + describe('multiplex', () => { before((done) => { done() }) + after((done) => { done() }) + it.skip('add', (done) => {}) it.skip('handle + dial on protocol', (done) => {}) it.skip('dial to warm conn', (done) => {}) it.skip('dial on protocol, reuse warmed conn', (done) => {}) it.skip('enable identify to reuse incomming muxed conn', (done) => {}) - it.skip('close', (done) => {}) }) describe('spdy', () => { + before((done) => { done() }) + after((done) => { done() }) + it.skip('add', (done) => {}) it.skip('handle + dial on protocol', (done) => {}) it.skip('dial to warm conn', (done) => {}) it.skip('dial on protocol, reuse warmed conn', (done) => {}) it.skip('enable identify to reuse incomming muxed conn', (done) => {}) - it.skip('close', (done) => {}) }) }) -describe('conn upgrades', () => { +describe('conn upgrades', function () { + this.timeout(10000) + describe('secio on tcp', () => { before((done) => { done() }) + after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -229,6 +315,7 @@ describe('conn upgrades', () => { }) describe('tls on tcp', () => { before((done) => { done() }) + after((done) => { done() }) it.skip('add', (done) => {}) it.skip('dial', (done) => {}) @@ -236,7 +323,9 @@ describe('conn upgrades', () => { }) }) -describe('high level API = 2nd stage, with everything all together!', () => { +describe('high level API - with everything mixed all together!', function () { + this.timeout(10000) + before((done) => { done() }) it.skip('add tcp', (done) => {})