chore: refactor connection manager and registrar

This commit is contained in:
Vasco Santos 2020-04-18 17:06:56 +02:00
parent 67fda7e106
commit 31d8a929ea
18 changed files with 424 additions and 411 deletions

View File

@ -11,12 +11,12 @@
* [`handle`](#handle) * [`handle`](#handle)
* [`unhandle`](#unhandle) * [`unhandle`](#unhandle)
* [`ping`](#ping) * [`ping`](#ping)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`contentRouting.findProviders`](#contentroutingfindproviders) * [`contentRouting.findProviders`](#contentroutingfindproviders)
* [`contentRouting.provide`](#contentroutingprovide) * [`contentRouting.provide`](#contentroutingprovide)
* [`contentRouting.put`](#contentroutingput) * [`contentRouting.put`](#contentroutingput)
* [`contentRouting.get`](#contentroutingget) * [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany) * [`contentRouting.getMany`](#contentroutinggetmany)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
* [`peerStore.addressBook.get`](#peerstoreaddressbookget) * [`peerStore.addressBook.get`](#peerstoreaddressbookget)
@ -34,7 +34,9 @@
* [`pubsub.publish`](#pubsubpublish) * [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe) * [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe) * [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`connectionManager.get`](#connectionmanagerget)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue) * [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`connectionManager.size`](#connectionmanagersize)
* [`metrics.global`](#metricsglobal) * [`metrics.global`](#metricsglobal)
* [`metrics.peers`](#metricspeers) * [`metrics.peers`](#metricspeers)
* [`metrics.protocols`](#metricsprotocols) * [`metrics.protocols`](#metricsprotocols)
@ -42,6 +44,7 @@
* [`metrics.forProtocol`](#metricsforprotocol) * [`metrics.forProtocol`](#metricsforprotocol)
* [Events](#events) * [Events](#events)
* [`libp2p`](#libp2p) * [`libp2p`](#libp2p)
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
* [`libp2p.peerStore`](#libp2ppeerStore) * [`libp2p.peerStore`](#libp2ppeerStore)
* [Types](#types) * [Types](#types)
* [`Stats`](#stats) * [`Stats`](#stats)
@ -999,6 +1002,28 @@ const handler = (msg) => {
libp2p.pubsub.unsubscribe(topic, handler) libp2p.pubsub.unsubscribe(topic, handler)
``` ```
### connectionManager.get
Get a connection with a given peer, if it exists.
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| peerId | [`PeerId`][peer-id] | The peer to find |
#### Returns
| Type | Description |
|------|-------------|
| [`Connection`][connection] | Connection with the given peer |
#### Example
```js
libp2p.connectionManager.get(peerId)
```
### connectionManager.setPeerValue ### connectionManager.setPeerValue
Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits. Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0) libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
``` ```
### connectionManager.size
Getter for obtaining the current number of open connections.
#### Example
```js
libp2p.connectionManager.size
// 10
```
### metrics.global ### metrics.global
A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node. A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
@ -1126,21 +1162,23 @@ unless they are performing a specific action. See [peer discovery and auto dial]
- `peer`: instance of [`PeerId`][peer-id] - `peer`: instance of [`PeerId`][peer-id]
### libp2p.connectionManager
#### A new connection to a peer has been opened #### A new connection to a peer has been opened
This event will be triggered anytime a new Connection is established to another peer. This event will be triggered anytime a new Connection is established to another peer.
`libp2p.on('peer:connect', (peer) => {})` `libp2p.on('peer:connect', (connection) => {})`
- `peer`: instance of [`PeerId`][peer-id] - `connection`: instance of [`Connection`][connection]
#### An existing connection to a peer has been closed #### An existing connection to a peer has been closed
This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed. This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed.
`libp2p.on('peer:disconnect', (peer) => {})` `libp2p.on('peer:disconnect', (connection) => {})`
- `peer`: instance of [`PeerId`][peer-id] - `connection`: instance of [`Connection`][connection]
### libp2p.peerStore ### libp2p.peerStore

View File

@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
// Get the connection to the destination (stop) peer // Get the connection to the destination (stop) peer
const destinationPeer = new PeerId(request.dstPeer.id) const destinationPeer = new PeerId(request.dstPeer.id)
const destinationConnection = circuit._registrar.getConnection(destinationPeer) const destinationConnection = circuit._connectionManager.get(destinationPeer)
if (!destinationConnection && !circuit._options.hop.active) { if (!destinationConnection && !circuit._options.hop.active) {
log('HOP request received but we are not connected to the destination peer') log('HOP request received but we are not connected to the destination peer')
return streamHandler.end({ return streamHandler.end({

View File

@ -29,6 +29,7 @@ class Circuit {
constructor ({ libp2p, upgrader }) { constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer this._dialer = libp2p.dialer
this._registrar = libp2p.registrar this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader this._upgrader = upgrader
this._options = libp2p._config.relay this._options = libp2p._config.relay
this.addresses = libp2p.addresses this.addresses = libp2p.addresses
@ -107,7 +108,7 @@ class Circuit {
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId()) const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
let disconnectOnFailure = false let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(relayPeer) let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) { if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options) relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true disconnectOnFailure = true

View File

@ -6,6 +6,11 @@ const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager') const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer') const retimer = require('retimer')
const { EventEmitter } = require('events')
const PeerId = require('peer-id')
const { Connection } = require('libp2p-interfaces/src/connection')
const { const {
ERR_INVALID_PARAMETERS ERR_INVALID_PARAMETERS
} = require('../errors') } = require('../errors')
@ -22,7 +27,12 @@ const defaultOptions = {
defaultPeerValue: 1 defaultPeerValue: 1
} }
class ConnectionManager { /**
* Responsible for managing known connections.
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
* @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected.
*/
class ConnectionManager extends EventEmitter {
/** /**
* @constructor * @constructor
* @param {Libp2p} libp2p * @param {Libp2p} libp2p
@ -38,9 +48,11 @@ class ConnectionManager {
* @param {Number} options.defaultPeerValue The value of the peer. Default=1 * @param {Number} options.defaultPeerValue The value of the peer. Default=1
*/ */
constructor (libp2p, options) { constructor (libp2p, options) {
super()
this._libp2p = libp2p this._libp2p = libp2p
this._registrar = libp2p.registrar
this._peerId = libp2p.peerId.toB58String() this._peerId = libp2p.peerId.toB58String()
this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options) this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
if (this._options.maxConnections < this._options.minConnections) { if (this._options.maxConnections < this._options.minConnections) {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS) throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
@ -48,20 +60,38 @@ class ConnectionManager {
debug('options: %j', this._options) debug('options: %j', this._options)
this._metrics = libp2p.metrics this._libp2p = libp2p
/**
* Map of peer identifiers to their peer value for pruning connections.
* @type {Map<string, number>}
*/
this._peerValues = new Map() this._peerValues = new Map()
this._connections = new Map()
/**
* Map of connections per peer
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()
this._timer = null this._timer = null
this._checkMetrics = this._checkMetrics.bind(this) this._checkMetrics = this._checkMetrics.bind(this)
} }
/**
* Get current number of open connections.
*/
get size () {
return Array.from(this.connections.values())
.reduce((accumulator, value) => accumulator + value.length, 0)
}
/** /**
* Starts the Connection Manager. If Metrics are not enabled on libp2p * Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored. * only event loop and connection limits will be monitored.
*/ */
start () { start () {
if (this._metrics) { if (this._libp2p.metrics) {
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval) this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
} }
@ -77,13 +107,33 @@ class ConnectionManager {
/** /**
* Stops the Connection Manager * Stops the Connection Manager
* @async
*/ */
stop () { async stop () {
this._timer && this._timer.clear() this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure) this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
await this._close()
debug('stopped') debug('stopped')
} }
/**
* Cleans up the connections
* @async
*/
async _close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}
await tasks
this.connections.clear()
}
/** /**
* Sets the value of the given peer. Peers with lower values * Sets the value of the given peer. Peers with lower values
* will be disconnected first. * will be disconnected first.
@ -106,7 +156,7 @@ class ConnectionManager {
* @private * @private
*/ */
_checkMetrics () { _checkMetrics () {
const movingAverages = this._metrics.global.movingAverages const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received) this._checkLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
@ -122,12 +172,25 @@ class ConnectionManager {
* @param {Connection} connection * @param {Connection} connection
*/ */
onConnect (connection) { onConnect (connection) {
if (!Connection.isConnection(connection)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}
const peerId = connection.remotePeer.toB58String() const peerId = connection.remotePeer.toB58String()
this._connections.set(connection.id, connection) const storedConn = this.connections.get(peerId)
if (storedConn) {
storedConn.push(connection)
} else {
this.connections.set(peerId, [connection])
this.emit('peer:connect', connection)
}
if (!this._peerValues.has(peerId)) { if (!this._peerValues.has(peerId)) {
this._peerValues.set(peerId, this._options.defaultPeerValue) this._peerValues.set(peerId, this._options.defaultPeerValue)
} }
this._checkLimit('maxConnections', this._connections.size)
this._checkLimit('maxConnections', this.size)
} }
/** /**
@ -135,8 +198,37 @@ class ConnectionManager {
* @param {Connection} connection * @param {Connection} connection
*/ */
onDisconnect (connection) { onDisconnect (connection) {
this._connections.delete(connection.id) const peerId = connection.remotePeer.toB58String()
this._peerValues.delete(connection.remotePeer.toB58String()) let storedConn = this.connections.get(peerId)
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn) {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
}
}
/**
* Get a connection with a peer.
* @param {PeerId} peerId
* @returns {Connection}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
const connections = this.connections.get(id)
// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
} }
/** /**
@ -169,7 +261,7 @@ class ConnectionManager {
* @private * @private
*/ */
_maybeDisconnectOne () { _maybeDisconnectOne () {
if (this._options.minConnections < this._connections.size) { if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue) const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues) debug('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0] const disconnectPeer = peerValues[0]
@ -177,9 +269,9 @@ class ConnectionManager {
const peerId = disconnectPeer[0] const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId) debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId) debug('%s: closing a connection to %j', this._peerId, peerId)
for (const connection of this._connections.values()) { for (const connections of this.connections.values()) {
if (connection.remotePeer.toB58String() === peerId) { if (connections[0].remotePeer.toB58String() === peerId) {
connection.close() connections[0].close()
break break
} }
} }

View File

@ -46,16 +46,28 @@ class IdentifyService {
/** /**
* @constructor * @constructor
* @param {object} options * @param {object} options
* @param {Registrar} options.registrar * @param {PeerStore} options.peerStore
* @param {ConnectionManager} options.connectionManager
* @param {Map<string, handler>} options.protocols A reference to the protocols we support * @param {Map<string, handler>} options.protocols A reference to the protocols we support
* @param {PeerId} options.peerId The peer running the identify service * @param {PeerId} options.peerId The peer running the identify service
* @param {{ listen: Array<Multiaddr>}} options.addresses The peer addresses * @param {{ listen: Array<Multiaddr>}} options.addresses The peer addresses
*/ */
constructor (options) { constructor (options) {
/** /**
* @property {Registrar} * @property {PeerStore}
*/ */
this.registrar = options.registrar this.peerStore = options.peerStore
/**
* @property {ConnectionManager}
*/
this.connectionManager = options.connectionManager
this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer
this.identify(connection, peerId).catch(log.error)
})
/** /**
* @property {PeerId} * @property {PeerId}
*/ */
@ -104,7 +116,7 @@ class IdentifyService {
const connections = [] const connections = []
let connection let connection
for (const peer of peerStore.peers.values()) { for (const peer of peerStore.peers.values()) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer.id))) { if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
connections.push(connection) connections.push(connection)
} }
} }
@ -160,8 +172,8 @@ class IdentifyService {
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr) observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
// Update peers data in PeerStore // Update peers data in PeerStore
this.registrar.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr))) this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
this.registrar.peerStore.protoBook.set(id, protocols) this.peerStore.protoBook.set(id, protocols)
// TODO: Track our observed address so that we can score it // TODO: Track our observed address so that we can score it
log('received observed address of %s', observedAddr) log('received observed address of %s', observedAddr)
@ -245,13 +257,13 @@ class IdentifyService {
// Update peers data in PeerStore // Update peers data in PeerStore
const id = connection.remotePeer const id = connection.remotePeer
try { try {
this.registrar.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr))) this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
} catch (err) { } catch (err) {
return log.error('received invalid listen addrs', err) return log.error('received invalid listen addrs', err)
} }
// Update the protocols // Update the protocols
this.registrar.peerStore.protoBook.set(id, message.protocols) this.peerStore.protoBook.set(id, message.protocols)
} }
} }

View File

@ -54,55 +54,40 @@ class Libp2p extends EventEmitter {
this._transport = [] // Transport instances/references this._transport = [] // Transport instances/references
this._discovery = new Map() // Discovery service instances/references this._discovery = new Map() // Discovery service instances/references
// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Create Metrics
if (this._options.metrics.enabled) { if (this._options.metrics.enabled) {
this.metrics = new Metrics(this._options.metrics) this.metrics = new Metrics({
...this._options.metrics,
connectionManager: this.connectionManager
})
} }
// Setup the Upgrader // Setup the Upgrader
this.upgrader = new Upgrader({ this.upgrader = new Upgrader({
localPeer: this.peerId, localPeer: this.peerId,
metrics: this.metrics, metrics: this.metrics,
onConnection: (connection) => { onConnection: (connection) => this.connectionManager.onConnect(connection),
const peerId = connection.remotePeer onConnectionEnd: (connection) => this.connectionManager.onDisconnect(connection)
this.registrar.onConnect(peerId, connection)
this.connectionManager.onConnect(connection)
this.emit('peer:connect', peerId)
// Run identify for every connection
if (this.identifyService) {
this.identifyService.identify(connection, peerId)
.catch(log.error)
}
},
onConnectionEnd: (connection) => {
const peerId = connection.remotePeer
this.registrar.onDisconnect(peerId, connection)
this.connectionManager.onDisconnect(connection)
// If there are no connections to the peer, disconnect
if (!this.registrar.getConnection(peerId)) {
this.emit('peer:disconnect', peerId)
this.metrics && this.metrics.onPeerDisconnected(peerId)
}
}
}) })
// Create the Registrar
this.registrar = new Registrar({ peerStore: this.peerStore })
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle
// Create the Connection Manager
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
// Setup the transport manager // Setup the transport manager
this.transportManager = new TransportManager({ this.transportManager = new TransportManager({
libp2p: this, libp2p: this,
upgrader: this.upgrader upgrader: this.upgrader
}) })
// Create the Registrar
this.registrar = new Registrar({
peerStore: this.peerStore,
connectionManager: this.connectionManager
})
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle
// Attach crypto channels // Attach crypto channels
if (this._modules.connEncryption) { if (this._modules.connEncryption) {
const cryptos = this._modules.connEncryption const cryptos = this._modules.connEncryption
@ -138,7 +123,8 @@ class Libp2p extends EventEmitter {
// Add the identify service since we can multiplex // Add the identify service since we can multiplex
this.identifyService = new IdentifyService({ this.identifyService = new IdentifyService({
registrar: this.registrar, peerStore: this.peerStore,
connectionManager: this.connectionManager,
peerId: this.peerId, peerId: this.peerId,
addresses: this.addresses, addresses: this.addresses,
protocols: this.upgrader.protocols protocols: this.upgrader.protocols
@ -242,7 +228,6 @@ class Libp2p extends EventEmitter {
]) ])
await this.transportManager.close() await this.transportManager.close()
await this.registrar.close()
ping.unmount(this) ping.unmount(this)
this.dialer.destroy() this.dialer.destroy()
@ -294,7 +279,7 @@ class Libp2p extends EventEmitter {
*/ */
async dialProtocol (peer, protocols, options) { async dialProtocol (peer, protocols, options) {
const { id, multiaddrs } = getPeer(peer, this.peerStore) const { id, multiaddrs } = getPeer(peer, this.peerStore)
let connection = this.registrar.getConnection(id) let connection = this.connectionManager.get(id)
if (!connection) { if (!connection) {
connection = await this.dialer.connectToPeer(peer, options) connection = await this.dialer.connectToPeer(peer, options)
@ -318,7 +303,7 @@ class Libp2p extends EventEmitter {
async hangUp (peer) { async hangUp (peer) {
const { id } = getPeer(peer) const { id } = getPeer(peer)
const connections = this.registrar.connections.get(id.toB58String()) const connections = this.connectionManager.connections.get(id.toB58String())
if (!connections) { if (!connections) {
return return
@ -452,9 +437,9 @@ class Libp2p extends EventEmitter {
*/ */
async _maybeConnect (peerId) { async _maybeConnect (peerId) {
// If auto dialing is on and we have no connection to the peer, check if we should dial // If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerId)) { if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) {
const minPeers = this._options.connectionManager.minPeers || 0 const minPeers = this._options.connectionManager.minPeers || 0
if (minPeers > this.connectionManager._connections.size) { if (minPeers > this.connectionManager.size) {
log('connecting to discovered peer %s', peerId.toB58String()) log('connecting to discovered peer %s', peerId.toB58String())
try { try {
await this.dialer.connectToPeer(peerId) await this.dialer.connectToPeer(peerId)

View File

@ -21,6 +21,7 @@ class Metrics {
/** /**
* *
* @param {object} options * @param {object} options
* @param {ConnectionManager} options.connectionManager
* @param {number} options.computeThrottleMaxQueueSize * @param {number} options.computeThrottleMaxQueueSize
* @param {number} options.computeThrottleTimeout * @param {number} options.computeThrottleTimeout
* @param {Array<number>} options.movingAverageIntervals * @param {Array<number>} options.movingAverageIntervals
@ -34,6 +35,10 @@ class Metrics {
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention) this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
this._running = false this._running = false
this._onMessage = this._onMessage.bind(this) this._onMessage = this._onMessage.bind(this)
this._connectionManager = options.connectionManager
this._connectionManager.on('peer:disconnect', (connection) => {
this.onPeerDisconnected(connection.remotePeer)
})
} }
/** /**

View File

@ -5,13 +5,10 @@ const errcode = require('err-code')
const log = debug('libp2p:peer-store') const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error') log.error = debug('libp2p:peer-store:error')
const PeerId = require('peer-id')
const { const {
ERR_INVALID_PARAMETERS ERR_INVALID_PARAMETERS
} = require('./errors') } = require('./errors')
const Topology = require('libp2p-interfaces/src/topology') const Topology = require('libp2p-interfaces/src/topology')
const { Connection } = require('libp2p-interfaces/src/connection')
/** /**
* Responsible for notifying registered protocols of events in the network. * Responsible for notifying registered protocols of events in the network.
@ -20,18 +17,14 @@ class Registrar {
/** /**
* @param {Object} props * @param {Object} props
* @param {PeerStore} props.peerStore * @param {PeerStore} props.peerStore
* @param {connectionManager} props.connectionManager
* @constructor * @constructor
*/ */
constructor ({ peerStore }) { constructor ({ peerStore, connectionManager }) {
// Used on topology to listen for protocol changes // Used on topology to listen for protocol changes
this.peerStore = peerStore this.peerStore = peerStore
/** this.connectionManager = connectionManager
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()
/** /**
* Map of topologies * Map of topologies
@ -41,6 +34,9 @@ class Registrar {
this.topologies = new Map() this.topologies = new Map()
this._handle = undefined this._handle = undefined
this._onDisconnect = this._onDisconnect.bind(this)
this.connectionManager.on('peer:disconnect', this._onDisconnect)
} }
get handle () { get handle () {
@ -51,93 +47,13 @@ class Registrar {
this._handle = handle this._handle = handle
} }
/**
* Cleans up the registrar
* @async
*/
async close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}
await tasks
this.connections.clear()
}
/**
* Add a new connected peer to the record
* TODO: this should live in the ConnectionManager
* @param {PeerId} peerId
* @param {Connection} conn
* @returns {void}
*/
onConnect (peerId, conn) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
if (!Connection.isConnection(conn)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
const storedConn = this.connections.get(id)
if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
}
/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerId} peerId
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerId, connection, error) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const id = peerId.toB58String()
let storedConn = this.connections.get(id)
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(id, storedConn)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerId, error)
}
this.connections.delete(id)
}
}
/** /**
* Get a connection with a peer. * Get a connection with a peer.
* @param {PeerId} peerId * @param {PeerId} peerId
* @returns {Connection} * @returns {Connection}
*/ */
getConnection (peerId) { getConnection (peerId) {
if (!PeerId.isPeerId(peerId)) { return this.connectionManager.get(peerId)
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}
const connections = this.connections.get(peerId.toB58String())
// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
} }
/** /**
@ -169,6 +85,18 @@ class Registrar {
unregister (id) { unregister (id) {
return this.topologies.delete(id) return this.topologies.delete(id)
} }
/**
* Remove a disconnected peer from the record
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
_onDisconnect (connection, error) {
for (const [, topology] of this.topologies) {
topology.disconnect(connection.remotePeer, error)
}
}
} }
module.exports = Registrar module.exports = Registrar

View File

@ -0,0 +1,88 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai
const sinon = require('sinon')
const multiaddr = require('multiaddr')
const peerUtils = require('../utils/creators/peer')
const mockConnection = require('../utils/mockConnection')
const baseOptions = require('../utils/base-options.browser')
const listenMultiaddr = multiaddr('/ip4/127.0.0.1/tcp/15002/ws')
describe('Connection Manager', () => {
let libp2p
beforeEach(async () => {
[libp2p] = await peerUtils.createPeer({
config: {
addresses: {
listen: [listenMultiaddr]
},
modules: baseOptions.modules
}
})
})
afterEach(() => libp2p.stop())
it('should filter connections on disconnect, removing the closed one', async () => {
const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 })
const conn1 = await mockConnection({ localPeer, remotePeer })
const conn2 = await mockConnection({ localPeer, remotePeer })
const id = remotePeer.toB58String()
// Add connection to the connectionManager
libp2p.connectionManager.onConnect(conn1)
libp2p.connectionManager.onConnect(conn2)
expect(libp2p.connectionManager.connections.get(id).length).to.eql(2)
conn2._stat.status = 'closed'
libp2p.connectionManager.onDisconnect(conn2)
const peerConnections = libp2p.connectionManager.connections.get(id)
expect(peerConnections.length).to.eql(1)
expect(peerConnections[0]._stat.status).to.eql('open')
})
it('should add connection on dial and remove on node stop', async () => {
const [remoteLibp2p] = await peerUtils.createPeer({
config: {
addresses: {
listen: [multiaddr('/ip4/127.0.0.1/tcp/15003/ws')]
},
modules: baseOptions.modules
}
})
// Spy on emit for easy verification
sinon.spy(libp2p.connectionManager, 'emit')
sinon.spy(remoteLibp2p.connectionManager, 'emit')
libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.addresses.listen)
await libp2p.dial(remoteLibp2p.peerId)
// check connect event
expect(libp2p.connectionManager.emit.callCount).to.equal(1)
const [event, connection] = libp2p.connectionManager.emit.getCall(0).args
expect(event).to.equal('peer:connect')
expect(connection.remotePeer.isEqual(remoteLibp2p.peerId)).to.equal(true)
const libp2pConn = libp2p.connectionManager.get(remoteLibp2p.peerId)
expect(libp2pConn).to.exist()
const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId)
expect(remoteConn).to.exist()
await remoteLibp2p.stop()
expect(remoteLibp2p.connectionManager.size).to.eql(0)
})
})

View File

@ -7,7 +7,7 @@ chai.use(require('chai-as-promised'))
const { expect } = chai const { expect } = chai
const sinon = require('sinon') const sinon = require('sinon')
const { createPeer } = require('../utils/creators/peer') const peerUtils = require('../utils/creators/peer')
const mockConnection = require('../utils/mockConnection') const mockConnection = require('../utils/mockConnection')
const baseOptions = require('../utils/base-options.browser') const baseOptions = require('../utils/base-options.browser')
@ -20,7 +20,7 @@ describe('Connection Manager', () => {
}) })
it('should be able to create without metrics', async () => { it('should be able to create without metrics', async () => {
[libp2p] = await createPeer({ [libp2p] = await peerUtils.createPeer({
config: { config: {
modules: baseOptions.modules modules: baseOptions.modules
}, },
@ -35,7 +35,7 @@ describe('Connection Manager', () => {
}) })
it('should be able to create with metrics', async () => { it('should be able to create with metrics', async () => {
[libp2p] = await createPeer({ [libp2p] = await peerUtils.createPeer({
config: { config: {
modules: baseOptions.modules, modules: baseOptions.modules,
metrics: { metrics: {
@ -49,12 +49,12 @@ describe('Connection Manager', () => {
await libp2p.start() await libp2p.start()
expect(spy).to.have.property('callCount', 1) expect(spy).to.have.property('callCount', 1)
expect(libp2p.connectionManager._metrics).to.exist() expect(libp2p.connectionManager._libp2p.metrics).to.exist()
}) })
it('should close lowest value peer connection when the maximum has been reached', async () => { it('should close lowest value peer connection when the maximum has been reached', async () => {
const max = 5 const max = 5
;[libp2p] = await createPeer({ ;[libp2p] = await peerUtils.createPeer({
config: { config: {
modules: baseOptions.modules, modules: baseOptions.modules,
connectionManager: { connectionManager: {
@ -92,7 +92,7 @@ describe('Connection Manager', () => {
it('should close connection when the maximum has been reached even without peer values', async () => { it('should close connection when the maximum has been reached even without peer values', async () => {
const max = 5 const max = 5
;[libp2p] = await createPeer({ ;[libp2p] = await peerUtils.createPeer({
config: { config: {
modules: baseOptions.modules, modules: baseOptions.modules,
connectionManager: { connectionManager: {
@ -110,7 +110,7 @@ describe('Connection Manager', () => {
const spy = sinon.spy() const spy = sinon.spy()
await Promise.all([...new Array(max + 1)].map(async () => { await Promise.all([...new Array(max + 1)].map(async () => {
const connection = await mockConnection() const connection = await mockConnection()
sinon.stub(connection, 'close').callsFake(() => spy()) sinon.stub(connection, 'close').callsFake(() => spy()) // eslint-disable-line
libp2p.connectionManager.onConnect(connection) libp2p.connectionManager.onConnect(connection)
})) }))
@ -119,7 +119,7 @@ describe('Connection Manager', () => {
}) })
it('should fail if the connection manager has mismatched connection limit options', async () => { it('should fail if the connection manager has mismatched connection limit options', async () => {
await expect(createPeer({ await expect(peerUtils.createPeer({
config: { config: {
modules: baseOptions.modules, modules: baseOptions.modules,
connectionManager: { connectionManager: {

View File

@ -374,8 +374,8 @@ describe('Dialing (direct, TCP)', () => {
} }
// 1 connection, because we know the peer in the multiaddr // 1 connection, because we know the peer in the multiaddr
expect(libp2p.connectionManager._connections.size).to.equal(1) expect(libp2p.connectionManager.size).to.equal(1)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(1) expect(remoteLibp2p.connectionManager.size).to.equal(1)
}) })
it('should coalesce parallel dials to the same error on failure', async () => { it('should coalesce parallel dials to the same error on failure', async () => {
@ -409,8 +409,8 @@ describe('Dialing (direct, TCP)', () => {
} }
// 1 connection, because we know the peer in the multiaddr // 1 connection, because we know the peer in the multiaddr
expect(libp2p.connectionManager._connections.size).to.equal(0) expect(libp2p.connectionManager.size).to.equal(0)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(0) expect(remoteLibp2p.connectionManager.size).to.equal(0)
}) })
}) })
}) })

View File

@ -121,7 +121,7 @@ describe('Dialing (via relay, TCP)', () => {
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
// We should not be connected to the relay, because we weren't before the dial // We should not be connected to the relay, because we weren't before the dial
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerId) const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId)
expect(srcToRelayConn).to.not.exist() expect(srcToRelayConn).to.not.exist()
}) })
@ -138,7 +138,7 @@ describe('Dialing (via relay, TCP)', () => {
.to.eventually.be.rejectedWith(AggregateError) .to.eventually.be.rejectedWith(AggregateError)
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerId) const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId)
expect(srcToRelayConn).to.exist() expect(srcToRelayConn).to.exist()
expect(srcToRelayConn.stat.status).to.equal('open') expect(srcToRelayConn.stat.status).to.equal('open')
}) })
@ -164,7 +164,7 @@ describe('Dialing (via relay, TCP)', () => {
.to.eventually.be.rejectedWith(AggregateError) .to.eventually.be.rejectedWith(AggregateError)
.and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerId) const dstToRelayConn = dstLibp2p.connectionManager.get(relayLibp2p.peerId)
expect(dstToRelayConn).to.exist() expect(dstToRelayConn).to.exist()
expect(dstToRelayConn.stat.status).to.equal('open') expect(dstToRelayConn.stat.status).to.equal('open')
}) })

View File

@ -7,6 +7,7 @@ chai.use(require('chai-as-promised'))
const { expect } = chai const { expect } = chai
const sinon = require('sinon') const sinon = require('sinon')
const { EventEmitter } = require('events')
const delay = require('delay') const delay = require('delay')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const duplexPair = require('it-pair/duplex') const duplexPair = require('it-pair/duplex')
@ -48,14 +49,13 @@ describe('Identify', () => {
listen: [] listen: []
}, },
protocols, protocols,
registrar: { connectionManager: new EventEmitter(),
peerStore: { peerStore: {
addressBook: { addressBook: {
set: () => { } set: () => { }
}, },
protoBook: { protoBook: {
set: () => { } set: () => { }
}
} }
} }
}) })
@ -64,7 +64,8 @@ describe('Identify', () => {
addresses: { addresses: {
listen: [] listen: []
}, },
protocols protocols,
connectionManager: new EventEmitter()
}) })
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
@ -74,8 +75,8 @@ describe('Identify', () => {
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY })
sinon.spy(localIdentify.registrar.peerStore.addressBook, 'set') sinon.spy(localIdentify.peerStore.addressBook, 'set')
sinon.spy(localIdentify.registrar.peerStore.protoBook, 'set') sinon.spy(localIdentify.peerStore.protoBook, 'set')
// Run identify // Run identify
await Promise.all([ await Promise.all([
@ -87,10 +88,10 @@ describe('Identify', () => {
}) })
]) ])
expect(localIdentify.registrar.peerStore.addressBook.set.callCount).to.equal(1) expect(localIdentify.peerStore.addressBook.set.callCount).to.equal(1)
expect(localIdentify.registrar.peerStore.protoBook.set.callCount).to.equal(1) expect(localIdentify.peerStore.protoBook.set.callCount).to.equal(1)
// Validate the remote peer gets updated in the peer store // Validate the remote peer gets updated in the peer store
const call = localIdentify.registrar.peerStore.addressBook.set.firstCall const call = localIdentify.peerStore.addressBook.set.firstCall
expect(call.args[0].id.bytes).to.equal(remotePeer.bytes) expect(call.args[0].id.bytes).to.equal(remotePeer.bytes)
}) })
@ -101,14 +102,13 @@ describe('Identify', () => {
listen: [] listen: []
}, },
protocols, protocols,
registrar: { connectionManager: new EventEmitter(),
peerStore: { peerStore: {
addressBook: { addressBook: {
set: () => { } set: () => { }
}, },
protoBook: { protoBook: {
set: () => { } set: () => { }
}
} }
} }
}) })
@ -117,7 +117,8 @@ describe('Identify', () => {
addresses: { addresses: {
listen: [] listen: []
}, },
protocols protocols,
connectionManager: new EventEmitter()
}) })
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
@ -145,12 +146,15 @@ describe('Identify', () => {
describe('push', () => { describe('push', () => {
it('should be able to push identify updates to another peer', async () => { it('should be able to push identify updates to another peer', async () => {
const listeningAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') const listeningAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
const connectionManager = new EventEmitter()
connectionManager.getConnection = () => {}
const localIdentify = new IdentifyService({ const localIdentify = new IdentifyService({
peerId: localPeer, peerId: localPeer,
addresses: { addresses: {
listen: [listeningAddr] listen: [listeningAddr]
}, },
registrar: { getConnection: () => {} }, connectionManager,
protocols: new Map([ protocols: new Map([
[multicodecs.IDENTIFY], [multicodecs.IDENTIFY],
[multicodecs.IDENTIFY_PUSH], [multicodecs.IDENTIFY_PUSH],
@ -162,14 +166,13 @@ describe('Identify', () => {
addresses: { addresses: {
listen: [] listen: []
}, },
registrar: { connectionManager,
peerStore: { peerStore: {
addressBook: { addressBook: {
set: () => {} set: () => { }
}, },
protoBook: { protoBook: {
set: () => { } set: () => { }
}
} }
} }
}) })
@ -182,8 +185,8 @@ describe('Identify', () => {
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_PUSH }) sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_PUSH })
sinon.spy(remoteIdentify.registrar.peerStore.addressBook, 'set') sinon.spy(remoteIdentify.peerStore.addressBook, 'set')
sinon.spy(remoteIdentify.registrar.peerStore.protoBook, 'set') sinon.spy(remoteIdentify.peerStore.protoBook, 'set')
// Run identify // Run identify
await Promise.all([ await Promise.all([
@ -195,12 +198,12 @@ describe('Identify', () => {
}) })
]) ])
expect(remoteIdentify.registrar.peerStore.addressBook.set.callCount).to.equal(1) expect(remoteIdentify.peerStore.addressBook.set.callCount).to.equal(1)
expect(remoteIdentify.registrar.peerStore.protoBook.set.callCount).to.equal(1) expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)
const [peerId, multiaddrs] = remoteIdentify.registrar.peerStore.addressBook.set.firstCall.args const [peerId, multiaddrs] = remoteIdentify.peerStore.addressBook.set.firstCall.args
expect(peerId.bytes).to.eql(localPeer.bytes) expect(peerId.bytes).to.eql(localPeer.bytes)
expect(multiaddrs).to.eql([listeningAddr]) expect(multiaddrs).to.eql([listeningAddr])
const [peerId2, protocols] = remoteIdentify.registrar.peerStore.protoBook.set.firstCall.args const [peerId2, protocols] = remoteIdentify.peerStore.protoBook.set.firstCall.args
expect(peerId2.bytes).to.eql(localPeer.bytes) expect(peerId2.bytes).to.eql(localPeer.bytes)
expect(protocols).to.eql(Array.from(localProtocols)) expect(protocols).to.eql(Array.from(localProtocols))
}) })

View File

@ -7,6 +7,8 @@ chai.use(require('chai-as-promised'))
const { expect } = chai const { expect } = chai
const sinon = require('sinon') const sinon = require('sinon')
const { EventEmitter } = require('events')
const { randomBytes } = require('libp2p-crypto') const { randomBytes } = require('libp2p-crypto')
const duplexPair = require('it-pair/duplex') const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe') const pipe = require('it-pipe')
@ -35,7 +37,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
const metrics = new Metrics({ const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000] movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
}) })
metrics.trackStream({ metrics.trackStream({
@ -70,7 +73,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
const metrics = new Metrics({ const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000] movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
}) })
metrics.trackStream({ metrics.trackStream({
@ -118,7 +122,8 @@ describe('Metrics', () => {
const [local2, remote2] = duplexPair() const [local2, remote2] = duplexPair()
const metrics = new Metrics({ const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000] movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
}) })
const protocol = '/echo/1.0.0' const protocol = '/echo/1.0.0'
metrics.start() metrics.start()
@ -173,7 +178,8 @@ describe('Metrics', () => {
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
const metrics = new Metrics({ const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000] movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
}) })
metrics.start() metrics.start()
@ -228,7 +234,8 @@ describe('Metrics', () => {
})) }))
const metrics = new Metrics({ const metrics = new Metrics({
maxOldPeersRetention: 5 // Only keep track of 5 maxOldPeersRetention: 5, // Only keep track of 5
connectionManager: new EventEmitter()
}) })
// Clone so trackedPeers isn't modified // Clone so trackedPeers isn't modified

View File

@ -1,72 +0,0 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')
const mergeOptions = require('merge-options')
const multiaddr = require('multiaddr')
const Libp2p = require('../../src')
const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
describe('registrar on dial', () => {
let peerId
let remotePeerId
let libp2p
let remoteLibp2p
let remoteAddr
before(async () => {
[peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 })
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerId: remotePeerId
}))
await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
})
after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})
it('should inform registrar of a new connection', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerId
}))
sinon.spy(remoteLibp2p.registrar, 'onConnect')
await libp2p.dial(remoteAddr)
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1)
const libp2pConn = libp2p.registrar.getConnection(remotePeerId)
expect(libp2pConn).to.exist()
const remoteConn = remoteLibp2p.registrar.getConnection(peerId)
expect(remoteConn).to.exist()
})
it('should be closed on libp2p stop', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerId
}))
await libp2p.dial(remoteAddr)
expect(libp2p.connections.size).to.equal(1)
sinon.spy(libp2p.registrar, 'close')
await libp2p.stop()
expect(libp2p.registrar.close.callCount).to.equal(1)
expect(libp2p.connections.size).to.equal(0)
})
})

View File

@ -6,21 +6,26 @@ chai.use(require('dirty-chai'))
const { expect } = chai const { expect } = chai
const pDefer = require('p-defer') const pDefer = require('p-defer')
const { EventEmitter } = require('events')
const Topology = require('libp2p-interfaces/src/topology/multicodec-topology') const Topology = require('libp2p-interfaces/src/topology/multicodec-topology')
const PeerStore = require('../../src/peer-store') const PeerStore = require('../../src/peer-store')
const Registrar = require('../../src/registrar') const Registrar = require('../../src/registrar')
const { createMockConnection } = require('./utils')
const createMockConnection = require('../utils/mockConnection')
const peerUtils = require('../utils/creators/peer') const peerUtils = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options.browser')
const multicodec = '/test/1.0.0' const multicodec = '/test/1.0.0'
describe('registrar', () => { describe('registrar', () => {
let peerStore, registrar let peerStore
let registrar
describe('errors', () => { describe('errors', () => {
beforeEach(() => { beforeEach(() => {
peerStore = new PeerStore() peerStore = new PeerStore()
registrar = new Registrar({ peerStore }) registrar = new Registrar({ peerStore, connectionManager: new EventEmitter() })
}) })
it('should fail to register a protocol if no multicodec is provided', () => { it('should fail to register a protocol if no multicodec is provided', () => {
@ -36,11 +41,19 @@ describe('registrar', () => {
}) })
describe('registration', () => { describe('registration', () => {
beforeEach(() => { let libp2p
peerStore = new PeerStore()
registrar = new Registrar({ peerStore }) beforeEach(async () => {
[libp2p] = await peerUtils.createPeer({
config: {
modules: baseOptions.modules
},
started: false
})
}) })
afterEach(() => libp2p.stop())
it('should be able to register a protocol', () => { it('should be able to register a protocol', () => {
const topologyProps = new Topology({ const topologyProps = new Topology({
multicodecs: multicodec, multicodecs: multicodec,
@ -50,7 +63,7 @@ describe('registrar', () => {
} }
}) })
const identifier = registrar.register(topologyProps) const identifier = libp2p.registrar.register(topologyProps)
expect(identifier).to.exist() expect(identifier).to.exist()
}) })
@ -64,14 +77,14 @@ describe('registrar', () => {
} }
}) })
const identifier = registrar.register(topologyProps) const identifier = libp2p.registrar.register(topologyProps)
const success = registrar.unregister(identifier) const success = libp2p.registrar.unregister(identifier)
expect(success).to.eql(true) expect(success).to.eql(true)
}) })
it('should fail to unregister if no register was made', () => { it('should fail to unregister if no register was made', () => {
const success = registrar.unregister('bad-identifier') const success = libp2p.registrar.unregister('bad-identifier')
expect(success).to.eql(false) expect(success).to.eql(false)
}) })
@ -85,10 +98,10 @@ describe('registrar', () => {
const remotePeerId = conn.remotePeer const remotePeerId = conn.remotePeer
// Add connected peer with protocol to peerStore and registrar // Add connected peer with protocol to peerStore and registrar
peerStore.protoBook.add(remotePeerId, [multicodec]) libp2p.peerStore.protoBook.add(remotePeerId, [multicodec])
registrar.onConnect(remotePeerId, conn) libp2p.connectionManager.onConnect(conn)
expect(registrar.connections.size).to.eql(1) expect(libp2p.connectionManager.size).to.eql(1)
const topologyProps = new Topology({ const topologyProps = new Topology({
multicodecs: multicodec, multicodecs: multicodec,
@ -108,14 +121,16 @@ describe('registrar', () => {
}) })
// Register protocol // Register protocol
const identifier = registrar.register(topologyProps) const identifier = libp2p.registrar.register(topologyProps)
const topology = registrar.topologies.get(identifier) const topology = libp2p.registrar.topologies.get(identifier)
// Topology created // Topology created
expect(topology).to.exist() expect(topology).to.exist()
registrar.onDisconnect(remotePeerId) await conn.close()
expect(registrar.connections.size).to.eql(0)
libp2p.connectionManager.onDisconnect(conn)
expect(libp2p.connectionManager.size).to.eql(0)
// Wait for handlers to be called // Wait for handlers to be called
return Promise.all([ return Promise.all([
@ -141,68 +156,30 @@ describe('registrar', () => {
}) })
// Register protocol // Register protocol
const identifier = registrar.register(topologyProps) const identifier = libp2p.registrar.register(topologyProps)
const topology = registrar.topologies.get(identifier) const topology = libp2p.registrar.topologies.get(identifier)
// Topology created // Topology created
expect(topology).to.exist() expect(topology).to.exist()
expect(registrar.connections.size).to.eql(0) expect(libp2p.connectionManager.size).to.eql(0)
// Setup connections before registrar // Setup connections before registrar
const conn = await createMockConnection() const conn = await createMockConnection()
const remotePeerId = conn.remotePeer const remotePeerId = conn.remotePeer
// Add connected peer to peerStore and registrar // Add connected peer to peerStore and registrar
peerStore.protoBook.set(remotePeerId, []) libp2p.peerStore.protoBook.set(remotePeerId, [])
registrar.onConnect(remotePeerId, conn) libp2p.connectionManager.onConnect(conn)
// Add protocol to peer and update it // Add protocol to peer and update it
peerStore.protoBook.add(remotePeerId, [multicodec]) libp2p.peerStore.protoBook.add(remotePeerId, [multicodec])
await onConnectDefer.promise await onConnectDefer.promise
// Remove protocol to peer and update it // Remove protocol to peer and update it
peerStore.protoBook.set(remotePeerId, []) libp2p.peerStore.protoBook.set(remotePeerId, [])
await onDisconnectDefer.promise await onDisconnectDefer.promise
}) })
it('should filter connections on disconnect, removing the closed one', async () => {
const onDisconnectDefer = pDefer()
const topologyProps = new Topology({
multicodecs: multicodec,
handlers: {
onConnect: () => {},
onDisconnect: () => {
onDisconnectDefer.resolve()
}
}
})
// Register protocol
registrar.register(topologyProps)
// Setup connections before registrar
const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 })
const conn1 = await createMockConnection({ localPeer, remotePeer })
const conn2 = await createMockConnection({ localPeer, remotePeer })
const id = remotePeer.toB58String()
// Add connection to registrar
registrar.onConnect(remotePeer, conn1)
registrar.onConnect(remotePeer, conn2)
expect(registrar.connections.get(id).length).to.eql(2)
conn2._stat.status = 'closed'
registrar.onDisconnect(remotePeer, conn2)
const peerConnections = registrar.connections.get(id)
expect(peerConnections.length).to.eql(1)
expect(peerConnections[0]._stat.status).to.eql('open')
})
}) })
}) })

View File

@ -1,51 +0,0 @@
'use strict'
const { Connection } = require('libp2p-interfaces/src/connection')
const multiaddr = require('multiaddr')
const pair = require('it-pair')
const peerUtils = require('../utils/creators/peer')
module.exports.createMockConnection = async (properties = {}) => {
const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080')
const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081')
const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 })
const openStreams = []
let streamId = 0
return new Connection({
localPeer: localPeer,
remotePeer: remotePeer,
localAddr,
remoteAddr,
stat: {
timeline: {
open: Date.now() - 10,
upgraded: Date.now()
},
direction: 'outbound',
encryption: '/secio/1.0.0',
multiplexer: '/mplex/6.7.0',
status: 'open'
},
newStream: (protocols) => {
const id = streamId++
const stream = pair()
stream.close = () => stream.sink([])
stream.id = id
openStreams.push(stream)
return {
stream,
protocol: protocols[0]
}
},
close: () => { },
getStreams: () => openStreams,
...properties
})
}

View File

@ -421,24 +421,24 @@ describe('libp2p.upgrader', () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
// Spy on emit for easy verification // Spy on emit for easy verification
sinon.spy(libp2p, 'emit') sinon.spy(libp2p.connectionManager, 'emit')
// Upgrade and check the connect event // Upgrade and check the connect event
const connections = await Promise.all([ const connections = await Promise.all([
libp2p.upgrader.upgradeOutbound(outbound), libp2p.upgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound) remoteUpgrader.upgradeInbound(inbound)
]) ])
expect(libp2p.emit.callCount).to.equal(1) expect(libp2p.connectionManager.emit.callCount).to.equal(1)
let [event, peerId] = libp2p.emit.getCall(0).args let [event, connection] = libp2p.connectionManager.emit.getCall(0).args
expect(event).to.equal('peer:connect') expect(event).to.equal('peer:connect')
expect(peerId.isEqual(remotePeer)).to.equal(true) expect(connection.remotePeer.isEqual(remotePeer)).to.equal(true)
// Close and check the disconnect event // Close and check the disconnect event
await Promise.all(connections.map(conn => conn.close())) await Promise.all(connections.map(conn => conn.close()))
expect(libp2p.emit.callCount).to.equal(2) expect(libp2p.connectionManager.emit.callCount).to.equal(2)
;([event, peerId] = libp2p.emit.getCall(1).args) ;([event, connection] = libp2p.connectionManager.emit.getCall(1).args)
expect(event).to.equal('peer:disconnect') expect(event).to.equal('peer:disconnect')
expect(peerId.isEqual(remotePeer)).to.equal(true) expect(connection.remotePeer.isEqual(remotePeer)).to.equal(true)
}) })
}) })