diff --git a/package.json b/package.json index db94adc3..4abc96c8 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "npm": ">=6.0.0" }, "dependencies": { + "abort-controller": "^3.0.0", "async": "^2.6.2", "bignumber.js": "^9.0.0", "class-is": "^1.1.0", @@ -48,15 +49,17 @@ "err-code": "^1.1.2", "fsm-event": "^2.1.0", "hashlru": "^2.3.0", - "interface-connection": "~0.3.3", + "it-pipe": "^1.0.1", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.16.2", + "libp2p-interfaces": "^0.1.1", "mafmt": "^7.0.0", "merge-options": "^1.0.1", "moving-average": "^1.0.0", "multiaddr": "^7.1.0", - "multistream-select": "~0.14.6", + "multistream-select": "^0.15.0", "once": "^1.4.0", + "p-queue": "^6.1.1", "p-settle": "^3.1.0", "peer-book": "^0.9.1", "peer-id": "^0.13.3", @@ -73,6 +76,7 @@ }, "devDependencies": { "@nodeutils/defaults-deep": "^1.1.0", + "abortable-iterator": "^2.1.0", "aegir": "^20.0.0", "chai": "^4.2.0", "chai-checkmark": "^1.0.1", @@ -80,7 +84,9 @@ "delay": "^4.3.0", "dirty-chai": "^2.0.1", "electron-webrtc": "^0.3.0", + "glob": "^7.1.4", "interface-datastore": "^0.6.0", + "it-pair": "^1.0.0", "libp2p-bootstrap": "^0.9.7", "libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2", @@ -88,7 +94,7 @@ "libp2p-gossipsub": "~0.0.4", "libp2p-kad-dht": "^0.15.3", "libp2p-mdns": "^0.12.3", - "libp2p-mplex": "^0.8.4", + "libp2p-mplex": "^0.9.1", "libp2p-pnet": "~0.1.0", "libp2p-secio": "^0.11.1", "libp2p-spdy": "^0.13.2", @@ -96,6 +102,7 @@ "libp2p-websockets": "^0.13.0", "lodash.times": "^4.3.2", "nock": "^10.0.6", + "p-defer": "^3.0.0", "portfinder": "^1.0.20", "pull-goodbye": "0.0.2", "pull-length-prefixed": "^1.3.3", @@ -104,6 +111,7 @@ "pull-protocol-buffers": "~0.1.2", "pull-serializer": "^0.3.2", "sinon": "^7.2.7", + "streaming-iterables": "^4.1.0", "wrtc": "^0.4.1" }, "contributors": [ diff --git a/src/circuit/README.md b/src/circuit/README.md index 5c67474d..36b659f7 100644 --- a/src/circuit/README.md +++ b/src/circuit/README.md @@ -1,6 +1,6 @@ # js-libp2p-circuit -> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/interface-connection) interface for dial/listen. +> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) interface for dial/listen. **Note**: git history prior to merging into js-libp2p can be found in the original repository, https://github.com/libp2p/js-libp2p-circuit. @@ -24,15 +24,18 @@ Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes ## Table of Contents -- [Install](#install) - - [npm](#npm) -- [Usage](#usage) - - [Example](#example) - - [This module uses `pull-streams`](#this-module-uses-pull-streams) - - [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams) -- [API](#api) -- [Contribute](#contribute) -- [License](#license) +- [js-libp2p-circuit](#js-libp2p-circuit) + - [Why?](#why) + - [libp2p-circuit and IPFS](#libp2p-circuit-and-ipfs) + - [Table of Contents](#table-of-contents) + - [Usage](#usage) + - [Example](#example) + - [Create dialer/listener](#create-dialerlistener) + - [Create `relay`](#create-relay) + - [This module uses `pull-streams`](#this-module-uses-pull-streams) + - [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams) + - [API](#api) + - [Implementation rational](#implementation-rational) ## Usage diff --git a/src/circuit/circuit/dialer.js b/src/circuit/circuit/dialer.js index 2407c40f..2c250e2c 100644 --- a/src/circuit/circuit/dialer.js +++ b/src/circuit/circuit/dialer.js @@ -6,7 +6,7 @@ const waterfall = require('async/waterfall') const setImmediate = require('async/setImmediate') const multiaddr = require('multiaddr') -const Connection = require('interface-connection').Connection +const { Connection } = require('libp2p-interfaces/src/connection') const utilsFactory = require('./utils') const StreamHandler = require('./stream-handler') diff --git a/src/circuit/circuit/stop.js b/src/circuit/circuit/stop.js index f420c8f6..e18ef028 100644 --- a/src/circuit/circuit/stop.js +++ b/src/circuit/circuit/stop.js @@ -3,7 +3,7 @@ const setImmediate = require('async/setImmediate') const EE = require('events').EventEmitter -const Connection = require('interface-connection').Connection +const { Connection } = require('libp2p-interfaces/src/connection') const utilsFactory = require('./utils') const PeerInfo = require('peer-info') const proto = require('../protocol').CircuitRelay diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 00000000..72c442d2 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,12 @@ +'use strict' + +module.exports = { + DENY_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again + DENY_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently denied + DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take + MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued + MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials + QUARTER_HOUR: 15 * 60e3, + PRIORITY_HIGH: 10, + PRIORITY_LOW: 20 +} diff --git a/src/dialer.js b/src/dialer.js new file mode 100644 index 00000000..2177ccda --- /dev/null +++ b/src/dialer.js @@ -0,0 +1,98 @@ +'use strict' + +const multiaddr = require('multiaddr') +const errCode = require('err-code') +const { default: PQueue } = require('p-queue') +const AbortController = require('abort-controller') +const debug = require('debug') +const log = debug('libp2p:dialer') +log.error = debug('libp2p:dialer:error') + +const { codes } = require('./errors') +const { + MAX_PARALLEL_DIALS, + DIAL_TIMEOUT +} = require('./constants') + +class Dialer { + /** + * @constructor + * @param {object} options + * @param {TransportManager} options.transportManager + * @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS` + * @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT` + */ + constructor ({ + transportManager, + concurrency = MAX_PARALLEL_DIALS, + timeout = DIAL_TIMEOUT + }) { + this.transportManager = transportManager + this.concurrency = concurrency + this.timeout = timeout + this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true }) + } + + /** + * Connects to a given `Multiaddr`. `addr` should include the id of the peer being + * dialed, it will be used for encryption verification. + * + * @async + * @param {Multiaddr} addr The address to dial + * @param {object} [options] + * @param {AbortSignal} [options.signal] An AbortController signal + * @returns {Promise} + */ + async connectToMultiaddr (addr, options = {}) { + addr = multiaddr(addr) + let conn + let controller + + if (!options.signal) { + controller = new AbortController() + options.signal = controller.signal + } + + try { + conn = await this.queue.add(() => this.transportManager.dial(addr, options)) + } catch (err) { + if (err.name === 'TimeoutError') { + controller.abort() + err.code = codes.ERR_TIMEOUT + } + log.error('Error dialing address %s,', addr, err) + throw err + } + + return conn + } + + /** + * Connects to a given `PeerInfo` by dialing all of its known addresses. + * The dial to the first address that is successfully able to upgrade a connection + * will be used. + * + * @async + * @param {PeerInfo} peerInfo The remote peer to dial + * @param {object} [options] + * @param {AbortSignal} [options.signal] An AbortController signal + * @returns {Promise} + */ + async connectToPeer (peerInfo, options = {}) { + const addrs = peerInfo.multiaddrs.toArray() + for (const addr of addrs) { + try { + return await this.connectToMultiaddr(addr, options) + } catch (_) { + // The error is already logged, just move to the next addr + continue + } + } + + const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED) + log.error(err) + throw err + } +} + +module.exports = Dialer diff --git a/src/errors.js b/src/errors.js index 5219891a..de75f094 100644 --- a/src/errors.js +++ b/src/errors.js @@ -8,10 +8,16 @@ exports.messages = { exports.codes = { DHT_DISABLED: 'ERR_DHT_DISABLED', PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED', + ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED', ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES', ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF', ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', + ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED', ERR_INVALID_KEY: 'ERR_INVALID_KEY', - ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE' + ERR_MUXER_UNAVAILABLE: 'ERR_MUXER_UNAVAILABLE', + ERR_TIMEOUT: 'ERR_TIMEOUT', + ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE', + ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED', + ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL' } diff --git a/src/index.js b/src/index.js index b973739b..3d5cf8c2 100644 --- a/src/index.js +++ b/src/index.js @@ -13,20 +13,22 @@ const nextTick = require('async/nextTick') const PeerBook = require('peer-book') const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') const Switch = require('./switch') const Ping = require('./ping') -const ConnectionManager = require('./connection-manager') const { emitFirst } = require('./util') const peerRouting = require('./peer-routing') const contentRouting = require('./content-routing') const dht = require('./dht') const pubsub = require('./pubsub') -const { getPeerInfoRemote } = require('./get-peer-info') -const validateConfig = require('./config').validate +const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info') +const { validate: validateConfig } = require('./config') const { codes } = require('./errors') +const Dialer = require('./dialer') const TransportManager = require('./transport-manager') +const Upgrader = require('./upgrader') const notStarted = (action, state) => { return errCode( @@ -61,64 +63,49 @@ class Libp2p extends EventEmitter { // create the switch, and listen for errors this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch) - this._switch.on('error', (...args) => this.emit('error', ...args)) - this.stats = this._switch.stats - this.connectionManager = new ConnectionManager(this, this._options.connectionManager) + // Setup the Upgrader + this.upgrader = new Upgrader({ + localPeer: this.peerInfo.id, + onConnection: (connection) => { + const peerInfo = getPeerInfo(connection.remotePeer) + this.emit('peer:connect', peerInfo) + }, + onConnectionEnd: (connection) => { + const peerInfo = getPeerInfo(connection.remotePeer) + this.emit('peer:disconnect', peerInfo) + } + }) // Setup the transport manager this.transportManager = new TransportManager({ libp2p: this, - // TODO: set the actual upgrader - upgrader: { - upgradeInbound: (maConn) => maConn, - upgradeOutbound: (maConn) => maConn - }, - // TODO: Route incoming connections to a multiplex protocol router - onConnection: () => {} + upgrader: this.upgrader }) this._modules.transport.forEach((Transport) => { this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport) }) - // Attach stream multiplexers - if (this._modules.streamMuxer) { - const muxers = this._modules.streamMuxer - muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer)) - - // If muxer exists - // we can use Identify - this._switch.connection.reuse() - // we can use Relay for listening/dialing - this._switch.connection.enableCircuitRelay(this._config.relay) - - // Received incomming dial and muxer upgrade happened, - // reuse this muxed connection - this._switch.on('peer-mux-established', (peerInfo) => { - this.emit('peer:connect', peerInfo) - }) - - this._switch.on('peer-mux-closed', (peerInfo) => { - this.emit('peer:disconnect', peerInfo) - }) - } - - // Events for anytime connections are created/removed - this._switch.on('connection:start', (peerInfo) => { - this.emit('connection:start', peerInfo) - }) - this._switch.on('connection:end', (peerInfo) => { - this.emit('connection:end', peerInfo) - }) - // Attach crypto channels if (this._modules.connEncryption) { const cryptos = this._modules.connEncryption cryptos.forEach((crypto) => { - this._switch.connection.crypto(crypto.tag, crypto.encrypt) + this.upgrader.cryptos.set(crypto.tag, crypto) }) } + // Attach stream multiplexers + if (this._modules.streamMuxer) { + const muxers = this._modules.streamMuxer + muxers.forEach((muxer) => { + this.upgrader.muxers.set(muxer.multicodec, muxer) + }) + } + + this.dialer = new Dialer({ + transportManager: this.transportManager + }) + // Attach private network protector if (this._modules.connProtector) { this._switch.protector = this._modules.connProtector @@ -153,7 +140,8 @@ class Libp2p extends EventEmitter { this.state = new FSM('STOPPED', { STOPPED: { start: 'STARTING', - stop: 'STOPPED' + stop: 'STOPPED', + done: 'STOPPED' }, STARTING: { done: 'STARTED', @@ -175,7 +163,6 @@ class Libp2p extends EventEmitter { }) this.state.on('STOPPING', () => { log('libp2p is stopping') - this._onStopping() }) this.state.on('STARTED', () => { log('libp2p has started') @@ -201,7 +188,7 @@ class Libp2p extends EventEmitter { this._peerDiscovered = this._peerDiscovered.bind(this) // promisify all instance methods - ;['start', 'stop', 'dial', 'dialProtocol', 'dialFSM', 'hangUp', 'ping'].forEach(method => { + ;['start', 'hangUp', 'ping'].forEach(method => { this[method] = promisify(this[method], { context: this }) }) } @@ -234,13 +221,21 @@ class Libp2p extends EventEmitter { /** * Stop the libp2p node by closing its listeners and open connections - * - * @param {function(Error)} callback + * @async * @returns {void} */ - stop (callback = () => {}) { - emitFirst(this, ['error', 'stop'], callback) + async stop () { this.state('stop') + + try { + await this.transportManager.close() + } catch (err) { + if (err) { + log.error(err) + this.emit('error', err) + } + } + this.state('done') } isStarted () { @@ -252,11 +247,12 @@ class Libp2p extends EventEmitter { * peer will be added to the nodes `PeerBook` * * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial - * @param {function(Error)} callback - * @returns {void} + * @param {object} options + * @param {AbortSignal} [options.signal] + * @returns {Promise} */ - dial (peer, callback) { - this.dialProtocol(peer, null, callback) + dial (peer, options) { + return this.dialProtocol(peer, null, options) } /** @@ -264,50 +260,28 @@ class Libp2p extends EventEmitter { * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`, * and the `Connection` will be sent in the callback * + * @async * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial - * @param {string} protocol - * @param {function(Error, Connection)} callback - * @returns {void} + * @param {string[]|string} protocols + * @param {object} options + * @param {AbortSignal} [options.signal] + * @returns {Promise} */ - dialProtocol (peer, protocol, callback) { - if (!this.isStarted()) { - return callback(notStarted('dial', this.state._state)) + async dialProtocol (peer, protocols, options) { + let connection + if (multiaddr.isMultiaddr(peer)) { + connection = await this.dialer.connectToMultiaddr(peer, options) + } else { + peer = await getPeerInfoRemote(peer, this) + connection = await this.dialer.connectToPeer(peer, options) } - if (typeof protocol === 'function') { - callback = protocol - protocol = undefined + // If a protocol was provided, create a new stream + if (protocols) { + return connection.newStream(protocols) } - getPeerInfoRemote(peer, this) - .then(peerInfo => { - this._switch.dial(peerInfo, protocol, callback) - }, callback) - } - - /** - * Similar to `dial` and `dialProtocol`, but the callback will contain a - * Connection State Machine. - * - * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial - * @param {string} protocol - * @param {function(Error, ConnectionFSM)} callback - * @returns {void} - */ - dialFSM (peer, protocol, callback) { - if (!this.isStarted()) { - return callback(notStarted('dial', this.state._state)) - } - - if (typeof protocol === 'function') { - callback = protocol - protocol = undefined - } - - getPeerInfoRemote(peer, this) - .then(peerInfo => { - this._switch.dialFSM(peerInfo, protocol, callback) - }, callback) + return connection } /** @@ -342,12 +316,28 @@ class Libp2p extends EventEmitter { }, callback) } - handle (protocol, handlerFunc, matchFunc) { - this._switch.handle(protocol, handlerFunc, matchFunc) + /** + * Registers the `handler` for each protocol + * @param {string[]|string} protocols + * @param {function({ stream:*, protocol:string })} handler + */ + handle (protocols, handler) { + protocols = Array.isArray(protocols) ? protocols : [protocols] + protocols.forEach(protocol => { + this.upgrader.protocols.set(protocol, handler) + }) } - unhandle (protocol) { - this._switch.unhandle(protocol) + /** + * Removes the handler for each protocol. The protocol + * will no longer be supported on streams. + * @param {string[]|string} protocols + */ + unhandle (protocols) { + protocols = Array.isArray(protocols) ? protocols : [protocols] + protocols.forEach(protocol => { + this.upgrader.protocols.delete(protocol) + }) } async _onStarting () { @@ -373,21 +363,6 @@ class Libp2p extends EventEmitter { this.state('done') } - async _onStopping () { - // Start parallel tasks - try { - await this.transportManager.close() - } catch (err) { - if (err) { - log.error(err) - this.emit('error', err) - } - } - - // libp2p has stopped - this.state('done') - } - /** * Handles discovered peers. Each discovered peer will be emitted via * the `peer:discovery` event. If auto dial is enabled for libp2p diff --git a/src/pnet/index.js b/src/pnet/index.js index a6480dca..db79e3bc 100644 --- a/src/pnet/index.js +++ b/src/pnet/index.js @@ -1,7 +1,7 @@ 'use strict' const pull = require('pull-stream') -const Connection = require('interface-connection').Connection +const { Connection } = require('libp2p-interfaces/src/connection') const assert = require('assert') const Errors = require('./errors') diff --git a/src/switch/README.md b/src/switch/README.md index 2ceec4bb..3ca9e9d5 100644 --- a/src/switch/README.md +++ b/src/switch/README.md @@ -81,7 +81,7 @@ tests]([./test/pnet.node.js]). ##### `switch.connection.addUpgrade()` -A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification. +A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) specification. > **WIP** @@ -151,7 +151,7 @@ a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will tak - `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted. - `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal. - `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted. -- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted. +- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) will be emitted. - `close`: emitted when the connection has closed. ### `switch.handle(protocol, handlerFunc, matchFunc)` @@ -365,7 +365,7 @@ In order for a transport to be supported, it has to follow the [interface-transp ### Connection upgrades -Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/interface-connection) spec. This design decision enables libp2p to have upgradable transports. +Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) spec. This design decision enables libp2p to have upgradable transports. We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it. diff --git a/src/switch/limit-dialer/queue.js b/src/switch/limit-dialer/queue.js index 344997ad..94d38c5b 100644 --- a/src/switch/limit-dialer/queue.js +++ b/src/switch/limit-dialer/queue.js @@ -1,6 +1,6 @@ 'use strict' -const Connection = require('interface-connection').Connection +const { Connection } = require('libp2p-interfaces/src/connection') const pull = require('pull-stream/pull') const empty = require('pull-stream/sources/empty') const timeout = require('async/timeout') diff --git a/src/switch/observe-connection.js b/src/switch/observe-connection.js index c6e928c0..45f87cd0 100644 --- a/src/switch/observe-connection.js +++ b/src/switch/observe-connection.js @@ -1,6 +1,6 @@ 'use strict' -const Connection = require('interface-connection').Connection +const { Connection } = require('libp2p-interfaces/src/connection') const pull = require('pull-stream/pull') /** diff --git a/src/transport-manager.js b/src/transport-manager.js index 371f2f57..136dc8c8 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -13,14 +13,12 @@ class TransportManager { * @param {object} options * @param {Libp2p} options.libp2p The Libp2p instance. It will be passed to the transports. * @param {Upgrader} options.upgrader The upgrader to provide to the transports - * @param {function(Connection)} options.onConnection Called whenever an incoming connection is received */ - constructor ({ libp2p, upgrader, onConnection }) { + constructor ({ libp2p, upgrader }) { this.libp2p = libp2p this.upgrader = upgrader this._transports = new Map() this._listeners = new Map() - this.onConnection = onConnection } /** @@ -45,7 +43,9 @@ class TransportManager { }) this._transports.set(key, transport) - this._listeners.set(key, []) + if (!this._listeners.has(key)) { + this._listeners.set(key, []) + } } /** @@ -57,11 +57,13 @@ class TransportManager { for (const [key, listeners] of this._listeners) { log('closing listeners for %s', key) while (listeners.length) { - tasks.push(listeners.pop().close()) + const listener = listeners.pop() + tasks.push(listener.close()) } } await Promise.all(tasks) + log('all listeners closed') this._listeners.clear() } @@ -76,8 +78,12 @@ class TransportManager { if (!transport) { throw errCode(new Error(`No transport available for address ${String(ma)}`), codes.ERR_TRANSPORT_UNAVAILABLE) } - const conn = await transport.dial(ma, options) - return conn + + try { + return await transport.dial(ma, options) + } catch (err) { + throw errCode(new Error('Transport dial failed'), codes.ERR_TRANSPORT_DIAL_FAILED, err) + } } /** diff --git a/src/upgrader.js b/src/upgrader.js new file mode 100644 index 00000000..e470d81c --- /dev/null +++ b/src/upgrader.js @@ -0,0 +1,336 @@ +'use strict' + +const debug = require('debug') +const log = debug('libp2p:upgrader') +log.error = debug('libp2p:upgrader:error') +const Multistream = require('multistream-select') +const { Connection } = require('libp2p-interfaces/src/connection') +const PeerId = require('peer-id') +const pipe = require('it-pipe') +const errCode = require('err-code') + +const { codes } = require('./errors') + +/** + * @typedef MultiaddrConnection + * @property {function} sink + * @property {AsyncIterator} source + * @property {*} conn + * @property {Multiaddr} remoteAddr + */ + +/** + * @typedef CryptoResult + * @property {*} conn A duplex iterable + * @property {PeerId} remotePeer + * @property {string} protocol + */ + +class Upgrader { + /** + * @param {object} options + * @param {PeerId} options.localPeer + * @param {Map} options.cryptos + * @param {Map} options.muxers + */ + constructor ({ localPeer, cryptos, muxers, onConnectionEnd = () => {}, onConnection = () => {} }) { + this.localPeer = localPeer + this.cryptos = cryptos || new Map() + this.muxers = muxers || new Map() + this.protocols = new Map() + this.onConnection = onConnection + this.onConnectionEnd = onConnectionEnd + } + + /** + * Upgrades an inbound connection + * @async + * @param {MultiaddrConnection} maConn + * @returns {Promise} + */ + async upgradeInbound (maConn) { + let encryptedConn + let remotePeer + let muxedConnection + let Muxer + let cryptoProtocol + + try { + // Encrypt the connection + ({ + conn: encryptedConn, + remotePeer, + protocol: cryptoProtocol + } = await this._encryptInbound(this.localPeer, maConn, this.cryptos)) + + // Multiplex the connection + ;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) + } catch (err) { + log.error('Failed to upgrade inbound connection', err) + await maConn.close(err) + // TODO: We shouldn't throw here, as there isn't anything to catch the failure + throw err + } + + log('Successfully upgraded inbound connection') + + return this._createConnection({ + cryptoProtocol, + direction: 'inbound', + maConn, + muxedConnection, + Muxer, + remotePeer + }) + } + + /** + * Upgrades an outbound connection + * @async + * @param {MultiaddrConnection} maConn + * @returns {Promise} + */ + async upgradeOutbound (maConn) { + let remotePeerId + try { + remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId()) + } catch (err) { + log.error('multiaddr did not contain a valid peer id', err) + } + + let encryptedConn + let remotePeer + let muxedConnection + let cryptoProtocol + let Muxer + + try { + // Encrypt the connection + ({ + conn: encryptedConn, + remotePeer, + protocol: cryptoProtocol + } = await this._encryptOutbound(this.localPeer, maConn, remotePeerId, this.cryptos)) + + // Multiplex the connection + ;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) + } catch (err) { + log.error('Failed to upgrade outbound connection', err) + await maConn.close(err) + throw err + } + + log('Successfully upgraded outbound connection') + + return this._createConnection({ + cryptoProtocol, + direction: 'outbound', + maConn, + muxedConnection, + Muxer, + remotePeer + }) + } + + /** + * A convenience method for generating a new `Connection` + * @private + * @param {object} options + * @param {string} cryptoProtocol The crypto protocol that was negotiated + * @param {string} direction One of ['inbound', 'outbound'] + * @param {MultiaddrConnection} maConn The transport layer connection + * @param {*} muxedConnection A duplex connection returned from multiplexer selection + * @param {Muxer} Muxer The muxer to be used for muxing + * @param {PeerId} remotePeer The peer the connection is with + * @returns {Connection} + */ + _createConnection ({ + cryptoProtocol, + direction, + maConn, + muxedConnection, + Muxer, + remotePeer + }) { + // Create the muxer + const muxer = new Muxer({ + // Run anytime a remote stream is created + onStream: async muxedStream => { + const mss = new Multistream.Listener(muxedStream) + const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) + log('%s: incoming stream opened on %s', direction, protocol) + connection.addStream(stream, protocol) + this._onStream({ stream, protocol }) + }, + // Run anytime a stream closes + onStreamEnd: muxedStream => { + connection.removeStream(muxedStream.id) + } + }) + + const newStream = async protocols => { + log('%s: starting new stream on %s', direction, protocols) + const muxedStream = muxer.newStream() + const mss = new Multistream.Dialer(muxedStream) + try { + const { stream, protocol } = await mss.select(protocols) + return { stream: { ...muxedStream, ...stream }, protocol } + } catch (err) { + log.error('could not create new stream', err) + throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) + } + } + + // Pipe all data through the muxer + pipe(muxedConnection, muxer, muxedConnection) + + maConn.timeline.upgraded = Date.now() + const timelineProxy = new Proxy(maConn.timeline, { + set: (...args) => { + if (args[1] === 'close' && args[2]) { + this.onConnectionEnd(connection) + } + + return Reflect.set(...args) + } + }) + + // Create the connection + const connection = new Connection({ + localAddr: maConn.localAddr, + remoteAddr: maConn.remoteAddr, + localPeer: this.localPeer, + remotePeer: remotePeer, + stat: { + direction, + timeline: timelineProxy, + multiplexer: Muxer.multicodec, + encryption: cryptoProtocol + }, + newStream, + getStreams: () => muxer.streams, + close: err => maConn.close(err) + }) + + this.onConnection(connection) + + return connection + } + + /** + * Routes incoming streams to the correct handler + * @private + * @param {object} options + * @param {Stream} options.stream + * @param {string} protocol + */ + _onStream ({ stream, protocol }) { + const handler = this.protocols.get(protocol) + handler({ stream, protocol }) + } + + /** + * Attempts to encrypt the incoming `connection` with the provided `cryptos`. + * @private + * @async + * @param {PeerId} localPeer The initiators PeerInfo + * @param {*} connection + * @param {Map} cryptos + * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + */ + async _encryptInbound (localPeer, connection, cryptos) { + const mss = new Multistream.Listener(connection) + const protocols = Array.from(cryptos.keys()) + log('selecting inbound crypto protocol', protocols) + + try { + const { stream, protocol } = await mss.handle(protocols) + const crypto = cryptos.get(protocol) + log('encrypting inbound connection...') + + return { + ...await crypto.secureInbound(localPeer, stream), + protocol + } + } catch (err) { + throw errCode(err, codes.ERR_ENCRYPTION_FAILED) + } + } + + /** + * Attempts to encrypt the given `connection` with the provided `cryptos`. + * The first `Crypto` module to succeed will be used + * @private + * @async + * @param {PeerId} localPeer The initiators PeerInfo + * @param {*} connection + * @param {PeerId} remotePeerId + * @param {Map} cryptos + * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used + */ + async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) { + const mss = new Multistream.Dialer(connection) + const protocols = Array.from(cryptos.keys()) + log('selecting outbound crypto protocol', protocols) + + try { + const { stream, protocol } = await mss.select(protocols) + const crypto = cryptos.get(protocol) + log('encrypting outbound connection to %j', remotePeerId) + + return { + ...await crypto.secureOutbound(localPeer, stream, remotePeerId), + protocol + } + } catch (err) { + throw errCode(err, codes.ERR_ENCRYPTION_FAILED) + } + } + + /** + * Selects one of the given muxers via multistream-select. That + * muxer will be used for all future streams on the connection. + * @private + * @async + * @param {*} connection A basic duplex connection to multiplex + * @param {Map} muxers The muxers to attempt multiplexing with + * @returns {*} A muxed connection + */ + async _multiplexOutbound (connection, muxers) { + const dialer = new Multistream.Dialer(connection) + const protocols = Array.from(muxers.keys()) + log('outbound selecting muxer %s', protocols) + try { + const { stream, protocol } = await dialer.select(protocols) + log('%s selected as muxer protocol', protocol) + const Muxer = muxers.get(protocol) + return { stream, Muxer } + } catch (err) { + throw errCode(err, codes.ERR_MUXER_UNAVAILABLE) + } + } + + /** + * Registers support for one of the given muxers via multistream-select. The + * selected muxer will be used for all future streams on the connection. + * @private + * @async + * @param {*} connection A basic duplex connection to multiplex + * @param {Map} muxers The muxers to attempt multiplexing with + * @returns {*} A muxed connection + */ + async _multiplexInbound (connection, muxers) { + const listener = new Multistream.Listener(connection) + const protocols = Array.from(muxers.keys()) + log('inbound handling muxers %s', protocols) + try { + const { stream, protocol } = await listener.handle(protocols) + const Muxer = muxers.get(protocol) + return { stream, Muxer } + } catch (err) { + throw errCode(err, codes.ERR_MUXER_UNAVAILABLE) + } + } +} + +module.exports = Upgrader diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js new file mode 100644 index 00000000..071ad900 --- /dev/null +++ b/test/dialing/direct.node.js @@ -0,0 +1,236 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') +const Transport = require('libp2p-tcp') +const Muxer = require('libp2p-mplex') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const delay = require('delay') +const pDefer = require('p-defer') +const pipe = require('it-pipe') + +const Libp2p = require('../../src') +const Dialer = require('../../src/dialer') +const TransportManager = require('../../src/transport-manager') +const { codes: ErrorCodes } = require('../../src/errors') + +const mockUpgrader = require('../utils/mockUpgrader') +const mockCrypto = require('../utils/mockCrypto') +const Peers = require('../fixtures/peers') + +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') +const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws') + +describe('Dialing (direct, TCP)', () => { + let remoteTM + let localTM + let remoteAddr + + before(async () => { + remoteTM = new TransportManager({ + libp2p: {}, + upgrader: mockUpgrader + }) + remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport) + + localTM = new TransportManager({ + libp2p: {}, + upgrader: mockUpgrader + }) + localTM.add(Transport.prototype[Symbol.toStringTag], Transport) + + await remoteTM.listen([listenAddr]) + + remoteAddr = remoteTM.getAddrs()[0] + }) + + after(async () => { + await remoteTM.close() + }) + + afterEach(() => { + sinon.restore() + }) + + it('should be able to connect to a remote node via its multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + const connection = await dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + await connection.close() + }) + + it('should be able to connect to a remote node via its stringified multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + const connection = await dialer.connectToMultiaddr(remoteAddr.toString()) + expect(connection).to.exist() + await connection.close() + }) + + it('should fail to connect to an unsupported multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + try { + await dialer.connectToMultiaddr(unsupportedAddr) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) + return + } + + expect.fail('Dial should have failed') + }) + + it('should be able to connect to a given peer', async () => { + const dialer = new Dialer({ transportManager: localTM }) + const peerId = await PeerId.createFromJSON(Peers[0]) + const peerInfo = new PeerInfo(peerId) + peerInfo.multiaddrs.add(remoteAddr) + + const connection = await dialer.connectToPeer(peerInfo) + expect(connection).to.exist() + await connection.close() + }) + + it('should fail to connect to a given peer with unsupported addresses', async () => { + const dialer = new Dialer({ transportManager: localTM }) + const peerId = await PeerId.createFromJSON(Peers[0]) + const peerInfo = new PeerInfo(peerId) + peerInfo.multiaddrs.add(unsupportedAddr) + + try { + await dialer.connectToPeer(peerInfo) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED) + return + } + + expect.fail('Dial should have failed') + }) + + it('should abort dials on queue task timeout', async () => { + const dialer = new Dialer({ + transportManager: localTM, + timeout: 50 + }) + sinon.stub(localTM, 'dial').callsFake(async (addr, options) => { + expect(options.signal).to.exist() + expect(options.signal.aborted).to.equal(false) + expect(addr.toString()).to.eql(remoteAddr.toString()) + await delay(60) + expect(options.signal.aborted).to.equal(true) + }) + + try { + await dialer.connectToMultiaddr(remoteAddr) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT) + return + } + + expect.fail('Dial should have failed') + }) + + it('should dial to the max concurrency', async () => { + const dialer = new Dialer({ + transportManager: localTM, + concurrency: 2 + }) + + const deferredDial = pDefer() + sinon.stub(localTM, 'dial').callsFake(async () => { + await deferredDial.promise + }) + + // Add 3 dials + Promise.all([ + dialer.connectToMultiaddr(remoteAddr), + dialer.connectToMultiaddr(remoteAddr), + dialer.connectToMultiaddr(remoteAddr) + ]) + + // Let the call stack run + await delay(0) + + // We should have 2 in progress, and 1 waiting + expect(localTM.dial.callCount).to.equal(2) + expect(dialer.queue.pending).to.equal(2) + expect(dialer.queue.size).to.equal(1) + + deferredDial.resolve() + + // Let the call stack run + await delay(0) + // All dials should have executed + expect(localTM.dial.callCount).to.equal(3) + expect(dialer.queue.pending).to.equal(0) + expect(dialer.queue.size).to.equal(0) + }) +}) + +describe('libp2p.dialer', () => { + let peerInfo + let remotePeerInfo + let libp2p + let remoteLibp2p + let remoteAddr + + before(async () => { + const [peerId, remotePeerId] = await Promise.all([ + PeerId.createFromJSON(Peers[0]), + PeerId.createFromJSON(Peers[1]) + ]) + + peerInfo = new PeerInfo(peerId) + remotePeerInfo = new PeerInfo(remotePeerId) + + remoteLibp2p = new Libp2p({ + peerInfo: remotePeerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) + + await remoteLibp2p.transportManager.listen([listenAddr]) + remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + afterEach(async () => { + sinon.restore() + libp2p && await libp2p.stop() + libp2p = null + }) + + after(async () => { + await remoteLibp2p.stop() + }) + + it('should use the dialer for connecting', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + sinon.spy(libp2p.dialer, 'connectToMultiaddr') + + const connection = await libp2p.dial(remoteAddr) + expect(connection).to.exist() + const { stream, protocol } = await connection.newStream('/echo/1.0.0') + expect(stream).to.exist() + expect(protocol).to.equal('/echo/1.0.0') + await connection.close() + expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) + }) +}) diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js new file mode 100644 index 00000000..bfc77b4d --- /dev/null +++ b/test/dialing/direct.spec.js @@ -0,0 +1,211 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') +const pDefer = require('p-defer') +const delay = require('delay') +const Transport = require('libp2p-websockets') +const Muxer = require('libp2p-mplex') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') + +const { codes: ErrorCodes } = require('../../src/errors') +const Constants = require('../../src/constants') +const Dialer = require('../../src/dialer') +const TransportManager = require('../../src/transport-manager') +const Libp2p = require('../../src') + +const Peers = require('../fixtures/peers') +const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') +const mockUpgrader = require('../utils/mockUpgrader') +const mockCrypto = require('../utils/mockCrypto') +const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws') +const remoteAddr = MULTIADDRS_WEBSOCKETS[0] + +describe('Dialing (direct, WebSockets)', () => { + let localTM + + before(() => { + localTM = new TransportManager({ + libp2p: {}, + upgrader: mockUpgrader, + onConnection: () => {} + }) + localTM.add(Transport.prototype[Symbol.toStringTag], Transport) + }) + + afterEach(() => { + sinon.restore() + }) + + it('should have appropriate defaults', () => { + const dialer = new Dialer({ transportManager: localTM }) + expect(dialer.concurrency).to.equal(Constants.MAX_PARALLEL_DIALS) + expect(dialer.timeout).to.equal(Constants.DIAL_TIMEOUT) + }) + + it('should be able to connect to a remote node via its multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + const connection = await dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + await connection.close() + }) + + it('should be able to connect to a remote node via its stringified multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + const connection = await dialer.connectToMultiaddr(remoteAddr.toString()) + expect(connection).to.exist() + await connection.close() + }) + + it('should fail to connect to an unsupported multiaddr', async () => { + const dialer = new Dialer({ transportManager: localTM }) + + try { + await dialer.connectToMultiaddr(unsupportedAddr) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) + return + } + + expect.fail('Dial should have failed') + }) + + it('should be able to connect to a given peer', async () => { + const dialer = new Dialer({ transportManager: localTM }) + const peerId = await PeerId.createFromJSON(Peers[0]) + const peerInfo = new PeerInfo(peerId) + peerInfo.multiaddrs.add(remoteAddr) + + const connection = await dialer.connectToPeer(peerInfo) + expect(connection).to.exist() + await connection.close() + }) + + it('should fail to connect to a given peer with unsupported addresses', async () => { + const dialer = new Dialer({ transportManager: localTM }) + const peerId = await PeerId.createFromJSON(Peers[0]) + const peerInfo = new PeerInfo(peerId) + peerInfo.multiaddrs.add(unsupportedAddr) + + try { + await dialer.connectToPeer(peerInfo) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED) + return + } + + expect.fail('Dial should have failed') + }) + + it('should abort dials on queue task timeout', async () => { + const dialer = new Dialer({ + transportManager: localTM, + timeout: 50 + }) + sinon.stub(localTM, 'dial').callsFake(async (addr, options) => { + expect(options.signal).to.exist() + expect(options.signal.aborted).to.equal(false) + expect(addr.toString()).to.eql(remoteAddr.toString()) + await delay(60) + expect(options.signal.aborted).to.equal(true) + }) + + try { + await dialer.connectToMultiaddr(remoteAddr) + } catch (err) { + expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT) + return + } + + expect.fail('Dial should have failed') + }) + + it('should dial to the max concurrency', async () => { + const dialer = new Dialer({ + transportManager: localTM, + concurrency: 2 + }) + + const deferredDial = pDefer() + sinon.stub(localTM, 'dial').callsFake(async () => { + await deferredDial.promise + }) + + // Add 3 dials + Promise.all([ + dialer.connectToMultiaddr(remoteAddr), + dialer.connectToMultiaddr(remoteAddr), + dialer.connectToMultiaddr(remoteAddr) + ]) + + // Let the call stack run + await delay(0) + + // We should have 2 in progress, and 1 waiting + expect(localTM.dial.callCount).to.equal(2) + expect(dialer.queue.pending).to.equal(2) + expect(dialer.queue.size).to.equal(1) + + deferredDial.resolve() + + // Let the call stack run + await delay(0) + // All dials should have executed + expect(localTM.dial.callCount).to.equal(3) + expect(dialer.queue.pending).to.equal(0) + expect(dialer.queue.size).to.equal(0) + }) +}) + +describe.skip('libp2p.dialer', () => { + let peerInfo + let libp2p + + before(async () => { + const peerId = await PeerId.createFromJSON(Peers[0]) + peerInfo = new PeerInfo(peerId) + }) + + afterEach(async () => { + sinon.restore() + libp2p && await libp2p.stop() + libp2p = null + }) + + it('should create a dialer', () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + expect(libp2p.dialer).to.exist() + // Ensure the dialer also has the transport manager + expect(libp2p.transportManager).to.equal(libp2p.dialer.transportManager) + }) + + it('should use the dialer for connecting', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + await connection.close() + }) +}) diff --git a/test/node.js b/test/node.js index fa27625f..61887dac 100644 --- a/test/node.js +++ b/test/node.js @@ -1,3 +1,14 @@ 'use strict' -require('./transports/transport-manager.node') +const glob = require('glob') +const path = require('path') + +// Automatically require test files so we don't have to worry about adding new ones +glob('test/**/*.node.js', function (err, testPaths) { + if (err) throw err + if (testPaths.length < 1) throw new Error('Could not find any node test files') + + testPaths.forEach(file => { + require(path.resolve(file)) + }) +}) diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js new file mode 100644 index 00000000..d0583584 --- /dev/null +++ b/test/upgrading/upgrader.spec.js @@ -0,0 +1,370 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') +const Muxer = require('libp2p-mplex') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') +const pSettle = require('p-settle') +const Transport = require('libp2p-websockets') + +const Libp2p = require('../../src') +const Upgrader = require('../../src/upgrader') +const { codes } = require('../../src/errors') + +const mockCrypto = require('../utils/mockCrypto') +const mockMultiaddrConnPair = require('../utils/mockMultiaddrConn') +const Peers = require('../fixtures/peers') +const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/0'), + multiaddr('/ip4/127.0.0.1/tcp/0') +] + +describe('Upgrader', () => { + let localUpgrader + let remoteUpgrader + let localPeer + let remotePeer + + before(async () => { + ([ + localPeer, + remotePeer + ] = await Promise.all([ + PeerId.createFromJSON(Peers[0]), + PeerId.createFromJSON(Peers[1]) + ])) + + localUpgrader = new Upgrader({ + localPeer + }) + remoteUpgrader = new Upgrader({ + localPeer: remotePeer + }) + + localUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) + remoteUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) + }) + + afterEach(() => { + sinon.restore() + }) + + it('should ignore a missing remote peer id', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + // Remove the peer id from the remote address + outbound.remoteAddr = outbound.remoteAddr.decapsulateCode(421) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + }) + + it('should upgrade with valid muxers and crypto', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + + const { stream, protocol } = await connections[0].newStream('/echo/1.0.0') + expect(protocol).to.equal('/echo/1.0.0') + + const hello = Buffer.from('hello there!') + const result = await pipe( + [hello], + stream, + function toBuffer (source) { + return (async function * () { + for await (const val of source) yield val.slice() + })() + }, + collect + ) + + expect(result).to.eql([hello]) + }) + + it('should fail if crypto fails', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const crypto = { + tag: '/insecure', + secureInbound: () => { throw new Error('Boom') }, + secureOutbound: () => { throw new Error('Boom') } + } + + const cryptos = new Map([[crypto.tag, crypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + // Wait for the results of each side of the connection + const results = await pSettle([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + // Ensure both sides fail + expect(results).to.have.length(2) + results.forEach(result => { + expect(result.isRejected).to.equal(true) + expect(result.reason.code).to.equal(codes.ERR_ENCRYPTION_FAILED) + }) + }) + + it('should fail if muxers do not match', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxersLocal = new Map([['/muxer-local', Muxer]]) + const muxersRemote = new Map([['/muxer-remote', Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxersLocal) + sinon.stub(remoteUpgrader, 'muxers').value(muxersRemote) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + // Wait for the results of each side of the connection + const results = await pSettle([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + // Ensure both sides fail + expect(results).to.have.length(2) + results.forEach(result => { + expect(result.isRejected).to.equal(true) + expect(result.reason.code).to.equal(codes.ERR_MUXER_UNAVAILABLE) + }) + }) + + it('should map getStreams and close methods', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + + // Create a few streams, at least 1 in each direction + await connections[0].newStream('/echo/1.0.0') + await connections[1].newStream('/echo/1.0.0') + await connections[0].newStream('/echo/1.0.0') + connections.forEach(conn => { + expect(conn.streams).to.have.length(3) + }) + + // Verify the MultiaddrConnection close method is called + sinon.spy(inbound, 'close') + sinon.spy(outbound, 'close') + await Promise.all(connections.map(conn => conn.close())) + expect(inbound.close.callCount).to.equal(1) + expect(outbound.close.callCount).to.equal(1) + }) + + it('should call connection handlers', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + // Verify onConnection is called with the connection + sinon.spy(localUpgrader, 'onConnection') + sinon.spy(remoteUpgrader, 'onConnection') + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + expect(connections).to.have.length(2) + expect(localUpgrader.onConnection.callCount).to.equal(1) + expect(localUpgrader.onConnection.getCall(0).args).to.eql([connections[0]]) + expect(remoteUpgrader.onConnection.callCount).to.equal(1) + expect(remoteUpgrader.onConnection.getCall(0).args).to.eql([connections[1]]) + + // Verify onConnectionEnd is called with the connection + sinon.spy(localUpgrader, 'onConnectionEnd') + sinon.spy(remoteUpgrader, 'onConnectionEnd') + await Promise.all(connections.map(conn => conn.close())) + expect(localUpgrader.onConnectionEnd.callCount).to.equal(1) + expect(localUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[0]]) + expect(remoteUpgrader.onConnectionEnd.callCount).to.equal(1) + expect(remoteUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[1]]) + }) + + it('should fail to create a stream for an unsupported protocol', async () => { + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const muxers = new Map([[Muxer.multicodec, Muxer]]) + sinon.stub(localUpgrader, 'muxers').value(muxers) + sinon.stub(remoteUpgrader, 'muxers').value(muxers) + + const cryptos = new Map([[mockCrypto.tag, mockCrypto]]) + sinon.stub(localUpgrader, 'cryptos').value(cryptos) + sinon.stub(remoteUpgrader, 'cryptos').value(cryptos) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + expect(connections).to.have.length(2) + + const results = await pSettle([ + connections[0].newStream('/unsupported/1.0.0'), + connections[1].newStream('/unsupported/1.0.0') + ]) + expect(results).to.have.length(2) + results.forEach(result => { + expect(result.isRejected).to.equal(true) + expect(result.reason.code).to.equal(codes.ERR_UNSUPPORTED_PROTOCOL) + }) + }) +}) + +describe('libp2p.upgrader', () => { + let peers + let libp2p + + before(async () => { + const ids = await Promise.all([ + PeerId.createFromJSON(Peers[0]), + PeerId.createFromJSON(Peers[1]) + ]) + peers = ids.map(peerId => new PeerInfo(peerId)) + }) + + afterEach(async () => { + sinon.restore() + libp2p && await libp2p.stop() + libp2p = null + }) + + it('should create an Upgrader', () => { + libp2p = new Libp2p({ + peerInfo: peers[0], + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + expect(libp2p.upgrader).to.exist() + expect(libp2p.upgrader.muxers).to.eql(new Map([[Muxer.multicodec, Muxer]])) + expect(libp2p.upgrader.cryptos).to.eql(new Map([[mockCrypto.tag, mockCrypto]])) + // Ensure the transport manager also has the upgrader + expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader) + }) + + it('should be able to register and unregister a handler', () => { + libp2p = new Libp2p({ + peerInfo: peers[0], + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + expect(libp2p.upgrader.protocols.size).to.equal(0) + + const echoHandler = () => {} + libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) + expect(libp2p.upgrader.protocols.size).to.equal(2) + expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler) + expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) + + libp2p.unhandle(['/echo/1.0.0']) + expect(libp2p.upgrader.protocols.size).to.equal(1) + expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined) + expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) + }) + + it('should emit connect and disconnect events', async () => { + const remotePeer = peers[1] + libp2p = new Libp2p({ + peerInfo: peers[0], + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } + }) + + const remoteUpgrader = new Upgrader({ + localPeer: remotePeer.id, + muxers: new Map([[Muxer.multicodec, Muxer]]), + cryptos: new Map([[mockCrypto.tag, mockCrypto]]) + }) + + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer: remotePeer.id }) + + // Spy on emit for easy verification + sinon.spy(libp2p, 'emit') + + // Upgrade and check the connect event + const connections = await Promise.all([ + libp2p.upgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + expect(libp2p.emit.callCount).to.equal(1) + let [event, peerInfo] = libp2p.emit.getCall(0).args + expect(event).to.equal('peer:connect') + expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true) + + // Close and check the disconnect event + await Promise.all(connections.map(conn => conn.close())) + expect(libp2p.emit.callCount).to.equal(2) + ;([event, peerInfo] = libp2p.emit.getCall(1).args) + expect(event).to.equal('peer:disconnect') + expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true) + }) +}) diff --git a/test/utils/mockCrypto.js b/test/utils/mockCrypto.js new file mode 100644 index 00000000..d013cec2 --- /dev/null +++ b/test/utils/mockCrypto.js @@ -0,0 +1,24 @@ +'use strict' + +const PeerId = require('peer-id') +const Peers = require('../fixtures/peers') + +module.exports = { + tag: '/insecure', + secureInbound: (localPeer, stream) => { + return { + conn: stream, + remotePeer: localPeer + } + }, + secureOutbound: async (localPeer, stream, remotePeer) => { + // Crypto should always return a remotePeer + if (!remotePeer) { + remotePeer = await PeerId.createFromJSON(Peers[0]) + } + return { + conn: stream, + remotePeer: remotePeer + } + } +} diff --git a/test/utils/mockMultiaddrConn.js b/test/utils/mockMultiaddrConn.js new file mode 100644 index 00000000..617f6604 --- /dev/null +++ b/test/utils/mockMultiaddrConn.js @@ -0,0 +1,43 @@ +'use strict' + +const duplexPair = require('it-pair/duplex') +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') + +/** + * Returns both sides of a mocked MultiaddrConnection + * @param {object} options + * @param {Multiaddr[]} options.addrs Should contain two addresses for the local and remote peer + * @param {PeerId} options.remotePeer The peer that is being "dialed" + * @returns {{inbound:MultiaddrConnection, outbound:MultiaddrConnection}} + */ +module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) { + const controller = new AbortController() + + const [inbound, outbound] = duplexPair() + outbound.localAddr = addrs[0] + outbound.remoteAddr = addrs[1].encapsulate(`/p2p/${remotePeer.toB58String()}`) + outbound.timeline = { + open: Date.now() + } + outbound.close = () => { + outbound.timeline.close = Date.now() + controller.abort() + } + + inbound.localAddr = addrs[1] + inbound.remoteAddr = addrs[0] + inbound.timeline = { + open: Date.now() + } + inbound.close = () => { + inbound.timeline.close = Date.now() + controller.abort() + } + + // Make the sources abortable so we can close them easily + inbound.source = abortable(inbound.source, controller.signal) + outbound.source = abortable(outbound.source, controller.signal) + + return { inbound, outbound } +}