diff --git a/doc/API.md b/doc/API.md index 908fff23..46c8e7a7 100644 --- a/doc/API.md +++ b/doc/API.md @@ -97,7 +97,9 @@ Creates an instance of Libp2p. | options.modules | [`Array`](./CONFIGURATION.md#modules) | libp2p [modules](./CONFIGURATION.md#modules) to use | | [options.addresses] | `{ listen: Array, announce: Array, announceFilter: (ma: Array) => Array }` | Addresses for transport listening and to advertise to the network | | [options.config] | `object` | libp2p modules configuration and core configuration | -| [options.host] | `{ agentVersion: string }` | libp2p host options | +| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number }` | libp2p identify protocol options | +| [options.ping] | `{ protocolPrefix: string }` | libp2p ping protocol options | +| [options.fetch] | `{ protocolPrefix: string }` | libp2p fetch protocol options | | [options.connectionManager] | [`object`](./CONFIGURATION.md#configuring-connection-manager) | libp2p Connection Manager [configuration](./CONFIGURATION.md#configuring-connection-manager) | | [options.transportManager] | [`object`](./CONFIGURATION.md#configuring-transport-manager) | libp2p transport manager [configuration](./CONFIGURATION.md#configuring-transport-manager) | | [options.datastore] | `object` | must implement [ipfs/interface-datastore](https://github.com/ipfs/interface-datastore) (in memory datastore will be used if not provided) | diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md index df5e689a..89d4acc8 100644 --- a/doc/CONFIGURATION.md +++ b/doc/CONFIGURATION.md @@ -885,7 +885,12 @@ Changing the protocol name prefix can isolate default public network (IPFS) for ```js const node = await createLibp2p({ - protocolPrefix: 'ipfs' // default + identify: { + protocolPrefix: 'ipfs' // default + }, + ping: { + protocolPrefix: 'ipfs' // default + } }) /* protocols: [ diff --git a/doc/migrations/v0.36-v.037.md b/doc/migrations/v0.36-v.037.md index 2dd3322f..d4482fee 100644 --- a/doc/migrations/v0.36-v.037.md +++ b/doc/migrations/v0.36-v.037.md @@ -37,6 +37,7 @@ The following changes have been made to the configuration object: 3. Use of the `enabled` flag has been removed - if you don't want a particular feature enabled, don't pass a module implementing that feature 4. Some keys have been renamed = `transport` -> `transports`, `streamMuxer` -> `streamMuxers`, `connEncryption` -> `connectionEncryption`, etc 5. Keys from `config.dialer` have been moved to `config.connectionManager` as the connection manager is now responsible for managing connections +6. The `protocolPrefix` configuration option is now passed on a per-protocol basis for `identify`, `fetch` and `ping` **Before** @@ -71,6 +72,7 @@ const node = await Libp2p.create({ MulticastDNS ] }, + protocolPrefix: 'ipfs', config: { peerDiscovery: { autoDial: true, @@ -136,7 +138,10 @@ const node = await createLibp2p({ new MulticastDNS({ interval: 1000 }) - ] + ], + identify: { + protocolPrefix: 'ipfs' + } }) ``` diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json index 22cd66fb..5a3bedb6 100644 --- a/examples/delegated-routing/package.json +++ b/examples/delegated-routing/package.json @@ -3,7 +3,7 @@ "version": "0.1.0", "private": true, "dependencies": { - "@chainsafe/libp2p-noise": "^6.1.1", + "@chainsafe/libp2p-noise": "^6.2.0", "ipfs-core": "^0.14.1", "libp2p": "../../", "@libp2p/delegated-content-routing": "^1.0.1", diff --git a/examples/libp2p-in-the-browser/package.json b/examples/libp2p-in-the-browser/package.json index c182fea0..4a6c6df5 100644 --- a/examples/libp2p-in-the-browser/package.json +++ b/examples/libp2p-in-the-browser/package.json @@ -9,7 +9,7 @@ }, "license": "ISC", "dependencies": { - "@chainsafe/libp2p-noise": "^6.1.1", + "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^1.0.4", "@libp2p/mplex": "^1.0.4", "@libp2p/webrtc-star": "^1.0.8", diff --git a/examples/webrtc-direct/package.json b/examples/webrtc-direct/package.json index a888ea7f..6d64d081 100644 --- a/examples/webrtc-direct/package.json +++ b/examples/webrtc-direct/package.json @@ -10,7 +10,7 @@ "license": "ISC", "dependencies": { "@libp2p/webrtc-direct": "^1.0.1", - "@chainsafe/libp2p-noise": "^6.1.1", + "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^1.0.4", "@libp2p/mplex": "^1.0.4", "libp2p": "../../", diff --git a/package.json b/package.json index 22c39dd6..21996777 100644 --- a/package.json +++ b/package.json @@ -95,7 +95,7 @@ "@achingbrain/nat-port-mapper": "^1.0.3", "@libp2p/connection": "^2.0.2", "@libp2p/crypto": "^0.22.11", - "@libp2p/interfaces": "^2.0.1", + "@libp2p/interfaces": "^2.0.2", "@libp2p/logger": "^1.1.4", "@libp2p/multistream-select": "^1.0.4", "@libp2p/peer-collections": "^1.0.2", @@ -127,7 +127,6 @@ "it-pipe": "^2.0.3", "it-sort": "^1.0.1", "it-stream-types": "^1.0.4", - "it-take": "^1.0.2", "merge-options": "^3.0.4", "multiformats": "^9.6.3", "mutable-proxy": "^1.0.0", @@ -146,14 +145,14 @@ "xsalsa20": "^1.1.0" }, "devDependencies": { - "@chainsafe/libp2p-noise": "^6.1.1", + "@chainsafe/libp2p-noise": "^6.2.0", "@libp2p/bootstrap": "^1.0.4", "@libp2p/daemon-client": "^1.0.2", "@libp2p/daemon-server": "^1.0.2", "@libp2p/delegated-content-routing": "^1.0.2", "@libp2p/delegated-peer-routing": "^1.0.2", "@libp2p/floodsub": "^1.0.6", - "@libp2p/interface-compliance-tests": "^2.0.1", + "@libp2p/interface-compliance-tests": "^2.0.3", "@libp2p/interop": "^1.0.3", "@libp2p/kad-dht": "^1.0.9", "@libp2p/mdns": "^1.0.5", diff --git a/src/config.ts b/src/config.ts index 8707d433..432d7e9a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -35,9 +35,6 @@ const DefaultConfig: Partial = { transportManager: { faultTolerance: FaultTolerance.FATAL_ALL }, - host: { - agentVersion: AGENT_VERSION - }, metrics: { enabled: false, computeThrottleMaxQueueSize: 1000, @@ -56,7 +53,6 @@ const DefaultConfig: Partial = { bootDelay: 10e3 } }, - protocolPrefix: 'ipfs', nat: { enabled: true, ttl: 7200, @@ -77,6 +73,19 @@ const DefaultConfig: Partial = { enabled: false, maxListeners: 2 } + }, + identify: { + protocolPrefix: 'ipfs', + host: { + agentVersion: AGENT_VERSION + }, + timeout: 30000 + }, + ping: { + protocolPrefix: 'ipfs' + }, + fetch: { + protocolPrefix: 'libp2p' } } diff --git a/src/fetch/constants.ts b/src/fetch/constants.ts index c9c425d6..1c2551bb 100644 --- a/src/fetch/constants.ts +++ b/src/fetch/constants.ts @@ -1,3 +1,4 @@ // https://github.com/libp2p/specs/tree/master/fetch#wire-protocol -export const PROTOCOL = '/libp2p/fetch/0.0.1' +export const PROTOCOL_VERSION = '0.0.1' +export const PROTOCOL_NAME = 'fetch' diff --git a/src/fetch/index.ts b/src/fetch/index.ts index 5ae9b67c..59874cb0 100644 --- a/src/fetch/index.ts +++ b/src/fetch/index.ts @@ -4,16 +4,19 @@ import { codes } from '../errors.js' import * as lp from 'it-length-prefixed' import { FetchRequest, FetchResponse } from './pb/proto.js' import { handshake } from 'it-handshake' -import { PROTOCOL } from './constants.js' +import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' import type { PeerId } from '@libp2p/interfaces/peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { Stream } from '@libp2p/interfaces/connection' import type { IncomingStreamData } from '@libp2p/interfaces/registrar' import type { Components } from '@libp2p/interfaces/components' +import type { AbortOptions } from '@libp2p/interfaces' +import type { Duplex } from 'it-stream-types' +import { abortableDuplex } from 'abortable-iterator' const log = logger('libp2p:fetch') -export interface FetchInit { +export interface FetchServiceInit { protocolPrefix: string } @@ -33,15 +36,15 @@ export interface LookupFunction { * by a fixed prefix that all keys that should be routed to that lookup function will start with. */ export class FetchService implements Startable { + public readonly protocol: string private readonly components: Components private readonly lookupFunctions: Map - private readonly protocol: string private started: boolean - constructor (components: Components, init: FetchInit) { + constructor (components: Components, init: FetchServiceInit) { this.started = false this.components = components - this.protocol = PROTOCOL + this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` this.lookupFunctions = new Map() // Maps key prefix to value lookup function this.handleMessage = this.handleMessage.bind(this) } @@ -67,12 +70,19 @@ export class FetchService implements Startable { /** * Sends a request to fetch the value associated with the given key from the given peer */ - async fetch (peer: PeerId, key: string): Promise { + async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise { log('dialing %s to %p', this.protocol, peer) - const connection = await this.components.getConnectionManager().openConnection(peer) - const { stream } = await connection.newStream([this.protocol]) - const shake = handshake(stream) + const connection = await this.components.getConnectionManager().openConnection(peer, options) + const { stream } = await connection.newStream([this.protocol], options) + let source: Duplex = stream + + // make stream abortable if AbortSignal passed + if (options.signal != null) { + source = abortableDuplex(stream, options.signal) + } + + const shake = handshake(source) // send message shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice()) diff --git a/src/identify/index.ts b/src/identify/index.ts index 023c289c..123895aa 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -2,8 +2,6 @@ import { logger } from '@libp2p/logger' import errCode from 'err-code' import * as lp from 'it-length-prefixed' import { pipe } from 'it-pipe' -import all from 'it-all' -import take from 'it-take' import drain from 'it-drain' import first from 'it-first' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' @@ -21,13 +19,19 @@ import { } from './consts.js' import { codes } from '../errors.js' import type { IncomingStreamData } from '@libp2p/interfaces/registrar' -import type { Connection } from '@libp2p/interfaces/connection' +import type { Connection, Stream } from '@libp2p/interfaces/connection' import type { Startable } from '@libp2p/interfaces/startable' import { peerIdFromKeys } from '@libp2p/peer-id' import type { Components } from '@libp2p/interfaces/components' +import { TimeoutController } from 'timeout-abort-controller' +import type { AbortOptions } from '@libp2p/interfaces' +import { abortableDuplex } from 'abortable-iterator' +import type { Duplex } from 'it-stream-types' const log = logger('libp2p:identify') +const IDENTIFY_TIMEOUT = 30000 + export interface HostProperties { agentVersion: string } @@ -35,6 +39,7 @@ export interface HostProperties { export interface IdentifyServiceInit { protocolPrefix: string host: HostProperties + timeout?: number } export class IdentifyService implements Startable { @@ -46,11 +51,13 @@ export class IdentifyService implements Startable { agentVersion: string } + private readonly init: IdentifyServiceInit private started: boolean constructor (components: Components, init: IdentifyServiceInit) { this.components = components this.started = false + this.init = init this.handleMessage = this.handleMessage.bind(this) @@ -128,8 +135,17 @@ export class IdentifyService implements Startable { const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId()) const pushes = connections.map(async connection => { + const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + let stream: Stream | undefined + try { - const { stream } = await connection.newStream([this.identifyPushProtocolStr]) + const data = await connection.newStream([this.identifyPushProtocolStr], { + signal: timeoutController.signal + }) + stream = data.stream + + // make stream abortable + const source: Duplex = abortableDuplex(stream, timeoutController.signal) await pipe( [Identify.encode({ @@ -138,12 +154,18 @@ export class IdentifyService implements Startable { protocols })], lp.encode(), - stream, + source, drain ) } catch (err: any) { // Just log errors log.error('could not push identify update to peer', err) + } finally { + if (stream != null) { + stream.close() + } + + timeoutController.clear() } }) @@ -175,31 +197,44 @@ export class IdentifyService implements Startable { await this.push(connections) } + async _identify (connection: Connection, options: AbortOptions = {}): Promise { + const { stream } = await connection.newStream([this.identifyProtocolStr], options) + let source: Duplex = stream + + // make stream abortable if AbortSignal passed + if (options.signal != null) { + source = abortableDuplex(stream, options.signal) + } + + try { + const data = await pipe( + [], + source, + lp.decode(), + async (source) => await first(source) + ) + + if (data == null) { + throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED) + } + + try { + return Identify.decode(data) + } catch (err: any) { + throw errCode(err, codes.ERR_INVALID_MESSAGE) + } + } finally { + stream.close() + } + } + /** * Requests the `Identify` message from peer associated with the given `connection`. * If the identified peer does not match the `PeerId` associated with the connection, * an error will be thrown. */ - async identify (connection: Connection): Promise { - const { stream } = await connection.newStream([this.identifyProtocolStr]) - const [data] = await pipe( - [], - stream, - lp.decode(), - (source) => take(source, 1), - async (source) => await all(source) - ) - - if (data == null) { - throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED) - } - - let message: Identify - try { - message = Identify.decode(data) - } catch (err: any) { - throw errCode(err, codes.ERR_INVALID_MESSAGE) - } + async identify (connection: Connection, options: AbortOptions = {}): Promise { + const message = await this._identify(connection, options) const { publicKey, @@ -308,6 +343,8 @@ export class IdentifyService implements Startable { */ async _handleIdentify (data: IncomingStreamData) { const { connection, stream } = data + const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) + try { const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0) const peerData = await this.components.getPeerStore().get(this.components.getPeerId()) @@ -335,14 +372,20 @@ export class IdentifyService implements Startable { protocols: peerData.protocols }) + // make stream abortable + const source: Duplex = abortableDuplex(stream, timeoutController.signal) + await pipe( [message], lp.encode(), - stream, + source, drain ) } catch (err: any) { log.error('could not respond to identify request', err) + } finally { + stream.close() + timeoutController.clear() } } @@ -351,12 +394,16 @@ export class IdentifyService implements Startable { */ async _handlePush (data: IncomingStreamData) { const { connection, stream } = data + const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT) let message: Identify | undefined try { + // make stream abortable + const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const data = await pipe( [], - stream, + source, lp.decode(), async (source) => await first(source) ) @@ -366,6 +413,9 @@ export class IdentifyService implements Startable { } } catch (err: any) { return log.error('received invalid message', err) + } finally { + stream.close() + timeoutController.clear() } if (message == null) { diff --git a/src/index.ts b/src/index.ts index 7d411bed..9a530f71 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,7 @@ import type { EventEmitter } from '@libp2p/interfaces/events' import type { Startable } from '@libp2p/interfaces/startable' import type { Multiaddr } from '@multiformats/multiaddr' import type { FaultTolerance } from './transport-manager.js' -import type { HostProperties } from './identify/index.js' +import type { IdentifyServiceInit } from './identify/index.js' import type { DualDHT } from '@libp2p/interfaces/dht' import type { Datastore } from 'interface-datastore' import type { PeerStore, PeerStoreInit } from '@libp2p/interfaces/peer-store' @@ -24,6 +24,8 @@ import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics' import type { PeerInfo } from '@libp2p/interfaces/peer-info' import type { KeyChain } from './keychain/index.js' import type { ConnectionManagerInit } from './connection-manager/index.js' +import type { PingServiceInit } from './ping/index.js' +import type { FetchServiceInit } from './fetch/index.js' export interface PersistentPeerStoreOptions { threshold?: number @@ -95,7 +97,6 @@ export interface RefreshManagerConfig { export interface Libp2pInit { peerId: PeerId - host: HostProperties addresses: AddressesConfig connectionManager: ConnectionManagerInit connectionGater: Partial @@ -105,9 +106,11 @@ export interface Libp2pInit { peerStore: PeerStoreInit peerRouting: PeerRoutingConfig keychain: KeychainConfig - protocolPrefix: string nat: NatManagerConfig relay: RelayConfig + identify: IdentifyServiceInit + ping: PingServiceInit + fetch: FetchServiceInit transports: Transport[] streamMuxers?: StreamMuxerFactory[] @@ -195,12 +198,12 @@ export interface Libp2p extends Startable, EventEmitter { /** * Pings the given peer in order to obtain the operation latency */ - ping: (peer: Multiaddr |PeerId) => Promise + ping: (peer: Multiaddr | PeerId, options?: AbortOptions) => Promise /** * Sends a request to fetch the value associated with the given key from the given peer. */ - fetch: (peer: PeerId | Multiaddr | string, key: string) => Promise + fetch: (peer: PeerId | Multiaddr | string, key: string, options?: AbortOptions) => Promise /** * Returns the public key for the passed PeerId. If the PeerId is of the 'RSA' type diff --git a/src/libp2p.ts b/src/libp2p.ts index 611dbe10..9d2dafed 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -166,10 +166,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { if (init.streamMuxers != null && init.streamMuxers.length > 0) { // Add the identify service since we can multiplex this.identifyService = new IdentifyService(this.components, { - protocolPrefix: init.protocolPrefix, - host: { - agentVersion: init.host.agentVersion - } + ...init.identify }) this.configureComponent(this.identifyService) } @@ -229,11 +226,11 @@ export class Libp2pNode extends EventEmitter implements Libp2p { } this.fetchService = this.configureComponent(new FetchService(this.components, { - protocolPrefix: init.protocolPrefix + ...init.fetch })) this.pingService = this.configureComponent(new PingService(this.components, { - protocolPrefix: init.protocolPrefix + ...init.ping })) const autoDialer = this.configureComponent(new AutoDialer(this.components, { @@ -419,9 +416,9 @@ export class Libp2pNode extends EventEmitter implements Libp2p { throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM) } - const connection = await this.dial(peer) + const connection = await this.dial(peer, options) - return await connection.newStream(protocols) + return await connection.newStream(protocols, options) } getMultiaddrs (): Multiaddr[] { @@ -473,24 +470,24 @@ export class Libp2pNode extends EventEmitter implements Libp2p { throw errCode(new Error(`Node not responding with its public key: ${peer.toString()}`), codes.ERR_INVALID_RECORD) } - async fetch (peer: PeerId | Multiaddr | string, key: string): Promise { + async fetch (peer: PeerId | Multiaddr | string, key: string, options: AbortOptions = {}): Promise { const { id, multiaddrs } = getPeer(peer) if (multiaddrs != null) { await this.components.getPeerStore().addressBook.add(id, multiaddrs) } - return await this.fetchService.fetch(id, key) + return await this.fetchService.fetch(id, key, options) } - async ping (peer: PeerId | Multiaddr | string): Promise { + async ping (peer: PeerId | Multiaddr | string, options: AbortOptions = {}): Promise { const { id, multiaddrs } = getPeer(peer) if (multiaddrs.length > 0) { await this.components.getPeerStore().addressBook.add(id, multiaddrs) } - return await this.pingService.ping(id) + return await this.pingService.ping(id, options) } async handle (protocols: string | string[], handler: StreamHandler): Promise { diff --git a/src/ping/index.ts b/src/ping/index.ts index 03cceffe..685e1cd5 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -10,6 +10,9 @@ import type { IncomingStreamData } from '@libp2p/interfaces/registrar' import type { PeerId } from '@libp2p/interfaces/peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { Components } from '@libp2p/interfaces/components' +import type { AbortOptions } from '@libp2p/interfaces' +import type { Duplex } from 'it-stream-types' +import { abortableDuplex } from 'abortable-iterator' const log = logger('libp2p:ping') @@ -18,8 +21,8 @@ export interface PingServiceInit { } export class PingService implements Startable { + public readonly protocol: string private readonly components: Components - private readonly protocol: string private started: boolean constructor (components: Components, init: PingServiceInit) { @@ -60,25 +63,36 @@ export class PingService implements Startable { * @param {PeerId|Multiaddr} peer * @returns {Promise} */ - async ping (peer: PeerId): Promise { + async ping (peer: PeerId, options: AbortOptions = {}): Promise { log('dialing %s to %p', this.protocol, peer) - const connection = await this.components.getConnectionManager().openConnection(peer) - const { stream } = await connection.newStream([this.protocol]) + const connection = await this.components.getConnectionManager().openConnection(peer, options) + const { stream } = await connection.newStream([this.protocol], options) const start = Date.now() const data = randomBytes(PING_LENGTH) - const result = await pipe( - [data], - stream, - async (source) => await first(source) - ) - const end = Date.now() + let source: Duplex = stream - if (result == null || !uint8ArrayEquals(data, result)) { - throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) + // make stream abortable if AbortSignal passed + if (options.signal != null) { + source = abortableDuplex(stream, options.signal) } - return end - start + try { + const result = await pipe( + [data], + source, + async (source) => await first(source) + ) + const end = Date.now() + + if (result == null || !uint8ArrayEquals(data, result)) { + throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) + } + + return end - start + } finally { + stream.close() + } } } diff --git a/src/upgrader.ts b/src/upgrader.ts index 27cd729e..f83365ac 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -15,6 +15,7 @@ import type { PeerId } from '@libp2p/interfaces/peer-id' import type { MultiaddrConnection, Upgrader, UpgraderEvents } from '@libp2p/interfaces/transport' import type { Duplex } from 'it-stream-types' import type { Components } from '@libp2p/interfaces/components' +import type { AbortOptions } from '@libp2p/interfaces' const log = logger('libp2p:upgrader') @@ -266,7 +267,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg } = opts let muxer: StreamMuxer | undefined - let newStream: ((multicodecs: string[]) => Promise) | undefined + let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise) | undefined let connection: Connection // eslint-disable-line prefer-const if (muxerFactory != null) { @@ -308,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg } }) - newStream = async (protocols: string[]): Promise => { + newStream = async (protocols: string[], options: AbortOptions = {}): Promise => { if (muxer == null) { throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE) } @@ -319,7 +320,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg const metrics = this.components.getMetrics() try { - let { stream, protocol } = await mss.select(protocols) + let { stream, protocol } = await mss.select(protocols, options) if (metrics != null) { stream = metrics.trackStream({ stream, remotePeer, protocol }) @@ -328,6 +329,11 @@ export class DefaultUpgrader extends EventEmitter implements Upg return { stream: { ...muxedStream, ...stream }, protocol } } catch (err: any) { log.error('could not create new stream', err) + + if (err.code != null) { + throw err + } + throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) } } diff --git a/test/configuration/protocol-prefix.node.ts b/test/configuration/protocol-prefix.node.ts index a48bbee1..c21cdd56 100644 --- a/test/configuration/protocol-prefix.node.ts +++ b/test/configuration/protocol-prefix.node.ts @@ -18,13 +18,21 @@ describe('Protocol prefix is configurable', () => { it('protocolPrefix is provided', async () => { const testProtocol = 'test-protocol' libp2p = await createLibp2pNode(mergeOptions(baseOptions, { - protocolPrefix: testProtocol + identify: { + protocolPrefix: testProtocol + }, + ping: { + protocolPrefix: testProtocol + }, + fetch: { + protocolPrefix: testProtocol + } })) await libp2p.start() const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId) expect(protocols).to.include.members([ - '/libp2p/fetch/0.0.1', + `/${testProtocol}/fetch/0.0.1`, '/libp2p/circuit/relay/0.1.0', `/${testProtocol}/id/1.0.0`, `/${testProtocol}/id/push/1.0.0`, @@ -41,7 +49,8 @@ describe('Protocol prefix is configurable', () => { '/libp2p/circuit/relay/0.1.0', '/ipfs/id/1.0.0', '/ipfs/id/push/1.0.0', - '/ipfs/ping/1.0.0' + '/ipfs/ping/1.0.0', + '/libp2p/fetch/0.0.1' ]) }) }) diff --git a/test/fetch/index.spec.ts b/test/fetch/index.spec.ts new file mode 100644 index 00000000..0e9d173b --- /dev/null +++ b/test/fetch/index.spec.ts @@ -0,0 +1,133 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { FetchService } from '../../src/fetch/index.js' +import Peers from '../fixtures/peers.js' +import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks' +import { createFromJSON } from '@libp2p/peer-id-factory' +import { Components } from '@libp2p/interfaces/components' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { start, stop } from '@libp2p/interfaces/startable' +import { CustomEvent } from '@libp2p/interfaces/events' +import { TimeoutController } from 'timeout-abort-controller' +import delay from 'delay' +import { pipe } from 'it-pipe' + +const defaultInit = { + protocolPrefix: 'ipfs' +} + +async function createComponents (index: number) { + const peerId = await createFromJSON(Peers[index]) + + const components = new Components({ + peerId, + registrar: mockRegistrar(), + upgrader: mockUpgrader(), + connectionManager: new DefaultConnectionManager({ + minConnections: 50, + maxConnections: 1000, + autoDialInterval: 1000 + }) + }) + + return components +} + +describe('fetch', () => { + let localComponents: Components + let remoteComponents: Components + + beforeEach(async () => { + localComponents = await createComponents(0) + remoteComponents = await createComponents(1) + + await Promise.all([ + start(localComponents), + start(remoteComponents) + ]) + }) + + afterEach(async () => { + sinon.restore() + + await Promise.all([ + stop(localComponents), + stop(remoteComponents) + ]) + }) + + it('should be able to fetch from another peer', async () => { + const key = 'key' + const value = Uint8Array.from([0, 1, 2, 3, 4]) + const localFetch = new FetchService(localComponents, defaultInit) + const remoteFetch = new FetchService(remoteComponents, defaultInit) + + remoteFetch.registerLookupFunction(key, async (identifier) => { + expect(identifier).to.equal(key) + + return value + }) + + await start(localFetch) + await start(remoteFetch) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // Run fetch + const result = await localFetch.fetch(remoteComponents.getPeerId(), key) + + expect(result).to.equalBytes(value) + }) + + it('should time out fetching from another peer when waiting for the record', async () => { + const key = 'key' + const localFetch = new FetchService(localComponents, defaultInit) + const remoteFetch = new FetchService(remoteComponents, defaultInit) + + await start(localFetch) + await start(remoteFetch) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // replace existing handler with a really slow one + await remoteComponents.getRegistrar().unhandle(remoteFetch.protocol) + await remoteComponents.getRegistrar().handle(remoteFetch.protocol, ({ stream }) => { + void pipe( + stream, + async function * (source) { + for await (const chunk of source) { + // longer than the timeout + await delay(1000) + + yield chunk + } + }, + stream + ) + }) + + const newStreamSpy = sinon.spy(localToRemote, 'newStream') + + // 10 ms timeout + const timeoutController = new TimeoutController(10) + + // Run fetch, should time out + await expect(localFetch.fetch(remoteComponents.getPeerId(), key, { + signal: timeoutController.signal + })) + .to.eventually.be.rejected.with.property('code', 'ABORT_ERR') + + // should have closed stream + expect(newStreamSpy).to.have.property('callCount', 1) + const { stream } = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('timeline.close') + }) +}) diff --git a/test/identify/index.spec.ts b/test/identify/index.spec.ts index df47bc58..7ce5d75e 100644 --- a/test/identify/index.spec.ts +++ b/test/identify/index.spec.ts @@ -3,17 +3,13 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { codes } from '../../src/errors.js' import { IdentifyService, Message } from '../../src/identify/index.js' import Peers from '../fixtures/peers.js' -import { createLibp2pNode } from '../../src/libp2p.js' import { PersistentPeerStore } from '@libp2p/peer-store' -import { createBaseOptions } from '../utils/base-options.browser.js' import { DefaultAddressManager } from '../../src/address-manager/index.js' import { MemoryDatastore } from 'datastore-core/memory' -import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js' import * as lp from 'it-length-prefixed' import drain from 'it-drain' import { pipe } from 'it-pipe' @@ -27,14 +23,9 @@ import { } from '../../src/identify/consts.js' import { DefaultConnectionManager } from '../../src/connection-manager/index.js' import { DefaultTransportManager } from '../../src/transport-manager.js' -import { CustomEvent } from '@libp2p/interfaces/events' import delay from 'delay' -import pWaitFor from 'p-wait-for' -import { peerIdFromString } from '@libp2p/peer-id' -import type { PeerId } from '@libp2p/interfaces/peer-id' -import type { Libp2pNode } from '../../src/libp2p.js' -import { pEvent } from 'p-event' import { start, stop } from '@libp2p/interfaces/startable' +import { TimeoutController } from 'timeout-abort-controller' const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] @@ -75,18 +66,16 @@ async function createComponents (index: number) { return components } -describe('Identify', () => { +describe('identify', () => { let localComponents: Components let remoteComponents: Components - let localPeerRecordUpdater: PeerRecordUpdater let remotePeerRecordUpdater: PeerRecordUpdater beforeEach(async () => { localComponents = await createComponents(0) remoteComponents = await createComponents(1) - localPeerRecordUpdater = new PeerRecordUpdater(localComponents) remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents) await Promise.all([ @@ -238,355 +227,47 @@ describe('Identify', () => { await stop(localIdentify) }) - describe('push', () => { - it('should be able to push identify updates to another peer', async () => { - const localIdentify = new IdentifyService(localComponents, defaultInit) - const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) + it('should time out during identify', async () => { + const localIdentify = new IdentifyService(localComponents, defaultInit) + const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) - await start(localIdentify) - await start(remoteIdentify) + await start(localIdentify) + await start(remoteIdentify) - const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + const [localToRemote] = connectionPair(localComponents, remoteComponents) - // ensure connections are registered by connection manager - localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { - detail: localToRemote - })) - remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { - detail: remoteToLocal - })) + // replace existing handler with a really slow one + await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY) + await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY, ({ stream }) => { + void pipe( + stream, + async function * (source) { + // we receive no data in the identify protocol, we just send our data + await drain(source) - // identify both ways - await localIdentify.identify(localToRemote) - await remoteIdentify.identify(remoteToLocal) + // longer than the timeout + await delay(1000) - const updatedProtocol = '/special-new-protocol/1.0.0' - const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322') - - // should have protocols but not our new one - const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) - expect(identifiedProtocols).to.not.be.empty() - expect(identifiedProtocols).to.not.include(updatedProtocol) - - // should have addresses but not our new one - const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) - expect(identifiedAddresses).to.not.be.empty() - expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString()) - - // update local data - change event will trigger push - await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol]) - await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress]) - - // needed to update the peer record and send our supported addresses - const addressManager = localComponents.getAddressManager() - addressManager.getAddresses = () => { - return [updatedAddress] - } - - // ensure sequence number of peer record we are about to create is different - await delay(1000) - - // make sure we have a peer record to send - await localPeerRecordUpdater.update() - - // wait for the remote peer store to notice the changes - const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs') - - // push updated peer record to connections - await localIdentify.pushToPeerStore() - - await eventPromise - - // should have new protocol - const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) - expect(updatedProtocols).to.not.be.empty() - expect(updatedProtocols).to.include(updatedProtocol) - - // should have new address - const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) - expect(updatedAddresses.map(a => { - return { - multiaddr: a.multiaddr.toString(), - isCertified: a.isCertified - } - })).to.deep.equal([{ - multiaddr: updatedAddress.toString(), - isCertified: true - }]) - - await stop(localIdentify) - await stop(remoteIdentify) + yield new Uint8Array() + }, + stream + ) }) - // LEGACY - it('should be able to push identify updates to another peer with no certified peer records support', async () => { - const localIdentify = new IdentifyService(localComponents, defaultInit) - const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) + const newStreamSpy = sinon.spy(localToRemote, 'newStream') - await start(localIdentify) - await start(remoteIdentify) + // 10 ms timeout + const timeoutController = new TimeoutController(10) - const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + // Run identify + await expect(localIdentify.identify(localToRemote, { + signal: timeoutController.signal + })) + .to.eventually.be.rejected.with.property('code', 'ABORT_ERR') - // ensure connections are registered by connection manager - localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { - detail: localToRemote - })) - remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { - detail: remoteToLocal - })) - - // identify both ways - await localIdentify.identify(localToRemote) - await remoteIdentify.identify(remoteToLocal) - - const updatedProtocol = '/special-new-protocol/1.0.0' - const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322') - - // should have protocols but not our new one - const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) - expect(identifiedProtocols).to.not.be.empty() - expect(identifiedProtocols).to.not.include(updatedProtocol) - - // should have addresses but not our new one - const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) - expect(identifiedAddresses).to.not.be.empty() - expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString()) - - // update local data - change event will trigger push - await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol]) - await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress]) - - // needed to send our supported addresses - const addressManager = localComponents.getAddressManager() - addressManager.getAddresses = () => { - return [updatedAddress] - } - - // wait until remote peer store notices protocol list update - const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols') - - await localIdentify.pushToPeerStore() - - await waitForUpdate - - // should have new protocol - const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) - expect(updatedProtocols).to.not.be.empty() - expect(updatedProtocols).to.include(updatedProtocol) - - // should have new address - const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) - expect(updatedAddresses.map(a => { - return { - multiaddr: a.multiaddr.toString(), - isCertified: a.isCertified - } - })).to.deep.equal([{ - multiaddr: updatedAddress.toString(), - isCertified: false - }]) - - await stop(localIdentify) - await stop(remoteIdentify) - }) - }) - - describe('libp2p.dialer.identifyService', () => { - let peerId: PeerId - let libp2p: Libp2pNode - let remoteLibp2p: Libp2pNode - const remoteAddr = MULTIADDRS_WEBSOCKETS[0] - - before(async () => { - peerId = await createFromJSON(Peers[0]) - }) - - afterEach(async () => { - sinon.restore() - - if (libp2p != null) { - await libp2p.stop() - } - }) - - after(async () => { - if (remoteLibp2p != null) { - await remoteLibp2p.stop() - } - }) - - it('should run identify automatically after connecting', async () => { - libp2p = await createLibp2pNode(createBaseOptions({ - peerId - })) - - await libp2p.start() - - if (libp2p.identifyService == null) { - throw new Error('Identity service was not configured') - } - - const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') - const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') - const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add') - - const connection = await libp2p.dial(remoteAddr) - expect(connection).to.exist() - - // Wait for peer store to be updated - // Dialer._createDialTarget (add), Identify (consume) - await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) - expect(identityServiceIdentifySpy.callCount).to.equal(1) - - // The connection should have no open streams - await pWaitFor(() => connection.streams.length === 0) - await connection.close() - }) - - it('should store remote agent and protocol versions in metadataBook after connecting', async () => { - libp2p = await createLibp2pNode(createBaseOptions({ - peerId - })) - - await libp2p.start() - - if (libp2p.identifyService == null) { - throw new Error('Identity service was not configured') - } - - const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') - const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') - const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add') - - const connection = await libp2p.dial(remoteAddr) - expect(connection).to.exist() - - // Wait for peer store to be updated - // Dialer._createDialTarget (add), Identify (consume) - await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) - expect(identityServiceIdentifySpy.callCount).to.equal(1) - - // The connection should have no open streams - await pWaitFor(() => connection.streams.length === 0) - await connection.close() - - const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '') - - const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion') - const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion') - - expect(storedAgentVersion).to.exist() - expect(storedProtocolVersion).to.exist() - }) - - it('should push protocol updates to an already connected peer', async () => { - libp2p = await createLibp2pNode(createBaseOptions({ - peerId - })) - - await libp2p.start() - - if (libp2p.identifyService == null) { - throw new Error('Identity service was not configured') - } - - const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') - const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push') - const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect') - const connection = await libp2p.dial(remoteAddr) - - expect(connection).to.exist() - // Wait for connection event to be emitted - await connectionPromise - - // Wait for identify to finish - await identityServiceIdentifySpy.firstCall.returnValue - sinon.stub(libp2p, 'isStarted').returns(true) - - await libp2p.handle('/echo/2.0.0', () => {}) - await libp2p.unhandle('/echo/2.0.0') - - // the protocol change event listener in the identity service is async - await pWaitFor(() => identityServicePushSpy.callCount === 2) - - // Verify the remote peer is notified of both changes - expect(identityServicePushSpy.callCount).to.equal(2) - - for (const call of identityServicePushSpy.getCalls()) { - const [connections] = call.args - expect(connections.length).to.equal(1) - expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId()) - await call.returnValue - } - - // Verify the streams close - await pWaitFor(() => connection.streams.length === 0) - }) - - it('should store host data and protocol version into metadataBook', async () => { - const agentVersion = 'js-project/1.0.0' - - libp2p = await createLibp2pNode(createBaseOptions({ - peerId, - host: { - agentVersion - } - })) - - await libp2p.start() - - if (libp2p.identifyService == null) { - throw new Error('Identity service was not configured') - } - - const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion') - const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion') - - expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array())) - expect(storedProtocolVersion).to.exist() - }) - - it('should push multiaddr updates to an already connected peer', async () => { - libp2p = await createLibp2pNode(createBaseOptions({ - peerId - })) - - await libp2p.start() - - if (libp2p.identifyService == null) { - throw new Error('Identity service was not configured') - } - - const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') - const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push') - const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect') - const connection = await libp2p.dial(remoteAddr) - - expect(connection).to.exist() - // Wait for connection event to be emitted - await connectionPromise - - // Wait for identify to finish - await identityServiceIdentifySpy.firstCall.returnValue - sinon.stub(libp2p, 'isStarted').returns(true) - - await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')]) - - // the protocol change event listener in the identity service is async - await pWaitFor(() => identityServicePushSpy.callCount === 1) - - // Verify the remote peer is notified of change - expect(identityServicePushSpy.callCount).to.equal(1) - for (const call of identityServicePushSpy.getCalls()) { - const [connections] = call.args - expect(connections.length).to.equal(1) - expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId()) - await call.returnValue - } - - // Verify the streams close - await pWaitFor(() => connection.streams.length === 0) - }) + // should have closed stream + expect(newStreamSpy).to.have.property('callCount', 1) + const { stream } = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('timeline.close') }) }) diff --git a/test/identify/push.spec.ts b/test/identify/push.spec.ts new file mode 100644 index 00000000..8f0b62e6 --- /dev/null +++ b/test/identify/push.spec.ts @@ -0,0 +1,296 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { Multiaddr } from '@multiformats/multiaddr' +import { IdentifyService } from '../../src/identify/index.js' +import Peers from '../fixtures/peers.js' +import { PersistentPeerStore } from '@libp2p/peer-store' +import { DefaultAddressManager } from '../../src/address-manager/index.js' +import { MemoryDatastore } from 'datastore-core/memory' +import drain from 'it-drain' +import { pipe } from 'it-pipe' +import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks' +import { createFromJSON } from '@libp2p/peer-id-factory' +import { Components } from '@libp2p/interfaces/components' +import { PeerRecordUpdater } from '../../src/peer-record-updater.js' +import { + MULTICODEC_IDENTIFY, + MULTICODEC_IDENTIFY_PUSH +} from '../../src/identify/consts.js' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { DefaultTransportManager } from '../../src/transport-manager.js' +import { CustomEvent } from '@libp2p/interfaces/events' +import delay from 'delay' +import { pEvent } from 'p-event' +import { start, stop } from '@libp2p/interfaces/startable' + +const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')] + +const defaultInit = { + protocolPrefix: 'ipfs', + host: { + agentVersion: 'v1.0.0' + } +} + +const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH] + +async function createComponents (index: number) { + const peerId = await createFromJSON(Peers[index]) + + const components = new Components({ + peerId, + datastore: new MemoryDatastore(), + registrar: mockRegistrar(), + upgrader: mockUpgrader(), + connectionGater: mockConnectionGater(), + peerStore: new PersistentPeerStore(), + connectionManager: new DefaultConnectionManager({ + minConnections: 50, + maxConnections: 1000, + autoDialInterval: 1000 + }) + }) + components.setAddressManager(new DefaultAddressManager(components, { + announce: listenMaddrs.map(ma => ma.toString()) + })) + + const transportManager = new DefaultTransportManager(components) + components.setTransportManager(transportManager) + + await components.getPeerStore().protoBook.set(peerId, protocols) + + return components +} + +describe('identify (push)', () => { + let localComponents: Components + let remoteComponents: Components + + let localPeerRecordUpdater: PeerRecordUpdater + + beforeEach(async () => { + localComponents = await createComponents(0) + remoteComponents = await createComponents(1) + + localPeerRecordUpdater = new PeerRecordUpdater(localComponents) + + await Promise.all([ + start(localComponents), + start(remoteComponents) + ]) + }) + + afterEach(async () => { + sinon.restore() + + await Promise.all([ + stop(localComponents), + stop(remoteComponents) + ]) + }) + + it('should be able to push identify updates to another peer', async () => { + const localIdentify = new IdentifyService(localComponents, defaultInit) + const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) + + await start(localIdentify) + await start(remoteIdentify) + + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + + // ensure connections are registered by connection manager + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { + detail: localToRemote + })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { + detail: remoteToLocal + })) + + // identify both ways + await localIdentify.identify(localToRemote) + await remoteIdentify.identify(remoteToLocal) + + const updatedProtocol = '/special-new-protocol/1.0.0' + const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322') + + // should have protocols but not our new one + const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) + expect(identifiedProtocols).to.not.be.empty() + expect(identifiedProtocols).to.not.include(updatedProtocol) + + // should have addresses but not our new one + const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) + expect(identifiedAddresses).to.not.be.empty() + expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString()) + + // update local data - change event will trigger push + await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol]) + await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress]) + + // needed to update the peer record and send our supported addresses + const addressManager = localComponents.getAddressManager() + addressManager.getAddresses = () => { + return [updatedAddress] + } + + // ensure sequence number of peer record we are about to create is different + await delay(1000) + + // make sure we have a peer record to send + await localPeerRecordUpdater.update() + + // wait for the remote peer store to notice the changes + const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs') + + // push updated peer record to connections + await localIdentify.pushToPeerStore() + + await eventPromise + + // should have new protocol + const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) + expect(updatedProtocols).to.not.be.empty() + expect(updatedProtocols).to.include(updatedProtocol) + + // should have new address + const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) + expect(updatedAddresses.map(a => { + return { + multiaddr: a.multiaddr.toString(), + isCertified: a.isCertified + } + })).to.deep.equal([{ + multiaddr: updatedAddress.toString(), + isCertified: true + }]) + + await stop(localIdentify) + await stop(remoteIdentify) + }) + + it('should time out during push identify', async () => { + let streamEnded = false + const localIdentify = new IdentifyService(localComponents, { + ...defaultInit, + timeout: 10 + }) + const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) + + await start(localIdentify) + await start(remoteIdentify) + + // simulate connection between nodes + const [localToRemote] = connectionPair(localComponents, remoteComponents) + + // replace existing handler with a really slow one + await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY_PUSH) + await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY_PUSH, ({ stream }) => { + void pipe( + stream, + async function * (source) { + // ignore the sent data + await drain(source) + + // longer than the timeout + await delay(1000) + + // the delay should have caused the local push to time out so this should + // occur after the local push method invocation has completed + streamEnded = true + + yield new Uint8Array() + }, + stream + ) + }) + + const newStreamSpy = sinon.spy(localToRemote, 'newStream') + + // push updated peer record to remote + await localIdentify.push([localToRemote]) + + // should have closed stream + expect(newStreamSpy).to.have.property('callCount', 1) + const { stream } = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('timeline.close') + + // method should have returned before the remote handler completes as we timed + // out so we ignore the return value + expect(streamEnded).to.be.false() + }) + + // LEGACY + it('should be able to push identify updates to another peer with no certified peer records support', async () => { + const localIdentify = new IdentifyService(localComponents, defaultInit) + const remoteIdentify = new IdentifyService(remoteComponents, defaultInit) + + await start(localIdentify) + await start(remoteIdentify) + + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + + // ensure connections are registered by connection manager + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { + detail: localToRemote + })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { + detail: remoteToLocal + })) + + // identify both ways + await localIdentify.identify(localToRemote) + await remoteIdentify.identify(remoteToLocal) + + const updatedProtocol = '/special-new-protocol/1.0.0' + const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322') + + // should have protocols but not our new one + const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) + expect(identifiedProtocols).to.not.be.empty() + expect(identifiedProtocols).to.not.include(updatedProtocol) + + // should have addresses but not our new one + const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) + expect(identifiedAddresses).to.not.be.empty() + expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString()) + + // update local data - change event will trigger push + await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol]) + await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress]) + + // needed to send our supported addresses + const addressManager = localComponents.getAddressManager() + addressManager.getAddresses = () => { + return [updatedAddress] + } + + // wait until remote peer store notices protocol list update + const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols') + + await localIdentify.pushToPeerStore() + + await waitForUpdate + + // should have new protocol + const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId()) + expect(updatedProtocols).to.not.be.empty() + expect(updatedProtocols).to.include(updatedProtocol) + + // should have new address + const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId()) + expect(updatedAddresses.map(a => { + return { + multiaddr: a.multiaddr.toString(), + isCertified: a.isCertified + } + })).to.deep.equal([{ + multiaddr: updatedAddress.toString(), + isCertified: false + }]) + + await stop(localIdentify) + await stop(remoteIdentify) + }) +}) diff --git a/test/identify/service.spec.ts b/test/identify/service.spec.ts new file mode 100644 index 00000000..3f198eb2 --- /dev/null +++ b/test/identify/service.spec.ts @@ -0,0 +1,216 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { Multiaddr } from '@multiformats/multiaddr' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import Peers from '../fixtures/peers.js' +import { createLibp2pNode } from '../../src/libp2p.js' +import { createBaseOptions } from '../utils/base-options.browser.js' +import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js' +import { createFromJSON } from '@libp2p/peer-id-factory' +import pWaitFor from 'p-wait-for' +import { peerIdFromString } from '@libp2p/peer-id' +import type { PeerId } from '@libp2p/interfaces/peer-id' +import type { Libp2pNode } from '../../src/libp2p.js' +import { pEvent } from 'p-event' + +describe('libp2p.dialer.identifyService', () => { + let peerId: PeerId + let libp2p: Libp2pNode + let remoteLibp2p: Libp2pNode + const remoteAddr = MULTIADDRS_WEBSOCKETS[0] + + before(async () => { + peerId = await createFromJSON(Peers[0]) + }) + + afterEach(async () => { + sinon.restore() + + if (libp2p != null) { + await libp2p.stop() + } + }) + + after(async () => { + if (remoteLibp2p != null) { + await remoteLibp2p.stop() + } + }) + + it('should run identify automatically after connecting', async () => { + libp2p = await createLibp2pNode(createBaseOptions({ + peerId + })) + + await libp2p.start() + + if (libp2p.identifyService == null) { + throw new Error('Identity service was not configured') + } + + const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') + const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') + const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add') + + const connection = await libp2p.dial(remoteAddr) + expect(connection).to.exist() + + // Wait for peer store to be updated + // Dialer._createDialTarget (add), Identify (consume) + await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) + expect(identityServiceIdentifySpy.callCount).to.equal(1) + + // The connection should have no open streams + await pWaitFor(() => connection.streams.length === 0) + await connection.close() + }) + + it('should store remote agent and protocol versions in metadataBook after connecting', async () => { + libp2p = await createLibp2pNode(createBaseOptions({ + peerId + })) + + await libp2p.start() + + if (libp2p.identifyService == null) { + throw new Error('Identity service was not configured') + } + + const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') + const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') + const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add') + + const connection = await libp2p.dial(remoteAddr) + expect(connection).to.exist() + + // Wait for peer store to be updated + // Dialer._createDialTarget (add), Identify (consume) + await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) + expect(identityServiceIdentifySpy.callCount).to.equal(1) + + // The connection should have no open streams + await pWaitFor(() => connection.streams.length === 0) + await connection.close() + + const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '') + + const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion') + const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion') + + expect(storedAgentVersion).to.exist() + expect(storedProtocolVersion).to.exist() + }) + + it('should push protocol updates to an already connected peer', async () => { + libp2p = await createLibp2pNode(createBaseOptions({ + peerId + })) + + await libp2p.start() + + if (libp2p.identifyService == null) { + throw new Error('Identity service was not configured') + } + + const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') + const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push') + const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect') + const connection = await libp2p.dial(remoteAddr) + + expect(connection).to.exist() + // Wait for connection event to be emitted + await connectionPromise + + // Wait for identify to finish + await identityServiceIdentifySpy.firstCall.returnValue + sinon.stub(libp2p, 'isStarted').returns(true) + + await libp2p.handle('/echo/2.0.0', () => {}) + await libp2p.unhandle('/echo/2.0.0') + + // the protocol change event listener in the identity service is async + await pWaitFor(() => identityServicePushSpy.callCount === 2) + + // Verify the remote peer is notified of both changes + expect(identityServicePushSpy.callCount).to.equal(2) + + for (const call of identityServicePushSpy.getCalls()) { + const [connections] = call.args + expect(connections.length).to.equal(1) + expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId()) + await call.returnValue + } + + // Verify the streams close + await pWaitFor(() => connection.streams.length === 0) + }) + + it('should store host data and protocol version into metadataBook', async () => { + const agentVersion = 'js-project/1.0.0' + + libp2p = await createLibp2pNode(createBaseOptions({ + peerId, + identify: { + host: { + agentVersion + } + } + })) + + await libp2p.start() + + if (libp2p.identifyService == null) { + throw new Error('Identity service was not configured') + } + + const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion') + const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion') + + expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array())) + expect(storedProtocolVersion).to.exist() + }) + + it('should push multiaddr updates to an already connected peer', async () => { + libp2p = await createLibp2pNode(createBaseOptions({ + peerId + })) + + await libp2p.start() + + if (libp2p.identifyService == null) { + throw new Error('Identity service was not configured') + } + + const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify') + const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push') + const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect') + const connection = await libp2p.dial(remoteAddr) + + expect(connection).to.exist() + // Wait for connection event to be emitted + await connectionPromise + + // Wait for identify to finish + await identityServiceIdentifySpy.firstCall.returnValue + sinon.stub(libp2p, 'isStarted').returns(true) + + await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')]) + + // the protocol change event listener in the identity service is async + await pWaitFor(() => identityServicePushSpy.callCount === 1) + + // Verify the remote peer is notified of change + expect(identityServicePushSpy.callCount).to.equal(1) + for (const call of identityServicePushSpy.getCalls()) { + const [connections] = call.args + expect(connections.length).to.equal(1) + expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId()) + await call.returnValue + } + + // Verify the streams close + await pWaitFor(() => connection.streams.length === 0) + }) +}) diff --git a/test/ping/index.spec.ts b/test/ping/index.spec.ts new file mode 100644 index 00000000..d97837bb --- /dev/null +++ b/test/ping/index.spec.ts @@ -0,0 +1,122 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { PingService } from '../../src/ping/index.js' +import Peers from '../fixtures/peers.js' +import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks' +import { createFromJSON } from '@libp2p/peer-id-factory' +import { Components } from '@libp2p/interfaces/components' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { start, stop } from '@libp2p/interfaces/startable' +import { CustomEvent } from '@libp2p/interfaces/events' +import { TimeoutController } from 'timeout-abort-controller' +import delay from 'delay' +import { pipe } from 'it-pipe' + +const defaultInit = { + protocolPrefix: 'ipfs' +} + +async function createComponents (index: number) { + const peerId = await createFromJSON(Peers[index]) + + const components = new Components({ + peerId, + registrar: mockRegistrar(), + upgrader: mockUpgrader(), + connectionManager: new DefaultConnectionManager({ + minConnections: 50, + maxConnections: 1000, + autoDialInterval: 1000 + }) + }) + + return components +} + +describe('ping', () => { + let localComponents: Components + let remoteComponents: Components + + beforeEach(async () => { + localComponents = await createComponents(0) + remoteComponents = await createComponents(1) + + await Promise.all([ + start(localComponents), + start(remoteComponents) + ]) + }) + + afterEach(async () => { + sinon.restore() + + await Promise.all([ + stop(localComponents), + stop(remoteComponents) + ]) + }) + + it('should be able to ping another peer', async () => { + const localPing = new PingService(localComponents, defaultInit) + const remotePing = new PingService(remoteComponents, defaultInit) + + await start(localPing) + await start(remotePing) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // Run ping + await expect(localPing.ping(remoteComponents.getPeerId())).to.eventually.be.gte(0) + }) + + it('should time out pinging another peer when waiting for a pong', async () => { + const localPing = new PingService(localComponents, defaultInit) + const remotePing = new PingService(remoteComponents, defaultInit) + + await start(localPing) + await start(remotePing) + + // simulate connection between nodes + const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) + localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote })) + remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal })) + + // replace existing handler with a really slow one + await remoteComponents.getRegistrar().unhandle(remotePing.protocol) + await remoteComponents.getRegistrar().handle(remotePing.protocol, ({ stream }) => { + void pipe( + stream, + async function * (source) { + for await (const chunk of source) { + // longer than the timeout + await delay(1000) + + yield chunk + } + }, + stream + ) + }) + + const newStreamSpy = sinon.spy(localToRemote, 'newStream') + + // 10 ms timeout + const timeoutController = new TimeoutController(10) + + // Run ping, should time out + await expect(localPing.ping(remoteComponents.getPeerId(), { + signal: timeoutController.signal + })) + .to.eventually.be.rejected.with.property('code', 'ABORT_ERR') + + // should have closed stream + expect(newStreamSpy).to.have.property('callCount', 1) + const { stream } = await newStreamSpy.getCall(0).returnValue + expect(stream).to.have.nested.property('timeline.close') + }) +}) diff --git a/test/core/ping.node.ts b/test/ping/ping.node.ts similarity index 100% rename from test/core/ping.node.ts rename to test/ping/ping.node.ts diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 56e5eb96..6a677298 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -14,7 +14,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import swarmKey from '../fixtures/swarm.key.js' import { DefaultUpgrader } from '../../src/upgrader.js' import { codes } from '../../src/errors.js' -import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar } from '@libp2p/interface-compliance-tests/mocks' +import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-compliance-tests/mocks' import Peers from '../fixtures/peers.js' import type { Upgrader } from '@libp2p/interfaces/transport' import type { PeerId } from '@libp2p/interfaces/peer-id' @@ -27,6 +27,9 @@ import type { Stream } from '@libp2p/interfaces/connection' import pDefer from 'p-defer' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { pEvent } from 'p-event' +import { TimeoutController } from 'timeout-abort-controller' +import delay from 'delay' +import drain from 'it-drain' const addrs = [ new Multiaddr('/ip4/127.0.0.1/tcp/0'), @@ -35,6 +38,7 @@ const addrs = [ describe('Upgrader', () => { let localUpgrader: Upgrader + let localMuxerFactory: StreamMuxerFactory let remoteUpgrader: Upgrader let localPeer: PeerId let remotePeer: PeerId @@ -55,12 +59,13 @@ describe('Upgrader', () => { connectionGater: mockConnectionGater(), registrar: mockRegistrar() }) + localMuxerFactory = new Mplex() localUpgrader = new DefaultUpgrader(localComponents, { connectionEncryption: [ new Plaintext() ], muxers: [ - new Mplex() + localMuxerFactory ] }) @@ -366,6 +371,40 @@ describe('Upgrader', () => { expect(result).to.have.nested.property('reason.code', codes.ERR_UNSUPPORTED_PROTOCOL) }) }) + + it('should abort protocol selection for slow streams', async () => { + const createStreamMuxerSpy = sinon.spy(localMuxerFactory, 'createStreamMuxer') + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) + + const connections = await Promise.all([ + localUpgrader.upgradeOutbound(outbound), + remoteUpgrader.upgradeInbound(inbound) + ]) + + // 10 ms timeout + const timeoutController = new TimeoutController(10) + + // should have created muxer for connection + expect(createStreamMuxerSpy).to.have.property('callCount', 1) + + // create mock muxed stream that never sends data + const muxer = createStreamMuxerSpy.getCall(0).returnValue + muxer.newStream = () => { + return mockStream({ + source: (async function * () { + // longer than the timeout + await delay(1000) + yield new Uint8Array() + }()), + sink: drain + }) + } + + await expect(connections[0].newStream('/echo/1.0.0', { + signal: timeoutController.signal + })) + .to.eventually.be.rejected.with.property('code', 'ABORT_ERR') + }) }) describe('libp2p.upgrader', () => {