fix: update to new interfaces (#1206)

Notably removes the `Dialer` interface as the `ConnectionManager` is now in charge of managing connections.
This commit is contained in:
Alex Potsides
2022-05-04 10:19:04 +01:00
committed by GitHub
parent d16817ca44
commit a15254fdd4
34 changed files with 566 additions and 398 deletions

View File

@ -11,6 +11,7 @@ export default {
}, },
test: { test: {
before: async () => { before: async () => {
// use dynamic import because we only want to reference these files during the test run, e.g. after building
const { createLibp2p } = await import('./dist/src/index.js') const { createLibp2p } = await import('./dist/src/index.js')
const { MULTIADDRS_WEBSOCKETS } = await import('./dist/test/fixtures/browser.js') const { MULTIADDRS_WEBSOCKETS } = await import('./dist/test/fixtures/browser.js')
const { Plaintext } = await import('./dist/src/insecure/index.js') const { Plaintext } = await import('./dist/src/insecure/index.js')

View File

@ -98,9 +98,10 @@
"@achingbrain/nat-port-mapper": "^1.0.0", "@achingbrain/nat-port-mapper": "^1.0.0",
"@libp2p/connection": "^1.1.5", "@libp2p/connection": "^1.1.5",
"@libp2p/crypto": "^0.22.11", "@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^1.3.24", "@libp2p/interfaces": "^1.3.30",
"@libp2p/logger": "^1.1.4", "@libp2p/logger": "^1.1.4",
"@libp2p/multistream-select": "^1.0.4", "@libp2p/multistream-select": "^1.0.4",
"@libp2p/peer-collections": "^1.0.2",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.10",
"@libp2p/peer-id-factory": "^1.0.9", "@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/peer-record": "^1.0.8", "@libp2p/peer-record": "^1.0.8",
@ -135,8 +136,6 @@
"it-stream-types": "^1.0.4", "it-stream-types": "^1.0.4",
"it-take": "^1.0.2", "it-take": "^1.0.2",
"it-to-buffer": "^2.0.2", "it-to-buffer": "^2.0.2",
"@libp2p/tracked-map": "^1.0.4",
"it-pair": "^2.0.2",
"merge-options": "^3.0.4", "merge-options": "^3.0.4",
"mortice": "^3.0.0", "mortice": "^3.0.0",
"multiformats": "^9.6.3", "multiformats": "^9.6.3",
@ -165,9 +164,9 @@
"@libp2p/delegated-content-routing": "^1.0.2", "@libp2p/delegated-content-routing": "^1.0.2",
"@libp2p/delegated-peer-routing": "^1.0.2", "@libp2p/delegated-peer-routing": "^1.0.2",
"@libp2p/floodsub": "^1.0.6", "@libp2p/floodsub": "^1.0.6",
"@libp2p/interface-compliance-tests": "^1.1.25", "@libp2p/interface-compliance-tests": "^1.1.31",
"@libp2p/interop": "^1.0.3", "@libp2p/interop": "^1.0.3",
"@libp2p/kad-dht": "^1.0.7", "@libp2p/kad-dht": "^1.0.8",
"@libp2p/mdns": "^1.0.4", "@libp2p/mdns": "^1.0.4",
"@libp2p/mplex": "^1.0.3", "@libp2p/mplex": "^1.0.3",
"@libp2p/pubsub": "^1.2.18", "@libp2p/pubsub": "^1.2.18",

View File

@ -85,12 +85,14 @@ export class AutoRelay {
// If protocol, check if can hop, store info in the metadataBook and listen on it // If protocol, check if can hop, store info in the metadataBook and listen on it
try { try {
const connection = this.components.getConnectionManager().getConnection(peerId) const connections = this.components.getConnectionManager().getConnections(peerId)
if (connection == null) { if (connections.length === 0) {
return return
} }
const connection = connections[0]
// Do not hop on a relayed connection // Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) { if (connection.remoteAddr.protoCodes().includes(CIRCUIT_PROTO_CODE)) {
log(`relayed connection to ${id} will not be used to hop on`) log(`relayed connection to ${id} will not be used to hop on`)
@ -223,15 +225,15 @@ export class AutoRelay {
continue continue
} }
const connection = this.components.getConnectionManager().getConnection(id) const connections = this.components.getConnectionManager().getConnections(id)
// If not connected, store for possible later use. // If not connected, store for possible later use.
if (connection == null) { if (connections.length === 0) {
knownHopsToDial.push(id) knownHopsToDial.push(id)
continue continue
} }
await this._addListenRelay(connection, idStr) await this._addListenRelay(connections[0], idStr)
// Check if already listening on enough relays // Check if already listening on enough relays
if (this.listenRelays.size >= this.maxListeners) { if (this.listenRelays.size >= this.maxListeners) {
@ -274,7 +276,7 @@ export class AutoRelay {
async _tryToListenOnRelay (peerId: PeerId) { async _tryToListenOnRelay (peerId: PeerId) {
try { try {
const connection = await this.components.getDialer().dial(peerId) const connection = await this.components.getConnectionManager().openConnection(peerId)
await this._addListenRelay(connection, peerId.toString()) await this._addListenRelay(connection, peerId.toString())
} catch (err: any) { } catch (err: any) {
log.error('Could not use %p as relay', peerId, err) log.error('Could not use %p as relay', peerId, err)

View File

@ -11,7 +11,7 @@ import type { Connection } from '@libp2p/interfaces/connection'
import { peerIdFromBytes } from '@libp2p/peer-id' import { peerIdFromBytes } from '@libp2p/peer-id'
import type { Duplex } from 'it-stream-types' import type { Duplex } from 'it-stream-types'
import type { Circuit } from '../transport.js' import type { Circuit } from '../transport.js'
import type { ConnectionManager } from '@libp2p/interfaces/registrar' import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
const log = logger('libp2p:circuit:hop') const log = logger('libp2p:circuit:hop')
@ -58,8 +58,8 @@ export async function handleHop (hopRequest: HopRequest) {
// Get the connection to the destination (stop) peer // Get the connection to the destination (stop) peer
const destinationPeer = peerIdFromBytes(request.dstPeer.id) const destinationPeer = peerIdFromBytes(request.dstPeer.id)
const destinationConnection = connectionManager.getConnection(destinationPeer) const destinationConnections = connectionManager.getConnections(destinationPeer)
if (destinationConnection == null && !circuit.hopActive()) { if (destinationConnections.length === 0 && !circuit.hopActive()) {
log('HOP request received but we are not connected to the destination peer') log('HOP request received but we are not connected to the destination peer')
return streamHandler.end({ return streamHandler.end({
type: CircuitPB.Type.STATUS, type: CircuitPB.Type.STATUS,
@ -68,7 +68,7 @@ export async function handleHop (hopRequest: HopRequest) {
} }
// TODO: Handle being an active relay // TODO: Handle being an active relay
if (destinationConnection == null) { if (destinationConnections.length === 0) {
log('did not have connection to remote peer') log('did not have connection to remote peer')
return streamHandler.end({ return streamHandler.end({
type: CircuitPB.Type.STATUS, type: CircuitPB.Type.STATUS,
@ -87,7 +87,7 @@ export async function handleHop (hopRequest: HopRequest) {
try { try {
log('performing STOP request') log('performing STOP request')
const result = await stop({ const result = await stop({
connection: destinationConnection, connection: destinationConnections[0],
request: stopRequest request: stopRequest
}) })

View File

@ -1,11 +1,12 @@
import { CustomEvent, EventEmitter } from '@libp2p/interfaces' import { CustomEvent, EventEmitter } from '@libp2p/interfaces'
import type { ConnectionManager } from '@libp2p/interfaces/registrar' import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
import type { Dialer } from '@libp2p/interfaces/dialer' import type { PeerStore } from '@libp2p/interfaces/peer-store'
import type { Listener } from '@libp2p/interfaces/transport' import type { Listener } from '@libp2p/interfaces/transport'
import { peerIdFromString } from '@libp2p/peer-id'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
export interface ListenerOptions { export interface ListenerOptions {
dialer: Dialer peerStore: PeerStore
connectionManager: ConnectionManager connectionManager: ConnectionManager
} }
@ -17,7 +18,19 @@ export function createListener (options: ListenerOptions): Listener {
*/ */
async function listen (addr: Multiaddr): Promise<void> { async function listen (addr: Multiaddr): Promise<void> {
const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '') const addrString = addr.toString().split('/p2p-circuit').find(a => a !== '')
const relayConn = await options.dialer.dial(new Multiaddr(addrString)) const ma = new Multiaddr(addrString)
const relayPeerStr = ma.getPeerId()
if (relayPeerStr == null) {
throw new Error('Could not determine relay peer from multiaddr')
}
const relayPeerId = peerIdFromString(relayPeerStr)
await options.peerStore.addressBook.add(relayPeerId, [ma])
const relayConn = await options.connectionManager.openConnection(relayPeerId)
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit') const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
listeningAddrs.set(relayConn.remotePeer.toString(), relayedAddr) listeningAddrs.set(relayConn.remotePeer.toString(), relayedAddr)

View File

@ -149,9 +149,12 @@ export class Circuit implements Transport, Initializable {
const destinationPeer = peerIdFromString(destinationId) const destinationPeer = peerIdFromString(destinationId)
let disconnectOnFailure = false let disconnectOnFailure = false
let relayConnection = this.components.getConnectionManager().getConnection(relayPeer) const relayConnections = this.components.getConnectionManager().getConnections(relayPeer)
let relayConnection = relayConnections[0]
if (relayConnection == null) { if (relayConnection == null) {
relayConnection = await this.components.getDialer().dial(relayAddr, options) await this.components.getPeerStore().addressBook.add(relayPeer, [relayAddr])
relayConnection = await this.components.getConnectionManager().openConnection(relayPeer, options)
disconnectOnFailure = true disconnectOnFailure = true
} }
@ -195,8 +198,8 @@ export class Circuit implements Transport, Initializable {
this.handler = options.handler this.handler = options.handler
return createListener({ return createListener({
dialer: this.components.getDialer(), connectionManager: this.components.getConnectionManager(),
connectionManager: this.components.getConnectionManager() peerStore: this.components.getPeerStore()
}) })
} }

View File

@ -21,14 +21,8 @@ const DefaultConfig: Partial<Libp2pInit> = {
connectionManager: { connectionManager: {
maxConnections: 300, maxConnections: 300,
minConnections: 50, minConnections: 50,
autoDial: true,
autoDialInterval: 10000, autoDialInterval: 10000,
autoDial: true
},
connectionGater: {},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
},
dialer: {
maxParallelDials: Constants.MAX_PARALLEL_DIALS, maxParallelDials: Constants.MAX_PARALLEL_DIALS,
maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS, maxDialsPerPeer: Constants.MAX_PER_PEER_DIALS,
dialTimeout: Constants.DIAL_TIMEOUT, dialTimeout: Constants.DIAL_TIMEOUT,
@ -37,6 +31,10 @@ const DefaultConfig: Partial<Libp2pInit> = {
}, },
addressSorter: publicAddressesFirst addressSorter: publicAddressesFirst
}, },
connectionGater: {},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
},
host: { host: {
agentVersion: AGENT_VERSION agentVersion: AGENT_VERSION
}, },

View File

@ -102,7 +102,7 @@ export class AutoDialler implements Startable {
const minConnections = this.options.minConnections const minConnections = this.options.minConnections
// Already has enough connections // Already has enough connections
if (this.components.getConnectionManager().getConnectionList().length >= minConnections) { if (this.components.getConnectionManager().getConnections().length >= minConnections) {
this.autoDialTimeout = retimer(this._autoDial, this.options.autoDialInterval) this.autoDialTimeout = retimer(this._autoDial, this.options.autoDialInterval)
return return
@ -126,7 +126,7 @@ export class AutoDialler implements Startable {
async (source) => await all(source) async (source) => await all(source)
) )
for (let i = 0; this.running && i < peers.length && this.components.getConnectionManager().getConnectionList().length < minConnections; i++) { for (let i = 0; this.running && i < peers.length && this.components.getConnectionManager().getConnections().length < minConnections; i++) {
// Connection Manager was stopped during async dial // Connection Manager was stopped during async dial
if (!this.running) { if (!this.running) {
return return
@ -134,10 +134,10 @@ export class AutoDialler implements Startable {
const peer = peers[i] const peer = peers[i]
if (this.components.getConnectionManager().getConnection(peer.id) == null) { if (this.components.getConnectionManager().getConnections(peer.id).length === 0) {
log('connecting to a peerStore stored peer %p', peer.id) log('connecting to a peerStore stored peer %p', peer.id)
try { try {
await this.components.getDialer().dial(peer.id) await this.components.getConnectionManager().openConnection(peer.id)
} catch (err: any) { } catch (err: any) {
log.error('could not connect to peerStore stored peer', err) log.error('could not connect to peerStore stored peer', err)
} }

View File

@ -1,39 +1,58 @@
import type { PeerInfo } from '@libp2p/interfaces/peer-info' import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import { logger } from '@libp2p/logger' import { logger } from '@libp2p/logger'
import type { Components } from '@libp2p/interfaces/components' import type { Components } from '@libp2p/interfaces/components'
import { TimeoutController } from 'timeout-abort-controller'
const log = logger('libp2p:dialer:auto-dialer') const log = logger('libp2p:dialer:auto-dialer')
export interface AutoDialerInit { export interface AutoDialerInit {
enabled: boolean enabled: boolean
minConnections: number minConnections: number
dialTimeout: number
} }
export class AutoDialer { export class AutoDialer {
private readonly components: Components private readonly components: Components
private readonly enabled: boolean private readonly enabled: boolean
private readonly minConnections: number private readonly minConnections: number
private readonly dialTimeout: number
constructor (components: Components, init: AutoDialerInit) { constructor (components: Components, init: AutoDialerInit) {
this.components = components this.components = components
this.enabled = init.enabled this.enabled = init.enabled
this.minConnections = init.minConnections this.minConnections = init.minConnections
this.dialTimeout = init.dialTimeout
} }
public handle (evt: CustomEvent<PeerInfo>) { public handle (evt: CustomEvent<PeerInfo>) {
const { detail: peer } = evt const { detail: peer } = evt
if (!this.enabled) {
return
}
const connections = this.components.getConnectionManager().getConnections(peer.id)
// If auto dialing is on and we have no connection to the peer, check if we should dial // If auto dialing is on and we have no connection to the peer, check if we should dial
if (this.enabled && this.components.getConnectionManager().getConnection(peer.id) == null) { if (connections.length === 0) {
const minConnections = this.minConnections ?? 0 const minConnections = this.minConnections ?? 0
if (minConnections > this.components.getConnectionManager().getConnectionList().length) { const allConnections = this.components.getConnectionManager().getConnections()
log('auto-dialing discovered peer %p', peer.id)
void this.components.getDialer().dial(peer.id) if (minConnections > allConnections.length) {
log('auto-dialing discovered peer %p with timeout %d', peer.id, this.dialTimeout)
const controller = new TimeoutController(this.dialTimeout)
void this.components.getConnectionManager().openConnection(peer.id, {
signal: controller.signal
})
.catch(err => { .catch(err => {
log.error('could not connect to discovered peer %p with %o', peer.id, err) log.error('could not connect to discovered peer %p with %o', peer.id, err)
}) })
.finally(() => {
controller.clear()
})
} }
} }
} }

View File

@ -3,12 +3,12 @@ import { anySignal } from 'any-signal'
import FIFO from 'p-fifo' import FIFO from 'p-fifo'
// @ts-expect-error setMaxListeners is missing from the node 16 types // @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events' import { setMaxListeners } from 'events'
import { codes } from '../errors.js' import { codes } from '../../errors.js'
import { logger } from '@libp2p/logger' import { logger } from '@libp2p/logger'
import type { Multiaddr } from '@multiformats/multiaddr' import type { Multiaddr } from '@multiformats/multiaddr'
import type { Connection } from '@libp2p/interfaces/connection' import type { Connection } from '@libp2p/interfaces/connection'
import type { AbortOptions } from '@libp2p/interfaces' import type { AbortOptions } from '@libp2p/interfaces'
import type { DefaultDialer } from './index.js' import type { Dialer } from './index.js'
const log = logger('libp2p:dialer:dial-request') const log = logger('libp2p:dialer:dial-request')
@ -19,12 +19,12 @@ export interface DialAction {
export interface DialRequestOptions { export interface DialRequestOptions {
addrs: Multiaddr[] addrs: Multiaddr[]
dialAction: DialAction dialAction: DialAction
dialer: DefaultDialer dialer: Dialer
} }
export class DialRequest { export class DialRequest {
private readonly addrs: Multiaddr[] private readonly addrs: Multiaddr[]
private readonly dialer: DefaultDialer private readonly dialer: Dialer
private readonly dialAction: DialAction private readonly dialAction: DialAction
/** /**

View File

@ -3,7 +3,7 @@ import all from 'it-all'
import filter from 'it-filter' import filter from 'it-filter'
import { pipe } from 'it-pipe' import { pipe } from 'it-pipe'
import errCode from 'err-code' import errCode from 'err-code'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr, Resolver } from '@multiformats/multiaddr'
import { TimeoutController } from 'timeout-abort-controller' import { TimeoutController } from 'timeout-abort-controller'
import { AbortError } from '@libp2p/interfaces/errors' import { AbortError } from '@libp2p/interfaces/errors'
import { anySignal } from 'any-signal' import { anySignal } from 'any-signal'
@ -12,22 +12,22 @@ import { setMaxListeners } from 'events'
import { DialAction, DialRequest } from './dial-request.js' import { DialAction, DialRequest } from './dial-request.js'
import { publicAddressesFirst } from '@libp2p/utils/address-sort' import { publicAddressesFirst } from '@libp2p/utils/address-sort'
import { trackedMap } from '@libp2p/tracked-map' import { trackedMap } from '@libp2p/tracked-map'
import { codes } from '../errors.js' import { codes } from '../../errors.js'
import { import {
DIAL_TIMEOUT, DIAL_TIMEOUT,
MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS,
MAX_PER_PEER_DIALS, MAX_PER_PEER_DIALS,
MAX_ADDRS_TO_DIAL MAX_ADDRS_TO_DIAL
} from '../constants.js' } from '../../constants.js'
import type { Connection } from '@libp2p/interfaces/connection' import type { Connection } from '@libp2p/interfaces/connection'
import type { AbortOptions, Startable } from '@libp2p/interfaces' import type { AbortOptions, Startable } from '@libp2p/interfaces'
import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerId } from '@libp2p/interfaces/peer-id'
import { getPeer } from '../get-peer.js' import { getPeer } from '../../get-peer.js'
import sort from 'it-sort' import sort from 'it-sort'
import type { Components } from '@libp2p/interfaces/components' import { Components, Initializable } from '@libp2p/interfaces/components'
import type { Dialer, DialerInit } from '@libp2p/interfaces/dialer'
import map from 'it-map' import map from 'it-map'
import type { AddressSorter } from '@libp2p/interfaces/peer-store' import type { AddressSorter } from '@libp2p/interfaces/peer-store'
import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics'
const log = logger('libp2p:dialer') const log = logger('libp2p:dialer')
@ -52,8 +52,41 @@ export interface PendingDialTarget {
reject: (err: Error) => void reject: (err: Error) => void
} }
export class DefaultDialer implements Dialer, Startable { export interface DialerInit {
private readonly components: Components /**
* Sort the known addresses of a peer before trying to dial
*/
addressSorter?: AddressSorter
/**
* Number of max concurrent dials
*/
maxParallelDials?: number
/**
* Number of max addresses to dial for a given peer
*/
maxAddrsToDial?: number
/**
* How long a dial attempt is allowed to take
*/
dialTimeout?: number
/**
* Number of max concurrent dials per peer
*/
maxDialsPerPeer?: number
/**
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>
metrics?: ComponentMetricsTracker
}
export class Dialer implements Startable, Initializable {
private components: Components = new Components()
private readonly addressSorter: AddressSorter private readonly addressSorter: AddressSorter
private readonly maxAddrsToDial: number private readonly maxAddrsToDial: number
private readonly timeout: number private readonly timeout: number
@ -63,8 +96,7 @@ export class DefaultDialer implements Dialer, Startable {
public pendingDialTargets: Map<string, PendingDialTarget> public pendingDialTargets: Map<string, PendingDialTarget>
private started: boolean private started: boolean
constructor (components: Components, init: DialerInit = {}) { constructor (init: DialerInit = {}) {
this.components = components
this.started = false this.started = false
this.addressSorter = init.addressSorter ?? publicAddressesFirst this.addressSorter = init.addressSorter ?? publicAddressesFirst
this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL this.maxAddrsToDial = init.maxAddrsToDial ?? MAX_ADDRS_TO_DIAL
@ -87,6 +119,10 @@ export class DefaultDialer implements Dialer, Startable {
} }
} }
init (components: Components): void {
this.components = components
}
isStarted () { isStarted () {
return this.started return this.started
} }
@ -139,16 +175,6 @@ export class DefaultDialer implements Dialer, Startable {
throw errCode(new Error('The dial request is blocked by gater.allowDialPeer'), codes.ERR_PEER_DIAL_INTERCEPTED) throw errCode(new Error('The dial request is blocked by gater.allowDialPeer'), codes.ERR_PEER_DIAL_INTERCEPTED)
} }
log('dial to %p', id)
const existingConnection = this.components.getConnectionManager().getConnection(id)
if (existingConnection != null) {
log('had an existing connection to %p', id)
return existingConnection
}
log('creating dial target for %p', id) log('creating dial target for %p', id)
const dialTarget = await this._createCancellableDialTarget(id) const dialTarget = await this._createCancellableDialTarget(id)
@ -176,22 +202,6 @@ export class DefaultDialer implements Dialer, Startable {
} }
} }
async dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options: AbortOptions = {}) {
if (protocols == null) {
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
protocols = Array.isArray(protocols) ? protocols : [protocols]
if (protocols.length === 0) {
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
const connection = await this.dial(peer, options)
return await connection.newStream(protocols)
}
/** /**
* Connects to a given `peer` by dialing all of its known addresses. * Connects to a given `peer` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection * The dial to the first address that is successfully able to upgrade a connection

View File

@ -4,16 +4,19 @@ import mergeOptions from 'merge-options'
import { LatencyMonitor, SummaryObject } from './latency-monitor.js' import { LatencyMonitor, SummaryObject } from './latency-monitor.js'
// @ts-expect-error retimer does not have types // @ts-expect-error retimer does not have types
import retimer from 'retimer' import retimer from 'retimer'
import { CustomEvent, EventEmitter, Startable } from '@libp2p/interfaces' import { AbortOptions, CustomEvent, EventEmitter, Startable } from '@libp2p/interfaces'
import { trackedMap } from '@libp2p/tracked-map' import { trackedMap } from '@libp2p/tracked-map'
import { codes } from '../errors.js' import { codes } from '../errors.js'
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id' import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
// @ts-expect-error setMaxListeners is missing from the node 16 types // @ts-expect-error setMaxListeners is missing from the node 16 types
import { setMaxListeners } from 'events' import { setMaxListeners } from 'events'
import type { Connection } from '@libp2p/interfaces/connection' import type { Connection } from '@libp2p/interfaces/connection'
import type { ConnectionManager } from '@libp2p/interfaces/registrar' import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
import type { Components } from '@libp2p/interfaces/components' import { Components, Initializable } from '@libp2p/interfaces/components'
import * as STATUS from '@libp2p/interfaces/connection/status' import * as STATUS from '@libp2p/interfaces/connection/status'
import { Dialer } from './dialer/index.js'
import type { AddressSorter } from '@libp2p/interfaces/peer-store'
import type { Resolver } from '@multiformats/multiaddr'
const log = logger('libp2p:connection-manager') const log = logger('libp2p:connection-manager')
@ -34,21 +37,16 @@ const METRICS_COMPONENT = 'connection-manager'
const METRICS_PEER_CONNECTIONS = 'peer-connections' const METRICS_PEER_CONNECTIONS = 'peer-connections'
const METRICS_PEER_VALUES = 'peer-values' const METRICS_PEER_VALUES = 'peer-values'
export interface ConnectionManagerEvents {
'peer:connect': CustomEvent<PeerId>
'peer:disconnect': CustomEvent<PeerId>
}
export interface ConnectionManagerInit { export interface ConnectionManagerInit {
/** /**
* The maximum number of connections allowed * The maximum number of connections to keep open
*/ */
maxConnections?: number maxConnections: number
/** /**
* The minimum number of connections to avoid pruning * The minimum number of connections to keep open
*/ */
minConnections?: number minConnections: number
/** /**
* The max data (in and out), per average interval to allow * The max data (in and out), per average interval to allow
@ -86,39 +84,75 @@ export interface ConnectionManagerInit {
defaultPeerValue?: number defaultPeerValue?: number
/** /**
* Should preemptively guarantee connections are above the low watermark * If true, try to connect to all discovered peers up to the connection manager limit
*/ */
autoDial?: boolean autoDial?: boolean
/** /**
* How often, in milliseconds, it should preemptively guarantee connections are above the low watermark * How long to wait between attempting to keep our number of concurrent connections
* above minConnections
*/ */
autoDialInterval?: number autoDialInterval: number
/**
* Sort the known addresses of a peer before trying to dial
*/
addressSorter?: AddressSorter
/**
* Number of max concurrent dials
*/
maxParallelDials?: number
/**
* Number of max addresses to dial for a given peer
*/
maxAddrsToDial?: number
/**
* How long a dial attempt is allowed to take
*/
dialTimeout?: number
/**
* Number of max concurrent dials per peer
*/
maxDialsPerPeer?: number
/**
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>
}
export interface ConnectionManagerEvents {
'peer:connect': CustomEvent<PeerId>
'peer:disconnect': CustomEvent<PeerId>
} }
/** /**
* Responsible for managing known connections. * Responsible for managing known connections.
*/ */
export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable { export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable, Initializable {
private readonly components: Components public readonly dialer: Dialer
private readonly init: Required<ConnectionManagerInit> private components = new Components()
private readonly opts: Required<ConnectionManagerInit>
private readonly peerValues: Map<string, number> private readonly peerValues: Map<string, number>
private readonly connections: Map<string, Connection[]> private readonly connections: Map<string, Connection[]>
private started: boolean private started: boolean
private timer?: ReturnType<retimer> private timer?: ReturnType<retimer>
private readonly latencyMonitor: LatencyMonitor private readonly latencyMonitor: LatencyMonitor
constructor (components: Components, init: ConnectionManagerInit = {}) { constructor (init: ConnectionManagerInit) {
super() super()
this.components = components this.opts = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)
this.init = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, init)
if (this.init.maxConnections < this.init.minConnections) { if (this.opts.maxConnections < this.opts.minConnections) {
throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS) throw errCode(new Error('Connection Manager maxConnections must be greater than minConnections'), codes.ERR_INVALID_PARAMETERS)
} }
log('options: %o', this.init) log('options: %o', this.opts)
/** /**
* Map of peer identifiers to their peer value for pruning connections. * Map of peer identifiers to their peer value for pruning connections.
@ -153,12 +187,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
setMaxListeners?.(Infinity, this) setMaxListeners?.(Infinity, this)
} catch {} } catch {}
this.components.getUpgrader().addEventListener('connection', (evt) => { this.dialer = new Dialer(this.opts)
void this.onConnect(evt).catch(err => {
log.error(err) this.onConnect = this.onConnect.bind(this)
}) this.onDisconnect = this.onDisconnect.bind(this)
}) }
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect.bind(this))
init (components: Components): void {
this.components = components
this.dialer.init(components)
} }
isStarted () { isStarted () {
@ -171,18 +209,29 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
*/ */
async start () { async start () {
if (this.components.getMetrics() != null) { if (this.components.getMetrics() != null) {
this.timer = this.timer ?? retimer(this._checkMetrics, this.init.pollInterval) this.timer = this.timer ?? retimer(this._checkMetrics, this.opts.pollInterval)
} }
// latency monitor // latency monitor
this.latencyMonitor.start() this.latencyMonitor.start()
this._onLatencyMeasure = this._onLatencyMeasure.bind(this) this._onLatencyMeasure = this._onLatencyMeasure.bind(this)
this.latencyMonitor.addEventListener('data', this._onLatencyMeasure) this.latencyMonitor.addEventListener('data', this._onLatencyMeasure)
await this.dialer.start()
this.started = true this.started = true
log('started') log('started')
} }
async afterStart () {
this.components.getUpgrader().addEventListener('connection', this.onConnect)
this.components.getUpgrader().addEventListener('connectionEnd', this.onDisconnect)
}
async beforeStop () {
this.components.getUpgrader().removeEventListener('connection', this.onConnect)
this.components.getUpgrader().removeEventListener('connectionEnd', this.onDisconnect)
}
/** /**
* Stops the Connection Manager * Stops the Connection Manager
*/ */
@ -191,6 +240,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
this.latencyMonitor.removeEventListener('data', this._onLatencyMeasure) this.latencyMonitor.removeEventListener('data', this._onLatencyMeasure)
this.latencyMonitor.stop() this.latencyMonitor.stop()
await this.dialer.stop()
this.started = false this.started = false
await this._close() await this._close()
@ -238,23 +288,29 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
if (metrics != null) { if (metrics != null) {
try { try {
const movingAverages = metrics.getGlobal().getMovingAverages() const movingAverages = metrics.getGlobal().getMovingAverages()
const received = movingAverages.dataReceived[this.init.movingAverageInterval].movingAverage const received = movingAverages.dataReceived[this.opts.movingAverageInterval].movingAverage
await this._checkMaxLimit('maxReceivedData', received) await this._checkMaxLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this.init.movingAverageInterval].movingAverage const sent = movingAverages.dataSent[this.opts.movingAverageInterval].movingAverage
await this._checkMaxLimit('maxSentData', sent) await this._checkMaxLimit('maxSentData', sent)
const total = received + sent const total = received + sent
await this._checkMaxLimit('maxData', total) await this._checkMaxLimit('maxData', total)
log('metrics update', total) log('metrics update', total)
} finally { } finally {
this.timer = retimer(this._checkMetrics, this.init.pollInterval) this.timer = retimer(this._checkMetrics, this.opts.pollInterval)
} }
} }
} }
onConnect (evt: CustomEvent<Connection>) {
void this._onConnect(evt).catch(err => {
log.error(err)
})
}
/** /**
* Tracks the incoming connection and check the connection limit * Tracks the incoming connection and check the connection limit
*/ */
async onConnect (evt: CustomEvent<Connection>) { async _onConnect (evt: CustomEvent<Connection>) {
const { detail: connection } = evt const { detail: connection } = evt
if (!this.started) { if (!this.started) {
@ -278,10 +334,10 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
} }
if (!this.peerValues.has(peerIdStr)) { if (!this.peerValues.has(peerIdStr)) {
this.peerValues.set(peerIdStr, this.init.defaultPeerValue) this.peerValues.set(peerIdStr, this.opts.defaultPeerValue)
} }
await this._checkMaxLimit('maxConnections', this.getConnectionList().length) await this._checkMaxLimit('maxConnections', this.getConnections().length)
this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection })) this.dispatchEvent(new CustomEvent<Connection>('peer:connect', { detail: connection }))
} }
@ -311,35 +367,64 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
} }
} }
getConnectionMap (): Map<string, Connection[]> { getConnections (peerId?: PeerId): Connection[] {
return this.connections if (peerId != null) {
} return this.connections.get(peerId.toString()) ?? []
getConnectionList (): Connection[] {
let output: Connection[] = []
for (const connections of this.connections.values()) {
output = output.concat(connections)
} }
return output let conns: Connection[] = []
}
getConnections (peerId: PeerId): Connection[] { for (const c of this.connections.values()) {
return this.connections.get(peerId.toString()) ?? [] conns = conns.concat(c)
}
/**
* Get a connection with a peer
*/
getConnection (peerId: PeerId): Connection | undefined {
const connections = this.getAll(peerId)
if (connections.length > 0) {
return connections[0]
} }
return undefined return conns
}
async openConnection (peerId: PeerId, options?: AbortOptions): Promise<Connection> {
log('dial to %p', peerId)
const existingConnections = this.getConnections(peerId)
if (existingConnections.length > 0) {
log('had an existing connection to %p', peerId)
return existingConnections[0]
}
const connection = await this.dialer.dial(peerId, options)
let peerConnections = this.connections.get(peerId.toString())
if (peerConnections == null) {
peerConnections = []
this.connections.set(peerId.toString(), peerConnections)
}
// we get notified of connections via the Upgrader emitting "connection"
// events, double check we aren't already tracking this connection before
// storing it
let trackedConnection = false
for (const conn of peerConnections) {
if (conn.id === connection.id) {
trackedConnection = true
}
}
if (!trackedConnection) {
peerConnections.push(connection)
}
return connection
}
async closeConnections (peerId: PeerId): Promise<void> {
const connections = this.connections.get(peerId.toString()) ?? []
await Promise.all(
connections.map(async connection => {
return await connection.close()
})
)
} }
/** /**
@ -377,7 +462,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
* If the `value` of `name` has exceeded its limit, maybe close a connection * If the `value` of `name` has exceeded its limit, maybe close a connection
*/ */
async _checkMaxLimit (name: keyof ConnectionManagerInit, value: number) { async _checkMaxLimit (name: keyof ConnectionManagerInit, value: number) {
const limit = this.init[name] const limit = this.opts[name]
log.trace('checking limit of %s. current value: %d of %d', name, value, limit) log.trace('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) { if (value > limit) {
log('%s: limit exceeded: %p, %d', this.components.getPeerId(), name, value) log('%s: limit exceeded: %p, %d', this.components.getPeerId(), name, value)
@ -390,7 +475,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
* to the lowest valued peer. * to the lowest valued peer.
*/ */
async _maybeDisconnectOne () { async _maybeDisconnectOne () {
if (this.init.minConnections < this.connections.size) { if (this.opts.minConnections < this.connections.size) {
const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1]))) const peerValues = Array.from(new Map([...this.peerValues.entries()].sort((a, b) => a[1] - b[1])))
log('%p: sorted peer values: %j', this.components.getPeerId(), peerValues) log('%p: sorted peer values: %j', this.components.getPeerId(), peerValues)

View File

@ -70,7 +70,7 @@ export class FetchService implements Startable {
async fetch (peer: PeerId, key: string): Promise<Uint8Array | null> { async fetch (peer: PeerId, key: string): Promise<Uint8Array | null> {
log('dialing %s to %p', this.protocol, peer) log('dialing %s to %p', this.protocol, peer)
const connection = await this.components.getDialer().dial(peer) const connection = await this.components.getConnectionManager().openConnection(peer)
const { stream } = await connection.newStream([this.protocol]) const { stream } = await connection.newStream([this.protocol])
const shake = handshake(stream) const shake = handshake(stream)

View File

@ -23,7 +23,7 @@ import { codes } from '../errors.js'
import type { IncomingStreamData } from '@libp2p/interfaces/registrar' import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { Connection } from '@libp2p/interfaces/connection' import type { Connection } from '@libp2p/interfaces/connection'
import type { Startable } from '@libp2p/interfaces' import type { Startable } from '@libp2p/interfaces'
import { peerIdFromKeys, peerIdFromString } from '@libp2p/peer-id' import { peerIdFromKeys } from '@libp2p/peer-id'
import type { Components } from '@libp2p/interfaces/components' import type { Components } from '@libp2p/interfaces/components'
const log = logger('libp2p:identify') const log = logger('libp2p:identify')
@ -161,15 +161,15 @@ export class IdentifyService implements Startable {
const connections: Connection[] = [] const connections: Connection[] = []
for (const [peerIdStr, conns] of this.components.getConnectionManager().getConnectionMap().entries()) { for (const conn of this.components.getConnectionManager().getConnections()) {
const peerId = peerIdFromString(peerIdStr) const peerId = conn.remotePeer
const peer = await this.components.getPeerStore().get(peerId) const peer = await this.components.getPeerStore().get(peerId)
if (!peer.protocols.includes(this.identifyPushProtocolStr)) { if (!peer.protocols.includes(this.identifyPushProtocolStr)) {
continue continue
} }
connections.push(...conns) connections.push(conn)
} }
await this.push(connections) await this.push(connections)

View File

@ -16,11 +16,12 @@ import type { ConnectionEncrypter } from '@libp2p/interfaces/connection-encrypte
import type { PeerRouting } from '@libp2p/interfaces/peer-routing' import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
import type { ContentRouting } from '@libp2p/interfaces/content-routing' import type { ContentRouting } from '@libp2p/interfaces/content-routing'
import type { PubSub } from '@libp2p/interfaces/pubsub' import type { PubSub } from '@libp2p/interfaces/pubsub'
import type { ConnectionManager, Registrar, StreamHandler } from '@libp2p/interfaces/registrar' import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics' import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics'
import type { PeerInfo } from '@libp2p/interfaces/peer-info' import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import type { DialerInit } from '@libp2p/interfaces/dialer'
import type { KeyChain } from './keychain/index.js' import type { KeyChain } from './keychain/index.js'
import type { ConnectionManagerInit } from './connection-manager/index.js'
export interface PersistentPeerStoreOptions { export interface PersistentPeerStoreOptions {
threshold?: number threshold?: number
@ -71,29 +72,6 @@ export interface AddressesConfig {
announceFilter: (multiaddrs: Multiaddr[]) => Multiaddr[] announceFilter: (multiaddrs: Multiaddr[]) => Multiaddr[]
} }
export interface ConnectionManagerConfig {
/**
* If true, try to connect to all discovered peers up to the connection manager limit
*/
autoDial?: boolean
/**
* The maximum number of connections to keep open
*/
maxConnections: number
/**
* The minimum number of connections to keep open
*/
minConnections: number
/**
* How long to wait between attempting to keep our number of concurrent connections
* above minConnections
*/
autoDialInterval: number
}
export interface TransportManagerConfig { export interface TransportManagerConfig {
faultTolerance?: FaultTolerance faultTolerance?: FaultTolerance
} }
@ -117,11 +95,10 @@ export interface Libp2pInit {
peerId: PeerId peerId: PeerId
host: HostProperties host: HostProperties
addresses: AddressesConfig addresses: AddressesConfig
connectionManager: ConnectionManagerConfig connectionManager: ConnectionManagerInit
connectionGater: Partial<ConnectionGater> connectionGater: Partial<ConnectionGater>
transportManager: TransportManagerConfig transportManager: TransportManagerConfig
datastore: Datastore datastore: Datastore
dialer: DialerInit
metrics: MetricsInit metrics: MetricsInit
peerStore: PeerStoreInit peerStore: PeerStoreInit
peerRouting: PeerRoutingConfig peerRouting: PeerRoutingConfig

View File

@ -11,7 +11,6 @@ import { DefaultConnectionManager } from './connection-manager/index.js'
import { AutoDialler } from './connection-manager/auto-dialler.js' import { AutoDialler } from './connection-manager/auto-dialler.js'
import { Circuit } from './circuit/transport.js' import { Circuit } from './circuit/transport.js'
import { Relay } from './circuit/index.js' import { Relay } from './circuit/index.js'
import { DefaultDialer } from './dialer/index.js'
import { KeyChain } from './keychain/index.js' import { KeyChain } from './keychain/index.js'
import { DefaultMetrics } from './metrics/index.js' import { DefaultMetrics } from './metrics/index.js'
import { DefaultTransportManager } from './transport-manager.js' import { DefaultTransportManager } from './transport-manager.js'
@ -25,14 +24,15 @@ import { PeerRecordUpdater } from './peer-record-updater.js'
import { DHTPeerRouting } from './dht/dht-peer-routing.js' import { DHTPeerRouting } from './dht/dht-peer-routing.js'
import { PersistentPeerStore } from '@libp2p/peer-store' import { PersistentPeerStore } from '@libp2p/peer-store'
import { DHTContentRouting } from './dht/dht-content-routing.js' import { DHTContentRouting } from './dht/dht-content-routing.js'
import { AutoDialer } from './dialer/auto-dialer.js' import { AutoDialer } from './connection-manager/dialer/auto-dialer.js'
import { Initializable, Components, isInitializable } from '@libp2p/interfaces/components' import { Initializable, Components, isInitializable } from '@libp2p/interfaces/components'
import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Connection } from '@libp2p/interfaces/connection' import type { Connection } from '@libp2p/interfaces/connection'
import type { PeerRouting } from '@libp2p/interfaces/peer-routing' import type { PeerRouting } from '@libp2p/interfaces/peer-routing'
import type { ContentRouting } from '@libp2p/interfaces/content-routing' import type { ContentRouting } from '@libp2p/interfaces/content-routing'
import type { PubSub } from '@libp2p/interfaces/pubsub' import type { PubSub } from '@libp2p/interfaces/pubsub'
import type { ConnectionManager, Registrar, StreamHandler } from '@libp2p/interfaces/registrar' import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
import type { PeerInfo } from '@libp2p/interfaces/peer-info' import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js' import type { Libp2p, Libp2pEvents, Libp2pInit, Libp2pOptions } from './index.js'
import { validateConfig } from './config.js' import { validateConfig } from './config.js'
@ -46,6 +46,7 @@ import { unmarshalPublicKey } from '@libp2p/crypto/keys'
import type { Metrics } from '@libp2p/interfaces/metrics' import type { Metrics } from '@libp2p/interfaces/metrics'
import { DummyDHT } from './dht/dummy-dht.js' import { DummyDHT } from './dht/dummy-dht.js'
import { DummyPubSub } from './pubsub/dummy-pubsub.js' import { DummyPubSub } from './pubsub/dummy-pubsub.js'
import { PeerSet } from '@libp2p/peer-collections'
const log = logger('libp2p') const log = logger('libp2p')
@ -72,34 +73,40 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
constructor (init: Libp2pInit) { constructor (init: Libp2pInit) {
super() super()
this.services = []
this.initializables = [] this.initializables = []
this.started = false this.started = false
this.peerId = init.peerId this.peerId = init.peerId
this.components = new Components({ this.components = new Components({
peerId: init.peerId, peerId: init.peerId,
datastore: init.datastore ?? new MemoryDatastore() datastore: init.datastore ?? new MemoryDatastore(),
connectionGater: {
denyDialPeer: async () => await Promise.resolve(false),
denyDialMultiaddr: async () => await Promise.resolve(false),
denyInboundConnection: async () => await Promise.resolve(false),
denyOutboundConnection: async () => await Promise.resolve(false),
denyInboundEncryptedConnection: async () => await Promise.resolve(false),
denyOutboundEncryptedConnection: async () => await Promise.resolve(false),
denyInboundUpgradedConnection: async () => await Promise.resolve(false),
denyOutboundUpgradedConnection: async () => await Promise.resolve(false),
filterMultiaddrForPeer: async () => await Promise.resolve(true),
...init.connectionGater
}
}) })
this.components.setPeerStore(new PersistentPeerStore({
addressFilter: this.components.getConnectionGater().filterMultiaddrForPeer,
...init.peerStore
}))
this.services = [
this.components
]
// Create Metrics // Create Metrics
if (init.metrics.enabled) { if (init.metrics.enabled) {
this.metrics = this.components.setMetrics(this.configureComponent(new DefaultMetrics(init.metrics))) this.metrics = this.components.setMetrics(new DefaultMetrics(init.metrics))
} }
this.components.setConnectionGater(this.configureComponent({ this.peerStore = this.components.getPeerStore()
denyDialPeer: async () => await Promise.resolve(false),
denyDialMultiaddr: async () => await Promise.resolve(false),
denyInboundConnection: async () => await Promise.resolve(false),
denyOutboundConnection: async () => await Promise.resolve(false),
denyInboundEncryptedConnection: async () => await Promise.resolve(false),
denyOutboundEncryptedConnection: async () => await Promise.resolve(false),
denyInboundUpgradedConnection: async () => await Promise.resolve(false),
denyOutboundUpgradedConnection: async () => await Promise.resolve(false),
filterMultiaddrForPeer: async () => await Promise.resolve(true),
...init.connectionGater
}))
this.peerStore = this.components.setPeerStore(this.configureComponent(new PersistentPeerStore(this.components, init.peerStore)))
this.peerStore.addEventListener('peer', evt => { this.peerStore.addEventListener('peer', evt => {
const { detail: peerData } = evt const { detail: peerData } = evt
@ -109,32 +116,30 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
// Set up connection protector if configured // Set up connection protector if configured
if (init.connectionProtector != null) { if (init.connectionProtector != null) {
this.components.setConnectionProtector(this.configureComponent(init.connectionProtector)) this.components.setConnectionProtector(init.connectionProtector)
} }
// Set up the Upgrader // Set up the Upgrader
this.components.setUpgrader(this.configureComponent(new DefaultUpgrader(this.components, { this.components.setUpgrader(new DefaultUpgrader(this.components, {
connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)), connectionEncryption: (init.connectionEncryption ?? []).map(component => this.configureComponent(component)),
muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component)) muxers: (init.streamMuxers ?? []).map(component => this.configureComponent(component))
}))) }))
// Create the Connection Manager // Create the Connection Manager
this.connectionManager = this.components.setConnectionManager(this.configureComponent(new DefaultConnectionManager(this.components, init.connectionManager))) this.connectionManager = this.components.setConnectionManager(new DefaultConnectionManager(init.connectionManager))
// Create the Registrar // Create the Registrar
this.registrar = this.components.setRegistrar(this.configureComponent(new DefaultRegistrar(this.components))) this.registrar = this.components.setRegistrar(new DefaultRegistrar(this.components))
// Setup the transport manager // Setup the transport manager
this.components.setTransportManager(this.configureComponent(new DefaultTransportManager(this.components, init.transportManager))) this.components.setTransportManager(new DefaultTransportManager(this.components, init.transportManager))
// Addresses {listen, announce, noAnnounce} // Addresses {listen, announce, noAnnounce}
this.components.setAddressManager(this.configureComponent(new DefaultAddressManager(this.components, init.addresses))) this.components.setAddressManager(new DefaultAddressManager(this.components, init.addresses))
// update our peer record when addresses change // update our peer record when addresses change
this.configureComponent(new PeerRecordUpdater(this.components)) this.configureComponent(new PeerRecordUpdater(this.components))
this.components.setDialer(this.configureComponent(new DefaultDialer(this.components, init.dialer)))
this.configureComponent(new AutoDialler(this.components, { this.configureComponent(new AutoDialler(this.components, {
enabled: init.connectionManager.autoDial, enabled: init.connectionManager.autoDial,
minConnections: init.connectionManager.minConnections, minConnections: init.connectionManager.minConnections,
@ -169,14 +174,14 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
// dht provided components (peerRouting, contentRouting, dht) // dht provided components (peerRouting, contentRouting, dht)
if (init.dht != null) { if (init.dht != null) {
this.dht = this.components.setDHT(this.configureComponent(init.dht)) this.dht = this.components.setDHT(init.dht)
} else { } else {
this.dht = new DummyDHT() this.dht = new DummyDHT()
} }
// Create pubsub if provided // Create pubsub if provided
if (init.pubsub != null) { if (init.pubsub != null) {
this.pubsub = this.components.setPubSub(this.configureComponent(init.pubsub)) this.pubsub = this.components.setPubSub(init.pubsub)
} else { } else {
this.pubsub = new DummyPubSub() this.pubsub = new DummyPubSub()
} }
@ -216,7 +221,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
this.components.getTransportManager().add(this.configureComponent(new Circuit())) this.components.getTransportManager().add(this.configureComponent(new Circuit()))
this.configureComponent(new Relay(this.components, { this.configureComponent(new Relay(this.components, {
addressSorter: init.dialer.addressSorter, addressSorter: init.connectionManager.addressSorter,
...init.relay ...init.relay
})) }))
} }
@ -231,7 +236,8 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
const autoDialer = this.configureComponent(new AutoDialer(this.components, { const autoDialer = this.configureComponent(new AutoDialer(this.components, {
enabled: init.connectionManager.autoDial !== false, enabled: init.connectionManager.autoDial !== false,
minConnections: init.connectionManager.minConnections ?? Infinity minConnections: init.connectionManager.minConnections,
dialTimeout: init.connectionManager.dialTimeout ?? 30000
})) }))
this.addEventListener('peer:discovery', evt => { this.addEventListener('peer:discovery', evt => {
@ -379,24 +385,41 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
} }
getConnections (peerId?: PeerId): Connection[] { getConnections (peerId?: PeerId): Connection[] {
if (peerId == null) {
return this.components.getConnectionManager().getConnectionList()
}
return this.components.getConnectionManager().getConnections(peerId) return this.components.getConnectionManager().getConnections(peerId)
} }
getPeers (): PeerId[] { getPeers (): PeerId[] {
return this.components.getConnectionManager().getConnectionList() const peerSet = new PeerSet()
.map(conn => conn.remotePeer)
for (const conn of this.components.getConnectionManager().getConnections()) {
peerSet.add(conn.remotePeer)
}
return Array.from(peerSet)
} }
async dial (peer: PeerId | Multiaddr, options: AbortOptions = {}): Promise<Connection> { async dial (peer: PeerId | Multiaddr, options: AbortOptions = {}): Promise<Connection> {
return await this.components.getDialer().dial(peer, options) const { id, multiaddrs } = getPeer(peer)
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
return await this.components.getConnectionManager().openConnection(id, options)
} }
async dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options: AbortOptions = {}) { async dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options: AbortOptions = {}) {
return await this.components.getDialer().dialProtocol(peer, protocols, options) if (protocols == null) {
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
protocols = Array.isArray(protocols) ? protocols : [protocols]
if (protocols.length === 0) {
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
const connection = await this.dial(peer)
return await connection.newStream(protocols)
} }
getMultiaddrs (): Multiaddr[] { getMultiaddrs (): Multiaddr[] {
@ -406,13 +429,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
async hangUp (peer: PeerId | Multiaddr | string): Promise<void> { async hangUp (peer: PeerId | Multiaddr | string): Promise<void> {
const { id } = getPeer(peer) const { id } = getPeer(peer)
const connections = this.components.getConnectionManager().getConnections(id) await this.components.getConnectionManager().closeConnections(id)
await Promise.all(
connections.map(async connection => {
return await connection.close()
})
)
} }
/** /**

View File

@ -63,7 +63,8 @@ export class PingService implements Startable {
async ping (peer: PeerId): Promise<number> { async ping (peer: PeerId): Promise<number> {
log('dialing %s to %p', this.protocol, peer) log('dialing %s to %p', this.protocol, peer)
const { stream } = await this.components.getDialer().dialProtocol(peer, this.protocol) const connection = await this.components.getConnectionManager().openConnection(peer)
const { stream } = await connection.newStream([this.protocol])
const start = Date.now() const start = Date.now()
const data = randomBytes(PING_LENGTH) const data = randomBytes(PING_LENGTH)

View File

@ -192,7 +192,7 @@ export class DefaultRegistrar implements Registrar {
for (const { topology, protocols } of this.topologies.values()) { for (const { topology, protocols } of this.topologies.values()) {
if (supportsProtocol(added, protocols)) { if (supportsProtocol(added, protocols)) {
const connection = this.components.getConnectionManager().getConnection(peerId) const connection = this.components.getConnectionManager().getConnections(peerId)[0]
if (connection == null) { if (connection == null) {
continue continue

View File

@ -40,12 +40,15 @@ describe('Pubsub subsystem is configurable', () => {
}) })
libp2p = await createLibp2p(customOptions) libp2p = await createLibp2p(customOptions)
// @ts-expect-error not part of interface
expect(libp2p.pubsub.isStarted()).to.equal(false) expect(libp2p.pubsub.isStarted()).to.equal(false)
await libp2p.start() await libp2p.start()
// @ts-expect-error not part of interface
expect(libp2p.pubsub.isStarted()).to.equal(true) expect(libp2p.pubsub.isStarted()).to.equal(true)
await libp2p.stop() await libp2p.stop()
// @ts-expect-error not part of interface
expect(libp2p.pubsub.isStarted()).to.equal(false) expect(libp2p.pubsub.isStarted()).to.equal(false)
}) })
}) })

View File

@ -7,9 +7,8 @@ import delay from 'delay'
import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/interfaces/components' import { Components } from '@libp2p/interfaces/components'
import { stubInterface } from 'ts-sinon' import { stubInterface } from 'ts-sinon'
import type { ConnectionManager } from '@libp2p/interfaces/registrar' import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'
import type { PeerStore, Peer } from '@libp2p/interfaces/peer-store' import type { PeerStore, Peer } from '@libp2p/interfaces/peer-store'
import type { Dialer } from '@libp2p/interfaces/dialer'
describe('Auto-dialler', () => { describe('Auto-dialler', () => {
it('should not dial self', async () => { it('should not dial self', async () => {
@ -36,26 +35,24 @@ describe('Auto-dialler', () => {
])) ]))
const connectionManager = stubInterface<ConnectionManager>() const connectionManager = stubInterface<ConnectionManager>()
connectionManager.getConnectionList.returns([]) connectionManager.getConnections.returns([])
const dialer = stubInterface<Dialer>()
const autoDialler = new AutoDialler(new Components({ const autoDialler = new AutoDialler(new Components({
peerId: self.id, peerId: self.id,
peerStore, peerStore,
connectionManager, connectionManager
dialer
}), { }), {
minConnections: 10 minConnections: 10
}) })
await autoDialler.start() await autoDialler.start()
await pWaitFor(() => dialer.dial.callCount === 1) await pWaitFor(() => connectionManager.openConnection.callCount === 1)
await delay(1000) await delay(1000)
await autoDialler.stop() await autoDialler.stop()
expect(dialer.dial.callCount).to.equal(1) expect(connectionManager.openConnection.callCount).to.equal(1)
expect(dialer.dial.calledWith(self.id)).to.be.false() expect(connectionManager.openConnection.calledWith(self.id)).to.be.false()
}) })
}) })

View File

@ -18,6 +18,7 @@ import type { Connection } from '@libp2p/interfaces/connection'
import delay from 'delay' import delay from 'delay'
import type { Libp2pNode } from '../../src/libp2p.js' import type { Libp2pNode } from '../../src/libp2p.js'
import { codes } from '../../src/errors.js' import { codes } from '../../src/errors.js'
import { start } from '@libp2p/interface-compliance-tests'
describe('Connection Manager', () => { describe('Connection Manager', () => {
let libp2p: Libp2p let libp2p: Libp2p
@ -50,9 +51,14 @@ describe('Connection Manager', () => {
const peerStore = stubInterface<PeerStore>() const peerStore = stubInterface<PeerStore>()
peerStore.keyBook = stubInterface<KeyBook>() peerStore.keyBook = stubInterface<KeyBook>()
const connectionManager = new DefaultConnectionManager(new Components({ upgrader, peerStore })) const connectionManager = new DefaultConnectionManager({
maxConnections: 1000,
minConnections: 50,
autoDialInterval: 1000
})
connectionManager.init(new Components({ upgrader, peerStore }))
await connectionManager.start() await start(connectionManager)
const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1])) const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1])) const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
@ -80,9 +86,14 @@ describe('Connection Manager', () => {
const peerStore = stubInterface<PeerStore>() const peerStore = stubInterface<PeerStore>()
peerStore.keyBook = stubInterface<KeyBook>() peerStore.keyBook = stubInterface<KeyBook>()
const connectionManager = new DefaultConnectionManager(new Components({ upgrader, peerStore })) const connectionManager = new DefaultConnectionManager({
maxConnections: 1000,
minConnections: 50,
autoDialInterval: 1000
})
connectionManager.init(new Components({ upgrader, peerStore }))
await connectionManager.start() await start(connectionManager)
const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1])) const conn1 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1])) const conn2 = await mockConnection(mockMultiaddrConnection(mockDuplex(), peerIds[1]))
@ -224,11 +235,11 @@ describe('libp2p.connections', () => {
await libp2p.start() await libp2p.start()
// Wait for peer to connect // Wait for peer to connect
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === minConnections) await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === minConnections)
// Wait more time to guarantee no other connection happened // Wait more time to guarantee no other connection happened
await delay(200) await delay(200)
expect(libp2p.components.getConnectionManager().getConnectionMap().size).to.eql(minConnections) expect(libp2p.components.getConnectionManager().getConnections().length).to.eql(minConnections)
await libp2p.stop() await libp2p.stop()
}) })
@ -257,11 +268,11 @@ describe('libp2p.connections', () => {
await libp2p.start() await libp2p.start()
// Wait for peer to connect // Wait for peer to connect
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === minConnections) await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === minConnections)
// Should have connected to the peer with protocols // Should have connected to the peer with protocols
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.not.exist() expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.be.empty()
expect(libp2p.components.getConnectionManager().getConnection(nodes[1].peerId)).to.exist() expect(libp2p.components.getConnectionManager().getConnections(nodes[1].peerId)).to.not.be.empty()
await libp2p.stop() await libp2p.stop()
}) })
@ -287,15 +298,15 @@ describe('libp2p.connections', () => {
// Wait for peer to connect // Wait for peer to connect
const conn = await libp2p.dial(nodes[0].peerId) const conn = await libp2p.dial(nodes[0].peerId)
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.exist() expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.not.be.empty()
await conn.close() await conn.close()
// Closed // Closed
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === 0) await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === 0)
// Connected // Connected
await pWaitFor(() => libp2p.components.getConnectionManager().getConnectionMap().size === 1) await pWaitFor(() => libp2p.components.getConnectionManager().getConnections().length === 1)
expect(libp2p.components.getConnectionManager().getConnection(nodes[0].peerId)).to.exist() expect(libp2p.components.getConnectionManager().getConnections(nodes[0].peerId)).to.not.be.empty()
await libp2p.stop() await libp2p.stop()
}) })
@ -321,9 +332,7 @@ describe('libp2p.connections', () => {
await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs()) await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs())
await libp2p.dial(remoteLibp2p.peerId) await libp2p.dial(remoteLibp2p.peerId)
const totalConns = Array.from(libp2p.components.getConnectionManager().getConnectionMap().values()) const conns = libp2p.components.getConnectionManager().getConnections()
expect(totalConns.length).to.eql(1)
const conns = totalConns[0]
expect(conns.length).to.eql(1) expect(conns.length).to.eql(1)
const conn = conns[0] const conn = conns[0]
@ -394,7 +403,7 @@ describe('libp2p.connections', () => {
}) })
}) })
await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs()) await libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.getMultiaddrs())
await libp2p.components.getDialer().dial(remoteLibp2p.peerId) await libp2p.components.getConnectionManager().openConnection(remoteLibp2p.peerId)
for (const multiaddr of remoteLibp2p.getMultiaddrs()) { for (const multiaddr of remoteLibp2p.getMultiaddrs()) {
expect(denyDialMultiaddr.calledWith(remoteLibp2p.peerId, multiaddr)).to.be.true() expect(denyDialMultiaddr.calledWith(remoteLibp2p.peerId, multiaddr)).to.be.true()
@ -418,7 +427,7 @@ describe('libp2p.connections', () => {
const fullMultiaddr = remoteLibp2p.getMultiaddrs()[0] const fullMultiaddr = remoteLibp2p.getMultiaddrs()[0]
await libp2p.components.getDialer().dial(fullMultiaddr) await libp2p.dial(fullMultiaddr)
expect(filterMultiaddrForPeer.callCount).to.equal(2) expect(filterMultiaddrForPeer.callCount).to.equal(2)

View File

@ -79,7 +79,7 @@ describe('Connection Manager', () => {
const value = Math.random() const value = Math.random()
spies.set(value, spy) spies.set(value, spy)
connectionManager.setPeerValue(connection.remotePeer, value) connectionManager.setPeerValue(connection.remotePeer, value)
await connectionManager.onConnect(new CustomEvent('connection', { detail: connection })) await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
})) }))
// get the lowest value // get the lowest value
@ -122,7 +122,7 @@ describe('Connection Manager', () => {
await Promise.all([...new Array(max + 1)].map(async () => { await Promise.all([...new Array(max + 1)].map(async () => {
const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId())) const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), await createEd25519PeerId()))
sinon.stub(connection, 'close').callsFake(async () => spy()) // eslint-disable-line sinon.stub(connection, 'close').callsFake(async () => spy()) // eslint-disable-line
await connectionManager.onConnect(new CustomEvent('connection', { detail: connection })) await connectionManager._onConnect(new CustomEvent('connection', { detail: connection }))
})) }))
expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1) expect(connectionManagerMaybeDisconnectOneSpy.callCount).to.equal(1)

View File

@ -8,7 +8,7 @@ import { subsystemMulticodecs, createSubsystemOptions } from './utils.js'
import { createPeerId } from '../../utils/creators/peer.js' import { createPeerId } from '../../utils/creators/peer.js'
import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerId } from '@libp2p/interfaces/peer-id'
import { createLibp2pNode, Libp2pNode } from '../../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../../src/libp2p.js'
import { isStartable } from '@libp2p/interfaces' import { start } from '@libp2p/interface-compliance-tests'
const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8000') const listenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8000')
const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8001') const remoteListenAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8001')
@ -145,9 +145,7 @@ describe('DHT subsystem operates correctly', () => {
const dht = remoteLibp2p.dht const dht = remoteLibp2p.dht
if (isStartable(dht)) { await start(dht)
await dht.start()
}
// should be 0 directly after start - TODO this may be susceptible to timing bugs, we should have // should be 0 directly after start - TODO this may be susceptible to timing bugs, we should have
// the ability to report stats on the DHT routing table instead of reaching into it's heart like this // the ability to report stats on the DHT routing table instead of reaching into it's heart like this
@ -164,9 +162,7 @@ describe('DHT subsystem operates correctly', () => {
const dht = remoteLibp2p.dht const dht = remoteLibp2p.dht
if (isStartable(dht)) { await start(dht)
await dht.start()
}
await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1) await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1)
await libp2p.components.getContentRouting().put(key, value) await libp2p.components.getContentRouting().put(key, value)

View File

@ -5,12 +5,11 @@ import sinon from 'sinon'
import { AbortError } from '@libp2p/interfaces/errors' import { AbortError } from '@libp2p/interfaces/errors'
import pDefer from 'p-defer' import pDefer from 'p-defer'
import delay from 'delay' import delay from 'delay'
import { DialAction, DialRequest } from '../../src/dialer/dial-request.js' import { DialAction, DialRequest } from '../../src/connection-manager/dialer/dial-request.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks' import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import { DefaultDialer } from '../../src/dialer/index.js' import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { Components } from '@libp2p/interfaces/components'
const error = new Error('dial failure') const error = new Error('dial failure')
describe('Dial Request', () => { describe('Dial Request', () => {
@ -23,7 +22,7 @@ describe('Dial Request', () => {
} }
const dialAction: DialAction = async (num) => await actions[num.toString()]() const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController() const controller = new AbortController()
const dialer = new DefaultDialer(new Components(), { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -56,7 +55,7 @@ describe('Dial Request', () => {
} }
const dialAction: DialAction = async (num) => await actions[num.toString()]() const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController() const controller = new AbortController()
const dialer = new DefaultDialer(new Components(), { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -99,7 +98,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]() const dialAction: DialAction = async (num) => await actions[num.toString()]()
const addrs = Object.keys(actions) const addrs = Object.keys(actions)
const controller = new AbortController() const controller = new AbortController()
const dialer = new DefaultDialer(new Components(), { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -139,7 +138,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]() const dialAction: DialAction = async (num) => await actions[num.toString()]()
const controller = new AbortController() const controller = new AbortController()
const dialer = new DefaultDialer(new Components(), { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')
@ -185,7 +184,7 @@ describe('Dial Request', () => {
const dialAction: DialAction = async (num) => await actions[num.toString()]() const dialAction: DialAction = async (num) => await actions[num.toString()]()
const addrs = Object.keys(actions) const addrs = Object.keys(actions)
const controller = new AbortController() const controller = new AbortController()
const dialer = new DefaultDialer(new Components(), { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken') const dialerReleaseTokenSpy = sinon.spy(dialer, 'releaseToken')

View File

@ -17,7 +17,7 @@ import { Connection, isConnection } from '@libp2p/interfaces/connection'
import { AbortError } from '@libp2p/interfaces/errors' import { AbortError } from '@libp2p/interfaces/errors'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from 'datastore-core/memory' import { MemoryDatastore } from 'datastore-core/memory'
import { DefaultDialer } from '../../src/dialer/index.js' import { Dialer } from '../../src/connection-manager/dialer/index.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js' import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { PersistentPeerStore } from '@libp2p/peer-store' import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js' import { DefaultTransportManager } from '../../src/transport-manager.js'
@ -25,7 +25,6 @@ import { codes as ErrorCodes } from '../../src/errors.js'
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks' import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
import Peers from '../fixtures/peers.js' import Peers from '../fixtures/peers.js'
import { Components } from '@libp2p/interfaces/components' import { Components } from '@libp2p/interfaces/components'
import type { PeerStore } from '@libp2p/interfaces/peer-store'
import { createFromJSON } from '@libp2p/peer-id-factory' import { createFromJSON } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerId } from '@libp2p/interfaces/peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
@ -40,7 +39,6 @@ const unsupportedAddr = new Multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1
describe('Dialing (direct, TCP)', () => { describe('Dialing (direct, TCP)', () => {
let remoteTM: DefaultTransportManager let remoteTM: DefaultTransportManager
let localTM: DefaultTransportManager let localTM: DefaultTransportManager
let peerStore: PeerStore
let remoteAddr: Multiaddr let remoteAddr: Multiaddr
let remoteComponents: Components let remoteComponents: Components
let localComponents: Components let localComponents: Components
@ -55,17 +53,14 @@ describe('Dialing (direct, TCP)', () => {
peerId: remotePeerId, peerId: remotePeerId,
datastore: new MemoryDatastore(), datastore: new MemoryDatastore(),
upgrader: mockUpgrader(), upgrader: mockUpgrader(),
connectionGater: mockConnectionGater() connectionGater: mockConnectionGater(),
peerStore: new PersistentPeerStore()
}) })
remoteComponents.setAddressManager(new DefaultAddressManager(remoteComponents, { remoteComponents.setAddressManager(new DefaultAddressManager(remoteComponents, {
listen: [ listen: [
listenAddr.toString() listenAddr.toString()
] ]
})) }))
peerStore = new PersistentPeerStore(remoteComponents, {
addressFilter: remoteComponents.getConnectionGater().filterMultiaddrForPeer
})
remoteComponents.setPeerStore(peerStore)
remoteTM = new DefaultTransportManager(remoteComponents) remoteTM = new DefaultTransportManager(remoteComponents)
remoteTM.add(new TCP()) remoteTM.add(new TCP())
@ -75,10 +70,12 @@ describe('Dialing (direct, TCP)', () => {
upgrader: mockUpgrader(), upgrader: mockUpgrader(),
connectionGater: mockConnectionGater() connectionGater: mockConnectionGater()
}) })
localComponents.setPeerStore(new PersistentPeerStore(localComponents, { localComponents.setPeerStore(new PersistentPeerStore())
addressFilter: localComponents.getConnectionGater().filterMultiaddrForPeer localComponents.setConnectionManager(new DefaultConnectionManager({
maxConnections: 100,
minConnections: 50,
autoDialInterval: 1000
})) }))
localComponents.setConnectionManager(new DefaultConnectionManager(localComponents))
localTM = new DefaultTransportManager(localComponents) localTM = new DefaultTransportManager(localComponents)
localTM.add(new TCP()) localTM.add(new TCP())
@ -97,7 +94,8 @@ describe('Dialing (direct, TCP)', () => {
}) })
it('should be able to connect to a remote node via its multiaddr', async () => { it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const connection = await dialer.dial(remoteAddr) const connection = await dialer.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -105,7 +103,8 @@ describe('Dialing (direct, TCP)', () => {
}) })
it('should fail to connect to an unsupported multiaddr', async () => { it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
await expect(dialer.dial(unsupportedAddr)) await expect(dialer.dial(unsupportedAddr))
.to.eventually.be.rejectedWith(Error) .to.eventually.be.rejectedWith(Error)
@ -113,7 +112,8 @@ describe('Dialing (direct, TCP)', () => {
}) })
it('should fail to connect if peer has no known addresses', async () => { it('should fail to connect if peer has no known addresses', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const peerId = await createFromJSON(Peers[1]) const peerId = await createFromJSON(Peers[1])
await expect(dialer.dial(peerId)) await expect(dialer.dial(peerId))
@ -124,7 +124,8 @@ describe('Dialing (direct, TCP)', () => {
it('should be able to connect to a given peer id', async () => { it('should be able to connect to a given peer id', async () => {
await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs()) await localComponents.getPeerStore().addressBook.set(remoteComponents.getPeerId(), remoteTM.getAddrs())
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const connection = await dialer.dial(remoteComponents.getPeerId()) const connection = await dialer.dial(remoteComponents.getPeerId())
expect(connection).to.exist() expect(connection).to.exist()
@ -134,7 +135,8 @@ describe('Dialing (direct, TCP)', () => {
it('should fail to connect to a given peer with unsupported addresses', async () => { it('should fail to connect to a given peer with unsupported addresses', async () => {
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr]) await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), [unsupportedAddr])
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
await expect(dialer.dial(remoteComponents.getPeerId())) await expect(dialer.dial(remoteComponents.getPeerId()))
.to.eventually.be.rejectedWith(Error) .to.eventually.be.rejectedWith(Error)
@ -147,7 +149,8 @@ describe('Dialing (direct, TCP)', () => {
const peerId = await createFromJSON(Peers[1]) const peerId = await createFromJSON(Peers[1])
await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr]) await localComponents.getPeerStore().addressBook.add(peerId, [...remoteAddrs, unsupportedAddr])
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
sinon.spy(localTM, 'dial') sinon.spy(localTM, 'dial')
const connection = await dialer.dial(peerId) const connection = await dialer.dial(peerId)
@ -158,9 +161,10 @@ describe('Dialing (direct, TCP)', () => {
}) })
it('should abort dials on queue task timeout', async () => { it('should abort dials on queue task timeout', async () => {
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
dialTimeout: 50 dialTimeout: 50
}) })
dialer.init(localComponents)
sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => { sinon.stub(localTM, 'dial').callsFake(async (addr, options = {}) => {
expect(options.signal).to.exist() expect(options.signal).to.exist()
@ -186,9 +190,10 @@ describe('Dialing (direct, TCP)', () => {
await localComponents.getPeerStore().addressBook.add(peerId, addrs) await localComponents.getPeerStore().addressBook.add(peerId, addrs)
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
dialer.init(localComponents)
expect(dialer.tokens).to.have.lengthOf(2) expect(dialer.tokens).to.have.lengthOf(2)
@ -298,7 +303,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
await libp2p.start() await libp2p.start()
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial') const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -325,7 +330,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
await libp2p.start() await libp2p.start()
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial') const dialerDialSpy = sinon.spy(libp2p.components.getConnectionManager(), 'openConnection')
await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs()) await libp2p.components.getPeerStore().addressBook.set(remotePeerId, remoteLibp2p.getMultiaddrs())

View File

@ -13,7 +13,7 @@ import { AbortError } from '@libp2p/interfaces/errors'
import { MemoryDatastore } from 'datastore-core/memory' import { MemoryDatastore } from 'datastore-core/memory'
import { codes as ErrorCodes } from '../../src/errors.js' import { codes as ErrorCodes } from '../../src/errors.js'
import * as Constants from '../../src/constants.js' import * as Constants from '../../src/constants.js'
import { DefaultDialer, DialTarget } from '../../src/dialer/index.js' import { Dialer, DialTarget } from '../../src/connection-manager/dialer/index.js'
import { publicAddressesFirst } from '@libp2p/utils/address-sort' import { publicAddressesFirst } from '@libp2p/utils/address-sort'
import { PersistentPeerStore } from '@libp2p/peer-store' import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultTransportManager } from '../../src/transport-manager.js' import { DefaultTransportManager } from '../../src/transport-manager.js'
@ -46,10 +46,14 @@ describe('Dialing (direct, WebSockets)', () => {
upgrader: mockUpgrader(), upgrader: mockUpgrader(),
connectionGater: mockConnectionGater() connectionGater: mockConnectionGater()
}) })
localComponents.setPeerStore(new PersistentPeerStore(localComponents, { localComponents.setPeerStore(new PersistentPeerStore({
addressFilter: localComponents.getConnectionGater().filterMultiaddrForPeer addressFilter: localComponents.getConnectionGater().filterMultiaddrForPeer
})) }))
localComponents.setConnectionManager(new DefaultConnectionManager(localComponents)) localComponents.setConnectionManager(new DefaultConnectionManager({
maxConnections: 100,
minConnections: 50,
autoDialInterval: 1000
}))
localTM = new DefaultTransportManager(localComponents) localTM = new DefaultTransportManager(localComponents)
localTM.add(new WebSockets({ filter: filters.all })) localTM.add(new WebSockets({ filter: filters.all }))
@ -67,7 +71,8 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should limit the number of tokens it provides', () => { it('should limit the number of tokens it provides', () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const maxPerPeer = Constants.MAX_PER_PEER_DIALS const maxPerPeer = Constants.MAX_PER_PEER_DIALS
expect(dialer.tokens).to.have.lengthOf(Constants.MAX_PARALLEL_DIALS) expect(dialer.tokens).to.have.lengthOf(Constants.MAX_PARALLEL_DIALS)
@ -77,9 +82,10 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should not return tokens if none are left', () => { it('should not return tokens if none are left', () => {
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
maxDialsPerPeer: Infinity maxDialsPerPeer: Infinity
}) })
dialer.init(localComponents)
const maxTokens = dialer.tokens.length const maxTokens = dialer.tokens.length
@ -90,7 +96,8 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should NOT be able to return a token twice', () => { it('should NOT be able to return a token twice', () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const tokens = dialer.getTokens(1) const tokens = dialer.getTokens(1)
expect(tokens).to.have.length(1) expect(tokens).to.have.length(1)
@ -101,7 +108,9 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should be able to connect to a remote node via its multiaddr', async () => { it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -111,7 +120,8 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should fail to connect to an unsupported multiaddr', async () => { it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
await expect(dialer.dial(unsupportedAddr.encapsulate(`/p2p/${remoteComponents.getPeerId().toString()}`))) await expect(dialer.dial(unsupportedAddr.encapsulate(`/p2p/${remoteComponents.getPeerId().toString()}`)))
.to.eventually.be.rejectedWith(Error) .to.eventually.be.rejectedWith(Error)
@ -119,7 +129,9 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should be able to connect to a given peer', async () => { it('should be able to connect to a given peer', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -129,7 +141,9 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should fail to connect to a given peer with unsupported addresses', async () => { it('should fail to connect to a given peer with unsupported addresses', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [unsupportedAddr]) await localComponents.getPeerStore().addressBook.set(remotePeerId, [unsupportedAddr])
@ -139,9 +153,11 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should abort dials on queue task timeout', async () => { it('should abort dials on queue task timeout', async () => {
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
dialTimeout: 50 dialTimeout: 50
}) })
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr]) await localComponents.getPeerStore().addressBook.set(remotePeerId, [remoteAddr])
@ -160,9 +176,11 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should throw when a peer advertises more than the allowed number of peers', async () => { it('should throw when a peer advertises more than the allowed number of peers', async () => {
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
maxAddrsToDial: 10 maxAddrsToDial: 10
}) })
dialer.init(localComponents)
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
await localComponents.getPeerStore().addressBook.set(remotePeerId, Array.from({ length: 11 }, (_, i) => new Multiaddr(`/ip4/127.0.0.1/tcp/1500${i}/ws/p2p/12D3KooWHFKTMzwerBtsVmtz4ZZEQy2heafxzWw6wNn5PPYkBxJ5`))) await localComponents.getPeerStore().addressBook.set(remotePeerId, Array.from({ length: 11 }, (_, i) => new Multiaddr(`/ip4/127.0.0.1/tcp/1500${i}/ws/p2p/12D3KooWHFKTMzwerBtsVmtz4ZZEQy2heafxzWw6wNn5PPYkBxJ5`)))
@ -181,10 +199,11 @@ describe('Dialing (direct, WebSockets)', () => {
const publicAddressesFirstSpy = sinon.spy(publicAddressesFirst) const publicAddressesFirstSpy = sinon.spy(publicAddressesFirst)
const localTMDialStub = sinon.stub(localTM, 'dial').callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), peerIdFromString(ma.getPeerId() ?? '')))) const localTMDialStub = sinon.stub(localTM, 'dial').callsFake(async (ma) => mockConnection(mockMultiaddrConnection(mockDuplex(), peerIdFromString(ma.getPeerId() ?? ''))))
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
addressSorter: publicAddressesFirstSpy, addressSorter: publicAddressesFirstSpy,
maxParallelDials: 3 maxParallelDials: 3
}) })
dialer.init(localComponents)
// Inject data in the AddressBook // Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), peerMultiaddrs) await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), peerMultiaddrs)
@ -209,9 +228,10 @@ describe('Dialing (direct, WebSockets)', () => {
] ]
const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '') const remotePeerId = peerIdFromString(remoteAddr.getPeerId() ?? '')
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
dialer.init(localComponents)
// Inject data in the AddressBook // Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remotePeerId, addrs) await localComponents.getPeerStore().addressBook.add(remotePeerId, addrs)
@ -247,9 +267,10 @@ describe('Dialing (direct, WebSockets)', () => {
new Multiaddr('/ip4/0.0.0.0/tcp/8001/ws'), new Multiaddr('/ip4/0.0.0.0/tcp/8001/ws'),
new Multiaddr('/ip4/0.0.0.0/tcp/8002/ws') new Multiaddr('/ip4/0.0.0.0/tcp/8002/ws')
] ]
const dialer = new DefaultDialer(localComponents, { const dialer = new Dialer({
maxParallelDials: 2 maxParallelDials: 2
}) })
dialer.init(localComponents)
// Inject data in the AddressBook // Inject data in the AddressBook
await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), addrs) await localComponents.getPeerStore().addressBook.add(remoteComponents.getPeerId(), addrs)
@ -287,7 +308,8 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
it('should cancel pending dial targets before proceeding', async () => { it('should cancel pending dial targets before proceeding', async () => {
const dialer = new DefaultDialer(localComponents) const dialer = new Dialer()
dialer.init(localComponents)
sinon.stub(dialer, '_createDialTarget').callsFake(async () => { sinon.stub(dialer, '_createDialTarget').callsFake(async () => {
const deferredDial = pDefer<DialTarget>() const deferredDial = pDefer<DialTarget>()
@ -309,7 +331,7 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
describe('libp2p.dialer (direct, WebSockets)', () => { describe('libp2p.dialer (direct, WebSockets)', () => {
const connectionGater = mockConnectionGater() // const connectionGater = mockConnectionGater()
let libp2p: Libp2pNode let libp2p: Libp2pNode
let peerId: PeerId let peerId: PeerId
@ -338,14 +360,16 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
], ],
connectionEncryption: [ connectionEncryption: [
NOISE NOISE
], ]
connectionGater
}) })
expect(libp2p.components.getDialer()).to.exist() const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
expect(libp2p.components.getDialer()).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS) const dialer = connectionManager.dialer
expect(libp2p.components.getDialer()).to.have.property('maxDialsPerPeer', Constants.MAX_PER_PEER_DIALS)
expect(libp2p.components.getDialer()).to.have.property('timeout', Constants.DIAL_TIMEOUT) expect(dialer).to.exist()
expect(dialer).to.have.property('tokens').with.lengthOf(Constants.MAX_PARALLEL_DIALS)
expect(dialer).to.have.property('maxDialsPerPeer', Constants.MAX_PER_PEER_DIALS)
expect(dialer).to.have.property('timeout', Constants.DIAL_TIMEOUT)
}) })
it('should be able to override dialer options', async () => { it('should be able to override dialer options', async () => {
@ -362,7 +386,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
connectionEncryption: [ connectionEncryption: [
NOISE NOISE
], ],
dialer: { connectionManager: {
maxParallelDials: 10, maxParallelDials: 10,
maxDialsPerPeer: 1, maxDialsPerPeer: 1,
dialTimeout: 1e3 // 30 second dial timeout per peer dialTimeout: 1e3 // 30 second dial timeout per peer
@ -370,10 +394,13 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
} }
libp2p = await createLibp2pNode(config) libp2p = await createLibp2pNode(config)
expect(libp2p.components.getDialer()).to.exist() const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
expect(libp2p.components.getDialer()).to.have.property('tokens').with.lengthOf(config.dialer.maxParallelDials) const dialer = connectionManager.dialer
expect(libp2p.components.getDialer()).to.have.property('maxDialsPerPeer', config.dialer.maxDialsPerPeer)
expect(libp2p.components.getDialer()).to.have.property('timeout', config.dialer.dialTimeout) expect(dialer).to.exist()
expect(dialer).to.have.property('tokens').with.lengthOf(config.connectionManager.maxParallelDials)
expect(dialer).to.have.property('maxDialsPerPeer', config.connectionManager.maxDialsPerPeer)
expect(dialer).to.have.property('timeout', config.connectionManager.dialTimeout)
}) })
it('should use the dialer for connecting', async () => { it('should use the dialer for connecting', async () => {
@ -392,7 +419,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
] ]
}) })
const dialerDialSpy = sinon.spy(libp2p.components.getDialer(), 'dial') const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialerDialSpy = sinon.spy(connectionManager.dialer, 'dial')
const addressBookAddSpy = sinon.spy(libp2p.components.getPeerStore().addressBook, 'add') const addressBookAddSpy = sinon.spy(libp2p.components.getPeerStore().addressBook, 'add')
await libp2p.start() await libp2p.start()
@ -514,7 +542,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
] ]
}) })
sinon.stub(libp2p.components.getDialer() as DefaultDialer, '_createDialTarget').callsFake(async () => { const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
sinon.stub(connectionManager.dialer, '_createDialTarget').callsFake(async () => {
const deferredDial = pDefer<DialTarget>() const deferredDial = pDefer<DialTarget>()
return await deferredDial.promise return await deferredDial.promise
}) })
@ -552,7 +581,8 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
await libp2p.start() await libp2p.start()
const dialerDestroyStub = sinon.spy(libp2p.components.getDialer() as DefaultDialer, 'stop') const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const dialerDestroyStub = sinon.spy(connectionManager.dialer, 'stop')
await libp2p.stop() await libp2p.stop()

View File

@ -46,18 +46,16 @@ describe('Dialing (resolvable addresses)', () => {
listen: [`${relayAddr.toString()}/p2p-circuit`] listen: [`${relayAddr.toString()}/p2p-circuit`]
}, },
connectionManager: { connectionManager: {
autoDial: false autoDial: false,
resolvers: {
dnsaddr: resolver
}
}, },
relay: { relay: {
enabled: true, enabled: true,
hop: { hop: {
enabled: false enabled: false
} }
},
dialer: {
resolvers: {
dnsaddr: resolver
}
} }
}), }),
started: true started: true
@ -68,18 +66,16 @@ describe('Dialing (resolvable addresses)', () => {
listen: [`${relayAddr.toString()}/p2p-circuit`] listen: [`${relayAddr.toString()}/p2p-circuit`]
}, },
connectionManager: { connectionManager: {
autoDial: false autoDial: false,
resolvers: {
dnsaddr: resolver
}
}, },
relay: { relay: {
enabled: true, enabled: true,
hop: { hop: {
enabled: false enabled: false
} }
},
dialer: {
resolvers: {
dnsaddr: resolver
}
} }
}), }),
started: true started: true

View File

@ -27,13 +27,14 @@ import {
} from '../../src/identify/consts.js' } from '../../src/identify/consts.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js' import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js' import { DefaultTransportManager } from '../../src/transport-manager.js'
import { CustomEvent, Startable } from '@libp2p/interfaces' import { CustomEvent } from '@libp2p/interfaces'
import delay from 'delay' import delay from 'delay'
import pWaitFor from 'p-wait-for' import pWaitFor from 'p-wait-for'
import { peerIdFromString } from '@libp2p/peer-id' import { peerIdFromString } from '@libp2p/peer-id'
import type { PeerId } from '@libp2p/interfaces/peer-id' import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Libp2pNode } from '../../src/libp2p.js' import type { Libp2pNode } from '../../src/libp2p.js'
import { pEvent } from 'p-event' import { pEvent } from 'p-event'
import { start, stop } from '@libp2p/interface-compliance-tests'
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
@ -46,7 +47,7 @@ const defaultInit = {
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
async function createComponents (index: number, services: Startable[]) { async function createComponents (index: number) {
const peerId = await createFromJSON(Peers[index]) const peerId = await createFromJSON(Peers[index])
const components = new Components({ const components = new Components({
@ -54,25 +55,22 @@ async function createComponents (index: number, services: Startable[]) {
datastore: new MemoryDatastore(), datastore: new MemoryDatastore(),
registrar: mockRegistrar(), registrar: mockRegistrar(),
upgrader: mockUpgrader(), upgrader: mockUpgrader(),
connectionGater: mockConnectionGater() connectionGater: mockConnectionGater(),
peerStore: new PersistentPeerStore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000
})
}) })
const peerStore = new PersistentPeerStore(components, {
addressFilter: components.getConnectionGater().filterMultiaddrForPeer
})
components.setPeerStore(peerStore)
components.setAddressManager(new DefaultAddressManager(components, { components.setAddressManager(new DefaultAddressManager(components, {
announce: listenMaddrs.map(ma => ma.toString()) announce: listenMaddrs.map(ma => ma.toString())
})) }))
const connectionManager = new DefaultConnectionManager(components)
services.push(connectionManager)
components.setConnectionManager(connectionManager)
const transportManager = new DefaultTransportManager(components) const transportManager = new DefaultTransportManager(components)
services.push(transportManager)
components.setTransportManager(transportManager) components.setTransportManager(transportManager)
await peerStore.protoBook.set(peerId, protocols) await components.getPeerStore().protoBook.set(peerId, protocols)
return components return components
} }
@ -83,36 +81,35 @@ describe('Identify', () => {
let localPeerRecordUpdater: PeerRecordUpdater let localPeerRecordUpdater: PeerRecordUpdater
let remotePeerRecordUpdater: PeerRecordUpdater let remotePeerRecordUpdater: PeerRecordUpdater
let services: Startable[]
beforeEach(async () => { beforeEach(async () => {
services = [] localComponents = await createComponents(0)
remoteComponents = await createComponents(1)
localComponents = await createComponents(0, services)
remoteComponents = await createComponents(1, services)
localPeerRecordUpdater = new PeerRecordUpdater(localComponents) localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents) remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents)
await Promise.all( await Promise.all([
services.map(s => s.start()) start(localComponents),
) start(remoteComponents)
])
}) })
afterEach(async () => { afterEach(async () => {
sinon.restore() sinon.restore()
await Promise.all( await Promise.all([
services.map(s => s.stop()) stop(localComponents),
) stop(remoteComponents)
])
}) })
it('should be able to identify another peer', async () => { it('should be able to identify another peer', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit) const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await localIdentify.start() await start(localIdentify)
await remoteIdentify.start() await start(remoteIdentify)
const [localToRemote] = connectionPair(localComponents, remoteComponents) const [localToRemote] = connectionPair(localComponents, remoteComponents)
@ -146,14 +143,14 @@ describe('Identify', () => {
agentVersion: agentVersion agentVersion: agentVersion
} }
}) })
await localIdentify.start() await start(localIdentify)
const remoteIdentify = new IdentifyService(remoteComponents, { const remoteIdentify = new IdentifyService(remoteComponents, {
protocolPrefix: 'ipfs', protocolPrefix: 'ipfs',
host: { host: {
agentVersion: agentVersion agentVersion: agentVersion
} }
}) })
await remoteIdentify.start() await start(remoteIdentify)
const [localToRemote] = connectionPair(localComponents, remoteComponents) const [localToRemote] = connectionPair(localComponents, remoteComponents)
@ -179,8 +176,8 @@ describe('Identify', () => {
const localIdentify = new IdentifyService(localComponents, defaultInit) const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await localIdentify.start() await start(localIdentify)
await remoteIdentify.start() await start(remoteIdentify)
const [localToRemote] = connectionPair(localComponents, remoteComponents) const [localToRemote] = connectionPair(localComponents, remoteComponents)
@ -231,14 +228,14 @@ describe('Identify', () => {
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion')) await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion'))
.to.eventually.be.undefined() .to.eventually.be.undefined()
await localIdentify.start() await start(localIdentify)
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'AgentVersion')) await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'AgentVersion'))
.to.eventually.deep.equal(uint8ArrayFromString(agentVersion)) .to.eventually.deep.equal(uint8ArrayFromString(agentVersion))
await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion')) await expect(localComponents.getPeerStore().metadataBook.getValue(localComponents.getPeerId(), 'ProtocolVersion'))
.to.eventually.be.ok() .to.eventually.be.ok()
await localIdentify.stop() await stop(localIdentify)
}) })
describe('push', () => { describe('push', () => {
@ -246,8 +243,8 @@ describe('Identify', () => {
const localIdentify = new IdentifyService(localComponents, defaultInit) const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await localIdentify.start() await start(localIdentify)
await remoteIdentify.start() await start(remoteIdentify)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
@ -317,8 +314,8 @@ describe('Identify', () => {
isCertified: true isCertified: true
}]) }])
await localIdentify.stop() await stop(localIdentify)
await remoteIdentify.stop() await stop(remoteIdentify)
}) })
// LEGACY // LEGACY
@ -326,8 +323,8 @@ describe('Identify', () => {
const localIdentify = new IdentifyService(localComponents, defaultInit) const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await localIdentify.start() await start(localIdentify)
await remoteIdentify.start() await start(remoteIdentify)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
@ -390,8 +387,8 @@ describe('Identify', () => {
isCertified: false isCertified: false
}]) }])
await localIdentify.stop() await stop(localIdentify)
await remoteIdentify.stop() await stop(remoteIdentify)
}) })
}) })

View File

@ -14,11 +14,16 @@ import type { DefaultMetrics } from '../../src/metrics/index.js'
describe('libp2p.metrics', () => { describe('libp2p.metrics', () => {
let libp2p: Libp2pNode let libp2p: Libp2pNode
let remoteLibp2p: Libp2pNode
afterEach(async () => { afterEach(async () => {
if (libp2p != null) { if (libp2p != null) {
await libp2p.stop() await libp2p.stop()
} }
if (remoteLibp2p != null) {
await remoteLibp2p.stop()
}
}) })
it('should disable metrics by default', async () => { it('should disable metrics by default', async () => {
@ -56,8 +61,7 @@ describe('libp2p.metrics', () => {
}) })
it('should record metrics on connections and streams when enabled', async () => { it('should record metrics on connections and streams when enabled', async () => {
let remoteLibp2p: Libp2pNode [libp2p, remoteLibp2p] = await Promise.all([
;[libp2p, remoteLibp2p] = await Promise.all([
createNode({ createNode({
config: createBaseOptions({ config: createBaseOptions({
metrics: { metrics: {
@ -117,8 +121,7 @@ describe('libp2p.metrics', () => {
}) })
it('should move disconnected peers to the old peers list', async () => { it('should move disconnected peers to the old peers list', async () => {
let remoteLibp2p [libp2p, remoteLibp2p] = await Promise.all([
;[libp2p, remoteLibp2p] = await Promise.all([
createNode({ createNode({
config: createBaseOptions({ config: createBaseOptions({
metrics: { metrics: {

View File

@ -9,6 +9,7 @@ import { createPeerId } from '../utils/creators/peer.js'
import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id' import { isPeerId, PeerId } from '@libp2p/interfaces/peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks' import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
import type { Startable } from '@libp2p/interfaces'
describe('peer discovery', () => { describe('peer discovery', () => {
describe('basic functions', () => { describe('basic functions', () => {
@ -42,7 +43,7 @@ describe('peer discovery', () => {
await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/165.1.1.1/tcp/80')]) await libp2p.peerStore.addressBook.set(remotePeerId, [new Multiaddr('/ip4/165.1.1.1/tcp/80')])
const deferred = defer() const deferred = defer()
sinon.stub(libp2p.components.getDialer(), 'dial').callsFake(async (id) => { sinon.stub(libp2p.components.getConnectionManager(), 'openConnection').callsFake(async (id) => {
if (!isPeerId(id)) { if (!isPeerId(id)) {
throw new Error('Tried to dial something that was not a peer ID') throw new Error('Tried to dial something that was not a peer ID')
} }
@ -69,13 +70,22 @@ describe('peer discovery', () => {
let started = 0 let started = 0
let stopped = 0 let stopped = 0
class MockDiscovery { class MockDiscovery implements Startable {
static tag = 'mock' static tag = 'mock'
started = false
isStarted () {
return this.started
}
start () { start () {
this.started = true
started++ started++
} }
stop () { stop () {
this.started = false
stopped++ stopped++
} }

View File

@ -6,7 +6,7 @@ import { MemoryDatastore } from 'datastore-core/memory'
import { createTopology } from '@libp2p/topology' import { createTopology } from '@libp2p/topology'
import { PersistentPeerStore } from '@libp2p/peer-store' import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultRegistrar } from '../../src/registrar.js' import { DefaultRegistrar } from '../../src/registrar.js'
import { mockConnectionGater, mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks' import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
import { createPeerId, createNode } from '../utils/creators/peer.js' import { createPeerId, createNode } from '../utils/creators/peer.js'
import { createBaseOptions } from '../utils/base-options.browser.js' import { createBaseOptions } from '../utils/base-options.browser.js'
import type { Registrar } from '@libp2p/interfaces/registrar' import type { Registrar } from '@libp2p/interfaces/registrar'
@ -24,7 +24,6 @@ import { Mplex } from '@libp2p/mplex'
const protocol = '/test/1.0.0' const protocol = '/test/1.0.0'
describe('registrar', () => { describe('registrar', () => {
const connectionGater = mockConnectionGater()
let components: Components let components: Components
let registrar: Registrar let registrar: Registrar
let peerId: PeerId let peerId: PeerId
@ -38,12 +37,14 @@ describe('registrar', () => {
components = new Components({ components = new Components({
peerId, peerId,
datastore: new MemoryDatastore(), datastore: new MemoryDatastore(),
upgrader: mockUpgrader() upgrader: mockUpgrader(),
peerStore: new PersistentPeerStore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000
})
}) })
components.setPeerStore(new PersistentPeerStore(components, {
addressFilter: connectionGater.filterMultiaddrForPeer
}))
components.setConnectionManager(new DefaultConnectionManager(components))
registrar = new DefaultRegistrar(components) registrar = new DefaultRegistrar(components)
}) })

View File

@ -263,7 +263,7 @@ describe('auto-relay', () => {
await relayLibp2p1.hangUp(relayLibp2p3.peerId) await relayLibp2p1.hangUp(relayLibp2p3.peerId)
// Stub dial // Stub dial
sinon.stub(relayLibp2p1.components.getDialer(), 'dial').callsFake(async () => { sinon.stub(relayLibp2p1.components.getConnectionManager(), 'openConnection').callsFake(async () => {
deferred.resolve() deferred.resolve()
return await Promise.reject(new Error('failed to dial')) return await Promise.reject(new Error('failed to dial'))
}) })

View File

@ -118,8 +118,8 @@ describe('Dialing (via relay, TCP)', () => {
.and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) .and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
// We should not be connected to the relay, because we weren't before the dial // We should not be connected to the relay, because we weren't before the dial
const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId) const srcToRelayConns = srcLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
expect(srcToRelayConn).to.not.exist() expect(srcToRelayConns).to.be.empty()
}) })
it('dialer should stay connected to an already connected relay on hop failure', async () => { it('dialer should stay connected to an already connected relay on hop failure', async () => {
@ -135,9 +135,9 @@ describe('Dialing (via relay, TCP)', () => {
.to.eventually.be.rejected() .to.eventually.be.rejected()
.and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) .and.to.have.nested.property('.errors[0].code', Errors.ERR_HOP_REQUEST_FAILED)
const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId) const srcToRelayConn = srcLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
expect(srcToRelayConn).to.exist() expect(srcToRelayConn).to.have.lengthOf(1)
expect(srcToRelayConn?.stat.status).to.equal('OPEN') expect(srcToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN')
}) })
it('destination peer should stay connected to an already connected relay on hop failure', async () => { it('destination peer should stay connected to an already connected relay on hop failure', async () => {
@ -166,8 +166,8 @@ describe('Dialing (via relay, TCP)', () => {
streamHandler.close() streamHandler.close()
// should still be connected // should still be connected
const dstToRelayConn = dstLibp2p.components.getConnectionManager().getConnection(relayLibp2p.peerId) const dstToRelayConn = dstLibp2p.components.getConnectionManager().getConnections(relayLibp2p.peerId)
expect(dstToRelayConn).to.exist() expect(dstToRelayConn).to.have.lengthOf(1)
expect(dstToRelayConn?.stat.status).to.equal('OPEN') expect(dstToRelayConn).to.have.nested.property('[0].stat.status', 'OPEN')
}) })
}) })

View File

@ -8,7 +8,7 @@ import { PersistentPeerStore } from '@libp2p/peer-store'
import { PeerRecord } from '@libp2p/peer-record' import { PeerRecord } from '@libp2p/peer-record'
import { TCP } from '@libp2p/tcp' import { TCP } from '@libp2p/tcp'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import { mockUpgrader, mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks' import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import sinon from 'sinon' import sinon from 'sinon'
import Peers from '../fixtures/peers.js' import Peers from '../fixtures/peers.js'
import pWaitFor from 'p-wait-for' import pWaitFor from 'p-wait-for'
@ -23,7 +23,6 @@ const addrs = [
] ]
describe('Transport Manager (TCP)', () => { describe('Transport Manager (TCP)', () => {
const connectionGater = mockConnectionGater()
let tm: DefaultTransportManager let tm: DefaultTransportManager
let localPeer: PeerId let localPeer: PeerId
let components: Components let components: Components
@ -39,9 +38,7 @@ describe('Transport Manager (TCP)', () => {
upgrader: mockUpgrader() upgrader: mockUpgrader()
}) })
components.setAddressManager(new DefaultAddressManager(components, { listen: addrs.map(addr => addr.toString()) })) components.setAddressManager(new DefaultAddressManager(components, { listen: addrs.map(addr => addr.toString()) }))
components.setPeerStore(new PersistentPeerStore(components, { components.setPeerStore(new PersistentPeerStore())
addressFilter: connectionGater.filterMultiaddrForPeer
}))
tm = new DefaultTransportManager(components) tm = new DefaultTransportManager(components)