Fix propagate_host_record, update annotations in aqua (#92)

This commit is contained in:
Aleksey Proshutisnkiy
2022-03-29 14:58:55 +04:00
committed by GitHub
parent 6b2d699d64
commit 78dafff714
7 changed files with 215 additions and 47 deletions

View File

@ -10,17 +10,17 @@ func get_route_id(label: string, peer_id: string) -> RouteId:
route_id <- Registry.get_route_id(label, peer_id) route_id <- Registry.get_route_id(label, peer_id)
<- route_id <- route_id
-- Get peers closest to the label's hash in Kademlia network -- Get peers closest to the route_id's hash in Kademlia network
-- These peers are expected to store list of subscribers of this label -- These peers are expected to store list of providers for this route
func getNeighbours(route_id: string) -> []PeerId: func getNeighbours(route_id: RouteId) -> []PeerId:
k <- Op.string_to_b58(route_id) k <- Op.string_to_b58(route_id)
nodes <- Kademlia.neighborhood(k, nil, nil) nodes <- Kademlia.neighborhood(k, nil, nil)
<- nodes <- nodes
-- If this peer have set node_id as a subscriber for label, -- If this peer have set node_id as a provider for route,
-- this call will prevent subscriber from re-subscribing -- this call will prevent provider from renew
-- so that eventually it will disappear from the subscribers list -- so that eventually it will disappear from the providers list
func removeFromRoute(route_id: string): func removeFromRoute(route_id: RouteId):
on HOST_PEER_ID: on HOST_PEER_ID:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
Registry.clear_host_record(route_id, t) Registry.clear_host_record(route_id, t)
@ -39,8 +39,8 @@ func createRoute(label: string) -> RouteId:
result <- register_route(label, t, signature, false) result <- register_route(label, t, signature, false)
<- route_id <- route_id
-- Create a label and subscribe to it -- Create a route and register for it
-- INIT_PEER_ID (current client) will become a subscriber -- INIT_PEER_ID (current client) will become a provider
func createRouteAndRegister(label: string, value: string, service_id: ?string) -> string: func createRouteAndRegister(label: string, value: string, service_id: ?string) -> string:
relay_id: ?string relay_id: ?string
relay_id <<- HOST_PEER_ID relay_id <<- HOST_PEER_ID
@ -61,8 +61,8 @@ func createRouteAndRegister(label: string, value: string, service_id: ?string) -
put_record(route_id, value, relay_id, service_id, t, record_signature) put_record(route_id, value, relay_id, service_id, t, record_signature)
<- route_id <- route_id
-- Create a label and subscribe to it -- Create a route and register for it
-- INIT_PEER_ID (current client) will become a subscriber -- INIT_PEER_ID (current client) will become a provider
-- In contrast with non-blocking version, waits for at least a single write to succeed -- In contrast with non-blocking version, waits for at least a single write to succeed
func createRouteAndRegisterBlocking( func createRouteAndRegisterBlocking(
label: string, value: string, label: string, value: string,
@ -93,8 +93,8 @@ func createRouteAndRegisterBlocking(
join results[ack] join results[ack]
<- route_id <- route_id
-- Create a label and make the given node a subscriber to it -- Create a route and make the given node a provider for it
func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string: func createRouteAndRegisterNode(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> RouteId:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t) route_signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
@ -102,7 +102,7 @@ func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
on subscriber_node_id: on provider_node_id:
register_route(label, t, route_signature, false) register_route(label, t, route_signature, false)
r <- put_host_record(route_id, value, nil, service_id, t, record_signature) r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id) nodes <- getNeighbours(route_id)
@ -113,9 +113,37 @@ func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value
propagate_host_record(r) propagate_host_record(r)
<- route_id <- route_id
-- Subscribe to a label func createRouteAndRegisterNodeBlocking(
-- Note: label must be already initiated provider_node_id: PeerId, label: string,
func registerForRoute(route_id: string, value: string, service_id: ?string): value: string, service_id: ?string,
progress: string -> (),
ack: i16
) -> RouteId:
t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
on HOST_PEER_ID:
route_id <- get_route_id(label, INIT_PEER_ID)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
results: *DhtResult
on provider_node_id:
register_route(label, t, route_signature, false)
r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id)
for n <- nodes par:
on n:
try:
register_route(label, t, route_signature, false)
results <- propagate_host_record(r)
progress(n)
join results[ack]
<- route_id
-- Register for a route
-- Note: route must be already initiated
func registerForRoute(route_id: RouteId, value: string, service_id: ?string):
relay_id: ?string relay_id: ?string
relay_id <<- HOST_PEER_ID relay_id <<- HOST_PEER_ID
@ -130,26 +158,22 @@ func registerForRoute(route_id: string, value: string, service_id: ?string):
put_record(route_id, value, relay_id, service_id, t, record_signature) put_record(route_id, value, relay_id, service_id, t, record_signature)
-- Subscribe a node to the given label -- Register a node to the given route
-- Note: label must be already initiated -- Note: route must be already initiated
func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string): func registerForRouteNode(provider_node_id: PeerId, route_id: RouteId, value: string, service_id: ?string):
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
route_signature <- get_route_signature(label, t)
on HOST_PEER_ID:
route_id <- get_route_id(label, INIT_PEER_ID)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
on subscriber_node_id: on provider_node_id:
r <- put_host_record(route_id, value, nil, service_id, t, record_signature) r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id) nodes <- getNeighbours(route_id)
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
register_route(label, t, route_signature, false)
propagate_host_record(r) propagate_host_record(r)
-- Find the list of record for the given route_id -- Find the list of record for the given route_id
func resolveRoute(route_id: string, ack: i16) -> []Record: func resolveRoute(route_id: RouteId, ack: i16) -> []Record:
on HOST_PEER_ID: on HOST_PEER_ID:
nodes <- getNeighbours(route_id) nodes <- getNeighbours(route_id)
res: *[]Record res: *[]Record
@ -165,10 +189,10 @@ func resolveRoute(route_id: string, ack: i16) -> []Record:
result <- Registry.merge(res) result <- Registry.merge(res)
<- result.result <- result.result
-- Execute the given code on subscribers -- Execute the given code on providers
-- Note that you can provide another Aqua function as an argument to this one -- Note that you can provide another Aqua function as an argument to this one
func executeOnRoute(route_id: string, ack: i16, call: Record -> ()): func executeOnRoute(route_id: RouteId, ack: i16, call: Record -> ()):
subs <- resolveRoute(route_id, ack) providers <- resolveRoute(route_id, ack)
for r <- subs par: for r <- providers par:
on r.peer_id via r.relay_id: on r.peer_id via r.relay_id:
call(r) call(r)

