From 1c5ec8754d7378f1cd23a1dd606603f04d39f222 Mon Sep 17 00:00:00 2001 From: Aleksey Proshutisnkiy Date: Mon, 11 Apr 2022 16:24:51 +0400 Subject: [PATCH] Add resources-api, update terminology, remove pinned field from keys (#99) --- README.md | 4 +- aqua/registry-api.aqua | 68 ++--- aqua/registry-scheduled-scripts.aqua | 2 +- aqua/registry.aqua | 103 ------- aqua/resources-api.aqua | 330 +++++++++++++++++++++++ aqua/routing.aqua | 217 --------------- example/src/aqua/event_example.aqua | 8 +- example/src/aqua/export.aqua | 4 +- example/src/example.ts | 23 +- service/Cargo.lock | 80 +++--- service/build.rs | 2 +- service/build.sh | 2 +- service/src/defaults.rs | 8 +- service/src/error.rs | 16 +- service/src/{route.rs => key.rs} | 27 +- service/src/{route_api.rs => key_api.rs} | 84 +++--- service/src/key_storage_impl.rs | 231 ++++++++++++++++ service/src/main.rs | 8 +- service/src/record.rs | 6 +- service/src/record_api.rs | 60 ++--- service/src/record_storage_impl.rs | 63 +++-- service/src/results.rs | 56 ++-- service/src/route_storage_impl.rs | 239 ---------------- service/src/storage_impl.rs | 22 +- service/src/tests/mod.rs | 272 +++++++++---------- 25 files changed, 955 insertions(+), 980 deletions(-) delete mode 100644 aqua/registry.aqua create mode 100644 aqua/resources-api.aqua delete mode 100644 aqua/routing.aqua rename service/src/{route.rs => key.rs} (82%) rename service/src/{route_api.rs => key_api.rs} (60%) create mode 100644 service/src/key_storage_impl.rs delete mode 100644 service/src/route_storage_impl.rs diff --git a/README.md b/README.md index b4b4bdb..8d65091 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [Distributed Hash Table](https://en.wikipedia.org/wiki/Distributed_hash_table) (DHT) implementation for the Fluence network with an Aqua interface. ## Documentation -See [Aqua Book](https://fluence.dev/aqua-book/libraries/aqua-dht). +See [Aqua Book](https://fluence.dev/aqua-book/libraries/registry). ## How to Use @@ -13,7 +13,7 @@ See [example](./example): ## API -API is defined in the [routing.aqua](./aqua/routing.aqua) module. +API is defined in the [resources-api.aqua](./aqua/resources-api.aqua) module. ## Learn Aqua diff --git a/aqua/registry-api.aqua b/aqua/registry-api.aqua index 417ee44..6d65788 100644 --- a/aqua/registry-api.aqua +++ b/aqua/registry-api.aqua @@ -1,56 +1,56 @@ -import "registry.aqua" -import PeerId, Peer, Sig from "@fluencelabs/aqua-lib/builtin.aqua" +import "registry-service.aqua" +import PeerId, Peer, Sig, SignResult from "@fluencelabs/aqua-lib/builtin.aqua" import "@fluencelabs/trust-graph/trust-graph.aqua" -func getRouteSignature(label: string, timestamp_created: u64) -> []u8: - on HOST_PEER_ID: - bytes <- Registry.get_route_bytes(label, nil, timestamp_created, nil, "") - signature <- Sig.sign(bytes) - <- signature.signature! - -func getRecordSignature(route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: - on HOST_PEER_ID: - bytes <- Registry.get_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil) - signature <- Sig.sign(bytes) - <- signature.signature! - -func getHostRecordSignature(provider_node_id: PeerId, route_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> []u8: - on provider_node_id: - bytes <- Registry.get_host_record_bytes(route_id, value, relay_id, service_id, timestamp_created, nil) - signature <- Sig.sign(bytes) - <- signature.signature! - -func registerRoute(label: string, timestamp_created: u64, signature: []u8, pin: bool) -> RegisterRouteResult: - t <- Peer.timestamp_sec() - weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.register_route(label, nil, timestamp_created, nil, "", signature, pin, weight, t) +func getKeySignature(label: string, timestamp_created: u64) -> SignResult: + bytes <- Registry.get_key_bytes(label, nil, timestamp_created, nil, "") + on INIT_PEER_ID: + result <- Sig.sign(bytes) <- result -func putRecord(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> DhtResult: +func getRecordSignature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> SignResult: + bytes <- Registry.get_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) + on INIT_PEER_ID: + signature <- Sig.sign(bytes) + <- signature + +func getHostRecordSignature(key_id: string, value: string, relay_id: ?PeerId, service_id: ?string, timestamp_created: u64) -> SignResult: + bytes <- Registry.get_host_record_bytes(key_id, value, relay_id, service_id, timestamp_created, nil) + on INIT_PEER_ID: + signature <- Sig.sign(bytes) + <- signature + +func registerKey(label: string, timestamp_created: u64, signature: []u8) -> RegisterKeyResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.put_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) + result <- Registry.register_key(label, nil, timestamp_created, nil, "", signature, weight, t) <- result -func putHostRecord(route_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult: +func putRecord(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> RegistryResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(%init_peer_id%, t) - result <- Registry.put_host_record(route_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) + result <- Registry.put_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) <- result -func propagateHostRecord(res: PutHostRecordResult) -> DhtResult: +func putHostRecord(key_id: string, value: string, relay_id: ?PeerId, service_id: []string, timestamp_created: u64, signature: []u8) -> PutHostRecordResult: + t <- Peer.timestamp_sec() + weight <- TrustGraph.get_weight(%init_peer_id%, t) + result <- Registry.put_host_record(key_id, value, relay_id, service_id, timestamp_created, nil, signature, weight, t) + <- result + +func propagateHostRecord(res: PutHostRecordResult) -> RegistryResult: t <- Peer.timestamp_sec() weight <- TrustGraph.get_weight(res.record!.peer_id, t) result <- Registry.propagate_host_record(res, t, weight) <- result -func getRouteMetadata(route_id: string) -> GetRouteMetadataResult: +func getKeyMetadata(key_id: string) -> GetKeyMetadataResult: t <- Peer.timestamp_sec() - result <- Registry.get_route_metadata(route_id, t) + result <- Registry.get_key_metadata(key_id, t) <- result -func republishRoute(route: MergeRoutesResult) -> DhtResult: +func republishKey(key: Key) -> RegistryResult: t <- Peer.timestamp_sec() - weight <- TrustGraph.get_weight(route.route.peer_id, t) - result <- Registry.republish_route(route.route, weight, t) + weight <- TrustGraph.get_weight(key.owner_peer_id, t) + result <- Registry.republish_key(key, weight, t) <- result diff --git a/aqua/registry-scheduled-scripts.aqua b/aqua/registry-scheduled-scripts.aqua index 1b6dd17..f355e9e 100644 --- a/aqua/registry-scheduled-scripts.aqua +++ b/aqua/registry-scheduled-scripts.aqua @@ -24,7 +24,7 @@ func replicate_3600(): on n: tt <- Peer.timestamp_sec() key_weight <- TrustGraph.get_weight(r.route.peer_id, tt) - Registry.republish_route(r.route, key_weight, tt) + Registry.republish_key(r.route, key_weight, tt) records_weights: *WeightResult for record <- r.records: diff --git a/aqua/registry.aqua b/aqua/registry.aqua deleted file mode 100644 index e949f23..0000000 --- a/aqua/registry.aqua +++ /dev/null @@ -1,103 +0,0 @@ -module Registry declares * - -data ClearExpiredResult: - success: bool - error: string - count_routes: u64 - count_records: u64 - -data DhtResult: - success: bool - error: string - -data Route: - id: string - label: string - peer_id: string - timestamp_created: u64 - challenge: []u8 - challenge_type: string - signature: []u8 - -data Record: - route_id: string - value: string - peer_id: string - set_by: string - relay_id: []string - service_id: []string - timestamp_created: u64 - solution: []u8 - signature: []u8 - -data EvictStaleItem: - route: Route - records: []Record - -data EvictStaleResult: - success: bool - error: string - results: []EvictStaleItem - -data GetRecordsResult: - success: bool - error: string - result: []Record - -data GetRouteMetadataResult: - success: bool - error: string - route: Route - -data MergeResult: - success: bool - error: string - result: []Record - -data MergeRoutesResult: - success: bool - error: string - route: Route - -data PutHostRecordResult: - success: bool - error: string - record: []Record - -data RegisterRouteResult: - success: bool - error: string - route_id: string - -data RepublishRecordsResult: - success: bool - error: string - updated: u64 - -data WeightResult: - success: bool - weight: u32 - peer_id: string - error: string - -service Registry("registry"): - clear_expired(current_timestamp_sec: u64) -> ClearExpiredResult - clear_host_record(route_id: string, current_timestamp_sec: u64) -> DhtResult - evict_stale(current_timestamp_sec: u64) -> EvictStaleResult - get_host_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 - get_record_bytes(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8) -> []u8 - get_records(route_id: string, current_timestamp_sec: u64) -> GetRecordsResult - get_route_bytes(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string) -> []u8 - get_route_id(label: string, peer_id: string) -> string - get_route_metadata(route_id: string, current_timestamp_sec: u64) -> GetRouteMetadataResult - merge(records: [][]Record) -> MergeResult - merge_routes(routes: []Route) -> MergeRoutesResult - merge_two(a: []Record, b: []Record) -> MergeResult - propagate_host_record(set_host_value: PutHostRecordResult, current_timestamp_sec: u64, weight: WeightResult) -> DhtResult - put_host_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> PutHostRecordResult - put_record(route_id: string, value: string, relay_id: []string, service_id: []string, timestamp_created: u64, solution: []u8, signature: []u8, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult - register_route(label: string, peer_id: []string, timestamp_created: u64, challenge: []u8, challenge_type: string, signature: []u8, pin: bool, weight: WeightResult, current_timestamp_sec: u64) -> RegisterRouteResult - republish_records(records: []Record, weights: []WeightResult, current_timestamp_sec: u64) -> RepublishRecordsResult - republish_route(route: Route, weight: WeightResult, current_timestamp_sec: u64) -> DhtResult - set_expired_timeout(timeout_sec: u64) - set_stale_timeout(timeout_sec: u64) diff --git a/aqua/resources-api.aqua b/aqua/resources-api.aqua new file mode 100644 index 0000000..ce66481 --- /dev/null +++ b/aqua/resources-api.aqua @@ -0,0 +1,330 @@ +module Registry.ResourcesAPI declares * + +import "registry-service.aqua" +import "registry-api.aqua" +import "@fluencelabs/aqua-lib/builtin.aqua" + +alias ResourceId: string +alias Error: string + +func appendErrors(error1: *Error, error2: *Error): + for e <- error2: + error1 <<- e + +func getResourceId(label: string, peer_id: string) -> ResourceId: + resource_id <- Registry.get_key_id(label, peer_id) + <- resource_id + +-- Get peers closest to the resource_id's hash in Kademlia network +-- These peers are expected to store list of providers for this key +func getNeighbours(resource_id: ResourceId) -> []PeerId: + k <- Op.string_to_b58(resource_id) + nodes <- Kademlia.neighborhood(k, nil, nil) + <- nodes + +func getResource(resource_id: ResourceId) -> ?Key, *Error: + nodes <- getNeighbours(resource_id) + resources: *Key + error: *Error + for n <- nodes par: + on n: + try: + t <- Peer.timestamp_sec() + get_result <- Registry.get_key_metadata(resource_id, t) + if get_result.success: + resources <<- get_result.key + else: + error <<- get_result.error + + join resources[0] + par error <- Peer.timeout(10000, "resource not found, timeout exceeded") + + merge_result <- Registry.merge_keys(resources) + resource: ?Key + + if merge_result.success: + resource <<- merge_result.key + else: + error <<- merge_result.error + + <- resource, error + +-- If this peer have set node_id as a provider for resource, +-- this call will prevent provider from renew +-- so that eventually it will disappear from the providers list +func removeNodeFromProviders(resource_id: ResourceId): + on HOST_PEER_ID: + t <- Peer.timestamp_sec() + Registry.clear_host_record(resource_id, t) + +-- Create a resource: register it on the closest peers +func createResource(label: string) -> ?ResourceId, *Error: + t <- Peer.timestamp_sec() + + resource_id: ?ResourceId + error: *Error + successful: *bool + on HOST_PEER_ID: + sig_result <- getKeySignature(label, t) + if sig_result.success: + signature = sig_result.signature! + id <- getResourceId(label, INIT_PEER_ID) + nodes <- getNeighbours(id) + + for n <- nodes par: + on n: + try: + res <- registerKey(label, t, signature) + if res.success: + successful <<- true + else: + error <<- res.error + par Peer.timeout(10000, "timeout exceeded") + else: + error <<- sig_result.error! + + timeout: ?string + join successful[0] + par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded") + + if timeout == nil: + resource_id <<- id + else: + error <<- timeout! + + <- resource_id, error + +-- Create a resource and register as provider +-- INIT_PEER_ID (current client) will become a provider +func createResourceAndRegisterProvider(label: string, value: string, service_id: ?string) -> ?ResourceId, *Error: + resource_id: ?ResourceId + error: *Error + + relay_id: ?string + relay_id <<- HOST_PEER_ID + + t <- Peer.timestamp_sec() + successful: *bool + on HOST_PEER_ID: + key_sig_result <- getKeySignature(label, t) + + if key_sig_result.success == false: + error <<- key_sig_result.error! + else: + id <- getResourceId(label, INIT_PEER_ID) + record_sig_result <- getRecordSignature(id, value, relay_id, service_id, t) + if record_sig_result.success == false: + error <<- record_sig_result.error! + else: + key_signature = key_sig_result.signature! + record_signature = record_sig_result.signature! + nodes <- getNeighbours(id) + for n <- nodes par: + on n: + try: + reg_res <- registerKey(label, t, key_signature) + if reg_res.success: + put_res <- putRecord(id, value, relay_id, service_id, t, record_signature) + if put_res.success: + successful <<- true + else: + error <<- put_res.error + else: + error <<- reg_res.error + par Peer.timeout(10000, "timeout exceeded") + + timeout: ?string + join successful[0] + par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded") + + if timeout == nil: + resource_id <<- id + else: + error <<- timeout! + + <- resource_id, error + +-- Create a resource and make the given node a provider for it +func createResourceAndRegisterNodeProvider(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> ?ResourceId, *Error: + resource_id: ?ResourceId + error: *Error + + t <- Peer.timestamp_sec() + + successful: *bool + on provider_node_id: + key_sig_result <- getKeySignature(label, t) + + if key_sig_result.success == false: + error <<- key_sig_result.error! + else: + id <- getResourceId(label, INIT_PEER_ID) + record_sig_result <- getHostRecordSignature(id, value, nil, service_id, t) + if record_sig_result.success == false: + error <<- record_sig_result.error! + else: + key_signature = key_sig_result.signature! + record_signature = record_sig_result.signature! + reg_res1 <- registerKey(label, t, key_signature) + if reg_res1.success == false: + error <<- reg_res1.error + else: + r <- putHostRecord(id, value, nil, service_id, t, record_signature) + nodes <- getNeighbours(id) + for n <- nodes par: + on n: + try: + reg_res <- registerKey(label, t, key_signature) + if reg_res.success: + prop_res <- propagateHostRecord(r) + if prop_res.success: + successful <<- true + else: + error <<- prop_res.error + else: + error <<- reg_res.error + par Peer.timeout(10000, "timeout exceeded") + + timeout: ?string + join successful[0] + par timeout <- Peer.timeout(10000, "resource hasn't created: timeout exceeded") + + if timeout == nil: + resource_id <<- id + else: + error <<- timeout! + + <- resource_id, error + +-- Register for a resource as provider +-- Note: resource must be already created +func registerProvider(resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error: + error: *Error + relay_id: ?string + relay_id <<- HOST_PEER_ID + + t <- Peer.timestamp_sec() + + on HOST_PEER_ID: + record_sig_result <- getRecordSignature(resource_id, value, relay_id, service_id, t) + + if record_sig_result.success == false: + error <<- record_sig_result.error! + else: + record_signature = record_sig_result.signature! + key, error_get <- getResource(resource_id) + appendErrors(error, error_get) + + successful: *bool + if key != nil: + nodes <- getNeighbours(resource_id) + for n <- nodes par: + on n: + try: + republish_res <- republishKey(key!) + if republish_res.success == false: + error <<- republish_res.error + else: + put_res <- putRecord(resource_id, value, relay_id, service_id, t, record_signature) + if put_res.success: + successful <<- true + else: + error <<- put_res.error + par Peer.timeout(10000, "timeout exceeded") + + + timeout: ?string + join successful[0] + par timeout <- Peer.timeout(10000, "provider hasn't registered: timeout exceeded") + + success: *bool + if timeout == nil: + success <<- true + else: + success <<- false + error <<- timeout! + + <- success!, error + +-- Register a node as provider to the given resource +-- Note: resource must be already created +func registerNodeProvider(provider_node_id: PeerId, resource_id: ResourceId, value: string, service_id: ?string) -> bool, *Error: + error: *Error + t <- Peer.timestamp_sec() + + successful: *bool + on provider_node_id: + record_sig_result <- getHostRecordSignature(resource_id, value, nil, service_id, t) + if record_sig_result.success == false: + error <<- record_sig_result.error! + else: + record_signature = record_sig_result.signature! + key, error_get <- getResource(resource_id) + if key == nil: + appendErrors(error, error_get) + else: + republish_result <- republishKey(key!) + if republish_result.success == false: + error <<- republish_result.error + else: + r <- putHostRecord(resource_id, value, nil, service_id, t, record_signature) + nodes <- getNeighbours(resource_id) + for n <- nodes par: + on n: + try: + republish_res <- republishKey(key!) + if republish_res.success == false: + error <<- republish_res.error + else: + prop_res <- propagateHostRecord(r) + if prop_res.success: + successful <<- true + else: + error <<- prop_res.error + par Peer.timeout(10000, "timeout exceeded") + + timeout: ?string + join successful[0] + par timeout <- Peer.timeout(10000, "provider hasn't registered: timeout exceeded") + + success: *bool + if timeout == nil: + success <<- true + else: + success <<- false + error <<- timeout! + + <- success!, error + +-- Find the list of providers' records for the given resource_id +func resolveProviders(resource_id: ResourceId, ack: i16) -> []Record, *Error: + on HOST_PEER_ID: + nodes <- getNeighbours(resource_id) + res: *[]Record + error: *Error + for n <- nodes par: + on n: + try: + t <- Peer.timestamp_sec() + get_result <- Registry.get_records(resource_id, t) + if get_result.success: + res <<- get_result.result + else: + error <<- get_result.error + par Peer.timeout(10000, "timeout exceeded") + + join res[ack] + par Peer.timeout(10000, "") + result <- Registry.merge(res) + if result.success == false: + error <<- result.error + <- result.result, error + +-- Execute the given call on providers +-- Note that you can provide another Aqua function as an argument to this one +func executeOnProviders(resource_id: ResourceId, ack: i16, call: Record -> ()) -> *Error: + providers, error <- resolveProviders(resource_id, ack) + for r <- providers par: + on r.peer_id via r.relay_id: + call(r) + <- error diff --git a/aqua/routing.aqua b/aqua/routing.aqua deleted file mode 100644 index edec8c7..0000000 --- a/aqua/routing.aqua +++ /dev/null @@ -1,217 +0,0 @@ -module Registry.Routing declares * - -import "registry.aqua" -import "registry-api.aqua" -import "@fluencelabs/aqua-lib/builtin.aqua" - -alias RouteId: string - -func getRouteId(label: string, peer_id: string) -> RouteId: - route_id <- Registry.get_route_id(label, peer_id) - <- route_id - --- Get peers closest to the route_id's hash in Kademlia network --- These peers are expected to store list of providers for this route -func getNeighbours(route_id: RouteId) -> []PeerId: - k <- Op.string_to_b58(route_id) - nodes <- Kademlia.neighborhood(k, nil, nil) - <- nodes - -func getRoute(route_id: RouteId) -> MergeRoutesResult: - nodes <- getNeighbours(route_id) - res: *Route - for n <- nodes par: - on n: - try: - t <- Peer.timestamp_sec() - get_result <- Registry.get_route_metadata(route_id, t) - if get_result.success: - res <<- get_result.route - - join res[0] - result <- Registry.merge_routes(res) - <- result - --- If this peer have set node_id as a provider for route, --- this call will prevent provider from renew --- so that eventually it will disappear from the providers list -func removeFromRoute(route_id: RouteId): - on HOST_PEER_ID: - t <- Peer.timestamp_sec() - Registry.clear_host_record(route_id, t) - --- Create a route: register it on the closest peers -func createRoute(label: string) -> RouteId: - t <- Peer.timestamp_sec() - signature <- getRouteSignature(label, t) - - on HOST_PEER_ID: - route_id <- getRouteId(label, INIT_PEER_ID) - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - result <- registerRoute(label, t, signature, false) - <- route_id - --- Create a route and register for it --- INIT_PEER_ID (current client) will become a provider -func createRouteAndRegister(label: string, value: string, service_id: ?string) -> string: - relay_id: ?string - relay_id <<- HOST_PEER_ID - - t <- Peer.timestamp_sec() - route_signature <- getRouteSignature(label, t) - on HOST_PEER_ID: - route_id <- getRouteId(label, INIT_PEER_ID) - record_signature <- getRecordSignature(route_id, value, relay_id, service_id, t) - - on HOST_PEER_ID: - - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - registerRoute(label, t, route_signature, false) - putRecord(route_id, value, relay_id, service_id, t, record_signature) - <- route_id - --- Create a route and register for it --- INIT_PEER_ID (current client) will become a provider --- In contrast with non-blocking version, waits for at least a single write to succeed -func createRouteAndRegisterBlocking( - label: string, value: string, - service_id: ?string, - progress: string -> (), - ack: i16 -) -> string: - relay_id: ?string - relay_id <<- HOST_PEER_ID - - t <- Peer.timestamp_sec() - route_signature <- getRouteSignature(label, t) - on HOST_PEER_ID: - route_id <- getRouteId(label, INIT_PEER_ID) - record_signature <- getRecordSignature(route_id, value, relay_id, service_id, t) - - results: *DhtResult - on HOST_PEER_ID: - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - res1 <- registerRoute(label, t, route_signature, false) - result <- putRecord(route_id, value, relay_id, service_id, t, record_signature) - if result.success: - results <<- result - progress(n) - join results[ack] - <- route_id - --- Create a route and make the given node a provider for it -func createRouteAndRegisterNode(provider_node_id: PeerId, label: string, value: string, service_id: ?string) -> RouteId: - t <- Peer.timestamp_sec() - route_signature <- getRouteSignature(label, t) - on HOST_PEER_ID: - route_id <- getRouteId(label, INIT_PEER_ID) - - record_signature <- getHostRecordSignature(provider_node_id, route_id, value, nil, service_id, t) - - on provider_node_id: - registerRoute(label, t, route_signature, false) - r <- putHostRecord(route_id, value, nil, service_id, t, record_signature) - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - registerRoute(label, t, route_signature, false) - propagateHostRecord(r) - <- route_id - -func createRouteAndRegisterNodeBlocking( - provider_node_id: PeerId, label: string, - value: string, service_id: ?string, - progress: string -> (), - ack: i16 -) -> RouteId: - t <- Peer.timestamp_sec() - route_signature <- getRouteSignature(label, t) - on HOST_PEER_ID: - route_id <- getRouteId(label, INIT_PEER_ID) - - record_signature <- getHostRecordSignature(provider_node_id, route_id, value, nil, service_id, t) - - results: *DhtResult - on provider_node_id: - registerRoute(label, t, route_signature, false) - r <- putHostRecord(route_id, value, nil, service_id, t, record_signature) - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - registerRoute(label, t, route_signature, false) - results <- propagateHostRecord(r) - progress(n) - - join results[ack] - <- route_id - --- Register for a route --- Note: route must be already initiated -func registerForRoute(route_id: RouteId, value: string, service_id: ?string): - relay_id: ?string - relay_id <<- HOST_PEER_ID - - t <- Peer.timestamp_sec() - record_signature <- getRecordSignature(route_id, value, relay_id, service_id, t) - - on HOST_PEER_ID: - route <- getRoute(route_id) - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - republishRoute(route) - putRecord(route_id, value, relay_id, service_id, t, record_signature) - - --- Register a node to the given route --- Note: route must be already initiated -func registerForRouteNode(provider_node_id: PeerId, route_id: RouteId, value: string, service_id: ?string): - t <- Peer.timestamp_sec() - record_signature <- getHostRecordSignature(provider_node_id, route_id, value, nil, service_id, t) - - on provider_node_id: - route <- getRoute(route_id) - republishRoute(route) - r <- putHostRecord(route_id, value, nil, service_id, t, record_signature) - nodes <- getNeighbours(route_id) - for n <- nodes par: - on n: - try: - republishRoute(route) - propagateHostRecord(r) - --- Find the list of record for the given route_id -func resolveRoute(route_id: RouteId, ack: i16) -> []Record: - on HOST_PEER_ID: - nodes <- getNeighbours(route_id) - res: *[]Record - for n <- nodes par: - on n: - try: - t <- Peer.timestamp_sec() - get_result <- Registry.get_records(route_id, t) - res <<- get_result.result - - join res[ack] - result <- Registry.merge(res) - <- result.result - --- Execute the given code on providers --- Note that you can provide another Aqua function as an argument to this one -func executeOnRoute(route_id: RouteId, ack: i16, call: Record -> ()): - providers <- resolveRoute(route_id, ack) - for r <- providers par: - on r.peer_id via r.relay_id: - call(r) diff --git a/example/src/aqua/event_example.aqua b/example/src/aqua/event_example.aqua index b6a4651..8786b6d 100644 --- a/example/src/aqua/event_example.aqua +++ b/example/src/aqua/event_example.aqua @@ -1,8 +1,8 @@ -- This file demonstrates how to send events to subscribers of a topic -- Detailed explanation can be found in the Aqua Book: https://doc.fluence.dev/aqua-book/libraries/aqua-dht#passing-data-to-subscribers -import "@fluencelabs/registry/routing.aqua" -import "@fluencelabs/registry/registry.aqua" +import "@fluencelabs/registry/registry-service.aqua" +import "@fluencelabs/registry/resources-api.aqua" import PeerId from "@fluencelabs/aqua-lib/builtin.aqua" -- Application event @@ -23,10 +23,10 @@ func notify_peer(rec: Record, event: Event): EventAPI.receive_event(event) -- send event to every peer registered on route -func send_everyone(route_id: string, event: Event, ack: i16): +func send_everyone(key_id: string, event: Event, ack: i16): on HOST_PEER_ID: -- retrieve all peers registered to the route - records <- resolveRoute(route_id, ack) + records <- resolveProviders(key_id, ack) -- iterate through them for rec <- records par: notify_peer(rec, event) diff --git a/example/src/aqua/export.aqua b/example/src/aqua/export.aqua index aeaa5db..04f1d77 100644 --- a/example/src/aqua/export.aqua +++ b/example/src/aqua/export.aqua @@ -1,7 +1,7 @@ module Export -import createRouteAndRegisterNodeBlocking, resolveRoute from "@fluencelabs/registry/routing.aqua" +import createResourceAndRegisterNodeProvider, resolveProviders from "@fluencelabs/registry/resources-api.aqua" import Peer from "@fluencelabs/aqua-lib/builtin.aqua" -export createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec +export createResourceAndRegisterNodeProvider, resolveProviders, timestamp_sec func timestamp_sec() -> u64: diff --git a/example/src/example.ts b/example/src/example.ts index a618d62..65a2067 100644 --- a/example/src/example.ts +++ b/example/src/example.ts @@ -1,6 +1,6 @@ import {Fluence, KeyPair} from "@fluencelabs/fluence"; import { krasnodar, Node } from "@fluencelabs/fluence-network-environment"; -import {createRouteAndRegisterNodeBlocking, resolveRoute, timestamp_sec} from "./generated/export"; +import {createResourceAndRegisterNodeProvider, resolveProviders, timestamp_sec} from "./generated/export"; let local: Node[] = [ { @@ -31,17 +31,20 @@ async function main() { ); let label = "myLabel"; let value = "myValue"; - console.log("Will create route with label:", label); + console.log("Will create resource with label:", label); // create route (if not exists) and register on it - let route_id = await createRouteAndRegisterNodeBlocking(krasnodar[0].peerId, - label, value, "identity", - (s) => console.log(`node ${s} saved the record`), - 5 + let [resource_id, error] = await createResourceAndRegisterNodeProvider(krasnodar[0].peerId, + label, value, "identity" ); - // find other peers on this route - console.log("let's resolve route for %s", route_id); - let providers = await resolveRoute(route_id, 5); - console.log("route providers:", providers); + + if (resource_id !== null) { + // find other peers on this route + console.log("let's resolve route for %s", resource_id); + let [providers, error] = await resolveProviders(resource_id, 5); + console.log("route providers:", providers); + } else { + console.error(error); + } } main().then(() => process.exit(0)) diff --git a/service/Cargo.lock b/service/Cargo.lock index 91eed3e..9e6df69 100644 --- a/service/Cargo.lock +++ b/service/Cargo.lock @@ -436,12 +436,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.13.1" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0d720b8683f8dd83c65155f0530560cba68cd2bf395f6513a483caee57ff7f4" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core 0.13.1", - "darling_macro 0.13.1", + "darling_core 0.13.4", + "darling_macro 0.13.4", ] [[package]] @@ -460,9 +460,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.13.1" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a340f241d2ceed1deb47ae36c4144b2707ec7dd0b649f894cb39bb595986324" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" dependencies = [ "fnv", "ident_case", @@ -485,11 +485,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.13.1" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72c41b3b7352feb3211a0d743dc5700a4e3b60f51bd2b368892d1e0f9a95f44b" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core 0.13.1", + "darling_core 0.13.4", "quote", "syn", ] @@ -591,9 +591,9 @@ dependencies = [ [[package]] name = "eyre" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9289ed2c0440a6536e65119725cf91fc2c6b5e513bfd2e36e1134d7cca6ca12f" +checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb" dependencies = [ "indenter", "once_cell", @@ -1106,9 +1106,9 @@ checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" [[package]] name = "js-sys" -version = "0.3.56" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" +checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" dependencies = [ "wasm-bindgen", ] @@ -1140,9 +1140,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259" [[package]] name = "libp2p-core" @@ -1907,9 +1907,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" dependencies = [ "unicode-xid", ] @@ -1980,9 +1980,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -2313,9 +2313,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec1e6ec4d8950e5b1e894eac0d360742f3b1407a6078a604a731c4b3f49cefbc" +checksum = "946fa04a8ac43ff78a1f4b811990afb9ddbdf5890b46d6dda0ba1998230138b7" dependencies = [ "rustversion", "serde", @@ -2324,11 +2324,11 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12e47be9471c72889ebafb5e14d5ff930d89ae7a67bbdb5f8abb564f845a927e" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ - "darling 0.13.1", + "darling 0.13.4", "proc-macro2", "quote", "syn", @@ -2378,9 +2378,9 @@ checksum = "f054c6c1a6e95179d6f23ed974060dcefb2d9388bb7256900badad682c499de4" [[package]] name = "slab" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" @@ -2420,9 +2420,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.90" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" dependencies = [ "proc-macro2", "quote", @@ -2680,9 +2680,9 @@ checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasm-bindgen" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" +checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -2690,9 +2690,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" +checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" dependencies = [ "bumpalo", "lazy_static", @@ -2705,9 +2705,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" +checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2715,9 +2715,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" +checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" dependencies = [ "proc-macro2", "quote", @@ -2728,9 +2728,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" +checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" [[package]] name = "wasmer-clif-backend-fl" @@ -2931,9 +2931,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.56" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" +checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/service/build.rs b/service/build.rs index 344d62a..f9804f2 100644 --- a/service/build.rs +++ b/service/build.rs @@ -29,7 +29,7 @@ fn main() { generate_marine_test_env(services, "marine_test_env.rs", file!()); } - println!("cargo:rerun-if-changed=src/route_api.rs"); + println!("cargo:rerun-if-changed=src/key_api.rs"); println!("cargo:rerun-if-changed=src/record_api.rs"); println!("cargo:rerun-if-changed=src/main.rs"); } diff --git a/service/build.sh b/service/build.sh index 14bce16..927eeaf 100755 --- a/service/build.sh +++ b/service/build.sh @@ -17,4 +17,4 @@ cp target/wasm32-wasi/release/registry.wasm artifacts/ curl -L https://github.com/fluencelabs/sqlite/releases/download/v0.15.0_w/sqlite3.wasm -o artifacts/sqlite3.wasm # generate Aqua bindings -marine aqua artifacts/registry.wasm -s Registry -i registry >../aqua/registry.aqua +marine aqua artifacts/registry.wasm -s Registry -i registry >../aqua/registry-service.aqua diff --git a/service/src/defaults.rs b/service/src/defaults.rs index 7069e4e..47e4546 100644 --- a/service/src/defaults.rs +++ b/service/src/defaults.rs @@ -15,14 +15,14 @@ */ // TODO: sanitize tables' names in SQL expressions -pub static ROUTES_TABLE_NAME: &str = "dht_routes"; -pub static ROUTES_TIMESTAMPS_TABLE_NAME: &str = "dht_routes_timestamps"; -pub static RECORDS_TABLE_NAME: &str = "dht_records"; +pub static KEYS_TABLE_NAME: &str = "keys_table"; +pub static KEYS_TIMESTAMPS_TABLE_NAME: &str = "keys_timestamps_table"; +pub static RECORDS_TABLE_NAME: &str = "records_table"; pub static CONFIG_FILE: &str = "/tmp/Config.toml"; pub static DB_PATH: &str = "/tmp/registry.db"; pub static DEFAULT_STALE_VALUE_AGE: u64 = 60 * 60; pub static DEFAULT_EXPIRED_VALUE_AGE: u64 = 24 * 60 * 60; -pub static VALUES_LIMIT: usize = 32; +pub static RECORDS_LIMIT: usize = 32; pub static TRUSTED_TIMESTAMP_SERVICE_ID: &str = "peer"; pub static TRUSTED_TIMESTAMP_FUNCTION_NAME: &str = "timestamp_sec"; diff --git a/service/src/error.rs b/service/src/error.rs index b75bc8f..aba004e 100644 --- a/service/src/error.rs +++ b/service/src/error.rs @@ -25,13 +25,13 @@ pub enum ServiceError { #[source] SqliteError, ), - #[error("Requested route {0} does not exist")] - RouteNotExists(String), - #[error("Route {0} for {1} peer_id already exists with newer timestamp")] - RouteAlreadyExistsNewerTimestamp(String, String), + #[error("Requested key {0} does not exist")] + KeyNotExists(String), + #[error("Key {0} for {1} peer_id already exists with newer timestamp")] + KeyAlreadyExistsNewerTimestamp(String, String), #[error("Values limit for key_d {0} is exceeded")] ValuesLimitExceeded(String), - #[error("Host value for route_id {0} not found ")] + #[error("Host value for key_id {0} not found ")] HostValueNotFound(String), #[error("Invalid set_host_value result: success is false or value is missing")] InvalidSetHostValueResult, @@ -59,8 +59,8 @@ pub enum ServiceError { String, #[source] fluence_keypair::error::VerificationError, ), - #[error("Route can't be registered in the future")] - InvalidRouteTimestamp, + #[error("Key can't be registered in the future")] + InvalidKeyTimestamp, #[error("Record can't be registered in the future")] InvalidRecordTimestamp, #[error("Records to publish should belong to one key id")] @@ -77,4 +77,6 @@ pub enum ServiceError { ), #[error("Weight for record with peer_id {0} and set_by {1} is missing ")] MissingRecordWeight(String, String), + #[error("merge_keys: keys argument is empty")] + KeysArgumentEmpty, } diff --git a/service/src/route.rs b/service/src/key.rs similarity index 82% rename from service/src/route.rs rename to service/src/key.rs index c3b6f6c..443fa18 100644 --- a/service/src/route.rs +++ b/service/src/key.rs @@ -22,10 +22,10 @@ use sha2::{Digest, Sha256}; #[marine] #[derive(Default, Clone)] -pub struct Route { +pub struct Key { pub id: String, pub label: String, - pub peer_id: String, + pub owner_peer_id: String, pub timestamp_created: u64, pub challenge: Vec, pub challenge_type: String, @@ -33,28 +33,27 @@ pub struct Route { } #[derive(Default, Clone)] -pub struct RouteInternal { - pub route: Route, +pub struct KeyInternal { + pub key: Key, pub timestamp_published: u64, - pub pinned: bool, pub weight: u32, } -impl Route { +impl Key { pub fn new( label: String, - peer_id: String, + owner_peer_id: String, timestamp_created: u64, challenge: Vec, challenge_type: String, signature: Vec, ) -> Self { - let id = Self::get_id(&label, &peer_id); + let id = Self::get_id(&label, &owner_peer_id); Self { id, label, - peer_id, + owner_peer_id, timestamp_created, challenge, challenge_type, @@ -62,14 +61,14 @@ impl Route { } } - pub fn get_id(label: &str, peer_id: &str) -> String { - format!("{}{}", label, peer_id) + pub fn get_id(label: &str, owner_peer_id: &str) -> String { + format!("{}{}", label, owner_peer_id) } pub fn signature_bytes(&self) -> Vec { let mut metadata = Vec::new(); metadata.extend(self.label.as_bytes()); - metadata.extend(self.peer_id.as_bytes()); + metadata.extend(self.owner_peer_id.as_bytes()); metadata.extend(self.timestamp_created.to_le_bytes()); metadata.extend(&self.challenge); metadata.extend(self.challenge_type.as_bytes()); @@ -81,14 +80,14 @@ impl Route { pub fn verify(&self, current_timestamp_sec: u64) -> Result<(), ServiceError> { if self.timestamp_created > current_timestamp_sec { - return Err(ServiceError::InvalidRouteTimestamp); + return Err(ServiceError::InvalidKeyTimestamp); } self.verify_signature() } pub fn verify_signature(&self) -> Result<(), ServiceError> { - let pk = extract_public_key(self.peer_id.clone())?; + let pk = extract_public_key(self.owner_peer_id.clone())?; let bytes = self.signature_bytes(); let signature = Signature::from_bytes(pk.get_key_format(), self.signature.clone()); pk.verify(&bytes, &signature) diff --git a/service/src/route_api.rs b/service/src/key_api.rs similarity index 60% rename from service/src/route_api.rs rename to service/src/key_api.rs index 4d2ca75..606d373 100644 --- a/service/src/route_api.rs +++ b/service/src/key_api.rs @@ -14,25 +14,25 @@ * limitations under the License. */ use crate::error::ServiceError; +use crate::key::{Key, KeyInternal}; use crate::misc::check_weight_result; -use crate::results::{DhtResult, GetRouteMetadataResult, MergeRoutesResult, RegisterRouteResult}; -use crate::route::{Route, RouteInternal}; +use crate::results::{GetKeyMetadataResult, MergeKeysResult, RegisterKeyResult, RegistryResult}; use crate::storage_impl::get_storage; use crate::tetraplets_checkers::{check_timestamp_tetraplets, check_weight_tetraplets}; use crate::{wrapped_try, WeightResult}; use marine_rs_sdk::marine; #[marine] -pub fn get_route_bytes( +pub fn get_key_bytes( label: String, - mut peer_id: Vec, + mut owner_peer_id: Vec, timestamp_created: u64, challenge: Vec, challenge_type: String, ) -> Vec { - Route { + Key { label, - peer_id: peer_id + owner_peer_id: owner_peer_id .pop() .unwrap_or(marine_rs_sdk::get_call_parameters().init_peer_id), timestamp_created, @@ -44,110 +44,106 @@ pub fn get_route_bytes( } #[marine] -pub fn get_route_id(label: String, peer_id: String) -> String { - Route::get_id(&label, &peer_id) +pub fn get_key_id(label: String, peer_id: String) -> String { + Key::get_id(&label, &peer_id) } -/// register new route if not exists with caller peer_id, update if exists with same peer_id or return error +/// register new key if not exists with caller peer_id, update if exists with same peer_id or return error #[marine] -pub fn register_route( +pub fn register_key( label: String, - peer_id: Vec, + owner_peer_id: Vec, timestamp_created: u64, challenge: Vec, challenge_type: String, signature: Vec, - pin: bool, weight: WeightResult, current_timestamp_sec: u64, -) -> RegisterRouteResult { +) -> RegisterKeyResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_weight_tetraplets(&call_parameters, 7, 0)?; check_timestamp_tetraplets(&call_parameters, 8)?; - let peer_id = peer_id + let owner_peer_id = owner_peer_id .get(0) .unwrap_or(&call_parameters.init_peer_id) .clone(); - check_weight_result(&peer_id, &weight)?; - let route = Route::new( + check_weight_result(&owner_peer_id, &weight)?; + let key = Key::new( label, - peer_id, + owner_peer_id, timestamp_created, challenge, challenge_type, signature, ); - route.verify(current_timestamp_sec)?; + key.verify(current_timestamp_sec)?; - let route_id = route.id.clone(); + let key_id = key.id.clone(); let weight = weight.weight; let storage = get_storage()?; - storage.update_route_timestamp(&route.id, current_timestamp_sec)?; - storage.update_route(RouteInternal { - route, + storage.update_key_timestamp(&key.id, current_timestamp_sec)?; + storage.update_key(KeyInternal { + key, timestamp_published: 0, - pinned: pin, weight, })?; - Ok(route_id) + Ok(key_id) }) .into() } #[marine] -pub fn get_route_metadata(route_id: String, current_timestamp_sec: u64) -> GetRouteMetadataResult { +pub fn get_key_metadata(key_id: String, current_timestamp_sec: u64) -> GetKeyMetadataResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.update_route_timestamp(&route_id, current_timestamp_sec)?; - storage.get_route(route_id) + storage.update_key_timestamp(&key_id, current_timestamp_sec)?; + storage.get_key(key_id) }) .into() } -/// Used for replication, same as register_route, but route.pinned is ignored, updates timestamp_accessed +/// Used for replication, same as register_key, updates timestamp_accessed #[marine] -pub fn republish_route( - mut route: Route, +pub fn republish_key( + mut key: Key, weight: WeightResult, current_timestamp_sec: u64, -) -> DhtResult { +) -> RegistryResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_weight_tetraplets(&call_parameters, 1, 0)?; - check_weight_result(&route.peer_id, &weight)?; + check_weight_result(&key.owner_peer_id, &weight)?; check_timestamp_tetraplets(&call_parameters, 2)?; - route.verify(current_timestamp_sec)?; + key.verify(current_timestamp_sec)?; // just to be sure - route.id = Route::get_id(&route.label, &route.peer_id); + key.id = Key::get_id(&key.label, &key.owner_peer_id); let storage = get_storage()?; - storage.update_route_timestamp(&route.id, current_timestamp_sec)?; - match storage.update_route(RouteInternal { - route, + storage.update_key_timestamp(&key.id, current_timestamp_sec)?; + match storage.update_key(KeyInternal { + key: key, timestamp_published: 0, - pinned: false, weight: weight.weight, }) { // we should ignore this error for republish - Err(ServiceError::RouteAlreadyExistsNewerTimestamp(_, _)) => Ok(()), + Err(ServiceError::KeyAlreadyExistsNewerTimestamp(_, _)) => Ok(()), other => other, } }) .into() } -/// merge route and return the latest +/// merge key and return the latest #[marine] -pub fn merge_routes(routes: Vec) -> MergeRoutesResult { - routes - .into_iter() +pub fn merge_keys(keys: Vec) -> MergeKeysResult { + keys.into_iter() .max_by(|l, r| l.timestamp_created.cmp(&r.timestamp_created)) - .ok_or(ServiceError::InvalidRecordTimestamp) + .ok_or(ServiceError::KeysArgumentEmpty) .into() } diff --git a/service/src/key_storage_impl.rs b/service/src/key_storage_impl.rs new file mode 100644 index 0000000..096407b --- /dev/null +++ b/service/src/key_storage_impl.rs @@ -0,0 +1,231 @@ +/* + * Copyright 2021 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::defaults::{KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME}; + +use crate::error::ServiceError; +use crate::error::ServiceError::{InternalError, KeyNotExists}; +use crate::key::{Key, KeyInternal}; +use crate::storage_impl::Storage; +use marine_sqlite_connector::{State, Statement, Value}; + +impl Storage { + pub fn create_key_tables(&self) -> bool { + self.connection + .execute(f!(" + CREATE TABLE IF NOT EXISTS {KEYS_TABLE_NAME} ( + key_id TEXT PRIMARY KEY, + label TEXT, + owner_peer_id TEXT, + timestamp_created INTEGER, + challenge BLOB, + challenge_type TEXT, + signature BLOB NOT NULL, + timestamp_published INTEGER, + weight INTEGER + ); + ")) + .is_ok() + && self + .connection + .execute(f!(" + CREATE TABLE IF NOT EXISTS {KEYS_TIMESTAMPS_TABLE_NAME} ( + key_id TEXT PRIMARY KEY, + timestamp_accessed INTEGER + ); + ")) + .is_ok() + } + + pub fn update_key_timestamp( + &self, + key_id: &str, + current_timestamp_sec: u64, + ) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!(" + INSERT OR REPLACE INTO {KEYS_TIMESTAMPS_TABLE_NAME} VALUES (?, ?); + "))?; + + statement.bind(1, &Value::String(key_id.to_string()))?; + statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?; + statement.next()?; + Ok(()) + } + + pub fn get_key(&self, key_id: String) -> Result { + let mut statement = self.connection.prepare(f!( + "SELECT key_id, label, owner_peer_id, timestamp_created, challenge, challenge_type, signature \ + FROM {KEYS_TABLE_NAME} WHERE key_id = ?" + ))?; + statement.bind(1, &Value::String(key_id.clone()))?; + + if let State::Row = statement.next()? { + read_key(&statement) + } else { + Err(KeyNotExists(key_id)) + } + } + + pub fn write_key(&self, key: KeyInternal) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!(" + INSERT OR REPLACE INTO {KEYS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); + "))?; + + statement.bind(1, &Value::String(key.key.id))?; + statement.bind(2, &Value::String(key.key.label))?; + statement.bind(3, &Value::String(key.key.owner_peer_id))?; + statement.bind(4, &Value::Integer(key.key.timestamp_created as i64))?; + statement.bind(5, &Value::Binary(key.key.challenge))?; + statement.bind(6, &Value::String(key.key.challenge_type))?; + statement.bind(7, &Value::Binary(key.key.signature))?; + statement.bind(8, &Value::Integer(key.timestamp_published as i64))?; + statement.bind(9, &Value::Integer(key.weight as i64))?; + statement.next()?; + Ok(()) + } + + pub fn update_key(&self, key: KeyInternal) -> Result<(), ServiceError> { + if let Ok(existing_key) = self.get_key(key.key.id.clone()) { + if existing_key.timestamp_created > key.key.timestamp_created { + return Err(ServiceError::KeyAlreadyExistsNewerTimestamp( + key.key.label, + key.key.owner_peer_id, + )); + } + } + + self.write_key(key) + } + + pub fn check_key_existence(&self, key_id: &str) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT EXISTS(SELECT 1 FROM {KEYS_TABLE_NAME} WHERE key_id = ? LIMIT 1)" + ))?; + statement.bind(1, &Value::String(key_id.to_string()))?; + + if let State::Row = statement.next()? { + let exists = statement.read::(0)?; + if exists == 1 { + Ok(()) + } else { + Err(KeyNotExists(key_id.to_string())) + } + } else { + Err(InternalError( + "EXISTS should always return something".to_string(), + )) + } + } + + pub fn get_stale_keys(&self, stale_timestamp: u64) -> Result, ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT key_id, label, owner_peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, weight \ + FROM {KEYS_TABLE_NAME} WHERE timestamp_published <= ?" + ))?; + statement.bind(1, &Value::Integer(stale_timestamp as i64))?; + + let mut stale_keys: Vec = vec![]; + while let State::Row = statement.next()? { + stale_keys.push(read_internal_key(&statement)?); + } + + Ok(stale_keys) + } + + pub fn delete_key(&self, key_id: String) -> Result<(), ServiceError> { + let mut statement = self + .connection + .prepare(f!("DELETE FROM {KEYS_TABLE_NAME} WHERE key_id=?"))?; + statement.bind(1, &Value::String(key_id.clone()))?; + statement.next().map(drop)?; + + if self.connection.changes() == 1 { + Ok(()) + } else { + Err(KeyNotExists(key_id)) + } + } + + pub fn get_expired_keys(&self, expired_timestamp: u64) -> Result, ServiceError> { + let mut statement = self.connection.prepare(f!( + "SELECT key_id, label, owner_peer_id, timestamp_created, challenge, challenge_type, signature \ + FROM {KEYS_TABLE_NAME} WHERE timestamp_created <= ?" + ))?; + statement.bind(1, &Value::Integer(expired_timestamp as i64))?; + + let mut expired_keys: Vec = vec![]; + while let State::Row = statement.next()? { + let key = read_key(&statement)?; + let timestamp_accessed = self.get_key_timestamp_accessed(&key.id)?; + let with_host_records = self.get_host_records_count_by_key(key.id.clone())? != 0; + + if timestamp_accessed <= expired_timestamp && !with_host_records { + expired_keys.push(key); + } + } + + Ok(expired_keys) + } + + pub fn get_key_timestamp_accessed(&self, key_id: &str) -> Result { + let mut statement = self.connection.prepare(f!( + "SELECT timestamp_accessed FROM {KEYS_TIMESTAMPS_TABLE_NAME} WHERE key_id = ?" + ))?; + statement.bind(1, &Value::String(key_id.to_string()))?; + + if let State::Row = statement.next()? { + statement + .read::(0) + .map(|t| t as u64) + .map_err(ServiceError::SqliteError) + } else { + Err(KeyNotExists(key_id.to_string())) + } + } + + pub fn clear_expired_timestamps_accessed( + &self, + expired_timestamp: u64, + ) -> Result<(), ServiceError> { + let mut statement = self.connection.prepare(f!( + "DELETE FROM {KEYS_TIMESTAMPS_TABLE_NAME} WHERE timestamp_accessed < ?" + ))?; + statement.bind(1, &Value::Integer(expired_timestamp as i64))?; + statement.next().map(drop)?; + + Ok(()) + } +} + +pub fn read_key(statement: &Statement) -> Result { + Ok(Key { + id: statement.read::(0)?, + label: statement.read::(1)?, + owner_peer_id: statement.read::(2)?, + timestamp_created: statement.read::(3)? as u64, + challenge: statement.read::>(4)?, + challenge_type: statement.read::(5)?, + signature: statement.read::>(6)?, + }) +} + +pub fn read_internal_key(statement: &Statement) -> Result { + Ok(KeyInternal { + key: read_key(statement)?, + timestamp_published: statement.read::(7)? as u64, + weight: statement.read::(8)? as u32, + }) +} diff --git a/service/src/main.rs b/service/src/main.rs index 1c117c5..73f2715 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -24,14 +24,14 @@ use crate::tetraplets_checkers::check_timestamp_tetraplets; mod config; mod defaults; mod error; +mod key; +mod key_api; +mod key_storage_impl; mod misc; mod record; mod record_api; mod record_storage_impl; mod results; -mod route; -mod route_api; -mod route_storage_impl; mod storage_impl; mod tests; mod tetraplets_checkers; @@ -59,7 +59,7 @@ pub struct WeightResult { fn main() { let storage = get_storage().unwrap(); - storage.create_route_tables(); + storage.create_key_tables(); storage.create_values_table(); create_config(); } diff --git a/service/src/record.rs b/service/src/record.rs index c7df554..abeb692 100644 --- a/service/src/record.rs +++ b/service/src/record.rs @@ -23,7 +23,7 @@ use sha2::{Digest, Sha256}; #[marine] #[derive(Debug, Default, Clone)] pub struct Record { - pub route_id: String, + pub key_id: String, pub value: String, pub peer_id: String, pub set_by: String, @@ -43,7 +43,7 @@ pub struct RecordInternal { impl Record { pub fn signature_bytes(&self) -> Vec { let mut metadata = Vec::new(); - metadata.extend(self.route_id.as_bytes()); + metadata.extend(self.key_id.as_bytes()); metadata.extend(self.value.as_bytes()); metadata.extend(self.peer_id.as_bytes()); metadata.extend(self.set_by.as_bytes()); @@ -83,7 +83,7 @@ impl Record { let bytes = self.signature_bytes(); let signature = Signature::from_bytes(pk.get_key_format(), self.signature.clone()); pk.verify(&bytes, &signature).map_err(|e| { - ServiceError::InvalidRecordSignature(self.route_id.clone(), self.value.clone(), e) + ServiceError::InvalidRecordSignature(self.key_id.clone(), self.value.clone(), e) }) } } diff --git a/service/src/record_api.rs b/service/src/record_api.rs index c560de3..451a435 100644 --- a/service/src/record_api.rs +++ b/service/src/record_api.rs @@ -21,7 +21,7 @@ use crate::misc::check_weight_result; use crate::record::{Record, RecordInternal}; use crate::record_storage_impl::merge_records; use crate::results::{ - DhtResult, GetRecordsResult, MergeResult, PutHostRecordResult, RepublishRecordsResult, + GetRecordsResult, MergeResult, PutHostRecordResult, RegistryResult, RepublishRecordsResult, }; use crate::storage_impl::get_storage; use crate::tetraplets_checkers::{ @@ -32,7 +32,7 @@ use marine_rs_sdk::marine; #[marine] pub fn get_record_bytes( - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -41,7 +41,7 @@ pub fn get_record_bytes( ) -> Vec { let cp = marine_rs_sdk::get_call_parameters(); Record { - route_id, + key_id, value, peer_id: cp.init_peer_id.clone(), set_by: cp.init_peer_id, @@ -56,7 +56,7 @@ pub fn get_record_bytes( #[marine] pub fn put_record( - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -65,14 +65,14 @@ pub fn put_record( signature: Vec, weight: WeightResult, current_timestamp_sec: u64, -) -> DhtResult { +) -> RegistryResult { wrapped_try(|| { let cp = marine_rs_sdk::get_call_parameters(); check_weight_tetraplets(&cp, 7, 0)?; check_timestamp_tetraplets(&cp, 8)?; check_weight_result(&cp.init_peer_id, &weight)?; let record = Record { - route_id, + key_id, value, peer_id: cp.init_peer_id.clone(), set_by: cp.init_peer_id, @@ -85,7 +85,7 @@ pub fn put_record( record.verify(current_timestamp_sec)?; let storage = get_storage()?; - storage.check_route_existence(&record.route_id)?; + storage.check_key_existence(&record.key_id)?; storage .update_record( RecordInternal { @@ -101,7 +101,7 @@ pub fn put_record( #[marine] pub fn get_host_record_bytes( - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -110,7 +110,7 @@ pub fn get_host_record_bytes( ) -> Vec { let cp = marine_rs_sdk::get_call_parameters(); Record { - route_id, + key_id, value, peer_id: cp.host_id, set_by: cp.init_peer_id, @@ -124,7 +124,7 @@ pub fn get_host_record_bytes( } #[marine] pub fn put_host_record( - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -140,7 +140,7 @@ pub fn put_host_record( check_timestamp_tetraplets(&cp, 8)?; check_weight_result(&cp.init_peer_id, &weight)?; let record = Record { - route_id, + key_id, value, peer_id: cp.host_id, set_by: cp.init_peer_id, @@ -153,7 +153,7 @@ pub fn put_host_record( record.verify(current_timestamp_sec)?; let storage = get_storage()?; - storage.check_route_existence(&record.route_id)?; + storage.check_key_existence(&record.key_id)?; storage.update_record( RecordInternal { record: record.clone(), @@ -174,7 +174,7 @@ pub fn propagate_host_record( set_host_value: PutHostRecordResult, current_timestamp_sec: u64, weight: WeightResult, -) -> DhtResult { +) -> RegistryResult { wrapped_try(|| { if !set_host_value.success || set_host_value.record.len() != 1 { return Err(ServiceError::InvalidSetHostValueResult); @@ -191,12 +191,12 @@ pub fn propagate_host_record( let weight = weight.weight; let storage = get_storage()?; - storage.check_route_existence(&record.route_id)?; - storage.update_route_timestamp(&record.route_id, current_timestamp_sec)?; + storage.check_key_existence(&record.key_id)?; + storage.update_key_timestamp(&record.key_id, current_timestamp_sec)?; storage .merge_and_update_records( - record.route_id.clone(), + record.key_id.clone(), vec![RecordInternal { record, weight }], ) .map(|_| ()) @@ -206,15 +206,15 @@ pub fn propagate_host_record( /// Return all values by key #[marine] -pub fn get_records(route_id: String, current_timestamp_sec: u64) -> GetRecordsResult { +pub fn get_records(key_id: String, current_timestamp_sec: u64) -> GetRecordsResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.check_route_existence(&route_id)?; - storage.update_route_timestamp(&route_id, current_timestamp_sec)?; + storage.check_key_existence(&key_id)?; + storage.update_key_timestamp(&key_id, current_timestamp_sec)?; storage - .get_records(route_id) + .get_records(key_id) .map(|records| records.into_iter().map(|r| r.record).collect()) }) .into() @@ -232,7 +232,7 @@ pub fn republish_records( return Ok(0); } - let route_id = records[0].route_id.clone(); + let key_id = records[0].key_id.clone(); let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 2)?; let mut records_to_merge = vec![]; @@ -245,7 +245,7 @@ pub fn republish_records( record.set_by.clone(), ))?; check_weight_result(&record.set_by, weight_result)?; - if record.route_id != route_id { + if record.key_id != key_id { return Err(ServiceError::RecordsPublishingError); } @@ -256,29 +256,29 @@ pub fn republish_records( } let storage = get_storage()?; - storage.check_route_existence(&route_id)?; - storage.update_route_timestamp(&route_id, current_timestamp_sec)?; - storage.merge_and_update_records(route_id, records_to_merge) + storage.check_key_existence(&key_id)?; + storage.update_key_timestamp(&key_id, current_timestamp_sec)?; + storage.merge_and_update_records(key_id, records_to_merge) }) .into() } /// Remove host value by key and caller peer_id #[marine] -pub fn clear_host_record(route_id: String, current_timestamp_sec: u64) -> DhtResult { +pub fn clear_host_record(key_id: String, current_timestamp_sec: u64) -> RegistryResult { wrapped_try(|| { let call_parameters = marine_rs_sdk::get_call_parameters(); check_timestamp_tetraplets(&call_parameters, 1)?; let storage = get_storage()?; - storage.check_route_existence(&route_id)?; - storage.update_route_timestamp(&route_id, current_timestamp_sec)?; + storage.check_key_existence(&key_id)?; + storage.update_key_timestamp(&key_id, current_timestamp_sec)?; let peer_id = call_parameters.host_id; let set_by = call_parameters.init_peer_id; - let deleted = storage.delete_record(route_id.clone(), peer_id, set_by)?; + let deleted = storage.delete_record(key_id.clone(), peer_id, set_by)?; - deleted.as_result((), ServiceError::HostValueNotFound(route_id)) + deleted.as_result((), ServiceError::HostValueNotFound(key_id)) }) .into() } diff --git a/service/src/record_storage_impl.rs b/service/src/record_storage_impl.rs index 959bceb..26e0b1c 100644 --- a/service/src/record_storage_impl.rs +++ b/service/src/record_storage_impl.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; -use crate::defaults::{RECORDS_TABLE_NAME, VALUES_LIMIT}; +use crate::defaults::{RECORDS_LIMIT, RECORDS_TABLE_NAME}; use crate::error::ServiceError; use crate::error::ServiceError::InternalError; use crate::record::{Record, RecordInternal}; @@ -28,7 +28,7 @@ impl Storage { self.connection .execute(f!(" CREATE TABLE IF NOT EXISTS {RECORDS_TABLE_NAME} ( - route_id TEXT, + key_id TEXT, value TEXT, peer_id TEXT, set_by TEXT, @@ -38,7 +38,7 @@ impl Storage { solution BLOB, signature BLOB NOT NULL, weight INTEGER, - PRIMARY KEY (route_id, peer_id, set_by) + PRIMARY KEY (key_id, peer_id, set_by) ); ")) .is_ok() @@ -47,13 +47,12 @@ impl Storage { /// Put value with caller peer_id if the key exists. /// If the value is NOT a host value and the key already has `VALUES_LIMIT` records, then a value with the smallest weight is removed and the new value is inserted instead. pub fn update_record(&self, record: RecordInternal, host: bool) -> Result<(), ServiceError> { - let records_count = - self.get_non_host_records_count_by_key(record.record.route_id.clone())?; + let records_count = self.get_non_host_records_count_by_key(record.record.key_id.clone())?; // check values limits for non-host values - if !host && records_count >= VALUES_LIMIT { + if !host && records_count >= RECORDS_LIMIT { let min_weight_record = - self.get_min_weight_non_host_record_by_key(record.record.route_id.clone())?; + self.get_min_weight_non_host_record_by_key(record.record.key_id.clone())?; if min_weight_record.weight < record.weight || (min_weight_record.weight == record.weight @@ -61,13 +60,13 @@ impl Storage { { // delete the lightest record if the new one is heavier or newer self.delete_record( - min_weight_record.record.route_id, + min_weight_record.record.key_id, min_weight_record.record.peer_id, min_weight_record.record.set_by, )?; } else { // return error if limit is exceeded - return Err(ServiceError::ValuesLimitExceeded(record.record.route_id)); + return Err(ServiceError::ValuesLimitExceeded(record.record.key_id)); } } @@ -80,7 +79,7 @@ impl Storage { "INSERT OR REPLACE INTO {RECORDS_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ))?; - statement.bind(1, &Value::String(record.record.route_id))?; + statement.bind(1, &Value::String(record.record.key_id))?; statement.bind(2, &Value::String(record.record.value))?; statement.bind(3, &Value::String(record.record.peer_id))?; statement.bind(4, &Value::String(record.record.set_by))?; @@ -103,14 +102,14 @@ impl Storage { pub fn delete_record( &self, - route_id: String, + key_id: String, peer_id: String, set_by: String, ) -> Result { let mut statement = self.connection.prepare(f!( - "DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id=? AND peer_id=? AND set_by=?" + "DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id=? AND peer_id=? AND set_by=?" ))?; - statement.bind(1, &Value::String(route_id))?; + statement.bind(1, &Value::String(key_id))?; statement.bind(2, &Value::String(peer_id))?; statement.bind(3, &Value::String(set_by))?; statement.next().map(drop)?; @@ -120,23 +119,23 @@ impl Storage { fn get_min_weight_non_host_record_by_key( &self, - route_id: String, + key_id: String, ) -> Result { let host_id = marine_rs_sdk::get_call_parameters().host_id; // only only non-host values let mut statement = self.connection.prepare( - f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \ - WHERE route_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?; + f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, signature, weight FROM {RECORDS_TABLE_NAME} \ + WHERE key_id = ? AND peer_id != ? ORDER BY weight ASC LIMIT 1"))?; - statement.bind(1, &Value::String(route_id.clone()))?; + statement.bind(1, &Value::String(key_id.clone()))?; statement.bind(2, &Value::String(host_id))?; if let State::Row = statement.next()? { read_record(&statement) } else { Err(InternalError(f!( - "not found non-host records for given route_id: {route_id}" + "not found non-host records for given key_id: {key_id}" ))) } } @@ -146,7 +145,7 @@ impl Storage { // only only non-host values let mut statement = self.connection.prepare(f!( - "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id != ?" + "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id != ?" ))?; statement.bind(1, &Value::String(key))?; statement.bind(2, &Value::String(host_id))?; @@ -163,14 +162,14 @@ impl Storage { } } - pub fn get_host_records_count_by_key(&self, route_id: String) -> Result { + pub fn get_host_records_count_by_key(&self, key_id: String) -> Result { let host_id = marine_rs_sdk::get_call_parameters().host_id; // only only non-host values let mut statement = self.connection.prepare(f!( - "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE route_id = ? AND peer_id = ?" + "SELECT COUNT(*) FROM {RECORDS_TABLE_NAME} WHERE key_id = ? AND peer_id = ?" ))?; - statement.bind(1, &Value::String(route_id))?; + statement.bind(1, &Value::String(key_id))?; statement.bind(2, &Value::String(host_id))?; if let State::Row = statement.next()? { @@ -187,11 +186,11 @@ impl Storage { pub fn merge_and_update_records( &self, - route_id: String, + key_id: String, records: Vec, ) -> Result { let records = merge_records( - self.get_records(route_id)? + self.get_records(key_id)? .into_iter() .chain(records.into_iter()) .collect(), @@ -206,11 +205,11 @@ impl Storage { Ok(updated) } - pub fn get_records(&self, route_id: String) -> Result, ServiceError> { + pub fn get_records(&self, key_id: String) -> Result, ServiceError> { let mut statement = self.connection.prepare( - f!("SELECT route_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \ - WHERE route_id = ? ORDER BY weight DESC"))?; - statement.bind(1, &Value::String(route_id))?; + f!("SELECT key_id, value, peer_id, set_by, relay_id, service_id, timestamp_created, solution, signature, weight FROM {RECORDS_TABLE_NAME} \ + WHERE key_id = ? ORDER BY weight DESC"))?; + statement.bind(1, &Value::String(key_id))?; let mut result: Vec = vec![]; @@ -231,12 +230,12 @@ impl Storage { } /// except host records and for pinned keys - pub fn delete_records_by_key(&self, route_id: String) -> Result { + pub fn delete_records_by_key(&self, key_id: String) -> Result { let mut statement = self .connection - .prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE route_id = ?"))?; + .prepare(f!("DELETE FROM {RECORDS_TABLE_NAME} WHERE key_id = ?"))?; - statement.bind(1, &Value::String(route_id))?; + statement.bind(1, &Value::String(key_id))?; statement.next().map(drop)?; Ok(self.connection.changes() as u64) @@ -246,7 +245,7 @@ impl Storage { pub fn read_record(statement: &Statement) -> Result { Ok(RecordInternal { record: Record { - route_id: statement.read::(0)?, + key_id: statement.read::(0)?, value: statement.read::(1)?, peer_id: statement.read::(2)?, set_by: statement.read::(3)?, diff --git a/service/src/results.rs b/service/src/results.rs index 1bc9b32..9f8572e 100644 --- a/service/src/results.rs +++ b/service/src/results.rs @@ -15,18 +15,18 @@ */ use crate::error::ServiceError; +use crate::key::Key; use crate::record::Record; -use crate::route::Route; use marine_rs_sdk::marine; #[marine] #[derive(Debug)] -pub struct DhtResult { +pub struct RegistryResult { pub success: bool, pub error: String, } -impl From> for DhtResult { +impl From> for RegistryResult { fn from(result: Result<(), ServiceError>) -> Self { match result { Ok(_) => Self { @@ -43,24 +43,24 @@ impl From> for DhtResult { #[marine] #[derive(Debug)] -pub struct RegisterRouteResult { +pub struct RegisterKeyResult { pub success: bool, pub error: String, - pub route_id: String, + pub key_id: String, } -impl From> for RegisterRouteResult { +impl From> for RegisterKeyResult { fn from(result: Result) -> Self { match result { - Ok(route_id) => Self { + Ok(key_id) => Self { success: true, error: "".to_string(), - route_id, + key_id, }, Err(err) => Self { success: false, error: err.to_string(), - route_id: "".to_string(), + key_id: "".to_string(), }, } } @@ -96,22 +96,22 @@ impl From, ServiceError>> for GetRecordsResult { pub struct ClearExpiredResult { pub success: bool, pub error: String, - pub count_routes: u64, + pub count_keys: u64, pub count_records: u64, } impl From> for ClearExpiredResult { fn from(result: Result<(u64, u64), ServiceError>) -> Self { match result { - Ok((count_routes, count_records)) => Self { + Ok((count_keys, count_records)) => Self { success: true, - count_routes, + count_keys, count_records, error: "".to_string(), }, Err(err) => Self { success: false, - count_routes: 0, + count_keys: 0, count_records: 0, error: err.to_string(), }, @@ -145,24 +145,24 @@ impl From, ServiceError>> for GetStaleRecordsResult { } #[marine] -pub struct GetRouteMetadataResult { +pub struct GetKeyMetadataResult { pub success: bool, pub error: String, - pub route: Route, + pub key: Key, } -impl From> for GetRouteMetadataResult { - fn from(result: Result) -> Self { +impl From> for GetKeyMetadataResult { + fn from(result: Result) -> Self { match result { - Ok(route) => Self { + Ok(key) => Self { success: true, error: "".to_string(), - route, + key, }, Err(err) => Self { success: false, error: err.to_string(), - route: Route::default(), + key: Key::default(), }, } } @@ -194,7 +194,7 @@ impl From> for RepublishRecordsResult { #[marine] pub struct EvictStaleItem { - pub route: Route, + pub key: Key, pub records: Vec, } @@ -272,24 +272,24 @@ impl From, ServiceError>> for MergeResult { } #[marine] -pub struct MergeRoutesResult { +pub struct MergeKeysResult { pub success: bool, pub error: String, - pub route: Route, + pub key: Key, } -impl From> for MergeRoutesResult { - fn from(result: Result) -> Self { +impl From> for MergeKeysResult { + fn from(result: Result) -> Self { match result { - Ok(route) => Self { + Ok(key) => Self { success: true, error: "".to_string(), - route, + key, }, Err(err) => Self { success: false, error: err.to_string(), - route: Route::default(), + key: Key::default(), }, } } diff --git a/service/src/route_storage_impl.rs b/service/src/route_storage_impl.rs deleted file mode 100644 index 1493cfa..0000000 --- a/service/src/route_storage_impl.rs +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright 2021 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::defaults::{ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME}; - -use crate::error::ServiceError; -use crate::error::ServiceError::{InternalError, RouteNotExists}; -use crate::route::{Route, RouteInternal}; -use crate::storage_impl::Storage; -use marine_sqlite_connector::{State, Statement, Value}; - -impl Storage { - pub fn create_route_tables(&self) -> bool { - self.connection - .execute(f!(" - CREATE TABLE IF NOT EXISTS {ROUTES_TABLE_NAME} ( - route_id TEXT PRIMARY KEY, - label TEXT, - peer_id TEXT, - timestamp_created INTEGER, - challenge BLOB, - challenge_type TEXT, - signature BLOB NOT NULL, - timestamp_published INTEGER, - pinned INTEGER, - weight INTEGER - ); - ")) - .is_ok() - && self - .connection - .execute(f!(" - CREATE TABLE IF NOT EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME} ( - route_id TEXT PRIMARY KEY, - timestamp_accessed INTEGER - ); - ")) - .is_ok() - } - - pub fn update_route_timestamp( - &self, - route_id: &str, - current_timestamp_sec: u64, - ) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!(" - INSERT OR REPLACE INTO {ROUTES_TIMESTAMPS_TABLE_NAME} VALUES (?, ?); - "))?; - - statement.bind(1, &Value::String(route_id.to_string()))?; - statement.bind(2, &Value::Integer(current_timestamp_sec as i64))?; - statement.next()?; - Ok(()) - } - - pub fn get_route(&self, route_id: String) -> Result { - let mut statement = self.connection.prepare(f!( - "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \ - FROM {ROUTES_TABLE_NAME} WHERE route_id = ?" - ))?; - statement.bind(1, &Value::String(route_id.clone()))?; - - if let State::Row = statement.next()? { - read_route(&statement) - } else { - Err(RouteNotExists(route_id)) - } - } - - pub fn write_route(&self, route: RouteInternal) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!(" - INSERT OR REPLACE INTO {ROUTES_TABLE_NAME} VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); - "))?; - - let pinned = if route.pinned { 1 } else { 0 } as i64; - statement.bind(1, &Value::String(route.route.id))?; - statement.bind(2, &Value::String(route.route.label))?; - statement.bind(3, &Value::String(route.route.peer_id))?; - statement.bind(4, &Value::Integer(route.route.timestamp_created as i64))?; - statement.bind(5, &Value::Binary(route.route.challenge))?; - statement.bind(6, &Value::String(route.route.challenge_type))?; - statement.bind(7, &Value::Binary(route.route.signature))?; - statement.bind(8, &Value::Integer(route.timestamp_published as i64))?; - statement.bind(9, &Value::Integer(pinned))?; - statement.bind(10, &Value::Integer(route.weight as i64))?; - statement.next()?; - Ok(()) - } - - pub fn update_route(&self, route: RouteInternal) -> Result<(), ServiceError> { - if let Ok(existing_route) = self.get_route(route.route.id.clone()) { - if existing_route.timestamp_created > route.route.timestamp_created { - return Err(ServiceError::RouteAlreadyExistsNewerTimestamp( - route.route.label, - route.route.peer_id, - )); - } - } - - self.write_route(route) - } - - pub fn check_route_existence(&self, route_id: &str) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT EXISTS(SELECT 1 FROM {ROUTES_TABLE_NAME} WHERE route_id = ? LIMIT 1)" - ))?; - statement.bind(1, &Value::String(route_id.to_string()))?; - - if let State::Row = statement.next()? { - let exists = statement.read::(0)?; - if exists == 1 { - Ok(()) - } else { - Err(RouteNotExists(route_id.to_string())) - } - } else { - Err(InternalError( - "EXISTS should always return something".to_string(), - )) - } - } - - pub fn get_stale_routes( - &self, - stale_timestamp: u64, - ) -> Result, ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature, timestamp_published, pinned, weight \ - FROM {ROUTES_TABLE_NAME} WHERE timestamp_published <= ?" - ))?; - statement.bind(1, &Value::Integer(stale_timestamp as i64))?; - - let mut stale_keys: Vec = vec![]; - while let State::Row = statement.next()? { - stale_keys.push(read_internal_route(&statement)?); - } - - Ok(stale_keys) - } - - pub fn delete_key(&self, route_id: String) -> Result<(), ServiceError> { - let mut statement = self - .connection - .prepare(f!("DELETE FROM {ROUTES_TABLE_NAME} WHERE route_id=?"))?; - statement.bind(1, &Value::String(route_id.clone()))?; - statement.next().map(drop)?; - - if self.connection.changes() == 1 { - Ok(()) - } else { - Err(RouteNotExists(route_id)) - } - } - - /// not pinned only - pub fn get_expired_routes(&self, expired_timestamp: u64) -> Result, ServiceError> { - let mut statement = self.connection.prepare(f!( - "SELECT route_id, label, peer_id, timestamp_created, challenge, challenge_type, signature \ - FROM {ROUTES_TABLE_NAME} WHERE timestamp_created <= ? and pinned != 1" - ))?; - statement.bind(1, &Value::Integer(expired_timestamp as i64))?; - - let mut expired_routes: Vec = vec![]; - while let State::Row = statement.next()? { - let route = read_route(&statement)?; - let timestamp_accessed = self.get_route_timestamp_accessed(&route.id)?; - let with_host_records = self.get_host_records_count_by_key(route.id.clone())? != 0; - - if timestamp_accessed <= expired_timestamp && !with_host_records { - expired_routes.push(route); - } - } - - Ok(expired_routes) - } - - pub fn get_route_timestamp_accessed(&self, route_id: &str) -> Result { - let mut statement = self.connection.prepare(f!( - "SELECT timestamp_accessed FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE route_id = ?" - ))?; - statement.bind(1, &Value::String(route_id.to_string()))?; - - if let State::Row = statement.next()? { - statement - .read::(0) - .map(|t| t as u64) - .map_err(ServiceError::SqliteError) - } else { - Err(RouteNotExists(route_id.to_string())) - } - } - - pub fn clear_expired_timestamps_accessed( - &self, - expired_timestamp: u64, - ) -> Result<(), ServiceError> { - let mut statement = self.connection.prepare(f!( - "DELETE FROM {ROUTES_TIMESTAMPS_TABLE_NAME} WHERE timestamp_accessed < ?" - ))?; - statement.bind(1, &Value::Integer(expired_timestamp as i64))?; - statement.next().map(drop)?; - - Ok(()) - } -} - -pub fn read_route(statement: &Statement) -> Result { - Ok(Route { - id: statement.read::(0)?, - label: statement.read::(1)?, - peer_id: statement.read::(2)?, - timestamp_created: statement.read::(3)? as u64, - challenge: statement.read::>(4)?, - challenge_type: statement.read::(5)?, - signature: statement.read::>(6)?, - }) -} - -pub fn read_internal_route(statement: &Statement) -> Result { - Ok(RouteInternal { - route: read_route(statement)?, - timestamp_published: statement.read::(7)? as u64, - pinned: statement.read::(8)? != 0, - weight: statement.read::(9)? as u32, - }) -} diff --git a/service/src/storage_impl.rs b/service/src/storage_impl.rs index 4b974db..63f5a9c 100644 --- a/service/src/storage_impl.rs +++ b/service/src/storage_impl.rs @@ -59,7 +59,7 @@ impl Storage { // delete expired non-host records deleted_values += self.clear_expired_records(expired_timestamp)?; - let expired_keys = self.get_expired_routes(expired_timestamp)?; + let expired_keys = self.get_expired_keys(expired_timestamp)?; for key in expired_keys { self.delete_key(key.id)?; @@ -71,7 +71,7 @@ impl Storage { Ok((deleted_keys, deleted_values)) } - /// Delete all stale keys and values except for pinned keys and host values. + /// Delete all stale keys and records except host records. /// Stale means that `timestamp_accessed` has surpassed `stale_timeout`. /// Returns all deleted items pub fn evict_stale( @@ -80,30 +80,30 @@ impl Storage { ) -> Result, ServiceError> { let stale_timestamp = current_timestamp_sec - load_config().stale_timeout; - let stale_keys = self.get_stale_routes(stale_timestamp)?; + let stale_keys = self.get_stale_keys(stale_timestamp)?; let mut key_to_delete: Vec = vec![]; let mut results: Vec = vec![]; let host_id = marine_rs_sdk::get_call_parameters().host_id; - for route in stale_keys.into_iter() { + for key in stale_keys.into_iter() { let records: Vec = self - .get_records(route.route.id.clone())? + .get_records(key.key.id.clone())? .into_iter() .map(|r| r.record) .collect(); - if !route.pinned && !records.iter().any(|r| r.peer_id == host_id) { - key_to_delete.push(route.route.id.clone()); + if !records.iter().any(|r| r.peer_id == host_id) { + key_to_delete.push(key.key.id.clone()); } results.push(EvictStaleItem { - route: route.route, + key: key.key, records, }); } - for route_id in key_to_delete { - self.delete_key(route_id.clone())?; - self.delete_records_by_key(route_id)?; + for key_id in key_to_delete { + self.delete_key(key_id.clone())?; + self.delete_records_by_key(key_id)?; } Ok(results) diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index e3f9b0e..2c9c4fe 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -21,44 +21,44 @@ mod tests { use marine_rs_sdk::{CallParameters, SecurityTetraplet}; use rusqlite::Connection; marine_rs_sdk_test::include_test_env!("/marine_test_env.rs"); - use marine_test_env::registry::{DhtResult, PutHostRecordResult, Record, ServiceInterface}; + use marine_test_env::registry::{ + PutHostRecordResult, Record, RegistryResult, ServiceInterface, + }; use crate::defaults::{ - CONFIG_FILE, DB_PATH, RECORDS_TABLE_NAME, ROUTES_TABLE_NAME, ROUTES_TIMESTAMPS_TABLE_NAME, + CONFIG_FILE, DB_PATH, KEYS_TABLE_NAME, KEYS_TIMESTAMPS_TABLE_NAME, RECORDS_TABLE_NAME, TRUSTED_TIMESTAMP_FUNCTION_NAME, TRUSTED_TIMESTAMP_SERVICE_ID, TRUSTED_WEIGHT_FUNCTION_NAME, TRUSTED_WEIGHT_SERVICE_ID, }; use crate::error::ServiceError::{ - InvalidRouteTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId, - RouteAlreadyExistsNewerTimestamp, - }; - use crate::tests::tests::marine_test_env::registry::{ - RegisterRouteResult, Route, WeightResult, + InvalidKeyTimestamp, InvalidTimestampTetraplet, InvalidWeightPeerId, + KeyAlreadyExistsNewerTimestamp, }; + use crate::tests::tests::marine_test_env::registry::{Key, RegisterKeyResult, WeightResult}; const HOST_ID: &str = "some_host_id"; - impl PartialEq for Route { + impl PartialEq for Key { fn eq(&self, other: &Self) -> bool { self.id == other.id && self.label == other.label && self.timestamp_created == other.timestamp_created && self.signature == other.signature - && self.peer_id == other.peer_id + && self.owner_peer_id == other.owner_peer_id } } - impl Eq for Route {} + impl Eq for Key {} fn clear_env() { let connection = Connection::open(DB_PATH).unwrap(); connection - .execute(f!("DROP TABLE IF EXISTS {ROUTES_TABLE_NAME}").as_str(), []) + .execute(f!("DROP TABLE IF EXISTS {KEYS_TABLE_NAME}").as_str(), []) .unwrap(); connection .execute( - f!("DROP TABLE IF EXISTS {ROUTES_TIMESTAMPS_TABLE_NAME}").as_str(), + f!("DROP TABLE IF EXISTS {KEYS_TIMESTAMPS_TABLE_NAME}").as_str(), [], ) .unwrap(); @@ -146,7 +146,7 @@ mod tests { } } - fn get_signed_route_bytes( + fn get_signed_key_bytes( registry: &mut ServiceInterface, kp: &KeyPair, label: String, @@ -155,29 +155,28 @@ mod tests { challenge_type: String, ) -> Vec { let issuer_peer_id = kp.get_peer_id().to_base58(); - let route_bytes = registry.get_route_bytes( + let key_bytes = registry.get_key_bytes( label.clone(), vec![issuer_peer_id.clone()], timestamp_created, challenge, challenge_type, ); - kp.sign(&route_bytes).unwrap().to_vec().to_vec() + kp.sign(&key_bytes).unwrap().to_vec().to_vec() } - fn register_route( + fn register_key( registry: &mut ServiceInterface, kp: &KeyPair, label: String, timestamp_created: u64, current_timestamp: u64, - pin: bool, weight: u32, - ) -> RegisterRouteResult { + ) -> RegisterKeyResult { let issuer_peer_id = kp.get_peer_id().to_base58(); let challenge = vec![]; let challenge_type = "".to_string(); - let signature = get_signed_route_bytes( + let signature = get_signed_key_bytes( registry, kp, label.clone(), @@ -189,57 +188,54 @@ mod tests { .add_weight_tetraplets(7) .add_timestamp_tetraplets(8); let weight = get_weight(issuer_peer_id.clone(), weight); - registry.register_route_cp( + registry.register_key_cp( label, vec![issuer_peer_id], timestamp_created, challenge, challenge_type, signature, - pin, weight, current_timestamp, cp.get(), ) } - fn register_route_checked( + fn register_key_checked( registry: &mut ServiceInterface, kp: &KeyPair, - route: String, + key: String, timestamp_created: u64, current_timestamp: u64, - pin: bool, weight: u32, ) -> String { - let result = register_route( + let result = register_key( registry, kp, - route, + key, timestamp_created, current_timestamp, - pin, weight, ); assert!(result.success, "{}", result.error); - result.route_id + result.key_id } - fn get_route_metadata( + fn get_key_metadata( registry: &mut ServiceInterface, - route_id: String, + key_id: String, current_timestamp: u64, - ) -> Route { + ) -> Key { let cp = CPWrapper::new("peer_id").add_timestamp_tetraplets(1); - let result = registry.get_route_metadata_cp(route_id, current_timestamp, cp.get()); + let result = registry.get_key_metadata_cp(key_id, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); - result.route + result.key } fn get_signed_record_bytes( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -249,7 +245,7 @@ mod tests { let issuer_peer_id = kp.get_peer_id().to_base58(); let cp = CPWrapper::new(&issuer_peer_id); let record_bytes = registry.get_record_bytes_cp( - route_id, + key_id, value, relay_id, service_id, @@ -264,21 +260,21 @@ mod tests { fn put_record( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, timestamp_created: u64, current_timestamp: u64, weight: u32, - ) -> DhtResult { + ) -> RegistryResult { let issuer_peer_id = kp.get_peer_id().to_base58(); let solution = vec![]; let signature = get_signed_record_bytes( registry, kp, - route_id.clone(), + key_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -291,7 +287,7 @@ mod tests { .add_timestamp_tetraplets(8); let weight = get_weight(issuer_peer_id.clone(), weight); registry.put_record_cp( - route_id, + key_id, value, relay_id, service_id, @@ -307,7 +303,7 @@ mod tests { fn put_record_checked( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -318,7 +314,7 @@ mod tests { let result = put_record( registry, kp, - route_id, + key_id, value, relay_id, service_id, @@ -332,7 +328,7 @@ mod tests { fn get_signed_host_record_bytes( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -342,7 +338,7 @@ mod tests { let issuer_peer_id = kp.get_peer_id().to_base58(); let cp = CPWrapper::new(&issuer_peer_id); let record_bytes = registry.get_host_record_bytes_cp( - route_id, + key_id, value, relay_id, service_id, @@ -357,7 +353,7 @@ mod tests { fn put_host_record( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -371,7 +367,7 @@ mod tests { let signature = get_signed_host_record_bytes( registry, kp, - route_id.clone(), + key_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -384,7 +380,7 @@ mod tests { .add_timestamp_tetraplets(8); let weight = get_weight(issuer_peer_id.clone(), weight); registry.put_host_record_cp( - route_id, + key_id, value, relay_id, service_id, @@ -400,7 +396,7 @@ mod tests { fn put_host_record_checked( registry: &mut ServiceInterface, kp: &KeyPair, - route_id: String, + key_id: String, value: String, relay_id: Vec, service_id: Vec, @@ -411,7 +407,7 @@ mod tests { let result = put_host_record( registry, kp, - route_id, + key_id, value, relay_id, service_id, @@ -424,18 +420,18 @@ mod tests { fn get_records( registry: &mut ServiceInterface, - route_id: String, + key_id: String, current_timestamp: u64, ) -> Vec { let cp = CPWrapper::new("some_peer_id").add_timestamp_tetraplets(1); - let result = registry.get_records_cp(route_id, current_timestamp, cp.get()); + let result = registry.get_records_cp(key_id, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); result.result } #[test] - fn register_route_invalid_signature() { + fn register_key_invalid_signature() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); @@ -446,36 +442,35 @@ mod tests { let invalid_signature = vec![]; cp = cp.add_weight_tetraplets(5).add_timestamp_tetraplets(6); - let reg_route_result = registry.register_route_cp( - "some_route".to_string(), + let reg_key_result = registry.register_key_cp( + "some_key".to_string(), vec![], 100u64, vec![], "".to_string(), invalid_signature, - false, weight, 10u64, cp.get(), ); - assert!(!reg_route_result.success); + assert!(!reg_key_result.success); } #[test] - fn register_route_invalid_weight_tetraplet() { + fn register_key_invalid_weight_tetraplet() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); let mut cp = CPWrapper::new(&issuer_peer_id); - let label = "some_route".to_string(); + let label = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let challenge = vec![]; let challenge_type = "".to_string(); let weight = get_weight(issuer_peer_id.clone(), 0); - let signature = get_signed_route_bytes( + let signature = get_signed_key_bytes( &mut registry, &kp, label.clone(), @@ -485,35 +480,34 @@ mod tests { ); cp = cp.add_timestamp_tetraplets(8); - let reg_route_result = registry.register_route_cp( + let reg_key_result = registry.register_key_cp( label, vec![], timestamp_created, challenge, challenge_type, signature, - false, weight, current_timestamp, cp.get(), ); - assert!(!reg_route_result.success); + assert!(!reg_key_result.success); } #[test] - fn register_route_missing_timestamp_tetraplet() { + fn register_key_missing_timestamp_tetraplet() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let label = "some_route".to_string(); + let label = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = get_weight(issuer_peer_id.clone(), 0); let challenge = vec![1u8, 2u8, 3u8]; let challenge_type = "type".to_string(); - let signature = get_signed_route_bytes( + let signature = get_signed_key_bytes( &mut registry, &kp, label.clone(), @@ -523,41 +517,40 @@ mod tests { ); let cp = CPWrapper::new(&issuer_peer_id).add_weight_tetraplets(7); - let reg_route_result = registry.register_route_cp( + let reg_key_result = registry.register_key_cp( label, vec![], timestamp_created, challenge, challenge_type, signature, - false, weight, current_timestamp, cp.get(), ); - assert!(!reg_route_result.success); + assert!(!reg_key_result.success); assert_eq!( - reg_route_result.error, + reg_key_result.error, InvalidTimestampTetraplet(format!("{:?}", cp.cp.tetraplets)).to_string() ); } #[test] - fn register_route_invalid_weight_peer_id() { + fn register_key_invalid_weight_peer_id() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); let invalid_peer_id = "INVALID_PEER_ID".to_string(); let mut cp = CPWrapper::new(&issuer_peer_id); - let label = "some_route".to_string(); + let label = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let challenge = vec![1u8, 2u8, 3u8]; let challenge_type = "type".to_string(); let weight = get_weight(invalid_peer_id.clone(), 0); - let signature = get_signed_route_bytes( + let signature = get_signed_key_bytes( &mut registry, &kp, label.clone(), @@ -567,43 +560,40 @@ mod tests { ); cp = cp.add_weight_tetraplets(7).add_timestamp_tetraplets(8); - let reg_route_result = registry.register_route_cp( + let reg_key_result = registry.register_key_cp( label, vec![], timestamp_created, challenge, challenge_type, signature, - false, weight, current_timestamp, cp.get(), ); - assert!(!reg_route_result.success); + assert!(!reg_key_result.success); assert_eq!( - reg_route_result.error, + reg_key_result.error, InvalidWeightPeerId(issuer_peer_id, invalid_peer_id).to_string() ); } #[test] - fn register_route_correct() { + fn register_key_correct() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; - let result = register_route( + let result = register_key( &mut registry, &kp, - route, + key, timestamp_created, current_timestamp, - pin, weight, ); @@ -611,191 +601,179 @@ mod tests { } #[test] - fn register_route_older_timestamp() { + fn register_key_older_timestamp() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created_first = 100u64; let current_timestamp = 1000u64; let weight = 0; - let pin = false; - register_route_checked( + register_key_checked( &mut registry, &kp, - route.clone(), + key.clone(), timestamp_created_first, current_timestamp, - pin, weight, ); let timestamp_created_second = timestamp_created_first - 10u64; - let result_second = register_route( + let result_second = register_key( &mut registry, &kp, - route.clone(), + key.clone(), timestamp_created_second, current_timestamp, - pin, weight, ); assert_eq!( result_second.error, - RouteAlreadyExistsNewerTimestamp(route, kp.get_peer_id().to_base58()).to_string() + KeyAlreadyExistsNewerTimestamp(key, kp.get_peer_id().to_base58()).to_string() ); } #[test] - fn register_route_in_the_future() { + fn register_key_in_the_future() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let current_timestamp = 100u64; let timestamp_created = current_timestamp + 100u64; let weight = 0; - let pin = false; - let result = register_route( + let result = register_key( &mut registry, &kp, - route, + key, timestamp_created, current_timestamp, - pin, weight, ); - assert_eq!(result.error, InvalidRouteTimestamp.to_string()) + assert_eq!(result.error, InvalidKeyTimestamp.to_string()) } #[test] - fn register_route_update_republish_old() { + fn register_key_update_republish_old() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created_old = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; - let route_id = register_route_checked( + let key_id = register_key_checked( &mut registry, &kp, - route.clone(), + key.clone(), timestamp_created_old, current_timestamp, - pin, weight, ); - let old_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); + let old_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let timestamp_created_new = timestamp_created_old + 10u64; - register_route_checked( + register_key_checked( &mut registry, &kp, - route, + key, timestamp_created_new, current_timestamp, - pin, weight, ); - let new_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); - assert_ne!(old_route, new_route); + let new_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); + assert_ne!(old_key, new_key); let cp = CPWrapper::new(&issuer_peer_id) .add_weight_tetraplets(1) .add_timestamp_tetraplets(2); let weight = get_weight(issuer_peer_id.clone(), weight); let result = - registry.republish_route_cp(old_route.clone(), weight, current_timestamp, cp.get()); + registry.republish_key_cp(old_key.clone(), weight, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); - let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); - assert_eq!(new_route, result_route); + let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); + assert_eq!(new_key, result_key); } #[test] - fn get_route_metadata_test() { + fn get_key_metadata_test() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let label = "some_route".to_string(); + let label = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; let challenge = vec![]; let challenge_type = "".to_string(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let route_bytes = registry.get_route_bytes( + let key_bytes = registry.get_key_bytes( label.clone(), vec![issuer_peer_id.clone()], timestamp_created, challenge.clone(), challenge_type.clone(), ); - let signature = kp.sign(&route_bytes).unwrap().to_vec().to_vec(); + let signature = kp.sign(&key_bytes).unwrap().to_vec().to_vec(); - let route_id = register_route_checked( + let key_id = register_key_checked( &mut registry, &kp, label.clone(), timestamp_created, current_timestamp, - pin, weight, ); - let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); - let expected_route = Route { - id: route_id, + let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); + let expected_key = Key { + id: key_id, label, - peer_id: issuer_peer_id, + owner_peer_id: issuer_peer_id, timestamp_created, challenge, challenge_type, signature, }; - assert_eq!(result_route, expected_route); + assert_eq!(result_key, expected_key); } #[test] - fn republish_same_route_test() { + fn republish_same_key_test() { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); let issuer_peer_id = kp.get_peer_id().to_base58(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; - let route_id = register_route_checked( + let key_id = register_key_checked( &mut registry, &kp, - route.clone(), + key.clone(), timestamp_created, current_timestamp, - pin, weight, ); - let result_route = get_route_metadata(&mut registry, route_id.clone(), current_timestamp); + let result_key = get_key_metadata(&mut registry, key_id.clone(), current_timestamp); let cp = CPWrapper::new(&issuer_peer_id) .add_weight_tetraplets(1) .add_timestamp_tetraplets(2); let weight = get_weight(issuer_peer_id.clone(), weight); let result = - registry.republish_route_cp(result_route.clone(), weight, current_timestamp, cp.get()); + registry.republish_key_cp(result_key.clone(), weight, current_timestamp, cp.get()); assert!(result.success, "{}", result.error); } @@ -804,19 +782,17 @@ mod tests { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; - let route_id = register_route_checked( + let key_id = register_key_checked( &mut registry, &kp, - route, + key, timestamp_created, current_timestamp, - pin, weight, ); let value = "some_value".to_string(); @@ -827,7 +803,7 @@ mod tests { put_record_checked( &mut registry, &kp, - route_id.clone(), + key_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -836,10 +812,10 @@ mod tests { weight, ); - let records = get_records(&mut registry, route_id.clone(), current_timestamp); + let records = get_records(&mut registry, key_id.clone(), current_timestamp); assert_eq!(records.len(), 1); let record = &records[0]; - assert_eq!(record.route_id, route_id); + assert_eq!(record.key_id, key_id); assert_eq!(record.relay_id, relay_id); assert_eq!(record.service_id, service_id); assert_eq!(record.peer_id, kp.get_peer_id().to_base58()); @@ -852,19 +828,17 @@ mod tests { clear_env(); let mut registry = ServiceInterface::new(); let kp = KeyPair::generate_ed25519(); - let route = "some_route".to_string(); + let key = "some_key".to_string(); let timestamp_created = 0u64; let current_timestamp = 100u64; let weight = 0; - let pin = false; - let route_id = register_route_checked( + let key_id = register_key_checked( &mut registry, &kp, - route, + key, timestamp_created, current_timestamp, - pin, weight, ); let value = "some_value".to_string(); @@ -875,7 +849,7 @@ mod tests { put_host_record_checked( &mut registry, &kp, - route_id.clone(), + key_id.clone(), value.clone(), relay_id.clone(), service_id.clone(), @@ -884,10 +858,10 @@ mod tests { weight, ); - let records = get_records(&mut registry, route_id.clone(), current_timestamp); + let records = get_records(&mut registry, key_id.clone(), current_timestamp); assert_eq!(records.len(), 1); let record = &records[0]; - assert_eq!(record.route_id, route_id); + assert_eq!(record.key_id, key_id); assert_eq!(record.relay_id, relay_id); assert_eq!(record.service_id, service_id); assert_eq!(record.set_by, kp.get_peer_id().to_base58());