mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-05-09 09:22:17 +00:00
refactor(async): add dialer and upgrader (#462)
* chore(deps): update connection and multistream * feat: add basic dial support for addresses and peers * test: automatically require all node test files * fix: dont catch and log in the wrong place * test: add direct spec test fix: improve dial error consistency * feat: add dial timeouts and concurrency Queue timeouts will result in aborts of the dials * chore: fix linting * test: verify dialer defaults * feat: add initial upgrader * fix: add more test coverage and fix bugs * feat: libp2p creates the upgrader * feat: hook up handle to the upgrader * feat: hook up the dialer to libp2p test: add node dialer libp2p tests * feat: add connection listeners to upgrader * feat: emit connect and disconnect events * chore: use libp2p-interfaces * fix: address review feedback * fix: correct import * refactor: dedupe connection creation code
This commit is contained in:
parent
6ecc9b80c3
commit
a23d4d23cb
14
package.json
14
package.json
@ -41,6 +41,7 @@
|
|||||||
"npm": ">=6.0.0"
|
"npm": ">=6.0.0"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"abort-controller": "^3.0.0",
|
||||||
"async": "^2.6.2",
|
"async": "^2.6.2",
|
||||||
"bignumber.js": "^9.0.0",
|
"bignumber.js": "^9.0.0",
|
||||||
"class-is": "^1.1.0",
|
"class-is": "^1.1.0",
|
||||||
@ -48,15 +49,17 @@
|
|||||||
"err-code": "^1.1.2",
|
"err-code": "^1.1.2",
|
||||||
"fsm-event": "^2.1.0",
|
"fsm-event": "^2.1.0",
|
||||||
"hashlru": "^2.3.0",
|
"hashlru": "^2.3.0",
|
||||||
"interface-connection": "~0.3.3",
|
"it-pipe": "^1.0.1",
|
||||||
"latency-monitor": "~0.2.1",
|
"latency-monitor": "~0.2.1",
|
||||||
"libp2p-crypto": "^0.16.2",
|
"libp2p-crypto": "^0.16.2",
|
||||||
|
"libp2p-interfaces": "^0.1.1",
|
||||||
"mafmt": "^7.0.0",
|
"mafmt": "^7.0.0",
|
||||||
"merge-options": "^1.0.1",
|
"merge-options": "^1.0.1",
|
||||||
"moving-average": "^1.0.0",
|
"moving-average": "^1.0.0",
|
||||||
"multiaddr": "^7.1.0",
|
"multiaddr": "^7.1.0",
|
||||||
"multistream-select": "~0.14.6",
|
"multistream-select": "^0.15.0",
|
||||||
"once": "^1.4.0",
|
"once": "^1.4.0",
|
||||||
|
"p-queue": "^6.1.1",
|
||||||
"p-settle": "^3.1.0",
|
"p-settle": "^3.1.0",
|
||||||
"peer-book": "^0.9.1",
|
"peer-book": "^0.9.1",
|
||||||
"peer-id": "^0.13.3",
|
"peer-id": "^0.13.3",
|
||||||
@ -73,6 +76,7 @@
|
|||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@nodeutils/defaults-deep": "^1.1.0",
|
"@nodeutils/defaults-deep": "^1.1.0",
|
||||||
|
"abortable-iterator": "^2.1.0",
|
||||||
"aegir": "^20.0.0",
|
"aegir": "^20.0.0",
|
||||||
"chai": "^4.2.0",
|
"chai": "^4.2.0",
|
||||||
"chai-checkmark": "^1.0.1",
|
"chai-checkmark": "^1.0.1",
|
||||||
@ -80,7 +84,9 @@
|
|||||||
"delay": "^4.3.0",
|
"delay": "^4.3.0",
|
||||||
"dirty-chai": "^2.0.1",
|
"dirty-chai": "^2.0.1",
|
||||||
"electron-webrtc": "^0.3.0",
|
"electron-webrtc": "^0.3.0",
|
||||||
|
"glob": "^7.1.4",
|
||||||
"interface-datastore": "^0.6.0",
|
"interface-datastore": "^0.6.0",
|
||||||
|
"it-pair": "^1.0.0",
|
||||||
"libp2p-bootstrap": "^0.9.7",
|
"libp2p-bootstrap": "^0.9.7",
|
||||||
"libp2p-delegated-content-routing": "^0.2.2",
|
"libp2p-delegated-content-routing": "^0.2.2",
|
||||||
"libp2p-delegated-peer-routing": "^0.2.2",
|
"libp2p-delegated-peer-routing": "^0.2.2",
|
||||||
@ -88,7 +94,7 @@
|
|||||||
"libp2p-gossipsub": "~0.0.4",
|
"libp2p-gossipsub": "~0.0.4",
|
||||||
"libp2p-kad-dht": "^0.15.3",
|
"libp2p-kad-dht": "^0.15.3",
|
||||||
"libp2p-mdns": "^0.12.3",
|
"libp2p-mdns": "^0.12.3",
|
||||||
"libp2p-mplex": "^0.8.4",
|
"libp2p-mplex": "^0.9.1",
|
||||||
"libp2p-pnet": "~0.1.0",
|
"libp2p-pnet": "~0.1.0",
|
||||||
"libp2p-secio": "^0.11.1",
|
"libp2p-secio": "^0.11.1",
|
||||||
"libp2p-spdy": "^0.13.2",
|
"libp2p-spdy": "^0.13.2",
|
||||||
@ -96,6 +102,7 @@
|
|||||||
"libp2p-websockets": "^0.13.0",
|
"libp2p-websockets": "^0.13.0",
|
||||||
"lodash.times": "^4.3.2",
|
"lodash.times": "^4.3.2",
|
||||||
"nock": "^10.0.6",
|
"nock": "^10.0.6",
|
||||||
|
"p-defer": "^3.0.0",
|
||||||
"portfinder": "^1.0.20",
|
"portfinder": "^1.0.20",
|
||||||
"pull-goodbye": "0.0.2",
|
"pull-goodbye": "0.0.2",
|
||||||
"pull-length-prefixed": "^1.3.3",
|
"pull-length-prefixed": "^1.3.3",
|
||||||
@ -104,6 +111,7 @@
|
|||||||
"pull-protocol-buffers": "~0.1.2",
|
"pull-protocol-buffers": "~0.1.2",
|
||||||
"pull-serializer": "^0.3.2",
|
"pull-serializer": "^0.3.2",
|
||||||
"sinon": "^7.2.7",
|
"sinon": "^7.2.7",
|
||||||
|
"streaming-iterables": "^4.1.0",
|
||||||
"wrtc": "^0.4.1"
|
"wrtc": "^0.4.1"
|
||||||
},
|
},
|
||||||
"contributors": [
|
"contributors": [
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
# js-libp2p-circuit
|
# js-libp2p-circuit
|
||||||
|
|
||||||
> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/interface-connection) interface for dial/listen.
|
> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) interface for dial/listen.
|
||||||
|
|
||||||
**Note**: git history prior to merging into js-libp2p can be found in the original repository, https://github.com/libp2p/js-libp2p-circuit.
|
**Note**: git history prior to merging into js-libp2p can be found in the original repository, https://github.com/libp2p/js-libp2p-circuit.
|
||||||
|
|
||||||
@ -24,15 +24,18 @@ Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes
|
|||||||
|
|
||||||
## Table of Contents
|
## Table of Contents
|
||||||
|
|
||||||
- [Install](#install)
|
- [js-libp2p-circuit](#js-libp2p-circuit)
|
||||||
- [npm](#npm)
|
- [Why?](#why)
|
||||||
- [Usage](#usage)
|
- [libp2p-circuit and IPFS](#libp2p-circuit-and-ipfs)
|
||||||
- [Example](#example)
|
- [Table of Contents](#table-of-contents)
|
||||||
- [This module uses `pull-streams`](#this-module-uses-pull-streams)
|
- [Usage](#usage)
|
||||||
- [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams)
|
- [Example](#example)
|
||||||
- [API](#api)
|
- [Create dialer/listener](#create-dialerlistener)
|
||||||
- [Contribute](#contribute)
|
- [Create `relay`](#create-relay)
|
||||||
- [License](#license)
|
- [This module uses `pull-streams`](#this-module-uses-pull-streams)
|
||||||
|
- [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams)
|
||||||
|
- [API](#api)
|
||||||
|
- [Implementation rational](#implementation-rational)
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ const waterfall = require('async/waterfall')
|
|||||||
const setImmediate = require('async/setImmediate')
|
const setImmediate = require('async/setImmediate')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
const Connection = require('interface-connection').Connection
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
|
|
||||||
const utilsFactory = require('./utils')
|
const utilsFactory = require('./utils')
|
||||||
const StreamHandler = require('./stream-handler')
|
const StreamHandler = require('./stream-handler')
|
||||||
|
@ -3,7 +3,7 @@
|
|||||||
const setImmediate = require('async/setImmediate')
|
const setImmediate = require('async/setImmediate')
|
||||||
|
|
||||||
const EE = require('events').EventEmitter
|
const EE = require('events').EventEmitter
|
||||||
const Connection = require('interface-connection').Connection
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const utilsFactory = require('./utils')
|
const utilsFactory = require('./utils')
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
const proto = require('../protocol').CircuitRelay
|
const proto = require('../protocol').CircuitRelay
|
||||||
|
12
src/constants.js
Normal file
12
src/constants.js
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
DENY_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
|
||||||
|
DENY_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently denied
|
||||||
|
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
|
||||||
|
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
|
||||||
|
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
|
||||||
|
QUARTER_HOUR: 15 * 60e3,
|
||||||
|
PRIORITY_HIGH: 10,
|
||||||
|
PRIORITY_LOW: 20
|
||||||
|
}
|
98
src/dialer.js
Normal file
98
src/dialer.js
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const errCode = require('err-code')
|
||||||
|
const { default: PQueue } = require('p-queue')
|
||||||
|
const AbortController = require('abort-controller')
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:dialer')
|
||||||
|
log.error = debug('libp2p:dialer:error')
|
||||||
|
|
||||||
|
const { codes } = require('./errors')
|
||||||
|
const {
|
||||||
|
MAX_PARALLEL_DIALS,
|
||||||
|
DIAL_TIMEOUT
|
||||||
|
} = require('./constants')
|
||||||
|
|
||||||
|
class Dialer {
|
||||||
|
/**
|
||||||
|
* @constructor
|
||||||
|
* @param {object} options
|
||||||
|
* @param {TransportManager} options.transportManager
|
||||||
|
* @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
|
||||||
|
* @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
|
||||||
|
*/
|
||||||
|
constructor ({
|
||||||
|
transportManager,
|
||||||
|
concurrency = MAX_PARALLEL_DIALS,
|
||||||
|
timeout = DIAL_TIMEOUT
|
||||||
|
}) {
|
||||||
|
this.transportManager = transportManager
|
||||||
|
this.concurrency = concurrency
|
||||||
|
this.timeout = timeout
|
||||||
|
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
|
||||||
|
* dialed, it will be used for encryption verification.
|
||||||
|
*
|
||||||
|
* @async
|
||||||
|
* @param {Multiaddr} addr The address to dial
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {AbortSignal} [options.signal] An AbortController signal
|
||||||
|
* @returns {Promise<Connection>}
|
||||||
|
*/
|
||||||
|
async connectToMultiaddr (addr, options = {}) {
|
||||||
|
addr = multiaddr(addr)
|
||||||
|
let conn
|
||||||
|
let controller
|
||||||
|
|
||||||
|
if (!options.signal) {
|
||||||
|
controller = new AbortController()
|
||||||
|
options.signal = controller.signal
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
conn = await this.queue.add(() => this.transportManager.dial(addr, options))
|
||||||
|
} catch (err) {
|
||||||
|
if (err.name === 'TimeoutError') {
|
||||||
|
controller.abort()
|
||||||
|
err.code = codes.ERR_TIMEOUT
|
||||||
|
}
|
||||||
|
log.error('Error dialing address %s,', addr, err)
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connects to a given `PeerInfo` by dialing all of its known addresses.
|
||||||
|
* The dial to the first address that is successfully able to upgrade a connection
|
||||||
|
* will be used.
|
||||||
|
*
|
||||||
|
* @async
|
||||||
|
* @param {PeerInfo} peerInfo The remote peer to dial
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {AbortSignal} [options.signal] An AbortController signal
|
||||||
|
* @returns {Promise<Connection>}
|
||||||
|
*/
|
||||||
|
async connectToPeer (peerInfo, options = {}) {
|
||||||
|
const addrs = peerInfo.multiaddrs.toArray()
|
||||||
|
for (const addr of addrs) {
|
||||||
|
try {
|
||||||
|
return await this.connectToMultiaddr(addr, options)
|
||||||
|
} catch (_) {
|
||||||
|
// The error is already logged, just move to the next addr
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED)
|
||||||
|
log.error(err)
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = Dialer
|
@ -8,10 +8,16 @@ exports.messages = {
|
|||||||
exports.codes = {
|
exports.codes = {
|
||||||
DHT_DISABLED: 'ERR_DHT_DISABLED',
|
DHT_DISABLED: 'ERR_DHT_DISABLED',
|
||||||
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
|
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
|
||||||
|
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
|
||||||
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
||||||
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
||||||
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
||||||
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
||||||
|
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
||||||
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
|
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
|
||||||
ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE'
|
ERR_MUXER_UNAVAILABLE: 'ERR_MUXER_UNAVAILABLE',
|
||||||
|
ERR_TIMEOUT: 'ERR_TIMEOUT',
|
||||||
|
ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE',
|
||||||
|
ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED',
|
||||||
|
ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL'
|
||||||
}
|
}
|
||||||
|
199
src/index.js
199
src/index.js
@ -13,20 +13,22 @@ const nextTick = require('async/nextTick')
|
|||||||
|
|
||||||
const PeerBook = require('peer-book')
|
const PeerBook = require('peer-book')
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
const Switch = require('./switch')
|
const Switch = require('./switch')
|
||||||
const Ping = require('./ping')
|
const Ping = require('./ping')
|
||||||
const ConnectionManager = require('./connection-manager')
|
|
||||||
|
|
||||||
const { emitFirst } = require('./util')
|
const { emitFirst } = require('./util')
|
||||||
const peerRouting = require('./peer-routing')
|
const peerRouting = require('./peer-routing')
|
||||||
const contentRouting = require('./content-routing')
|
const contentRouting = require('./content-routing')
|
||||||
const dht = require('./dht')
|
const dht = require('./dht')
|
||||||
const pubsub = require('./pubsub')
|
const pubsub = require('./pubsub')
|
||||||
const { getPeerInfoRemote } = require('./get-peer-info')
|
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
|
||||||
const validateConfig = require('./config').validate
|
const { validate: validateConfig } = require('./config')
|
||||||
const { codes } = require('./errors')
|
const { codes } = require('./errors')
|
||||||
|
|
||||||
|
const Dialer = require('./dialer')
|
||||||
const TransportManager = require('./transport-manager')
|
const TransportManager = require('./transport-manager')
|
||||||
|
const Upgrader = require('./upgrader')
|
||||||
|
|
||||||
const notStarted = (action, state) => {
|
const notStarted = (action, state) => {
|
||||||
return errCode(
|
return errCode(
|
||||||
@ -61,64 +63,49 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
// create the switch, and listen for errors
|
// create the switch, and listen for errors
|
||||||
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
|
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
|
||||||
this._switch.on('error', (...args) => this.emit('error', ...args))
|
|
||||||
|
|
||||||
this.stats = this._switch.stats
|
// Setup the Upgrader
|
||||||
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
|
this.upgrader = new Upgrader({
|
||||||
|
localPeer: this.peerInfo.id,
|
||||||
|
onConnection: (connection) => {
|
||||||
|
const peerInfo = getPeerInfo(connection.remotePeer)
|
||||||
|
this.emit('peer:connect', peerInfo)
|
||||||
|
},
|
||||||
|
onConnectionEnd: (connection) => {
|
||||||
|
const peerInfo = getPeerInfo(connection.remotePeer)
|
||||||
|
this.emit('peer:disconnect', peerInfo)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// Setup the transport manager
|
// Setup the transport manager
|
||||||
this.transportManager = new TransportManager({
|
this.transportManager = new TransportManager({
|
||||||
libp2p: this,
|
libp2p: this,
|
||||||
// TODO: set the actual upgrader
|
upgrader: this.upgrader
|
||||||
upgrader: {
|
|
||||||
upgradeInbound: (maConn) => maConn,
|
|
||||||
upgradeOutbound: (maConn) => maConn
|
|
||||||
},
|
|
||||||
// TODO: Route incoming connections to a multiplex protocol router
|
|
||||||
onConnection: () => {}
|
|
||||||
})
|
})
|
||||||
this._modules.transport.forEach((Transport) => {
|
this._modules.transport.forEach((Transport) => {
|
||||||
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
|
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Attach stream multiplexers
|
|
||||||
if (this._modules.streamMuxer) {
|
|
||||||
const muxers = this._modules.streamMuxer
|
|
||||||
muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer))
|
|
||||||
|
|
||||||
// If muxer exists
|
|
||||||
// we can use Identify
|
|
||||||
this._switch.connection.reuse()
|
|
||||||
// we can use Relay for listening/dialing
|
|
||||||
this._switch.connection.enableCircuitRelay(this._config.relay)
|
|
||||||
|
|
||||||
// Received incomming dial and muxer upgrade happened,
|
|
||||||
// reuse this muxed connection
|
|
||||||
this._switch.on('peer-mux-established', (peerInfo) => {
|
|
||||||
this.emit('peer:connect', peerInfo)
|
|
||||||
})
|
|
||||||
|
|
||||||
this._switch.on('peer-mux-closed', (peerInfo) => {
|
|
||||||
this.emit('peer:disconnect', peerInfo)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Events for anytime connections are created/removed
|
|
||||||
this._switch.on('connection:start', (peerInfo) => {
|
|
||||||
this.emit('connection:start', peerInfo)
|
|
||||||
})
|
|
||||||
this._switch.on('connection:end', (peerInfo) => {
|
|
||||||
this.emit('connection:end', peerInfo)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Attach crypto channels
|
// Attach crypto channels
|
||||||
if (this._modules.connEncryption) {
|
if (this._modules.connEncryption) {
|
||||||
const cryptos = this._modules.connEncryption
|
const cryptos = this._modules.connEncryption
|
||||||
cryptos.forEach((crypto) => {
|
cryptos.forEach((crypto) => {
|
||||||
this._switch.connection.crypto(crypto.tag, crypto.encrypt)
|
this.upgrader.cryptos.set(crypto.tag, crypto)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attach stream multiplexers
|
||||||
|
if (this._modules.streamMuxer) {
|
||||||
|
const muxers = this._modules.streamMuxer
|
||||||
|
muxers.forEach((muxer) => {
|
||||||
|
this.upgrader.muxers.set(muxer.multicodec, muxer)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
this.dialer = new Dialer({
|
||||||
|
transportManager: this.transportManager
|
||||||
|
})
|
||||||
|
|
||||||
// Attach private network protector
|
// Attach private network protector
|
||||||
if (this._modules.connProtector) {
|
if (this._modules.connProtector) {
|
||||||
this._switch.protector = this._modules.connProtector
|
this._switch.protector = this._modules.connProtector
|
||||||
@ -153,7 +140,8 @@ class Libp2p extends EventEmitter {
|
|||||||
this.state = new FSM('STOPPED', {
|
this.state = new FSM('STOPPED', {
|
||||||
STOPPED: {
|
STOPPED: {
|
||||||
start: 'STARTING',
|
start: 'STARTING',
|
||||||
stop: 'STOPPED'
|
stop: 'STOPPED',
|
||||||
|
done: 'STOPPED'
|
||||||
},
|
},
|
||||||
STARTING: {
|
STARTING: {
|
||||||
done: 'STARTED',
|
done: 'STARTED',
|
||||||
@ -175,7 +163,6 @@ class Libp2p extends EventEmitter {
|
|||||||
})
|
})
|
||||||
this.state.on('STOPPING', () => {
|
this.state.on('STOPPING', () => {
|
||||||
log('libp2p is stopping')
|
log('libp2p is stopping')
|
||||||
this._onStopping()
|
|
||||||
})
|
})
|
||||||
this.state.on('STARTED', () => {
|
this.state.on('STARTED', () => {
|
||||||
log('libp2p has started')
|
log('libp2p has started')
|
||||||
@ -201,7 +188,7 @@ class Libp2p extends EventEmitter {
|
|||||||
this._peerDiscovered = this._peerDiscovered.bind(this)
|
this._peerDiscovered = this._peerDiscovered.bind(this)
|
||||||
|
|
||||||
// promisify all instance methods
|
// promisify all instance methods
|
||||||
;['start', 'stop', 'dial', 'dialProtocol', 'dialFSM', 'hangUp', 'ping'].forEach(method => {
|
;['start', 'hangUp', 'ping'].forEach(method => {
|
||||||
this[method] = promisify(this[method], { context: this })
|
this[method] = promisify(this[method], { context: this })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -234,13 +221,21 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the libp2p node by closing its listeners and open connections
|
* Stop the libp2p node by closing its listeners and open connections
|
||||||
*
|
* @async
|
||||||
* @param {function(Error)} callback
|
|
||||||
* @returns {void}
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
stop (callback = () => {}) {
|
async stop () {
|
||||||
emitFirst(this, ['error', 'stop'], callback)
|
|
||||||
this.state('stop')
|
this.state('stop')
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.transportManager.close()
|
||||||
|
} catch (err) {
|
||||||
|
if (err) {
|
||||||
|
log.error(err)
|
||||||
|
this.emit('error', err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.state('done')
|
||||||
}
|
}
|
||||||
|
|
||||||
isStarted () {
|
isStarted () {
|
||||||
@ -252,11 +247,12 @@ class Libp2p extends EventEmitter {
|
|||||||
* peer will be added to the nodes `PeerBook`
|
* peer will be added to the nodes `PeerBook`
|
||||||
*
|
*
|
||||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
||||||
* @param {function(Error)} callback
|
* @param {object} options
|
||||||
* @returns {void}
|
* @param {AbortSignal} [options.signal]
|
||||||
|
* @returns {Promise<Connection>}
|
||||||
*/
|
*/
|
||||||
dial (peer, callback) {
|
dial (peer, options) {
|
||||||
this.dialProtocol(peer, null, callback)
|
return this.dialProtocol(peer, null, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -264,50 +260,28 @@ class Libp2p extends EventEmitter {
|
|||||||
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
|
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
|
||||||
* and the `Connection` will be sent in the callback
|
* and the `Connection` will be sent in the callback
|
||||||
*
|
*
|
||||||
|
* @async
|
||||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
||||||
* @param {string} protocol
|
* @param {string[]|string} protocols
|
||||||
* @param {function(Error, Connection)} callback
|
* @param {object} options
|
||||||
* @returns {void}
|
* @param {AbortSignal} [options.signal]
|
||||||
|
* @returns {Promise<Connection|*>}
|
||||||
*/
|
*/
|
||||||
dialProtocol (peer, protocol, callback) {
|
async dialProtocol (peer, protocols, options) {
|
||||||
if (!this.isStarted()) {
|
let connection
|
||||||
return callback(notStarted('dial', this.state._state))
|
if (multiaddr.isMultiaddr(peer)) {
|
||||||
|
connection = await this.dialer.connectToMultiaddr(peer, options)
|
||||||
|
} else {
|
||||||
|
peer = await getPeerInfoRemote(peer, this)
|
||||||
|
connection = await this.dialer.connectToPeer(peer, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof protocol === 'function') {
|
// If a protocol was provided, create a new stream
|
||||||
callback = protocol
|
if (protocols) {
|
||||||
protocol = undefined
|
return connection.newStream(protocols)
|
||||||
}
|
}
|
||||||
|
|
||||||
getPeerInfoRemote(peer, this)
|
return connection
|
||||||
.then(peerInfo => {
|
|
||||||
this._switch.dial(peerInfo, protocol, callback)
|
|
||||||
}, callback)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Similar to `dial` and `dialProtocol`, but the callback will contain a
|
|
||||||
* Connection State Machine.
|
|
||||||
*
|
|
||||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
|
||||||
* @param {string} protocol
|
|
||||||
* @param {function(Error, ConnectionFSM)} callback
|
|
||||||
* @returns {void}
|
|
||||||
*/
|
|
||||||
dialFSM (peer, protocol, callback) {
|
|
||||||
if (!this.isStarted()) {
|
|
||||||
return callback(notStarted('dial', this.state._state))
|
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof protocol === 'function') {
|
|
||||||
callback = protocol
|
|
||||||
protocol = undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
getPeerInfoRemote(peer, this)
|
|
||||||
.then(peerInfo => {
|
|
||||||
this._switch.dialFSM(peerInfo, protocol, callback)
|
|
||||||
}, callback)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -342,12 +316,28 @@ class Libp2p extends EventEmitter {
|
|||||||
}, callback)
|
}, callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
handle (protocol, handlerFunc, matchFunc) {
|
/**
|
||||||
this._switch.handle(protocol, handlerFunc, matchFunc)
|
* Registers the `handler` for each protocol
|
||||||
|
* @param {string[]|string} protocols
|
||||||
|
* @param {function({ stream:*, protocol:string })} handler
|
||||||
|
*/
|
||||||
|
handle (protocols, handler) {
|
||||||
|
protocols = Array.isArray(protocols) ? protocols : [protocols]
|
||||||
|
protocols.forEach(protocol => {
|
||||||
|
this.upgrader.protocols.set(protocol, handler)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
unhandle (protocol) {
|
/**
|
||||||
this._switch.unhandle(protocol)
|
* Removes the handler for each protocol. The protocol
|
||||||
|
* will no longer be supported on streams.
|
||||||
|
* @param {string[]|string} protocols
|
||||||
|
*/
|
||||||
|
unhandle (protocols) {
|
||||||
|
protocols = Array.isArray(protocols) ? protocols : [protocols]
|
||||||
|
protocols.forEach(protocol => {
|
||||||
|
this.upgrader.protocols.delete(protocol)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async _onStarting () {
|
async _onStarting () {
|
||||||
@ -373,21 +363,6 @@ class Libp2p extends EventEmitter {
|
|||||||
this.state('done')
|
this.state('done')
|
||||||
}
|
}
|
||||||
|
|
||||||
async _onStopping () {
|
|
||||||
// Start parallel tasks
|
|
||||||
try {
|
|
||||||
await this.transportManager.close()
|
|
||||||
} catch (err) {
|
|
||||||
if (err) {
|
|
||||||
log.error(err)
|
|
||||||
this.emit('error', err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// libp2p has stopped
|
|
||||||
this.state('done')
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles discovered peers. Each discovered peer will be emitted via
|
* Handles discovered peers. Each discovered peer will be emitted via
|
||||||
* the `peer:discovery` event. If auto dial is enabled for libp2p
|
* the `peer:discovery` event. If auto dial is enabled for libp2p
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const pull = require('pull-stream')
|
const pull = require('pull-stream')
|
||||||
const Connection = require('interface-connection').Connection
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const assert = require('assert')
|
const assert = require('assert')
|
||||||
|
|
||||||
const Errors = require('./errors')
|
const Errors = require('./errors')
|
||||||
|
@ -81,7 +81,7 @@ tests]([./test/pnet.node.js]).
|
|||||||
|
|
||||||
##### `switch.connection.addUpgrade()`
|
##### `switch.connection.addUpgrade()`
|
||||||
|
|
||||||
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
|
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) specification.
|
||||||
|
|
||||||
> **WIP**
|
> **WIP**
|
||||||
|
|
||||||
@ -151,7 +151,7 @@ a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will tak
|
|||||||
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
|
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
|
||||||
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
|
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
|
||||||
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
|
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
|
||||||
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
|
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) will be emitted.
|
||||||
- `close`: emitted when the connection has closed.
|
- `close`: emitted when the connection has closed.
|
||||||
|
|
||||||
### `switch.handle(protocol, handlerFunc, matchFunc)`
|
### `switch.handle(protocol, handlerFunc, matchFunc)`
|
||||||
@ -365,7 +365,7 @@ In order for a transport to be supported, it has to follow the [interface-transp
|
|||||||
|
|
||||||
### Connection upgrades
|
### Connection upgrades
|
||||||
|
|
||||||
Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
|
Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) spec. This design decision enables libp2p to have upgradable transports.
|
||||||
|
|
||||||
We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it.
|
We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it.
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const Connection = require('interface-connection').Connection
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const pull = require('pull-stream/pull')
|
const pull = require('pull-stream/pull')
|
||||||
const empty = require('pull-stream/sources/empty')
|
const empty = require('pull-stream/sources/empty')
|
||||||
const timeout = require('async/timeout')
|
const timeout = require('async/timeout')
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
const Connection = require('interface-connection').Connection
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
const pull = require('pull-stream/pull')
|
const pull = require('pull-stream/pull')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -13,14 +13,12 @@ class TransportManager {
|
|||||||
* @param {object} options
|
* @param {object} options
|
||||||
* @param {Libp2p} options.libp2p The Libp2p instance. It will be passed to the transports.
|
* @param {Libp2p} options.libp2p The Libp2p instance. It will be passed to the transports.
|
||||||
* @param {Upgrader} options.upgrader The upgrader to provide to the transports
|
* @param {Upgrader} options.upgrader The upgrader to provide to the transports
|
||||||
* @param {function(Connection)} options.onConnection Called whenever an incoming connection is received
|
|
||||||
*/
|
*/
|
||||||
constructor ({ libp2p, upgrader, onConnection }) {
|
constructor ({ libp2p, upgrader }) {
|
||||||
this.libp2p = libp2p
|
this.libp2p = libp2p
|
||||||
this.upgrader = upgrader
|
this.upgrader = upgrader
|
||||||
this._transports = new Map()
|
this._transports = new Map()
|
||||||
this._listeners = new Map()
|
this._listeners = new Map()
|
||||||
this.onConnection = onConnection
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,7 +43,9 @@ class TransportManager {
|
|||||||
})
|
})
|
||||||
|
|
||||||
this._transports.set(key, transport)
|
this._transports.set(key, transport)
|
||||||
this._listeners.set(key, [])
|
if (!this._listeners.has(key)) {
|
||||||
|
this._listeners.set(key, [])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -57,11 +57,13 @@ class TransportManager {
|
|||||||
for (const [key, listeners] of this._listeners) {
|
for (const [key, listeners] of this._listeners) {
|
||||||
log('closing listeners for %s', key)
|
log('closing listeners for %s', key)
|
||||||
while (listeners.length) {
|
while (listeners.length) {
|
||||||
tasks.push(listeners.pop().close())
|
const listener = listeners.pop()
|
||||||
|
tasks.push(listener.close())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.all(tasks)
|
await Promise.all(tasks)
|
||||||
|
log('all listeners closed')
|
||||||
this._listeners.clear()
|
this._listeners.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,8 +78,12 @@ class TransportManager {
|
|||||||
if (!transport) {
|
if (!transport) {
|
||||||
throw errCode(new Error(`No transport available for address ${String(ma)}`), codes.ERR_TRANSPORT_UNAVAILABLE)
|
throw errCode(new Error(`No transport available for address ${String(ma)}`), codes.ERR_TRANSPORT_UNAVAILABLE)
|
||||||
}
|
}
|
||||||
const conn = await transport.dial(ma, options)
|
|
||||||
return conn
|
try {
|
||||||
|
return await transport.dial(ma, options)
|
||||||
|
} catch (err) {
|
||||||
|
throw errCode(new Error('Transport dial failed'), codes.ERR_TRANSPORT_DIAL_FAILED, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
336
src/upgrader.js
Normal file
336
src/upgrader.js
Normal file
@ -0,0 +1,336 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:upgrader')
|
||||||
|
log.error = debug('libp2p:upgrader:error')
|
||||||
|
const Multistream = require('multistream-select')
|
||||||
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const pipe = require('it-pipe')
|
||||||
|
const errCode = require('err-code')
|
||||||
|
|
||||||
|
const { codes } = require('./errors')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef MultiaddrConnection
|
||||||
|
* @property {function} sink
|
||||||
|
* @property {AsyncIterator} source
|
||||||
|
* @property {*} conn
|
||||||
|
* @property {Multiaddr} remoteAddr
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef CryptoResult
|
||||||
|
* @property {*} conn A duplex iterable
|
||||||
|
* @property {PeerId} remotePeer
|
||||||
|
* @property {string} protocol
|
||||||
|
*/
|
||||||
|
|
||||||
|
class Upgrader {
|
||||||
|
/**
|
||||||
|
* @param {object} options
|
||||||
|
* @param {PeerId} options.localPeer
|
||||||
|
* @param {Map<string, Crypto>} options.cryptos
|
||||||
|
* @param {Map<string, Muxer>} options.muxers
|
||||||
|
*/
|
||||||
|
constructor ({ localPeer, cryptos, muxers, onConnectionEnd = () => {}, onConnection = () => {} }) {
|
||||||
|
this.localPeer = localPeer
|
||||||
|
this.cryptos = cryptos || new Map()
|
||||||
|
this.muxers = muxers || new Map()
|
||||||
|
this.protocols = new Map()
|
||||||
|
this.onConnection = onConnection
|
||||||
|
this.onConnectionEnd = onConnectionEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Upgrades an inbound connection
|
||||||
|
* @async
|
||||||
|
* @param {MultiaddrConnection} maConn
|
||||||
|
* @returns {Promise<Connection>}
|
||||||
|
*/
|
||||||
|
async upgradeInbound (maConn) {
|
||||||
|
let encryptedConn
|
||||||
|
let remotePeer
|
||||||
|
let muxedConnection
|
||||||
|
let Muxer
|
||||||
|
let cryptoProtocol
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Encrypt the connection
|
||||||
|
({
|
||||||
|
conn: encryptedConn,
|
||||||
|
remotePeer,
|
||||||
|
protocol: cryptoProtocol
|
||||||
|
} = await this._encryptInbound(this.localPeer, maConn, this.cryptos))
|
||||||
|
|
||||||
|
// Multiplex the connection
|
||||||
|
;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Failed to upgrade inbound connection', err)
|
||||||
|
await maConn.close(err)
|
||||||
|
// TODO: We shouldn't throw here, as there isn't anything to catch the failure
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
|
||||||
|
log('Successfully upgraded inbound connection')
|
||||||
|
|
||||||
|
return this._createConnection({
|
||||||
|
cryptoProtocol,
|
||||||
|
direction: 'inbound',
|
||||||
|
maConn,
|
||||||
|
muxedConnection,
|
||||||
|
Muxer,
|
||||||
|
remotePeer
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Upgrades an outbound connection
|
||||||
|
* @async
|
||||||
|
* @param {MultiaddrConnection} maConn
|
||||||
|
* @returns {Promise<Connection>}
|
||||||
|
*/
|
||||||
|
async upgradeOutbound (maConn) {
|
||||||
|
let remotePeerId
|
||||||
|
try {
|
||||||
|
remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId())
|
||||||
|
} catch (err) {
|
||||||
|
log.error('multiaddr did not contain a valid peer id', err)
|
||||||
|
}
|
||||||
|
|
||||||
|
let encryptedConn
|
||||||
|
let remotePeer
|
||||||
|
let muxedConnection
|
||||||
|
let cryptoProtocol
|
||||||
|
let Muxer
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Encrypt the connection
|
||||||
|
({
|
||||||
|
conn: encryptedConn,
|
||||||
|
remotePeer,
|
||||||
|
protocol: cryptoProtocol
|
||||||
|
} = await this._encryptOutbound(this.localPeer, maConn, remotePeerId, this.cryptos))
|
||||||
|
|
||||||
|
// Multiplex the connection
|
||||||
|
;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
|
||||||
|
} catch (err) {
|
||||||
|
log.error('Failed to upgrade outbound connection', err)
|
||||||
|
await maConn.close(err)
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
|
||||||
|
log('Successfully upgraded outbound connection')
|
||||||
|
|
||||||
|
return this._createConnection({
|
||||||
|
cryptoProtocol,
|
||||||
|
direction: 'outbound',
|
||||||
|
maConn,
|
||||||
|
muxedConnection,
|
||||||
|
Muxer,
|
||||||
|
remotePeer
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A convenience method for generating a new `Connection`
|
||||||
|
* @private
|
||||||
|
* @param {object} options
|
||||||
|
* @param {string} cryptoProtocol The crypto protocol that was negotiated
|
||||||
|
* @param {string} direction One of ['inbound', 'outbound']
|
||||||
|
* @param {MultiaddrConnection} maConn The transport layer connection
|
||||||
|
* @param {*} muxedConnection A duplex connection returned from multiplexer selection
|
||||||
|
* @param {Muxer} Muxer The muxer to be used for muxing
|
||||||
|
* @param {PeerId} remotePeer The peer the connection is with
|
||||||
|
* @returns {Connection}
|
||||||
|
*/
|
||||||
|
_createConnection ({
|
||||||
|
cryptoProtocol,
|
||||||
|
direction,
|
||||||
|
maConn,
|
||||||
|
muxedConnection,
|
||||||
|
Muxer,
|
||||||
|
remotePeer
|
||||||
|
}) {
|
||||||
|
// Create the muxer
|
||||||
|
const muxer = new Muxer({
|
||||||
|
// Run anytime a remote stream is created
|
||||||
|
onStream: async muxedStream => {
|
||||||
|
const mss = new Multistream.Listener(muxedStream)
|
||||||
|
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
|
||||||
|
log('%s: incoming stream opened on %s', direction, protocol)
|
||||||
|
connection.addStream(stream, protocol)
|
||||||
|
this._onStream({ stream, protocol })
|
||||||
|
},
|
||||||
|
// Run anytime a stream closes
|
||||||
|
onStreamEnd: muxedStream => {
|
||||||
|
connection.removeStream(muxedStream.id)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const newStream = async protocols => {
|
||||||
|
log('%s: starting new stream on %s', direction, protocols)
|
||||||
|
const muxedStream = muxer.newStream()
|
||||||
|
const mss = new Multistream.Dialer(muxedStream)
|
||||||
|
try {
|
||||||
|
const { stream, protocol } = await mss.select(protocols)
|
||||||
|
return { stream: { ...muxedStream, ...stream }, protocol }
|
||||||
|
} catch (err) {
|
||||||
|
log.error('could not create new stream', err)
|
||||||
|
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pipe all data through the muxer
|
||||||
|
pipe(muxedConnection, muxer, muxedConnection)
|
||||||
|
|
||||||
|
maConn.timeline.upgraded = Date.now()
|
||||||
|
const timelineProxy = new Proxy(maConn.timeline, {
|
||||||
|
set: (...args) => {
|
||||||
|
if (args[1] === 'close' && args[2]) {
|
||||||
|
this.onConnectionEnd(connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
return Reflect.set(...args)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create the connection
|
||||||
|
const connection = new Connection({
|
||||||
|
localAddr: maConn.localAddr,
|
||||||
|
remoteAddr: maConn.remoteAddr,
|
||||||
|
localPeer: this.localPeer,
|
||||||
|
remotePeer: remotePeer,
|
||||||
|
stat: {
|
||||||
|
direction,
|
||||||
|
timeline: timelineProxy,
|
||||||
|
multiplexer: Muxer.multicodec,
|
||||||
|
encryption: cryptoProtocol
|
||||||
|
},
|
||||||
|
newStream,
|
||||||
|
getStreams: () => muxer.streams,
|
||||||
|
close: err => maConn.close(err)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.onConnection(connection)
|
||||||
|
|
||||||
|
return connection
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Routes incoming streams to the correct handler
|
||||||
|
* @private
|
||||||
|
* @param {object} options
|
||||||
|
* @param {Stream} options.stream
|
||||||
|
* @param {string} protocol
|
||||||
|
*/
|
||||||
|
_onStream ({ stream, protocol }) {
|
||||||
|
const handler = this.protocols.get(protocol)
|
||||||
|
handler({ stream, protocol })
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to encrypt the incoming `connection` with the provided `cryptos`.
|
||||||
|
* @private
|
||||||
|
* @async
|
||||||
|
* @param {PeerId} localPeer The initiators PeerInfo
|
||||||
|
* @param {*} connection
|
||||||
|
* @param {Map<string, Crypto>} cryptos
|
||||||
|
* @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
|
||||||
|
*/
|
||||||
|
async _encryptInbound (localPeer, connection, cryptos) {
|
||||||
|
const mss = new Multistream.Listener(connection)
|
||||||
|
const protocols = Array.from(cryptos.keys())
|
||||||
|
log('selecting inbound crypto protocol', protocols)
|
||||||
|
|
||||||
|
try {
|
||||||
|
const { stream, protocol } = await mss.handle(protocols)
|
||||||
|
const crypto = cryptos.get(protocol)
|
||||||
|
log('encrypting inbound connection...')
|
||||||
|
|
||||||
|
return {
|
||||||
|
...await crypto.secureInbound(localPeer, stream),
|
||||||
|
protocol
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempts to encrypt the given `connection` with the provided `cryptos`.
|
||||||
|
* The first `Crypto` module to succeed will be used
|
||||||
|
* @private
|
||||||
|
* @async
|
||||||
|
* @param {PeerId} localPeer The initiators PeerInfo
|
||||||
|
* @param {*} connection
|
||||||
|
* @param {PeerId} remotePeerId
|
||||||
|
* @param {Map<string, Crypto>} cryptos
|
||||||
|
* @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
|
||||||
|
*/
|
||||||
|
async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) {
|
||||||
|
const mss = new Multistream.Dialer(connection)
|
||||||
|
const protocols = Array.from(cryptos.keys())
|
||||||
|
log('selecting outbound crypto protocol', protocols)
|
||||||
|
|
||||||
|
try {
|
||||||
|
const { stream, protocol } = await mss.select(protocols)
|
||||||
|
const crypto = cryptos.get(protocol)
|
||||||
|
log('encrypting outbound connection to %j', remotePeerId)
|
||||||
|
|
||||||
|
return {
|
||||||
|
...await crypto.secureOutbound(localPeer, stream, remotePeerId),
|
||||||
|
protocol
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Selects one of the given muxers via multistream-select. That
|
||||||
|
* muxer will be used for all future streams on the connection.
|
||||||
|
* @private
|
||||||
|
* @async
|
||||||
|
* @param {*} connection A basic duplex connection to multiplex
|
||||||
|
* @param {Map<string, Muxer>} muxers The muxers to attempt multiplexing with
|
||||||
|
* @returns {*} A muxed connection
|
||||||
|
*/
|
||||||
|
async _multiplexOutbound (connection, muxers) {
|
||||||
|
const dialer = new Multistream.Dialer(connection)
|
||||||
|
const protocols = Array.from(muxers.keys())
|
||||||
|
log('outbound selecting muxer %s', protocols)
|
||||||
|
try {
|
||||||
|
const { stream, protocol } = await dialer.select(protocols)
|
||||||
|
log('%s selected as muxer protocol', protocol)
|
||||||
|
const Muxer = muxers.get(protocol)
|
||||||
|
return { stream, Muxer }
|
||||||
|
} catch (err) {
|
||||||
|
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers support for one of the given muxers via multistream-select. The
|
||||||
|
* selected muxer will be used for all future streams on the connection.
|
||||||
|
* @private
|
||||||
|
* @async
|
||||||
|
* @param {*} connection A basic duplex connection to multiplex
|
||||||
|
* @param {Map<string, Muxer>} muxers The muxers to attempt multiplexing with
|
||||||
|
* @returns {*} A muxed connection
|
||||||
|
*/
|
||||||
|
async _multiplexInbound (connection, muxers) {
|
||||||
|
const listener = new Multistream.Listener(connection)
|
||||||
|
const protocols = Array.from(muxers.keys())
|
||||||
|
log('inbound handling muxers %s', protocols)
|
||||||
|
try {
|
||||||
|
const { stream, protocol } = await listener.handle(protocols)
|
||||||
|
const Muxer = muxers.get(protocol)
|
||||||
|
return { stream, Muxer }
|
||||||
|
} catch (err) {
|
||||||
|
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = Upgrader
|
236
test/dialing/direct.node.js
Normal file
236
test/dialing/direct.node.js
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
const sinon = require('sinon')
|
||||||
|
const Transport = require('libp2p-tcp')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const PeerInfo = require('peer-info')
|
||||||
|
const delay = require('delay')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const pipe = require('it-pipe')
|
||||||
|
|
||||||
|
const Libp2p = require('../../src')
|
||||||
|
const Dialer = require('../../src/dialer')
|
||||||
|
const TransportManager = require('../../src/transport-manager')
|
||||||
|
const { codes: ErrorCodes } = require('../../src/errors')
|
||||||
|
|
||||||
|
const mockUpgrader = require('../utils/mockUpgrader')
|
||||||
|
const mockCrypto = require('../utils/mockCrypto')
|
||||||
|
const Peers = require('../fixtures/peers')
|
||||||
|
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
|
||||||
|
|
||||||
|
describe('Dialing (direct, TCP)', () => {
|
||||||
|
let remoteTM
|
||||||
|
let localTM
|
||||||
|
let remoteAddr
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
remoteTM = new TransportManager({
|
||||||
|
libp2p: {},
|
||||||
|
upgrader: mockUpgrader
|
||||||
|
})
|
||||||
|
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
|
||||||
|
localTM = new TransportManager({
|
||||||
|
libp2p: {},
|
||||||
|
upgrader: mockUpgrader
|
||||||
|
})
|
||||||
|
localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
|
||||||
|
await remoteTM.listen([listenAddr])
|
||||||
|
|
||||||
|
remoteAddr = remoteTM.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
await remoteTM.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
sinon.restore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a remote node via its multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
const connection = await dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a remote node via its stringified multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
const connection = await dialer.connectToMultiaddr(remoteAddr.toString())
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToMultiaddr(unsupportedAddr)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_UNAVAILABLE)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a given peer', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
const peerInfo = new PeerInfo(peerId)
|
||||||
|
peerInfo.multiaddrs.add(remoteAddr)
|
||||||
|
|
||||||
|
const connection = await dialer.connectToPeer(peerInfo)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
const peerInfo = new PeerInfo(peerId)
|
||||||
|
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToPeer(peerInfo)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should abort dials on queue task timeout', async () => {
|
||||||
|
const dialer = new Dialer({
|
||||||
|
transportManager: localTM,
|
||||||
|
timeout: 50
|
||||||
|
})
|
||||||
|
sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
|
||||||
|
expect(options.signal).to.exist()
|
||||||
|
expect(options.signal.aborted).to.equal(false)
|
||||||
|
expect(addr.toString()).to.eql(remoteAddr.toString())
|
||||||
|
await delay(60)
|
||||||
|
expect(options.signal.aborted).to.equal(true)
|
||||||
|
})
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should dial to the max concurrency', async () => {
|
||||||
|
const dialer = new Dialer({
|
||||||
|
transportManager: localTM,
|
||||||
|
concurrency: 2
|
||||||
|
})
|
||||||
|
|
||||||
|
const deferredDial = pDefer()
|
||||||
|
sinon.stub(localTM, 'dial').callsFake(async () => {
|
||||||
|
await deferredDial.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
// Add 3 dials
|
||||||
|
Promise.all([
|
||||||
|
dialer.connectToMultiaddr(remoteAddr),
|
||||||
|
dialer.connectToMultiaddr(remoteAddr),
|
||||||
|
dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
])
|
||||||
|
|
||||||
|
// Let the call stack run
|
||||||
|
await delay(0)
|
||||||
|
|
||||||
|
// We should have 2 in progress, and 1 waiting
|
||||||
|
expect(localTM.dial.callCount).to.equal(2)
|
||||||
|
expect(dialer.queue.pending).to.equal(2)
|
||||||
|
expect(dialer.queue.size).to.equal(1)
|
||||||
|
|
||||||
|
deferredDial.resolve()
|
||||||
|
|
||||||
|
// Let the call stack run
|
||||||
|
await delay(0)
|
||||||
|
// All dials should have executed
|
||||||
|
expect(localTM.dial.callCount).to.equal(3)
|
||||||
|
expect(dialer.queue.pending).to.equal(0)
|
||||||
|
expect(dialer.queue.size).to.equal(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('libp2p.dialer', () => {
|
||||||
|
let peerInfo
|
||||||
|
let remotePeerInfo
|
||||||
|
let libp2p
|
||||||
|
let remoteLibp2p
|
||||||
|
let remoteAddr
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
const [peerId, remotePeerId] = await Promise.all([
|
||||||
|
PeerId.createFromJSON(Peers[0]),
|
||||||
|
PeerId.createFromJSON(Peers[1])
|
||||||
|
])
|
||||||
|
|
||||||
|
peerInfo = new PeerInfo(peerId)
|
||||||
|
remotePeerInfo = new PeerInfo(remotePeerId)
|
||||||
|
|
||||||
|
remoteLibp2p = new Libp2p({
|
||||||
|
peerInfo: remotePeerInfo,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
|
||||||
|
|
||||||
|
await remoteLibp2p.transportManager.listen([listenAddr])
|
||||||
|
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
libp2p = null
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
await remoteLibp2p.stop()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should use the dialer for connecting', async () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
sinon.spy(libp2p.dialer, 'connectToMultiaddr')
|
||||||
|
|
||||||
|
const connection = await libp2p.dial(remoteAddr)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
|
||||||
|
expect(stream).to.exist()
|
||||||
|
expect(protocol).to.equal('/echo/1.0.0')
|
||||||
|
await connection.close()
|
||||||
|
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
|
||||||
|
})
|
||||||
|
})
|
211
test/dialing/direct.spec.js
Normal file
211
test/dialing/direct.spec.js
Normal file
@ -0,0 +1,211 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
const sinon = require('sinon')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const delay = require('delay')
|
||||||
|
const Transport = require('libp2p-websockets')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const PeerInfo = require('peer-info')
|
||||||
|
|
||||||
|
const { codes: ErrorCodes } = require('../../src/errors')
|
||||||
|
const Constants = require('../../src/constants')
|
||||||
|
const Dialer = require('../../src/dialer')
|
||||||
|
const TransportManager = require('../../src/transport-manager')
|
||||||
|
const Libp2p = require('../../src')
|
||||||
|
|
||||||
|
const Peers = require('../fixtures/peers')
|
||||||
|
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
|
||||||
|
const mockUpgrader = require('../utils/mockUpgrader')
|
||||||
|
const mockCrypto = require('../utils/mockCrypto')
|
||||||
|
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
|
||||||
|
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||||
|
|
||||||
|
describe('Dialing (direct, WebSockets)', () => {
|
||||||
|
let localTM
|
||||||
|
|
||||||
|
before(() => {
|
||||||
|
localTM = new TransportManager({
|
||||||
|
libp2p: {},
|
||||||
|
upgrader: mockUpgrader,
|
||||||
|
onConnection: () => {}
|
||||||
|
})
|
||||||
|
localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
sinon.restore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should have appropriate defaults', () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
expect(dialer.concurrency).to.equal(Constants.MAX_PARALLEL_DIALS)
|
||||||
|
expect(dialer.timeout).to.equal(Constants.DIAL_TIMEOUT)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a remote node via its multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
const connection = await dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a remote node via its stringified multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
const connection = await dialer.connectToMultiaddr(remoteAddr.toString())
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail to connect to an unsupported multiaddr', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToMultiaddr(unsupportedAddr)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TRANSPORT_DIAL_FAILED)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to connect to a given peer', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
const peerInfo = new PeerInfo(peerId)
|
||||||
|
peerInfo.multiaddrs.add(remoteAddr)
|
||||||
|
|
||||||
|
const connection = await dialer.connectToPeer(peerInfo)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail to connect to a given peer with unsupported addresses', async () => {
|
||||||
|
const dialer = new Dialer({ transportManager: localTM })
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
const peerInfo = new PeerInfo(peerId)
|
||||||
|
peerInfo.multiaddrs.add(unsupportedAddr)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToPeer(peerInfo)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_CONNECTION_FAILED)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should abort dials on queue task timeout', async () => {
|
||||||
|
const dialer = new Dialer({
|
||||||
|
transportManager: localTM,
|
||||||
|
timeout: 50
|
||||||
|
})
|
||||||
|
sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
|
||||||
|
expect(options.signal).to.exist()
|
||||||
|
expect(options.signal.aborted).to.equal(false)
|
||||||
|
expect(addr.toString()).to.eql(remoteAddr.toString())
|
||||||
|
await delay(60)
|
||||||
|
expect(options.signal.aborted).to.equal(true)
|
||||||
|
})
|
||||||
|
|
||||||
|
try {
|
||||||
|
await dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.satisfy((err) => err.code === ErrorCodes.ERR_TIMEOUT)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
expect.fail('Dial should have failed')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should dial to the max concurrency', async () => {
|
||||||
|
const dialer = new Dialer({
|
||||||
|
transportManager: localTM,
|
||||||
|
concurrency: 2
|
||||||
|
})
|
||||||
|
|
||||||
|
const deferredDial = pDefer()
|
||||||
|
sinon.stub(localTM, 'dial').callsFake(async () => {
|
||||||
|
await deferredDial.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
// Add 3 dials
|
||||||
|
Promise.all([
|
||||||
|
dialer.connectToMultiaddr(remoteAddr),
|
||||||
|
dialer.connectToMultiaddr(remoteAddr),
|
||||||
|
dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
])
|
||||||
|
|
||||||
|
// Let the call stack run
|
||||||
|
await delay(0)
|
||||||
|
|
||||||
|
// We should have 2 in progress, and 1 waiting
|
||||||
|
expect(localTM.dial.callCount).to.equal(2)
|
||||||
|
expect(dialer.queue.pending).to.equal(2)
|
||||||
|
expect(dialer.queue.size).to.equal(1)
|
||||||
|
|
||||||
|
deferredDial.resolve()
|
||||||
|
|
||||||
|
// Let the call stack run
|
||||||
|
await delay(0)
|
||||||
|
// All dials should have executed
|
||||||
|
expect(localTM.dial.callCount).to.equal(3)
|
||||||
|
expect(dialer.queue.pending).to.equal(0)
|
||||||
|
expect(dialer.queue.size).to.equal(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe.skip('libp2p.dialer', () => {
|
||||||
|
let peerInfo
|
||||||
|
let libp2p
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
const peerId = await PeerId.createFromJSON(Peers[0])
|
||||||
|
peerInfo = new PeerInfo(peerId)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
libp2p = null
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should create a dialer', () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(libp2p.dialer).to.exist()
|
||||||
|
// Ensure the dialer also has the transport manager
|
||||||
|
expect(libp2p.transportManager).to.equal(libp2p.dialer.transportManager)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should use the dialer for connecting', async () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo,
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
await connection.close()
|
||||||
|
})
|
||||||
|
})
|
13
test/node.js
13
test/node.js
@ -1,3 +1,14 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
require('./transports/transport-manager.node')
|
const glob = require('glob')
|
||||||
|
const path = require('path')
|
||||||
|
|
||||||
|
// Automatically require test files so we don't have to worry about adding new ones
|
||||||
|
glob('test/**/*.node.js', function (err, testPaths) {
|
||||||
|
if (err) throw err
|
||||||
|
if (testPaths.length < 1) throw new Error('Could not find any node test files')
|
||||||
|
|
||||||
|
testPaths.forEach(file => {
|
||||||
|
require(path.resolve(file))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
370
test/upgrading/upgrader.spec.js
Normal file
370
test/upgrading/upgrader.spec.js
Normal file
@ -0,0 +1,370 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
const sinon = require('sinon')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const PeerInfo = require('peer-info')
|
||||||
|
const pipe = require('it-pipe')
|
||||||
|
const { collect } = require('streaming-iterables')
|
||||||
|
const pSettle = require('p-settle')
|
||||||
|
const Transport = require('libp2p-websockets')
|
||||||
|
|
||||||
|
const Libp2p = require('../../src')
|
||||||
|
const Upgrader = require('../../src/upgrader')
|
||||||
|
const { codes } = require('../../src/errors')
|
||||||
|
|
||||||
|
const mockCrypto = require('../utils/mockCrypto')
|
||||||
|
const mockMultiaddrConnPair = require('../utils/mockMultiaddrConn')
|
||||||
|
const Peers = require('../fixtures/peers')
|
||||||
|
const addrs = [
|
||||||
|
multiaddr('/ip4/127.0.0.1/tcp/0'),
|
||||||
|
multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
]
|
||||||
|
|
||||||
|
describe('Upgrader', () => {
|
||||||
|
let localUpgrader
|
||||||
|
let remoteUpgrader
|
||||||
|
let localPeer
|
||||||
|
let remotePeer
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
([
|
||||||
|
localPeer,
|
||||||
|
remotePeer
|
||||||
|
] = await Promise.all([
|
||||||
|
PeerId.createFromJSON(Peers[0]),
|
||||||
|
PeerId.createFromJSON(Peers[1])
|
||||||
|
]))
|
||||||
|
|
||||||
|
localUpgrader = new Upgrader({
|
||||||
|
localPeer
|
||||||
|
})
|
||||||
|
remoteUpgrader = new Upgrader({
|
||||||
|
localPeer: remotePeer
|
||||||
|
})
|
||||||
|
|
||||||
|
localUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
|
||||||
|
remoteUpgrader.protocols.set('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
sinon.restore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should ignore a missing remote peer id', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
// Remove the peer id from the remote address
|
||||||
|
outbound.remoteAddr = outbound.remoteAddr.decapsulateCode(421)
|
||||||
|
|
||||||
|
const connections = await Promise.all([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(connections).to.have.length(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should upgrade with valid muxers and crypto', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
const connections = await Promise.all([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(connections).to.have.length(2)
|
||||||
|
|
||||||
|
const { stream, protocol } = await connections[0].newStream('/echo/1.0.0')
|
||||||
|
expect(protocol).to.equal('/echo/1.0.0')
|
||||||
|
|
||||||
|
const hello = Buffer.from('hello there!')
|
||||||
|
const result = await pipe(
|
||||||
|
[hello],
|
||||||
|
stream,
|
||||||
|
function toBuffer (source) {
|
||||||
|
return (async function * () {
|
||||||
|
for await (const val of source) yield val.slice()
|
||||||
|
})()
|
||||||
|
},
|
||||||
|
collect
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(result).to.eql([hello])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail if crypto fails', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const crypto = {
|
||||||
|
tag: '/insecure',
|
||||||
|
secureInbound: () => { throw new Error('Boom') },
|
||||||
|
secureOutbound: () => { throw new Error('Boom') }
|
||||||
|
}
|
||||||
|
|
||||||
|
const cryptos = new Map([[crypto.tag, crypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
// Wait for the results of each side of the connection
|
||||||
|
const results = await pSettle([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
// Ensure both sides fail
|
||||||
|
expect(results).to.have.length(2)
|
||||||
|
results.forEach(result => {
|
||||||
|
expect(result.isRejected).to.equal(true)
|
||||||
|
expect(result.reason.code).to.equal(codes.ERR_ENCRYPTION_FAILED)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail if muxers do not match', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxersLocal = new Map([['/muxer-local', Muxer]])
|
||||||
|
const muxersRemote = new Map([['/muxer-remote', Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxersLocal)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxersRemote)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
// Wait for the results of each side of the connection
|
||||||
|
const results = await pSettle([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
// Ensure both sides fail
|
||||||
|
expect(results).to.have.length(2)
|
||||||
|
results.forEach(result => {
|
||||||
|
expect(result.isRejected).to.equal(true)
|
||||||
|
expect(result.reason.code).to.equal(codes.ERR_MUXER_UNAVAILABLE)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should map getStreams and close methods', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
const connections = await Promise.all([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(connections).to.have.length(2)
|
||||||
|
|
||||||
|
// Create a few streams, at least 1 in each direction
|
||||||
|
await connections[0].newStream('/echo/1.0.0')
|
||||||
|
await connections[1].newStream('/echo/1.0.0')
|
||||||
|
await connections[0].newStream('/echo/1.0.0')
|
||||||
|
connections.forEach(conn => {
|
||||||
|
expect(conn.streams).to.have.length(3)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Verify the MultiaddrConnection close method is called
|
||||||
|
sinon.spy(inbound, 'close')
|
||||||
|
sinon.spy(outbound, 'close')
|
||||||
|
await Promise.all(connections.map(conn => conn.close()))
|
||||||
|
expect(inbound.close.callCount).to.equal(1)
|
||||||
|
expect(outbound.close.callCount).to.equal(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should call connection handlers', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
// Verify onConnection is called with the connection
|
||||||
|
sinon.spy(localUpgrader, 'onConnection')
|
||||||
|
sinon.spy(remoteUpgrader, 'onConnection')
|
||||||
|
const connections = await Promise.all([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
expect(connections).to.have.length(2)
|
||||||
|
expect(localUpgrader.onConnection.callCount).to.equal(1)
|
||||||
|
expect(localUpgrader.onConnection.getCall(0).args).to.eql([connections[0]])
|
||||||
|
expect(remoteUpgrader.onConnection.callCount).to.equal(1)
|
||||||
|
expect(remoteUpgrader.onConnection.getCall(0).args).to.eql([connections[1]])
|
||||||
|
|
||||||
|
// Verify onConnectionEnd is called with the connection
|
||||||
|
sinon.spy(localUpgrader, 'onConnectionEnd')
|
||||||
|
sinon.spy(remoteUpgrader, 'onConnectionEnd')
|
||||||
|
await Promise.all(connections.map(conn => conn.close()))
|
||||||
|
expect(localUpgrader.onConnectionEnd.callCount).to.equal(1)
|
||||||
|
expect(localUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[0]])
|
||||||
|
expect(remoteUpgrader.onConnectionEnd.callCount).to.equal(1)
|
||||||
|
expect(remoteUpgrader.onConnectionEnd.getCall(0).args).to.eql([connections[1]])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should fail to create a stream for an unsupported protocol', async () => {
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
|
||||||
|
|
||||||
|
const muxers = new Map([[Muxer.multicodec, Muxer]])
|
||||||
|
sinon.stub(localUpgrader, 'muxers').value(muxers)
|
||||||
|
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
|
||||||
|
|
||||||
|
const cryptos = new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
|
||||||
|
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
|
||||||
|
|
||||||
|
const connections = await Promise.all([
|
||||||
|
localUpgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
|
||||||
|
expect(connections).to.have.length(2)
|
||||||
|
|
||||||
|
const results = await pSettle([
|
||||||
|
connections[0].newStream('/unsupported/1.0.0'),
|
||||||
|
connections[1].newStream('/unsupported/1.0.0')
|
||||||
|
])
|
||||||
|
expect(results).to.have.length(2)
|
||||||
|
results.forEach(result => {
|
||||||
|
expect(result.isRejected).to.equal(true)
|
||||||
|
expect(result.reason.code).to.equal(codes.ERR_UNSUPPORTED_PROTOCOL)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('libp2p.upgrader', () => {
|
||||||
|
let peers
|
||||||
|
let libp2p
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
const ids = await Promise.all([
|
||||||
|
PeerId.createFromJSON(Peers[0]),
|
||||||
|
PeerId.createFromJSON(Peers[1])
|
||||||
|
])
|
||||||
|
peers = ids.map(peerId => new PeerInfo(peerId))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
libp2p = null
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should create an Upgrader', () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo: peers[0],
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(libp2p.upgrader).to.exist()
|
||||||
|
expect(libp2p.upgrader.muxers).to.eql(new Map([[Muxer.multicodec, Muxer]]))
|
||||||
|
expect(libp2p.upgrader.cryptos).to.eql(new Map([[mockCrypto.tag, mockCrypto]]))
|
||||||
|
// Ensure the transport manager also has the upgrader
|
||||||
|
expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to register and unregister a handler', () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo: peers[0],
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(libp2p.upgrader.protocols.size).to.equal(0)
|
||||||
|
|
||||||
|
const echoHandler = () => {}
|
||||||
|
libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler)
|
||||||
|
expect(libp2p.upgrader.protocols.size).to.equal(2)
|
||||||
|
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler)
|
||||||
|
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
|
||||||
|
|
||||||
|
libp2p.unhandle(['/echo/1.0.0'])
|
||||||
|
expect(libp2p.upgrader.protocols.size).to.equal(1)
|
||||||
|
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined)
|
||||||
|
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should emit connect and disconnect events', async () => {
|
||||||
|
const remotePeer = peers[1]
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
peerInfo: peers[0],
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const remoteUpgrader = new Upgrader({
|
||||||
|
localPeer: remotePeer.id,
|
||||||
|
muxers: new Map([[Muxer.multicodec, Muxer]]),
|
||||||
|
cryptos: new Map([[mockCrypto.tag, mockCrypto]])
|
||||||
|
})
|
||||||
|
|
||||||
|
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer: remotePeer.id })
|
||||||
|
|
||||||
|
// Spy on emit for easy verification
|
||||||
|
sinon.spy(libp2p, 'emit')
|
||||||
|
|
||||||
|
// Upgrade and check the connect event
|
||||||
|
const connections = await Promise.all([
|
||||||
|
libp2p.upgrader.upgradeOutbound(outbound),
|
||||||
|
remoteUpgrader.upgradeInbound(inbound)
|
||||||
|
])
|
||||||
|
expect(libp2p.emit.callCount).to.equal(1)
|
||||||
|
let [event, peerInfo] = libp2p.emit.getCall(0).args
|
||||||
|
expect(event).to.equal('peer:connect')
|
||||||
|
expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true)
|
||||||
|
|
||||||
|
// Close and check the disconnect event
|
||||||
|
await Promise.all(connections.map(conn => conn.close()))
|
||||||
|
expect(libp2p.emit.callCount).to.equal(2)
|
||||||
|
;([event, peerInfo] = libp2p.emit.getCall(1).args)
|
||||||
|
expect(event).to.equal('peer:disconnect')
|
||||||
|
expect(peerInfo.id.isEqual(remotePeer.id)).to.equal(true)
|
||||||
|
})
|
||||||
|
})
|
24
test/utils/mockCrypto.js
Normal file
24
test/utils/mockCrypto.js
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const Peers = require('../fixtures/peers')
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
tag: '/insecure',
|
||||||
|
secureInbound: (localPeer, stream) => {
|
||||||
|
return {
|
||||||
|
conn: stream,
|
||||||
|
remotePeer: localPeer
|
||||||
|
}
|
||||||
|
},
|
||||||
|
secureOutbound: async (localPeer, stream, remotePeer) => {
|
||||||
|
// Crypto should always return a remotePeer
|
||||||
|
if (!remotePeer) {
|
||||||
|
remotePeer = await PeerId.createFromJSON(Peers[0])
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
conn: stream,
|
||||||
|
remotePeer: remotePeer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
43
test/utils/mockMultiaddrConn.js
Normal file
43
test/utils/mockMultiaddrConn.js
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const duplexPair = require('it-pair/duplex')
|
||||||
|
const abortable = require('abortable-iterator')
|
||||||
|
const AbortController = require('abort-controller')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns both sides of a mocked MultiaddrConnection
|
||||||
|
* @param {object} options
|
||||||
|
* @param {Multiaddr[]} options.addrs Should contain two addresses for the local and remote peer
|
||||||
|
* @param {PeerId} options.remotePeer The peer that is being "dialed"
|
||||||
|
* @returns {{inbound:MultiaddrConnection, outbound:MultiaddrConnection}}
|
||||||
|
*/
|
||||||
|
module.exports = function mockMultiaddrConnPair ({ addrs, remotePeer }) {
|
||||||
|
const controller = new AbortController()
|
||||||
|
|
||||||
|
const [inbound, outbound] = duplexPair()
|
||||||
|
outbound.localAddr = addrs[0]
|
||||||
|
outbound.remoteAddr = addrs[1].encapsulate(`/p2p/${remotePeer.toB58String()}`)
|
||||||
|
outbound.timeline = {
|
||||||
|
open: Date.now()
|
||||||
|
}
|
||||||
|
outbound.close = () => {
|
||||||
|
outbound.timeline.close = Date.now()
|
||||||
|
controller.abort()
|
||||||
|
}
|
||||||
|
|
||||||
|
inbound.localAddr = addrs[1]
|
||||||
|
inbound.remoteAddr = addrs[0]
|
||||||
|
inbound.timeline = {
|
||||||
|
open: Date.now()
|
||||||
|
}
|
||||||
|
inbound.close = () => {
|
||||||
|
inbound.timeline.close = Date.now()
|
||||||
|
controller.abort()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the sources abortable so we can close them easily
|
||||||
|
inbound.source = abortable(inbound.source, controller.signal)
|
||||||
|
outbound.source = abortable(outbound.source, controller.signal)
|
||||||
|
|
||||||
|
return { inbound, outbound }
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user