From 8d492113f17ec7add582f7f2d9575fc48b5325dc Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Mon, 20 Feb 2023 23:44:51 +0700 Subject: [PATCH] feat(deals): register and resolve workers (#197) --- aqua-tests/aqua/test.aqua | 2 +- aqua/package-lock.json | 2 +- aqua/package.json | 2 +- aqua/subnetwork.aqua | 180 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 aqua/subnetwork.aqua diff --git a/aqua-tests/aqua/test.aqua b/aqua-tests/aqua/test.aqua index 982b670..126ad9e 100644 --- a/aqua-tests/aqua/test.aqua +++ b/aqua-tests/aqua/test.aqua @@ -5,4 +5,4 @@ import "@fluencelabs/registry/resources-api.aqua" export getResource, createResource, getResourceId, get_peer_id, registerService, resolveResource, unregisterService func get_peer_id() -> PeerId: - <- INIT_PEER_ID + <- INIT_PEER_ID diff --git a/aqua/package-lock.json b/aqua/package-lock.json index 38613a9..70f0fc7 100644 --- a/aqua/package-lock.json +++ b/aqua/package-lock.json @@ -9,7 +9,7 @@ "version": "0.3.1", "license": "MIT", "dependencies": { - "@fluencelabs/aqua-lib": "0.6.0", + "@fluencelabs/aqua-lib": "^0.6.0", "@fluencelabs/trust-graph": "3.0.4" }, "devDependencies": { diff --git a/aqua/package.json b/aqua/package.json index 410bc33..abb173f 100644 --- a/aqua/package.json +++ b/aqua/package.json @@ -6,7 +6,7 @@ "*.aqua" ], "dependencies": { - "@fluencelabs/aqua-lib": "0.6.0", + "@fluencelabs/aqua-lib": "^0.6.0", "@fluencelabs/trust-graph": "3.0.4" }, "scripts": { diff --git a/aqua/subnetwork.aqua b/aqua/subnetwork.aqua new file mode 100644 index 0000000..45e49fe --- /dev/null +++ b/aqua/subnetwork.aqua @@ -0,0 +1,180 @@ +module Registry.Subnetwork declares * + +import "registry-service.aqua" +import "registry-api.aqua" +import "misc.aqua" +import "constants.aqua" +import "@fluencelabs/aqua-lib/builtin.aqua" +import "@fluencelabs/trust-graph/trust-graph.aqua" + +alias Error: string + +const MIN_ACK = 2 + +func getInsecuredPeerId() -> PeerId: + Sig "insecure_sig" + peer_id <- Sig.get_peer_id() + <- peer_id + +func getKeyInsecuredSignature(label: string, peer_id: PeerId, timestamp_created: u64) -> SignResult: + bytes <- Registry.get_key_bytes(label, ?[peer_id], timestamp_created, nil, "") + Sig "insecure_sig" + result <- Sig.sign(bytes) + <- result + +-- peer_id comes from ("insecure_sig" "get_peer_id") +func registerSubnetworkKey(deal_id: string, peer_id: PeerId, timestamp_created: u64, signature: []u8) -> RegisterKeyResult: + t <- Peer.timestamp_sec() + weight <- TrustGraph.get_weight(peer_id, t) + result <- Registry.register_key(deal_id, ?[peer_id], timestamp_created, nil, "", signature, weight, t) + <- result + +func createSubnetwork(deal_id: string) -> ?string, *Error: + t <- Peer.timestamp_sec() + + subnetwork_id: ?string + error: *Error + on HOST_PEER_ID: + peer_id <- getInsecuredPeerId() + sig_result <- getKeyInsecuredSignature(deal_id, peer_id, t) + if sig_result.success == false: + error <<- sig_result.error! + else: + signature = sig_result.signature! + + id <- Registry.get_key_id(deal_id, peer_id) + nodes <- getNeighbors(id) + successful: *bool + on HOST_PEER_ID: + for n <- nodes par: + on n: + try: + res <- registerSubnetworkKey(deal_id, peer_id, t, signature) + + if res.success: + successful <<- true + else: + error <<- res.error + + success <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT) + + if success == false: + error <<- "key wasn't created: timeout exceeded" + else: + subnetwork_id <<- id + + <- subnetwork_id, error + +func getWorkerRecordMetadata(subnetwork_id: string) -> ?RecordMetadata, ?string: + t <- Peer.timestamp_sec() + relay_id = ?[HOST_PEER_ID] + bytes <- Registry.get_record_metadata_bytes(subnetwork_id, INIT_PEER_ID, t, "", INIT_PEER_ID, relay_id, nil, nil) + on INIT_PEER_ID: + Sig "sig" + sig_result <- Sig.sign(bytes) + + result: ?RecordMetadata + error: *string + if sig_result.success == true: + result <- Registry.create_record_metadata(subnetwork_id, INIT_PEER_ID, t, "", INIT_PEER_ID, relay_id, nil, nil, sig_result.signature!) + else: + error <<- sig_result.error! + + <- result, error + +func getWorkerRecordSignature(metadata: RecordMetadata, timestamp_created: u64) -> SignResult: + signature: *SignResult + bytes <- Registry.get_record_bytes(metadata, timestamp_created) + + on INIT_PEER_ID: + Sig "sig" + signature <- Sig.sign(bytes) + + <- signature! + +func registerWorker(subnetwork_id: string) -> bool, *Error: + success: *bool + error: *Error + + metadata, err <- getWorkerRecordMetadata(subnetwork_id) + if metadata == nil: + success <<- false + error <<- err! + else: + t <- Peer.timestamp_sec() + sig_result = getWorkerRecordSignature(metadata!, t) + if sig_result.success == false: + error <<- sig_result.error! + success <<- false + else: + key, error_get <- getResourceHelper(subnetwork_id) + + if key == nil: + appendErrors(error, error_get) + success <<- false + else: + republish_result <- republishKey(key!) + if republish_result.success == false: + error <<- republish_result.error + success <<- false + else: + p_res <- putRecord(metadata!, t, sig_result.signature!) + if p_res.success == false: + error <<- p_res.error + success <<- false + else: + nodes <- getNeighbors(subnetwork_id) + + successful: *bool + for n <- nodes par: + on n: + try: + republish_res <- republishKey(key!) + if republish_res.success == false: + error <<- republish_res.error + else: + put_res <- putRecord(metadata!, t, sig_result.signature!) + if put_res.success: + successful <<- true + else: + error <<- put_res.error + + success <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT) + + succ = success! + if succ == false: + error <<- "worker hasn't registered: timeout exceeded" + + <- succ, error + +func resolveSubnetwork(deal_id: string) -> ?[]Record, *Error: + peer_id <- getInsecuredPeerId() + key_id <- Registry.get_key_id(deal_id, peer_id) + + nodes <- getNeighbors(key_id) + result: ?[]Record + records: *[]Record + error: *Error + successful: *bool + on HOST_PEER_ID: + for n <- nodes par: + on n: + try: + t <- Peer.timestamp_sec() + get_result <- Registry.get_records(key_id, t) + if get_result.success: + records <<- get_result.result + successful <<- true + else: + error <<- get_result.error + + success <- wait(successful, MIN_ACK, DEFAULT_TIMEOUT) + if success == false: + error <<- "timeout exceeded" + else: + merged <- Registry.merge(records) + if merged.success == false: + error <<- merged.error + else: + result <<- merged.result + <- result, error