-- Mirror of https://github.com/fluencelabs/registry.git
-- (synced 2025-04-25 02:02:14 +00:00; 199 lines, 6.3 KiB, plaintext)
module Registry.Routing declares *

import "registry.aqua"
import "registry-api.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"

-- RouteId is a plain string alias; values are produced by get_route_id
alias RouteId: string

-- Compute the route id for the given label and peer id
-- by delegating to Registry.get_route_id
func get_route_id(label: string, peer_id: string) -> RouteId:
  route_id <- Registry.get_route_id(label, peer_id)
  <- route_id

-- Get peers closest to the route_id's hash in Kademlia network
-- These peers are expected to store list of providers for this route
func getNeighbours(route_id: RouteId) -> []PeerId:
  -- The route id is converted to base58 before being used as a Kademlia key
  k <- Op.string_to_b58(route_id)
  -- Both optional arguments are left unset (nil)
  nodes <- Kademlia.neighborhood(k, nil, nil)
  <- nodes

-- If this peer has set a node as a provider for the route,
-- this call will prevent the provider record from being renewed,
-- so that eventually it will disappear from the providers list
func removeFromRoute(route_id: RouteId):
  on HOST_PEER_ID:
    -- The timestamp is taken on the host, where the record is cleared
    t <- Peer.timestamp_sec()
    Registry.clear_host_record(route_id, t)

-- Create a route: register it on the closest peers
-- Returns the id computed by get_route_id for `label` and INIT_PEER_ID
func createRoute(label: string) -> RouteId:
  -- Timestamp and signature are produced once and reused for every neighbour
  t <- Peer.timestamp_sec()
  signature <- get_route_signature(label, t)

  on HOST_PEER_ID:
    route_id <- get_route_id(label, INIT_PEER_ID)
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          -- NOTE(review): the meaning of the trailing `false` is defined by
          -- register_route in registry-api.aqua - confirm there
          register_route(label, t, signature, false)
  <- route_id

-- Create a route and register for it
-- INIT_PEER_ID (current client) will become a provider
-- Does not wait for any of the writes to complete (see the Blocking variant)
func createRouteAndRegister(label: string, value: string, service_id: ?string) -> RouteId:
  -- The record carries the client's relay (HOST_PEER_ID) as relay_id
  relay_id: ?string
  relay_id <<- HOST_PEER_ID

  t <- Peer.timestamp_sec()
  route_signature <- get_route_signature(label, t)
  on HOST_PEER_ID:
    route_id <- get_route_id(label, INIT_PEER_ID)
  -- The record signature needs the route id, so it is requested after it
  record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)

  on HOST_PEER_ID:
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          -- Register the route first, then put the record on the same neighbour
          register_route(label, t, route_signature, false)
          put_record(route_id, value, relay_id, service_id, t, record_signature)
  <- route_id

-- Create a route and register for it
-- INIT_PEER_ID (current client) will become a provider
-- In contrast with non-blocking version, waits for successful writes:
-- execution continues only once `results[ack]` is available, and only
-- successful put_record results are pushed to `results`
-- `progress` is called with the neighbour's peer id after each successful write
-- NOTE(review): indexing by `ack` means more than `ack` successes are awaited,
-- and there is no timeout - confirm this is the intended contract
func createRouteAndRegisterBlocking(
  label: string, value: string,
  service_id: ?string,
  progress: string -> (),
  ack: i16
) -> RouteId:
  -- The record carries the client's relay (HOST_PEER_ID) as relay_id
  relay_id: ?string
  relay_id <<- HOST_PEER_ID

  t <- Peer.timestamp_sec()
  route_signature <- get_route_signature(label, t)
  on HOST_PEER_ID:
    route_id <- get_route_id(label, INIT_PEER_ID)
  record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)

  results: *DhtResult
  on HOST_PEER_ID:
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          register_route(label, t, route_signature, false)
          result <- put_record(route_id, value, relay_id, service_id, t, record_signature)
          -- Only successful writes count towards `ack`
          if result.success:
            results <<- result
            progress(n)
  join results[ack]
  <- route_id

