module Registry.ResourcesAPI declares *

import "registry-service.aqua"
import "registry-api.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"

-- NOTE(review): the line structure of this file was reconstructed from a
-- copy whose newlines and indentation had been lost. Statement order is
-- unchanged. Two nesting decisions could not be derived from the token
-- order alone and should be confirmed against the upstream file:
--   1. the level of every `par Peer.timeout(10000, "timeout exceeded")`
--      (placed here as a sibling of the preceding `for ... par` loop);
--   2. whether the trailing `join` / timeout section of the create* and
--      register* functions sits inside the enclosing `on ...:` block
--      (placed inside it here).

alias ResourceId: string
alias Error: string

-- Push every element of error2 into error1
func appendErrors(error1: *Error, error2: *Error):
    for e <- error2:
        error1 <<- e

-- Return the id that Registry.get_key_id yields for (label, peer_id)
func getResourceId(label: string, peer_id: string) -> ResourceId:
    resource_id <- Registry.get_key_id(label, peer_id)
    <- resource_id

-- Get the peers closest to the resource_id's hash in the Kademlia network.
-- These peers are expected to store the list of providers for this key.
func getNeighbours(resource_id: ResourceId) -> []PeerId:
    k <- Op.string_to_b58(resource_id)
    nodes <- Kademlia.neighborhood(k, nil, nil)
    <- nodes

-- Fetch the key metadata for resource_id from the neighbouring peers and
-- merge whatever was collected.
-- Returns the merged key (empty when the merge fails) and the accumulated
-- errors.
func getResource(resource_id: ResourceId) -> ?Key, *Error:
    nodes <- getNeighbours(resource_id)
    resources: *Key
    error: *Error
    -- Query all neighbours in parallel; each one reports into a stream
    for n <- nodes par:
        on n:
            try:
                t <- Peer.timestamp_sec()
                get_result <- Registry.get_key_metadata(resource_id, t)
                if get_result.success:
                    resources <<- get_result.key
                else:
                    error <<- get_result.error

    -- Proceed once the first key has arrived or the timeout has fired;
    -- in the latter case the timeout's result is pushed into `error`
    join resources[0]
    par error <- Peer.timeout(10000, "resource not found, timeout exceeded")

    merge_result <- Registry.merge_keys(resources)
    resource: ?Key
    if merge_result.success:
        resource <<- merge_result.key
    else:
        error <<- merge_result.error

    <- resource, error

-- If this peer has set node_id as a provider for the resource,
-- this call will prevent the provider from being renewed,
-- so that it eventually disappears from the providers list.
func removeNodeFromProviders(resource_id: ResourceId):
    on HOST_PEER_ID:
        t <- Peer.timestamp_sec()
        Registry.clear_host_record(resource_id, t)

-- Create a resource: register it on the closest peers.
-- Returns the resource id (empty when the final timeout fired) and the
-- accumulated errors.
func createResource(label: string) -> ?ResourceId, *Error:
    t <- Peer.timestamp_sec()

    resource_id: ?ResourceId
    error: *Error
    successful: *bool
    on HOST_PEER_ID:
        sig_result <- getKeySignature(label, t)
        if sig_result.success:
            signature = sig_result.signature!
            id <- getResourceId(label, INIT_PEER_ID)
            nodes <- getNeighbours(id)
            -- Register the key on every neighbour in parallel
            for n <- nodes par:
                on n:
                    try:
                        res <- registerKey(label, t, signature)
                        if res.success:
                            successful <<- true
                        else:
                            error <<- res.error
            -- NOTE(review): nesting level reconstructed, see file header
            par Peer.timeout(10000, "timeout exceeded")
        else:
            error <<- sig_result.error!

        -- Wait for the first successful registration or for the timeout
        timeout: ?string
        join successful[0]
        par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded")

        -- `timeout` stays empty when the join was satisfied first
        if timeout == nil:
            resource_id <<- id
        else:
            error <<- timeout!

    <- resource_id, error

