Compare commits

...

28 Commits

Author SHA1 Message Date
c01c49d5e2 chore: release version v0.22.1 2016-06-27 18:50:56 +01:00
ceb640cca0 chore: update contributors 2016-06-27 18:50:56 +01:00
3712c16e08 update identify 2016-06-27 18:48:31 +01:00
4737493d26 chore: release version v0.22.0 2016-06-27 11:56:36 +01:00
bb95ef119c chore: update contributors 2016-06-27 11:56:36 +01:00
d6e1b96a09 Merge pull request #80 from libp2p/new/identify
The new Identify™
2016-06-27 11:52:16 +01:00
071cdefd83 new identify 2016-06-27 10:48:21 +01:00
84d3471c01 chore: release version v0.21.0 2016-06-24 09:22:25 +01:00
64375d034d chore: update contributors 2016-06-24 09:22:25 +01:00
ca75c3ca4d Merge pull request #79 from libp2p/update-transports
update the transports
2016-06-23 21:49:29 +01:00
0ff3a0a3cd update the transports 2016-06-23 18:58:42 +01:00
edee20ba1a chore: release version v0.20.0 2016-06-04 19:55:53 +01:00
d793469311 chore: update contributors 2016-06-04 19:55:53 +01:00
0c869041b9 Merge pull request #76 from diasdavid/switch-to-lps
switch to lpm stream to match go
2016-06-04 20:53:09 +02:00
cb822757c1 switch to lpm stream to match go 2016-06-04 19:43:56 +01:00
ce86b7b4fb Merge pull request #73 from diasdavid/test/blow-up-websockets
add a test that blows up a WebSockets socket to make sure that spdy does not crash
2016-06-04 20:13:41 +02:00
0c9cba3a5c Merge pull request #74 from RichardLitt/feature/standardize-readme
Standardized README, fixed some grammar
2016-05-31 11:25:26 +01:00
066a157235 Standardized README, fixed some grammar 2016-05-31 11:15:37 +01:00
b917054acc add a test that blows up a WebSockets socket to make sure that spdy does not crash 2016-05-30 22:36:55 +01:00
a579ff818a chore: release version v0.19.5 2016-05-30 15:25:22 +01:00
6fa9dfc2f5 chore: update contributors 2016-05-30 15:25:22 +01:00
cad6d04295 update aegir 2016-05-30 15:18:10 +01:00
215fa08cc8 Merge pull request #72 from diasdavid/fix/spdy-blows-up
fix: Uncaught Error: socket hang up
2016-05-30 15:17:31 +01:00
834a15ddca propagete error and close events properly 2016-05-30 15:05:57 +01:00
33172f5850 chore: release version v0.19.4 2016-05-29 10:40:09 +01:00
6519e0ebd7 chore: update contributors 2016-05-29 10:40:09 +01:00
8341249aa6 Merge pull request #71 from diasdavid/fix/lost-getObservedAddrs-call
if there more than a multiaddr option, we would lose the ability to call getObservedAddrs
2016-05-29 10:33:38 +01:00
4eed2700b0 if there more than a multiaddr option, we would lose the ability to call getObservedAddrs 2016-05-29 10:21:56 +01:00
15 changed files with 345 additions and 270 deletions

View File

@ -11,6 +11,9 @@ module.exports = {
'../vendor/forge.bundle.js' '../vendor/forge.bundle.js'
) )
} }
},
externals: {
'simple-websocket-server': '{}'
} }
} }
} }

View File

