registry/aqua/resources-api.aqua
2022-09-23 22:58:12 +04:00

227 lines
6.7 KiB
Plaintext

module Registry.ResourcesAPI declares *
import "registry-service.aqua"
import "registry-api.aqua"
import "misc.aqua"
import "constants.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"
-- Identifier of a resource, as returned by getResourceId; a plain string.
alias ResourceId: string
-- A resource is represented by a Key.
-- Key is not declared in this file (presumably it comes from registry-service.aqua; confirm).
alias Resource: Key
-- Error message carried in the error streams returned by the functions below.
alias Error: string
-- Returns the id that Registry.get_key_id yields for the given
-- label and peer id.
func getResourceId(label: string, peer_id: string) -> ResourceId:
    id <- Registry.get_key_id(label, peer_id)
    <- id
-- Looks up the resource (key) stored under `resource_id`.
--
-- Every peer returned by getNeighbours is asked, in parallel, for the key
-- metadata. Successful answers are collected and, once `wait` reports
-- success for CONSISTENCY_LEVEL / DEFAULT_TIMEOUT, merged with
-- Registry.merge_keys.
--
-- Returns the merged resource (empty when nothing could be obtained)
-- together with the stream of error messages gathered along the way.
func getResource(resource_id: ResourceId) -> ?Resource, *Error:
    neighbours <- getNeighbours(resource_id)
    found: ?Resource
    errors: *Error

    keys: *Key
    acks: *bool
    for node <- neighbours par:
        on node:
            try:
                meta <- Registry.get_key_metadata(resource_id)
                if meta.success:
                    keys <<- meta.key
                    acks <<- true
                else:
                    -- Report the failure as "<error> on <peer id>"
                    prefix <- Op.concat_strings(meta.error, " on ")
                    errors <- Op.concat_strings(prefix, node)

    enough <- wait(acks, CONSISTENCY_LEVEL, DEFAULT_TIMEOUT)
    if enough == false:
        errors <<- "resource not found: timeout exceeded"
    else:
        merged <- Registry.merge_keys(keys)
        if merged.success:
            found <<- merged.key
        else:
            errors <<- merged.error
    <- found, errors
