diff --git a/README.md b/README.md index 7a87626e..7c21bc1d 100644 --- a/README.md +++ b/README.md @@ -67,3 +67,27 @@ dial uses the best transport (whatever works first, in the future we can have so ```JavaScript sw.handleProtocol(protocol, handlerFunction) ``` + +### Cleaning up before exiting + +Each time you add a transport or dial you create connections. Be sure to close +them up before exiting. To do so you can: + +Close a transport listener: + +```js +sw.closeListener(transportName, callback) +sw.closeAllListeners(callback) +``` + +Close all open connections + +```js +sw.closeConns(callback) +``` + +Close everything! + +```js +sw.close(callback) +``` diff --git a/src/swarm.js b/src/swarm.js index 2fa4f5d4..fdeedc2b 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -47,6 +47,12 @@ function Swarm (peerInfo) { // set up the transport and add the list of incoming streams // add transport to the list of transports + var multiaddr = options.multiaddr + if (multiaddr) { + // no need to pass that to the transports + delete options.multiaddr + } + var listener = transport.createListener(options, listen) listener.listen(listenOptions, function ready () { @@ -59,8 +65,8 @@ function Swarm (peerInfo) { } // If a known multiaddr is passed, then add to our list of multiaddrs - if (options.multiaddr) { - self.peerInfo.multiaddrs.push(options.multiaddr) + if (multiaddr) { + self.peerInfo.multiaddrs.push(multiaddr) } callback() @@ -236,6 +242,14 @@ function Swarm (peerInfo) { } } + // Iterates all the listeners closing them + // one by one. It calls back once all are closed. + self.closeAllListeners = function (callback) { + var transportNames = Object.keys(self.transports) + + async.each(transportNames, self.closeListener, callback) + } + self.closeConns = function (callback) { // close warmed up cons so that the listener can gracefully exit Object.keys(self.conns).forEach(function (conn) { @@ -246,8 +260,14 @@ function Swarm (peerInfo) { callback() } + // Closes both transport listeners and + // connections. It calls back once everything + // is closed self.close = function (callback) { - // close everything + async.parallel([ + self.closeAllListeners, + self.closeConns + ], callback) } self.enableIdentify = function () { diff --git a/tests/swarm-test.js b/tests/swarm-test.js index bf5baeaa..96708469 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -1,9 +1,12 @@ var Lab = require('lab') var Code = require('code') var lab = exports.lab = Lab.script() +var async = require('async') var experiment = lab.experiment var test = lab.test +var beforeEach = lab.beforeEach +var afterEach = lab.afterEach var expect = Code.expect var multiaddr = require('multiaddr') @@ -25,164 +28,89 @@ process.on('uncaughtException', function (err) { }) experiment('Without a Stream Muxer', function () { - experiment('tcp', function () { + experiment('and one swarm over tcp', function () { test('add the transport', function (done) { var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') var p = new Peer(Id.create(), []) var sw = new Swarm(p) - sw.addTransport('tcp', tcp, - { multiaddr: mh }, {}, {port: 8010}, function () { - expect(sw.transports['tcp'].options).to.deep.equal({ multiaddr: mh }) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - sw.closeListener('tcp', function () { - done() - }) - }) + sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) + + function ready () { + expect(sw.transports['tcp'].options).to.deep.equal({}) + expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) + expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) + expect(sw.transports['tcp'].transport).to.deep.equal(tcp) + + sw.close(done) + } + }) + }) + + experiment('and two swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + afterEach(function (done) { + async.parallel([sw1.close, sw2.close], done) }) test('dial a conn', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - - var readyCounter = 0 - - function ready () { - readyCounter++ - if (readyCounter < 2) { - return - } - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - if (cleaningCounter < 4) { - return - } - - done() - } - }) - } + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) + done() + }) }) test('dial a conn on a protocol', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { conn.end() - conn.on('end', function () { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - if (cleaningCounter < 4) { - return - } - - done() - } - }) + conn.on('end', done) }) - var count = 0 + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + }) - function ready () { - count++ - if (count < 2) { - return - } + test('dial a protocol on a previous created conn', function (done) { + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + conn.end() + conn.on('end', done) + }) + + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { expect(err).to.equal(null) expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() }) - } - }) - - test('dial a protocol on a previous created conn', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - - var readyCounter = 0 - - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', function () { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - if (cleaningCounter < 4) { - return - } - - done() - } - }) }) - - function ready () { - readyCounter++ - if (readyCounter < 2) { - return - } - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - } }) // test('add an upgrade', function (done) { done() }) @@ -222,7 +150,8 @@ experiment('utp', function () { }) experiment('With a SPDY Stream Muxer', function () { - experiment('tcp', function () { + experiment('and one swarm over tcp', function () { + // TODO: What is the test here? test('add Stream Muxer', function (done) { // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') var p = new Peer(Id.create(), []) @@ -231,19 +160,54 @@ experiment('With a SPDY Stream Muxer', function () { done() }) + }) + + experiment('and two swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + sw1.addStreamMuxer('spdy', Spdy, {}) + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + sw2.addStreamMuxer('spdy', Spdy, {}) + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + function afterEach (done) { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + + done() + } + } test('dial a conn on a protocol', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - sw1.addStreamMuxer('spdy', Spdy, {}) - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - sw2.addStreamMuxer('spdy', Spdy, {}) sw2.handleProtocol('/sparkles/1.0.0', function (conn) { // formallity so that the conn starts flowing @@ -253,127 +217,102 @@ experiment('With a SPDY Stream Muxer', function () { conn.on('end', function () { expect(Object.keys(sw1.muxedConns).length).to.equal(1) expect(Object.keys(sw2.muxedConns).length).to.equal(0) - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - - done() - } + afterEach(done) }) }) - var count = 0 - - function ready () { - count++ - if (count < 2) { - return - } - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - } + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) }) + test('dial two conns (transport reuse)', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - sw1.addStreamMuxer('spdy', Spdy, {}) - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing + // formality so that the conn starts flowing conn.on('data', function (chunk) {}) conn.end() conn.on('end', function () { expect(Object.keys(sw1.muxedConns).length).to.equal(1) expect(Object.keys(sw2.muxedConns).length).to.equal(0) - conn.end() - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - - done() - } + afterEach(done) }) }) - var count = 0 - - function ready () { - count++ - if (count < 2) { - return - } - + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + // TODO Improve clarity sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - // TODO Improve clarity - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - conn.on('data', function () {}) expect(err).to.equal(null) expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() }) + + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + + conn.end() + }) + }) + }) + + experiment('and two identity enabled swarms over tcp', function () { + var mh1, p1, sw1, mh2, p2, sw2 + + beforeEach(function (done) { + mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + p1 = new Peer(Id.create(), []) + sw1 = new Swarm(p1) + sw1.addStreamMuxer('spdy', Spdy, {}) + sw1.enableIdentify() + + mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + p2 = new Peer(Id.create(), []) + sw2 = new Swarm(p2) + sw2.addStreamMuxer('spdy', Spdy, {}) + sw2.enableIdentify() + + async.parallel([ + function (cb) { + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) + }, + function (cb) { + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) + } + ], done) + }) + + afterEach(function (done) { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + // give time for identify to finish + setTimeout(function () { + expect(Object.keys(sw2.muxedConns).length).to.equal(1) + done() + }, 500) } }) test('identify', function (done) { - var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p1 = new Peer(Id.create(), []) - var sw1 = new Swarm(p1) - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) - sw1.addStreamMuxer('spdy', Spdy, {}) - sw1.enableIdentify() - - var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - var p2 = new Peer(Id.create(), []) - var sw2 = new Swarm(p2) - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.enableIdentify() - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { // formallity so that the conn starts flowing conn.on('data', function (chunk) {}) @@ -381,46 +320,16 @@ experiment('With a SPDY Stream Muxer', function () { conn.end() conn.on('end', function () { expect(Object.keys(sw1.muxedConns).length).to.equal(1) - - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - // give time for identify to finish - setTimeout(function () { - expect(Object.keys(sw2.muxedConns).length).to.equal(1) - done() - }, 500) - } + done() }) }) - var count = 0 - - function ready () { - count++ - if (count < 2) { - return - } - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - } + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) }) }) })