docs: update message filtering example (#1362)

Updates the example to use the new pubsub `addEventListener`-style API along with the README.

Also updates the test to actually test that the relevant messages were received.

Fixes https://github.com/libp2p/js-libp2p/issues/1288
This commit is contained in:
Alex Potsides 2022-08-30 10:33:39 +02:00 committed by GitHub
parent 1f38ab7ac8
commit 0e7096d527
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 69 deletions

View File

@ -40,24 +40,42 @@ const createNode = async () => {
await node2.dial(node3.peerId) await node2.dial(node3.peerId)
//subscribe //subscribe
node1.pubsub.addEventListener(topic, (evt) => { node1.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}
// Will not receive own published messages by default // Will not receive own published messages by default
console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`) console.log(`node1 received: ${uint8ArrayToString(evt.detail.data)}`)
}) })
node1.pubsub.subscribe(topic) node1.pubsub.subscribe(topic)
node2.pubsub.addEventListener(topic, (evt) => { node2.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}
console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`) console.log(`node2 received: ${uint8ArrayToString(evt.detail.data)}`)
}) })
node2.pubsub.subscribe(topic)
node3.pubsub.addEventListener('message', (evt) => {
if (evt.detail.topic !== topic) {
return
}
node3.pubsub.addEventListener(topic, (evt) => {
console.log(`node3 received: ${uint8ArrayToString(evt.detail.data)}`) console.log(`node3 received: ${uint8ArrayToString(evt.detail.data)}`)
}) })
node3.pubsub.subscribe(topic)
// wait for subscriptions to propagate
await delay(1000)
const validateFruit = (msgTopic, msg) => { const validateFruit = (msgTopic, msg) => {
const fruit = uint8ArrayToString(msg.data) const fruit = uint8ArrayToString(msg.data)
const validFruit = ['banana', 'apple', 'orange'] const validFruit = ['banana', 'apple', 'orange']
// car is not a fruit !
if (!validFruit.includes(fruit)) { if (!validFruit.includes(fruit)) {
throw new Error('no valid fruit received') throw new Error('no valid fruit received')
} }
@ -68,18 +86,19 @@ const createNode = async () => {
node2.pubsub.topicValidators.set(topic, validateFruit) node2.pubsub.topicValidators.set(topic, validateFruit)
node3.pubsub.topicValidators.set(topic, validateFruit) node3.pubsub.topicValidators.set(topic, validateFruit)
// node1 publishes "fruits" every five seconds // node1 publishes "fruits"
var count = 0; for (const fruit of ['banana', 'apple', 'car', 'orange']) {
const myFruits = ['banana', 'apple', 'car', 'orange']; console.log('############## fruit ' + fruit + ' ##############')
// car is not a fruit ! await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
setInterval(() => { }
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, uint8ArrayFromString(myFruits[count])).catch(err => { // wait a few seconds for messages to be received
console.info(err) await delay(5000)
}) console.log('############## all messages sent ##############')
count++
if (count == myFruits.length) {
count = 0
}
}, 5000)
})() })()
async function delay (ms) {
await new Promise((resolve) => {
setTimeout(() => resolve(), ms)
})
}

View File

@ -48,18 +48,30 @@ Now we' can subscribe to the fruit topic and log incoming messages.
```JavaScript ```JavaScript
const topic = 'fruit' const topic = 'fruit'
node1.pubsub.on(topic, (msg) => { node1.pubsub.addEventListener('message', (msg) => {
if (msg.detail.topic !== topic) {
return
}
console.log(`node1 received: ${uint8ArrayToString(msg.data)}`) console.log(`node1 received: ${uint8ArrayToString(msg.data)}`)
}) })
await node1.pubsub.subscribe(topic) await node1.pubsub.subscribe(topic)
node2.pubsub.on(topic, (msg) => { node2.pubsub.addEventListener('message', (msg) => {
if (msg.detail.topic !== topic) {
return
}
console.log(`node2 received: ${uint8ArrayToString(msg.data)}`) console.log(`node2 received: ${uint8ArrayToString(msg.data)}`)
}) })
await node2.pubsub.subscribe(topic) await node2.pubsub.subscribe(topic)
node3.pubsub.on(topic, (msg) => { node3.pubsub.addEventListener('message', (msg) => {
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`) if (msg.detail.topic !== topic) {
return
}
console.log(`node3 received: ${uint8ArrayToString(msg.data)}`)
}) })
await node3.pubsub.subscribe(topic) await node3.pubsub.subscribe(topic)
``` ```
@ -83,19 +95,10 @@ node3.pubsub.topicValidators.set(topic, validateFruit)
In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared. In this example, node one has an outdated version of the system, or is a malicious node. When it tries to publish fruit, the messages are re-shared and all the nodes share the message. However, when it tries to publish a vehicle the message is not re-shared.
```JavaScript ```JavaScript
var count = 0; for (const fruit of ['banana', 'apple', 'car', 'orange']) {
const myFruits = ['banana', 'apple', 'car', 'orange']; console.log('############## fruit ' + fruit + ' ##############')
await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
setInterval(() => { }
console.log('############## fruit ' + myFruits[count] + ' ##############')
node1.pubsub.publish(topic, new TextEncoder().encode(myFruits[count])).catch(err => {
console.error(err)
})
count++
if (count == myFruits.length) {
count = 0
}
}, 5000)
``` ```
Result Result

View File

@ -6,29 +6,11 @@ import { fileURLToPath } from 'url'
const __dirname = path.dirname(fileURLToPath(import.meta.url)) const __dirname = path.dirname(fileURLToPath(import.meta.url))
const stdout = [ // holds messages received by peers
{ const messages = {}
topic: 'banana',
messageCount: 2
},
{
topic: 'apple',
messageCount: 2
},
{
topic: 'car',
messageCount: 0
},
{
topic: 'orange',
messageCount: 2
},
]
export async function test () { export async function test () {
const defer = pDefer() const defer = pDefer()
let topicCount = 0
let topicMessageCount = 0
process.stdout.write('message-filtering/1.js\n') process.stdout.write('message-filtering/1.js\n')
@ -38,26 +20,27 @@ export async function test () {
}) })
proc.all.on('data', async (data) => { proc.all.on('data', async (data) => {
// End
if (topicCount === stdout.length) {
defer.resolve()
proc.all.removeAllListeners('data')
}
process.stdout.write(data) process.stdout.write(data)
const line = uint8ArrayToString(data) const line = uint8ArrayToString(data)
if (stdout[topicCount] && line.includes(stdout[topicCount].topic)) { // End
// Validate previous number of messages if (line.includes('all messages sent')) {
if (topicCount > 0 && topicMessageCount > stdout[topicCount - 1].messageCount) { if (messages.car > 0) {
defer.reject() defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
throw new Error(`topic ${stdout[topicCount - 1].topic} had ${topicMessageCount} messages instead of ${stdout[topicCount - 1].messageCount}`)
} }
topicCount++ for (const fruit of ['banana', 'apple', 'orange']) {
topicMessageCount = 0 if (messages[fruit] !== 2) {
} else { defer.reject(new Error(`Not enough ${fruit} messages - received ${messages[fruit] ?? 0}, expected 2`))
topicMessageCount++ }
}
defer.resolve()
}
if (line.includes('received:')) {
const fruit = line.split('received:')[1].trim()
messages[fruit] = (messages[fruit] ?? 0) + 1
} }
}) })