diff --git a/aqua/registry-api.aqua b/aqua/registry-api.aqua index 24cb274..ca97c13 100644 --- a/aqua/registry-api.aqua +++ b/aqua/registry-api.aqua @@ -11,40 +11,40 @@ data SignResult: service Sig("sig"): sign(data: []u8) -> SignResult -func get_key_signature(label: string, timestamp_created: u64) -> []u8: +func get_route_signature(label: string, timestamp_created: u64) -> []u8: on HOST_PEER_ID: - bytes <- Registry.get_key_bytes(label, nil, timestamp_created, nil, "") + bytes <- Registry.get_route_bytes(label, nil, timestamp_created, nil, "") signature <- Sig.sign(bytes) <- signature.signature -func get_record_signature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: +func get_record_signature(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: on HOST_PEER_ID: - bytes <- Registry.get_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) + bytes <- Registry.get_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil) signature <- Sig.sign(bytes) <- signature.signature -func get_host_record_signature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: +func get_host_record_signature(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: on HOST_PEER_ID: - bytes <- Registry.get_host_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) + bytes <- Registry.get_host_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil) signature <- Sig.sign(bytes) <- signature.signature -func register_key(label: string, timestamp_created: u64, signature: []u8, pin: bool) -> RegisterKeyResult: +func register_route(label: string, timestamp_created: u64, signature: []u8, pin: bool) -> RegisterRouteResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.register_key(label, nil, timestamp_created, nil, "", signature, pin, weight, t) + result <- Registry.register_route(label, nil, timestamp_created, nil, "", signature, pin, weight, t) <- result -func put_record(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> DhtResult: +func put_record(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> DhtResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.put_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) + result <- Registry.put_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) <- result -func put_host_record(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult: +func put_host_record(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.put_host_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) + result <- Registry.put_host_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) <- result func propagate_host_record(res: PutHostRecordResult) -> DhtResult: diff --git a/aqua/registry-scheduled-scripts.aqua b/aqua/registry-scheduled-scripts.aqua index 019e6cc..1b6dd17 100644 --- a/aqua/registry-scheduled-scripts.aqua +++ b/aqua/registry-scheduled-scripts.aqua @@ -12,19 +12,19 @@ func clearExpired_86400(): t <- Peer.timestamp_sec() Registry.clear_expired(t) --- get all old records and replicate it by keys +-- get all old records and replicate it by routes func replicate_3600(): on HOST_PEER_ID: t <- Peer.timestamp_sec() res <- Registry.evict_stale(t) for r <- res.results par: - k <- Op.string_to_b58(r.key.key_id) + k <- Op.string_to_b58(r.route.id) nodes <- Kademlia.neighborhood(k, nil, nil) for n <- nodes par: on n: tt <- Peer.timestamp_sec() - key_weight <- TrustGraph.get_weight(r.key.peer_id, tt) - Registry.republish_key(r.key, key_weight, tt) + key_weight <- TrustGraph.get_weight(r.route.peer_id, tt) + Registry.republish_route(r.route, key_weight, tt) records_weights: *WeightResult for record <- r.records: diff --git a/aqua/registry.aqua b/aqua/registry.aqua index 5dd733d..03599f6 100644 --- a/aqua/registry.aqua +++ b/aqua/registry.aqua @@ -3,15 +3,15 @@ module Registry declares * data ClearExpiredResult: success: bool error: string - count_keys: u64 + count_routes: u64 count_values: u64 data DhtResult: success: bool error: string -data Key: - key_id: string +data Route: + id: string label: string peer_id: string timestamp_created: u64 @@ -20,7 +20,7 @@ data Key: signature: []u8 data Record: - key_id: string + route_id: string value: string peer_id: string set_by: string @@ -31,7 +31,7 @@ data Record: signature: []u8 data EvictStaleItem: - key: Key + route: Route records: []Record data EvictStaleResult: @@ -39,10 +39,10 @@ data EvictStaleResult: error: string results: []EvictStaleItem -data GetKeyMetadataResult: +data GetRouteMetadataResult: success: bool error: string - key: Key + route: Route data GetValuesResult: success: bool @@ -59,10 +59,10 @@ data PutHostRecordResult: error: string value: []Record -data RegisterKeyResult: +data RegisterRouteResult: success: bool error: string - key_id: string + route_id: string data RepublishValuesResult: success: bool @@ -77,21 +77,21 @@ data WeightResult: service Registry("registry"): clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult - clear_host_record(key_id: string, current_timestamp_sec: u64) -> DhtResult + clear_host_record(route_id: string, current_timestamp_sec: u64) -> DhtResult evict_stale(current_timestamp_sec: u64) -> EvictStaleResult - get_host_record_bytes(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 - get_key_bytes(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string) -> []u8 - get_key_id(label: string, peer_id: string) -> string - get_key_metadata(key_id: string, current_timestamp_sec: u64) -> GetKeyMetadataResult - get_record_bytes(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 - get_records(key_id: string, current_timestamp_sec: u64) -> GetValuesResult + get_host_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 + get_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 + get_records(route_id: string, current_timestamp_sec: u64) -> GetValuesResult + get_route_bytes(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string) -> []u8 + get_route_id(label: string, peer_id: string) -> string + get_route_metadata(route_id: string, current_timestamp_sec: u64) -> GetRouteMetadataResult merge(records: [][]Record) -> MergeResult merge_two(a: []Record, b: []Record) -> MergeResult propagate_host_record(set_host_value: PutHostRecordResult, current_timestamp_sec: u64, weight: WeightResult) -> DhtResult - put_host_record(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> PutHostRecordResult - put_record(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult - register_key(key: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string, signature: []u8, pin: bool, weight: WeightResult, current_timestamp_sec: u64) -> RegisterKeyResult - republish_key(key: Key, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult + put_host_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> PutHostRecordResult + put_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult + register_route(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string, signature: []u8, pin: bool, weight: WeightResult, current_timestamp_sec: u64) -> RegisterRouteResult republish_records(records: []Record, weights: []WeightResult, current_timestamp_sec: u64) -> RepublishValuesResult + republish_route(route: Route, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult set_expired_timeout(timeout_sec: u64) set_stale_timeout(timeout_sec: u64) diff --git a/aqua/routing.aqua b/aqua/routing.aqua index 885f1db..467c8cb 100644 --- a/aqua/routing.aqua +++ b/aqua/routing.aqua @@ -7,7 +7,7 @@ import "@fluencelabs/aqua-lib/builtin.aqua" alias RouteId: string func get_route_id(label: string, peer_id: string) -> RouteId: - route_id <- Registry.get_key_id(label, peer_id) + route_id <- Registry.get_route_id(label, peer_id) <- route_id -- Get peers closest to the label's hash in Kademlia network @@ -28,7 +28,7 @@ func removeFromRoute(route_id: string): -- Create a route: register it on the closest peers func createRoute(label: string) -> RouteId: t <- Peer.timestamp_sec() - signature <- get_key_signature(label, t) + signature <- get_route_signature(label, t) on HOST_PEER_ID: route_id <- get_route_id(label, %init_peer_id%) @@ -36,14 +36,14 @@ func createRoute(label: string) -> RouteId: for n <- nodes par: on n: try: - result <- register_key(label, t, signature, false) + result <- register_route(label, t, signature, false) <- route_id -- Create a label and subscribe to it -- %init_peer_id% (current client) will become a subscriber func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, service_id: ?string) -> string: t <- Peer.timestamp_sec() - key_signature <- get_key_signature(label, t) + route_signature <- get_route_signature(label, t) on HOST_PEER_ID: route_id <- get_route_id(label, %init_peer_id%) record_signature <- get_record_signature(route_id, value, relay_id, service_id, t) @@ -54,7 +54,7 @@ func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, ser for n <- nodes par: on n: try: - register_key(label, t, key_signature, false) + register_route(label, t, route_signature, false) put_record(route_id, value, relay_id, service_id, t, record_signature) <- route_id @@ -68,7 +68,7 @@ func createRouteAndRegisterBlocking( ack: i16 ) -> string: t <- Peer.timestamp_sec() - key_signature <- get_key_signature(label, t) + route_signature <- get_route_signature(label, t) on HOST_PEER_ID: route_id <- get_route_id(label, %init_peer_id%) record_signature <- get_record_signature(route_id, value, relay_id, service_id, t) @@ -79,7 +79,7 @@ func createRouteAndRegisterBlocking( for n <- nodes par: on n: try: - res1 <- register_key(label, t, key_signature, false) + res1 <- register_route(label, t, route_signature, false) result <- put_record(route_id, value, relay_id, service_id, t, record_signature) if result.success: results <<- result @@ -90,20 +90,20 @@ func createRouteAndRegisterBlocking( -- Create a label and make the given node a subscriber to it func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string: t <- Peer.timestamp_sec() - key_signature <- get_key_signature(label, t) + route_signature <- get_route_signature(label, t) on HOST_PEER_ID: route_id <- get_route_id(label, %init_peer_id%) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) on subscriber_node_id: - register_key(label, t, key_signature, false) + register_route(label, t, route_signature, false) r <- put_host_record(route_id, value, nil, service_id, t, record_signature) nodes <- getNeighbours(route_id) for n <- nodes par: on n: try: - register_key(label, t, key_signature, false) + register_route(label, t, route_signature, false) propagate_host_record(r) <- route_id @@ -125,7 +125,7 @@ func registerForRoute(route_id: string, value: string, relay_id: ?PeerId, servic -- Note: label must be already initiated func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string): t <- Peer.timestamp_sec() - key_signature <- get_key_signature(label, t) + route_signature <- get_route_signature(label, t) on HOST_PEER_ID: route_id <- get_route_id(label, %init_peer_id%) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) @@ -136,7 +136,7 @@ func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: stri for n <- nodes par: on n: try: - register_key(label, t, key_signature, false) + register_route(label, t, route_signature, false) propagate_host_record(r) -- Find the list of record for the given route_id diff --git a/example/src/example.ts b/example/src/example.ts index f41442b..840f64c 100644 --- a/example/src/example.ts +++ b/example/src/example.ts @@ -23,7 +23,7 @@ let local: Node[] = [ async function main() { // connect to the Fluence network - await Fluence.start({ connectTo: local[0] }); + await Fluence.start({ connectTo: krasnodar[0] }); console.log("%s", await timestamp_sec()); console.log( "📗 created a fluence peer %s with relay %s", @@ -38,11 +38,11 @@ async function main() { let route_id = await createRouteAndRegisterBlocking( label, value, relay, null, (s) => console.log(`node ${s} saved the record`), - 0 + 5 ); // find other peers on this route console.log("let's find subscribers for %s", route_id); - let subscribers = await resolveRoute(route_id, 0); + let subscribers = await resolveRoute(route_id, 5); console.log("found subscribers:", subscribers); } diff --git a/service/Cargo.lock b/service/Cargo.lock index de7c406..aed7c8b 100644 --- a/service/Cargo.lock +++ b/service/Cargo.lock @@ -24,9 +24,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" [[package]] name = "arrayref" @@ -529,9 +529,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816" +checksum = "eed12bbf7b5312f8da1c2722bc06d8c6b12c2d86a7fb35a194c7f3e6fc2bbe39" dependencies = [ "serde", "signature", @@ -590,9 +590,9 @@ dependencies = [ [[package]] name = "eyre" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc225d8f637923fe585089fcf03e705c222131232d2c1fb622e84ecf725d0eb8" +checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f" dependencies = [ "indenter", "once_cell", @@ -1676,9 +1676,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "opaque-debug" @@ -1752,7 +1752,7 @@ dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.2.10", + "redox_syscall 0.2.11", "smallvec", "winapi", ] @@ -2100,18 +2100,18 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "redox_syscall" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" dependencies = [ "bitflags", ] [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -2454,7 +2454,7 @@ dependencies = [ "cfg-if 1.0.0", "fastrand", "libc", - "redox_syscall 0.2.10", + "redox_syscall 0.2.11", "remove_dir_all", "winapi", ] @@ -2972,9 +2972,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "zeroize" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006" +checksum = "50344758e2f40e3a1fcfc8f6f91aa57b5f8ebd8d27919fe6451f15aaaf9ee608" dependencies = [ "zeroize_derive", ] diff --git a/service/src/defaults.rs b/service/src/defaults.rs index 1264857..d8aae40 100644 --- a/service/src/defaults.rs +++ b/service/src/defaults.rs @@ -15,8 +15,8 @@ */ // TODO: sanitize tables' names in SQL expressions -pub static KEYS_TABLE_NAME: &str = "dht_keys"; -pub static KEYS_TIMESTAMPS_TABLE_NAME: &str = "dht_keys_timestamps"; +pub static ROUTES_TABLE_NAME: &str = "dht_routes"; +pub static ROUTES_TIMESTAMPS_TABLE_NAME: &str = "dht_routes_timestamps"; pub static RECORDS_TABLE_NAME: &str = "dht_records"; pub static CONFIG_FILE: &str = "/tmp/Config.toml"; pub static DB_PATH: &str = "/tmp/registry.db"; diff --git a/service/src/error.rs b/service/src/error.rs index 656b20b..b75bc8f 100644 --- a/service/src/error.rs +++ b/service/src/error.rs @@ -25,13 +25,13 @@ pub enum ServiceError { #[source] SqliteError, ), - #[error("Requested key {0} does not exist")] - KeyNotExists(String), - #[error("Key {0} for {1} peer_id already exists with newer timestamp")] - KeyAlreadyExistsNewerTimestamp(String, String), + #[error("Requested route {0} does not exist")] + RouteNotExists(String), + #[error("Route {0} for {1} peer_id already exists with newer timestamp")] + RouteAlreadyExistsNewerTimestamp(String, String), #[error("Values limit for key_d {0} is exceeded")] ValuesLimitExceeded(String), - #[error("Host value for key_id {0} not found ")] + #[error("Host value for route_id {0} not found ")] HostValueNotFound(String), #[error("Invalid set_host_value result: success is false or value is missing")] InvalidSetHostValueResult, @@ -59,8 +59,8 @@ pub enum ServiceError { String, #[source] fluence_keypair::error::VerificationError, ), - #[error("Key can't be registered in the future")] - InvalidKeyTimestamp, + #[error("Route can't be registered in the future")] + InvalidRouteTimestamp, #[error("Record can't be registered in the future")] InvalidRecordTimestamp, #[error("Records to publish should belong to one key id")] diff --git a/service/src/key_storage_impl.rs b/service/src/key_storage_impl.rs deleted file mode 100644 index 586de2d..0000000 --- a/service/src/key_storage_impl.rs +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::defaults::{KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME}; - -use crate::error::ServiceError; -use crate::error::ServiceError::{InternalError, KeyNotExists}; -use crate::key::{Key, KeyInternal}; -use crate::storage_impl::Storage; -use marine_sqlite_connector::{State, Statement, Value}; - -impl Storage { - pub fn create_keys_tables(&self) -> bool { - self.connection - .execute(f!(" - CREATE TABLE IF NOT EXISTS {KEYS_TABLE_NAME} ( - key_id TEXT PRIMARY KEY, - label TEXT, - peer_id TEXT, - timestamp_created INTEGER, - challenge BLOB, - challenge_type TEXT, - signature BLOB NOT NULL, - timestamp_published INTEGER, - pinned INTEGER, - weight INTEGER - ); - ")) - .is_ok() - && self - .connection - .execute(f!(" - CREATE TABLE IF NOT EXISTS {KEYS_TIMESTAMPS_TABLE_NAME} ( - key_id TEXT PRIMARY KEY, - timestamp_accessed INTEGER - ); - ")) - .is_ok() - } - - pub fn update_key_timestamp( - &self, - key_id: &str, - current_timestamp_sec: u64, - ) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!(" - INSERT OR REPLACE INTO {KEYS_TIMESTAMPS_TABLE_NAME} VALUES (?, ?); - "))?; - - statement.bind(1, &Value::String(key_id.to_string()))?; - statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?; - statement.next()?; - Ok(()) - } - - pub fn get_key(&self, key_id: String) -> Result { - let mut statement = self.connection.prepare(f!( - "SELECT key_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \ - FROM {KEYS_TABLE_NAME} WHERE key_id = ?" - ))?; - statement.bind(1, &Value::String(key_id.clone()))?; - - if let State::Row = statement.next()? { - read_key(&statement) - } else { - Err(KeyNotExists(key_id)) - } - } - - pub fn write_key(&self, key: KeyInternal) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!(" - INSERT OR REPLACE INTO {KEYS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); - "))?; - - let pinned = if key.pinned { 1 } else { 0 } as i64; - statement.bind(1, &Value::String(key.key.key_id))?; - statement.bind(2, &Value::String(key.key.label))?; - statement.bind(3, &Value::String(key.key.peer_id))?; - statement.bind(4, &Value::Integer(key.key.timestamp_created as i64))?; - statement.bind(5, &Value::Binary(key.key.challenge))?; - statement.bind(6, &Value::String(key.key.challenge_type))?; - statement.bind(7, &Value::Binary(key.key.signature))?; - statement.bind(8, &Value::Integer(key.timestamp_published as i64))?; - statement.bind(9, &Value::Integer(pinned))?; - statement.bind(10, &Value::Integer(key.weight as i64))?; - statement.next()?; - Ok(()) - } - - pub fn update_key(&self, key: KeyInternal) -> Result<(), ServiceError> { - if let Ok(existing_key) = self.get_key(key.key.key_id.clone()) { - if existing_key.timestamp_created > key.key.timestamp_created { - return Err(ServiceError::KeyAlreadyExistsNewerTimestamp( - key.key.label, - key.key.peer_id, - )); - } - } - - self.write_key(key) - } - - pub fn check_key_existence(&self, key_id: &str) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT EXISTS(SELECT 1 FROM {KEYS_TABLE_NAME} WHERE key_id = ? LIMIT 1)" - ))?; - statement.bind(1, &Value::String(key_id.to_string()))?; - - if let State::Row = statement.next()? { - let exists = statement.read::(0)?; - if exists == 1 { - Ok(()) - } else { - Err(KeyNotExists(key_id.to_string())) - } - } else { - Err(InternalError( - "EXISTS should always return something".to_string(), - )) - } - } - - pub fn get_stale_keys(&self, stale_timestamp: u64) -> Result, ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT label, peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, pinned, weight \ - FROM {KEYS_TABLE_NAME} WHERE timestamp_published <= ?" - ))?; - statement.bind(1, &Value::Integer(stale_timestamp as i64))?; - - let mut stale_keys: Vec = vec![]; - while let State::Row = statement.next()? { - stale_keys.push(read_internal_key(&statement)?); - } - - Ok(stale_keys) - } - - pub fn delete_key(&self, key_id: String) -> Result<(), ServiceError> { - let mut statement = self - .connection - .prepare(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key_id=?"))?; - statement.bind(1, &Value::String(key_id.clone()))?; - statement.next().map(drop)?; - - if self.connection.changes() == 1 { - Ok(()) - } else { - Err(KeyNotExists(key_id)) - } - } - - /// not pinned only - pub fn get_expired_keys(&self, expired_timestamp: u64) -> Result, ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT label, peer_id, timestamp_created, challenge, challenge_type, signature \ - FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= ? and pinned != 1" - ))?; - statement.bind(1, &Value::Integer(expired_timestamp as i64))?; - - let mut expired_keys: Vec = vec![]; - while let State::Row = statement.next()? { - let key = read_key(&statement)?; - let timestamp_accessed = self.get_key_timestamp_accessed(&key.key_id)?; - let with_host_records = self.get_host_records_count_by_key(key.key_id.clone())? != 0; - - if timestamp_accessed <= expired_timestamp && !with_host_records { - expired_keys.push(key); - } - } - - Ok(expired_keys) - } - - pub fn get_key_timestamp_accessed(&self, key_id: &str) -> Result { - let mut statement = self.connection.prepare(f!( - "SELECT timestamp_accessed FROM {KEYS_TIMESTAMPS_TABLE_NAME} WHERE key_id != ?" - ))?; - statement.bind(1, &Value::String(key_id.to_string()))?; - - if let State::Row = statement.next()? { - statement - .read::(0) - .map(|t| t as u64) - .map_err(ServiceError::SqliteError) - } else { - Err(KeyNotExists(key_id.to_string())) - } - } -} - -pub fn read_key(statement: &Statement) -> Result { - Ok(Key { - key_id: statement.read::(0)?, - label: statement.read::(1)?, - peer_id: statement.read::(2)?, - timestamp_created: statement.read::(3)? as u64, - challenge: statement.read::>(4)?, - challenge_type: statement.read::(5)?, - signature: statement.read::>(6)?, - }) -} - -pub fn read_internal_key(statement: &Statement) -> Result { - Ok(KeyInternal { - key: read_key(statement)?, - timestamp_published: statement.read::(7)? as u64, - pinned: statement.read::(8)? != 0, - weight: statement.read::(9)? as u32, - }) -} diff --git a/service/src/main.rs b/service/src/main.rs index f2de3ad..1c117c5 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -24,14 +24,14 @@ use crate::tetraplets_checkers::check_timestamp_tetraplets; mod config; mod defaults; mod error; -mod key; -mod key_api; -mod key_storage_impl; mod misc; mod record; mod record_api; mod record_storage_impl; mod results; +mod route; +mod route_api; +mod route_storage_impl; mod storage_impl; mod tests; mod tetraplets_checkers; @@ -59,7 +59,7 @@ pub struct WeightResult { fn main() { let storage = get_storage().unwrap(); - storage.create_keys_tables(); + storage.create_route_tables(); storage.create_values_table(); create_config(); } diff --git a/service/src/record.rs b/service/src/record.rs index 6b6bd20..b2d812d 100644 --- a/service/src/record.rs +++ b/service/src/record.rs @@ -23,7 +23,7 @@ use sha2::{Digest, Sha256}; #[marine] #[derive(Debug, Default, Clone)] pub struct Record { - pub key_id: String, + pub route_id: String, pub value: String, pub peer_id: String, pub set_by: String, @@ -43,7 +43,7 @@ pub struct RecordInternal { impl Record { pub fn signature_bytes(&self) -> Vec { let mut metadata = Vec::new(); - metadata.extend(self.key_id.as_bytes()); + metadata.extend(self.route_id.as_bytes()); metadata.extend(self.value.as_bytes()); metadata.extend(self.peer_id.as_bytes()); metadata.extend(self.set_by.as_bytes()); @@ -80,7 +80,7 @@ impl Record { let bytes = self.signature_bytes(); let signature = Signature::from_bytes(pk.get_key_format(), self.signature.clone()); pk.verify(&bytes, &signature).map_err(|e| { - ServiceError::InvalidRecordSignature(self.key_id.clone(), self.value.clone(), e) + ServiceError::InvalidRecordSignature(self.route_id.clone(), self.value.clone(), e) }) } } diff --git a/service/src/record_api.rs b/service/src/record_api.rs index 3651c9e..ca2f137 100644 --- a/service/src/record_api.rs +++ b/service/src/record_api.rs @@ -32,7 +32,7 @@ use marine_rs_sdk::marine; #[marine] pub fn get_record_bytes( - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -41,7 +41,7 @@ pub fn get_record_bytes( ) -> Vec { let cp = marine_rs_sdk::get_call_parameters(); Record { - key_id, + route_id, value, peer_id: cp.init_peer_id.clone(), set_by: cp.init_peer_id, @@ -56,7 +56,7 @@ pub fn get_record_bytes( #[marine] pub fn put_record( - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -72,7 +72,7 @@ pub fn put_record( check_timestamp_tetraplets(&cp, 8)?; check_weight_result(&cp.init_peer_id, &weight)?; let record = Record { - key_id, + route_id, value, peer_id: cp.init_peer_id.clone(), set_by: cp.init_peer_id, @@ -85,7 +85,7 @@ pub fn put_record( record.verify(current_timestamp_sec)?; let storage = get_storage()?; - storage.check_key_existence(&record.key_id)?; + storage.check_route_existence(&record.route_id)?; storage .update_record( RecordInternal { @@ -101,7 +101,7 @@ pub fn put_record( #[marine] pub fn get_host_record_bytes( - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -110,7 +110,7 @@ pub fn get_host_record_bytes( ) -> Vec { let cp = marine_rs_sdk::get_call_parameters(); Record { - key_id, + route_id, value, peer_id: cp.host_id, set_by: cp.init_peer_id, @@ -124,7 +124,7 @@ pub fn get_host_record_bytes( } #[marine] pub fn put_host_record( - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -140,7 +140,7 @@ pub fn put_host_record( check_timestamp_tetraplets(&cp, 8)?; check_weight_result(&cp.init_peer_id, &weight)?; let record = Record { - key_id, + route_id, value, peer_id: cp.host_id, set_by: cp.init_peer_id, @@ -153,7 +153,7 @@ pub fn put_host_record( record.verify(current_timestamp_sec)?; let storage = get_storage()?; - storage.check_key_existence(&record.key_id)?; + storage.check_route_existence(&record.route_id)?; storage.update_record( RecordInternal { record: record.clone(), @@ -180,7 +180,7 @@ pub fn propagate_host_record( return Err(ServiceError::InvalidSetHostValueResult); } - let mut record = set_host_value.value[0].clone(); + let record = set_host_value.value[0].clone(); record.verify(current_timestamp_sec)?; let call_parameters = marine_rs_sdk::get_call_parameters(); @@ -191,12 +191,12 @@ pub fn propagate_host_record( let weight = weight.weight; let storage = get_storage()?; - storage.check_key_existence(&record.key_id)?; - storage.update_key_timestamp(&record.key_id, current_timestamp_sec)?; + storage.check_route_existence(&record.route_id)?; + storage.update_route_timestamp(&record.route_id, current_timestamp_sec)?; storage .merge_and_update_records( - record.key_id.clone(), + record.route_id.clone(), vec![RecordInternal { record, weight }], ) .map(|_| ()) @@ -206,15 +206,15 @@ pub fn propagate_host_record( /// Return all values by key #[marine] -pub fn get_records(key_id: String, current_timestamp_sec: u64) -> GetValuesResult { +pub fn get_records(route_id: String, current_timestamp_sec: u64) -> GetValuesResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.check_key_existence(&key_id)?; - storage.update_key_timestamp(&key_id, current_timestamp_sec)?; + storage.check_route_existence(&route_id)?; + storage.update_route_timestamp(&route_id, current_timestamp_sec)?; storage - .get_records(key_id) + .get_records(route_id) .map(|records| records.into_iter().map(|r| r.record).collect()) }) .into() @@ -232,7 +232,7 @@ pub fn republish_records( return Ok(0); } - let key_id = records[0].key_id.clone(); + let route_id = records[0].route_id.clone(); let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 2)?; let mut records_to_merge = vec![]; @@ -245,7 +245,7 @@ pub fn republish_records( record.set_by.clone(), ))?; check_weight_result(&record.set_by, weight_result)?; - if record.key_id != key_id { + if record.route_id != route_id { return Err(ServiceError::RecordsPublishingError); } @@ -256,29 +256,29 @@ pub fn republish_records( } let storage = get_storage()?; - storage.check_key_existence(&key_id)?; - storage.update_key_timestamp(&key_id, current_timestamp_sec)?; - storage.merge_and_update_records(key_id, records_to_merge) + storage.check_route_existence(&route_id)?; + storage.update_route_timestamp(&route_id, current_timestamp_sec)?; + storage.merge_and_update_records(route_id, records_to_merge) }) .into() } /// Remove host value by key and caller peer_id #[marine] -pub fn clear_host_record(key_id: String, current_timestamp_sec: u64) -> DhtResult { +pub fn clear_host_record(route_id: String, current_timestamp_sec: u64) -> DhtResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.check_key_existence(&key_id)?; - storage.update_key_timestamp(&key_id, current_timestamp_sec)?; + storage.check_route_existence(&route_id)?; + storage.update_route_timestamp(&route_id, current_timestamp_sec)?; let peer_id = call_parameters.host_id; let set_by = call_parameters.init_peer_id; - let deleted = storage.delete_record(key_id.clone(), peer_id, set_by)?; + let deleted = storage.delete_record(route_id.clone(), peer_id, set_by)?; - deleted.as_result((), ServiceError::HostValueNotFound(key_id)) + deleted.as_result((), ServiceError::HostValueNotFound(route_id)) }) .into() } diff --git a/service/src/record_storage_impl.rs b/service/src/record_storage_impl.rs index 3aee7fa..959bceb 100644 --- a/service/src/record_storage_impl.rs +++ b/service/src/record_storage_impl.rs @@ -28,7 +28,7 @@ impl Storage { self.connection .execute(f!(" CREATE TABLE IF NOT EXISTS {RECORDS_TABLE_NAME} ( - key_id TEXT, + route_id TEXT, value TEXT, peer_id TEXT, set_by TEXT, @@ -38,7 +38,7 @@ impl Storage { solution BLOB, signature BLOB NOT NULL, weight INTEGER, - PRIMARY KEY (key_id, peer_id, set_by) + PRIMARY KEY (route_id, peer_id, set_by) ); ")) .is_ok() @@ -47,12 +47,13 @@ impl Storage { /// Put value with caller peer_id if the key exists. /// If the value is NOT a host value and the key already has `VALUES_LIMIT` records, then a value with the smallest weight is removed and the new value is inserted instead. pub fn update_record(&self, record: RecordInternal, host: bool) -> Result<(), ServiceError> { - let records_count = self.get_non_host_records_count_by_key(record.record.key_id.clone())?; + let records_count = + self.get_non_host_records_count_by_key(record.record.route_id.clone())?; // check values limits for non-host values if !host && records_count >= VALUES_LIMIT { let min_weight_record = - self.get_min_weight_non_host_record_by_key(record.record.key_id.clone())?; + self.get_min_weight_non_host_record_by_key(record.record.route_id.clone())?; if min_weight_record.weight < record.weight || (min_weight_record.weight == record.weight @@ -60,13 +61,13 @@ impl Storage { { // delete the lightest record if the new one is heavier or newer self.delete_record( - min_weight_record.record.key_id, + min_weight_record.record.route_id, min_weight_record.record.peer_id, min_weight_record.record.set_by, )?; } else { // return error if limit is exceeded - return Err(ServiceError::ValuesLimitExceeded(record.record.key_id)); + return Err(ServiceError::ValuesLimitExceeded(record.record.route_id)); } } @@ -79,7 +80,7 @@ impl Storage { "INSERT OR REPLACE INTO {RECORDS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ))?; - statement.bind(1, &Value::String(record.record.key_id))?; + statement.bind(1, &Value::String(record.record.route_id))?; statement.bind(2, &Value::String(record.record.value))?; statement.bind(3, &Value::String(record.record.peer_id))?; statement.bind(4, &Value::String(record.record.set_by))?; @@ -102,14 +103,14 @@ impl Storage { pub fn delete_record( &self, - key_id: String, + route_id: String, peer_id: String, set_by: String, ) -> Result { let mut statement = self.connection.prepare(f!( - "DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id=? AND peer_id=? AND set_by=?" + "DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id=? AND peer_id=? AND set_by=?" ))?; - statement.bind(1, &Value::String(key_id))?; + statement.bind(1, &Value::String(route_id))?; statement.bind(2, &Value::String(peer_id))?; statement.bind(3, &Value::String(set_by))?; statement.next().map(drop)?; @@ -119,23 +120,23 @@ impl Storage { fn get_min_weight_non_host_record_by_key( &self, - key_id: String, + route_id: String, ) -> Result { let host_id = marine_rs_sdk::get_call_parameters().host_id; // only only non-host values let mut statement = self.connection.prepare( - f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \ - WHERE key_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?; + f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \ + WHERE route_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?; - statement.bind(1, &Value::String(key_id.clone()))?; + statement.bind(1, &Value::String(route_id.clone()))?; statement.bind(2, &Value::String(host_id))?; if let State::Row = statement.next()? { read_record(&statement) } else { Err(InternalError(f!( - "not found non-host records for given key_id: {key_id}" + "not found non-host records for given route_id: {route_id}" ))) } } @@ -145,7 +146,7 @@ impl Storage { // only only non-host values let mut statement = self.connection.prepare(f!( - "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id != ?" + "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id != ?" ))?; statement.bind(1, &Value::String(key))?; statement.bind(2, &Value::String(host_id))?; @@ -162,14 +163,14 @@ impl Storage { } } - pub fn get_host_records_count_by_key(&self, key_id: String) -> Result { + pub fn get_host_records_count_by_key(&self, route_id: String) -> Result { let host_id = marine_rs_sdk::get_call_parameters().host_id; // only only non-host values let mut statement = self.connection.prepare(f!( - "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id = ?" + "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id = ?" ))?; - statement.bind(1, &Value::String(key_id))?; + statement.bind(1, &Value::String(route_id))?; statement.bind(2, &Value::String(host_id))?; if let State::Row = statement.next()? { @@ -186,11 +187,11 @@ impl Storage { pub fn merge_and_update_records( &self, - key_id: String, + route_id: String, records: Vec, ) -> Result { let records = merge_records( - self.get_records(key_id)? + self.get_records(route_id)? .into_iter() .chain(records.into_iter()) .collect(), @@ -205,11 +206,11 @@ impl Storage { Ok(updated) } - pub fn get_records(&self, key_id: String) -> Result, ServiceError> { + pub fn get_records(&self, route_id: String) -> Result, ServiceError> { let mut statement = self.connection.prepare( - f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \ - WHERE key_id = ? ORDER BY weight DESC"))?; - statement.bind(1, &Value::String(key_id))?; + f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \ + WHERE route_id = ? ORDER BY weight DESC"))?; + statement.bind(1, &Value::String(route_id))?; let mut result: Vec = vec![]; @@ -230,12 +231,12 @@ impl Storage { } /// except host records and for pinned keys - pub fn delete_records_by_key(&self, key_id: String) -> Result { + pub fn delete_records_by_key(&self, route_id: String) -> Result { let mut statement = self .connection - .prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id = ?"))?; + .prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id = ?"))?; - statement.bind(1, &Value::String(key_id))?; + statement.bind(1, &Value::String(route_id))?; statement.next().map(drop)?; Ok(self.connection.changes() as u64) @@ -245,7 +246,7 @@ impl Storage { pub fn read_record(statement: &Statement) -> Result { Ok(RecordInternal { record: Record { - key_id: statement.read::(0)?, + route_id: statement.read::(0)?, value: statement.read::(1)?, peer_id: statement.read::(2)?, set_by: statement.read::(3)?, diff --git a/service/src/results.rs b/service/src/results.rs index 1270b36..a3d5e5b 100644 --- a/service/src/results.rs +++ b/service/src/results.rs @@ -15,8 +15,8 @@ */ use crate::error::ServiceError; -use crate::key::Key; use crate::record::Record; +use crate::route::Route; use marine_rs_sdk::marine; #[marine] @@ -43,24 +43,24 @@ impl From> for DhtResult { #[marine] #[derive(Debug)] -pub struct RegisterKeyResult { +pub struct RegisterRouteResult { pub success: bool, pub error: String, - pub key_id: String, + pub route_id: String, } -impl From> for RegisterKeyResult { +impl From> for RegisterRouteResult { fn from(result: Result) -> Self { match result { - Ok(key_id) => Self { + Ok(route_id) => Self { success: true, error: "".to_string(), - key_id, + route_id, }, Err(err) => Self { success: false, error: err.to_string(), - key_id: "".to_string(), + route_id: "".to_string(), }, } } @@ -96,22 +96,22 @@ impl From, ServiceError>> for GetValuesResult { pub struct ClearExpiredResult { pub success: bool, pub error: String, - pub count_keys: u64, + pub count_routes: u64, pub count_values: u64, } impl From> for ClearExpiredResult { fn from(result: Result<(u64, u64), ServiceError>) -> Self { match result { - Ok((keys, values)) => Self { + Ok((routes, values)) => Self { success: true, - count_keys: keys, + count_routes: routes, count_values: values, error: "".to_string(), }, Err(err) => Self { success: false, - count_keys: 0, + count_routes: 0, count_values: 0, error: err.to_string(), }, @@ -145,24 +145,24 @@ impl From, ServiceError>> for GetStaleRecordsResult { } #[marine] -pub struct GetKeyMetadataResult { +pub struct GetRouteMetadataResult { pub success: bool, pub error: String, - pub key: Key, + pub route: Route, } -impl From> for GetKeyMetadataResult { - fn from(result: Result) -> Self { +impl From> for GetRouteMetadataResult { + fn from(result: Result) -> Self { match result { - Ok(key) => Self { + Ok(route) => Self { success: true, error: "".to_string(), - key, + route, }, Err(err) => Self { success: false, error: err.to_string(), - key: Key::default(), + route: Route::default(), }, } } @@ -194,7 +194,7 @@ impl From> for RepublishValuesResult { #[marine] pub struct EvictStaleItem { - pub key: Key, + pub route: Route, pub records: Vec, } diff --git a/service/src/key.rs b/service/src/route.rs similarity index 88% rename from service/src/key.rs rename to service/src/route.rs index be6a9b1..c3b6f6c 100644 --- a/service/src/key.rs +++ b/service/src/route.rs @@ -22,8 +22,8 @@ use sha2::{Digest, Sha256}; #[marine] #[derive(Default, Clone)] -pub struct Key { - pub key_id: String, +pub struct Route { + pub id: String, pub label: String, pub peer_id: String, pub timestamp_created: u64, @@ -33,14 +33,14 @@ pub struct Key { } #[derive(Default, Clone)] -pub struct KeyInternal { - pub key: Key, +pub struct RouteInternal { + pub route: Route, pub timestamp_published: u64, pub pinned: bool, pub weight: u32, } -impl Key { +impl Route { pub fn new( label: String, peer_id: String, @@ -49,10 +49,10 @@ impl Key { challenge_type: String, signature: Vec, ) -> Self { - let key_id = Self::get_key_id(&label, &peer_id); + let id = Self::get_id(&label, &peer_id); Self { - key_id, + id, label, peer_id, timestamp_created, @@ -62,8 +62,8 @@ impl Key { } } - pub fn get_key_id(key: &str, peer_id: &str) -> String { - format!("{}{}", key, peer_id) + pub fn get_id(label: &str, peer_id: &str) -> String { + format!("{}{}", label, peer_id) } pub fn signature_bytes(&self) -> Vec { @@ -81,7 +81,7 @@ impl Key { pub fn verify(&self, current_timestamp_sec: u64) -> Result<(), ServiceError> { if self.timestamp_created > current_timestamp_sec { - return Err(ServiceError::InvalidKeyTimestamp); + return Err(ServiceError::InvalidRouteTimestamp); } self.verify_signature() diff --git a/service/src/key_api.rs b/service/src/route_api.rs similarity index 65% rename from service/src/key_api.rs rename to service/src/route_api.rs index c270b31..67e8026 100644 --- a/service/src/key_api.rs +++ b/service/src/route_api.rs @@ -14,23 +14,23 @@ * limitations under the License. */ use crate::error::ServiceError; -use crate::key::{Key, KeyInternal}; use crate::misc::check_weight_result; -use crate::results::{DhtResult, GetKeyMetadataResult, RegisterKeyResult}; +use crate::results::{DhtResult, GetRouteMetadataResult, RegisterRouteResult}; +use crate::route::{Route, RouteInternal}; use crate::storage_impl::get_storage; use crate::tetraplets_checkers::{check_timestamp_tetraplets, check_weight_tetraplets}; use crate::{wrapped_try, WeightResult}; use marine_rs_sdk::marine; #[marine] -pub fn get_key_bytes( +pub fn get_route_bytes( label: String, mut peer_id: Vec, timestamp_created: u64, challenge: Vec, challenge_type: String, ) -> Vec { - Key { + Route { label, peer_id: peer_id .pop() @@ -44,14 +44,14 @@ pub fn get_key_bytes( } #[marine] -pub fn get_key_id(label: String, peer_id: String) -> String { - Key::get_key_id(&label, &peer_id) +pub fn get_route_id(label: String, peer_id: String) -> String { + Route::get_id(&label, &peer_id) } -/// register new key if not exists with caller peer_id, update if exists with same peer_id or return error +/// register new route if not exists with caller peer_id, update if exists with same peer_id or return error #[marine] -pub fn register_key( - key: String, +pub fn register_route( + label: String, peer_id: Vec, timestamp_created: u64, challenge: Vec, @@ -60,7 +60,7 @@ pub fn register_key( pin: bool, weight: WeightResult, current_timestamp_sec: u64, -) -> RegisterKeyResult { +) -> RegisterRouteResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_weight_tetraplets(&call_parameters, 7, 0)?; @@ -70,68 +70,72 @@ pub fn register_key( .unwrap_or(&call_parameters.init_peer_id) .clone(); check_weight_result(&peer_id, &weight)?; - let key = Key::new( - key, + let route = Route::new( + label, peer_id, timestamp_created, challenge, challenge_type, signature, ); - key.verify(current_timestamp_sec)?; + route.verify(current_timestamp_sec)?; - let key_id = key.key_id.clone(); + let route_id = route.id.clone(); let weight = weight.weight; let storage = get_storage()?; - storage.update_key_timestamp(&key.key_id, current_timestamp_sec)?; - storage.update_key(KeyInternal { - key, + storage.update_route_timestamp(&route.id, current_timestamp_sec)?; + storage.update_route(RouteInternal { + route, timestamp_published: 0, pinned: pin, weight, })?; - Ok(key_id) + Ok(route_id) }) .into() } #[marine] -pub fn get_key_metadata(key_id: String, current_timestamp_sec: u64) -> GetKeyMetadataResult { +pub fn get_route_metadata(route_id: String, current_timestamp_sec: u64) -> GetRouteMetadataResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.update_key_timestamp(&key_id, current_timestamp_sec)?; - storage.get_key(key_id) + storage.update_route_timestamp(&route_id, current_timestamp_sec)?; + storage.get_route(route_id) }) .into() } -/// Used for replication, same as register_key, but key.pinned is ignored, updates timestamp_accessed +/// Used for replication, same as register_route, but route.pinned is ignored, updates timestamp_accessed #[marine] -pub fn republish_key(mut key: Key, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult { +pub fn republish_route( + mut route: Route, + weight: WeightResult, + current_timestamp_sec: u64, +) -> DhtResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_weight_tetraplets(&call_parameters, 1, 0)?; - check_weight_result(&key.peer_id, &weight)?; + check_weight_result(&route.peer_id, &weight)?; check_timestamp_tetraplets(&call_parameters, 2)?; - key.verify(current_timestamp_sec)?; + route.verify(current_timestamp_sec)?; // just to be sure - key.key_id = Key::get_key_id(&key.label, &key.peer_id); + route.id = Route::get_id(&route.label, &route.peer_id); let storage = get_storage()?; - storage.update_key_timestamp(&key.key_id, current_timestamp_sec)?; - match storage.update_key(KeyInternal { - key, + storage.update_route_timestamp(&route.id, current_timestamp_sec)?; + match storage.update_route(RouteInternal { + route, timestamp_published: 0, pinned: false, weight: weight.weight, }) { // we should ignore this error for republish - Err(ServiceError::KeyAlreadyExistsNewerTimestamp(_, _)) => Ok(()), + Err(ServiceError::RouteAlreadyExistsNewerTimestamp(_, _)) => Ok(()), other => other, } }) diff --git a/service/src/route_storage_impl.rs b/service/src/route_storage_impl.rs new file mode 100644 index 0000000..1493cfa --- /dev/null +++ b/service/src/route_storage_impl.rs @@ -0,0 +1,239 @@ +/* + * Copyright 2021 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::defaults::{ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME}; + +use crate::error::ServiceError; +use crate::error::ServiceError::{InternalError, RouteNotExists}; +use crate::route::{Route, RouteInternal}; +use crate::storage_impl::Storage; +use marine_sqlite_connector::{State, Statement, Value}; + +impl Storage { + pub fn create_route_tables(&self) -> bool { + self.connection + .execute(f!(" + CREATE TABLE IF NOT EXISTS {ROUTES_TABLE_NAME} ( + route_id TEXT PRIMARY KEY, + label TEXT, + peer_id TEXT, + timestamp_created INTEGER, + challenge BLOB, + challenge_type TEXT, + signature BLOB NOT NULL, + timestamp_published INTEGER, + pinned INTEGER, + weight INTEGER + ); + ")) + .is_ok() + && self + .connection + .execute(f!(" + CREATE TABLE IF NOT EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME} ( + route_id TEXT PRIMARY KEY, + timestamp_accessed INTEGER + ); + ")) + .is_ok() + } + + pub fn update_route_timestamp( + &self, + route_id: &str, + current_timestamp_sec: u64, + ) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!(" + INSERT OR REPLACE INTO {ROUTES_TIMESTAMPS_TABLE_NAME} VALUES (?, ?); + "))?; + + statement.bind(1, &Value::String(route_id.to_string()))?; + statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?; + statement.next()?; + Ok(()) + } + + pub fn get_route(&self, route_id: String) -> Result { + let mut statement = self.connection.prepare(f!( + "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \ + FROM {ROUTES_TABLE_NAME} WHERE route_id = ?" + ))?; + statement.bind(1, &Value::String(route_id.clone()))?; + + if let State::Row = statement.next()? { + read_route(&statement) + } else { + Err(RouteNotExists(route_id)) + } + } + + pub fn write_route(&self, route: RouteInternal) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!(" + INSERT OR REPLACE INTO {ROUTES_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + "))?; + + let pinned = if route.pinned { 1 } else { 0 } as i64; + statement.bind(1, &Value::String(route.route.id))?; + statement.bind(2, &Value::String(route.route.label))?; + statement.bind(3, &Value::String(route.route.peer_id))?; + statement.bind(4, &Value::Integer(route.route.timestamp_created as i64))?; + statement.bind(5, &Value::Binary(route.route.challenge))?; + statement.bind(6, &Value::String(route.route.challenge_type))?; + statement.bind(7, &Value::Binary(route.route.signature))?; + statement.bind(8, &Value::Integer(route.timestamp_published as i64))?; + statement.bind(9, &Value::Integer(pinned))?; + statement.bind(10, &Value::Integer(route.weight as i64))?; + statement.next()?; + Ok(()) + } + + pub fn update_route(&self, route: RouteInternal) -> Result<(), ServiceError> { + if let Ok(existing_route) = self.get_route(route.route.id.clone()) { + if existing_route.timestamp_created > route.route.timestamp_created { + return Err(ServiceError::RouteAlreadyExistsNewerTimestamp( + route.route.label, + route.route.peer_id, + )); + } + } + + self.write_route(route) + } + + pub fn check_route_existence(&self, route_id: &str) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT EXISTS(SELECT 1 FROM {ROUTES_TABLE_NAME} WHERE route_id = ? LIMIT 1)" + ))?; + statement.bind(1, &Value::String(route_id.to_string()))?; + + if let State::Row = statement.next()? { + let exists = statement.read::(0)?; + if exists == 1 { + Ok(()) + } else { + Err(RouteNotExists(route_id.to_string())) + } + } else { + Err(InternalError( + "EXISTS should always return something".to_string(), + )) + } + } + + pub fn get_stale_routes( + &self, + stale_timestamp: u64, + ) -> Result, ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, pinned, weight \ + FROM {ROUTES_TABLE_NAME} WHERE timestamp_published <= ?" + ))?; + statement.bind(1, &Value::Integer(stale_timestamp as i64))?; + + let mut stale_keys: Vec = vec![]; + while let State::Row = statement.next()? { + stale_keys.push(read_internal_route(&statement)?); + } + + Ok(stale_keys) + } + + pub fn delete_key(&self, route_id: String) -> Result<(), ServiceError> { + let mut statement = self + .connection + .prepare(f!("DELETE FROM {ROUTES_TABLE_NAME} WHERE route_id=?"))?; + statement.bind(1, &Value::String(route_id.clone()))?; + statement.next().map(drop)?; + + if self.connection.changes() == 1 { + Ok(()) + } else { + Err(RouteNotExists(route_id)) + } + } + + /// not pinned only + pub fn get_expired_routes(&self, expired_timestamp: u64) -> Result, ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \ + FROM {ROUTES_TABLE_NAME} WHERE timestamp_created <= ? and pinned != 1" + ))?; + statement.bind(1, &Value::Integer(expired_timestamp as i64))?; + + let mut expired_routes: Vec = vec![]; + while let State::Row = statement.next()? { + let route = read_route(&statement)?; + let timestamp_accessed = self.get_route_timestamp_accessed(&route.id)?; + let with_host_records = self.get_host_records_count_by_key(route.id.clone())? != 0; + + if timestamp_accessed <= expired_timestamp && !with_host_records { + expired_routes.push(route); + } + } + + Ok(expired_routes) + } + + pub fn get_route_timestamp_accessed(&self, route_id: &str) -> Result { + let mut statement = self.connection.prepare(f!( + "SELECT timestamp_accessed FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE route_id = ?" + ))?; + statement.bind(1, &Value::String(route_id.to_string()))?; + + if let State::Row = statement.next()? { + statement + .read::(0) + .map(|t| t as u64) + .map_err(ServiceError::SqliteError) + } else { + Err(RouteNotExists(route_id.to_string())) + } + } + + pub fn clear_expired_timestamps_accessed( + &self, + expired_timestamp: u64, + ) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!( + "DELETE FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE timestamp_accessed < ?" + ))?; + statement.bind(1, &Value::Integer(expired_timestamp as i64))?; + statement.next().map(drop)?; + + Ok(()) + } +} + +pub fn read_route(statement: &Statement) -> Result { + Ok(Route { + id: statement.read::(0)?, + label: statement.read::(1)?, + peer_id: statement.read::(2)?, + timestamp_created: statement.read::(3)? as u64, + challenge: statement.read::>(4)?, + challenge_type: statement.read::(5)?, + signature: statement.read::>(6)?, + }) +} + +pub fn read_internal_route(statement: &Statement) -> Result { + Ok(RouteInternal { + route: read_route(statement)?, + timestamp_published: statement.read::(7)? as u64, + pinned: statement.read::(8)? != 0, + weight: statement.read::(9)? as u32, + }) +} diff --git a/service/src/storage_impl.rs b/service/src/storage_impl.rs index ff8a844..4b974db 100644 --- a/service/src/storage_impl.rs +++ b/service/src/storage_impl.rs @@ -59,14 +59,15 @@ impl Storage { // delete expired non-host records deleted_values += self.clear_expired_records(expired_timestamp)?; - let expired_keys = self.get_expired_keys(expired_timestamp)?; + let expired_keys = self.get_expired_routes(expired_timestamp)?; for key in expired_keys { - self.delete_key(key.key_id)?; + self.delete_key(key.id)?; deleted_keys += self.connection.changes() as u64; } - // TODO: clear expired timestamp accessed for keys + self.clear_expired_timestamps_accessed(expired_timestamp)?; + Ok((deleted_keys, deleted_values)) } @@ -79,30 +80,30 @@ impl Storage { ) -> Result, ServiceError> { let stale_timestamp = current_timestamp_sec - load_config().stale_timeout; - let stale_keys = self.get_stale_keys(stale_timestamp)?; + let stale_keys = self.get_stale_routes(stale_timestamp)?; let mut key_to_delete: Vec = vec![]; let mut results: Vec = vec![]; let host_id = marine_rs_sdk::get_call_parameters().host_id; - for key in stale_keys.into_iter() { + for route in stale_keys.into_iter() { let records: Vec = self - .get_records(key.key.key_id.clone())? + .get_records(route.route.id.clone())? .into_iter() .map(|r| r.record) .collect(); - if !key.pinned && !records.iter().any(|r| r.peer_id == host_id) { - key_to_delete.push(key.key.key_id.clone()); + if !route.pinned && !records.iter().any(|r| r.peer_id == host_id) { + key_to_delete.push(route.route.id.clone()); } results.push(EvictStaleItem { - key: key.key, + route: route.route, records, }); } - for key_id in key_to_delete { - self.delete_key(key_id.clone())?; - self.delete_records_by_key(key_id)?; + for route_id in key_to_delete { + self.delete_key(route_id.clone())?; + self.delete_records_by_key(route_id)?; } Ok(results) diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index cdff620..628ace8 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -24,21 +24,23 @@ mod tests { use marine_test_env::registry::{DhtResult, Record, ServiceInterface}; use crate::defaults::{ - CONFIG_FILE, DB_PATH, DEFAULT_STALE_VALUE_AGE, KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME, - RECORDS_TABLE_NAME, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, - TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID, VALUES_LIMIT, + CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME, + TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, + TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID, }; use crate::error::ServiceError::{ - InvalidKeyTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId, - KeyAlreadyExistsNewerTimestamp, + InvalidRouteTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId, + RouteAlreadyExistsNewerTimestamp, + }; + use crate::tests::tests::marine_test_env::registry::{ + RegisterRouteResult, Route, WeightResult, }; - use crate::tests::tests::marine_test_env::registry::{Key, RegisterKeyResult, WeightResult}; const HOST_ID: &str = "some_host_id"; - impl PartialEq for Key { + impl PartialEq for Route { fn eq(&self, other: &Self) -> bool { - self.key_id == other.key_id + self.id == other.id && self.label == other.label && self.timestamp_created == other.timestamp_created && self.signature == other.signature @@ -46,17 +48,17 @@ mod tests { } } - impl Eq for Key {} + impl Eq for Route {} fn clear_env() { let connection = Connection::open(DB_PATH).unwrap(); connection - .execute(f!("DROP TABLE IF EXISTS {KEYS_TABLE_NAME}").as_str(), []) + .execute(f!("DROP TABLE IF EXISTS {ROUTES_TABLE_NAME}").as_str(), []) .unwrap(); connection .execute( - f!("DROP TABLE IF EXISTS {KEYS_TIMESTAMPS_TABLE_NAME}").as_str(), + f!("DROP TABLE IF EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME}").as_str(), [], ) .unwrap(); @@ -144,7 +146,7 @@ mod tests { } } - fn get_signed_key_bytes( + fn get_signed_route_bytes( registry: &mut ServiceInterface, kp: &KeyPair, label: String, @@ -153,17 +155,17 @@ mod tests { challenge_type: String, ) -> Vec { let issuer_peer_id = kp.get_peer_id().to_base58(); - let key_bytes = registry.get_key_bytes( + let route_bytes = registry.get_route_bytes( label.clone(), vec![issuer_peer_id.clone()], timestamp_created, challenge, challenge_type, ); - kp.sign(&key_bytes).unwrap().to_vec().to_vec() + kp.sign(&route_bytes).unwrap().to_vec().to_vec() } - fn register_key( + fn register_route( registry: &mut ServiceInterface, kp: &KeyPair, label: String, @@ -171,11 +173,11 @@ mod tests { current_timestamp: u64, pin: bool, weight: u32, - ) -> RegisterKeyResult { + ) -> RegisterRouteResult { let issuer_peer_id = kp.get_peer_id().to_base58(); let challenge = vec![]; let challenge_type = "".to_string(); - let signature = get_signed_key_bytes( + let signature = get_signed_route_bytes( registry, kp, label.clone(), @@ -187,7 +189,7 @@ mod tests { .add_weight_tetraplets(7) .add_timestamp_tetraplets(8); let weight = get_weight(issuer_peer_id.clone(), weight); - registry.register_key_cp( + registry.register_route_cp( label, vec![issuer_peer_id], timestamp_created, @@ -201,43 +203,43 @@ mod tests { ) } - fn register_key_checked( + fn register_route_checked( registry: &mut ServiceInterface, kp: &KeyPair, - key: String, + route: String, timestamp_created: u64, current_timestamp: u64, pin: bool, weight: u32, ) -> String { - let result = register_key( + let result = register_route( registry, kp, - key, + route, timestamp_created, current_timestamp, pin, weight, ); assert!(result.success, "{}", result.error); - result.key_id + result.route_id } - fn get_key_metadata( + fn get_route_metadata( registry: &mut ServiceInterface, - key_id: String, + route_id: String, current_timestamp: u64, - ) -> Key { + ) -> Route { let cp = CPWrapper::new("peer_id").add_timestamp_tetraplets(1); - let result = registry.get_key_metadata_cp(key_id, current_timestamp, cp.get()); + let result = registry.get_route_metadata_cp(route_id, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); - result.key + result.route } fn get_signed_record_bytes( registry: &mut ServiceInterface, kp: &KeyPair, - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -245,9 +247,9 @@ mod tests { solution: Vec, ) -> Vec { let issuer_peer_id = kp.get_peer_id().to_base58(); - let mut cp = CPWrapper::new(&issuer_peer_id); + let cp = CPWrapper::new(&issuer_peer_id); let record_bytes = registry.get_record_bytes_cp( - key_id, + route_id, value, relay_id, service_id, @@ -262,7 +264,7 @@ mod tests { fn put_record( registry: &mut ServiceInterface, kp: &KeyPair, - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -276,7 +278,7 @@ mod tests { let signature = get_signed_record_bytes( registry, kp, - key_id.clone(), + route_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -284,12 +286,12 @@ mod tests { solution.clone(), ); - let mut cp = CPWrapper::new(&issuer_peer_id) + let cp = CPWrapper::new(&issuer_peer_id) .add_weight_tetraplets(7) .add_timestamp_tetraplets(8); let weight = get_weight(issuer_peer_id.clone(), weight); registry.put_record_cp( - key_id, + route_id, value, relay_id, service_id, @@ -305,7 +307,7 @@ mod tests { fn put_record_checked( registry: &mut ServiceInterface, kp: &KeyPair, - key_id: String, + route_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -316,7 +318,7 @@ mod tests { let result = put_record( registry, kp, - key_id, + route_id, value, relay_id, service_id, @@ -329,33 +331,30 @@ mod tests { fn get_records( registry: &mut ServiceInterface, - key_id: String, + route_id: String, current_timestamp: u64, ) -> Vec { let cp = CPWrapper::new("some_peer_id").add_timestamp_tetraplets(1); - let result = registry.get_records_cp(key_id, current_timestamp, cp.get()); + let result = registry.get_records_cp(route_id, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); result.result } #[test] - fn register_key_invalid_signature() { + fn register_route_invalid_signature() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); let mut cp = CPWrapper::new(&issuer_peer_id); - let key = "some_key".to_string(); - let timestamp_created = 0u64; - let current_timestamp = 100u64; let weight = get_weight(issuer_peer_id.clone(), 0); let invalid_signature = vec![]; cp = cp.add_weight_tetraplets(5).add_timestamp_tetraplets(6); - let reg_key_result = registry.register_key_cp( - "some_key".to_string(), + let reg_route_result = registry.register_route_cp( + "some_route".to_string(), vec![], 100u64, vec![], @@ -366,24 +365,24 @@ mod tests { 10u64, cp.get(), ); - assert!(!reg_key_result.success); + assert!(!reg_route_result.success); } #[test] - fn register_key_invalid_weight_tetraplet() { + fn register_route_invalid_weight_tetraplet() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); let mut cp = CPWrapper::new(&issuer_peer_id); - let label = "some_key".to_string(); + let label = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let challenge = vec![]; let challenge_type = "".to_string(); let weight = get_weight(issuer_peer_id.clone(), 0); - let signature = get_signed_key_bytes( + let signature = get_signed_route_bytes( &mut registry, &kp, label.clone(), @@ -393,7 +392,7 @@ mod tests { ); cp = cp.add_timestamp_tetraplets(8); - let reg_key_result = registry.register_key_cp( + let reg_route_result = registry.register_route_cp( label, vec![], timestamp_created, @@ -405,23 +404,23 @@ mod tests { current_timestamp, cp.get(), ); - assert!(!reg_key_result.success); + assert!(!reg_route_result.success); } #[test] - fn register_key_missing_timestamp_tetraplet() { + fn register_route_missing_timestamp_tetraplet() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let label = "some_key".to_string(); + let label = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = get_weight(issuer_peer_id.clone(), 0); let challenge = vec![1u8, 2u8, 3u8]; let challenge_type = "type".to_string(); - let signature = get_signed_key_bytes( + let signature = get_signed_route_bytes( &mut registry, &kp, label.clone(), @@ -431,7 +430,7 @@ mod tests { ); let cp = CPWrapper::new(&issuer_peer_id).add_weight_tetraplets(7); - let reg_key_result = registry.register_key_cp( + let reg_route_result = registry.register_route_cp( label, vec![], timestamp_created, @@ -443,29 +442,29 @@ mod tests { current_timestamp, cp.get(), ); - assert!(!reg_key_result.success); + assert!(!reg_route_result.success); assert_eq!( - reg_key_result.error, + reg_route_result.error, InvalidTimestampTetraplet(format!("{:?}", cp.cp.tetraplets)).to_string() ); } #[test] - fn register_key_invalid_weight_peer_id() { + fn register_route_invalid_weight_peer_id() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); let invalid_peer_id = "INVALID_PEER_ID".to_string(); let mut cp = CPWrapper::new(&issuer_peer_id); - let label = "some_key".to_string(); + let label = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let challenge = vec![1u8, 2u8, 3u8]; let challenge_type = "type".to_string(); let weight = get_weight(invalid_peer_id.clone(), 0); - let signature = get_signed_key_bytes( + let signature = get_signed_route_bytes( &mut registry, &kp, label.clone(), @@ -475,7 +474,7 @@ mod tests { ); cp = cp.add_weight_tetraplets(7).add_timestamp_tetraplets(8); - let reg_key_result = registry.register_key_cp( + let reg_route_result = registry.register_route_cp( label, vec![], timestamp_created, @@ -487,28 +486,28 @@ mod tests { current_timestamp, cp.get(), ); - assert!(!reg_key_result.success); + assert!(!reg_route_result.success); assert_eq!( - reg_key_result.error, + reg_route_result.error, InvalidWeightPeerId(issuer_peer_id, invalid_peer_id).to_string() ); } #[test] - fn register_key_correct() { + fn register_route_correct() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; let pin = false; - let result = register_key( + let result = register_route( &mut registry, &kp, - key, + route, timestamp_created, current_timestamp, pin, @@ -519,20 +518,20 @@ mod tests { } #[test] - fn register_key_older_timestamp() { + fn register_route_older_timestamp() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let timestamp_created_first = 100u64; let current_timestamp = 1000u64; let weight = 0; let pin = false; - register_key_checked( + register_route_checked( &mut registry, &kp, - key.clone(), + route.clone(), timestamp_created_first, current_timestamp, pin, @@ -540,10 +539,10 @@ mod tests { ); let timestamp_created_second = timestamp_created_first - 10u64; - let result_second = register_key( + let result_second = register_route( &mut registry, &kp, - key.clone(), + route.clone(), timestamp_created_second, current_timestamp, pin, @@ -552,89 +551,89 @@ mod tests { assert_eq!( result_second.error, - KeyAlreadyExistsNewerTimestamp(key, kp.get_peer_id().to_base58()).to_string() + RouteAlreadyExistsNewerTimestamp(route, kp.get_peer_id().to_base58()).to_string() ); } #[test] - fn register_key_in_the_future() { + fn register_route_in_the_future() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let current_timestamp = 100u64; let timestamp_created = current_timestamp + 100u64; let weight = 0; let pin = false; - let result = register_key( + let result = register_route( &mut registry, &kp, - key, + route, timestamp_created, current_timestamp, pin, weight, ); - assert_eq!(result.error, InvalidKeyTimestamp.to_string()) + assert_eq!(result.error, InvalidRouteTimestamp.to_string()) } #[test] - fn register_key_update_republish_old() { + fn register_route_update_republish_old() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let timestamp_created_old = 0u64; let current_timestamp = 100u64; let weight = 0; let pin = false; - let key_id = register_key_checked( + let route_id = register_route_checked( &mut registry, &kp, - key.clone(), + route.clone(), timestamp_created_old, current_timestamp, pin, weight, ); - let old_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); + let old_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); let timestamp_created_new = timestamp_created_old + 10u64; - register_key_checked( + register_route_checked( &mut registry, &kp, - key, + route, timestamp_created_new, current_timestamp, pin, weight, ); - let new_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); - assert_ne!(old_key, new_key); + let new_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); + assert_ne!(old_route, new_route); let cp = CPWrapper::new(&issuer_peer_id) .add_weight_tetraplets(1) .add_timestamp_tetraplets(2); let weight = get_weight(issuer_peer_id.clone(), weight); let result = - registry.republish_key_cp(old_key.clone(), weight, current_timestamp, cp.get()); + registry.republish_route_cp(old_route.clone(), weight, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); - let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); - assert_eq!(new_key, result_key); + let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); + assert_eq!(new_route, result_route); } #[test] - fn get_key_metadata_test() { + fn get_route_metadata_test() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let label = "some_key".to_string(); + let label = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; @@ -643,16 +642,16 @@ mod tests { let challenge_type = "".to_string(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let key_bytes = registry.get_key_bytes( + let route_bytes = registry.get_route_bytes( label.clone(), vec![issuer_peer_id.clone()], timestamp_created, challenge.clone(), challenge_type.clone(), ); - let signature = kp.sign(&key_bytes).unwrap().to_vec().to_vec(); + let signature = kp.sign(&route_bytes).unwrap().to_vec().to_vec(); - let key_id = register_key_checked( + let route_id = register_route_checked( &mut registry, &kp, label.clone(), @@ -662,9 +661,9 @@ mod tests { weight, ); - let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); - let expected_key = Key { - key_id, + let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); + let expected_route = Route { + id: route_id, label, peer_id: issuer_peer_id, timestamp_created, @@ -672,38 +671,38 @@ mod tests { challenge_type, signature, }; - assert_eq!(result_key, expected_key); + assert_eq!(result_route, expected_route); } #[test] - fn republish_same_key_test() { + fn republish_same_route_test() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; let pin = false; - let key_id = register_key_checked( + let route_id = register_route_checked( &mut registry, &kp, - key.clone(), + route.clone(), timestamp_created, current_timestamp, pin, weight, ); - let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); + let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); let cp = CPWrapper::new(&issuer_peer_id) .add_weight_tetraplets(1) .add_timestamp_tetraplets(2); let weight = get_weight(issuer_peer_id.clone(), weight); let result = - registry.republish_key_cp(result_key.clone(), weight, current_timestamp, cp.get()); + registry.republish_route_cp(result_route.clone(), weight, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); } @@ -712,16 +711,16 @@ mod tests { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let key = "some_key".to_string(); + let route = "some_route".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; let pin = false; - let key_id = register_key_checked( + let route_id = register_route_checked( &mut registry, &kp, - key, + route, timestamp_created, current_timestamp, pin, @@ -735,7 +734,7 @@ mod tests { put_record_checked( &mut registry, &kp, - key_id.clone(), + route_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -744,10 +743,10 @@ mod tests { weight, ); - let records = get_records(&mut registry, key_id.clone(), current_timestamp); + let records = get_records(&mut registry, route_id.clone(), current_timestamp); assert_eq!(records.len(), 1); let record = &records[0]; - assert_eq!(record.key_id, key_id); + assert_eq!(record.route_id, route_id); assert_eq!(record.relay_id, relay_id); assert_eq!(record.service_id, service_id); assert_eq!(record.peer_id, kp.get_peer_id().to_base58());