@ -4,20 +4,38 @@ libp2p-swarm JavaScript implementation
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Build Status](https://img.shields.io/travis/diasdavid/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/js-libp2p-swarm) [![Build Status](https://img.shields.io/travis/libp2p/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-swarm)
[![Coverage Status](https://coveralls.io/repos/github/diasdavid/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/diasdavid/js-libp2p-swarm?branch=master) [![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-swarm?branch=master)
[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/diasdavid/js-libp2p-swarm) [![Dependency Status](https://david-dm.org/libp2p/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-swarm)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
> libp2p swarm implementation in JavaScript. > libp2p swarm implementation in JavaScript.
# Description libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, the multiplexing of several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream. libp2p-swarm is used by [libp2p](https://github.com/libp2p/js-libp2p) but it can be also used as a standalone module.
libp2p-swarm is used by libp2p but it can be also used as a standalone module. ## Table of Contents
# Usage - [Install](#install)
- [Usage](#usage)
- [Create a libp2p Swarm](#create-a-libp2p-swarm)
- [API](#api)
- [Transports](#transports)
- [Connection](#connection)
- [`swarm.dial(pi, protocol, callback)`](#swarmdialpi-protocol-callback)
- [`swarm.hangUp(pi, callback)`](#swarmhanguppi-callback)
- [`swarm.listen(callback)`](#swarmlistencallback)
- [`swarm.handle(protocol, handler)`](#swarmhandleprotocol-handler)
- [`swarm.unhandle(protocol)`](#swarmunhandleprotocol)
- [`swarm.close(callback)`](#swarmclosecallback)
- [Design](#design)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
- [Notes](#notes)
- [Contribute](#contribute)
- [License](#license)
## Install ## Install
@ -27,9 +45,9 @@ libp2p-swarm is available on npm and so, like any other npm module, just:
> npm install libp2p-swarm --save > npm install libp2p-swarm --save
``` ```
## API ## Usage
#### Create a libp2p Swarm ### Create a libp2p Swarm
And use it in your Node.js code as: And use it in your Node.js code as:
@ -39,6 +57,8 @@ const Swarm = require('libp2p-swarm')
const sw = new Swarm(peerInfo) const sw = new Swarm(peerInfo)
``` ```
## API
peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance. peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance.
### Transports ### Transports
@ -92,11 +112,11 @@ Upgrading a connection to use a stream muxer is still considered an upgrade, but
##### `swarm.connection.reuse()` ##### `swarm.connection.reuse()`
Enable the identify protocol Enable the identify protocol.
### `swarm.dial(pi, protocol, callback)` ### `swarm.dial(pi, protocol, callback)`
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing. dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
- `pi` - peer info project - `pi` - peer info project
- `protocol` - `protocol`
@ -104,7 +124,7 @@ dial uses the best transport (whatever works first, in the future we can have so
### `swarm.hangUp(pi, callback)` ### `swarm.hangUp(pi, callback)`
hangUp the muxedConn we have with the peer Hang up the muxed connection we have with the peer.
- `pi` - peer info project - `pi` - peer info project
- `callback` - `callback`
@ -115,32 +135,32 @@ Start listening on all added transports that are available on the current `peerI
### `swarm.handle(protocol, handler)` ### `swarm.handle(protocol, handler)`
handle a new protocol. Handle a new protocol.
- `protocol` - `protocol`
- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}` - `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}`
### `swarm.unhandle(protocol)` ### `swarm.unhandle(protocol)`
unhandle a protocol. Unhandle a protocol.
- `protocol` - `protocol`
### `swarm.close(callback)` ### `swarm.close(callback)`
close all the listeners and muxers. Close all the listeners and muxers.
- `callback` - `callback`
# Design ## Design
## Multitransport ### Multitransport
libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through. libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through.
In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec. In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec.
## Connection upgrades ### Connection upgrades
Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports. Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
@ -156,7 +176,7 @@ Types of upgrades to a connection:
We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol) We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol)
## Identify ### Identify
Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g: Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g:
@ -167,9 +187,17 @@ Identify is a protocol that Swarms mounts on top of itself, to identify the conn
In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies. In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
## Notes ### Notes
To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as: To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as:
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex - connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex
- transport - something that as a dial/listen interface and return objs that implement a connection interface - transport - something that as a dial/listen interface and return objs that implement a connection interface
## Contribute
This module is actively under development. Please check out the issues and submit PRs!
## License
MIT © Protocol Labs

View File

@ -1,6 +1,6 @@
{ {
"name": "libp2p-swarm", "name": "libp2p-swarm",
"version": "0.19.3", "version": "0.22.1",
"description": "libp2p swarm implementation in JavaScript", "description": "libp2p swarm implementation in JavaScript",
"main": "lib/index.js", "main": "lib/index.js",
"jsnext:main": "src/index.js", "jsnext:main": "src/index.js",
@ -18,7 +18,7 @@
}, },
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/diasdavid/js-libp2p-swarm.git" "url": "https://github.com/libp2p/js-libp2p-swarm.git"
}, },
"keywords": [ "keywords": [
"IPFS" "IPFS"
@ -26,9 +26,9 @@
"author": "David Dias <daviddias@ipfs.io>", "author": "David Dias <daviddias@ipfs.io>",
"license": "MIT", "license": "MIT",
"bugs": { "bugs": {
"url": "https://github.com/diasdavid/js-libp2p-swarm/issues" "url": "https://github.com/libp2p/js-libp2p-swarm/issues"
}, },
"homepage": "https://github.com/diasdavid/js-libp2p-swarm", "homepage": "https://github.com/libp2p/js-libp2p-swarm",
"pre-commit": [ "pre-commit": [
"lint", "lint",
"test" "test"
@ -37,32 +37,35 @@
"node": "^4.3.0" "node": "^4.3.0"
}, },
"devDependencies": { "devDependencies": {
"aegir": "^3.0.4", "aegir": "^3.2.0",
"bl": "^1.1.2",
"buffer-loader": "0.0.1", "buffer-loader": "0.0.1",
"chai": "^3.5.0", "chai": "^3.5.0",
"gulp": "^3.9.1", "gulp": "^3.9.1",
"istanbul": "^0.4.3", "istanbul": "^0.4.3",
"libp2p-multiplex": "^0.2.1", "libp2p-multiplex": "^0.2.1",
"libp2p-spdy": "^0.6.1", "libp2p-spdy": "^0.7.0",
"libp2p-tcp": "^0.6.1", "libp2p-tcp": "^0.7.1",
"libp2p-webrtc-star": "^0.2.0", "libp2p-webrtc-star": "^0.3.1",
"libp2p-websockets": "^0.6.1", "libp2p-websockets": "^0.7.0",
"pre-commit": "^1.1.2", "pre-commit": "^1.1.2",
"stream-pair": "^1.0.3", "stream-pair": "^1.0.3",
"webrtcsupport": "^2.2.0" "webrtcsupport": "^2.2.0"
}, },
"dependencies": { "dependencies": {
"babel-runtime": "^6.6.1", "babel-runtime": "^6.6.1",
"bl": "^1.1.2",
"browserify-zlib": "github:ipfs/browserify-zlib", "browserify-zlib": "github:ipfs/browserify-zlib",
"duplex-passthrough": "github:diasdavid/duplex-passthrough", "duplexify": "^3.4.3",
"interface-connection": "^0.1.7",
"ip-address": "^5.8.0", "ip-address": "^5.8.0",
"length-prefixed-stream": "^1.5.0",
"libp2p-identify": "^0.1.2",
"lodash.contains": "^2.4.3", "lodash.contains": "^2.4.3",
"multiaddr": "^2.0.0", "multiaddr": "^2.0.0",
"multistream-select": "^0.9.0", "multistream-select": "^0.9.0",
"peer-id": "^0.7.0", "peer-id": "^0.7.0",
"peer-info": "^0.7.0", "peer-info": "^0.7.0",
"protocol-buffers-stream": "^1.3.1", "protocol-buffers": "^3.1.6",
"run-parallel": "^1.1.6" "run-parallel": "^1.1.6"
}, },
"contributors": [ "contributors": [

View File

@ -1,7 +1,8 @@
'use strict' 'use strict'
const connHandler = require('./default-handler') const protocolMuxer = require('./protocol-muxer')
const identify = require('./identify') const identify = require('libp2p-identify')
const multistream = require('multistream-select')
module.exports = function connection (swarm) { module.exports = function connection (swarm) {
return { return {
@ -15,42 +16,50 @@ module.exports = function connection (swarm) {
swarm.handle(muxer.multicodec, (conn) => { swarm.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true) const muxedConn = muxer(conn, true)
var peerIdForConn
muxedConn.on('stream', (conn) => { muxedConn.on('stream', (conn) => {
function gotId () { protocolMuxer(swarm.protocols, conn)
if (peerIdForConn) {
conn.peerId = peerIdForConn
connHandler(swarm.protocols, conn)
} else {
setTimeout(gotId, 100)
}
}
if (swarm.identify) {
return gotId()
}
connHandler(swarm.protocols, conn)
}) })
// if identify is enabled, attempt to do it for muxer reuse // If identify is enabled
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (swarm.identify) { if (swarm.identify) {
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => { // overload peerInfo to use Identify instead
conn.getPeerInfo = (cb) => {
const conn = muxedConn.newStream()
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) { return cb(err) }
ms.select(identify.multicodec, (err, conn) => {
if (err) { return cb(err) }
identify.exec(conn, (err, peerInfo, observedAddrs) => {
if (err) { return cb(err) }
observedAddrs.forEach((oa) => {
swarm._peerInfo.multiaddr.addSafe(oa)
})
cb(null, peerInfo)
})
})
})
}
conn.getPeerInfo((err, peerInfo) => {
if (err) { if (err) {
return console.log('Identify exec failed', err) return console.log('Identify not successful')
}
swarm.muxedConns[peerInfo.id.toB58String()] = {
muxer: muxedConn
} }
peerIdForConn = pi.id swarm.emit('peer-mux-established', peerInfo)
swarm.muxedConns[pi.id.toB58String()] = {}
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
swarm.emit('peer-mux-established', pi)
muxedConn.on('close', () => { muxedConn.on('close', () => {
delete swarm.muxedConns[pi.id.toB58String()] delete swarm.muxedConns[peerInfo.id.toB58String()]
swarm.emit('peer-mux-closed', pi) swarm.emit('peer-mux-closed', peerInfo)
}) })
}) })
} }
@ -59,7 +68,7 @@ module.exports = function connection (swarm) {
reuse () { reuse () {
swarm.identify = true swarm.identify = true
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm)) swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo))
} }
} }
} }

View File

@ -1,9 +1,9 @@
'use strict' 'use strict'
const multistream = require('multistream-select') const multistream = require('multistream-select')
const DuplexPassThrough = require('duplex-passthrough') const Connection = require('interface-connection').Connection
const connHandler = require('./default-handler') const protocolMuxer = require('./protocol-muxer')
module.exports = function dial (swarm) { module.exports = function dial (swarm) {
return (pi, protocol, callback) => { return (pi, protocol, callback) => {
@ -16,7 +16,7 @@ module.exports = function dial (swarm) {
callback = function noop () {} callback = function noop () {}
} }
const pt = new DuplexPassThrough() const proxyConn = new Connection()
const b58Id = pi.id.toB58String() const b58Id = pi.id.toB58String()
@ -40,9 +40,11 @@ module.exports = function dial (swarm) {
gotMuxer(swarm.muxedConns[b58Id].muxer) gotMuxer(swarm.muxedConns[b58Id].muxer)
} }
return pt return proxyConn
function gotWarmedUpConn (conn) { function gotWarmedUpConn (conn) {
conn.setPeerInfo(pi)
attemptMuxerUpgrade(conn, (err, muxer) => { attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) { if (!protocol) {
if (err) { if (err) {
@ -61,6 +63,13 @@ module.exports = function dial (swarm) {
} }
function gotMuxer (muxer) { function gotMuxer (muxer) {
if (swarm.identify) {
// TODO: Consider:
// 1. overload getPeerInfo
// 2. exec identify (through getPeerInfo)
// 3. update the peerInfo that is already stored in the conn
}
openConnInMuxedConn(muxer, (conn) => { openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, callback) protocolHandshake(conn, protocol, callback)
}) })
@ -88,7 +97,7 @@ module.exports = function dial (swarm) {
cryptoDial() cryptoDial()
function cryptoDial () { function cryptoDial () {
// currently, js-libp2p-swarm doesn't implement any crypto // currently, no crypto channel is implemented
const ms = new multistream.Dialer() const ms = new multistream.Dialer()
ms.handle(conn, (err) => { ms.handle(conn, (err) => {
if (err) { if (err) {
@ -133,7 +142,7 @@ module.exports = function dial (swarm) {
const muxedConn = swarm.muxers[key](conn, false) const muxedConn = swarm.muxers[key](conn, false)
swarm.muxedConns[b58Id] = {} swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn swarm.muxedConns[b58Id].muxer = muxedConn
swarm.muxedConns[b58Id].conn = conn // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
swarm.emit('peer-mux-established', pi) swarm.emit('peer-mux-established', pi)
@ -142,10 +151,9 @@ module.exports = function dial (swarm) {
swarm.emit('peer-mux-closed', pi) swarm.emit('peer-mux-closed', pi)
}) })
// in case identify is on // For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => { muxedConn.on('stream', (conn) => {
conn.peerId = pi.id protocolMuxer(swarm.protocols, conn)
connHandler(swarm.protocols, conn)
}) })
cb(null, muxedConn) cb(null, muxedConn)
@ -168,10 +176,8 @@ module.exports = function dial (swarm) {
if (err) { if (err) {
return callback(err) return callback(err)
} }
proxyConn.setInnerConn(conn)
pt.wrapStream(conn) callback(null, proxyConn)
pt.peerId = pi.id
callback(null, pt)
}) })
}) })
} }

View File

@ -1,113 +0,0 @@
/*
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through and to know when a new stream muxer is
* established, so a conn can be reused
*/
'use strict'
const multistream = require('multistream-select')
const fs = require('fs')
const path = require('path')
const Info = require('peer-info')
const Id = require('peer-id')
const multiaddr = require('multiaddr')
const identity = fs.readFileSync(path.join(__dirname, 'identify.proto'))
const pbStream = require('protocol-buffers-stream')(identity)
exports = module.exports
exports.multicodec = '/ipfs/id/1.0.0'
exports.exec = (rawConn, muxer, peerInfo, callback) => {
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer (extract fro conn)
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
const conn = muxer.newStream()
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return callback(err)
}
ms.select(exports.multicodec, (err, ds) => {
if (err) {
return callback(err)
}
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}
const peerId = Id.createFromPubKey(msg.publicKey)
const otherPeerInfo = new Info(peerId)
msg.listenAddrs.forEach((ma) => {
otherPeerInfo.multiaddr.add(multiaddr(ma))
})
callback(null, otherPeerInfo)
})
const obsMultiaddr = rawConn.getObservedAddrs()[0]
let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
}
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((mh) => mh.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.pipe(ds).pipe(pbs)
pbs.finalize()
})
})
}
exports.handler = (peerInfo, swarm) => {
return (conn) => {
// 1. receive incoming observed info about me
// 2. update my own information (on peerInfo)
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}
const peerId = Id.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[peerId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
}
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.finalize()
})
pbs.pipe(conn).pipe(pbs)
}
}

View File

@ -1,25 +0,0 @@
message Identify {
// protocolVersion determines compatibility between peers
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
// includes the client name and client.
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
// publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent.
// - then again, if we change / disable secure channel, may still want it.
optional bytes publicKey = 1;
// listenAddrs are the multiaddrs the sender node listens for open connections on
repeated bytes listenAddrs = 2;
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT.
optional bytes observedAddr = 4;
// (DEPRECATED) protocols are the services this node is running
// repeated string protocols = 3;
}

View File

@ -8,7 +8,7 @@ const contains = require('lodash.contains')
const transport = require('./transport') const transport = require('./transport')
const connection = require('./connection') const connection = require('./connection')
const dial = require('./dial') const dial = require('./dial')
const connHandler = require('./default-handler') const protocolMuxer = require('./protocol-muxer')
exports = module.exports = Swarm exports = module.exports = Swarm
@ -26,12 +26,10 @@ function Swarm (peerInfo) {
this._peerInfo = peerInfo this._peerInfo = peerInfo
// transports -- // transports --
// { key: transport }; e.g { tcp: <tcp> } // { key: transport }; e.g { tcp: <tcp> }
this.transports = {} this.transports = {}
// connections -- // connections --
// { peerIdB58: { conn: <conn> }} // { peerIdB58: { conn: <conn> }}
this.conns = {} this.conns = {}
@ -94,7 +92,7 @@ function Swarm (peerInfo) {
// our crypto handshake :) // our crypto handshake :)
this.handle('/plaintext/1.0.0', (conn) => { this.handle('/plaintext/1.0.0', (conn) => {
connHandler(this.protocols, conn) protocolMuxer(this.protocols, conn)
}) })
this.unhandle = (protocol, handler) => { this.unhandle = (protocol, handler) => {
@ -122,8 +120,16 @@ function Swarm (peerInfo) {
this.muxedConns[key].muxer.end() this.muxedConns[key].muxer.end()
}) })
parallel(Object.keys(this.transports).map((key) => { const transports = this.transports
return (cb) => this.transports[key].close(cb)
parallel(Object.keys(transports).map((key) => {
return (cb) => {
parallel(transports[key].listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), cb)
}
}), callback) }), callback)
} }
} }

View File

@ -2,9 +2,9 @@
const multistream = require('multistream-select') const multistream = require('multistream-select')
// incomming connection handler module.exports = function protocolMuxer (protocols, conn) {
module.exports = function connHandler (protocols, conn) {
const ms = new multistream.Listener() const ms = new multistream.Listener()
Object.keys(protocols).forEach((protocol) => { Object.keys(protocols).forEach((protocol) => {
if (!protocol) { if (!protocol) {
return return

View File

@ -1,9 +1,11 @@
'use strict' 'use strict'
const contains = require('lodash.contains') const Connection = require('interface-connection').Connection
const DuplexPassThrough = require('duplex-passthrough') const parallel = require('run-parallel')
const debug = require('debug')
const log = debug('libp2p:swarm')
const connHandler = require('./default-handler') const protocolMuxer = require('./protocol-muxer')
module.exports = function (swarm) { module.exports = function (swarm) {
return { return {
@ -12,12 +14,17 @@ module.exports = function (swarm) {
callback = options callback = options
options = {} options = {}
} }
if (!callback) { callback = noop } if (!callback) { callback = noop }
if (swarm.transports[key]) { if (swarm.transports[key]) {
throw new Error('There is already a transport with this key') throw new Error('There is already a transport with this key')
} }
swarm.transports[key] = transport swarm.transports[key] = transport
if (!swarm.transports[key].listeners) {
swarm.transports[key].listeners = []
}
callback() callback()
}, },
@ -34,57 +41,86 @@ module.exports = function (swarm) {
// b) if multiaddrs.length = 1, return the conn from the // b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough // transport, otherwise, create a passthrough
if (multiaddrs.length === 1) { if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift(), {ready: () => { const conn = t.dial(multiaddrs.shift())
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors conn.once('error', connectError)
cb(null, conn)
}}) conn.once('connect', () => {
conn.once('error', () => { conn.removeListener('error', connectError)
callback(new Error('failed to connect to every multiaddr')) callback(null, conn)
}) })
return conn return conn
} }
function connectError () {
callback(new Error('failed to connect to every multiaddr'))
}
// c) multiaddrs should already be a filtered list // c) multiaddrs should already be a filtered list
// specific for the transport we are using // specific for the transport we are using
const pt = new DuplexPassThrough() const proxyConn = new Connection()
next(multiaddrs.shift()) next(multiaddrs.shift())
return pt
function next (multiaddr) {
const conn = t.dial(multiaddr, {ready: () => {
pt.wrapStream(conn)
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors
cb(null, pt)
}})
conn.once('error', () => { return proxyConn
// TODO improve in the future to make all the dials in paralell
function next (multiaddr) {
const conn = t.dial(multiaddr)
conn.once('error', connectError)
function connectError () {
if (multiaddrs.length === 0) { if (multiaddrs.length === 0) {
return callback(new Error('failed to connect to every multiaddr')) return callback(new Error('failed to connect to every multiaddr'))
} }
next(multiaddrs.shift()) next(multiaddrs.shift())
}
conn.once('connect', () => {
conn.removeListener('error', connectError)
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
}) })
} }
}, },
listen (key, options, handler, callback) { listen (key, options, handler, callback) {
// if no callback is passed, we pass conns to connHandler // if no handler is passed, we pass conns to protocolMuxer
if (!handler) { if (!handler) {
handler = connHandler.bind(null, swarm.protocols) handler = protocolMuxer.bind(null, swarm.protocols)
} }
const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs) const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs)
swarm.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => { const transport = swarm.transports[key]
if (err) {
return callback(err)
}
if (maUpdate) {
// because we can listen on port 0...
swarm._peerInfo.multiaddr.replace(multiaddrs, maUpdate)
}
if (!transport.listeners) {
transport.listeners = []
}
let freshMultiaddrs = []
const createListeners = multiaddrs.map((ma) => {
return (cb) => {
const listener = transport.createListener(handler)
listener.listen(ma, () => {
log('Listener started on:', ma.toString())
listener.getAddrs((err, addrs) => {
if (err) {
return cb(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
cb()
})
})
}
})
parallel(createListeners, () => {
// cause we can listen on port 0 or 0.0.0.0
swarm._peerInfo.multiaddr.replace(multiaddrs, freshMultiaddrs)
callback() callback()
}) })
}, },
@ -96,13 +132,15 @@ module.exports = function (swarm) {
return callback(new Error(`Trying to close non existing transport: ${key}`)) return callback(new Error(`Trying to close non existing transport: ${key}`))
} }
transport.close(callback) parallel(transport.listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), callback)
} }
} }
} }
// transform given multiaddrs to a list of dialable addresses
// for the given transport `tp`.
function dialables (tp, multiaddrs) { function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs.map((addr) => { return tp.filter(multiaddrs.map((addr) => {
// webrtc-star needs the /ipfs/QmHash // webrtc-star needs the /ipfs/QmHash
@ -110,13 +148,7 @@ function dialables (tp, multiaddrs) {
return addr return addr
} }
// ipfs multiaddrs are not dialable so we drop them here
if (contains(addr.protoNames(), 'ipfs')) {
return addr.decapsulate('ipfs')
}
return addr return addr
})) }))
} }
function noop () {} function noop () {}

