Change API and db structure from keys to routes (#87)

This commit is contained in:
Aleksey Proshutisnkiy 2022-03-10 19:49:38 +04:00 committed by GitHub
parent 52217121bc
commit 16fba064be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 567 additions and 546 deletions

View File

@ -11,40 +11,40 @@ data SignResult:
service Sig("sig"): service Sig("sig"):
sign(data: []u8) -> SignResult sign(data: []u8) -> SignResult
func get_key_signature(label: string, timestamp_created: u64) -> []u8: func get_route_signature(label: string, timestamp_created: u64) -> []u8:
on HOST_PEER_ID: on HOST_PEER_ID:
bytes <- Registry.get_key_bytes(label, nil, timestamp_created, nil, "") bytes <- Registry.get_route_bytes(label, nil, timestamp_created, nil, "")
signature <- Sig.sign(bytes) signature <- Sig.sign(bytes)
<- signature.signature <- signature.signature
func get_record_signature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: func get_record_signature(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8:
on HOST_PEER_ID: on HOST_PEER_ID:
bytes <- Registry.get_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) bytes <- Registry.get_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil)
signature <- Sig.sign(bytes) signature <- Sig.sign(bytes)
<- signature.signature <- signature.signature
func get_host_record_signature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: func get_host_record_signature(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8:
on HOST_PEER_ID: on HOST_PEER_ID:
bytes <- Registry.get_host_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) bytes <- Registry.get_host_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil)
signature <- Sig.sign(bytes) signature <- Sig.sign(bytes)
<- signature.signature <- signature.signature
func register_key(label: string, timestamp_created: u64, signature: []u8, pin: bool) -> RegisterKeyResult: func register_route(label: string, timestamp_created: u64, signature: []u8, pin: bool) -> RegisterRouteResult:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
weight <- TrustGraph.get_weight(%init_peer_id%, t) weight <- TrustGraph.get_weight(%init_peer_id%, t)
result <- Registry.register_key(label, nil, timestamp_created, nil, "", signature, pin, weight, t) result <- Registry.register_route(label, nil, timestamp_created, nil, "", signature, pin, weight, t)
<- result <- result
func put_record(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> DhtResult: func put_record(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> DhtResult:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
weight <- TrustGraph.get_weight(%init_peer_id%, t) weight <- TrustGraph.get_weight(%init_peer_id%, t)
result <- Registry.put_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) result <- Registry.put_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t)
<- result <- result
func put_host_record(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult: func put_host_record(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
weight <- TrustGraph.get_weight(%init_peer_id%, t) weight <- TrustGraph.get_weight(%init_peer_id%, t)
result <- Registry.put_host_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) result <- Registry.put_host_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t)
<- result <- result
func propagate_host_record(res: PutHostRecordResult) -> DhtResult: func propagate_host_record(res: PutHostRecordResult) -> DhtResult:

View File

@ -12,19 +12,19 @@ func clearExpired_86400():
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
Registry.clear_expired(t) Registry.clear_expired(t)
-- get all old records and replicate it by keys -- get all old records and replicate it by routes
func replicate_3600(): func replicate_3600():
on HOST_PEER_ID: on HOST_PEER_ID:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
res <- Registry.evict_stale(t) res <- Registry.evict_stale(t)
for r <- res.results par: for r <- res.results par:
k <- Op.string_to_b58(r.key.key_id) k <- Op.string_to_b58(r.route.id)
nodes <- Kademlia.neighborhood(k, nil, nil) nodes <- Kademlia.neighborhood(k, nil, nil)
for n <- nodes par: for n <- nodes par:
on n: on n:
tt <- Peer.timestamp_sec() tt <- Peer.timestamp_sec()
key_weight <- TrustGraph.get_weight(r.key.peer_id, tt) key_weight <- TrustGraph.get_weight(r.route.peer_id, tt)
Registry.republish_key(r.key, key_weight, tt) Registry.republish_route(r.route, key_weight, tt)
records_weights: *WeightResult records_weights: *WeightResult
for record <- r.records: for record <- r.records:

View File

