mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-24 17:52:14 +00:00
Pubsub aqua (#19)
* add branch to ci to test release * add some comments * release! * delete release branch * DHT api in terms of PubSub Co-authored-by: DieMyst <dmitry.shakhtarin@fluence.ai>
This commit is contained in:
parent
a24a530be1
commit
19d4a28f61
3611
npm/package-lock.json
generated
3611
npm/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@ -33,7 +33,7 @@
|
||||
},
|
||||
"homepage": "https://github.com/fluencelabs/aqua-dht#readme",
|
||||
"devDependencies": {
|
||||
"@fluencelabs/aqua-cli": "^0.1.4-136",
|
||||
"@fluencelabs/aqua-cli": "^0.1.5-140",
|
||||
"typescript": "^3.9.5"
|
||||
}
|
||||
}
|
||||
|
100
npm/pubsub.aqua
Normal file
100
npm/pubsub.aqua
Normal file
@ -0,0 +1,100 @@
|
||||
import "dht.aqua"
|
||||
import "@fluencelabs/aqua-lib/builtin.aqua"
|
||||
|
||||
-- Get peers closest to the topic's hash in Kademlia network
|
||||
-- These peers are expected to store list of subscribers of this topic
|
||||
func getNeighbours(node_id: PeerId, topic: string) -> []PeerId:
|
||||
on node_id:
|
||||
k <- Op.string_to_b58(topic)
|
||||
nodes <- Kademlia.neighborhood(k, false)
|
||||
<- nodes
|
||||
|
||||
-- If this peer have set node_id as a subscriber for topic,
|
||||
-- this call will prevent subscriber from re-subscribing
|
||||
-- so that eventually it will disappear from the subscribers list
|
||||
func removeSubscriber(node_id: PeerId, topic: string):
|
||||
on node_id:
|
||||
t <- Peer.timestamp_sec()
|
||||
AquaDHT.clear_host_value(topic, t)
|
||||
|
||||
-- Create a topic: register it on the closest peers
|
||||
-- node_id is a peer with Kademlia access to start with
|
||||
func initTopic(node_id: PeerId, topic: string):
|
||||
nodes <- getNeighbours(node_id, topic)
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
t <- Peer.timestamp_sec()
|
||||
AquaDHT.register_key(topic, t, false, 0)
|
||||
|
||||
-- Create a topic and subscribe to it
|
||||
-- %init_peer_id% (current client) will become a subscriber
|
||||
func initTopicAndSubscribe(node_id: PeerId, topic: string, value: string, relay_id: ?PeerId, service_id: ?string):
|
||||
nodes <- getNeighbours(node_id, topic)
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
t <- Peer.timestamp_sec()
|
||||
AquaDHT.register_key(topic, t, false, 0)
|
||||
AquaDHT.put_value(topic, value, t, relay_id, service_id, 0)
|
||||
|
||||
-- Create a topic and make the given node a subscriber to it
|
||||
func initTopicAndSubscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string):
|
||||
nil: ?string
|
||||
on subscriber_node_id:
|
||||
t <- Peer.timestamp_sec()
|
||||
AquaDHT.register_key(topic, t, false, 0)
|
||||
r <- AquaDHT.put_host_value(topic, value, t, nil, service_id, 0)
|
||||
nodes <- getNeighbours(subscriber_node_id, topic)
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
tt <- Peer.timestamp_sec()
|
||||
AquaDHT.register_key(topic, tt, false, 0)
|
||||
AquaDHT.propagate_host_value(r, tt, 0)
|
||||
|
||||
-- Subscribe to a topic
|
||||
-- Note: topic must be already initiated
|
||||
func subscribe(node_id: PeerId, topic: string, value: string, relay_id: ?PeerId, service_id: ?string):
|
||||
nodes <- getNeighbours(node_id, topic)
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
t <- Peer.timestamp_sec()
|
||||
AquaDHT.put_value(topic, value, t, relay_id, service_id, 0)
|
||||
|
||||
-- Subscribe a node to the given topic
|
||||
-- Note: topic must be already initiated
|
||||
func subscribeNode(subscriber_node_id: PeerId, topic: string, value: string, service_id: ?string):
|
||||
on subscriber_node_id:
|
||||
nil: ?string
|
||||
t <- Peer.timestamp_sec()
|
||||
r <- AquaDHT.put_host_value(topic, value, t, nil, service_id, 0)
|
||||
nodes <- getNeighbours(subscriber_node_id, topic)
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
tt <- Peer.timestamp_sec()
|
||||
AquaDHT.register_key(topic, tt, false, 0)
|
||||
AquaDHT.propagate_host_value(r, tt, 0)
|
||||
|
||||
-- Find the list of subscribers for the given topic
|
||||
func findSubscribers(node_id: PeerId, topic: string) -> []Record:
|
||||
nodes <- getNeighbours(node_id, topic)
|
||||
res: *GetValuesResult
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
t <- Peer.timestamp_sec()
|
||||
res <- AquaDHT.get_values(topic, t)
|
||||
on node_id:
|
||||
v <- AquaDHT.merge_two(res!.result, res!1.result)
|
||||
<- v.result
|
||||
|
||||
-- Execute the given code on subscribers
|
||||
-- Note that you can provide another Aqua function as an argument to this one
|
||||
func executeOnSubscribers(node_id: PeerId, topic: string, call: Record -> ()):
|
||||
subs <- findSubscribers(node_id, topic)
|
||||
for r <- subs par:
|
||||
on r.peer_id via r.relay_id:
|
||||
call(r)
|
Loading…
x
Reference in New Issue
Block a user