-- Resource-level API built on top of the Registry service helpers.
-- `declares *` exports every definition in this file.
aqua Registry.ResourcesAPI declares *

import "registry-service.aqua"
import "registry-api.aqua"
import "misc.aqua"
import "constants.aqua"
import "@fluencelabs/aqua-lib/builtin.aqua"

-- Identifier of a resource (a plain string).
alias ResourceId: string
-- A resource is represented by the registry `Key` type.
alias Resource: Key
-- Human-readable error message.
alias Error: string

-- Look up a resource by id.
-- Runs getResourceHelper on HOST_PEER_ID and returns its optional result
-- together with its error stream, unchanged.
func getResource(resource_id: ResourceId) -> ?Resource, *Error:
  on HOST_PEER_ID:
    result, error <- getResourceHelper(resource_id)
  <- result, error

-- Ask Registry.get_key_id (on HOST_PEER_ID) for the id that corresponds to
-- the given label and peer_id.
func getResourceId(label: string, peer_id: string) -> ResourceId:
  on HOST_PEER_ID:
    resource_id <- Registry.get_key_id(label, peer_id)
  <- resource_id

-- Create a resource: register it on the closest peers
-- Returns the resource id (empty on failure) and the collected errors.
func createResource(label: string) -> ?ResourceId, *Error:
  -- The timestamp is taken before moving to HOST_PEER_ID and is reused for
  -- the signature and for every registerKey call below.
  t <- Peer.timestamp_sec()

  resource_id: *ResourceId
  error: *Error
  on HOST_PEER_ID:
    sig_result <- getKeySignature(label, t)
    if sig_result.success == false:
      error <<- sig_result.error!
    else:
      signature = sig_result.signature!
      id <- Registry.get_key_id(label, INIT_PEER_ID)
      nodes <- getNeighbors(id)

      -- One `true` is pushed per node that accepted the key.
      successful: *bool
      for n <- nodes par:
        on n:
          try:
            res <- registerKey(label, t, signature)

            if res.success:
              successful <<- true
            else:
              error <<- res.error

      -- presumably waits for INITIAL_REPLICATION_FACTOR confirmations or
      -- DEFAULT_TIMEOUT, whichever comes first; `wait` is defined elsewhere
      success <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)

      if success == false:
        error <<- "resource wasn't created: timeout exceeded"
      else:
        resource_id <<- id

  <- resource_id, error

-- Note: resource must be already created
-- Publishes a record for `value` under `resource_id` on behalf of `peer_id`.
-- Returns the first value pushed to the `success` stream and the collected
-- errors.
func registerService(resource_id: ResourceId, value: string, peer_id: PeerId, service_id: ?string) -> bool, *Error:
  -- relay_id stays empty unless the record is registered for the initiating
  -- peer itself, in which case HOST_PEER_ID is passed as the relay.
  relay_id: *string
  if peer_id == INIT_PEER_ID:
    relay_id <<- HOST_PEER_ID

  success: *bool
  error: *Error

  on HOST_PEER_ID:
    metadata, err <- getRecordMetadata(resource_id, value, peer_id, relay_id, service_id, nil)
    if metadata == nil:
      success <<- false
      error <<- err!
    else:
      t <- Peer.timestamp_sec()
      sig_result = getRecordSignature(metadata!, t)
      if sig_result.success == false:
        error <<- sig_result.error!
        success <<- false
      else:
        key, error_get <- getResourceHelper(resource_id)
        if key == nil:
          appendErrors(error, error_get)
          success <<- false
        else:
          -- When the record belongs to another peer, republish the key and
          -- put the record on that peer first. This step is not wrapped in
          -- `try`, unlike the per-node calls below.
          if peer_id != INIT_PEER_ID:
            on peer_id via HOST_PEER_ID:
              republish_result <- republishKey(key!)
              if republish_result.success == false:
                error <<- republish_result.error
              else:
                p_res <- putRecord(metadata!, t, sig_result.signature!)
                if p_res.success == false:
                  error <<- p_res.error
                  success <<- false

          -- Then republish the key and put the record on every node
          -- returned by getNeighbors, in parallel.
          nodes <- getNeighbors(resource_id)
          successful: *bool
          for n <- nodes par:
            on n:
              try:
                republish_res <- republishKey(key!)
                if republish_res.success == false:
                  error <<- republish_res.error
                else:
                  put_res <- putRecord(metadata!, t, sig_result.signature!)
                  if put_res.success:
                    successful <<- true
                  else:
                    error <<- put_res.error
          success <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)

  -- Only the first element of `success` decides the outcome, so an earlier
  -- `false` (e.g. from the p_res failure above) wins over a later result of
  -- `wait`.
  succ = success!
  -- NOTE(review): this message is appended on every path that ends with
  -- succ == false (missing metadata, signature failure, missing key, ...),
  -- not only after a timeout -- confirm that is intended.
  if succ == false:
    error <<- "service hasn't registered: timeout exceeded"

  <- succ, error