View File

@ -5,7 +5,7 @@ const expect = require('chai').expect
const Swarm = require('../src') const Swarm = require('../src')
describe('basics', () => { describe('create Swarm instance', () => {
it('throws on missing peerInfo', () => { it('throws on missing peerInfo', () => {
expect(() => Swarm()).to.throw(Error) expect(() => Swarm()).to.throw(Error)
}) })

View File

@ -1,4 +1,5 @@
/* eslint-env mocha */ /* eslint-env mocha */
'use strict' 'use strict'
const expect = require('chai').expect const expect = require('chai').expect
@ -127,9 +128,9 @@ describe('transport - tcp', function () {
}, ready) }, ready)
function ready () { function ready () {
expect(peer.multiaddrs.length).to.equal(1) expect(peer.multiaddrs.length >= 1).to.equal(true)
expect( expect(
peer.multiaddrs[0].equals(multiaddr('/ip4/0.0.0.0/tcp/9050')) peer.multiaddrs[0].equals(multiaddr('/ip4/127.0.0.1/tcp/9050'))
).to.be.equal( ).to.be.equal(
true true
) )
@ -148,7 +149,7 @@ describe('transport - tcp', function () {
}, ready) }, ready)
function ready () { function ready () {
expect(peer.multiaddrs.length).to.equal(1) expect(peer.multiaddrs.length >= 1).to.equal(true)
expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0')) expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0'))
swarm.close(done) swarm.close(done)
} }

