refactor(async): add dialer and upgrader (#462)

* chore(deps): update connection and multistream

* feat: add basic dial support for addresses and peers

* test: automatically require all node test files

* fix: dont catch and log in the wrong place

* test: add direct spec test

fix: improve dial error consistency

* feat: add dial timeouts and concurrency

Queue timeouts will result in aborts of the dials

* chore: fix linting

* test: verify dialer defaults

* feat: add initial upgrader

* fix: add more test coverage and fix bugs

* feat: libp2p creates the upgrader

* feat: hook up handle to the upgrader

* feat: hook up the dialer to libp2p

test: add node dialer libp2p tests

* feat: add connection listeners to upgrader

* feat: emit connect and disconnect events

* chore: use libp2p-interfaces

* fix: address review feedback

* fix: correct import

* refactor: dedupe connection creation code
This commit is contained in:
Jacob Heun
2019-10-21 16:53:58 +02:00
parent b37ccc7279
commit 5e1dbc21a2
20 changed files with 1481 additions and 142 deletions

View File

@ -1,6 +1,6 @@
# js-libp2p-circuit
> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/interface-connection) interface for dial/listen.
> Node.js implementation of the Circuit module that libp2p uses, which implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) interface for dial/listen.
**Note**: git history prior to merging into js-libp2p can be found in the original repository, https://github.com/libp2p/js-libp2p-circuit.
@ -24,15 +24,18 @@ Prior to `libp2p-circuit` there was a rift in the IPFS network, were IPFS nodes
## Table of Contents
- [Install](#install)
- [npm](#npm)
- [Usage](#usage)
- [Example](#example)
- [This module uses `pull-streams`](#this-module-uses-pull-streams)
- [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams)
- [API](#api)
- [Contribute](#contribute)
- [License](#license)
- [js-libp2p-circuit](#js-libp2p-circuit)
- [Why?](#why)
- [libp2p-circuit and IPFS](#libp2p-circuit-and-ipfs)
- [Table of Contents](#table-of-contents)
- [Usage](#usage)
- [Example](#example)
- [Create dialer/listener](#create-dialerlistener)
- [Create `relay`](#create-relay)
- [This module uses `pull-streams`](#this-module-uses-pull-streams)
- [Converting `pull-streams` to Node.js Streams](#converting-pull-streams-to-nodejs-streams)
- [API](#api)
- [Implementation rational](#implementation-rational)
## Usage

View File

@ -6,7 +6,7 @@ const waterfall = require('async/waterfall')
const setImmediate = require('async/setImmediate')
const multiaddr = require('multiaddr')
const Connection = require('interface-connection').Connection
const { Connection } = require('libp2p-interfaces/src/connection')
const utilsFactory = require('./utils')
const StreamHandler = require('./stream-handler')

View File

@ -3,7 +3,7 @@
const setImmediate = require('async/setImmediate')
const EE = require('events').EventEmitter
const Connection = require('interface-connection').Connection
const { Connection } = require('libp2p-interfaces/src/connection')
const utilsFactory = require('./utils')
const PeerInfo = require('peer-info')
const proto = require('../protocol').CircuitRelay

12
src/constants.js Normal file
View File

@ -0,0 +1,12 @@
'use strict'
module.exports = {
DENY_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
DENY_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently denied
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
}

98
src/dialer.js Normal file
View File

@ -0,0 +1,98 @@
'use strict'
const multiaddr = require('multiaddr')
const errCode = require('err-code')
const { default: PQueue } = require('p-queue')
const AbortController = require('abort-controller')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
const { codes } = require('./errors')
const {
MAX_PARALLEL_DIALS,
DIAL_TIMEOUT
} = require('./constants')
class Dialer {
/**
* @constructor
* @param {object} options
* @param {TransportManager} options.transportManager
* @param {number} options.concurrency Number of max concurrent dials. Defaults to `MAX_PARALLEL_DIALS`
* @param {number} options.timeout How long a dial attempt is allowed to take. Defaults to `DIAL_TIMEOUT`
*/
constructor ({
transportManager,
concurrency = MAX_PARALLEL_DIALS,
timeout = DIAL_TIMEOUT
}) {
this.transportManager = transportManager
this.concurrency = concurrency
this.timeout = timeout
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
}
/**
* Connects to a given `Multiaddr`. `addr` should include the id of the peer being
* dialed, it will be used for encryption verification.
*
* @async
* @param {Multiaddr} addr The address to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddr (addr, options = {}) {
addr = multiaddr(addr)
let conn
let controller
if (!options.signal) {
controller = new AbortController()
options.signal = controller.signal
}
try {
conn = await this.queue.add(() => this.transportManager.dial(addr, options))
} catch (err) {
if (err.name === 'TimeoutError') {
controller.abort()
err.code = codes.ERR_TIMEOUT
}
log.error('Error dialing address %s,', addr, err)
throw err
}
return conn
}
/**
* Connects to a given `PeerInfo` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @async
* @param {PeerInfo} peerInfo The remote peer to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToPeer (peerInfo, options = {}) {
const addrs = peerInfo.multiaddrs.toArray()
for (const addr of addrs) {
try {
return await this.connectToMultiaddr(addr, options)
} catch (_) {
// The error is already logged, just move to the next addr
continue
}
}
const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED)
log.error(err)
throw err
}
}
module.exports = Dialer

View File

@ -8,10 +8,16 @@ exports.messages = {
exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE'
ERR_MUXER_UNAVAILABLE: 'ERR_MUXER_UNAVAILABLE',
ERR_TIMEOUT: 'ERR_TIMEOUT',
ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE',
ERR_TRANSPORT_DIAL_FAILED: 'ERR_TRANSPORT_DIAL_FAILED',
ERR_UNSUPPORTED_PROTOCOL: 'ERR_UNSUPPORTED_PROTOCOL'
}

View File

@ -13,20 +13,22 @@ const nextTick = require('async/nextTick')
const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
const Ping = require('./ping')
const ConnectionManager = require('./connection-manager')
const { emitFirst } = require('./util')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const { getPeerInfoRemote } = require('./get-peer-info')
const validateConfig = require('./config').validate
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const notStarted = (action, state) => {
return errCode(
@ -61,64 +63,49 @@ class Libp2p extends EventEmitter {
// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch.on('error', (...args) => this.emit('error', ...args))
this.stats = this._switch.stats
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.emit('peer:disconnect', peerInfo)
}
})
// Setup the transport manager
this.transportManager = new TransportManager({
libp2p: this,
// TODO: set the actual upgrader
upgrader: {
upgradeInbound: (maConn) => maConn,
upgradeOutbound: (maConn) => maConn
},
// TODO: Route incoming connections to a multiplex protocol router
onConnection: () => {}
upgrader: this.upgrader
})
this._modules.transport.forEach((Transport) => {
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
})
// Attach stream multiplexers
if (this._modules.streamMuxer) {
const muxers = this._modules.streamMuxer
muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer))
// If muxer exists
// we can use Identify
this._switch.connection.reuse()
// we can use Relay for listening/dialing
this._switch.connection.enableCircuitRelay(this._config.relay)
// Received incomming dial and muxer upgrade happened,
// reuse this muxed connection
this._switch.on('peer-mux-established', (peerInfo) => {
this.emit('peer:connect', peerInfo)
})
this._switch.on('peer-mux-closed', (peerInfo) => {
this.emit('peer:disconnect', peerInfo)
})
}
// Events for anytime connections are created/removed
this._switch.on('connection:start', (peerInfo) => {
this.emit('connection:start', peerInfo)
})
this._switch.on('connection:end', (peerInfo) => {
this.emit('connection:end', peerInfo)
})
// Attach crypto channels
if (this._modules.connEncryption) {
const cryptos = this._modules.connEncryption
cryptos.forEach((crypto) => {
this._switch.connection.crypto(crypto.tag, crypto.encrypt)
this.upgrader.cryptos.set(crypto.tag, crypto)
})
}
// Attach stream multiplexers
if (this._modules.streamMuxer) {
const muxers = this._modules.streamMuxer
muxers.forEach((muxer) => {
this.upgrader.muxers.set(muxer.multicodec, muxer)
})
}
this.dialer = new Dialer({
transportManager: this.transportManager
})
// Attach private network protector
if (this._modules.connProtector) {
this._switch.protector = this._modules.connProtector
@ -153,7 +140,8 @@ class Libp2p extends EventEmitter {
this.state = new FSM('STOPPED', {
STOPPED: {
start: 'STARTING',
stop: 'STOPPED'
stop: 'STOPPED',
done: 'STOPPED'
},
STARTING: {
done: 'STARTED',
@ -175,7 +163,6 @@ class Libp2p extends EventEmitter {
})
this.state.on('STOPPING', () => {
log('libp2p is stopping')
this._onStopping()
})
this.state.on('STARTED', () => {
log('libp2p has started')
@ -201,7 +188,7 @@ class Libp2p extends EventEmitter {
this._peerDiscovered = this._peerDiscovered.bind(this)
// promisify all instance methods
;['start', 'stop', 'dial', 'dialProtocol', 'dialFSM', 'hangUp', 'ping'].forEach(method => {
;['start', 'hangUp', 'ping'].forEach(method => {
this[method] = promisify(this[method], { context: this })
})
}
@ -234,13 +221,21 @@ class Libp2p extends EventEmitter {
/**
* Stop the libp2p node by closing its listeners and open connections
*
* @param {function(Error)} callback
* @async
* @returns {void}
*/
stop (callback = () => {}) {
emitFirst(this, ['error', 'stop'], callback)
async stop () {
this.state('stop')
try {
await this.transportManager.close()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
this.state('done')
}
isStarted () {
@ -252,11 +247,12 @@ class Libp2p extends EventEmitter {
* peer will be added to the nodes `PeerBook`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {function(Error)} callback
* @returns {void}
* @param {object} options
* @param {AbortSignal} [options.signal]
* @returns {Promise<Connection>}
*/
dial (peer, callback) {
this.dialProtocol(peer, null, callback)
dial (peer, options) {
return this.dialProtocol(peer, null, options)
}
/**
@ -264,50 +260,28 @@ class Libp2p extends EventEmitter {
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* and the `Connection` will be sent in the callback
*
* @async
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {string} protocol
* @param {function(Error, Connection)} callback
* @returns {void}
* @param {string[]|string} protocols
* @param {object} options
* @param {AbortSignal} [options.signal]
* @returns {Promise<Connection|*>}
*/
dialProtocol (peer, protocol, callback) {
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
async dialProtocol (peer, protocols, options) {
let connection
if (multiaddr.isMultiaddr(peer)) {
connection = await this.dialer.connectToMultiaddr(peer, options)
} else {
peer = await getPeerInfoRemote(peer, this)
connection = await this.dialer.connectToPeer(peer, options)
}
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
}
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.dial(peerInfo, protocol, callback)
}, callback)
}
/**
* Similar to `dial` and `dialProtocol`, but the callback will contain a
* Connection State Machine.
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {string} protocol
* @param {function(Error, ConnectionFSM)} callback
* @returns {void}
*/
dialFSM (peer, protocol, callback) {
if (!this.isStarted()) {
return callback(notStarted('dial', this.state._state))
}
if (typeof protocol === 'function') {
callback = protocol
protocol = undefined
}
getPeerInfoRemote(peer, this)
.then(peerInfo => {
this._switch.dialFSM(peerInfo, protocol, callback)
}, callback)
return connection
}
/**
@ -342,12 +316,28 @@ class Libp2p extends EventEmitter {
}, callback)
}
handle (protocol, handlerFunc, matchFunc) {
this._switch.handle(protocol, handlerFunc, matchFunc)
/**
* Registers the `handler` for each protocol
* @param {string[]|string} protocols
* @param {function({ stream:*, protocol:string })} handler
*/
handle (protocols, handler) {
protocols = Array.isArray(protocols) ? protocols : [protocols]
protocols.forEach(protocol => {
this.upgrader.protocols.set(protocol, handler)
})
}
unhandle (protocol) {
this._switch.unhandle(protocol)
/**
* Removes the handler for each protocol. The protocol
* will no longer be supported on streams.
* @param {string[]|string} protocols
*/
unhandle (protocols) {
protocols = Array.isArray(protocols) ? protocols : [protocols]
protocols.forEach(protocol => {
this.upgrader.protocols.delete(protocol)
})
}
async _onStarting () {
@ -373,21 +363,6 @@ class Libp2p extends EventEmitter {
this.state('done')
}
async _onStopping () {
// Start parallel tasks
try {
await this.transportManager.close()
} catch (err) {
if (err) {
log.error(err)
this.emit('error', err)
}
}
// libp2p has stopped
this.state('done')
}
/**
* Handles discovered peers. Each discovered peer will be emitted via
* the `peer:discovery` event. If auto dial is enabled for libp2p

View File

@ -1,7 +1,7 @@
'use strict'
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection
const { Connection } = require('libp2p-interfaces/src/connection')
const assert = require('assert')
const Errors = require('./errors')

View File

@ -81,7 +81,7 @@ tests]([./test/pnet.node.js]).
##### `switch.connection.addUpgrade()`
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) specification.
> **WIP**
@ -151,7 +151,7 @@ a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will tak
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) will be emitted.
- `close`: emitted when the connection has closed.
### `switch.handle(protocol, handlerFunc, matchFunc)`
@ -365,7 +365,7 @@ In order for a transport to be supported, it has to follow the [interface-transp
### Connection upgrades
Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) spec. This design decision enables libp2p to have upgradable transports.
We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it.

View File

@ -1,6 +1,6 @@
'use strict'
const Connection = require('interface-connection').Connection
const { Connection } = require('libp2p-interfaces/src/connection')
const pull = require('pull-stream/pull')
const empty = require('pull-stream/sources/empty')
const timeout = require('async/timeout')

View File

@ -1,6 +1,6 @@
'use strict'
const Connection = require('interface-connection').Connection
const { Connection } = require('libp2p-interfaces/src/connection')
const pull = require('pull-stream/pull')
/**

View File

@ -13,14 +13,12 @@ class TransportManager {
* @param {object} options
* @param {Libp2p} options.libp2p The Libp2p instance. It will be passed to the transports.
* @param {Upgrader} options.upgrader The upgrader to provide to the transports
* @param {function(Connection)} options.onConnection Called whenever an incoming connection is received
*/
constructor ({ libp2p, upgrader, onConnection }) {
constructor ({ libp2p, upgrader }) {
this.libp2p = libp2p
this.upgrader = upgrader
this._transports = new Map()
this._listeners = new Map()
this.onConnection = onConnection
}
/**
@ -45,7 +43,9 @@ class TransportManager {
})
this._transports.set(key, transport)
this._listeners.set(key, [])
if (!this._listeners.has(key)) {
this._listeners.set(key, [])
}
}
/**
@ -57,11 +57,13 @@ class TransportManager {
for (const [key, listeners] of this._listeners) {
log('closing listeners for %s', key)
while (listeners.length) {
tasks.push(listeners.pop().close())
const listener = listeners.pop()
tasks.push(listener.close())
}
}
await Promise.all(tasks)
log('all listeners closed')
this._listeners.clear()
}
@ -76,8 +78,12 @@ class TransportManager {
if (!transport) {
throw errCode(new Error(`No transport available for address ${String(ma)}`), codes.ERR_TRANSPORT_UNAVAILABLE)
}
const conn = await transport.dial(ma, options)
return conn
try {
return await transport.dial(ma, options)
} catch (err) {
throw errCode(new Error('Transport dial failed'), codes.ERR_TRANSPORT_DIAL_FAILED, err)
}
}
/**

336
src/upgrader.js Normal file
View File

@ -0,0 +1,336 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:upgrader')
log.error = debug('libp2p:upgrader:error')
const Multistream = require('multistream-select')
const { Connection } = require('libp2p-interfaces/src/connection')
const PeerId = require('peer-id')
const pipe = require('it-pipe')
const errCode = require('err-code')
const { codes } = require('./errors')
/**
* @typedef MultiaddrConnection
* @property {function} sink
* @property {AsyncIterator} source
* @property {*} conn
* @property {Multiaddr} remoteAddr
*/
/**
* @typedef CryptoResult
* @property {*} conn A duplex iterable
* @property {PeerId} remotePeer
* @property {string} protocol
*/
class Upgrader {
/**
* @param {object} options
* @param {PeerId} options.localPeer
* @param {Map<string, Crypto>} options.cryptos
* @param {Map<string, Muxer>} options.muxers
*/
constructor ({ localPeer, cryptos, muxers, onConnectionEnd = () => {}, onConnection = () => {} }) {
this.localPeer = localPeer
this.cryptos = cryptos || new Map()
this.muxers = muxers || new Map()
this.protocols = new Map()
this.onConnection = onConnection
this.onConnectionEnd = onConnectionEnd
}
/**
* Upgrades an inbound connection
* @async
* @param {MultiaddrConnection} maConn
* @returns {Promise<Connection>}
*/
async upgradeInbound (maConn) {
let encryptedConn
let remotePeer
let muxedConnection
let Muxer
let cryptoProtocol
try {
// Encrypt the connection
({
conn: encryptedConn,
remotePeer,
protocol: cryptoProtocol
} = await this._encryptInbound(this.localPeer, maConn, this.cryptos))
// Multiplex the connection
;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
} catch (err) {
log.error('Failed to upgrade inbound connection', err)
await maConn.close(err)
// TODO: We shouldn't throw here, as there isn't anything to catch the failure
throw err
}
log('Successfully upgraded inbound connection')
return this._createConnection({
cryptoProtocol,
direction: 'inbound',
maConn,
muxedConnection,
Muxer,
remotePeer
})
}
/**
* Upgrades an outbound connection
* @async
* @param {MultiaddrConnection} maConn
* @returns {Promise<Connection>}
*/
async upgradeOutbound (maConn) {
let remotePeerId
try {
remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId())
} catch (err) {
log.error('multiaddr did not contain a valid peer id', err)
}
let encryptedConn
let remotePeer
let muxedConnection
let cryptoProtocol
let Muxer
try {
// Encrypt the connection
({
conn: encryptedConn,
remotePeer,
protocol: cryptoProtocol
} = await this._encryptOutbound(this.localPeer, maConn, remotePeerId, this.cryptos))
// Multiplex the connection
;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
} catch (err) {
log.error('Failed to upgrade outbound connection', err)
await maConn.close(err)
throw err
}
log('Successfully upgraded outbound connection')
return this._createConnection({
cryptoProtocol,
direction: 'outbound',
maConn,
muxedConnection,
Muxer,
remotePeer
})
}
/**
* A convenience method for generating a new `Connection`
* @private
* @param {object} options
* @param {string} cryptoProtocol The crypto protocol that was negotiated
* @param {string} direction One of ['inbound', 'outbound']
* @param {MultiaddrConnection} maConn The transport layer connection
* @param {*} muxedConnection A duplex connection returned from multiplexer selection
* @param {Muxer} Muxer The muxer to be used for muxing
* @param {PeerId} remotePeer The peer the connection is with
* @returns {Connection}
*/
_createConnection ({
cryptoProtocol,
direction,
maConn,
muxedConnection,
Muxer,
remotePeer
}) {
// Create the muxer
const muxer = new Muxer({
// Run anytime a remote stream is created
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
connection.addStream(stream, protocol)
this._onStream({ stream, protocol })
},
// Run anytime a stream closes
onStreamEnd: muxedStream => {
connection.removeStream(muxedStream.id)
}
})
const newStream = async protocols => {
log('%s: starting new stream on %s', direction, protocols)
const muxedStream = muxer.newStream()
const mss = new Multistream.Dialer(muxedStream)
try {
const { stream, protocol } = await mss.select(protocols)
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err) {
log.error('could not create new stream', err)
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
}
}
// Pipe all data through the muxer
pipe(muxedConnection, muxer, muxedConnection)
maConn.timeline.upgraded = Date.now()
const timelineProxy = new Proxy(maConn.timeline, {
set: (...args) => {
if (args[1] === 'close' && args[2]) {
this.onConnectionEnd(connection)
}
return Reflect.set(...args)
}
})
// Create the connection
const connection = new Connection({
localAddr: maConn.localAddr,
remoteAddr: maConn.remoteAddr,
localPeer: this.localPeer,
remotePeer: remotePeer,
stat: {
direction,
timeline: timelineProxy,
multiplexer: Muxer.multicodec,
encryption: cryptoProtocol
},
newStream,
getStreams: () => muxer.streams,
close: err => maConn.close(err)
})
this.onConnection(connection)
return connection
}
/**
* Routes incoming streams to the correct handler
* @private
* @param {object} options
* @param {Stream} options.stream
* @param {string} protocol
*/
_onStream ({ stream, protocol }) {
const handler = this.protocols.get(protocol)
handler({ stream, protocol })
}
/**
* Attempts to encrypt the incoming `connection` with the provided `cryptos`.
* @private
* @async
* @param {PeerId} localPeer The initiators PeerInfo
* @param {*} connection
* @param {Map<string, Crypto>} cryptos
* @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
*/
async _encryptInbound (localPeer, connection, cryptos) {
const mss = new Multistream.Listener(connection)
const protocols = Array.from(cryptos.keys())
log('selecting inbound crypto protocol', protocols)
try {
const { stream, protocol } = await mss.handle(protocols)
const crypto = cryptos.get(protocol)
log('encrypting inbound connection...')
return {
...await crypto.secureInbound(localPeer, stream),
protocol
}
} catch (err) {
throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
}
}
/**
* Attempts to encrypt the given `connection` with the provided `cryptos`.
* The first `Crypto` module to succeed will be used
* @private
* @async
* @param {PeerId} localPeer The initiators PeerInfo
* @param {*} connection
* @param {PeerId} remotePeerId
* @param {Map<string, Crypto>} cryptos
* @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
*/
async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) {
const mss = new Multistream.Dialer(connection)
const protocols = Array.from(cryptos.keys())
log('selecting outbound crypto protocol', protocols)
try {
const { stream, protocol } = await mss.select(protocols)
const crypto = cryptos.get(protocol)
log('encrypting outbound connection to %j', remotePeerId)
return {
...await crypto.secureOutbound(localPeer, stream, remotePeerId),
protocol
}
} catch (err) {
throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
}
}
/**
* Selects one of the given muxers via multistream-select. That
* muxer will be used for all future streams on the connection.
* @private
* @async
* @param {*} connection A basic duplex connection to multiplex
* @param {Map<string, Muxer>} muxers The muxers to attempt multiplexing with
* @returns {*} A muxed connection
*/
async _multiplexOutbound (connection, muxers) {
const dialer = new Multistream.Dialer(connection)
const protocols = Array.from(muxers.keys())
log('outbound selecting muxer %s', protocols)
try {
const { stream, protocol } = await dialer.select(protocols)
log('%s selected as muxer protocol', protocol)
const Muxer = muxers.get(protocol)
return { stream, Muxer }
} catch (err) {
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
}
}
/**
* Registers support for one of the given muxers via multistream-select. The
* selected muxer will be used for all future streams on the connection.
* @private
* @async
* @param {*} connection A basic duplex connection to multiplex
* @param {Map<string, Muxer>} muxers The muxers to attempt multiplexing with
* @returns {*} A muxed connection
*/
async _multiplexInbound (connection, muxers) {
const listener = new Multistream.Listener(connection)
const protocols = Array.from(muxers.keys())
log('inbound handling muxers %s', protocols)
try {
const { stream, protocol } = await listener.handle(protocols)
const Muxer = muxers.get(protocol)
return { stream, Muxer }
} catch (err) {
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
}
}
}
module.exports = Upgrader