add republish_values and clear_expired methods

This commit is contained in:
Alexey Proshutinskiy 2021-05-13 19:04:11 +03:00
parent 0eaff4e447
commit f9c62f2e9a
3 changed files with 84 additions and 19 deletions

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME}; use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE};
use crate::results::{Key, Record}; use crate::results::{Key, Record};
use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State}; use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State};
use fluence::{CallParameters}; use fluence::{CallParameters};
@ -123,6 +123,7 @@ pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_
let connection = get_connection()?; let connection = get_connection()?;
// checking key for existence
let _key = get_key_metadata_helper(&connection, key.clone())?; let _key = get_key_metadata_helper(&connection, key.clone())?;
let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() }; let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() };
let peer_id = call_parameters.init_peer_id; let peer_id = call_parameters.init_peer_id;
@ -164,3 +165,40 @@ pub fn get_values_impl(key: String, current_timestamp: u64) -> SqliteResult<Vec<
Ok(result) Ok(result)
} }
pub fn republish_values_impl(key: String, records: Vec<Record>, current_timestamp: u64) -> SqliteResult<u64> {
let call_parameters = fluence::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 2)
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
let connection = get_connection()?;
// checking key for existence
let _key = get_key_metadata_helper(&connection, key.clone())?;
let mut updated = 0u64;
for record in records {
connection.execute(
f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \
VALUES ('{key}', '{record.value}', '{record.peer_id}', '{record.relay_id}',\
'{record.service_id}', '{record.timestamp_created}', '{current_timestamp}')"))?;
updated += connection.changes() as u64;
}
Ok(updated)
}
pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> {
let call_parameters = fluence::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 0)
.map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?;
let connection = get_connection()?;
let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE;
connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE id IN (SELECT key FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?;
let deleted_values = connection.changes() as u64;
connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?;
let deleted_keys = connection.changes() as u64;
Ok((deleted_keys, deleted_values))
}

View File

@ -19,8 +19,8 @@ mod results;
mod tests; mod tests;
mod impls; mod impls;
use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult}; use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult};
use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl}; use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl};
use fluence::marine; use fluence::marine;
use fluence::module_manifest; use fluence::module_manifest;
@ -72,16 +72,16 @@ pub fn get_values(key: String, current_timestamp: u64) -> GetValuesResult {
get_values_impl(key, current_timestamp).into() get_values_impl(key, current_timestamp).into()
} }
// #[marine] #[marine]
// pub fn republish_values(key: String, records: Vec<Record>, current_timestamp: u64) -> RepublishValuesResult { pub fn republish_values(key: String, records: Vec<Record>, current_timestamp: u64) -> RepublishValuesResult {
// republish_values_impl(key, records, current_timestamp).into() republish_values_impl(key, records, current_timestamp).into()
// } }
// BOTH // BOTH
// #[marine] #[marine]
// pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult { pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult {
// clear_expired_impl(current_timestamp).into() clear_expired_impl(current_timestamp).into()
// } }
// //
// #[marine] // #[marine]
// pub fn evict_stale(current_timestamp: u64) -> GetStaleResult { // pub fn evict_stale(current_timestamp: u64) -> GetStaleResult {

View File

@ -123,20 +123,23 @@ impl From<SqliteResult<Vec<Record>>> for GetValuesResult {
pub struct ClearExpiredResult { pub struct ClearExpiredResult {
pub success: bool, pub success: bool,
pub error: String, pub error: String,
pub count: u64, pub count_keys: u64,
pub count_values: u64,
} }
impl From<SqliteResult<u64>> for ClearExpiredResult { impl From<SqliteResult<(u64, u64)>> for ClearExpiredResult {
fn from(result: SqliteResult<u64>) -> Self { fn from(result: SqliteResult<(u64, u64)>) -> Self {
match result { match result {
Ok(result) => Self { Ok((keys, values)) => Self {
success: true, success: true,
count: result, count_keys: keys,
count_values: values,
error: "".to_string(), error: "".to_string(),
}, },
Err(err) => Self { Err(err) => Self {
success: false, success: false,
count: 0, count_keys: 0,
count_values: 0,
error: err.to_string(), error: err.to_string(),
}, },
} }
@ -199,3 +202,27 @@ impl From<SqliteResult<Key>> for GetKeyMetadataResult {
} }
} }
} }
#[marine]
pub struct RepublishValuesResult {
pub success: bool,
pub error: String,
pub updated: u64,
}
impl From<SqliteResult<u64>> for RepublishValuesResult {
fn from(result: SqliteResult<u64>) -> Self {
match result {
Ok(count) => Self {
success: true,
error: "".to_string(),
updated: count,
},
Err(err) => Self {
success: false,
error: err.to_string(),
updated: 0,
},
}
}
}