module Aqua.DHT.PubSub declares * import "dht.aqua" import "@fluencelabs/aqua-lib/builtin.aqua" -- Get peers closest to the topic's hash in Kademlia network -- These peers are expected to store list of subscribers of this topic func getNeighbours(topic: string) -> []PeerId: k <- Op.string_to_b58(topic) nodes <- Kademlia.neighborhood(k, nil, nil) <- nodes -- If this peer have set node_id as a subscriber for topic, -- this call will prevent subscriber from re-subscribing -- so that eventually it will disappear from the subscribers list func removeSubscriber(topic: string): on HOST_PEER_ID: t <- Peer.timestamp_sec() AquaDHT.clear_host_value(topic, t) -- Create a topic: register it on the closest peers func initTopic(topic: string): on HOST_PEER_ID: nodes <- getNeighbours(topic) for n <- nodes par: on n: try: t <- Peer.timestamp_sec() AquaDHT.register_key(topic, t, false, 0) -- Create a topic and subscribe to it -- %init_peer_id% (current client) will become a subscriber func initTopicAndSubscribe(topic: string, value: string, relay_id: ?PeerId, service_id: ?string): on HOST_PEER_ID: nodes <- getNeighbours(topic) for n <- nodes par: on n: try: t <- Peer.timestamp_sec() AquaDHT.register_key(topic, t, false, 0) AquaDHT.put_value(topic, value, t, relay_id, service_id, 0) -- Create a topic and subscribe to it -- %init_peer_id% (current client) will become a subscriber -- In contrast with non-blocking version, waits for at least a single write to succeed func initTopicAndSubscribeBlocking( topic: string, value: string, relay_id: ?PeerId, service_id: ?string, progress: string -> () ) -> DhtResult: results: *DhtResult on HOST_PEER_ID: nodes <- getNeighbours(topic) for n <- nodes par: on n: try: t <- Peer.timestamp_sec() AquaDHT.register_key(topic, t, false, 0) result <- AquaDHT.put_value(topic, value, t, relay_id, service_id, 0) if result.success: results <<- result progress(n) <- results!0 -- Create a topic and make the given node a subscriber to it func initTopicAndSubscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string): on subscriber_node_id: t <- Peer.timestamp_sec() AquaDHT.register_key(topic, t, false, 0) r <- AquaDHT.put_host_value(topic, value, t, nil, service_id, 0) nodes <- getNeighbours(topic) for n <- nodes par: on n: try: tt <- Peer.timestamp_sec() AquaDHT.register_key(topic, tt, false, 0) AquaDHT.propagate_host_value(r, tt, 0) -- Subscribe to a topic -- Note: topic must be already initiated func subscribe(topic: string, value: string, relay_id: ?PeerId, service_id: ?string): on HOST_PEER_ID: nodes <- getNeighbours(topic) for n <- nodes par: on n: try: t <- Peer.timestamp_sec() AquaDHT.put_value(topic, value, t, relay_id, service_id, 0) -- Subscribe a node to the given topic -- Note: topic must be already initiated func subscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string): on subscriber_node_id: t <- Peer.timestamp_sec() r <- AquaDHT.put_host_value(topic, value, t, nil, service_id, 0) nodes <- getNeighbours(topic) for n <- nodes par: on n: try: tt <- Peer.timestamp_sec() AquaDHT.register_key(topic, tt, false, 0) AquaDHT.propagate_host_value(r, tt, 0) -- Find the list of subscribers for the given topic func findSubscribers(topic: string) -> []Record: on HOST_PEER_ID: nodes <- getNeighbours(topic) res: *GetValuesResult for n <- nodes par: on n: try: t <- Peer.timestamp_sec() res <- AquaDHT.get_values(topic, t) v <- AquaDHT.merge_two(res!.result, res!1.result) <- v.result -- Execute the given code on subscribers -- Note that you can provide another Aqua function as an argument to this one func executeOnSubscribers(topic: string, call: Record -> ()): subs <- findSubscribers(topic) for r <- subs par: on r.peer_id via r.relay_id: call(r)