registry/aqua/resources-api.aqua
2023-12-26 14:10:14 +01:00

211 lines
6.3 KiB
Plaintext

aqua Registry.ResourcesAPI declares *
import "registry-service.aqua"
import "registry-api.aqua"
import "misc.aqua"
import "constants.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"
-- Identifier of a resource; a plain string
alias ResourceId: string
-- A resource is represented by the `Key` type (not declared in the visible part of this file; presumably imported)
alias Resource: Key
-- Error message text
alias Error: string
-- Fetch a resource by its id.
-- Runs `getResourceHelper` on HOST_PEER_ID and hands its two outputs back
-- unchanged: the optional resource and the stream of errors.
func getResource(resource_id: ResourceId) -> ?Resource, *Error:
  on HOST_PEER_ID:
    resource, errors <- getResourceHelper(resource_id)
  <- resource, errors
-- Compute the resource id for the given label and owner peer id.
-- The id is obtained from `Registry.get_key_id`, called on HOST_PEER_ID.
func getResourceId(label: string, peer_id: string) -> ResourceId:
  on HOST_PEER_ID:
    key_id <- Registry.get_key_id(label, peer_id)
  <- key_id
-- Create a resource: register it on the closest peers.
-- Returns the id of the created resource (empty when creation failed)
-- together with every error collected along the way.
func createResource(label: string) -> ?ResourceId, *Error:
  timestamp <- Peer.timestamp_sec()

  error: *Error
  resource_id: *ResourceId

  on HOST_PEER_ID:
    key_sig <- getKeySignature(label, timestamp)
    if key_sig.success == false:
      error <<- key_sig.error!
    else:
      signature = key_sig.signature!
      key_id <- Registry.get_key_id(label, INIT_PEER_ID)
      neighbors <- getNeighbors(key_id)

      -- one `true` is pushed for every node that accepted the key
      confirmed: *bool
      for node <- neighbors par:
        on node:
          -- a failure while talking to this node is caught by `try`
          try:
            reg_result <- registerKey(label, timestamp, signature)
            if reg_result.success == false:
              error <<- reg_result.error
            else:
              confirmed <<- true

      -- presumably waits for INITIAL_REPLICATION_FACTOR confirmations or DEFAULT_TIMEOUT
      replicated <- wait(confirmed, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)
      if replicated == false:
        error <<- "resource wasn't created: timeout exceeded"
      else:
        resource_id <<- key_id

  <- resource_id, error
-- Note: resource must be already created
-- Register a service record for `resource_id`.
--   resource_id: id of the resource the record is attached to
--   value:       value passed to `getRecordMetadata`
--   peer_id:     peer that provides the service; when it is not INIT_PEER_ID the
--                key is republished and the record is put on that peer as well
--   service_id:  optional service id passed to `getRecordMetadata`
-- Returns the first value written to the `success` stream and all collected errors.
-- The "timeout exceeded" error is reported only when the wait for replication
-- itself fails, not for earlier failures (metadata, signature, resource lookup).
func registerService(resource_id: ResourceId, value: string, peer_id: PeerId, service_id: ?string) -> bool, *Error:
  -- the relay is recorded only when the provider is the initiating peer
  relay_id: *string
  if peer_id == INIT_PEER_ID:
    relay_id <<- HOST_PEER_ID

  success: *bool
  error: *Error

  on HOST_PEER_ID:
    metadata, err <- getRecordMetadata(resource_id, value, peer_id, relay_id, service_id, nil)
    if metadata == nil:
      success <<- false
      error <<- err!
    else:
      t <- Peer.timestamp_sec()
      sig_result = getRecordSignature(metadata!, t)
      if sig_result.success == false:
        error <<- sig_result.error!
        success <<- false
      else:
        key, error_get <- getResourceHelper(resource_id)
        if key == nil:
          appendErrors(error, error_get)
          success <<- false
        else:
          -- the provider is a remote peer: store the key and the record there first
          if peer_id != INIT_PEER_ID:
            on peer_id via HOST_PEER_ID:
              republish_result <- republishKey(key!)
              if republish_result.success == false:
                error <<- republish_result.error
              else:
                p_res <- putRecord(metadata!, t, sig_result.signature!)
                if p_res.success == false:
                  error <<- p_res.error
                  success <<- false

          nodes <- getNeighbors(resource_id)
          -- one `true` is pushed for every node that stored the record
          successful: *bool
          for n <- nodes par:
            on n:
              -- a failure while talking to this node is caught by `try`
              try:
                republish_res <- republishKey(key!)
                if republish_res.success == false:
                  error <<- republish_res.error
                else:
                  put_res <- putRecord(metadata!, t, sig_result.signature!)
                  if put_res.success:
                    successful <<- true
                  else:
                    error <<- put_res.error

          replicated <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)
          -- report a timeout only when the wait itself failed
          if replicated == false:
            error <<- "service hasn't registered: timeout exceeded"
          success <<- replicated

  -- the first value in `success` decides the overall outcome
  succ = success!
  <- succ, error
