diff --git a/aqua/pubsub.aqua b/aqua/pubsub.aqua index 23a9c7f..7f0cae8 100644 --- a/aqua/pubsub.aqua +++ b/aqua/pubsub.aqua @@ -40,6 +40,28 @@ func initTopicAndSubscribe(topic: string, value: string, relay_id: ?PeerId, serv AquaDHT.register_key(topic, t, false, 0) AquaDHT.put_value(topic, value, t, relay_id, service_id, 0) +-- Create a topic and subscribe to it +-- %init_peer_id% (current client) will become a subscriber +-- In contrast with non-blocking version, waits for at least a single write to succeed +func initTopicAndSubscribeBlocking( + topic: string, value: string, + relay_id: ?PeerId, service_id: ?string, + progress: string -> () +) -> DhtResult: + results: *DhtResult + on HOST_PEER_ID: + nodes <- getNeighbours(topic) + for n <- nodes par: + on n: + try: + t <- Peer.timestamp_sec() + AquaDHT.register_key(topic, t, false, 0) + result <- AquaDHT.put_value(topic, value, t, relay_id, service_id, 0) + if result.success: + results <<- result + co progress(n) + <- results!0 + -- Create a topic and make the given node a subscriber to it func initTopicAndSubscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string): on subscriber_node_id: diff --git a/example/src/aqua/export.aqua b/example/src/aqua/export.aqua index 87fd03a..8d9b47f 100644 --- a/example/src/aqua/export.aqua +++ b/example/src/aqua/export.aqua @@ -1,3 +1,3 @@ -import initTopicAndSubscribe, findSubscribers from "@fluencelabs/aqua-dht/pubsub.aqua" +import initTopicAndSubscribeBlocking, findSubscribers from "@fluencelabs/aqua-dht/pubsub.aqua" -export initTopicAndSubscribe, findSubscribers +export initTopicAndSubscribeBlocking, findSubscribers diff --git a/example/src/example.ts b/example/src/example.ts index f31f334..c21f3b6 100644 --- a/example/src/example.ts +++ b/example/src/example.ts @@ -1,6 +1,6 @@ import { Fluence } from "@fluencelabs/fluence"; import { krasnodar } from "@fluencelabs/fluence-network-environment"; -import { initTopicAndSubscribe, findSubscribers } from "./generated/export"; +import { initTopicAndSubscribeBlocking, findSubscribers } from "./generated/export"; async function main() { // connect to the Fluence network @@ -9,7 +9,10 @@ async function main() { let value = "myValue"; // create topic (if not exists) and subscribe on it let relay = Fluence.getStatus().relayPeerId; - await initTopicAndSubscribe(topic, value, relay, null); + await initTopicAndSubscribeBlocking( + topic, value, relay, null, + (s) => console.log(`node ${s} saved the record`) + ); // find other peers subscribed to that topic let subscribers = await findSubscribers(topic); console.log("found subscribers:", subscribers);