@ -3,15 +3,15 @@ module Registry declares *
data ClearExpiredResult: data ClearExpiredResult:
success: bool success: bool
error: string error: string
count_keys: u64 count_routes: u64
count_values: u64 count_values: u64
data DhtResult: data DhtResult:
success: bool success: bool
error: string error: string
data Key: data Route:
key_id: string id: string
label: string label: string
peer_id: string peer_id: string
timestamp_created: u64 timestamp_created: u64
@ -20,7 +20,7 @@ data Key:
signature: []u8 signature: []u8
data Record: data Record:
key_id: string route_id: string
value: string value: string
peer_id: string peer_id: string
set_by: string set_by: string
@ -31,7 +31,7 @@ data Record:
signature: []u8 signature: []u8
data EvictStaleItem: data EvictStaleItem:
key: Key route: Route
records: []Record records: []Record
data EvictStaleResult: data EvictStaleResult:
@ -39,10 +39,10 @@ data EvictStaleResult:
error: string error: string
results: []EvictStaleItem results: []EvictStaleItem
data GetKeyMetadataResult: data GetRouteMetadataResult:
success: bool success: bool
error: string error: string
key: Key route: Route
data GetValuesResult: data GetValuesResult:
success: bool success: bool
@ -59,10 +59,10 @@ data PutHostRecordResult:
error: string error: string
value: []Record value: []Record
data RegisterKeyResult: data RegisterRouteResult:
success: bool success: bool
error: string error: string
key_id: string route_id: string
data RepublishValuesResult: data RepublishValuesResult:
success: bool success: bool
@ -77,21 +77,21 @@ data WeightResult:
service Registry("registry"): service Registry("registry"):
clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult
clear_host_record(key_id: string, current_timestamp_sec: u64) -> DhtResult clear_host_record(route_id: string, current_timestamp_sec: u64) -> DhtResult
evict_stale(current_timestamp_sec: u64) -> EvictStaleResult evict_stale(current_timestamp_sec: u64) -> EvictStaleResult
get_host_record_bytes(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 get_host_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8
get_key_bytes(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string) -> []u8 get_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8
get_key_id(label: string, peer_id: string) -> string get_records(route_id: string, current_timestamp_sec: u64) -> GetValuesResult
get_key_metadata(key_id: string, current_timestamp_sec: u64) -> GetKeyMetadataResult get_route_bytes(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string) -> []u8
get_record_bytes(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 get_route_id(label: string, peer_id: string) -> string
get_records(key_id: string, current_timestamp_sec: u64) -> GetValuesResult get_route_metadata(route_id: string, current_timestamp_sec: u64) -> GetRouteMetadataResult
merge(records: [][]Record) -> MergeResult merge(records: [][]Record) -> MergeResult
merge_two(a: []Record, b: []Record) -> MergeResult merge_two(a: []Record, b: []Record) -> MergeResult
propagate_host_record(set_host_value: PutHostRecordResult, current_timestamp_sec: u64, weight: WeightResult) -> DhtResult propagate_host_record(set_host_value: PutHostRecordResult, current_timestamp_sec: u64, weight: WeightResult) -> DhtResult
put_host_record(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> PutHostRecordResult put_host_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> PutHostRecordResult
put_record(key_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult put_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult
register_key(key: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string, signature: []u8, pin: bool, weight: WeightResult, current_timestamp_sec: u64) -> RegisterKeyResult register_route(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string, signature: []u8, pin: bool, weight: WeightResult, current_timestamp_sec: u64) -> RegisterRouteResult
republish_key(key: Key, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult
republish_records(records: []Record, weights: []WeightResult, current_timestamp_sec: u64) -> RepublishValuesResult republish_records(records: []Record, weights: []WeightResult, current_timestamp_sec: u64) -> RepublishValuesResult
republish_route(route: Route, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult
set_expired_timeout(timeout_sec: u64) set_expired_timeout(timeout_sec: u64)
set_stale_timeout(timeout_sec: u64) set_stale_timeout(timeout_sec: u64)

View File

@ -7,7 +7,7 @@ import "@fluencelabs/aqua-lib/builtin.aqua"
alias RouteId: string alias RouteId: string
func get_route_id(label: string, peer_id: string) -> RouteId: func get_route_id(label: string, peer_id: string) -> RouteId:
route_id <- Registry.get_key_id(label, peer_id) route_id <- Registry.get_route_id(label, peer_id)
<- route_id <- route_id
-- Get peers closest to the label's hash in Kademlia network -- Get peers closest to the label's hash in Kademlia network
@ -28,7 +28,7 @@ func removeFromRoute(route_id: string):
-- Create a route: register it on the closest peers -- Create a route: register it on the closest peers
func createRoute(label: string) -> RouteId: func createRoute(label: string) -> RouteId:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
signature <- get_key_signature(label, t) signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%) route_id <- get_route_id(label, %init_peer_id%)
@ -36,14 +36,14 @@ func createRoute(label: string) -> RouteId:
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
result <- register_key(label, t, signature, false) result <- register_route(label, t, signature, false)
<- route_id <- route_id
-- Create a label and subscribe to it -- Create a label and subscribe to it
-- %init_peer_id% (current client) will become a subscriber -- %init_peer_id% (current client) will become a subscriber
func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, service_id: ?string) -> string: func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, service_id: ?string) -> string:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
key_signature <- get_key_signature(label, t) route_signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%) route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_record_signature(route_id, value, relay_id, service_id, t) record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)
@ -54,7 +54,7 @@ func createRouteAndRegister(label: string, value: string, relay_id: ?PeerId, ser
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
register_key(label, t, key_signature, false) register_route(label, t, route_signature, false)
put_record(route_id, value, relay_id, service_id, t, record_signature) put_record(route_id, value, relay_id, service_id, t, record_signature)
<- route_id <- route_id
@ -68,7 +68,7 @@ func createRouteAndRegisterBlocking(
ack: i16 ack: i16
) -> string: ) -> string:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
key_signature <- get_key_signature(label, t) route_signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%) route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_record_signature(route_id, value, relay_id, service_id, t) record_signature <- get_record_signature(route_id, value, relay_id, service_id, t)
@ -79,7 +79,7 @@ func createRouteAndRegisterBlocking(
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
res1 <- register_key(label, t, key_signature, false) res1 <- register_route(label, t, route_signature, false)
result <- put_record(route_id, value, relay_id, service_id, t, record_signature) result <- put_record(route_id, value, relay_id, service_id, t, record_signature)
if result.success: if result.success:
results <<- result results <<- result
@ -90,20 +90,20 @@ func createRouteAndRegisterBlocking(
-- Create a label and make the given node a subscriber to it -- Create a label and make the given node a subscriber to it
func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string: func createRouteAndRegisterNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string) -> string:
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
key_signature <- get_key_signature(label, t) route_signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%) route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
on subscriber_node_id: on subscriber_node_id:
register_key(label, t, key_signature, false) register_route(label, t, route_signature, false)
r <- put_host_record(route_id, value, nil, service_id, t, record_signature) r <- put_host_record(route_id, value, nil, service_id, t, record_signature)
nodes <- getNeighbours(route_id) nodes <- getNeighbours(route_id)
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
register_key(label, t, key_signature, false) register_route(label, t, route_signature, false)
propagate_host_record(r) propagate_host_record(r)
<- route_id <- route_id
@ -125,7 +125,7 @@ func registerForRoute(route_id: string, value: string, relay_id: ?PeerId, servic
-- Note: label must be already initiated -- Note: label must be already initiated
func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string): func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: string, service_id: ?string):
t <- Peer.timestamp_sec() t <- Peer.timestamp_sec()
key_signature <- get_key_signature(label, t) route_signature <- get_route_signature(label, t)
on HOST_PEER_ID: on HOST_PEER_ID:
route_id <- get_route_id(label, %init_peer_id%) route_id <- get_route_id(label, %init_peer_id%)
record_signature <- get_host_record_signature(route_id, value, nil, service_id, t) record_signature <- get_host_record_signature(route_id, value, nil, service_id, t)
@ -136,7 +136,7 @@ func registerForRouteNode(subscriber_node_id: PeerId, label: string, value: stri
for n <- nodes par: for n <- nodes par:
on n: on n:
try: try:
register_key(label, t, key_signature, false) register_route(label, t, route_signature, false)
propagate_host_record(r) propagate_host_record(r)
-- Find the list of record for the given route_id -- Find the list of record for the given route_id

View File

@ -23,7 +23,7 @@ let local: Node[] = [
async function main() { async function main() {
// connect to the Fluence network // connect to the Fluence network
await Fluence.start({ connectTo: local[0] }); await Fluence.start({ connectTo: krasnodar[0] });
console.log("%s", await timestamp_sec()); console.log("%s", await timestamp_sec());
console.log( console.log(
"📗 created a fluence peer %s with relay %s", "📗 created a fluence peer %s with relay %s",
@ -38,11 +38,11 @@ async function main() {
let route_id = await createRouteAndRegisterBlocking( let route_id = await createRouteAndRegisterBlocking(
label, value, relay, null, label, value, relay, null,
(s) => console.log(`node ${s} saved the record`), (s) => console.log(`node ${s} saved the record`),
0 5
); );
// find other peers on this route // find other peers on this route
console.log("let's find subscribers for %s", route_id); console.log("let's find subscribers for %s", route_id);
let subscribers = await resolveRoute(route_id, 0); let subscribers = await resolveRoute(route_id, 5);
console.log("found subscribers:", subscribers); console.log("found subscribers:", subscribers);
} }

32
service/Cargo.lock generated
View File

@ -24,9 +24,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.55" version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]] [[package]]
name = "arrayref" name = "arrayref"
@ -529,9 +529,9 @@ dependencies = [
[[package]] [[package]]
name = "ed25519" name = "ed25519"
version = "1.3.0" version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816" checksum = "eed12bbf7b5312f8da1c2722bc06d8c6b12c2d86a7fb35a194c7f3e6fc2bbe39"
dependencies = [ dependencies = [
"serde", "serde",
"signature", "signature",
@ -590,9 +590,9 @@ dependencies = [
[[package]] [[package]]
name = "eyre" name = "eyre"
version = "0.6.6" version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc225d8f637923fe585089fcf03e705c222131232d2c1fb622e84ecf725d0eb8" checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f"
dependencies = [ dependencies = [
"indenter", "indenter",
"once_cell", "once_cell",
@ -1676,9 +1676,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.9.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
@ -1752,7 +1752,7 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"instant", "instant",
"libc", "libc",
"redox_syscall 0.2.10", "redox_syscall 0.2.11",
"smallvec", "smallvec",
"winapi", "winapi",
] ]
@ -2100,18 +2100,18 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.10" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
dependencies = [ dependencies = [
"bitflags", "bitflags",
] ]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.4" version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -2454,7 +2454,7 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"fastrand", "fastrand",
"libc", "libc",
"redox_syscall 0.2.10", "redox_syscall 0.2.11",
"remove_dir_all", "remove_dir_all",
"winapi", "winapi",
] ]
@ -2972,9 +2972,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "zeroize" name = "zeroize"
version = "1.5.2" version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006" checksum = "50344758e2f40e3a1fcfc8f6f91aa57b5f8ebd8d27919fe6451f15aaaf9ee608"
dependencies = [ dependencies = [
"zeroize_derive", "zeroize_derive",
] ]

View File

@ -15,8 +15,8 @@
*/ */
// TODO: sanitize tables' names in SQL expressions // TODO: sanitize tables' names in SQL expressions
pub static KEYS_TABLE_NAME: &str = "dht_keys"; pub static ROUTES_TABLE_NAME: &str = "dht_routes";
pub static KEYS_TIMESTAMPS_TABLE_NAME: &str = "dht_keys_timestamps"; pub static ROUTES_TIMESTAMPS_TABLE_NAME: &str = "dht_routes_timestamps";
pub static RECORDS_TABLE_NAME: &str = "dht_records"; pub static RECORDS_TABLE_NAME: &str = "dht_records";
pub static CONFIG_FILE: &str = "/tmp/Config.toml"; pub static CONFIG_FILE: &str = "/tmp/Config.toml";
pub static DB_PATH: &str = "/tmp/registry.db"; pub static DB_PATH: &str = "/tmp/registry.db";

View File

@ -25,13 +25,13 @@ pub enum ServiceError {
#[source] #[source]
SqliteError, SqliteError,
), ),
#[error("Requested key {0} does not exist")] #[error("Requested route {0} does not exist")]
KeyNotExists(String), RouteNotExists(String),
#[error("Key {0} for {1} peer_id already exists with newer timestamp")] #[error("Route {0} for {1} peer_id already exists with newer timestamp")]
KeyAlreadyExistsNewerTimestamp(String, String), RouteAlreadyExistsNewerTimestamp(String, String),
#[error("Values limit for key_d {0} is exceeded")] #[error("Values limit for key_d {0} is exceeded")]
ValuesLimitExceeded(String), ValuesLimitExceeded(String),
#[error("Host value for key_id {0} not found ")] #[error("Host value for route_id {0} not found ")]
HostValueNotFound(String), HostValueNotFound(String),
#[error("Invalid set_host_value result: success is false or value is missing")] #[error("Invalid set_host_value result: success is false or value is missing")]
InvalidSetHostValueResult, InvalidSetHostValueResult,
@ -59,8 +59,8 @@ pub enum ServiceError {
String, String,
#[source] fluence_keypair::error::VerificationError, #[source] fluence_keypair::error::VerificationError,
), ),
#[error("Key can't be registered in the future")] #[error("Route can't be registered in the future")]
InvalidKeyTimestamp, InvalidRouteTimestamp,
#[error("Record can't be registered in the future")] #[error("Record can't be registered in the future")]
InvalidRecordTimestamp, InvalidRecordTimestamp,
#[error("Records to publish should belong to one key id")] #[error("Records to publish should belong to one key id")]

View File

@ -1,223 +0,0 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::defaults::{KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME};
use crate::error::ServiceError;
use crate::error::ServiceError::{InternalError, KeyNotExists};
use crate::key::{Key, KeyInternal};
use crate::storage_impl::Storage;
use marine_sqlite_connector::{State, Statement, Value};
impl Storage {
pub fn create_keys_tables(&self) -> bool {
self.connection
.execute(f!("
CREATE TABLE IF NOT EXISTS {KEYS_TABLE_NAME} (
key_id TEXT PRIMARY KEY,
label TEXT,
peer_id TEXT,
timestamp_created INTEGER,
challenge BLOB,
challenge_type TEXT,
signature BLOB NOT NULL,
timestamp_published INTEGER,
pinned INTEGER,
weight INTEGER
);
"))
.is_ok()
&& self
.connection
.execute(f!("
CREATE TABLE IF NOT EXISTS {KEYS_TIMESTAMPS_TABLE_NAME} (
key_id TEXT PRIMARY KEY,
timestamp_accessed INTEGER
);
"))
.is_ok()
}
pub fn update_key_timestamp(
&self,
key_id: &str,
current_timestamp_sec: u64,
) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!("
INSERT OR REPLACE INTO {KEYS_TIMESTAMPS_TABLE_NAME} VALUES (?, ?);
"))?;
statement.bind(1, &Value::String(key_id.to_string()))?;
statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?;
statement.next()?;
Ok(())
}
pub fn get_key(&self, key_id: String) -> Result<Key, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT key_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \
FROM {KEYS_TABLE_NAME} WHERE key_id = ?"
))?;
statement.bind(1, &Value::String(key_id.clone()))?;
if let State::Row = statement.next()? {
read_key(&statement)
} else {
Err(KeyNotExists(key_id))
}
}
pub fn write_key(&self, key: KeyInternal) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!("
INSERT OR REPLACE INTO {KEYS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"))?;
let pinned = if key.pinned { 1 } else { 0 } as i64;
statement.bind(1, &Value::String(key.key.key_id))?;
statement.bind(2, &Value::String(key.key.label))?;
statement.bind(3, &Value::String(key.key.peer_id))?;
statement.bind(4, &Value::Integer(key.key.timestamp_created as i64))?;
statement.bind(5, &Value::Binary(key.key.challenge))?;
statement.bind(6, &Value::String(key.key.challenge_type))?;
statement.bind(7, &Value::Binary(key.key.signature))?;
statement.bind(8, &Value::Integer(key.timestamp_published as i64))?;
statement.bind(9, &Value::Integer(pinned))?;
statement.bind(10, &Value::Integer(key.weight as i64))?;
statement.next()?;
Ok(())
}
pub fn update_key(&self, key: KeyInternal) -> Result<(), ServiceError> {
if let Ok(existing_key) = self.get_key(key.key.key_id.clone()) {
if existing_key.timestamp_created > key.key.timestamp_created {
return Err(ServiceError::KeyAlreadyExistsNewerTimestamp(
key.key.label,
key.key.peer_id,
));
}
}
self.write_key(key)
}
pub fn check_key_existence(&self, key_id: &str) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT EXISTS(SELECT 1 FROM {KEYS_TABLE_NAME} WHERE key_id = ? LIMIT 1)"
))?;
statement.bind(1, &Value::String(key_id.to_string()))?;
if let State::Row = statement.next()? {
let exists = statement.read::<i64>(0)?;
if exists == 1 {
Ok(())
} else {
Err(KeyNotExists(key_id.to_string()))
}
} else {
Err(InternalError(
"EXISTS should always return something".to_string(),
))
}
}
pub fn get_stale_keys(&self, stale_timestamp: u64) -> Result<Vec<KeyInternal>, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT label, peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, pinned, weight \
FROM {KEYS_TABLE_NAME} WHERE timestamp_published <= ?"
))?;
statement.bind(1, &Value::Integer(stale_timestamp as i64))?;
let mut stale_keys: Vec<KeyInternal> = vec![];
while let State::Row = statement.next()? {
stale_keys.push(read_internal_key(&statement)?);
}
Ok(stale_keys)
}
pub fn delete_key(&self, key_id: String) -> Result<(), ServiceError> {
let mut statement = self
.connection
.prepare(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key_id=?"))?;
statement.bind(1, &Value::String(key_id.clone()))?;
statement.next().map(drop)?;
if self.connection.changes() == 1 {
Ok(())
} else {
Err(KeyNotExists(key_id))
}
}
/// not pinned only
pub fn get_expired_keys(&self, expired_timestamp: u64) -> Result<Vec<Key>, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT label, peer_id, timestamp_created, challenge, challenge_type, signature \
FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= ? and pinned != 1"
))?;
statement.bind(1, &Value::Integer(expired_timestamp as i64))?;
let mut expired_keys: Vec<Key> = vec![];
while let State::Row = statement.next()? {
let key = read_key(&statement)?;
let timestamp_accessed = self.get_key_timestamp_accessed(&key.key_id)?;
let with_host_records = self.get_host_records_count_by_key(key.key_id.clone())? != 0;
if timestamp_accessed <= expired_timestamp && !with_host_records {
expired_keys.push(key);
}
}
Ok(expired_keys)
}
pub fn get_key_timestamp_accessed(&self, key_id: &str) -> Result<u64, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT timestamp_accessed FROM {KEYS_TIMESTAMPS_TABLE_NAME} WHERE key_id != ?"
))?;
statement.bind(1, &Value::String(key_id.to_string()))?;
if let State::Row = statement.next()? {
statement
.read::<i64>(0)
.map(|t| t as u64)
.map_err(ServiceError::SqliteError)
} else {
Err(KeyNotExists(key_id.to_string()))
}
}
}
pub fn read_key(statement: &Statement) -> Result<Key, ServiceError> {
Ok(Key {
key_id: statement.read::<String>(0)?,
label: statement.read::<String>(1)?,
peer_id: statement.read::<String>(2)?,
timestamp_created: statement.read::<i64>(3)? as u64,
challenge: statement.read::<Vec<u8>>(4)?,
challenge_type: statement.read::<String>(5)?,
signature: statement.read::<Vec<u8>>(6)?,
})
}
pub fn read_internal_key(statement: &Statement) -> Result<KeyInternal, ServiceError> {
Ok(KeyInternal {
key: read_key(statement)?,
timestamp_published: statement.read::<i64>(7)? as u64,
pinned: statement.read::<i64>(8)? != 0,
weight: statement.read::<i64>(9)? as u32,
})
}

