mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-10 14:21:33 +00:00
Compare commits
26 Commits
Author | SHA1 | Date | |
---|---|---|---|
6cbcf23706 | |||
171da593c3 | |||
5b0d96e80e | |||
6fa39a1b75 | |||
480b79f588 | |||
7ec197cd5a | |||
81fdc0d267 | |||
c01c49d5e2 | |||
ceb640cca0 | |||
3712c16e08 | |||
4737493d26 | |||
bb95ef119c | |||
d6e1b96a09 | |||
071cdefd83 | |||
84d3471c01 | |||
64375d034d | |||
ca75c3ca4d | |||
0ff3a0a3cd | |||
edee20ba1a | |||
d793469311 | |||
0c869041b9 | |||
cb822757c1 | |||
ce86b7b4fb | |||
0c9cba3a5c | |||
066a157235 | |||
b917054acc |
@ -11,6 +11,9 @@ module.exports = {
|
||||
'../vendor/forge.bundle.js'
|
||||
)
|
||||
}
|
||||
},
|
||||
externals: {
|
||||
'simple-websocket-server': '{}'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
68
README.md
68
README.md
@ -4,20 +4,38 @@ libp2p-swarm JavaScript implementation
|
||||
[](http://ipn.io)
|
||||
[](http://ipfs.io/)
|
||||
[](http://webchat.freenode.net/?channels=%23ipfs)
|
||||
[](https://travis-ci.org/diasdavid/js-libp2p-swarm)
|
||||
[](https://coveralls.io/github/diasdavid/js-libp2p-swarm?branch=master)
|
||||
[](https://david-dm.org/diasdavid/js-libp2p-swarm)
|
||||
[](https://travis-ci.org/libp2p/js-libp2p-swarm)
|
||||
[](https://coveralls.io/github/libp2p/js-libp2p-swarm?branch=master)
|
||||
[](https://david-dm.org/libp2p/js-libp2p-swarm)
|
||||
[](https://github.com/feross/standard)
|
||||
|
||||
> libp2p swarm implementation in JavaScript.
|
||||
|
||||
# Description
|
||||
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, the multiplexing of several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
|
||||
|
||||
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
|
||||
libp2p-swarm is used by [libp2p](https://github.com/libp2p/js-libp2p) but it can be also used as a standalone module.
|
||||
|
||||
libp2p-swarm is used by libp2p but it can be also used as a standalone module.
|
||||
## Table of Contents
|
||||
|
||||
# Usage
|
||||
- [Install](#install)
|
||||
- [Usage](#usage)
|
||||
- [Create a libp2p Swarm](#create-a-libp2p-swarm)
|
||||
- [API](#api)
|
||||
- [Transports](#transports)
|
||||
- [Connection](#connection)
|
||||
- [`swarm.dial(pi, protocol, callback)`](#swarmdialpi-protocol-callback)
|
||||
- [`swarm.hangUp(pi, callback)`](#swarmhanguppi-callback)
|
||||
- [`swarm.listen(callback)`](#swarmlistencallback)
|
||||
- [`swarm.handle(protocol, handler)`](#swarmhandleprotocol-handler)
|
||||
- [`swarm.unhandle(protocol)`](#swarmunhandleprotocol)
|
||||
- [`swarm.close(callback)`](#swarmclosecallback)
|
||||
- [Design](#design)
|
||||
- [Multitransport](#multitransport)
|
||||
- [Connection upgrades](#connection-upgrades)
|
||||
- [Identify](#identify)
|
||||
- [Notes](#notes)
|
||||
- [Contribute](#contribute)
|
||||
- [License](#license)
|
||||
|
||||
## Install
|
||||
|
||||
@ -27,9 +45,9 @@ libp2p-swarm is available on npm and so, like any other npm module, just:
|
||||
> npm install libp2p-swarm --save
|
||||
```
|
||||
|
||||
## API
|
||||
## Usage
|
||||
|
||||
#### Create a libp2p Swarm
|
||||
### Create a libp2p Swarm
|
||||
|
||||
And use it in your Node.js code as:
|
||||
|
||||
@ -39,6 +57,8 @@ const Swarm = require('libp2p-swarm')
|
||||
const sw = new Swarm(peerInfo)
|
||||
```
|
||||
|
||||
## API
|
||||
|
||||
peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance.
|
||||
|
||||
### Transports
|
||||
@ -92,11 +112,11 @@ Upgrading a connection to use a stream muxer is still considered an upgrade, but
|
||||
|
||||
##### `swarm.connection.reuse()`
|
||||
|
||||
Enable the identify protocol
|
||||
Enable the identify protocol.
|
||||
|
||||
### `swarm.dial(pi, protocol, callback)`
|
||||
|
||||
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing.
|
||||
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
|
||||
|
||||
- `pi` - peer info project
|
||||
- `protocol`
|
||||
@ -104,7 +124,7 @@ dial uses the best transport (whatever works first, in the future we can have so
|
||||
|
||||
### `swarm.hangUp(pi, callback)`
|
||||
|
||||
hangUp the muxedConn we have with the peer
|
||||
Hang up the muxed connection we have with the peer.
|
||||
|
||||
- `pi` - peer info project
|
||||
- `callback`
|
||||
@ -115,32 +135,32 @@ Start listening on all added transports that are available on the current `peerI
|
||||
|
||||
### `swarm.handle(protocol, handler)`
|
||||
|
||||
handle a new protocol.
|
||||
Handle a new protocol.
|
||||
|
||||
- `protocol`
|
||||
- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}`
|
||||
|
||||
### `swarm.unhandle(protocol)`
|
||||
|
||||
unhandle a protocol.
|
||||
Unhandle a protocol.
|
||||
|
||||
- `protocol`
|
||||
|
||||
### `swarm.close(callback)`
|
||||
|
||||
close all the listeners and muxers.
|
||||
Close all the listeners and muxers.
|
||||
|
||||
- `callback`
|
||||
|
||||
# Design
|
||||
## Design
|
||||
|
||||
## Multitransport
|
||||
### Multitransport
|
||||
|
||||
libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through.
|
||||
|
||||
In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec.
|
||||
|
||||
## Connection upgrades
|
||||
### Connection upgrades
|
||||
|
||||
Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
|
||||
|
||||
@ -156,7 +176,7 @@ Types of upgrades to a connection:
|
||||
|
||||
We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol)
|
||||
|
||||
## Identify
|
||||
### Identify
|
||||
|
||||
Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g:
|
||||
|
||||
@ -167,9 +187,17 @@ Identify is a protocol that Swarms mounts on top of itself, to identify the conn
|
||||
|
||||
In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
|
||||
|
||||
## Notes
|
||||
### Notes
|
||||
|
||||
To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as:
|
||||
|
||||
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex
|
||||
- transport - something that as a dial/listen interface and return objs that implement a connection interface
|
||||
|
||||
## Contribute
|
||||
|
||||
This module is actively under development. Please check out the issues and submit PRs!
|
||||
|
||||
## License
|
||||
|
||||
MIT © Protocol Labs
|
||||
|
26
package.json
26
package.json
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "libp2p-swarm",
|
||||
"version": "0.19.5",
|
||||
"version": "0.22.3",
|
||||
"description": "libp2p swarm implementation in JavaScript",
|
||||
"main": "lib/index.js",
|
||||
"jsnext:main": "src/index.js",
|
||||
@ -18,7 +18,7 @@
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/diasdavid/js-libp2p-swarm.git"
|
||||
"url": "https://github.com/libp2p/js-libp2p-swarm.git"
|
||||
},
|
||||
"keywords": [
|
||||
"IPFS"
|
||||
@ -26,9 +26,9 @@
|
||||
"author": "David Dias <daviddias@ipfs.io>",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://github.com/diasdavid/js-libp2p-swarm/issues"
|
||||
"url": "https://github.com/libp2p/js-libp2p-swarm/issues"
|
||||
},
|
||||
"homepage": "https://github.com/diasdavid/js-libp2p-swarm",
|
||||
"homepage": "https://github.com/libp2p/js-libp2p-swarm",
|
||||
"pre-commit": [
|
||||
"lint",
|
||||
"test"
|
||||
@ -37,32 +37,36 @@
|
||||
"node": "^4.3.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"aegir": "^3.2.0",
|
||||
"bl": "^1.1.2",
|
||||
"aegir": "^4.0.0",
|
||||
"buffer-loader": "0.0.1",
|
||||
"chai": "^3.5.0",
|
||||
"gulp": "^3.9.1",
|
||||
"istanbul": "^0.4.3",
|
||||
"libp2p-multiplex": "^0.2.1",
|
||||
"libp2p-spdy": "^0.6.1",
|
||||
"libp2p-tcp": "^0.6.1",
|
||||
"libp2p-webrtc-star": "^0.2.0",
|
||||
"libp2p-websockets": "^0.6.1",
|
||||
"libp2p-spdy": "^0.8.1",
|
||||
"libp2p-tcp": "^0.7.4",
|
||||
"libp2p-webrtc-star": "^0.3.2",
|
||||
"libp2p-websockets": "^0.7.1",
|
||||
"pre-commit": "^1.1.2",
|
||||
"stream-pair": "^1.0.3",
|
||||
"webrtcsupport": "^2.2.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"babel-runtime": "^6.6.1",
|
||||
"bl": "^1.1.2",
|
||||
"browserify-zlib": "github:ipfs/browserify-zlib",
|
||||
"debug": "^2.2.0",
|
||||
"duplexify": "^3.4.3",
|
||||
"interface-connection": "^0.1.7",
|
||||
"ip-address": "^5.8.0",
|
||||
"length-prefixed-stream": "^1.5.0",
|
||||
"libp2p-identify": "^0.1.3",
|
||||
"lodash.contains": "^2.4.3",
|
||||
"multiaddr": "^2.0.0",
|
||||
"multistream-select": "^0.9.0",
|
||||
"peer-id": "^0.7.0",
|
||||
"peer-info": "^0.7.0",
|
||||
"protocol-buffers-stream": "^1.3.1",
|
||||
"protocol-buffers": "^3.1.6",
|
||||
"run-parallel": "^1.1.6"
|
||||
},
|
||||
"contributors": [
|
||||
|
@ -1,7 +1,8 @@
|
||||
'use strict'
|
||||
|
||||
const connHandler = require('./default-handler')
|
||||
const identify = require('./identify')
|
||||
const protocolMuxer = require('./protocol-muxer')
|
||||
const identify = require('libp2p-identify')
|
||||
const multistream = require('multistream-select')
|
||||
|
||||
module.exports = function connection (swarm) {
|
||||
return {
|
||||
@ -15,43 +16,50 @@ module.exports = function connection (swarm) {
|
||||
swarm.handle(muxer.multicodec, (conn) => {
|
||||
const muxedConn = muxer(conn, true)
|
||||
|
||||
var peerIdForConn
|
||||
|
||||
muxedConn.on('stream', (conn) => {
|
||||
function gotId () {
|
||||
if (peerIdForConn) {
|
||||
conn.peerId = peerIdForConn
|
||||
connHandler(swarm.protocols, conn)
|
||||
} else {
|
||||
setTimeout(gotId, 100)
|
||||
}
|
||||
}
|
||||
|
||||
// If identify happened, when we have the Id of the conn
|
||||
if (swarm.identify) {
|
||||
return gotId()
|
||||
}
|
||||
|
||||
connHandler(swarm.protocols, conn)
|
||||
protocolMuxer(swarm.protocols, conn)
|
||||
})
|
||||
|
||||
// if identify is enabled, attempt to do it for muxer reuse
|
||||
// If identify is enabled
|
||||
// 1. overload getPeerInfo
|
||||
// 2. call getPeerInfo
|
||||
// 3. add this conn to the pool
|
||||
if (swarm.identify) {
|
||||
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => {
|
||||
// overload peerInfo to use Identify instead
|
||||
conn.getPeerInfo = (cb) => {
|
||||
const conn = muxedConn.newStream()
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
ms.select(identify.multicodec, (err, conn) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
identify.exec(conn, (err, peerInfo, observedAddrs) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
observedAddrs.forEach((oa) => {
|
||||
swarm._peerInfo.multiaddr.addSafe(oa)
|
||||
})
|
||||
|
||||
cb(null, peerInfo)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
if (err) {
|
||||
return console.log('Identify exec failed', err)
|
||||
return console.log('Identify not successful')
|
||||
}
|
||||
swarm.muxedConns[peerInfo.id.toB58String()] = {
|
||||
muxer: muxedConn
|
||||
}
|
||||
|
||||
peerIdForConn = pi.id
|
||||
swarm.muxedConns[pi.id.toB58String()] = {}
|
||||
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
|
||||
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
|
||||
|
||||
swarm.emit('peer-mux-established', pi)
|
||||
|
||||
swarm.emit('peer-mux-established', peerInfo)
|
||||
muxedConn.on('close', () => {
|
||||
delete swarm.muxedConns[pi.id.toB58String()]
|
||||
swarm.emit('peer-mux-closed', pi)
|
||||
delete swarm.muxedConns[peerInfo.id.toB58String()]
|
||||
swarm.emit('peer-mux-closed', peerInfo)
|
||||
})
|
||||
})
|
||||
}
|
||||
@ -60,7 +68,7 @@ module.exports = function connection (swarm) {
|
||||
|
||||
reuse () {
|
||||
swarm.identify = true
|
||||
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm))
|
||||
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
33
src/dial.js
33
src/dial.js
@ -1,9 +1,9 @@
|
||||
'use strict'
|
||||
|
||||
const multistream = require('multistream-select')
|
||||
const Duplexify = require('duplexify')
|
||||
const Connection = require('interface-connection').Connection
|
||||
|
||||
const connHandler = require('./default-handler')
|
||||
const protocolMuxer = require('./protocol-muxer')
|
||||
|
||||
module.exports = function dial (swarm) {
|
||||
return (pi, protocol, callback) => {
|
||||
@ -16,7 +16,7 @@ module.exports = function dial (swarm) {
|
||||
callback = function noop () {}
|
||||
}
|
||||
|
||||
const pt = new Duplexify()
|
||||
const proxyConn = new Connection()
|
||||
|
||||
const b58Id = pi.id.toB58String()
|
||||
|
||||
@ -40,9 +40,11 @@ module.exports = function dial (swarm) {
|
||||
gotMuxer(swarm.muxedConns[b58Id].muxer)
|
||||
}
|
||||
|
||||
return pt
|
||||
return proxyConn
|
||||
|
||||
function gotWarmedUpConn (conn) {
|
||||
conn.setPeerInfo(pi)
|
||||
|
||||
attemptMuxerUpgrade(conn, (err, muxer) => {
|
||||
if (!protocol) {
|
||||
if (err) {
|
||||
@ -61,6 +63,13 @@ module.exports = function dial (swarm) {
|
||||
}
|
||||
|
||||
function gotMuxer (muxer) {
|
||||
if (swarm.identify) {
|
||||
// TODO: Consider:
|
||||
// 1. overload getPeerInfo
|
||||
// 2. exec identify (through getPeerInfo)
|
||||
// 3. update the peerInfo that is already stored in the conn
|
||||
}
|
||||
|
||||
openConnInMuxedConn(muxer, (conn) => {
|
||||
protocolHandshake(conn, protocol, callback)
|
||||
})
|
||||
@ -88,7 +97,7 @@ module.exports = function dial (swarm) {
|
||||
cryptoDial()
|
||||
|
||||
function cryptoDial () {
|
||||
// currently, js-libp2p-swarm doesn't implement any crypto
|
||||
// currently, no crypto channel is implemented
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) {
|
||||
@ -133,7 +142,7 @@ module.exports = function dial (swarm) {
|
||||
const muxedConn = swarm.muxers[key](conn, false)
|
||||
swarm.muxedConns[b58Id] = {}
|
||||
swarm.muxedConns[b58Id].muxer = muxedConn
|
||||
swarm.muxedConns[b58Id].conn = conn
|
||||
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
|
||||
|
||||
swarm.emit('peer-mux-established', pi)
|
||||
|
||||
@ -142,10 +151,9 @@ module.exports = function dial (swarm) {
|
||||
swarm.emit('peer-mux-closed', pi)
|
||||
})
|
||||
|
||||
// in case identify is on
|
||||
// For incoming streams, in case identify is on
|
||||
muxedConn.on('stream', (conn) => {
|
||||
conn.peerId = pi.id
|
||||
connHandler(swarm.protocols, conn)
|
||||
protocolMuxer(swarm.protocols, conn)
|
||||
})
|
||||
|
||||
cb(null, muxedConn)
|
||||
@ -168,11 +176,8 @@ module.exports = function dial (swarm) {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
pt.setReadable(conn)
|
||||
pt.setWritable(conn)
|
||||
pt.peerId = pi.id
|
||||
callback(null, pt)
|
||||
proxyConn.setInnerConn(conn)
|
||||
callback(null, proxyConn)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
113
src/identify.js
113
src/identify.js
@ -1,113 +0,0 @@
|
||||
/*
|
||||
* Identify is one of the protocols swarms speaks in order to
|
||||
* broadcast and learn about the ip:port pairs a specific peer
|
||||
* is available through and to know when a new stream muxer is
|
||||
* established, so a conn can be reused
|
||||
*/
|
||||
|
||||
'use strict'
|
||||
|
||||
const multistream = require('multistream-select')
|
||||
const fs = require('fs')
|
||||
const path = require('path')
|
||||
const Info = require('peer-info')
|
||||
const Id = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const identity = fs.readFileSync(path.join(__dirname, 'identify.proto'))
|
||||
|
||||
const pbStream = require('protocol-buffers-stream')(identity)
|
||||
|
||||
exports = module.exports
|
||||
exports.multicodec = '/ipfs/id/1.0.0'
|
||||
|
||||
exports.exec = (rawConn, muxer, peerInfo, callback) => {
|
||||
// 1. open a stream
|
||||
// 2. multistream into identify
|
||||
// 3. send what I see from this other peer (extract fro conn)
|
||||
// 4. receive what the other peer sees from me
|
||||
// 4. callback with (err, peerInfo)
|
||||
|
||||
const conn = muxer.newStream()
|
||||
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
ms.select(exports.multicodec, (err, ds) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
var pbs = pbStream()
|
||||
|
||||
pbs.on('identify', (msg) => {
|
||||
if (msg.observedAddr.length > 0) {
|
||||
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
|
||||
}
|
||||
|
||||
const peerId = Id.createFromPubKey(msg.publicKey)
|
||||
const otherPeerInfo = new Info(peerId)
|
||||
msg.listenAddrs.forEach((ma) => {
|
||||
otherPeerInfo.multiaddr.add(multiaddr(ma))
|
||||
})
|
||||
|
||||
callback(null, otherPeerInfo)
|
||||
})
|
||||
|
||||
const obsMultiaddr = rawConn.getObservedAddrs()[0]
|
||||
|
||||
let publicKey = new Buffer(0)
|
||||
if (peerInfo.id.pubKey) {
|
||||
publicKey = peerInfo.id.pubKey.bytes
|
||||
}
|
||||
|
||||
pbs.identify({
|
||||
protocolVersion: 'na',
|
||||
agentVersion: 'na',
|
||||
publicKey: publicKey,
|
||||
listenAddrs: peerInfo.multiaddrs.map((mh) => mh.buffer),
|
||||
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
|
||||
})
|
||||
|
||||
pbs.pipe(ds).pipe(pbs)
|
||||
pbs.finalize()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
exports.handler = (peerInfo, swarm) => {
|
||||
return (conn) => {
|
||||
// 1. receive incoming observed info about me
|
||||
// 2. update my own information (on peerInfo)
|
||||
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
|
||||
var pbs = pbStream()
|
||||
pbs.on('identify', (msg) => {
|
||||
if (msg.observedAddr.length > 0) {
|
||||
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
|
||||
}
|
||||
|
||||
const peerId = Id.createFromPubKey(msg.publicKey)
|
||||
const conn = swarm.muxedConns[peerId.toB58String()].conn
|
||||
const obsMultiaddr = conn.getObservedAddrs()[0]
|
||||
|
||||
let publicKey = new Buffer(0)
|
||||
if (peerInfo.id.pubKey) {
|
||||
publicKey = peerInfo.id.pubKey.bytes
|
||||
}
|
||||
|
||||
pbs.identify({
|
||||
protocolVersion: 'na',
|
||||
agentVersion: 'na',
|
||||
publicKey: publicKey,
|
||||
listenAddrs: peerInfo.multiaddrs.map((ma) => ma.buffer),
|
||||
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
|
||||
})
|
||||
pbs.finalize()
|
||||
})
|
||||
|
||||
pbs.pipe(conn).pipe(pbs)
|
||||
}
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
message Identify {
|
||||
|
||||
// protocolVersion determines compatibility between peers
|
||||
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
|
||||
|
||||
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
|
||||
// includes the client name and client.
|
||||
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
|
||||
|
||||
// publicKey is this node's public key (which also gives its node.ID)
|
||||
// - may not need to be sent, as secure channel implies it has been sent.
|
||||
// - then again, if we change / disable secure channel, may still want it.
|
||||
optional bytes publicKey = 1;
|
||||
|
||||
// listenAddrs are the multiaddrs the sender node listens for open connections on
|
||||
repeated bytes listenAddrs = 2;
|
||||
|
||||
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
|
||||
// this is useful information to convey to the other side, as it helps the remote endpoint
|
||||
// determine whether its connection to the local peer goes through NAT.
|
||||
optional bytes observedAddr = 4;
|
||||
|
||||
// (DEPRECATED) protocols are the services this node is running
|
||||
// repeated string protocols = 3;
|
||||
}
|
18
src/index.js
18
src/index.js
@ -8,7 +8,7 @@ const contains = require('lodash.contains')
|
||||
const transport = require('./transport')
|
||||
const connection = require('./connection')
|
||||
const dial = require('./dial')
|
||||
const connHandler = require('./default-handler')
|
||||
const protocolMuxer = require('./protocol-muxer')
|
||||
|
||||
exports = module.exports = Swarm
|
||||
|
||||
@ -26,12 +26,10 @@ function Swarm (peerInfo) {
|
||||
this._peerInfo = peerInfo
|
||||
|
||||
// transports --
|
||||
|
||||
// { key: transport }; e.g { tcp: <tcp> }
|
||||
this.transports = {}
|
||||
|
||||
// connections --
|
||||
|
||||
// { peerIdB58: { conn: <conn> }}
|
||||
this.conns = {}
|
||||
|
||||
@ -94,7 +92,7 @@ function Swarm (peerInfo) {
|
||||
|
||||
// our crypto handshake :)
|
||||
this.handle('/plaintext/1.0.0', (conn) => {
|
||||
connHandler(this.protocols, conn)
|
||||
protocolMuxer(this.protocols, conn)
|
||||
})
|
||||
|
||||
this.unhandle = (protocol, handler) => {
|
||||
@ -122,8 +120,16 @@ function Swarm (peerInfo) {
|
||||
this.muxedConns[key].muxer.end()
|
||||
})
|
||||
|
||||
parallel(Object.keys(this.transports).map((key) => {
|
||||
return (cb) => this.transports[key].close(cb)
|
||||
const transports = this.transports
|
||||
|
||||
parallel(Object.keys(transports).map((key) => {
|
||||
return (cb) => {
|
||||
parallel(transports[key].listeners.map((listener) => {
|
||||
return (cb) => {
|
||||
listener.close(cb)
|
||||
}
|
||||
}), cb)
|
||||
}
|
||||
}), callback)
|
||||
}
|
||||
}
|
||||
|
@ -2,9 +2,9 @@
|
||||
|
||||
const multistream = require('multistream-select')
|
||||
|
||||
// incomming connection handler
|
||||
module.exports = function connHandler (protocols, conn) {
|
||||
module.exports = function protocolMuxer (protocols, conn) {
|
||||
const ms = new multistream.Listener()
|
||||
|
||||
Object.keys(protocols).forEach((protocol) => {
|
||||
if (!protocol) {
|
||||
return
|
112
src/transport.js
112
src/transport.js
@ -1,9 +1,11 @@
|
||||
'use strict'
|
||||
|
||||
const contains = require('lodash.contains')
|
||||
const Duplexify = require('duplexify')
|
||||
const Connection = require('interface-connection').Connection
|
||||
const parallel = require('run-parallel')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:swarm')
|
||||
|
||||
const connHandler = require('./default-handler')
|
||||
const protocolMuxer = require('./protocol-muxer')
|
||||
|
||||
module.exports = function (swarm) {
|
||||
return {
|
||||
@ -12,12 +14,17 @@ module.exports = function (swarm) {
|
||||
callback = options
|
||||
options = {}
|
||||
}
|
||||
|
||||
if (!callback) { callback = noop }
|
||||
|
||||
if (swarm.transports[key]) {
|
||||
throw new Error('There is already a transport with this key')
|
||||
}
|
||||
swarm.transports[key] = transport
|
||||
if (!swarm.transports[key].listeners) {
|
||||
swarm.transports[key].listeners = []
|
||||
}
|
||||
|
||||
callback()
|
||||
},
|
||||
|
||||
@ -34,59 +41,86 @@ module.exports = function (swarm) {
|
||||
// b) if multiaddrs.length = 1, return the conn from the
|
||||
// transport, otherwise, create a passthrough
|
||||
if (multiaddrs.length === 1) {
|
||||
const conn = t.dial(multiaddrs.shift(), {ready: () => {
|
||||
const cb = callback
|
||||
callback = noop // this is done to avoid connection drops as connect errors
|
||||
cb(null, conn)
|
||||
}})
|
||||
conn.once('error', () => {
|
||||
callback(new Error('failed to connect to every multiaddr'))
|
||||
const conn = t.dial(multiaddrs.shift())
|
||||
|
||||
conn.once('error', connectError)
|
||||
|
||||
conn.once('connect', () => {
|
||||
conn.removeListener('error', connectError)
|
||||
callback(null, conn)
|
||||
})
|
||||
|
||||
return conn
|
||||
}
|
||||
function connectError () {
|
||||
callback(new Error('failed to connect to every multiaddr'))
|
||||
}
|
||||
|
||||
// c) multiaddrs should already be a filtered list
|
||||
// specific for the transport we are using
|
||||
const pt = new Duplexify()
|
||||
const proxyConn = new Connection()
|
||||
|
||||
next(multiaddrs.shift())
|
||||
return pt
|
||||
function next (multiaddr) {
|
||||
const conn = t.dial(multiaddr, {ready: () => {
|
||||
pt.setReadable(conn)
|
||||
pt.setWritable(conn)
|
||||
pt.getObservedAddrs = conn.getObservedAddrs.bind(conn)
|
||||
const cb = callback
|
||||
callback = noop // this is done to avoid connection drops as connect errors
|
||||
cb(null, pt)
|
||||
}})
|
||||
|
||||
conn.once('error', () => {
|
||||
return proxyConn
|
||||
|
||||
// TODO improve in the future to make all the dials in paralell
|
||||
function next (multiaddr) {
|
||||
const conn = t.dial(multiaddr)
|
||||
|
||||
conn.once('error', connectError)
|
||||
|
||||
function connectError () {
|
||||
if (multiaddrs.length === 0) {
|
||||
return callback(new Error('failed to connect to every multiaddr'))
|
||||
}
|
||||
next(multiaddrs.shift())
|
||||
}
|
||||
|
||||
conn.once('connect', () => {
|
||||
conn.removeListener('error', connectError)
|
||||
proxyConn.setInnerConn(conn)
|
||||
callback(null, proxyConn)
|
||||
})
|
||||
}
|
||||
},
|
||||
|
||||
listen (key, options, handler, callback) {
|
||||
// if no callback is passed, we pass conns to connHandler
|
||||
// if no handler is passed, we pass conns to protocolMuxer
|
||||
if (!handler) {
|
||||
handler = connHandler.bind(null, swarm.protocols)
|
||||
handler = protocolMuxer.bind(null, swarm.protocols)
|
||||
}
|
||||
|
||||
const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs)
|
||||
|
||||
swarm.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
if (maUpdate) {
|
||||
// because we can listen on port 0...
|
||||
swarm._peerInfo.multiaddr.replace(multiaddrs, maUpdate)
|
||||
}
|
||||
const transport = swarm.transports[key]
|
||||
|
||||
if (!transport.listeners) {
|
||||
transport.listeners = []
|
||||
}
|
||||
|
||||
let freshMultiaddrs = []
|
||||
|
||||
const createListeners = multiaddrs.map((ma) => {
|
||||
return (cb) => {
|
||||
const listener = transport.createListener(handler)
|
||||
listener.listen(ma, () => {
|
||||
log('Listener started on:', ma.toString())
|
||||
listener.getAddrs((err, addrs) => {
|
||||
if (err) {
|
||||
return cb(err)
|
||||
}
|
||||
freshMultiaddrs = freshMultiaddrs.concat(addrs)
|
||||
transport.listeners.push(listener)
|
||||
cb()
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
parallel(createListeners, () => {
|
||||
// cause we can listen on port 0 or 0.0.0.0
|
||||
swarm._peerInfo.multiaddr.replace(multiaddrs, freshMultiaddrs)
|
||||
callback()
|
||||
})
|
||||
},
|
||||
@ -98,13 +132,15 @@ module.exports = function (swarm) {
|
||||
return callback(new Error(`Trying to close non existing transport: ${key}`))
|
||||
}
|
||||
|
||||
transport.close(callback)
|
||||
parallel(transport.listeners.map((listener) => {
|
||||
return (cb) => {
|
||||
listener.close(cb)
|
||||
}
|
||||
}), callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// transform given multiaddrs to a list of dialable addresses
|
||||
// for the given transport `tp`.
|
||||
function dialables (tp, multiaddrs) {
|
||||
return tp.filter(multiaddrs.map((addr) => {
|
||||
// webrtc-star needs the /ipfs/QmHash
|
||||
@ -112,13 +148,7 @@ function dialables (tp, multiaddrs) {
|
||||
return addr
|
||||
}
|
||||
|
||||
// ipfs multiaddrs are not dialable so we drop them here
|
||||
if (contains(addr.protoNames(), 'ipfs')) {
|
||||
return addr.decapsulate('ipfs')
|
||||
}
|
||||
|
||||
return addr
|
||||
}))
|
||||
}
|
||||
|
||||
function noop () {}
|
||||
|
@ -5,7 +5,7 @@ const expect = require('chai').expect
|
||||
|
||||
const Swarm = require('../src')
|
||||
|
||||
describe('basics', () => {
|
||||
describe('create Swarm instance', () => {
|
||||
it('throws on missing peerInfo', () => {
|
||||
expect(() => Swarm()).to.throw(Error)
|
||||
})
|
@ -1,4 +1,5 @@
|
||||
/* eslint-env mocha */
|
||||
|
||||
'use strict'
|
||||
|
||||
const expect = require('chai').expect
|
||||
@ -127,9 +128,9 @@ describe('transport - tcp', function () {
|
||||
}, ready)
|
||||
|
||||
function ready () {
|
||||
expect(peer.multiaddrs.length).to.equal(1)
|
||||
expect(peer.multiaddrs.length >= 1).to.equal(true)
|
||||
expect(
|
||||
peer.multiaddrs[0].equals(multiaddr('/ip4/0.0.0.0/tcp/9050'))
|
||||
peer.multiaddrs[0].equals(multiaddr('/ip4/127.0.0.1/tcp/9050'))
|
||||
).to.be.equal(
|
||||
true
|
||||
)
|
||||
@ -148,7 +149,7 @@ describe('transport - tcp', function () {
|
||||
}, ready)
|
||||
|
||||
function ready () {
|
||||
expect(peer.multiaddrs.length).to.equal(1)
|
||||
expect(peer.multiaddrs.length >= 1).to.equal(true)
|
||||
expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0'))
|
||||
swarm.close(done)
|
||||
}
|
||||
|
@ -10,7 +10,8 @@ const Swarm = require('../src')
|
||||
const TCP = require('libp2p-tcp')
|
||||
const multiplex = require('libp2p-spdy')
|
||||
|
||||
describe('stream muxing with multiplex (on TCP)', function () {
|
||||
// TODO multiplex needs to be upgraded, like spdy, to work again
|
||||
describe.skip('stream muxing with multiplex (on TCP)', function () {
|
||||
this.timeout(60 * 1000)
|
||||
|
||||
var swarmA
|
||||
|
@ -8,6 +8,8 @@ const multiaddr = require('multiaddr')
|
||||
const Peer = require('peer-info')
|
||||
const Swarm = require('../src')
|
||||
const TCP = require('libp2p-tcp')
|
||||
const WebSockets = require('libp2p-websockets')
|
||||
|
||||
const spdy = require('libp2p-spdy')
|
||||
|
||||
describe('stream muxing with spdy (on TCP)', function () {
|
||||
@ -125,25 +127,113 @@ describe('stream muxing with spdy (on TCP)', function () {
|
||||
})
|
||||
})
|
||||
|
||||
it('leave a stream open, make sure it does not blow up when the socket is closed', (done) => {
|
||||
it('with Identify, do getPeerInfo', (done) => {
|
||||
swarmA.handle('/banana/1.0.0', (conn) => {
|
||||
conn.getPeerInfo((err, peerInfoC) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfoC.id.toB58String()).to.equal(peerC.id.toB58String())
|
||||
})
|
||||
|
||||
conn.pipe(conn)
|
||||
})
|
||||
|
||||
swarmC.dial(peerA, '/banana/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
setTimeout(() => {
|
||||
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
|
||||
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
|
||||
conn.getPeerInfo((err, peerInfoA) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfoA.id.toB58String()).to.equal(peerA.id.toB58String())
|
||||
conn.on('end', done)
|
||||
conn.resume()
|
||||
conn.end()
|
||||
})
|
||||
}, 500)
|
||||
})
|
||||
})
|
||||
|
||||
// This test is not possible as the raw conn is not exposed anymore
|
||||
// TODO: create a similar version, but that spawns a swarm in a
|
||||
// different proc
|
||||
it.skip('make sure it does not blow up when the socket is closed', (done) => {
|
||||
swarmD.connection.reuse()
|
||||
|
||||
let count = 0
|
||||
const destroyed = () => ++count === 2 ? done() : null
|
||||
|
||||
swarmD.handle('/banana/1.0.0', (conn) => {
|
||||
conn.on('error', destroyed)
|
||||
conn.pipe(conn)
|
||||
conn.on('error', () => {})
|
||||
conn.on('close', destroyed)
|
||||
})
|
||||
|
||||
swarmA.dial(peerD, '/banana/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
conn.on('error', destroyed)
|
||||
|
||||
conn.on('error', () => {})
|
||||
conn.on('close', destroyed)
|
||||
|
||||
swarmD.muxedConns[peerA.id.toB58String()].conn.destroy()
|
||||
})
|
||||
})
|
||||
|
||||
// This test is not possible as the raw conn is not exposed anymore
|
||||
// TODO: create a similar version, but that spawns a swarm in a
|
||||
// different proc
|
||||
it.skip('blow up a socket, with WebSockets', (done) => {
|
||||
var swarmE
|
||||
var peerE
|
||||
var swarmF
|
||||
var peerF
|
||||
|
||||
peerE = new Peer()
|
||||
peerF = new Peer()
|
||||
|
||||
peerE.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9110/ws'))
|
||||
peerF.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9120/ws'))
|
||||
|
||||
swarmE = new Swarm(peerE)
|
||||
swarmF = new Swarm(peerF)
|
||||
|
||||
swarmE.transport.add('ws', new WebSockets())
|
||||
swarmF.transport.add('ws', new WebSockets())
|
||||
|
||||
swarmE.connection.addStreamMuxer(spdy)
|
||||
swarmF.connection.addStreamMuxer(spdy)
|
||||
swarmE.connection.reuse()
|
||||
swarmF.connection.reuse()
|
||||
|
||||
parallel([
|
||||
(cb) => swarmE.transport.listen('ws', {}, null, cb),
|
||||
(cb) => swarmF.transport.listen('ws', {}, null, cb)
|
||||
], next)
|
||||
|
||||
function next () {
|
||||
let count = 0
|
||||
const destroyed = () => ++count === 2 ? close() : null
|
||||
|
||||
swarmE.handle('/avocado/1.0.0', (conn) => {
|
||||
conn.on('error', () => {})
|
||||
conn.on('close', destroyed)
|
||||
})
|
||||
|
||||
swarmF.dial(peerE, '/avocado/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
conn.on('error', () => {})
|
||||
conn.on('close', destroyed)
|
||||
|
||||
swarmF.muxedConns[peerE.id.toB58String()].conn.destroy()
|
||||
})
|
||||
}
|
||||
|
||||
function close () {
|
||||
parallel([
|
||||
(cb) => swarmE.close(cb),
|
||||
(cb) => swarmF.close(cb)
|
||||
], done)
|
||||
}
|
||||
})
|
||||
|
||||
it('close one end, make sure the other does not blow', (done) => {
|
||||
swarmC.close((err) => {
|
||||
if (err) throw err
|
||||
|
@ -198,13 +198,19 @@ describe('high level API - with everything mixed all together!', function () {
|
||||
|
||||
it('dial from tcp+ws to tcp+ws', (done) => {
|
||||
swarmC.handle('/mamao/1.0.0', (conn) => {
|
||||
expect(conn.peerId).to.exist
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfo).to.exist
|
||||
})
|
||||
conn.pipe(conn)
|
||||
})
|
||||
|
||||
swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
expect(conn.peerId).to.exist
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfo).to.exist
|
||||
})
|
||||
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
|
||||
conn.end()
|
||||
|
||||
|
Reference in New Issue
Block a user