-- Creates a resource named `label` for INIT_PEER_ID.
--
-- The key is signed via getKeySignature, the resource id is derived with
-- getResourceId, and the key is then registered in parallel on every peer
-- returned by getNeighbours for that id.
--
-- Returns the resource id (empty on failure) and the collected errors.
func createResource(label: string) -> ?ResourceId, *Error:
    now <- Peer.timestamp_sec()

    created_id: ?ResourceId
    errors: *Error
    on HOST_PEER_ID:
        sig <- getKeySignature(label, now)
        if sig.success == false:
            errors <<- sig.error!
        else:
            sig_value = sig.signature!
            rid <- getResourceId(label, INIT_PEER_ID)
            neighbours <- getNeighbours(rid)

            acks: *bool
            for node <- neighbours par:
                on node:
                    try:
                        reg <- registerKey(label, now, sig_value)
                        if reg.success:
                            acks <<- true
                        else:
                            errors <<- reg.error

            -- The id is only reported when `wait` confirms the registrations
            enough <- wait(acks, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
            if enough == false:
                errors <<- "resource wasn't created: timeout exceeded"
            else:
                created_id <<- rid

    <- created_id, errors
-- Registers a service record with the given `value` for `peer_id`
-- (and optional `service_id`) under the resource `resource_id`.
-- Note: the resource must already exist; it is fetched with getResource
-- and registration fails if it cannot be found.
--
-- Returns a success flag and the stream of collected error messages.
func registerServiceRecord(resource_id: ResourceId, value: string, peer_id: PeerId, service_id: ?string) -> bool, *Error:
    -- The relay is set only when the record is registered for the
    -- initiating peer itself; otherwise relay_id stays empty.
    relay_id: ?string
    if peer_id == INIT_PEER_ID:
        relay_id <<- HOST_PEER_ID

    success: *bool
    error: *Error

    on HOST_PEER_ID:
        metadata, err <- getRecordMetadata(resource_id, value, peer_id, relay_id, service_id, nil)
        if metadata == nil:
            success <<- false
            error <<- err!
        else:
            t <- Peer.timestamp_sec()
            sig_result = getRecordSignature(metadata!, t)
            if sig_result.success == false:
                error <<- sig_result.error!
                success <<- false
            else:
                key, error_get <- getResource(resource_id)
                if key == nil:
                    appendErrors(error, error_get)
                    success <<- false
                else:
                    -- For a remote provider, first republish the key and
                    -- store the record on the provider peer itself.
                    if peer_id != INIT_PEER_ID:
                        on peer_id via HOST_PEER_ID:
                            republish_result <- republishKey(key!)
                            if republish_result.success == false:
                                error <<- republish_result.error
                            else:
                                p_res <- putRecord(metadata!, t, sig_result.signature!)
                                if p_res.success == false:
                                    error <<- p_res.error
                                    success <<- false

                    -- Then replicate key and record, in parallel, to the
                    -- peers returned by getNeighbours for this resource.
                    nodes <- getNeighbours(resource_id)
                    successful: *bool
                    for n <- nodes par:
                        on n:
                            try:
                                republish_res <- republishKey(key!)
                                if republish_res.success == false:
                                    error <<- republish_res.error
                                else:
                                    put_res <- putRecord(metadata!, t, sig_result.signature!)
                                    if put_res.success:
                                        successful <<- true
                                    else:
                                        error <<- put_res.error
                    success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)

    -- Reads a single value out of the `success` stream
    -- (presumably the first one written; confirm against Aqua's `!` semantics).
    succ = success!
    -- NOTE(review): this message is appended whenever succ is false,
    -- including failures that are not timeouts (e.g. a signature error).
    if succ == false:
        error <<- "service hasn't registered: timeout exceeded"

    <- succ, error
-- Unregisters the service record of `peer_id` from the resource
-- `resource_id` by adding a signed tombstone.
-- The resource is fetched with getResource; the call fails if it is not found.
--
-- Returns a success flag and the stream of collected error messages.
func unregisterService(resource_id: ResourceId, peer_id: PeerId) -> bool, *Error:
    success: *bool
    error: *Error

    on HOST_PEER_ID:
        t <- Peer.timestamp_sec()
        sig_result = getTombstoneSignature(resource_id, peer_id, t, nil)
        if sig_result.success == false:
            error <<- sig_result.error!
            success <<- false
        else:
            key, error_get <- getResource(resource_id)
            if key == nil:
                appendErrors(error, error_get)
                success <<- false
            else:
                -- For a remote provider, first republish the key and
                -- add the tombstone on the provider peer itself.
                if peer_id != INIT_PEER_ID:
                    on peer_id:
                        republish_result <- republishKey(key!)
                        if republish_result.success == false:
                            error <<- republish_result.error
                        else:
                            res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
                            if res.success == false:
                                error <<- res.error
                                success <<- false

                -- Then replicate key and tombstone, in parallel, to the
                -- peers returned by getNeighbours for this resource.
                nodes <- getNeighbours(resource_id)
                successful: *bool
                for n <- nodes par:
                    on n:
                        try:
                            republish_res <- republishKey(key!)
                            if republish_res.success == false:
                                error <<- republish_res.error
                            else:
                                add_res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
                                if add_res.success:
                                    successful <<- true
                                else:
                                    error <<- add_res.error
                success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)

    -- Reads a single value out of the `success` stream
    -- (presumably the first one written; confirm against Aqua's `!` semantics).
    succ = success!
    -- NOTE(review): this message is appended whenever succ is false,
    -- including failures that are not timeouts (e.g. a signature error).
    if succ == false:
        error <<- "unregisterService failed: timeout exceeded"

    <- succ, error
-- Collects the records registered for `resource_id`.
--
-- Each peer returned by getNeighbours is queried in parallel with
-- Registry.get_records; `ack` is the count handed to `wait` together with
-- DEFAULT_TIMEOUT. Whatever was gathered is merged with Registry.merge,
-- even when `wait` reports failure.
--
-- Returns the merged records and the stream of collected error messages.
func resolveResource(resource_id: ResourceId, ack: i16) -> []Record, *Error:
    on HOST_PEER_ID:
        neighbours <- getNeighbours(resource_id)
        record_sets: *[]Record
        errors: *Error
        acks: *bool
        for node <- neighbours par:
            on node:
                try:
                    now <- Peer.timestamp_sec()
                    fetched <- Registry.get_records(resource_id, now)
                    if fetched.success:
                        record_sets <<- fetched.result
                        acks <<- true
                    else:
                        errors <<- fetched.error

        enough <- wait(acks, ack, DEFAULT_TIMEOUT)
        if enough == false:
            errors <<- "timeout exceeded"

        merged <- Registry.merge(record_sets)
        if merged.success == false:
            errors <<- merged.error
    <- merged.result, errors
-- Runs `call` for every record resolved for `resource_id`.
-- Each invocation happens in parallel on the record's peer, reached via
-- the record's relay. `call` may be any Aqua function of type Record -> ().
-- Returns the errors reported by resolveResource.
func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error:
    records, errors <- resolveResource(resource_id, ack)
    for rec <- records par:
        on rec.metadata.peer_id via rec.metadata.relay_id:
            call(rec)
    <- errors