Compare commits

...

28 Commits

Author SHA1 Message Date
c01c49d5e2 chore: release version v0.22.1 2016-06-27 18:50:56 +01:00
ceb640cca0 chore: update contributors 2016-06-27 18:50:56 +01:00
3712c16e08 update identify 2016-06-27 18:48:31 +01:00
4737493d26 chore: release version v0.22.0 2016-06-27 11:56:36 +01:00
bb95ef119c chore: update contributors 2016-06-27 11:56:36 +01:00
d6e1b96a09 Merge pull request #80 from libp2p/new/identify
The new Identify™
2016-06-27 11:52:16 +01:00
071cdefd83 new identify 2016-06-27 10:48:21 +01:00
84d3471c01 chore: release version v0.21.0 2016-06-24 09:22:25 +01:00
64375d034d chore: update contributors 2016-06-24 09:22:25 +01:00
ca75c3ca4d Merge pull request #79 from libp2p/update-transports
update the transports
2016-06-23 21:49:29 +01:00
0ff3a0a3cd update the transports 2016-06-23 18:58:42 +01:00
edee20ba1a chore: release version v0.20.0 2016-06-04 19:55:53 +01:00
d793469311 chore: update contributors 2016-06-04 19:55:53 +01:00
0c869041b9 Merge pull request #76 from diasdavid/switch-to-lps
switch to lpm stream to match go
2016-06-04 20:53:09 +02:00
cb822757c1 switch to lpm stream to match go 2016-06-04 19:43:56 +01:00
ce86b7b4fb Merge pull request #73 from diasdavid/test/blow-up-websockets
add a test that blows up a WebSockets socket to make sure that spdy does not crash
2016-06-04 20:13:41 +02:00
0c9cba3a5c Merge pull request #74 from RichardLitt/feature/standardize-readme
Standardized README, fixed some grammar
2016-05-31 11:25:26 +01:00
066a157235 Standardized README, fixed some grammar 2016-05-31 11:15:37 +01:00
b917054acc add a test that blows up a WebSockets socket to make sure that spdy does not crash 2016-05-30 22:36:55 +01:00
a579ff818a chore: release version v0.19.5 2016-05-30 15:25:22 +01:00
6fa9dfc2f5 chore: update contributors 2016-05-30 15:25:22 +01:00
cad6d04295 update aegir 2016-05-30 15:18:10 +01:00
215fa08cc8 Merge pull request #72 from diasdavid/fix/spdy-blows-up
fix: Uncaught Error: socket hang up
2016-05-30 15:17:31 +01:00
834a15ddca propagete error and close events properly 2016-05-30 15:05:57 +01:00
33172f5850 chore: release version v0.19.4 2016-05-29 10:40:09 +01:00
6519e0ebd7 chore: update contributors 2016-05-29 10:40:09 +01:00
8341249aa6 Merge pull request #71 from diasdavid/fix/lost-getObservedAddrs-call
if there more than a multiaddr option, we would lose the ability to call getObservedAddrs
2016-05-29 10:33:38 +01:00
4eed2700b0 if there more than a multiaddr option, we would lose the ability to call getObservedAddrs 2016-05-29 10:21:56 +01:00
15 changed files with 345 additions and 270 deletions

View File

@ -11,6 +11,9 @@ module.exports = {
'../vendor/forge.bundle.js'
)
}
},
externals: {
'simple-websocket-server': '{}'
}
}
}

View File

