mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-25 18:22:15 +00:00
api rework (#5)
This commit is contained in:
parent
53d9b007ad
commit
bfbc30c88f
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -23,6 +23,7 @@ checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
|
|||||||
name = "aqua-dht"
|
name = "aqua-dht"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"boolinator",
|
||||||
"eyre",
|
"eyre",
|
||||||
"fluence",
|
"fluence",
|
||||||
"fluence-test",
|
"fluence-test",
|
||||||
@ -237,9 +238,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-epoch"
|
name = "crossbeam-epoch"
|
||||||
version = "0.9.4"
|
version = "0.9.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "52fb27eab85b17fbb9f6fd667089e07d6a2eb8743d02639ee7f6a7a7729c9c94"
|
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if 1.0.0",
|
"cfg-if 1.0.0",
|
||||||
"crossbeam-utils",
|
"crossbeam-utils",
|
||||||
@ -250,11 +251,10 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-utils"
|
name = "crossbeam-utils"
|
||||||
version = "0.8.4"
|
version = "0.8.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278"
|
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg",
|
|
||||||
"cfg-if 1.0.0",
|
"cfg-if 1.0.0",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
]
|
]
|
||||||
|
@ -14,6 +14,7 @@ fluence = "0.6.8"
|
|||||||
marine-sqlite-connector = "0.4.1"
|
marine-sqlite-connector = "0.4.1"
|
||||||
fstrings = "0.2.3"
|
fstrings = "0.2.3"
|
||||||
eyre = "0.6.5"
|
eyre = "0.6.5"
|
||||||
|
boolinator = "2.4.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
fluence-test = "0.1.9"
|
fluence-test = "0.1.9"
|
||||||
|
@ -1,49 +1,48 @@
|
|||||||
data RepublishKeyResult:
|
|
||||||
success: bool
|
|
||||||
error: string
|
|
||||||
|
|
||||||
data Record:
|
data Record:
|
||||||
value: string
|
value: string
|
||||||
peer_id: string
|
peer_id: string
|
||||||
|
set_by: string
|
||||||
relay_id: []string
|
relay_id: []string
|
||||||
service_id: []string
|
service_id: []string
|
||||||
timestamp_created: u64
|
timestamp_created: u64
|
||||||
|
weight: u32
|
||||||
data MergeResult:
|
|
||||||
success: bool
|
|
||||||
error: string
|
|
||||||
result: []Record
|
|
||||||
|
|
||||||
data GetValuesResult:
|
data GetValuesResult:
|
||||||
success: bool
|
success: bool
|
||||||
error: string
|
error: string
|
||||||
result: []Record
|
result: []Record
|
||||||
|
|
||||||
|
data MergeResult:
|
||||||
|
success: bool
|
||||||
|
error: string
|
||||||
|
result: []Record
|
||||||
|
|
||||||
|
data ClearExpiredResult:
|
||||||
|
success: bool
|
||||||
|
error: string
|
||||||
|
count_keys: u64
|
||||||
|
count_values: u64
|
||||||
|
|
||||||
data Key:
|
data Key:
|
||||||
key: string
|
key: string
|
||||||
peer_id: string
|
peer_id: string
|
||||||
timestamp_created: u64
|
timestamp_created: u64
|
||||||
|
pinned: bool
|
||||||
|
weight: u32
|
||||||
|
|
||||||
data EvictStaleItem:
|
data EvictStaleItem:
|
||||||
key: Key
|
key: Key
|
||||||
records: []Record
|
records: []Record
|
||||||
|
|
||||||
data RegisterKeyResult:
|
data DhtResult:
|
||||||
success: bool
|
success: bool
|
||||||
error: string
|
error: string
|
||||||
|
|
||||||
data RecordsStruct:
|
|
||||||
records: []Record
|
|
||||||
|
|
||||||
data RepublishValuesResult:
|
data RepublishValuesResult:
|
||||||
success: bool
|
success: bool
|
||||||
error: string
|
error: string
|
||||||
updated: u64
|
updated: u64
|
||||||
|
|
||||||
data PutValueResult:
|
|
||||||
success: bool
|
|
||||||
error: string
|
|
||||||
|
|
||||||
data EvictStaleResult:
|
data EvictStaleResult:
|
||||||
success: bool
|
success: bool
|
||||||
error: string
|
error: string
|
||||||
@ -54,25 +53,20 @@ data GetKeyMetadataResult:
|
|||||||
error: string
|
error: string
|
||||||
key: Key
|
key: Key
|
||||||
|
|
||||||
data ClearExpiredResult:
|
|
||||||
success: bool
|
|
||||||
error: string
|
|
||||||
count_keys: u64
|
|
||||||
count_values: u64
|
|
||||||
|
|
||||||
service AquaDHT("aqua-dht"):
|
service AquaDHT("aqua-dht"):
|
||||||
republish_key(key: Key, current_timestamp: u64) -> RepublishKeyResult
|
|
||||||
clear_expired(current_timestamp: u64) -> ClearExpiredResult
|
|
||||||
merge_two(a: []Record, b: []Record) -> MergeResult
|
|
||||||
merge_hack_get_values(records: []GetValuesResult) -> MergeResult
|
|
||||||
merge_hack_struct(records: RecordsStruct) -> MergeResult
|
|
||||||
evict_stale(current_timestamp: u64) -> EvictStaleResult
|
evict_stale(current_timestamp: u64) -> EvictStaleResult
|
||||||
republish_values(key: string, records: []Record, current_timestamp: u64) -> RepublishValuesResult
|
put_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult
|
||||||
|
merge_two(a: []Record, b: []Record) -> MergeResult
|
||||||
merge(records: [][]Record) -> MergeResult
|
merge(records: [][]Record) -> MergeResult
|
||||||
put_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string) -> PutValueResult
|
put_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
|
||||||
get_values(key: string, current_timestamp: u64) -> GetValuesResult
|
clear_expired(current_timestamp: u64) -> ClearExpiredResult
|
||||||
merge_wrapped(records: [][][]Record) -> MergeResult
|
|
||||||
merge_hack(records: [][]Record, hack: string) -> MergeResult
|
|
||||||
put_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string) -> PutValueResult
|
|
||||||
get_key_metadata(key: string, current_timestamp: u64) -> GetKeyMetadataResult
|
get_key_metadata(key: string, current_timestamp: u64) -> GetKeyMetadataResult
|
||||||
register_key(key: string, current_timestamp: u64) -> RegisterKeyResult
|
republish_values(key: string, records: []Record, current_timestamp: u64) -> RepublishValuesResult
|
||||||
|
republish_key(key: Key, current_timestamp: u64) -> DhtResult
|
||||||
|
merge_hack_get_values(records: []GetValuesResult) -> MergeResult
|
||||||
|
put_host_value_relay(key: string, value: string, current_timestamp: u64, relay_id: string, weight: u32) -> DhtResult
|
||||||
|
renew_host_value(key: string, current_timestamp: u64) -> DhtResult
|
||||||
|
get_values(key: string, current_timestamp: u64) -> GetValuesResult
|
||||||
|
register_key(key: string, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult
|
||||||
|
clear_host_value(key: string, current_timestamp: u64) -> DhtResult
|
||||||
|
put_host_value(key: string, value: string, current_timestamp: u64, relay_id: []string, service_id: []string, weight: u32) -> DhtResult
|
||||||
|
202
src/impls.rs
202
src/impls.rs
@ -14,13 +14,14 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE, STALE_VALUE_AGE};
|
use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE, STALE_VALUE_AGE, EXPIRED_HOST_VALUE_AGE, VALUES_LIMIT};
|
||||||
use crate::results::{Key, Record, EvictStaleItem};
|
use crate::results::{Key, Record, EvictStaleItem};
|
||||||
use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State};
|
use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State, Statement};
|
||||||
use fluence::{CallParameters};
|
use fluence::{CallParameters};
|
||||||
use eyre;
|
use eyre;
|
||||||
use eyre::ContextCompat;
|
use eyre::ContextCompat;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use boolinator::Boolinator;
|
||||||
|
|
||||||
|
|
||||||
fn get_custom_option(value: String) -> Vec<String> {
|
fn get_custom_option(value: String) -> Vec<String> {
|
||||||
@ -31,13 +32,39 @@ fn get_custom_option(value: String) -> Vec<String> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn read_key(statement: &Statement) -> SqliteResult<Key> {
|
||||||
|
Ok(Key {
|
||||||
|
key: statement.read::<String>(0)?,
|
||||||
|
peer_id: statement.read::<String>(1)?,
|
||||||
|
timestamp_created: statement.read::<i64>(2)? as u64,
|
||||||
|
pinned: statement.read::<i64>(3)? != 0,
|
||||||
|
weight: statement.read::<i64>(4)? as u32,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_record(statement: &Statement) -> SqliteResult<Record> {
|
||||||
|
Ok(Record {
|
||||||
|
value: statement.read::<String>(0)?,
|
||||||
|
peer_id: statement.read::<String>(1)?,
|
||||||
|
set_by: statement.read::<String>(2)?,
|
||||||
|
relay_id: get_custom_option(statement.read::<String>(3)?),
|
||||||
|
service_id: get_custom_option(statement.read::<String>(4)?),
|
||||||
|
timestamp_created: statement.read::<i64>(5)? as u64,
|
||||||
|
weight: statement.read::<i64>(6)? as u32,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_key_existence(connection: &Connection, key: String, current_timestamp: u64) -> SqliteResult<()> {
|
||||||
|
get_key_metadata_helper(&connection, key, current_timestamp).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn check_timestamp_tetraplets(call_parameters: &CallParameters, arg_number: usize) -> eyre::Result<()> {
|
pub(crate) fn check_timestamp_tetraplets(call_parameters: &CallParameters, arg_number: usize) -> eyre::Result<()> {
|
||||||
let error_msg = "you should use peer.timestamp_ms to pass timestamp";
|
let error_msg = "you should use host peer.timestamp_sec to pass timestamp";
|
||||||
let tetraplets = call_parameters.tetraplets.get(arg_number).wrap_err(error_msg)?;
|
let tetraplets = call_parameters.tetraplets.get(arg_number).wrap_err(error_msg)?;
|
||||||
let tetraplet = tetraplets.get(0).wrap_err(error_msg)?;
|
let tetraplet = tetraplets.get(0).wrap_err(error_msg)?;
|
||||||
(tetraplet.service_id == TRUSTED_TIMESTAMP_SERVICE_ID &&
|
(tetraplet.service_id == TRUSTED_TIMESTAMP_SERVICE_ID &&
|
||||||
tetraplet.function_name == TRUSTED_TIMESTAMP_FUNCTION_NAME).then(|| ()).wrap_err(error_msg)
|
tetraplet.function_name == TRUSTED_TIMESTAMP_FUNCTION_NAME
|
||||||
// TODO check host_id == peer_pk(???)
|
&& tetraplet.peer_pk == call_parameters.host_id).then(|| ()).wrap_err(error_msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -54,7 +81,10 @@ pub(crate) fn create_keys_table() -> bool {
|
|||||||
key TEXT PRIMARY KEY,
|
key TEXT PRIMARY KEY,
|
||||||
timestamp_created INTEGER,
|
timestamp_created INTEGER,
|
||||||
timestamp_accessed INTEGER,
|
timestamp_accessed INTEGER,
|
||||||
peer_id TEXT);
|
peer_id TEXT,
|
||||||
|
pinned INTEGER,
|
||||||
|
weight INTEGER
|
||||||
|
);
|
||||||
"),
|
"),
|
||||||
).is_ok()
|
).is_ok()
|
||||||
}
|
}
|
||||||
@ -68,11 +98,13 @@ pub(crate) fn create_values_table() -> bool {
|
|||||||
key TEXT,
|
key TEXT,
|
||||||
value TEXT,
|
value TEXT,
|
||||||
peer_id TEXT,
|
peer_id TEXT,
|
||||||
|
set_by TEXT,
|
||||||
relay_id TEXT,
|
relay_id TEXT,
|
||||||
service_id TEXT,
|
service_id TEXT,
|
||||||
timestamp_created INTEGER,
|
timestamp_created INTEGER,
|
||||||
timestamp_accessed INTEGER,
|
timestamp_accessed INTEGER,
|
||||||
PRIMARY KEY (key, peer_id)
|
weight INTEGER,
|
||||||
|
PRIMARY KEY (key, peer_id, set_by)
|
||||||
);
|
);
|
||||||
"),
|
"),
|
||||||
).is_ok()
|
).is_ok()
|
||||||
@ -85,27 +117,31 @@ fn get_key_metadata_helper(connection: &Connection, key: String, current_timesta
|
|||||||
WHERE key = '{key}'"))?;
|
WHERE key = '{key}'"))?;
|
||||||
|
|
||||||
let mut statement = connection
|
let mut statement = connection
|
||||||
.prepare(f!("SELECT key, peer_id, timestamp_created \
|
.prepare(f!("SELECT key, peer_id, timestamp_created, pinned, weight \
|
||||||
FROM {KEYS_TABLE_NAME} WHERE key = '{key}'"))?;
|
FROM {KEYS_TABLE_NAME} WHERE key = '{key}'"))?;
|
||||||
|
|
||||||
if let State::Row = statement.next()? {
|
if let State::Row = statement.next()? {
|
||||||
Ok(Key {
|
read_key(&statement)
|
||||||
key: statement.read::<String>(0)?,
|
|
||||||
peer_id: statement.read::<String>(1)?,
|
|
||||||
timestamp_created: statement.read::<i64>(2)? as u64,
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
Err(SqliteError { code: None, message: Some("not found".to_string()) })
|
Err(SqliteError { code: None, message: Some("not found".to_string()) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_key(connection: &Connection, key: String, peer_id: String, timestamp_created: u64, timestamp_accessed: u64) -> SqliteResult<()> {
|
fn update_key(connection: &Connection, key: String, peer_id: String, timestamp_created: u64, timestamp_accessed: u64, pin: bool, weight: u32) -> SqliteResult<()> {
|
||||||
let old_key = get_key_metadata_helper(&connection, key.clone(), timestamp_accessed);
|
let old_key = get_key_metadata_helper(&connection, key.clone(), timestamp_accessed);
|
||||||
|
let pinned = pin as i32;
|
||||||
|
let update_allowed = {
|
||||||
|
match old_key {
|
||||||
|
Ok(key) => key.peer_id == peer_id && key.timestamp_created < timestamp_created,
|
||||||
|
Err(_) => true,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// TODO: compare conflicting keys by timestamp_created
|
|
||||||
if old_key.is_err() || old_key?.peer_id == peer_id {
|
if update_allowed {
|
||||||
connection.execute(f!("
|
connection.execute(f!("
|
||||||
INSERT OR REPLACE INTO {KEYS_TABLE_NAME} VALUES ('{key}', '{timestamp_created}', '{timestamp_accessed}', '{peer_id}');
|
INSERT OR REPLACE INTO {KEYS_TABLE_NAME} \
|
||||||
|
VALUES ('{key}', '{timestamp_created}', '{timestamp_accessed}', '{peer_id}', '{pinned}', '{weight}');
|
||||||
"))
|
"))
|
||||||
} else {
|
} else {
|
||||||
Err(SqliteError { code: None, message: Some("key already exists with different peer_id".to_string()) })
|
Err(SqliteError { code: None, message: Some("key already exists with different peer_id".to_string()) })
|
||||||
@ -120,13 +156,13 @@ pub fn get_key_metadata_impl(key: String, current_timestamp: u64) -> SqliteResul
|
|||||||
get_key_metadata_helper(&get_connection()?, key, current_timestamp)
|
get_key_metadata_helper(&get_connection()?, key, current_timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_key_impl(key: String, current_timestamp: u64) -> SqliteResult<()> {
|
pub fn register_key_impl(key: String, current_timestamp: u64, pin: bool, weight: u32) -> SqliteResult<()> {
|
||||||
let call_parameters = fluence::get_call_parameters();
|
let call_parameters = fluence::get_call_parameters();
|
||||||
let peer_id = call_parameters.init_peer_id.clone();
|
let peer_id = call_parameters.init_peer_id.clone();
|
||||||
check_timestamp_tetraplets(&call_parameters, 1)
|
check_timestamp_tetraplets(&call_parameters, 1)
|
||||||
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
|
|
||||||
update_key(&get_connection()?, key, peer_id, current_timestamp.clone(), current_timestamp)
|
update_key(&get_connection()?, key, peer_id, current_timestamp.clone(), current_timestamp, pin, weight)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()> {
|
pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()> {
|
||||||
@ -134,45 +170,55 @@ pub fn republish_key_impl(key: Key, current_timestamp: u64) -> SqliteResult<()>
|
|||||||
check_timestamp_tetraplets(&call_parameters, 1)
|
check_timestamp_tetraplets(&call_parameters, 1)
|
||||||
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
|
|
||||||
update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp)
|
// Key.pinned is ignored in republish
|
||||||
|
update_key(&get_connection()?, key.key, key.peer_id, key.timestamp_created, current_timestamp, false, key.weight)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_id: Vec<String>, service_id: Vec<String>) -> SqliteResult<()> {
|
pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32, host: bool) -> SqliteResult<()> {
|
||||||
let call_parameters = fluence::get_call_parameters();
|
let call_parameters = fluence::get_call_parameters();
|
||||||
check_timestamp_tetraplets(&call_parameters, 2)
|
check_timestamp_tetraplets(&call_parameters, 2)
|
||||||
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
|
|
||||||
let connection = get_connection()?;
|
let connection = get_connection()?;
|
||||||
|
|
||||||
// checking key for existence
|
check_key_existence(&connection, key.clone(), current_timestamp.clone())?;
|
||||||
let _key = get_key_metadata_helper(&connection, key.clone(), current_timestamp.clone())?;
|
|
||||||
|
let values: Vec<Record> = get_values_helper(&connection, key.clone())?.into_iter().filter(|item| item.peer_id == item.set_by).collect();
|
||||||
|
let min_weight_record = values.iter().last();
|
||||||
|
|
||||||
|
if values.len() < VALUES_LIMIT || min_weight_record.unwrap().weight < weight {
|
||||||
let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() };
|
let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() };
|
||||||
let peer_id = call_parameters.init_peer_id;
|
let peer_id = if host { call_parameters.host_id } else { call_parameters.init_peer_id.clone() };
|
||||||
|
let set_by = call_parameters.init_peer_id;
|
||||||
let service_id = if service_id.len() == 0 { "".to_string() } else { service_id[0].clone() };
|
let service_id = if service_id.len() == 0 { "".to_string() } else { service_id[0].clone() };
|
||||||
|
|
||||||
|
if values.len() >= VALUES_LIMIT {
|
||||||
|
if let Some(rec) = min_weight_record {
|
||||||
|
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE set_by='{rec.peer_id}'"))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
connection.execute(
|
connection.execute(
|
||||||
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
|
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
|
||||||
VALUES ('{key}', '{value}', '{peer_id}', '{relay_id}',\
|
VALUES ('{key}', '{value}', '{peer_id}', '{set_by}', '{relay_id}',\
|
||||||
'{service_id}', '{current_timestamp}', '{current_timestamp}')")
|
'{service_id}', '{current_timestamp}', '{current_timestamp}', '{weight}')")
|
||||||
)
|
)
|
||||||
|
} else {
|
||||||
|
Err(SqliteError { code: None, message: Some("values limit is exceeded".to_string()) })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_values_helper(connection: &Connection, key: String) -> SqliteResult<Vec<Record>> {
|
pub fn get_values_helper(connection: &Connection, key: String) -> SqliteResult<Vec<Record>> {
|
||||||
let mut statement = connection.prepare(
|
let mut statement = connection.prepare(
|
||||||
f!("SELECT value, peer_id, relay_id, service_id, timestamp_created FROM {VALUES_TABLE_NAME} \
|
f!("SELECT value, peer_id, set_by, relay_id, service_id, timestamp_created, weight FROM {VALUES_TABLE_NAME} \
|
||||||
WHERE key = '{key}'"))?;
|
WHERE key = '{key}'"))?;
|
||||||
let mut result: Vec<Record> = vec![];
|
let mut result: Vec<Record> = vec![];
|
||||||
|
|
||||||
while let State::Row = statement.next()? {
|
while let State::Row = statement.next()? {
|
||||||
result.push(Record {
|
result.push(read_record(&statement)?)
|
||||||
value: statement.read::<String>(0)?,
|
|
||||||
peer_id: statement.read::<String>(1)?,
|
|
||||||
relay_id: get_custom_option(statement.read::<String>(2)?),
|
|
||||||
service_id: get_custom_option(statement.read::<String>(3)?),
|
|
||||||
timestamp_created: statement.read::<i64>(4)? as u64,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result.sort_by(|a, b| b.weight.cmp(&a.weight));
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,19 +243,18 @@ pub fn republish_values_impl(key: String, records: Vec<Record>, current_timestam
|
|||||||
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
let connection = get_connection()?;
|
let connection = get_connection()?;
|
||||||
|
|
||||||
// checking key for existence
|
check_key_existence(&connection, key.clone(), current_timestamp.clone())?;
|
||||||
let _key = get_key_metadata_helper(&connection, key.clone(), current_timestamp.clone())?;
|
|
||||||
|
|
||||||
|
|
||||||
// TODO: compare conflicting values by timestamp_created
|
// TODO: compare conflicting values by timestamp_created
|
||||||
let mut updated = 0u64;
|
let mut updated = 0u64;
|
||||||
for record in records.iter() {
|
for record in records.iter() {
|
||||||
let relay_id = if record.relay_id.is_empty() { "".to_string() } else { record.relay_id[0].clone() };
|
let relay_id = if record.relay_id.is_empty() { "".to_string() } else { record.relay_id[0].clone() };
|
||||||
let service_id = if record.service_id.is_empty() { "".to_string() } else { record.service_id[0].clone() };
|
let service_id = if record.service_id.is_empty() { "".to_string() } else { record.service_id[0].clone() };
|
||||||
|
let set_by = record.peer_id.clone(); // set_by ignored in republish
|
||||||
connection.execute(
|
connection.execute(
|
||||||
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
|
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
|
||||||
VALUES ('{key}', '{record.value}', '{record.peer_id}', '{relay_id}',\
|
VALUES ('{key}', '{record.value}', '{record.peer_id}', '{set_by}', '{relay_id}',\
|
||||||
'{service_id}', '{record.timestamp_created}', '{current_timestamp}')"))?;
|
'{service_id}', '{record.timestamp_created}', '{current_timestamp}', '{record.weight}')"))?;
|
||||||
|
|
||||||
updated += connection.changes() as u64;
|
updated += connection.changes() as u64;
|
||||||
}
|
}
|
||||||
@ -224,10 +269,23 @@ pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> {
|
|||||||
let connection = get_connection()?;
|
let connection = get_connection()?;
|
||||||
|
|
||||||
let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE;
|
let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE;
|
||||||
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp})"))?;
|
let host_id = call_parameters.host_id;
|
||||||
let deleted_values = connection.changes() as u64;
|
|
||||||
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?;
|
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} \
|
||||||
let deleted_keys = connection.changes() as u64;
|
WHERE timestamp_created <= {expired_timestamp} AND peer_id !='{host_id}')"))?;
|
||||||
|
let mut deleted_values = connection.changes() as u64;
|
||||||
|
|
||||||
|
// TODO: ignore keys with host values
|
||||||
|
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp} AND NOT pinned"))?;
|
||||||
|
let mut deleted_keys = connection.changes() as u64;
|
||||||
|
|
||||||
|
let expired_host_timestamp = current_timestamp - EXPIRED_HOST_VALUE_AGE;
|
||||||
|
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key IN (SELECT key FROM {KEYS_TABLE_NAME} \
|
||||||
|
WHERE timestamp_created <= {expired_host_timestamp})"))?;
|
||||||
|
deleted_values += connection.changes() as u64;
|
||||||
|
|
||||||
|
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_host_timestamp}"))?;
|
||||||
|
deleted_keys += connection.changes() as u64;
|
||||||
|
|
||||||
Ok((deleted_keys, deleted_values))
|
Ok((deleted_keys, deleted_values))
|
||||||
}
|
}
|
||||||
@ -237,30 +295,31 @@ pub fn evict_stale_impl(current_timestamp: u64) -> SqliteResult<Vec<EvictStaleIt
|
|||||||
check_timestamp_tetraplets(&call_parameters, 0)
|
check_timestamp_tetraplets(&call_parameters, 0)
|
||||||
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
let connection = get_connection()?;
|
let connection = get_connection()?;
|
||||||
|
|
||||||
let stale_timestamp = current_timestamp - STALE_VALUE_AGE;
|
let stale_timestamp = current_timestamp - STALE_VALUE_AGE;
|
||||||
|
|
||||||
let mut stale_keys: Vec<Key> = vec![];
|
let mut stale_keys: Vec<Key> = vec![];
|
||||||
let mut statement =
|
let mut statement =
|
||||||
connection.prepare(
|
connection.prepare(
|
||||||
f!("SELECT key, peer_id, timestamp_created FROM {KEYS_TABLE_NAME} \
|
f!("SELECT key, peer_id, timestamp_created, pinned, weight FROM {KEYS_TABLE_NAME} \
|
||||||
WHERE timestamp_accessed <= {stale_timestamp}"))?;
|
WHERE timestamp_accessed <= {stale_timestamp}"))?;
|
||||||
|
|
||||||
while let State::Row = statement.next()? {
|
while let State::Row = statement.next()? {
|
||||||
stale_keys.push(Key {
|
stale_keys.push(read_key(&statement)?);
|
||||||
key: statement.read::<String>(0)?,
|
|
||||||
peer_id: statement.read::<String>(1)?,
|
|
||||||
timestamp_created: statement.read::<i64>(2)? as u64,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut results: Vec<EvictStaleItem> = vec![];
|
let mut results: Vec<EvictStaleItem> = vec![];
|
||||||
for key in stale_keys.iter() {
|
let host_id = call_parameters.host_id;
|
||||||
results.push(EvictStaleItem { key: key.clone(), records: get_values_helper(&connection, key.key.clone())? });
|
for key in stale_keys.into_iter() {
|
||||||
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key='{key.key}'"))?;
|
let values = get_values_helper(&connection, key.key.clone())?;
|
||||||
|
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE key = '{key.key}' AND set_by != '{host_id}'"))?;
|
||||||
|
|
||||||
|
if !key.pinned && !values.iter().any(|val| val.peer_id == host_id) {
|
||||||
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key='{key.key}'"))?;
|
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key='{key.key}'"))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
results.push(EvictStaleItem { key, records: values });
|
||||||
|
}
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,3 +340,40 @@ pub fn merge_impl(records: Vec<Record>) -> SqliteResult<Vec<Record>> {
|
|||||||
|
|
||||||
Ok(result.into_iter().map(|(_, rec)| rec).collect())
|
Ok(result.into_iter().map(|(_, rec)| rec).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn renew_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> {
|
||||||
|
let call_parameters = fluence::get_call_parameters();
|
||||||
|
check_timestamp_tetraplets(&call_parameters, 0)
|
||||||
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
|
let connection = get_connection()?;
|
||||||
|
|
||||||
|
check_key_existence(&connection, key.clone(), current_timestamp.clone())?;
|
||||||
|
|
||||||
|
let set_by = call_parameters.init_peer_id;
|
||||||
|
let host_id = call_parameters.host_id;
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
f!("UPDATE {VALUES_TABLE_NAME} \
|
||||||
|
SET timestamp_created = '{current_timestamp}', timestamp_accessed = '{current_timestamp}' \
|
||||||
|
WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?;
|
||||||
|
|
||||||
|
(connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear_host_value_impl(key: String, current_timestamp: u64) -> SqliteResult<()> {
|
||||||
|
let call_parameters = fluence::get_call_parameters();
|
||||||
|
check_timestamp_tetraplets(&call_parameters, 0)
|
||||||
|
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
|
||||||
|
let connection = get_connection()?;
|
||||||
|
|
||||||
|
check_key_existence(&connection, key.clone(), current_timestamp.clone())?;
|
||||||
|
|
||||||
|
let host_id = call_parameters.host_id;
|
||||||
|
let set_by = call_parameters.init_peer_id;
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
f!("DELETE FROM {VALUES_TABLE_NAME} \
|
||||||
|
WHERE key = '{key}' AND set_by = '{set_by}' AND peer_id = '{host_id}'"))?;
|
||||||
|
|
||||||
|
(connection.changes() == 1).as_result((), SqliteError { code: None, message: Some("host value not found".to_string()) })
|
||||||
|
}
|
56
src/main.rs
56
src/main.rs
@ -19,8 +19,8 @@ mod results;
|
|||||||
mod tests;
|
mod tests;
|
||||||
mod impls;
|
mod impls;
|
||||||
|
|
||||||
use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult, RecordsStruct};
|
use crate::results::{Key, GetKeyMetadataResult, DhtResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult, EvictStaleResult, MergeResult};
|
||||||
use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl};
|
use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl, evict_stale_impl, merge_impl, renew_host_value_impl, clear_host_value_impl};
|
||||||
|
|
||||||
use fluence::marine;
|
use fluence::marine;
|
||||||
use fluence::module_manifest;
|
use fluence::module_manifest;
|
||||||
@ -35,6 +35,8 @@ pub static VALUES_TABLE_NAME: &str = "dht_values";
|
|||||||
pub static DB_PATH: &str = "/tmp/dht.db";
|
pub static DB_PATH: &str = "/tmp/dht.db";
|
||||||
pub static STALE_VALUE_AGE: u64 = 60 * 60;
|
pub static STALE_VALUE_AGE: u64 = 60 * 60;
|
||||||
pub static EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60;
|
pub static EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60;
|
||||||
|
pub static EXPIRED_HOST_VALUE_AGE: u64 = 10 * EXPIRED_VALUE_AGE;
|
||||||
|
pub static VALUES_LIMIT: usize = 20;
|
||||||
|
|
||||||
pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer";
|
pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer";
|
||||||
pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec";
|
pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec";
|
||||||
@ -46,8 +48,8 @@ fn main() {
|
|||||||
|
|
||||||
// KEYS
|
// KEYS
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn register_key(key: String, current_timestamp: u64) -> RegisterKeyResult {
|
pub fn register_key(key: String, current_timestamp: u64, pin: bool, weight: u32) -> DhtResult {
|
||||||
register_key_impl(key, current_timestamp).into()
|
register_key_impl(key, current_timestamp, pin, weight).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
@ -56,19 +58,29 @@ pub fn get_key_metadata(key: String, current_timestamp: u64) -> GetKeyMetadataRe
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn republish_key(key: Key, current_timestamp: u64) -> RepublishKeyResult {
|
pub fn republish_key(key: Key, current_timestamp: u64) -> DhtResult {
|
||||||
republish_key_impl(key, current_timestamp).into()
|
republish_key_impl(key, current_timestamp).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
// VALUES
|
// VALUES
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn put_value(key: String, value: String, current_timestamp: u64, relay_id: Vec<String>, service_id: Vec<String>) -> PutValueResult {
|
pub fn put_value(key: String, value: String, current_timestamp: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32) -> DhtResult {
|
||||||
put_value_impl(key, value, current_timestamp, relay_id, service_id).into()
|
put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, false).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn put_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String) -> PutValueResult {
|
pub fn put_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult {
|
||||||
put_value_impl(key, value, current_timestamp, vec![relay_id], vec![]).into()
|
put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, false).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[marine]
|
||||||
|
pub fn put_host_value(key: String, value: String, current_timestamp: u64, relay_id: Vec<String>, service_id: Vec<String>, weight: u32) -> DhtResult {
|
||||||
|
put_value_impl(key, value, current_timestamp, relay_id, service_id, weight, true).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[marine]
|
||||||
|
pub fn put_host_value_relay(key: String, value: String, current_timestamp: u64, relay_id: String, weight: u32) -> DhtResult {
|
||||||
|
put_value_impl(key, value, current_timestamp, vec![relay_id], vec![], weight, true).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
@ -81,6 +93,16 @@ pub fn republish_values(key: String, records: Vec<Record>, current_timestamp: u6
|
|||||||
republish_values_impl(key, records, current_timestamp).into()
|
republish_values_impl(key, records, current_timestamp).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[marine]
|
||||||
|
pub fn renew_host_value(key: String, current_timestamp: u64) -> DhtResult {
|
||||||
|
renew_host_value_impl(key, current_timestamp).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[marine]
|
||||||
|
pub fn clear_host_value(key: String, current_timestamp: u64) -> DhtResult {
|
||||||
|
clear_host_value_impl(key, current_timestamp).into()
|
||||||
|
}
|
||||||
|
|
||||||
// BOTH
|
// BOTH
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult {
|
pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult {
|
||||||
@ -102,22 +124,6 @@ pub fn merge_two(a: Vec<Record>, b: Vec<Record>) -> MergeResult {
|
|||||||
merge_impl(a.into_iter().chain(b.into_iter()).collect()).into()
|
merge_impl(a.into_iter().chain(b.into_iter()).collect()).into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
|
||||||
pub fn merge_hack(records: Vec<Vec<Record>>, hack: String) -> MergeResult {
|
|
||||||
print!("{}", hack);
|
|
||||||
merge(records)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[marine]
|
|
||||||
pub fn merge_wrapped(records: Vec<Vec<Vec<Record>>>) -> MergeResult {
|
|
||||||
merge_impl(records.into_iter().flatten().flatten().collect()).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[marine]
|
|
||||||
pub fn merge_hack_struct(records: RecordsStruct) -> MergeResult {
|
|
||||||
merge_impl(records.records).into()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
pub fn merge_hack_get_values(records: Vec<GetValuesResult>) -> MergeResult {
|
pub fn merge_hack_get_values(records: Vec<GetValuesResult>) -> MergeResult {
|
||||||
merge_impl(
|
merge_impl(
|
||||||
|
@ -19,56 +19,12 @@ use marine_sqlite_connector::Result as SqliteResult;
|
|||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct RegisterKeyResult {
|
pub struct DhtResult {
|
||||||
pub success: bool,
|
pub success: bool,
|
||||||
pub error: String,
|
pub error: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<SqliteResult<()>> for RegisterKeyResult {
|
impl From<SqliteResult<()>> for DhtResult {
|
||||||
fn from(result: SqliteResult<()>) -> Self {
|
|
||||||
match result {
|
|
||||||
Ok(_) => Self {
|
|
||||||
success: true,
|
|
||||||
error: "".to_string(),
|
|
||||||
},
|
|
||||||
Err(err) => Self {
|
|
||||||
success: false,
|
|
||||||
error: err.to_string(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[marine]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct RepublishKeyResult {
|
|
||||||
pub success: bool,
|
|
||||||
pub error: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<SqliteResult<()>> for RepublishKeyResult {
|
|
||||||
fn from(result: SqliteResult<()>) -> Self {
|
|
||||||
match result {
|
|
||||||
Ok(_) => Self {
|
|
||||||
success: true,
|
|
||||||
error: "".to_string(),
|
|
||||||
},
|
|
||||||
Err(err) => Self {
|
|
||||||
success: false,
|
|
||||||
error: err.to_string(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[marine]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct PutValueResult {
|
|
||||||
pub success: bool,
|
|
||||||
pub error: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<SqliteResult<()>> for PutValueResult {
|
|
||||||
fn from(result: SqliteResult<()>) -> Self {
|
fn from(result: SqliteResult<()>) -> Self {
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => Self {
|
Ok(_) => Self {
|
||||||
@ -88,9 +44,11 @@ impl From<SqliteResult<()>> for PutValueResult {
|
|||||||
pub struct Record {
|
pub struct Record {
|
||||||
pub value: String,
|
pub value: String,
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
|
pub set_by: String,
|
||||||
pub relay_id: Vec<String>,
|
pub relay_id: Vec<String>,
|
||||||
pub service_id: Vec<String>,
|
pub service_id: Vec<String>,
|
||||||
pub timestamp_created: u64,
|
pub timestamp_created: u64,
|
||||||
|
pub weight: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
@ -177,6 +135,8 @@ pub struct Key {
|
|||||||
pub key: String,
|
pub key: String,
|
||||||
pub peer_id: String,
|
pub peer_id: String,
|
||||||
pub timestamp_created: u64,
|
pub timestamp_created: u64,
|
||||||
|
pub pinned: bool,
|
||||||
|
pub weight: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
#[marine]
|
||||||
@ -281,8 +241,3 @@ impl From<SqliteResult<Vec<Record>>> for MergeResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine]
|
|
||||||
pub struct RecordsStruct {
|
|
||||||
pub records: Vec<Record>
|
|
||||||
}
|
|
||||||
|
127
src/tests.rs
127
src/tests.rs
@ -21,6 +21,8 @@ mod tests {
|
|||||||
use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, EXPIRED_VALUE_AGE, STALE_VALUE_AGE};
|
use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, EXPIRED_VALUE_AGE, STALE_VALUE_AGE};
|
||||||
use fluence::{CallParameters, SecurityTetraplet};
|
use fluence::{CallParameters, SecurityTetraplet};
|
||||||
|
|
||||||
|
const HOST_ID: &str = "some_host_id";
|
||||||
|
|
||||||
fn clear_db() {
|
fn clear_db() {
|
||||||
let connection = Connection::open(DB_PATH).unwrap();
|
let connection = Connection::open(DB_PATH).unwrap();
|
||||||
|
|
||||||
@ -30,13 +32,14 @@ mod tests {
|
|||||||
|
|
||||||
fn get_correct_timestamp_cp(arg_number: usize) -> CallParameters {
|
fn get_correct_timestamp_cp(arg_number: usize) -> CallParameters {
|
||||||
let mut cp = CallParameters::default();
|
let mut cp = CallParameters::default();
|
||||||
|
cp.host_id = HOST_ID.to_string();
|
||||||
|
|
||||||
for _ in 0..arg_number {
|
for _ in 0..arg_number {
|
||||||
cp.tetraplets.push(vec![]);
|
cp.tetraplets.push(vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.tetraplets.push(vec![SecurityTetraplet {
|
cp.tetraplets.push(vec![SecurityTetraplet {
|
||||||
peer_pk: "".to_string(),
|
peer_pk: HOST_ID.to_string(),
|
||||||
service_id: TRUSTED_TIMESTAMP_SERVICE_ID.to_string(),
|
service_id: TRUSTED_TIMESTAMP_SERVICE_ID.to_string(),
|
||||||
function_name: TRUSTED_TIMESTAMP_FUNCTION_NAME.to_string(),
|
function_name: TRUSTED_TIMESTAMP_FUNCTION_NAME.to_string(),
|
||||||
json_path: "".to_string(),
|
json_path: "".to_string(),
|
||||||
@ -46,9 +49,9 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! put_value_and_check {
|
macro_rules! put_value_and_check {
|
||||||
($aqua_dht:expr, $key:expr, $value:expr, $timestamp:expr, $relay_id:expr, $service_id:expr, $cp:expr) => {
|
($aqua_dht:expr, $key:expr, $value:expr, $timestamp:expr, $relay_id:expr, $service_id:expr, $weight:expr, $cp:expr) => {
|
||||||
{
|
{
|
||||||
let result = $aqua_dht.put_value_cp($key.clone(), $value.clone(), $timestamp.clone(), $relay_id.clone(), $service_id.clone(), $cp.clone());
|
let result = $aqua_dht.put_value_cp($key.clone(), $value.clone(), $timestamp.clone(), $relay_id.clone(), $service_id.clone(), $weight.clone(), $cp.clone());
|
||||||
|
|
||||||
assert_eq!(result.error, "");
|
assert_eq!(result.error, "");
|
||||||
assert!(result.success);
|
assert!(result.success);
|
||||||
@ -57,7 +60,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! check_key_metadata {
|
macro_rules! check_key_metadata {
|
||||||
($aqua_dht:expr, $key:expr, $timestamp:expr, $peer_id:expr, $current_timestamp:expr, $cp:expr) => {
|
($aqua_dht:expr, $key:expr, $timestamp:expr, $peer_id:expr, $current_timestamp:expr, $pinned:expr, $weight: expr, $cp:expr) => {
|
||||||
{
|
{
|
||||||
let result = $aqua_dht.get_key_metadata_cp($key.clone(), $current_timestamp.clone(), $cp.clone());
|
let result = $aqua_dht.get_key_metadata_cp($key.clone(), $current_timestamp.clone(), $cp.clone());
|
||||||
assert!(result.success);
|
assert!(result.success);
|
||||||
@ -65,18 +68,20 @@ mod tests {
|
|||||||
assert_eq!(result.key.key, $key);
|
assert_eq!(result.key.key, $key);
|
||||||
assert_eq!(result.key.peer_id, $peer_id);
|
assert_eq!(result.key.peer_id, $peer_id);
|
||||||
assert_eq!(result.key.timestamp_created, $timestamp);
|
assert_eq!(result.key.timestamp_created, $timestamp);
|
||||||
|
assert_eq!(result.key.pinned, $pinned);
|
||||||
|
assert_eq!(result.key.weight, $weight);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! register_key_and_check {
|
macro_rules! register_key_and_check {
|
||||||
($aqua_dht:expr, $key:expr, $timestamp:expr, $cp:expr) => {
|
($aqua_dht:expr, $key:expr, $timestamp:expr, $pin:expr, $weight: expr, $cp:expr) => {
|
||||||
{
|
{
|
||||||
let result = $aqua_dht.register_key_cp($key.clone(), $timestamp, $cp.clone());
|
let result = $aqua_dht.register_key_cp($key.clone(), $timestamp.clone(), $pin.clone(), $weight.clone(), $cp.clone());
|
||||||
assert_eq!(result.error, "");
|
assert_eq!(result.error, "");
|
||||||
assert!(result.success);
|
assert!(result.success);
|
||||||
|
|
||||||
check_key_metadata!($aqua_dht, $key, $timestamp, $cp.init_peer_id, $timestamp, $cp);
|
check_key_metadata!($aqua_dht, $key, $timestamp, $cp.init_peer_id, $timestamp, $pin, $weight, $cp);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -89,7 +94,7 @@ mod tests {
|
|||||||
assert_eq!(result.error, "");
|
assert_eq!(result.error, "");
|
||||||
assert!(result.success);
|
assert!(result.success);
|
||||||
|
|
||||||
check_key_metadata!($aqua_dht, $key.key, $key.timestamp_created, $key.peer_id, $timestamp, $cp);
|
check_key_metadata!($aqua_dht, $key.key, $key.timestamp_created, $key.peer_id, $timestamp, false, $key.weight, $cp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -97,15 +102,15 @@ mod tests {
|
|||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
fn register_key() {
|
fn register_key() {
|
||||||
clear_db();
|
clear_db();
|
||||||
register_key_and_check!(aqua_dht, "some_key".to_string(), 123u64, get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, "some_key".to_string(), 123u64, false, 0u32, get_correct_timestamp_cp(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
fn register_key_empty_cp() {
|
fn register_key_empty_cp() {
|
||||||
clear_db();
|
clear_db();
|
||||||
let result = aqua_dht.register_key("some_key".to_string(), 123u64);
|
let result = aqua_dht.register_key("some_key".to_string(), 123u64, false, 0u32);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -120,9 +125,9 @@ mod tests {
|
|||||||
json_path: "some json path".to_string(),
|
json_path: "some json path".to_string(),
|
||||||
}]);
|
}]);
|
||||||
|
|
||||||
let result = aqua_dht.register_key_cp("some_key".to_string(), 123u64, invalid_cp);
|
let result = aqua_dht.register_key_cp("some_key".to_string(), 123u64, false, 8u32, invalid_cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -130,11 +135,13 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
|
let pin = false;
|
||||||
let mut cp = get_correct_timestamp_cp(1);
|
let mut cp = get_correct_timestamp_cp(1);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
|
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, cp);
|
register_key_and_check!(aqua_dht, key, timestamp, pin, weight, cp);
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, cp);
|
register_key_and_check!(aqua_dht, key, timestamp + 1, pin, weight, cp);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -142,12 +149,14 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
|
let pin = false;
|
||||||
let mut cp = get_correct_timestamp_cp(1);
|
let mut cp = get_correct_timestamp_cp(1);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, cp);
|
register_key_and_check!(aqua_dht, key, timestamp, pin, weight, cp);
|
||||||
|
|
||||||
cp.init_peer_id = "other_peer_id".to_string();
|
cp.init_peer_id = "other_peer_id".to_string();
|
||||||
let result = aqua_dht.register_key_cp(key.clone(), timestamp, cp);
|
let result = aqua_dht.register_key_cp(key.clone(), timestamp, pin, weight, cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "key already exists with different peer_id");
|
assert_eq!(result.error, "key already exists with different peer_id");
|
||||||
}
|
}
|
||||||
@ -167,6 +176,8 @@ mod tests {
|
|||||||
key: "some_key".to_string(),
|
key: "some_key".to_string(),
|
||||||
peer_id: "some_peer".to_string(),
|
peer_id: "some_peer".to_string(),
|
||||||
timestamp_created: 0,
|
timestamp_created: 0,
|
||||||
|
pinned: false,
|
||||||
|
weight: 8u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
republish_key_and_check!(aqua_dht, key, 123u64, get_correct_timestamp_cp(1));
|
republish_key_and_check!(aqua_dht, key, 123u64, get_correct_timestamp_cp(1));
|
||||||
@ -177,14 +188,18 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key_str = "some_key".to_string();
|
let key_str = "some_key".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
|
let pin = false;
|
||||||
let mut cp = get_correct_timestamp_cp(1);
|
let mut cp = get_correct_timestamp_cp(1);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
register_key_and_check!(aqua_dht, key_str, timestamp, cp);
|
register_key_and_check!(aqua_dht, key_str, timestamp, pin, weight, cp);
|
||||||
|
|
||||||
let key = aqua_dht_structs::Key {
|
let key = aqua_dht_structs::Key {
|
||||||
key: key_str.clone(),
|
key: key_str.clone(),
|
||||||
peer_id: cp.init_peer_id,
|
peer_id: cp.init_peer_id,
|
||||||
timestamp_created: timestamp + 1,
|
timestamp_created: timestamp + 1,
|
||||||
|
pinned: false,
|
||||||
|
weight: weight.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
republish_key_and_check!(aqua_dht, key, 123123u64, get_correct_timestamp_cp(1));
|
republish_key_and_check!(aqua_dht, key, 123123u64, get_correct_timestamp_cp(1));
|
||||||
@ -195,14 +210,18 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key_str = "some_key".to_string();
|
let key_str = "some_key".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
|
let pin = false;
|
||||||
let mut cp = get_correct_timestamp_cp(1);
|
let mut cp = get_correct_timestamp_cp(1);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
register_key_and_check!(aqua_dht, key_str, timestamp, cp);
|
register_key_and_check!(aqua_dht, key_str, timestamp, pin, weight, cp);
|
||||||
|
|
||||||
let key = aqua_dht_structs::Key {
|
let key = aqua_dht_structs::Key {
|
||||||
key: key_str.clone(),
|
key: key_str.clone(),
|
||||||
peer_id: "OTHER_PEER_ID".to_string(),
|
peer_id: "OTHER_PEER_ID".to_string(),
|
||||||
timestamp_created: timestamp + 1,
|
timestamp_created: timestamp + 1,
|
||||||
|
pinned: false,
|
||||||
|
weight: weight.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = aqua_dht.republish_key_cp(key, 123123u64, get_correct_timestamp_cp(1));
|
let result = aqua_dht.republish_key_cp(key, 123123u64, get_correct_timestamp_cp(1));
|
||||||
@ -213,9 +232,9 @@ mod tests {
|
|||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
fn put_value_empty_cp() {
|
fn put_value_empty_cp() {
|
||||||
clear_db();
|
clear_db();
|
||||||
let result = aqua_dht.put_value("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![]);
|
let result = aqua_dht.put_value("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -231,9 +250,9 @@ mod tests {
|
|||||||
json_path: "some json path".to_string(),
|
json_path: "some json path".to_string(),
|
||||||
}]);
|
}]);
|
||||||
|
|
||||||
let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], invalid_cp);
|
let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32, invalid_cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -241,7 +260,7 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let result = aqua_dht.get_values("some_key".to_string(), 123u64);
|
let result = aqua_dht.get_values("some_key".to_string(), 123u64);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -258,7 +277,7 @@ mod tests {
|
|||||||
|
|
||||||
let result = aqua_dht.get_values_cp("some_key".to_string(), 123u64, invalid_cp);
|
let result = aqua_dht.get_values_cp("some_key".to_string(), 123u64, invalid_cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -266,7 +285,7 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
|
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
register_key_and_check!(aqua_dht, key, 123u64, get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key, 123u64, false, 8u32, get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
let result = aqua_dht.get_values_cp(key, 123u64, get_correct_timestamp_cp(1));
|
let result = aqua_dht.get_values_cp(key, 123u64, get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
@ -278,7 +297,7 @@ mod tests {
|
|||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
fn put_value_key_not_exists() {
|
fn put_value_key_not_exists() {
|
||||||
clear_db();
|
clear_db();
|
||||||
let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], get_correct_timestamp_cp(2));
|
let result = aqua_dht.put_value_cp("some_key".to_string(), "value".to_string(), 123u64, vec![], vec![], 8u32, get_correct_timestamp_cp(2));
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "not found");
|
assert_eq!(result.error, "not found");
|
||||||
}
|
}
|
||||||
@ -290,13 +309,14 @@ mod tests {
|
|||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let value = "some_value".to_string();
|
let value = "some_value".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
let relay_id = "some_relay".to_string();
|
let relay_id = "some_relay".to_string();
|
||||||
let service_id = "some_service_id".to_string();
|
let service_id = "some_service_id".to_string();
|
||||||
let mut cp = get_correct_timestamp_cp(2);
|
let mut cp = get_correct_timestamp_cp(2);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
|
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1));
|
||||||
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp);
|
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp);
|
||||||
|
|
||||||
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
@ -311,6 +331,7 @@ mod tests {
|
|||||||
assert_eq!(record.relay_id[0], relay_id);
|
assert_eq!(record.relay_id[0], relay_id);
|
||||||
assert_eq!(record.service_id[0], service_id);
|
assert_eq!(record.service_id[0], service_id);
|
||||||
assert_eq!(record.timestamp_created, timestamp);
|
assert_eq!(record.timestamp_created, timestamp);
|
||||||
|
assert_eq!(record.weight, weight);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -319,16 +340,17 @@ mod tests {
|
|||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let value1 = "some_value".to_string();
|
let value1 = "some_value".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
let relay_id = "some_relay".to_string();
|
let relay_id = "some_relay".to_string();
|
||||||
let service_id = "some_service_id".to_string();
|
let service_id = "some_service_id".to_string();
|
||||||
|
|
||||||
let mut cp = get_correct_timestamp_cp(2);
|
let mut cp = get_correct_timestamp_cp(2);
|
||||||
cp.init_peer_id = "some_peer_id".to_string();
|
cp.init_peer_id = "some_peer_id".to_string();
|
||||||
|
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1));
|
||||||
put_value_and_check!(aqua_dht, key, value1, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp);
|
put_value_and_check!(aqua_dht, key, value1, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp);
|
||||||
let value2 = "other_value".to_string();
|
let value2 = "other_value".to_string();
|
||||||
put_value_and_check!(aqua_dht, key, value2, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp);
|
put_value_and_check!(aqua_dht, key, value2, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp);
|
||||||
|
|
||||||
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
@ -343,6 +365,7 @@ mod tests {
|
|||||||
assert_eq!(record.relay_id[0], relay_id);
|
assert_eq!(record.relay_id[0], relay_id);
|
||||||
assert_eq!(record.service_id[0], service_id);
|
assert_eq!(record.service_id[0], service_id);
|
||||||
assert_eq!(record.timestamp_created, timestamp);
|
assert_eq!(record.timestamp_created, timestamp);
|
||||||
|
assert_eq!(record.weight, weight);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -351,19 +374,20 @@ mod tests {
|
|||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let value = "some_value".to_string();
|
let value = "some_value".to_string();
|
||||||
let timestamp = 123u64;
|
let timestamp = 123u64;
|
||||||
|
let weight = 8u32;
|
||||||
let relay_id = "some_relay".to_string();
|
let relay_id = "some_relay".to_string();
|
||||||
let service_id = "some_service_id".to_string();
|
let service_id = "some_service_id".to_string();
|
||||||
let mut cp = get_correct_timestamp_cp(2);
|
let mut cp = get_correct_timestamp_cp(2);
|
||||||
let peer1_id = "some_peer_id".to_string();
|
let peer1_id = "some_peer_id".to_string();
|
||||||
let peer2_id = "other_peer_id".to_string();
|
let peer2_id = "other_peer_id".to_string();
|
||||||
|
|
||||||
register_key_and_check!(aqua_dht, key, timestamp, get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key, timestamp, false, 8u32, get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
cp.init_peer_id = peer1_id.clone();
|
cp.init_peer_id = peer1_id.clone();
|
||||||
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp);
|
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp);
|
||||||
|
|
||||||
cp.init_peer_id = peer2_id.clone();
|
cp.init_peer_id = peer2_id.clone();
|
||||||
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], cp);
|
put_value_and_check!(aqua_dht, key, value, timestamp, vec![relay_id.clone()], vec![service_id.clone()], weight, cp);
|
||||||
|
|
||||||
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
let result = aqua_dht.get_values_cp(key, timestamp.clone(), get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
@ -378,6 +402,7 @@ mod tests {
|
|||||||
assert_eq!(record.relay_id[0], relay_id);
|
assert_eq!(record.relay_id[0], relay_id);
|
||||||
assert_eq!(record.service_id[0], service_id);
|
assert_eq!(record.service_id[0], service_id);
|
||||||
assert_eq!(record.timestamp_created, timestamp);
|
assert_eq!(record.timestamp_created, timestamp);
|
||||||
|
assert_eq!(record.weight, weight);
|
||||||
|
|
||||||
let record = &result.result[1];
|
let record = &result.result[1];
|
||||||
assert_eq!(record.value, value);
|
assert_eq!(record.value, value);
|
||||||
@ -385,6 +410,7 @@ mod tests {
|
|||||||
assert_eq!(record.relay_id[0], relay_id);
|
assert_eq!(record.relay_id[0], relay_id);
|
||||||
assert_eq!(record.service_id[0], service_id);
|
assert_eq!(record.service_id[0], service_id);
|
||||||
assert_eq!(record.timestamp_created, timestamp);
|
assert_eq!(record.timestamp_created, timestamp);
|
||||||
|
assert_eq!(record.weight, weight);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -393,7 +419,7 @@ mod tests {
|
|||||||
|
|
||||||
let result = aqua_dht.clear_expired(124u64);
|
let result = aqua_dht.clear_expired(124u64);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -411,15 +437,15 @@ mod tests {
|
|||||||
|
|
||||||
let result = aqua_dht.clear_expired_cp(124u64, invalid_cp);
|
let result = aqua_dht.clear_expired_cp(124u64, invalid_cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
fn clear_expired_empty() {
|
fn clear_expired_empty() {
|
||||||
clear_db();
|
clear_db();
|
||||||
let result = aqua_dht.clear_expired_cp(124u64, get_correct_timestamp_cp(0));
|
let result = aqua_dht.clear_expired_cp(124u64, get_correct_timestamp_cp(0));
|
||||||
assert!(result.success);
|
|
||||||
assert_eq!(result.error, "");
|
assert_eq!(result.error, "");
|
||||||
|
assert!(result.success);
|
||||||
assert_eq!(result.count_keys + result.count_values, 0);
|
assert_eq!(result.count_keys + result.count_values, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -428,7 +454,7 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let expired_timestamp = 0u64;
|
let expired_timestamp = 0u64;
|
||||||
register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0));
|
let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0));
|
||||||
|
|
||||||
@ -447,8 +473,8 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let expired_timestamp = 0u64;
|
let expired_timestamp = 0u64;
|
||||||
register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key.clone(), expired_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1));
|
||||||
put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], get_correct_timestamp_cp(2));
|
put_value_and_check!(aqua_dht, key.clone(), "some_value".to_string(), expired_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2));
|
||||||
|
|
||||||
let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0));
|
let result = aqua_dht.clear_expired_cp(expired_timestamp + EXPIRED_VALUE_AGE, get_correct_timestamp_cp(0));
|
||||||
|
|
||||||
@ -474,7 +500,7 @@ mod tests {
|
|||||||
|
|
||||||
let result = aqua_dht.evict_stale(124u64);
|
let result = aqua_dht.evict_stale(124u64);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -492,7 +518,7 @@ mod tests {
|
|||||||
|
|
||||||
let result = aqua_dht.evict_stale_cp(124u64, invalid_cp);
|
let result = aqua_dht.evict_stale_cp(124u64, invalid_cp);
|
||||||
assert!(!result.success);
|
assert!(!result.success);
|
||||||
assert_eq!(result.error, "you should use peer.timestamp_ms to pass timestamp");
|
assert_eq!(result.error, "you should use host peer.timestamp_sec to pass timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
#[marine_test(config_path = "../Config.toml", modules_dir = "../artifacts/")]
|
||||||
@ -509,7 +535,7 @@ mod tests {
|
|||||||
clear_db();
|
clear_db();
|
||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let stale_timestamp = 0u64;
|
let stale_timestamp = 0u64;
|
||||||
register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1));
|
||||||
|
|
||||||
let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0));
|
let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0));
|
||||||
|
|
||||||
@ -531,8 +557,8 @@ mod tests {
|
|||||||
let key = "some_key".to_string();
|
let key = "some_key".to_string();
|
||||||
let value = "some_value".to_string();
|
let value = "some_value".to_string();
|
||||||
let stale_timestamp = 0u64;
|
let stale_timestamp = 0u64;
|
||||||
register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), get_correct_timestamp_cp(1));
|
register_key_and_check!(aqua_dht, key.clone(), stale_timestamp.clone(), false, 8u32, get_correct_timestamp_cp(1));
|
||||||
put_value_and_check!(aqua_dht, key.clone(), value.clone(), stale_timestamp.clone(), vec![], vec![], get_correct_timestamp_cp(2));
|
put_value_and_check!(aqua_dht, key.clone(), value.clone(), stale_timestamp.clone(), vec![], vec![], 8u32, get_correct_timestamp_cp(2));
|
||||||
|
|
||||||
let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0));
|
let result = aqua_dht.evict_stale_cp(stale_timestamp + STALE_VALUE_AGE, get_correct_timestamp_cp(0));
|
||||||
|
|
||||||
@ -568,7 +594,8 @@ mod tests {
|
|||||||
relay_id: vec![],
|
relay_id: vec![],
|
||||||
service_id: vec![],
|
service_id: vec![],
|
||||||
timestamp_created: 123u64,
|
timestamp_created: 123u64,
|
||||||
|
set_by: peer_id.clone(),
|
||||||
|
weight: 8u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_record = aqua_dht_structs::Record {
|
let new_record = aqua_dht_structs::Record {
|
||||||
@ -577,6 +604,8 @@ mod tests {
|
|||||||
relay_id: vec![],
|
relay_id: vec![],
|
||||||
service_id: vec![],
|
service_id: vec![],
|
||||||
timestamp_created: stale_record.timestamp_created + 9999u64,
|
timestamp_created: stale_record.timestamp_created + 9999u64,
|
||||||
|
set_by: peer_id.clone(),
|
||||||
|
weight: 8u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = aqua_dht.merge(vec![vec![stale_record.clone()], vec![new_record.clone()]]);
|
let result = aqua_dht.merge(vec![vec![stale_record.clone()], vec![new_record.clone()]]);
|
||||||
@ -604,6 +633,8 @@ mod tests {
|
|||||||
relay_id: vec![],
|
relay_id: vec![],
|
||||||
service_id: vec![],
|
service_id: vec![],
|
||||||
timestamp_created: 123u64,
|
timestamp_created: 123u64,
|
||||||
|
set_by: peer_id1.clone(),
|
||||||
|
weight: 8u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
let record2 = aqua_dht_structs::Record {
|
let record2 = aqua_dht_structs::Record {
|
||||||
@ -612,6 +643,8 @@ mod tests {
|
|||||||
relay_id: vec![],
|
relay_id: vec![],
|
||||||
service_id: vec![],
|
service_id: vec![],
|
||||||
timestamp_created: record1.timestamp_created + 9999u64,
|
timestamp_created: record1.timestamp_created + 9999u64,
|
||||||
|
set_by: peer_id2.clone(),
|
||||||
|
weight: 8u32,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = aqua_dht.merge_two(vec![record1], vec![record2]);
|
let result = aqua_dht.merge_two(vec![record1], vec![record2]);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user