registry/aqua/routing.aqua

166 lines
5.6 KiB
Plaintext
Raw Normal View History

2022-02-24 16:37:58 +03:00
module Registry.Routing declares *
import "registry.aqua"
import "registry-api.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"
alias RouteId: string
func get_route_id(label: string, peer_id: string) -> RouteId:
route_id <- Registry.get_route_id(label, peer_id)
2022-02-24 16:37:58 +03:00
<- route_id
-- Get peers closest to the label's hash in Kademlia network
-- These peers are expected to store list of subscribers of this label
func getNeighbours(route_id: string) -> []PeerId:
k <- Op.string_to_b58(route_id)
nodes <- Kademlia.neighborhood(k, nil, nil)
<- nodes
-- If this peer have set node_id as a subscriber for label,
-- this call will prevent subscriber from re-subscribing
-- so that eventually it will disappear from the subscribers list
func removeFromRoute(route_id: string):
on HOST_PEER_ID:
t <- Peer.timestamp_sec()
Registry.clear_host_record(route_id, t)
-- Create a route: register it on the closest peers
func createRoute(label: string) -> RouteId:
t <- Peer.timestamp_sec()
signature <- get_route_signature(label, t)
2022-02-24 16:37:58 +03:00
on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%)
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
result <- register_route(label, t, signature, false)
2022-02-24 16:37:58 +03:00
<- route_id
-- Create a label and subscribe to it
-- %init_peer_id% (current client) will become a subscriber
func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, service_id: ?string) -> string:
t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
2022-02-24 16:37:58 +03:00
on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)
on HOST_PEER_ID:
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
register_route(label, t, route_signature, false)
2022-02-24 16:37:58 +03:00
put_record(route_id, value, relay_id, service_id, t, record_signature)
<- route_id
-- Create a label and subscribe to it
-- %init_peer_id% (current client) will become a subscriber
-- In contrast with non-blocking version, waits for at least a single write to succeed
func createRouteAndRegisterBlocking(
label: string, value: string,
relay_id: ?PeerId, service_id: ?string,
progress: string -> (),
ack: i16
) -> string:
t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
2022-02-24 16:37:58 +03:00
on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)
results: *DhtResult
on HOST_PEER_ID:
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
res1 <- register_route(label, t, route_signature, false)
2022-02-24 16:37:58 +03:00
result <- put_record(route_id, value, relay_id, service_id, t, record_signature)
if result.success:
results <<- result
progress(n)
join results[ack]
<- route_id
-- Create a label and make the given node a subscriber to it
func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string:
t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
2022-02-24 16:37:58 +03:00
on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
on subscriber_node_id:
register_route(label, t, route_signature, false)
2022-02-24 16:37:58 +03:00
r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
register_route(label, t, route_signature, false)
2022-02-24 16:37:58 +03:00
propagate_host_record(r)
<- route_id
-- Subscribe to a label
-- Note: label must be already initiated
func registerForRoute(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string):
t <- Peer.timestamp_sec()
record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)
on HOST_PEER_ID:
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
put_record(route_id, value, relay_id, service_id, t, record_signature)
-- Subscribe a node to the given label
-- Note: label must be already initiated
func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string):
t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
2022-02-24 16:37:58 +03:00
on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
on subscriber_node_id:
r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
register_route(label, t, route_signature, false)
2022-02-24 16:37:58 +03:00
propagate_host_record(r)
-- Find the list of record for the given route_id
func resolveRoute(route_id: string, ack: i16) -> []Record:
on HOST_PEER_ID:
nodes <- getNeighbours(route_id)
res: *[]Record
for n <- nodes par:
on n:
try:
t <- Peer.timestamp_sec()
get_result <- Registry.get_records(route_id, t)
res <<- get_result.result
join res[ack]
--par Peer.timeout(100000000, "timeout")
result <- Registry.merge(res)
<- result.result
-- Execute the given code on subscribers
-- Note that you can provide another Aqua function as an argument to this one
func executeOnRoute(route_id: string, ack: i16, call: Record -> ()):
subs <- resolveRoute(route_id, ack)
for r <- subs par:
on r.peer_id via r.relay_id:
call(r)