add propagate_host_value (#15)

This commit is contained in:
Aleksey Proshutisnkiy 2021-06-08 14:29:15 +03:00 committed by GitHub
parent ea5d350f59
commit a24a530be1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 171 additions and 72 deletions

93
Cargo.lock generated
View File

@ -406,9 +406,9 @@ dependencies = [
[[package]]
name = "fluence-app-service"
version = "0.7.2"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3b1d2c199d6bc140c9ec5ffa323ebd34ac134fb37828bbfd7aeed0339cab78"
checksum = "c0b7c82c9b76dc2f0f00a24fa39fe6ecdb90c7b4596bc70069947f55b83d7ed8"
dependencies = [
"fluence-faas",
"log",
@ -422,15 +422,16 @@ dependencies = [
[[package]]
name = "fluence-faas"
version = "0.7.2"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9421599c5540e50f3854a0a48702c31408ac1cfb06314fe391792daa3a9d800b"
checksum = "5d03b12b7b359570eca676f491e5a4350fcc55decb14b1feff76796f5e346659"
dependencies = [
"cmd_lib",
"fluence",
"fluence-sdk-main",
"itertools 0.9.0",
"log",
"marine-module-interface",
"marine-runtime",
"marine-utils",
"safe-transmute",
@ -468,20 +469,6 @@ dependencies = [
"serde",
]
[[package]]
name = "fluence-sdk-wit"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab3ed39703b72e0e52bce9e1760746c73f65a5694c8da4dec751d3bfdec15b8"
dependencies = [
"proc-macro2",
"quote",
"serde",
"serde_json",
"syn",
"uuid",
]
[[package]]
name = "fluence-test"
version = "0.1.9"
@ -809,14 +796,14 @@ checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "marine-it-generator"
version = "0.4.0"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245d4971ee68559236cdd34e6d57ae769dab68c6c9ed4d390f14ae5830880c8f"
checksum = "e62f29b16bbdb0763a04f8561c954624ee9cd9f558af4e67b95eb00880da11ec"
dependencies = [
"cargo_toml",
"fluence-sdk-wit",
"it-lilo",
"marine-it-parser",
"marine-it-parser 0.6.4",
"marine-macro-impl",
"once_cell",
"serde",
"serde_json",
@ -835,6 +822,16 @@ dependencies = [
"wasmer-interface-types-fl",
]
[[package]]
name = "marine-it-interfaces"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f18c137e51fd52ab7a3652233fc4eaa68e25a6a53d609bf9dd0f2e3bf67adee1"
dependencies = [
"multimap",
"wasmer-interface-types-fl",
]
[[package]]
name = "marine-it-parser"
version = "0.5.0"
@ -842,7 +839,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e59c7067a18b9e4aebe67bee033638fae97d6fe4fb00f70f9a509eb5d03d1c5d"
dependencies = [
"anyhow",
"marine-it-interfaces",
"marine-it-interfaces 0.3.0",
"nom",
"semver 0.11.0",
"serde",
"thiserror",
"walrus",
"wasmer-interface-types-fl",
"wasmer-runtime-core-fl",
]
[[package]]
name = "marine-it-parser"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6be3cfa1905a63ebf925c4efc605cdd121ddfe86809aaec16ec9fe32443b4423"
dependencies = [
"anyhow",
"itertools 0.10.0",
"marine-it-interfaces 0.4.0",
"marine-module-interface",
"nom",
"semver 0.11.0",
"serde",
@ -892,19 +908,38 @@ dependencies = [
]
[[package]]
name = "marine-runtime"
version = "0.5.0"
name = "marine-module-interface"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8722580724555bf87a1128780f7dd3f700bbc6add690d1efe7e7fc736edf1a1"
checksum = "c4920ea983e51f8f560e09d9d554035cd4ecd7d60940b352637c8c4c9d02f865"
dependencies = [
"anyhow",
"itertools 0.10.0",
"marine-it-interfaces 0.4.0",
"nom",
"semver 0.11.0",
"serde",
"thiserror",
"walrus",
"wasmer-interface-types-fl",
"wasmer-runtime-core-fl",
]
[[package]]
name = "marine-runtime"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4d692a5077c2617cc147419bf65eb73f8eff583726933b2c79e2b3f9db2115b"
dependencies = [
"anyhow",
"boolinator",
"it-lilo",
"log",
"marine-it-generator",
"marine-it-interfaces",
"marine-it-parser",
"marine-it-interfaces 0.4.0",
"marine-it-parser 0.6.4",
"marine-module-info-parser",
"marine-module-interface",
"marine-utils",
"multimap",
"once_cell",
@ -950,7 +985,7 @@ checksum = "b6e79cfbde74677cbeb60fe5944c345617739aae37dc5ac5ef5f12d487472071"
dependencies = [
"darling",
"fluence-app-service",
"marine-it-parser",
"marine-it-parser 0.5.0",
"proc-macro-error",
"proc-macro2",
"quote",

View File

@ -10,7 +10,7 @@ name = "aqua-dht"
path = "src/main.rs"
[dependencies]
fluence = "0.6.8"
fluence = "0.6.9"
marine-sqlite-connector = "0.4.1"
fstrings = "0.2.3"
eyre = "0.6.5"

View File

@ -1,15 +1,3 @@
data Key:
key: string
peer_id: string
timestamp_created: u64
pinned: bool
weight: u32
data GetKeyMetadataResult:
success: bool
error: string
key: Key
data Record:
value: string
peer_id: string
@ -19,11 +7,24 @@ data Record:
timestamp_created: u64
weight: u32
data GetValuesResult:
data MergeResult:
success: bool
error: string
result: []Record
data ClearExpiredResult:
success: bool
error: string
count_keys: u64
count_values: u64
data Key:
key: string
peer_id: string
timestamp_created: u64
pinned: bool
weight: u32
data EvictStaleItem:
key: Key
records: []Record
@ -38,20 +39,25 @@ data RepublishValuesResult:
error: string
updated: u64
data ClearExpiredResult:
data GetValuesResult:
success: bool
error: string
count_keys: u64
count_values: u64
result: []Record
data DhtResult:
success: bool
error: string
data MergeResult:
data PutHostValueResult:
success: bool
error: string
result: []Record
key: string
value: []Record
data GetKeyMetadataResult:
success: bool
error: string
key: Key
service AquaDHT("aqua-dht"):
clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult
@ -62,10 +68,9 @@ service AquaDHT("aqua-dht"):
merge(records: [][]Record) -> MergeResult
merge_hack_get_values(records: []GetValuesResult) -> MergeResult
merge_two(a: []Record, b: []Record) -> MergeResult
put_host_value(key: string, value: string, current_timestamp_sec: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
put_host_value_relay(key: string, value: string, current_timestamp_sec: u64, relay_id: string, weight: u32) -> DhtResult
propagate_host_value(set_host_value: PutHostValueResult, current_timestamp_sec: u64, weight: u32) -> DhtResult
put_host_value(key: string, value: string, current_timestamp_sec: u64, relay_id: []string, service_id: []string, weight: u32) -> PutHostValueResult
put_value(key: string, value: string, current_timestamp_sec: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
put_value_relay(key: string, value: string, current_timestamp_sec: u64, relay_id: string, weight: u32) -> DhtResult
register_key(key: string, current_timestamp_sec: u64, pin: bool, weight: u32) -> DhtResult
renew_host_value(key: string, current_timestamp_sec: u64) -> DhtResult
republish_key(key: Key, current_timestamp_sec: u64) -> DhtResult

View File

@ -15,7 +15,7 @@
*/
use crate::{Config, KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, DEFAULT_EXPIRED_VALUE_AGE, DEFAULT_STALE_VALUE_AGE, DEFAULT_EXPIRED_HOST_VALUE_AGE, VALUES_LIMIT, CONFIG_FILE};
use crate::results::{Key, Record, EvictStaleItem};
use crate::results::{Key, Record, EvictStaleItem, PutHostValueResult};
use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State, Statement};
use fluence::{CallParameters};
use eyre;
@ -68,6 +68,15 @@ pub(crate) fn check_timestamp_tetraplets(call_parameters: &CallParameters, arg_n
&& tetraplet.peer_pk == call_parameters.host_id).then(|| ()).wrap_err(error_msg)
}
pub(crate) fn check_host_value_tetraplets(call_parameters: &CallParameters, arg_number: usize, host_value: &Record) -> eyre::Result<()> {
let error_msg = "you should use put_host_value to pass set_host_value";
let tetraplets = call_parameters.tetraplets.get(arg_number).wrap_err(error_msg)?;
let tetraplet = tetraplets.get(0).wrap_err(error_msg)?;
(tetraplet.service_id == "aqua-dht" &&
tetraplet.function_name == "put_host_value"
&& tetraplet.peer_pk == host_value.peer_id).then(|| ()).wrap_err(error_msg)
}
#[inline]
pub(crate) fn get_connection() -> SqliteResult<Connection> {
marine_sqlite_connector::open(DB_PATH)
@ -195,7 +204,7 @@ pub fn republish_key_impl(key: Key, current_timestamp_sec: u64) -> SqliteResult<
update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp_sec, false, key.weight)
}
pub fn put_value_impl(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32, host: bool) -> SqliteResult<()> {
pub fn put_value_impl(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32, host: bool) -> SqliteResult<Record> {
let call_parameters = fluence::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 2)
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
@ -223,7 +232,17 @@ pub fn put_value_impl(key: String, value: String, current_timestamp_sec: u64, re
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
VALUES ('{key}', '{value}', '{peer_id}', '{set_by}', '{relay_id}',\
'{service_id}', '{current_timestamp_sec}', '{current_timestamp_sec}', '{weight}')")
)
)?;
Ok(Record {
value,
peer_id,
set_by,
relay_id:vec![relay_id],
service_id: vec![service_id],
timestamp_created: current_timestamp_sec,
weight
})
} else {
Err(SqliteError { code: None, message: Some("values limit is exceeded".to_string()) })
}
@ -273,10 +292,6 @@ pub fn republish_values_impl(key: String, mut records: Vec<Record>, current_time
let relay_id = if record.relay_id.is_empty() { "".to_string() } else { record.relay_id[0].clone() };
let service_id = if record.service_id.is_empty() { "".to_string() } else { record.service_id[0].clone() };
if record.set_by != record.peer_id { // host values are ignored in republish
continue;
}
connection.execute(
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
VALUES ('{key}', '{record.value}', '{record.peer_id}', '{record.peer_id}, '{relay_id}',\
@ -409,4 +424,23 @@ pub fn clear_host_value_impl(key: String, current_timestamp_sec: u64) -> SqliteR
WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?;
(connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) })
}
}
pub fn propagate_host_value_impl(mut set_host_value: PutHostValueResult, current_timestamp_sec: u64, weight: u32) -> SqliteResult<()> {
if !set_host_value.success || set_host_value.value.len() != 1 {
return Err( SqliteError { code: None, message: Some("invalid set_host_value".to_string()) });
}
let call_parameters = fluence::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 1)
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
check_host_value_tetraplets(&call_parameters, 0, &set_host_value.value[0])
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
if set_host_value.value[0].set_by != call_parameters.init_peer_id {
return Err( SqliteError { code: None, message: Some("value is set by another peer".to_string()) });
}
set_host_value.value[0].weight = weight;
republish_values_impl(set_host_value.key, set_host_value.value, current_timestamp_sec).map(|_| ())
}

View File

@ -19,8 +19,8 @@ mod results;
mod tests;
mod impls;
use crate::results::{Key, GetKeyMetadataResult, DhtResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult};
use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl, create_config, load_config, write_config};
use crate::results::{Key, GetKeyMetadataResult, DhtResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult, PutHostValueResult};
use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl, create_config, load_config, write_config, propagate_host_value_impl};
use fluence::marine;
use fluence::module_manifest;
@ -76,22 +76,20 @@ pub fn republish_key(key: Key, current_timestamp_sec: u64) -> DhtResult {
// VALUES
#[marine]
pub fn put_value(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32) -> DhtResult {
put_value_impl(key, value, current_timestamp_sec, relay_id, service_id, weight, false).into()
put_value_impl(key, value, current_timestamp_sec, relay_id, service_id, weight, false).map(|_| ()).into()
}
#[marine]
pub fn put_value_relay(key: String, value: String, current_timestamp_sec: u64, relay_id: String, weight: u32) -> DhtResult {
put_value_impl(key, value, current_timestamp_sec, vec![relay_id], vec![], weight, false).into()
pub fn put_host_value(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32) -> PutHostValueResult {
let mut result: PutHostValueResult = put_value_impl(key.clone(), value, current_timestamp_sec, relay_id, service_id, weight, true).into();
result.key = key;
result
}
#[marine]
pub fn put_host_value(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32) -> DhtResult {
put_value_impl(key, value, current_timestamp_sec, relay_id, service_id, weight, true).into()
}
#[marine]
pub fn put_host_value_relay(key: String, value: String, current_timestamp_sec: u64, relay_id: String, weight: u32) -> DhtResult {
put_value_impl(key, value, current_timestamp_sec, vec![relay_id], vec![], weight, true).into()
pub fn propagate_host_value(set_host_value: PutHostValueResult, current_timestamp_sec: u64, weight: u32) -> DhtResult {
propagate_host_value_impl(set_host_value, current_timestamp_sec, weight).into()
}
#[marine]

View File

@ -40,7 +40,7 @@ impl From<SqliteResult<()>> for DhtResult {
}
#[marine]
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Record {
pub value: String,
pub peer_id: String,
@ -241,3 +241,30 @@ impl From<SqliteResult<Vec<Record>>> for MergeResult {
}
}
}
#[marine]
pub struct PutHostValueResult {
pub success: bool,
pub error: String,
pub key: String,
pub value: Vec<Record>
}
impl From<SqliteResult<Record>> for PutHostValueResult {
fn from(result: SqliteResult<Record>) -> Self {
match result {
Ok(result) => Self {
success: true,
error: "".to_string(),
key: "".to_string(),
value: vec![result]
},
Err(err) => Self {
success: false,
error: err.to_string(),
key: "".to_string(),
value: vec![]
},
}
}
}