module Registry.ResourcesAPI declares *

import "registry-service.aqua"
import "registry-api.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"

alias ResourceId: string
alias Error: string

-- Number of successful confirmations awaited when writing (create/register)
const REPLICATION_FACTOR = 1
-- Number of successful responses awaited when reading resource metadata
const CONSISTENCY_LEVEL = 1
-- Passed to Peer.timeout by every quorum wait below
-- (presumably milliseconds -- confirm against aqua-lib)
const DEFAULT_TIMEOUT = 6000

-- Wait until `successful` holds at least `len` elements or `timeout` fires,
-- whichever comes first.
-- Returns true if the stream reached `len` elements first, false otherwise.
-- Note: `len` is used as `arr[len - 1]`, so callers are expected to pass len >= 1.
func wait(successful: *bool, len: i16, timeout: u16) -> bool:
    status: *string
    waiting = (arr: *bool, s: *string):
        -- blocks until the element with index len - 1 is available
        join arr[len - 1]
        s <<- "ok"

    -- race the join against the timer; the first value written to `status` wins
    waiting(successful, status)
    par status <- Peer.timeout(timeout, "timeout")

    result: *bool
    if status! == "ok":
        result <<- true
    else:
        result <<- false

    <- result!

-- Append every element of `error2` to `error1`
func appendErrors(error1: *Error, error2: *Error):
    for e <- error2:
        error1 <<- e

-- Ask the Registry service for the key id of `label` owned by `peer_id`
func getResourceId(label: string, peer_id: string) -> ResourceId:
    resource_id <- Registry.get_key_id(label, peer_id)
    <- resource_id

-- Get peers closest to the resource_id's hash in Kademlia network
-- These peers are expected to store the list of providers for this key
func getNeighbours(resource_id: ResourceId) -> []PeerId:
    k <- Op.string_to_b58(resource_id)
    nodes <- Kademlia.neighborhood(k, nil, nil)
    <- nodes

-- Fetch the key metadata of a resource from its neighbourhood.
-- Queries all neighbours in parallel, waits for CONSISTENCY_LEVEL successful
-- answers and merges them with Registry.merge_keys.
-- Returns the merged key (empty on failure) and the collected errors.
func getResource(resource_id: ResourceId) -> ?Key, *Error:
    nodes <- getNeighbours(resource_id)
    result: ?Key
    error: *Error

    resources: *Key
    successful: *bool
    for n <- nodes par:
        on n:
            try:
                t <- Peer.timestamp_sec()
                get_result <- Registry.get_key_metadata(resource_id, t)
                if get_result.success:
                    resources <<- get_result.key
                    successful <<- true
                else:
                    -- tag the error with the peer that produced it
                    e <- Op.concat_strings(get_result.error, " on ")
                    error <- Op.concat_strings(e, n)

    success <- wait(successful, CONSISTENCY_LEVEL, DEFAULT_TIMEOUT)
    if success == false:
        error <<- "resource not found: timeout exceeded"
    else:
        merge_result <- Registry.merge_keys(resources)
        if merge_result.success:
            result <<- merge_result.key
        else:
            error <<- merge_result.error

    <- result, error

-- If this peer has set provider_node_id as a provider for the resource,
-- this call will prevent the provider record from being renewed,
-- so that eventually it will disappear from the providers list
func removeNodeFromProviders(provider_node_id: PeerId, resource_id: ResourceId):
    on provider_node_id:
        t <- Peer.timestamp_sec()
        Registry.clear_host_record(resource_id, t)

-- Create a resource: register it on the closest peers.
-- Returns the resource id (empty on failure) and the collected errors.
func createResource(label: string) -> ?ResourceId, *Error:
    t <- Peer.timestamp_sec()
    resource_id: ?ResourceId
    error: *Error

    on HOST_PEER_ID:
        sig_result <- getKeySignature(label, t)
        if sig_result.success == false:
            error <<- sig_result.error!
        else:
            signature = sig_result.signature!
            id <- getResourceId(label, INIT_PEER_ID)
            nodes <- getNeighbours(id)

            successful: *bool
            for n <- nodes par:
                on n:
                    try:
                        res <- registerKey(label, t, signature)
                        if res.success:
                            successful <<- true
                        else:
                            error <<- res.error

            success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
            if success == false:
                error <<- "resource wasn't created: timeout exceeded"
            else:
                resource_id <<- id

    <- resource_id, error

