fix: time out slow reads (#1227)

There are a few places in the codebase where we send/receive data from the network without timeouts/abort controllers which means the user has to wait for the underlying socket to timeout which can take a long time depending on the platform, if at all.

This change ensures we can time out while running identify (both flavours), ping and fetch and adds tests to ensure there are no regressions.
This commit is contained in:
Alex Potsides 2022-05-25 18:15:21 +01:00 committed by GitHub
parent 5934b13cce
commit a1220d22f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1039 additions and 442 deletions

View File

@ -97,7 +97,9 @@ Creates an instance of Libp2p.
| options.modules | [`Array<object>`](./CONFIGURATION.md#modules) | libp2p [modules](./CONFIGURATION.md#modules) to use |
| [options.addresses] | `{ listen: Array<string>, announce: Array<string>, announceFilter: (ma: Array<multiaddr>) => Array<multiaddr> }` | Addresses for transport listening and to advertise to the network |
| [options.config] | `object` | libp2p modules configuration and core configuration |
| [options.host] | `{ agentVersion: string }` | libp2p host options |
| [options.identify] | `{ protocolPrefix: string, host: { agentVersion: string }, timeout: number }` | libp2p identify protocol options |
| [options.ping] | `{ protocolPrefix: string }` | libp2p ping protocol options |
| [options.fetch] | `{ protocolPrefix: string }` | libp2p fetch protocol options |
| [options.connectionManager] | [`object`](./CONFIGURATION.md#configuring-connection-manager) | libp2p Connection Manager [configuration](./CONFIGURATION.md#configuring-connection-manager) |
| [options.transportManager] | [`object`](./CONFIGURATION.md#configuring-transport-manager) | libp2p transport manager [configuration](./CONFIGURATION.md#configuring-transport-manager) |
| [options.datastore] | `object` | must implement [ipfs/interface-datastore](https://github.com/ipfs/interface-datastore) (in memory datastore will be used if not provided) |

View File

@ -885,7 +885,12 @@ Changing the protocol name prefix can isolate default public network (IPFS) for
```js
const node = await createLibp2p({
protocolPrefix: 'ipfs' // default
identify: {
protocolPrefix: 'ipfs' // default
},
ping: {
protocolPrefix: 'ipfs' // default
}
})
/*
protocols: [

View File

@ -37,6 +37,7 @@ The following changes have been made to the configuration object:
3. Use of the `enabled` flag has been removed - if you don't want a particular feature enabled, don't pass a module implementing that feature
4. Some keys have been renamed = `transport` -> `transports`, `streamMuxer` -> `streamMuxers`, `connEncryption` -> `connectionEncryption`, etc
5. Keys from `config.dialer` have been moved to `config.connectionManager` as the connection manager is now responsible for managing connections
6. The `protocolPrefix` configuration option is now passed on a per-protocol basis for `identify`, `fetch` and `ping`
**Before**
@ -71,6 +72,7 @@ const node = await Libp2p.create({
MulticastDNS
]
},
protocolPrefix: 'ipfs',
config: {
peerDiscovery: {
autoDial: true,
@ -136,7 +138,10 @@ const node = await createLibp2p({
new MulticastDNS({
interval: 1000
})
]
],
identify: {
protocolPrefix: 'ipfs'
}
})
```

View File

@ -3,7 +3,7 @@
"version": "0.1.0",
"private": true,
"dependencies": {
"@chainsafe/libp2p-noise": "^6.1.1",
"@chainsafe/libp2p-noise": "^6.2.0",
"ipfs-core": "^0.14.1",
"libp2p": "../../",
"@libp2p/delegated-content-routing": "^1.0.1",

View File

@ -9,7 +9,7 @@
},
"license": "ISC",
"dependencies": {
"@chainsafe/libp2p-noise": "^6.1.1",
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^1.0.4",
"@libp2p/mplex": "^1.0.4",
"@libp2p/webrtc-star": "^1.0.8",

View File

@ -10,7 +10,7 @@
"license": "ISC",
"dependencies": {
"@libp2p/webrtc-direct": "^1.0.1",
"@chainsafe/libp2p-noise": "^6.1.1",
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^1.0.4",
"@libp2p/mplex": "^1.0.4",
"libp2p": "../../",

View File

@ -95,7 +95,7 @@
"@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/connection": "^2.0.2",
"@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^2.0.1",
"@libp2p/interfaces": "^2.0.2",
"@libp2p/logger": "^1.1.4",
"@libp2p/multistream-select": "^1.0.4",
"@libp2p/peer-collections": "^1.0.2",
@ -127,7 +127,6 @@
"it-pipe": "^2.0.3",
"it-sort": "^1.0.1",
"it-stream-types": "^1.0.4",
"it-take": "^1.0.2",
"merge-options": "^3.0.4",
"multiformats": "^9.6.3",
"mutable-proxy": "^1.0.0",
@ -146,14 +145,14 @@
"xsalsa20": "^1.1.0"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^6.1.1",
"@chainsafe/libp2p-noise": "^6.2.0",
"@libp2p/bootstrap": "^1.0.4",
"@libp2p/daemon-client": "^1.0.2",
"@libp2p/daemon-server": "^1.0.2",
"@libp2p/delegated-content-routing": "^1.0.2",
"@libp2p/delegated-peer-routing": "^1.0.2",
"@libp2p/floodsub": "^1.0.6",
"@libp2p/interface-compliance-tests": "^2.0.1",
"@libp2p/interface-compliance-tests": "^2.0.3",
"@libp2p/interop": "^1.0.3",
"@libp2p/kad-dht": "^1.0.9",
"@libp2p/mdns": "^1.0.5",

View File

@ -35,9 +35,6 @@ const DefaultConfig: Partial<Libp2pInit> = {
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
},
host: {
agentVersion: AGENT_VERSION
},
metrics: {
enabled: false,
computeThrottleMaxQueueSize: 1000,
@ -56,7 +53,6 @@ const DefaultConfig: Partial<Libp2pInit> = {
bootDelay: 10e3
}
},
protocolPrefix: 'ipfs',
nat: {
enabled: true,
ttl: 7200,
@ -77,6 +73,19 @@ const DefaultConfig: Partial<Libp2pInit> = {
enabled: false,
maxListeners: 2
}
},
identify: {
protocolPrefix: 'ipfs',
host: {
agentVersion: AGENT_VERSION
},
timeout: 30000
},
ping: {
protocolPrefix: 'ipfs'
},
fetch: {
protocolPrefix: 'libp2p'
}
}

View File

@ -1,3 +1,4 @@
// https://github.com/libp2p/specs/tree/master/fetch#wire-protocol
export const PROTOCOL = '/libp2p/fetch/0.0.1'
export const PROTOCOL_VERSION = '0.0.1'
export const PROTOCOL_NAME = 'fetch'

View File

@ -4,16 +4,19 @@ import { codes } from '../errors.js'
import * as lp from 'it-length-prefixed'
import { FetchRequest, FetchResponse } from './pb/proto.js'
import { handshake } from 'it-handshake'
import { PROTOCOL } from './constants.js'
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Stream } from '@libp2p/interfaces/connection'
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { Components } from '@libp2p/interfaces/components'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Duplex } from 'it-stream-types'
import { abortableDuplex } from 'abortable-iterator'
const log = logger('libp2p:fetch')
export interface FetchInit {
export interface FetchServiceInit {
protocolPrefix: string
}
@ -33,15 +36,15 @@ export interface LookupFunction {
* by a fixed prefix that all keys that should be routed to that lookup function will start with.
*/
export class FetchService implements Startable {
public readonly protocol: string
private readonly components: Components
private readonly lookupFunctions: Map<string, LookupFunction>
private readonly protocol: string
private started: boolean
constructor (components: Components, init: FetchInit) {
constructor (components: Components, init: FetchServiceInit) {
this.started = false
this.components = components
this.protocol = PROTOCOL
this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
this.lookupFunctions = new Map() // Maps key prefix to value lookup function
this.handleMessage = this.handleMessage.bind(this)
}
@ -67,12 +70,19 @@ export class FetchService implements Startable {
/**
* Sends a request to fetch the value associated with the given key from the given peer
*/
async fetch (peer: PeerId, key: string): Promise<Uint8Array | null> {
async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
log('dialing %s to %p', this.protocol, peer)
const connection = await this.components.getConnectionManager().openConnection(peer)
const { stream } = await connection.newStream([this.protocol])
const shake = handshake(stream)
const connection = await this.components.getConnectionManager().openConnection(peer, options)
const { stream } = await connection.newStream([this.protocol], options)
let source: Duplex<Uint8Array> = stream
// make stream abortable if AbortSignal passed
if (options.signal != null) {
source = abortableDuplex(stream, options.signal)
}
const shake = handshake(source)
// send message
shake.write(lp.encode.single(FetchRequest.encode({ identifier: key })).slice())

View File

@ -2,8 +2,6 @@ import { logger } from '@libp2p/logger'
import errCode from 'err-code'
import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import all from 'it-all'
import take from 'it-take'
import drain from 'it-drain'
import first from 'it-first'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
@ -21,13 +19,19 @@ import {
} from './consts.js'
import { codes } from '../errors.js'
import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { Connection } from '@libp2p/interfaces/connection'
import type { Connection, Stream } from '@libp2p/interfaces/connection'
import type { Startable } from '@libp2p/interfaces/startable'
import { peerIdFromKeys } from '@libp2p/peer-id'
import type { Components } from '@libp2p/interfaces/components'
import { TimeoutController } from 'timeout-abort-controller'
import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator'
import type { Duplex } from 'it-stream-types'
const log = logger('libp2p:identify')
const IDENTIFY_TIMEOUT = 30000
export interface HostProperties {
agentVersion: string
}
@ -35,6 +39,7 @@ export interface HostProperties {
export interface IdentifyServiceInit {
protocolPrefix: string
host: HostProperties
timeout?: number
}
export class IdentifyService implements Startable {
@ -46,11 +51,13 @@ export class IdentifyService implements Startable {
agentVersion: string
}
private readonly init: IdentifyServiceInit
private started: boolean
constructor (components: Components, init: IdentifyServiceInit) {
this.components = components
this.started = false
this.init = init
this.handleMessage = this.handleMessage.bind(this)
@ -128,8 +135,17 @@ export class IdentifyService implements Startable {
const protocols = await this.components.getPeerStore().protoBook.get(this.components.getPeerId())
const pushes = connections.map(async connection => {
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
let stream: Stream | undefined
try {
const { stream } = await connection.newStream([this.identifyPushProtocolStr])
const data = await connection.newStream([this.identifyPushProtocolStr], {
signal: timeoutController.signal
})
stream = data.stream
// make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
await pipe(
[Identify.encode({
@ -138,12 +154,18 @@ export class IdentifyService implements Startable {
protocols
})],
lp.encode(),
stream,
source,
drain
)
} catch (err: any) {
// Just log errors
log.error('could not push identify update to peer', err)
} finally {
if (stream != null) {
stream.close()
}
timeoutController.clear()
}
})
@ -175,31 +197,44 @@ export class IdentifyService implements Startable {
await this.push(connections)
}
async _identify (connection: Connection, options: AbortOptions = {}): Promise<Identify> {
const { stream } = await connection.newStream([this.identifyProtocolStr], options)
let source: Duplex<Uint8Array> = stream
// make stream abortable if AbortSignal passed
if (options.signal != null) {
source = abortableDuplex(stream, options.signal)
}
try {
const data = await pipe(
[],
source,
lp.decode(),
async (source) => await first(source)
)
if (data == null) {
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
}
try {
return Identify.decode(data)
} catch (err: any) {
throw errCode(err, codes.ERR_INVALID_MESSAGE)
}
} finally {
stream.close()
}
}
/**
* Requests the `Identify` message from peer associated with the given `connection`.
* If the identified peer does not match the `PeerId` associated with the connection,
* an error will be thrown.
*/
async identify (connection: Connection): Promise<void> {
const { stream } = await connection.newStream([this.identifyProtocolStr])
const [data] = await pipe(
[],
stream,
lp.decode(),
(source) => take(source, 1),
async (source) => await all(source)
)
if (data == null) {
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
}
let message: Identify
try {
message = Identify.decode(data)
} catch (err: any) {
throw errCode(err, codes.ERR_INVALID_MESSAGE)
}
async identify (connection: Connection, options: AbortOptions = {}): Promise<void> {
const message = await this._identify(connection, options)
const {
publicKey,
@ -308,6 +343,8 @@ export class IdentifyService implements Startable {
*/
async _handleIdentify (data: IncomingStreamData) {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
try {
const publicKey = this.components.getPeerId().publicKey ?? new Uint8Array(0)
const peerData = await this.components.getPeerStore().get(this.components.getPeerId())
@ -335,14 +372,20 @@ export class IdentifyService implements Startable {
protocols: peerData.protocols
})
// make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
await pipe(
[message],
lp.encode(),
stream,
source,
drain
)
} catch (err: any) {
log.error('could not respond to identify request', err)
} finally {
stream.close()
timeoutController.clear()
}
}
@ -351,12 +394,16 @@ export class IdentifyService implements Startable {
*/
async _handlePush (data: IncomingStreamData) {
const { connection, stream } = data
const timeoutController = new TimeoutController(this.init.timeout ?? IDENTIFY_TIMEOUT)
let message: Identify | undefined
try {
// make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal)
const data = await pipe(
[],
stream,
source,
lp.decode(),
async (source) => await first(source)
)
@ -366,6 +413,9 @@ export class IdentifyService implements Startable {
}
} catch (err: any) {
return log.error('received invalid message', err)
} finally {
stream.close()
timeoutController.clear()
}
if (message == null) {

View File

@ -4,7 +4,7 @@ import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { FaultTolerance } from './transport-manager.js'
import type { HostProperties } from './identify/index.js'
import type { IdentifyServiceInit } from './identify/index.js'
import type { DualDHT } from '@libp2p/interfaces/dht'
import type { Datastore } from 'interface-datastore'
import type { PeerStore, PeerStoreInit } from '@libp2p/interfaces/peer-store'
@ -24,6 +24,8 @@ import type { Metrics, MetricsInit } from '@libp2p/interfaces/metrics'
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import type { KeyChain } from './keychain/index.js'
import type { ConnectionManagerInit } from './connection-manager/index.js'
import type { PingServiceInit } from './ping/index.js'
import type { FetchServiceInit } from './fetch/index.js'
export interface PersistentPeerStoreOptions {
threshold?: number
@ -95,7 +97,6 @@ export interface RefreshManagerConfig {
export interface Libp2pInit {
peerId: PeerId
host: HostProperties
addresses: AddressesConfig
connectionManager: ConnectionManagerInit
connectionGater: Partial<ConnectionGater>
@ -105,9 +106,11 @@ export interface Libp2pInit {
peerStore: PeerStoreInit
peerRouting: PeerRoutingConfig
keychain: KeychainConfig
protocolPrefix: string
nat: NatManagerConfig
relay: RelayConfig
identify: IdentifyServiceInit
ping: PingServiceInit
fetch: FetchServiceInit
transports: Transport[]
streamMuxers?: StreamMuxerFactory[]
@ -195,12 +198,12 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
/**
* Pings the given peer in order to obtain the operation latency
*/
ping: (peer: Multiaddr |PeerId) => Promise<number>
ping: (peer: Multiaddr | PeerId, options?: AbortOptions) => Promise<number>
/**
* Sends a request to fetch the value associated with the given key from the given peer.
*/
fetch: (peer: PeerId | Multiaddr | string, key: string) => Promise<Uint8Array | null>
fetch: (peer: PeerId | Multiaddr | string, key: string, options?: AbortOptions) => Promise<Uint8Array | null>
/**
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA' type

View File

@ -166,10 +166,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
if (init.streamMuxers != null && init.streamMuxers.length > 0) {
// Add the identify service since we can multiplex
this.identifyService = new IdentifyService(this.components, {
protocolPrefix: init.protocolPrefix,
host: {
agentVersion: init.host.agentVersion
}
...init.identify
})
this.configureComponent(this.identifyService)
}
@ -229,11 +226,11 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
}
this.fetchService = this.configureComponent(new FetchService(this.components, {
protocolPrefix: init.protocolPrefix
...init.fetch
}))
this.pingService = this.configureComponent(new PingService(this.components, {
protocolPrefix: init.protocolPrefix
...init.ping
}))
const autoDialer = this.configureComponent(new AutoDialer(this.components, {
@ -419,9 +416,9 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
throw errCode(new Error('no protocols were provided to open a stream'), codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
const connection = await this.dial(peer)
const connection = await this.dial(peer, options)
return await connection.newStream(protocols)
return await connection.newStream(protocols, options)
}
getMultiaddrs (): Multiaddr[] {
@ -473,24 +470,24 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
throw errCode(new Error(`Node not responding with its public key: ${peer.toString()}`), codes.ERR_INVALID_RECORD)
}
async fetch (peer: PeerId | Multiaddr | string, key: string): Promise<Uint8Array | null> {
async fetch (peer: PeerId | Multiaddr | string, key: string, options: AbortOptions = {}): Promise<Uint8Array | null> {
const { id, multiaddrs } = getPeer(peer)
if (multiaddrs != null) {
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
}
return await this.fetchService.fetch(id, key)
return await this.fetchService.fetch(id, key, options)
}
async ping (peer: PeerId | Multiaddr | string): Promise<number> {
async ping (peer: PeerId | Multiaddr | string, options: AbortOptions = {}): Promise<number> {
const { id, multiaddrs } = getPeer(peer)
if (multiaddrs.length > 0) {
await this.components.getPeerStore().addressBook.add(id, multiaddrs)
}
return await this.pingService.ping(id)
return await this.pingService.ping(id, options)
}
async handle (protocols: string | string[], handler: StreamHandler): Promise<void> {

View File

@ -10,6 +10,9 @@ import type { IncomingStreamData } from '@libp2p/interfaces/registrar'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Components } from '@libp2p/interfaces/components'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Duplex } from 'it-stream-types'
import { abortableDuplex } from 'abortable-iterator'
const log = logger('libp2p:ping')
@ -18,8 +21,8 @@ export interface PingServiceInit {
}
export class PingService implements Startable {
public readonly protocol: string
private readonly components: Components
private readonly protocol: string
private started: boolean
constructor (components: Components, init: PingServiceInit) {
@ -60,25 +63,36 @@ export class PingService implements Startable {
* @param {PeerId|Multiaddr} peer
* @returns {Promise<number>}
*/
async ping (peer: PeerId): Promise<number> {
async ping (peer: PeerId, options: AbortOptions = {}): Promise<number> {
log('dialing %s to %p', this.protocol, peer)
const connection = await this.components.getConnectionManager().openConnection(peer)
const { stream } = await connection.newStream([this.protocol])
const connection = await this.components.getConnectionManager().openConnection(peer, options)
const { stream } = await connection.newStream([this.protocol], options)
const start = Date.now()
const data = randomBytes(PING_LENGTH)
const result = await pipe(
[data],
stream,
async (source) => await first(source)
)
const end = Date.now()
let source: Duplex<Uint8Array> = stream
if (result == null || !uint8ArrayEquals(data, result)) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
// make stream abortable if AbortSignal passed
if (options.signal != null) {
source = abortableDuplex(stream, options.signal)
}
return end - start
try {
const result = await pipe(
[data],
source,
async (source) => await first(source)
)
const end = Date.now()
if (result == null || !uint8ArrayEquals(data, result)) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
}
return end - start
} finally {
stream.close()
}
}
}

View File

@ -15,6 +15,7 @@ import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { MultiaddrConnection, Upgrader, UpgraderEvents } from '@libp2p/interfaces/transport'
import type { Duplex } from 'it-stream-types'
import type { Components } from '@libp2p/interfaces/components'
import type { AbortOptions } from '@libp2p/interfaces'
const log = logger('libp2p:upgrader')
@ -266,7 +267,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
} = opts
let muxer: StreamMuxer | undefined
let newStream: ((multicodecs: string[]) => Promise<ProtocolStream>) | undefined
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<ProtocolStream>) | undefined
let connection: Connection // eslint-disable-line prefer-const
if (muxerFactory != null) {
@ -308,7 +309,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
}
})
newStream = async (protocols: string[]): Promise<ProtocolStream> => {
newStream = async (protocols: string[], options: AbortOptions = {}): Promise<ProtocolStream> => {
if (muxer == null) {
throw errCode(new Error('Stream is not multiplexed'), codes.ERR_MUXER_UNAVAILABLE)
}
@ -319,7 +320,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
const metrics = this.components.getMetrics()
try {
let { stream, protocol } = await mss.select(protocols)
let { stream, protocol } = await mss.select(protocols, options)
if (metrics != null) {
stream = metrics.trackStream({ stream, remotePeer, protocol })
@ -328,6 +329,11 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err: any) {
log.error('could not create new stream', err)
if (err.code != null) {
throw err
}
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
}
}

View File

@ -18,13 +18,21 @@ describe('Protocol prefix is configurable', () => {
it('protocolPrefix is provided', async () => {
const testProtocol = 'test-protocol'
libp2p = await createLibp2pNode(mergeOptions(baseOptions, {
protocolPrefix: testProtocol
identify: {
protocolPrefix: testProtocol
},
ping: {
protocolPrefix: testProtocol
},
fetch: {
protocolPrefix: testProtocol
}
}))
await libp2p.start()
const protocols = await libp2p.peerStore.protoBook.get(libp2p.peerId)
expect(protocols).to.include.members([
'/libp2p/fetch/0.0.1',
`/${testProtocol}/fetch/0.0.1`,
'/libp2p/circuit/relay/0.1.0',
`/${testProtocol}/id/1.0.0`,
`/${testProtocol}/id/push/1.0.0`,
@ -41,7 +49,8 @@ describe('Protocol prefix is configurable', () => {
'/libp2p/circuit/relay/0.1.0',
'/ipfs/id/1.0.0',
'/ipfs/id/push/1.0.0',
'/ipfs/ping/1.0.0'
'/ipfs/ping/1.0.0',
'/libp2p/fetch/0.0.1'
])
})
})

133
test/fetch/index.spec.ts Normal file
View File

@ -0,0 +1,133 @@
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { FetchService } from '../../src/fetch/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/interfaces/components'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { start, stop } from '@libp2p/interfaces/startable'
import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
const defaultInit = {
protocolPrefix: 'ipfs'
}
async function createComponents (index: number) {
const peerId = await createFromJSON(Peers[index])
const components = new Components({
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000
})
})
return components
}
describe('fetch', () => {
let localComponents: Components
let remoteComponents: Components
beforeEach(async () => {
localComponents = await createComponents(0)
remoteComponents = await createComponents(1)
await Promise.all([
start(localComponents),
start(remoteComponents)
])
})
afterEach(async () => {
sinon.restore()
await Promise.all([
stop(localComponents),
stop(remoteComponents)
])
})
it('should be able to fetch from another peer', async () => {
const key = 'key'
const value = Uint8Array.from([0, 1, 2, 3, 4])
const localFetch = new FetchService(localComponents, defaultInit)
const remoteFetch = new FetchService(remoteComponents, defaultInit)
remoteFetch.registerLookupFunction(key, async (identifier) => {
expect(identifier).to.equal(key)
return value
})
await start(localFetch)
await start(remoteFetch)
// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
// Run fetch
const result = await localFetch.fetch(remoteComponents.getPeerId(), key)
expect(result).to.equalBytes(value)
})
it('should time out fetching from another peer when waiting for the record', async () => {
const key = 'key'
const localFetch = new FetchService(localComponents, defaultInit)
const remoteFetch = new FetchService(remoteComponents, defaultInit)
await start(localFetch)
await start(remoteFetch)
// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
// replace existing handler with a really slow one
await remoteComponents.getRegistrar().unhandle(remoteFetch.protocol)
await remoteComponents.getRegistrar().handle(remoteFetch.protocol, ({ stream }) => {
void pipe(
stream,
async function * (source) {
for await (const chunk of source) {
// longer than the timeout
await delay(1000)
yield chunk
}
},
stream
)
})
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
// 10 ms timeout
const timeoutController = new TimeoutController(10)
// Run fetch, should time out
await expect(localFetch.fetch(remoteComponents.getPeerId(), key, {
signal: timeoutController.signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
})
})

View File

@ -3,17 +3,13 @@
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../../src/errors.js'
import { IdentifyService, Message } from '../../src/identify/index.js'
import Peers from '../fixtures/peers.js'
import { createLibp2pNode } from '../../src/libp2p.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { createBaseOptions } from '../utils/base-options.browser.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { MemoryDatastore } from 'datastore-core/memory'
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
import * as lp from 'it-length-prefixed'
import drain from 'it-drain'
import { pipe } from 'it-pipe'
@ -27,14 +23,9 @@ import {
} from '../../src/identify/consts.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { CustomEvent } from '@libp2p/interfaces/events'
import delay from 'delay'
import pWaitFor from 'p-wait-for'
import { peerIdFromString } from '@libp2p/peer-id'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Libp2pNode } from '../../src/libp2p.js'
import { pEvent } from 'p-event'
import { start, stop } from '@libp2p/interfaces/startable'
import { TimeoutController } from 'timeout-abort-controller'
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
@ -75,18 +66,16 @@ async function createComponents (index: number) {
return components
}
describe('Identify', () => {
describe('identify', () => {
let localComponents: Components
let remoteComponents: Components
let localPeerRecordUpdater: PeerRecordUpdater
let remotePeerRecordUpdater: PeerRecordUpdater
beforeEach(async () => {
localComponents = await createComponents(0)
remoteComponents = await createComponents(1)
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
remotePeerRecordUpdater = new PeerRecordUpdater(remoteComponents)
await Promise.all([
@ -238,355 +227,47 @@ describe('Identify', () => {
await stop(localIdentify)
})
describe('push', () => {
it('should be able to push identify updates to another peer', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
it('should time out during identify', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await start(localIdentify)
await start(remoteIdentify)
await start(localIdentify)
await start(remoteIdentify)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
const [localToRemote] = connectionPair(localComponents, remoteComponents)
// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: localToRemote
}))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: remoteToLocal
}))
// replace existing handler with a really slow one
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY)
await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY, ({ stream }) => {
void pipe(
stream,
async function * (source) {
// we receive no data in the identify protocol, we just send our data
await drain(source)
// identify both ways
await localIdentify.identify(localToRemote)
await remoteIdentify.identify(remoteToLocal)
// longer than the timeout
await delay(1000)
const updatedProtocol = '/special-new-protocol/1.0.0'
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
// should have protocols but not our new one
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(identifiedProtocols).to.not.be.empty()
expect(identifiedProtocols).to.not.include(updatedProtocol)
// should have addresses but not our new one
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(identifiedAddresses).to.not.be.empty()
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
// update local data - change event will trigger push
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
// needed to update the peer record and send our supported addresses
const addressManager = localComponents.getAddressManager()
addressManager.getAddresses = () => {
return [updatedAddress]
}
// ensure sequence number of peer record we are about to create is different
await delay(1000)
// make sure we have a peer record to send
await localPeerRecordUpdater.update()
// wait for the remote peer store to notice the changes
const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs')
// push updated peer record to connections
await localIdentify.pushToPeerStore()
await eventPromise
// should have new protocol
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(updatedProtocols).to.not.be.empty()
expect(updatedProtocols).to.include(updatedProtocol)
// should have new address
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(updatedAddresses.map(a => {
return {
multiaddr: a.multiaddr.toString(),
isCertified: a.isCertified
}
})).to.deep.equal([{
multiaddr: updatedAddress.toString(),
isCertified: true
}])
await stop(localIdentify)
await stop(remoteIdentify)
yield new Uint8Array()
},
stream
)
})
// LEGACY
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
await start(localIdentify)
await start(remoteIdentify)
// 10 ms timeout
const timeoutController = new TimeoutController(10)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
// Run identify
await expect(localIdentify.identify(localToRemote, {
signal: timeoutController.signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: localToRemote
}))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: remoteToLocal
}))
// identify both ways
await localIdentify.identify(localToRemote)
await remoteIdentify.identify(remoteToLocal)
const updatedProtocol = '/special-new-protocol/1.0.0'
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
// should have protocols but not our new one
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(identifiedProtocols).to.not.be.empty()
expect(identifiedProtocols).to.not.include(updatedProtocol)
// should have addresses but not our new one
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(identifiedAddresses).to.not.be.empty()
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
// update local data - change event will trigger push
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
// needed to send our supported addresses
const addressManager = localComponents.getAddressManager()
addressManager.getAddresses = () => {
return [updatedAddress]
}
// wait until remote peer store notices protocol list update
const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols')
await localIdentify.pushToPeerStore()
await waitForUpdate
// should have new protocol
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(updatedProtocols).to.not.be.empty()
expect(updatedProtocols).to.include(updatedProtocol)
// should have new address
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(updatedAddresses.map(a => {
return {
multiaddr: a.multiaddr.toString(),
isCertified: a.isCertified
}
})).to.deep.equal([{
multiaddr: updatedAddress.toString(),
isCertified: false
}])
await stop(localIdentify)
await stop(remoteIdentify)
})
})
describe('libp2p.dialer.identifyService', () => {
let peerId: PeerId
let libp2p: Libp2pNode
let remoteLibp2p: Libp2pNode
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
before(async () => {
peerId = await createFromJSON(Peers[0])
})
afterEach(async () => {
sinon.restore()
if (libp2p != null) {
await libp2p.stop()
}
})
after(async () => {
if (remoteLibp2p != null) {
await remoteLibp2p.stop()
}
})
it('should run identify automatically after connecting', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(identityServiceIdentifySpy.callCount).to.equal(1)
// The connection should have no open streams
await pWaitFor(() => connection.streams.length === 0)
await connection.close()
})
it('should store remote agent and protocol versions in metadataBook after connecting', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(identityServiceIdentifySpy.callCount).to.equal(1)
// The connection should have no open streams
await pWaitFor(() => connection.streams.length === 0)
await connection.close()
const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '')
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion')
expect(storedAgentVersion).to.exist()
expect(storedProtocolVersion).to.exist()
})
it('should push protocol updates to an already connected peer', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for connection event to be emitted
await connectionPromise
// Wait for identify to finish
await identityServiceIdentifySpy.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
await libp2p.handle('/echo/2.0.0', () => {})
await libp2p.unhandle('/echo/2.0.0')
// the protocol change event listener in the identity service is async
await pWaitFor(() => identityServicePushSpy.callCount === 2)
// Verify the remote peer is notified of both changes
expect(identityServicePushSpy.callCount).to.equal(2)
for (const call of identityServicePushSpy.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
await call.returnValue
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
it('should store host data and protocol version into metadataBook', async () => {
const agentVersion = 'js-project/1.0.0'
libp2p = await createLibp2pNode(createBaseOptions({
peerId,
host: {
agentVersion
}
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion')
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion')
expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array()))
expect(storedProtocolVersion).to.exist()
})
it('should push multiaddr updates to an already connected peer', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for connection event to be emitted
await connectionPromise
// Wait for identify to finish
await identityServiceIdentifySpy.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
// the protocol change event listener in the identity service is async
await pWaitFor(() => identityServicePushSpy.callCount === 1)
// Verify the remote peer is notified of change
expect(identityServicePushSpy.callCount).to.equal(1)
for (const call of identityServicePushSpy.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
await call.returnValue
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
})
})

296
test/identify/push.spec.ts Normal file
View File

@ -0,0 +1,296 @@
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr'
import { IdentifyService } from '../../src/identify/index.js'
import Peers from '../fixtures/peers.js'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { MemoryDatastore } from 'datastore-core/memory'
import drain from 'it-drain'
import { pipe } from 'it-pipe'
import { mockConnectionGater, mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/interfaces/components'
import { PeerRecordUpdater } from '../../src/peer-record-updater.js'
import {
MULTICODEC_IDENTIFY,
MULTICODEC_IDENTIFY_PUSH
} from '../../src/identify/consts.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { CustomEvent } from '@libp2p/interfaces/events'
import delay from 'delay'
import { pEvent } from 'p-event'
import { start, stop } from '@libp2p/interfaces/startable'
const listenMaddrs = [new Multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
const defaultInit = {
protocolPrefix: 'ipfs',
host: {
agentVersion: 'v1.0.0'
}
}
const protocols = [MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_PUSH]
async function createComponents (index: number) {
const peerId = await createFromJSON(Peers[index])
const components = new Components({
peerId,
datastore: new MemoryDatastore(),
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
connectionGater: mockConnectionGater(),
peerStore: new PersistentPeerStore(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000
})
})
components.setAddressManager(new DefaultAddressManager(components, {
announce: listenMaddrs.map(ma => ma.toString())
}))
const transportManager = new DefaultTransportManager(components)
components.setTransportManager(transportManager)
await components.getPeerStore().protoBook.set(peerId, protocols)
return components
}
describe('identify (push)', () => {
let localComponents: Components
let remoteComponents: Components
let localPeerRecordUpdater: PeerRecordUpdater
beforeEach(async () => {
localComponents = await createComponents(0)
remoteComponents = await createComponents(1)
localPeerRecordUpdater = new PeerRecordUpdater(localComponents)
await Promise.all([
start(localComponents),
start(remoteComponents)
])
})
afterEach(async () => {
sinon.restore()
await Promise.all([
stop(localComponents),
stop(remoteComponents)
])
})
it('should be able to push identify updates to another peer', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await start(localIdentify)
await start(remoteIdentify)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: localToRemote
}))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: remoteToLocal
}))
// identify both ways
await localIdentify.identify(localToRemote)
await remoteIdentify.identify(remoteToLocal)
const updatedProtocol = '/special-new-protocol/1.0.0'
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
// should have protocols but not our new one
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(identifiedProtocols).to.not.be.empty()
expect(identifiedProtocols).to.not.include(updatedProtocol)
// should have addresses but not our new one
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(identifiedAddresses).to.not.be.empty()
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
// update local data - change event will trigger push
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
// needed to update the peer record and send our supported addresses
const addressManager = localComponents.getAddressManager()
addressManager.getAddresses = () => {
return [updatedAddress]
}
// ensure sequence number of peer record we are about to create is different
await delay(1000)
// make sure we have a peer record to send
await localPeerRecordUpdater.update()
// wait for the remote peer store to notice the changes
const eventPromise = pEvent(remoteComponents.getPeerStore(), 'change:multiaddrs')
// push updated peer record to connections
await localIdentify.pushToPeerStore()
await eventPromise
// should have new protocol
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(updatedProtocols).to.not.be.empty()
expect(updatedProtocols).to.include(updatedProtocol)
// should have new address
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(updatedAddresses.map(a => {
return {
multiaddr: a.multiaddr.toString(),
isCertified: a.isCertified
}
})).to.deep.equal([{
multiaddr: updatedAddress.toString(),
isCertified: true
}])
await stop(localIdentify)
await stop(remoteIdentify)
})
it('should time out during push identify', async () => {
let streamEnded = false
const localIdentify = new IdentifyService(localComponents, {
...defaultInit,
timeout: 10
})
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await start(localIdentify)
await start(remoteIdentify)
// simulate connection between nodes
const [localToRemote] = connectionPair(localComponents, remoteComponents)
// replace existing handler with a really slow one
await remoteComponents.getRegistrar().unhandle(MULTICODEC_IDENTIFY_PUSH)
await remoteComponents.getRegistrar().handle(MULTICODEC_IDENTIFY_PUSH, ({ stream }) => {
void pipe(
stream,
async function * (source) {
// ignore the sent data
await drain(source)
// longer than the timeout
await delay(1000)
// the delay should have caused the local push to time out so this should
// occur after the local push method invocation has completed
streamEnded = true
yield new Uint8Array()
},
stream
)
})
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
// push updated peer record to remote
await localIdentify.push([localToRemote])
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
// method should have returned before the remote handler completes as we timed
// out so we ignore the return value
expect(streamEnded).to.be.false()
})
// LEGACY
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
const localIdentify = new IdentifyService(localComponents, defaultInit)
const remoteIdentify = new IdentifyService(remoteComponents, defaultInit)
await start(localIdentify)
await start(remoteIdentify)
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
// ensure connections are registered by connection manager
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: localToRemote
}))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', {
detail: remoteToLocal
}))
// identify both ways
await localIdentify.identify(localToRemote)
await remoteIdentify.identify(remoteToLocal)
const updatedProtocol = '/special-new-protocol/1.0.0'
const updatedAddress = new Multiaddr('/ip4/127.0.0.1/tcp/48322')
// should have protocols but not our new one
const identifiedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(identifiedProtocols).to.not.be.empty()
expect(identifiedProtocols).to.not.include(updatedProtocol)
// should have addresses but not our new one
const identifiedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(identifiedAddresses).to.not.be.empty()
expect(identifiedAddresses.map(a => a.multiaddr.toString())).to.not.include(updatedAddress.toString())
// update local data - change event will trigger push
await localComponents.getPeerStore().protoBook.add(localComponents.getPeerId(), [updatedProtocol])
await localComponents.getPeerStore().addressBook.add(localComponents.getPeerId(), [updatedAddress])
// needed to send our supported addresses
const addressManager = localComponents.getAddressManager()
addressManager.getAddresses = () => {
return [updatedAddress]
}
// wait until remote peer store notices protocol list update
const waitForUpdate = pEvent(remoteComponents.getPeerStore(), 'change:protocols')
await localIdentify.pushToPeerStore()
await waitForUpdate
// should have new protocol
const updatedProtocols = await remoteComponents.getPeerStore().protoBook.get(localComponents.getPeerId())
expect(updatedProtocols).to.not.be.empty()
expect(updatedProtocols).to.include(updatedProtocol)
// should have new address
const updatedAddresses = await remoteComponents.getPeerStore().addressBook.get(localComponents.getPeerId())
expect(updatedAddresses.map(a => {
return {
multiaddr: a.multiaddr.toString(),
isCertified: a.isCertified
}
})).to.deep.equal([{
multiaddr: updatedAddress.toString(),
isCertified: false
}])
await stop(localIdentify)
await stop(remoteIdentify)
})
})

View File

@ -0,0 +1,216 @@
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import Peers from '../fixtures/peers.js'
import { createLibp2pNode } from '../../src/libp2p.js'
import { createBaseOptions } from '../utils/base-options.browser.js'
import { MULTIADDRS_WEBSOCKETS } from '../fixtures/browser.js'
import { createFromJSON } from '@libp2p/peer-id-factory'
import pWaitFor from 'p-wait-for'
import { peerIdFromString } from '@libp2p/peer-id'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Libp2pNode } from '../../src/libp2p.js'
import { pEvent } from 'p-event'
describe('libp2p.dialer.identifyService', () => {
let peerId: PeerId
let libp2p: Libp2pNode
let remoteLibp2p: Libp2pNode
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
before(async () => {
peerId = await createFromJSON(Peers[0])
})
afterEach(async () => {
sinon.restore()
if (libp2p != null) {
await libp2p.stop()
}
})
after(async () => {
if (remoteLibp2p != null) {
await remoteLibp2p.stop()
}
})
it('should run identify automatically after connecting', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(identityServiceIdentifySpy.callCount).to.equal(1)
// The connection should have no open streams
await pWaitFor(() => connection.streams.length === 0)
await connection.close()
})
it('should store remote agent and protocol versions in metadataBook after connecting', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const peerStoreSpyConsumeRecord = sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
const peerStoreSpyAdd = sinon.spy(libp2p.peerStore.addressBook, 'add')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(identityServiceIdentifySpy.callCount).to.equal(1)
// The connection should have no open streams
await pWaitFor(() => connection.streams.length === 0)
await connection.close()
const remotePeer = peerIdFromString(remoteAddr.getPeerId() ?? '')
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'AgentVersion')
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(remotePeer, 'ProtocolVersion')
expect(storedAgentVersion).to.exist()
expect(storedProtocolVersion).to.exist()
})
it('should push protocol updates to an already connected peer', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for connection event to be emitted
await connectionPromise
// Wait for identify to finish
await identityServiceIdentifySpy.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
await libp2p.handle('/echo/2.0.0', () => {})
await libp2p.unhandle('/echo/2.0.0')
// the protocol change event listener in the identity service is async
await pWaitFor(() => identityServicePushSpy.callCount === 2)
// Verify the remote peer is notified of both changes
expect(identityServicePushSpy.callCount).to.equal(2)
for (const call of identityServicePushSpy.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
await call.returnValue
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
it('should store host data and protocol version into metadataBook', async () => {
const agentVersion = 'js-project/1.0.0'
libp2p = await createLibp2pNode(createBaseOptions({
peerId,
identify: {
host: {
agentVersion
}
}
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const storedAgentVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'AgentVersion')
const storedProtocolVersion = await libp2p.peerStore.metadataBook.getValue(peerId, 'ProtocolVersion')
expect(agentVersion).to.equal(uint8ArrayToString(storedAgentVersion ?? new Uint8Array()))
expect(storedProtocolVersion).to.exist()
})
it('should push multiaddr updates to an already connected peer', async () => {
libp2p = await createLibp2pNode(createBaseOptions({
peerId
}))
await libp2p.start()
if (libp2p.identifyService == null) {
throw new Error('Identity service was not configured')
}
const identityServiceIdentifySpy = sinon.spy(libp2p.identifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.identifyService, 'push')
const connectionPromise = pEvent(libp2p.connectionManager, 'peer:connect')
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
// Wait for connection event to be emitted
await connectionPromise
// Wait for identify to finish
await identityServiceIdentifySpy.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
await libp2p.peerStore.addressBook.add(libp2p.peerId, [new Multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
// the protocol change event listener in the identity service is async
await pWaitFor(() => identityServicePushSpy.callCount === 1)
// Verify the remote peer is notified of change
expect(identityServicePushSpy.callCount).to.equal(1)
for (const call of identityServicePushSpy.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toString()).to.equal(remoteAddr.getPeerId())
await call.returnValue
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
})

122
test/ping/index.spec.ts Normal file
View File

@ -0,0 +1,122 @@
/* eslint-env mocha */
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { PingService } from '../../src/ping/index.js'
import Peers from '../fixtures/peers.js'
import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { Components } from '@libp2p/interfaces/components'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { start, stop } from '@libp2p/interfaces/startable'
import { CustomEvent } from '@libp2p/interfaces/events'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import { pipe } from 'it-pipe'
const defaultInit = {
protocolPrefix: 'ipfs'
}
async function createComponents (index: number) {
const peerId = await createFromJSON(Peers[index])
const components = new Components({
peerId,
registrar: mockRegistrar(),
upgrader: mockUpgrader(),
connectionManager: new DefaultConnectionManager({
minConnections: 50,
maxConnections: 1000,
autoDialInterval: 1000
})
})
return components
}
describe('ping', () => {
let localComponents: Components
let remoteComponents: Components
beforeEach(async () => {
localComponents = await createComponents(0)
remoteComponents = await createComponents(1)
await Promise.all([
start(localComponents),
start(remoteComponents)
])
})
afterEach(async () => {
sinon.restore()
await Promise.all([
stop(localComponents),
stop(remoteComponents)
])
})
it('should be able to ping another peer', async () => {
const localPing = new PingService(localComponents, defaultInit)
const remotePing = new PingService(remoteComponents, defaultInit)
await start(localPing)
await start(remotePing)
// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
// Run ping
await expect(localPing.ping(remoteComponents.getPeerId())).to.eventually.be.gte(0)
})
it('should time out pinging another peer when waiting for a pong', async () => {
const localPing = new PingService(localComponents, defaultInit)
const remotePing = new PingService(remoteComponents, defaultInit)
await start(localPing)
await start(remotePing)
// simulate connection between nodes
const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents)
localComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: localToRemote }))
remoteComponents.getUpgrader().dispatchEvent(new CustomEvent('connection', { detail: remoteToLocal }))
// replace existing handler with a really slow one
await remoteComponents.getRegistrar().unhandle(remotePing.protocol)
await remoteComponents.getRegistrar().handle(remotePing.protocol, ({ stream }) => {
void pipe(
stream,
async function * (source) {
for await (const chunk of source) {
// longer than the timeout
await delay(1000)
yield chunk
}
},
stream
)
})
const newStreamSpy = sinon.spy(localToRemote, 'newStream')
// 10 ms timeout
const timeoutController = new TimeoutController(10)
// Run ping, should time out
await expect(localPing.ping(remoteComponents.getPeerId(), {
signal: timeoutController.signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
const { stream } = await newStreamSpy.getCall(0).returnValue
expect(stream).to.have.nested.property('timeline.close')
})
})

View File

@ -14,7 +14,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import swarmKey from '../fixtures/swarm.key.js'
import { DefaultUpgrader } from '../../src/upgrader.js'
import { codes } from '../../src/errors.js'
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar } from '@libp2p/interface-compliance-tests/mocks'
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-compliance-tests/mocks'
import Peers from '../fixtures/peers.js'
import type { Upgrader } from '@libp2p/interfaces/transport'
import type { PeerId } from '@libp2p/interfaces/peer-id'
@ -27,6 +27,9 @@ import type { Stream } from '@libp2p/interfaces/connection'
import pDefer from 'p-defer'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { pEvent } from 'p-event'
import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay'
import drain from 'it-drain'
const addrs = [
new Multiaddr('/ip4/127.0.0.1/tcp/0'),
@ -35,6 +38,7 @@ const addrs = [
describe('Upgrader', () => {
let localUpgrader: Upgrader
let localMuxerFactory: StreamMuxerFactory
let remoteUpgrader: Upgrader
let localPeer: PeerId
let remotePeer: PeerId
@ -55,12 +59,13 @@ describe('Upgrader', () => {
connectionGater: mockConnectionGater(),
registrar: mockRegistrar()
})
localMuxerFactory = new Mplex()
localUpgrader = new DefaultUpgrader(localComponents, {
connectionEncryption: [
new Plaintext()
],
muxers: [
new Mplex()
localMuxerFactory
]
})
@ -366,6 +371,40 @@ describe('Upgrader', () => {
expect(result).to.have.nested.property('reason.code', codes.ERR_UNSUPPORTED_PROTOCOL)
})
})
it('should abort protocol selection for slow streams', async () => {
const createStreamMuxerSpy = sinon.spy(localMuxerFactory, 'createStreamMuxer')
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
// 10 ms timeout
const timeoutController = new TimeoutController(10)
// should have created muxer for connection
expect(createStreamMuxerSpy).to.have.property('callCount', 1)
// create mock muxed stream that never sends data
const muxer = createStreamMuxerSpy.getCall(0).returnValue
muxer.newStream = () => {
return mockStream({
source: (async function * () {
// longer than the timeout
await delay(1000)
yield new Uint8Array()
}()),
sink: drain
})
}
await expect(connections[0].newStream('/echo/1.0.0', {
signal: timeoutController.signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
})
})
describe('libp2p.upgrader', () => {