mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-24 18:12:14 +00:00
refactor: circuit relay to async (#477)
* refactor: add dialing over relay support * chore: fix lint * fix: dont clear listeners on close * fix: if dial errors already have codes, just rethrow them * fix: clear the registrar when libp2p stops * fix: improve connection maintenance with circuit * chore: correct feedback * test: use chai as promised * test(fix): reset multiaddrs on dial test
This commit is contained in:
parent
18a062ed12
commit
f77ce39484
@ -23,6 +23,15 @@ const before = async () => {
|
||||
transport: [WebSockets],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
},
|
||||
config: {
|
||||
relay: {
|
||||
enabled: true,
|
||||
hop: {
|
||||
enabled: true,
|
||||
active: false
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
// Add the echo protocol
|
||||
|
@ -59,13 +59,13 @@
|
||||
"mafmt": "^7.0.0",
|
||||
"merge-options": "^1.0.1",
|
||||
"moving-average": "^1.0.0",
|
||||
"multiaddr": "^7.1.0",
|
||||
"multiaddr": "^7.2.1",
|
||||
"multistream-select": "^0.15.0",
|
||||
"once": "^1.4.0",
|
||||
"p-map": "^3.0.0",
|
||||
"p-queue": "^6.1.1",
|
||||
"p-settle": "^3.1.0",
|
||||
"peer-id": "^0.13.3",
|
||||
"peer-id": "^0.13.4",
|
||||
"peer-info": "^0.17.0",
|
||||
"promisify-es6": "^1.0.3",
|
||||
"protons": "^1.0.1",
|
||||
@ -80,6 +80,7 @@
|
||||
"abortable-iterator": "^2.1.0",
|
||||
"aegir": "^20.0.0",
|
||||
"chai": "^4.2.0",
|
||||
"chai-as-promised": "^7.1.1",
|
||||
"chai-checkmark": "^1.0.1",
|
||||
"cids": "^0.7.1",
|
||||
"delay": "^4.3.0",
|
||||
|
@ -32,8 +32,6 @@ Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes
|
||||
- [Example](#example)
|
||||
- [Create dialer/listener](#create-dialerlistener)
|
||||
- [Create `relay`](#create-relay)
|
||||
- [This module uses `pull-streams`](#this-module-uses-pull-streams)
|
||||
- [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams)
|
||||
- [API](#api)
|
||||
- [Implementation rational](#implementation-rational)
|
||||
|
||||
@ -48,7 +46,7 @@ const Circuit = require('libp2p-circuit')
|
||||
const multiaddr = require('multiaddr')
|
||||
const pull = require('pull-stream')
|
||||
|
||||
const mh1 = multiaddr('/p2p-circuit/ipfs/QmHash') // dial /ipfs/QmHash over any circuit
|
||||
const mh1 = multiaddr('/p2p-circuit/p2p/QmHash') // dial /p2p/QmHash over any circuit
|
||||
|
||||
const circuit = new Circuit(swarmInstance, options) // pass swarm instance and options
|
||||
|
||||
@ -91,37 +89,13 @@ const relay = new Relay(options)
|
||||
relay.mount(swarmInstance) // start relaying traffic
|
||||
```
|
||||
|
||||
### This module uses `pull-streams`
|
||||
|
||||
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
|
||||
|
||||
You can learn more about pull-streams at:
|
||||
|
||||
- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
|
||||
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
|
||||
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
|
||||
- [pull-streams documentation](https://pull-stream.github.io/)
|
||||
|
||||
#### Converting `pull-streams` to Node.js Streams
|
||||
|
||||
If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:
|
||||
|
||||
```js
|
||||
const pullToStream = require('pull-stream-to-stream')
|
||||
|
||||
const nodeStreamInstance = pullToStream(pullStreamInstance)
|
||||
// nodeStreamInstance is an instance of a Node.js Stream
|
||||
```
|
||||
|
||||
To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.
|
||||
|
||||
## API
|
||||
|
||||
[](https://github.com/libp2p/interface-transport)
|
||||
|
||||
`libp2p-circuit` accepts Circuit addresses for both IPFS and non IPFS encapsulated addresses, i.e:
|
||||
|
||||
`/p2p-circuit/ip4/127.0.0.1/tcp/4001/ipfs/QmHash`
|
||||
`/p2p-circuit/ip4/127.0.0.1/tcp/4001/p2p/QmHash`
|
||||
|
||||
Both for dialing and listening.
|
||||
|
||||
|
@ -1,126 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const mafmt = require('mafmt')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const CircuitDialer = require('./circuit/dialer')
|
||||
const utilsFactory = require('./circuit/utils')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit:transportdialer')
|
||||
log.err = debug('libp2p:circuit:error:transportdialer')
|
||||
|
||||
const createListener = require('./listener')
|
||||
|
||||
class Circuit {
|
||||
static get tag () {
|
||||
return 'Circuit'
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of Dialer.
|
||||
*
|
||||
* @param {Swarm} swarm - the swarm
|
||||
* @param {any} options - config options
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
constructor (swarm, options) {
|
||||
this.options = options || {}
|
||||
|
||||
this.swarm = swarm
|
||||
this.dialer = null
|
||||
this.utils = utilsFactory(swarm)
|
||||
this.peerInfo = this.swarm._peerInfo
|
||||
this.relays = this.filter(this.peerInfo.multiaddrs.toArray())
|
||||
|
||||
// if no explicit relays, add a default relay addr
|
||||
if (this.relays.length === 0) {
|
||||
this.peerInfo
|
||||
.multiaddrs
|
||||
.add(`/p2p-circuit/ipfs/${this.peerInfo.id.toB58String()}`)
|
||||
}
|
||||
|
||||
this.dialer = new CircuitDialer(swarm, options)
|
||||
|
||||
this.swarm.on('peer-mux-established', (peerInfo) => {
|
||||
this.dialer.canHop(peerInfo)
|
||||
})
|
||||
this.swarm.on('peer-mux-closed', (peerInfo) => {
|
||||
this.dialer.relayPeers.delete(peerInfo.id.toB58String())
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial the relays in the Addresses.Swarm config
|
||||
*
|
||||
* @param {Array} relays
|
||||
* @return {void}
|
||||
*/
|
||||
_dialSwarmRelays () {
|
||||
// if we have relay addresses in swarm config, then dial those relays
|
||||
this.relays.forEach((relay) => {
|
||||
const relaySegments = relay
|
||||
.toString()
|
||||
.split('/p2p-circuit')
|
||||
.filter(segment => segment.length)
|
||||
|
||||
relaySegments.forEach((relaySegment) => {
|
||||
const ma = this.utils.peerInfoFromMa(multiaddr(relaySegment))
|
||||
this.dialer._dialRelay(ma)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*
|
||||
* @param {multiaddr} ma - the multiaddr of the peer to dial
|
||||
* @param {Object} options - dial options
|
||||
* @param {Function} cb - a callback called once dialed
|
||||
* @returns {Connection} - the connection
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
dial (ma, options, cb) {
|
||||
return this.dialer.dial(ma, options, cb)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a listener
|
||||
*
|
||||
* @param {any} options
|
||||
* @param {Function} handler
|
||||
* @return {listener}
|
||||
*/
|
||||
createListener (options, handler) {
|
||||
if (typeof options === 'function') {
|
||||
handler = options
|
||||
options = this.options || {}
|
||||
}
|
||||
|
||||
const listener = createListener(this.swarm, options, handler)
|
||||
listener.on('listen', this._dialSwarmRelays.bind(this))
|
||||
return listener
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter check for all multiaddresses
|
||||
* that this transport can dial on
|
||||
*
|
||||
* @param {any} multiaddrs
|
||||
* @returns {Array<multiaddr>}
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
filter (multiaddrs) {
|
||||
if (!Array.isArray(multiaddrs)) {
|
||||
multiaddrs = [multiaddrs]
|
||||
}
|
||||
return multiaddrs.filter((ma) => {
|
||||
return mafmt.Circuit.matches(ma)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Circuit
|
@ -1,275 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const once = require('once')
|
||||
const PeerId = require('peer-id')
|
||||
const waterfall = require('async/waterfall')
|
||||
const setImmediate = require('async/setImmediate')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||
|
||||
const utilsFactory = require('./utils')
|
||||
const StreamHandler = require('./stream-handler')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit:dialer')
|
||||
log.err = debug('libp2p:circuit:error:dialer')
|
||||
|
||||
const multicodec = require('../multicodec')
|
||||
const proto = require('../protocol')
|
||||
|
||||
class Dialer {
|
||||
/**
|
||||
* Creates an instance of Dialer.
|
||||
* @param {Swarm} swarm - the swarm
|
||||
* @param {any} options - config options
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
constructor (swarm, options) {
|
||||
this.swarm = swarm
|
||||
this.relayPeers = new Map()
|
||||
this.relayConns = new Map()
|
||||
this.options = options
|
||||
this.utils = utilsFactory(swarm)
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper that returns a relay connection
|
||||
*
|
||||
* @param {*} relay
|
||||
* @param {*} callback
|
||||
* @returns {Function} - callback
|
||||
*/
|
||||
_dialRelayHelper (relay, callback) {
|
||||
if (this.relayConns.has(relay.id.toB58String())) {
|
||||
return callback(null, this.relayConns.get(relay.id.toB58String()))
|
||||
}
|
||||
|
||||
return this._dialRelay(relay, callback)
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*
|
||||
* @param {multiaddr} ma - the multiaddr of the peer to dial
|
||||
* @param {Function} cb - a callback called once dialed
|
||||
* @returns {Connection} - the connection
|
||||
*
|
||||
*/
|
||||
dial (ma, cb) {
|
||||
cb = cb || (() => { })
|
||||
const strMa = ma.toString()
|
||||
if (!strMa.includes('/p2p-circuit')) {
|
||||
log.err('invalid circuit address')
|
||||
return cb(new Error('invalid circuit address'))
|
||||
}
|
||||
|
||||
const addr = strMa.split('p2p-circuit') // extract relay address if any
|
||||
const relay = addr[0] === '/' ? null : multiaddr(addr[0])
|
||||
const peer = multiaddr(addr[1] || addr[0])
|
||||
|
||||
const dstConn = new Connection()
|
||||
setImmediate(
|
||||
this._dialPeer.bind(this),
|
||||
peer,
|
||||
relay,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
dstConn.setInnerConn(conn)
|
||||
cb(null, dstConn)
|
||||
})
|
||||
|
||||
return dstConn
|
||||
}
|
||||
|
||||
/**
|
||||
* Does the peer support the HOP protocol
|
||||
*
|
||||
* @param {PeerInfo} peer
|
||||
* @param {Function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
canHop (peer, callback) {
|
||||
callback = once(callback || (() => { }))
|
||||
|
||||
this._dialRelayHelper(peer, (err, conn) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const sh = new StreamHandler(conn)
|
||||
waterfall([
|
||||
(cb) => sh.write(proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.CAN_HOP
|
||||
}), cb),
|
||||
(cb) => sh.read(cb)
|
||||
], (err, msg) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const response = proto.CircuitRelay.decode(msg)
|
||||
|
||||
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
|
||||
const err = new Error(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
|
||||
log(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
log('HOP supported adding as relay - %s', this.utils.getB58String(peer))
|
||||
this.relayPeers.set(this.utils.getB58String(peer), peer)
|
||||
sh.close()
|
||||
callback()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial the destination peer over a relay
|
||||
*
|
||||
* @param {multiaddr} dstMa
|
||||
* @param {Connection|PeerInfo} relay
|
||||
* @param {Function} cb
|
||||
* @return {Function|void}
|
||||
* @private
|
||||
*/
|
||||
_dialPeer (dstMa, relay, cb) {
|
||||
if (typeof relay === 'function') {
|
||||
cb = relay
|
||||
relay = null
|
||||
}
|
||||
|
||||
if (!cb) {
|
||||
cb = () => {}
|
||||
}
|
||||
|
||||
dstMa = multiaddr(dstMa)
|
||||
// if no relay provided, dial on all available relays until one succeeds
|
||||
if (!relay) {
|
||||
const relays = Array.from(this.relayPeers.values())
|
||||
const next = (nextRelay) => {
|
||||
if (!nextRelay) {
|
||||
const err = 'no relay peers were found or all relays failed to dial'
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
return this._negotiateRelay(
|
||||
nextRelay,
|
||||
dstMa,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return next(relays.shift())
|
||||
}
|
||||
cb(null, conn)
|
||||
})
|
||||
}
|
||||
next(relays.shift())
|
||||
} else {
|
||||
return this._negotiateRelay(
|
||||
relay,
|
||||
dstMa,
|
||||
(err, conn) => {
|
||||
if (err) {
|
||||
log.err('An error has occurred negotiating the relay connection', err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
return cb(null, conn)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Negotiate the relay connection
|
||||
*
|
||||
* @param {Multiaddr|PeerInfo|Connection} relay - the Connection or PeerInfo of the relay
|
||||
* @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for
|
||||
* @param {Function} callback - a callback which gets the negotiated relay connection
|
||||
* @returns {void}
|
||||
* @private
|
||||
*
|
||||
* @memberOf Dialer
|
||||
*/
|
||||
_negotiateRelay (relay, dstMa, callback) {
|
||||
dstMa = multiaddr(dstMa)
|
||||
relay = this.utils.peerInfoFromMa(relay)
|
||||
const srcMas = this.swarm._peerInfo.multiaddrs.toArray()
|
||||
this._dialRelayHelper(relay, (err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
const sh = new StreamHandler(conn)
|
||||
waterfall([
|
||||
(cb) => {
|
||||
log('negotiating relay for peer %s', dstMa.getPeerId())
|
||||
let dstPeerId
|
||||
try {
|
||||
dstPeerId = PeerId.createFromB58String(dstMa.getPeerId()).id
|
||||
} catch (err) {
|
||||
return cb(err)
|
||||
}
|
||||
sh.write(
|
||||
proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.HOP,
|
||||
srcPeer: {
|
||||
id: this.swarm._peerInfo.id.id,
|
||||
addrs: srcMas.map((addr) => addr.buffer)
|
||||
},
|
||||
dstPeer: {
|
||||
id: dstPeerId,
|
||||
addrs: [dstMa.buffer]
|
||||
}
|
||||
}), cb)
|
||||
},
|
||||
(cb) => sh.read(cb)
|
||||
], (err, msg) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const message = proto.CircuitRelay.decode(msg)
|
||||
if (message.type !== proto.CircuitRelay.Type.STATUS) {
|
||||
return callback(new Error('Got invalid message type - ' +
|
||||
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
|
||||
}
|
||||
|
||||
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
|
||||
return callback(new Error(`Got ${message.code} error code trying to dial over relay`))
|
||||
}
|
||||
|
||||
callback(null, new Connection(sh.rest()))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a relay peer by its PeerInfo
|
||||
*
|
||||
* @param {PeerInfo} peer - the PeerInfo of the relay peer
|
||||
* @param {Function} cb - a callback with the connection to the relay peer
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_dialRelay (peer, cb) {
|
||||
cb = once(cb || (() => { }))
|
||||
|
||||
this.swarm.dial(
|
||||
peer,
|
||||
multicodec.relay,
|
||||
once((err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return cb(err)
|
||||
}
|
||||
cb(null, conn)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Dialer
|
@ -1,283 +1,136 @@
|
||||
'use strict'
|
||||
|
||||
const pull = require('pull-stream/pull')
|
||||
const debug = require('debug')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const EE = require('events').EventEmitter
|
||||
const once = require('once')
|
||||
const utilsFactory = require('./utils')
|
||||
const { validateAddrs } = require('./utils')
|
||||
const StreamHandler = require('./stream-handler')
|
||||
const proto = require('../protocol').CircuitRelay
|
||||
const multiaddr = require('multiaddr')
|
||||
const series = require('async/series')
|
||||
const waterfall = require('async/waterfall')
|
||||
const setImmediate = require('async/setImmediate')
|
||||
const { CircuitRelay: CircuitPB } = require('../protocol')
|
||||
const pipe = require('it-pipe')
|
||||
const errCode = require('err-code')
|
||||
const { codes: Errors } = require('../../errors')
|
||||
|
||||
const { stop } = require('./stop')
|
||||
|
||||
const multicodec = require('./../multicodec')
|
||||
|
||||
const log = debug('libp2p:circuit:relay')
|
||||
log.err = debug('libp2p:circuit:error:relay')
|
||||
const log = debug('libp2p:circuit:hop')
|
||||
log.error = debug('libp2p:circuit:hop:error')
|
||||
|
||||
class Hop extends EE {
|
||||
/**
|
||||
* Construct a Circuit object
|
||||
*
|
||||
* This class will handle incoming circuit connections and
|
||||
* either start a relay or hand the relayed connection to
|
||||
* the swarm
|
||||
*
|
||||
* @param {Swarm} swarm
|
||||
* @param {Object} options
|
||||
*/
|
||||
constructor (swarm, options) {
|
||||
super()
|
||||
this.swarm = swarm
|
||||
this.peerInfo = this.swarm._peerInfo
|
||||
this.utils = utilsFactory(swarm)
|
||||
this.config = options || { active: false, enabled: false }
|
||||
this.active = this.config.active
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the relay message
|
||||
*
|
||||
* @param {CircuitRelay} message
|
||||
* @param {StreamHandler} sh
|
||||
* @returns {*}
|
||||
*/
|
||||
handle (message, sh) {
|
||||
if (!this.config.enabled) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_CANT_SPEAK_RELAY)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
// check if message is `CAN_HOP`
|
||||
if (message.type === proto.Type.CAN_HOP) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.SUCCESS)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
// This is a relay request - validate and create a circuit
|
||||
let srcPeerId = null
|
||||
let dstPeerId = null
|
||||
try {
|
||||
srcPeerId = PeerId.createFromBytes(message.srcPeer.id).toB58String()
|
||||
dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String()
|
||||
} catch (err) {
|
||||
log.err(err)
|
||||
|
||||
if (!srcPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_SRC_MULTIADDR_INVALID)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
if (!dstPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_DST_MULTIADDR_INVALID)
|
||||
return sh.close()
|
||||
}
|
||||
}
|
||||
|
||||
if (srcPeerId === dstPeerId) {
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_CANT_RELAY_TO_SELF)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
if (!message.dstPeer.addrs.length) {
|
||||
// TODO: use encapsulate here
|
||||
const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer
|
||||
message.dstPeer.addrs.push(addr)
|
||||
}
|
||||
|
||||
log('trying to establish a circuit: %s <-> %s', srcPeerId, dstPeerId)
|
||||
const noPeer = () => {
|
||||
// log.err(err)
|
||||
this.utils.writeResponse(
|
||||
sh,
|
||||
proto.Status.HOP_NO_CONN_TO_DST)
|
||||
return sh.close()
|
||||
}
|
||||
|
||||
const isConnected = (cb) => {
|
||||
let dstPeer
|
||||
try {
|
||||
dstPeer = this.swarm._peerBook.get(dstPeerId)
|
||||
if (!dstPeer.isConnected() && !this.active) {
|
||||
const err = new Error(`No Connection to peer ${dstPeerId}`)
|
||||
noPeer(err)
|
||||
return cb(err)
|
||||
}
|
||||
} catch (err) {
|
||||
if (!this.active) {
|
||||
noPeer(err)
|
||||
return cb(err)
|
||||
}
|
||||
}
|
||||
cb()
|
||||
}
|
||||
|
||||
series([
|
||||
(cb) => this.utils.validateAddrs(message, sh, proto.Type.HOP, cb),
|
||||
(cb) => isConnected(cb),
|
||||
(cb) => this._circuit(sh, message, cb)
|
||||
], (err) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
sh.close()
|
||||
return setImmediate(() => this.emit('circuit:error', err))
|
||||
}
|
||||
setImmediate(() => this.emit('circuit:success'))
|
||||
module.exports.handleHop = async function handleHop ({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
circuit
|
||||
}) {
|
||||
// Ensure hop is enabled
|
||||
if (!circuit._options.hop.enabled) {
|
||||
log('HOP request received but we are not acting as a relay')
|
||||
return streamHandler.end({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.HOP_CANT_SPEAK_RELAY
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to STOP
|
||||
*
|
||||
* @param {PeerInfo} peer
|
||||
* @param {StreamHandler} srcSh
|
||||
* @param {function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
_connectToStop (peer, srcSh, callback) {
|
||||
this._dialPeer(peer, (err, dstConn) => {
|
||||
if (err) {
|
||||
this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.HOP_CANT_DIAL_DST)
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
// Validate the HOP request has the required input
|
||||
try {
|
||||
validateAddrs(request, streamHandler)
|
||||
} catch (err) {
|
||||
return log.error('invalid hop request via peer %s', connection.remotePeer.toB58String(), err)
|
||||
}
|
||||
|
||||
return this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.SUCCESS,
|
||||
(err) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
return callback(null, dstConn)
|
||||
})
|
||||
// Get the connection to the destination (stop) peer
|
||||
const destinationPeer = new PeerId(request.dstPeer.id)
|
||||
|
||||
const destinationConnection = circuit._registrar.getConnection(new PeerInfo(destinationPeer))
|
||||
if (!destinationConnection && !circuit._options.hop.active) {
|
||||
log('HOP request received but we are not connected to the destination peer')
|
||||
return streamHandler.end({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.HOP_NO_CONN_TO_DST
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Negotiate STOP
|
||||
*
|
||||
* @param {StreamHandler} dstSh
|
||||
* @param {StreamHandler} srcSh
|
||||
* @param {CircuitRelay} message
|
||||
* @param {function} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
_negotiateStop (dstSh, srcSh, message, callback) {
|
||||
const stopMsg = Object.assign({}, message, {
|
||||
type: proto.Type.STOP // change the message type
|
||||
// TODO: Handle being an active relay
|
||||
|
||||
// Handle the incoming HOP request by performing a STOP request
|
||||
const stopRequest = {
|
||||
type: CircuitPB.Type.STOP,
|
||||
dstPeer: request.dstPeer,
|
||||
srcPeer: request.srcPeer
|
||||
}
|
||||
|
||||
let destinationStream
|
||||
try {
|
||||
destinationStream = await stop({
|
||||
connection: destinationConnection,
|
||||
request: stopRequest,
|
||||
circuit
|
||||
})
|
||||
dstSh.write(proto.encode(stopMsg),
|
||||
(err) => {
|
||||
if (err) {
|
||||
this.utils.writeResponse(
|
||||
srcSh,
|
||||
proto.Status.HOP_CANT_OPEN_DST_STREAM)
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
// read response from STOP
|
||||
dstSh.read((err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const message = proto.decode(msg)
|
||||
if (message.code !== proto.Status.SUCCESS) {
|
||||
return callback(new Error('Unable to create circuit!'))
|
||||
}
|
||||
|
||||
return callback(null, msg)
|
||||
})
|
||||
})
|
||||
} catch (err) {
|
||||
return log.error(err)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to make a circuit from A <-> R <-> B where R is this relay
|
||||
*
|
||||
* @param {StreamHandler} srcSh - the source stream handler
|
||||
* @param {CircuitRelay} message - the message with the src and dst entries
|
||||
* @param {Function} callback - callback to signal success or failure
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_circuit (srcSh, message, callback) {
|
||||
let dstSh = null
|
||||
waterfall([
|
||||
(cb) => this._connectToStop(message.dstPeer, srcSh, cb),
|
||||
(_dstConn, cb) => {
|
||||
dstSh = new StreamHandler(_dstConn)
|
||||
this._negotiateStop(dstSh, srcSh, message, cb)
|
||||
}
|
||||
], (err) => {
|
||||
if (err) {
|
||||
// close/end the source stream if there was an error
|
||||
if (srcSh) {
|
||||
srcSh.close()
|
||||
}
|
||||
log('hop request from %s is valid', connection.remotePeer.toB58String())
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.SUCCESS
|
||||
})
|
||||
const sourceStream = streamHandler.rest()
|
||||
|
||||
if (dstSh) {
|
||||
dstSh.close()
|
||||
}
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const src = srcSh.rest()
|
||||
const dst = dstSh.rest()
|
||||
|
||||
const srcIdStr = PeerId.createFromBytes(message.srcPeer.id).toB58String()
|
||||
const dstIdStr = PeerId.createFromBytes(message.dstPeer.id).toB58String()
|
||||
|
||||
// circuit the src and dst streams
|
||||
pull(
|
||||
src,
|
||||
dst,
|
||||
src
|
||||
)
|
||||
log('circuit %s <-> %s established', srcIdStr, dstIdStr)
|
||||
callback()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial the dest peer and create a circuit
|
||||
*
|
||||
* @param {Multiaddr} dstPeer
|
||||
* @param {Function} callback
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_dialPeer (dstPeer, callback) {
|
||||
const peerInfo = new PeerInfo(PeerId.createFromBytes(dstPeer.id))
|
||||
dstPeer.addrs.forEach((a) => peerInfo.multiaddrs.add(a))
|
||||
this.swarm.dial(peerInfo, multicodec.relay, once((err, conn) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
callback(null, conn)
|
||||
}))
|
||||
}
|
||||
// Short circuit the two streams to create the relayed connection
|
||||
return pipe(
|
||||
sourceStream,
|
||||
destinationStream,
|
||||
sourceStream
|
||||
)
|
||||
}
|
||||
|
||||
module.exports = Hop
|
||||
/**
|
||||
* Performs a HOP request to a relay peer, to request a connection to another
|
||||
* peer. A new, virtual, connection will be created between the two via the relay.
|
||||
*
|
||||
* @param {object} options
|
||||
* @param {Connection} options.connection Connection to the relay
|
||||
* @param {*} options.request
|
||||
* @param {Circuit} options.circuit
|
||||
* @returns {Promise<Connection>}
|
||||
*/
|
||||
module.exports.hop = async function hop ({
|
||||
connection,
|
||||
request
|
||||
}) {
|
||||
// Create a new stream to the relay
|
||||
const { stream } = await connection.newStream([multicodec.relay])
|
||||
// Send the HOP request
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
streamHandler.write(request)
|
||||
|
||||
const response = await streamHandler.read()
|
||||
|
||||
if (response.code === CircuitPB.Status.SUCCESS) {
|
||||
log('hop request was successful')
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
log('hop request failed with code %d, closing stream', response.code)
|
||||
streamHandler.close()
|
||||
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an unencoded CAN_HOP response based on the Circuits configuration
|
||||
* @private
|
||||
*/
|
||||
module.exports.handleCanHop = function handleCanHop ({
|
||||
connection,
|
||||
streamHandler,
|
||||
circuit
|
||||
}) {
|
||||
const canHop = circuit._options.hop.enabled
|
||||
log('can hop (%s) request from %s', canHop, connection.remotePeer.toB58String())
|
||||
streamHandler.end({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: canHop ? CircuitPB.Status.SUCCESS : CircuitPB.Status.HOP_CANT_SPEAK_RELAY
|
||||
})
|
||||
}
|
||||
|
@ -1,56 +1,69 @@
|
||||
'use strict'
|
||||
|
||||
const setImmediate = require('async/setImmediate')
|
||||
|
||||
const EE = require('events').EventEmitter
|
||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||
const utilsFactory = require('./utils')
|
||||
const PeerInfo = require('peer-info')
|
||||
const proto = require('../protocol').CircuitRelay
|
||||
const series = require('async/series')
|
||||
const { CircuitRelay: CircuitPB } = require('../protocol')
|
||||
const multicodec = require('../multicodec')
|
||||
const StreamHandler = require('./stream-handler')
|
||||
const { validateAddrs } = require('./utils')
|
||||
|
||||
const debug = require('debug')
|
||||
|
||||
const log = debug('libp2p:circuit:stop')
|
||||
log.err = debug('libp2p:circuit:error:stop')
|
||||
log.error = debug('libp2p:circuit:stop:error')
|
||||
|
||||
class Stop extends EE {
|
||||
constructor (swarm) {
|
||||
super()
|
||||
this.swarm = swarm
|
||||
this.utils = utilsFactory(swarm)
|
||||
/**
|
||||
* Handles incoming STOP requests
|
||||
*
|
||||
* @private
|
||||
* @param {*} options
|
||||
* @param {Connection} options.connection
|
||||
* @param {*} options.request The CircuitRelay protobuf request (unencoded)
|
||||
* @param {StreamHandler} options.streamHandler
|
||||
* @returns {Promise<*>} Resolves a duplex iterable
|
||||
*/
|
||||
module.exports.handleStop = function handleStop ({
|
||||
connection,
|
||||
request,
|
||||
streamHandler
|
||||
}) {
|
||||
// Validate the STOP request has the required input
|
||||
try {
|
||||
validateAddrs(request, streamHandler)
|
||||
} catch (err) {
|
||||
return log.error('invalid stop request via peer %s', connection.remotePeer.toB58String(), err)
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the incoming STOP message
|
||||
*
|
||||
* @param {{}} msg - the parsed protobuf message
|
||||
* @param {StreamHandler} sh - the stream handler wrapped connection
|
||||
* @param {Function} callback - callback
|
||||
* @returns {undefined}
|
||||
*/
|
||||
handle (msg, sh, callback) {
|
||||
callback = callback || (() => {})
|
||||
|
||||
series([
|
||||
(cb) => this.utils.validateAddrs(msg, sh, proto.Type.STOP, cb),
|
||||
(cb) => this.utils.writeResponse(sh, proto.Status.Success, cb)
|
||||
], (err) => {
|
||||
if (err) {
|
||||
// we don't return the error here,
|
||||
// since multistream select don't expect one
|
||||
callback()
|
||||
return log(err)
|
||||
}
|
||||
|
||||
const peerInfo = new PeerInfo(this.utils.peerIdFromId(msg.srcPeer.id))
|
||||
msg.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr))
|
||||
const newConn = new Connection(sh.rest())
|
||||
newConn.setPeerInfo(peerInfo)
|
||||
setImmediate(() => this.emit('connection', newConn))
|
||||
callback(newConn)
|
||||
})
|
||||
}
|
||||
// The request is valid
|
||||
log('stop request is valid')
|
||||
streamHandler.write({
|
||||
type: CircuitPB.Type.STATUS,
|
||||
code: CircuitPB.Status.SUCCESS
|
||||
})
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
module.exports = Stop
|
||||
/**
|
||||
* Creates a STOP request
|
||||
* @private
|
||||
* @param {*} options
|
||||
* @param {Connection} options.connection
|
||||
* @param {*} options.request The CircuitRelay protobuf request (unencoded)
|
||||
* @returns {Promise<*>} Resolves a duplex iterable
|
||||
*/
|
||||
module.exports.stop = async function stop ({
|
||||
connection,
|
||||
request
|
||||
}) {
|
||||
const { stream } = await connection.newStream([multicodec.relay])
|
||||
log('starting stop request to %s', connection.remotePeer.toB58String())
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
|
||||
streamHandler.write(request)
|
||||
const response = await streamHandler.read()
|
||||
|
||||
if (response.code === CircuitPB.Status.SUCCESS) {
|
||||
log('stop request to %s was successful', connection.remotePeer.toB58String())
|
||||
return streamHandler.rest()
|
||||
}
|
||||
|
||||
log('stop request failed with code %d', response.code)
|
||||
streamHandler.close()
|
||||
}
|
||||
|
@ -1,139 +1,79 @@
|
||||
'use strict'
|
||||
|
||||
const values = require('pull-stream/sources/values')
|
||||
const collect = require('pull-stream/sinks/collect')
|
||||
const empty = require('pull-stream/sources/empty')
|
||||
const pull = require('pull-stream/pull')
|
||||
const lp = require('pull-length-prefixed')
|
||||
const handshake = require('pull-handshake')
|
||||
const lp = require('it-length-prefixed')
|
||||
const handshake = require('it-handshake')
|
||||
const { CircuitRelay: CircuitPB } = require('../protocol')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit:stream-handler')
|
||||
log.err = debug('libp2p:circuit:error:stream-handler')
|
||||
log.error = debug('libp2p:circuit:stream-handler:error')
|
||||
|
||||
class StreamHandler {
|
||||
/**
|
||||
* Create a stream handler for connection
|
||||
*
|
||||
* @param {Connection} conn - connection to read/write
|
||||
* @param {Function|undefined} cb - handshake callback called on error
|
||||
* @param {Number} timeout - handshake timeout
|
||||
* @param {Number} maxLength - max bytes length of message
|
||||
* @param {object} options
|
||||
* @param {*} options.stream - A duplex iterable
|
||||
* @param {Number} options.maxLength - max bytes length of message
|
||||
*/
|
||||
constructor (conn, cb, timeout, maxLength) {
|
||||
this.conn = conn
|
||||
this.stream = null
|
||||
this.shake = null
|
||||
this.timeout = cb || 1000 * 60
|
||||
this.maxLength = maxLength || 4096
|
||||
constructor ({ stream, maxLength = 4096 }) {
|
||||
this.stream = stream
|
||||
|
||||
if (typeof cb === 'function') {
|
||||
this.timeout = timeout || 1000 * 60
|
||||
}
|
||||
|
||||
this.stream = handshake({ timeout: this.timeout }, cb)
|
||||
this.shake = this.stream.handshake
|
||||
|
||||
pull(this.stream, conn, this.stream)
|
||||
}
|
||||
|
||||
isValid () {
|
||||
return this.conn && this.shake && this.stream
|
||||
this.shake = handshake(this.stream)
|
||||
this.decoder = lp.decode.fromReader(this.shake.reader, { maxDataLength: maxLength })
|
||||
}
|
||||
|
||||
/**
|
||||
* Read and decode message
|
||||
*
|
||||
* @param {Function} cb
|
||||
* @returns {void|Function}
|
||||
* @async
|
||||
* @returns {void}
|
||||
*/
|
||||
read (cb) {
|
||||
if (!this.isValid()) {
|
||||
return cb(new Error('handler is not in a valid state'))
|
||||
async read () {
|
||||
const msg = await this.decoder.next()
|
||||
if (msg.value) {
|
||||
const value = CircuitPB.decode(msg.value.slice())
|
||||
log('read message type', value.type)
|
||||
return value
|
||||
}
|
||||
|
||||
lp.decodeFromReader(
|
||||
this.shake,
|
||||
{ maxLength: this.maxLength },
|
||||
(err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
// this.shake.abort(err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
return cb(null, msg)
|
||||
})
|
||||
log('read received no value, closing stream')
|
||||
// End the stream, we didn't get data
|
||||
this.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode and write array of buffers
|
||||
*
|
||||
* @param {Buffer[]} msg
|
||||
* @param {Function} [cb]
|
||||
* @returns {Function}
|
||||
* @param {*} msg An unencoded CircuitRelay protobuf message
|
||||
*/
|
||||
write (msg, cb) {
|
||||
cb = cb || (() => {})
|
||||
|
||||
if (!this.isValid()) {
|
||||
return cb(new Error('handler is not in a valid state'))
|
||||
}
|
||||
|
||||
pull(
|
||||
values([msg]),
|
||||
lp.encode(),
|
||||
collect((err, encoded) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
this.shake.abort(err)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
encoded.forEach((e) => this.shake.write(e))
|
||||
cb()
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the raw Connection
|
||||
*
|
||||
* @returns {null|Connection|*}
|
||||
*/
|
||||
getRawConn () {
|
||||
return this.conn
|
||||
write (msg) {
|
||||
log('write message type %s', msg.type)
|
||||
this.shake.write(lp.encode.single(CircuitPB.encode(msg)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the handshake rest stream and invalidate handler
|
||||
*
|
||||
* @return {*|{source, sink}}
|
||||
* @return {*} A duplex iterable
|
||||
*/
|
||||
rest () {
|
||||
const rest = this.shake.rest()
|
||||
this.shake.rest()
|
||||
return this.shake.stream
|
||||
}
|
||||
|
||||
this.conn = null
|
||||
this.stream = null
|
||||
this.shake = null
|
||||
return rest
|
||||
end (msg) {
|
||||
this.write(msg)
|
||||
this.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the stream
|
||||
*
|
||||
* @returns {undefined}
|
||||
* @returns {void}
|
||||
*/
|
||||
close () {
|
||||
if (!this.isValid()) {
|
||||
return
|
||||
}
|
||||
|
||||
// close stream
|
||||
pull(
|
||||
empty(),
|
||||
this.rest()
|
||||
)
|
||||
log('closing the stream')
|
||||
this.rest().sink([])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,118 +1,51 @@
|
||||
'use strict'
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const proto = require('../protocol')
|
||||
const { getPeerInfo } = require('../../get-peer-info')
|
||||
const { CircuitRelay } = require('../protocol')
|
||||
|
||||
module.exports = function (swarm) {
|
||||
/**
|
||||
* Get b58 string from multiaddr or peerinfo
|
||||
*
|
||||
* @param {Multiaddr|PeerInfo} peer
|
||||
* @return {*}
|
||||
*/
|
||||
function getB58String (peer) {
|
||||
let b58Id = null
|
||||
if (multiaddr.isMultiaddr(peer)) {
|
||||
const relayMa = multiaddr(peer)
|
||||
b58Id = relayMa.getPeerId()
|
||||
} else if (PeerInfo.isPeerInfo(peer)) {
|
||||
b58Id = peer.id.toB58String()
|
||||
}
|
||||
/**
|
||||
* Write a response
|
||||
*
|
||||
* @param {StreamHandler} streamHandler
|
||||
* @param {CircuitRelay.Status} status
|
||||
*/
|
||||
function writeResponse (streamHandler, status) {
|
||||
streamHandler.write({
|
||||
type: CircuitRelay.Type.STATUS,
|
||||
code: status
|
||||
})
|
||||
}
|
||||
|
||||
return b58Id
|
||||
/**
|
||||
* Validate incomming HOP/STOP message
|
||||
*
|
||||
* @param {*} msg A CircuitRelay unencoded protobuf message
|
||||
* @param {StreamHandler} streamHandler
|
||||
*/
|
||||
function validateAddrs (msg, streamHandler) {
|
||||
try {
|
||||
msg.dstPeer.addrs.forEach((addr) => {
|
||||
return multiaddr(addr)
|
||||
})
|
||||
} catch (err) {
|
||||
writeResponse(streamHandler, msg.type === CircuitRelay.Type.HOP
|
||||
? CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID
|
||||
: CircuitRelay.Status.STOP_DST_MULTIADDR_INVALID)
|
||||
throw err
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to make a peer info from a multiaddrs
|
||||
*
|
||||
* @param {Multiaddr|PeerInfo|PeerId} peer
|
||||
* @return {PeerInfo}
|
||||
* @private
|
||||
*/
|
||||
function peerInfoFromMa (peer) {
|
||||
return getPeerInfo(peer, swarm._peerBook)
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if peer has an existing connection
|
||||
*
|
||||
* @param {String} peerId
|
||||
* @param {Swarm} swarm
|
||||
* @return {Boolean}
|
||||
*/
|
||||
function isPeerConnected (peerId) {
|
||||
return swarm.muxedConns[peerId] || swarm.conns[peerId]
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a response
|
||||
*
|
||||
* @param {StreamHandler} streamHandler
|
||||
* @param {CircuitRelay.Status} status
|
||||
* @param {Function} cb
|
||||
* @returns {*}
|
||||
*/
|
||||
function writeResponse (streamHandler, status, cb) {
|
||||
cb = cb || (() => {})
|
||||
streamHandler.write(proto.CircuitRelay.encode({
|
||||
type: proto.CircuitRelay.Type.STATUS,
|
||||
code: status
|
||||
}))
|
||||
return cb()
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate incomming HOP/STOP message
|
||||
*
|
||||
* @param {CircuitRelay} msg
|
||||
* @param {StreamHandler} streamHandler
|
||||
* @param {CircuitRelay.Type} type
|
||||
* @returns {*}
|
||||
* @param {Function} cb
|
||||
*/
|
||||
function validateAddrs (msg, streamHandler, type, cb) {
|
||||
try {
|
||||
msg.dstPeer.addrs.forEach((addr) => {
|
||||
return multiaddr(addr)
|
||||
})
|
||||
} catch (err) {
|
||||
writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP
|
||||
? proto.CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID
|
||||
: proto.CircuitRelay.Status.STOP_DST_MULTIADDR_INVALID)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
try {
|
||||
msg.srcPeer.addrs.forEach((addr) => {
|
||||
return multiaddr(addr)
|
||||
})
|
||||
} catch (err) {
|
||||
writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP
|
||||
? proto.CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID
|
||||
: proto.CircuitRelay.Status.STOP_SRC_MULTIADDR_INVALID)
|
||||
return cb(err)
|
||||
}
|
||||
|
||||
return cb(null)
|
||||
}
|
||||
|
||||
function peerIdFromId (id) {
|
||||
if (typeof id === 'string') {
|
||||
return PeerId.createFromB58String(id)
|
||||
}
|
||||
|
||||
return PeerId.createFromBytes(id)
|
||||
}
|
||||
|
||||
return {
|
||||
getB58String,
|
||||
peerInfoFromMa,
|
||||
isPeerConnected,
|
||||
validateAddrs,
|
||||
writeResponse,
|
||||
peerIdFromId
|
||||
try {
|
||||
msg.srcPeer.addrs.forEach((addr) => {
|
||||
return multiaddr(addr)
|
||||
})
|
||||
} catch (err) {
|
||||
writeResponse(streamHandler, msg.type === CircuitRelay.Type.HOP
|
||||
? CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID
|
||||
: CircuitRelay.Status.STOP_SRC_MULTIADDR_INVALID)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
validateAddrs
|
||||
}
|
||||
|
@ -1,3 +1,186 @@
|
||||
'use strict'
|
||||
|
||||
module.exports = require('./circuit')
|
||||
const mafmt = require('mafmt')
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
const PeerInfo = require('peer-info')
|
||||
const withIs = require('class-is')
|
||||
const { CircuitRelay: CircuitPB } = require('./protocol')
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:circuit')
|
||||
log.error = debug('libp2p:circuit:error')
|
||||
|
||||
const { relay: multicodec } = require('./multicodec')
|
||||
const createListener = require('./listener')
|
||||
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
|
||||
const { handleStop } = require('./circuit/stop')
|
||||
const StreamHandler = require('./circuit/stream-handler')
|
||||
const toConnection = require('./stream-to-conn')
|
||||
|
||||
class Circuit {
|
||||
/**
|
||||
* Creates an instance of Circuit.
|
||||
*
|
||||
* @constructor
|
||||
* @param {object} options
|
||||
* @param {Libp2p} options.libp2p
|
||||
* @param {Upgrader} options.upgrader
|
||||
*/
|
||||
constructor ({ libp2p, upgrader }) {
|
||||
this._dialer = libp2p.dialer
|
||||
this._registrar = libp2p.registrar
|
||||
this._upgrader = upgrader
|
||||
this._options = libp2p._config.relay
|
||||
this.peerInfo = libp2p.peerInfo
|
||||
this._registrar.handle(multicodec, this._onProtocol.bind(this))
|
||||
}
|
||||
|
||||
async _onProtocol ({ connection, stream, protocol }) {
|
||||
const streamHandler = new StreamHandler({ stream })
|
||||
const request = await streamHandler.read()
|
||||
const circuit = this
|
||||
let virtualConnection
|
||||
|
||||
switch (request.type) {
|
||||
case CircuitPB.Type.CAN_HOP: {
|
||||
log('received CAN_HOP request from %s', connection.remotePeer.toB58String())
|
||||
await handleCanHop({ circuit, connection, streamHandler })
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.HOP: {
|
||||
log('received HOP request from %s', connection.remotePeer.toB58String())
|
||||
virtualConnection = await handleHop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
circuit
|
||||
})
|
||||
break
|
||||
}
|
||||
case CircuitPB.Type.STOP: {
|
||||
log('received STOP request from %s', connection.remotePeer.toB58String())
|
||||
virtualConnection = await handleStop({
|
||||
connection,
|
||||
request,
|
||||
streamHandler,
|
||||
circuit
|
||||
})
|
||||
break
|
||||
}
|
||||
default: {
|
||||
log('Request of type %s not supported', request.type)
|
||||
}
|
||||
}
|
||||
|
||||
if (virtualConnection) {
|
||||
const remoteAddr = multiaddr(request.dstPeer.addrs[0])
|
||||
const localAddr = multiaddr(request.srcPeer.addrs[0])
|
||||
const maConn = toConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr,
|
||||
localAddr
|
||||
})
|
||||
const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound'
|
||||
log('new %s connection %s', type, maConn.remoteAddr)
|
||||
|
||||
const conn = await this._upgrader.upgradeInbound(maConn)
|
||||
log('%s connection %s upgraded', type, maConn.remoteAddr)
|
||||
this.handler && this.handler(conn)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dial a peer over a relay
|
||||
*
|
||||
* @param {multiaddr} ma - the multiaddr of the peer to dial
|
||||
* @param {Object} options - dial options
|
||||
* @param {AbortSignal} [options.signal] - An optional abort signal
|
||||
* @returns {Connection} - the connection
|
||||
*/
|
||||
async dial (ma, options) {
|
||||
// Check the multiaddr to see if it contains a relay and a destination peer
|
||||
const addrs = ma.toString().split('/p2p-circuit')
|
||||
const relayAddr = multiaddr(addrs[0])
|
||||
const destinationAddr = multiaddr(addrs[addrs.length - 1])
|
||||
const relayPeer = PeerId.createFromCID(relayAddr.getPeerId())
|
||||
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
|
||||
|
||||
let disconnectOnFailure = false
|
||||
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
|
||||
if (!relayConnection) {
|
||||
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
|
||||
disconnectOnFailure = true
|
||||
}
|
||||
|
||||
try {
|
||||
const virtualConnection = await hop({
|
||||
connection: relayConnection,
|
||||
circuit: this,
|
||||
request: {
|
||||
type: CircuitPB.Type.HOP,
|
||||
srcPeer: {
|
||||
id: this.peerInfo.id.toBytes(),
|
||||
addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer)
|
||||
},
|
||||
dstPeer: {
|
||||
id: destinationPeer.toBytes(),
|
||||
addrs: [multiaddr(destinationAddr).buffer]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerInfo.id.toB58String()}`)
|
||||
const maConn = toConnection({
|
||||
stream: virtualConnection,
|
||||
remoteAddr: ma,
|
||||
localAddr
|
||||
})
|
||||
log('new outbound connection %s', maConn.remoteAddr)
|
||||
|
||||
return this._upgrader.upgradeOutbound(maConn)
|
||||
} catch (err) {
|
||||
log.error('Circuit relay dial failed', err)
|
||||
disconnectOnFailure && await relayConnection.close()
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a listener
|
||||
*
|
||||
* @param {any} options
|
||||
* @param {Function} handler
|
||||
* @return {listener}
|
||||
*/
|
||||
createListener (options, handler) {
|
||||
if (typeof options === 'function') {
|
||||
handler = options
|
||||
options = {}
|
||||
}
|
||||
|
||||
// Called on successful HOP and STOP requests
|
||||
this.handler = handler
|
||||
|
||||
return createListener(this, options)
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter check for all Multiaddrs that this transport can dial on
|
||||
*
|
||||
* @param {Array<Multiaddr>} multiaddrs
|
||||
* @returns {Array<Multiaddr>}
|
||||
*/
|
||||
filter (multiaddrs) {
|
||||
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
|
||||
|
||||
return multiaddrs.filter((ma) => {
|
||||
return mafmt.Circuit.matches(ma)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {Circuit}
|
||||
*/
|
||||
module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' })
|
||||
|
@ -1,94 +1,42 @@
|
||||
'use strict'
|
||||
|
||||
const setImmediate = require('async/setImmediate')
|
||||
|
||||
const multicodec = require('./multicodec')
|
||||
const EE = require('events').EventEmitter
|
||||
const EventEmitter = require('events')
|
||||
const multiaddr = require('multiaddr')
|
||||
const mafmt = require('mafmt')
|
||||
const Stop = require('./circuit/stop')
|
||||
const Hop = require('./circuit/hop')
|
||||
const proto = require('./protocol')
|
||||
const utilsFactory = require('./circuit/utils')
|
||||
|
||||
const StreamHandler = require('./circuit/stream-handler')
|
||||
|
||||
const debug = require('debug')
|
||||
|
||||
const log = debug('libp2p:circuit:listener')
|
||||
log.err = debug('libp2p:circuit:error:listener')
|
||||
|
||||
module.exports = (swarm, options, connHandler) => {
|
||||
const listener = new EE()
|
||||
const utils = utilsFactory(swarm)
|
||||
|
||||
listener.stopHandler = new Stop(swarm)
|
||||
listener.stopHandler.on('connection', (conn) => listener.emit('connection', conn))
|
||||
listener.hopHandler = new Hop(swarm, options.hop)
|
||||
/**
|
||||
* @param {*} circuit
|
||||
* @returns {Listener} a transport listener
|
||||
*/
|
||||
module.exports = (circuit) => {
|
||||
const listener = new EventEmitter()
|
||||
const listeningAddrs = new Map()
|
||||
|
||||
/**
|
||||
* Add swarm handler and listen for incoming connections
|
||||
*
|
||||
* @param {Multiaddr} ma
|
||||
* @param {Function} callback
|
||||
* @param {Multiaddr} addr
|
||||
* @return {void}
|
||||
*/
|
||||
listener.listen = (ma, callback) => {
|
||||
callback = callback || (() => {})
|
||||
listener.listen = async (addr) => {
|
||||
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)
|
||||
|
||||
swarm.handle(multicodec.relay, (_, conn) => {
|
||||
const sh = new StreamHandler(conn)
|
||||
const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
|
||||
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
|
||||
|
||||
sh.read((err, msg) => {
|
||||
if (err) {
|
||||
log.err(err)
|
||||
return
|
||||
}
|
||||
|
||||
let request = null
|
||||
try {
|
||||
request = proto.CircuitRelay.decode(msg)
|
||||
} catch (err) {
|
||||
return utils.writeResponse(
|
||||
sh,
|
||||
proto.CircuitRelay.Status.MALFORMED_MESSAGE)
|
||||
}
|
||||
|
||||
switch (request.type) {
|
||||
case proto.CircuitRelay.Type.CAN_HOP:
|
||||
case proto.CircuitRelay.Type.HOP: {
|
||||
return listener.hopHandler.handle(request, sh)
|
||||
}
|
||||
|
||||
case proto.CircuitRelay.Type.STOP: {
|
||||
return listener.stopHandler.handle(request, sh, connHandler)
|
||||
}
|
||||
|
||||
default: {
|
||||
utils.writeResponse(
|
||||
sh,
|
||||
proto.CircuitRelay.Status.INVALID_MSG_TYPE)
|
||||
return sh.close()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
setImmediate(() => listener.emit('listen'))
|
||||
callback()
|
||||
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
|
||||
listener.emit('listening')
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove swarm listener
|
||||
* TODO: Remove the peers from our topology
|
||||
*
|
||||
* @param {Function} cb
|
||||
* @return {void}
|
||||
*/
|
||||
listener.close = (cb) => {
|
||||
swarm.unhandle(multicodec.relay)
|
||||
setImmediate(() => listener.emit('close'))
|
||||
cb()
|
||||
}
|
||||
listener.close = () => {}
|
||||
|
||||
/**
|
||||
* Get fixed up multiaddrs
|
||||
@ -104,45 +52,14 @@ module.exports = (swarm, options, connHandler) => {
|
||||
* the encapsulated transport address. This is useful when for example, a peer should only
|
||||
* be dialed over TCP rather than any other transport
|
||||
*
|
||||
* @param {Function} callback
|
||||
* @return {void}
|
||||
* @return {Multiaddr[]}
|
||||
*/
|
||||
listener.getAddrs = (callback) => {
|
||||
let addrs = swarm._peerInfo.multiaddrs.toArray()
|
||||
|
||||
// get all the explicit relay addrs excluding self
|
||||
const p2pAddrs = addrs.filter((addr) => {
|
||||
return mafmt.Circuit.matches(addr) &&
|
||||
!addr.toString().includes(swarm._peerInfo.id.toB58String())
|
||||
})
|
||||
|
||||
// use the explicit relays instead of any relay
|
||||
if (p2pAddrs.length) {
|
||||
addrs = p2pAddrs
|
||||
listener.getAddrs = () => {
|
||||
const addrs = []
|
||||
for (const addr of listeningAddrs.values()) {
|
||||
addrs.push(addr)
|
||||
}
|
||||
|
||||
const listenAddrs = []
|
||||
addrs.forEach((addr) => {
|
||||
const peerMa = `/p2p-circuit/ipfs/${swarm._peerInfo.id.toB58String()}`
|
||||
if (addr.toString() === peerMa) {
|
||||
listenAddrs.push(multiaddr(peerMa))
|
||||
return
|
||||
}
|
||||
|
||||
if (!mafmt.Circuit.matches(addr)) {
|
||||
if (addr.getPeerId()) {
|
||||
// by default we're reachable over any relay
|
||||
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(addr))
|
||||
} else {
|
||||
const ma = `${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`
|
||||
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(ma))
|
||||
}
|
||||
} else {
|
||||
listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`))
|
||||
}
|
||||
})
|
||||
|
||||
callback(null, listenAddrs)
|
||||
return addrs
|
||||
}
|
||||
|
||||
return listener
|
||||
|
49
src/circuit/stream-to-conn.js
Normal file
49
src/circuit/stream-to-conn.js
Normal file
@ -0,0 +1,49 @@
|
||||
'use strict'
|
||||
|
||||
const abortable = require('abortable-iterator')
|
||||
const log = require('debug')('libp2p:circuit:stream')
|
||||
|
||||
// Convert a duplex iterable into a MultiaddrConnection
|
||||
// https://github.com/libp2p/interface-transport#multiaddrconnection
|
||||
module.exports = ({ stream, remoteAddr, localAddr }, options = {}) => {
|
||||
const { sink, source } = stream
|
||||
const maConn = {
|
||||
async sink (source) {
|
||||
if (options.signal) {
|
||||
source = abortable(source, options.signal)
|
||||
}
|
||||
|
||||
try {
|
||||
await sink(source)
|
||||
} catch (err) {
|
||||
// If aborted we can safely ignore
|
||||
if (err.type !== 'aborted') {
|
||||
// If the source errored the socket will already have been destroyed by
|
||||
// toIterable.duplex(). If the socket errored it will already be
|
||||
// destroyed. There's nothing to do here except log the error & return.
|
||||
log(err)
|
||||
}
|
||||
}
|
||||
close()
|
||||
},
|
||||
|
||||
source: options.signal ? abortable(source, options.signal) : source,
|
||||
conn: stream,
|
||||
localAddr,
|
||||
remoteAddr,
|
||||
timeline: { open: Date.now() },
|
||||
|
||||
close () {
|
||||
sink([])
|
||||
close()
|
||||
}
|
||||
}
|
||||
|
||||
function close () {
|
||||
if (!maConn.timeline.close) {
|
||||
maConn.timeline.close = Date.now()
|
||||
}
|
||||
}
|
||||
|
||||
return maConn
|
||||
}
|
@ -90,7 +90,6 @@ class Dialer {
|
||||
nextTick(async () => {
|
||||
try {
|
||||
await this.identifyService.identify(conn, conn.remotePeer)
|
||||
// TODO: Update the PeerStore with the information from identify
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ exports.codes = {
|
||||
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
||||
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
||||
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
||||
ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED',
|
||||
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
|
||||
ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE',
|
||||
ERR_INVALID_PEER: 'ERR_INVALID_PEER',
|
||||
|
21
src/index.js
21
src/index.js
@ -16,6 +16,7 @@ const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
|
||||
const { validate: validateConfig } = require('./config')
|
||||
const { codes } = require('./errors')
|
||||
|
||||
const Circuit = require('./circuit')
|
||||
const Dialer = require('./dialer')
|
||||
const TransportManager = require('./transport-manager')
|
||||
const Upgrader = require('./upgrader')
|
||||
@ -78,9 +79,6 @@ class Libp2p extends EventEmitter {
|
||||
libp2p: this,
|
||||
upgrader: this.upgrader
|
||||
})
|
||||
this._modules.transport.forEach((Transport) => {
|
||||
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
})
|
||||
|
||||
// Attach crypto channels
|
||||
if (this._modules.connEncryption) {
|
||||
@ -95,6 +93,12 @@ class Libp2p extends EventEmitter {
|
||||
peerStore: this.peerStore
|
||||
})
|
||||
|
||||
this._modules.transport.forEach((Transport) => {
|
||||
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
})
|
||||
// TODO: enable relay if enabled
|
||||
this.transportManager.add(Circuit.prototype[Symbol.toStringTag], Circuit)
|
||||
|
||||
// Attach stream multiplexers
|
||||
if (this._modules.streamMuxer) {
|
||||
const muxers = this._modules.streamMuxer
|
||||
@ -182,6 +186,7 @@ class Libp2p extends EventEmitter {
|
||||
this.pubsub && await this.pubsub.stop()
|
||||
this._dht && await this._dht.stop()
|
||||
await this.transportManager.close()
|
||||
await this.registrar.close()
|
||||
} catch (err) {
|
||||
if (err) {
|
||||
log.error(err)
|
||||
@ -282,7 +287,10 @@ class Libp2p extends EventEmitter {
|
||||
this.upgrader.protocols.set(protocol, handler)
|
||||
})
|
||||
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted()) {
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -296,7 +304,10 @@ class Libp2p extends EventEmitter {
|
||||
this.upgrader.protocols.delete(protocol)
|
||||
})
|
||||
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted()) {
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
}
|
||||
}
|
||||
|
||||
async _onStarting () {
|
||||
|
@ -46,6 +46,23 @@ class Registrar {
|
||||
this._handle = handle
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up the registrar
|
||||
* @async
|
||||
*/
|
||||
async close () {
|
||||
// Close all connections we're tracking
|
||||
const tasks = []
|
||||
for (const connectionList of this.connections.values()) {
|
||||
for (const connection of connectionList) {
|
||||
tasks.push(connection.close())
|
||||
}
|
||||
}
|
||||
|
||||
await tasks
|
||||
this.connections.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new connected peer to the record
|
||||
* TODO: this should live in the ConnectionManager
|
||||
@ -100,8 +117,9 @@ class Registrar {
|
||||
getConnection (peerInfo) {
|
||||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||
|
||||
const connections = this.connections.get(peerInfo.id.toB58String())
|
||||
// TODO: what should we return
|
||||
return this.connections.get(peerInfo.id.toB58String())[0]
|
||||
return connections ? connections[0] : null
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -64,7 +64,9 @@ class TransportManager {
|
||||
|
||||
await Promise.all(tasks)
|
||||
log('all listeners closed')
|
||||
this._listeners.clear()
|
||||
for (const key of this._listeners.keys()) {
|
||||
this._listeners.set(key, [])
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -82,6 +84,7 @@ class TransportManager {
|
||||
try {
|
||||
return await transport.dial(ma, options)
|
||||
} catch (err) {
|
||||
if (err.code) throw err
|
||||
throw errCode(err, codes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
const Transport = require('libp2p-tcp')
|
||||
@ -77,14 +78,9 @@ describe('Dialing (direct, TCP)', () => {
|
||||
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
|
||||
try {
|
||||
await dialer.connectToMultiaddr(unsupportedAddr)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToMultiaddr(unsupportedAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
})
|
||||
|
||||
it('should be able to connect to a given peer info', async () => {
|
||||
@ -121,14 +117,9 @@ describe('Dialing (direct, TCP)', () => {
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||
|
||||
try {
|
||||
await dialer.connectToPeer(peerInfo)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToPeer(peerInfo))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
@ -144,14 +135,9 @@ describe('Dialing (direct, TCP)', () => {
|
||||
expect(options.signal.aborted).to.equal(true)
|
||||
})
|
||||
|
||||
try {
|
||||
await dialer.connectToMultiaddr(remoteAddr)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToMultiaddr(remoteAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
|
||||
})
|
||||
|
||||
it('should dial to the max concurrency', async () => {
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
const pDefer = require('p-defer')
|
||||
@ -67,14 +68,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||
const dialer = new Dialer({ transportManager: localTM })
|
||||
|
||||
try {
|
||||
await dialer.connectToMultiaddr(unsupportedAddr)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToMultiaddr(unsupportedAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||
})
|
||||
|
||||
it('should be able to connect to a given peer', async () => {
|
||||
@ -94,14 +90,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
const peerInfo = new PeerInfo(peerId)
|
||||
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||
|
||||
try {
|
||||
await dialer.connectToPeer(peerInfo)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToPeer(peerInfo))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED)
|
||||
})
|
||||
|
||||
it('should abort dials on queue task timeout', async () => {
|
||||
@ -117,14 +108,9 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
expect(options.signal.aborted).to.equal(true)
|
||||
})
|
||||
|
||||
try {
|
||||
await dialer.connectToMultiaddr(remoteAddr)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial should have failed')
|
||||
await expect(dialer.connectToMultiaddr(remoteAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TIMEOUT)
|
||||
})
|
||||
|
||||
it('should dial to the max concurrency', async () => {
|
||||
|
162
test/dialing/relay.node.js
Normal file
162
test/dialing/relay.node.js
Normal file
@ -0,0 +1,162 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const { collect } = require('streaming-iterables')
|
||||
const pipe = require('it-pipe')
|
||||
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
|
||||
const baseOptions = require('../utils/base-options')
|
||||
const Libp2p = require('../../src')
|
||||
const { codes: Errors } = require('../../src/errors')
|
||||
|
||||
describe('Dialing (via relay, TCP)', () => {
|
||||
let srcLibp2p
|
||||
let relayLibp2p
|
||||
let dstLibp2p
|
||||
|
||||
before(async () => {
|
||||
const peerInfos = await createPeerInfoFromFixture(3)
|
||||
// Create 3 nodes, and turn HOP on for the relay
|
||||
;[srcLibp2p, relayLibp2p, dstLibp2p] = peerInfos.map((peerInfo, index) => {
|
||||
const opts = baseOptions
|
||||
index === 1 && (opts.config.relay.hop.enabled = true)
|
||||
return new Libp2p({
|
||||
...opts,
|
||||
peerInfo
|
||||
})
|
||||
})
|
||||
|
||||
dstLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
|
||||
})
|
||||
|
||||
beforeEach(() => {
|
||||
// Start each node
|
||||
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => {
|
||||
// Reset multiaddrs and start
|
||||
libp2p.peerInfo.multiaddrs.clear()
|
||||
libp2p.peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
|
||||
libp2p.start()
|
||||
}))
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
// Stop each node
|
||||
return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop()))
|
||||
})
|
||||
|
||||
it('should be able to connect to a peer over a relay with active connections', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerInfo.id.toString()
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
||||
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
|
||||
const connection = await srcLibp2p.dial(dialAddr)
|
||||
expect(connection).to.exist()
|
||||
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerInfo.id.toBytes())
|
||||
expect(connection.localPeer.toBytes()).to.eql(srcLibp2p.peerInfo.id.toBytes())
|
||||
expect(connection.remoteAddr).to.eql(dialAddr)
|
||||
expect(connection.localAddr).to.eql(
|
||||
relayAddr // the relay address
|
||||
.encapsulate(`/p2p/${relayIdString}`) // with its peer id
|
||||
.encapsulate('/p2p-circuit') // the local peer is connected over the relay
|
||||
.encapsulate(`/p2p/${srcLibp2p.peerInfo.id.toB58String()}`) // and the local peer id
|
||||
)
|
||||
|
||||
const { stream: echoStream } = await connection.newStream('/echo/1.0.0')
|
||||
const input = Buffer.from('hello')
|
||||
const [output] = await pipe(
|
||||
[input],
|
||||
echoStream,
|
||||
collect
|
||||
)
|
||||
expect(output.slice()).to.eql(input)
|
||||
})
|
||||
|
||||
it('should fail to connect to a peer over a relay with inactive connections', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerInfo.id.toString()
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
})
|
||||
|
||||
it('should not stay connected to a relay when not already connected and HOP fails', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerInfo.id.toString()
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
// We should not be connected to the relay, because we weren't before the dial
|
||||
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
expect(srcToRelayConn).to.not.exist()
|
||||
})
|
||||
|
||||
it('dialer should stay connected to an already connected relay on hop failure', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerInfo.id.toString()
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
await srcLibp2p.dial(relayAddr)
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
expect(srcToRelayConn).to.exist()
|
||||
expect(srcToRelayConn.stat.status).to.equal('open')
|
||||
})
|
||||
|
||||
it('destination peer should stay connected to an already connected relay on hop failure', async () => {
|
||||
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
|
||||
const relayIdString = relayLibp2p.peerInfo.id.toString()
|
||||
|
||||
const dialAddr = relayAddr
|
||||
.encapsulate(`/p2p/${relayIdString}`)
|
||||
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
|
||||
|
||||
// Connect the destination peer and the relay
|
||||
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
||||
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||
|
||||
// Tamper with the our multiaddrs for the circuit message
|
||||
sinon.stub(srcLibp2p.peerInfo.multiaddrs, 'toArray').returns([{
|
||||
buffer: Buffer.from('an invalid multiaddr')
|
||||
}])
|
||||
|
||||
await expect(srcLibp2p.dial(dialAddr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
|
||||
|
||||
const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
|
||||
expect(dstToRelayConn).to.exist()
|
||||
expect(dstToRelayConn.stat.status).to.equal('open')
|
||||
})
|
||||
})
|
@ -3,6 +3,7 @@
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
@ -98,20 +99,18 @@ describe('Identify', () => {
|
||||
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY })
|
||||
|
||||
// Run identify
|
||||
try {
|
||||
await Promise.all([
|
||||
localIdentify.identify(localConnectionMock, localPeer.id),
|
||||
remoteIdentify.handleMessage({
|
||||
connection: remoteConnectionMock,
|
||||
stream: remote,
|
||||
protocol: multicodecs.IDENTIFY
|
||||
})
|
||||
])
|
||||
expect.fail('should have thrown')
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
expect(err.code).to.eql(Errors.ERR_INVALID_PEER)
|
||||
}
|
||||
const identifyPromise = Promise.all([
|
||||
localIdentify.identify(localConnectionMock, localPeer.id),
|
||||
remoteIdentify.handleMessage({
|
||||
connection: remoteConnectionMock,
|
||||
stream: remote,
|
||||
protocol: multicodecs.IDENTIFY
|
||||
})
|
||||
])
|
||||
|
||||
await expect(identifyPromise)
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', Errors.ERR_INVALID_PEER)
|
||||
})
|
||||
|
||||
describe('push', () => {
|
||||
@ -229,6 +228,7 @@ describe('Identify', () => {
|
||||
|
||||
// Wait for identify to finish
|
||||
await libp2p.dialer.identifyService.identify.firstCall.returnValue
|
||||
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||
|
||||
libp2p.handle('/echo/2.0.0', () => {})
|
||||
libp2p.unhandle('/echo/2.0.0')
|
||||
|
@ -54,4 +54,19 @@ describe('registrar on dial', () => {
|
||||
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
|
||||
expect(remoteConn).to.exist()
|
||||
})
|
||||
|
||||
it('should be closed on libp2p stop', async () => {
|
||||
libp2p = new Libp2p(mergeOptions(baseOptions, {
|
||||
peerInfo
|
||||
}))
|
||||
|
||||
await libp2p.dial(remoteAddr)
|
||||
expect(libp2p.registrar.connections.size).to.equal(1)
|
||||
|
||||
sinon.spy(libp2p.registrar, 'close')
|
||||
|
||||
await libp2p.stop()
|
||||
expect(libp2p.registrar.close.callCount).to.equal(1)
|
||||
expect(libp2p.registrar.connections.size).to.equal(0)
|
||||
})
|
||||
})
|
||||
|
@ -24,26 +24,14 @@ describe('registrar', () => {
|
||||
})
|
||||
|
||||
it('should fail to register a protocol if no multicodec is provided', () => {
|
||||
try {
|
||||
registrar.register()
|
||||
} catch (err) {
|
||||
expect(err).to.exist()
|
||||
return
|
||||
}
|
||||
throw new Error('should fail to register a protocol if no multicodec is provided')
|
||||
expect(() => registrar.register()).to.throw()
|
||||
})
|
||||
|
||||
it('should fail to register a protocol if an invalid topology is provided', () => {
|
||||
const fakeTopology = {
|
||||
random: 1
|
||||
}
|
||||
try {
|
||||
registrar.register()
|
||||
} catch (err) {
|
||||
expect(err).to.exist(fakeTopology)
|
||||
return
|
||||
}
|
||||
throw new Error('should fail to register a protocol if an invalid topology is provided')
|
||||
expect(() => registrar.register(fakeTopology)).to.throw()
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -38,11 +38,12 @@ describe('Transport Manager (TCP)', () => {
|
||||
it('should be able to listen', async () => {
|
||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
await tm.listen(addrs)
|
||||
expect(tm._listeners.size).to.equal(1)
|
||||
expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
|
||||
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)
|
||||
// Ephemeral ip addresses may result in multiple listeners
|
||||
expect(tm.getAddrs().length).to.equal(addrs.length)
|
||||
await tm.close()
|
||||
expect(tm._listeners.size).to.equal(0)
|
||||
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
|
||||
})
|
||||
|
||||
it('should be able to dial', async () => {
|
||||
|
@ -3,6 +3,7 @@
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
chai.use(require('chai-as-promised'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
@ -39,21 +40,25 @@ describe('Transport Manager (WebSockets)', () => {
|
||||
await tm.remove(Transport.prototype[Symbol.toStringTag])
|
||||
})
|
||||
|
||||
it('should not be able to add a transport without a key', () => {
|
||||
expect(() => {
|
||||
it('should not be able to add a transport without a key', async () => {
|
||||
// Chai as promised conflicts with normal `throws` validation,
|
||||
// so wrap the call in an async function
|
||||
await expect((async () => { // eslint-disable-line
|
||||
tm.add(undefined, Transport)
|
||||
}).to.throw().that.satisfies((err) => {
|
||||
return err.code === ErrorCodes.ERR_INVALID_KEY
|
||||
})
|
||||
})())
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_INVALID_KEY)
|
||||
})
|
||||
|
||||
it('should not be able to add a transport twice', () => {
|
||||
it('should not be able to add a transport twice', async () => {
|
||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
expect(() => {
|
||||
// Chai as promised conflicts with normal `throws` validation,
|
||||
// so wrap the call in an async function
|
||||
await expect((async () => { // eslint-disable-line
|
||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
}).to.throw().that.satisfies((err) => {
|
||||
return err.code === ErrorCodes.ERR_DUPLICATE_TRANSPORT
|
||||
})
|
||||
})())
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_DUPLICATE_TRANSPORT)
|
||||
})
|
||||
|
||||
it('should be able to dial', async () => {
|
||||
@ -67,27 +72,18 @@ describe('Transport Manager (WebSockets)', () => {
|
||||
it('should fail to dial an unsupported address', async () => {
|
||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
const addr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||
try {
|
||||
await tm.dial(addr)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('Dial attempt should have failed')
|
||||
await expect(tm.dial(addr))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||
})
|
||||
|
||||
it('should fail to listen with no valid address', async () => {
|
||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
const addrs = [multiaddr('/ip4/127.0.0.1/tcp/0')]
|
||||
try {
|
||||
await tm.listen(addrs)
|
||||
} catch (err) {
|
||||
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_NO_VALID_ADDRESSES)
|
||||
return
|
||||
}
|
||||
|
||||
expect.fail('should have failed')
|
||||
await expect(tm.listen(addrs))
|
||||
.to.eventually.be.rejected()
|
||||
.and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
|
||||
})
|
||||
})
|
||||
|
||||
@ -115,7 +111,8 @@ describe('libp2p.transportManager', () => {
|
||||
})
|
||||
|
||||
expect(libp2p.transportManager).to.exist()
|
||||
expect(libp2p.transportManager._transports.size).to.equal(1)
|
||||
// Our transport and circuit relay
|
||||
expect(libp2p.transportManager._transports.size).to.equal(2)
|
||||
})
|
||||
|
||||
it('starting and stopping libp2p should start and stop TransportManager', async () => {
|
||||
|
@ -9,5 +9,13 @@ module.exports = {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
},
|
||||
config: {
|
||||
relay: {
|
||||
enabled: true,
|
||||
hop: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,5 +9,13 @@ module.exports = {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
},
|
||||
config: {
|
||||
relay: {
|
||||
enabled: true,
|
||||
hop: {
|
||||
enabled: false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ const PeerInfo = require('peer-info')
|
||||
|
||||
const Peers = require('../../fixtures/peers')
|
||||
|
||||
module.exports.createPeerInfo = async (length) => {
|
||||
async function createPeerInfo (length) {
|
||||
const peers = await Promise.all(
|
||||
Array.from({ length })
|
||||
.map((_, i) => PeerId.create())
|
||||
@ -14,11 +14,19 @@ module.exports.createPeerInfo = async (length) => {
|
||||
return peers.map((peer) => new PeerInfo(peer))
|
||||
}
|
||||
|
||||
module.exports.createPeerInfoFromFixture = async (length) => {
|
||||
const peers = await Promise.all(
|
||||
function createPeerIdsFromFixture (length) {
|
||||
return Promise.all(
|
||||
Array.from({ length })
|
||||
.map((_, i) => PeerId.createFromJSON(Peers[i]))
|
||||
)
|
||||
}
|
||||
|
||||
async function createPeerInfoFromFixture (length) {
|
||||
const peers = await createPeerIdsFromFixture(length)
|
||||
|
||||
return peers.map((peer) => new PeerInfo(peer))
|
||||
}
|
||||
|
||||
module.exports.createPeerInfo = createPeerInfo
|
||||
module.exports.createPeerIdsFromFixture = createPeerIdsFromFixture
|
||||
module.exports.createPeerInfoFromFixture = createPeerInfoFromFixture
|
||||
|
@ -1,10 +1,15 @@
|
||||
'use strict'
|
||||
|
||||
const pipe = require('it-pipe')
|
||||
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||
const multiaddr = require('multiaddr')
|
||||
|
||||
const Muxer = require('libp2p-mplex')
|
||||
const Multistream = require('multistream-select')
|
||||
const pair = require('it-pair')
|
||||
const errCode = require('err-code')
|
||||
const { codes } = require('../../src/errors')
|
||||
|
||||
const mockMultiaddrConnPair = require('./mockMultiaddrConn')
|
||||
const peerUtils = require('./creators/peer')
|
||||
|
||||
module.exports = async (properties = {}) => {
|
||||
@ -48,3 +53,103 @@ module.exports = async (properties = {}) => {
|
||||
...properties
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a full connection pair, without the transport or encryption
|
||||
*
|
||||
* @param {object} options
|
||||
* @param {Multiaddr[]} options.addrs Should contain two addresses for the local and remote peer respectively
|
||||
* @param {PeerId[]} options.remotePeer Should contain two peer ids, for the local and remote peer respectively
|
||||
* @param {Map<string, function>} options.protocols The protocols the connections should support
|
||||
* @returns {{inbound:Connection, outbound:Connection}}
|
||||
*/
|
||||
module.exports.pair = function connectionPair ({ addrs, peers, protocols }) {
|
||||
const [localPeer, remotePeer] = peers
|
||||
|
||||
const {
|
||||
inbound: inboundMaConn,
|
||||
outbound: outboundMaConn
|
||||
} = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||
|
||||
const inbound = createConnection({
|
||||
direction: 'inbound',
|
||||
maConn: inboundMaConn,
|
||||
protocols,
|
||||
// Inbound connection peers are reversed
|
||||
localPeer: remotePeer,
|
||||
remotePeer: localPeer
|
||||
})
|
||||
const outbound = createConnection({
|
||||
direction: 'outbound',
|
||||
maConn: outboundMaConn,
|
||||
protocols,
|
||||
localPeer,
|
||||
remotePeer
|
||||
})
|
||||
|
||||
return { inbound, outbound }
|
||||
}
|
||||
|
||||
function createConnection ({
|
||||
direction,
|
||||
maConn,
|
||||
localPeer,
|
||||
remotePeer,
|
||||
protocols
|
||||
}) {
|
||||
// Create the muxer
|
||||
const muxer = new Muxer({
|
||||
// Run anytime a remote stream is created
|
||||
onStream: async muxedStream => {
|
||||
const mss = new Multistream.Listener(muxedStream)
|
||||
try {
|
||||
const { stream, protocol } = await mss.handle(Array.from(protocols.keys()))
|
||||
connection.addStream(stream, protocol)
|
||||
// Need to be able to notify a peer of this this._onStream({ connection, stream, protocol })
|
||||
const handler = protocols.get(protocol)
|
||||
handler({ connection, stream, protocol })
|
||||
} catch (err) {
|
||||
// Do nothing
|
||||
}
|
||||
},
|
||||
// Run anytime a stream closes
|
||||
onStreamEnd: muxedStream => {
|
||||
connection.removeStream(muxedStream.id)
|
||||
}
|
||||
})
|
||||
|
||||
const newStream = async protocols => {
|
||||
const muxedStream = muxer.newStream()
|
||||
const mss = new Multistream.Dialer(muxedStream)
|
||||
try {
|
||||
const { stream, protocol } = await mss.select(protocols)
|
||||
return { stream: { ...muxedStream, ...stream }, protocol }
|
||||
} catch (err) {
|
||||
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||
}
|
||||
}
|
||||
|
||||
// Pipe all data through the muxer
|
||||
pipe(maConn, muxer, maConn)
|
||||
|
||||
maConn.timeline.upgraded = Date.now()
|
||||
|
||||
// Create the connection
|
||||
const connection = new Connection({
|
||||
localAddr: maConn.localAddr,
|
||||
remoteAddr: maConn.remoteAddr,
|
||||
localPeer: localPeer,
|
||||
remotePeer: remotePeer,
|
||||
stat: {
|
||||
direction,
|
||||
timeline: maConn.timeline,
|
||||
multiplexer: Muxer.multicodec,
|
||||
encryption: 'N/A'
|
||||
},
|
||||
newStream,
|
||||
getStreams: () => muxer.streams,
|
||||
close: err => maConn.close(err)
|
||||
})
|
||||
|
||||
return connection
|
||||
}
|
||||
|
@ -13,10 +13,11 @@ const AbortController = require('abort-controller')
|
||||
*/
|
||||
module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) {
|
||||
const controller = new AbortController()
|
||||
const [localAddr, remoteAddr] = addrs
|
||||
|
||||
const [inbound, outbound] = duplexPair()
|
||||
outbound.localAddr = addrs[0]
|
||||
outbound.remoteAddr = addrs[1].encapsulate(`/p2p/${remotePeer.toB58String()}`)
|
||||
outbound.localAddr = localAddr
|
||||
outbound.remoteAddr = remoteAddr.encapsulate(`/p2p/${remotePeer.toB58String()}`)
|
||||
outbound.timeline = {
|
||||
open: Date.now()
|
||||
}
|
||||
@ -25,8 +26,8 @@ module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) {
|
||||
controller.abort()
|
||||
}
|
||||
|
||||
inbound.localAddr = addrs[1]
|
||||
inbound.remoteAddr = addrs[0]
|
||||
inbound.localAddr = remoteAddr
|
||||
inbound.remoteAddr = localAddr
|
||||
inbound.timeline = {
|
||||
open: Date.now()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user