View File

@ -24,14 +24,14 @@ use crate::tetraplets_checkers::check_timestamp_tetraplets;
mod config; mod config;
mod defaults; mod defaults;
mod error; mod error;
mod key;
mod key_api;
mod key_storage_impl;
mod misc; mod misc;
mod record; mod record;
mod record_api; mod record_api;
mod record_storage_impl; mod record_storage_impl;
mod results; mod results;
mod route;
mod route_api;
mod route_storage_impl;
mod storage_impl; mod storage_impl;
mod tests; mod tests;
mod tetraplets_checkers; mod tetraplets_checkers;
@ -59,7 +59,7 @@ pub struct WeightResult {
fn main() { fn main() {
let storage = get_storage().unwrap(); let storage = get_storage().unwrap();
storage.create_keys_tables(); storage.create_route_tables();
storage.create_values_table(); storage.create_values_table();
create_config(); create_config();
} }

View File

@ -23,7 +23,7 @@ use sha2::{Digest, Sha256};
#[marine] #[marine]
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct Record { pub struct Record {
pub key_id: String, pub route_id: String,
pub value: String, pub value: String,
pub peer_id: String, pub peer_id: String,
pub set_by: String, pub set_by: String,
@ -43,7 +43,7 @@ pub struct RecordInternal {
impl Record { impl Record {
pub fn signature_bytes(&self) -> Vec<u8> { pub fn signature_bytes(&self) -> Vec<u8> {
let mut metadata = Vec::new(); let mut metadata = Vec::new();
metadata.extend(self.key_id.as_bytes()); metadata.extend(self.route_id.as_bytes());
metadata.extend(self.value.as_bytes()); metadata.extend(self.value.as_bytes());
metadata.extend(self.peer_id.as_bytes()); metadata.extend(self.peer_id.as_bytes());
metadata.extend(self.set_by.as_bytes()); metadata.extend(self.set_by.as_bytes());
@ -80,7 +80,7 @@ impl Record {
let bytes = self.signature_bytes(); let bytes = self.signature_bytes();
let signature = Signature::from_bytes(pk.get_key_format(), self.signature.clone()); let signature = Signature::from_bytes(pk.get_key_format(), self.signature.clone());
pk.verify(&bytes, &signature).map_err(|e| { pk.verify(&bytes, &signature).map_err(|e| {
ServiceError::InvalidRecordSignature(self.key_id.clone(), self.value.clone(), e) ServiceError::InvalidRecordSignature(self.route_id.clone(), self.value.clone(), e)
}) })
} }
} }

View File

@ -32,7 +32,7 @@ use marine_rs_sdk::marine;
#[marine] #[marine]
pub fn get_record_bytes( pub fn get_record_bytes(
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -41,7 +41,7 @@ pub fn get_record_bytes(
) -> Vec<u8> { ) -> Vec<u8> {
let cp = marine_rs_sdk::get_call_parameters(); let cp = marine_rs_sdk::get_call_parameters();
Record { Record {
key_id, route_id,
value, value,
peer_id: cp.init_peer_id.clone(), peer_id: cp.init_peer_id.clone(),
set_by: cp.init_peer_id, set_by: cp.init_peer_id,
@ -56,7 +56,7 @@ pub fn get_record_bytes(
#[marine] #[marine]
pub fn put_record( pub fn put_record(
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -72,7 +72,7 @@ pub fn put_record(
check_timestamp_tetraplets(&cp, 8)?; check_timestamp_tetraplets(&cp, 8)?;
check_weight_result(&cp.init_peer_id, &weight)?; check_weight_result(&cp.init_peer_id, &weight)?;
let record = Record { let record = Record {
key_id, route_id,
value, value,
peer_id: cp.init_peer_id.clone(), peer_id: cp.init_peer_id.clone(),
set_by: cp.init_peer_id, set_by: cp.init_peer_id,
@ -85,7 +85,7 @@ pub fn put_record(
record.verify(current_timestamp_sec)?; record.verify(current_timestamp_sec)?;
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&record.key_id)?; storage.check_route_existence(&record.route_id)?;
storage storage
.update_record( .update_record(
RecordInternal { RecordInternal {
@ -101,7 +101,7 @@ pub fn put_record(
#[marine] #[marine]
pub fn get_host_record_bytes( pub fn get_host_record_bytes(
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -110,7 +110,7 @@ pub fn get_host_record_bytes(
) -> Vec<u8> { ) -> Vec<u8> {
let cp = marine_rs_sdk::get_call_parameters(); let cp = marine_rs_sdk::get_call_parameters();
Record { Record {
key_id, route_id,
value, value,
peer_id: cp.host_id, peer_id: cp.host_id,
set_by: cp.init_peer_id, set_by: cp.init_peer_id,
@ -124,7 +124,7 @@ pub fn get_host_record_bytes(
} }
#[marine] #[marine]
pub fn put_host_record( pub fn put_host_record(
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -140,7 +140,7 @@ pub fn put_host_record(
check_timestamp_tetraplets(&cp, 8)?; check_timestamp_tetraplets(&cp, 8)?;
check_weight_result(&cp.init_peer_id, &weight)?; check_weight_result(&cp.init_peer_id, &weight)?;
let record = Record { let record = Record {
key_id, route_id,
value, value,
peer_id: cp.host_id, peer_id: cp.host_id,
set_by: cp.init_peer_id, set_by: cp.init_peer_id,
@ -153,7 +153,7 @@ pub fn put_host_record(
record.verify(current_timestamp_sec)?; record.verify(current_timestamp_sec)?;
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&record.key_id)?; storage.check_route_existence(&record.route_id)?;
storage.update_record( storage.update_record(
RecordInternal { RecordInternal {
record: record.clone(), record: record.clone(),
@ -180,7 +180,7 @@ pub fn propagate_host_record(
return Err(ServiceError::InvalidSetHostValueResult); return Err(ServiceError::InvalidSetHostValueResult);
} }
let mut record = set_host_value.value[0].clone(); let record = set_host_value.value[0].clone();
record.verify(current_timestamp_sec)?; record.verify(current_timestamp_sec)?;
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
@ -191,12 +191,12 @@ pub fn propagate_host_record(
let weight = weight.weight; let weight = weight.weight;
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&record.key_id)?; storage.check_route_existence(&record.route_id)?;
storage.update_key_timestamp(&record.key_id, current_timestamp_sec)?; storage.update_route_timestamp(&record.route_id, current_timestamp_sec)?;
storage storage
.merge_and_update_records( .merge_and_update_records(
record.key_id.clone(), record.route_id.clone(),
vec![RecordInternal { record, weight }], vec![RecordInternal { record, weight }],
) )
.map(|_| ()) .map(|_| ())
@ -206,15 +206,15 @@ pub fn propagate_host_record(
/// Return all values by key /// Return all values by key
#[marine] #[marine]
pub fn get_records(key_id: String, current_timestamp_sec: u64) -> GetValuesResult { pub fn get_records(route_id: String, current_timestamp_sec: u64) -> GetValuesResult {
wrapped_try(|| { wrapped_try(|| {
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 1)?; check_timestamp_tetraplets(&call_parameters, 1)?;
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&key_id)?; storage.check_route_existence(&route_id)?;
storage.update_key_timestamp(&key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route_id, current_timestamp_sec)?;
storage storage
.get_records(key_id) .get_records(route_id)
.map(|records| records.into_iter().map(|r| r.record).collect()) .map(|records| records.into_iter().map(|r| r.record).collect())
}) })
.into() .into()
@ -232,7 +232,7 @@ pub fn republish_records(
return Ok(0); return Ok(0);
} }
let key_id = records[0].key_id.clone(); let route_id = records[0].route_id.clone();
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 2)?; check_timestamp_tetraplets(&call_parameters, 2)?;
let mut records_to_merge = vec![]; let mut records_to_merge = vec![];
@ -245,7 +245,7 @@ pub fn republish_records(
record.set_by.clone(), record.set_by.clone(),
))?; ))?;
check_weight_result(&record.set_by, weight_result)?; check_weight_result(&record.set_by, weight_result)?;
if record.key_id != key_id { if record.route_id != route_id {
return Err(ServiceError::RecordsPublishingError); return Err(ServiceError::RecordsPublishingError);
} }
@ -256,29 +256,29 @@ pub fn republish_records(
} }
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&key_id)?; storage.check_route_existence(&route_id)?;
storage.update_key_timestamp(&key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route_id, current_timestamp_sec)?;
storage.merge_and_update_records(key_id, records_to_merge) storage.merge_and_update_records(route_id, records_to_merge)
}) })
.into() .into()
} }
/// Remove host value by key and caller peer_id /// Remove host value by key and caller peer_id
#[marine] #[marine]
pub fn clear_host_record(key_id: String, current_timestamp_sec: u64) -> DhtResult { pub fn clear_host_record(route_id: String, current_timestamp_sec: u64) -> DhtResult {
wrapped_try(|| { wrapped_try(|| {
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 1)?; check_timestamp_tetraplets(&call_parameters, 1)?;
let storage = get_storage()?; let storage = get_storage()?;
storage.check_key_existence(&key_id)?; storage.check_route_existence(&route_id)?;
storage.update_key_timestamp(&key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route_id, current_timestamp_sec)?;
let peer_id = call_parameters.host_id; let peer_id = call_parameters.host_id;
let set_by = call_parameters.init_peer_id; let set_by = call_parameters.init_peer_id;
let deleted = storage.delete_record(key_id.clone(), peer_id, set_by)?; let deleted = storage.delete_record(route_id.clone(), peer_id, set_by)?;
deleted.as_result((), ServiceError::HostValueNotFound(key_id)) deleted.as_result((), ServiceError::HostValueNotFound(route_id))
}) })
.into() .into()
} }

View File

@ -28,7 +28,7 @@ impl Storage {
self.connection self.connection
.execute(f!(" .execute(f!("
CREATE TABLE IF NOT EXISTS {RECORDS_TABLE_NAME} ( CREATE TABLE IF NOT EXISTS {RECORDS_TABLE_NAME} (
key_id TEXT, route_id TEXT,
value TEXT, value TEXT,
peer_id TEXT, peer_id TEXT,
set_by TEXT, set_by TEXT,
@ -38,7 +38,7 @@ impl Storage {
solution BLOB, solution BLOB,
signature BLOB NOT NULL, signature BLOB NOT NULL,
weight INTEGER, weight INTEGER,
PRIMARY KEY (key_id, peer_id, set_by) PRIMARY KEY (route_id, peer_id, set_by)
); );
")) "))
.is_ok() .is_ok()
@ -47,12 +47,13 @@ impl Storage {
/// Put value with caller peer_id if the key exists. /// Put value with caller peer_id if the key exists.
/// If the value is NOT a host value and the key already has `VALUES_LIMIT` records, then a value with the smallest weight is removed and the new value is inserted instead. /// If the value is NOT a host value and the key already has `VALUES_LIMIT` records, then a value with the smallest weight is removed and the new value is inserted instead.
pub fn update_record(&self, record: RecordInternal, host: bool) -> Result<(), ServiceError> { pub fn update_record(&self, record: RecordInternal, host: bool) -> Result<(), ServiceError> {
let records_count = self.get_non_host_records_count_by_key(record.record.key_id.clone())?; let records_count =
self.get_non_host_records_count_by_key(record.record.route_id.clone())?;
// check values limits for non-host values // check values limits for non-host values
if !host && records_count >= VALUES_LIMIT { if !host && records_count >= VALUES_LIMIT {
let min_weight_record = let min_weight_record =
self.get_min_weight_non_host_record_by_key(record.record.key_id.clone())?; self.get_min_weight_non_host_record_by_key(record.record.route_id.clone())?;
if min_weight_record.weight < record.weight if min_weight_record.weight < record.weight
|| (min_weight_record.weight == record.weight || (min_weight_record.weight == record.weight
@ -60,13 +61,13 @@ impl Storage {
{ {
// delete the lightest record if the new one is heavier or newer // delete the lightest record if the new one is heavier or newer
self.delete_record( self.delete_record(
min_weight_record.record.key_id, min_weight_record.record.route_id,
min_weight_record.record.peer_id, min_weight_record.record.peer_id,
min_weight_record.record.set_by, min_weight_record.record.set_by,
)?; )?;
} else { } else {
// return error if limit is exceeded // return error if limit is exceeded
return Err(ServiceError::ValuesLimitExceeded(record.record.key_id)); return Err(ServiceError::ValuesLimitExceeded(record.record.route_id));
} }
} }
@ -79,7 +80,7 @@ impl Storage {
"INSERT OR REPLACE INTO {RECORDS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" "INSERT OR REPLACE INTO {RECORDS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
))?; ))?;
statement.bind(1, &Value::String(record.record.key_id))?; statement.bind(1, &Value::String(record.record.route_id))?;
statement.bind(2, &Value::String(record.record.value))?; statement.bind(2, &Value::String(record.record.value))?;
statement.bind(3, &Value::String(record.record.peer_id))?; statement.bind(3, &Value::String(record.record.peer_id))?;
statement.bind(4, &Value::String(record.record.set_by))?; statement.bind(4, &Value::String(record.record.set_by))?;
@ -102,14 +103,14 @@ impl Storage {
pub fn delete_record( pub fn delete_record(
&self, &self,
key_id: String, route_id: String,
peer_id: String, peer_id: String,
set_by: String, set_by: String,
) -> Result<bool, ServiceError> { ) -> Result<bool, ServiceError> {
let mut statement = self.connection.prepare(f!( let mut statement = self.connection.prepare(f!(
"DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id=? AND peer_id=? AND set_by=?" "DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id=? AND peer_id=? AND set_by=?"
))?; ))?;
statement.bind(1, &Value::String(key_id))?; statement.bind(1, &Value::String(route_id))?;
statement.bind(2, &Value::String(peer_id))?; statement.bind(2, &Value::String(peer_id))?;
statement.bind(3, &Value::String(set_by))?; statement.bind(3, &Value::String(set_by))?;
statement.next().map(drop)?; statement.next().map(drop)?;
@ -119,23 +120,23 @@ impl Storage {
fn get_min_weight_non_host_record_by_key( fn get_min_weight_non_host_record_by_key(
&self, &self,
key_id: String, route_id: String,
) -> Result<RecordInternal, ServiceError> { ) -> Result<RecordInternal, ServiceError> {
let host_id = marine_rs_sdk::get_call_parameters().host_id; let host_id = marine_rs_sdk::get_call_parameters().host_id;
// only only non-host values // only only non-host values
let mut statement = self.connection.prepare( let mut statement = self.connection.prepare(
f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \ f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \
WHERE key_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?; WHERE route_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?;
statement.bind(1, &Value::String(key_id.clone()))?; statement.bind(1, &Value::String(route_id.clone()))?;
statement.bind(2, &Value::String(host_id))?; statement.bind(2, &Value::String(host_id))?;
if let State::Row = statement.next()? { if let State::Row = statement.next()? {
read_record(&statement) read_record(&statement)
} else { } else {
Err(InternalError(f!( Err(InternalError(f!(
"not found non-host records for given key_id: {key_id}" "not found non-host records for given route_id: {route_id}"
))) )))
} }
} }
@ -145,7 +146,7 @@ impl Storage {
// only only non-host values // only only non-host values
let mut statement = self.connection.prepare(f!( let mut statement = self.connection.prepare(f!(
"SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id != ?" "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id != ?"
))?; ))?;
statement.bind(1, &Value::String(key))?; statement.bind(1, &Value::String(key))?;
statement.bind(2, &Value::String(host_id))?; statement.bind(2, &Value::String(host_id))?;
@ -162,14 +163,14 @@ impl Storage {
} }
} }
pub fn get_host_records_count_by_key(&self, key_id: String) -> Result<u64, ServiceError> { pub fn get_host_records_count_by_key(&self, route_id: String) -> Result<u64, ServiceError> {
let host_id = marine_rs_sdk::get_call_parameters().host_id; let host_id = marine_rs_sdk::get_call_parameters().host_id;
// only only non-host values // only only non-host values
let mut statement = self.connection.prepare(f!( let mut statement = self.connection.prepare(f!(
"SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id = ?" "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id = ?"
))?; ))?;
statement.bind(1, &Value::String(key_id))?; statement.bind(1, &Value::String(route_id))?;
statement.bind(2, &Value::String(host_id))?; statement.bind(2, &Value::String(host_id))?;
if let State::Row = statement.next()? { if let State::Row = statement.next()? {
@ -186,11 +187,11 @@ impl Storage {
pub fn merge_and_update_records( pub fn merge_and_update_records(
&self, &self,
key_id: String, route_id: String,
records: Vec<RecordInternal>, records: Vec<RecordInternal>,
) -> Result<u64, ServiceError> { ) -> Result<u64, ServiceError> {
let records = merge_records( let records = merge_records(
self.get_records(key_id)? self.get_records(route_id)?
.into_iter() .into_iter()
.chain(records.into_iter()) .chain(records.into_iter())
.collect(), .collect(),
@ -205,11 +206,11 @@ impl Storage {
Ok(updated) Ok(updated)
} }
pub fn get_records(&self, key_id: String) -> Result<Vec<RecordInternal>, ServiceError> { pub fn get_records(&self, route_id: String) -> Result<Vec<RecordInternal>, ServiceError> {
let mut statement = self.connection.prepare( let mut statement = self.connection.prepare(
f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \ f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \
WHERE key_id = ? ORDER BY weight DESC"))?; WHERE route_id = ? ORDER BY weight DESC"))?;
statement.bind(1, &Value::String(key_id))?; statement.bind(1, &Value::String(route_id))?;
let mut result: Vec<RecordInternal> = vec![]; let mut result: Vec<RecordInternal> = vec![];
@ -230,12 +231,12 @@ impl Storage {
} }
/// except host records and for pinned keys /// except host records and for pinned keys
pub fn delete_records_by_key(&self, key_id: String) -> Result<u64, ServiceError> { pub fn delete_records_by_key(&self, route_id: String) -> Result<u64, ServiceError> {
let mut statement = self let mut statement = self
.connection .connection
.prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id = ?"))?; .prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id = ?"))?;
statement.bind(1, &Value::String(key_id))?; statement.bind(1, &Value::String(route_id))?;
statement.next().map(drop)?; statement.next().map(drop)?;
Ok(self.connection.changes() as u64) Ok(self.connection.changes() as u64)
@ -245,7 +246,7 @@ impl Storage {
pub fn read_record(statement: &Statement) -> Result<RecordInternal, ServiceError> { pub fn read_record(statement: &Statement) -> Result<RecordInternal, ServiceError> {
Ok(RecordInternal { Ok(RecordInternal {
record: Record { record: Record {
key_id: statement.read::<String>(0)?, route_id: statement.read::<String>(0)?,
value: statement.read::<String>(1)?, value: statement.read::<String>(1)?,
peer_id: statement.read::<String>(2)?, peer_id: statement.read::<String>(2)?,
set_by: statement.read::<String>(3)?, set_by: statement.read::<String>(3)?,

View File

@ -15,8 +15,8 @@
*/ */
use crate::error::ServiceError; use crate::error::ServiceError;
use crate::key::Key;
use crate::record::Record; use crate::record::Record;
use crate::route::Route;
use marine_rs_sdk::marine; use marine_rs_sdk::marine;
#[marine] #[marine]
@ -43,24 +43,24 @@ impl From<Result<(), ServiceError>> for DhtResult {
#[marine] #[marine]
#[derive(Debug)] #[derive(Debug)]
pub struct RegisterKeyResult { pub struct RegisterRouteResult {
pub success: bool, pub success: bool,
pub error: String, pub error: String,
pub key_id: String, pub route_id: String,
} }
impl From<Result<String, ServiceError>> for RegisterKeyResult { impl From<Result<String, ServiceError>> for RegisterRouteResult {
fn from(result: Result<String, ServiceError>) -> Self { fn from(result: Result<String, ServiceError>) -> Self {
match result { match result {
Ok(key_id) => Self { Ok(route_id) => Self {
success: true, success: true,
error: "".to_string(), error: "".to_string(),
key_id, route_id,
}, },
Err(err) => Self { Err(err) => Self {
success: false, success: false,
error: err.to_string(), error: err.to_string(),
key_id: "".to_string(), route_id: "".to_string(),
}, },
} }
} }
@ -96,22 +96,22 @@ impl From<Result<Vec<Record>, ServiceError>> for GetValuesResult {
pub struct ClearExpiredResult { pub struct ClearExpiredResult {
pub success: bool, pub success: bool,
pub error: String, pub error: String,
pub count_keys: u64, pub count_routes: u64,
pub count_values: u64, pub count_values: u64,
} }
impl From<Result<(u64, u64), ServiceError>> for ClearExpiredResult { impl From<Result<(u64, u64), ServiceError>> for ClearExpiredResult {
fn from(result: Result<(u64, u64), ServiceError>) -> Self { fn from(result: Result<(u64, u64), ServiceError>) -> Self {
match result { match result {
Ok((keys, values)) => Self { Ok((routes, values)) => Self {
success: true, success: true,
count_keys: keys, count_routes: routes,
count_values: values, count_values: values,
error: "".to_string(), error: "".to_string(),
}, },
Err(err) => Self { Err(err) => Self {
success: false, success: false,
count_keys: 0, count_routes: 0,
count_values: 0, count_values: 0,
error: err.to_string(), error: err.to_string(),
}, },
@ -145,24 +145,24 @@ impl From<Result<Vec<Record>, ServiceError>> for GetStaleRecordsResult {
} }
#[marine] #[marine]
pub struct GetKeyMetadataResult { pub struct GetRouteMetadataResult {
pub success: bool, pub success: bool,
pub error: String, pub error: String,
pub key: Key, pub route: Route,
} }
impl From<Result<Key, ServiceError>> for GetKeyMetadataResult { impl From<Result<Route, ServiceError>> for GetRouteMetadataResult {
fn from(result: Result<Key, ServiceError>) -> Self { fn from(result: Result<Route, ServiceError>) -> Self {
match result { match result {
Ok(key) => Self { Ok(route) => Self {
success: true, success: true,
error: "".to_string(), error: "".to_string(),
key, route,
}, },
Err(err) => Self { Err(err) => Self {
success: false, success: false,
error: err.to_string(), error: err.to_string(),
key: Key::default(), route: Route::default(),
}, },
} }
} }
@ -194,7 +194,7 @@ impl From<Result<u64, ServiceError>> for RepublishValuesResult {
#[marine] #[marine]
pub struct EvictStaleItem { pub struct EvictStaleItem {
pub key: Key, pub route: Route,
pub records: Vec<Record>, pub records: Vec<Record>,
} }

View File

@ -22,8 +22,8 @@ use sha2::{Digest, Sha256};
#[marine] #[marine]
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct Key { pub struct Route {
pub key_id: String, pub id: String,
pub label: String, pub label: String,
pub peer_id: String, pub peer_id: String,
pub timestamp_created: u64, pub timestamp_created: u64,
@ -33,14 +33,14 @@ pub struct Key {
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct KeyInternal { pub struct RouteInternal {
pub key: Key, pub route: Route,
pub timestamp_published: u64, pub timestamp_published: u64,
pub pinned: bool, pub pinned: bool,
pub weight: u32, pub weight: u32,
} }
impl Key { impl Route {
pub fn new( pub fn new(
label: String, label: String,
peer_id: String, peer_id: String,
@ -49,10 +49,10 @@ impl Key {
challenge_type: String, challenge_type: String,
signature: Vec<u8>, signature: Vec<u8>,
) -> Self { ) -> Self {
let key_id = Self::get_key_id(&label, &peer_id); let id = Self::get_id(&label, &peer_id);
Self { Self {
key_id, id,
label, label,
peer_id, peer_id,
timestamp_created, timestamp_created,
@ -62,8 +62,8 @@ impl Key {
} }
} }
pub fn get_key_id(key: &str, peer_id: &str) -> String { pub fn get_id(label: &str, peer_id: &str) -> String {
format!("{}{}", key, peer_id) format!("{}{}", label, peer_id)
} }
pub fn signature_bytes(&self) -> Vec<u8> { pub fn signature_bytes(&self) -> Vec<u8> {
@ -81,7 +81,7 @@ impl Key {
pub fn verify(&self, current_timestamp_sec: u64) -> Result<(), ServiceError> { pub fn verify(&self, current_timestamp_sec: u64) -> Result<(), ServiceError> {
if self.timestamp_created > current_timestamp_sec { if self.timestamp_created > current_timestamp_sec {
return Err(ServiceError::InvalidKeyTimestamp); return Err(ServiceError::InvalidRouteTimestamp);
} }
self.verify_signature() self.verify_signature()

View File

@ -14,23 +14,23 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::error::ServiceError; use crate::error::ServiceError;
use crate::key::{Key, KeyInternal};
use crate::misc::check_weight_result; use crate::misc::check_weight_result;
use crate::results::{DhtResult, GetKeyMetadataResult, RegisterKeyResult}; use crate::results::{DhtResult, GetRouteMetadataResult, RegisterRouteResult};
use crate::route::{Route, RouteInternal};
use crate::storage_impl::get_storage; use crate::storage_impl::get_storage;
use crate::tetraplets_checkers::{check_timestamp_tetraplets, check_weight_tetraplets}; use crate::tetraplets_checkers::{check_timestamp_tetraplets, check_weight_tetraplets};
use crate::{wrapped_try, WeightResult}; use crate::{wrapped_try, WeightResult};
use marine_rs_sdk::marine; use marine_rs_sdk::marine;
#[marine] #[marine]
pub fn get_key_bytes( pub fn get_route_bytes(
label: String, label: String,
mut peer_id: Vec<String>, mut peer_id: Vec<String>,
timestamp_created: u64, timestamp_created: u64,
challenge: Vec<u8>, challenge: Vec<u8>,
challenge_type: String, challenge_type: String,
) -> Vec<u8> { ) -> Vec<u8> {
Key { Route {
label, label,
peer_id: peer_id peer_id: peer_id
.pop() .pop()
@ -44,14 +44,14 @@ pub fn get_key_bytes(
} }
#[marine] #[marine]
pub fn get_key_id(label: String, peer_id: String) -> String { pub fn get_route_id(label: String, peer_id: String) -> String {
Key::get_key_id(&label, &peer_id) Route::get_id(&label, &peer_id)
} }
/// register new key if not exists with caller peer_id, update if exists with same peer_id or return error /// register new route if not exists with caller peer_id, update if exists with same peer_id or return error
#[marine] #[marine]
pub fn register_key( pub fn register_route(
key: String, label: String,
peer_id: Vec<String>, peer_id: Vec<String>,
timestamp_created: u64, timestamp_created: u64,
challenge: Vec<u8>, challenge: Vec<u8>,
@ -60,7 +60,7 @@ pub fn register_key(
pin: bool, pin: bool,
weight: WeightResult, weight: WeightResult,
current_timestamp_sec: u64, current_timestamp_sec: u64,
) -> RegisterKeyResult { ) -> RegisterRouteResult {
wrapped_try(|| { wrapped_try(|| {
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_weight_tetraplets(&call_parameters, 7, 0)?; check_weight_tetraplets(&call_parameters, 7, 0)?;
@ -70,68 +70,72 @@ pub fn register_key(
.unwrap_or(&call_parameters.init_peer_id) .unwrap_or(&call_parameters.init_peer_id)
.clone(); .clone();
check_weight_result(&peer_id, &weight)?; check_weight_result(&peer_id, &weight)?;
let key = Key::new( let route = Route::new(
key, label,
peer_id, peer_id,
timestamp_created, timestamp_created,
challenge, challenge,
challenge_type, challenge_type,
signature, signature,
); );
key.verify(current_timestamp_sec)?; route.verify(current_timestamp_sec)?;
let key_id = key.key_id.clone(); let route_id = route.id.clone();
let weight = weight.weight; let weight = weight.weight;
let storage = get_storage()?; let storage = get_storage()?;
storage.update_key_timestamp(&key.key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route.id, current_timestamp_sec)?;
storage.update_key(KeyInternal { storage.update_route(RouteInternal {
key, route,
timestamp_published: 0, timestamp_published: 0,
pinned: pin, pinned: pin,
weight, weight,
})?; })?;
Ok(key_id) Ok(route_id)
}) })
.into() .into()
} }
#[marine] #[marine]
pub fn get_key_metadata(key_id: String, current_timestamp_sec: u64) -> GetKeyMetadataResult { pub fn get_route_metadata(route_id: String, current_timestamp_sec: u64) -> GetRouteMetadataResult {
wrapped_try(|| { wrapped_try(|| {
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_timestamp_tetraplets(&call_parameters, 1)?; check_timestamp_tetraplets(&call_parameters, 1)?;
let storage = get_storage()?; let storage = get_storage()?;
storage.update_key_timestamp(&key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route_id, current_timestamp_sec)?;
storage.get_key(key_id) storage.get_route(route_id)
}) })
.into() .into()
} }
/// Used for replication, same as register_key, but key.pinned is ignored, updates timestamp_accessed /// Used for replication, same as register_route, but route.pinned is ignored, updates timestamp_accessed
#[marine] #[marine]
pub fn republish_key(mut key: Key, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult { pub fn republish_route(
mut route: Route,
weight: WeightResult,
current_timestamp_sec: u64,
) -> DhtResult {
wrapped_try(|| { wrapped_try(|| {
let call_parameters = marine_rs_sdk::get_call_parameters(); let call_parameters = marine_rs_sdk::get_call_parameters();
check_weight_tetraplets(&call_parameters, 1, 0)?; check_weight_tetraplets(&call_parameters, 1, 0)?;
check_weight_result(&key.peer_id, &weight)?; check_weight_result(&route.peer_id, &weight)?;
check_timestamp_tetraplets(&call_parameters, 2)?; check_timestamp_tetraplets(&call_parameters, 2)?;
key.verify(current_timestamp_sec)?; route.verify(current_timestamp_sec)?;
// just to be sure // just to be sure
key.key_id = Key::get_key_id(&key.label, &key.peer_id); route.id = Route::get_id(&route.label, &route.peer_id);
let storage = get_storage()?; let storage = get_storage()?;
storage.update_key_timestamp(&key.key_id, current_timestamp_sec)?; storage.update_route_timestamp(&route.id, current_timestamp_sec)?;
match storage.update_key(KeyInternal { match storage.update_route(RouteInternal {
key, route,
timestamp_published: 0, timestamp_published: 0,
pinned: false, pinned: false,
weight: weight.weight, weight: weight.weight,
}) { }) {
// we should ignore this error for republish // we should ignore this error for republish
Err(ServiceError::KeyAlreadyExistsNewerTimestamp(_, _)) => Ok(()), Err(ServiceError::RouteAlreadyExistsNewerTimestamp(_, _)) => Ok(()),
other => other, other => other,
} }
}) })

View File

@ -0,0 +1,239 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::defaults::{ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME};
use crate::error::ServiceError;
use crate::error::ServiceError::{InternalError, RouteNotExists};
use crate::route::{Route, RouteInternal};
use crate::storage_impl::Storage;
use marine_sqlite_connector::{State, Statement, Value};
impl Storage {
pub fn create_route_tables(&self) -> bool {
self.connection
.execute(f!("
CREATE TABLE IF NOT EXISTS {ROUTES_TABLE_NAME} (
route_id TEXT PRIMARY KEY,
label TEXT,
peer_id TEXT,
timestamp_created INTEGER,
challenge BLOB,
challenge_type TEXT,
signature BLOB NOT NULL,
timestamp_published INTEGER,
pinned INTEGER,
weight INTEGER
);
"))
.is_ok()
&& self
.connection
.execute(f!("
CREATE TABLE IF NOT EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME} (
route_id TEXT PRIMARY KEY,
timestamp_accessed INTEGER
);
"))
.is_ok()
}
pub fn update_route_timestamp(
&self,
route_id: &str,
current_timestamp_sec: u64,
) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!("
INSERT OR REPLACE INTO {ROUTES_TIMESTAMPS_TABLE_NAME} VALUES (?, ?);
"))?;
statement.bind(1, &Value::String(route_id.to_string()))?;
statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?;
statement.next()?;
Ok(())
}
pub fn get_route(&self, route_id: String) -> Result<Route, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \
FROM {ROUTES_TABLE_NAME} WHERE route_id = ?"
))?;
statement.bind(1, &Value::String(route_id.clone()))?;
if let State::Row = statement.next()? {
read_route(&statement)
} else {
Err(RouteNotExists(route_id))
}
}
pub fn write_route(&self, route: RouteInternal) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!("
INSERT OR REPLACE INTO {ROUTES_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"))?;
let pinned = if route.pinned { 1 } else { 0 } as i64;
statement.bind(1, &Value::String(route.route.id))?;
statement.bind(2, &Value::String(route.route.label))?;
statement.bind(3, &Value::String(route.route.peer_id))?;
statement.bind(4, &Value::Integer(route.route.timestamp_created as i64))?;
statement.bind(5, &Value::Binary(route.route.challenge))?;
statement.bind(6, &Value::String(route.route.challenge_type))?;
statement.bind(7, &Value::Binary(route.route.signature))?;
statement.bind(8, &Value::Integer(route.timestamp_published as i64))?;
statement.bind(9, &Value::Integer(pinned))?;
statement.bind(10, &Value::Integer(route.weight as i64))?;
statement.next()?;
Ok(())
}
pub fn update_route(&self, route: RouteInternal) -> Result<(), ServiceError> {
if let Ok(existing_route) = self.get_route(route.route.id.clone()) {
if existing_route.timestamp_created > route.route.timestamp_created {
return Err(ServiceError::RouteAlreadyExistsNewerTimestamp(
route.route.label,
route.route.peer_id,
));
}
}
self.write_route(route)
}
pub fn check_route_existence(&self, route_id: &str) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT EXISTS(SELECT 1 FROM {ROUTES_TABLE_NAME} WHERE route_id = ? LIMIT 1)"
))?;
statement.bind(1, &Value::String(route_id.to_string()))?;
if let State::Row = statement.next()? {
let exists = statement.read::<i64>(0)?;
if exists == 1 {
Ok(())
} else {
Err(RouteNotExists(route_id.to_string()))
}
} else {
Err(InternalError(
"EXISTS should always return something".to_string(),
))
}
}
pub fn get_stale_routes(
&self,
stale_timestamp: u64,
) -> Result<Vec<RouteInternal>, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, pinned, weight \
FROM {ROUTES_TABLE_NAME} WHERE timestamp_published <= ?"
))?;
statement.bind(1, &Value::Integer(stale_timestamp as i64))?;
let mut stale_keys: Vec<RouteInternal> = vec![];
while let State::Row = statement.next()? {
stale_keys.push(read_internal_route(&statement)?);
}
Ok(stale_keys)
}
pub fn delete_key(&self, route_id: String) -> Result<(), ServiceError> {
let mut statement = self
.connection
.prepare(f!("DELETE FROM {ROUTES_TABLE_NAME} WHERE route_id=?"))?;
statement.bind(1, &Value::String(route_id.clone()))?;
statement.next().map(drop)?;
if self.connection.changes() == 1 {
Ok(())
} else {
Err(RouteNotExists(route_id))
}
}
/// not pinned only
pub fn get_expired_routes(&self, expired_timestamp: u64) -> Result<Vec<Route>, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \
FROM {ROUTES_TABLE_NAME} WHERE timestamp_created <= ? and pinned != 1"
))?;
statement.bind(1, &Value::Integer(expired_timestamp as i64))?;
let mut expired_routes: Vec<Route> = vec![];
while let State::Row = statement.next()? {
let route = read_route(&statement)?;
let timestamp_accessed = self.get_route_timestamp_accessed(&route.id)?;
let with_host_records = self.get_host_records_count_by_key(route.id.clone())? != 0;
if timestamp_accessed <= expired_timestamp && !with_host_records {
expired_routes.push(route);
}
}
Ok(expired_routes)
}
pub fn get_route_timestamp_accessed(&self, route_id: &str) -> Result<u64, ServiceError> {
let mut statement = self.connection.prepare(f!(
"SELECT timestamp_accessed FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE route_id = ?"
))?;
statement.bind(1, &Value::String(route_id.to_string()))?;
if let State::Row = statement.next()? {
statement
.read::<i64>(0)
.map(|t| t as u64)
.map_err(ServiceError::SqliteError)
} else {
Err(RouteNotExists(route_id.to_string()))
}
}
pub fn clear_expired_timestamps_accessed(
&self,
expired_timestamp: u64,
) -> Result<(), ServiceError> {
let mut statement = self.connection.prepare(f!(
"DELETE FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE timestamp_accessed < ?"
))?;
statement.bind(1, &Value::Integer(expired_timestamp as i64))?;
statement.next().map(drop)?;
Ok(())
}
}
pub fn read_route(statement: &Statement) -> Result<Route, ServiceError> {
Ok(Route {
id: statement.read::<String>(0)?,
label: statement.read::<String>(1)?,
peer_id: statement.read::<String>(2)?,
timestamp_created: statement.read::<i64>(3)? as u64,
challenge: statement.read::<Vec<u8>>(4)?,
challenge_type: statement.read::<String>(5)?,
signature: statement.read::<Vec<u8>>(6)?,
})
}
pub fn read_internal_route(statement: &Statement) -> Result<RouteInternal, ServiceError> {
Ok(RouteInternal {
route: read_route(statement)?,
timestamp_published: statement.read::<i64>(7)? as u64,
pinned: statement.read::<i64>(8)? != 0,
weight: statement.read::<i64>(9)? as u32,
})
}

View File

@ -59,14 +59,15 @@ impl Storage {
// delete expired non-host records // delete expired non-host records
deleted_values += self.clear_expired_records(expired_timestamp)?; deleted_values += self.clear_expired_records(expired_timestamp)?;
let expired_keys = self.get_expired_keys(expired_timestamp)?; let expired_keys = self.get_expired_routes(expired_timestamp)?;
for key in expired_keys { for key in expired_keys {
self.delete_key(key.key_id)?; self.delete_key(key.id)?;
deleted_keys += self.connection.changes() as u64; deleted_keys += self.connection.changes() as u64;
} }
// TODO: clear expired timestamp accessed for keys self.clear_expired_timestamps_accessed(expired_timestamp)?;
Ok((deleted_keys, deleted_values)) Ok((deleted_keys, deleted_values))
} }
@ -79,30 +80,30 @@ impl Storage {
) -> Result<Vec<EvictStaleItem>, ServiceError> { ) -> Result<Vec<EvictStaleItem>, ServiceError> {
let stale_timestamp = current_timestamp_sec - load_config().stale_timeout; let stale_timestamp = current_timestamp_sec - load_config().stale_timeout;
let stale_keys = self.get_stale_keys(stale_timestamp)?; let stale_keys = self.get_stale_routes(stale_timestamp)?;
let mut key_to_delete: Vec<String> = vec![]; let mut key_to_delete: Vec<String> = vec![];
let mut results: Vec<EvictStaleItem> = vec![]; let mut results: Vec<EvictStaleItem> = vec![];
let host_id = marine_rs_sdk::get_call_parameters().host_id; let host_id = marine_rs_sdk::get_call_parameters().host_id;
for key in stale_keys.into_iter() { for route in stale_keys.into_iter() {
let records: Vec<Record> = self let records: Vec<Record> = self
.get_records(key.key.key_id.clone())? .get_records(route.route.id.clone())?
.into_iter() .into_iter()
.map(|r| r.record) .map(|r| r.record)
.collect(); .collect();
if !key.pinned && !records.iter().any(|r| r.peer_id == host_id) { if !route.pinned && !records.iter().any(|r| r.peer_id == host_id) {
key_to_delete.push(key.key.key_id.clone()); key_to_delete.push(route.route.id.clone());
} }
results.push(EvictStaleItem { results.push(EvictStaleItem {
key: key.key, route: route.route,
records, records,
}); });
} }
for key_id in key_to_delete { for route_id in key_to_delete {
self.delete_key(key_id.clone())?; self.delete_key(route_id.clone())?;
self.delete_records_by_key(key_id)?; self.delete_records_by_key(route_id)?;
} }
Ok(results) Ok(results)

View File

@ -24,21 +24,23 @@ mod tests {
use marine_test_env::registry::{DhtResult, Record, ServiceInterface}; use marine_test_env::registry::{DhtResult, Record, ServiceInterface};
use crate::defaults::{ use crate::defaults::{
CONFIG_FILE, DB_PATH, DEFAULT_STALE_VALUE_AGE, KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME, CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME,
RECORDS_TABLE_NAME, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID,
TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID, VALUES_LIMIT, TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID,
}; };
use crate::error::ServiceError::{ use crate::error::ServiceError::{
InvalidKeyTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId, InvalidRouteTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId,
KeyAlreadyExistsNewerTimestamp, RouteAlreadyExistsNewerTimestamp,
};
use crate::tests::tests::marine_test_env::registry::{
RegisterRouteResult, Route, WeightResult,
}; };
use crate::tests::tests::marine_test_env::registry::{Key, RegisterKeyResult, WeightResult};
const HOST_ID: &str = "some_host_id"; const HOST_ID: &str = "some_host_id";
impl PartialEq for Key { impl PartialEq for Route {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.key_id == other.key_id self.id == other.id
&& self.label == other.label && self.label == other.label
&& self.timestamp_created == other.timestamp_created && self.timestamp_created == other.timestamp_created
&& self.signature == other.signature && self.signature == other.signature
@ -46,17 +48,17 @@ mod tests {
} }
} }
impl Eq for Key {} impl Eq for Route {}
fn clear_env() { fn clear_env() {
let connection = Connection::open(DB_PATH).unwrap(); let connection = Connection::open(DB_PATH).unwrap();
connection connection
.execute(f!("DROP TABLE IF EXISTS {KEYS_TABLE_NAME}").as_str(), []) .execute(f!("DROP TABLE IF EXISTS {ROUTES_TABLE_NAME}").as_str(), [])
.unwrap(); .unwrap();
connection connection
.execute( .execute(
f!("DROP TABLE IF EXISTS {KEYS_TIMESTAMPS_TABLE_NAME}").as_str(), f!("DROP TABLE IF EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME}").as_str(),
[], [],
) )
.unwrap(); .unwrap();
@ -144,7 +146,7 @@ mod tests {
} }
} }
fn get_signed_key_bytes( fn get_signed_route_bytes(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
label: String, label: String,
@ -153,17 +155,17 @@ mod tests {
challenge_type: String, challenge_type: String,
) -> Vec<u8> { ) -> Vec<u8> {
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let key_bytes = registry.get_key_bytes( let route_bytes = registry.get_route_bytes(
label.clone(), label.clone(),
vec![issuer_peer_id.clone()], vec![issuer_peer_id.clone()],
timestamp_created, timestamp_created,
challenge, challenge,
challenge_type, challenge_type,
); );
kp.sign(&key_bytes).unwrap().to_vec().to_vec() kp.sign(&route_bytes).unwrap().to_vec().to_vec()
} }
fn register_key( fn register_route(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
label: String, label: String,
@ -171,11 +173,11 @@ mod tests {
current_timestamp: u64, current_timestamp: u64,
pin: bool, pin: bool,
weight: u32, weight: u32,
) -> RegisterKeyResult { ) -> RegisterRouteResult {
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let challenge = vec![]; let challenge = vec![];
let challenge_type = "".to_string(); let challenge_type = "".to_string();
let signature = get_signed_key_bytes( let signature = get_signed_route_bytes(
registry, registry,
kp, kp,
label.clone(), label.clone(),
@ -187,7 +189,7 @@ mod tests {
.add_weight_tetraplets(7) .add_weight_tetraplets(7)
.add_timestamp_tetraplets(8); .add_timestamp_tetraplets(8);
let weight = get_weight(issuer_peer_id.clone(), weight); let weight = get_weight(issuer_peer_id.clone(), weight);
registry.register_key_cp( registry.register_route_cp(
label, label,
vec![issuer_peer_id], vec![issuer_peer_id],
timestamp_created, timestamp_created,
@ -201,43 +203,43 @@ mod tests {
) )
} }
fn register_key_checked( fn register_route_checked(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
key: String, route: String,
timestamp_created: u64, timestamp_created: u64,
current_timestamp: u64, current_timestamp: u64,
pin: bool, pin: bool,
weight: u32, weight: u32,
) -> String { ) -> String {
let result = register_key( let result = register_route(
registry, registry,
kp, kp,
key, route,
timestamp_created, timestamp_created,
current_timestamp, current_timestamp,
pin, pin,
weight, weight,
); );
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
result.key_id result.route_id
} }
fn get_key_metadata( fn get_route_metadata(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
key_id: String, route_id: String,
current_timestamp: u64, current_timestamp: u64,
) -> Key { ) -> Route {
let cp = CPWrapper::new("peer_id").add_timestamp_tetraplets(1); let cp = CPWrapper::new("peer_id").add_timestamp_tetraplets(1);
let result = registry.get_key_metadata_cp(key_id, current_timestamp, cp.get()); let result = registry.get_route_metadata_cp(route_id, current_timestamp, cp.get());
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
result.key result.route
} }
fn get_signed_record_bytes( fn get_signed_record_bytes(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -245,9 +247,9 @@ mod tests {
solution: Vec<u8>, solution: Vec<u8>,
) -> Vec<u8> { ) -> Vec<u8> {
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let mut cp = CPWrapper::new(&issuer_peer_id); let cp = CPWrapper::new(&issuer_peer_id);
let record_bytes = registry.get_record_bytes_cp( let record_bytes = registry.get_record_bytes_cp(
key_id, route_id,
value, value,
relay_id, relay_id,
service_id, service_id,
@ -262,7 +264,7 @@ mod tests {
fn put_record( fn put_record(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -276,7 +278,7 @@ mod tests {
let signature = get_signed_record_bytes( let signature = get_signed_record_bytes(
registry, registry,
kp, kp,
key_id.clone(), route_id.clone(),
value.clone(), value.clone(),
relay_id.clone(), relay_id.clone(),
service_id.clone(), service_id.clone(),
@ -284,12 +286,12 @@ mod tests {
solution.clone(), solution.clone(),
); );
let mut cp = CPWrapper::new(&issuer_peer_id) let cp = CPWrapper::new(&issuer_peer_id)
.add_weight_tetraplets(7) .add_weight_tetraplets(7)
.add_timestamp_tetraplets(8); .add_timestamp_tetraplets(8);
let weight = get_weight(issuer_peer_id.clone(), weight); let weight = get_weight(issuer_peer_id.clone(), weight);
registry.put_record_cp( registry.put_record_cp(
key_id, route_id,
value, value,
relay_id, relay_id,
service_id, service_id,
@ -305,7 +307,7 @@ mod tests {
fn put_record_checked( fn put_record_checked(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
kp: &KeyPair, kp: &KeyPair,
key_id: String, route_id: String,
value: String, value: String,
relay_id: Vec<String>, relay_id: Vec<String>,
service_id: Vec<String>, service_id: Vec<String>,
@ -316,7 +318,7 @@ mod tests {
let result = put_record( let result = put_record(
registry, registry,
kp, kp,
key_id, route_id,
value, value,
relay_id, relay_id,
service_id, service_id,
@ -329,33 +331,30 @@ mod tests {
fn get_records( fn get_records(
registry: &mut ServiceInterface, registry: &mut ServiceInterface,
key_id: String, route_id: String,
current_timestamp: u64, current_timestamp: u64,
) -> Vec<Record> { ) -> Vec<Record> {
let cp = CPWrapper::new("some_peer_id").add_timestamp_tetraplets(1); let cp = CPWrapper::new("some_peer_id").add_timestamp_tetraplets(1);
let result = registry.get_records_cp(key_id, current_timestamp, cp.get()); let result = registry.get_records_cp(route_id, current_timestamp, cp.get());
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
result.result result.result
} }
#[test] #[test]
fn register_key_invalid_signature() { fn register_route_invalid_signature() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let mut cp = CPWrapper::new(&issuer_peer_id); let mut cp = CPWrapper::new(&issuer_peer_id);
let key = "some_key".to_string();
let timestamp_created = 0u64;
let current_timestamp = 100u64;
let weight = get_weight(issuer_peer_id.clone(), 0); let weight = get_weight(issuer_peer_id.clone(), 0);
let invalid_signature = vec![]; let invalid_signature = vec![];
cp = cp.add_weight_tetraplets(5).add_timestamp_tetraplets(6); cp = cp.add_weight_tetraplets(5).add_timestamp_tetraplets(6);
let reg_key_result = registry.register_key_cp( let reg_route_result = registry.register_route_cp(
"some_key".to_string(), "some_route".to_string(),
vec![], vec![],
100u64, 100u64,
vec![], vec![],
@ -366,24 +365,24 @@ mod tests {
10u64, 10u64,
cp.get(), cp.get(),
); );
assert!(!reg_key_result.success); assert!(!reg_route_result.success);
} }
#[test] #[test]
fn register_key_invalid_weight_tetraplet() { fn register_route_invalid_weight_tetraplet() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let mut cp = CPWrapper::new(&issuer_peer_id); let mut cp = CPWrapper::new(&issuer_peer_id);
let label = "some_key".to_string(); let label = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let challenge = vec![]; let challenge = vec![];
let challenge_type = "".to_string(); let challenge_type = "".to_string();
let weight = get_weight(issuer_peer_id.clone(), 0); let weight = get_weight(issuer_peer_id.clone(), 0);
let signature = get_signed_key_bytes( let signature = get_signed_route_bytes(
&mut registry, &mut registry,
&kp, &kp,
label.clone(), label.clone(),
@ -393,7 +392,7 @@ mod tests {
); );
cp = cp.add_timestamp_tetraplets(8); cp = cp.add_timestamp_tetraplets(8);
let reg_key_result = registry.register_key_cp( let reg_route_result = registry.register_route_cp(
label, label,
vec![], vec![],
timestamp_created, timestamp_created,
@ -405,23 +404,23 @@ mod tests {
current_timestamp, current_timestamp,
cp.get(), cp.get(),
); );
assert!(!reg_key_result.success); assert!(!reg_route_result.success);
} }
#[test] #[test]
fn register_key_missing_timestamp_tetraplet() { fn register_route_missing_timestamp_tetraplet() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let label = "some_key".to_string(); let label = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = get_weight(issuer_peer_id.clone(), 0); let weight = get_weight(issuer_peer_id.clone(), 0);
let challenge = vec![1u8, 2u8, 3u8]; let challenge = vec![1u8, 2u8, 3u8];
let challenge_type = "type".to_string(); let challenge_type = "type".to_string();
let signature = get_signed_key_bytes( let signature = get_signed_route_bytes(
&mut registry, &mut registry,
&kp, &kp,
label.clone(), label.clone(),
@ -431,7 +430,7 @@ mod tests {
); );
let cp = CPWrapper::new(&issuer_peer_id).add_weight_tetraplets(7); let cp = CPWrapper::new(&issuer_peer_id).add_weight_tetraplets(7);
let reg_key_result = registry.register_key_cp( let reg_route_result = registry.register_route_cp(
label, label,
vec![], vec![],
timestamp_created, timestamp_created,
@ -443,29 +442,29 @@ mod tests {
current_timestamp, current_timestamp,
cp.get(), cp.get(),
); );
assert!(!reg_key_result.success); assert!(!reg_route_result.success);
assert_eq!( assert_eq!(
reg_key_result.error, reg_route_result.error,
InvalidTimestampTetraplet(format!("{:?}", cp.cp.tetraplets)).to_string() InvalidTimestampTetraplet(format!("{:?}", cp.cp.tetraplets)).to_string()
); );
} }
#[test] #[test]
fn register_key_invalid_weight_peer_id() { fn register_route_invalid_weight_peer_id() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let invalid_peer_id = "INVALID_PEER_ID".to_string(); let invalid_peer_id = "INVALID_PEER_ID".to_string();
let mut cp = CPWrapper::new(&issuer_peer_id); let mut cp = CPWrapper::new(&issuer_peer_id);
let label = "some_key".to_string(); let label = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let challenge = vec![1u8, 2u8, 3u8]; let challenge = vec![1u8, 2u8, 3u8];
let challenge_type = "type".to_string(); let challenge_type = "type".to_string();
let weight = get_weight(invalid_peer_id.clone(), 0); let weight = get_weight(invalid_peer_id.clone(), 0);
let signature = get_signed_key_bytes( let signature = get_signed_route_bytes(
&mut registry, &mut registry,
&kp, &kp,
label.clone(), label.clone(),
@ -475,7 +474,7 @@ mod tests {
); );
cp = cp.add_weight_tetraplets(7).add_timestamp_tetraplets(8); cp = cp.add_weight_tetraplets(7).add_timestamp_tetraplets(8);
let reg_key_result = registry.register_key_cp( let reg_route_result = registry.register_route_cp(
label, label,
vec![], vec![],
timestamp_created, timestamp_created,
@ -487,28 +486,28 @@ mod tests {
current_timestamp, current_timestamp,
cp.get(), cp.get(),
); );
assert!(!reg_key_result.success); assert!(!reg_route_result.success);
assert_eq!( assert_eq!(
reg_key_result.error, reg_route_result.error,
InvalidWeightPeerId(issuer_peer_id, invalid_peer_id).to_string() InvalidWeightPeerId(issuer_peer_id, invalid_peer_id).to_string()
); );
} }
#[test] #[test]
fn register_key_correct() { fn register_route_correct() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let key = "some_key".to_string(); let route = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
let result = register_key( let result = register_route(
&mut registry, &mut registry,
&kp, &kp,
key, route,
timestamp_created, timestamp_created,
current_timestamp, current_timestamp,
pin, pin,
@ -519,20 +518,20 @@ mod tests {
} }
#[test] #[test]
fn register_key_older_timestamp() { fn register_route_older_timestamp() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let key = "some_key".to_string(); let route = "some_route".to_string();
let timestamp_created_first = 100u64; let timestamp_created_first = 100u64;
let current_timestamp = 1000u64; let current_timestamp = 1000u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
register_key_checked( register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
key.clone(), route.clone(),
timestamp_created_first, timestamp_created_first,
current_timestamp, current_timestamp,
pin, pin,
@ -540,10 +539,10 @@ mod tests {
); );
let timestamp_created_second = timestamp_created_first - 10u64; let timestamp_created_second = timestamp_created_first - 10u64;
let result_second = register_key( let result_second = register_route(
&mut registry, &mut registry,
&kp, &kp,
key.clone(), route.clone(),
timestamp_created_second, timestamp_created_second,
current_timestamp, current_timestamp,
pin, pin,
@ -552,89 +551,89 @@ mod tests {
assert_eq!( assert_eq!(
result_second.error, result_second.error,
KeyAlreadyExistsNewerTimestamp(key, kp.get_peer_id().to_base58()).to_string() RouteAlreadyExistsNewerTimestamp(route, kp.get_peer_id().to_base58()).to_string()
); );
} }
#[test] #[test]
fn register_key_in_the_future() { fn register_route_in_the_future() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let key = "some_key".to_string(); let route = "some_route".to_string();
let current_timestamp = 100u64; let current_timestamp = 100u64;
let timestamp_created = current_timestamp + 100u64; let timestamp_created = current_timestamp + 100u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
let result = register_key( let result = register_route(
&mut registry, &mut registry,
&kp, &kp,
key, route,
timestamp_created, timestamp_created,
current_timestamp, current_timestamp,
pin, pin,
weight, weight,
); );
assert_eq!(result.error, InvalidKeyTimestamp.to_string()) assert_eq!(result.error, InvalidRouteTimestamp.to_string())
} }
#[test] #[test]
fn register_key_update_republish_old() { fn register_route_update_republish_old() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let key = "some_key".to_string(); let route = "some_route".to_string();
let timestamp_created_old = 0u64; let timestamp_created_old = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
let key_id = register_key_checked( let route_id = register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
key.clone(), route.clone(),
timestamp_created_old, timestamp_created_old,
current_timestamp, current_timestamp,
pin, pin,
weight, weight,
); );
let old_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let old_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp);
let timestamp_created_new = timestamp_created_old + 10u64; let timestamp_created_new = timestamp_created_old + 10u64;
register_key_checked( register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
key, route,
timestamp_created_new, timestamp_created_new,
current_timestamp, current_timestamp,
pin, pin,
weight, weight,
); );
let new_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let new_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp);
assert_ne!(old_key, new_key); assert_ne!(old_route, new_route);
let cp = CPWrapper::new(&issuer_peer_id) let cp = CPWrapper::new(&issuer_peer_id)
.add_weight_tetraplets(1) .add_weight_tetraplets(1)
.add_timestamp_tetraplets(2); .add_timestamp_tetraplets(2);
let weight = get_weight(issuer_peer_id.clone(), weight); let weight = get_weight(issuer_peer_id.clone(), weight);
let result = let result =
registry.republish_key_cp(old_key.clone(), weight, current_timestamp, cp.get()); registry.republish_route_cp(old_route.clone(), weight, current_timestamp, cp.get());
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp);
assert_eq!(new_key, result_key); assert_eq!(new_route, result_route);
} }
#[test] #[test]
fn get_key_metadata_test() { fn get_route_metadata_test() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let label = "some_key".to_string(); let label = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = 0; let weight = 0;
@ -643,16 +642,16 @@ mod tests {
let challenge_type = "".to_string(); let challenge_type = "".to_string();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let key_bytes = registry.get_key_bytes( let route_bytes = registry.get_route_bytes(
label.clone(), label.clone(),
vec![issuer_peer_id.clone()], vec![issuer_peer_id.clone()],
timestamp_created, timestamp_created,
challenge.clone(), challenge.clone(),
challenge_type.clone(), challenge_type.clone(),
); );
let signature = kp.sign(&key_bytes).unwrap().to_vec().to_vec(); let signature = kp.sign(&route_bytes).unwrap().to_vec().to_vec();
let key_id = register_key_checked( let route_id = register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
label.clone(), label.clone(),
@ -662,9 +661,9 @@ mod tests {
weight, weight,
); );
let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp);
let expected_key = Key { let expected_route = Route {
key_id, id: route_id,
label, label,
peer_id: issuer_peer_id, peer_id: issuer_peer_id,
timestamp_created, timestamp_created,
@ -672,38 +671,38 @@ mod tests {
challenge_type, challenge_type,
signature, signature,
}; };
assert_eq!(result_key, expected_key); assert_eq!(result_route, expected_route);
} }
#[test] #[test]
fn republish_same_key_test() { fn republish_same_route_test() {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let issuer_peer_id = kp.get_peer_id().to_base58(); let issuer_peer_id = kp.get_peer_id().to_base58();
let key = "some_key".to_string(); let route = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
let key_id = register_key_checked( let route_id = register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
key.clone(), route.clone(),
timestamp_created, timestamp_created,
current_timestamp, current_timestamp,
pin, pin,
weight, weight,
); );
let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp);
let cp = CPWrapper::new(&issuer_peer_id) let cp = CPWrapper::new(&issuer_peer_id)
.add_weight_tetraplets(1) .add_weight_tetraplets(1)
.add_timestamp_tetraplets(2); .add_timestamp_tetraplets(2);
let weight = get_weight(issuer_peer_id.clone(), weight); let weight = get_weight(issuer_peer_id.clone(), weight);
let result = let result =
registry.republish_key_cp(result_key.clone(), weight, current_timestamp, cp.get()); registry.republish_route_cp(result_route.clone(), weight, current_timestamp, cp.get());
assert!(result.success, "{}", result.error); assert!(result.success, "{}", result.error);
} }
@ -712,16 +711,16 @@ mod tests {
clear_env(); clear_env();
let mut registry = ServiceInterface::new(); let mut registry = ServiceInterface::new();
let kp = KeyPair::generate_ed25519(); let kp = KeyPair::generate_ed25519();
let key = "some_key".to_string(); let route = "some_route".to_string();
let timestamp_created = 0u64; let timestamp_created = 0u64;
let current_timestamp = 100u64; let current_timestamp = 100u64;
let weight = 0; let weight = 0;
let pin = false; let pin = false;
let key_id = register_key_checked( let route_id = register_route_checked(
&mut registry, &mut registry,
&kp, &kp,
key, route,
timestamp_created, timestamp_created,
current_timestamp, current_timestamp,
pin, pin,
@ -735,7 +734,7 @@ mod tests {
put_record_checked( put_record_checked(
&mut registry, &mut registry,
&kp, &kp,
key_id.clone(), route_id.clone(),
value.clone(), value.clone(),
relay_id.clone(), relay_id.clone(),
service_id.clone(), service_id.clone(),
@ -744,10 +743,10 @@ mod tests {
weight, weight,
); );
let records = get_records(&mut registry, key_id.clone(), current_timestamp); let records = get_records(&mut registry, route_id.clone(), current_timestamp);
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
let record = &records[0]; let record = &records[0];
assert_eq!(record.key_id, key_id); assert_eq!(record.route_id, route_id);
assert_eq!(record.relay_id, relay_id); assert_eq!(record.relay_id, relay_id);
assert_eq!(record.service_id, service_id); assert_eq!(record.service_id, service_id);
assert_eq!(record.peer_id, kp.get_peer_id().to_base58()); assert_eq!(record.peer_id, kp.get_peer_id().to_base58());