diff --git a/Cargo.lock b/Cargo.lock
index 8e98b7f..b3cbb16 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -23,6 +23,7 @@ checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
 name = "aqua-dht"
 version = "0.1.0"
 dependencies = [
+ "boolinator",
  "eyre",
  "fluence",
  "fluence-test",
@@ -237,9 +238,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-epoch"
-version = "0.9.4"
+version = "0.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "52fb27eab85b17fbb9f6fd667089e07d6a2eb8743d02639ee7f6a7a7729c9c94"
+checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
 dependencies = [
  "cfg-if 1.0.0",
  "crossbeam-utils",
@@ -250,11 +251,10 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-utils"
-version = "0.8.4"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278"
+checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
 dependencies = [
- "autocfg",
  "cfg-if 1.0.0",
  "lazy_static",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 84bb0b2..f33c0ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ fluence = "0.6.8"
 marine-sqlite-connector = "0.4.1"
 fstrings = "0.2.3"
 eyre = "0.6.5"
+boolinator = "2.4.0"
 
 [dev-dependencies]
 fluence-test = "0.1.9"
diff --git a/aqua/aqua-dht.aqua b/aqua/aqua-dht.aqua
index c5013c7..81167fe 100644
--- a/aqua/aqua-dht.aqua
+++ b/aqua/aqua-dht.aqua
@@ -1,49 +1,48 @@
-data RepublishKeyResult:
-    success: bool
-    error: string
-
 data Record:
     value: string
     peer_id: string
+    set_by: string
     relay_id: []string
     service_id: []string
     timestamp_created: u64
-
-data MergeResult:
-    success: bool
-    error: string
-    result: []Record
+    weight: u32
 
 data GetValuesResult:
     success: bool
     error: string
     result: []Record
 
+data MergeResult:
+    success: bool
+    error: string
+    result: []Record
+
+data ClearExpiredResult:
+    success: bool
+    error: string
+    count_keys: u64
+    count_values: u64
+
 data Key:
     key: string
     peer_id: string
     timestamp_created: u64
+    pinned: bool
+    weight: u32
 
 data EvictStaleItem:
     key: Key
     records: []Record
 
-data RegisterKeyResult:
+data DhtResult:
     success: bool
     error: string
 
-data RecordsStruct:
-    records: []Record
-
 data RepublishValuesResult:
     success: bool
     error: string
     updated: u64
 
-data PutValueResult:
-    success: bool
-    error: string
-
 data EvictStaleResult:
     success: bool
     error: string
@@ -54,25 +53,20 @@ data GetKeyMetadataResult:
     error: string
     key: Key
 
-data ClearExpiredResult:
-    success: bool
-    error: string
-    count_keys: u64
-    count_values: u64
-
 service AquaDHT("aqua-dht"):
-    republish_key(key: Key, current_timestamp: u64) -> RepublishKeyResult
-    clear_expired(current_timestamp: u64) -> ClearExpiredResult
-    merge_two(a: []Record, b: []Record) -> MergeResult
-    merge_hack_get_values(records: []GetValuesResult) -> MergeResult
-    merge_hack_struct(records: RecordsStruct) -> MergeResult
     evict_stale(current_timestamp: u64) -> EvictStaleResult
-    republish_values(key: string, records: []Record, current_timestamp: u64) -> RepublishValuesResult
+    put_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult
+    merge_two(a: []Record, b: []Record) -> MergeResult
     merge(records: [][]Record) -> MergeResult
-    put_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string) -> PutValueResult
-    get_values(key: string, current_timestamp: u64) -> GetValuesResult
-    merge_wrapped(records: [][][]Record) -> MergeResult
-    merge_hack(records: [][]Record, hack: string) -> MergeResult
-    put_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string) -> PutValueResult
+    put_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
+    clear_expired(current_timestamp: u64) -> ClearExpiredResult
     get_key_metadata(key: string, current_timestamp: u64) -> GetKeyMetadataResult
-    register_key(key: string, current_timestamp: u64) -> RegisterKeyResult
+    republish_values(key: string, records: []Record, current_timestamp: u64) -> RepublishValuesResult
+    republish_key(key: Key, current_timestamp: u64) -> DhtResult
+    merge_hack_get_values(records: []GetValuesResult) -> MergeResult
+    put_host_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult
+    renew_host_value(key: string, current_timestamp: u64) -> DhtResult
+    get_values(key: string, current_timestamp: u64) -> GetValuesResult
+    register_key(key: string, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult
+    clear_host_value(key: string, current_timestamp: u64) -> DhtResult
+    put_host_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
diff --git a/src/impls.rs b/src/impls.rs
index 3cb82e5..c9ec442 100644
--- a/src/impls.rs
+++ b/src/impls.rs
@@ -14,13 +14,14 @@
  * limitations under the License.
  */
 
-use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE, STALE_VALUE_AGE};
+use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE, STALE_VALUE_AGE, EXPIRED_HOST_VALUE_AGE, VALUES_LIMIT};
 use crate::results::{Key, Record, EvictStaleItem};
-use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State};
+use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State, Statement};
 use fluence::{CallParameters};
 use eyre;
 use eyre::ContextCompat;
 use std::collections::HashMap;
+use boolinator::Boolinator;
 
 
 fn get_custom_option(value: String) -> Vec<String> {
@@ -31,13 +32,39 @@ fn get_custom_option(value: String) -> Vec<String> {
     }
 }
 
+fn read_key(statement: &Statement) -> SqliteResult<Key> {
+    Ok(Key {
+        key: statement.read::<String>(0)?,
+        peer_id: statement.read::<String>(1)?,
+        timestamp_created: statement.read::<i64>(2)? as u64,
+        pinned: statement.read::<i64>(3)? != 0,
+        weight: statement.read::<i64>(4)? as u32,
+    })
+}
+
+fn read_record(statement: &Statement) -> SqliteResult<Record> {
+    Ok(Record {
+        value: statement.read::<String>(0)?,
+        peer_id: statement.read::<String>(1)?,
+        set_by: statement.read::<String>(2)?,
+        relay_id: get_custom_option(statement.read::<String>(3)?),
+        service_id: get_custom_option(statement.read::<String>(4)?),
+        timestamp_created: statement.read::<i64>(5)? as u64,
+        weight: statement.read::<i64>(6)?
as u32, + }) +} + +fn check_key_existence(connection: &Connection, key: String, current_timestamp: u64) -> SqliteResult<()> { + get_key_metadata_helper(&connection, key, current_timestamp).map(|_| ()) +} + pub(crate) fn check_timestamp_tetraplets(call_parameters: &CallParameters, arg_number: usize) -> eyre::Result<()> { - let error_msg = "you should use peer.timestamp_ms to pass timestamp"; + let error_msg = "you should use host peer.timestamp_sec to pass timestamp"; let tetraplets = call_parameters.tetraplets.get(arg_number).wrap_err(error_msg)?; let tetraplet = tetraplets.get(0).wrap_err(error_msg)?; (tetraplet.service_id == TRUSTED_TIMESTAMP_SERVICE_ID && - tetraplet.function_name == TRUSTED_TIMESTAMP_FUNCTION_NAME).then(|| ()).wrap_err(error_msg) - // TODO check host_id == peer_pk(???) + tetraplet.function_name == TRUSTED_TIMESTAMP_FUNCTION_NAME + && tetraplet.peer_pk == call_parameters.host_id).then(|| ()).wrap_err(error_msg) } #[inline] @@ -54,7 +81,10 @@ pub(crate) fn create_keys_table() -> bool { key TEXT PRIMARY KEY, timestamp_created INTEGER, timestamp_accessed INTEGER, - peer_id TEXT); + peer_id TEXT, + pinned INTEGER, + weight INTEGER + ); "), ).is_ok() } @@ -68,12 +98,14 @@ pub(crate) fn create_values_table() -> bool { key TEXT, value TEXT, peer_id TEXT, + set_by TEXT, relay_id TEXT, service_id TEXT, timestamp_created INTEGER, timestamp_accessed INTEGER, - PRIMARY KEY (key, peer_id) - ); + weight INTEGER, + PRIMARY KEY (key, peer_id, set_by) + ); "), ).is_ok() } @@ -85,27 +117,31 @@ fn get_key_metadata_helper(connection: &Connection, key: String, current_timesta WHERE key = '{key}'"))?; let mut statement = connection - .prepare(f!("SELECT key, peer_id, timestamp_created \ + .prepare(f!("SELECT key, peer_id, timestamp_created, pinned, weight \ FROM {KEYS_TABLE_NAME} WHERE key = '{key}'"))?; if let State::Row = statement.next()? { - Ok(Key { - key: statement.read::(0)?, - peer_id: statement.read::(1)?, - timestamp_created: statement.read::(2)? 
as u64, - }) + read_key(&statement) } else { Err(SqliteError { code: None, message: Some("not found".to_string()) }) } } -fn update_key(connection: &Connection, key: String, peer_id: String, timestamp_created: u64, timestamp_accessed: u64) -> SqliteResult<()> { +fn update_key(connection: &Connection, key: String, peer_id: String, timestamp_created: u64, timestamp_accessed: u64, pin: bool, weight: u32) -> SqliteResult<()> { let old_key = get_key_metadata_helper(&connection, key.clone(), timestamp_accessed); + let pinned = pin as i32; + let update_allowed = { + match old_key { + Ok(key) => key.peer_id == peer_id && key.timestamp_created < timestamp_created, + Err(_) => true, + } + }; - // TODO: compare conflicting keys by timestamp_created - if old_key.is_err() || old_key?.peer_id == peer_id { + + if update_allowed { connection.execute(f!(" - INSERT OR REPLACE INTO {KEYS_TABLE_NAME} VALUES ('{key}', '{timestamp_created}', '{timestamp_accessed}', '{peer_id}'); + INSERT OR REPLACE INTO {KEYS_TABLE_NAME} \ + VALUES ('{key}', '{timestamp_created}', '{timestamp_accessed}', '{peer_id}', '{pinned}', '{weight}'); ")) } else { Err(SqliteError { code: None, message: Some("key already exists with different peer_id".to_string()) }) @@ -120,13 +156,13 @@ pub fn get_key_metadata_impl(key: String, current_timestamp: u64) -> SqliteResul get_key_metadata_helper(&get_connection()?, key, current_timestamp) } -pub fn register_key_impl(key: String, current_timestamp: u64) -> SqliteResult<()> { +pub fn register_key_impl(key: String, current_timestamp: u64, pin: bool, weight: u32) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); let peer_id = call_parameters.init_peer_id.clone(); check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; - update_key(&get_connection()?, key, peer_id, current_timestamp.clone(), current_timestamp) + update_key(&get_connection()?, key, peer_id, current_timestamp.clone(), current_timestamp, pin, weight) } pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()> { @@ -134,45 +170,55 @@ pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()> check_timestamp_tetraplets(&call_parameters, 1) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; - update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp) + // Key.pinned is ignored in republish + update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp, false, key.weight) } -pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec) -> SqliteResult<()> { +pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32, host: bool) -> SqliteResult<()> { let call_parameters = fluence::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 2) .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - // checking key for existence - let _key = get_key_metadata_helper(&connection, key.clone(), current_timestamp.clone())?; - let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() }; - let peer_id = call_parameters.init_peer_id; - let service_id = if service_id.len() == 0 { "".to_string() } else { service_id[0].clone() }; + check_key_existence(&connection, key.clone(), current_timestamp.clone())?; - connection.execute( - f!("INSERT 
OR REPLACE INTO {VALUES_TABLE_NAME} \ - VALUES ('{key}', '{value}', '{peer_id}', '{relay_id}',\ - '{service_id}', '{current_timestamp}', '{current_timestamp}')") - ) + let values: Vec = get_values_helper(&connection, key.clone())?.into_iter().filter(|item| item.peer_id == item.set_by).collect(); + let min_weight_record = values.iter().last(); + + if values.len() < VALUES_LIMIT || min_weight_record.unwrap().weight < weight { + let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() }; + let peer_id = if host { call_parameters.host_id } else { call_parameters.init_peer_id.clone() }; + let set_by = call_parameters.init_peer_id; + let service_id = if service_id.len() == 0 { "".to_string() } else { service_id[0].clone() }; + + if values.len() >= VALUES_LIMIT { + if let Some(rec) = min_weight_record { + connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE set_by='{rec.peer_id}'"))?; + } + } + + connection.execute( + f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \ + VALUES ('{key}', '{value}', '{peer_id}', '{set_by}', '{relay_id}',\ + '{service_id}', '{current_timestamp}', '{current_timestamp}', '{weight}')") + ) + } else { + Err(SqliteError { code: None, message: Some("values limit is exceeded".to_string()) }) + } } pub fn get_values_helper(connection: &Connection, key: String) -> SqliteResult> { let mut statement = connection.prepare( - f!("SELECT value, peer_id, relay_id, service_id, timestamp_created FROM {VALUES_TABLE_NAME} \ + f!("SELECT value, peer_id, set_by, relay_id, service_id, timestamp_created, weight FROM {VALUES_TABLE_NAME} \ WHERE key = '{key}'"))?; let mut result: Vec = vec![]; while let State::Row = statement.next()? { - result.push(Record { - value: statement.read::(0)?, - peer_id: statement.read::(1)?, - relay_id: get_custom_option(statement.read::(2)?), - service_id: get_custom_option(statement.read::(3)?), - timestamp_created: statement.read::(4)? as u64, - }) + result.push(read_record(&statement)?) 
} + result.sort_by(|a, b| b.weight.cmp(&a.weight)); Ok(result) } @@ -197,19 +243,18 @@ pub fn republish_values_impl(key: String, records: Vec, current_timestam .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; let connection = get_connection()?; - // checking key for existence - let _key = get_key_metadata_helper(&connection, key.clone(), current_timestamp.clone())?; - + check_key_existence(&connection, key.clone(), current_timestamp.clone())?; // TODO: compare conflicting values by timestamp_created let mut updated = 0u64; for record in records.iter() { - let relay_id = if record.relay_id.is_empty() {"".to_string()} else {record.relay_id[0].clone()}; - let service_id = if record.service_id.is_empty() {"".to_string()} else {record.service_id[0].clone()}; + let relay_id = if record.relay_id.is_empty() { "".to_string() } else { record.relay_id[0].clone() }; + let service_id = if record.service_id.is_empty() { "".to_string() } else { record.service_id[0].clone() }; + let set_by = record.peer_id.clone(); // set_by ignored in republish connection.execute( f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \ - VALUES ('{key}', '{record.value}', '{record.peer_id}', '{relay_id}',\ - '{service_id}', '{record.timestamp_created}', '{current_timestamp}')"))?; + VALUES ('{key}', '{record.value}', '{record.peer_id}', '{set_by}', '{relay_id}',\ + '{service_id}', '{record.timestamp_created}', '{current_timestamp}', '{record.weight}')"))?; updated += connection.changes() as u64; } @@ -224,10 +269,23 @@ pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> { let connection = get_connection()?; let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE; - connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp})"))?; - let deleted_values = connection.changes() as u64; - connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?; - let deleted_keys = connection.changes() as u64; + let host_id = call_parameters.host_id; + + connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} \ + WHERE timestamp_created <= {expired_timestamp} AND peer_id !='{host_id}')"))?; + let mut deleted_values = connection.changes() as u64; + + // TODO: ignore keys with host values + connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp} AND NOT pinned"))?; + let mut deleted_keys = connection.changes() as u64; + + let expired_host_timestamp = current_timestamp - EXPIRED_HOST_VALUE_AGE; + connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} \ + WHERE timestamp_created <= {expired_host_timestamp})"))?; + deleted_values += connection.changes() as u64; + + connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_host_timestamp}"))?; + deleted_keys += connection.changes() as u64; Ok((deleted_keys, deleted_values)) } @@ -237,28 +295,29 @@ pub fn evict_stale_impl(current_timestamp: u64) -> SqliteResult = vec![]; let mut statement = connection.prepare( - f!("SELECT key, peer_id, timestamp_created FROM {KEYS_TABLE_NAME} \ + f!("SELECT key, peer_id, timestamp_created, pinned, weight FROM {KEYS_TABLE_NAME} \ WHERE timestamp_accessed <= {stale_timestamp}"))?; while let State::Row = statement.next()? 
{ - stale_keys.push(Key { - key: statement.read::(0)?, - peer_id: statement.read::(1)?, - timestamp_created: statement.read::(2)? as u64, - }); + stale_keys.push(read_key(&statement)?); } let mut results: Vec = vec![]; - for key in stale_keys.iter() { - results.push(EvictStaleItem { key: key.clone(), records: get_values_helper(&connection, key.key.clone())? }); - connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key='{key.key}'"))?; - connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key='{key.key}'"))?; + let host_id = call_parameters.host_id; + for key in stale_keys.into_iter() { + let values = get_values_helper(&connection, key.key.clone())?; + connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key = '{key.key}' AND set_by != '{host_id}'"))?; + + if !key.pinned && !values.iter().any(|val| val.peer_id == host_id) { + connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key='{key.key}'"))?; + } + + results.push(EvictStaleItem { key, records: values }); } Ok(results) @@ -281,3 +340,40 @@ pub fn merge_impl(records: Vec) -> SqliteResult> { Ok(result.into_iter().map(|(_, rec)| rec).collect()) } + +pub fn renew_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> { + let call_parameters = fluence::get_call_parameters(); + check_timestamp_tetraplets(&call_parameters, 0) + .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; + let connection = get_connection()?; + + check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + + let set_by = call_parameters.init_peer_id; + let host_id = call_parameters.host_id; + + connection.execute( + f!("UPDATE {VALUES_TABLE_NAME} \ + SET timestamp_created = '{current_timestamp}', timestamp_accessed = '{current_timestamp}' \ + WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?; + + (connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) }) +} + +pub fn clear_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> { + let call_parameters = fluence::get_call_parameters(); + check_timestamp_tetraplets(&call_parameters, 0) + .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; + let connection = get_connection()?; + + check_key_existence(&connection, key.clone(), current_timestamp.clone())?; + + let host_id = call_parameters.host_id; + let set_by = call_parameters.init_peer_id; + + connection.execute( + f!("DELETE FROM {VALUES_TABLE_NAME} \ + WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?; + + (connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) }) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5dbb76b..e595736 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,8 @@ mod results; mod tests; mod impls; -use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult, RecordsStruct}; -use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl}; +use crate::results::{Key, GetKeyMetadataResult, DhtResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult}; +use 
crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl}; use fluence::marine; use fluence::module_manifest; @@ -35,6 +35,8 @@ pub static VALUES_TABLE_NAME: &str = "dht_values"; pub static DB_PATH: &str = "/tmp/dht.db"; pub static STALE_VALUE_AGE: u64 = 60 * 60; pub static EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60; +pub static EXPIRED_HOST_VALUE_AGE: u64 = 10 * EXPIRED_VALUE_AGE; +pub static VALUES_LIMIT: usize = 20; pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer"; pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec"; @@ -46,8 +48,8 @@ fn main() { // KEYS #[marine] -pub fn register_key(key: String, current_timestamp: u64) -> RegisterKeyResult { - register_key_impl(key, current_timestamp).into() +pub fn register_key(key: String, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult { + register_key_impl(key, current_timestamp, pin, weight).into() } #[marine] @@ -56,19 +58,29 @@ pub fn get_key_metadata(key: String, current_timestamp: u64) -> GetKeyMetadataRe } #[marine] -pub fn republish_key(key: Key, current_timestamp: u64) -> RepublishKeyResult { +pub fn republish_key(key: Key, current_timestamp: u64) -> DhtResult { republish_key_impl(key, current_timestamp).into() } // VALUES #[marine] -pub fn put_value(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec) -> PutValueResult { - put_value_impl(key, value, current_timestamp, relay_id, service_id).into() +pub fn put_value(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, false).into() } #[marine] -pub fn put_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String) -> PutValueResult { - put_value_impl(key, value, current_timestamp, vec![relay_id], vec![]).into() +pub fn put_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, false).into() +} + +#[marine] +pub fn put_host_value(key: String, value: String, current_timestamp: u64, relay_id: Vec, service_id: Vec, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, true).into() +} + +#[marine] +pub fn put_host_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult { + put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, true).into() } #[marine] @@ -81,6 +93,16 @@ pub fn republish_values(key: String, records: Vec, current_timestamp: u6 republish_values_impl(key, records, current_timestamp).into() } +#[marine] +pub fn renew_host_value(key: String, current_timestamp: u64) -> DhtResult { + renew_host_value_impl(key, current_timestamp).into() +} + +#[marine] +pub fn clear_host_value(key: String, current_timestamp: u64) -> DhtResult { + clear_host_value_impl(key, current_timestamp).into() +} + // BOTH #[marine] pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult { @@ -102,22 +124,6 @@ pub fn merge_two(a: Vec, b: Vec) -> MergeResult { merge_impl(a.into_iter().chain(b.into_iter()).collect()).into() } -#[marine] -pub fn merge_hack(records: Vec>, hack: String) -> MergeResult { - print!("{}", hack); - 
merge(records) -} - -#[marine] -pub fn merge_wrapped(records: Vec>>) -> MergeResult { - merge_impl(records.into_iter().flatten().flatten().collect()).into() -} - -#[marine] -pub fn merge_hack_struct(records: RecordsStruct) -> MergeResult { - merge_impl(records.records).into() -} - #[marine] pub fn merge_hack_get_values(records: Vec) -> MergeResult { merge_impl( diff --git a/src/results.rs b/src/results.rs index fce0d0a..d1e53d6 100644 --- a/src/results.rs +++ b/src/results.rs @@ -19,56 +19,12 @@ use marine_sqlite_connector::Result as SqliteResult; #[marine] #[derive(Debug)] -pub struct RegisterKeyResult { +pub struct DhtResult { pub success: bool, pub error: String, } -impl From> for RegisterKeyResult { - fn from(result: SqliteResult<()>) -> Self { - match result { - Ok(_) => Self { - success: true, - error: "".to_string(), - }, - Err(err) => Self { - success: false, - error: err.to_string(), - }, - } - } -} - -#[marine] -#[derive(Debug)] -pub struct RepublishKeyResult { - pub success: bool, - pub error: String, -} - -impl From> for RepublishKeyResult { - fn from(result: SqliteResult<()>) -> Self { - match result { - Ok(_) => Self { - success: true, - error: "".to_string(), - }, - Err(err) => Self { - success: false, - error: err.to_string(), - }, - } - } -} - -#[marine] -#[derive(Debug)] -pub struct PutValueResult { - pub success: bool, - pub error: String, -} - -impl From> for PutValueResult { +impl From> for DhtResult { fn from(result: SqliteResult<()>) -> Self { match result { Ok(_) => Self { @@ -88,9 +44,11 @@ impl From> for PutValueResult { pub struct Record { pub value: String, pub peer_id: String, + pub set_by: String, pub relay_id: Vec, pub service_id: Vec, pub timestamp_created: u64, + pub weight: u32, } #[marine] @@ -177,6 +135,8 @@ pub struct Key { pub key: String, pub peer_id: String, pub timestamp_created: u64, + pub pinned: bool, + pub weight: u32, } #[marine] @@ -281,8 +241,3 @@ impl From>> for MergeResult { } } } - -#[marine] -pub struct RecordsStruct { - pub records: Vec -} diff --git a/src/tests.rs b/src/tests.rs index ac39f55..f42444d 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -21,6 +21,8 @@ mod tests { use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, EXPIRED_VALUE_AGE, STALE_VALUE_AGE}; use fluence::{CallParameters, SecurityTetraplet}; + const HOST_ID: &str = "some_host_id"; + fn clear_db() { let connection = Connection::open(DB_PATH).unwrap(); @@ -30,13 +32,14 @@ mod tests { fn get_correct_timestamp_cp(arg_number: usize) -> CallParameters { let mut cp = CallParameters::default(); + cp.host_id = HOST_ID.to_string(); for _ in 0..arg_number { cp.tetraplets.push(vec![]); } cp.tetraplets.push(vec![SecurityTetraplet { - peer_pk: "".to_string(), + peer_pk: HOST_ID.to_string(), service_id: TRUSTED_TIMESTAMP_SERVICE_ID.to_string(), function_name: TRUSTED_TIMESTAMP_FUNCTION_NAME.to_string(), json_path: "".to_string(), @@ -46,9 +49,9 @@ mod tests { } macro_rules! 
put_value_and_check { - ($aqua_dht:expr, $key:expr, $value:expr, $timestamp:expr, $relay_id:expr, $service_id:expr, $cp:expr) => { + ($aqua_dht:expr, $key:expr, $value:expr, $timestamp:expr, $relay_id:expr, $service_id:expr, $weight:expr, $cp:expr) => { { - let result = $aqua_dht.put_value_cp($key.clone(), $value.clone(), $timestamp.clone(), $relay_id.clone(), $service_id.clone(), $cp.clone()); + let result = $aqua_dht.put_value_cp($key.clone(), $value.clone(), $timestamp.clone(), $relay_id.clone(), $service_id.clone(), $weight.clone(), $cp.clone()); assert_eq!(result.error, ""); assert!(result.success); @@ -57,7 +60,7 @@ mod tests { } macro_rules! check_key_metadata { - ($aqua_dht:expr, $key:expr, $timestamp:expr, $peer_id:expr, $current_timestamp:expr, $cp:expr) => { + ($aqua_dht:expr, $key:expr, $timestamp:expr, $peer_id:expr, $current_timestamp:expr, $pinned:expr, $weight: expr, $cp:expr) => { { let result = $aqua_dht.get_key_metadata_cp($key.clone(), $current_timestamp.clone(), $cp.clone()); assert!(result.success); @@ -65,18 +68,20 @@ mod tests { assert_eq!(result.key.key, $key); assert_eq!(result.key.peer_id, $peer_id); assert_eq!(result.key.timestamp_created, $timestamp); + assert_eq!(result.key.pinned, $pinned); + assert_eq!(result.key.weight, $weight); } } } macro_rules! register_key_and_check { - ($aqua_dht:expr, $key:expr, $timestamp:expr, $cp:expr) => { + ($aqua_dht:expr, $key:expr, $timestamp:expr, $pin:expr, $weight: expr, $cp:expr) => { { - let result = $aqua_dht.register_key_cp($key.clone(), $timestamp, $cp.clone()); + let result = $aqua_dht.register_key_cp($key.clone(), $timestamp.clone(), $pin.clone(), $weight.clone(), $cp.clone()); assert_eq!(result.error, ""); assert!(result.success); - check_key_metadata!($aqua_dht, $key, $timestamp, $cp.init_peer_id, $timestamp, $cp); + check_key_metadata!($aqua_dht, $key, $timestamp, $cp.init_peer_id, $timestamp, $pin, $weight, $cp); } } @@ -89,7 +94,7 @@ mod tests { assert_eq!(result.error, ""); assert!(result.success); - check_key_metadata!($aqua_dht, $key.key, $key.timestamp_created, $key.peer_id, $timestamp, $cp); + check_key_metadata!($aqua_dht, $key.key, $key.timestamp_created, $key.peer_id, $timestamp, false, $key.weight, $cp); } } } @@ -97,15 +102,15 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key() { clear_db(); - register_key_and_check!(aqua_dht, "some_key".to_string(), 123u64, get_correct_timestamp_cp(1)); + register_key_and_check!(aqua_dht, "some_key".to_string(), 123u64, false, 0u32, get_correct_timestamp_cp(1)); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn register_key_empty_cp() { clear_db(); - let result = aqua_dht.register_key("some_key".to_string(), 123u64); + let result = aqua_dht.register_key("some_key".to_string(), 123u64, false, 0u32); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -120,9 +125,9 @@ mod tests { json_path: "some json path".to_string(), }]); - let result = aqua_dht.register_key_cp("some_key".to_string(), 123u64, invalid_cp); + let result = aqua_dht.register_key_cp("some_key".to_string(), 123u64, false, 8u32, invalid_cp); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use 
host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -130,11 +135,13 @@ mod tests { clear_db(); let key = "some_key".to_string(); let timestamp = 123u64; + let weight = 8u32; + let pin = false; let mut cp = get_correct_timestamp_cp(1); cp.init_peer_id = "some_peer_id".to_string(); - register_key_and_check!(aqua_dht, key, timestamp, cp); - register_key_and_check!(aqua_dht, key, timestamp, cp); + register_key_and_check!(aqua_dht, key, timestamp, pin, weight, cp); + register_key_and_check!(aqua_dht, key, timestamp + 1, pin, weight, cp); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -142,12 +149,14 @@ mod tests { clear_db(); let key = "some_key".to_string(); let timestamp = 123u64; + let weight = 8u32; + let pin = false; let mut cp = get_correct_timestamp_cp(1); cp.init_peer_id = "some_peer_id".to_string(); - register_key_and_check!(aqua_dht, key, timestamp, cp); + register_key_and_check!(aqua_dht, key, timestamp, pin, weight, cp); cp.init_peer_id = "other_peer_id".to_string(); - let result = aqua_dht.register_key_cp(key.clone(), timestamp, cp); + let result = aqua_dht.register_key_cp(key.clone(), timestamp, pin, weight, cp); assert!(!result.success); assert_eq!(result.error, "key already exists with different peer_id"); } @@ -167,6 +176,8 @@ mod tests { key: "some_key".to_string(), peer_id: "some_peer".to_string(), timestamp_created: 0, + pinned: false, + weight: 8u32, }; republish_key_and_check!(aqua_dht, key, 123u64, get_correct_timestamp_cp(1)); @@ -177,14 +188,18 @@ mod tests { clear_db(); let key_str = "some_key".to_string(); let timestamp = 123u64; + let weight = 8u32; + let pin = false; let mut cp = get_correct_timestamp_cp(1); cp.init_peer_id = "some_peer_id".to_string(); - register_key_and_check!(aqua_dht, key_str, timestamp, cp); + register_key_and_check!(aqua_dht, key_str, timestamp, pin, weight, cp); let key = aqua_dht_structs::Key { key: key_str.clone(), peer_id: cp.init_peer_id, timestamp_created: timestamp + 1, + pinned: false, + weight: weight.clone(), }; republish_key_and_check!(aqua_dht, key, 123123u64, get_correct_timestamp_cp(1)); @@ -195,14 +210,18 @@ mod tests { clear_db(); let key_str = "some_key".to_string(); let timestamp = 123u64; + let weight = 8u32; + let pin = false; let mut cp = get_correct_timestamp_cp(1); cp.init_peer_id = "some_peer_id".to_string(); - register_key_and_check!(aqua_dht, key_str, timestamp, cp); + register_key_and_check!(aqua_dht, key_str, timestamp, pin, weight, cp); let key = aqua_dht_structs::Key { key: key_str.clone(), peer_id: "OTHER_PEER_ID".to_string(), timestamp_created: timestamp + 1, + pinned: false, + weight: weight.clone(), }; let result = aqua_dht.republish_key_cp(key, 123123u64, get_correct_timestamp_cp(1)); @@ -213,9 +232,9 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_empty_cp() { clear_db(); - let result = aqua_dht.put_value("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![]); + let result = aqua_dht.put_value("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -231,9 +250,9 @@ mod tests { json_path: "some json path".to_string(), }]); - let result 
= aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], invalid_cp); + let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32, invalid_cp); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -241,7 +260,7 @@ mod tests { clear_db(); let result = aqua_dht.get_values("some_key".to_string(), 123u64); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -258,7 +277,7 @@ mod tests { let result = aqua_dht.get_values_cp("some_key".to_string(), 123u64, invalid_cp); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -266,7 +285,7 @@ mod tests { clear_db(); let key = "some_key".to_string(); - register_key_and_check!(aqua_dht, key, 123u64, get_correct_timestamp_cp(1)); + register_key_and_check!(aqua_dht, key, 123u64, false, 8u32, get_correct_timestamp_cp(1)); let result = aqua_dht.get_values_cp(key, 123u64, get_correct_timestamp_cp(1)); @@ -278,7 +297,7 @@ mod tests { #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn put_value_key_not_exists() { clear_db(); - let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], get_correct_timestamp_cp(2)); + let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32, get_correct_timestamp_cp(2)); assert!(!result.success); assert_eq!(result.error, "not found"); } @@ -290,13 +309,14 @@ mod tests { let key = "some_key".to_string(); let value = "some_value".to_string(); let timestamp = 123u64; + let weight = 8u32; let relay_id = "some_relay".to_string(); let service_id = "some_service_id".to_string(); let mut cp = get_correct_timestamp_cp(2); cp.init_peer_id = "some_peer_id".to_string(); - register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1)); - put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp); + register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1)); + put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp); let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1)); @@ -311,6 +331,7 @@ mod tests { assert_eq!(record.relay_id[0], relay_id); assert_eq!(record.service_id[0], service_id); assert_eq!(record.timestamp_created, timestamp); + assert_eq!(record.weight, weight); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -319,16 +340,17 @@ mod tests { let key = "some_key".to_string(); let value1 = "some_value".to_string(); let timestamp = 123u64; + let weight = 8u32; let relay_id = "some_relay".to_string(); let service_id = "some_service_id".to_string(); let mut cp = get_correct_timestamp_cp(2); cp.init_peer_id = "some_peer_id".to_string(); - 
register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1)); - put_value_and_check!(aqua_dht, key, value1, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp); + register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1)); + put_value_and_check!(aqua_dht, key, value1, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp); let value2 = "other_value".to_string(); - put_value_and_check!(aqua_dht, key, value2, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp); + put_value_and_check!(aqua_dht, key, value2, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp); let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1)); @@ -343,6 +365,7 @@ mod tests { assert_eq!(record.relay_id[0], relay_id); assert_eq!(record.service_id[0], service_id); assert_eq!(record.timestamp_created, timestamp); + assert_eq!(record.weight, weight); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -351,19 +374,20 @@ mod tests { let key = "some_key".to_string(); let value = "some_value".to_string(); let timestamp = 123u64; + let weight = 8u32; let relay_id = "some_relay".to_string(); let service_id = "some_service_id".to_string(); let mut cp = get_correct_timestamp_cp(2); let peer1_id = "some_peer_id".to_string(); let peer2_id = "other_peer_id".to_string(); - register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1)); + register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1)); cp.init_peer_id = peer1_id.clone(); - put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp); + put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp); cp.init_peer_id = peer2_id.clone(); - put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp); + put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp); let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1)); @@ -378,6 +402,7 @@ mod tests { assert_eq!(record.relay_id[0], relay_id); assert_eq!(record.service_id[0], service_id); assert_eq!(record.timestamp_created, timestamp); + assert_eq!(record.weight, weight); let record = &result.result[1]; assert_eq!(record.value, value); @@ -385,6 +410,7 @@ mod tests { assert_eq!(record.relay_id[0], relay_id); assert_eq!(record.service_id[0], service_id); assert_eq!(record.timestamp_created, timestamp); + assert_eq!(record.weight, weight); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -393,7 +419,7 @@ mod tests { let result = aqua_dht.clear_expired(124u64); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -411,15 +437,15 @@ mod tests { let result = aqua_dht.clear_expired_cp(124u64, invalid_cp); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] fn clear_expired_empty() { clear_db(); let result = 
aqua_dht.clear_expired_cp(124u64, get_correct_timestamp_cp(0)); - assert!(result.success); assert_eq!(result.error, ""); + assert!(result.success); assert_eq!(result.count_keys + result.count_values, 0); } @@ -428,7 +454,7 @@ mod tests { clear_db(); let key = "some_key".to_string(); let expired_timestamp = 0u64; - register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), get_correct_timestamp_cp(1)); + register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); @@ -447,8 +473,8 @@ mod tests { clear_db(); let key = "some_key".to_string(); let expired_timestamp = 0u64; - register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), get_correct_timestamp_cp(1)); - put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], get_correct_timestamp_cp(2)); + register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); + put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0)); @@ -474,7 +500,7 @@ mod tests { let result = aqua_dht.evict_stale(124u64); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -492,7 +518,7 @@ mod tests { let result = aqua_dht.evict_stale_cp(124u64, invalid_cp); assert!(!result.success); - assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp"); + assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp"); } #[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")] @@ -509,7 +535,7 @@ mod tests { clear_db(); let key = "some_key".to_string(); let stale_timestamp = 0u64; - register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), get_correct_timestamp_cp(1)); + register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0)); @@ -531,8 +557,8 @@ mod tests { let key = "some_key".to_string(); let value = "some_value".to_string(); let stale_timestamp = 0u64; - register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), get_correct_timestamp_cp(1)); - put_value_and_check!(aqua_dht, key.clone(), value.clone(), stale_timestamp.clone(), vec![], vec![], get_correct_timestamp_cp(2)); + register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1)); + put_value_and_check!(aqua_dht, key.clone(), value.clone(), stale_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2)); let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0)); @@ -568,7 +594,8 @@ mod tests { relay_id: vec![], service_id: vec![], timestamp_created: 123u64, - + set_by: peer_id.clone(), + weight: 8u32, }; let new_record = aqua_dht_structs::Record { @@ -577,6 +604,8 @@ mod tests { relay_id: vec![], service_id: vec![], timestamp_created: 
stale_record.timestamp_created + 9999u64, + set_by: peer_id.clone(), + weight: 8u32, }; let result = aqua_dht.merge(vec![vec![stale_record.clone()], vec![new_record.clone()]]); @@ -604,6 +633,8 @@ mod tests { relay_id: vec![], service_id: vec![], timestamp_created: 123u64, + set_by: peer_id1.clone(), + weight: 8u32, }; let record2 = aqua_dht_structs::Record { @@ -612,6 +643,8 @@ mod tests { relay_id: vec![], service_id: vec![], timestamp_created: record1.timestamp_created + 9999u64, + set_by: peer_id2.clone(), + weight: 8u32, }; let result = aqua_dht.merge_two(vec![record1], vec![record2]);
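The weight-capped insert added to `put_value_impl` above is easy to miss inside the SQL string formatting. Below is a minimal, self-contained sketch of that rule only (the `Rec` struct and `try_insert` helper are hypothetical illustrations, not service code; `VALUES_LIMIT` mirrors the constant added in `src/main.rs`): per key, at most `VALUES_LIMIT` non-host records (those with `peer_id == set_by`) are kept sorted by weight in descending order, and once the limit is reached a new record is accepted only if it outweighs the current minimum-weight record, which is then dropped.

```rust
// Simplified model of the limit/weight rule used by put_value_impl (illustration only).
const VALUES_LIMIT: usize = 20; // mirrors pub static VALUES_LIMIT in src/main.rs

#[derive(Debug, Clone)]
struct Rec {
    set_by: String,
    weight: u32,
}

/// Keep `values` sorted by weight descending; reject a new record when the
/// limit is reached unless it outweighs the current minimum-weight record.
fn try_insert(values: &mut Vec<Rec>, new: Rec) -> Result<(), String> {
    if values.len() >= VALUES_LIMIT {
        match values.last() {
            // the last element is the minimum because the vector is sorted descending
            Some(min) if min.weight < new.weight => {
                values.pop();
            }
            _ => return Err("values limit is exceeded".to_string()),
        }
    }
    values.push(new);
    values.sort_by(|a, b| b.weight.cmp(&a.weight));
    Ok(())
}

fn main() {
    let mut values: Vec<Rec> = (0..VALUES_LIMIT as u32)
        .map(|w| Rec { set_by: format!("peer_{}", w), weight: w })
        .collect();
    values.sort_by(|a, b| b.weight.cmp(&a.weight));

    // A record that does not outweigh the minimum is rejected once the limit is hit.
    assert!(try_insert(&mut values, Rec { set_by: "low".into(), weight: 0 }).is_err());
    // A heavier record displaces the current minimum-weight record.
    assert!(try_insert(&mut values, Rec { set_by: "heavy".into(), weight: 100 }).is_ok());
    assert_eq!(values.len(), VALUES_LIMIT);
    println!("minimum weight after insert: {}", values.last().unwrap().weight);
}
```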
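The diff also tightens `check_timestamp_tetraplets` so that a timestamp argument is trusted only when it was produced by the host's own `peer.timestamp_sec` builtin: service id `peer`, function `timestamp_sec`, and a `peer_pk` equal to `host_id`. A standalone sketch of that predicate follows, with a simplified stand-in for `SecurityTetraplet` and hypothetical peer ids.

```rust
// Illustrative model of the tightened tetraplet check; the real service uses
// fluence::CallParameters and SecurityTetraplet from the SDK.
struct Tetraplet {
    peer_pk: String,
    service_id: String,
    function_name: String,
}

// Mirrors TRUSTED_TIMESTAMP_SERVICE_ID / TRUSTED_TIMESTAMP_FUNCTION_NAME in src/main.rs.
const TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer";
const TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec";

/// The timestamp is trusted only if it came from this host's own builtin.
fn timestamp_is_trusted(tetraplet: &Tetraplet, host_id: &str) -> bool {
    tetraplet.service_id == TRUSTED_TIMESTAMP_SERVICE_ID
        && tetraplet.function_name == TRUSTED_TIMESTAMP_FUNCTION_NAME
        && tetraplet.peer_pk == host_id
}

fn main() {
    let host_id = "some_host_id"; // hypothetical peer id, as in the tests above
    let trusted = Tetraplet {
        peer_pk: host_id.to_string(),
        service_id: TRUSTED_TIMESTAMP_SERVICE_ID.to_string(),
        function_name: TRUSTED_TIMESTAMP_FUNCTION_NAME.to_string(),
    };
    let forged = Tetraplet {
        peer_pk: "other_peer_id".to_string(),
        service_id: TRUSTED_TIMESTAMP_SERVICE_ID.to_string(),
        function_name: TRUSTED_TIMESTAMP_FUNCTION_NAME.to_string(),
    };
    assert!(timestamp_is_trusted(&trusted, host_id));
    assert!(!timestamp_is_trusted(&forged, host_id)); // rejected: not produced by this host
}
```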