chore: move stats folder and delete old switch code

This commit is contained in:
Jacob Heun 2019-12-08 11:02:47 +01:00
parent 8c6ad79630
commit 9b10e09cc0
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
23 changed files with 0 additions and 3113 deletions

View File

@ -1,423 +0,0 @@
libp2p-switch JavaScript implementation
======================================
> libp2p-switch is a dialer machine, it leverages the multiple libp2p transports, stream muxers, crypto channels and other connection upgrades to dial to peers in the libp2p network. It also supports Protocol Multiplexing through a multicodec and multistream-select handshake.
**Note**: git history prior to merging into js-libp2p can be found in the original repository, https://github.com/libp2p/js-libp2p-switch.
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Create a libp2p switch](#create-a-libp2p-switch)
- [API](#api)
- [`switch.connection`](#switchconnection)
- [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback)
- [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback)
- [`switch.handle(protocol, handlerFunc, matchFunc)`](#switchhandleprotocol-handlerfunc-matchfunc)
- [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback)
- [`switch.start(callback)`](#switchstartcallback)
- [`switch.stop(callback)`](#switchstopcallback)
- [`switch.stats`](#stats-api)
- [`switch.unhandle(protocol)`](#switchunhandleprotocol)
- [Internal Transports API](#internal-transports-api)
- [Design Notes](#design-notes)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
- [Notes](#notes)
- [Contribute](#contribute)
- [License](#license)
## Install
```bash
> npm install libp2p-switch --save
```
## Usage
### Create a libp2p Switch
```JavaScript
const switch = require('libp2p-switch')
const sw = new switch(peerInfo , peerBook [, options])
```
If defined, `options` should be an object with the following keys and respective values:
- `denyTTL`: - number of ms a peer should not be dialable to after it errors. Each successive deny will increase the TTL from the base value. Defaults to 5 minutes
- `denyAttempts`: - number of times a peer can be denied before they are permanently denied. Defaults to 5.
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `100`
- `maxColdCalls`: - number of queued cold calls that are allowed. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
- `computeThrottleTimeout`: Throttle timeout, in miliseconds. Defaults to `2000`,
- `movingAverageIntervals`: Array containin the intervals, in miliseconds, for which moving averages are calculated. Defaults to:
```js
[
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
]
```
### Private Networks
libp2p-switch supports private networking. In order to enabled private networks, the `switch.protector` must be
set and must contain a `protect` method. You can see an example of this in the [private network
tests]([./test/pnet.node.js]).
## API
- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
- peerBook is a [PeerBook](https://github.com/libp2p/js-peer-book) object that stores all the known peers.
### `switch.connection`
##### `switch.connection.addUpgrade()`
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) specification.
> **WIP**
##### `switch.connection.addStreamMuxer(muxer)`
Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.
- `muxer`
##### `switch.connection.reuse()`
Enable the identify protocol.
##### `switch.connection.crypto([tag, encrypt])`
Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.
You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this
```js
const secio = require('libp2p-secio')
switch.connection.crypto(secio.tag, secio.encrypt)
```
##### `switch.connection.enableCircuitRelay(options, callback)`
Enable circuit relaying.
- `options`
- enabled - activates relay dialing and listening functionality
- hop - an object with two properties
- enabled - enables circuit relaying
- active - is it an active or passive relay (default false)
- `callback`
### `switch.dial(peer, protocol, callback)`
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `protocol`
- `callback`
### `switch.dialFSM(peer, protocol, callback)`
works like dial, but calls back with a [Connection State Machine](#connection-state-machine)
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)
#### Connection State Machine
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.
### `switch.dialer.connect(peer, options, callback)`
a low priority dial to the provided peer. Calls to `dial` and `dialFSM` will take priority. This should be used when an application only wishes to establish connections to new peers, such as during peer discovery when there is a low peer count. Currently, anything greater than the HIGH_PRIORITY (10) will be placed into the cold call queue, and anything less than or equal to the HIGH_PRIORITY will be added to the normal queue.
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `options`: Optional
- `options.priority`: Number of the priority of the dial, defaults to 20.
- `options.useFSM`: Boolean of whether or not to callback with a [Connection State Machine](#connection-state-machine)
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)
##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) will be emitted.
- `close`: emitted when the connection has closed.
### `switch.handle(protocol, handlerFunc, matchFunc)`
Handle a new protocol.
- `protocol`
- `handlerFunc` - function called when we receive a dial on `protocol. Signature must be `function (protocol, conn) {}`
- `matchFunc` - matchFunc for multistream-select
### `switch.hangUp(peer, callback)`
Hang up the muxed connection we have with the peer.
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `callback`
### `switch.on('error', (err) => {})`
Emitted when the switch encounters an error.
- `err`: instance of [Error][]
### `switch.on('peer-mux-established', (peer) => {})`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.
### `switch.on('peer-mux-closed', (peer) => {})`
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection with.
### `switch.on('connection:start', (peer) => {})`
This will be triggered anytime a new connection is created.
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just started a connection with.
### `switch.on('connection:end', (peer) => {})`
This will be triggered anytime an existing connection, regardless of state, is removed from the switch's internal connection tracking.
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a connection with.
### `switch.on('start', () => {})`
Emitted when the switch has successfully started.
### `switch.on('stop', () => {})`
Emitted when the switch has successfully stopped.
### `switch.start(callback)`
Start listening on all added transports that are available on the current `peerInfo`.
### `switch.stop(callback)`
Close all the listeners and muxers.
- `callback`
### Stats API
##### `switch.stats.emit('update')`
Every time any stat value changes, this object emits an `update` event.
#### Global stats
##### `switch.stats.global.snapshot`
Should return a stats snapshot, which is an object containing the following keys and respective values:
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
##### `switch.stats.global.movingAverages`
Returns an object containing the following keys:
- dataSent
- dataReceived
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
#### Per-transport stats
##### `switch.stats.transports()`
Returns an array containing the tags (string) for each observed transport.
##### `switch.stats.forTransport(transportTag).snapshot`
Should return a stats snapshot, which is an object containing the following keys and respective values:
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
##### `switch.stats.forTransport(transportTag).movingAverages`
Returns an object containing the following keys:
dataSent
dataReceived
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
#### Per-protocol stats
##### `switch.stats.protocols()`
Returns an array containing the tags (string) for each observed protocol.
##### `switch.stats.forProtocol(protocolTag).snapshot`
Should return a stats snapshot, which is an object containing the following keys and respective values:
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
##### `switch.stats.forProtocol(protocolTag).movingAverages`
Returns an object containing the following keys:
- dataSent
- dataReceived
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
#### Per-peer stats
##### `switch.stats.peers()`
Returns an array containing the peerIDs (B58-encoded string) for each observed peer.
##### `switch.stats.forPeer(peerId:String).snapshot`
Should return a stats snapshot, which is an object containing the following keys and respective values:
- dataSent: amount of bytes sent, [Big](https://github.com/MikeMcl/big.js#readme) number
- dataReceived: amount of bytes received, [Big](https://github.com/MikeMcl/big.js#readme) number
##### `switch.stats.forPeer(peerId:String).movingAverages`
Returns an object containing the following keys:
- dataSent
- dataReceived
Each one of them contains an object that has a key for each interval (`60000`, `300000` and `900000` miliseconds).
Each one of these values is [an exponential moving-average instance](https://github.com/pgte/moving-average#readme).
#### Stats update interval
Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.
### `switch.unhandle(protocol)`
Unhandle a protocol.
- `protocol`
### Internal Transports API
##### `switch.transport.add(key, transport, options)`
libp2p-switch expects transports that implement [interface-transport](https://github.com/libp2p/interface-transport). For example [libp2p-tcp](https://github.com/libp2p/js-libp2p-tcp).
- `key` - the transport identifier.
- `transport` -
- `options` -
##### `switch.transport.dial(key, multiaddrs, callback)`
Dial to a peer on a specific transport.
- `key`
- `multiaddrs`
- `callback`
##### `switch.transport.listen(key, options, handler, callback)`
Set a transport to start listening mode.
- `key`
- `options`
- `handler`
- `callback`
##### `switch.transport.close(key, callback)`
Close the listeners of a given transport.
- `key`
- `callback`
## Design Notes
### Multitransport
libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through.
In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/libp2p/interface-transport) spec.
### Connection upgrades
Each connection in libp2p follows the [interface-connection](https://github.com/libp2p/js-interfaces/tree/master/src/connection) spec. This design decision enables libp2p to have upgradable transports.
We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it.
Types of upgrades to a connection:
- encrypted channel (with TLS for e.g)
- congestion flow (some transports don't have it by default)
- multipath (open several connections and abstract it as a single connection)
- simulcast (still really thinking this one through, it might be interesting to send a packet through different connections under some hard network circumstances)
- stream-muxer - this a special case, because once we upgrade a connection to a stream-muxer, we can open more streams (multiplex them) on a single stream, also enabling us to reuse the underlying dialed transport
We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol)
### Identify
Identify is a protocol that switchs mounts on top of itself, to identify the connections between any two peers. E.g:
- a) peer A dials a conn to peer B
- b) that conn gets upgraded to a stream multiplexer that both peers agree
- c) peer B executes de identify protocol
- d) peer B now can open streams to peer A, knowing which is the
identity of peer A
In addition to this, we also share the "observed addresses" by the other peer, which is extremely useful information for different kinds of network topologies.
### Notes
To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as:
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex
- transport - something that as a dial/listen interface and return objs that implement a connection interface
### This module uses `pull-streams`
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
You can learn more about pull-streams at:
- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)
#### Converting `pull-streams` to Node.js Streams
If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/pull-stream/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:
```js
const pullToStream = require('pull-stream-to-stream')
const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```
To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.

View File

@ -1,126 +0,0 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const withIs = require('class-is')
class BaseConnection extends EventEmitter {
constructor ({ _switch, name }) {
super()
this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
this.log.error = debug(`libp2p:conn:${name}:error`)
}
/**
* Puts the state into its disconnecting flow
*
* @param {Error} err Will be emitted if provided
* @returns {void}
*/
close (err) {
if (this._state._state === 'DISCONNECTING') return
this.log('closing connection to %s', this.theirB58Id)
if (err && this._events.error) {
this.emit('error', err)
}
this._state('disconnect')
}
emit (eventName, ...args) {
if (eventName === 'error' && !this._events.error) {
this.log.error(...args)
} else {
super.emit(eventName, ...args)
}
}
/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}
/**
* Puts the state into encrypting mode
*
* @returns {void}
*/
encrypt () {
this._state('encrypt')
}
/**
* Puts the state into privatizing mode
*
* @returns {void}
*/
protect () {
this._state('privatize')
}
/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}
/**
* Event handler for disconnected.
*
* @fires BaseConnection#close
* @returns {void}
*/
_onDisconnected () {
this.switch.connection.remove(this)
this.log('disconnected from %s', this.theirB58Id)
this.emit('close')
this.removeAllListeners()
}
/**
* Event handler for privatized
*
* @fires BaseConnection#private
* @returns {void}
*/
_onPrivatized () {
this.emit('private', this.conn)
}
/**
* Wraps this.conn with the Switch.protector for private connections
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onPrivatizing () {
if (!this.switch.protector) {
return this._state('done')
}
this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
return this.close(err)
}
this.log('successfully privatized conn to %s', this.theirB58Id)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
}
}
module.exports = withIs(BaseConnection, {
className: 'BaseConnection',
symbolName: 'libp2p-switch/BaseConnection'
})

View File

@ -1,47 +0,0 @@
'use strict'
const debug = require('debug')
const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')
function listener (_switch) {
const log = debug('libp2p:switch:listener')
/**
* Takes a transport key and returns a connection handler function
*
* @param {string} transportKey The key of the transport to handle connections for
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return function (transportKey, handler) {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return function (conn) {
log('received incoming connection for transport %s', transportKey)
conn.getPeerInfo((_, peerInfo) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
const connFSM = new IncomingConnection({ connection, _switch, transportKey, peerInfo })
connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())
connFSM.protect()
})
}
}
}
module.exports = listener

View File

@ -1,115 +0,0 @@
'use strict'
const FSM = require('fsm-event')
const multistream = require('multistream-select')
const withIs = require('class-is')
const BaseConnection = require('./base')
class IncomingConnectionFSM extends BaseConnection {
constructor ({ connection, _switch, transportKey, peerInfo }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = peerInfo || null
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
this.msListener = new multistream.Listener()
this._state = FSM('DIALED', {
DISCONNECTED: {
disconnect: 'DISCONNECTED'
},
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
ENCRYPTING: { // Encrypting the base connection
done: 'ENCRYPTED',
disconnect: 'DISCONNECTING'
},
ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting
upgrade: 'UPGRADING',
disconnect: 'DISCONNECTING'
},
UPGRADING: { // Attempting to upgrade the connection with muxers
done: 'MUXED'
},
MUXED: {
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED'
}
})
this._state.on('DISCONNECTED', () => this._onDisconnected())
this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log('successfully encrypted connection to %s', this.theirB58Id || 'unknown peer')
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log('successfully muxed connection to %s', this.theirB58Id || 'unknown peer')
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
this._state('done')
})
}
/**
* Attempts to encrypt `this.conn` with the Switch's crypto.
*
* @private
* @fires IncomingConnectionFSM#error
* @returns {void}
*/
_onEncrypting () {
this.log('encrypting connection via %s', this.switch.crypto.tag)
this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
if (err) {
return this.close(err)
}
this.conn.getPeerInfo((_, peerInfo) => {
this.theirPeerInfo = peerInfo
this._state('done')
})
})
}, null)
// Start handling the connection
this.msListener.handle(this.conn, (err) => {
if (err) {
this.emit('crypto handshaking failed', err)
}
})
}
_onUpgrading () {
this.log('adding the protocol muxer to the connection')
this.protocolMuxer(this.conn, this.msListener)
this._state('done')
}
}
module.exports = withIs(IncomingConnectionFSM, {
className: 'IncomingConnectionFSM',
symbolName: 'libp2p-switch/IncomingConnectionFSM'
})

View File

@ -1,498 +0,0 @@
'use strict'
const FSM = require('fsm-event')
const Circuit = require('../../circuit')
const multistream = require('multistream-select')
const withIs = require('class-is')
const BaseConnection = require('./base')
const parallel = require('async/parallel')
const nextTick = require('async/nextTick')
const identify = require('../../identify')
const errCode = require('err-code')
const { msHandle, msSelect, identifyDialer } = require('../utils')
const observeConnection = require('../observe-connection')
const {
CONNECTION_FAILED,
DIAL_SELF,
INVALID_STATE_TRANSITION,
NO_TRANSPORTS_REGISTERED,
maybeUnexpectedEnd
} = require('../errors')
/**
* @typedef {Object} ConnectionOptions
* @property {Switch} _switch Our switch instance
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
* @property {Muxer} muxer Optional - A muxed connection
* @property {Connection} conn Optional - The base connection
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
*/
/**
* ConnectionFSM handles the complex logic of managing a connection
* between peers. ConnectionFSM is internally composed of a state machine
* to help improve the usability and debuggability of connections. The
* state machine also helps to improve the ability to handle dial backoff,
* coalescing dials and dial locks.
*/
class ConnectionFSM extends BaseConnection {
/**
* @param {ConnectionOptions} connectionOptions
* @constructor
*/
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
super({
_switch,
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.theirPeerInfo = peerInfo
this.theirB58Id = this.theirPeerInfo.id.toB58String()
this.conn = conn // The base connection
this.muxer = muxer // The upgraded/muxed connection
let startState = 'DISCONNECTED'
if (this.muxer) {
startState = 'MUXED'
}
this._state = FSM(startState, {
DISCONNECTED: { // No active connections exist for the peer
dial: 'DIALING',
disconnect: 'DISCONNECTED',
done: 'DISCONNECTED'
},
DIALING: { // Creating an initial connection
abort: 'ABORTED',
// emit events for different transport dials?
done: 'DIALED',
error: 'ERRORED',
disconnect: 'DISCONNECTING'
},
DIALED: { // Base connection to peer established
encrypt: 'ENCRYPTING',
privatize: 'PRIVATIZING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
abort: 'ABORTED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
ENCRYPTING: { // Encrypting the base connection
done: 'ENCRYPTED',
error: 'ERRORED',
disconnect: 'DISCONNECTING'
},
ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting
upgrade: 'UPGRADING',
disconnect: 'DISCONNECTING'
},
UPGRADING: { // Attempting to upgrade the connection with muxers
stop: 'CONNECTED', // If we cannot mux, stop upgrading
done: 'MUXED',
error: 'ERRORED',
disconnect: 'DISCONNECTING'
},
MUXED: {
disconnect: 'DISCONNECTING'
},
CONNECTED: { // A non muxed connection is established
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED',
disconnect: 'DISCONNECTING'
},
ABORTED: { }, // A severe event occurred
ERRORED: { // An error occurred, but future dials may be allowed
disconnect: 'DISCONNECTING' // There could be multiple options here, but this is a likely action
}
})
this._state.on('DISCONNECTED', () => this._onDisconnected())
this._state.on('DIALING', () => this._onDialing())
this._state.on('DIALED', () => this._onDialed())
this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log('successfully encrypted connection to %s', this.theirB58Id)
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log('successfully muxed connection to %s', this.theirB58Id)
delete this.switch.conns[this.theirB58Id]
this.emit('muxed', this.muxer)
})
this._state.on('CONNECTED', () => {
this.log('unmuxed connection opened to %s', this.theirB58Id)
this.emit('unmuxed', this.conn)
})
this._state.on('DISCONNECTING', () => this._onDisconnecting())
this._state.on('ABORTED', () => this._onAborted())
this._state.on('ERRORED', () => this._onErrored())
this._state.on('error', (err) => this._onStateError(err))
}
/**
* Puts the state into dialing mode
*
* @fires ConnectionFSM#Error May emit a DIAL_SELF error
* @returns {void}
*/
dial () {
if (this.theirB58Id === this.ourPeerInfo.id.toB58String()) {
return this.emit('error', DIAL_SELF())
} else if (this.getState() === 'DIALING') {
return this.log('attempted to dial while already dialing, ignoring')
}
this._state('dial')
}
/**
* Initiates a handshake for the given protocol
*
* @param {string} protocol The protocol to negotiate
* @param {function(Error, Connection)} callback
* @returns {void}
*/
shake (protocol, callback) {
// If there is no protocol set yet, don't perform the handshake
if (!protocol) {
return callback(null, null)
}
if (this.muxer && this.muxer.newStream) {
return this.muxer.newStream((err, stream) => {
if (err) {
return callback(err, null)
}
this.log('created new stream to %s', this.theirB58Id)
this._protocolHandshake(protocol, stream, callback)
})
}
this._protocolHandshake(protocol, this.conn, callback)
}
/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}
/**
* Event handler for dialing. Transitions state when successful.
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onDialing () {
this.log('dialing %s', this.theirB58Id)
if (!this.switch.hasTransports()) {
return this.close(NO_TRANSPORTS_REGISTERED())
}
const tKeys = this.switch.availableTransports(this.theirPeerInfo)
const circuitEnabled = Boolean(this.switch.transports[Circuit.tag])
if (circuitEnabled && !tKeys.includes(Circuit.tag)) {
tKeys.push(Circuit.tag)
}
const nextTransport = (key) => {
const transport = key
if (!transport) {
if (!circuitEnabled) {
return this.close(
CONNECTION_FAILED(`Circuit not enabled and all transports failed to dial peer ${this.theirB58Id}!`)
)
}
return this.close(
CONNECTION_FAILED(`No available transports to dial peer ${this.theirB58Id}!`)
)
}
if (transport === Circuit.tag) {
this.theirPeerInfo.multiaddrs.add(`/p2p-circuit/p2p/${this.theirB58Id}`)
}
this.log('dialing transport %s', transport)
this.switch.transport.dial(transport, this.theirPeerInfo, (errors, _conn) => {
if (errors) {
this.emit('error:connection_attempt_failed', errors)
this.log(errors)
return nextTransport(tKeys.shift())
}
this.conn = observeConnection(transport, null, _conn, this.switch.observer)
this._state('done')
})
}
nextTransport(tKeys.shift())
}
/**
* Once a connection has been successfully dialed, the connection
* will be privatized or encrypted depending on the presence of the
* Switch.protector.
*
* @returns {void}
*/
_onDialed () {
this.log('successfully dialed %s', this.theirB58Id)
this.emit('connected', this.conn)
}
/**
* Event handler for disconnecting. Handles any needed cleanup
*
* @returns {void}
*/
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))
delete this.switch.conns[this.theirB58Id]
const tasks = []
// Clean up stored connections
if (this.muxer) {
tasks.push((cb) => {
this.muxer.end(() => {
delete this.muxer
cb()
})
})
}
// If we have the base connection, abort it
// Ignore abort errors, since we're closing
if (this.conn) {
try {
this.conn.source.abort()
} catch (_) { }
delete this.conn
}
parallel(tasks, () => {
this._state('done')
})
}
/**
* Attempts to encrypt `this.conn` with the Switch's crypto.
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onEncrypting () {
const msDialer = new multistream.Dialer()
msDialer.handle(this.conn, (err) => {
if (err) {
return this.close(maybeUnexpectedEnd(err))
}
this.log('selecting crypto %s to %s', this.switch.crypto.tag, this.theirB58Id)
msDialer.select(this.switch.crypto.tag, (err, _conn) => {
if (err) {
return this.close(maybeUnexpectedEnd(err))
}
const observedConn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
const encryptedConn = this.switch.crypto.encrypt(this.ourPeerInfo.id, observedConn, this.theirPeerInfo.id, (err) => {
if (err) {
return this.close(err)
}
this.conn = encryptedConn
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
})
})
}
/**
* Iterates over each Muxer on the Switch and attempts to upgrade
* the given `connection`. Successful muxed connections will be stored
* on the Switch.muxedConns with `b58Id` as their key for future reference.
*
* @private
* @returns {void}
*/
_onUpgrading () {
const muxers = Object.keys(this.switch.muxers)
this.log('upgrading connection to %s', this.theirB58Id)
if (muxers.length === 0) {
return this._state('stop')
}
const msDialer = new multistream.Dialer()
msDialer.handle(this.conn, (err) => {
if (err) {
return this._didUpgrade(err)
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
const nextMuxer = (key) => {
this.log('selecting %s', key)
msDialer.select(key, (err, _conn) => {
if (err) {
if (muxers.length === 0) {
return this._didUpgrade(err)
}
return nextMuxer(muxers.shift())
}
// observe muxed connections
const conn = observeConnection(null, key, _conn, this.switch.observer)
this.muxer = this.switch.muxers[key].dialer(conn)
this.muxer.once('close', () => {
this.close()
})
// For incoming streams, in case identify is on
this.muxer.on('stream', (conn) => {
this.log('new stream created via muxer to %s', this.theirB58Id)
conn.setPeerInfo(this.theirPeerInfo)
this.switch.protocolMuxer(null)(conn)
})
this._didUpgrade(null)
// Run identify on the connection
if (this.switch.identify) {
this._identify((err, results) => {
if (err) {
return this.close(err)
}
this.theirPeerInfo = this.switch._peerBook.put(results.peerInfo)
})
}
})
}
nextMuxer(muxers.shift())
})
}
/**
* Runs the identify protocol on the connection
* @private
* @param {function(error, { PeerInfo })} callback
* @returns {void}
*/
_identify (callback) {
if (!this.muxer) {
return nextTick(callback, errCode('The connection was already closed', 'ERR_CONNECTION_CLOSED'))
}
this.muxer.newStream(async (err, conn) => {
if (err) return callback(err)
const ms = new multistream.Dialer()
let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, this.theirPeerInfo)
} catch (err) {
return callback(err)
}
callback(null, results)
})
}
/**
* Analyses the given error, if it exists, to determine where the state machine
* needs to go.
*
* @param {Error} err
* @returns {void}
*/
_didUpgrade (err) {
if (err) {
this.log('Error upgrading connection:', err)
this.switch.conns[this.theirB58Id] = this
this.emit('error:upgrade_failed', err)
// Cant upgrade, hold the encrypted connection
return this._state('stop')
}
// move the state machine forward
this._state('done')
}
/**
* Performs the protocol handshake for the given protocol
* over the given connection. The resulting error or connection
* will be returned via the callback.
*
* @private
* @param {string} protocol
* @param {Connection} connection
* @param {function(Error, Connection)} callback
* @returns {void}
*/
_protocolHandshake (protocol, connection, callback) {
const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
if (err) {
return callback(err, null)
}
msDialer.select(protocol, (err, _conn) => {
if (err) {
this.log('could not perform protocol handshake:', err)
return callback(err, null)
}
const conn = observeConnection(null, protocol, _conn, this.switch.observer)
this.log('successfully performed handshake of %s to %s', protocol, this.theirB58Id)
this.emit('connection', conn)
callback(null, conn)
})
})
}
/**
* Event handler for state transition errors
*
* @param {Error} err
* @returns {void}
*/
_onStateError (err) {
this.emit('error', INVALID_STATE_TRANSITION(err))
this.log(err)
}
}
module.exports = withIs(ConnectionFSM, {
className: 'ConnectionFSM',
symbolName: 'libp2p-switch/ConnectionFSM'
})

View File

@ -1,289 +0,0 @@
'use strict'
const identify = require('../../identify')
const multistream = require('multistream-select')
const debug = require('debug')
const log = debug('libp2p:switch:conn-manager')
const once = require('once')
const ConnectionFSM = require('../connection')
const { msHandle, msSelect, identifyDialer } = require('../utils')
const Circuit = require('../../circuit')
const plaintext = require('../plaintext')
/**
* Contains methods for binding handlers to the Switch
* in order to better manage its connections.
*/
class ConnectionManager {
constructor (_switch) {
this.switch = _switch
this.connections = {}
}
/**
* Adds the connection for tracking if it's not already added
* @private
* @param {ConnectionFSM} connection
* @returns {void}
*/
add (connection) {
this.connections[connection.theirB58Id] = this.connections[connection.theirB58Id] || []
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
this.switch.emit('connection:start', connection.theirPeerInfo)
if (connection.getState() === 'MUXED') {
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
// Clear the denylist of the peer
this.switch.dialer.clearDenylist(connection.theirPeerInfo)
} else {
connection.once('muxed', () => {
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
// Clear the denylist of the peer
this.switch.dialer.clearDenylist(connection.theirPeerInfo)
})
}
}
}
/**
* Gets the connection from the list if it exists
* @private
* @param {ConnectionFSM} connection
* @returns {ConnectionFSM|null} The found connection or null
*/
get (connection) {
if (!this.connections[connection.theirB58Id]) return null
for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
return this.connections[connection.theirB58Id][i]
}
}
return null
}
/**
* Gets a connection associated with the given peer
* @private
* @param {string} peerId The peers id
* @returns {ConnectionFSM|null} The found connection or null
*/
getOne (peerId) {
if (this.connections[peerId]) {
// Only return muxed connections
for (var i = 0; i < this.connections[peerId].length; i++) {
if (this.connections[peerId][i].getState() === 'MUXED') {
return this.connections[peerId][i]
}
}
}
return null
}
/**
* Removes the connection from tracking
* @private
* @param {ConnectionFSM} connection The connection to remove
* @returns {void}
*/
remove (connection) {
// No record of the peer, disconnect it
if (!this.connections[connection.theirB58Id]) {
if (connection.theirPeerInfo) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
return
}
for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
if (this.connections[connection.theirB58Id][i] === connection) {
this.connections[connection.theirB58Id].splice(i, 1)
break
}
}
// The peer is fully disconnected
if (this.connections[connection.theirB58Id].length === 0) {
delete this.connections[connection.theirB58Id]
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
// A tracked connection was closed, let the world know
this.switch.emit('connection:end', connection.theirPeerInfo)
}
/**
* Returns all connections being tracked
* @private
* @returns {ConnectionFSM[]}
*/
getAll () {
let connections = []
for (const conns of Object.values(this.connections)) {
connections = [...connections, ...conns]
}
return connections
}
/**
* Returns all connections being tracked for a given peer id
* @private
* @param {string} peerId Stringified peer id
* @returns {ConnectionFSM[]}
*/
getAllById (peerId) {
return this.connections[peerId] || []
}
/**
* Adds a listener for the given `muxer` and creates a handler for it
* leveraging the Switch.protocolMuxer handler factory
*
* @param {Muxer} muxer
* @returns {void}
*/
addStreamMuxer (muxer) {
// for dialing
this.switch.muxers[muxer.multicodec] = muxer
// for listening
this.switch.handle(muxer.multicodec, (protocol, conn) => {
const muxedConn = muxer.listener(conn)
muxedConn.on('stream', this.switch.protocolMuxer(null))
// If identify is enabled
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (this.switch.identify) {
// Get the peer info from the crypto exchange
conn.getPeerInfo((err, cryptoPI) => {
if (err || !cryptoPI) {
log('crypto peerInfo wasnt found')
}
// overload peerInfo to use Identify instead
conn.getPeerInfo = async (callback) => {
const conn = muxedConn.newStream()
const ms = new multistream.Dialer()
callback = once(callback)
let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, cryptoPI)
} catch (err) {
return muxedConn.end(() => {
callback(err, null)
})
}
const { peerInfo } = results
if (peerInfo) {
conn.setPeerInfo(peerInfo)
}
callback(null, peerInfo)
}
conn.getPeerInfo((err, peerInfo) => {
/* eslint no-warning-comments: off */
if (err) {
return log('identify not successful')
}
const b58Str = peerInfo.id.toB58String()
peerInfo = this.switch._peerBook.put(peerInfo)
const connection = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: muxedConn,
conn: conn,
type: 'inc'
})
this.switch.connection.add(connection)
// Only update if it's not already connected
if (!peerInfo.isConnected()) {
if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
} else {
// for the case of websockets in the browser, where peers have
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
}
}
muxedConn.once('close', () => {
connection.close()
})
})
})
}
return conn
})
}
/**
* Adds the `encrypt` handler for the given `tag` and also sets the
* Switch's crypto to passed `encrypt` function
*
* @param {String} tag
* @param {function(PeerID, Connection, PeerId, Callback)} encrypt
* @returns {void}
*/
crypto (tag, encrypt) {
if (!tag && !encrypt) {
tag = plaintext.tag
encrypt = plaintext.encrypt
}
this.switch.crypto = { tag, encrypt }
}
/**
* If config.enabled is true, a Circuit relay will be added to the
* available Switch transports.
*
* @param {any} config
* @returns {void}
*/
enableCircuitRelay (config) {
config = config || {}
if (config.enabled) {
if (!config.hop) {
Object.assign(config, { hop: { enabled: false, active: false } })
}
this.switch.transport.add(Circuit.tag, new Circuit(this.switch, config))
}
}
/**
* Sets identify to true on the Switch and performs handshakes
* for libp2p-identify leveraging the Switch's muxer.
*
* @returns {void}
*/
reuse () {
this.switch.identify = true
this.switch.handle(identify.multicodec, (protocol, conn) => {
identify.listener(conn, this.switch._peerInfo)
})
}
}
module.exports = ConnectionManager

View File

@ -1,12 +0,0 @@
'use strict'
module.exports = {
DENY_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again
DENY_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently denied
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials
QUARTER_HOUR: 15 * 60e3,
PRIORITY_HIGH: 10,
PRIORITY_LOW: 20
}

View File

@ -1,119 +0,0 @@
'use strict'
const DialQueueManager = require('./queueManager')
const { getPeerInfo } = require('../../get-peer-info')
const {
DENY_ATTEMPTS,
DENY_TTL,
MAX_COLD_CALLS,
MAX_PARALLEL_DIALS,
PRIORITY_HIGH,
PRIORITY_LOW
} = require('../constants')
module.exports = function (_switch) {
const dialQueueManager = new DialQueueManager(_switch)
_switch.state.on('STARTED:enter', start)
_switch.state.on('STOPPING:enter', stop)
/**
* @param {DialRequest} dialRequest
* @returns {void}
*/
function _dial ({ peerInfo, protocol, options, callback }) {
if (typeof protocol === 'function') {
callback = protocol
protocol = null
}
try {
peerInfo = getPeerInfo(peerInfo, _switch._peerBook)
} catch (err) {
return callback(err)
}
// Add it to the queue, it will automatically get executed
dialQueueManager.add({ peerInfo, protocol, options, callback })
}
/**
* Starts the `DialQueueManager`
*
* @param {function} callback
*/
function start (callback) {
dialQueueManager.start()
callback()
}
/**
* Aborts all dials that are queued. This should
* only be used when the Switch is being stopped
*
* @param {function} callback
*/
function stop (callback) {
dialQueueManager.stop()
callback()
}
/**
* Clears the denylist for a given peer
* @param {PeerInfo} peerInfo
*/
function clearDenylist (peerInfo) {
dialQueueManager.clearDenylist(peerInfo)
}
/**
* Attempts to establish a connection to the given `peerInfo` at
* a lower priority than a standard dial.
* @param {PeerInfo} peerInfo
* @param {object} options
* @param {boolean} options.useFSM Whether or not to return a `ConnectionFSM`. Defaults to false.
* @param {number} options.priority Lowest priority goes first. Defaults to 20.
* @param {function(Error, Connection)} callback
*/
function connect (peerInfo, options, callback) {
if (typeof options === 'function') {
callback = options
options = null
}
options = { useFSM: false, priority: PRIORITY_LOW, ...options }
_dial({ peerInfo, protocol: null, options, callback })
}
/**
* Adds the dial request to the queue for the given `peerInfo`
* The request will be added with a high priority (10).
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, Connection)} callback
*/
function dial (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, options: { useFSM: false, priority: PRIORITY_HIGH }, callback })
}
/**
* Behaves like dial, except it calls back with a ConnectionFSM
*
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @param {function(Error, ConnectionFSM)} callback
*/
function dialFSM (peerInfo, protocol, callback) {
_dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback })
}
return {
connect,
dial,
dialFSM,
clearDenylist,
DENY_ATTEMPTS: isNaN(_switch._options.denyAttempts) ? DENY_ATTEMPTS : _switch._options.denyAttempts,
DENY_TTL: isNaN(_switch._options.denyTTL) ? DENY_TTL : _switch._options.denyTTL,
MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls,
MAX_PARALLEL_DIALS: isNaN(_switch._options.maxParallelDials) ? MAX_PARALLEL_DIALS : _switch._options.maxParallelDials
}
}

View File

@ -1,281 +0,0 @@
'use strict'
const ConnectionFSM = require('../connection')
const { DIAL_ABORTED, ERR_DENIED } = require('../errors')
const nextTick = require('async/nextTick')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:switch:dial')
log.error = debug('libp2p:switch:dial:error')
/**
* Components required to execute a dial
* @typedef {Object} DialRequest
* @property {PeerInfo} peerInfo - The peer to dial to
* @property {string} [protocol] - The protocol to create a stream for
* @property {object} options
* @property {boolean} options.useFSM - If `callback` should return a ConnectionFSM
* @property {number} options.priority - The priority of the dial
* @property {function(Error, Connection|ConnectionFSM)} callback
*/
/**
* @typedef {Object} NewConnection
* @property {ConnectionFSM} connectionFSM
* @property {boolean} didCreate
*/
/**
* Attempts to create a new connection or stream (when muxed),
* via negotiation of the given `protocol`. If no `protocol` is
* provided, no action will be taken and `callback` will be called
* immediately with no error or values.
*
* @param {object} options
* @param {string} options.protocol
* @param {ConnectionFSM} options.connection
* @param {function(Error, Connection)} options.callback
* @returns {void}
*/
function createConnectionWithProtocol ({ protocol, connection, callback }) {
if (!protocol) {
return callback()
}
connection.shake(protocol, (err, conn) => {
if (!conn) {
return callback(err)
}
conn.setPeerInfo(connection.theirPeerInfo)
callback(null, conn)
})
}
/**
* A convenience array wrapper for controlling
* a per peer queue
*
* @returns {Queue}
*/
class Queue {
/**
* @constructor
* @param {string} peerId
* @param {Switch} _switch
* @param {function(string)} onStopped Called when the queue stops
*/
constructor (peerId, _switch, onStopped) {
this.id = peerId
this.switch = _switch
this._queue = []
this.denylisted = null
this.denylistCount = 0
this.isRunning = false
this.onStopped = onStopped
}
get length () {
return this._queue.length
}
/**
* Adds the dial request to the queue. The queue is not automatically started
* @param {string} protocol
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
* @param {function(Error, Connection)} callback
* @returns {void}
*/
add (protocol, useFSM, callback) {
if (!this.isDialAllowed()) {
return nextTick(callback, ERR_DENIED())
}
this._queue.push({ protocol, useFSM, callback })
}
/**
* Determines whether or not dialing is currently allowed
* @returns {boolean}
*/
isDialAllowed () {
if (this.denylisted) {
// If the deny ttl has passed, reset it
if (Date.now() > this.denylisted) {
this.denylisted = null
return true
}
// Dial is not allowed
return false
}
return true
}
/**
* Starts the queue. If the queue was started `true` will be returned.
* If the queue was already running `false` is returned.
* @returns {boolean}
*/
start () {
if (!this.isRunning) {
log('starting dial queue to %s', this.id)
this.isRunning = true
this._run()
return true
}
return false
}
/**
* Stops the queue
*/
stop () {
if (this.isRunning) {
log('stopping dial queue to %s', this.id)
this.isRunning = false
this.onStopped(this.id)
}
}
/**
* Stops the queue and errors the callback for each dial request
*/
abort () {
while (this.length > 0) {
const dial = this._queue.shift()
dial.callback(DIAL_ABORTED())
}
this.stop()
}
/**
* Marks the queue as denylisted. The queue will be immediately aborted.
* @returns {void}
*/
denylist () {
this.denylistCount++
if (this.denylistCount >= this.switch.dialer.DENY_ATTEMPTS) {
this.denylisted = Infinity
return
}
let ttl = this.switch.dialer.DENY_TTL * Math.pow(this.denylistCount, 3)
const minTTL = ttl * 0.9
const maxTTL = ttl * 1.1
// Add a random jitter of 20% to the ttl
ttl = Math.floor(Math.random() * (maxTTL - minTTL) + minTTL)
this.denylisted = Date.now() + ttl
this.abort()
}
/**
* Attempts to find a muxed connection for the given peer. If one
* isn't found, a new one will be created.
*
* Returns an array containing two items. The ConnectionFSM and wether
* or not the ConnectionFSM was just created. The latter can be used
* to determine dialing needs.
*
* @private
* @param {PeerInfo} peerInfo
* @returns {NewConnection}
*/
_getOrCreateConnection (peerInfo) {
let connectionFSM = this.switch.connection.getOne(this.id)
let didCreate = false
if (!connectionFSM) {
connectionFSM = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: null,
conn: null
})
this.switch.connection.add(connectionFSM)
// Add control events and start the dialer
connectionFSM.once('connected', () => connectionFSM.protect())
connectionFSM.once('private', () => connectionFSM.encrypt())
connectionFSM.once('encrypted', () => connectionFSM.upgrade())
didCreate = true
}
return { connectionFSM, didCreate }
}
/**
* Executes the next dial in the queue for the given peer
* @private
* @returns {void}
*/
_run () {
// If we have no items in the queue or we're stopped, exit
if (this.length < 1 || !this.isRunning) {
log('stopping the queue for %s', this.id)
return this.stop()
}
const next = once(() => {
log('starting next dial to %s', this.id)
this._run()
})
const peerInfo = this.switch._peerBook.get(this.id)
const queuedDial = this._queue.shift()
const { connectionFSM, didCreate } = this._getOrCreateConnection(peerInfo)
// If the dial expects a ConnectionFSM, we can provide that back now
if (queuedDial.useFSM) {
nextTick(queuedDial.callback, null, connectionFSM)
}
// If we can handshake protocols, get a new stream and call run again
if (['MUXED', 'CONNECTED'].includes(connectionFSM.getState())) {
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
return
}
// If we error, error the queued dial
// In the future, it may be desired to error the other queued dials,
// depending on the error.
connectionFSM.once('error', (err) => {
queuedDial.callback(err)
// Dont denylist peers we have identified and that we are connected to
if (peerInfo.protocols.size > 0 && peerInfo.isConnected()) {
return
}
this.denylist()
})
connectionFSM.once('close', () => {
next()
})
// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.denylistCount = 0 // reset denylisting on good connections
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})
connectionFSM.once('unmuxed', () => {
this.denylistCount = 0
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})
// If we have a new connection, start dialing
if (didCreate) {
connectionFSM.dial()
}
}
}
module.exports = Queue

View File

@ -1,220 +0,0 @@
'use strict'
const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const nextTick = require('async/nextTick')
const retimer = require('retimer')
const { QUARTER_HOUR, PRIORITY_HIGH } = require('../constants')
const debug = require('debug')
const log = debug('libp2p:switch:dial:manager')
const noop = () => {}
class DialQueueManager {
/**
* @constructor
* @param {Switch} _switch
*/
constructor (_switch) {
this._queue = new Set()
this._coldCallQueue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR)
this.start()
}
/**
* Runs through all queues, aborts and removes them if they
* are no longer valid. A queue that is denylisted indefinitely,
* is considered no longer valid.
* @private
*/
_clean () {
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
// Clear if the queue has reached max denylist
if (dialQueue.denylisted === Infinity) {
dialQueue.abort()
delete this._queues[dialQueue.id]
return
}
// Keep track of denylisted queues
if (dialQueue.denylisted) return
// Clear if peer is no longer active
// To avoid reallocating memory, dont delete queues of
// connected peers, as these are highly likely to leverage the
// queues in the immediate term
if (!dialQueue.isRunning && dialQueue.length < 1) {
let isConnected = false
try {
const peerInfo = this.switch._peerBook.get(dialQueue.id)
isConnected = Boolean(peerInfo.isConnected())
} catch (_) {
// If we get an error, that means the peerbook doesnt have the peer
}
if (!isConnected) {
dialQueue.abort()
delete this._queues[dialQueue.id]
}
}
})
this._cleanInterval.reschedule(QUARTER_HOUR)
}
/**
* Allows the `DialQueueManager` to execute dials
*/
start () {
this.isRunning = true
}
/**
* Iterates over all items in the DialerQueue
* and executes there callback with an error.
*
* This causes the entire DialerQueue to be drained
*/
stop () {
this.isRunning = false
// Clear the general queue
this._queue.clear()
// Clear the cold call queue
this._coldCallQueue.clear()
this._cleanInterval.clear()
// Abort the individual peer queues
const queues = Object.values(this._queues)
queues.forEach(dialQueue => {
dialQueue.abort()
delete this._queues[dialQueue.id]
})
}
/**
* Adds the `dialRequest` to the queue and ensures queue is running
*
* @param {DialRequest} dialRequest
* @returns {void}
*/
add ({ peerInfo, protocol, options, callback }) {
callback = callback ? once(callback) : noop
// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
// Cold Call
if (options.priority > PRIORITY_HIGH) {
// If we have too many cold calls, abort the dial immediately
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS) {
return nextTick(callback, DIAL_ABORTED())
}
if (this._queue.has(targetQueue.id)) {
return nextTick(callback, DIAL_ABORTED())
}
}
targetQueue.add(protocol, options.useFSM, callback)
// If we're already connected to the peer, start the queue now
// While it might cause queues to go over the max parallel amount,
// it avoids denying peers we're already connected to
if (peerInfo.isConnected()) {
targetQueue.start()
return
}
// If dialing is not allowed, abort
if (!targetQueue.isDialAllowed()) {
return
}
// Add the id to its respective queue set if the queue isn't running
if (!targetQueue.isRunning) {
if (options.priority <= PRIORITY_HIGH) {
this._queue.add(targetQueue.id)
this._coldCallQueue.delete(targetQueue.id)
// Only add it to the cold queue if it's not in the normal queue
} else {
this._coldCallQueue.add(targetQueue.id)
}
}
this.run()
}
/**
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (!this.isRunning) return
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) {
let nextQueue = { done: true }
// Check the queue first and fall back to the cold call queue
if (this._queue.size > 0) {
nextQueue = this._queue.values().next()
this._queue.delete(nextQueue.value)
} else if (this._coldCallQueue.size > 0) {
nextQueue = this._coldCallQueue.values().next()
this._coldCallQueue.delete(nextQueue.value)
}
if (nextQueue.done) {
return
}
const targetQueue = this._queues[nextQueue.value]
if (!targetQueue) {
log('missing queue %s, maybe it was aborted?', nextQueue.value)
return
}
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
}
}
/**
* Will remove the `peerInfo` from the dial denylist
* @param {PeerInfo} peerInfo
*/
clearDenylist (peerInfo) {
const queue = this.getQueue(peerInfo)
queue.denylisted = null
queue.denylistCount = 0
}
/**
* A handler for when dialing queues stop. This will trigger
* `run()` in order to keep the queue processing.
* @private
* @param {string} id peer id of the queue that stopped
*/
_onQueueStopped (id) {
this._dialingQueues.delete(id)
this.run()
}
/**
* Returns the `Queue` for the given `peerInfo`
* @param {PeerInfo} peerInfo
* @returns {Queue}
*/
getQueue (peerInfo) {
const id = peerInfo.id.toB58String()
this._queues[id] = this._queues[id] || new Queue(id, this.switch, this._onQueueStopped.bind(this))
return this._queues[id]
}
}
module.exports = DialQueueManager

View File

@ -1,20 +0,0 @@
'use strict'
const errCode = require('err-code')
module.exports = {
CONNECTION_FAILED: (err) => errCode(err, 'CONNECTION_FAILED'),
DIAL_ABORTED: () => errCode('Dial was aborted', 'DIAL_ABORTED'),
ERR_DENIED: () => errCode('Dial is currently denied for this peer', 'ERR_DENIED'),
DIAL_SELF: () => errCode('A node cannot dial itself', 'DIAL_SELF'),
INVALID_STATE_TRANSITION: (err) => errCode(err, 'INVALID_STATE_TRANSITION'),
NO_TRANSPORTS_REGISTERED: () => errCode('No transports registered, dial not possible', 'NO_TRANSPORTS_REGISTERED'),
PROTECTOR_REQUIRED: () => errCode('No protector provided with private network enforced', 'PROTECTOR_REQUIRED'),
UNEXPECTED_END: () => errCode('Unexpected end of input from reader.', 'UNEXPECTED_END'),
maybeUnexpectedEnd: (err) => {
if (err === true) {
return module.exports.UNEXPECTED_END()
}
return err
}
}

View File

@ -1,274 +0,0 @@
'use strict'
const FSM = require('fsm-event')
const EventEmitter = require('events').EventEmitter
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const series = require('async/series')
const Circuit = require('../circuit')
const TransportManager = require('./transport')
const ConnectionManager = require('./connection/manager')
const { getPeerInfo } = require('../get-peer-info')
const getDialer = require('./dialer')
const connectionHandler = require('./connection/handler')
const ProtocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')
const Observer = require('./observer')
const Stats = require('./stats')
const assert = require('assert')
const Errors = require('./errors')
const debug = require('debug')
const log = debug('libp2p:switch')
log.error = debug('libp2p:switch:error')
/**
* @fires Switch#stop Triggered when the switch has stopped
* @fires Switch#start Triggered when the switch has started
* @fires Switch#error Triggered whenever an error occurs
*/
class Switch extends EventEmitter {
constructor (peerInfo, peerBook, options) {
super()
assert(peerInfo, 'You must provide a `peerInfo`')
assert(peerBook, 'You must provide a `peerBook`')
this._peerInfo = peerInfo
this._peerBook = peerBook
this._options = options || {}
this.setMaxListeners(Infinity)
// transports --
// { key: transport }; e.g { tcp: <tcp> }
this.transports = {}
// connections --
// { peerIdB58: { conn: <conn> }}
this.conns = {}
// { protocol: handler }
this.protocols = {}
// { muxerCodec: <muxer> } e.g { '/spdy/0.3.1': spdy }
this.muxers = {}
// is the Identify protocol enabled?
this.identify = false
// Crypto details
this.crypto = plaintext
this.protector = this._options.protector || null
this.transport = new TransportManager(this)
this.connection = new ConnectionManager(this)
this.observer = Observer(this)
this.stats = Stats(this.observer, this._options.stats)
this.protocolMuxer = ProtocolMuxer(this.protocols, this.observer)
// All purpose connection handler for managing incoming connections
this._connectionHandler = connectionHandler(this)
// Setup the internal state
this.state = new FSM('STOPPED', {
STOPPED: {
start: 'STARTING',
stop: 'STOPPING' // ensures that any transports that were manually started are stopped
},
STARTING: {
done: 'STARTED',
stop: 'STOPPING'
},
STARTED: {
stop: 'STOPPING',
start: 'STARTED'
},
STOPPING: {
stop: 'STOPPING',
done: 'STOPPED'
}
})
this.state.on('STARTING', () => {
log('The switch is starting')
this._onStarting()
})
this.state.on('STOPPING', () => {
log('The switch is stopping')
this._onStopping()
})
this.state.on('STARTED', () => {
log('The switch has started')
this.emit('start')
})
this.state.on('STOPPED', () => {
log('The switch has stopped')
this.emit('stop')
})
this.state.on('error', (err) => {
log.error(err)
this.emit('error', err)
})
// higher level (public) API
this.dialer = getDialer(this)
this.dial = this.dialer.dial
this.dialFSM = this.dialer.dialFSM
}
/**
* Returns a list of the transports peerInfo has addresses for
*
* @param {PeerInfo} peerInfo
* @returns {Array<Transport>}
*/
availableTransports (peerInfo) {
const myAddrs = peerInfo.multiaddrs.toArray()
const myTransports = Object.keys(this.transports)
// Only listen on transports we actually have addresses for
return myTransports.filter((ts) => this.transports[ts].filter(myAddrs).length > 0)
// push Circuit to be the last proto to be dialed, and alphabetize the others
.sort((a, b) => {
if (a === Circuit.tag) return 1
if (b === Circuit.tag) return -1
return a < b ? -1 : 1
})
}
/**
* Adds the `handlerFunc` and `matchFunc` to the Switch's protocol
* handler list for the given `protocol`. If the `matchFunc` returns
* true for a protocol check, the `handlerFunc` will be called.
*
* @param {string} protocol
* @param {function(string, Connection)} handlerFunc
* @param {function(string, string, function(Error, boolean))} matchFunc
* @returns {void}
*/
handle (protocol, handlerFunc, matchFunc) {
this.protocols[protocol] = {
handlerFunc: handlerFunc,
matchFunc: matchFunc
}
this._peerInfo.protocols.add(protocol)
}
/**
* Removes the given protocol from the Switch's protocol list
*
* @param {string} protocol
* @returns {void}
*/
unhandle (protocol) {
if (this.protocols[protocol]) {
delete this.protocols[protocol]
}
this._peerInfo.protocols.delete(protocol)
}
/**
* If a muxed Connection exists for the given peer, it will be closed
* and its reference on the Switch will be removed.
*
* @param {PeerInfo|Multiaddr|PeerId} peer
* @param {function()} callback
* @returns {void}
*/
hangUp (peer, callback) {
const peerInfo = getPeerInfo(peer, this._peerBook)
const key = peerInfo.id.toB58String()
const conns = [...this.connection.getAllById(key)]
each(conns, (conn, cb) => {
conn.once('close', cb)
conn.close()
}, callback)
}
/**
* Returns whether or not the switch has any transports
*
* @returns {boolean}
*/
hasTransports () {
const transports = Object.keys(this.transports).filter((t) => t !== Circuit.tag)
return transports && transports.length > 0
}
/**
* Issues a start on the Switch state.
*
* @param {function} callback deprecated: Listening for the `error` and `start` events are recommended
* @returns {void}
*/
start (callback = () => {}) {
// Add once listener for deprecated callback support
this.once('start', callback)
this.state('start')
}
/**
* Issues a stop on the Switch state.
*
* @param {function} callback deprecated: Listening for the `error` and `stop` events are recommended
* @returns {void}
*/
stop (callback = () => {}) {
// Add once listener for deprecated callback support
this.once('stop', callback)
this.state('stop')
}
/**
* A listener that will start any necessary services and listeners
*
* @private
* @returns {void}
*/
_onStarting () {
this.stats.start()
eachSeries(this.availableTransports(this._peerInfo), (ts, cb) => {
// Listen on the given transport
this.transport.listen(ts, {}, null, cb)
}, (err) => {
if (err) {
log.error(err)
this.emit('error', err)
return this.state('stop')
}
this.state('done')
})
}
/**
* A listener that will turn off all running services and listeners
*
* @private
* @returns {void}
*/
_onStopping () {
this.stats.stop()
series([
(cb) => {
each(this.transports, (transport, cb) => {
each(transport.listeners, (listener, cb) => {
listener.close((err) => {
if (err) log.error(err)
cb()
})
}, cb)
}, cb)
},
(cb) => each(this.connection.getAll(), (conn, cb) => {
conn.once('close', cb)
conn.close()
}, cb)
], (_) => {
this.state('done')
})
}
}
module.exports = Switch
module.exports.errors = Errors

View File

@ -1,88 +0,0 @@
'use strict'
const tryEach = require('async/tryEach')
const debug = require('debug')
const log = debug('libp2p:switch:dialer')
const DialQueue = require('./queue')
/**
* Track dials per peer and limited them.
*/
class LimitDialer {
/**
* Create a new dialer.
*
* @param {number} perPeerLimit
* @param {number} dialTimeout
*/
constructor (perPeerLimit, dialTimeout) {
log('create: %s peer limit, %s dial timeout', perPeerLimit, dialTimeout)
this.perPeerLimit = perPeerLimit
this.dialTimeout = dialTimeout
this.queues = new Map()
}
/**
* Dial a list of multiaddrs on the given transport.
*
* @param {PeerId} peer
* @param {SwarmTransport} transport
* @param {Array<Multiaddr>} addrs
* @param {function(Error, Connection)} callback
* @returns {void}
*/
dialMany (peer, transport, addrs, callback) {
log('dialMany:start')
// we use a token to track if we want to cancel following dials
const token = { cancel: false }
const errors = []
const tasks = addrs.map((m) => {
return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => {
if (err) {
errors.push(err)
return cb(err)
}
return cb(null, result)
})
})
tryEach(tasks, (_, result) => {
if (result && result.conn) {
log('dialMany:success')
return callback(null, result)
}
log('dialMany:error')
callback(errors)
})
}
/**
* Dial a single multiaddr on the given transport.
*
* @param {PeerId} peer
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
*/
dialSingle (peer, transport, addr, token, callback) {
const ps = peer.toB58String()
log('dialSingle: %s:%s', ps, addr.toString())
let q
if (this.queues.has(ps)) {
q = this.queues.get(ps)
} else {
q = new DialQueue(this.perPeerLimit, this.dialTimeout)
this.queues.set(ps, q)
}
q.push(transport, addr, token, callback)
}
}
module.exports = LimitDialer

View File

@ -1,109 +0,0 @@
'use strict'
const { Connection } = require('libp2p-interfaces/src/connection')
const pull = require('pull-stream/pull')
const empty = require('pull-stream/sources/empty')
const timeout = require('async/timeout')
const queue = require('async/queue')
const debug = require('debug')
const once = require('once')
const log = debug('libp2p:switch:dialer:queue')
log.error = debug('libp2p:switch:dialer:queue:error')
/**
* Queue up the amount of dials to a given peer.
*/
class DialQueue {
/**
* Create a new dial queue.
*
* @param {number} limit
* @param {number} dialTimeout
*/
constructor (limit, dialTimeout) {
this.dialTimeout = dialTimeout
this.queue = queue((task, cb) => {
this._doWork(task.transport, task.addr, task.token, cb)
}, limit)
}
/**
* The actual work done by the queue.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
* @private
*/
_doWork (transport, addr, token, callback) {
callback = once(callback)
log('work:start')
this._dialWithTimeout(transport, addr, (err, conn) => {
if (err) {
log.error(`${transport.constructor.name}:work`, err)
return callback(err)
}
if (token.cancel) {
log('work:cancel')
// clean up already done dials
pull(empty(), conn)
// If we can close the connection, do it
if (typeof conn.close === 'function') {
return conn.close((_) => callback(null))
}
return callback(null)
}
// one is enough
token.cancel = true
log('work:success')
const proxyConn = new Connection()
proxyConn.setInnerConn(conn)
callback(null, { multiaddr: addr, conn: conn })
})
}
/**
* Dial the given transport, timing out with the set timeout.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {function(Error, Connection)} callback
* @returns {void}
*
* @private
*/
_dialWithTimeout (transport, addr, callback) {
timeout((cb) => {
const conn = transport.dial(addr, (err) => {
if (err) {
return cb(err)
}
cb(null, conn)
})
}, this.dialTimeout)(callback)
}
/**
* Add new work to the queue.
*
* @param {SwarmTransport} transport
* @param {Multiaddr} addr
* @param {CancelToken} token
* @param {function(Error, Connection)} callback
* @returns {void}
*/
push (transport, addr, token, callback) {
this.queue.push({ transport, addr, token }, callback)
}
}
module.exports = DialQueue

View File

@ -1,44 +0,0 @@
'use strict'
const { Connection } = require('libp2p-interfaces/src/connection')
const pull = require('pull-stream/pull')
/**
* Creates a pull stream to run the given Connection stream through
* the given Observer. This provides a way to more easily monitor connections
* and their metadata. A new Connection will be returned that contains
* has the attached Observer.
*
* @param {Transport} transport
* @param {string} protocol
* @param {Connection} connection
* @param {Observer} observer
* @returns {Connection}
*/
module.exports = (transport, protocol, connection, observer) => {
const peerInfo = new Promise((resolve, reject) => {
connection.getPeerInfo((err, peerInfo) => {
if (!err && peerInfo) {
resolve(peerInfo)
return
}
const setPeerInfo = connection.setPeerInfo
connection.setPeerInfo = (pi) => {
setPeerInfo.call(connection, pi)
resolve(pi)
}
})
})
const stream = {
source: pull(
connection,
observer.incoming(transport, protocol, peerInfo)),
sink: pull(
observer.outgoing(transport, protocol, peerInfo),
connection)
}
return new Connection(stream, connection)
}

View File

@ -1,48 +0,0 @@
'use strict'
const map = require('pull-stream/throughs/map')
const EventEmitter = require('events')
/**
* Takes a Switch and returns an Observer that can be used in conjunction with
* observe-connection.js. The returned Observer comes with `incoming` and
* `outgoing` properties that can be used in pull streams to emit all metadata
* for messages that pass through a Connection.
*
* @param {Switch} swtch
* @returns {EventEmitter}
*/
module.exports = (swtch) => {
const observer = Object.assign(new EventEmitter(), {
incoming: observe('in'),
outgoing: observe('out')
})
swtch.on('peer-mux-established', (peerInfo) => {
observer.emit('peer:connected', peerInfo.id.toB58String())
})
swtch.on('peer-mux-closed', (peerInfo) => {
observer.emit('peer:closed', peerInfo.id.toB58String())
})
return observer
function observe (direction) {
return (transport, protocol, peerInfo) => {
return map((buffer) => {
willObserve(peerInfo, transport, protocol, direction, buffer.length)
return buffer
})
}
}
function willObserve (peerInfo, transport, protocol, direction, bufferLength) {
peerInfo.then((_peerInfo) => {
if (_peerInfo) {
const peerId = _peerInfo.id.toB58String()
observer.emit('message', peerId, transport, protocol, direction, bufferLength)
}
})
}
}

View File

@ -1,20 +0,0 @@
'use strict'
const setImmediate = require('async/setImmediate')
/**
* An encryption stub in the instance that the default crypto
* has not been overriden for the Switch
*/
module.exports = {
tag: '/plaintext/1.0.0',
encrypt (myId, conn, remoteId, callback) {
if (typeof remoteId === 'function') {
callback = remoteId
remoteId = undefined
}
setImmediate(() => callback())
return conn
}
}

View File

@ -1,48 +0,0 @@
'use strict'
const multistream = require('multistream-select')
const observeConn = require('./observe-connection')
const debug = require('debug')
const log = debug('libp2p:switch:protocol-muxer')
log.error = debug('libp2p:switch:protocol-muxer:error')
module.exports = function protocolMuxer (protocols, observer) {
return (transport) => (_parentConn, msListener) => {
const ms = msListener || new multistream.Listener()
let parentConn
// Only observe the transport if we have one, and there is not already a listener
if (transport && !msListener) {
parentConn = observeConn(transport, null, _parentConn, observer)
} else {
parentConn = _parentConn
}
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
const handler = (protocolName, _conn) => {
log('registering handler with protocol %s', protocolName)
const protocol = protocols[protocolName]
if (protocol) {
const handlerFunc = protocol && protocol.handlerFunc
if (handlerFunc) {
const conn = observeConn(null, protocolName, _conn, observer)
handlerFunc(protocol, conn)
}
}
}
ms.addHandler(protocol, handler, protocols[protocol].matchFunc)
})
ms.handle(parentConn, (err) => {
if (err) {
log.error('multistream handshake failed', err)
}
})
}
}

View File

@ -1,272 +0,0 @@
'use strict'
/* eslint no-warning-comments: off */
const parallel = require('async/parallel')
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:switch:transport')
const LimitDialer = require('./limit-dialer')
const { DIAL_TIMEOUT } = require('./constants')
const { uniqueBy } = require('./utils')
// number of concurrent outbound dials to make per peer, same as go-libp2p-swtch
const defaultPerPeerRateLimit = 8
/**
* Manages the transports for the switch. This simplifies dialing and listening across
* multiple transports.
*/
class TransportManager {
constructor (_switch) {
this.switch = _switch
this.dialer = new LimitDialer(defaultPerPeerRateLimit, this.switch._options.dialTimeout || DIAL_TIMEOUT)
}
/**
* Adds a `Transport` to the list of transports on the switch, and assigns it to the given key
*
* @param {String} key
* @param {Transport} transport
* @returns {void}
*/
add (key, transport) {
log('adding %s', key)
if (this.switch.transports[key]) {
throw new Error('There is already a transport with this key')
}
this.switch.transports[key] = transport
if (!this.switch.transports[key].listeners) {
this.switch.transports[key].listeners = []
}
}
/**
* Closes connections for the given transport key
* and removes it from the switch.
*
* @param {String} key
* @param {function(Error)} callback
* @returns {void}
*/
remove (key, callback) {
callback = callback || function () {}
if (!this.switch.transports[key]) {
return callback()
}
this.close(key, (err) => {
delete this.switch.transports[key]
callback(err)
})
}
/**
* Calls `remove` on each transport the switch has
*
* @param {function(Error)} callback
* @returns {void}
*/
removeAll (callback) {
const tasks = Object.keys(this.switch.transports).map((key) => {
return (cb) => {
this.remove(key, cb)
}
})
parallel(tasks, callback)
}
/**
* For a given transport `key`, dial to all that transport multiaddrs
*
* @param {String} key Key of the `Transport` to dial
* @param {PeerInfo} peerInfo
* @param {function(Error, Connection)} callback
* @returns {void}
*/
dial (key, peerInfo, callback) {
const transport = this.switch.transports[key]
let multiaddrs = peerInfo.multiaddrs.toArray()
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
// filter the multiaddrs that are actually valid for this transport
multiaddrs = TransportManager.dialables(transport, multiaddrs, this.switch._peerInfo)
log('dialing %s', key, multiaddrs.map((m) => m.toString()))
// dial each of the multiaddrs with the given transport
this.dialer.dialMany(peerInfo.id, transport, multiaddrs, (errors, success) => {
if (errors) {
return callback(errors)
}
peerInfo.connect(success.multiaddr)
callback(null, success.conn)
})
}
/**
* For a given Transport `key`, listen on all multiaddrs in the switch's `_peerInfo`.
* If a `handler` is not provided, the Switch's `protocolMuxer` will be used.
*
* @param {String} key
* @param {*} _options Currently ignored
* @param {function(Connection)} handler
* @param {function(Error)} callback
* @returns {void}
*/
listen (key, _options, handler, callback) {
handler = this.switch._connectionHandler(key, handler)
const transport = this.switch.transports[key]
let originalAddrs = this.switch._peerInfo.multiaddrs.toArray()
// Until TCP can handle distinct addresses on listen, https://github.com/libp2p/interface-transport/issues/41,
// make sure we aren't trying to listen on duplicate ports. This also applies to websockets.
originalAddrs = uniqueBy(originalAddrs, (addr) => {
// Any non 0 port should register as unique
const port = Number(addr.toOptions().port)
return isNaN(port) || port === 0 ? addr.toString() : port
})
const multiaddrs = TransportManager.dialables(transport, originalAddrs)
if (!transport.listeners) {
transport.listeners = []
}
let freshMultiaddrs = []
const createListeners = multiaddrs.map((ma) => {
return (cb) => {
const done = once(cb)
const listener = transport.createListener(handler)
listener.once('error', done)
listener.listen(ma, (err) => {
if (err) {
return done(err)
}
listener.removeListener('error', done)
listener.getAddrs((err, addrs) => {
if (err) {
return done(err)
}
freshMultiaddrs = freshMultiaddrs.concat(addrs)
transport.listeners.push(listener)
done()
})
})
}
})
parallel(createListeners, (err) => {
if (err) {
return callback(err)
}
// cause we can listen on port 0 or 0.0.0.0
this.switch._peerInfo.multiaddrs.replace(multiaddrs, freshMultiaddrs)
callback()
})
}
/**
* Closes the transport with the given key, by closing all of its listeners
*
* @param {String} key
* @param {function(Error)} callback
* @returns {void}
*/
close (key, callback) {
const transport = this.switch.transports[key]
if (!transport) {
return callback(new Error(`Trying to close non existing transport: ${key}`))
}
parallel(transport.listeners.map((listener) => {
return (cb) => {
listener.close(cb)
}
}), callback)
}
/**
* For a given transport, return its multiaddrs that match the given multiaddrs
*
* @param {Transport} transport
* @param {Array<Multiaddr>} multiaddrs
* @param {PeerInfo} peerInfo Optional - a peer whose addresses should not be returned
* @returns {Array<Multiaddr>}
*/
static dialables (transport, multiaddrs, peerInfo) {
// If we dont have a proper transport, return no multiaddrs
if (!transport || !transport.filter) return []
const transportAddrs = transport.filter(multiaddrs)
if (!peerInfo || !transportAddrs.length) {
return transportAddrs
}
const ourAddrs = ourAddresses(peerInfo)
const result = transportAddrs.filter(transportAddr => {
// If our address is in the destination address, filter it out
return !ourAddrs.some(a => getDestination(transportAddr).startsWith(a))
})
return result
}
}
/**
* Expand addresses in peer info into array of addresses with and without peer
* ID suffix.
*
* @param {PeerInfo} peerInfo Our peer info object
* @returns {String[]}
*/
function ourAddresses (peerInfo) {
const ourPeerId = peerInfo.id.toB58String()
return peerInfo.multiaddrs.toArray()
.reduce((ourAddrs, addr) => {
const peerId = addr.getPeerId()
addr = addr.toString()
const otherAddr = peerId
? addr.slice(0, addr.lastIndexOf(`/ipfs/${peerId}`))
: `${addr}/ipfs/${ourPeerId}`
return ourAddrs.concat([addr, otherAddr])
}, [])
.filter(a => Boolean(a))
.concat(`/ipfs/${ourPeerId}`)
}
const RelayProtos = [
'p2p-circuit',
'p2p-websocket-star',
'p2p-webrtc-star',
'p2p-stardust'
]
/**
* Get the destination address of a (possibly relay) multiaddr as a string
*
* @param {Multiaddr} addr
* @returns {String}
*/
function getDestination (addr) {
const protos = addr.protoNames().reverse()
const splitProto = protos.find(p => RelayProtos.includes(p))
addr = addr.toString()
if (!splitProto) return addr
return addr.slice(addr.lastIndexOf(splitProto) + splitProto.length)
}
module.exports = TransportManager

View File

@ -1,60 +0,0 @@
'use strict'
const Identify = require('../identify')
/**
* For a given multistream, registers to handle the given connection
* @param {MultistreamDialer} multistream
* @param {Connection} connection
* @returns {Promise}
*/
module.exports.msHandle = (multistream, connection) => {
return new Promise((resolve, reject) => {
multistream.handle(connection, (err) => {
if (err) return reject(err)
resolve()
})
})
}
/**
* For a given multistream, selects the given protocol
* @param {MultistreamDialer} multistream
* @param {string} protocol
* @returns {Promise} Resolves the selected Connection
*/
module.exports.msSelect = (multistream, protocol) => {
return new Promise((resolve, reject) => {
multistream.select(protocol, (err, connection) => {
if (err) return reject(err)
resolve(connection)
})
})
}
/**
* Runs identify for the given connection and verifies it against the
* PeerInfo provided
* @param {Connection} connection
* @param {PeerInfo} cryptoPeerInfo The PeerInfo determined during crypto exchange
* @returns {Promise} Resolves {peerInfo, observedAddrs}
*/
module.exports.identifyDialer = (connection, cryptoPeerInfo) => {
return new Promise((resolve, reject) => {
Identify.dialer(connection, cryptoPeerInfo, (err, peerInfo, observedAddrs) => {
if (err) return reject(err)
resolve({ peerInfo, observedAddrs })
})
})
}
/**
* Get unique values from `arr` using `getValue` to determine
* what is used for uniqueness
* @param {Array} arr The array to get unique values for
* @param {function(value)} getValue The function to determine what is compared
* @returns {Array}
*/
module.exports.uniqueBy = (arr, getValue) => {
return [...new Map(arr.map((i) => [getValue(i), i])).values()]
}