-- Unregister the service provided by `peer_id` from `resource_id` by adding a tombstone.
--   resource_id: id of the resource the record belongs to
--   peer_id:     peer whose record is removed; when it is not INIT_PEER_ID the
--                key is republished and the tombstone is added on that peer as well
-- Returns the first value written to the `success` stream and all collected errors.
-- The "timeout exceeded" error is reported only when the wait for replication
-- itself fails, not for earlier failures (signature, resource lookup).
func unregisterService(resource_id: ResourceId, peer_id: PeerId) -> bool, *Error:
  success: *bool
  error: *Error

  on HOST_PEER_ID:
    t <- Peer.timestamp_sec()
    sig_result = getTombstoneSignature(resource_id, peer_id, t, nil)
    if sig_result.success == false:
      error <<- sig_result.error!
      success <<- false
    else:
      key, error_get <- getResourceHelper(resource_id)
      if key == nil:
        appendErrors(error, error_get)
        success <<- false
      else:
        -- the provider is a remote peer: add the tombstone there first
        -- NOTE(review): registerService reaches the peer with `via HOST_PEER_ID`; confirm the difference is intended
        if peer_id != INIT_PEER_ID:
          on peer_id:
            republish_result <- republishKey(key!)
            if republish_result.success == false:
              error <<- republish_result.error
            else:
              res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
              if res.success == false:
                error <<- res.error
                success <<- false

        nodes <- getNeighbors(resource_id)
        -- one `true` is pushed for every node that stored the tombstone
        successful: *bool
        for n <- nodes par:
          on n:
            -- a failure while talking to this node is caught by `try`
            try:
              republish_res <- republishKey(key!)
              if republish_res.success == false:
                error <<- republish_res.error
              else:
                add_res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
                if add_res.success:
                  successful <<- true
                else:
                  error <<- add_res.error

        replicated <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)
        -- report a timeout only when the wait itself failed
        if replicated == false:
          error <<- "unregisterService failed: timeout exceeded"
        success <<- replicated

  -- the first value in `success` decides the overall outcome
  succ = success!
  <- succ, error
-- Collect the records of a resource from the nodes returned by `getNeighbors`.
--   resource_id: id of the resource to resolve
--   ack:         count passed to `wait` (presumably how many nodes must answer)
-- Returns the merged records (empty on failure) and all collected errors.
func resolveResource(resource_id: ResourceId, ack: i16) -> ?[]Record, *Error:
  on HOST_PEER_ID:
    neighbors <- getNeighbors(resource_id)

    error: *Error
    result: *[]Record
    -- record lists as returned by the individual nodes
    records: *[]Record
    -- one `true` is pushed for every node that answered successfully
    answered: *bool

    for node <- neighbors par:
      on node:
        -- a failure while talking to this node is caught by `try`
        try:
          now <- Peer.timestamp_sec()
          fetched <- Registry.get_records(resource_id, now)
          if fetched.success == false:
            error <<- fetched.error
          else:
            records <<- fetched.result
            answered <<- true

    enough <- wait(answered, ack, DEFAULT_TIMEOUT)
    if enough == false:
      error <<- "timeout exceeded"
    else:
      -- combine the per-node lists into a single list
      merge_result <- Registry.merge(records)
      if merge_result.success == false:
        error <<- merge_result.error
      else:
        result <<- merge_result.result

  <- result, error
-- Execute the given call on providers
-- Note that you can provide another Aqua function as an argument to this one
-- Resolves the resource first; returns false with the resolution errors when
-- nothing was resolved, otherwise runs `call` for each record in parallel on
-- the record's peer (via its relay) and returns true.
func executeOnResource(resource_id: ResourceId, ack: i16, call: Record -> ()) -> bool, *Error:
  done: *bool
  providers, errors <- resolveResource(resource_id, ack)

  if providers == nil:
    done <<- false
  else:
    for record <- providers! par:
      on record.metadata.peer_id via record.metadata.relay_id:
        call(record)
    done <<- true

  <- done!, errors