registry/aqua/resources-api.aqua

359 lines
11 KiB
Plaintext
Raw Normal View History

module Registry.ResourcesAPI declares *
import "registry-service.aqua"
import "registry-api.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"
-- Identifier of a resource; produced by Registry.get_key_id (see getResourceId)
alias ResourceId: string
-- Error message; functions in this module collect these into *Error streams
alias Error: string
-- Number of successful per-node writes the create/register functions wait for
const REPLICATION_FACTOR = 1
-- Number of successful per-node reads getResource waits for
const CONSISTENCY_LEVEL = 1
-- Timeout handed to Peer.timeout while waiting for acknowledgements
-- NOTE(review): presumably milliseconds — confirm against aqua-lib builtin.aqua
const DEFAULT_TIMEOUT = 6000
-- String identity call on the "op" service.
-- `wait` passes a stream element through it as a workaround for #LNG-63.
service HackOp("op"):
  identity(a: string) -> string
-- Bool identity call on the "op" service.
-- registerProvider and registerNodeProvider pass a stream element through it
-- as a workaround for #LNG-63.
service BoolOp("op"):
  identity(a: bool) -> bool
-- Race two events: `successful` receiving its element at index `len - 1`,
-- and Peer.timeout(timeout, ...) completing.
-- Returns true when the stream filled up first, false otherwise.
func wait(successful: *bool, len: i16, timeout: u16) -> bool:
  outcome: *string

  -- Joins on the last required acknowledgement, then reports "ok"
  awaitAcks = (acks: *bool, sink: *string):
    join acks[len - 1]
    sink <<- "ok"

  awaitAcks(successful, outcome)
  par outcome <- Peer.timeout(timeout, "timeout")

  verdict: *bool
  -- HACK: workaround for #LNG-63
  winner <- HackOp.identity(outcome!)
  if winner == "ok":
    verdict <<- true
  else:
    verdict <<- false

  <- verdict!
-- Push every message of error2 into the error1 stream
func appendErrors(error1: *Error, error2: *Error):
  for msg <- error2:
    error1 <<- msg
-- Ask the Registry service for the id that corresponds to (label, peer_id)
func getResourceId(label: string, peer_id: string) -> ResourceId:
  id <- Registry.get_key_id(label, peer_id)
  <- id
-- Look up the peers closest to the hash of resource_id in the Kademlia network.
-- Those peers are expected to hold the list of providers for this key.
func getNeighbours(resource_id: ResourceId) -> []PeerId:
  key_hash <- Op.string_to_b58(resource_id)
  closest <- Kademlia.neighborhood(key_hash, nil, nil)
  <- closest
-- Fetch the key metadata of a resource from its Kademlia neighbourhood.
-- Every neighbour is queried in parallel; once CONSISTENCY_LEVEL of them have
-- answered successfully (or DEFAULT_TIMEOUT expires) the collected keys are
-- merged with Registry.merge_keys.
-- Returns:
--   ?Key    the merged key, empty on timeout or merge failure
--   *Error  per-node errors (suffixed with " on <peer id>"), timeout and merge errors
func getResource(resource_id: ResourceId) -> ?Key, *Error:
  neighbours <- getNeighbours(resource_id)

  found: ?Key
  errors: *Error
  keys: *Key
  acks: *bool

  for peer <- neighbours par:
    on peer:
      try:
        now <- Peer.timestamp_sec()
        meta <- Registry.get_key_metadata(resource_id, now)
        if meta.success:
          keys <<- meta.key
          acks <<- true
        else:
          -- Tag the error with the peer that produced it
          prefix <- Op.concat_strings(meta.error, " on ")
          errors <- Op.concat_strings(prefix, peer)

  in_time <- wait(acks, CONSISTENCY_LEVEL, DEFAULT_TIMEOUT)
  if in_time == false:
    errors <<- "resource not found: timeout exceeded"
  else:
    merged <- Registry.merge_keys(keys)
    if merged.success:
      found <<- merged.key
    else:
      errors <<- merged.error

  <- found, errors
-- If this peer has set provider_node_id as a provider for the resource,
-- this call prevents the provider record from being renewed,
-- so that it eventually disappears from the providers list.
-- Runs Registry.clear_host_record on provider_node_id itself.
func removeNodeFromProviders(provider_node_id: PeerId, resource_id: ResourceId):
  on provider_node_id:
    now <- Peer.timestamp_sec()
    Registry.clear_host_record(resource_id, now)