View File

@ -1,7 +1,7 @@
module Export module Export
import createRouteAndRegisterBlocking, resolveRoute from "@fluencelabs/registry/routing.aqua" import createRouteAndRegisterNodeBlocking, resolveRoute from "@fluencelabs/registry/routing.aqua"
import Peer from "@fluencelabs/aqua-lib/builtin.aqua" import Peer from "@fluencelabs/aqua-lib/builtin.aqua"
export createRouteAndRegisterBlocking, resolveRoute, timestamp_sec export createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec
func timestamp_sec() -> u64: func timestamp_sec() -> u64:

View File

@ -1,6 +1,6 @@
import {Fluence, KeyPair} from "@fluencelabs/fluence"; import {Fluence, KeyPair} from "@fluencelabs/fluence";
import { krasnodar, Node } from "@fluencelabs/fluence-network-environment"; import { krasnodar, Node } from "@fluencelabs/fluence-network-environment";
import {createRouteAndRegisterBlocking, resolveRoute, timestamp_sec} from "./generated/export"; import {createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec} from "./generated/export";
let local: Node[] = [ let local: Node[] = [
{ {
@ -33,8 +33,8 @@ async function main() {
let value = "myValue"; let value = "myValue";
console.log("Will create route with label:", label); console.log("Will create route with label:", label);
// create route (if not exists) and register on it // create route (if not exists) and register on it
let route_id = await createRouteAndRegisterBlocking( let route_id = await createRouteAndRegisterNodeBlocking(krasnodar[0].peerId,
label, value, null, label, value, "identity",
(s) => console.log(`node ${s} saved the record`), (s) => console.log(`node ${s} saved the record`),
5 5
); );

14
service/Cargo.lock generated
View File

@ -8,7 +8,7 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [ dependencies = [
"getrandom 0.2.5", "getrandom 0.2.6",
"once_cell", "once_cell",
"version_check", "version_check",
] ]
@ -889,9 +889,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.5" version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
@ -2045,7 +2045,7 @@ version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [ dependencies = [
"getrandom 0.2.5", "getrandom 0.2.6",
] ]
[[package]] [[package]]
@ -2419,9 +2419,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.89" version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2608,7 +2608,7 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [ dependencies = [
"getrandom 0.2.5", "getrandom 0.2.6",
] ]
[[package]] [[package]]

View File

@ -28,3 +28,5 @@ pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer";
pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec"; pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec";
pub static TRUSTED_WEIGHT_SERVICE_ID: &str = "trust-graph"; pub static TRUSTED_WEIGHT_SERVICE_ID: &str = "trust-graph";
pub static TRUSTED_WEIGHT_FUNCTION_NAME: &str = "get_weight"; pub static TRUSTED_WEIGHT_FUNCTION_NAME: &str = "get_weight";
pub static TRUSTED_REGISTRY_SERVICE_ID: &str = "registry";
pub static TRUSTED_REGISTRY_FUNCTION_NAME: &str = "put_host_record";

View File

@ -21,7 +21,7 @@ mod tests {
use marine_rs_sdk::{CallParameters, SecurityTetraplet}; use marine_rs_sdk::{CallParameters, SecurityTetraplet};
use rusqlite::Connection; use rusqlite::Connection;
marine_rs_sdk_test::include_test_env!("/marine_test_env.rs"); marine_rs_sdk_test::include_test_env!("/marine_test_env.rs");
use marine_test_env::registry::{DhtResult, Record, ServiceInterface}; use marine_test_env::registry::{DhtResult, PutHostRecordResult, Record, ServiceInterface};
use crate::defaults::{ use crate::defaults::{
CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME, CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME,
@ -329,6 +329,99 @@ mod tests {
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
} }
fn get_signed_host_record_bytes(
registry: &mut ServiceInterface,
kp: &KeyPair,
route_id: String,
value: String,
relay_id: Vec<String>,
service_id: Vec<String>,
timestamp_created: u64,
solution: Vec<u8>,
) -> Vec<u8> {
let issuer_peer_id = kp.get_peer_id().to_base58();
let cp = CPWrapper::new(&issuer_peer_id);
let record_bytes = registry.get_host_record_bytes_cp(
route_id,
value,
relay_id,
service_id,
timestamp_created,
solution,
cp.get(),
);
kp.sign(&record_bytes).unwrap().to_vec().to_vec()
}
fn put_host_record(
registry: &mut ServiceInterface,
kp: &KeyPair,
route_id: String,
value: String,
relay_id: Vec<String>,
service_id: Vec<String>,
timestamp_created: u64,
current_timestamp: u64,
weight: u32,
) -> PutHostRecordResult {
let issuer_peer_id = kp.get_peer_id().to_base58();
let solution = vec![];
let signature = get_signed_host_record_bytes(
registry,
kp,
route_id.clone(),
value.clone(),
relay_id.clone(),
service_id.clone(),
timestamp_created,
solution.clone(),
);
let cp = CPWrapper::new(&issuer_peer_id)
.add_weight_tetraplets(7)
.add_timestamp_tetraplets(8);
let weight = get_weight(issuer_peer_id.clone(), weight);
registry.put_host_record_cp(
route_id,
value,
relay_id,
service_id,
timestamp_created,
solution,
signature,
weight,
current_timestamp,
cp.get(),
)
}
fn put_host_record_checked(
registry: &mut ServiceInterface,
kp: &KeyPair,
route_id: String,
value: String,
relay_id: Vec<String>,
service_id: Vec<String>,
timestamp_created: u64,
current_timestamp: u64,
weight: u32,
) {
let result = put_host_record(
registry,
kp,
route_id,
value,
relay_id,
service_id,
timestamp_created,
current_timestamp,
weight,
);
assert!(result.success, "{}", result.error);
}
fn get_records( fn get_records(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
route_id: String, route_id: String,
@ -753,4 +846,53 @@ mod tests {
assert_eq!(record.value, value); assert_eq!(record.value, value);
assert_eq!(record.set_by, kp.get_peer_id().to_base58()); assert_eq!(record.set_by, kp.get_peer_id().to_base58());
} }
#[test]
fn test_put_get_host_record() {
clear_env();
let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519();
let route = "some_route".to_string();
let timestamp_created = 0u64;
let current_timestamp = 100u64;
let weight = 0;
let pin = false;
let route_id = register_route_checked(
&mut registry,
&kp,
route,
timestamp_created,
current_timestamp,
pin,
weight,
);
let value = "some_value".to_string();
let relay_id = vec!["some_relay".to_string()];
let service_id = vec!["some_service_id".to_string()];
let weight = 5u32;
put_host_record_checked(
&mut registry,
&kp,
route_id.clone(),
value.clone(),
relay_id.clone(),
service_id.clone(),
timestamp_created,
current_timestamp,
weight,
);
let records = get_records(&mut registry, route_id.clone(), current_timestamp);
assert_eq!(records.len(), 1);
let record = &records[0];
assert_eq!(record.route_id, route_id);
assert_eq!(record.relay_id, relay_id);
assert_eq!(record.service_id, service_id);
assert_eq!(record.set_by, kp.get_peer_id().to_base58());
assert_eq!(record.peer_id, HOST_ID);
assert_eq!(record.value, value);
assert_eq!(record.set_by, kp.get_peer_id().to_base58());
}
} }

View File

@ -15,8 +15,8 @@
*/ */
use crate::defaults::{ use crate::defaults::{
TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_REGISTRY_FUNCTION_NAME, TRUSTED_REGISTRY_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME,
TRUSTED_WEIGHT_SERVICE_ID, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID,
}; };
use crate::error::ServiceError; use crate::error::ServiceError;
use crate::error::ServiceError::{ use crate::error::ServiceError::{
@ -56,8 +56,8 @@ pub(crate) fn check_host_value_tetraplets(
let tetraplet = tetraplets let tetraplet = tetraplets
.get(0) .get(0)
.ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", call_parameters.tetraplets)))?; .ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", call_parameters.tetraplets)))?;
(tetraplet.service_id == "aqua-dht" (tetraplet.service_id == TRUSTED_REGISTRY_SERVICE_ID
&& tetraplet.function_name == "put_host_value" && tetraplet.function_name == TRUSTED_REGISTRY_FUNCTION_NAME
&& tetraplet.peer_pk == host_value.peer_id) && tetraplet.peer_pk == host_value.peer_id)
.then(|| ()) .then(|| ())
.ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", tetraplet))) .ok_or_else(|| InvalidSetHostValueTetraplet(format!("{:?}", tetraplet)))