diff --git a/Cargo.lock b/Cargo.lock index 10e2df1..f784cd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,8 @@ dependencies = [ "fstrings", "marine-sqlite-connector", "rusqlite", + "serde", + "toml", ] [[package]] @@ -625,9 +627,9 @@ dependencies = [ [[package]] name = "heck" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" dependencies = [ "unicode-segmentation", ] diff --git a/Cargo.toml b/Cargo.toml index f33c0ca..ac248cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ marine-sqlite-connector = "0.4.1" fstrings = "0.2.3" eyre = "0.6.5" boolinator = "2.4.0" +toml = "0.5.6" +serde = { version = "1.0.118", features = ["derive"] } [dev-dependencies] fluence-test = "0.1.9" diff --git a/aqua/aqua-dht.aqua b/aqua/aqua-dht.aqua index 424c264..3464d1a 100644 --- a/aqua/aqua-dht.aqua +++ b/aqua/aqua-dht.aqua @@ -1,3 +1,9 @@ +data ClearExpiredResult: + success: bool + error: string + count_keys: u64 + count_values: u64 + data Key: key: string peer_id: string @@ -10,6 +16,11 @@ data GetKeyMetadataResult: error: string key: Key +data RepublishValuesResult: + success: bool + error: string + updated: u64 + data Record: value: string peer_id: string @@ -19,54 +30,46 @@ data Record: timestamp_created: u64 weight: u32 +data EvictStaleItem: + key: Key + records: []Record + data MergeResult: success: bool error: string result: []Record -data GetValuesResult: - success: bool - error: string - result: []Record - -data ClearExpiredResult: - success: bool - error: string - count_keys: u64 - count_values: u64 - data DhtResult: success: bool error: string -data EvictStaleItem: - key: Key - records: []Record - data EvictStaleResult: success: bool error: string results: []EvictStaleItem -data RepublishValuesResult: +data GetValuesResult: success: bool error: string - updated: u64 + result: []Record service AquaDHT("aqua-dht"): - clear_expired(current_timestamp: u64) -> ClearExpiredResult - clear_host_value(key: string, current_timestamp: u64) -> DhtResult - evict_stale(current_timestamp: u64) -> EvictStaleResult - get_key_metadata(key: string, current_timestamp: u64) -> GetKeyMetadataResult - get_values(key: string, current_timestamp: u64) -> GetValuesResult + clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult + clear_host_value(key: string, current_timestamp_sec: u64) -> DhtResult + evict_stale(current_timestamp_sec: u64) -> EvictStaleResult + get_key_metadata(key: string, current_timestamp_sec: u64) -> GetKeyMetadataResult + get_values(key: string, current_timestamp_sec: u64) -> GetValuesResult merge(records: [][]Record) -> MergeResult merge_hack_get_values(records: []GetValuesResult) -> MergeResult merge_two(a: []Record, b: []Record) -> MergeResult - put_host_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult - put_host_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult - put_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult - put_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult - register_key(key: string, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult - renew_host_value(key: string, current_timestamp: u64) -> DhtResult - republish_key(key: Key, current_timestamp: u64) -> DhtResult - republish_values(key: string, records: []Record, current_timestamp: u64) -> RepublishValuesResult + put_host_value(key: string, value: string, current_timestamp_sec: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult + put_host_value_relay(key: string, value: string, current_timestamp_sec: u64, relay_id: string, weight: u32) -> DhtResult + put_value(key: string, value: string, current_timestamp_sec: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult + put_value_relay(key: string, value: string, current_timestamp_sec: u64, relay_id: string, weight: u32) -> DhtResult + register_key(key: string, current_timestamp_sec: u64, pin: bool, weight: u32) -> DhtResult + renew_host_value(key: string, current_timestamp_sec: u64) -> DhtResult + republish_key(key: Key, current_timestamp_sec: u64) -> DhtResult + republish_values(key: string, records: []Record, current_timestamp_sec: u64) -> RepublishValuesResult + set_expired_timeout(timeout_sec: u64) -> () + set_host_expired_timeout(timeout_sec: u64) -> () + set_stale_timeout(timeout_sec: u64) -> () diff --git a/src/impls.rs b/src/impls.rs index 9977efe..4f3cd62 100644 --- a/src/impls.rs +++ b/src/impls.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE, STALE_VALUE_AGE, EXPIRED_HOST_VALUE_AGE, VALUES_LIMIT}; +use crate::{Config, KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, DEFAULT_EXPIRED_VALUE_AGE, DEFAULT_STALE_VALUE_AGE, DEFAULT_EXPIRED_HOST_VALUE_AGE, VALUES_LIMIT, CONFIG_FILE}; use crate::results::{Key, Record, EvictStaleItem}; use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State, Statement}; use fluence::{CallParameters}; @@ -22,7 +22,8 @@ use eyre; use eyre::ContextCompat; use std::collections::HashMap; use boolinator::Boolinator; - +use toml; +use std::fs; fn get_custom_option(value: String) -> Vec { if value.is_empty() { @@ -54,8 +55,8 @@ fn read_record(statement: &Statement) -> SqliteResult { }) } -fn check_key_existence(connection: &Connection, key: String, current_timestamp: u64) -> SqliteResult<()> { - get_key_metadata_helper(&connection, key, current_timestamp).map(|_| ()) +fn check_key_existence(connection: &Connection, key: String, current_timestamp_sec: u64) -> SqliteResult<()> { + get_key_metadata_helper(&connection, key, current_timestamp_sec).map(|_| ()) } pub(crate) fn check_timestamp_tetraplets(call_parameters: &CallParameters, arg_number: usize) -> eyre::Result<()> { @@ -110,10 +111,30 @@ pub(crate) fn create_values_table() -> bool { ).is_ok() } -fn get_key_metadata_helper(connection: &Connection, key: String, current_timestamp: u64) -> SqliteResult { +pub fn write_config(config: Config) { + fs::write(CONFIG_FILE, toml::to_string(&config).unwrap()).unwrap(); +} + +pub fn load_config() -> Config { + let file_content = fs::read_to_string(CONFIG_FILE).unwrap(); + let config: Config = toml::from_str(&file_content).unwrap(); + config +} + +pub(crate) fn create_config() { + if fs::metadata(CONFIG_FILE).is_err() { + write_config(Config { + expired_timeout: DEFAULT_EXPIRED_VALUE_AGE, + stale_timeout: DEFAULT_STALE_VALUE_AGE, + host_expired_timeout: DEFAULT_EXPIRED_HOST_VALUE_AGE, + }); + } +} + +fn get_key_metadata_helper(connection: &Connection, key: String, current_timestamp_sec: u64) -> SqliteResult { connection.execute( f!("UPDATE {KEYS_TABLE_NAME} \ - SET timestamp_accessed = '{current_timestamp}' \ + SET timestamp_accessed = '{current_timestamp_sec}' \ WHERE key = '{key}'"))?; let mut statement = connection @@ -148,40 +169,40 @@ fn update_key(connection: &Connection, key: String, peer_id: String, timestamp_c } } -pub fn get_key_metadata_impl(key: String, current_timestamp: u64) -> SqliteResult { +pub fn get_key_metadata_impl(key: String, current_timestamp_sec: u64) -> SqliteResult { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; - get_key_metadata_helper(&get_connection()?, key, current_timestamp) + get_key_metadata_helper(&get_connection()?, key, current_timestamp_sec) } -pub fn register_key_impl(key: String, current_timestamp: u64, pin: bool, weight: u32) -> SqliteResult<()> { +pub fn register_key_impl(key: String, current_timestamp_sec: u64, pin: bool, weight: u32) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); let peer_id = call_parameters.init_peer_id.clone(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; - update_key(&get_connection()?, key, peer_id, current_timestamp.clone(), current_timestamp, pin, weight) + update_key(&get_connection()?, key, peer_id, current_timestamp_sec.clone(), current_timestamp_sec, pin, weight) } -pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()> { +pub fn republish_key_impl(key: Key, current_timestamp_sec: u64) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; // Key.pinned is ignored in republish - update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp, false, key.weight) + update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp_sec, false, key.weight) } -pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32, host: bool) -> SqliteResult<()> { +pub fn put_value_impl(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec, service_id: Vec, weight: u32, host: bool) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 2) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + check_key_existence(&connection, key.clone(), current_timestamp_sec.clone())?; let values: Vec = get_values_helper(&connection, key.clone())?.into_iter().filter(|item| item.peer_id == item.set_by).collect(); let min_weight_record = values.iter().last(); @@ -201,7 +222,7 @@ pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_ connection.execute( f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \ VALUES ('{key}', '{value}', '{peer_id}', '{set_by}', '{relay_id}',\ - '{service_id}', '{current_timestamp}', '{current_timestamp}', '{weight}')") + '{service_id}', '{current_timestamp_sec}', '{current_timestamp_sec}', '{weight}')") ) } else { Err(SqliteError { code: None, message: Some("values limit is exceeded".to_string()) }) @@ -222,7 +243,7 @@ pub fn get_values_helper(connection: &Connection, key: String) -> SqliteResult SqliteResult> { +pub fn get_values_impl(key: String, current_timestamp_sec: u64) -> SqliteResult> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; @@ -231,19 +252,19 @@ pub fn get_values_impl(key: String, current_timestamp: u64) -> SqliteResult, current_timestamp: u64) -> SqliteResult { +pub fn republish_values_impl(key: String, mut records: Vec, current_timestamp_sec: u64) -> SqliteResult { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 2) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + check_key_existence(&connection, key.clone(), current_timestamp_sec.clone())?; records = merge_impl(get_values_helper(&connection, key.clone())?.into_iter().chain(records.into_iter()).collect())?; @@ -259,7 +280,7 @@ pub fn republish_values_impl(key: String, mut records: Vec, current_time connection.execute( f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \ VALUES ('{key}', '{record.value}', '{record.peer_id}', '{record.peer_id}, '{relay_id}',\ - '{service_id}', '{record.timestamp_created}', '{current_timestamp}', '{record.weight}')"))?; + '{service_id}', '{record.timestamp_created}', '{current_timestamp_sec}', '{record.weight}')"))?; updated += connection.changes() as u64; } @@ -267,14 +288,15 @@ pub fn republish_values_impl(key: String, mut records: Vec, current_time Ok(updated) } -pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> { +pub fn clear_expired_impl(current_timestamp_sec: u64) -> SqliteResult<(u64, u64)> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 0) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; + let config = load_config(); - let expired_host_timestamp = current_timestamp - EXPIRED_HOST_VALUE_AGE; - let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE; + let expired_host_timestamp = current_timestamp_sec - config.host_expired_timeout; + let expired_timestamp = current_timestamp_sec - config.expired_timeout; let mut deleted_values = 0u64; let host_id = call_parameters.host_id; connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} \ @@ -301,12 +323,12 @@ pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> { Ok((deleted_keys, deleted_values)) } -pub fn evict_stale_impl(current_timestamp: u64) -> SqliteResult> { +pub fn evict_stale_impl(current_timestamp_sec: u64) -> SqliteResult> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 0) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - let stale_timestamp = current_timestamp - STALE_VALUE_AGE; + let stale_timestamp = current_timestamp_sec - load_config().stale_timeout; let mut stale_keys: Vec = vec![]; let mut statement = @@ -352,32 +374,32 @@ pub fn merge_impl(records: Vec) -> SqliteResult> { Ok(result.into_iter().map(|(_, rec)| rec).collect()) } -pub fn renew_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> { +pub fn renew_host_value_impl(key: String, current_timestamp_sec: u64) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + check_key_existence(&connection, key.clone(), current_timestamp_sec.clone())?; let set_by = call_parameters.init_peer_id; let host_id = call_parameters.host_id; connection.execute( f!("UPDATE {VALUES_TABLE_NAME} \ - SET timestamp_created = '{current_timestamp}', timestamp_accessed = '{current_timestamp}' \ + SET timestamp_created = '{current_timestamp_sec}', timestamp_accessed = '{current_timestamp_sec}' \ WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?; (connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) }) } -pub fn clear_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> { +pub fn clear_host_value_impl(key: String, current_timestamp_sec: u64) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + check_key_existence(&connection, key.clone(), current_timestamp_sec.clone())?; let host_id = call_parameters.host_id; let set_by = call_parameters.init_peer_id; diff --git a/src/main.rs b/src/main.rs index e595736..bf84914 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,11 +20,13 @@ mod tests; mod impls; use crate::results::{Key, GetKeyMetadataResult, DhtResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult}; -use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl}; +use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl, create_config, load_config, write_config}; use fluence::marine; use fluence::module_manifest; +use serde::{Deserialize, Serialize}; + #[macro_use] extern crate fstrings; @@ -32,86 +34,95 @@ module_manifest!(); pub static KEYS_TABLE_NAME: &str = "dht_keys"; pub static VALUES_TABLE_NAME: &str = "dht_values"; +pub static CONFIG_FILE: &str = "/tmp/Config.toml"; pub static DB_PATH: &str = "/tmp/dht.db"; -pub static STALE_VALUE_AGE: u64 = 60 * 60; -pub static EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60; -pub static EXPIRED_HOST_VALUE_AGE: u64 = 10 * EXPIRED_VALUE_AGE; +pub static DEFAULT_STALE_VALUE_AGE: u64 = 60 * 60; +pub static DEFAULT_EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60; +pub static DEFAULT_EXPIRED_HOST_VALUE_AGE: u64 = 10 * DEFAULT_EXPIRED_VALUE_AGE; pub static VALUES_LIMIT: usize = 20; pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer"; pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec"; +#[derive(Deserialize, Serialize)] +pub struct Config { + pub expired_timeout: u64, + pub stale_timeout: u64, + pub host_expired_timeout: u64, +} + fn main() { create_keys_table(); create_values_table(); + create_config(); } // KEYS #[marine] -pub fn register_key(key: String, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult { - register_key_impl(key, current_timestamp, pin, weight).into() +pub fn register_key(key: String, current_timestamp_sec: u64, pin: bool, weight: u32) -> DhtResult { + register_key_impl(key, current_timestamp_sec, pin, weight).into() } #[marine] -pub fn get_key_metadata(key: String, current_timestamp: u64) -> GetKeyMetadataResult { - get_key_metadata_impl(key, current_timestamp).into() +pub fn get_key_metadata(key: String, current_timestamp_sec: u64) -> GetKeyMetadataResult { + get_key_metadata_impl(key, current_timestamp_sec).into() } #[marine] -pub fn republish_key(key: Key, current_timestamp: u64) -> DhtResult { - republish_key_impl(key, current_timestamp).into() +pub fn republish_key(key: Key, current_timestamp_sec: u64) -> DhtResult { + republish_key_impl(key, current_timestamp_sec).into() } // VALUES #[marine] -pub fn put_value(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { - put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, false).into() +pub fn put_value(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp_sec, relay_id, service_id, weight, false).into() } #[marine] -pub fn put_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult { - put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, false).into() +pub fn put_value_relay(key: String, value: String, current_timestamp_sec: u64, relay_id: String, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp_sec, vec![relay_id], vec![], weight, false).into() } #[marine] -pub fn put_host_value(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { - put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, true).into() +pub fn put_host_value(key: String, value: String, current_timestamp_sec: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp_sec, relay_id, service_id, weight, true).into() } #[marine] -pub fn put_host_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult { - put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, true).into() +pub fn put_host_value_relay(key: String, value: String, current_timestamp_sec: u64, relay_id: String, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp_sec, vec![relay_id], vec![], weight, true).into() } #[marine] -pub fn get_values(key: String, current_timestamp: u64) -> GetValuesResult { - get_values_impl(key, current_timestamp).into() +pub fn get_values(key: String, current_timestamp_sec: u64) -> GetValuesResult { + get_values_impl(key, current_timestamp_sec).into() } #[marine] -pub fn republish_values(key: String, records: Vec, current_timestamp: u64) -> RepublishValuesResult { - republish_values_impl(key, records, current_timestamp).into() +pub fn republish_values(key: String, records: Vec, current_timestamp_sec: u64) -> RepublishValuesResult { + republish_values_impl(key, records, current_timestamp_sec).into() } #[marine] -pub fn renew_host_value(key: String, current_timestamp: u64) -> DhtResult { - renew_host_value_impl(key, current_timestamp).into() +pub fn renew_host_value(key: String, current_timestamp_sec: u64) -> DhtResult { + renew_host_value_impl(key, current_timestamp_sec).into() } #[marine] -pub fn clear_host_value(key: String, current_timestamp: u64) -> DhtResult { - clear_host_value_impl(key, current_timestamp).into() +pub fn clear_host_value(key: String, current_timestamp_sec: u64) -> DhtResult { + clear_host_value_impl(key, current_timestamp_sec).into() } // BOTH #[marine] -pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult { - clear_expired_impl(current_timestamp).into() +pub fn clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult { + clear_expired_impl(current_timestamp_sec).into() } #[marine] -pub fn evict_stale(current_timestamp: u64) -> EvictStaleResult { - evict_stale_impl(current_timestamp).into() +pub fn evict_stale(current_timestamp_sec: u64) -> EvictStaleResult { + evict_stale_impl(current_timestamp_sec).into() } #[marine] @@ -134,4 +145,25 @@ pub fn merge_hack_get_values(records: Vec) -> MergeResult { .flatten() .collect() ).into() -} \ No newline at end of file +} + +#[marine] +pub fn set_expired_timeout(timeout_sec: u64) { + let mut config = load_config(); + config.expired_timeout = timeout_sec; + write_config(config); +} + +#[marine] +pub fn set_host_expired_timeout(timeout_sec: u64) { + let mut config = load_config(); + config.host_expired_timeout = timeout_sec; + write_config(config); +} + +#[marine] +pub fn set_stale_timeout(timeout_sec: u64) { + let mut config = load_config(); + config.stale_timeout = timeout_sec; + write_config(config); +} diff --git a/src/tests.rs b/src/tests.rs index 8f5b986..07746fe 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -18,17 +18,22 @@ mod tests { use fluence_test::marine_test; use rusqlite::{Connection}; - use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, EXPIRED_VALUE_AGE, STALE_VALUE_AGE, VALUES_LIMIT}; + use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, DEFAULT_EXPIRED_VALUE_AGE, DEFAULT_STALE_VALUE_AGE, VALUES_LIMIT, CONFIG_FILE}; use fluence::{CallParameters, SecurityTetraplet}; use std::time::SystemTime; + use std::fs; const HOST_ID: &str = "some_host_id"; - fn clear_db() { + fn clear_env() { let connection = Connection::open(DB_PATH).unwrap(); connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME}").as_str(), []).unwrap(); connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME}").as_str(), []).unwrap(); + + if fs::metadata(CONFIG_FILE).is_ok() { + fs::remove_file(CONFIG_FILE).unwrap(); + } } fn get_correct_timestamp_cp(arg_number: usize) -> CallParameters { @@ -113,13 +118,13 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key() { - clear_db(); + clear_env(); register_key_and_check!(aqua_dht, "some_key".to_string(), 123u64, false, 0u32, get_correct_timestamp_cp(1)); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key_empty_cp() { - clear_db(); + clear_env(); let result = aqua_dht.register_key("some_key".to_string(), 123u64, false, 0u32); assert!(!result.success); assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); @@ -127,7 +132,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key_invalid_cp() { - clear_db(); + clear_env(); let mut invalid_cp = CallParameters::default(); invalid_cp.tetraplets.push(vec![]); invalid_cp.tetraplets.push(vec![SecurityTetraplet { @@ -144,7 +149,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key_twice_same_peer_id() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let timestamp = 123u64; let weight = 8u32; @@ -158,7 +163,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key_twice_other_peer_id() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let timestamp = 123u64; let weight = 8u32; @@ -175,7 +180,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn get_key_metadata_not_found() { - clear_db(); + clear_env(); let result = aqua_dht.get_key_metadata_cp("invalid_key".to_string(), 123u64, get_correct_timestamp_cp(1)); assert!(!result.success); assert_eq!(result.error, "not found"); @@ -183,7 +188,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn republish_key_not_exists() { - clear_db(); + clear_env(); let key = aqua_dht_structs::Key { key: "some_key".to_string(), peer_id: "some_peer".to_string(), @@ -197,7 +202,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn republish_key_same_peer_id() { - clear_db(); + clear_env(); let key_str = "some_key".to_string(); let timestamp = 123u64; let weight = 8u32; @@ -219,7 +224,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn republish_key_other_peer_id() { - clear_db(); + clear_env(); let key_str = "some_key".to_string(); let timestamp = 123u64; let weight = 8u32; @@ -243,7 +248,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_empty_cp() { - clear_db(); + clear_env(); let result = aqua_dht.put_value("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32); assert!(!result.success); assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); @@ -251,7 +256,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_invalid_cp() { - clear_db(); + clear_env(); let mut invalid_cp = CallParameters::default(); invalid_cp.tetraplets.push(vec![]); @@ -269,7 +274,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn get_values_empty_cp() { - clear_db(); + clear_env(); let result = aqua_dht.get_values("some_key".to_string(), 123u64); assert!(!result.success); assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); @@ -277,7 +282,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn get_values_invalid_cp() { - clear_db(); + clear_env(); let mut invalid_cp = CallParameters::default(); invalid_cp.tetraplets.push(vec![]); invalid_cp.tetraplets.push(vec![SecurityTetraplet { @@ -294,7 +299,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn get_values_empty() { - clear_db(); + clear_env(); let key = "some_key".to_string(); register_key_and_check!(aqua_dht, key, 123u64, false, 8u32, get_correct_timestamp_cp(1)); @@ -308,7 +313,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_key_not_exists() { - clear_db(); + clear_env(); let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32, get_correct_timestamp_cp(2)); assert!(!result.success); assert_eq!(result.error, "not found"); @@ -316,7 +321,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let value = "some_value".to_string(); @@ -348,7 +353,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_update() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let value1 = "some_value".to_string(); let timestamp = 123u64; @@ -382,7 +387,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_limit() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let value = "some_value".to_string(); let timestamp = 123u64; @@ -424,7 +429,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_multiple_values_for_key() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let value = "some_value".to_string(); let timestamp = 123u64; @@ -469,7 +474,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_empty_cp() { - clear_db(); + clear_env(); let result = aqua_dht.clear_expired(124u64); assert!(!result.success); @@ -478,7 +483,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_invalid_cp() { - clear_db(); + clear_env(); let mut invalid_cp = CallParameters::default(); invalid_cp.tetraplets.push(vec![]); @@ -496,7 +501,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_empty() { - clear_db(); + clear_env(); let result = aqua_dht.clear_expired_cp(124u64, get_correct_timestamp_cp(0)); assert_eq!(result.error, ""); assert!(result.success); @@ -505,12 +510,12 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_key_without_values() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let expired_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); - let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.clear_expired_cp(expired_timestamp + DEFAULT_EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -524,13 +529,13 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_host_key() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let expired_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), true, 8u32, get_correct_timestamp_cp(1)); put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); - let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.clear_expired_cp(expired_timestamp + DEFAULT_EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -540,13 +545,13 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_host_value() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let expired_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); put_host_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); - let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.clear_expired_cp(expired_timestamp + DEFAULT_EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -556,13 +561,42 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_key_with_values() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let expired_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); - let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.clear_expired_cp(expired_timestamp + DEFAULT_EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); + + assert!(result.success); + assert_eq!(result.error, ""); + assert_eq!(result.count_keys, 1); + assert_eq!(result.count_values, 1); + + let result = aqua_dht.get_key_metadata_cp(key.clone(), 123u64, get_correct_timestamp_cp(1)); + assert!(!result.success); + assert_eq!(result.error, "not found"); + + let result = aqua_dht.get_values_cp(key, 123u64, get_correct_timestamp_cp(1)); + + assert!(result.success); + assert_eq!(result.error, ""); + assert_eq!(result.result.len(), 0); + } + + #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] + fn clear_expired_change_timeout() { + clear_env(); + let key = "some_key".to_string(); + let expired_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); + put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); + + let new_expired_timeout = DEFAULT_EXPIRED_VALUE_AGE - 100u64; + aqua_dht.set_expired_timeout(new_expired_timeout.clone()); + let result = aqua_dht.clear_expired_cp(expired_timestamp + new_expired_timeout, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -582,7 +616,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn evict_stale_empty_cp() { - clear_db(); + clear_env(); let result = aqua_dht.evict_stale(124u64); assert!(!result.success); @@ -591,7 +625,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn evict_stale_invalid_cp() { - clear_db(); + clear_env(); let mut invalid_cp = CallParameters::default(); invalid_cp.tetraplets.push(vec![]); @@ -609,7 +643,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn evict_stale_empty() { - clear_db(); + clear_env(); let result = aqua_dht.evict_stale_cp(124u64, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -618,12 +652,12 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn evict_stale_key_without_values() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let stale_timestamp = 0u64; register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); - let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.evict_stale_cp(stale_timestamp + DEFAULT_STALE_VALUE_AGE, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, ""); @@ -639,14 +673,14 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn evict_stale_key_with_values() { - clear_db(); + clear_env(); let key = "some_key".to_string(); let value = "some_value".to_string(); let stale_timestamp = 0u64; register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); put_value_and_check!(aqua_dht, key.clone(), value.clone(), stale_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); - let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0)); + let result = aqua_dht.evict_stale_cp(stale_timestamp + DEFAULT_STALE_VALUE_AGE, get_correct_timestamp_cp(0)); assert!(result.success); assert_eq!(result.error, "");