-- Create a resource: sign its key and register it on the closest peers.
-- Returns:
--   ?ResourceId  the new id once REPLICATION_FACTOR neighbours have registered
--                the key before DEFAULT_TIMEOUT; empty otherwise
--   *Error       signature, per-node and timeout errors
func createResource(label: string) -> ?ResourceId, *Error:
  now <- Peer.timestamp_sec()

  created: ?ResourceId
  errors: *Error

  on HOST_PEER_ID:
    signed <- getKeySignature(label, now)
    if signed.success == false:
      errors <<- signed.error!
    else:
      key_sig = signed.signature!
      rid <- getResourceId(label, INIT_PEER_ID)
      neighbours <- getNeighbours(rid)

      acks: *bool
      for peer <- neighbours par:
        on peer:
          try:
            registered <- registerKey(label, now, key_sig)
            if registered.success:
              acks <<- true
            else:
              errors <<- registered.error

      in_time <- wait(acks, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
      if in_time == false:
        errors <<- "resource wasn't created: timeout exceeded"
      else:
        created <<- rid

  <- created, errors
-- Create a resource and register a provider record for it in one pass.
-- INIT_PEER_ID (current client) will become a provider; HOST_PEER_ID is
-- recorded as its relay_id.
-- Returns:
--   ?ResourceId  the new id once REPLICATION_FACTOR neighbours have stored both
--                the key and the record before DEFAULT_TIMEOUT; empty otherwise
--   *Error       signature, per-node and timeout errors
func createResourceAndRegisterProvider(label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
  created: ?ResourceId
  errors: *Error

  relay_id: ?string
  relay_id <<- HOST_PEER_ID

  now <- Peer.timestamp_sec()

  on HOST_PEER_ID:
    key_signed <- getKeySignature(label, now)
    if key_signed.success == false:
      errors <<- key_signed.error!
    else:
      rid <- getResourceId(label, INIT_PEER_ID)
      record_signed <- getRecordSignature(rid, value, relay_id, service_id, now)
      if record_signed.success == false:
        errors <<- record_signed.error!
      else:
        key_sig = key_signed.signature!
        record_sig = record_signed.signature!
        neighbours <- getNeighbours(rid)

        acks: *bool
        for peer <- neighbours par:
          on peer:
            try:
              -- The key must exist on the peer before a record can be put
              registered <- registerKey(label, now, key_sig)
              if registered.success:
                stored <- putRecord(rid, value, relay_id, service_id, now, record_sig)
                if stored.success:
                  acks <<- true
                else:
                  errors <<- stored.error
              else:
                errors <<- registered.error

        in_time <- wait(acks, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
        if in_time == false:
          errors <<- "resource hasn't created: timeout exceeded"
        else:
          created <<- rid

  <- created, errors
-- Create a resource and make the given node a provider for it.
-- The key is registered and the host record is put on provider_node_id first;
-- then the key and the host record are propagated to the Kademlia neighbours
-- of the resource id.
-- Returns:
--   ?ResourceId  the new id once REPLICATION_FACTOR neighbours have accepted the
--                propagated host record before DEFAULT_TIMEOUT; empty otherwise
--   *Error       signature, local registration, per-node and timeout errors
func createResourceAndRegisterNodeProvider(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
  resource_id: ?ResourceId
  error: *Error
  successful: *bool
  t <- Peer.timestamp_sec()

  on provider_node_id:
    key_sig_result <- getKeySignature(label, t)
    if key_sig_result.success == false:
      error <<- key_sig_result.error!
    else:
      id <- getResourceId(label, INIT_PEER_ID)
      record_sig_result <- getHostRecordSignature(id, value, nil, service_id, t)
      if record_sig_result.success == false:
        error <<- record_sig_result.error!
      else:
        key_signature = key_sig_result.signature!
        record_signature = record_sig_result.signature!
        -- Register the key locally on the provider node first
        reg_res1 <- registerKey(label, t, key_signature)
        if reg_res1.success == false:
          error <<- reg_res1.error
        else:
          r <- putHostRecord(id, value, nil, service_id, t, record_signature)
          -- Stop early if the local put failed: propagating a failed result
          -- to the neighbours would only end in a timeout that hides the
          -- real error (registerNodeProvider performs the same check)
          if r.success == false:
            error <<- r.error
          else:
            nodes <- getNeighbours(id)
            for n <- nodes par:
              on n:
                try:
                  reg_res <- registerKey(label, t, key_signature)
                  if reg_res.success:
                    prop_res <- propagateHostRecord(r)
                    if prop_res.success:
                      successful <<- true
                    else:
                      error <<- prop_res.error
                  else:
                    error <<- reg_res.error

            success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
            if success == false:
              error <<- "resource hasn't created: timeout exceeded"
            else:
              resource_id <<- id

  <- resource_id, error
-- Register the caller as a provider for a resource, with HOST_PEER_ID
-- recorded as relay_id.
-- Note: resource must be already created
-- The key is republished and the record is put on every Kademlia neighbour of
-- the resource id in parallel.
-- Returns:
--   bool    true once REPLICATION_FACTOR neighbours have stored the record
--           before DEFAULT_TIMEOUT, false otherwise
--   *Error  signature, lookup, per-node and timeout errors
func registerProvider(resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
  success: *bool
  error: *Error
  relay_id: ?string
  relay_id <<- HOST_PEER_ID
  t <- Peer.timestamp_sec()

  on HOST_PEER_ID:
    record_sig_result <- getRecordSignature(resource_id, value, relay_id, service_id, t)
    if record_sig_result.success == false:
      error <<- record_sig_result.error!
      success <<- false
    else:
      record_signature = record_sig_result.signature!
      key, error_get <- getResource(resource_id)
      if key == nil:
        appendErrors(error, error_get)
        success <<- false
      else:
        nodes <- getNeighbours(resource_id)
        successful: *bool
        -- Only genuine failures go into `error`; neighbour peer ids are not
        -- errors and are no longer pushed here
        for n <- nodes par:
          on n:
            try:
              republish_res <- republishKey(key!)
              if republish_res.success == false:
                error <<- republish_res.error
              else:
                put_res <- putRecord(resource_id, value, relay_id, service_id, t, record_signature)
                if put_res.success:
                  successful <<- true
                else:
                  error <<- put_res.error

        success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)

  -- HACK: workaround for #LNG-63
  succ <- BoolOp.identity(success!)
  -- NOTE(review): this message is also appended when the failure was a
  -- signature or lookup error rather than a timeout — confirm whether intended
  if succ == false:
    error <<- "provider hasn't registered: timeout exceeded"

  <- success!, error
-- Register a node as provider to the given resource
-- Note: resource must be already created
-- The key is republished and the host record is put on provider_node_id first;
-- then both are propagated to the Kademlia neighbours of the resource id.
-- Returns:
--   bool    true once REPLICATION_FACTOR neighbours have accepted the
--           propagated host record before DEFAULT_TIMEOUT, false otherwise
--   *Error  signature, lookup, local registration, per-node and timeout errors
func registerNodeProvider(provider_node_id: PeerId, resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
  success: *bool
  error: *Error
  t <- Peer.timestamp_sec()

  on provider_node_id:
    record_sig_result <- getHostRecordSignature(resource_id, value, nil, service_id, t)
    if record_sig_result.success == false:
      error <<- record_sig_result.error!
      success <<- false
    else:
      record_signature = record_sig_result.signature!
      -- NOTE(review): the extent of this `on` block was reconstructed; it is
      -- assumed to cover only the lookup so that the host record below is
      -- put on provider_node_id — confirm against the original layout
      on HOST_PEER_ID:
        key, error_get <- getResource(resource_id)
      if key == nil:
        appendErrors(error, error_get)
        success <<- false
      else:
        republish_result <- republishKey(key!)
        if republish_result.success == false:
          error <<- republish_result.error
          -- Every failure path must push to `success`; otherwise the
          -- `success!` reads below never resolve
          success <<- false
        else:
          r <- putHostRecord(resource_id, value, nil, service_id, t, record_signature)
          if r.success == false:
            error <<- r.error
            success <<- false
          else:
            nodes <- getNeighbours(resource_id)
            successful: *bool
            for n <- nodes par:
              on n:
                try:
                  republish_res <- republishKey(key!)
                  if republish_res.success == false:
                    error <<- republish_res.error
                  else:
                    prop_res <- propagateHostRecord(r)
                    if prop_res.success:
                      successful <<- true
                    else:
                      error <<- prop_res.error

            success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)

  -- HACK: workaround for #LNG-63
  succ <- BoolOp.identity(success!)
  -- NOTE(review): this message is also appended when the failure was not a
  -- timeout (signature, lookup or local put error) — confirm whether intended
  if succ == false:
    error <<- "provider hasn't registered: timeout exceeded"

  <- success!, error
-- Find the list of providers' records for the given resource_id.
-- Every Kademlia neighbour of the resource id is queried in parallel from
-- HOST_PEER_ID; after `ack` successful answers (or DEFAULT_TIMEOUT) whatever
-- was collected is merged with Registry.merge.
-- Returns:
--   []Record  the merged records
--   *Error    per-node, timeout and merge errors
func resolveProviders(resource_id: ResourceId, ack: i16) -> []Record, *Error:
  on HOST_PEER_ID:
    neighbours <- getNeighbours(resource_id)

    batches: *[]Record
    errors: *Error
    acks: *bool

    for peer <- neighbours par:
      on peer:
        try:
          now <- Peer.timestamp_sec()
          fetched <- Registry.get_records(resource_id, now)
          if fetched.success:
            batches <<- fetched.result
            acks <<- true
          else:
            errors <<- fetched.error

    in_time <- wait(acks, ack, DEFAULT_TIMEOUT)
    if in_time == false:
      errors <<- "timeout exceeded"

    -- The merge runs even after a timeout, over whatever has arrived so far
    merged <- Registry.merge(batches)
    if merged.success == false:
      errors <<- merged.error

  <- merged.result, errors
-- Resolve the providers of resource_id and run `call` on each of them in
-- parallel, reaching every provider at its peer_id via its relay_id.
-- Note that you can pass another Aqua function as the `call` argument.
-- Returns the errors reported by resolveProviders.
func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error:
  records, errors <- resolveProviders(resource_id, ack)
  for rec <- records par:
    on rec.peer_id via rec.relay_id:
      call(rec)
  <- errors