-- Create a resource and register as provider
-- INIT_PEER_ID (current client) will become a provider
-- Returns the resource id (empty on failure) and the collected errors.
func createResourceAndRegisterProvider(label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
    resource_id: ?ResourceId
    error: *Error
    relay_id: ?string
    relay_id <<- HOST_PEER_ID
    t <- Peer.timestamp_sec()

    on HOST_PEER_ID:
        key_sig_result <- getKeySignature(label, t)
        if key_sig_result.success == false:
            error <<- key_sig_result.error!
        else:
            id <- getResourceId(label, INIT_PEER_ID)
            record_sig_result <- getRecordSignature(id, value, relay_id, service_id, t)
            if record_sig_result.success == false:
                error <<- record_sig_result.error!
            else:
                key_signature = key_sig_result.signature!
                record_signature = record_sig_result.signature!
                nodes <- getNeighbours(id)

                successful: *bool
                for n <- nodes par:
                    on n:
                        try:
                            -- the key must exist on the node before the record is put
                            reg_res <- registerKey(label, t, key_signature)
                            if reg_res.success:
                                put_res <- putRecord(id, value, relay_id, service_id, t, record_signature)
                                if put_res.success:
                                    successful <<- true
                                else:
                                    error <<- put_res.error
                            else:
                                error <<- reg_res.error

                success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
                if success == false:
                    error <<- "resource hasn't created: timeout exceeded"
                else:
                    resource_id <<- id

    <- resource_id, error

-- Create a resource and make the given node a provider for it
-- Returns the resource id (empty on failure) and the collected errors.
func createResourceAndRegisterNodeProvider(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> ?ResourceId, *Error:
    resource_id: ?ResourceId
    error: *Error
    t <- Peer.timestamp_sec()

    on provider_node_id:
        key_sig_result <- getKeySignature(label, t)
        if key_sig_result.success == false:
            error <<- key_sig_result.error!
        else:
            id <- getResourceId(label, INIT_PEER_ID)
            record_sig_result <- getHostRecordSignature(id, value, nil, service_id, t)
            if record_sig_result.success == false:
                error <<- record_sig_result.error!
            else:
                key_signature = key_sig_result.signature!
                record_signature = record_sig_result.signature!

                -- register the key on the provider node itself first
                reg_res1 <- registerKey(label, t, key_signature)
                if reg_res1.success == false:
                    error <<- reg_res1.error
                else:
                    r <- putHostRecord(id, value, nil, service_id, t, record_signature)
                    -- do not propagate a host record that failed to be stored
                    -- (same check as in registerNodeProvider)
                    if r.success == false:
                        error <<- r.error
                    else:
                        nodes <- getNeighbours(id)

                        successful: *bool
                        for n <- nodes par:
                            on n:
                                try:
                                    reg_res <- registerKey(label, t, key_signature)
                                    if reg_res.success:
                                        prop_res <- propagateHostRecord(r)
                                        if prop_res.success:
                                            successful <<- true
                                        else:
                                            error <<- prop_res.error
                                    else:
                                        error <<- reg_res.error

                        success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
                        if success == false:
                            error <<- "resource hasn't created: timeout exceeded"
                        else:
                            resource_id <<- id

    <- resource_id, error

-- Register for a resource as provider
-- Note: resource must be already created
-- Returns whether the registration succeeded and the collected errors.
func registerProvider(resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
    success: *bool
    error: *Error
    relay_id: ?string
    relay_id <<- HOST_PEER_ID
    t <- Peer.timestamp_sec()

    on HOST_PEER_ID:
        record_sig_result <- getRecordSignature(resource_id, value, relay_id, service_id, t)
        if record_sig_result.success == false:
            error <<- record_sig_result.error!
            success <<- false
        else:
            record_signature = record_sig_result.signature!
            key, error_get <- getResource(resource_id)
            if key == nil:
                appendErrors(error, error_get)
                success <<- false
            else:
                nodes <- getNeighbours(resource_id)

                successful: *bool
                for n <- nodes par:
                    on n:
                        try:
                            -- make sure the node knows the key before putting the record
                            republish_res <- republishKey(key!)
                            if republish_res.success == false:
                                error <<- republish_res.error
                            else:
                                put_res <- putRecord(resource_id, value, relay_id, service_id, t, record_signature)
                                if put_res.success:
                                    successful <<- true
                                else:
                                    error <<- put_res.error

                success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
                -- report a timeout only when the quorum wait itself failed
                if success! == false:
                    error <<- "provider hasn't registered: timeout exceeded"

    <- success!, error

-- Register a node as provider to the given resource
-- Note: resource must be already created
-- Returns whether the registration succeeded and the collected errors.
func registerNodeProvider(provider_node_id: PeerId, resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error:
    success: *bool
    error: *Error
    t <- Peer.timestamp_sec()

    on provider_node_id:
        record_sig_result <- getHostRecordSignature(resource_id, value, nil, service_id, t)
        if record_sig_result.success == false:
            error <<- record_sig_result.error!
            success <<- false
        else:
            record_signature = record_sig_result.signature!
            key, error_get <- getResource(resource_id)
            if key == nil:
                appendErrors(error, error_get)
                success <<- false
            else:
                republish_result <- republishKey(key!)
                if republish_result.success == false:
                    error <<- republish_result.error
                    -- every failure branch must yield a result, otherwise
                    -- the final `success!` has nothing to read
                    success <<- false
                else:
                    r <- putHostRecord(resource_id, value, nil, service_id, t, record_signature)
                    if r.success == false:
                        error <<- r.error
                        success <<- false
                    else:
                        on HOST_PEER_ID:
                            nodes <- getNeighbours(resource_id)

                            successful: *bool
                            for n <- nodes par:
                                on n:
                                    try:
                                        republish_res <- republishKey(key!)
                                        if republish_res.success == false:
                                            error <<- republish_res.error
                                        else:
                                            prop_res <- propagateHostRecord(r)
                                            if prop_res.success:
                                                successful <<- true
                                            else:
                                                error <<- prop_res.error

                            success <- wait(successful, REPLICATION_FACTOR, DEFAULT_TIMEOUT)
                            -- report a timeout only when the quorum wait itself failed
                            if success! == false:
                                error <<- "provider hasn't registered: timeout exceeded"

    <- success!, error

-- Find the list of providers' records for the given resource_id
-- `ack` is the number of successful neighbour responses to wait for.
-- Returns the merged records and the collected errors.
func resolveProviders(resource_id: ResourceId, ack: i16) -> []Record, *Error:
    on HOST_PEER_ID:
        nodes <- getNeighbours(resource_id)
        res: *[]Record
        error: *Error
        successful: *bool
        for n <- nodes par:
            on n:
                try:
                    t <- Peer.timestamp_sec()
                    get_result <- Registry.get_records(resource_id, t)
                    if get_result.success:
                        res <<- get_result.result
                        successful <<- true
                    else:
                        error <<- get_result.error

        success <- wait(successful, ack, DEFAULT_TIMEOUT)
        if success == false:
            error <<- "timeout exceeded"

        -- merge whatever has been collected, even after a timeout
        result <- Registry.merge(res)
        if result.success == false:
            error <<- result.error

    <- result.result, error

-- Execute the given call on providers
-- Note that you can provide another Aqua function as an argument to this one
-- Returns the errors collected while resolving the providers.
func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error:
    providers, error <- resolveProviders(resource_id, ack)
    for r <- providers par:
        on r.peer_id via r.relay_id:
            call(r)
    <- error