diff --git a/README.md b/README.md index dc4200c1..9a7b56b8 100644 --- a/README.md +++ b/README.md @@ -13,81 +13,140 @@ libp2p-swarm is used by libp2p but it can be also used as a standalone module. # Usage -### Install and create a Swarm +## Install libp2p-swarm is available on npm and so, like any other npm module, just: ```bash -$ npm install libp2p-swarm --save +> npm install libp2p-swarm --save ``` +## API + +#### Create a libp2p Swarm + And use it in your Node.js code as: ```JavaScript -var Swarm = require('libp2p-swarm') +const Swarm = require('libp2p-swarm') -var sw = new Swarm(peerInfoSelf) +const sw = new Swarm(peerInfo) ``` -peerInfoSelf is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. +peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. -### Support a transport +### Transports -libp2p-swarm expects transports that implement [abstract-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/js-libp2p-tcp), a simple shim on top of the `net` module to make it work with swarm expectations. +##### `swarm.transport.add(key, transport, options, callback)` -```JavaScript -sw.addTransport(transport, [options, dialOptions, listenOptions]) -``` +libp2p-swarm expects transports that implement [interface-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/js-libp2p-tcp). -### Add a connection upgrade +- `key` - the transport identifier +- `transport` - +- `options` +- `callback` -A connection upgrade must be able to receive and return something that implements the [abstract-connection](https://github.com/diasdavid/abstract-connection) interface. +##### `swarm.transport.dial(key, multiaddrs, callback)` -```JavaScript -sw.addUpgrade(connUpgrade, [options]) -``` +Dial to a peer on a specific transport. -Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer](https://github.com/diasdavid/abstract-stream-muxer) interface. +- `key` +- `multiaddrs` +- `callback` -```JavaScript -sw.addStreamMuxer(streamMuxer, [options]) -``` +##### `swarm.transport.listen(key, options, handler, callback)` -### Dial to another peer +Set a transport to start listening mode. -```JavaScript -sw.dial(PeerInfo, options, protocol, callback) -sw.dial(PeerInfo, options, callback) -``` +- `key` +- `options` +- `handler` +- `callback` + +##### `swarm.transport.close(key, callback)` + +Close the listeners of a given transport. + +- `key` +- `callback` + +### Connection + +##### `swarm.connection.addUpgrade()` + +A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/diasdavid/interface-connection) specification. + +> **WIP** + +##### `swarm.connection.addStreamMuxer(muxer)` + +Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/diasdavid/interface-stream-muxer) spec. + +- `muxer` + +##### `swarm.connection.reuse()` + +Enable the identify protocol + +### `swarm.dial(pi, protocol, callback)` dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing. -### Accept requests on a specific protocol +- `pi` - peer info project +- `protocol` +- `callback` -```JavaScript -sw.handleProtocol(protocol, handlerFunction) -``` +### `swarm.handle(protocol, handler)` -### Cleaning up before exiting +handle a new protocol. -Each time you add a transport or dial you create connections. Be sure to close -them up before exiting. To do so you can: +- `protocol` +- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}` -Close a transport listener: +### `swarm.close(callback)` -```js -sw.closeListener(transportName, callback) -sw.closeAllListeners(callback) -``` +close all the listeners and muxers. -Close all open connections +- `callback` -```js -sw.closeConns(callback) -``` +# Design -Close everything! +## Multitransport -```js -sw.close(callback) -``` +libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through. + +In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec. + +## Connection upgrades + +Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports. + +We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it. + +Types of upgrades to a connection: + +- encrypted channel (with TLS for e.g) +- congestion flow (some transports don't have it by default) +- multipath (open several connections and abstract it as a single connection) +- simulcast (still really thinking this one through, it might be interesting to send a packet through different connections under some hard network circumstances) +- stream-muxer - this a special case, because once we upgrade a connection to a stream-muxer, we can open more streams (multiplex them) on a single stream, also enabling us to reuse the underlying dialed transport + +We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol) + +## Identify + +Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g: + +- a) peer A dials a conn to peer B +- b) that conn gets upgraded to a stream multiplexer that both peers agree +- c) peer B executes de identify protocol +- d) peer B now can open streams to peer A, knowing which is the identity of peer A + +In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies. + +## Notes + +To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as: + +- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex +- transport - something that as a dial/listen interface and return objs that implement a connection interface diff --git a/examples/peerA.js b/examples/peerA.js deleted file mode 100644 index 9bba4793..00000000 --- a/examples/peerA.js +++ /dev/null @@ -1,25 +0,0 @@ -// var Identify = require('./../src/identify') -var Swarm = require('./../src') -var Peer = require('ipfs-peer') -var Id = require('ipfs-peer-id') -var multiaddr = require('multiaddr') - -var a = new Swarm() -a.port = 4000 -// a.listen() -// var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)]) - -// attention, peerB Id isn't going to match, but whateves -var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')]) - -// var i = new Identify(a, peerA) -// i.on('thenews', function (news) { -// console.log('such news') -// }) - -a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) { - if (err) { - return console.log(err) - } - console.log('WoHoo, dialed a stream') -}) diff --git a/examples/peerB.js b/examples/peerB.js deleted file mode 100644 index 95ca3ff6..00000000 --- a/examples/peerB.js +++ /dev/null @@ -1,14 +0,0 @@ -var Swarm = require('./../src') - -var Peer = require('peer-info') -var Id = require('peer-id') -var multiaddr = require('multiaddr') -var tcp = require('libp2p-tcp') - -var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') -var p = new Peer(Id.create(), []) -var sw = new Swarm(p) - -sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () { - console.log('transport added') -}) diff --git a/package.json b/package.json index 180bb5bc..e2e81a6c 100644 --- a/package.json +++ b/package.json @@ -26,28 +26,26 @@ "test" ], "engines": { - "node": "^4.0.0" + "node": "^4.3.0" }, "devDependencies": { + "bl": "^1.1.2", "chai": "^3.5.0", "istanbul": "^0.4.2", - "libp2p-spdy": "^0.1.0", - "libp2p-tcp": "^0.1.1", + "libp2p-spdy": "^0.2.3", + "libp2p-tcp": "^0.3.0", "mocha": "^2.4.5", + "multiaddr": "^1.1.1", + "peer-id": "^0.6.0", + "peer-info": "^0.6.0", "pre-commit": "^1.1.2", - "sinon": "^1.15.4", "standard": "^6.0.7", "stream-pair": "^1.0.3" }, "dependencies": { "async": "^1.3.0", "ip-address": "^5.0.2", - "ipfs-logger": "^0.1.0", - "multiaddr": "^1.0.0", - "multiplex-stream-muxer": "^0.2.0", "multistream-select": "^0.6.1", - "peer-id": "^0.3.3", - "peer-info": "^0.3.2", "protocol-buffers-stream": "^1.2.0" } } diff --git a/src/identify.js b/src/identify.js index bf78cd89..0ef04ae9 100644 --- a/src/identify.js +++ b/src/identify.js @@ -1,159 +1,93 @@ /* - * Identify is one of the protocols swarms speaks in order to broadcast and learn - * about the ip:port pairs a specific peer is available through + * Identify is one of the protocols swarms speaks in order to + * broadcast and learn about the ip:port pairs a specific peer + * is available through and to know when a new stream muxer is + * established, so a conn can be reused */ -var Interactive = require('multistream-select').Interactive -var protobufs = require('protocol-buffers-stream') -var fs = require('fs') -var path = require('path') -var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -var Address6 = require('ip-address').Address6 -var Id = require('peer-id') -var multiaddr = require('multiaddr') +const multistream = require('multistream-select') +const fs = require('fs') +const path = require('path') +const pbStream = require('protocol-buffers-stream')( + fs.readFileSync(path.join(__dirname, 'identify.proto'))) +const Info = require('peer-info') +const Id = require('peer-id') +const multiaddr = require('multiaddr') -exports = module.exports = identify +exports = module.exports +exports.multicodec = '/ipfs/identify/1.0.0' -var protoId = '/ipfs/identify/1.0.0' +exports.exec = (rawConn, muxer, peerInfo, callback) => { + // 1. open a stream + // 2. multistream into identify + // 3. send what I see from this other peer (extract fro conn) + // 4. receive what the other peer sees from me + // 4. callback with (err, peerInfo) -exports.protoId = protoId -var createProtoStream = protobufs(schema) + const conn = muxer.newStream() -function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { - var msi = new Interactive() - msi.handle(conn, function () { - msi.select(protoId, function (err, ds) { + var msI = new multistream.Interactive() + msI.handle(conn, () => { + msI.select(exports.multicodec, (err, ds) => { if (err) { - return console.log(err) // TODO Treat error + return callback(err) } - var ps = createProtoStream() + var pbs = pbStream() - ps.on('identify', function (msg) { - var peerId = Id.createFromPubKey(msg.publicKey) + pbs.on('identify', (msg) => { + peerInfo.multiaddr.addSafe(msg.observedAddr) - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const otherPeerInfo = new Info(peerId) + msg.listenAddrs.forEach((ma) => { + otherPeerInfo.multiaddr.add(multiaddr(ma)) + }) - muxedConns[peerId.toB58String()] = { - muxer: muxer, - socket: socket - } - - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - // }) + callback(null, otherPeerInfo) }) - var mh = getMultiaddr(socket) + const obsMultiaddr = rawConn.getObservedAddrs()[0] - ps.identify({ + pbs.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer - }), - observedAddr: mh.buffer + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map((mh) => { return mh.buffer }), + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null }) - ps.pipe(ds).pipe(ps) - ps.finalize() + pbs.pipe(ds).pipe(pbs) + pbs.finalize() }) }) } -exports.getHandlerFunction = function (peerInfoSelf, muxedConns) { +exports.handler = (peerInfo, swarm) => { return function (conn) { - // wait for the other peer to identify itself - // update our multiaddr with observed addr list - // then get the socket from our list of muxedConns and send the reply back + // 1. receive incoming observed info about me + // 2. update my own information (on peerInfo) + // 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs() + var pbs = pbStream() - var ps = createProtoStream() + pbs.on('identify', function (msg) { + peerInfo.multiaddr.addSafe(msg.observedAddr) - ps.on('identify', function (msg) { - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const conn = swarm.muxedConns[peerId.toB58String()].conn + const obsMultiaddr = conn.getObservedAddrs()[0] - var peerId = Id.createFromPubKey(msg.publicKey) - - var socket = muxedConns[peerId.toB58String()].socket - - var mh = getMultiaddr(socket) - - ps.identify({ + pbs.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map(function (ma) { + return ma.buffer }), - observedAddr: mh.buffer + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null }) - - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) { - // return multiaddr(mhb) - // }) - // }) - - ps.finalize() + pbs.finalize() }) - ps.pipe(conn).pipe(ps) - } -} - -function getMultiaddr (socket) { - var mh - if (socket.remoteFamily === 'IPv6') { - var addr = new Address6(socket.remoteAddress) - if (addr.v4) { - var ip4 = addr.to4().correctForm() - mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort) - } else { - mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - } else { - mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - return mh -} - -function updateSelf (peerSelf, observedAddr) { - var omh = multiaddr(observedAddr) - - if (!peerSelf.previousObservedAddrs) { - peerSelf.previousObservedAddrs = [] - } - - for (var i = 0; i < peerSelf.previousObservedAddrs.length; i++) { - if (peerSelf.previousObservedAddrs[i].toString() === omh.toString()) { - peerSelf.previousObservedAddrs.splice(i, 1) - addToSelf() - return - } - } - - peerSelf.previousObservedAddrs.push(omh) - - function addToSelf () { - var isIn = false - peerSelf.multiaddrs.forEach(function (mh) { - if (mh.toString() === omh.toString()) { - isIn = true - } - }) - - if (!isIn) { - peerSelf.multiaddrs.push(omh) - } + pbs.pipe(conn).pipe(pbs) } } diff --git a/src/index.js b/src/index.js index 54c25426..34427f55 100644 --- a/src/index.js +++ b/src/index.js @@ -1,343 +1,324 @@ -var multistream = require('multistream-select') -var async = require('async') -var identify = require('./identify') +const multistream = require('multistream-select') +// const async = require('async') +const identify = require('./identify') +const PassThrough = require('stream').PassThrough exports = module.exports = Swarm function Swarm (peerInfo) { - var self = this - - if (!(self instanceof Swarm)) { - throw new Error('Swarm must be called with new') + if (!(this instanceof Swarm)) { + return new Swarm(peerInfo) } if (!peerInfo) { throw new Error('You must provide a value for `peerInfo`') } - self.peerInfo = peerInfo + // transports -- - // peerIdB58: { conn: } - self.conns = {} + // { key: transport }; e.g { tcp: } + this.transports = {} - // peerIdB58: { - // muxer: , - // socket: socket // so we can extract the info we need for identify - // } - self.muxedConns = {} + this.transport = {} - // transportName: { transport: transport, - // dialOptions: dialOptions, - // listenOptions: listenOptions, - // listeners: [] } - self.transports = {} - - // transportName: listener - self.listeners = {} - - // protocolName: handlerFunc - self.protocols = {} - - // muxerName: { Muxer: Muxer // Muxer is a constructor - // options: options } - self.muxers = {} - - // for connection reuse - self.identify = false - - // public interface - - self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { - // set up the transport and add the list of incoming streams - // add transport to the list of transports - - var multiaddr = options.multiaddr - if (multiaddr) { - // no need to pass that to the transports - delete options.multiaddr + this.transport.add = (key, transport, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} } + if (!callback) { callback = noop } - var listener = transport.createListener(options, listen) - - listener.listen(listenOptions, function ready () { - self.transports[name] = { - transport: transport, - options: options, - dialOptions: dialOptions, - listenOptions: listenOptions, - listener: listener - } - - // If a known multiaddr is passed, then add to our list of multiaddrs - if (multiaddr) { - self.peerInfo.multiaddrs.push(multiaddr) - } - - callback() - }) - } - - self.addUpgrade = function (ConnUpgrade, options) {} - - self.addStreamMuxer = function (name, StreamMuxer, options) { - self.muxers[name] = { - Muxer: StreamMuxer, - options: options + if (this.transports[key]) { + throw new Error('There is already a transport with this key') } - } - - self.dial = function (peerInfo, options, protocol, callback) { - // 1. check if we have transports we support - // 2. check if we have a conn waiting - // 3. check if we have a stream muxer available - - if (typeof protocol === 'function') { - callback = protocol - protocol = undefined - } - - // check if a conn is waiting - // if it is and protocol was selected, jump into multistreamHandshake - // if it is and no protocol was selected, do nothing and call and empty callback - - if (self.conns[peerInfo.id.toB58String()]) { - if (protocol) { - if (self.muxers['spdy']) { - // TODO upgrade this conn to a muxer - console.log('TODO: upgrade a warm conn to muxer that was added after') - } else { - multistreamHandshake(self.conns[peerInfo.id.toB58String()]) - } - self.conns[peerInfo.id.toB58String()] = undefined - delete self.conns[peerInfo.id.toB58String()] - return - } else { - return callback() - } - } - - // check if a stream muxer for this peer is available - if (self.muxedConns[peerInfo.id.toB58String()]) { - if (protocol) { - return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer) - } else { - return callback() - } - } - - // Creating a new conn with this peer routine - - // TODO - check if there is a preference in protocol to use on - // options.protocol - var supportedTransports = Object.keys(self.transports) - var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) { - return multiaddr.protoNames().some(function (proto) { - return supportedTransports.indexOf(proto) >= 0 - }) - }) - - if (!multiaddrs.length) { - callback(new Error("The swarm doesn't support any of the peer transports")) - return - } - - var conn - - async.eachSeries(multiaddrs, function (multiaddr, next) { - if (conn) { - return next() - } - - var transportName = getTransportNameForMultiaddr(multiaddr) - var transport = self.transports[transportName] - var dialOptions = clone(transport.dialOptions) - dialOptions.ready = connected - - var connTry = transport.transport.dial(multiaddr, dialOptions) - - connTry.once('error', function (err) { - if (err) { - return console.log(err) // TODO handle error - } - next() // try next multiaddr - }) - - function connected () { - conn = connTry - next() - } - - function getTransportNameForMultiaddr (multiaddr) { - // this works for all those ip + transport + port tripplets - return multiaddr.protoNames()[1] - } - - function clone (obj) { - var target = {} - for (var i in obj) { - if (obj.hasOwnProperty(i)) { - target[i] = obj[i] - } - } - return target - } - }, done) - - function done () { - // TODO apply upgrades - // apply stream muxer - // if no protocol is selected, save it in the pool - // if protocol is selected, multistream that protocol - if (!conn) { - callback(new Error('Unable to open a connection')) - return - } - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, false, function (err, muxer) { - if (err) { - return console.log(err) // TODO Treat error - } - - muxer.on('stream', userProtocolMuxer) - - self.muxedConns[peerInfo.id.toB58String()] = { - muxer: muxer, - socket: conn - } - - if (protocol) { - openMuxedStream(muxer) - } else { - callback() - } - }) - } else { - if (protocol) { - multistreamHandshake(conn) - } else { - self.conns[peerInfo.id.toB58String()] = conn - callback() - } - } - } - - function openMuxedStream (muxer) { - // 1. create a new stream on this muxedConn and pass that to - // multistreamHanshake - muxer.dialStream(function (err, conn) { - if (err) { - return console.log(err) // TODO Treat error - } - multistreamHandshake(conn) - }) - } - - function multistreamHandshake (conn) { - var msI = new multistream.Interactive() - msI.handle(conn, function () { - msI.select(protocol, callback) - }) - } - } - - self.closeListener = function (transportName, callback) { - self.transports[transportName].listener.close(closed) - - // only gets called when all the streams on this transport are closed too - function closed () { - delete self.transports[transportName] - callback() - } - } - - // Iterates all the listeners closing them - // one by one. It calls back once all are closed. - self.closeAllListeners = function (callback) { - var transportNames = Object.keys(self.transports) - - async.each(transportNames, self.closeListener, callback) - } - - self.closeConns = function (callback) { - // close warmed up cons so that the listener can gracefully exit - Object.keys(self.conns).forEach(function (conn) { - self.conns[conn].destroy() - }) - self.conns = {} - + this.transports[key] = transport callback() } - // Closes both transport listeners and - // connections. It calls back once everything - // is closed - self.close = function (callback) { - async.parallel([ - self.closeAllListeners, - self.closeConns - ], callback) - } + this.transport.dial = (key, multiaddrs, callback) => { + const t = this.transports[key] - self.enableIdentify = function () { - // set flag to true - // add identify to the list of handled protocols - self.identify = true + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } - // we pass muxedConns so that identify can access the socket of the other - // peer - self.handleProtocol(identify.protoId, - identify.getHandlerFunction(self.peerInfo, self.muxedConns)) - } + // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) - self.handleProtocol = function (protocol, handlerFunction) { - self.protocols[protocol] = handlerFunction - } - - // internals - - function listen (conn) { - // TODO apply upgrades - // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) - - if (self.muxers['spdy']) { - var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) - spdy.attach(conn, true, function (err, muxer) { - if (err) { - return console.log(err) // TODO treat error - } - - // TODO This muxer has to be identified! - // pass to identify a reference of - // our muxedConn list - // ourselves (peerInfo) - // the conn, which is the socket - // and a stream it can send stuff - if (self.identify) { - muxer.dialStream(function (err, stream) { - if (err) { - return console.log(err) // TODO Treat error - } - // conn === socket at this point - identify(self.muxedConns, self.peerInfo, conn, stream, muxer) - }) - } - - muxer.on('stream', userProtocolMuxer) + // b) if multiaddrs.length = 1, return the conn from the + // transport, otherwise, create a passthrough + if (multiaddrs.length === 1) { + const conn = t.dial(multiaddrs.shift(), {ready: () => { + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, conn) + }}) + conn.once('error', () => { + callback(new Error('failed to connect to every multiaddr')) + }) + return conn + } + + // c) multiaddrs should already be a filtered list + // specific for the transport we are using + const pt = new PassThrough() + + next(multiaddrs.shift()) + return pt + function next (multiaddr) { + const conn = t.dial(multiaddr, {ready: () => { + pt.pipe(conn).pipe(pt) + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, pt) + }}) + + conn.once('error', () => { + if (multiaddrs.length === 0) { + return callback(new Error('failed to connect to every multiaddr')) + } + next(multiaddrs.shift()) }) - } else { - // if no stream muxer, then - userProtocolMuxer(conn) } } - // Handle user given protocols - function userProtocolMuxer (conn) { + this.transport.listen = (key, options, handler, callback) => { + // if no callback is passed, we pass conns to connHandler + if (!handler) { handler = connHandler } + + const multiaddrs = peerInfo.multiaddrs.filter((m) => { + if (m.toString().indexOf('tcp') !== -1) { + return m + } + }) + + this.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => { + if (err) { + return callback(err) + } + if (maUpdate) { + // because we can listen on port 0... + peerInfo.multiaddr.replace(multiaddrs, maUpdate) + } + + callback() + }) + } + + this.transport.close = (key, callback) => { + this.transports[key].close(callback) + } + + // connections -- + + // { peerIdB58: { conn: }} + this.conns = {} + + // { + // peerIdB58: { + // muxer: + // conn: // to extract info required for the Identify Protocol + // } + // } + this.muxedConns = {} + + // { protocol: handler } + this.protocols = {} + + this.connection = {} + this.connection.addUpgrade = () => {} + + // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } + this.muxers = {} + this.connection.addStreamMuxer = (muxer) => { + // for dialing + this.muxers[muxer.multicodec] = muxer + + // for listening + this.handle(muxer.multicodec, (conn) => { + const muxedConn = muxer(conn, true) + muxedConn.on('stream', (conn) => { + connHandler(conn) + }) + + // if identify is enabled, attempt to do it for muxer reuse + if (this.identify) { + identify.exec(conn, muxedConn, peerInfo, (err, pi) => { + if (err) { + return console.log('Identify exec failed', err) + } + this.muxedConns[pi.id.toB58String()] = {} + this.muxedConns[pi.id.toB58String()].muxer = muxedConn + this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs + }) + } + }) + } + + // enable the Identify protocol + this.identify = false + this.connection.reuse = () => { + this.identify = true + this.handle(identify.multicodec, identify.handler(peerInfo, this)) + } + + const self = this // prefered this to bind + + // incomming connection handler + function connHandler (conn) { var msS = new multistream.Select() - msS.handle(conn) - Object.keys(self.protocols).forEach(function (protocol) { + Object.keys(self.protocols).forEach((protocol) => { + if (!protocol) { return } msS.addHandler(protocol, self.protocols[protocol]) }) + msS.handle(conn) + } + + // higher level (public) API + this.dial = (pi, protocol, callback) => { + var pt = null + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } else { + pt = new PassThrough() + } + + const b58Id = pi.id.toB58String() + if (!this.muxedConns[b58Id]) { + if (!this.conns[b58Id]) { + attemptDial(pi, (err, conn) => { + if (err) { + return callback(err) + } + gotWarmedUpConn(conn) + }) + } else { + const conn = this.conns[b58Id] + this.conns[b58Id] = undefined + gotWarmedUpConn(conn) + } + } else { + gotMuxer(this.muxedConns[b58Id].muxer) + } + + function gotWarmedUpConn (conn) { + attemptMuxerUpgrade(conn, (err, muxer) => { + if (!protocol) { + if (err) { + self.conns[b58Id] = conn + } + return callback() + } + + if (err) { + // couldn't upgrade to Muxer, it is ok + protocolHandshake(conn, protocol, callback) + } else { + gotMuxer(muxer) + } + }) + } + + function gotMuxer (muxer) { + openConnInMuxedConn(muxer, (conn) => { + protocolHandshake(conn, protocol, callback) + }) + } + + function attemptDial (pi, cb) { + const tKeys = Object.keys(self.transports) + nextTransport(tKeys.shift()) + + function nextTransport (key) { + const multiaddrs = pi.multiaddrs.slice() + self.transport.dial(key, multiaddrs, (err, conn) => { + if (err) { + if (tKeys.length === 0) { + return cb(new Error('Could not dial in any of the transports')) + } + return nextTransport(tKeys.shift()) + } + cb(null, conn) + }) + } + } + + function attemptMuxerUpgrade (conn, cb) { + const muxers = Object.keys(self.muxers) + if (muxers.length === 0) { + return cb(new Error('no muxers available')) + } + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + + nextMuxer(muxers.shift()) + + function nextMuxer (key) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + cb(new Error('could not upgrade to stream muxing')) + } else { + nextMuxer(muxers.shift()) + } + return + } + + const muxedConn = self.muxers[key](conn, false) + self.muxedConns[b58Id] = {} + self.muxedConns[b58Id].muxer = muxedConn + self.muxedConns[b58Id].conn = conn + + // in case identify is on + muxedConn.on('stream', connHandler) + + cb(null, muxedConn) + }) + }) + } + } + function openConnInMuxedConn (muxer, cb) { + cb(muxer.newStream()) + } + + function protocolHandshake (conn, protocol, cb) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(protocol, (err, conn) => { + if (err) { + return callback(err) + } + pt.pipe(conn).pipe(pt) + callback(null, pt) + }) + }) + } + } + + this.handle = (protocol, handler) => { + this.protocols[protocol] = handler + } + + this.close = (callback) => { + var count = 0 + + Object.keys(this.muxedConns).forEach((key) => { + this.muxedConns[key].muxer.end() + }) + + Object.keys(this.transports).forEach((key) => { + this.transports[key].close(() => { + if (++count === Object.keys(this.transports).length) { + callback() + } + }) + }) } } + +function noop () {} diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 0acd7654..13c28894 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -1,354 +1,442 @@ /* eslint-env mocha */ -var async = require('async') -var expect = require('chai').expect +const expect = require('chai').expect +// const async = require('async') -var multiaddr = require('multiaddr') -var Id = require('peer-id') -var Peer = require('peer-info') -var Swarm = require('../src') -var tcp = require('libp2p-tcp') -var Spdy = require('libp2p-spdy') +const multiaddr = require('multiaddr') +// const Id = require('peer-id') +const Peer = require('peer-info') +const Swarm = require('../src') +const TCP = require('libp2p-tcp') +const bl = require('bl') +const spdy = require('libp2p-spdy') -// because of Travis-CI -process.on('uncaughtException', function (err) { - console.log('Caught exception: ' + err) -}) - -describe('Basics', function () { - it('enforces creation with new', function (done) { - expect(function () { - Swarm() - }).to.throw() - done() - }) - - it('it throws an exception without peerSelf', function (done) { - expect(function () { - var sw = new Swarm() - sw.close() - }).to.throw(Error) +describe('basics', () => { + it('throws on missing peerInfo', (done) => { + expect(Swarm).to.throw(Error) done() }) }) -describe('When dialing', function () { - describe('if the swarm does add any of the peer transports', function () { - it('it returns an error', function (done) { - var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')]) - var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')]) - var swarm = new Swarm(peerOne) +describe('transport - tcp', function () { + this.timeout(10000) - swarm.dial(peerTwo, {}, function (err) { - expect(err).to.exist - done() - }) - }) - }) -}) + var swarmA + var swarmB + var peerA = new Peer() + var peerB = new Peer() -describe('Without a Stream Muxer', function () { - describe('and one swarm over tcp', function () { - it('add the transport', function (done) { - var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - - sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready) - - function ready () { - expect(sw.transports['tcp'].options).to.deep.equal({}) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - - sw.close(done) - } - }) + before((done) => { + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999')) + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + done() }) - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - afterEach(function (done) { - async.parallel([sw1.close, sw2.close], done) - }) - - it('dial a conn', function (done) { - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - done() - }) - }) - - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) - }) - - it('dial a protocol on a previous created conn', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - conn.end() - conn.on('end', done) - }) - - sw1.dial(p2, {}, function (err) { - expect(err).to.equal(undefined) - expect(Object.keys(sw1.conns).length).to.equal(1) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - }) - }) - - // it('add an upgrade', function (done) { done() }) - // it('dial a conn on top of a upgrade', function (done) { done() }) - // it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) - - /* TODO - describe('udp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - - /* TODO - describe('udt', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ - -/* TODO -describe('utp', function () { - it('add the transport', function (done) { done() }) - it('dial a conn', function (done) { done() }) - it('dial a conn on a protocol', function (done) { done() }) - it('add an upgrade', function (done) { done() }) - it('dial a conn on top of a upgrade', function (done) { done() }) - it('dial a conn on a protocol on top of a upgrade', function (done) { done() }) -}) */ -}) - -describe('With a SPDY Stream Muxer', function () { - describe('and one swarm over tcp', function () { - // TODO: What is the it here? - it('add Stream Muxer', function (done) { - // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') - var p = new Peer(Id.create(), []) - var sw = new Swarm(p) - sw.addStreamMuxer('spdy', Spdy, {}) - + it('add', (done) => { + swarmA.transport.add('tcp', new TCP()) + expect(Object.keys(swarmA.transports).length).to.equal(1) + swarmB.transport.add('tcp', new TCP(), () => { + expect(Object.keys(swarmB.transports).length).to.equal(1) done() }) }) - describe('and two swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) - }) - - function afterEach (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) - - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) - - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } + it('listen', (done) => { + var count = 0 + swarmA.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + swarmB.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + function ready () { + if (++count === 2) { + expect(peerA.multiaddrs.length).to.equal(1) + expect(peerA.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9888')) + expect(peerB.multiaddrs.length).to.equal(1) + expect(peerB.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9999')) done() } } + }) - it('dial a conn on a protocol', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) + it('dial to a multiaddr', (done) => { + const conn = swarmA.transport.dial('tcp', multiaddr('/ip4/127.0.0.1/tcp/9999'), (err, conn) => { + expect(err).to.not.exist + }) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + done() + })) + conn.write('hey') + conn.end() + }) - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - afterEach(done) - }) - }) + it('dial to set of multiaddr, only one is available', (done) => { + const conn = swarmA.transport.dial('tcp', [ + multiaddr('/ip4/127.0.0.1/tcp/9910'), + multiaddr('/ip4/127.0.0.1/tcp/9999'), + multiaddr('/ip4/127.0.0.1/tcp/9309') + ], (err, conn) => { + expect(err).to.not.exist + }) + conn.pipe(bl((err, data) => { + expect(err).to.not.exist + done() + })) + conn.write('hey') + conn.end() + }) - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - conn.end() - }) + it('close', (done) => { + var count = 0 + swarmA.transport.close('tcp', closed) + swarmB.transport.close('tcp', closed) + + function closed () { + if (++count === 2) { + done() + } + } + }) + + it('support port 0', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/0')) + swarm.close(done) + } + }) + + it('support addr /ip4/0.0.0.0/tcp/9050', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/9050')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/9050')) + swarm.close(done) + } + }) + + it('support addr /ip4/0.0.0.0/tcp/0', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/0')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(1) + expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0')) + swarm.close(done) + } + }) + + it('listen in several addrs', (done) => { + var swarm + var peer = new Peer() + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) + swarm = new Swarm(peer) + swarm.transport.add('tcp', new TCP()) + swarm.transport.listen('tcp', {}, (conn) => { + conn.pipe(conn) + }, ready) + + function ready () { + expect(peer.multiaddrs.length).to.equal(3) + swarm.close(done) + } + }) +}) + +describe('transport - udt', function () { + this.timeout(10000) + + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('listen', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('close', (done) => {}) +}) + +describe('transport - websockets', function () { + this.timeout(10000) + + before((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('listen', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('close', (done) => {}) +}) + +describe('high level API - 1st without stream multiplexing (on TCP)', function () { + this.timeout(20000) + + var swarmA + var peerA + var swarmB + var peerB + + before((done) => { + peerA = new Peer() + peerB = new Peer() + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 2) { + done() + } + } + }) + + after((done) => { + var counter = 0 + + swarmA.close(closed) + swarmB.close(closed) + + function closed () { + if (++counter === 2) { + done() + } + } + }) + + it('handle a protocol', (done) => { + swarmB.handle('/bananas/1.0.0', (conn) => { + conn.pipe(conn) + }) + expect(Object.keys(swarmB.protocols).length).to.equal(1) + done() + }) + + it('dial on protocol', (done) => { + swarmB.handle('/pineapple/1.0.0', (conn) => { + conn.pipe(conn) }) - it('dial two conns (transport reuse)', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formality so that the conn starts flowing - conn.on('data', function (chunk) {}) - - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) - expect(Object.keys(sw2.muxedConns).length).to.equal(0) - - afterEach(done) - }) - }) - - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - // TODO Improve clarity - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) - - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) - - conn.end() - }) + swarmA.dial(peerB, '/pineapple/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) }) }) - describe('and two identity enabled swarms over tcp', function () { - var mh1, p1, sw1, mh2, p2, sw2 - - beforeEach(function (done) { - mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') - p1 = new Peer(Id.create(), []) - sw1 = new Swarm(p1) - sw1.addStreamMuxer('spdy', Spdy, {}) - sw1.enableIdentify() - - mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') - p2 = new Peer(Id.create(), []) - sw2 = new Swarm(p2) - sw2.addStreamMuxer('spdy', Spdy, {}) - sw2.enableIdentify() - - async.parallel([ - function (cb) { - sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb) - }, - function (cb) { - sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb) - } - ], done) + it('dial to warm a conn', (done) => { + swarmA.dial(peerB, (err) => { + expect(err).to.not.exist + done() }) + }) - afterEach(function (done) { - var cleaningCounter = 0 - sw1.closeConns(cleaningUp) - sw2.closeConns(cleaningUp) + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.dial(peerB, '/bananas/1.0.0', (err, conn) => { + expect(err).to.not.exist + conn.end() + conn.on('end', done) + }) + }) +}) - sw1.closeListener('tcp', cleaningUp) - sw2.closeListener('tcp', cleaningUp) +describe('stream muxing (on TCP)', function () { + this.timeout(20000) - function cleaningUp () { - cleaningCounter++ - // TODO FIX: here should be 4, but because super wrapping of - // streams, it makes it so hard to properly close the muxed - // streams - https://github.com/indutny/spdy-transport/issues/14 - if (cleaningCounter < 3) { - return - } - // give time for identify to finish - setTimeout(function () { - expect(Object.keys(sw2.muxedConns).length).to.equal(1) + describe('multiplex', () => { + before((done) => { done() }) + after((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('handle + dial on protocol', (done) => {}) + it.skip('dial to warm conn', (done) => {}) + it.skip('dial on protocol, reuse warmed conn', (done) => {}) + it.skip('enable identify to reuse incomming muxed conn', (done) => {}) + }) + describe('spdy', () => { + var swarmA + var peerA + var swarmB + var peerB + var swarmC + var peerC + + before((done) => { + peerA = new Peer() + peerB = new Peer() + peerC = new Peer() + + // console.log('peer A', peerA.id.toB58String()) + // console.log('peer B', peerB.id.toB58String()) + // console.log('peer C', peerC.id.toB58String()) + + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) + peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) + + swarmA = new Swarm(peerA) + swarmB = new Swarm(peerB) + swarmC = new Swarm(peerC) + + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.listen('tcp', {}, null, ready) + + swarmB.transport.add('tcp', new TCP()) + swarmB.transport.listen('tcp', {}, null, ready) + + swarmC.transport.add('tcp', new TCP()) + swarmC.transport.listen('tcp', {}, null, ready) + + var counter = 0 + + function ready () { + if (++counter === 3) { done() - }, 500) + } } }) - it('identify', function (done) { - sw2.handleProtocol('/sparkles/1.0.0', function (conn) { - // formallity so that the conn starts flowing - conn.on('data', function (chunk) {}) + after((done) => { + var counter = 0 - conn.end() - conn.on('end', function () { - expect(Object.keys(sw1.muxedConns).length).to.equal(1) + swarmA.close(closed) + swarmB.close(closed) + swarmC.close(closed) + + function closed () { + if (++counter === 3) { done() - }) + } + } + }) + + it('add', (done) => { + swarmA.connection.addStreamMuxer(spdy) + swarmB.connection.addStreamMuxer(spdy) + swarmC.connection.addStreamMuxer(spdy) + done() + }) + + it('handle + dial on protocol', (done) => { + swarmB.handle('/abacaxi/1.0.0', (conn) => { + conn.pipe(conn) }) - sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { - conn.on('data', function () {}) - expect(err).to.equal(null) - expect(Object.keys(sw1.conns).length).to.equal(0) + swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmA.muxedConns).length).to.equal(1) conn.end() + conn.on('end', done) + }) + }) + + it('dial to warm conn', (done) => { + swarmB.dial(peerA, (err) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + done() + }) + }) + + it('dial on protocol, reuse warmed conn', (done) => { + swarmA.handle('/papaia/1.0.0', (conn) => { + conn.pipe(conn) + }) + + swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => { + expect(err).to.not.exist + expect(Object.keys(swarmB.conns).length).to.equal(0) + expect(Object.keys(swarmB.muxedConns).length).to.equal(1) + conn.end() + conn.on('end', done) + }) + }) + + it('enable identify to reuse incomming muxed conn', (done) => { + swarmA.connection.reuse() + swarmC.connection.reuse() + + swarmC.dial(peerA, (err) => { + expect(err).to.not.exist + setTimeout(() => { + expect(Object.keys(swarmC.muxedConns).length).to.equal(1) + expect(Object.keys(swarmA.muxedConns).length).to.equal(2) + done() + }, 500) }) }) }) }) + +/* +describe('conn upgrades', function () { + this.timeout(20000) + + describe('secio on tcp', () => { + // before((done) => { done() }) + // after((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('tls on a muxed stream (not the full conn)', (done) => {}) + }) + describe('tls on tcp', () => { + // before((done) => { done() }) + // after((done) => { done() }) + + it.skip('add', (done) => {}) + it.skip('dial', (done) => {}) + it.skip('tls on a muxed stream (not the full conn)', (done) => {}) + }) +}) + +describe('high level API - with everything mixed all together!', function () { + this.timeout(20000) + + // before((done) => { done() }) + // after((done) => { done() }) + + it.skip('add tcp', (done) => {}) + it.skip('add utp', (done) => {}) + it.skip('add websockets', (done) => {}) + it.skip('dial', (done) => {}) +}) +*/