-- Create a route and make the given node a provider for it
-- Does not wait for propagation to the neighbours to complete
func createRouteAndRegisterNode(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> RouteId:
  t <- Peer.timestamp_sec()
  route_signature <- get_route_signature(label, t)
  on HOST_PEER_ID:
    route_id <- get_route_id(label, INIT_PEER_ID)

  -- relay_id is nil for host records
  record_signature <- get_host_record_signature(provider_node_id, route_id, value, nil, service_id, t)

  on provider_node_id:
    -- The provider node registers the route and stores the host record itself
    register_route(label, t, route_signature, false)
    r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          -- Each neighbour gets the route and the result of put_host_record
          register_route(label, t, route_signature, false)
          propagate_host_record(r)
  <- route_id

-- Create a route and make the given node a provider for it
-- In contrast with non-blocking version, waits for successful propagation:
-- execution continues only once `results[ack]` is available, and only
-- successful propagate_host_record results are pushed to `results`
-- `progress` is called with the neighbour's peer id after each success
-- NOTE(review): indexing by `ack` means more than `ack` successes are awaited,
-- and there is no timeout - confirm this is the intended contract
func createRouteAndRegisterNodeBlocking(
  provider_node_id: PeerId, label: string,
  value: string, service_id: ?string,
  progress: string -> (),
  ack: i16
) -> RouteId:
  t <- Peer.timestamp_sec()
  route_signature <- get_route_signature(label, t)
  on HOST_PEER_ID:
    route_id <- get_route_id(label, INIT_PEER_ID)

  -- relay_id is nil for host records
  record_signature <- get_host_record_signature(provider_node_id, route_id, value, nil, service_id, t)

  results: *DhtResult
  on provider_node_id:
    -- The provider node registers the route and stores the host record itself
    register_route(label, t, route_signature, false)
    r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          register_route(label, t, route_signature, false)
          res <- propagate_host_record(r)
          -- Only successful propagations count towards `ack`,
          -- matching createRouteAndRegisterBlocking
          if res.success:
            results <<- res
            progress(n)

  join results[ack]
  <- route_id

-- Register for a route
-- Note: route must be already initiated
-- INIT_PEER_ID signs the record; HOST_PEER_ID is stored as its relay_id
func registerForRoute(route_id: RouteId, value: string, service_id: ?string):
  relay_id: ?string
  relay_id <<- HOST_PEER_ID

  t <- Peer.timestamp_sec()
  record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)

  on HOST_PEER_ID:
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          put_record(route_id, value, relay_id, service_id, t, record_signature)

-- Register a node to the given route
-- Note: route must be already initiated
func registerForRouteNode(provider_node_id: PeerId, route_id: RouteId, value: string, service_id: ?string):
  t <- Peer.timestamp_sec()
  -- relay_id is nil for host records
  record_signature <- get_host_record_signature(provider_node_id, route_id, value, nil, service_id, t)

  on provider_node_id:
    -- Store the host record on the provider, then propagate the result
    r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
    nodes <- getNeighbours(route_id)
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          propagate_host_record(r)

-- Find the list of records for the given route_id
-- Queries the route's neighbours in parallel and merges their answers
-- with Registry.merge on HOST_PEER_ID
-- NOTE(review): `join res[ack]` waits until index `ack` of the stream exists,
-- i.e. more than `ack` neighbours have answered; the timeout below is
-- commented out, so nothing bounds the wait - confirm this is intended
func resolveRoute(route_id: RouteId, ack: i16) -> []Record:
  on HOST_PEER_ID:
    nodes <- getNeighbours(route_id)
    -- One []Record per neighbour that answered
    res: *[]Record
    for n <- nodes par:
      on n:
        -- try: a failure on one neighbour does not abort the other branches
        try:
          -- The timestamp is taken on the neighbour being queried
          t <- Peer.timestamp_sec()
          get_result <- Registry.get_records(route_id, t)
          res <<- get_result.result

    join res[ack]
    --par Peer.timeout(100000000, "timeout")
    result <- Registry.merge(res)
  <- result.result

-- Execute the given code on providers
-- Note that you can provide another Aqua function as an argument to this one
-- Providers are the records returned by resolveRoute(route_id, ack)
func executeOnRoute(route_id: RouteId, ack: i16, call: Record -> ()):
  providers <- resolveRoute(route_id, ack)
  for r <- providers par:
    -- Each provider is reached through the relay stored in its record
    on r.peer_id via r.relay_id:
      call(r)