View File

@ -10,7 +10,8 @@ const Swarm = require('../src')
const TCP = require('libp2p-tcp') const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-spdy') const multiplex = require('libp2p-spdy')
describe('stream muxing with multiplex (on TCP)', function () { // TODO multiplex needs to be upgraded, like spdy, to work again
describe.skip('stream muxing with multiplex (on TCP)', function () {
this.timeout(60 * 1000) this.timeout(60 * 1000)
var swarmA var swarmA

View File

@ -8,6 +8,8 @@ const multiaddr = require('multiaddr')
const Peer = require('peer-info') const Peer = require('peer-info')
const Swarm = require('../src') const Swarm = require('../src')
const TCP = require('libp2p-tcp') const TCP = require('libp2p-tcp')
const WebSockets = require('libp2p-websockets')
const spdy = require('libp2p-spdy') const spdy = require('libp2p-spdy')
describe('stream muxing with spdy (on TCP)', function () { describe('stream muxing with spdy (on TCP)', function () {
@ -19,11 +21,14 @@ describe('stream muxing with spdy (on TCP)', function () {
var peerB var peerB
var swarmC var swarmC
var peerC var peerC
var swarmD
var peerD
before((done) => { before((done) => {
peerA = new Peer() peerA = new Peer()
peerB = new Peer() peerB = new Peer()
peerC = new Peer() peerC = new Peer()
peerD = new Peer()
// console.log('peer A', peerA.id.toB58String()) // console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String()) // console.log('peer B', peerB.id.toB58String())
@ -32,27 +37,32 @@ describe('stream muxing with spdy (on TCP)', function () {
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
peerD.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9004'))
swarmA = new Swarm(peerA) swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB) swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC) swarmC = new Swarm(peerC)
swarmD = new Swarm(peerD)
swarmA.transport.add('tcp', new TCP()) swarmA.transport.add('tcp', new TCP())
swarmB.transport.add('tcp', new TCP()) swarmB.transport.add('tcp', new TCP())
swarmC.transport.add('tcp', new TCP()) swarmC.transport.add('tcp', new TCP())
swarmD.transport.add('tcp', new TCP())
parallel([ parallel([
(cb) => swarmA.transport.listen('tcp', {}, null, cb), (cb) => swarmA.transport.listen('tcp', {}, null, cb),
(cb) => swarmB.transport.listen('tcp', {}, null, cb), (cb) => swarmB.transport.listen('tcp', {}, null, cb),
(cb) => swarmC.transport.listen('tcp', {}, null, cb) (cb) => swarmC.transport.listen('tcp', {}, null, cb),
(cb) => swarmD.transport.listen('tcp', {}, null, cb)
], done) ], done)
}) })
after((done) => { after((done) => {
parallel([ parallel([
(cb) => swarmA.close(cb), (cb) => swarmA.close(cb),
(cb) => swarmB.close(cb) (cb) => swarmB.close(cb),
// (cb) => swarmC.close(cb) // (cb) => swarmC.close(cb)
(cb) => swarmD.close(cb)
], done) ], done)
}) })
@ -60,6 +70,7 @@ describe('stream muxing with spdy (on TCP)', function () {
swarmA.connection.addStreamMuxer(spdy) swarmA.connection.addStreamMuxer(spdy)
swarmB.connection.addStreamMuxer(spdy) swarmB.connection.addStreamMuxer(spdy)
swarmC.connection.addStreamMuxer(spdy) swarmC.connection.addStreamMuxer(spdy)
swarmD.connection.addStreamMuxer(spdy)
}) })
it('handle + dial on protocol', (done) => { it('handle + dial on protocol', (done) => {
@ -116,6 +127,113 @@ describe('stream muxing with spdy (on TCP)', function () {
}) })
}) })
it('with Identify, do getPeerInfo', (done) => {
swarmA.handle('/banana/1.0.0', (conn) => {
conn.getPeerInfo((err, peerInfoC) => {
expect(err).to.not.exist
expect(peerInfoC.id.toB58String()).to.equal(peerC.id.toB58String())
})
conn.pipe(conn)
})
swarmC.dial(peerA, '/banana/1.0.0', (err, conn) => {
expect(err).to.not.exist
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
conn.getPeerInfo((err, peerInfoA) => {
expect(err).to.not.exist
expect(peerInfoA.id.toB58String()).to.equal(peerA.id.toB58String())
conn.on('end', done)
conn.resume()
conn.end()
})
}, 500)
})
})
// This test is not possible as the raw conn is not exposed anymore
// TODO: create a similar version, but that spawns a swarm in a
// different proc
it.skip('make sure it does not blow up when the socket is closed', (done) => {
swarmD.connection.reuse()
let count = 0
const destroyed = () => ++count === 2 ? done() : null
swarmD.handle('/banana/1.0.0', (conn) => {
conn.on('error', () => {})
conn.on('close', destroyed)
})
swarmA.dial(peerD, '/banana/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.on('error', () => {})
conn.on('close', destroyed)
swarmD.muxedConns[peerA.id.toB58String()].conn.destroy()
})
})
// This test is not possible as the raw conn is not exposed anymore
// TODO: create a similar version, but that spawns a swarm in a
// different proc
it.skip('blow up a socket, with WebSockets', (done) => {
var swarmE
var peerE
var swarmF
var peerF
peerE = new Peer()
peerF = new Peer()
peerE.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9110/ws'))
peerF.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9120/ws'))
swarmE = new Swarm(peerE)
swarmF = new Swarm(peerF)
swarmE.transport.add('ws', new WebSockets())
swarmF.transport.add('ws', new WebSockets())
swarmE.connection.addStreamMuxer(spdy)
swarmF.connection.addStreamMuxer(spdy)
swarmE.connection.reuse()
swarmF.connection.reuse()
parallel([
(cb) => swarmE.transport.listen('ws', {}, null, cb),
(cb) => swarmF.transport.listen('ws', {}, null, cb)
], next)
function next () {
let count = 0
const destroyed = () => ++count === 2 ? close() : null
swarmE.handle('/avocado/1.0.0', (conn) => {
conn.on('error', () => {})
conn.on('close', destroyed)
})
swarmF.dial(peerE, '/avocado/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.on('error', () => {})
conn.on('close', destroyed)
swarmF.muxedConns[peerE.id.toB58String()].conn.destroy()
})
}
function close () {
parallel([
(cb) => swarmE.close(cb),
(cb) => swarmF.close(cb)
], done)
}
})
it('close one end, make sure the other does not blow', (done) => { it('close one end, make sure the other does not blow', (done) => {
swarmC.close((err) => { swarmC.close((err) => {
if (err) throw err if (err) throw err

View File

@ -198,13 +198,19 @@ describe('high level API - with everything mixed all together!', function () {
it('dial from tcp+ws to tcp+ws', (done) => { it('dial from tcp+ws to tcp+ws', (done) => {
swarmC.handle('/mamao/1.0.0', (conn) => { swarmC.handle('/mamao/1.0.0', (conn) => {
expect(conn.peerId).to.exist conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
})
conn.pipe(conn) conn.pipe(conn)
}) })
swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => { swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => {
expect(err).to.not.exist expect(err).to.not.exist
expect(conn.peerId).to.exist conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
})
expect(Object.keys(swarmA.muxedConns).length).to.equal(2) expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
conn.end() conn.end()