@ -4,20 +4,38 @@ libp2p-swarm JavaScript implementation
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Build Status](https://img.shields.io/travis/diasdavid/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/js-libp2p-swarm)
[![Coverage Status](https://coveralls.io/repos/github/diasdavid/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/diasdavid/js-libp2p-swarm?branch=master)
[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/diasdavid/js-libp2p-swarm)
[![Build Status](https://img.shields.io/travis/libp2p/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-swarm)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-swarm?branch=master)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-swarm)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
> libp2p swarm implementation in JavaScript.
# Description
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, the multiplexing of several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
libp2p-swarm is used by [libp2p](https://github.com/libp2p/js-libp2p) but it can be also used as a standalone module.
libp2p-swarm is used by libp2p but it can be also used as a standalone module.
## Table of Contents
# Usage
- [Install](#install)
- [Usage](#usage)
- [Create a libp2p Swarm](#create-a-libp2p-swarm)
- [API](#api)
- [Transports](#transports)
- [Connection](#connection)
- [`swarm.dial(pi, protocol, callback)`](#swarmdialpi-protocol-callback)
- [`swarm.hangUp(pi, callback)`](#swarmhanguppi-callback)
- [`swarm.listen(callback)`](#swarmlistencallback)
- [`swarm.handle(protocol, handler)`](#swarmhandleprotocol-handler)
- [`swarm.unhandle(protocol)`](#swarmunhandleprotocol)
- [`swarm.close(callback)`](#swarmclosecallback)
- [Design](#design)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
- [Notes](#notes)
- [Contribute](#contribute)
- [License](#license)
## Install
@ -27,9 +45,9 @@ libp2p-swarm is available on npm and so, like any other npm module, just:
> npm install libp2p-swarm --save
```
## API
## Usage
#### Create a libp2p Swarm
### Create a libp2p Swarm
And use it in your Node.js code as:
@ -39,6 +57,8 @@ const Swarm = require('libp2p-swarm')
const sw = new Swarm(peerInfo)
```
## API
peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance.
### Transports
@ -92,11 +112,11 @@ Upgrading a connection to use a stream muxer is still considered an upgrade, but
##### `swarm.connection.reuse()`
Enable the identify protocol
Enable the identify protocol.
### `swarm.dial(pi, protocol, callback)`
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing.
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
- `pi` - peer info project
- `protocol`
@ -104,7 +124,7 @@ dial uses the best transport (whatever works first, in the future we can have so
### `swarm.hangUp(pi, callback)`
hangUp the muxedConn we have with the peer
Hang up the muxed connection we have with the peer.
- `pi` - peer info project
- `callback`
@ -115,32 +135,32 @@ Start listening on all added transports that are available on the current `peerI
### `swarm.handle(protocol, handler)`
handle a new protocol.
Handle a new protocol.
- `protocol`
- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}`
### `swarm.unhandle(protocol)`
unhandle a protocol.
Unhandle a protocol.
- `protocol`
### `swarm.close(callback)`
close all the listeners and muxers.
Close all the listeners and muxers.
- `callback`
# Design
## Design
## Multitransport
### Multitransport
libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through.
In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec.
## Connection upgrades
### Connection upgrades
Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
@ -156,7 +176,7 @@ Types of upgrades to a connection:
We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol)
## Identify
### Identify
Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g:
@ -167,9 +187,17 @@ Identify is a protocol that Swarms mounts on top of itself, to identify the conn
In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
## Notes
### Notes
To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as:
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex
- transport - something that as a dial/listen interface and return objs that implement a connection interface
## Contribute
This module is actively under development. Please check out the issues and submit PRs!
## License
MIT © Protocol Labs

View File

@ -1,6 +1,6 @@
{
"name": "libp2p-swarm",
"version": "0.19.3",
"version": "0.22.1",
"description": "libp2p swarm implementation in JavaScript",
"main": "lib/index.js",
"jsnext:main": "src/index.js",
@ -18,7 +18,7 @@
},
"repository": {
"type": "git",
"url": "https://github.com/diasdavid/js-libp2p-swarm.git"
"url": "https://github.com/libp2p/js-libp2p-swarm.git"
},
"keywords": [
"IPFS"
@ -26,9 +26,9 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/js-libp2p-swarm/issues"
"url": "https://github.com/libp2p/js-libp2p-swarm/issues"
},
"homepage": "https://github.com/diasdavid/js-libp2p-swarm",
"homepage": "https://github.com/libp2p/js-libp2p-swarm",
"pre-commit": [
"lint",
"test"
@ -37,32 +37,35 @@
"node": "^4.3.0"
},
"devDependencies": {
"aegir": "^3.0.4",
"bl": "^1.1.2",
"aegir": "^3.2.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"istanbul": "^0.4.3",
"libp2p-multiplex": "^0.2.1",
"libp2p-spdy": "^0.6.1",
"libp2p-tcp": "^0.6.1",
"libp2p-webrtc-star": "^0.2.0",
"libp2p-websockets": "^0.6.1",
"libp2p-spdy": "^0.7.0",
"libp2p-tcp": "^0.7.1",
"libp2p-webrtc-star": "^0.3.1",
"libp2p-websockets": "^0.7.0",
"pre-commit": "^1.1.2",
"stream-pair": "^1.0.3",
"webrtcsupport": "^2.2.0"
},
"dependencies": {
"babel-runtime": "^6.6.1",
"bl": "^1.1.2",
"browserify-zlib": "github:ipfs/browserify-zlib",
"duplex-passthrough": "github:diasdavid/duplex-passthrough",
"duplexify": "^3.4.3",
"interface-connection": "^0.1.7",
"ip-address": "^5.8.0",
"length-prefixed-stream": "^1.5.0",
"libp2p-identify": "^0.1.2",
"lodash.contains": "^2.4.3",
"multiaddr": "^2.0.0",
"multistream-select": "^0.9.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"protocol-buffers-stream": "^1.3.1",
"protocol-buffers": "^3.1.6",
"run-parallel": "^1.1.6"
},
"contributors": [

View File

@ -1,7 +1,8 @@
'use strict'
const connHandler = require('./default-handler')
const identify = require('./identify')
const protocolMuxer = require('./protocol-muxer')
const identify = require('libp2p-identify')
const multistream = require('multistream-select')
module.exports = function connection (swarm) {
return {
@ -15,42 +16,50 @@ module.exports = function connection (swarm) {
swarm.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
var peerIdForConn
muxedConn.on('stream', (conn) => {
function gotId () {
if (peerIdForConn) {
conn.peerId = peerIdForConn
connHandler(swarm.protocols, conn)
} else {
setTimeout(gotId, 100)
}
}
if (swarm.identify) {
return gotId()
}
connHandler(swarm.protocols, conn)
protocolMuxer(swarm.protocols, conn)
})
// if identify is enabled, attempt to do it for muxer reuse
// If identify is enabled
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (swarm.identify) {
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => {
// overload peerInfo to use Identify instead
conn.getPeerInfo = (cb) => {
const conn = muxedConn.newStream()
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) { return cb(err) }
ms.select(identify.multicodec, (err, conn) => {
if (err) { return cb(err) }
identify.exec(conn, (err, peerInfo, observedAddrs) => {
if (err) { return cb(err) }
observedAddrs.forEach((oa) => {
swarm._peerInfo.multiaddr.addSafe(oa)
})
cb(null, peerInfo)
})
})
})
}
conn.getPeerInfo((err, peerInfo) => {
if (err) {
return console.log('Identify exec failed', err)
return console.log('Identify not successful')
}
swarm.muxedConns[peerInfo.id.toB58String()] = {
muxer: muxedConn
}
peerIdForConn = pi.id
swarm.muxedConns[pi.id.toB58String()] = {}
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
swarm.emit('peer-mux-established', pi)
swarm.emit('peer-mux-established', peerInfo)
muxedConn.on('close', () => {
delete swarm.muxedConns[pi.id.toB58String()]
swarm.emit('peer-mux-closed', pi)
delete swarm.muxedConns[peerInfo.id.toB58String()]
swarm.emit('peer-mux-closed', peerInfo)
})
})
}
@ -59,7 +68,7 @@ module.exports = function connection (swarm) {
reuse () {
swarm.identify = true
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm))
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo))
}
}
}

View File

@ -1,9 +1,9 @@
'use strict'
const multistream = require('multistream-select')
const DuplexPassThrough = require('duplex-passthrough')
const Connection = require('interface-connection').Connection
const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')
module.exports = function dial (swarm) {
return (pi, protocol, callback) => {
@ -16,7 +16,7 @@ module.exports = function dial (swarm) {
callback = function noop () {}
}
const pt = new DuplexPassThrough()
const proxyConn = new Connection()
const b58Id = pi.id.toB58String()
@ -40,9 +40,11 @@ module.exports = function dial (swarm) {
gotMuxer(swarm.muxedConns[b58Id].muxer)
}
return pt
return proxyConn
function gotWarmedUpConn (conn) {
conn.setPeerInfo(pi)
attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
@ -61,6 +63,13 @@ module.exports = function dial (swarm) {
}
function gotMuxer (muxer) {
if (swarm.identify) {
// TODO: Consider:
// 1. overload getPeerInfo
// 2. exec identify (through getPeerInfo)
// 3. update the peerInfo that is already stored in the conn
}
openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, callback)
})
@ -88,7 +97,7 @@ module.exports = function dial (swarm) {
cryptoDial()
function cryptoDial () {
// currently, js-libp2p-swarm doesn't implement any crypto
// currently, no crypto channel is implemented
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
@ -133,7 +142,7 @@ module.exports = function dial (swarm) {
const muxedConn = swarm.muxers[key](conn, false)
swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn
swarm.muxedConns[b58Id].conn = conn
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
swarm.emit('peer-mux-established', pi)
@ -142,10 +151,9 @@ module.exports = function dial (swarm) {
swarm.emit('peer-mux-closed', pi)
})
// in case identify is on
// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
conn.peerId = pi.id
connHandler(swarm.protocols, conn)
protocolMuxer(swarm.protocols, conn)
})
cb(null, muxedConn)
@ -168,10 +176,8 @@ module.exports = function dial (swarm) {
if (err) {
return callback(err)
}
pt.wrapStream(conn)
pt.peerId = pi.id
callback(null, pt)
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
})
}

View File

@ -1,113 +0,0 @@
/*
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through and to know when a new stream muxer is
* established, so a conn can be reused
*/
'use strict'
const multistream = require('multistream-select')
const fs = require('fs')
const path = require('path')
const Info = require('peer-info')
const Id = require('peer-id')
const multiaddr = require('multiaddr')
const identity = fs.readFileSync(path.join(__dirname, 'identify.proto'))
const pbStream = require('protocol-buffers-stream')(identity)
exports = module.exports
exports.multicodec = '/ipfs/id/1.0.0'
exports.exec = (rawConn, muxer, peerInfo, callback) => {
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer (extract fro conn)
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
const conn = muxer.newStream()
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return callback(err)
}
ms.select(exports.multicodec, (err, ds) => {
if (err) {
return callback(err)
}
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}
const peerId = Id.createFromPubKey(msg.publicKey)
const otherPeerInfo = new Info(peerId)
msg.listenAddrs.forEach((ma) => {
otherPeerInfo.multiaddr.add(multiaddr(ma))
})
callback(null, otherPeerInfo)
})
const obsMultiaddr = rawConn.getObservedAddrs()[0]
let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
}
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((mh) => mh.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.pipe(ds).pipe(pbs)
pbs.finalize()
})
})
}
exports.handler = (peerInfo, swarm) => {
return (conn) => {
// 1. receive incoming observed info about me
// 2. update my own information (on peerInfo)
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}
const peerId = Id.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[peerId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
}
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.finalize()
})
pbs.pipe(conn).pipe(pbs)
}
}

View File

@ -1,25 +0,0 @@
message Identify {
// protocolVersion determines compatibility between peers
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
// includes the client name and client.
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
// publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent.
// - then again, if we change / disable secure channel, may still want it.
optional bytes publicKey = 1;
// listenAddrs are the multiaddrs the sender node listens for open connections on
repeated bytes listenAddrs = 2;
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT.
optional bytes observedAddr = 4;
// (DEPRECATED) protocols are the services this node is running
// repeated string protocols = 3;
}

View File

@ -8,7 +8,7 @@ const contains = require('lodash.contains')
const transport = require('./transport')
const connection = require('./connection')
const dial = require('./dial')
const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')
exports = module.exports = Swarm
@ -26,12 +26,10 @@ function Swarm (peerInfo) {
this._peerInfo = peerInfo
// transports --
// { key: transport }; e.g { tcp: <tcp> }
this.transports = {}
// connections --
// { peerIdB58: { conn: <conn> }}
this.conns = {}
@ -94,7 +92,7 @@ function Swarm (peerInfo) {
// our crypto handshake :)
this.handle('/plaintext/1.0.0', (conn) => {
connHandler(this.protocols, conn)
protocolMuxer(this.protocols, conn)
})
this.unhandle = (protocol, handler) => {
@ -122,8 +120,16 @@ function Swarm (peerInfo) {
this.muxedConns[key].muxer.end()
})
parallel(Object.keys(this.transports).map((key) => {
return (cb) => this.transports[key].close(cb)
const transports = this.transports
parallel(Object.keys(transports).map((key) => {
return (cb) => {
parallel(transports[key].listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), cb)
}
}), callback)
}
}

View File

@ -2,9 +2,9 @@
const multistream = require('multistream-select')
// incomming connection handler
module.exports = function connHandler (protocols, conn) {
module.exports = function protocolMuxer (protocols, conn) {
const ms = new multistream.Listener()
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return

View File

@ -1,9 +1,11 @@
'use strict'
const contains = require('lodash.contains')
const DuplexPassThrough = require('duplex-passthrough')
const Connection = require('interface-connection').Connection
const parallel = require('run-parallel')
const debug = require('debug')
const log = debug('libp2p:swarm')
const connHandler = require('./default-handler')
const protocolMuxer = require('./protocol-muxer')
module.exports = function (swarm) {
return {
@ -12,12 +14,17 @@ module.exports = function (swarm) {
callback = options
options = {}
}
if (!callback) { callback = noop }
if (swarm.transports[key]) {
throw new Error('There is already a transport with this key')
}
swarm.transports[key] = transport
if (!swarm.transports[key].listeners) {
swarm.transports[key].listeners = []
}
callback()
},
@ -34,57 +41,86 @@ module.exports = function (swarm) {
// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift(), {ready: () => {
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors
cb(null, conn)
}})
conn.once('error', () => {
callback(new Error('failed to connect to every multiaddr'))
const conn = t.dial(multiaddrs.shift())
conn.once('error', connectError)
conn.once('connect', () => {
conn.removeListener('error', connectError)
callback(null, conn)
})
return conn
}
function connectError () {
callback(new Error('failed to connect to every multiaddr'))
}
// c) multiaddrs should already be a filtered list
// specific for the transport we are using
const pt = new DuplexPassThrough()
const proxyConn = new Connection()
next(multiaddrs.shift())
return pt
function next (multiaddr) {
const conn = t.dial(multiaddr, {ready: () => {
pt.wrapStream(conn)
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors
cb(null, pt)
}})
conn.once('error', () => {
return proxyConn
// TODO improve in the future to make all the dials in paralell
function next (multiaddr) {
const conn = t.dial(multiaddr)
conn.once('error', connectError)
function connectError () {
if (multiaddrs.length === 0) {
return callback(new Error('failed to connect to every multiaddr'))
}
next(multiaddrs.shift())
}
conn.once('connect', () => {
conn.removeListener('error', connectError)
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
}
},
listen (key, options, handler, callback) {
// if no callback is passed, we pass conns to connHandler
// if no handler is passed, we pass conns to protocolMuxer
if (!handler) {
handler = connHandler.bind(null, swarm.protocols)
handler = protocolMuxer.bind(null, swarm.protocols)
}
const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs)
swarm.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => {
if (err) {
return callback(err)
}
if (maUpdate) {
// because we can listen on port 0...
swarm._peerInfo.multiaddr.replace(multiaddrs, maUpdate)
}
const transport = swarm.transports[key]
if (!transport.listeners) {
transport.listeners = []
}
let freshMultiaddrs = []
const createListeners = multiaddrs.map((ma) => {
return (cb) => {
const listener = transport.createListener(handler)
listener.listen(ma, () => {
log('Listener started on:', ma.toString())
listener.getAddrs((err, addrs) => {
if (err) {
return cb(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
cb()
})
})
}
})
parallel(createListeners, () => {
// cause we can listen on port 0 or 0.0.0.0
swarm._peerInfo.multiaddr.replace(multiaddrs, freshMultiaddrs)
callback()
})
},
@ -96,13 +132,15 @@ module.exports = function (swarm) {
return callback(new Error(`Trying to close non existing transport: ${key}`))
}
transport.close(callback)
parallel(transport.listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), callback)
}
}
}
// transform given multiaddrs to a list of dialable addresses
// for the given transport `tp`.
function dialables (tp, multiaddrs) {
return tp.filter(multiaddrs.map((addr) => {
// webrtc-star needs the /ipfs/QmHash
@ -110,13 +148,7 @@ function dialables (tp, multiaddrs) {
return addr
}
// ipfs multiaddrs are not dialable so we drop them here
if (contains(addr.protoNames(), 'ipfs')) {
return addr.decapsulate('ipfs')
}
return addr
}))
}
function noop () {}

View File

@ -5,7 +5,7 @@ const expect = require('chai').expect
const Swarm = require('../src')
describe('basics', () => {
describe('create Swarm instance', () => {
it('throws on missing peerInfo', () => {
expect(() => Swarm()).to.throw(Error)
})

View File

@ -1,4 +1,5 @@
/* eslint-env mocha */
'use strict'
const expect = require('chai').expect
@ -127,9 +128,9 @@ describe('transport - tcp', function () {
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(1)
expect(peer.multiaddrs.length >= 1).to.equal(true)
expect(
peer.multiaddrs[0].equals(multiaddr('/ip4/0.0.0.0/tcp/9050'))
peer.multiaddrs[0].equals(multiaddr('/ip4/127.0.0.1/tcp/9050'))
).to.be.equal(
true
)
@ -148,7 +149,7 @@ describe('transport - tcp', function () {
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(1)
expect(peer.multiaddrs.length >= 1).to.equal(true)
expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0'))
swarm.close(done)
}

View File

@ -10,7 +10,8 @@ const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-spdy')
describe('stream muxing with multiplex (on TCP)', function () {
// TODO multiplex needs to be upgraded, like spdy, to work again
describe.skip('stream muxing with multiplex (on TCP)', function () {
this.timeout(60 * 1000)
var swarmA

View File

@ -8,6 +8,8 @@ const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const WebSockets = require('libp2p-websockets')
const spdy = require('libp2p-spdy')
describe('stream muxing with spdy (on TCP)', function () {
@ -19,11 +21,14 @@ describe('stream muxing with spdy (on TCP)', function () {
var peerB
var swarmC
var peerC
var swarmD
var peerD
before((done) => {
peerA = new Peer()
peerB = new Peer()
peerC = new Peer()
peerD = new Peer()
// console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String())
@ -32,27 +37,32 @@ describe('stream muxing with spdy (on TCP)', function () {
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
peerD.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9004'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC)
swarmD = new Swarm(peerD)
swarmA.transport.add('tcp', new TCP())
swarmB.transport.add('tcp', new TCP())
swarmC.transport.add('tcp', new TCP())
swarmD.transport.add('tcp', new TCP())
parallel([
(cb) => swarmA.transport.listen('tcp', {}, null, cb),
(cb) => swarmB.transport.listen('tcp', {}, null, cb),
(cb) => swarmC.transport.listen('tcp', {}, null, cb)
(cb) => swarmC.transport.listen('tcp', {}, null, cb),
(cb) => swarmD.transport.listen('tcp', {}, null, cb)
], done)
})
after((done) => {
parallel([
(cb) => swarmA.close(cb),
(cb) => swarmB.close(cb)
(cb) => swarmB.close(cb),
// (cb) => swarmC.close(cb)
(cb) => swarmD.close(cb)
], done)
})
@ -60,6 +70,7 @@ describe('stream muxing with spdy (on TCP)', function () {
swarmA.connection.addStreamMuxer(spdy)
swarmB.connection.addStreamMuxer(spdy)
swarmC.connection.addStreamMuxer(spdy)
swarmD.connection.addStreamMuxer(spdy)
})
it('handle + dial on protocol', (done) => {
@ -116,6 +127,113 @@ describe('stream muxing with spdy (on TCP)', function () {
})
})
it('with Identify, do getPeerInfo', (done) => {
swarmA.handle('/banana/1.0.0', (conn) => {
conn.getPeerInfo((err, peerInfoC) => {
expect(err).to.not.exist
expect(peerInfoC.id.toB58String()).to.equal(peerC.id.toB58String())
})
conn.pipe(conn)
})
swarmC.dial(peerA, '/banana/1.0.0', (err, conn) => {
expect(err).to.not.exist
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
conn.getPeerInfo((err, peerInfoA) => {
expect(err).to.not.exist
expect(peerInfoA.id.toB58String()).to.equal(peerA.id.toB58String())
conn.on('end', done)
conn.resume()
conn.end()
})
}, 500)
})
})
// This test is not possible as the raw conn is not exposed anymore
// TODO: create a similar version, but that spawns a swarm in a
// different proc
it.skip('make sure it does not blow up when the socket is closed', (done) => {
swarmD.connection.reuse()
let count = 0
const destroyed = () => ++count === 2 ? done() : null
swarmD.handle('/banana/1.0.0', (conn) => {
conn.on('error', () => {})
conn.on('close', destroyed)
})
swarmA.dial(peerD, '/banana/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.on('error', () => {})
conn.on('close', destroyed)
swarmD.muxedConns[peerA.id.toB58String()].conn.destroy()
})
})
// This test is not possible as the raw conn is not exposed anymore
// TODO: create a similar version, but that spawns a swarm in a
// different proc
it.skip('blow up a socket, with WebSockets', (done) => {
var swarmE
var peerE
var swarmF
var peerF
peerE = new Peer()
peerF = new Peer()
peerE.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9110/ws'))
peerF.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9120/ws'))
swarmE = new Swarm(peerE)
swarmF = new Swarm(peerF)
swarmE.transport.add('ws', new WebSockets())
swarmF.transport.add('ws', new WebSockets())
swarmE.connection.addStreamMuxer(spdy)
swarmF.connection.addStreamMuxer(spdy)
swarmE.connection.reuse()
swarmF.connection.reuse()
parallel([
(cb) => swarmE.transport.listen('ws', {}, null, cb),
(cb) => swarmF.transport.listen('ws', {}, null, cb)
], next)
function next () {
let count = 0
const destroyed = () => ++count === 2 ? close() : null
swarmE.handle('/avocado/1.0.0', (conn) => {
conn.on('error', () => {})
conn.on('close', destroyed)
})
swarmF.dial(peerE, '/avocado/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.on('error', () => {})
conn.on('close', destroyed)
swarmF.muxedConns[peerE.id.toB58String()].conn.destroy()
})
}
function close () {
parallel([
(cb) => swarmE.close(cb),
(cb) => swarmF.close(cb)
], done)
}
})
it('close one end, make sure the other does not blow', (done) => {
swarmC.close((err) => {
if (err) throw err

View File

@ -198,13 +198,19 @@ describe('high level API - with everything mixed all together!', function () {
it('dial from tcp+ws to tcp+ws', (done) => {
swarmC.handle('/mamao/1.0.0', (conn) => {
expect(conn.peerId).to.exist
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
})
conn.pipe(conn)
})
swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(conn.peerId).to.exist
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.exist
})
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
conn.end()