diff --git a/.aegir.js b/.aegir.js index 51e31c89..f5eb7339 100644 --- a/.aegir.js +++ b/.aegir.js @@ -23,6 +23,15 @@ const before = async () => { transport: [WebSockets], streamMuxer: [Muxer], connEncryption: [Crypto] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: true, + active: false + } + } } }) // Add the echo protocol diff --git a/package.json b/package.json index b923e288..e80409f8 100644 --- a/package.json +++ b/package.json @@ -59,13 +59,13 @@ "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", - "multiaddr": "^7.1.0", + "multiaddr": "^7.2.1", "multistream-select": "^0.15.0", "once": "^1.4.0", "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", - "peer-id": "^0.13.3", + "peer-id": "^0.13.4", "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", "protons": "^1.0.1", @@ -80,6 +80,7 @@ "abortable-iterator": "^2.1.0", "aegir": "^20.0.0", "chai": "^4.2.0", + "chai-as-promised": "^7.1.1", "chai-checkmark": "^1.0.1", "cids": "^0.7.1", "delay": "^4.3.0", diff --git a/src/circuit/README.md b/src/circuit/README.md index 36b659f7..b9f89fcf 100644 --- a/src/circuit/README.md +++ b/src/circuit/README.md @@ -32,8 +32,6 @@ Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes - [Example](#example) - [Create dialer/listener](#create-dialerlistener) - [Create `relay`](#create-relay) - - [This module uses `pull-streams`](#this-module-uses-pull-streams) - - [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams) - [API](#api) - [Implementation rational](#implementation-rational) @@ -48,7 +46,7 @@ const Circuit = require('libp2p-circuit') const multiaddr = require('multiaddr') const pull = require('pull-stream') -const mh1 = multiaddr('/p2p-circuit/ipfs/QmHash') // dial /ipfs/QmHash over any circuit +const mh1 = multiaddr('/p2p-circuit/p2p/QmHash') // dial /p2p/QmHash over any circuit const circuit = new Circuit(swarmInstance, options) // pass swarm instance and options @@ -91,37 +89,13 @@ const relay = new Relay(options) relay.mount(swarmInstance) // start relaying traffic ``` -### This module uses `pull-streams` - -We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). - -You can learn more about pull-streams at: - -- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) -- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) -- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) -- [pull-streams documentation](https://pull-stream.github.io/) - -#### Converting `pull-streams` to Node.js Streams - -If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: - -```js -const pullToStream = require('pull-stream-to-stream') - -const nodeStreamInstance = pullToStream(pullStreamInstance) -// nodeStreamInstance is an instance of a Node.js Stream -``` - -To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. - ## API [![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) `libp2p-circuit` accepts Circuit addresses for both IPFS and non IPFS encapsulated addresses, i.e: -`/p2p-circuit/ip4/127.0.0.1/tcp/4001/ipfs/QmHash` +`/p2p-circuit/ip4/127.0.0.1/tcp/4001/p2p/QmHash` Both for dialing and listening. diff --git a/src/circuit/circuit.js b/src/circuit/circuit.js deleted file mode 100644 index 17597ab4..00000000 --- a/src/circuit/circuit.js +++ /dev/null @@ -1,126 +0,0 @@ -'use strict' - -const mafmt = require('mafmt') -const multiaddr = require('multiaddr') - -const CircuitDialer = require('./circuit/dialer') -const utilsFactory = require('./circuit/utils') - -const debug = require('debug') -const log = debug('libp2p:circuit:transportdialer') -log.err = debug('libp2p:circuit:error:transportdialer') - -const createListener = require('./listener') - -class Circuit { - static get tag () { - return 'Circuit' - } - - /** - * Creates an instance of Dialer. - * - * @param {Swarm} swarm - the swarm - * @param {any} options - config options - * - * @memberOf Dialer - */ - constructor (swarm, options) { - this.options = options || {} - - this.swarm = swarm - this.dialer = null - this.utils = utilsFactory(swarm) - this.peerInfo = this.swarm._peerInfo - this.relays = this.filter(this.peerInfo.multiaddrs.toArray()) - - // if no explicit relays, add a default relay addr - if (this.relays.length === 0) { - this.peerInfo - .multiaddrs - .add(`/p2p-circuit/ipfs/${this.peerInfo.id.toB58String()}`) - } - - this.dialer = new CircuitDialer(swarm, options) - - this.swarm.on('peer-mux-established', (peerInfo) => { - this.dialer.canHop(peerInfo) - }) - this.swarm.on('peer-mux-closed', (peerInfo) => { - this.dialer.relayPeers.delete(peerInfo.id.toB58String()) - }) - } - - /** - * Dial the relays in the Addresses.Swarm config - * - * @param {Array} relays - * @return {void} - */ - _dialSwarmRelays () { - // if we have relay addresses in swarm config, then dial those relays - this.relays.forEach((relay) => { - const relaySegments = relay - .toString() - .split('/p2p-circuit') - .filter(segment => segment.length) - - relaySegments.forEach((relaySegment) => { - const ma = this.utils.peerInfoFromMa(multiaddr(relaySegment)) - this.dialer._dialRelay(ma) - }) - }) - } - - /** - * Dial a peer over a relay - * - * @param {multiaddr} ma - the multiaddr of the peer to dial - * @param {Object} options - dial options - * @param {Function} cb - a callback called once dialed - * @returns {Connection} - the connection - * - * @memberOf Dialer - */ - dial (ma, options, cb) { - return this.dialer.dial(ma, options, cb) - } - - /** - * Create a listener - * - * @param {any} options - * @param {Function} handler - * @return {listener} - */ - createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = this.options || {} - } - - const listener = createListener(this.swarm, options, handler) - listener.on('listen', this._dialSwarmRelays.bind(this)) - return listener - } - - /** - * Filter check for all multiaddresses - * that this transport can dial on - * - * @param {any} multiaddrs - * @returns {Array} - * - * @memberOf Dialer - */ - filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } - return multiaddrs.filter((ma) => { - return mafmt.Circuit.matches(ma) - }) - } -} - -module.exports = Circuit diff --git a/src/circuit/circuit/dialer.js b/src/circuit/circuit/dialer.js deleted file mode 100644 index 2c250e2c..00000000 --- a/src/circuit/circuit/dialer.js +++ /dev/null @@ -1,275 +0,0 @@ -'use strict' - -const once = require('once') -const PeerId = require('peer-id') -const waterfall = require('async/waterfall') -const setImmediate = require('async/setImmediate') -const multiaddr = require('multiaddr') - -const { Connection } = require('libp2p-interfaces/src/connection') - -const utilsFactory = require('./utils') -const StreamHandler = require('./stream-handler') - -const debug = require('debug') -const log = debug('libp2p:circuit:dialer') -log.err = debug('libp2p:circuit:error:dialer') - -const multicodec = require('../multicodec') -const proto = require('../protocol') - -class Dialer { - /** - * Creates an instance of Dialer. - * @param {Swarm} swarm - the swarm - * @param {any} options - config options - * - * @memberOf Dialer - */ - constructor (swarm, options) { - this.swarm = swarm - this.relayPeers = new Map() - this.relayConns = new Map() - this.options = options - this.utils = utilsFactory(swarm) - } - - /** - * Helper that returns a relay connection - * - * @param {*} relay - * @param {*} callback - * @returns {Function} - callback - */ - _dialRelayHelper (relay, callback) { - if (this.relayConns.has(relay.id.toB58String())) { - return callback(null, this.relayConns.get(relay.id.toB58String())) - } - - return this._dialRelay(relay, callback) - } - - /** - * Dial a peer over a relay - * - * @param {multiaddr} ma - the multiaddr of the peer to dial - * @param {Function} cb - a callback called once dialed - * @returns {Connection} - the connection - * - */ - dial (ma, cb) { - cb = cb || (() => { }) - const strMa = ma.toString() - if (!strMa.includes('/p2p-circuit')) { - log.err('invalid circuit address') - return cb(new Error('invalid circuit address')) - } - - const addr = strMa.split('p2p-circuit') // extract relay address if any - const relay = addr[0] === '/' ? null : multiaddr(addr[0]) - const peer = multiaddr(addr[1] || addr[0]) - - const dstConn = new Connection() - setImmediate( - this._dialPeer.bind(this), - peer, - relay, - (err, conn) => { - if (err) { - log.err(err) - return cb(err) - } - - dstConn.setInnerConn(conn) - cb(null, dstConn) - }) - - return dstConn - } - - /** - * Does the peer support the HOP protocol - * - * @param {PeerInfo} peer - * @param {Function} callback - * @returns {void} - */ - canHop (peer, callback) { - callback = once(callback || (() => { })) - - this._dialRelayHelper(peer, (err, conn) => { - if (err) { - return callback(err) - } - - const sh = new StreamHandler(conn) - waterfall([ - (cb) => sh.write(proto.CircuitRelay.encode({ - type: proto.CircuitRelay.Type.CAN_HOP - }), cb), - (cb) => sh.read(cb) - ], (err, msg) => { - if (err) { - return callback(err) - } - const response = proto.CircuitRelay.decode(msg) - - if (response.code !== proto.CircuitRelay.Status.SUCCESS) { - const err = new Error(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`) - log(err) - return callback(err) - } - - log('HOP supported adding as relay - %s', this.utils.getB58String(peer)) - this.relayPeers.set(this.utils.getB58String(peer), peer) - sh.close() - callback() - }) - }) - } - - /** - * Dial the destination peer over a relay - * - * @param {multiaddr} dstMa - * @param {Connection|PeerInfo} relay - * @param {Function} cb - * @return {Function|void} - * @private - */ - _dialPeer (dstMa, relay, cb) { - if (typeof relay === 'function') { - cb = relay - relay = null - } - - if (!cb) { - cb = () => {} - } - - dstMa = multiaddr(dstMa) - // if no relay provided, dial on all available relays until one succeeds - if (!relay) { - const relays = Array.from(this.relayPeers.values()) - const next = (nextRelay) => { - if (!nextRelay) { - const err = 'no relay peers were found or all relays failed to dial' - log.err(err) - return cb(err) - } - - return this._negotiateRelay( - nextRelay, - dstMa, - (err, conn) => { - if (err) { - log.err(err) - return next(relays.shift()) - } - cb(null, conn) - }) - } - next(relays.shift()) - } else { - return this._negotiateRelay( - relay, - dstMa, - (err, conn) => { - if (err) { - log.err('An error has occurred negotiating the relay connection', err) - return cb(err) - } - - return cb(null, conn) - }) - } - } - - /** - * Negotiate the relay connection - * - * @param {Multiaddr|PeerInfo|Connection} relay - the Connection or PeerInfo of the relay - * @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for - * @param {Function} callback - a callback which gets the negotiated relay connection - * @returns {void} - * @private - * - * @memberOf Dialer - */ - _negotiateRelay (relay, dstMa, callback) { - dstMa = multiaddr(dstMa) - relay = this.utils.peerInfoFromMa(relay) - const srcMas = this.swarm._peerInfo.multiaddrs.toArray() - this._dialRelayHelper(relay, (err, conn) => { - if (err) { - log.err(err) - return callback(err) - } - const sh = new StreamHandler(conn) - waterfall([ - (cb) => { - log('negotiating relay for peer %s', dstMa.getPeerId()) - let dstPeerId - try { - dstPeerId = PeerId.createFromB58String(dstMa.getPeerId()).id - } catch (err) { - return cb(err) - } - sh.write( - proto.CircuitRelay.encode({ - type: proto.CircuitRelay.Type.HOP, - srcPeer: { - id: this.swarm._peerInfo.id.id, - addrs: srcMas.map((addr) => addr.buffer) - }, - dstPeer: { - id: dstPeerId, - addrs: [dstMa.buffer] - } - }), cb) - }, - (cb) => sh.read(cb) - ], (err, msg) => { - if (err) { - return callback(err) - } - const message = proto.CircuitRelay.decode(msg) - if (message.type !== proto.CircuitRelay.Type.STATUS) { - return callback(new Error('Got invalid message type - ' + - `expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`)) - } - - if (message.code !== proto.CircuitRelay.Status.SUCCESS) { - return callback(new Error(`Got ${message.code} error code trying to dial over relay`)) - } - - callback(null, new Connection(sh.rest())) - }) - }) - } - - /** - * Dial a relay peer by its PeerInfo - * - * @param {PeerInfo} peer - the PeerInfo of the relay peer - * @param {Function} cb - a callback with the connection to the relay peer - * @returns {void} - * @private - */ - _dialRelay (peer, cb) { - cb = once(cb || (() => { })) - - this.swarm.dial( - peer, - multicodec.relay, - once((err, conn) => { - if (err) { - log.err(err) - return cb(err) - } - cb(null, conn) - })) - } -} - -module.exports = Dialer diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index 33448655..0db27c0d 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -1,283 +1,136 @@ 'use strict' -const pull = require('pull-stream/pull') const debug = require('debug') const PeerInfo = require('peer-info') const PeerId = require('peer-id') -const EE = require('events').EventEmitter -const once = require('once') -const utilsFactory = require('./utils') +const { validateAddrs } = require('./utils') const StreamHandler = require('./stream-handler') -const proto = require('../protocol').CircuitRelay -const multiaddr = require('multiaddr') -const series = require('async/series') -const waterfall = require('async/waterfall') -const setImmediate = require('async/setImmediate') +const { CircuitRelay: CircuitPB } = require('../protocol') +const pipe = require('it-pipe') +const errCode = require('err-code') +const { codes: Errors } = require('../../errors') + +const { stop } = require('./stop') const multicodec = require('./../multicodec') -const log = debug('libp2p:circuit:relay') -log.err = debug('libp2p:circuit:error:relay') +const log = debug('libp2p:circuit:hop') +log.error = debug('libp2p:circuit:hop:error') -class Hop extends EE { - /** - * Construct a Circuit object - * - * This class will handle incoming circuit connections and - * either start a relay or hand the relayed connection to - * the swarm - * - * @param {Swarm} swarm - * @param {Object} options - */ - constructor (swarm, options) { - super() - this.swarm = swarm - this.peerInfo = this.swarm._peerInfo - this.utils = utilsFactory(swarm) - this.config = options || { active: false, enabled: false } - this.active = this.config.active - } - - /** - * Handle the relay message - * - * @param {CircuitRelay} message - * @param {StreamHandler} sh - * @returns {*} - */ - handle (message, sh) { - if (!this.config.enabled) { - this.utils.writeResponse( - sh, - proto.Status.HOP_CANT_SPEAK_RELAY) - return sh.close() - } - - // check if message is `CAN_HOP` - if (message.type === proto.Type.CAN_HOP) { - this.utils.writeResponse( - sh, - proto.Status.SUCCESS) - return sh.close() - } - - // This is a relay request - validate and create a circuit - let srcPeerId = null - let dstPeerId = null - try { - srcPeerId = PeerId.createFromBytes(message.srcPeer.id).toB58String() - dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String() - } catch (err) { - log.err(err) - - if (!srcPeerId) { - this.utils.writeResponse( - sh, - proto.Status.HOP_SRC_MULTIADDR_INVALID) - return sh.close() - } - - if (!dstPeerId) { - this.utils.writeResponse( - sh, - proto.Status.HOP_DST_MULTIADDR_INVALID) - return sh.close() - } - } - - if (srcPeerId === dstPeerId) { - this.utils.writeResponse( - sh, - proto.Status.HOP_CANT_RELAY_TO_SELF) - return sh.close() - } - - if (!message.dstPeer.addrs.length) { - // TODO: use encapsulate here - const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer - message.dstPeer.addrs.push(addr) - } - - log('trying to establish a circuit: %s <-> %s', srcPeerId, dstPeerId) - const noPeer = () => { - // log.err(err) - this.utils.writeResponse( - sh, - proto.Status.HOP_NO_CONN_TO_DST) - return sh.close() - } - - const isConnected = (cb) => { - let dstPeer - try { - dstPeer = this.swarm._peerBook.get(dstPeerId) - if (!dstPeer.isConnected() && !this.active) { - const err = new Error(`No Connection to peer ${dstPeerId}`) - noPeer(err) - return cb(err) - } - } catch (err) { - if (!this.active) { - noPeer(err) - return cb(err) - } - } - cb() - } - - series([ - (cb) => this.utils.validateAddrs(message, sh, proto.Type.HOP, cb), - (cb) => isConnected(cb), - (cb) => this._circuit(sh, message, cb) - ], (err) => { - if (err) { - log.err(err) - sh.close() - return setImmediate(() => this.emit('circuit:error', err)) - } - setImmediate(() => this.emit('circuit:success')) +module.exports.handleHop = async function handleHop ({ + connection, + request, + streamHandler, + circuit +}) { + // Ensure hop is enabled + if (!circuit._options.hop.enabled) { + log('HOP request received but we are not acting as a relay') + return streamHandler.end({ + type: CircuitPB.Type.STATUS, + code: CircuitPB.Status.HOP_CANT_SPEAK_RELAY }) } - /** - * Connect to STOP - * - * @param {PeerInfo} peer - * @param {StreamHandler} srcSh - * @param {function} callback - * @returns {void} - */ - _connectToStop (peer, srcSh, callback) { - this._dialPeer(peer, (err, dstConn) => { - if (err) { - this.utils.writeResponse( - srcSh, - proto.Status.HOP_CANT_DIAL_DST) - log.err(err) - return callback(err) - } + // Validate the HOP request has the required input + try { + validateAddrs(request, streamHandler) + } catch (err) { + return log.error('invalid hop request via peer %s', connection.remotePeer.toB58String(), err) + } - return this.utils.writeResponse( - srcSh, - proto.Status.SUCCESS, - (err) => { - if (err) { - log.err(err) - return callback(err) - } - return callback(null, dstConn) - }) + // Get the connection to the destination (stop) peer + const destinationPeer = new PeerId(request.dstPeer.id) + + const destinationConnection = circuit._registrar.getConnection(new PeerInfo(destinationPeer)) + if (!destinationConnection && !circuit._options.hop.active) { + log('HOP request received but we are not connected to the destination peer') + return streamHandler.end({ + type: CircuitPB.Type.STATUS, + code: CircuitPB.Status.HOP_NO_CONN_TO_DST }) } - /** - * Negotiate STOP - * - * @param {StreamHandler} dstSh - * @param {StreamHandler} srcSh - * @param {CircuitRelay} message - * @param {function} callback - * @returns {void} - */ - _negotiateStop (dstSh, srcSh, message, callback) { - const stopMsg = Object.assign({}, message, { - type: proto.Type.STOP // change the message type + // TODO: Handle being an active relay + + // Handle the incoming HOP request by performing a STOP request + const stopRequest = { + type: CircuitPB.Type.STOP, + dstPeer: request.dstPeer, + srcPeer: request.srcPeer + } + + let destinationStream + try { + destinationStream = await stop({ + connection: destinationConnection, + request: stopRequest, + circuit }) - dstSh.write(proto.encode(stopMsg), - (err) => { - if (err) { - this.utils.writeResponse( - srcSh, - proto.Status.HOP_CANT_OPEN_DST_STREAM) - log.err(err) - return callback(err) - } - - // read response from STOP - dstSh.read((err, msg) => { - if (err) { - log.err(err) - return callback(err) - } - - const message = proto.decode(msg) - if (message.code !== proto.Status.SUCCESS) { - return callback(new Error('Unable to create circuit!')) - } - - return callback(null, msg) - }) - }) + } catch (err) { + return log.error(err) } - /** - * Attempt to make a circuit from A <-> R <-> B where R is this relay - * - * @param {StreamHandler} srcSh - the source stream handler - * @param {CircuitRelay} message - the message with the src and dst entries - * @param {Function} callback - callback to signal success or failure - * @returns {void} - * @private - */ - _circuit (srcSh, message, callback) { - let dstSh = null - waterfall([ - (cb) => this._connectToStop(message.dstPeer, srcSh, cb), - (_dstConn, cb) => { - dstSh = new StreamHandler(_dstConn) - this._negotiateStop(dstSh, srcSh, message, cb) - } - ], (err) => { - if (err) { - // close/end the source stream if there was an error - if (srcSh) { - srcSh.close() - } + log('hop request from %s is valid', connection.remotePeer.toB58String()) + streamHandler.write({ + type: CircuitPB.Type.STATUS, + code: CircuitPB.Status.SUCCESS + }) + const sourceStream = streamHandler.rest() - if (dstSh) { - dstSh.close() - } - return callback(err) - } - - const src = srcSh.rest() - const dst = dstSh.rest() - - const srcIdStr = PeerId.createFromBytes(message.srcPeer.id).toB58String() - const dstIdStr = PeerId.createFromBytes(message.dstPeer.id).toB58String() - - // circuit the src and dst streams - pull( - src, - dst, - src - ) - log('circuit %s <-> %s established', srcIdStr, dstIdStr) - callback() - }) - } - - /** - * Dial the dest peer and create a circuit - * - * @param {Multiaddr} dstPeer - * @param {Function} callback - * @returns {void} - * @private - */ - _dialPeer (dstPeer, callback) { - const peerInfo = new PeerInfo(PeerId.createFromBytes(dstPeer.id)) - dstPeer.addrs.forEach((a) => peerInfo.multiaddrs.add(a)) - this.swarm.dial(peerInfo, multicodec.relay, once((err, conn) => { - if (err) { - log.err(err) - return callback(err) - } - - callback(null, conn) - })) - } + // Short circuit the two streams to create the relayed connection + return pipe( + sourceStream, + destinationStream, + sourceStream + ) } -module.exports = Hop +/** + * Performs a HOP request to a relay peer, to request a connection to another + * peer. A new, virtual, connection will be created between the two via the relay. + * + * @param {object} options + * @param {Connection} options.connection Connection to the relay + * @param {*} options.request + * @param {Circuit} options.circuit + * @returns {Promise} + */ +module.exports.hop = async function hop ({ + connection, + request +}) { + // Create a new stream to the relay + const { stream } = await connection.newStream([multicodec.relay]) + // Send the HOP request + const streamHandler = new StreamHandler({ stream }) + streamHandler.write(request) + + const response = await streamHandler.read() + + if (response.code === CircuitPB.Status.SUCCESS) { + log('hop request was successful') + return streamHandler.rest() + } + + log('hop request failed with code %d, closing stream', response.code) + streamHandler.close() + throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED) +} + +/** + * Creates an unencoded CAN_HOP response based on the Circuits configuration + * @private + */ +module.exports.handleCanHop = function handleCanHop ({ + connection, + streamHandler, + circuit +}) { + const canHop = circuit._options.hop.enabled + log('can hop (%s) request from %s', canHop, connection.remotePeer.toB58String()) + streamHandler.end({ + type: CircuitPB.Type.STATUS, + code: canHop ? CircuitPB.Status.SUCCESS : CircuitPB.Status.HOP_CANT_SPEAK_RELAY + }) +} diff --git a/src/circuit/circuit/stop.js b/src/circuit/circuit/stop.js index e18ef028..ef4ca3d1 100644 --- a/src/circuit/circuit/stop.js +++ b/src/circuit/circuit/stop.js @@ -1,56 +1,69 @@ 'use strict' -const setImmediate = require('async/setImmediate') - -const EE = require('events').EventEmitter -const { Connection } = require('libp2p-interfaces/src/connection') -const utilsFactory = require('./utils') -const PeerInfo = require('peer-info') -const proto = require('../protocol').CircuitRelay -const series = require('async/series') +const { CircuitRelay: CircuitPB } = require('../protocol') +const multicodec = require('../multicodec') +const StreamHandler = require('./stream-handler') +const { validateAddrs } = require('./utils') const debug = require('debug') - const log = debug('libp2p:circuit:stop') -log.err = debug('libp2p:circuit:error:stop') +log.error = debug('libp2p:circuit:stop:error') -class Stop extends EE { - constructor (swarm) { - super() - this.swarm = swarm - this.utils = utilsFactory(swarm) +/** + * Handles incoming STOP requests + * + * @private + * @param {*} options + * @param {Connection} options.connection + * @param {*} options.request The CircuitRelay protobuf request (unencoded) + * @param {StreamHandler} options.streamHandler + * @returns {Promise<*>} Resolves a duplex iterable + */ +module.exports.handleStop = function handleStop ({ + connection, + request, + streamHandler +}) { + // Validate the STOP request has the required input + try { + validateAddrs(request, streamHandler) + } catch (err) { + return log.error('invalid stop request via peer %s', connection.remotePeer.toB58String(), err) } - /** - * Handle the incoming STOP message - * - * @param {{}} msg - the parsed protobuf message - * @param {StreamHandler} sh - the stream handler wrapped connection - * @param {Function} callback - callback - * @returns {undefined} - */ - handle (msg, sh, callback) { - callback = callback || (() => {}) - - series([ - (cb) => this.utils.validateAddrs(msg, sh, proto.Type.STOP, cb), - (cb) => this.utils.writeResponse(sh, proto.Status.Success, cb) - ], (err) => { - if (err) { - // we don't return the error here, - // since multistream select don't expect one - callback() - return log(err) - } - - const peerInfo = new PeerInfo(this.utils.peerIdFromId(msg.srcPeer.id)) - msg.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr)) - const newConn = new Connection(sh.rest()) - newConn.setPeerInfo(peerInfo) - setImmediate(() => this.emit('connection', newConn)) - callback(newConn) - }) - } + // The request is valid + log('stop request is valid') + streamHandler.write({ + type: CircuitPB.Type.STATUS, + code: CircuitPB.Status.SUCCESS + }) + return streamHandler.rest() } -module.exports = Stop +/** + * Creates a STOP request + * @private + * @param {*} options + * @param {Connection} options.connection + * @param {*} options.request The CircuitRelay protobuf request (unencoded) + * @returns {Promise<*>} Resolves a duplex iterable + */ +module.exports.stop = async function stop ({ + connection, + request +}) { + const { stream } = await connection.newStream([multicodec.relay]) + log('starting stop request to %s', connection.remotePeer.toB58String()) + const streamHandler = new StreamHandler({ stream }) + + streamHandler.write(request) + const response = await streamHandler.read() + + if (response.code === CircuitPB.Status.SUCCESS) { + log('stop request to %s was successful', connection.remotePeer.toB58String()) + return streamHandler.rest() + } + + log('stop request failed with code %d', response.code) + streamHandler.close() +} diff --git a/src/circuit/circuit/stream-handler.js b/src/circuit/circuit/stream-handler.js index 0d008171..5fd8b638 100644 --- a/src/circuit/circuit/stream-handler.js +++ b/src/circuit/circuit/stream-handler.js @@ -1,139 +1,79 @@ 'use strict' -const values = require('pull-stream/sources/values') -const collect = require('pull-stream/sinks/collect') -const empty = require('pull-stream/sources/empty') -const pull = require('pull-stream/pull') -const lp = require('pull-length-prefixed') -const handshake = require('pull-handshake') +const lp = require('it-length-prefixed') +const handshake = require('it-handshake') +const { CircuitRelay: CircuitPB } = require('../protocol') const debug = require('debug') const log = debug('libp2p:circuit:stream-handler') -log.err = debug('libp2p:circuit:error:stream-handler') +log.error = debug('libp2p:circuit:stream-handler:error') class StreamHandler { /** * Create a stream handler for connection * - * @param {Connection} conn - connection to read/write - * @param {Function|undefined} cb - handshake callback called on error - * @param {Number} timeout - handshake timeout - * @param {Number} maxLength - max bytes length of message + * @param {object} options + * @param {*} options.stream - A duplex iterable + * @param {Number} options.maxLength - max bytes length of message */ - constructor (conn, cb, timeout, maxLength) { - this.conn = conn - this.stream = null - this.shake = null - this.timeout = cb || 1000 * 60 - this.maxLength = maxLength || 4096 + constructor ({ stream, maxLength = 4096 }) { + this.stream = stream - if (typeof cb === 'function') { - this.timeout = timeout || 1000 * 60 - } - - this.stream = handshake({ timeout: this.timeout }, cb) - this.shake = this.stream.handshake - - pull(this.stream, conn, this.stream) - } - - isValid () { - return this.conn && this.shake && this.stream + this.shake = handshake(this.stream) + this.decoder = lp.decode.fromReader(this.shake.reader, { maxDataLength: maxLength }) } /** * Read and decode message - * - * @param {Function} cb - * @returns {void|Function} + * @async + * @returns {void} */ - read (cb) { - if (!this.isValid()) { - return cb(new Error('handler is not in a valid state')) + async read () { + const msg = await this.decoder.next() + if (msg.value) { + const value = CircuitPB.decode(msg.value.slice()) + log('read message type', value.type) + return value } - lp.decodeFromReader( - this.shake, - { maxLength: this.maxLength }, - (err, msg) => { - if (err) { - log.err(err) - // this.shake.abort(err) - return cb(err) - } - - return cb(null, msg) - }) + log('read received no value, closing stream') + // End the stream, we didn't get data + this.close() } /** * Encode and write array of buffers * - * @param {Buffer[]} msg - * @param {Function} [cb] - * @returns {Function} + * @param {*} msg An unencoded CircuitRelay protobuf message */ - write (msg, cb) { - cb = cb || (() => {}) - - if (!this.isValid()) { - return cb(new Error('handler is not in a valid state')) - } - - pull( - values([msg]), - lp.encode(), - collect((err, encoded) => { - if (err) { - log.err(err) - this.shake.abort(err) - return cb(err) - } - - encoded.forEach((e) => this.shake.write(e)) - cb() - }) - ) - } - - /** - * Get the raw Connection - * - * @returns {null|Connection|*} - */ - getRawConn () { - return this.conn + write (msg) { + log('write message type %s', msg.type) + this.shake.write(lp.encode.single(CircuitPB.encode(msg))) } /** * Return the handshake rest stream and invalidate handler * - * @return {*|{source, sink}} + * @return {*} A duplex iterable */ rest () { - const rest = this.shake.rest() + this.shake.rest() + return this.shake.stream + } - this.conn = null - this.stream = null - this.shake = null - return rest + end (msg) { + this.write(msg) + this.close() } /** * Close the stream * - * @returns {undefined} + * @returns {void} */ close () { - if (!this.isValid()) { - return - } - - // close stream - pull( - empty(), - this.rest() - ) + log('closing the stream') + this.rest().sink([]) } } diff --git a/src/circuit/circuit/utils.js b/src/circuit/circuit/utils.js index c3aa87a5..8e87851d 100644 --- a/src/circuit/circuit/utils.js +++ b/src/circuit/circuit/utils.js @@ -1,118 +1,51 @@ 'use strict' const multiaddr = require('multiaddr') -const PeerInfo = require('peer-info') -const PeerId = require('peer-id') -const proto = require('../protocol') -const { getPeerInfo } = require('../../get-peer-info') +const { CircuitRelay } = require('../protocol') -module.exports = function (swarm) { - /** - * Get b58 string from multiaddr or peerinfo - * - * @param {Multiaddr|PeerInfo} peer - * @return {*} - */ - function getB58String (peer) { - let b58Id = null - if (multiaddr.isMultiaddr(peer)) { - const relayMa = multiaddr(peer) - b58Id = relayMa.getPeerId() - } else if (PeerInfo.isPeerInfo(peer)) { - b58Id = peer.id.toB58String() - } +/** + * Write a response + * + * @param {StreamHandler} streamHandler + * @param {CircuitRelay.Status} status + */ +function writeResponse (streamHandler, status) { + streamHandler.write({ + type: CircuitRelay.Type.STATUS, + code: status + }) +} - return b58Id +/** + * Validate incomming HOP/STOP message + * + * @param {*} msg A CircuitRelay unencoded protobuf message + * @param {StreamHandler} streamHandler + */ +function validateAddrs (msg, streamHandler) { + try { + msg.dstPeer.addrs.forEach((addr) => { + return multiaddr(addr) + }) + } catch (err) { + writeResponse(streamHandler, msg.type === CircuitRelay.Type.HOP + ? CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID + : CircuitRelay.Status.STOP_DST_MULTIADDR_INVALID) + throw err } - /** - * Helper to make a peer info from a multiaddrs - * - * @param {Multiaddr|PeerInfo|PeerId} peer - * @return {PeerInfo} - * @private - */ - function peerInfoFromMa (peer) { - return getPeerInfo(peer, swarm._peerBook) - } - - /** - * Checks if peer has an existing connection - * - * @param {String} peerId - * @param {Swarm} swarm - * @return {Boolean} - */ - function isPeerConnected (peerId) { - return swarm.muxedConns[peerId] || swarm.conns[peerId] - } - - /** - * Write a response - * - * @param {StreamHandler} streamHandler - * @param {CircuitRelay.Status} status - * @param {Function} cb - * @returns {*} - */ - function writeResponse (streamHandler, status, cb) { - cb = cb || (() => {}) - streamHandler.write(proto.CircuitRelay.encode({ - type: proto.CircuitRelay.Type.STATUS, - code: status - })) - return cb() - } - - /** - * Validate incomming HOP/STOP message - * - * @param {CircuitRelay} msg - * @param {StreamHandler} streamHandler - * @param {CircuitRelay.Type} type - * @returns {*} - * @param {Function} cb - */ - function validateAddrs (msg, streamHandler, type, cb) { - try { - msg.dstPeer.addrs.forEach((addr) => { - return multiaddr(addr) - }) - } catch (err) { - writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP - ? proto.CircuitRelay.Status.HOP_DST_MULTIADDR_INVALID - : proto.CircuitRelay.Status.STOP_DST_MULTIADDR_INVALID) - return cb(err) - } - - try { - msg.srcPeer.addrs.forEach((addr) => { - return multiaddr(addr) - }) - } catch (err) { - writeResponse(streamHandler, type === proto.CircuitRelay.Type.HOP - ? proto.CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID - : proto.CircuitRelay.Status.STOP_SRC_MULTIADDR_INVALID) - return cb(err) - } - - return cb(null) - } - - function peerIdFromId (id) { - if (typeof id === 'string') { - return PeerId.createFromB58String(id) - } - - return PeerId.createFromBytes(id) - } - - return { - getB58String, - peerInfoFromMa, - isPeerConnected, - validateAddrs, - writeResponse, - peerIdFromId + try { + msg.srcPeer.addrs.forEach((addr) => { + return multiaddr(addr) + }) + } catch (err) { + writeResponse(streamHandler, msg.type === CircuitRelay.Type.HOP + ? CircuitRelay.Status.HOP_SRC_MULTIADDR_INVALID + : CircuitRelay.Status.STOP_SRC_MULTIADDR_INVALID) + throw err } } + +module.exports = { + validateAddrs +} diff --git a/src/circuit/index.js b/src/circuit/index.js index e4c7fed7..d5d293bf 100644 --- a/src/circuit/index.js +++ b/src/circuit/index.js @@ -1,3 +1,186 @@ 'use strict' -module.exports = require('./circuit') +const mafmt = require('mafmt') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const withIs = require('class-is') +const { CircuitRelay: CircuitPB } = require('./protocol') + +const debug = require('debug') +const log = debug('libp2p:circuit') +log.error = debug('libp2p:circuit:error') + +const { relay: multicodec } = require('./multicodec') +const createListener = require('./listener') +const { handleCanHop, handleHop, hop } = require('./circuit/hop') +const { handleStop } = require('./circuit/stop') +const StreamHandler = require('./circuit/stream-handler') +const toConnection = require('./stream-to-conn') + +class Circuit { + /** + * Creates an instance of Circuit. + * + * @constructor + * @param {object} options + * @param {Libp2p} options.libp2p + * @param {Upgrader} options.upgrader + */ + constructor ({ libp2p, upgrader }) { + this._dialer = libp2p.dialer + this._registrar = libp2p.registrar + this._upgrader = upgrader + this._options = libp2p._config.relay + this.peerInfo = libp2p.peerInfo + this._registrar.handle(multicodec, this._onProtocol.bind(this)) + } + + async _onProtocol ({ connection, stream, protocol }) { + const streamHandler = new StreamHandler({ stream }) + const request = await streamHandler.read() + const circuit = this + let virtualConnection + + switch (request.type) { + case CircuitPB.Type.CAN_HOP: { + log('received CAN_HOP request from %s', connection.remotePeer.toB58String()) + await handleCanHop({ circuit, connection, streamHandler }) + break + } + case CircuitPB.Type.HOP: { + log('received HOP request from %s', connection.remotePeer.toB58String()) + virtualConnection = await handleHop({ + connection, + request, + streamHandler, + circuit + }) + break + } + case CircuitPB.Type.STOP: { + log('received STOP request from %s', connection.remotePeer.toB58String()) + virtualConnection = await handleStop({ + connection, + request, + streamHandler, + circuit + }) + break + } + default: { + log('Request of type %s not supported', request.type) + } + } + + if (virtualConnection) { + const remoteAddr = multiaddr(request.dstPeer.addrs[0]) + const localAddr = multiaddr(request.srcPeer.addrs[0]) + const maConn = toConnection({ + stream: virtualConnection, + remoteAddr, + localAddr + }) + const type = CircuitPB.Type === CircuitPB.Type.HOP ? 'relay' : 'inbound' + log('new %s connection %s', type, maConn.remoteAddr) + + const conn = await this._upgrader.upgradeInbound(maConn) + log('%s connection %s upgraded', type, maConn.remoteAddr) + this.handler && this.handler(conn) + } + } + + /** + * Dial a peer over a relay + * + * @param {multiaddr} ma - the multiaddr of the peer to dial + * @param {Object} options - dial options + * @param {AbortSignal} [options.signal] - An optional abort signal + * @returns {Connection} - the connection + */ + async dial (ma, options) { + // Check the multiaddr to see if it contains a relay and a destination peer + const addrs = ma.toString().split('/p2p-circuit') + const relayAddr = multiaddr(addrs[0]) + const destinationAddr = multiaddr(addrs[addrs.length - 1]) + const relayPeer = PeerId.createFromCID(relayAddr.getPeerId()) + const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId()) + + let disconnectOnFailure = false + let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer)) + if (!relayConnection) { + relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options) + disconnectOnFailure = true + } + + try { + const virtualConnection = await hop({ + connection: relayConnection, + circuit: this, + request: { + type: CircuitPB.Type.HOP, + srcPeer: { + id: this.peerInfo.id.toBytes(), + addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer) + }, + dstPeer: { + id: destinationPeer.toBytes(), + addrs: [multiaddr(destinationAddr).buffer] + } + } + }) + + const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerInfo.id.toB58String()}`) + const maConn = toConnection({ + stream: virtualConnection, + remoteAddr: ma, + localAddr + }) + log('new outbound connection %s', maConn.remoteAddr) + + return this._upgrader.upgradeOutbound(maConn) + } catch (err) { + log.error('Circuit relay dial failed', err) + disconnectOnFailure && await relayConnection.close() + throw err + } + } + + /** + * Create a listener + * + * @param {any} options + * @param {Function} handler + * @return {listener} + */ + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + // Called on successful HOP and STOP requests + this.handler = handler + + return createListener(this, options) + } + + /** + * Filter check for all Multiaddrs that this transport can dial on + * + * @param {Array} multiaddrs + * @returns {Array} + */ + filter (multiaddrs) { + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] + + return multiaddrs.filter((ma) => { + return mafmt.Circuit.matches(ma) + }) + } +} + +/** + * @type {Circuit} + */ +module.exports = withIs(Circuit, { className: 'Circuit', symbolName: '@libp2p/js-libp2p-circuit/circuit' }) diff --git a/src/circuit/listener.js b/src/circuit/listener.js index ff55056e..2569c500 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -1,94 +1,42 @@ 'use strict' -const setImmediate = require('async/setImmediate') - -const multicodec = require('./multicodec') -const EE = require('events').EventEmitter +const EventEmitter = require('events') const multiaddr = require('multiaddr') -const mafmt = require('mafmt') -const Stop = require('./circuit/stop') -const Hop = require('./circuit/hop') -const proto = require('./protocol') -const utilsFactory = require('./circuit/utils') - -const StreamHandler = require('./circuit/stream-handler') const debug = require('debug') - const log = debug('libp2p:circuit:listener') log.err = debug('libp2p:circuit:error:listener') -module.exports = (swarm, options, connHandler) => { - const listener = new EE() - const utils = utilsFactory(swarm) - - listener.stopHandler = new Stop(swarm) - listener.stopHandler.on('connection', (conn) => listener.emit('connection', conn)) - listener.hopHandler = new Hop(swarm, options.hop) +/** + * @param {*} circuit + * @returns {Listener} a transport listener + */ +module.exports = (circuit) => { + const listener = new EventEmitter() + const listeningAddrs = new Map() /** * Add swarm handler and listen for incoming connections * - * @param {Multiaddr} ma - * @param {Function} callback + * @param {Multiaddr} addr * @return {void} */ - listener.listen = (ma, callback) => { - callback = callback || (() => {}) + listener.listen = async (addr) => { + const [addrString] = String(addr).split('/p2p-circuit').slice(-1) - swarm.handle(multicodec.relay, (_, conn) => { - const sh = new StreamHandler(conn) + const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString)) + const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit') - sh.read((err, msg) => { - if (err) { - log.err(err) - return - } - - let request = null - try { - request = proto.CircuitRelay.decode(msg) - } catch (err) { - return utils.writeResponse( - sh, - proto.CircuitRelay.Status.MALFORMED_MESSAGE) - } - - switch (request.type) { - case proto.CircuitRelay.Type.CAN_HOP: - case proto.CircuitRelay.Type.HOP: { - return listener.hopHandler.handle(request, sh) - } - - case proto.CircuitRelay.Type.STOP: { - return listener.stopHandler.handle(request, sh, connHandler) - } - - default: { - utils.writeResponse( - sh, - proto.CircuitRelay.Status.INVALID_MSG_TYPE) - return sh.close() - } - } - }) - }) - - setImmediate(() => listener.emit('listen')) - callback() + listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr) + listener.emit('listening') } /** - * Remove swarm listener + * TODO: Remove the peers from our topology * - * @param {Function} cb * @return {void} */ - listener.close = (cb) => { - swarm.unhandle(multicodec.relay) - setImmediate(() => listener.emit('close')) - cb() - } + listener.close = () => {} /** * Get fixed up multiaddrs @@ -104,45 +52,14 @@ module.exports = (swarm, options, connHandler) => { * the encapsulated transport address. This is useful when for example, a peer should only * be dialed over TCP rather than any other transport * - * @param {Function} callback - * @return {void} + * @return {Multiaddr[]} */ - listener.getAddrs = (callback) => { - let addrs = swarm._peerInfo.multiaddrs.toArray() - - // get all the explicit relay addrs excluding self - const p2pAddrs = addrs.filter((addr) => { - return mafmt.Circuit.matches(addr) && - !addr.toString().includes(swarm._peerInfo.id.toB58String()) - }) - - // use the explicit relays instead of any relay - if (p2pAddrs.length) { - addrs = p2pAddrs + listener.getAddrs = () => { + const addrs = [] + for (const addr of listeningAddrs.values()) { + addrs.push(addr) } - - const listenAddrs = [] - addrs.forEach((addr) => { - const peerMa = `/p2p-circuit/ipfs/${swarm._peerInfo.id.toB58String()}` - if (addr.toString() === peerMa) { - listenAddrs.push(multiaddr(peerMa)) - return - } - - if (!mafmt.Circuit.matches(addr)) { - if (addr.getPeerId()) { - // by default we're reachable over any relay - listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(addr)) - } else { - const ma = `${addr}/ipfs/${swarm._peerInfo.id.toB58String()}` - listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(ma)) - } - } else { - listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`)) - } - }) - - callback(null, listenAddrs) + return addrs } return listener diff --git a/src/circuit/stream-to-conn.js b/src/circuit/stream-to-conn.js new file mode 100644 index 00000000..d9040716 --- /dev/null +++ b/src/circuit/stream-to-conn.js @@ -0,0 +1,49 @@ +'use strict' + +const abortable = require('abortable-iterator') +const log = require('debug')('libp2p:circuit:stream') + +// Convert a duplex iterable into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = ({ stream, remoteAddr, localAddr }, options = {}) => { + const { sink, source } = stream + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await sink(source) + } catch (err) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log(err) + } + } + close() + }, + + source: options.signal ? abortable(source, options.signal) : source, + conn: stream, + localAddr, + remoteAddr, + timeline: { open: Date.now() }, + + close () { + sink([]) + close() + } + } + + function close () { + if (!maConn.timeline.close) { + maConn.timeline.close = Date.now() + } + } + + return maConn +} diff --git a/src/dialer.js b/src/dialer.js index 2d4071d1..2154554f 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -90,7 +90,6 @@ class Dialer { nextTick(async () => { try { await this.identifyService.identify(conn, conn.remotePeer) - // TODO: Update the PeerStore with the information from identify } catch (err) { log.error(err) } diff --git a/src/errors.js b/src/errors.js index 9a4f6e75..4c26c941 100644 --- a/src/errors.js +++ b/src/errors.js @@ -16,6 +16,7 @@ exports.codes = { ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF', ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED', + ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED', ERR_INVALID_KEY: 'ERR_INVALID_KEY', ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE', ERR_INVALID_PEER: 'ERR_INVALID_PEER', diff --git a/src/index.js b/src/index.js index 8d88cec1..de655d18 100644 --- a/src/index.js +++ b/src/index.js @@ -16,6 +16,7 @@ const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info') const { validate: validateConfig } = require('./config') const { codes } = require('./errors') +const Circuit = require('./circuit') const Dialer = require('./dialer') const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') @@ -78,9 +79,6 @@ class Libp2p extends EventEmitter { libp2p: this, upgrader: this.upgrader }) - this._modules.transport.forEach((Transport) => { - this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport) - }) // Attach crypto channels if (this._modules.connEncryption) { @@ -95,6 +93,12 @@ class Libp2p extends EventEmitter { peerStore: this.peerStore }) + this._modules.transport.forEach((Transport) => { + this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport) + }) + // TODO: enable relay if enabled + this.transportManager.add(Circuit.prototype[Symbol.toStringTag], Circuit) + // Attach stream multiplexers if (this._modules.streamMuxer) { const muxers = this._modules.streamMuxer @@ -182,6 +186,7 @@ class Libp2p extends EventEmitter { this.pubsub && await this.pubsub.stop() this._dht && await this._dht.stop() await this.transportManager.close() + await this.registrar.close() } catch (err) { if (err) { log.error(err) @@ -282,7 +287,10 @@ class Libp2p extends EventEmitter { this.upgrader.protocols.set(protocol, handler) }) - this.dialer.identifyService.pushToPeerStore(this.peerStore) + // Only push if libp2p is running + if (this.isStarted()) { + this.dialer.identifyService.pushToPeerStore(this.peerStore) + } } /** @@ -296,7 +304,10 @@ class Libp2p extends EventEmitter { this.upgrader.protocols.delete(protocol) }) - this.dialer.identifyService.pushToPeerStore(this.peerStore) + // Only push if libp2p is running + if (this.isStarted()) { + this.dialer.identifyService.pushToPeerStore(this.peerStore) + } } async _onStarting () { diff --git a/src/registrar.js b/src/registrar.js index c4a11680..666336ff 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -46,6 +46,23 @@ class Registrar { this._handle = handle } + /** + * Cleans up the registrar + * @async + */ + async close () { + // Close all connections we're tracking + const tasks = [] + for (const connectionList of this.connections.values()) { + for (const connection of connectionList) { + tasks.push(connection.close()) + } + } + + await tasks + this.connections.clear() + } + /** * Add a new connected peer to the record * TODO: this should live in the ConnectionManager @@ -100,8 +117,9 @@ class Registrar { getConnection (peerInfo) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + const connections = this.connections.get(peerInfo.id.toB58String()) // TODO: what should we return - return this.connections.get(peerInfo.id.toB58String())[0] + return connections ? connections[0] : null } /** diff --git a/src/transport-manager.js b/src/transport-manager.js index 722049a4..5e4a9201 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -64,7 +64,9 @@ class TransportManager { await Promise.all(tasks) log('all listeners closed') - this._listeners.clear() + for (const key of this._listeners.keys()) { + this._listeners.set(key, []) + } } /** @@ -82,6 +84,7 @@ class TransportManager { try { return await transport.dial(ma, options) } catch (err) { + if (err.code) throw err throw errCode(err, codes.ERR_TRANSPORT_DIAL_FAILED) } } diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 04d63b40..8fbe5d79 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -3,6 +3,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') const Transport = require('libp2p-tcp') @@ -77,14 +78,9 @@ describe('Dialing (direct, TCP)', () => { it('should fail to connect to an unsupported multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - try { - await dialer.connectToMultiaddr(unsupportedAddr) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToMultiaddr(unsupportedAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) }) it('should be able to connect to a given peer info', async () => { @@ -121,14 +117,9 @@ describe('Dialing (direct, TCP)', () => { const peerInfo = new PeerInfo(peerId) peerInfo.multiaddrs.add(unsupportedAddr) - try { - await dialer.connectToPeer(peerInfo) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToPeer(peerInfo)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED) }) it('should abort dials on queue task timeout', async () => { @@ -144,14 +135,9 @@ describe('Dialing (direct, TCP)', () => { expect(options.signal.aborted).to.equal(true) }) - try { - await dialer.connectToMultiaddr(remoteAddr) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToMultiaddr(remoteAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) }) it('should dial to the max concurrency', async () => { diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index eb1b158b..f5eaff9a 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -3,6 +3,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') const pDefer = require('p-defer') @@ -67,14 +68,9 @@ describe('Dialing (direct, WebSockets)', () => { it('should fail to connect to an unsupported multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - try { - await dialer.connectToMultiaddr(unsupportedAddr) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToMultiaddr(unsupportedAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) }) it('should be able to connect to a given peer', async () => { @@ -94,14 +90,9 @@ describe('Dialing (direct, WebSockets)', () => { const peerInfo = new PeerInfo(peerId) peerInfo.multiaddrs.add(unsupportedAddr) - try { - await dialer.connectToPeer(peerInfo) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToPeer(peerInfo)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED) }) it('should abort dials on queue task timeout', async () => { @@ -117,14 +108,9 @@ describe('Dialing (direct, WebSockets)', () => { expect(options.signal.aborted).to.equal(true) }) - try { - await dialer.connectToMultiaddr(remoteAddr) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT) - return - } - - expect.fail('Dial should have failed') + await expect(dialer.connectToMultiaddr(remoteAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) }) it('should dial to the max concurrency', async () => { diff --git a/test/dialing/relay.node.js b/test/dialing/relay.node.js new file mode 100644 index 00000000..597bfee1 --- /dev/null +++ b/test/dialing/relay.node.js @@ -0,0 +1,162 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) +const { expect } = chai +const sinon = require('sinon') + +const multiaddr = require('multiaddr') +const { collect } = require('streaming-iterables') +const pipe = require('it-pipe') +const { createPeerInfoFromFixture } = require('../utils/creators/peer') +const baseOptions = require('../utils/base-options') +const Libp2p = require('../../src') +const { codes: Errors } = require('../../src/errors') + +describe('Dialing (via relay, TCP)', () => { + let srcLibp2p + let relayLibp2p + let dstLibp2p + + before(async () => { + const peerInfos = await createPeerInfoFromFixture(3) + // Create 3 nodes, and turn HOP on for the relay + ;[srcLibp2p, relayLibp2p, dstLibp2p] = peerInfos.map((peerInfo, index) => { + const opts = baseOptions + index === 1 && (opts.config.relay.hop.enabled = true) + return new Libp2p({ + ...opts, + peerInfo + }) + }) + + dstLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) + }) + + beforeEach(() => { + // Start each node + return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => { + // Reset multiaddrs and start + libp2p.peerInfo.multiaddrs.clear() + libp2p.peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0') + libp2p.start() + })) + }) + + afterEach(() => { + // Stop each node + return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop())) + }) + + it('should be able to connect to a peer over a relay with active connections', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerInfo.id.toString() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) + + const tcpAddrs = dstLibp2p.transportManager.getAddrs() + await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) + expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) + + const connection = await srcLibp2p.dial(dialAddr) + expect(connection).to.exist() + expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerInfo.id.toBytes()) + expect(connection.localPeer.toBytes()).to.eql(srcLibp2p.peerInfo.id.toBytes()) + expect(connection.remoteAddr).to.eql(dialAddr) + expect(connection.localAddr).to.eql( + relayAddr // the relay address + .encapsulate(`/p2p/${relayIdString}`) // with its peer id + .encapsulate('/p2p-circuit') // the local peer is connected over the relay + .encapsulate(`/p2p/${srcLibp2p.peerInfo.id.toB58String()}`) // and the local peer id + ) + + const { stream: echoStream } = await connection.newStream('/echo/1.0.0') + const input = Buffer.from('hello') + const [output] = await pipe( + [input], + echoStream, + collect + ) + expect(output.slice()).to.eql(input) + }) + + it('should fail to connect to a peer over a relay with inactive connections', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerInfo.id.toString() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) + + await expect(srcLibp2p.dial(dialAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + }) + + it('should not stay connected to a relay when not already connected and HOP fails', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerInfo.id.toString() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) + + await expect(srcLibp2p.dial(dialAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + + // We should not be connected to the relay, because we weren't before the dial + const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo) + expect(srcToRelayConn).to.not.exist() + }) + + it('dialer should stay connected to an already connected relay on hop failure', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerInfo.id.toString() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) + + await srcLibp2p.dial(relayAddr) + + await expect(srcLibp2p.dial(dialAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + + const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo) + expect(srcToRelayConn).to.exist() + expect(srcToRelayConn.stat.status).to.equal('open') + }) + + it('destination peer should stay connected to an already connected relay on hop failure', async () => { + const relayAddr = relayLibp2p.transportManager.getAddrs()[0] + const relayIdString = relayLibp2p.peerInfo.id.toString() + + const dialAddr = relayAddr + .encapsulate(`/p2p/${relayIdString}`) + .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) + + // Connect the destination peer and the relay + const tcpAddrs = dstLibp2p.transportManager.getAddrs() + await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) + expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) + + // Tamper with the our multiaddrs for the circuit message + sinon.stub(srcLibp2p.peerInfo.multiaddrs, 'toArray').returns([{ + buffer: Buffer.from('an invalid multiaddr') + }]) + + await expect(srcLibp2p.dial(dialAddr)) + .to.eventually.be.rejected() + .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + + const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo) + expect(dstToRelayConn).to.exist() + expect(dstToRelayConn.stat.status).to.equal('open') + }) +}) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 114b5156..8cc372c6 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -3,6 +3,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') @@ -98,20 +99,18 @@ describe('Identify', () => { sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) // Run identify - try { - await Promise.all([ - localIdentify.identify(localConnectionMock, localPeer.id), - remoteIdentify.handleMessage({ - connection: remoteConnectionMock, - stream: remote, - protocol: multicodecs.IDENTIFY - }) - ]) - expect.fail('should have thrown') - } catch (err) { - expect(err).to.exist() - expect(err.code).to.eql(Errors.ERR_INVALID_PEER) - } + const identifyPromise = Promise.all([ + localIdentify.identify(localConnectionMock, localPeer.id), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY + }) + ]) + + await expect(identifyPromise) + .to.eventually.be.rejected() + .and.to.have.property('code', Errors.ERR_INVALID_PEER) }) describe('push', () => { @@ -229,6 +228,7 @@ describe('Identify', () => { // Wait for identify to finish await libp2p.dialer.identifyService.identify.firstCall.returnValue + sinon.stub(libp2p, 'isStarted').returns(true) libp2p.handle('/echo/2.0.0', () => {}) libp2p.unhandle('/echo/2.0.0') diff --git a/test/registrar/registrar.node.js b/test/registrar/registrar.node.js index b2045004..c2357cdb 100644 --- a/test/registrar/registrar.node.js +++ b/test/registrar/registrar.node.js @@ -54,4 +54,19 @@ describe('registrar on dial', () => { const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo) expect(remoteConn).to.exist() }) + + it('should be closed on libp2p stop', async () => { + libp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo + })) + + await libp2p.dial(remoteAddr) + expect(libp2p.registrar.connections.size).to.equal(1) + + sinon.spy(libp2p.registrar, 'close') + + await libp2p.stop() + expect(libp2p.registrar.close.callCount).to.equal(1) + expect(libp2p.registrar.connections.size).to.equal(0) + }) }) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index 9114e035..a56854d4 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -24,26 +24,14 @@ describe('registrar', () => { }) it('should fail to register a protocol if no multicodec is provided', () => { - try { - registrar.register() - } catch (err) { - expect(err).to.exist() - return - } - throw new Error('should fail to register a protocol if no multicodec is provided') + expect(() => registrar.register()).to.throw() }) it('should fail to register a protocol if an invalid topology is provided', () => { const fakeTopology = { random: 1 } - try { - registrar.register() - } catch (err) { - expect(err).to.exist(fakeTopology) - return - } - throw new Error('should fail to register a protocol if an invalid topology is provided') + expect(() => registrar.register(fakeTopology)).to.throw() }) }) diff --git a/test/transports/transport-manager.node.js b/test/transports/transport-manager.node.js index 71d776fc..4fd57e9c 100644 --- a/test/transports/transport-manager.node.js +++ b/test/transports/transport-manager.node.js @@ -38,11 +38,12 @@ describe('Transport Manager (TCP)', () => { it('should be able to listen', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) await tm.listen(addrs) - expect(tm._listeners.size).to.equal(1) + expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) + expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) // Ephemeral ip addresses may result in multiple listeners expect(tm.getAddrs().length).to.equal(addrs.length) await tm.close() - expect(tm._listeners.size).to.equal(0) + expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0) }) it('should be able to dial', async () => { diff --git a/test/transports/transport-manager.spec.js b/test/transports/transport-manager.spec.js index 91a976eb..ab08fc17 100644 --- a/test/transports/transport-manager.spec.js +++ b/test/transports/transport-manager.spec.js @@ -3,6 +3,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') @@ -39,21 +40,25 @@ describe('Transport Manager (WebSockets)', () => { await tm.remove(Transport.prototype[Symbol.toStringTag]) }) - it('should not be able to add a transport without a key', () => { - expect(() => { + it('should not be able to add a transport without a key', async () => { + // Chai as promised conflicts with normal `throws` validation, + // so wrap the call in an async function + await expect((async () => { // eslint-disable-line tm.add(undefined, Transport) - }).to.throw().that.satisfies((err) => { - return err.code === ErrorCodes.ERR_INVALID_KEY - }) + })()) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_INVALID_KEY) }) - it('should not be able to add a transport twice', () => { + it('should not be able to add a transport twice', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) - expect(() => { + // Chai as promised conflicts with normal `throws` validation, + // so wrap the call in an async function + await expect((async () => { // eslint-disable-line tm.add(Transport.prototype[Symbol.toStringTag], Transport) - }).to.throw().that.satisfies((err) => { - return err.code === ErrorCodes.ERR_DUPLICATE_TRANSPORT - }) + })()) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_DUPLICATE_TRANSPORT) }) it('should be able to dial', async () => { @@ -67,27 +72,18 @@ describe('Transport Manager (WebSockets)', () => { it('should fail to dial an unsupported address', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) const addr = multiaddr('/ip4/127.0.0.1/tcp/0') - try { - await tm.dial(addr) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) - return - } - - expect.fail('Dial attempt should have failed') + await expect(tm.dial(addr)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) }) it('should fail to listen with no valid address', async () => { tm.add(Transport.prototype[Symbol.toStringTag], Transport) const addrs = [multiaddr('/ip4/127.0.0.1/tcp/0')] - try { - await tm.listen(addrs) - } catch (err) { - expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_NO_VALID_ADDRESSES) - return - } - expect.fail('should have failed') + await expect(tm.listen(addrs)) + .to.eventually.be.rejected() + .and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES) }) }) @@ -115,7 +111,8 @@ describe('libp2p.transportManager', () => { }) expect(libp2p.transportManager).to.exist() - expect(libp2p.transportManager._transports.size).to.equal(1) + // Our transport and circuit relay + expect(libp2p.transportManager._transports.size).to.equal(2) }) it('starting and stopping libp2p should start and stop TransportManager', async () => { diff --git a/test/utils/base-options.browser.js b/test/utils/base-options.browser.js index 1c1a4255..d437b580 100644 --- a/test/utils/base-options.browser.js +++ b/test/utils/base-options.browser.js @@ -9,5 +9,13 @@ module.exports = { transport: [Transport], streamMuxer: [Muxer], connEncryption: [Crypto] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: false + } + } } } diff --git a/test/utils/base-options.js b/test/utils/base-options.js index ca7a037c..8b7fb535 100644 --- a/test/utils/base-options.js +++ b/test/utils/base-options.js @@ -9,5 +9,13 @@ module.exports = { transport: [Transport], streamMuxer: [Muxer], connEncryption: [Crypto] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: false + } + } } } diff --git a/test/utils/creators/peer.js b/test/utils/creators/peer.js index e39e52e4..88750bea 100644 --- a/test/utils/creators/peer.js +++ b/test/utils/creators/peer.js @@ -5,7 +5,7 @@ const PeerInfo = require('peer-info') const Peers = require('../../fixtures/peers') -module.exports.createPeerInfo = async (length) => { +async function createPeerInfo (length) { const peers = await Promise.all( Array.from({ length }) .map((_, i) => PeerId.create()) @@ -14,11 +14,19 @@ module.exports.createPeerInfo = async (length) => { return peers.map((peer) => new PeerInfo(peer)) } -module.exports.createPeerInfoFromFixture = async (length) => { - const peers = await Promise.all( +function createPeerIdsFromFixture (length) { + return Promise.all( Array.from({ length }) .map((_, i) => PeerId.createFromJSON(Peers[i])) ) +} + +async function createPeerInfoFromFixture (length) { + const peers = await createPeerIdsFromFixture(length) return peers.map((peer) => new PeerInfo(peer)) } + +module.exports.createPeerInfo = createPeerInfo +module.exports.createPeerIdsFromFixture = createPeerIdsFromFixture +module.exports.createPeerInfoFromFixture = createPeerInfoFromFixture diff --git a/test/utils/mockConnection.js b/test/utils/mockConnection.js index f3ce8858..5e6219ad 100644 --- a/test/utils/mockConnection.js +++ b/test/utils/mockConnection.js @@ -1,10 +1,15 @@ 'use strict' +const pipe = require('it-pipe') const { Connection } = require('libp2p-interfaces/src/connection') const multiaddr = require('multiaddr') - +const Muxer = require('libp2p-mplex') +const Multistream = require('multistream-select') const pair = require('it-pair') +const errCode = require('err-code') +const { codes } = require('../../src/errors') +const mockMultiaddrConnPair = require('./mockMultiaddrConn') const peerUtils = require('./creators/peer') module.exports = async (properties = {}) => { @@ -48,3 +53,103 @@ module.exports = async (properties = {}) => { ...properties }) } + +/** + * Creates a full connection pair, without the transport or encryption + * + * @param {object} options + * @param {Multiaddr[]} options.addrs Should contain two addresses for the local and remote peer respectively + * @param {PeerId[]} options.remotePeer Should contain two peer ids, for the local and remote peer respectively + * @param {Map} options.protocols The protocols the connections should support + * @returns {{inbound:Connection, outbound:Connection}} + */ +module.exports.pair = function connectionPair ({ addrs, peers, protocols }) { + const [localPeer, remotePeer] = peers + + const { + inbound: inboundMaConn, + outbound: outboundMaConn + } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const inbound = createConnection({ + direction: 'inbound', + maConn: inboundMaConn, + protocols, + // Inbound connection peers are reversed + localPeer: remotePeer, + remotePeer: localPeer + }) + const outbound = createConnection({ + direction: 'outbound', + maConn: outboundMaConn, + protocols, + localPeer, + remotePeer + }) + + return { inbound, outbound } +} + +function createConnection ({ + direction, + maConn, + localPeer, + remotePeer, + protocols +}) { + // Create the muxer + const muxer = new Muxer({ + // Run anytime a remote stream is created + onStream: async muxedStream => { + const mss = new Multistream.Listener(muxedStream) + try { + const { stream, protocol } = await mss.handle(Array.from(protocols.keys())) + connection.addStream(stream, protocol) + // Need to be able to notify a peer of this this._onStream({ connection, stream, protocol }) + const handler = protocols.get(protocol) + handler({ connection, stream, protocol }) + } catch (err) { + // Do nothing + } + }, + // Run anytime a stream closes + onStreamEnd: muxedStream => { + connection.removeStream(muxedStream.id) + } + }) + + const newStream = async protocols => { + const muxedStream = muxer.newStream() + const mss = new Multistream.Dialer(muxedStream) + try { + const { stream, protocol } = await mss.select(protocols) + return { stream: { ...muxedStream, ...stream }, protocol } + } catch (err) { + throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) + } + } + + // Pipe all data through the muxer + pipe(maConn, muxer, maConn) + + maConn.timeline.upgraded = Date.now() + + // Create the connection + const connection = new Connection({ + localAddr: maConn.localAddr, + remoteAddr: maConn.remoteAddr, + localPeer: localPeer, + remotePeer: remotePeer, + stat: { + direction, + timeline: maConn.timeline, + multiplexer: Muxer.multicodec, + encryption: 'N/A' + }, + newStream, + getStreams: () => muxer.streams, + close: err => maConn.close(err) + }) + + return connection +} diff --git a/test/utils/mockMultiaddrConn.js b/test/utils/mockMultiaddrConn.js index 617f6604..9fd9c9b5 100644 --- a/test/utils/mockMultiaddrConn.js +++ b/test/utils/mockMultiaddrConn.js @@ -13,10 +13,11 @@ const AbortController = require('abort-controller') */ module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) { const controller = new AbortController() + const [localAddr, remoteAddr] = addrs const [inbound, outbound] = duplexPair() - outbound.localAddr = addrs[0] - outbound.remoteAddr = addrs[1].encapsulate(`/p2p/${remotePeer.toB58String()}`) + outbound.localAddr = localAddr + outbound.remoteAddr = remoteAddr.encapsulate(`/p2p/${remotePeer.toB58String()}`) outbound.timeline = { open: Date.now() } @@ -25,8 +26,8 @@ module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) { controller.abort() } - inbound.localAddr = addrs[1] - inbound.remoteAddr = addrs[0] + inbound.localAddr = remoteAddr + inbound.remoteAddr = localAddr inbound.timeline = { open: Date.now() }