mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-09 13:51:34 +00:00
Compare commits
44 Commits
Author | SHA1 | Date | |
---|---|---|---|
e076c37523 | |||
3955f4fae9 | |||
9ea72f3019 | |||
ed35c767f8 | |||
48148c115f | |||
8dfd54e1d3 | |||
aea637340e | |||
ffb5436937 | |||
1b94eb2005 | |||
ff91575801 | |||
25834bcd55 | |||
4e8788afcc | |||
68cccbe6b6 | |||
08d976af22 | |||
e9fc1e3400 | |||
2c8ef1dced | |||
702fddcfd5 | |||
5b709dcd66 | |||
29ed23c02e | |||
36e0e44221 | |||
4d0a673e1c | |||
c27c344ffd | |||
b2698fd6cf | |||
1d872ca82f | |||
e830ad862d | |||
f17539a3e8 | |||
26ad29f38b | |||
b0ac05a828 | |||
9dd3a4087f | |||
4a71892328 | |||
1ac430d4d6 | |||
a35ce50ac8 | |||
9edabe8ee2 | |||
f3cb926e99 | |||
ffd2f8dfa3 | |||
22e8518634 | |||
546af97e47 | |||
800c394c59 | |||
b374735440 | |||
80fbfd9c1a | |||
2e1432007e | |||
a77bbbc238 | |||
48ac51ecb9 | |||
4bdff3896a |
59
.travis.yml
59
.travis.yml
@ -1,32 +1,41 @@
|
||||
# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
|
||||
sudo: false
|
||||
language: node_js
|
||||
cache: npm
|
||||
stages:
|
||||
- check
|
||||
- test
|
||||
- cov
|
||||
|
||||
matrix:
|
||||
node_js:
|
||||
- '10'
|
||||
|
||||
os:
|
||||
- linux
|
||||
- osx
|
||||
- windows
|
||||
|
||||
script: npx nyc -s npm run test:node -- --bail
|
||||
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov
|
||||
|
||||
jobs:
|
||||
include:
|
||||
- node_js: 6
|
||||
env: CXX=g++-4.8
|
||||
- node_js: 8
|
||||
env: CXX=g++-4.8
|
||||
# - node_js: stable
|
||||
# env: CXX=g++-4.8
|
||||
|
||||
script:
|
||||
- stage: check
|
||||
script:
|
||||
- npx aegir commitlint --travis
|
||||
- npx aegir dep-check
|
||||
- npm run lint
|
||||
- npm run test
|
||||
- npm run coverage
|
||||
|
||||
before_script:
|
||||
- export DISPLAY=:99.0
|
||||
- sh -e /etc/init.d/xvfb start
|
||||
- stage: test
|
||||
name: chrome
|
||||
addons:
|
||||
chrome: stable
|
||||
script: npx aegir test -t browser
|
||||
|
||||
after_success:
|
||||
- npm run coverage-publish
|
||||
- stage: test
|
||||
name: firefox
|
||||
addons:
|
||||
firefox: latest
|
||||
script: npx aegir test -t browser -- --browsers FirefoxHeadless
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
||||
addons:
|
||||
firefox: 'latest'
|
||||
apt:
|
||||
sources:
|
||||
- ubuntu-toolchain-r-test
|
||||
packages:
|
||||
- g++-4.8
|
||||
|
86
CHANGELOG.md
86
CHANGELOG.md
@ -1,3 +1,89 @@
|
||||
<a name="0.3.7"></a>
|
||||
## [0.3.7](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.6...v0.3.7) (2019-05-29)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* shim setImmediate in browser ([#51](https://github.com/libp2p/js-libp2p-circuit/issues/51)) ([9ea72f3](https://github.com/libp2p/js-libp2p-circuit/commit/9ea72f3))
|
||||
|
||||
|
||||
|
||||
<a name="0.3.6"></a>
|
||||
## [0.3.6](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.5...v0.3.6) (2019-03-12)
|
||||
|
||||
|
||||
|
||||
<a name="0.3.5"></a>
|
||||
## [0.3.5](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.4...v0.3.5) (2019-02-15)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* src/dst addrs validation ([68cccbe](https://github.com/libp2p/js-libp2p-circuit/commit/68cccbe))
|
||||
|
||||
|
||||
|
||||
<a name="0.3.4"></a>
|
||||
## [0.3.4](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.3...v0.3.4) (2019-01-10)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* reduce bundle size ([#40](https://github.com/libp2p/js-libp2p-circuit/issues/40)) ([2c8ef1d](https://github.com/libp2p/js-libp2p-circuit/commit/2c8ef1d))
|
||||
|
||||
|
||||
|
||||
<a name="0.3.3"></a>
|
||||
## [0.3.3](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.2...v0.3.3) (2019-01-04)
|
||||
|
||||
|
||||
|
||||
<a name="0.3.2"></a>
|
||||
## [0.3.2](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.1...v0.3.2) (2019-01-02)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* connection establishment event handling ([#41](https://github.com/libp2p/js-libp2p-circuit/issues/41)) ([c27c344](https://github.com/libp2p/js-libp2p-circuit/commit/c27c344))
|
||||
|
||||
|
||||
|
||||
<a name="0.3.1"></a>
|
||||
## [0.3.1](https://github.com/libp2p/js-libp2p-circuit/compare/v0.3.0...v0.3.1) (2018-11-15)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* catch bad peerid and update deps ([#39](https://github.com/libp2p/js-libp2p-circuit/issues/39)) ([f17539a](https://github.com/libp2p/js-libp2p-circuit/commit/f17539a))
|
||||
|
||||
|
||||
|
||||
<a name="0.3.0"></a>
|
||||
# [0.3.0](https://github.com/libp2p/js-libp2p-circuit/compare/v0.2.1...v0.3.0) (2018-10-01)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* listener should emit connection ([#30](https://github.com/libp2p/js-libp2p-circuit/issues/30)) ([1ac430d](https://github.com/libp2p/js-libp2p-circuit/commit/1ac430d))
|
||||
|
||||
|
||||
|
||||
<a name="0.2.1"></a>
|
||||
## [0.2.1](https://github.com/libp2p/js-libp2p-circuit/compare/v0.2.0...v0.2.1) (2018-08-13)
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* close streams ([546af97](https://github.com/libp2p/js-libp2p-circuit/commit/546af97))
|
||||
* options ([22e8518](https://github.com/libp2p/js-libp2p-circuit/commit/22e8518))
|
||||
|
||||
|
||||
|
||||
<a name="0.2.0"></a>
|
||||
# [0.2.0](https://github.com/libp2p/js-libp2p-circuit/compare/v0.1.5...v0.2.0) (2018-04-05)
|
||||
|
||||
|
||||
|
||||
<a name="0.1.5"></a>
|
||||
## [0.1.5](https://github.com/libp2p/js-libp2p-circuit/compare/v0.1.4...v0.1.5) (2018-03-14)
|
||||
|
||||
|
21
README.md
21
README.md
@ -1,13 +1,14 @@
|
||||
# js-libp2p-circuit
|
||||
|
||||
[](http://ipn.io)
|
||||
[](http://webchat.freenode.net/?channels=%23ipfs)
|
||||
[](https://travis-ci.org/libp2p/js-libp2p-circuit)
|
||||
[](https://coveralls.io/github/libp2p/js-libp2p-circuit?branch=master)
|
||||
|
||||
[](http://protocol.ai)
|
||||
[](http://libp2p.io/)
|
||||
[](http://webchat.freenode.net/?channels=%23libp2p)
|
||||
[](https://discuss.libp2p.io)
|
||||
[](https://travis-ci.com/libp2p/js-libp2p-circuit)
|
||||
[](https://codecov.io/gh/libp2p/js-libp2p-circuit)
|
||||
[](https://david-dm.org/libp2p/js-libp2p-circuit)
|
||||
[](https://github.com/feross/standard)
|
||||

|
||||

|
||||
|
||||

|
||||

|
||||
@ -32,6 +33,10 @@ The use of circuit-relaying is not limited to routing traffic between browser no
|
||||
|
||||
Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes could only access content from nodes that speak the same protocol, for example TCP only nodes could only dial to other TCP only nodes, same for any other protocol combination. In practice, this limitation was most visible in JS-IPFS browser nodes, since they can only dial out but not be dialed in over WebRTC or WebSockets, hence any content that the browser node held was not reachable by the rest of the network even through it was announced on the DHT. Non browser IPFS nodes would usually speak more than one protocol such as TCP, WebSockets and/or WebRTC, this made the problem less severe outside of the browser. `libp2p-circuit` solves this problem completely, as long as there are `relay nodes` capable of routing traffic between those nodes their content should be available to the rest of the IPFS network.
|
||||
|
||||
## Lead Maintainer
|
||||
|
||||
[Jacob Heun](https://github.com/jacobheun)
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [Install](#install)
|
||||
@ -99,7 +104,7 @@ hello
|
||||
#### Create `relay`
|
||||
|
||||
```js
|
||||
const Relay = require('libp2p-circuit').Realy
|
||||
const Relay = require('libp2p-circuit').Relay
|
||||
|
||||
const relay = new Relay(options)
|
||||
|
||||
@ -166,7 +171,7 @@ libp2p
|
||||
|| | | | | || | | | | | |
|
||||
||libp2p-tcp |libp2p-ws | .... |libp2p-circuit || listener handles STOP messages from| | | listener | | |
|
||||
|| | +--------------------------------------------------------------------------> | | |
|
||||
|| | | |pluggs in just || circuit-relay nodes | | +-------------+ | |
|
||||
|| | | |plugs in just || circuit-relay nodes | | +-------------+ | |
|
||||
|| | | |as any other || | | | |
|
||||
|| | | |transport || | +------------------+ |
|
||||
|+-----------------------------------------------+| | |
|
||||
|
29
appveyor.yml
29
appveyor.yml
@ -1,29 +0,0 @@
|
||||
# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
|
||||
version: "{build}"
|
||||
|
||||
environment:
|
||||
matrix:
|
||||
- nodejs_version: "6"
|
||||
- nodejs_version: "8"
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
|
||||
install:
|
||||
# Install Node.js
|
||||
- ps: Install-Product node $env:nodejs_version
|
||||
|
||||
# Upgrade npm
|
||||
- npm install -g npm
|
||||
|
||||
# Output our current versions for debugging
|
||||
- node --version
|
||||
- npm --version
|
||||
|
||||
# Install our package dependencies
|
||||
- npm install
|
||||
|
||||
test_script:
|
||||
- npm run test:node
|
||||
|
||||
build: off
|
2
ci/Jenkinsfile
vendored
2
ci/Jenkinsfile
vendored
@ -1,2 +0,0 @@
|
||||
// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
|
||||
javascript()
|
15
circle.yml
15
circle.yml
@ -1,15 +0,0 @@
|
||||
# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
|
||||
machine:
|
||||
node:
|
||||
version: stable
|
||||
|
||||
dependencies:
|
||||
pre:
|
||||
- google-chrome --version
|
||||
- curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
|
||||
- sudo dpkg -i google-chrome.deb || true
|
||||
- sudo apt-get update
|
||||
- sudo apt-get install -f
|
||||
- sudo apt-get install --only-upgrade lsb-base
|
||||
- sudo dpkg -i google-chrome.deb
|
||||
- google-chrome --version
|
64
package.json
64
package.json
@ -1,8 +1,13 @@
|
||||
{
|
||||
"name": "libp2p-circuit",
|
||||
"version": "0.1.5",
|
||||
"version": "0.3.7",
|
||||
"description": "JavaScript implementation of circuit/switch relaying",
|
||||
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
|
||||
"main": "src/index.js",
|
||||
"files": [
|
||||
"src",
|
||||
"dist"
|
||||
],
|
||||
"scripts": {
|
||||
"lint": "aegir lint",
|
||||
"build": "aegir build",
|
||||
@ -15,9 +20,8 @@
|
||||
"coverage": "aegir coverage",
|
||||
"coverage-publish": "aegir coverage --provider coveralls"
|
||||
},
|
||||
"pre-commit": [
|
||||
"lint",
|
||||
"test"
|
||||
"pre-push": [
|
||||
"lint"
|
||||
],
|
||||
"repository": {
|
||||
"type": "git",
|
||||
@ -26,42 +30,48 @@
|
||||
"keywords": [
|
||||
"IPFS"
|
||||
],
|
||||
"author": "Dmitriy Ryajov <dryajov@gmail.com>",
|
||||
"license": "MIT",
|
||||
"bugs": {
|
||||
"url": "https://github.com/libp2p/js-libp2p-circuit/issues"
|
||||
},
|
||||
"homepage": "https://github.com/libp2p/js-libp2p-circuit#readme",
|
||||
"devDependencies": {
|
||||
"aegir": "^13.0.6",
|
||||
"chai": "^4.1.2",
|
||||
"dirty-chai": "^2.0.1",
|
||||
"pre-commit": "^1.2.2",
|
||||
"sinon": "^4.4.5"
|
||||
},
|
||||
"contributors": [
|
||||
"David Dias <daviddias.p@gmail.com>",
|
||||
"Dmitriy Ryajov <dryajov@gmail.com>",
|
||||
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
|
||||
"Hugo Dias <mail@hugodias.me>",
|
||||
"Jacob Heun <jacobheun@gmail.com>",
|
||||
"Jacob Heun <jake@andyet.net>",
|
||||
"Maciej Krüger <mkg20001@gmail.com>",
|
||||
"Oli Evans <oli@tableflip.io>",
|
||||
"Pedro Teixeira <i@pgte.me>",
|
||||
"Victor Bjelkholm <victorbjelkholm@gmail.com>"
|
||||
"Vasco Santos <vasco.santos@ua.pt>",
|
||||
"Victor Bjelkholm <victorbjelkholm@gmail.com>",
|
||||
"Yusef Napora <yusef@napora.org>",
|
||||
"dirkmc <dirk@mccormick.cx>"
|
||||
],
|
||||
"devDependencies": {
|
||||
"aegir": "^18.2.2",
|
||||
"chai": "^4.2.0",
|
||||
"dirty-chai": "^2.0.1",
|
||||
"libp2p": "~0.25.0",
|
||||
"libp2p-secio": "~0.11.1",
|
||||
"pull-protocol-buffers": "~0.1.2",
|
||||
"sinon": "^7.3.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"assert": "^1.4.1",
|
||||
"async": "^2.6.0",
|
||||
"debug": "^3.1.0",
|
||||
"interface-connection": "^0.3.2",
|
||||
"lodash": "^4.17.5",
|
||||
"mafmt": "^4.0.0",
|
||||
"multiaddr": "^3.0.2",
|
||||
"multistream-select": "^0.14.1",
|
||||
"peer-id": "^0.10.6",
|
||||
"peer-info": "^0.11.6",
|
||||
"async": "^2.6.2",
|
||||
"debug": "^4.1.1",
|
||||
"interface-connection": "~0.3.3",
|
||||
"mafmt": "^6.0.7",
|
||||
"multiaddr": "^6.0.6",
|
||||
"once": "^1.4.0",
|
||||
"peer-id": "~0.12.2",
|
||||
"peer-info": "~0.15.1",
|
||||
"protons": "^1.0.1",
|
||||
"pull-abortable": "^4.1.1",
|
||||
"pull-handshake": "^1.1.4",
|
||||
"pull-stream": "^3.6.2",
|
||||
"safe-buffer": "^5.1.1",
|
||||
"setimmediate": "^1.0.5"
|
||||
"pull-length-prefixed": "^1.3.2",
|
||||
"pull-pair": "^1.1.0",
|
||||
"pull-stream": "^3.6.9"
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,9 @@ class Circuit {
|
||||
|
||||
this.dialer = new CircuitDialer(swarm, options)
|
||||
|
||||
this.swarm.on('peer-mux-established', this.dialer.canHop.bind(this.dialer))
|
||||
this.swarm.on('peer-mux-established', (peerInfo) => {
|
||||
this.dialer.canHop(peerInfo)
|
||||
})
|
||||
this.swarm.on('peer-mux-closed', (peerInfo) => {
|
||||
this.dialer.relayPeers.delete(peerInfo.id.toB58String())
|
||||
})
|
||||
@ -64,7 +66,8 @@ class Circuit {
|
||||
.filter(segment => segment.length)
|
||||
|
||||
relaySegments.forEach((relaySegment) => {
|
||||
this.dialer._dialRelay(this.utils.peerInfoFromMa(multiaddr(relaySegment)))
|
||||
const ma = this.utils.peerInfoFromMa(multiaddr(relaySegment))
|
||||
this.dialer._dialRelay(ma)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -1,13 +1,15 @@
|
||||
'use strict'
|
||||
|
||||
const Connection = require('interface-connection').Connection
|
||||
const isFunction = require('lodash.isfunction')
|
||||
const multiaddr = require('multiaddr')
|
||||
const once = require('once')
|
||||
const PeerId = require('peer-id')
|
||||
const waterfall = require('async/waterfall')
|
||||
const setImmediate = require('async/setImmediate')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const Connection = require('interface-connection').Connection
|
||||
|
||||
const utilsFactory = require('./utils')
|
||||
const StreamHandler = require('./stream-handler')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit:dialer')
|
||||
@ -27,10 +29,26 @@ class Dialer {
|
||||
constructor (swarm, options) {
|
||||
this.swarm = swarm
|
||||
this.relayPeers = new Map()
|
||||
this.relayConns = new Map()
|
||||
this.options = options
|
||||
this.utils = utilsFactory(swarm)
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper that returns a relay connection
|
||||
*
|
||||
* @param {*} relay
|
||||
* @param {*} callback
|
||||
* @returns {Function} - callback
|
||||
*/
|
||||
_dialRelayHelper (relay, callback) {
|
||||
if (this.relayConns.has(relay.id.toB58String())) {
|
||||
return callback(null, this.relayConns.get(relay.id.toB58String()))
|
||||
}
|
||||
|
||||
return this._dialRelay(relay, callback)
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*
|
||||
@ -38,10 +56,9 @@ class Dialer {
|
||||
* @param {Function} cb - a callback called once dialed
|
||||
* @returns {Connection} - the connection
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
dial (ma, cb) {
|
||||
cb = cb || (() => {})
|
||||
cb = cb || (() => { })
|
||||
const strMa = ma.toString()
|
||||
if (!strMa.includes('/p2p-circuit')) {
|
||||
log.err('invalid circuit address')
|
||||
@ -53,7 +70,11 @@ class Dialer {
|
||||
const peer = multiaddr(addr[1] || addr[0])
|
||||
|
||||
const dstConn = new Connection()
|
||||
setImmediate(this._dialPeer.bind(this), peer, relay, (err, conn) => {
|
||||
setImmediate(
|
||||
this._dialPeer.bind(this),
|
||||
peer,
|
||||
relay,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
@ -70,39 +91,41 @@ class Dialer {
|
||||
* Does the peer support the HOP protocol
|
||||
*
|
||||
* @param {PeerInfo} peer
|
||||
* @param {Function} cb
|
||||
* @returns {*}
|
||||
* @param {Function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
canHop (peer, cb) {
|
||||
cb = once(cb || (() => {}))
|
||||
canHop (peer, callback) {
|
||||
callback = once(callback || (() => { }))
|
||||
|
||||
if (!this.relayPeers.get(this.utils.getB58String(peer))) {
|
||||
let streamHandler
|
||||
this._dialRelayHelper(peer, (err, conn) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const sh = new StreamHandler(conn)
|
||||
waterfall([
|
||||
(wCb) => this._dialRelay(peer, wCb),
|
||||
(sh, wCb) => {
|
||||
streamHandler = sh
|
||||
wCb()
|
||||
},
|
||||
(wCb) => streamHandler.write(proto.CircuitRelay.encode({
|
||||
(cb) => sh.write(proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.CAN_HOP
|
||||
}), wCb),
|
||||
(wCb) => streamHandler.read(wCb),
|
||||
(msg, wCb) => {
|
||||
}), cb),
|
||||
(cb) => sh.read(cb)
|
||||
], (err, msg) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
|
||||
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
|
||||
return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
|
||||
const err = new Error(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
|
||||
log(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`)
|
||||
log('HOP supported adding as relay - %s', this.utils.getB58String(peer))
|
||||
this.relayPeers.set(this.utils.getB58String(peer), peer)
|
||||
wCb(null)
|
||||
}
|
||||
], cb)
|
||||
}
|
||||
|
||||
return cb(null)
|
||||
sh.close()
|
||||
callback()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -115,7 +138,7 @@ class Dialer {
|
||||
* @private
|
||||
*/
|
||||
_dialPeer (dstMa, relay, cb) {
|
||||
if (isFunction(relay)) {
|
||||
if (typeof relay === 'function') {
|
||||
cb = relay
|
||||
relay = null
|
||||
}
|
||||
@ -135,7 +158,10 @@ class Dialer {
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
return this._negotiateRelay(nextRelay, dstMa, (err, conn) => {
|
||||
return this._negotiateRelay(
|
||||
nextRelay,
|
||||
dstMa,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return next(relays.shift())
|
||||
@ -145,9 +171,12 @@ class Dialer {
|
||||
}
|
||||
next(relays.shift())
|
||||
} else {
|
||||
return this._negotiateRelay(relay, dstMa, (err, conn) => {
|
||||
return this._negotiateRelay(
|
||||
relay,
|
||||
dstMa,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err(`An error has occurred negotiating the relay connection`, err)
|
||||
log.err('An error has occurred negotiating the relay connection', err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
@ -169,23 +198,24 @@ class Dialer {
|
||||
*/
|
||||
_negotiateRelay (relay, dstMa, callback) {
|
||||
dstMa = multiaddr(dstMa)
|
||||
|
||||
relay = this.utils.peerInfoFromMa(relay)
|
||||
const srcMas = this.swarm._peerInfo.multiaddrs.toArray()
|
||||
let streamHandler
|
||||
this._dialRelayHelper(relay, (err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
let sh = new StreamHandler(conn)
|
||||
waterfall([
|
||||
(cb) => {
|
||||
if (relay instanceof Connection) {
|
||||
return cb(null, new StreamHandler(relay))
|
||||
log('negotiating relay for peer %s', dstMa.getPeerId())
|
||||
let dstPeerId
|
||||
try {
|
||||
dstPeerId = PeerId.createFromB58String(dstMa.getPeerId()).id
|
||||
} catch (err) {
|
||||
return cb(err)
|
||||
}
|
||||
return this._dialRelay(this.utils.peerInfoFromMa(relay), cb)
|
||||
},
|
||||
(sh, cb) => {
|
||||
streamHandler = sh
|
||||
cb(null)
|
||||
},
|
||||
(cb) => {
|
||||
log(`negotiating relay for peer ${dstMa.getPeerId()}`)
|
||||
streamHandler.write(
|
||||
sh.write(
|
||||
proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -193,26 +223,29 @@ class Dialer {
|
||||
addrs: srcMas.map((addr) => addr.buffer)
|
||||
},
|
||||
dstPeer: {
|
||||
id: PeerId.createFromB58String(dstMa.getPeerId()).id,
|
||||
id: dstPeerId,
|
||||
addrs: [dstMa.buffer]
|
||||
}
|
||||
}), cb)
|
||||
},
|
||||
(cb) => streamHandler.read(cb),
|
||||
(msg, cb) => {
|
||||
(cb) => sh.read(cb)
|
||||
], (err, msg) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const message = proto.CircuitRelay.decode(msg)
|
||||
if (message.type !== proto.CircuitRelay.Type.STATUS) {
|
||||
return cb(new Error(`Got invalid message type - ` +
|
||||
return callback(new Error(`Got invalid message type - ` +
|
||||
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
|
||||
}
|
||||
|
||||
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
|
||||
return cb(new Error(`Got ${message.code} error code trying to dial over relay`))
|
||||
return callback(new Error(`Got ${message.code} error code trying to dial over relay`))
|
||||
}
|
||||
|
||||
cb(null, new Connection(streamHandler.rest()))
|
||||
}
|
||||
], callback)
|
||||
callback(null, new Connection(sh.rest()))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -220,18 +253,21 @@ class Dialer {
|
||||
*
|
||||
* @param {PeerInfo} peer - the PeerInfo of the relay peer
|
||||
* @param {Function} cb - a callback with the connection to the relay peer
|
||||
* @returns {Function|void}
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_dialRelay (peer, cb) {
|
||||
cb = once(cb || (() => {}))
|
||||
cb = once(cb || (() => { }))
|
||||
|
||||
this.swarm.dial(peer, multicodec.relay, once((err, conn) => {
|
||||
this.swarm.dial(
|
||||
peer,
|
||||
multicodec.relay,
|
||||
once((err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
cb(null, new StreamHandler(conn))
|
||||
cb(null, conn)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
require('setimmediate')
|
||||
require('safe-buffer')
|
||||
|
||||
const pull = require('pull-stream')
|
||||
const pull = require('pull-stream/pull')
|
||||
const debug = require('debug')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
@ -11,14 +8,16 @@ const EE = require('events').EventEmitter
|
||||
const once = require('once')
|
||||
const utilsFactory = require('./utils')
|
||||
const StreamHandler = require('./stream-handler')
|
||||
const assignInWith = require('lodash/assignInWith')
|
||||
const proto = require('../protocol')
|
||||
const proto = require('../protocol').CircuitRelay
|
||||
const multiaddr = require('multiaddr')
|
||||
const series = require('async/series')
|
||||
const waterfall = require('async/waterfall')
|
||||
const setImmediate = require('async/setImmediate')
|
||||
|
||||
const multicodec = require('./../multicodec')
|
||||
|
||||
const log = debug('libp2p:swarm:circuit:relay')
|
||||
log.err = debug('libp2p:swarm:circuit:error:relay')
|
||||
const log = debug('libp2p:circuit:relay')
|
||||
log.err = debug('libp2p:circuit:error:relay')
|
||||
|
||||
class Hop extends EE {
|
||||
/**
|
||||
@ -36,14 +35,7 @@ class Hop extends EE {
|
||||
this.swarm = swarm
|
||||
this.peerInfo = this.swarm._peerInfo
|
||||
this.utils = utilsFactory(swarm)
|
||||
this.config = assignInWith(
|
||||
{
|
||||
active: false,
|
||||
enabled: false
|
||||
},
|
||||
options,
|
||||
(orig, src) => typeof src === 'undefined' ? false : src)
|
||||
|
||||
this.config = options || { active: false, enabled: false }
|
||||
this.active = this.config.active
|
||||
}
|
||||
|
||||
@ -51,127 +43,218 @@ class Hop extends EE {
|
||||
* Handle the relay message
|
||||
*
|
||||
* @param {CircuitRelay} message
|
||||
* @param {StreamHandler} streamHandler
|
||||
* @param {StreamHandler} sh
|
||||
* @returns {*}
|
||||
*/
|
||||
handle (message, streamHandler) {
|
||||
handle (message, sh) {
|
||||
if (!this.config.enabled) {
|
||||
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY)
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_CANT_SPEAK_RELAY)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
// check if message is `CAN_HOP`
|
||||
if (message.type === proto.CircuitRelay.Type.CAN_HOP) {
|
||||
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.SUCCESS)
|
||||
if (message.type === proto.Type.CAN_HOP) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.SUCCESS)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
// This is a relay request - validate and create a circuit
|
||||
const srcPeerId = PeerId.createFromBytes(message.dstPeer.id)
|
||||
if (srcPeerId.toB58String() === this.peerInfo.id.toB58String()) {
|
||||
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF)
|
||||
let srcPeerId = null
|
||||
let dstPeerId = null
|
||||
try {
|
||||
srcPeerId = PeerId.createFromBytes(message.srcPeer.id).toB58String()
|
||||
dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String()
|
||||
} catch (err) {
|
||||
log.err(err)
|
||||
|
||||
if (!srcPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_SRC_MULTIADDR_INVALID)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
if (!dstPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_DST_MULTIADDR_INVALID)
|
||||
return sh.close()
|
||||
}
|
||||
}
|
||||
|
||||
if (srcPeerId === dstPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_CANT_RELAY_TO_SELF)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
const dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String()
|
||||
if (!message.dstPeer.addrs.length) {
|
||||
// TODO: use encapsulate here
|
||||
const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer
|
||||
message.dstPeer.addrs.push(addr)
|
||||
}
|
||||
|
||||
this.utils.validateAddrs(message, streamHandler, proto.CircuitRelay.Type.HOP, (err) => {
|
||||
if (err) {
|
||||
return log(err)
|
||||
log('trying to establish a circuit: %s <-> %s', srcPeerId, dstPeerId)
|
||||
const noPeer = () => {
|
||||
// log.err(err)
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_NO_CONN_TO_DST)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
const isConnected = (cb) => {
|
||||
let dstPeer
|
||||
try {
|
||||
dstPeer = this.swarm._peerBook.get(dstPeerId)
|
||||
if (!dstPeer.isConnected() && !this.active) {
|
||||
throw new Error('No Connection to peer')
|
||||
const err = new Error(`No Connection to peer ${dstPeerId}`)
|
||||
noPeer(err)
|
||||
return cb(err)
|
||||
}
|
||||
} catch (err) {
|
||||
if (!this.active) {
|
||||
log.err(err)
|
||||
setImmediate(() => this.emit('circuit:error', err))
|
||||
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
|
||||
noPeer(err)
|
||||
return cb(err)
|
||||
}
|
||||
}
|
||||
cb()
|
||||
}
|
||||
|
||||
return this._circuit(streamHandler.rest(), message, (err) => {
|
||||
series([
|
||||
(cb) => this.utils.validateAddrs(message, sh, proto.Type.HOP, cb),
|
||||
(cb) => isConnected(cb),
|
||||
(cb) => this._circuit(sh, message, cb)
|
||||
], (err) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
setImmediate(() => this.emit('circuit:error', err))
|
||||
sh.close()
|
||||
return setImmediate(() => this.emit('circuit:error', err))
|
||||
}
|
||||
setImmediate(() => this.emit('circuit:success'))
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to STOP
|
||||
*
|
||||
* @param {PeerInfo} peer
|
||||
* @param {StreamHandler} srcSh
|
||||
* @param {function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
_connectToStop (peer, srcSh, callback) {
|
||||
this._dialPeer(peer, (err, dstConn) => {
|
||||
if (err) {
|
||||
this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.HOP_CANT_DIAL_DST)
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
return this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.SUCCESS,
|
||||
(err) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
return callback(null, dstConn)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Negotiate STOP
|
||||
*
|
||||
* @param {StreamHandler} dstSh
|
||||
* @param {StreamHandler} srcSh
|
||||
* @param {CircuitRelay} message
|
||||
* @param {function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
_negotiateStop (dstSh, srcSh, message, callback) {
|
||||
const stopMsg = Object.assign({}, message, {
|
||||
type: proto.Type.STOP // change the message type
|
||||
})
|
||||
dstSh.write(proto.encode(stopMsg),
|
||||
(err) => {
|
||||
if (err) {
|
||||
this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.HOP_CANT_OPEN_DST_STREAM)
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
// read response from STOP
|
||||
dstSh.read((err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const message = proto.decode(msg)
|
||||
if (message.code !== proto.Status.SUCCESS) {
|
||||
return callback(new Error(`Unable to create circuit!`))
|
||||
}
|
||||
|
||||
return callback(null, msg)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to make a circuit from A <-> R <-> B where R is this relay
|
||||
*
|
||||
* @param {Connection} conn - the source connection
|
||||
* @param {StreamHandler} srcSh - the source stream handler
|
||||
* @param {CircuitRelay} message - the message with the src and dst entries
|
||||
* @param {Function} cb - callback to signal success or failure
|
||||
* @param {Function} callback - callback to signal success or failure
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_circuit (conn, message, cb) {
|
||||
this._dialPeer(message.dstPeer, (err, dstConn) => {
|
||||
const srcStreamHandler = new StreamHandler(conn)
|
||||
_circuit (srcSh, message, callback) {
|
||||
let dstSh = null
|
||||
waterfall([
|
||||
(cb) => this._connectToStop(message.dstPeer, srcSh, cb),
|
||||
(_dstConn, cb) => {
|
||||
dstSh = new StreamHandler(_dstConn)
|
||||
this._negotiateStop(dstSh, srcSh, message, cb)
|
||||
}
|
||||
], (err) => {
|
||||
if (err) {
|
||||
this.utils.writeResponse(srcStreamHandler, proto.CircuitRelay.Status.HOP_CANT_DIAL_DST)
|
||||
pull(pull.empty(), srcStreamHandler.rest())
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
// close/end the source stream if there was an error
|
||||
if (srcSh) {
|
||||
srcSh.close()
|
||||
}
|
||||
|
||||
return this.utils.writeResponse(srcStreamHandler, proto.CircuitRelay.Status.SUCCESS, (err) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
if (dstSh) {
|
||||
dstSh.close()
|
||||
}
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const streamHandler = new StreamHandler(dstConn)
|
||||
const stopMsg = Object.assign({}, message, {
|
||||
type: proto.CircuitRelay.Type.STOP // change the message type
|
||||
})
|
||||
streamHandler.write(proto.CircuitRelay.encode(stopMsg), (err) => {
|
||||
if (err) {
|
||||
const errStreamHandler = new StreamHandler(conn)
|
||||
this.utils.writeResponse(errStreamHandler, proto.CircuitRelay.Status.HOP_CANT_OPEN_DST_STREAM)
|
||||
pull(pull.empty(), errStreamHandler.rest())
|
||||
const src = srcSh.rest()
|
||||
const dst = dstSh.rest()
|
||||
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
const srcIdStr = PeerId.createFromBytes(message.srcPeer.id).toB58String()
|
||||
const dstIdStr = PeerId.createFromBytes(message.dstPeer.id).toB58String()
|
||||
|
||||
streamHandler.read((err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
const message = proto.CircuitRelay.decode(msg)
|
||||
const srcConn = srcStreamHandler.rest()
|
||||
if (message.code === proto.CircuitRelay.Status.SUCCESS) {
|
||||
// circuit the src and dst streams
|
||||
pull(
|
||||
srcConn,
|
||||
streamHandler.rest(),
|
||||
srcConn
|
||||
src,
|
||||
dst,
|
||||
src
|
||||
)
|
||||
|
||||
cb()
|
||||
} else {
|
||||
// close/end the source stream if there was an error
|
||||
pull(
|
||||
pull.empty(),
|
||||
srcConn
|
||||
)
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
log('circuit %s <-> %s established', srcIdStr, dstIdStr)
|
||||
callback()
|
||||
})
|
||||
}
|
||||
|
||||
@ -180,7 +263,7 @@ class Hop extends EE {
|
||||
*
|
||||
* @param {Multiaddr} dstPeer
|
||||
* @param {Function} callback
|
||||
* @returns {Function|void}
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_dialPeer (dstPeer, callback) {
|
||||
|
@ -6,8 +6,7 @@ const EE = require('events').EventEmitter
|
||||
const Connection = require('interface-connection').Connection
|
||||
const utilsFactory = require('./utils')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const proto = require('../protocol')
|
||||
const proto = require('../protocol').CircuitRelay
|
||||
const series = require('async/series')
|
||||
|
||||
const debug = require('debug')
|
||||
@ -22,21 +21,31 @@ class Stop extends EE {
|
||||
this.utils = utilsFactory(swarm)
|
||||
}
|
||||
|
||||
handle (message, streamHandler, callback) {
|
||||
/**
|
||||
* Handle the incoming STOP message
|
||||
*
|
||||
* @param {{}} msg - the parsed protobuf message
|
||||
* @param {StreamHandler} sh - the stream handler wrapped connection
|
||||
* @param {Function} callback - callback
|
||||
* @returns {undefined}
|
||||
*/
|
||||
handle (msg, sh, callback) {
|
||||
callback = callback || (() => {})
|
||||
|
||||
series([
|
||||
(cb) => this.utils.validateAddrs(message, streamHandler, proto.CircuitRelay.Type.STOP, cb),
|
||||
(cb) => this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.Success, cb)
|
||||
(cb) => this.utils.validateAddrs(msg, sh, proto.Type.STOP, cb),
|
||||
(cb) => this.utils.writeResponse(sh, proto.Status.Success, cb)
|
||||
], (err) => {
|
||||
if (err) {
|
||||
callback() // we don't return the error here, since multistream select don't expect one
|
||||
// we don't return the error here,
|
||||
// since multistream select don't expect one
|
||||
callback()
|
||||
return log(err)
|
||||
}
|
||||
|
||||
const peerInfo = new PeerInfo(peerIdFromId(message.srcPeer.id))
|
||||
message.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr))
|
||||
const newConn = new Connection(streamHandler.rest())
|
||||
const peerInfo = new PeerInfo(this.utils.peerIdFromId(msg.srcPeer.id))
|
||||
msg.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr))
|
||||
const newConn = new Connection(sh.rest())
|
||||
newConn.setPeerInfo(peerInfo)
|
||||
setImmediate(() => this.emit('connection', newConn))
|
||||
callback(newConn)
|
||||
@ -45,11 +54,3 @@ class Stop extends EE {
|
||||
}
|
||||
|
||||
module.exports = Stop
|
||||
|
||||
function peerIdFromId (id) {
|
||||
if (typeof id === 'string') {
|
||||
return PeerId.createFromB58String(id)
|
||||
}
|
||||
|
||||
return PeerId.createFromBytes(id)
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
'use strict'
|
||||
|
||||
const pull = require('pull-stream')
|
||||
const values = require('pull-stream/sources/values')
|
||||
const collect = require('pull-stream/sinks/collect')
|
||||
const empty = require('pull-stream/sources/empty')
|
||||
const pull = require('pull-stream/pull')
|
||||
const lp = require('pull-length-prefixed')
|
||||
const handshake = require('pull-handshake')
|
||||
|
||||
@ -28,7 +31,7 @@ class StreamHandler {
|
||||
this.timeout = timeout || 1000 * 60
|
||||
}
|
||||
|
||||
this.stream = handshake({timeout: this.timeout}, cb)
|
||||
this.stream = handshake({ timeout: this.timeout }, cb)
|
||||
this.shake = this.stream.handshake
|
||||
|
||||
pull(this.stream, conn, this.stream)
|
||||
@ -46,10 +49,13 @@ class StreamHandler {
|
||||
*/
|
||||
read (cb) {
|
||||
if (!this.isValid()) {
|
||||
cb(new Error(`handler is not in a valid state`))
|
||||
return cb(new Error(`handler is not in a valid state`))
|
||||
}
|
||||
|
||||
lp.decodeFromReader(this.shake, {maxLength: this.maxLength}, (err, msg) => {
|
||||
lp.decodeFromReader(
|
||||
this.shake,
|
||||
{ maxLength: this.maxLength },
|
||||
(err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
// this.shake.abort(err)
|
||||
@ -71,13 +77,13 @@ class StreamHandler {
|
||||
cb = cb || (() => {})
|
||||
|
||||
if (!this.isValid()) {
|
||||
cb(new Error(`handler is not in a valid state`))
|
||||
return cb(new Error(`handler is not in a valid state`))
|
||||
}
|
||||
|
||||
pull(
|
||||
pull.values([msg]),
|
||||
values([msg]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
collect((err, encoded) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
this.shake.abort(err)
|
||||
@ -112,6 +118,23 @@ class StreamHandler {
|
||||
this.shake = null
|
||||
return rest
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the stream
|
||||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
close () {
|
||||
if (!this.isValid()) {
|
||||
return
|
||||
}
|
||||
|
||||
// close stream
|
||||
pull(
|
||||
empty(),
|
||||
this.rest()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = StreamHandler
|
||||
|
@ -119,11 +119,20 @@ module.exports = function (swarm) {
|
||||
return cb(null)
|
||||
}
|
||||
|
||||
function peerIdFromId (id) {
|
||||
if (typeof id === 'string') {
|
||||
return PeerId.createFromB58String(id)
|
||||
}
|
||||
|
||||
return PeerId.createFromBytes(id)
|
||||
}
|
||||
|
||||
return {
|
||||
getB58String: getB58String,
|
||||
peerInfoFromMa: peerInfoFromMa,
|
||||
isPeerConnected: isPeerConnected,
|
||||
validateAddrs: validateAddrs,
|
||||
writeResponse: writeResponse
|
||||
getB58String,
|
||||
peerInfoFromMa,
|
||||
isPeerConnected,
|
||||
validateAddrs,
|
||||
writeResponse,
|
||||
peerIdFromId
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ module.exports = (swarm, options, connHandler) => {
|
||||
const utils = utilsFactory(swarm)
|
||||
|
||||
listener.stopHandler = new Stop(swarm)
|
||||
listener.stopHandler.on('connection', (conn) => listener.emit('connection', conn))
|
||||
listener.hopHandler = new Hop(swarm, options.hop)
|
||||
|
||||
/**
|
||||
@ -35,10 +36,10 @@ module.exports = (swarm, options, connHandler) => {
|
||||
listener.listen = (ma, callback) => {
|
||||
callback = callback || (() => {})
|
||||
|
||||
swarm.handle(multicodec.relay, (relayProto, conn) => {
|
||||
const streamHandler = new StreamHandler(conn)
|
||||
swarm.handle(multicodec.relay, (_, conn) => {
|
||||
const sh = new StreamHandler(conn)
|
||||
|
||||
streamHandler.read((err, msg) => {
|
||||
sh.read((err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return
|
||||
@ -48,21 +49,26 @@ module.exports = (swarm, options, connHandler) => {
|
||||
try {
|
||||
request = proto.CircuitRelay.decode(msg)
|
||||
} catch (err) {
|
||||
return utils.writeResponse(streamHandler, proto.CircuitRelay.Status.MALFORMED_MESSAGE)
|
||||
return utils.writeResponse(
|
||||
sh,
|
||||
proto.CircuitRelay.Status.MALFORMED_MESSAGE)
|
||||
}
|
||||
|
||||
switch (request.type) {
|
||||
case proto.CircuitRelay.Type.CAN_HOP:
|
||||
case proto.CircuitRelay.Type.HOP: {
|
||||
return listener.hopHandler.handle(request, streamHandler)
|
||||
return listener.hopHandler.handle(request, sh)
|
||||
}
|
||||
|
||||
case proto.CircuitRelay.Type.STOP: {
|
||||
return listener.stopHandler.handle(request, streamHandler, connHandler)
|
||||
return listener.stopHandler.handle(request, sh, connHandler)
|
||||
}
|
||||
|
||||
default: {
|
||||
return utils.writeResponse(streamHandler, proto.CircuitRelay.Status.INVALID_MSG_TYPE)
|
||||
utils.writeResponse(
|
||||
sh,
|
||||
proto.CircuitRelay.Status.INVALID_MSG_TYPE)
|
||||
return sh.close()
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -79,7 +85,7 @@ module.exports = (swarm, options, connHandler) => {
|
||||
* @return {void}
|
||||
*/
|
||||
listener.close = (cb) => {
|
||||
swarm.unhandle(multicodec.stop)
|
||||
swarm.unhandle(multicodec.relay)
|
||||
setImmediate(() => listener.emit('close'))
|
||||
cb()
|
||||
}
|
||||
@ -128,7 +134,8 @@ module.exports = (swarm, options, connHandler) => {
|
||||
// by default we're reachable over any relay
|
||||
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(addr))
|
||||
} else {
|
||||
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(`${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`))
|
||||
const ma = `${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`
|
||||
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(ma))
|
||||
}
|
||||
} else {
|
||||
listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`))
|
||||
|
@ -1,4 +1,44 @@
|
||||
'use strict'
|
||||
|
||||
const protobuf = require('protons')
|
||||
module.exports = protobuf(require('./proto.js'))
|
||||
module.exports = protobuf(`
|
||||
message CircuitRelay {
|
||||
|
||||
enum Status {
|
||||
SUCCESS = 100;
|
||||
HOP_SRC_ADDR_TOO_LONG = 220;
|
||||
HOP_DST_ADDR_TOO_LONG = 221;
|
||||
HOP_SRC_MULTIADDR_INVALID = 250;
|
||||
HOP_DST_MULTIADDR_INVALID = 251;
|
||||
HOP_NO_CONN_TO_DST = 260;
|
||||
HOP_CANT_DIAL_DST = 261;
|
||||
HOP_CANT_OPEN_DST_STREAM = 262;
|
||||
HOP_CANT_SPEAK_RELAY = 270;
|
||||
HOP_CANT_RELAY_TO_SELF = 280;
|
||||
STOP_SRC_ADDR_TOO_LONG = 320;
|
||||
STOP_DST_ADDR_TOO_LONG = 321;
|
||||
STOP_SRC_MULTIADDR_INVALID = 350;
|
||||
STOP_DST_MULTIADDR_INVALID = 351;
|
||||
STOP_RELAY_REFUSED = 390;
|
||||
MALFORMED_MESSAGE = 400;
|
||||
}
|
||||
|
||||
enum Type { // RPC identifier, either HOP, STOP or STATUS
|
||||
HOP = 1;
|
||||
STOP = 2;
|
||||
STATUS = 3;
|
||||
CAN_HOP = 4;
|
||||
}
|
||||
|
||||
message Peer {
|
||||
required bytes id = 1; // peer id
|
||||
repeated bytes addrs = 2; // peer's known addresses
|
||||
}
|
||||
|
||||
optional Type type = 1; // Type of the message
|
||||
|
||||
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STATUS
|
||||
optional Peer dstPeer = 3;
|
||||
|
||||
optional Status code = 4; // Status code, used when Type is STATUS
|
||||
}
|
||||
`)
|
||||
|
@ -1,43 +0,0 @@
|
||||
'use strict'
|
||||
module.exports = `
|
||||
message CircuitRelay {
|
||||
|
||||
enum Status {
|
||||
SUCCESS = 100;
|
||||
HOP_SRC_ADDR_TOO_LONG = 220;
|
||||
HOP_DST_ADDR_TOO_LONG = 221;
|
||||
HOP_SRC_MULTIADDR_INVALID = 250;
|
||||
HOP_DST_MULTIADDR_INVALID = 251;
|
||||
HOP_NO_CONN_TO_DST = 260;
|
||||
HOP_CANT_DIAL_DST = 261;
|
||||
HOP_CANT_OPEN_DST_STREAM = 262;
|
||||
HOP_CANT_SPEAK_RELAY = 270;
|
||||
HOP_CANT_RELAY_TO_SELF = 280;
|
||||
STOP_SRC_ADDR_TOO_LONG = 320;
|
||||
STOP_DST_ADDR_TOO_LONG = 321;
|
||||
STOP_SRC_MULTIADDR_INVALID = 350;
|
||||
STOP_DST_MULTIADDR_INVALID = 351;
|
||||
STOP_RELAY_REFUSED = 390;
|
||||
MALFORMED_MESSAGE = 400;
|
||||
}
|
||||
|
||||
enum Type { // RPC identifier, either HOP, STOP or STATUS
|
||||
HOP = 1;
|
||||
STOP = 2;
|
||||
STATUS = 3;
|
||||
CAN_HOP = 4;
|
||||
}
|
||||
|
||||
message Peer {
|
||||
required bytes id = 1; // peer id
|
||||
repeated bytes addrs = 2; // peer's known addresses
|
||||
}
|
||||
|
||||
optional Type type = 1; // Type of the message
|
||||
|
||||
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STATUS
|
||||
optional Peer dstPeer = 3;
|
||||
|
||||
optional Status code = 4; // Status code, used when Type is STATUS
|
||||
}
|
||||
`
|
@ -7,14 +7,16 @@ const Dialer = require('../src/circuit/dialer')
|
||||
const nodes = require('./fixtures/nodes')
|
||||
const Connection = require('interface-connection').Connection
|
||||
const multiaddr = require('multiaddr')
|
||||
const handshake = require('pull-handshake')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const waterfall = require('async/waterfall')
|
||||
const pull = require('pull-stream')
|
||||
const lp = require('pull-length-prefixed')
|
||||
const pull = require('pull-stream/pull')
|
||||
const values = require('pull-stream/sources/values')
|
||||
const asyncMap = require('pull-stream/throughs/async-map')
|
||||
const pair = require('pull-pair/duplex')
|
||||
const pb = require('pull-protocol-buffers')
|
||||
|
||||
const proto = require('../src/protocol')
|
||||
const StreamHandler = require('../src/circuit/stream-handler')
|
||||
const utilsFactory = require('../src/circuit/utils')
|
||||
|
||||
const sinon = require('sinon')
|
||||
@ -52,7 +54,7 @@ describe(`dialer tests`, function () {
|
||||
})
|
||||
|
||||
dialer.dial(dstMa, (err, conn) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(conn).to.be.an.instanceOf(Connection)
|
||||
done()
|
||||
})
|
||||
@ -66,7 +68,7 @@ describe(`dialer tests`, function () {
|
||||
})
|
||||
|
||||
dialer.dial(dstMa, (err, conn) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(conn).to.be.an.instanceOf(Connection)
|
||||
done()
|
||||
})
|
||||
@ -76,66 +78,62 @@ describe(`dialer tests`, function () {
|
||||
describe(`.canHop`, function () {
|
||||
const dialer = sinon.createStubInstance(Dialer)
|
||||
|
||||
let stream = null
|
||||
let shake = null
|
||||
let fromConn = null
|
||||
let peer = new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA'))
|
||||
|
||||
let p = null
|
||||
beforeEach(function () {
|
||||
stream = handshake({ timeout: 1000 * 60 })
|
||||
shake = stream.handshake
|
||||
fromConn = new Connection(stream)
|
||||
p = pair()
|
||||
fromConn = new Connection(p[0])
|
||||
|
||||
dialer.relayPeers = new Map()
|
||||
dialer.relayConns = new Map()
|
||||
dialer.utils = utilsFactory({})
|
||||
dialer.canHop.callThrough()
|
||||
dialer._dialRelayHelper.callThrough()
|
||||
})
|
||||
|
||||
afterEach(function () {
|
||||
dialer._dialRelay.reset()
|
||||
})
|
||||
|
||||
it(`should handle successful CAN_HOP`, function () {
|
||||
it(`should handle successful CAN_HOP`, (done) => {
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode({
|
||||
values([{
|
||||
type: proto.CircuitRelay.type.HOP,
|
||||
code: proto.CircuitRelay.Status.SUCCESS
|
||||
})]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
dialer._dialRelay.callsFake((peer, cb) => {
|
||||
cb(null, new StreamHandler(fromConn))
|
||||
})
|
||||
})
|
||||
}]),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[1]
|
||||
)
|
||||
cb(null, fromConn)
|
||||
})
|
||||
|
||||
dialer.canHop(peer, (err) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(dialer.relayPeers.has(peer.id.toB58String())).to.be.ok()
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it(`should handle failed CAN_HOP`, function () {
|
||||
it(`should handle failed CAN_HOP`, function (done) {
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode({
|
||||
values([{
|
||||
type: proto.CircuitRelay.type.HOP,
|
||||
code: proto.CircuitRelay.Status.HOP_CANT_SPEAK_RELAY
|
||||
})]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
dialer._dialRelay.callsFake((peer, cb) => {
|
||||
cb(null, new StreamHandler(fromConn))
|
||||
})
|
||||
})
|
||||
}]),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[1]
|
||||
)
|
||||
cb(null, fromConn)
|
||||
})
|
||||
|
||||
dialer.canHop(peer, (err) => {
|
||||
expect(err).to.be.null()
|
||||
expect(dialer.relayPeers.has(peer.id.toB58String())).to.not.be.ok()
|
||||
expect(err).to.exist()
|
||||
expect(dialer.relayPeers.has(peer.id.toB58String())).not.to.be.ok()
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
@ -166,7 +164,7 @@ describe(`dialer tests`, function () {
|
||||
})
|
||||
|
||||
dialer._dialPeer(dstMa, (err, conn) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(conn).to.be.an.instanceOf(Connection)
|
||||
expect(conn).to.deep.equal(dialer.relayPeers.get(nodes.node3.id))
|
||||
done()
|
||||
@ -192,9 +190,9 @@ describe(`dialer tests`, function () {
|
||||
const dialer = sinon.createStubInstance(Dialer)
|
||||
const dstMa = multiaddr(`/ipfs/${nodes.node4.id}`)
|
||||
|
||||
let conn
|
||||
let stream
|
||||
let shake
|
||||
let conn = null
|
||||
let peer = null
|
||||
let p = null
|
||||
let callback = sinon.stub()
|
||||
|
||||
beforeEach(function (done) {
|
||||
@ -209,14 +207,13 @@ describe(`dialer tests`, function () {
|
||||
cb()
|
||||
},
|
||||
(cb) => {
|
||||
dialer.utils = utilsFactory({})
|
||||
dialer.relayConns = new Map()
|
||||
dialer._negotiateRelay.callThrough()
|
||||
stream = handshake({ timeout: 1000 * 60 })
|
||||
shake = stream.handshake
|
||||
conn = new Connection()
|
||||
conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`)))
|
||||
conn.setInnerConn(stream)
|
||||
dialer._negotiateRelay(conn, dstMa, callback)
|
||||
dialer._dialRelayHelper.callThrough()
|
||||
peer = new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`))
|
||||
p = pair()
|
||||
conn = new Connection(p[1])
|
||||
cb()
|
||||
}
|
||||
], done)
|
||||
@ -227,42 +224,96 @@ describe(`dialer tests`, function () {
|
||||
})
|
||||
|
||||
it(`should write the correct dst addr`, function (done) {
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
shake.write(proto.CircuitRelay.encode({
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
pull(
|
||||
p[0],
|
||||
pb.decode(proto.CircuitRelay),
|
||||
asyncMap((msg, cb) => {
|
||||
expect(msg.dstPeer.addrs[0]).to.deep.equal(dstMa.buffer)
|
||||
cb(null, {
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.SUCCESS
|
||||
}))
|
||||
expect(err).to.be.null()
|
||||
expect(proto.CircuitRelay.decode(msg).dstPeer.addrs[0]).to.deep.equal(dstMa.buffer)
|
||||
})
|
||||
}),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[0]
|
||||
)
|
||||
cb(null, conn)
|
||||
})
|
||||
|
||||
dialer._negotiateRelay(peer, dstMa, done)
|
||||
})
|
||||
|
||||
it(`should negotiate relay`, function (done) {
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
pull(
|
||||
p[0],
|
||||
pb.decode(proto.CircuitRelay),
|
||||
asyncMap((msg, cb) => {
|
||||
expect(msg.dstPeer.addrs[0]).to.deep.equal(dstMa.buffer)
|
||||
cb(null, {
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.SUCCESS
|
||||
})
|
||||
}),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[0]
|
||||
)
|
||||
cb(null, conn)
|
||||
})
|
||||
|
||||
dialer._negotiateRelay(peer, dstMa, (err, conn) => {
|
||||
expect(err).to.not.exist()
|
||||
expect(conn).to.be.instanceOf(Connection)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it(`should fail with an invalid peer id`, function (done) {
|
||||
const dstMa = multiaddr('/ip4/127.0.0.1/tcp/4001')
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
pull(
|
||||
p[0],
|
||||
pb.decode(proto.CircuitRelay),
|
||||
asyncMap((msg, cb) => {
|
||||
expect(msg.dstPeer.addrs[0]).to.deep.equal(dstMa.buffer)
|
||||
cb(null, {
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.SUCCESS
|
||||
})
|
||||
}),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[0]
|
||||
)
|
||||
cb(null, conn)
|
||||
})
|
||||
|
||||
dialer._negotiateRelay(peer, dstMa, (err, conn) => {
|
||||
expect(err).to.exist()
|
||||
expect(conn).to.not.exist()
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it(`should handle failed relay negotiation`, function (done) {
|
||||
callback.callsFake((err, msg) => {
|
||||
dialer._dialRelay.callsFake((_, cb) => {
|
||||
cb(null, conn)
|
||||
pull(
|
||||
values([{
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.MALFORMED_MESSAGE
|
||||
}]),
|
||||
pb.encode(proto.CircuitRelay),
|
||||
p[0]
|
||||
)
|
||||
})
|
||||
|
||||
dialer._negotiateRelay(peer, dstMa, (err, conn) => {
|
||||
expect(err).to.not.be.null()
|
||||
expect(err).to.be.an.instanceOf(Error)
|
||||
expect(err.message).to.be.equal(`Got 400 error code trying to dial over relay`)
|
||||
expect(callback.calledOnce).to.be.ok()
|
||||
done()
|
||||
})
|
||||
|
||||
// send failed message
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
if (err) return done(err)
|
||||
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.MALFORMED_MESSAGE
|
||||
})]), // send arbitrary non 200 code
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -4,7 +4,6 @@ const TestNode = require('./test-node')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const eachAsync = require('async/each')
|
||||
const pull = require('pull-stream')
|
||||
|
||||
exports.createNodes = function createNodes (configNodes, callback) {
|
||||
const nodes = {}
|
||||
@ -77,34 +76,3 @@ exports.stopNodes = function stopNodes (nodes, callback) {
|
||||
callback()
|
||||
})
|
||||
}
|
||||
|
||||
function reverse (protocol, conn) {
|
||||
pull(
|
||||
conn,
|
||||
pull.map((data) => {
|
||||
return data.toString().split('').reverse().join('')
|
||||
}),
|
||||
conn
|
||||
)
|
||||
}
|
||||
|
||||
exports.dialAndReverse = function dialAndRevers (srcNode, dstNode, vals, done) {
|
||||
dstNode.handle('/ipfs/reverse/1.0.0', reverse)
|
||||
|
||||
srcNode.dial(dstNode.peerInfo, '/ipfs/reverse/1.0.0', (err, conn) => {
|
||||
if (err) return done(err)
|
||||
|
||||
pull(
|
||||
pull.values(vals),
|
||||
conn,
|
||||
pull.collect((err, data) => {
|
||||
if (err) return done(err)
|
||||
|
||||
let reversed = data.map((val, i) => {
|
||||
return val.toString()
|
||||
})
|
||||
|
||||
srcNode.hangUp(srcNode.peerInfo, () => done(null, reversed))
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
243
test/hop.spec.js
243
test/hop.spec.js
@ -1,4 +1,5 @@
|
||||
/* eslint-env mocha */
|
||||
/* eslint max-nested-callbacks: ["error", 5] */
|
||||
'use strict'
|
||||
|
||||
const Hop = require('../src/circuit/hop')
|
||||
@ -9,6 +10,9 @@ const waterfall = require('async/waterfall')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
const pull = require('pull-stream/pull')
|
||||
const values = require('pull-stream/sources/values')
|
||||
const collect = require('pull-stream/sinks/collect')
|
||||
const lp = require('pull-length-prefixed')
|
||||
const proto = require('../src/protocol')
|
||||
const StreamHandler = require('../src/circuit/stream-handler')
|
||||
@ -19,19 +23,20 @@ const dirtyChai = require('dirty-chai')
|
||||
const expect = chai.expect
|
||||
chai.use(dirtyChai)
|
||||
|
||||
describe('relay', function () {
|
||||
describe(`should handle circuit requests`, function () {
|
||||
describe('relay', () => {
|
||||
describe(`.handle`, () => {
|
||||
let relay
|
||||
let swarm
|
||||
let fromConn
|
||||
let stream
|
||||
let shake
|
||||
|
||||
beforeEach(function (done) {
|
||||
beforeEach((done) => {
|
||||
stream = handshake({ timeout: 1000 * 60 })
|
||||
shake = stream.handshake
|
||||
fromConn = new Connection(stream)
|
||||
fromConn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA')))
|
||||
const peerInfo = new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA'))
|
||||
fromConn.setPeerInfo(peerInfo)
|
||||
|
||||
let peers = {
|
||||
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE:
|
||||
@ -72,7 +77,7 @@ describe('relay', function () {
|
||||
], () => {
|
||||
relay = new Hop(swarm, { enabled: true })
|
||||
relay._circuit = sinon.stub()
|
||||
relay._circuit.callsArg(2, null, new Connection())
|
||||
relay._circuit.callsArgWith(2, null, new Connection())
|
||||
done()
|
||||
})
|
||||
})
|
||||
@ -81,7 +86,7 @@ describe('relay', function () {
|
||||
relay._circuit.reset()
|
||||
})
|
||||
|
||||
it(`should handle a valid circuit request`, function (done) {
|
||||
it(`should handle a valid circuit request`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -102,7 +107,7 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
|
||||
it(`should handle a request to passive circuit`, function (done) {
|
||||
it(`should handle a request to passive circuit`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -116,8 +121,10 @@ describe('relay', function () {
|
||||
}
|
||||
|
||||
relay.active = false
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
expect(err).to.be.null()
|
||||
lp.decodeFromReader(
|
||||
shake,
|
||||
(err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
|
||||
@ -128,7 +135,7 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
|
||||
it(`should handle a request to active circuit`, function (done) {
|
||||
it(`should handle a request to active circuit`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -154,7 +161,7 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
|
||||
it(`not dial to self`, function (done) {
|
||||
it(`not dial to self`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -162,13 +169,15 @@ describe('relay', function () {
|
||||
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
|
||||
},
|
||||
dstPeer: {
|
||||
id: PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).id,
|
||||
addrs: [multiaddr(`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).buffer]
|
||||
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
|
||||
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
|
||||
}
|
||||
}
|
||||
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
expect(err).to.be.null()
|
||||
lp.decodeFromReader(
|
||||
shake,
|
||||
(err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF)
|
||||
@ -179,7 +188,7 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
|
||||
it(`fail on invalid src address`, function (done) {
|
||||
it(`fail on invalid src address`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -192,8 +201,10 @@ describe('relay', function () {
|
||||
}
|
||||
}
|
||||
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
expect(err).to.be.null()
|
||||
lp.decodeFromReader(
|
||||
shake,
|
||||
(err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID)
|
||||
@ -204,7 +215,7 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
|
||||
it(`fail on invalid dst address`, function (done) {
|
||||
it(`fail on invalid dst address`, (done) => {
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
@ -217,8 +228,10 @@ describe('relay', function () {
|
||||
}
|
||||
}
|
||||
|
||||
lp.decodeFromReader(shake, (err, msg) => {
|
||||
expect(err).to.be.null()
|
||||
lp.decodeFromReader(
|
||||
shake,
|
||||
(err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID)
|
||||
@ -229,4 +242,192 @@ describe('relay', function () {
|
||||
relay.handle(relayMsg, new StreamHandler(fromConn))
|
||||
})
|
||||
})
|
||||
|
||||
describe(`._circuit`, () => {
|
||||
let relay
|
||||
let swarm
|
||||
let srcConn
|
||||
let dstConn
|
||||
let srcStream
|
||||
let dstStream
|
||||
let srcShake
|
||||
let dstShake
|
||||
|
||||
before((done) => {
|
||||
srcStream = handshake({ timeout: 1000 * 60 })
|
||||
srcShake = srcStream.handshake
|
||||
srcConn = new Connection(srcStream)
|
||||
dstStream = handshake({ timeout: 1000 * 60 })
|
||||
dstShake = dstStream.handshake
|
||||
dstConn = new Connection(dstStream)
|
||||
const peerInfo = new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA'))
|
||||
srcConn.setPeerInfo(peerInfo)
|
||||
|
||||
let peers = {
|
||||
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE:
|
||||
new PeerInfo(PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`)),
|
||||
QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA:
|
||||
new PeerInfo(PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`)),
|
||||
QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy:
|
||||
new PeerInfo(PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`))
|
||||
}
|
||||
|
||||
Object.keys(peers).forEach((key) => { peers[key]._connectedMultiaddr = true }) // make it truthy
|
||||
|
||||
waterfall([
|
||||
(cb) => PeerId.createFromJSON(nodes.node4, cb),
|
||||
(peerId, cb) => PeerInfo.create(peerId, cb),
|
||||
(peer, cb) => {
|
||||
peer.multiaddrs.add('/p2p-circuit/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')
|
||||
swarm = {
|
||||
_peerInfo: peer,
|
||||
conns: {
|
||||
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection(),
|
||||
QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA: new Connection(),
|
||||
QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy: new Connection()
|
||||
},
|
||||
_peerBook: {
|
||||
get: (peer) => {
|
||||
if (!peers[peer]) {
|
||||
throw new Error()
|
||||
}
|
||||
|
||||
return peers[peer]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cb()
|
||||
}
|
||||
], () => {
|
||||
relay = new Hop(swarm, { enabled: true })
|
||||
relay._dialPeer = sinon.stub()
|
||||
relay._dialPeer.callsArgWith(1, null, dstConn)
|
||||
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
after(() => relay._dialPeer.reset())
|
||||
|
||||
describe('should correctly dial destination node', () => {
|
||||
let msg = {
|
||||
type: proto.CircuitRelay.Type.STOP,
|
||||
srcPeer: {
|
||||
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
|
||||
addrs: [Buffer.from(`dsfsdfsdf`)]
|
||||
},
|
||||
dstPeer: {
|
||||
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
|
||||
addrs: [Buffer.from(`sdflksdfndsklfnlkdf`)]
|
||||
}
|
||||
}
|
||||
|
||||
before(() => {
|
||||
relay._circuit(
|
||||
new StreamHandler(srcConn),
|
||||
msg,
|
||||
(err) => {
|
||||
expect(err).to.not.exist()
|
||||
})
|
||||
})
|
||||
|
||||
it('should respond with SUCCESS to source node', (done) => {
|
||||
lp.decodeFromReader(
|
||||
srcShake,
|
||||
(err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS)
|
||||
expect(response.code).to.equal(proto.CircuitRelay.Status.SUCCESS)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('should send STOP message to destination node', (done) => {
|
||||
lp.decodeFromReader(
|
||||
dstShake,
|
||||
(err, _msg) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
const response = proto.CircuitRelay.decode(_msg)
|
||||
expect(response.type).to.deep.equal(msg.type)
|
||||
expect(response.srcPeer).to.deep.equal(msg.srcPeer)
|
||||
expect(response.dstPeer).to.deep.equal(msg.dstPeer)
|
||||
done()
|
||||
})
|
||||
})
|
||||
|
||||
it('should create circuit', (done) => {
|
||||
pull(
|
||||
values([proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.SUCCESS
|
||||
})]),
|
||||
lp.encode(),
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
encoded.forEach((e) => dstShake.write(e))
|
||||
pull(
|
||||
values([Buffer.from('hello')]),
|
||||
lp.encode(),
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
encoded.forEach((e) => srcShake.write(e))
|
||||
lp.decodeFromReader(
|
||||
dstShake,
|
||||
(err, _msg) => {
|
||||
expect(err).to.not.exist()
|
||||
expect(_msg.toString()).to.equal('hello')
|
||||
|
||||
done()
|
||||
})
|
||||
})
|
||||
)
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe('should fail creating circuit', () => {
|
||||
let msg = {
|
||||
type: proto.CircuitRelay.Type.STOP,
|
||||
srcPeer: {
|
||||
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
|
||||
addrs: [Buffer.from(`dsfsdfsdf`)]
|
||||
},
|
||||
dstPeer: {
|
||||
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
|
||||
addrs: [Buffer.from(`sdflksdfndsklfnlkdf`)]
|
||||
}
|
||||
}
|
||||
|
||||
it('should not create circuit', (done) => {
|
||||
relay._circuit(
|
||||
new StreamHandler(srcConn),
|
||||
msg,
|
||||
(err) => {
|
||||
expect(err).to.exist()
|
||||
expect(err).to.match(/Unable to create circuit!/)
|
||||
done()
|
||||
})
|
||||
|
||||
pull(
|
||||
values([proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: proto.CircuitRelay.Status.STOP_RELAY_REFUSED
|
||||
})]),
|
||||
lp.encode(),
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
|
||||
encoded.forEach((e) => dstShake.write(e))
|
||||
})
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -11,7 +11,9 @@ const handshake = require('pull-handshake')
|
||||
const Connection = require('interface-connection').Connection
|
||||
const proto = require('../src/protocol')
|
||||
const lp = require('pull-length-prefixed')
|
||||
const pull = require('pull-stream')
|
||||
const pull = require('pull-stream/pull')
|
||||
const values = require('pull-stream/sources/values')
|
||||
const collect = require('pull-stream/sinks/collect')
|
||||
const multicodec = require('../src/multicodec')
|
||||
|
||||
const chai = require('chai')
|
||||
@ -30,10 +32,11 @@ describe('listener', function () {
|
||||
let conn = null
|
||||
|
||||
beforeEach(function (done) {
|
||||
stream = handshake({timeout: 1000 * 60})
|
||||
stream = handshake({ timeout: 1000 * 60 })
|
||||
shake = stream.handshake
|
||||
conn = new Connection(stream)
|
||||
conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')))
|
||||
conn.setPeerInfo(new PeerInfo(PeerId
|
||||
.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')))
|
||||
|
||||
waterfall([
|
||||
(cb) => PeerId.createFromJSON(nodes.node4, cb),
|
||||
@ -88,10 +91,10 @@ describe('listener', function () {
|
||||
}
|
||||
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
})
|
||||
)
|
||||
@ -125,10 +128,45 @@ describe('listener', function () {
|
||||
}
|
||||
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
it(`should emit 'connection'`, function (done) {
|
||||
handlerSpy(multicodec.relay, conn)
|
||||
|
||||
let relayMsg = {
|
||||
type: proto.CircuitRelay.Type.STOP,
|
||||
srcPeer: {
|
||||
id: `QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`,
|
||||
addrs: [`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`]
|
||||
},
|
||||
dstPeer: {
|
||||
id: `QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`,
|
||||
addrs: [`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`]
|
||||
}
|
||||
}
|
||||
|
||||
listener.stopHandler.handle = (message, sh) => {
|
||||
const newConn = new Connection(sh.rest())
|
||||
listener.stopHandler.emit('connection', newConn)
|
||||
}
|
||||
|
||||
listener.on('connection', (conn) => {
|
||||
expect(conn).to.be.instanceof(Connection)
|
||||
done()
|
||||
})
|
||||
|
||||
pull(
|
||||
values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
lp.encode(),
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
})
|
||||
)
|
||||
@ -162,10 +200,10 @@ describe('listener', function () {
|
||||
}
|
||||
|
||||
pull(
|
||||
pull.values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
values([proto.CircuitRelay.encode(relayMsg)]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
})
|
||||
)
|
||||
@ -187,14 +225,14 @@ describe('listener', function () {
|
||||
}
|
||||
|
||||
pull(
|
||||
pull.values([Buffer.from([relayMsg])]),
|
||||
values([Buffer.from([relayMsg])]),
|
||||
lp.encode(),
|
||||
pull.collect((err, encoded) => {
|
||||
expect(err).to.be.null()
|
||||
collect((err, encoded) => {
|
||||
expect(err).to.not.exist()
|
||||
encoded.forEach((e) => shake.write(e))
|
||||
}),
|
||||
lp.decodeFromReader(shake, {maxLength: this.maxLength}, (err, msg) => {
|
||||
expect(err).to.be.null()
|
||||
lp.decodeFromReader(shake, { maxLength: this.maxLength }, (err, msg) => {
|
||||
expect(err).to.not.exist()
|
||||
expect(proto.CircuitRelay.decode(msg).type).to.equal(proto.CircuitRelay.Type.STATUS)
|
||||
expect(proto.CircuitRelay.decode(msg).code).to.equal(proto.CircuitRelay.Status.MALFORMED_MESSAGE)
|
||||
done()
|
||||
@ -233,7 +271,7 @@ describe('listener', function () {
|
||||
peerInfo.multiaddrs.add(`/ip4/127.0.0.1/tcp/4003/ws`)
|
||||
|
||||
listener.getAddrs((err, addrs) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(addrs).to.deep.equal([
|
||||
multiaddr(`/p2p-circuit/ip4/0.0.0.0/tcp/4002/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`),
|
||||
multiaddr(`/p2p-circuit/ip4/127.0.0.1/tcp/4003/ws/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`)])
|
||||
@ -244,7 +282,7 @@ describe('listener', function () {
|
||||
peerInfo.multiaddrs.add(`/ip4/127.0.0.1/tcp/4003/ws`)
|
||||
peerInfo.multiaddrs.add(`/p2p-circuit/ip4/0.0.0.0/tcp/4002`)
|
||||
listener.getAddrs((err, addrs) => {
|
||||
expect(err).to.be.null()
|
||||
expect(err).to.not.exist()
|
||||
expect(addrs[0]
|
||||
.toString())
|
||||
.to.equal(`/p2p-circuit/ip4/0.0.0.0/tcp/4002/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`)
|
||||
|
@ -25,9 +25,10 @@ describe('stop', function () {
|
||||
let stream
|
||||
|
||||
beforeEach(function (done) {
|
||||
stream = handshake({timeout: 1000 * 60})
|
||||
stream = handshake({ timeout: 1000 * 60 })
|
||||
conn = new Connection(stream)
|
||||
conn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')))
|
||||
const peerId = PeerId.createFromB58String('QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE')
|
||||
conn.setPeerInfo(new PeerInfo(peerId))
|
||||
|
||||
waterfall([
|
||||
(cb) => PeerId.createFromJSON(nodes.node4, cb),
|
||||
|
Reference in New Issue
Block a user