mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-24 17:52:14 +00:00
124 lines
4.1 KiB
Plaintext
124 lines
4.1 KiB
Plaintext
module AquaDHT.PubSub declares *
|
|
|
|
import "dht.aqua"
|
|
import "@fluencelabs/aqua-lib/builtin.aqua"
|
|
|
|
-- Resolve the peers responsible for a topic.
-- The topic string is converted with Op.string_to_b58 and the result is used
-- as the Kademlia lookup key. The peers returned by that lookup are the ones
-- expected to store the list of subscribers of this topic.
func getNeighbours(topic: string) -> []PeerId:
  topic_key <- Op.string_to_b58(topic)
  closest <- Kademlia.neighborhood(topic_key, nil, nil)
  <- closest
|
|
|
|
-- Drop the host peer's own subscription to a topic.
-- If the host peer has registered a subscriber value for the topic, clearing
-- it here prevents that subscriber from being re-subscribed, so that it
-- eventually disappears from the subscribers list.
func removeSubscriber(topic: string):
  on HOST_PEER_ID:
    -- timestamp is taken on the host peer, where the value is cleared
    now <- Peer.timestamp_sec()
    AquaDHT.clear_host_value(topic, now)
|
|
|
|
-- Create a topic by registering its key on every peer closest to it.
-- Neighbours are contacted in parallel; each registration runs inside `try`,
-- so a failure on one neighbour does not affect the others.
func initTopic(topic: string):
  on HOST_PEER_ID:
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          -- each neighbour stamps the registration with its own timestamp
          now <- Peer.timestamp_sec()
          AquaDHT.register_key(topic, now, false, 0)
|
|
|
|
-- Create a topic and subscribe to it in one pass (non-blocking).
-- %init_peer_id% (the current client) becomes a subscriber.
-- On every neighbour of the topic, in parallel, the key is registered and the
-- subscriber value is written; results of the writes are not collected.
func initTopicAndSubscribe(topic: string, value: string, relay_id: ?PeerId, service_id: ?string):
  on HOST_PEER_ID:
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          -- the same timestamp is used for the registration and the write
          now <- Peer.timestamp_sec()
          AquaDHT.register_key(topic, now, false, 0)
          AquaDHT.put_value(topic, value, now, relay_id, service_id, 0)
|
|
|
|
-- Create a topic and subscribe to it, blocking variant.
-- %init_peer_id% (the current client) becomes a subscriber.
-- Unlike the non-blocking version, this function waits until at least one
-- write has succeeded and returns that first successful result.
-- `progress` is called with the id of each neighbour whose write succeeded.
func initTopicAndSubscribeBlocking(
  topic: string, value: string,
  relay_id: ?PeerId, service_id: ?string,
  progress: string -> ()
) -> DhtResult:
  -- stream that accumulates successful writes only
  successes: *DhtResult
  on HOST_PEER_ID:
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          now <- Peer.timestamp_sec()
          AquaDHT.register_key(topic, now, false, 0)
          written <- AquaDHT.put_value(topic, value, now, relay_id, service_id, 0)
          if written.success:
            successes <<- written
            progress(node)
  -- reading element 0 of the stream is what makes the call wait for a success
  <- successes!0
|
|
|
|
-- Create a topic and make the given node a subscriber to it.
-- The key is first registered and the host value stored on the subscriber
-- node itself; the resulting record is then propagated, in parallel, to every
-- neighbour of the topic (each of which also registers the key).
func initTopicAndSubscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string):
  on subscriber_node_id:
    local_ts <- Peer.timestamp_sec()
    AquaDHT.register_key(topic, local_ts, false, 0)
    host_record <- AquaDHT.put_host_value(topic, value, local_ts, nil, service_id, 0)
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          -- every neighbour uses its own timestamp, not the subscriber's
          remote_ts <- Peer.timestamp_sec()
          AquaDHT.register_key(topic, remote_ts, false, 0)
          AquaDHT.propagate_host_value(host_record, remote_ts, 0)
|
|
|
|
-- Subscribe the current client to an existing topic.
-- Note: the topic must already be initiated; this function only writes the
-- subscriber value and does not register the key.
-- The value is written to every neighbour of the topic in parallel.
func subscribe(topic: string, value: string, relay_id: ?PeerId, service_id: ?string):
  on HOST_PEER_ID:
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          now <- Peer.timestamp_sec()
          AquaDHT.put_value(topic, value, now, relay_id, service_id, 0)
|
|
|
|
-- Subscribe the given node to a topic.
-- Note: the topic must already be initiated; the key is not registered on the
-- subscriber node here, only on the neighbours during propagation.
-- The host value is stored on the subscriber node and the resulting record is
-- propagated, in parallel, to every neighbour of the topic.
func subscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string):
  on subscriber_node_id:
    local_ts <- Peer.timestamp_sec()
    host_record <- AquaDHT.put_host_value(topic, value, local_ts, nil, service_id, 0)
    neighbours <- getNeighbours(topic)
    for node <- neighbours par:
      on node:
        try:
          -- every neighbour uses its own timestamp, not the subscriber's
          remote_ts <- Peer.timestamp_sec()
          AquaDHT.register_key(topic, remote_ts, false, 0)
          AquaDHT.propagate_host_value(host_record, remote_ts, 0)
|
|
|
|
-- Find the list of subscribers for the given topic.
-- Every neighbour of the topic is queried in parallel and the replies are
-- collected into a stream. The first two replies in that stream are merged
-- with AquaDHT.merge_two and the merged records are returned.
func findSubscribers(topic: string) -> []Record:
  on HOST_PEER_ID:
    neighbours <- getNeighbours(topic)
    replies: *GetValuesResult
    for node <- neighbours par:
      on node:
        try:
          now <- Peer.timestamp_sec()
          replies <- AquaDHT.get_values(topic, now)
    -- indexing the stream at `!` and `!1` requires two replies to be present
    merged <- AquaDHT.merge_two(replies!.result, replies!1.result)
  <- merged.result
|
|
|
|
-- Run the given callback on every subscriber of a topic.
-- Subscribers are looked up with findSubscribers; for each record, in
-- parallel, execution moves to the subscriber's peer (via its relay) and
-- `call` is invoked there with that record.
-- Note that another Aqua function can be passed as `call`.
func executeOnSubscribers(topic: string, call: Record -> ()):
  records <- findSubscribers(topic)
  for sub <- records par:
    on sub.peer_id via sub.relay_id:
      call(sub)
|