-- Remove the record of `peer_id` from `resource_id` by adding a tombstone.
-- Mirrors registerService: same success/error stream handling, with
-- addTombstone in place of putRecord.
func unregisterService(resource_id: ResourceId, peer_id: PeerId) -> bool, *Error:
  success: *bool
  error: *Error

  on HOST_PEER_ID:
    t <- Peer.timestamp_sec()
    sig_result = getTombstoneSignature(resource_id, peer_id, t, nil)
    if sig_result.success == false:
      error <<- sig_result.error!
      success <<- false
    else:
      key, error_get <- getResourceHelper(resource_id)
      if key == nil:
        appendErrors(error, error_get)
        success <<- false
      else:
        if peer_id != INIT_PEER_ID:
          -- NOTE(review): registerService uses `on peer_id via HOST_PEER_ID`
          -- for the equivalent step; here there is no `via` -- confirm the
          -- difference is intentional.
          on peer_id:
            republish_result <- republishKey(key!)
            if republish_result.success == false:
              error <<- republish_result.error
            else:
              res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
              if res.success == false:
                error <<- res.error
                success <<- false

        -- Republish the key and add the tombstone on every node returned by
        -- getNeighbors, in parallel.
        nodes <- getNeighbors(resource_id)
        successful: *bool
        for n <- nodes par:
          on n:
            try:
              republish_res <- republishKey(key!)
              if republish_res.success == false:
                error <<- republish_res.error
              else:
                add_res <- addTombstone(resource_id, peer_id, t, nil, sig_result.signature!)
                if add_res.success:
                  successful <<- true
                else:
                  error <<- add_res.error
        success <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)

  -- The first element pushed to `success` decides the outcome.
  succ = success!
  -- NOTE(review): as in registerService, the timeout message is appended on
  -- every failing path, not only after a timeout -- confirm.
  if succ == false:
    error <<- "unregisterService failed: timeout exceeded"

  <- succ, error

-- Collect the records stored for `resource_id`.
-- Queries every node returned by getNeighbors in parallel, passes
-- `successful`, `ack` and DEFAULT_TIMEOUT to `wait`, then merges the
-- collected record lists with Registry.merge. `ack` is presumably the number
-- of successful responses to wait for -- confirm against `wait`.
-- Returns the merged records (empty on failure) and the collected errors.
func resolveResource(resource_id: ResourceId, ack: i16) -> ?[]Record, *Error:
  on HOST_PEER_ID:
    nodes <- getNeighbors(resource_id)
    result: *[]Record
    records: *[]Record
    error: *Error
    successful: *bool
    for n <- nodes par:
      on n:
        try:
          -- Each node takes its own timestamp for the get_records call.
          t <- Peer.timestamp_sec()
          get_result <- Registry.get_records(resource_id, t)
          if get_result.success:
            records <<- get_result.result
            successful <<- true
          else:
            error <<- get_result.error

    success <- wait(successful, ack, DEFAULT_TIMEOUT)
    if success == false:
      error <<- "timeout exceeded"
    else:
      merged <- Registry.merge(records)
      if merged.success == false:
        error <<- merged.error
      else:
        result <<- merged.result
  <- result, error

-- Execute the given call on providers
-- Note that you can provide another Aqua function as an argument to this one
-- Resolves the resource first; returns false (plus the resolve errors) when
-- no records were obtained, otherwise starts `call` for every record in
-- parallel and returns true.
func executeOnResource(resource_id: ResourceId, ack: i16, call: Record -> ()) -> bool, *Error:
  success: *bool
  result, error <- resolveResource(resource_id, ack)

  if result == nil:
    success <<- false
  else:
    -- Each call runs on the peer named in the record's metadata, reached
    -- through the relay stored in the same metadata.
    for r <- result! par:
      on r.metadata.peer_id via r.metadata.relay_id:
        call(r)
    -- `true` is pushed after the `par` loop without checking the outcome of
    -- the individual calls.
    success <<- true
  <- success!, error