diff --git a/doc/API.md b/doc/API.md index c26a8c71..a4ae0830 100644 --- a/doc/API.md +++ b/doc/API.md @@ -46,6 +46,9 @@ * [`peerStore.delete`](#peerstoredelete) * [`peerStore.get`](#peerstoreget) * [`peerStore.peers`](#peerstorepeers) + * [`peerStore.tagPeer`](#peerstoretagpeer) + * [`peerStore.unTagPeer`](#peerstoreuntagpeer) + * [`peerStore.getTags`](#peerstoregettags) * [`pubsub.getSubscribers`](#pubsubgetsubscribers) * [`pubsub.getTopics`](#pubsubgettopics) * [`pubsub.publish`](#pubsubpublish) @@ -56,7 +59,6 @@ * [`pubsub.topicValidators.set`](#pubsubtopicvalidatorsset) * [`pubsub.topicValidators.delete`](#pubsubtopicvalidatorsdelete) * [`connectionManager.get`](#connectionmanagerget) - * [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue) * [`connectionManager.size`](#connectionmanagersize) * [`keychain.createKey`](#keychaincreatekey) * [`keychain.renameKey`](#keychainrenamekey) @@ -1399,6 +1401,81 @@ for (let [peerIdString, peer] of peerStore.peers.entries()) { } ``` +### peerStore.tagPeer + +Tags a peer with the specified tag and optional value/expiry time + +`peerStore.tagPeer(peerId, tag, options)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | `PeerId` | The peer to tag | +| tag | `string` | The name of the tag to add | +| options | `{ value?: number, ttl?: number }` | An optional value (1-100) and an optional ttl after which the tag will expire (ms) | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | Promise resolves once the tag is stored | + +#### Example + +```js +await peerStore.tagPeer(peerId, 'my-tag', { value: 100, ttl: Date.now() + 60000 }) +``` + +### peerStore.unTagPeer + +Remove the tag from the specified peer + +`peerStore.unTagPeer(peerId, tag)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | `PeerId` | The peer to untag | +| tag | `string` | The name of the tag to remove | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | Promise resolves once the tag has been removed | + +#### Example + +```js +await peerStore.unTagPeer(peerId, 'my-tag') +``` + +### peerStore.getTags + +Remove the tag from the specified peer + +`peerStore.getTags(peerId)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | `PeerId` | The peer to get the tags for | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise>` | The promise resolves to the list of tags for the passed peer | + +#### Example + +```js +await peerStore.getTags(peerId) +``` + ### pubsub.getSubscribers Gets a list of the peer-ids that are subscribed to one topic. @@ -1672,32 +1749,6 @@ Get a connection with a given peer, if it exists. libp2p.connectionManager.get(peerId) ``` -### connectionManager.setPeerValue - -Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits. - -`libp2p.connectionManager.setPeerValue(peerId, value)` - -#### Parameters - -| Name | Type | Description | -|------|------|-------------| -| peerId | [`PeerId`][peer-id] | The peer to set the value for | -| value | `number` | The value of the peer from 0 to 1 | - -#### Returns - -| Type | Description | -|------|-------------| -| `void` | | - -#### Example - -```js -libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1) -libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0) -``` - ### connectionManager.size Getter for obtaining the current number of open connections. diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index 9ad4230f..e15c0816 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -556,7 +556,6 @@ const node = await createLibp2p({ maxConnections: Infinity, minConnections: 0, pollInterval: 2000, - defaultPeerValue: 1, // The below values will only be taken into account when Metrics are enabled maxData: Infinity, maxSentData: Infinity, diff --git a/doc/CONNECTION_MANAGER.md b/doc/CONNECTION_MANAGER.md index 8d0a3aaf..f25e340a 100644 --- a/doc/CONNECTION_MANAGER.md +++ b/doc/CONNECTION_MANAGER.md @@ -17,4 +17,3 @@ The following is a list of available options for setting limits for the Connecti - `maxEventLoopDelay`: sets the maximum event loop delay (measured in milliseconds) this node is willing to endure before it starts disconnecting peers. Defaults to `Infinity`. - `pollInterval`: sets the poll interval (in milliseconds) for assessing the current state and determining if this peer needs to force a disconnect. Defaults to `2000` (2 seconds). - `movingAverageInterval`: the interval used to calculate moving averages (in milliseconds). Defaults to `60000` (1 minute). This must be an available interval configured in `Metrics` -- `defaultPeerValue`: number between 0 and 1. Defaults to 1. diff --git a/package.json b/package.json index 89ebc2b6..75843b2b 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "generate:proto:fetch": "protons ./src/fetch/pb/proto.proto", "generate:proto:identify": "protons ./src/identify/pb/message.proto", "generate:proto:plaintext": "protons ./src/insecure/pb/proto.proto", + "generate:proto:tags": "protons ./src/connection-manager/tags/tags.proto", "test": "aegir test", "test:node": "aegir test -t node -f \"./dist/test/**/*.{node,spec}.js\" --cov", "test:chrome": "aegir test -t browser -f \"./dist/test/**/*.spec.js\" --cov", diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 1ec2e00b..79487226 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -18,6 +18,7 @@ import * as STATUS from '@libp2p/interface-connection/status' import { Dialer } from './dialer/index.js' import type { AddressSorter } from '@libp2p/interface-peer-store' import type { Resolver } from '@multiformats/multiaddr' +import { PeerMap } from '@libp2p/peer-collections' const log = logger('libp2p:connection-manager') @@ -30,13 +31,11 @@ const defaultOptions: Partial = { maxEventLoopDelay: Infinity, pollInterval: 2000, autoDialInterval: 10000, - movingAverageInterval: 60000, - defaultPeerValue: 0.5 + movingAverageInterval: 60000 } const METRICS_COMPONENT = 'connection-manager' const METRICS_PEER_CONNECTIONS = 'peer-connections' -const METRICS_PEER_VALUES = 'peer-values' export interface ConnectionManagerInit { /** @@ -79,11 +78,6 @@ export interface ConnectionManagerInit { */ movingAverageInterval?: number - /** - * The value of the peer - */ - defaultPeerValue?: number - /** * If true, try to connect to all discovered peers up to the connection manager limit */ @@ -138,7 +132,6 @@ export class DefaultConnectionManager extends EventEmitter - private readonly peerValues: Map private readonly connections: Map private started: boolean private timer?: ReturnType @@ -155,17 +148,6 @@ export class DefaultConnectionManager extends EventEmitter} - */ - this.peerValues = trackedMap({ - component: METRICS_COMPONENT, - metric: METRICS_PEER_VALUES, - metrics: this.components.getMetrics() - }) - /** * Map of connections per peer */ @@ -271,18 +253,6 @@ export class DefaultConnectionManager extends EventEmitter 1) { - throw new Error('value should be a number between 0 and 1') - } - - this.peerValues.set(peerId.toString(), value) - } - /** * Checks the libp2p metrics to determine if any values have exceeded * the configured maximums. @@ -340,10 +310,6 @@ export class DefaultConnectionManager extends EventEmitter('peer:disconnect', { detail: connection })) this.components.getMetrics()?.onPeerDisconnected(connection.remotePeer) @@ -475,7 +440,7 @@ export class DefaultConnectionManager extends EventEmitter limit) { - log('%s: limit exceeded: %p, %d, pruning %d connection(s)', this.components.getPeerId(), name, value, toPrune) + log('%s: limit exceeded: %p, %d/%d, pruning %d connection(s)', this.components.getPeerId(), name, value, limit, toPrune) await this._maybePruneConnections(toPrune) } } @@ -491,22 +456,49 @@ export class DefaultConnectionManager extends EventEmitter a[1] - b[1]))) - log.trace('sorted peer values: %j', peerValues) + const peerValues = new PeerMap() + // work out peer values + for (const connection of connections) { + const remotePeer = connection.remotePeer + + if (peerValues.has(remotePeer)) { + continue + } + + const tags = await this.components.getPeerStore().getTags(remotePeer) + + // sum all tag values + peerValues.set(remotePeer, tags.reduce((acc, curr) => { + return acc + curr.value + }, 0)) + } + + // sort by value, lowest to highest + const sortedConnections = connections.sort((a, b) => { + const peerAValue = peerValues.get(a.remotePeer) ?? 0 + const peerBValue = peerValues.get(b.remotePeer) ?? 0 + + if (peerAValue > peerBValue) { + return 1 + } + + if (peerAValue < peerBValue) { + return -1 + } + + return 0 + }) + + // close some connections const toClose = [] - for (const [peerId] of peerValues) { - log('too many connections open - closing a connection to %p', peerId) + for (const connection of sortedConnections) { + log('too many connections open - closing a connection to %p', connection.remotePeer) + toClose.push(connection) - for (const connection of connections) { - if (connection.remotePeer.toString() === peerId) { - toClose.push(connection) - } - - if (toClose.length === toPrune) { - break - } + if (toClose.length === toPrune) { + break } } diff --git a/test/connection-manager/index.spec.ts b/test/connection-manager/index.spec.ts index 950223c0..10b06869 100644 --- a/test/connection-manager/index.spec.ts +++ b/test/connection-manager/index.spec.ts @@ -51,7 +51,7 @@ describe('Connection Manager', () => { expect(libp2p.components.getMetrics()).to.exist() }) - it('should close lowest value peer connection when the maximum has been reached', async () => { + it('should close connections with low tag values first', async () => { const max = 5 libp2p = await createNode({ config: createBaseOptions({ @@ -67,20 +67,21 @@ describe('Connection Manager', () => { const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections') - - // Add 1 too many connections const spies = new Map>>() - await Promise.all([...new Array(max + 1)].map(async (_, index) => { + + // Add 1 connection too many + for (let i = 0; i < max + 1; i++) { const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) const spy = sinon.spy(connection, 'close') - // The connections have the same remote id, give them random ones - // so that we can verify the correct connection was closed - // sinon.stub(connection.remotePeer, 'toString').returns(index) - const value = Math.random() + + const value = Math.round(Math.random() * 100) spies.set(value, spy) - connectionManager.setPeerValue(connection.remotePeer, value) + await libp2p.peerStore.tagPeer(connection.remotePeer, 'test-tag', { + value + }) + await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) - })) + } // get the lowest value const lowest = Array.from(spies.keys()).sort((a, b) => { @@ -100,7 +101,7 @@ describe('Connection Manager', () => { expect(lowestSpy).to.have.property('callCount', 1) }) - it('should close connection when the maximum has been reached even without peer values', async () => { + it('should close connection when the maximum has been reached even without tags', async () => { const max = 5 libp2p = await createNode({ config: createBaseOptions({ @@ -119,11 +120,11 @@ describe('Connection Manager', () => { // Add 1 too many connections const spy = sinon.spy() - await Promise.all([...new Array(max + 1)].map(async () => { + for (let i = 0; i < max + 1; i++) { const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) sinon.stub(connection, 'close').callsFake(async () => spy()) // eslint-disable-line await connectionManager._onConnect(new CustomEvent('connection', { detail: connection })) - })) + } expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1) expect(spy).to.have.property('callCount', 1)