Add initTopicAndSubscribeBlocking (#56)

This commit is contained in:
folex 2021-09-13 15:30:43 +03:00 committed by GitHub
parent c498f6c15d
commit c19ee1293e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 4 deletions

View File

@ -40,6 +40,28 @@ func initTopicAndSubscribe(topic: string, value: string, relay_id: ?PeerId, serv
AquaDHT.register_key(topic, t, false, 0)
AquaDHT.put_value(topic, value, t, relay_id, service_id, 0)
-- Create a topic and subscribe to it
-- %init_peer_id% (current client) will become a subscriber
-- In contrast with non-blocking version, waits for at least a single write to succeed
func initTopicAndSubscribeBlocking(
topic: string, value: string,
relay_id: ?PeerId, service_id: ?string,
progress: string -> ()
) -> DhtResult:
results: *DhtResult
on HOST_PEER_ID:
nodes <- getNeighbours(topic)
for n <- nodes par:
on n:
try:
t <- Peer.timestamp_sec()
AquaDHT.register_key(topic, t, false, 0)
result <- AquaDHT.put_value(topic, value, t, relay_id, service_id, 0)
if result.success:
results <<- result
co progress(n)
<- results!0
-- Create a topic and make the given node a subscriber to it
func initTopicAndSubscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string):
on subscriber_node_id:

View File

@ -1,3 +1,3 @@
import initTopicAndSubscribe, findSubscribers from "@fluencelabs/aqua-dht/pubsub.aqua"
import initTopicAndSubscribeBlocking, findSubscribers from "@fluencelabs/aqua-dht/pubsub.aqua"
export initTopicAndSubscribe, findSubscribers
export initTopicAndSubscribeBlocking, findSubscribers

View File

@ -1,6 +1,6 @@
import { Fluence } from "@fluencelabs/fluence";
import { krasnodar } from "@fluencelabs/fluence-network-environment";
import { initTopicAndSubscribe, findSubscribers } from "./generated/export";
import { initTopicAndSubscribeBlocking, findSubscribers } from "./generated/export";
async function main() {
// connect to the Fluence network
@ -9,7 +9,10 @@ async function main() {
let value = "myValue";
// create topic (if not exists) and subscribe on it
let relay = Fluence.getStatus().relayPeerId;
await initTopicAndSubscribe(topic, value, relay, null);
await initTopicAndSubscribeBlocking(
topic, value, relay, null,
(s) => console.log(`node ${s} saved the record`)
);
// find other peers subscribed to that topic
let subscribers = await findSubscribers(topic);
console.log("found subscribers:", subscribers);