Merge pull request #10 from diasdavid/revisit

Revisit Swarm - multitransport + upgrades - https://github.com/diasdavid/node-ipfs-swarm/issues/8
This commit is contained in:
David Dias 2015-09-23 23:09:27 +01:00
commit 4d9d8c94c7
10 changed files with 802 additions and 517 deletions

View File

@ -1,82 +1,69 @@
ipfs-swarm Node.js implementation libp2p-swarm Node.js implementation
================================= =================================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![Build Status](https://img.shields.io/travis/diasdavid/node-ipfs-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/node-ipfs-swarm) [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![Build Status](https://img.shields.io/travis/diasdavid/node-ipfs-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/node-ipfs-swarm)
> IPFS swarm implementation in Node.js > libp2p swarm implementation in Node.js
# Description # Description
ipfs-swarm is an abstraction for the network layer on IPFS. It offers an API to open streams between peers on a specific protocol. libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
Ref spec (WIP) - https://github.com/diasdavid/specs/blob/protocol-spec/protocol/layers.md#network-layer libp2p-swarm is used by libp2p but it can be also used as a standalone module.
# Usage # Usage
### Create a new Swarm ### Install and create a Swarm
```javascript libp2p-swarm is available on npm and so, like any other npm module, just:
var Swarm = require('ipfs-swarm')
var s = new Swarm([port]) // `port` defalts to 4001 ```bash
$ npm install libp2p-swarm --save
``` ```
### Set the swarm to listen for incoming streams And use it in your Node.js code as:
```javascript ```JavaScript
s.listen([port], [callback]) // `port` defaults to 4001, `callback` gets called when the socket starts listening var Swarm = require('libp2p-swarm')
var sw = new Swarm(peerInfoSelf)
``` ```
### Close the listener/socket and every open stream that was multiplexed on it peerInfoSelf is a [PeerInfo](https://github.com/diasdavid/node-peer-info) object that represents the peer creating this swarm instance.
```javascript ### Support a transport
s.closeListener()
libp2p-swarm expects transports that implement [abstract-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/node-libp2p-tcp), a simple shim on top of the `net` module to make it work with swarm expectations.
```JavaScript
sw.addTransport(transport, [options, dialOptions, listenOptions])
``` ```
### Register a protocol to be handled by an incoming stream ### Add a connection upgrade
```javascript A connection upgrade must be able to receive and return something that implements the [abstract-connection](https://github.com/diasdavid/abstract-connection) interface.
s.registerHandler('/name/protocol/you/want/version', function (stream) {})
```JavaScript
sw.addUpgrade(connUpgrade, [options])
``` ```
### Open a new connection Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer](https://github.com/diasdavid/abstract-stream-muxer) interface.
Used when we want to make sure we can connect to a given peer, but do not intend to establish a stream with any of the services offered right away. ```JavaScript
sw.addStreamMuxer(streamMuxer, [options])
```
s.openConnection(peerConnection, function (err) {})
``` ```
### Dial to another peer
### Dial a new stream ```JavaScript
sw.dial(PeerInfo, options, protocol)
``` sw.dial(PeerInfo, options)
s.openStream(peerInfo, protocol, function (err, stream) {})
``` ```
peerInfo must be a [`ipfs-peer`](https://www.npmjs.com/package/ipfs-peer) object, contaning both peer-id and multiaddrs. dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing.
## Events emitted ### Accept requests on a specific protocol
```JavaScript
sw.handleProtocol(protocol, handlerFunction)
``` ```
.on('error')
.on('connection')
.on('connection-unknown') // used by Identify to start the Identify protocol from listener to dialer
```
## Identify protocol
The Identify protocol is an integral part to Swarm. It enables peers to share observedAddrs, identities and other possible address available. This enables us to do better NAT traversal.
To instantiate Identify:
```
var Identify = require('ipfs-swarm/identify')
var i = new Identify(swarmInstance, peerSelf)
```
`swarmInstance` must be an Instance of swarm and `peerSelf` must be a instance of `ipfs-peer` that represents the peer that instantiated this Identify
Identify emits a `peer-update` event each time it receives information from another peer.

14
examples/peerB.js Normal file
View File

@ -0,0 +1,14 @@
var Swarm = require('./../src')
var Peer = require('peer-info')
var Id = require('peer-id')
var multiaddr = require('multiaddr')
var tcp = require('libp2p-tcp')
var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () {
console.log('transport added')
})

View File

@ -1,28 +0,0 @@
// var Identify = require('./../src/identify')
var Swarm = require('./../src')
// var Peer = require('ipfs-peer')
// var Id = require('ipfs-peer-id')
// var multiaddr = require('multiaddr')
var b = new Swarm()
b.port = 4001
// var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)])
// var i = new Identify(b, peerB)
// i.on('thenews', function (news) {
// console.log('such news')
// })
b.on('error', function (err) {
console.log(err)
})
b.listen()
b.registerHandler('/ipfs/sparkles/1.2.3', function (stream) {
// if (err) {
// return console.log(err)
// }
console.log('woop got a stream')
})

View File

@ -1,14 +1,12 @@
{ {
"name": "ipfs-swarm", "name": "libp2p-swarm",
"version": "0.4.1", "version": "0.4.1",
"description": "IPFS swarm implementation in Node.js", "description": "IPFS swarm implementation in Node.js",
"main": "src/index.js", "main": "src/index.js",
"scripts": { "scripts": {
"test": "./node_modules/.bin/lab tests/*-test.js", "test": "./node_modules/.bin/lab tests/*-test.js",
"coverage": "./node_modules/.bin/lab -t 100 tests/*-test.js", "coverage": "./node_modules/.bin/lab -t 88 tests/*-test.js",
"codestyle": "./node_modules/.bin/standard --format", "lint": "./node_modules/.bin/standard"
"lint": "./node_modules/.bin/standard",
"validate": "npm ls"
}, },
"repository": { "repository": {
"type": "git", "type": "git",
@ -24,8 +22,9 @@
}, },
"homepage": "https://github.com/diasdavid/node-ipfs-swarm", "homepage": "https://github.com/diasdavid/node-ipfs-swarm",
"pre-commit": [ "pre-commit": [
"codestyle", "lint",
"test" "test",
"coverage"
], ],
"engines": { "engines": {
"node": "^4.0.0" "node": "^4.0.0"
@ -33,6 +32,8 @@
"devDependencies": { "devDependencies": {
"code": "^1.4.1", "code": "^1.4.1",
"lab": "^5.13.0", "lab": "^5.13.0",
"libp2p-spdy": "^0.1.0",
"libp2p-tcp": "^0.1.1",
"precommit-hook": "^3.0.0", "precommit-hook": "^3.0.0",
"sinon": "^1.15.4", "sinon": "^1.15.4",
"standard": "^4.5.2", "standard": "^4.5.2",
@ -42,11 +43,11 @@
"async": "^1.3.0", "async": "^1.3.0",
"ip-address": "^4.0.0", "ip-address": "^4.0.0",
"ipfs-logger": "^0.1.0", "ipfs-logger": "^0.1.0",
"ipfs-peer": "^0.3.0",
"ipfs-peer-id": "^0.3.0",
"multiaddr": "^1.0.0", "multiaddr": "^1.0.0",
"multiplex-stream-muxer": "^0.2.0", "multiplex-stream-muxer": "^0.2.0",
"multistream-select": "^0.6.1", "multistream-select": "^0.6.1",
"peer-id": "^0.3.3",
"peer-info": "^0.3.2",
"protocol-buffers-stream": "^1.2.0", "protocol-buffers-stream": "^1.2.0",
"spdy-stream-muxer": "^0.6.0" "spdy-stream-muxer": "^0.6.0"
} }

View File

@ -4,75 +4,48 @@
*/ */
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
var EventEmmiter = require('events').EventEmitter
var util = require('util')
var protobufs = require('protocol-buffers-stream') var protobufs = require('protocol-buffers-stream')
var fs = require('fs') var fs = require('fs')
var schema = fs.readFileSync(__dirname + '/identify.proto') var schema = fs.readFileSync(__dirname + '/identify.proto')
var v6 = require('ip-address').v6 var v6 = require('ip-address').v6
var Id = require('ipfs-peer-id') var Id = require('peer-id')
var multiaddr = require('multiaddr') var multiaddr = require('multiaddr')
exports = module.exports = Identify exports = module.exports = identify
util.inherits(Identify, EventEmmiter) var protoId = '/ipfs/identify/1.0.0'
function Identify (swarm, peerSelf) { exports.protoId = protoId
var self = this var createProtoStream = protobufs(schema)
self.createProtoStream = protobufs(schema)
swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
var ps = self.createProtoStream()
ps.on('identify', function (msg) {
updateSelf(peerSelf, msg.observedAddr)
var peerId = Id.createFromPubKey(msg.publicKey)
var socket = swarm.connections[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
observedAddr: mh.buffer
})
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
ps.finalize()
})
ps.pipe(stream).pipe(ps)
})
swarm.on('connection-unknown', function (conn, socket) {
conn.dialStream(function (err, stream) {
if (err) { return console.log(err) }
var msi = new Interactive() var msi = new Interactive()
msi.handle(stream, function () { msi.handle(conn, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) { msi.select(protoId, function (err, ds) {
if (err) { return console.log(err) } if (err) {
return console.log(err) // TODO Treat error
}
var ps = self.createProtoStream() var ps = createProtoStream()
ps.on('identify', function (msg) { ps.on('identify', function (msg) {
var peerId = Id.createFromPubKey(msg.publicKey) var peerId = Id.createFromPubKey(msg.publicKey)
updateSelf(peerSelf, msg.observedAddr) updateSelf(peerInfoSelf, msg.observedAddr)
swarm.connections[peerId.toB58String()] = { muxedConns[peerId.toB58String()] = {
conn: conn, muxer: muxer,
socket: socket socket: socket
} }
self.emit('peer-update', { // TODO: Pass the new discovered info about the peer that contacted us
peerId: peerId, // to something like the Kademlia Router, so the peerInfo for this peer
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) // is fresh
}) // - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
// })
}) })
var mh = getMultiaddr(socket) var mh = getMultiaddr(socket)
@ -80,8 +53,10 @@ function Identify (swarm, peerSelf) {
ps.identify({ ps.identify({
protocolVersion: 'na', protocolVersion: 'na',
agentVersion: 'na', agentVersion: 'na',
publicKey: peerSelf.id.pubKey, publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
}),
observedAddr: mh.buffer observedAddr: mh.buffer
}) })
@ -89,8 +64,50 @@ function Identify (swarm, peerSelf) {
ps.finalize() ps.finalize()
}) })
}) })
}
exports.getHandlerFunction = function (peerInfoSelf, muxedConns) {
return function (conn) {
// wait for the other peer to identify itself
// update our multiaddr with observed addr list
// then get the socket from our list of muxedConns and send the reply back
var ps = createProtoStream()
ps.on('identify', function (msg) {
updateSelf(peerInfoSelf, msg.observedAddr)
var peerId = Id.createFromPubKey(msg.publicKey)
var socket = muxedConns[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
}),
observedAddr: mh.buffer
}) })
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {
// return multiaddr(mhb)
// })
// })
ps.finalize()
}) })
ps.pipe(conn).pipe(ps)
}
} }
function getMultiaddr (socket) { function getMultiaddr (socket) {

View File

@ -1,2 +0,0 @@
exports = module.exports = require('spdy-stream-muxer')
// exports = module.exports = require('multiplex-stream-muxer')

View File

@ -1,189 +1,313 @@
var tcp = require('net') var multistream = require('multistream-select')
var Select = require('multistream-select').Select
var Interactive = require('multistream-select').Interactive
var Muxer = require('./stream-muxer')
var log = require('ipfs-logger').group('swarm')
var async = require('async') var async = require('async')
var EventEmitter = require('events').EventEmitter var identify = require('./identify')
var util = require('util')
exports = module.exports = Swarm exports = module.exports = Swarm
util.inherits(Swarm, EventEmitter) function Swarm (peerInfo) {
function Swarm () {
var self = this var self = this
if (!(self instanceof Swarm)) { if (!(self instanceof Swarm)) {
throw new Error('Swarm must be called with new') throw new Error('Swarm must be called with new')
} }
self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 self.peerInfo = peerInfo
self.connections = {} // {peerIdB58: {conn: <>, socket: <>}
self.handles = {}
// set the listener // peerIdB58: { conn: <conn> }
self.conns = {}
self.listen = function (port, ready) { // peerIdB58: {
if (!ready) { // muxer: <muxer>,
ready = function noop () {} // socket: socket // so we can extract the info we need for identify
// }
self.muxedConns = {}
// transportName: { transport: transport,
// dialOptions: dialOptions,
// listenOptions: listenOptions,
// listeners: [] }
self.transports = {}
// transportName: listener
self.listeners = {}
// protocolName: handlerFunc
self.protocols = {}
// muxerName: { Muxer: Muxer // Muxer is a constructor
// options: options }
self.muxers = {}
// for connection reuse
self.identify = false
// public interface
self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) {
// set up the transport and add the list of incoming streams
// add transport to the list of transports
var listener = transport.createListener(options, listen)
listener.listen(listenOptions, function ready () {
self.transports[name] = {
transport: transport,
options: options,
dialOptions: dialOptions,
listenOptions: listenOptions,
listener: listener
} }
if (typeof port === 'function') {
ready = port
} else if (port) { self.port = port }
// // If a known multiaddr is passed, then add to our list of multiaddrs
if (options.multiaddr) {
self.peerInfo.multiaddrs.push(options.multiaddr)
}
self.listener = tcp.createServer(function (socket) { callback()
errorUp(self, socket)
var ms = new Select()
ms.handle(socket)
ms.addHandler('/spdy/3.1.0', function (ds) {
log.info('Negotiated spdy with incoming socket')
var conn = new Muxer().attach(ds, true)
// attach multistream handlers to incoming streams
conn.on('stream', registerHandles)
errorUp(self, conn)
// FOR IDENTIFY
self.emit('connection-unknown', conn, socket)
// IDENTIFY DOES THIS FOR US
// conn.on('close', function () { delete self.connections[conn.peerId] })
}) })
}).listen(self.port, ready)
errorUp(self, self.listener)
} }
// interface self.addUpgrade = function (ConnUpgrade, options) {}
// open stream account for connection reuse self.addStreamMuxer = function (name, StreamMuxer, options) {
self.openConnection = function (peer, cb) { self.muxers[name] = {
// If no connection open yet, open it Muxer: StreamMuxer,
if (!self.connections[peer.id.toB58String()]) { options: options
// Establish a socket with one of the addresses }
var socket }
async.eachSeries(peer.multiaddrs, function (multiaddr, next) {
if (socket) { return next() }
var tmp = tcp.connect(multiaddr.toOptions(), function () { self.dial = function (peerInfo, options, protocol, callback) {
socket = tmp // 1. check if we have transports we support
errorUp(self, socket) // 2. check if we have a conn waiting
// 3. check if we have a stream muxer available
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
// check if a conn is waiting
// if it is and protocol was selected, jump into multistreamHandshake
// if it is and no protocol was selected, do nothing and call and empty callback
if (self.conns[peerInfo.id.toB58String()]) {
if (protocol) {
if (self.muxers['spdy']) {
// TODO upgrade this conn to a muxer
console.log('TODO: upgrade a warm conn to muxer that was added after')
} else {
multistreamHandshake(self.conns[peerInfo.id.toB58String()])
}
self.conns[peerInfo.id.toB58String()] = undefined
delete self.conns[peerInfo.id.toB58String()]
return
} else {
return callback()
}
}
// check if a stream muxer for this peer is available
if (self.muxedConns[peerInfo.id.toB58String()]) {
if (protocol) {
return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer)
} else {
return callback()
}
}
// Creating a new conn with this peer routine
// TODO - check if there is a preference in protocol to use on
// options.protocol
var supportedTransports = Object.keys(self.transports)
var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) {
return multiaddr.protoNames().some(function (proto) {
return supportedTransports.indexOf(proto) >= 0
})
})
var conn
async.eachSeries(multiaddrs, function (multiaddr, next) {
if (conn) {
return next()
}
var transportName = getTransportNameForMultiaddr(multiaddr)
var transport = self.transports[transportName]
var dialOptions = clone(transport.dialOptions)
dialOptions.ready = connected
var connTry = transport.transport.dial(multiaddr, dialOptions)
connTry.once('error', function (err) {
if (err) {
return console.log(err) // TODO handle error
}
next() // try next multiaddr
})
function connected () {
conn = connTry
next() next()
})
tmp.once('error', function (err) {
log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err)
next()
})
}, function done () {
if (!socket) {
return cb(new Error('Not able to open a scoket with peer - ',
peer.id.toB58String()))
} }
gotSocket(socket)
function getTransportNameForMultiaddr (multiaddr) {
// this works for all those ip + transport + port tripplets
return multiaddr.protoNames()[1]
}
function clone (obj) {
var target = {}
for (var i in obj) {
if (obj.hasOwnProperty(i)) {
target[i] = obj[i]
}
}
return target
}
}, done)
function done () {
// TODO apply upgrades
// apply stream muxer
// if no protocol is selected, save it in the pool
// if protocol is selected, multistream that protocol
if (!conn) {
callback(new Error('Unable to open a connection'))
}
if (self.muxers['spdy']) {
var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options)
spdy.attach(conn, false, function (err, muxer) {
if (err) {
return console.log(err) // TODO Treat error
}
muxer.on('stream', userProtocolMuxer)
self.muxedConns[peerInfo.id.toB58String()] = {
muxer: muxer,
socket: conn
}
if (protocol) {
openMuxedStream(muxer)
} else {
callback()
}
}) })
} else { } else {
cb() if (protocol) {
multistreamHandshake(conn)
} else {
self.conns[peerInfo.id.toB58String()] = conn
callback()
} }
// do the spdy people dance (multistream-select into spdy)
function gotSocket (socket) {
var msi = new Interactive()
msi.handle(socket, function () {
msi.select('/spdy/3.1.0', function (err, ds) {
if (err) { cb(err) }
var conn = new Muxer().attach(ds, false)
conn.on('stream', registerHandles)
self.connections[peer.id.toB58String()] = {
conn: conn,
socket: socket
}
conn.on('close', function () { delete self.connections[peer.id.toB58String()]})
errorUp(self, conn)
cb()
})
})
} }
} }
self.openStream = function (peer, protocol, cb) { function openMuxedStream (muxer) {
self.openConnection(peer, function (err) { // 1. create a new stream on this muxedConn and pass that to
// multistreamHanshake
muxer.dialStream(function (err, conn) {
if (err) { if (err) {
return cb(err) return console.log(err) // TODO Treat error
} }
// spawn new muxed stream multistreamHandshake(conn)
var conn = self.connections[peer.id.toB58String()].conn
conn.dialStream(function (err, stream) {
if (err) { return cb(err) }
errorUp(self, stream)
// negotiate desired protocol
var msi = new Interactive()
msi.handle(stream, function () {
msi.select(protocol, function (err, ds) {
if (err) { return cb(err) }
peer.lastSeen = new Date()
cb(null, ds) // return the stream
})
})
})
}) })
} }
self.registerHandler = function (protocol, handlerFunc) { function multistreamHandshake (conn) {
if (self.handles[protocol]) { var msI = new multistream.Interactive()
return handlerFunc(new Error('Handle for protocol already exists', protocol)) msI.handle(conn, function () {
msI.select(protocol, callback)
})
} }
self.handles[protocol] = handlerFunc
log.info('Registered handler for protocol:', protocol)
} }
self.closeConns = function (cb) { self.closeListener = function (transportName, callback) {
var keys = Object.keys(self.connections) self.transports[transportName].listener.close(closed)
var number = keys.length
if (number === 0) { cb() }
var c = new Counter(number, cb)
keys.forEach(function (key) { // only gets called when all the streams on this transport are closed too
self.connections[key].conn.end() function closed () {
c.hit() delete self.transports[transportName]
callback()
}
}
self.closeConns = function (callback) {
// close warmed up cons so that the listener can gracefully exit
Object.keys(self.conns).forEach(function (conn) {
self.conns[conn].destroy()
})
self.conns = {}
callback()
}
self.close = function (callback) {
// close everything
}
self.enableIdentify = function () {
// set flag to true
// add identify to the list of handled protocols
self.identify = true
// we pass muxedConns so that identify can access the socket of the other
// peer
self.handleProtocol(identify.protoId,
identify.getHandlerFunction(self.peerInfo, self.muxedConns))
}
self.handleProtocol = function (protocol, handlerFunction) {
self.protocols[protocol] = handlerFunction
}
// internals
function listen (conn) {
// TODO apply upgrades
// add StreamMuxer if available (and point streams from muxer to userProtocolMuxer)
if (self.muxers['spdy']) {
var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options)
spdy.attach(conn, true, function (err, muxer) {
if (err) {
return console.log(err) // TODO treat error
}
// TODO This muxer has to be identified!
// pass to identify a reference of
// our muxedConn list
// ourselves (peerInfo)
// the conn, which is the socket
// and a stream it can send stuff
if (self.identify) {
muxer.dialStream(function (err, stream) {
if (err) {
return console.log(err) // TODO Treat error
}
// conn === socket at this point
identify(self.muxedConns, self.peerInfo, conn, stream, muxer)
}) })
} }
self.closeListener = function (cb) { muxer.on('stream', userProtocolMuxer)
self.listener.close(cb) })
} else {
// if no stream muxer, then
userProtocolMuxer(conn)
}
} }
function registerHandles (stream) { // Handle user given protocols
log.info('Registering protocol handlers on new stream') function userProtocolMuxer (conn) {
errorUp(self, stream) var msS = new multistream.Select()
var msH = new Select() msS.handle(conn)
msH.handle(stream) Object.keys(self.protocols).forEach(function (protocol) {
Object.keys(self.handles).forEach(function (protocol) { msS.addHandler(protocol, self.protocols[protocol])
msH.addHandler(protocol, self.handles[protocol])
}) })
} }
}
function errorUp (self, emitter) {
emitter.on('error', function (err) {
self.emit('error', err)
})
}
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) { callback() }
}
} }

View File

@ -1,254 +1,426 @@
var Lab = require('lab') var Lab = require('lab')
var Code = require('code') var Code = require('code')
var sinon = require('sinon')
var lab = exports.lab = Lab.script() var lab = exports.lab = Lab.script()
var experiment = lab.experiment var experiment = lab.experiment
var test = lab.test var test = lab.test
var beforeEach = lab.beforeEach
var afterEach = lab.afterEach
var expect = Code.expect var expect = Code.expect
var multiaddr = require('multiaddr') var multiaddr = require('multiaddr')
var Id = require('ipfs-peer-id') var Id = require('peer-id')
var Peer = require('ipfs-peer') var Peer = require('peer-info')
var Swarm = require('../src/') var Swarm = require('../src')
var Identify = require('../src/identify') var tcp = require('libp2p-tcp')
var Spdy = require('libp2p-spdy')
var swarmA /* TODO
var swarmB experiment('Basics', function () {
var peerA test('enforces creation with new', function (done) {done() })
var peerB })
*/
beforeEach(function (done) { // because of Travis-CI
swarmA = new Swarm() process.on('uncaughtException', function (err) {
swarmB = new Swarm() console.log('Caught exception: ' + err)
var c = new Counter(2, done)
swarmA.listen(8100, function () {
peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)])
c.hit()
}) })
swarmB.listen(8101, function () { experiment('Without a Stream Muxer', function () {
peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)]) experiment('tcp', function () {
c.hit() test('add the transport', function (done) {
}) var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
}) sw.addTransport('tcp', tcp,
{ multiaddr: mh }, {}, {port: 8010}, function () {
afterEach(function (done) { expect(sw.transports['tcp'].options).to.deep.equal({ multiaddr: mh })
// This should be 2, but for some reason expect(sw.transports['tcp'].dialOptions).to.deep.equal({})
// that will fail in most of the tests expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010})
var c = new Counter(1, done) expect(sw.transports['tcp'].transport).to.deep.equal(tcp)
sw.closeListener('tcp', function () {
swarmA.closeListener(function () {
c.hit()
})
swarmB.closeListener(function () {
c.hit()
})
})
experiment('BASICS', function () {
experiment('Swarm', function () {
test('enforces instantiation with new', function (done) {
expect(function () {
Swarm()
}).to.throw('Swarm must be called with new')
done()
})
test('parses $IPFS_SWARM_PORT', function (done) {
process.env.IPFS_SWARM_PORT = 1111
var swarm = new Swarm()
expect(swarm.port).to.be.equal(1111)
process.env.IPFS_SWARM_PORT = undefined
done()
})
})
experiment('Swarm.listen', function (done) {
test('handles missing port', function (done) {
var swarm = new Swarm()
swarm.listen(done)
})
test('handles passed in port', function (done) {
var swarm = new Swarm()
swarm.listen(1234)
expect(swarm.port).to.be.equal(1234)
done()
})
})
experiment('Swarm.registerHandler', function () {
test('throws when registering a protcol handler twice', function (done) {
var swarm = new Swarm()
swarm.registerHandler('/sparkles/1.1.1', function () {})
swarm.registerHandler('/sparkles/1.1.1', function (err) {
expect(err).to.be.an.instanceOf(Error)
expect(err.message).to.be.equal('Handle for protocol already exists')
done() done()
}) })
}) })
}) })
experiment('Swarm.closeConns', function () { test('dial a conn', function (done) {
test('calls end on all connections', function (done) { var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
swarmA.openConnection(peerB, function () { var p1 = new Peer(Id.create(), [])
var key = Object.keys(swarmA.connections)[0] var sw1 = new Swarm(p1)
sinon.spy(swarmA.connections[key].conn, 'end') sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
swarmA.closeConns(function () {
expect(swarmA.connections[key].conn.end.called).to.be.equal(true) var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
var readyCounter = 0
function ready () {
readyCounter++
if (readyCounter < 2) {
return
}
sw1.dial(p2, {}, function (err) {
expect(err).to.equal(undefined)
expect(Object.keys(sw1.conns).length).to.equal(1)
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
if (cleaningCounter < 4) {
return
}
done()
}
})
}
})
test('dial a conn on a protocol', function (done) {
var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p1 = new Peer(Id.create(), [])
var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
conn.end()
conn.on('end', function () {
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
if (cleaningCounter < 4) {
return
}
done()
}
})
})
var count = 0
function ready () {
count++
if (count < 2) {
return
}
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
}
})
test('dial a protocol on a previous created conn', function (done) {
var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p1 = new Peer(Id.create(), [])
var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
var readyCounter = 0
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
conn.end()
conn.on('end', function () {
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
if (cleaningCounter < 4) {
return
}
done()
}
})
})
function ready () {
readyCounter++
if (readyCounter < 2) {
return
}
sw1.dial(p2, {}, function (err) {
expect(err).to.equal(undefined)
expect(Object.keys(sw1.conns).length).to.equal(1)
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
}
})
// test('add an upgrade', function (done) { done() })
// test('dial a conn on top of a upgrade', function (done) { done() })
// test('dial a conn on a protocol on top of a upgrade', function (done) { done() })
})
/* TODO
experiment('udp', function () {
test('add the transport', function (done) { done() })
test('dial a conn', function (done) { done() })
test('dial a conn on a protocol', function (done) { done() })
test('add an upgrade', function (done) { done() })
test('dial a conn on top of a upgrade', function (done) { done() })
test('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
/* TODO
experiment('udt', function () {
test('add the transport', function (done) { done() })
test('dial a conn', function (done) { done() })
test('dial a conn on a protocol', function (done) { done() })
test('add an upgrade', function (done) { done() })
test('dial a conn on top of a upgrade', function (done) { done() })
test('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
/* TODO
experiment('utp', function () {
test('add the transport', function (done) { done() })
test('dial a conn', function (done) { done() })
test('dial a conn on a protocol', function (done) { done() })
test('add an upgrade', function (done) { done() })
test('dial a conn on top of a upgrade', function (done) { done() })
test('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
})
experiment('With a SPDY Stream Muxer', function () {
experiment('tcp', function () {
test('add Stream Muxer', function (done) {
// var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
sw.addStreamMuxer('spdy', Spdy, {})
done() done()
}) })
})
})
})
})
experiment('BASE', function () { test('dial a conn on a protocol', function (done) {
test('Open a stream', function (done) { var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
var protocol = '/sparkles/3.3.3' var p1 = new Peer(Id.create(), [])
var c = new Counter(2, done) var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
sw1.addStreamMuxer('spdy', Spdy, {})
swarmB.registerHandler(protocol, function (stream) { var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
c.hit() var p2 = new Peer(Id.create(), [])
}) var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
sw2.addStreamMuxer('spdy', Spdy, {})
swarmA.openStream(peerB, protocol, function (err, stream) { sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
expect(err).to.not.be.instanceof(Error) // formallity so that the conn starts flowing
c.hit() conn.on('data', function (chunk) {})
})
})
test('Reuse connection (from dialer)', function (done) { conn.end()
var protocol = '/sparkles/3.3.3' conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
expect(Object.keys(sw2.muxedConns).length).to.equal(0)
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
swarmB.registerHandler(protocol, function (stream) {}) sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
swarmA.openStream(peerB, protocol, function (err, stream) { function cleaningUp () {
expect(err).to.not.be.instanceof(Error) cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(swarmA.connections.length === 1)
done() done()
}
}) })
}) })
})
test('Check for lastSeen', function (done) {
var protocol = '/sparkles/3.3.3'
swarmB.registerHandler(protocol, function (stream) {}) var count = 0
function ready () {
count++
if (count < 2) {
return
}
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
}
})
test('dial two conns (transport reuse)', function (done) {
var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p1 = new Peer(Id.create(), [])
var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
sw1.addStreamMuxer('spdy', Spdy, {})
var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
sw2.addStreamMuxer('spdy', Spdy, {})
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
// formallity so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
expect(Object.keys(sw2.muxedConns).length).to.equal(0)
conn.end()
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(peerB.lastSeen).to.be.instanceof(Date)
done() done()
}
}) })
}) })
var count = 0
function ready () {
count++
if (count < 2) {
return
}
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
// TODO Improve clarity
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
}) })
experiment('IDENTIFY', function () { conn.on('data', function () {})
test('Attach Identify, open a stream, see a peer update', function (done) { expect(err).to.equal(null)
swarmA.on('error', function (err) { expect(Object.keys(sw1.conns).length).to.equal(0)
console.log('A - ', err) conn.end()
})
}
}) })
swarmB.on('error', function (err) { test('identify', function (done) {
console.log('B - ', err) var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
}) var p1 = new Peer(Id.create(), [])
var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
sw1.addStreamMuxer('spdy', Spdy, {})
sw1.enableIdentify()
var protocol = '/sparkles/3.3.3' var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
sw2.addStreamMuxer('spdy', Spdy, {})
sw2.enableIdentify()
var identifyA = new Identify(swarmA, peerA) sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
var identifyB = new Identify(swarmB, peerB) // formallity so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
// give time for identify to finish
setTimeout(function () { setTimeout(function () {
swarmB.registerHandler(protocol, function (stream) {}) expect(Object.keys(sw2.muxedConns).length).to.equal(1)
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
})
identifyB.on('peer-update', function (answer) {
done() done()
})
identifyA.on('peer-update', function (answer) {})
}, 500) }, 500)
}) }
test('Attach Identify, open a stream, reuse stream', function (done) {
var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA)
var identifyB = new Identify(swarmB, peerB)
swarmA.registerHandler(protocol, function (stream) {})
swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
})
identifyB.on('peer-update', function (answer) {
expect(Object.keys(swarmB.connections).length).to.equal(1)
swarmB.openStream(peerA, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(Object.keys(swarmB.connections).length).to.equal(1)
done()
}) })
}) })
identifyA.on('peer-update', function (answer) {})
})
test('Attach Identify, reuse peer', function (done) { var count = 0
var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA) function ready () {
var identifyB = new Identify(swarmB, peerB) // eslint-disable-line no-unused-vars count++
if (count < 2) {
swarmA.registerHandler(protocol, function (stream) {}) return
swarmB.registerHandler(protocol, function (stream) {})
var restartA = function (cb) {
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.end(cb)
})
} }
restartA(function () { sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
identifyA.once('peer-update', function () { conn.on('data', function () {})
expect(peerA.previousObservedAddrs.length).to.be.equal(1) expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
var c = new Counter(2, done) conn.end()
swarmA.closeConns(c.hit.bind(c))
swarmB.closeConns(c.hit.bind(c))
}) })
})
})
})
experiment('HARDNESS', function () {})
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) {
callback()
} }
} })
} })
})
// function checkErr (err) {
// console.log('err')
// expect(err).to.be.instanceof(Error)
// }