-- Create a resource and register as provider.
-- INIT_PEER_ID (current client) will become a provider.
-- Returns the resource id (empty when the final timeout fired) and the
-- accumulated errors.
func createResourceAndRegisterProvider(label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
    resource_id: ?ResourceId
    error: *Error
    -- The record is signed and stored with the client's relay as relay_id
    relay_id: ?string
    relay_id <<- HOST_PEER_ID
    t <- Peer.timestamp_sec()
    successful: *bool

    on HOST_PEER_ID:
        key_sig_result <- getKeySignature(label, t)
        if key_sig_result.success == false:
            error <<- key_sig_result.error!
        else:
            id <- getResourceId(label, INIT_PEER_ID)
            record_sig_result <- getRecordSignature(id, value, relay_id, service_id, t)
            if record_sig_result.success == false:
                error <<- record_sig_result.error!
            else:
                key_signature = key_sig_result.signature!
                record_signature = record_sig_result.signature!
                nodes <- getNeighbours(id)
                -- On every neighbour: register the key, then put the record
                for n <- nodes par:
                    on n:
                        try:
                            reg_res <- registerKey(label, t, key_signature)
                            if reg_res.success:
                                put_res <- putRecord(id, value, relay_id, service_id, t, record_signature)
                                if put_res.success:
                                    successful <<- true
                                else:
                                    error <<- put_res.error
                            else:
                                error <<- reg_res.error
                -- NOTE(review): nesting level reconstructed, see file header
                par Peer.timeout(10000, "timeout exceeded")

        -- Wait for the first neighbour that stored the record or for the timeout
        timeout: ?string
        join successful[0]
        par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded")

        if timeout == nil:
            resource_id <<- id
        else:
            error <<- timeout!

    <- resource_id, error

-- Create a resource and make the given node a provider for it.
-- Returns the resource id (empty when the final timeout fired) and the
-- accumulated errors.
func createResourceAndRegisterNodeProvider(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
    resource_id: ?ResourceId
    error: *Error
    t <- Peer.timestamp_sec()
    successful: *bool

    on provider_node_id:
        key_sig_result <- getKeySignature(label, t)
        if key_sig_result.success == false:
            error <<- key_sig_result.error!
        else:
            id <- getResourceId(label, INIT_PEER_ID)
            record_sig_result <- getHostRecordSignature(id, value, nil, service_id, t)
            if record_sig_result.success == false:
                error <<- record_sig_result.error!
            else:
                key_signature = key_sig_result.signature!
                record_signature = record_sig_result.signature!
                -- Register the key and the host record on the provider node
                -- itself before propagating to the neighbours
                reg_res1 <- registerKey(label, t, key_signature)
                if reg_res1.success == false:
                    error <<- reg_res1.error
                else:
                    r <- putHostRecord(id, value, nil, service_id, t, record_signature)
                    nodes <- getNeighbours(id)
                    -- On every neighbour: register the key, then propagate
                    -- the host record result obtained above
                    for n <- nodes par:
                        on n:
                            try:
                                reg_res <- registerKey(label, t, key_signature)
                                if reg_res.success:
                                    prop_res <- propagateHostRecord(r)
                                    if prop_res.success:
                                        successful <<- true
                                    else:
                                        error <<- prop_res.error
                                else:
                                    error <<- reg_res.error
                    -- NOTE(review): nesting level reconstructed, see file header
                    par Peer.timeout(10000, "timeout exceeded")

        -- Wait for the first successful propagation or for the timeout
        timeout: ?string
        join successful[0]
        par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded")

        if timeout == nil:
            resource_id <<- id
        else:
            error <<- timeout!

    <- resource_id, error

-- Register for a resource as provider.
-- Note: the resource must already be created.
-- Returns true when at least one neighbour stored the record before the
-- timeout fired, together with the accumulated errors.
func registerProvider(resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
    error: *Error
    -- The record is signed and stored with the client's relay as relay_id
    relay_id: ?string
    relay_id <<- HOST_PEER_ID
    t <- Peer.timestamp_sec()

    on HOST_PEER_ID:
        record_sig_result <- getRecordSignature(resource_id, value, relay_id, service_id, t)
        if record_sig_result.success == false:
            error <<- record_sig_result.error!
        else:
            record_signature = record_sig_result.signature!
            key, error_get <- getResource(resource_id)
            appendErrors(error, error_get)
            successful: *bool
            if key != nil:
                nodes <- getNeighbours(resource_id)
                -- On every neighbour: republish the key, then put the record
                for n <- nodes par:
                    on n:
                        try:
                            republish_res <- republishKey(key!)
                            if republish_res.success == false:
                                error <<- republish_res.error
                            else:
                                put_res <- putRecord(resource_id, value, relay_id, service_id, t, record_signature)
                                if put_res.success:
                                    successful <<- true
                                else:
                                    error <<- put_res.error
                -- NOTE(review): nesting level reconstructed, see file header
                par Peer.timeout(10000, "timeout exceeded")

        -- Wait for the first neighbour that stored the record or for the timeout
        timeout: ?string
        join successful[0]
        par timeout <- Peer.timeout(10000, "provider hasn't registered: timeout exceeded")

        success: *bool
        if timeout == nil:
            success <<- true
        else:
            success <<- false
            error <<- timeout!

    <- success!, error

-- Register a node as provider to the given resource.
-- Note: the resource must already be created.
-- Returns true when at least one neighbour accepted the propagated record
-- before the timeout fired, together with the accumulated errors.
func registerNodeProvider(provider_node_id: PeerId, resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
    error: *Error
    t <- Peer.timestamp_sec()
    successful: *bool

    on provider_node_id:
        record_sig_result <- getHostRecordSignature(resource_id, value, nil, service_id, t)
        if record_sig_result.success == false:
            error <<- record_sig_result.error!
        else:
            record_signature = record_sig_result.signature!
            key, error_get <- getResource(resource_id)
            if key == nil:
                appendErrors(error, error_get)
            else:
                -- Republish the key and store the host record on the provider
                -- node itself before propagating to the neighbours
                republish_result <- republishKey(key!)
                if republish_result.success == false:
                    error <<- republish_result.error
                else:
                    r <- putHostRecord(resource_id, value, nil, service_id, t, record_signature)
                    nodes <- getNeighbours(resource_id)
                    -- On every neighbour: republish the key, then propagate
                    -- the host record result obtained above
                    for n <- nodes par:
                        on n:
                            try:
                                republish_res <- republishKey(key!)
                                if republish_res.success == false:
                                    error <<- republish_res.error
                                else:
                                    prop_res <- propagateHostRecord(r)
                                    if prop_res.success:
                                        successful <<- true
                                    else:
                                        error <<- prop_res.error
                    -- NOTE(review): nesting level reconstructed, see file header
                    par Peer.timeout(10000, "timeout exceeded")

        -- Wait for the first successful propagation or for the timeout
        timeout: ?string
        join successful[0]
        par timeout <- Peer.timeout(10000, "provider hasn't registered: timeout exceeded")

        success: *bool
        if timeout == nil:
            success <<- true
        else:
            success <<- false
            error <<- timeout!

    <- success!, error

-- Find the list of providers' records for the given resource_id.
-- Collects record lists from the neighbouring peers and merges them with
-- Registry.merge; returns the merged records and the accumulated errors.
func resolveProviders(resource_id: ResourceId, ack: i16) -> []Record, *Error:
    on HOST_PEER_ID:
        nodes <- getNeighbours(resource_id)
        res: *[]Record
        error: *Error
        -- Ask every neighbour for its records in parallel
        for n <- nodes par:
            on n:
                try:
                    t <- Peer.timestamp_sec()
                    get_result <- Registry.get_records(resource_id, t)
                    if get_result.success:
                        res <<- get_result.result
                    else:
                        error <<- get_result.error
        -- NOTE(review): nesting level reconstructed, see file header
        par Peer.timeout(10000, "timeout exceeded")

        -- Proceed once `res` has an element at index `ack` or the timeout
        -- has fired.
        -- NOTE(review): index `ack` needs ack + 1 responses; confirm this is
        -- the intended meaning of `ack`.
        join res[ack]
        par Peer.timeout(10000, "")

        result <- Registry.merge(res)
        if result.success == false:
            error <<- result.error

    <- result.result, error

-- Execute the given call on providers.
-- Note that you can provide another Aqua function as an argument to this one.
-- Returns the errors reported by resolveProviders.
func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error:
    providers, error <- resolveProviders(resource_id, ack)
    -- Run `call` on each provider's peer, routed via the relay in its record
    for r <- providers par:
        on r.peer_id via r.relay_id:
            call(r)
    <- error