diff --git a/src/impls.rs b/src/impls.rs index 4c161ea..019a077 100644 --- a/src/impls.rs +++ b/src/impls.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME}; +use crate::{KEYS_TABLE_NAME, VALUES_TABLE_NAME, DB_PATH, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, EXPIRED_VALUE_AGE}; use crate::results::{Key, Record}; use marine_sqlite_connector::{Connection, Result as SqliteResult, Error as SqliteError, State}; use fluence::{CallParameters}; @@ -123,6 +123,7 @@ pub fn put_value_impl(key: String, value: String, current_timestamp: u64, relay_ let connection = get_connection()?; + // checking key for existence let _key = get_key_metadata_helper(&connection, key.clone())?; let relay_id = if relay_id.len() == 0 { "".to_string() } else { relay_id[0].clone() }; let peer_id = call_parameters.init_peer_id; @@ -153,9 +154,9 @@ pub fn get_values_impl(key: String, current_timestamp: u64) -> SqliteResult = vec![]; while let State::Row = statement.next()? { - result.push(Record{ + result.push(Record { value: statement.read::(0)?, - peer_id:statement.read::(1)?, + peer_id: statement.read::(1)?, relay_id: statement.read::(2)?, service_id: statement.read::(3)?, timestamp_created: statement.read::(4)? as u64, @@ -164,3 +165,40 @@ pub fn get_values_impl(key: String, current_timestamp: u64) -> SqliteResult, current_timestamp: u64) -> SqliteResult { + let call_parameters = fluence::get_call_parameters(); + check_timestamp_tetraplets(&call_parameters, 2) + .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; + let connection = get_connection()?; + + // checking key for existence + let _key = get_key_metadata_helper(&connection, key.clone())?; + + let mut updated = 0u64; + for record in records { + connection.execute( + f!("INSERT OR REPLACE INTO {VALUES_TABLE_NAME} \ + VALUES ('{key}', '{record.value}', '{record.peer_id}', '{record.relay_id}',\ + '{record.service_id}', '{record.timestamp_created}', '{current_timestamp}')"))?; + + updated += connection.changes() as u64; + } + + Ok(updated) +} + +pub fn clear_expired_impl(current_timestamp: u64) -> SqliteResult<(u64, u64)> { + let call_parameters = fluence::get_call_parameters(); + check_timestamp_tetraplets(&call_parameters, 0) + .map_err(|e| SqliteError { code: None, message: Some(e.to_string()) })?; + let connection = get_connection()?; + + let expired_timestamp = current_timestamp - EXPIRED_VALUE_AGE; + connection.execute(f!("DELETE FROM {VALUES_TABLE_NAME} WHERE id IN (SELECT key FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?; + let deleted_values = connection.changes() as u64; + connection.execute(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= {expired_timestamp}"))?; + let deleted_keys = connection.changes() as u64; + + Ok((deleted_keys, deleted_values)) +} diff --git a/src/main.rs b/src/main.rs index f23cf40..096de2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,8 +19,8 @@ mod results; mod tests; mod impls; -use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult}; -use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl}; +use crate::results::{Key, GetKeyMetadataResult, RegisterKeyResult, RepublishKeyResult, PutValueResult, GetValuesResult, Record, RepublishValuesResult, ClearExpiredResult}; +use crate::impls::{create_keys_table, create_values_table, register_key_impl, get_key_metadata_impl, republish_key_impl, put_value_impl, get_values_impl, republish_values_impl, clear_expired_impl}; use fluence::marine; use fluence::module_manifest; @@ -72,16 +72,16 @@ pub fn get_values(key: String, current_timestamp: u64) -> GetValuesResult { get_values_impl(key, current_timestamp).into() } -// #[marine] -// pub fn republish_values(key: String, records: Vec, current_timestamp: u64) -> RepublishValuesResult { -// republish_values_impl(key, records, current_timestamp).into() -// } +#[marine] +pub fn republish_values(key: String, records: Vec, current_timestamp: u64) -> RepublishValuesResult { + republish_values_impl(key, records, current_timestamp).into() +} // BOTH -// #[marine] -// pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult { -// clear_expired_impl(current_timestamp).into() -// } +#[marine] +pub fn clear_expired(current_timestamp: u64) -> ClearExpiredResult { + clear_expired_impl(current_timestamp).into() +} // // #[marine] // pub fn evict_stale(current_timestamp: u64) -> GetStaleResult { diff --git a/src/results.rs b/src/results.rs index eac5d46..0a9904c 100644 --- a/src/results.rs +++ b/src/results.rs @@ -123,20 +123,23 @@ impl From>> for GetValuesResult { pub struct ClearExpiredResult { pub success: bool, pub error: String, - pub count: u64, + pub count_keys: u64, + pub count_values: u64, } -impl From> for ClearExpiredResult { - fn from(result: SqliteResult) -> Self { +impl From> for ClearExpiredResult { + fn from(result: SqliteResult<(u64, u64)>) -> Self { match result { - Ok(result) => Self { + Ok((keys, values)) => Self { success: true, - count: result, + count_keys: keys, + count_values: values, error: "".to_string(), }, Err(err) => Self { success: false, - count: 0, + count_keys: 0, + count_values: 0, error: err.to_string(), }, } @@ -199,3 +202,27 @@ impl From> for GetKeyMetadataResult { } } } + +#[marine] +pub struct RepublishValuesResult { + pub success: bool, + pub error: String, + pub updated: u64, +} + +impl From> for RepublishValuesResult { + fn from(result: SqliteResult) -> Self { + match result { + Ok(count) => Self { + success: true, + error: "".to_string(), + updated: count, + }, + Err(err) => Self { + success: false, + error: err.to_string(), + updated: 0, + }, + } + } +}