mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-24 17:52:14 +00:00
342 lines
10 KiB
Plaintext
342 lines
10 KiB
Plaintext
module Registry.ResourcesAPI declares *
|
|
|
|
import "registry-service.aqua"
|
|
import "registry-api.aqua"
|
|
import "@fluencelabs/aqua-lib/builtin.aqua"
|
|
|
|
alias ResourceId: string
|
|
alias Error: string
|
|
|
|
func appendErrors(error1: *Error, error2: *Error):
|
|
for e <- error2:
|
|
error1 <<- e
|
|
|
|
func getResourceId(label: string, peer_id: string) -> ResourceId:
|
|
resource_id <- Registry.get_key_id(label, peer_id)
|
|
<- resource_id
|
|
|
|
-- Get peers closest to the resource_id's hash in Kademlia network
|
|
-- These peers are expected to store list of providers for this key
|
|
func getNeighbours(resource_id: ResourceId) -> []PeerId:
|
|
k <- Op.string_to_b58(resource_id)
|
|
nodes <- Kademlia.neighborhood(k, nil, nil)
|
|
<- nodes
|
|
|
|
func getResource(resource_id: ResourceId) -> ?Key, *Error:
|
|
nodes <- getNeighbours(resource_id)
|
|
result: ?Key
|
|
error: *Error
|
|
|
|
resources: *Key
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
t <- Peer.timestamp_sec()
|
|
get_result <- Registry.get_key_metadata(resource_id, t)
|
|
if get_result.success:
|
|
resources <<- get_result.key
|
|
else:
|
|
error <<- get_result.error
|
|
|
|
timeout: ?string
|
|
join resources[0]
|
|
par timeout <- Peer.timeout(5000, "resource not found, timeout exceeded")
|
|
|
|
if timeout != nil:
|
|
error <<- timeout!
|
|
|
|
merge_result <- Registry.merge_keys(resources)
|
|
|
|
if merge_result.success:
|
|
result <<- merge_result.key
|
|
else:
|
|
error <<- merge_result.error
|
|
|
|
<- result, error
|
|
|
|
-- If this peer have set node_id as a provider for resource,
|
|
-- this call will prevent provider from renew
|
|
-- so that eventually it will disappear from the providers list
|
|
func removeNodeFromProviders(provider_node_id: PeerId, resource_id: ResourceId):
|
|
on provider_node_id:
|
|
t <- Peer.timestamp_sec()
|
|
Registry.clear_host_record(resource_id, t)
|
|
|
|
-- Create a resource: register it on the closest peers
|
|
func createResource(label: string) -> ?ResourceId, *Error:
|
|
t <- Peer.timestamp_sec()
|
|
|
|
resource_id: ?ResourceId
|
|
error: *Error
|
|
on HOST_PEER_ID:
|
|
sig_result <- getKeySignature(label, t)
|
|
if sig_result.success == false:
|
|
error <<- sig_result.error!
|
|
else:
|
|
signature = sig_result.signature!
|
|
id <- getResourceId(label, INIT_PEER_ID)
|
|
nodes <- getNeighbours(id)
|
|
|
|
successful: *bool
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
res <- registerKey(label, t, signature)
|
|
|
|
if res.success:
|
|
successful <<- true
|
|
else:
|
|
error <<- res.error
|
|
|
|
timeout: ?string
|
|
success: *bool
|
|
join successful[0]
|
|
par timeout <- Peer.timeout(6000, "resource hasn't created: timeout exceeded")
|
|
|
|
if timeout == nil:
|
|
resource_id <<- id
|
|
else:
|
|
error <<- timeout!
|
|
|
|
<- resource_id, error
|
|
|
|
-- Create a resource and register as provider
|
|
-- INIT_PEER_ID (current client) will become a provider
|
|
func createResourceAndRegisterProvider(label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
|
|
resource_id: ?ResourceId
|
|
error: *Error
|
|
|
|
relay_id: ?string
|
|
relay_id <<- HOST_PEER_ID
|
|
|
|
t <- Peer.timestamp_sec()
|
|
on HOST_PEER_ID:
|
|
key_sig_result <- getKeySignature(label, t)
|
|
|
|
if key_sig_result.success == false:
|
|
error <<- key_sig_result.error!
|
|
else:
|
|
id <- getResourceId(label, INIT_PEER_ID)
|
|
record_sig_result <- getRecordSignature(id, value, relay_id, service_id, t)
|
|
if record_sig_result.success == false:
|
|
error <<- record_sig_result.error!
|
|
else:
|
|
key_signature = key_sig_result.signature!
|
|
record_signature = record_sig_result.signature!
|
|
nodes <- getNeighbours(id)
|
|
successful: *bool
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
reg_res <- registerKey(label, t, key_signature)
|
|
if reg_res.success:
|
|
put_res <- putRecord(id, value, relay_id, service_id, t, record_signature)
|
|
if put_res.success:
|
|
successful <<- true
|
|
else:
|
|
error <<- put_res.error
|
|
else:
|
|
error <<- reg_res.error
|
|
|
|
timeout: ?string
|
|
join successful[0]
|
|
par timeout <- Peer.timeout(5000, "resource hasn't created: timeout exceeded")
|
|
|
|
if timeout == nil:
|
|
resource_id <<- id
|
|
else:
|
|
error <<- timeout!
|
|
|
|
<- resource_id, error
|
|
|
|
-- Create a resource and make the given node a provider for it
|
|
func createResourceAndRegisterNodeProvider(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
|
|
resource_id: ?ResourceId
|
|
error: *Error
|
|
|
|
t <- Peer.timestamp_sec()
|
|
|
|
on provider_node_id:
|
|
key_sig_result <- getKeySignature(label, t)
|
|
|
|
if key_sig_result.success == false:
|
|
error <<- key_sig_result.error!
|
|
else:
|
|
id <- getResourceId(label, INIT_PEER_ID)
|
|
record_sig_result <- getHostRecordSignature(id, value, nil, service_id, t)
|
|
if record_sig_result.success == false:
|
|
error <<- record_sig_result.error!
|
|
else:
|
|
key_signature = key_sig_result.signature!
|
|
record_signature = record_sig_result.signature!
|
|
reg_res1 <- registerKey(label, t, key_signature)
|
|
if reg_res1.success == false:
|
|
error <<- reg_res1.error
|
|
else:
|
|
r <- putHostRecord(id, value, nil, service_id, t, record_signature)
|
|
nodes <- getNeighbours(id)
|
|
successful: *bool
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
reg_res <- registerKey(label, t, key_signature)
|
|
if reg_res.success:
|
|
prop_res <- propagateHostRecord(r)
|
|
if prop_res.success:
|
|
successful <<- true
|
|
else:
|
|
error <<- prop_res.error
|
|
else:
|
|
error <<- reg_res.error
|
|
|
|
timeout: ?string
|
|
join successful[0]
|
|
par timeout <- Peer.timeout(5000, "resource hasn't created: timeout exceeded")
|
|
|
|
if timeout == nil:
|
|
resource_id <<- id
|
|
else:
|
|
error <<- timeout!
|
|
|
|
<- resource_id, error
|
|
|
|
-- Register for a resource as provider
|
|
-- Note: resource must be already created
|
|
func registerProvider(resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
|
|
success: *bool
|
|
error: *Error
|
|
relay_id: ?string
|
|
relay_id <<- HOST_PEER_ID
|
|
|
|
t <- Peer.timestamp_sec()
|
|
|
|
on HOST_PEER_ID:
|
|
record_sig_result <- getRecordSignature(resource_id, value, relay_id, service_id, t)
|
|
|
|
if record_sig_result.success == false:
|
|
error <<- record_sig_result.error!
|
|
success <<- false
|
|
else:
|
|
record_signature = record_sig_result.signature!
|
|
key, error_get <- getResource(resource_id)
|
|
if key == nil:
|
|
appendErrors(error, error_get)
|
|
success <<- false
|
|
else:
|
|
nodes <- getNeighbours(resource_id)
|
|
successful: *bool
|
|
for n <- nodes par:
|
|
error <<- n
|
|
on n:
|
|
try:
|
|
republish_res <- republishKey(key!)
|
|
if republish_res.success == false:
|
|
error <<- republish_res.error
|
|
else:
|
|
put_res <- putRecord(resource_id, value, relay_id, service_id, t, record_signature)
|
|
if put_res.success:
|
|
successful <<- true
|
|
else:
|
|
error <<- put_res.error
|
|
|
|
timeout: ?string
|
|
join successful[0]
|
|
par timeout <- Peer.timeout(5000, "provider hasn't registered: timeout exceeded")
|
|
|
|
if timeout == nil:
|
|
success <<- true
|
|
else:
|
|
success <<- false
|
|
error <<- timeout!
|
|
|
|
<- success!, error
|
|
|
|
-- Register a node as provider to the given resource
|
|
-- Note: resource must be already created
|
|
func registerNodeProvider(provider_node_id: PeerId, resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
|
|
success: *bool
|
|
error: *Error
|
|
t <- Peer.timestamp_sec()
|
|
|
|
on provider_node_id:
|
|
record_sig_result <- getHostRecordSignature(resource_id, value, nil, service_id, t)
|
|
if record_sig_result.success == false:
|
|
error <<- record_sig_result.error!
|
|
success <<- false
|
|
else:
|
|
record_signature = record_sig_result.signature!
|
|
key, error_get <- getResource(resource_id)
|
|
if key == nil:
|
|
appendErrors(error, error_get)
|
|
success <<- false
|
|
else:
|
|
republish_result <- republishKey(key!)
|
|
if republish_result.success == false:
|
|
error <<- republish_result.error
|
|
success <<- false
|
|
else:
|
|
r <- putHostRecord(resource_id, value, nil, service_id, t, record_signature)
|
|
nodes <- getNeighbours(resource_id)
|
|
successful: *bool
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
republish_res <- republishKey(key!)
|
|
if republish_res.success == false:
|
|
error <<- republish_res.error
|
|
else:
|
|
prop_res <- propagateHostRecord(r)
|
|
if prop_res.success:
|
|
successful <<- true
|
|
else:
|
|
error <<- prop_res.error
|
|
|
|
timeout: ?string
|
|
join successful[0]
|
|
par timeout <- Peer.timeout(5000, "provider hasn't registered: timeout exceeded")
|
|
|
|
if timeout == nil:
|
|
success <<- true
|
|
else:
|
|
success <<- false
|
|
error <<- timeout!
|
|
|
|
<- success!, error
|
|
|
|
-- Find the list of providers' records for the given resource_id
|
|
func resolveProviders(resource_id: ResourceId, ack: i16) -> []Record, *Error:
|
|
on HOST_PEER_ID:
|
|
nodes <- getNeighbours(resource_id)
|
|
res: *[]Record
|
|
error: *Error
|
|
for n <- nodes par:
|
|
on n:
|
|
try:
|
|
t <- Peer.timestamp_sec()
|
|
get_result <- Registry.get_records(resource_id, t)
|
|
if get_result.success:
|
|
res <<- get_result.result
|
|
else:
|
|
error <<- get_result.error
|
|
|
|
timeout: ?string
|
|
join res[ack - 1]
|
|
par error <- Peer.timeout(5000, "timeout exceeded")
|
|
|
|
if timeout != nil:
|
|
error <<- timeout!
|
|
|
|
result <- Registry.merge(res)
|
|
if result.success == false:
|
|
error <<- result.error
|
|
<- result.result, error
|
|
|
|
-- Execute the given call on providers
|
|
-- Note that you can provide another Aqua function as an argument to this one
|
|
func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error:
|
|
providers, error <- resolveProviders(resource_id, ack)
|
|
for r <- providers par:
|
|
on r.peer_id via r.relay_id:
|
|
call(r)
|
|
<- error
|