From 78dafff7145bdd760c312bd2ab53452f625ea6ac Mon Sep 17 00:00:00 2001 From: Aleksey Proshutisnkiy Date: Tue, 29 Mar 2022 14:58:55 +0400 Subject: [PATCH] Fix propagate_host_record, update annotations in aqua (#92) --- aqua/routing.aqua | 84 +++++++++++------ example/src/aqua/export.aqua | 4 +- example/src/example.ts | 6 +- service/Cargo.lock | 14 +-- service/src/defaults.rs | 2 + service/src/tests/mod.rs | 144 ++++++++++++++++++++++++++++- service/src/tetraplets_checkers.rs | 8 +- 7 files changed, 215 insertions(+), 47 deletions(-) diff --git a/aqua/routing.aqua b/aqua/routing.aqua index d8fdd00..62cf2ad 100644 --- a/aqua/routing.aqua +++ b/aqua/routing.aqua @@ -10,17 +10,17 @@ func get_route_id(label: string, peer_id: string) -> RouteId: route_id <- Registry.get_route_id(label, peer_id) <- route_id --- Get peers closest to the label's hash in Kademlia network --- These peers are expected to store list of subscribers of this label -func getNeighbours(route_id: string) -> []PeerId: +-- Get peers closest to the route_id's hash in Kademlia network +-- These peers are expected to store list of providers for this route +func getNeighbours(route_id: RouteId) -> []PeerId: k <- Op.string_to_b58(route_id) nodes <- Kademlia.neighborhood(k, nil, nil) <- nodes --- If this peer have set node_id as a subscriber for label, --- this call will prevent subscriber from re-subscribing --- so that eventually it will disappear from the subscribers list -func removeFromRoute(route_id: string): +-- If this peer have set node_id as a provider for route, +-- this call will prevent provider from renew +-- so that eventually it will disappear from the providers list +func removeFromRoute(route_id: RouteId): on HOST_PEER_ID: t <- Peer.timestamp_sec() Registry.clear_host_record(route_id, t) @@ -39,8 +39,8 @@ func createRoute(label: string) -> RouteId: result <- register_route(label, t, signature, false) <- route_id --- Create a label and subscribe to it --- INIT_PEER_ID (current client) will become a subscriber +-- Create a route and register for it +-- INIT_PEER_ID (current client) will become a provider func createRouteAndRegister(label: string, value: string, service_id: ?string) -> string: relay_id: ?string relay_id <<- HOST_PEER_ID @@ -61,8 +61,8 @@ func createRouteAndRegister(label: string, value: string, service_id: ?string) - put_record(route_id, value, relay_id, service_id, t, record_signature) <- route_id --- Create a label and subscribe to it --- INIT_PEER_ID (current client) will become a subscriber +-- Create a route and register for it +-- INIT_PEER_ID (current client) will become a provider -- In contrast with non-blocking version, waits for at least a single write to succeed func createRouteAndRegisterBlocking( label: string, value: string, @@ -93,8 +93,8 @@ func createRouteAndRegisterBlocking( join results[ack] <- route_id --- Create a label and make the given node a subscriber to it -func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string: +-- Create a route and make the given node a provider for it +func createRouteAndRegisterNode(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> RouteId: t <- Peer.timestamp_sec() route_signature <- get_route_signature(label, t) on HOST_PEER_ID: @@ -102,7 +102,7 @@ func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) - on subscriber_node_id: + on provider_node_id: register_route(label, t, route_signature, false) r <- put_host_record(route_id, value, nil, service_id, t, record_signature) nodes <- getNeighbours(route_id) @@ -113,9 +113,37 @@ func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value propagate_host_record(r) <- route_id --- Subscribe to a label --- Note: label must be already initiated -func registerForRoute(route_id: string, value: string, service_id: ?string): +func createRouteAndRegisterNodeBlocking( + provider_node_id: PeerId, label: string, + value: string, service_id: ?string, + progress: string -> (), + ack: i16 +) -> RouteId: + t <- Peer.timestamp_sec() + route_signature <- get_route_signature(label, t) + on HOST_PEER_ID: + route_id <- get_route_id(label, INIT_PEER_ID) + + record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) + + results: *DhtResult + on provider_node_id: + register_route(label, t, route_signature, false) + r <- put_host_record(route_id, value, nil, service_id, t, record_signature) + nodes <- getNeighbours(route_id) + for n <- nodes par: + on n: + try: + register_route(label, t, route_signature, false) + results <- propagate_host_record(r) + progress(n) + + join results[ack] + <- route_id + +-- Register for a route +-- Note: route must be already initiated +func registerForRoute(route_id: RouteId, value: string, service_id: ?string): relay_id: ?string relay_id <<- HOST_PEER_ID @@ -130,26 +158,22 @@ func registerForRoute(route_id: string, value: string, service_id: ?string): put_record(route_id, value, relay_id, service_id, t, record_signature) --- Subscribe a node to the given label --- Note: label must be already initiated -func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string): +-- Register a node to the given route +-- Note: route must be already initiated +func registerForRouteNode(provider_node_id: PeerId, route_id: RouteId, value: string, service_id: ?string): t <- Peer.timestamp_sec() - route_signature <- get_route_signature(label, t) - on HOST_PEER_ID: - route_id <- get_route_id(label, INIT_PEER_ID) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) - on subscriber_node_id: + on provider_node_id: r <- put_host_record(route_id, value, nil, service_id, t, record_signature) nodes <- getNeighbours(route_id) for n <- nodes par: on n: try: - register_route(label, t, route_signature, false) propagate_host_record(r) -- Find the list of record for the given route_id -func resolveRoute(route_id: string, ack: i16) -> []Record: +func resolveRoute(route_id: RouteId, ack: i16) -> []Record: on HOST_PEER_ID: nodes <- getNeighbours(route_id) res: *[]Record @@ -165,10 +189,10 @@ func resolveRoute(route_id: string, ack: i16) -> []Record: result <- Registry.merge(res) <- result.result --- Execute the given code on subscribers +-- Execute the given code on providers -- Note that you can provide another Aqua function as an argument to this one -func executeOnRoute(route_id: string, ack: i16, call: Record -> ()): - subs <- resolveRoute(route_id, ack) - for r <- subs par: +func executeOnRoute(route_id: RouteId, ack: i16, call: Record -> ()): + providers <- resolveRoute(route_id, ack) + for r <- providers par: on r.peer_id via r.relay_id: call(r) diff --git a/example/src/aqua/export.aqua b/example/src/aqua/export.aqua index b3827ab..aeaa5db 100644 --- a/example/src/aqua/export.aqua +++ b/example/src/aqua/export.aqua @@ -1,7 +1,7 @@ module Export -import createRouteAndRegisterBlocking, resolveRoute from "@fluencelabs/registry/routing.aqua" +import createRouteAndRegisterNodeBlocking, resolveRoute from "@fluencelabs/registry/routing.aqua" import Peer from "@fluencelabs/aqua-lib/builtin.aqua" -export createRouteAndRegisterBlocking, resolveRoute, timestamp_sec +export createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec func timestamp_sec() -> u64: diff --git a/example/src/example.ts b/example/src/example.ts index 9208061..a618d62 100644 --- a/example/src/example.ts +++ b/example/src/example.ts @@ -1,6 +1,6 @@ import {Fluence, KeyPair} from "@fluencelabs/fluence"; import { krasnodar, Node } from "@fluencelabs/fluence-network-environment"; -import {createRouteAndRegisterBlocking, resolveRoute, timestamp_sec} from "./generated/export"; +import {createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec} from "./generated/export"; let local: Node[] = [ { @@ -33,8 +33,8 @@ async function main() { let value = "myValue"; console.log("Will create route with label:", label); // create route (if not exists) and register on it - let route_id = await createRouteAndRegisterBlocking( - label, value, null, + let route_id = await createRouteAndRegisterNodeBlocking(krasnodar[0].peerId, + label, value, "identity", (s) => console.log(`node ${s} saved the record`), 5 ); diff --git a/service/Cargo.lock b/service/Cargo.lock index d4d53da..0d5235a 100644 --- a/service/Cargo.lock +++ b/service/Cargo.lock @@ -8,7 +8,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.5", + "getrandom 0.2.6", "once_cell", "version_check", ] @@ -889,9 +889,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", "libc", @@ -2045,7 +2045,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.5", + "getrandom 0.2.6", ] [[package]] @@ -2419,9 +2419,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.89" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" +checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" dependencies = [ "proc-macro2", "quote", @@ -2608,7 +2608,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.5", + "getrandom 0.2.6", ] [[package]] diff --git a/service/src/defaults.rs b/service/src/defaults.rs index d8aae40..7069e4e 100644 --- a/service/src/defaults.rs +++ b/service/src/defaults.rs @@ -28,3 +28,5 @@ pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer"; pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec"; pub static TRUSTED_WEIGHT_SERVICE_ID: &str = "trust-graph"; pub static TRUSTED_WEIGHT_FUNCTION_NAME: &str = "get_weight"; +pub static TRUSTED_REGISTRY_SERVICE_ID: &str = "registry"; +pub static TRUSTED_REGISTRY_FUNCTION_NAME: &str = "put_host_record"; diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index 628ace8..e3f9b0e 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -21,7 +21,7 @@ mod tests { use marine_rs_sdk::{CallParameters, SecurityTetraplet}; use rusqlite::Connection; marine_rs_sdk_test::include_test_env!("/marine_test_env.rs"); - use marine_test_env::registry::{DhtResult, Record, ServiceInterface}; + use marine_test_env::registry::{DhtResult, PutHostRecordResult, Record, ServiceInterface}; use crate::defaults::{ CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME, @@ -329,6 +329,99 @@ mod tests { assert!(result.success, "{}", result.error); } + fn get_signed_host_record_bytes( + registry: &mut ServiceInterface, + kp: &KeyPair, + route_id: String, + value: String, + relay_id: Vec, + service_id: Vec, + timestamp_created: u64, + solution: Vec, + ) -> Vec { + let issuer_peer_id = kp.get_peer_id().to_base58(); + let cp = CPWrapper::new(&issuer_peer_id); + let record_bytes = registry.get_host_record_bytes_cp( + route_id, + value, + relay_id, + service_id, + timestamp_created, + solution, + cp.get(), + ); + + kp.sign(&record_bytes).unwrap().to_vec().to_vec() + } + + fn put_host_record( + registry: &mut ServiceInterface, + kp: &KeyPair, + route_id: String, + value: String, + relay_id: Vec, + service_id: Vec, + timestamp_created: u64, + current_timestamp: u64, + weight: u32, + ) -> PutHostRecordResult { + let issuer_peer_id = kp.get_peer_id().to_base58(); + let solution = vec![]; + + let signature = get_signed_host_record_bytes( + registry, + kp, + route_id.clone(), + value.clone(), + relay_id.clone(), + service_id.clone(), + timestamp_created, + solution.clone(), + ); + + let cp = CPWrapper::new(&issuer_peer_id) + .add_weight_tetraplets(7) + .add_timestamp_tetraplets(8); + let weight = get_weight(issuer_peer_id.clone(), weight); + registry.put_host_record_cp( + route_id, + value, + relay_id, + service_id, + timestamp_created, + solution, + signature, + weight, + current_timestamp, + cp.get(), + ) + } + + fn put_host_record_checked( + registry: &mut ServiceInterface, + kp: &KeyPair, + route_id: String, + value: String, + relay_id: Vec, + service_id: Vec, + timestamp_created: u64, + current_timestamp: u64, + weight: u32, + ) { + let result = put_host_record( + registry, + kp, + route_id, + value, + relay_id, + service_id, + timestamp_created, + current_timestamp, + weight, + ); + assert!(result.success, "{}", result.error); + } + fn get_records( registry: &mut ServiceInterface, route_id: String, @@ -753,4 +846,53 @@ mod tests { assert_eq!(record.value, value); assert_eq!(record.set_by, kp.get_peer_id().to_base58()); } + + #[test] + fn test_put_get_host_record() { + clear_env(); + let mut registry = ServiceInterface::new(); + let kp = KeyPair::generate_ed25519(); + let route = "some_route".to_string(); + let timestamp_created = 0u64; + let current_timestamp = 100u64; + let weight = 0; + let pin = false; + + let route_id = register_route_checked( + &mut registry, + &kp, + route, + timestamp_created, + current_timestamp, + pin, + weight, + ); + let value = "some_value".to_string(); + let relay_id = vec!["some_relay".to_string()]; + let service_id = vec!["some_service_id".to_string()]; + let weight = 5u32; + + put_host_record_checked( + &mut registry, + &kp, + route_id.clone(), + value.clone(), + relay_id.clone(), + service_id.clone(), + timestamp_created, + current_timestamp, + weight, + ); + + let records = get_records(&mut registry, route_id.clone(), current_timestamp); + assert_eq!(records.len(), 1); + let record = &records[0]; + assert_eq!(record.route_id, route_id); + assert_eq!(record.relay_id, relay_id); + assert_eq!(record.service_id, service_id); + assert_eq!(record.set_by, kp.get_peer_id().to_base58()); + assert_eq!(record.peer_id, HOST_ID); + assert_eq!(record.value, value); + assert_eq!(record.set_by, kp.get_peer_id().to_base58()); + } } diff --git a/service/src/tetraplets_checkers.rs b/service/src/tetraplets_checkers.rs index 4eba896..7749643 100644 --- a/service/src/tetraplets_checkers.rs +++ b/service/src/tetraplets_checkers.rs @@ -15,8 +15,8 @@ */ use crate::defaults::{ - TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_WEIGHT_FUNCTION_NAME, - TRUSTED_WEIGHT_SERVICE_ID, + TRUSTED_REGISTRY_FUNCTION_NAME, TRUSTED_REGISTRY_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, + TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID, }; use crate::error::ServiceError; use crate::error::ServiceError::{ @@ -56,8 +56,8 @@ pub(crate) fn check_host_value_tetraplets( let tetraplet = tetraplets .get(0) .ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", call_parameters.tetraplets)))?; - (tetraplet.service_id == "aqua-dht" - && tetraplet.function_name == "put_host_value" + (tetraplet.service_id == TRUSTED_REGISTRY_SERVICE_ID + && tetraplet.function_name == TRUSTED_REGISTRY_FUNCTION_NAME && tetraplet.peer_pk == host_value.peer_id) .then(|| ()) .ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", tetraplet)))