fix: use placeholder dht/pubsub (#1193)

Instead of making the `.dht` and `.pubsub` properties optional, use dummy implementations that throw exceptions if they are not configured.

This way we don't have to null guard everywhere they are accessed.
This commit is contained in:
Alex Potsides 2022-04-21 15:46:06 +01:00 committed by GitHub
parent 147304449e
commit 5397137c65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 186 additions and 78 deletions

View File

@ -4,7 +4,7 @@ import { createLibp2p } from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
import { FloodSub } from '@libp2p/floodsub'
import { Bootstrap } from '@libp2p/bootstrap'
import { PubSubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
@ -16,7 +16,7 @@ const createNode = async (bootstrappers) => {
transports: [new TCP()],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
pubsub: new Gossipsub(),
pubsub: new FloodSub(),
peerDiscovery: [
new Bootstrap({
list: bootstrappers
@ -40,7 +40,7 @@ const createNode = async (bootstrappers) => {
transports: [new TCP()],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
pubsub: new Gossipsub(),
pubsub: new FloodSub(),
peerDiscovery: [
new PubSubPeerDiscovery({
interval: 1000

View File

@ -9,8 +9,8 @@
},
"license": "MIT",
"dependencies": {
"@achingbrain/libp2p-gossipsub": "^0.13.5",
"@libp2p/pubsub-peer-discovery": "^5.0.1",
"@libp2p/floodsub": "^1.0.5",
"execa": "^2.1.0",
"fs-extra": "^8.1.0",
"libp2p": "../",

View File

@ -4,10 +4,9 @@ import { createLibp2p } from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
import { FloodSub } from '@libp2p/floodsub'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { CustomEvent } from '@libp2p/interfaces'
const createNode = async () => {
const node = await createLibp2p({
@ -17,7 +16,7 @@ const createNode = async () => {
transports: [new TCP()],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
pubsub: new Gossipsub()
pubsub: new FloodSub()
})
await node.start()
@ -36,17 +35,19 @@ const createNode = async () => {
await node1.peerStore.addressBook.set(node2.peerId, node2.getMultiaddrs())
await node1.dial(node2.peerId)
node1.pubsub.addEventListener(topic, (evt) => {
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
node1.pubsub.subscribe(topic)
node1.pubsub.addEventListener('message', (evt) => {
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
})
// Will not receive own published messages by default
node2.pubsub.addEventListener(topic, (evt) => {
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
node2.pubsub.subscribe(topic)
node2.pubsub.addEventListener('message', (evt) => {
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
})
// node2 publishes "news" every second
setInterval(() => {
node2.pubsub.dispatchEvent(new CustomEvent(topic, { detail: uint8ArrayFromString('Bird bird bird, bird is the word!') }))
node2.pubsub.publish(topic, uint8ArrayFromString('Bird bird bird, bird is the word!'))
}, 1000)
})()

View File

@ -4,10 +4,9 @@ import { createLibp2p } from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
import { FloodSub } from '@libp2p/floodsub'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { CustomEvent } from '@libp2p/interfaces'
const createNode = async () => {
const node = await createLibp2p({
@ -17,7 +16,7 @@ const createNode = async () => {
transports: [new TCP()],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
pubsub: new Gossipsub()
pubsub: new FloodSub()
})
await node.start()
@ -45,7 +44,7 @@ const createNode = async () => {
// Will not receive own published messages by default
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
})
await node1.pubsub.subscribe(topic)
node1.pubsub.subscribe(topic)
node2.pubsub.addEventListener(topic, (evt) => {
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
@ -75,7 +74,7 @@ const createNode = async () => {
// car is not a fruit !
setInterval(() => {
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, { detail: uint8ArrayFromString(myFruits[count]) }))
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count]))
count++
if (count == myFruits.length) {
count = 0

View File

@ -96,16 +96,17 @@
},
"dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.0",
"@libp2p/connection": "^1.1.4",
"@libp2p/crypto": "^0.22.9",
"@libp2p/interfaces": "^1.3.21",
"@libp2p/logger": "^1.1.3",
"@libp2p/multistream-select": "^1.0.3",
"@libp2p/peer-id": "^1.1.8",
"@libp2p/peer-id-factory": "^1.0.8",
"@libp2p/connection": "^1.1.5",
"@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^1.3.22",
"@libp2p/logger": "^1.1.4",
"@libp2p/multistream-select": "^1.0.4",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/peer-record": "^1.0.8",
"@libp2p/peer-store": "^1.0.6",
"@libp2p/utils": "^1.0.9",
"@libp2p/peer-store": "^1.0.10",
"@libp2p/tracked-map": "^1.0.5",
"@libp2p/utils": "^1.0.10",
"@multiformats/mafmt": "^11.0.2",
"@multiformats/multiaddr": "^10.1.8",
"abortable-iterator": "^4.0.2",
@ -128,6 +129,7 @@
"it-length-prefixed": "^7.0.1",
"it-map": "^1.0.6",
"it-merge": "^1.0.3",
"it-pair": "^2.0.2",
"it-pipe": "^2.0.3",
"it-sort": "^1.0.1",
"it-stream-types": "^1.0.4",
@ -154,25 +156,23 @@
"xsalsa20": "^1.1.0"
},
"devDependencies": {
"@achingbrain/libp2p-gossipsub": "^0.13.5",
"@chainsafe/libp2p-noise": "^6.0.1",
"@libp2p/bootstrap": "^1.0.2",
"@libp2p/daemon-client": "^1.0.0",
"@libp2p/daemon-server": "^1.0.0",
"@libp2p/bootstrap": "^1.0.3",
"@libp2p/daemon-client": "^1.0.2",
"@libp2p/daemon-server": "^1.0.2",
"@libp2p/delegated-content-routing": "^1.0.2",
"@libp2p/delegated-peer-routing": "^1.0.2",
"@libp2p/floodsub": "^1.0.2",
"@libp2p/interface-compliance-tests": "^1.1.20",
"@libp2p/floodsub": "^1.0.5",
"@libp2p/interface-compliance-tests": "^1.1.23",
"@libp2p/interop": "^1.0.3",
"@libp2p/kad-dht": "^1.0.5",
"@libp2p/mdns": "^1.0.3",
"@libp2p/mplex": "^1.0.1",
"@libp2p/pubsub": "^1.2.14",
"@libp2p/tcp": "^1.0.6",
"@libp2p/kad-dht": "^1.0.7",
"@libp2p/mdns": "^1.0.4",
"@libp2p/mplex": "^1.0.3",
"@libp2p/pubsub": "^1.2.18",
"@libp2p/tcp": "^1.0.8",
"@libp2p/topology": "^1.1.7",
"@libp2p/tracked-map": "^1.0.4",
"@libp2p/webrtc-star": "^1.0.3",
"@libp2p/websockets": "^1.0.3",
"@libp2p/webrtc-star": "^1.0.7",
"@libp2p/websockets": "^1.0.6",
"@nodeutils/defaults-deep": "^1.1.0",
"@types/node": "^16.11.26",
"@types/node-forge": "^1.0.0",
@ -187,7 +187,6 @@
"go-libp2p": "^0.0.6",
"into-stream": "^7.0.0",
"ipfs-http-client": "^56.0.1",
"it-pair": "^2.0.2",
"it-pushable": "^2.0.1",
"nock": "^13.0.3",
"npm-run-all": "^4.1.5",

51
src/dht/dummy-dht.ts Normal file
View File

@ -0,0 +1,51 @@
import type { DualDHT, QueryEvent, SingleDHT } from '@libp2p/interfaces/dht'
import type { PeerDiscoveryEvents } from '@libp2p/interfaces/peer-discovery'
import errCode from 'err-code'
import { messages, codes } from '../errors.js'
import { EventEmitter } from '@libp2p/interfaces'
export class DummyDHT extends EventEmitter<PeerDiscoveryEvents> implements DualDHT {
get wan (): SingleDHT {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
get lan (): SingleDHT {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
get (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
findProviders (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
findPeer (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
getClosestPeers (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
provide (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
put (): AsyncIterable<QueryEvent> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
async getMode (): Promise<'client' | 'server'> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
async setMode (): Promise<void> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
async refreshRoutingTable (): Promise<void> {
throw errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)
}
}

View File

@ -1,6 +1,7 @@
export enum messages {
NOT_STARTED_YET = 'The libp2p node is not started yet',
DHT_DISABLED = 'DHT is not available',
PUBSUB_DISABLED = 'PubSub is not available',
CONN_ENCRYPTION_REQUIRED = 'At least one connection encryption module is required',
ERR_TRANSPORTS_REQUIRED = 'At least one transport module is required',
ERR_PROTECTOR_REQUIRED = 'Private network is enforced, but no protector was provided',
@ -9,6 +10,7 @@ export enum messages {
export enum codes {
DHT_DISABLED = 'ERR_DHT_DISABLED',
ERR_PUBSUB_DISABLED = 'ERR_PUBSUB_DISABLED',
PUBSUB_NOT_STARTED = 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED = 'ERR_DHT_NOT_STARTED',
CONN_ENCRYPTION_REQUIRED = 'ERR_CONN_ENCRYPTION_REQUIRED',

View File

@ -154,9 +154,8 @@ export interface Libp2p extends Startable, EventEmitter<Libp2pEvents> {
connectionManager: ConnectionManager
registrar: Registrar
metrics?: Metrics
pubsub?: PubSub
dht?: DualDHT
pubsub: PubSub
dht: DualDHT
/**
* Load keychain keys from the datastore.

View File

@ -44,13 +44,15 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import errCode from 'err-code'
import { unmarshalPublicKey } from '@libp2p/crypto/keys'
import type { Metrics } from '@libp2p/interfaces/metrics'
import { DummyDHT } from './dht/dummy-dht.js'
import { DummyPubSub } from './pubsub/dummy-pubsub.js'
const log = logger('libp2p')
export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
public peerId: PeerId
public dht?: DualDHT
public pubsub?: PubSub
public dht: DualDHT
public pubsub: PubSub
public identifyService?: IdentifyService
public fetchService: FetchService
public pingService: PingService
@ -168,11 +170,15 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
// dht provided components (peerRouting, contentRouting, dht)
if (init.dht != null) {
this.dht = this.components.setDHT(this.configureComponent(init.dht))
} else {
this.dht = new DummyDHT()
}
// Create pubsub if provided
if (init.pubsub != null) {
this.pubsub = this.components.setPubSub(this.configureComponent(init.pubsub))
} else {
this.pubsub = new DummyPubSub()
}
// Attach remaining APIs
@ -180,7 +186,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
const peerRouters: PeerRouting[] = (init.peerRouters ?? []).map(component => this.configureComponent(component))
if (this.dht != null) {
if (init.dht != null) {
// add dht to routers
peerRouters.push(this.configureComponent(new DHTPeerRouting(this.dht)))
@ -197,7 +203,7 @@ export class Libp2pNode extends EventEmitter<Libp2pEvents> implements Libp2p {
const contentRouters: ContentRouting[] = (init.contentRouters ?? []).map(component => this.configureComponent(component))
if (this.dht != null) {
if (init.dht != null) {
// add dht to routers
contentRouters.push(this.configureComponent(new DHTContentRouting(this.dht)))
}

View File

@ -0,0 +1,51 @@
import { EventEmitter } from '@libp2p/interfaces'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interfaces/pubsub'
import errCode from 'err-code'
import { messages, codes } from '../errors.js'
export class DummyPubSub extends EventEmitter<PubSubEvents> implements PubSub {
isStarted (): boolean {
return false
}
start (): void | Promise<void> {
}
stop (): void | Promise<void> {
}
get globalSignaturePolicy (): typeof StrictSign | typeof StrictNoSign {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
get multicodecs (): string[] {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
getPeers (): PeerId[] {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
getTopics (): string[] {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
subscribe (): void {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
unsubscribe (): void {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
getSubscribers (): PeerId[] {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
publish (): void {
throw errCode(new Error(messages.PUBSUB_DISABLED), codes.ERR_PUBSUB_DISABLED)
}
}

View File

@ -7,7 +7,6 @@ import delay from 'delay'
import { createLibp2p, Libp2p } from '../../src/index.js'
import { baseOptions, pubsubSubsystemOptions } from './utils.js'
import { createPeerId } from '../utils/creators/peer.js'
import { CustomEvent } from '@libp2p/interfaces'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { FloodSub } from '@libp2p/floodsub'
import type { PubSub } from '@libp2p/interfaces/pubsub'
@ -21,14 +20,16 @@ describe('Pubsub subsystem is configurable', () => {
}
})
it('should not exist if no module is provided', async () => {
it('should throw if no module is provided', async () => {
libp2p = await createLibp2p(baseOptions)
expect(libp2p.pubsub).to.not.exist()
await libp2p.start()
expect(() => libp2p.pubsub.getTopics()).to.throw()
})
it('should exist if the module is provided', async () => {
it('should not throw if the module is provided', async () => {
libp2p = await createLibp2p(pubsubSubsystemOptions)
expect(libp2p.pubsub).to.exist()
await libp2p.start()
expect(libp2p.pubsub.getTopics()).to.be.empty()
})
it('should start and stop by default once libp2p starts', async () => {
@ -39,13 +40,13 @@ describe('Pubsub subsystem is configurable', () => {
})
libp2p = await createLibp2p(customOptions)
expect(libp2p.pubsub?.isStarted()).to.equal(false)
expect(libp2p.pubsub.isStarted()).to.equal(false)
await libp2p.start()
expect(libp2p.pubsub?.isStarted()).to.equal(true)
expect(libp2p.pubsub.isStarted()).to.equal(true)
await libp2p.stop()
expect(libp2p.pubsub?.isStarted()).to.equal(false)
expect(libp2p.pubsub.isStarted()).to.equal(false)
})
})
@ -87,16 +88,14 @@ describe('Pubsub subscription handlers adapter', () => {
throw new Error('Pubsub was not enabled')
}
pubsub.addEventListener(topic, handler)
pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, {
detail: uint8ArrayFromString('useless-data')
}))
pubsub.subscribe(topic)
pubsub.addEventListener('message', handler)
pubsub.publish(topic, uint8ArrayFromString('useless-data'))
await defer.promise
pubsub.removeEventListener(topic, handler)
pubsub.dispatchEvent(new CustomEvent<Uint8Array>(topic, {
detail: uint8ArrayFromString('useless-data')
}))
pubsub.unsubscribe(topic)
pubsub.removeEventListener('message', handler)
pubsub.publish(topic, uint8ArrayFromString('useless-data'))
// wait to guarantee that the handler is not called twice
await delay(100)

View File

@ -13,15 +13,17 @@ describe('DHT subsystem is configurable', () => {
}
})
it('should not exist if no module is provided', async () => {
it('should throw if no module is provided', async () => {
libp2p = await createLibp2p(createSubsystemOptions({
dht: undefined
}))
expect(libp2p.dht).to.not.exist()
await libp2p.start()
await expect(libp2p.dht.getMode()).to.eventually.be.rejected()
})
it('should exist if the module is provided', async () => {
it('should not throw if the module is provided', async () => {
libp2p = await createLibp2p(createSubsystemOptions())
expect(libp2p.dht).to.exist()
await libp2p.start()
await expect(libp2p.dht.getMode()).to.eventually.equal('client')
})
})

View File

@ -78,8 +78,8 @@ describe('DHT subsystem operates correctly', () => {
expect(connection).to.exist()
return await Promise.all([
pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1),
pWaitFor(() => remoteLibp2p.dht?.lan.routingTable.size === 1)
pWaitFor(() => libp2p.dht.lan.routingTable.size === 1),
pWaitFor(() => remoteLibp2p.dht.lan.routingTable.size === 1)
])
})
@ -89,8 +89,8 @@ describe('DHT subsystem operates correctly', () => {
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
await Promise.all([
pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1),
pWaitFor(() => remoteLibp2p.dht?.lan.routingTable.size === 1)
pWaitFor(() => libp2p.dht.lan.routingTable.size === 1),
pWaitFor(() => remoteLibp2p.dht.lan.routingTable.size === 1)
])
await libp2p.components.getContentRouting().put(key, value)
@ -141,7 +141,7 @@ describe('DHT subsystem operates correctly', () => {
const connection = await libp2p.dial(remAddr)
expect(connection).to.exist()
expect(libp2p.dht?.lan.routingTable).to.be.empty()
expect(libp2p.dht.lan.routingTable).to.be.empty()
const dht = remoteLibp2p.dht
@ -151,9 +151,9 @@ describe('DHT subsystem operates correctly', () => {
// should be 0 directly after start - TODO this may be susceptible to timing bugs, we should have
// the ability to report stats on the DHT routing table instead of reaching into it's heart like this
expect(remoteLibp2p.dht?.lan.routingTable).to.be.empty()
expect(remoteLibp2p.dht.lan.routingTable).to.be.empty()
return await pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1)
return await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1)
})
it('should put on a peer and get from the other', async () => {
@ -168,7 +168,7 @@ describe('DHT subsystem operates correctly', () => {
await dht.start()
}
await pWaitFor(() => libp2p.dht?.lan.routingTable.size === 1)
await pWaitFor(() => libp2p.dht.lan.routingTable.size === 1)
await libp2p.components.getContentRouting().put(key, value)
const fetchedValue = await remoteLibp2p.components.getContentRouting().get(key)

View File

@ -17,7 +17,6 @@ import { unmarshalPrivateKey } from '@libp2p/crypto/keys'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import { peerIdFromKeys } from '@libp2p/peer-id'
import { FloodSub } from '@libp2p/floodsub'
import { Gossipsub } from '@achingbrain/libp2p-gossipsub'
// IPFS_LOGGING=debug DEBUG=libp2p*,go-libp2p:* npm run test:interop
@ -122,7 +121,7 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> {
if (options.pubsubRouter === 'floodsub') {
opts.pubsub = new FloodSub()
} else {
opts.pubsub = new Gossipsub()
opts.pubsub = new FloodSub()
}
}