diff --git a/package.json b/package.json index 81e524a0..859b41dd 100644 --- a/package.json +++ b/package.json @@ -98,12 +98,13 @@ }, "dependencies": { "@achingbrain/nat-port-mapper": "^1.0.3", - "@libp2p/components": "^2.0.3", + "@libp2p/components": "^2.1.0", "@libp2p/connection": "^4.0.1", "@libp2p/crypto": "^1.0.3", "@libp2p/interface-address-manager": "^1.0.2", "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-connection-encrypter": "^2.0.1", + "@libp2p/interface-connection-manager": "^1.1.0", "@libp2p/interface-content-routing": "^1.0.2", "@libp2p/interface-dht": "^1.0.1", "@libp2p/interface-metrics": "^3.0.0", diff --git a/src/connection-manager/dialer/dial-request.ts b/src/connection-manager/dialer/dial-request.ts index 9004bb4d..18e90de3 100644 --- a/src/connection-manager/dialer/dial-request.ts +++ b/src/connection-manager/dialer/dial-request.ts @@ -7,7 +7,7 @@ import { logger } from '@libp2p/logger' import type { Multiaddr } from '@multiformats/multiaddr' import type { Connection } from '@libp2p/interface-connection' import type { AbortOptions } from '@libp2p/interfaces' -import type { Dialer } from './index.js' +import type { Dialer } from '@libp2p/interface-connection-manager' const log = logger('libp2p:dialer:dial-request') diff --git a/src/connection-manager/dialer/index.ts b/src/connection-manager/dialer/index.ts index f40c735b..90159655 100644 --- a/src/connection-manager/dialer/index.ts +++ b/src/connection-manager/dialer/index.ts @@ -24,10 +24,11 @@ import type { Startable } from '@libp2p/interfaces/startable' import type { PeerId } from '@libp2p/interface-peer-id' import { getPeer } from '../../get-peer.js' import sort from 'it-sort' -import { Components, Initializable } from '@libp2p/components' +import type { Components } from '@libp2p/components' import map from 'it-map' import type { AddressSorter } from '@libp2p/interface-peer-store' import type { ComponentMetricsTracker } from '@libp2p/interface-metrics' +import type { Dialer } from '@libp2p/interface-connection-manager' const log = logger('libp2p:dialer') @@ -85,8 +86,8 @@ export interface DialerInit { metrics?: ComponentMetricsTracker } -export class Dialer implements Startable, Initializable { - private components: Components = new Components() +export class DefaultDialer implements Startable, Dialer { + private readonly components: Components private readonly addressSorter: AddressSorter private readonly maxAddrsToDial: number private readonly timeout: number @@ -96,13 +97,14 @@ export class Dialer implements Startable, Initializable { public pendingDialTargets: Map private started: boolean - constructor (init: DialerInit = {}) { + constructor (components: Components, init: DialerInit = {}) { this.started = false this.addressSorter = init.addressSorter ?? publicAddressesFirst this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL this.timeout = init.dialTimeout ?? DIAL_TIMEOUT this.maxDialsPerPeer = init.maxDialsPerPeer ?? MAX_PER_PEER_DIALS this.tokens = [...new Array(init.maxParallelDials ?? MAX_PARALLEL_DIALS)].map((_, index) => index) + this.components = components this.pendingDials = trackedMap({ component: METRICS_COMPONENT, metric: METRICS_PENDING_DIALS, @@ -111,7 +113,7 @@ export class Dialer implements Startable, Initializable { this.pendingDialTargets = trackedMap({ component: METRICS_COMPONENT, metric: METRICS_PENDING_DIAL_TARGETS, - metrics: init.metrics + metrics: components.getMetrics() }) for (const [key, value] of Object.entries(init.resolvers ?? {})) { @@ -119,10 +121,6 @@ export class Dialer implements Startable, Initializable { } } - init (components: Components): void { - this.components = components - } - isStarted () { return this.started } diff --git a/src/connection-manager/index.ts b/src/connection-manager/index.ts index 4b6f001a..c3d0de88 100644 --- a/src/connection-manager/index.ts +++ b/src/connection-manager/index.ts @@ -14,7 +14,6 @@ import type { Connection } from '@libp2p/interface-connection' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import { Components, Initializable } from '@libp2p/components' import * as STATUS from '@libp2p/interface-connection/status' -import { Dialer } from './dialer/index.js' import type { AddressSorter } from '@libp2p/interface-peer-store' import type { Resolver } from '@multiformats/multiaddr' import { PeerMap } from '@libp2p/peer-collections' @@ -144,7 +143,6 @@ export interface ConnectionManagerEvents { * Responsible for managing known connections. */ export class DefaultConnectionManager extends EventEmitter implements ConnectionManager, Startable, Initializable { - public readonly dialer: Dialer private components = new Components() private readonly opts: Required private readonly connections: Map @@ -184,8 +182,6 @@ export class DefaultConnectionManager extends EventEmitter implements Libp2p { inboundUpgradeTimeout: init.connectionManager.inboundUpgradeTimeout })) + // Create the dialer + this.components.setDialer(new DefaultDialer(this.components, init.connectionManager)) + // Create the Connection Manager this.connectionManager = this.components.setConnectionManager(new DefaultConnectionManager(init.connectionManager)) @@ -338,7 +342,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { ) await Promise.all( - this.services.map(servce => servce.stop()) + this.services.map(service => service.stop()) ) await Promise.all( diff --git a/test/dialing/dial-request.spec.ts b/test/dialing/dial-request.spec.ts index b6dd5222..cf9840fe 100644 --- a/test/dialing/dial-request.spec.ts +++ b/test/dialing/dial-request.spec.ts @@ -9,7 +9,8 @@ import { DialAction, DialRequest } from '../../src/connection-manager/dialer/dia import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { Multiaddr } from '@multiformats/multiaddr' -import { Dialer } from '../../src/connection-manager/dialer/index.js' +import { DefaultDialer } from '../../src/connection-manager/dialer/index.js' +import { Components } from '@libp2p/components' const error = new Error('dial failure') describe('Dial Request', () => { @@ -23,7 +24,7 @@ describe('Dial Request', () => { } const dialAction: DialAction = async (num) => await actions[num.toString()]() const controller = new AbortController() - const dialer = new Dialer({ + const dialer = new DefaultDialer(new Components(), { maxParallelDials: 2 }) const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') @@ -53,7 +54,7 @@ describe('Dial Request', () => { } const dialAction: DialAction = async (num) => await actions[num.toString()]() const controller = new AbortController() - const dialer = new Dialer({ + const dialer = new DefaultDialer(new Components(), { maxParallelDials: 2 }) const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') @@ -98,7 +99,7 @@ describe('Dial Request', () => { const dialAction: DialAction = async (num) => await actions[num.toString()]() const addrs = Object.keys(actions) const controller = new AbortController() - const dialer = new Dialer({ + const dialer = new DefaultDialer(new Components(), { maxParallelDials: 2 }) const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') @@ -138,7 +139,7 @@ describe('Dial Request', () => { const dialAction: DialAction = async (num) => await actions[num.toString()]() const controller = new AbortController() - const dialer = new Dialer({ + const dialer = new DefaultDialer(new Components(), { maxParallelDials: 2 }) const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') @@ -184,7 +185,7 @@ describe('Dial Request', () => { const dialAction: DialAction = async (num) => await actions[num.toString()]() const addrs = Object.keys(actions) const controller = new AbortController() - const dialer = new Dialer({ + const dialer = new DefaultDialer(new Components(), { maxParallelDials: 2 }) const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') @@ -237,7 +238,7 @@ describe('Dial Request', () => { const dialRequest = new DialRequest({ addrs: Object.keys(actions).map(str => new Multiaddr(str)), - dialer: new Dialer({ + dialer: new DefaultDialer(new Components(), { maxParallelDials: 3 }), dialAction: async (ma, opts) => { diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index d4965adb..146701f8 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -17,7 +17,7 @@ import { Connection, isConnection } from '@libp2p/interface-connection' import { AbortError } from '@libp2p/interfaces/errors' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { MemoryDatastore } from 'datastore-core/memory' -import { Dialer } from '../../src/connection-manager/dialer/index.js' +import { DefaultDialer } from '../../src/connection-manager/dialer/index.js' import { DefaultAddressManager } from '../../src/address-manager/index.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultTransportManager } from '../../src/transport-manager.js' @@ -95,8 +95,7 @@ describe('Dialing (direct, TCP)', () => { }) it('should be able to connect to a remote node via its multiaddr', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const connection = await dialer.dial(remoteAddr) expect(connection).to.exist() @@ -104,8 +103,7 @@ describe('Dialing (direct, TCP)', () => { }) it('should fail to connect to an unsupported multiaddr', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) await expect(dialer.dial(unsupportedAddr)) .to.eventually.be.rejectedWith(Error) @@ -113,8 +111,7 @@ describe('Dialing (direct, TCP)', () => { }) it('should fail to connect if peer has no known addresses', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const peerId = await createFromJSON(Peers[1]) await expect(dialer.dial(peerId)) @@ -125,8 +122,7 @@ describe('Dialing (direct, TCP)', () => { it('should be able to connect to a given peer id', async () => { await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs()) - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const connection = await dialer.dial(remoteComponents.getPeerId()) expect(connection).to.exist() @@ -136,8 +132,7 @@ describe('Dialing (direct, TCP)', () => { it('should fail to connect to a given peer with unsupported addresses', async () => { await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr]) - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) await expect(dialer.dial(remoteComponents.getPeerId())) .to.eventually.be.rejectedWith(Error) @@ -150,8 +145,7 @@ describe('Dialing (direct, TCP)', () => { const peerId = await createFromJSON(Peers[1]) await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr]) - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) sinon.spy(localTM, 'dial') const connection = await dialer.dial(peerId) @@ -162,10 +156,9 @@ describe('Dialing (direct, TCP)', () => { }) it('should abort dials on queue task timeout', async () => { - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { dialTimeout: 50 }) - dialer.init(localComponents) sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => { expect(options.signal).to.exist() @@ -191,10 +184,9 @@ describe('Dialing (direct, TCP)', () => { await localComponents.getPeerStore().addressBook.add(peerId, addrs) - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { maxParallelDials: 2 }) - dialer.init(localComponents) expect(dialer.tokens).to.have.lengthOf(2) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index f20ad79c..d2f0a826 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -13,7 +13,7 @@ import { AbortError } from '@libp2p/interfaces/errors' import { MemoryDatastore } from 'datastore-core/memory' import { codes as ErrorCodes } from '../../src/errors.js' import * as Constants from '../../src/constants.js' -import { Dialer, DialTarget } from '../../src/connection-manager/dialer/index.js' +import { DefaultDialer, DialTarget } from '../../src/connection-manager/dialer/index.js' import { publicAddressesFirst } from '@libp2p/utils/address-sort' import { PersistentPeerStore } from '@libp2p/peer-store' import { DefaultTransportManager } from '../../src/transport-manager.js' @@ -72,8 +72,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should limit the number of tokens it provides', () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const maxPerPeer = Constants.MAX_PER_PEER_DIALS expect(dialer.tokens).to.have.lengthOf(Constants.MAX_PARALLEL_DIALS) @@ -83,10 +82,9 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should not return tokens if none are left', () => { - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { maxDialsPerPeer: Infinity }) - dialer.init(localComponents) const maxTokens = dialer.tokens.length @@ -97,8 +95,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should NOT be able to return a token twice', () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const tokens = dialer.getTokens(1) expect(tokens).to.have.length(1) @@ -109,8 +106,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should be able to connect to a remote node via its multiaddr', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) @@ -121,8 +117,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should fail to connect to an unsupported multiaddr', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) await expect(dialer.dial(unsupportedAddr.encapsulate(`/p2p/${remoteComponents.getPeerId().toString()}`))) .to.eventually.be.rejectedWith(Error) @@ -130,8 +125,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should be able to connect to a given peer', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) @@ -142,8 +136,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should fail to connect to a given peer with unsupported addresses', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') await localComponents.getPeerStore().addressBook.set(remotePeerId, [unsupportedAddr]) @@ -154,10 +147,9 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should abort dials on queue task timeout', async () => { - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { dialTimeout: 50 }) - dialer.init(localComponents) const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) @@ -177,10 +169,9 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should throw when a peer advertises more than the allowed number of peers', async () => { - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { maxAddrsToDial: 10 }) - dialer.init(localComponents) const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') await localComponents.getPeerStore().addressBook.set(remotePeerId, Array.from({ length: 11 }, (_, i) => new Multiaddr(`/ip4/127.0.0.1/tcp/1500${i}/ws/p2p/12D3KooWHFKTMzwerBtsVmtz4ZZEQy2heafxzWw6wNn5PPYkBxJ5`))) @@ -200,11 +191,10 @@ describe('Dialing (direct, WebSockets)', () => { const publicAddressesFirstSpy = sinon.spy(publicAddressesFirst) const localTMDialStub = sinon.stub(localTM, 'dial').callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), peerIdFromString(ma.getPeerId() ?? '')))) - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { addressSorter: publicAddressesFirstSpy, maxParallelDials: 3 }) - dialer.init(localComponents) // Inject data in the AddressBook await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), peerMultiaddrs) @@ -229,10 +219,9 @@ describe('Dialing (direct, WebSockets)', () => { ] const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { maxParallelDials: 2 }) - dialer.init(localComponents) // Inject data in the AddressBook await localComponents.getPeerStore().addressBook.add(remotePeerId, addrs) @@ -268,10 +257,9 @@ describe('Dialing (direct, WebSockets)', () => { new Multiaddr('/ip4/0.0.0.0/tcp/8001/ws'), new Multiaddr('/ip4/0.0.0.0/tcp/8002/ws') ] - const dialer = new Dialer({ + const dialer = new DefaultDialer(localComponents, { maxParallelDials: 2 }) - dialer.init(localComponents) // Inject data in the AddressBook await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), addrs) @@ -309,8 +297,7 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should cancel pending dial targets before proceeding', async () => { - const dialer = new Dialer() - dialer.init(localComponents) + const dialer = new DefaultDialer(localComponents) sinon.stub(dialer, '_createDialTarget').callsFake(async () => { const deferredDial = pDefer() @@ -364,8 +351,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { ] }) - const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const dialer = connectionManager.dialer + const dialer = libp2p.components.getDialer() expect(dialer).to.exist() expect(dialer).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS) @@ -395,8 +381,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { } libp2p = await createLibp2pNode(config) - const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const dialer = connectionManager.dialer + const dialer = libp2p.components.getDialer() expect(dialer).to.exist() expect(dialer).to.have.property('tokens').with.lengthOf(config.connectionManager.maxParallelDials) @@ -420,8 +405,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { ] }) - const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const dialerDialSpy = sinon.spy(connectionManager.dialer, 'dial') + const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial') const addressBookAddSpy = sinon.spy(libp2p.components.getPeerStore().addressBook, 'add') await libp2p.start() @@ -543,8 +527,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => { ] }) - const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - sinon.stub(connectionManager.dialer, '_createDialTarget').callsFake(async () => { + const dialer = libp2p.components.getDialer() as DefaultDialer + sinon.stub(dialer, '_createDialTarget').callsFake(async () => { const deferredDial = pDefer() return await deferredDial.promise }) @@ -582,8 +566,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => { await libp2p.start() - const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager - const dialerDestroyStub = sinon.spy(connectionManager.dialer, 'stop') + const dialer = libp2p.components.getDialer() as DefaultDialer + const dialerDestroyStub = sinon.spy(dialer, 'stop') await libp2p.stop()