mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-24 17:52:14 +00:00
181 lines
5.3 KiB
Plaintext
181 lines
5.3 KiB
Plaintext
module Registry.Subnetwork declares *
|
|
|
|
import "registry-service.aqua"
|
|
import "registry-api.aqua"
|
|
import "misc.aqua"
|
|
import "constants.aqua"
|
|
import "@fluencelabs/aqua-lib/builtin.aqua"
|
|
import "@fluencelabs/trust-graph/trust-graph.aqua"
|
|
|
|
-- Error messages produced by the functions in this module are plain strings.
alias Error: string
|
|
|
|
-- Passed by resolveSubnetwork to `wait` together with the stream of per-node
-- success flags; presumably the minimum number of successful node responses
-- required before records are merged (confirm against `wait` in misc.aqua).
const MIN_ACK = 2
|
|
|
|
-- Asks the "insecure_sig" signing service for its peer id and returns it.
-- The call is executed on whatever peer the caller is currently on.
func getInsecuredPeerId() -> PeerId:
  Sig "insecure_sig"
  insecure_peer_id <- Sig.get_peer_id()
  <- insecure_peer_id
|
|
|
|
-- Produces a signature over the Registry key bytes of `label` using the
-- "insecure_sig" signing service.
--   label             - key label that is being signed
--   peer_id           - owner peer id, passed to Registry as an optional value
--   timestamp_created - creation timestamp embedded into the key bytes
-- Returns the SignResult of the signing call unchanged.
func getKeyInsecuredSignature(label: string, peer_id: PeerId, timestamp_created: u64) -> SignResult:
  -- Bind the Sig service to the "insecure_sig" instance for this scope.
  Sig "insecure_sig"
  key_bytes <- Registry.get_key_bytes(label, ?[peer_id], timestamp_created, nil, "")
  sign_result <- Sig.sign(key_bytes)
  <- sign_result
|
|
|
|
-- Registers the key for `deal_id` in the Registry service of the current peer.
--   deal_id           - label of the key
--   peer_id           - owner peer id; comes from ("insecure_sig" "get_peer_id")
--   timestamp_created - creation timestamp that was signed
--   signature         - signature over the key bytes
-- The weight of `peer_id` is looked up in TrustGraph at the current time and
-- handed to Registry together with that same timestamp.
func registerSubnetworkKey(deal_id: string, peer_id: PeerId, timestamp_created: u64, signature: []u8) -> RegisterKeyResult:
  now_sec <- Peer.timestamp_sec()
  peer_weight <- TrustGraph.get_weight(peer_id, now_sec)
  register_result <- Registry.register_key(deal_id, ?[peer_id], timestamp_created, nil, "", signature, peer_weight, now_sec)
  <- register_result
|
|
|
|
-- Creates the subnetwork key for `deal_id` and replicates it to the
-- neighbourhood of the resulting key id.
-- Returns:
--   ?string - the key id on success, empty otherwise
--   *Error  - every error collected along the way
func createSubnetwork(deal_id: string) -> ?string, *Error:
  -- The creation timestamp is taken before hopping to the host.
  created_at <- Peer.timestamp_sec()

  subnetwork_id: ?string
  error: *Error
  on HOST_PEER_ID:
    insecure_peer_id <- getInsecuredPeerId()
    sign_result <- getKeyInsecuredSignature(deal_id, insecure_peer_id, created_at)
    if sign_result.success == false:
      error <<- sign_result.error!
    else:
      key_signature = sign_result.signature!

      key_id <- Registry.get_key_id(deal_id, insecure_peer_id)
      neighbors <- getNeighbors(key_id)
      -- One `true` is pushed per node that accepted the key.
      acks: *bool
      -- NOTE(review): this nested `on HOST_PEER_ID` looks redundant inside the
      -- outer one; it is kept as written in case it is a topology workaround.
      on HOST_PEER_ID:
        for node <- neighbors par:
          on node:
            try:
              reg_result <- registerSubnetworkKey(deal_id, insecure_peer_id, created_at, key_signature)

              if reg_result.success:
                acks <<- true
              else:
                error <<- reg_result.error

      -- `wait` is defined elsewhere; presumably it yields false when fewer than
      -- INITIAL_REPLICATION_FACTOR acks arrive within DEFAULT_TIMEOUT.
      replicated <- wait(acks, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)

      if replicated == false:
        error <<- "key wasn't created: timeout exceeded"
      else:
        subnetwork_id <<- key_id

  <- subnetwork_id, error
|
|
|
|
-- Builds signed record metadata for the worker (INIT_PEER_ID) in the given
-- subnetwork.
-- The metadata bytes are obtained from Registry, signed by the "sig" service
-- on INIT_PEER_ID, and then turned into RecordMetadata by Registry.
-- Returns:
--   ?RecordMetadata - the metadata when signing succeeded, empty otherwise
--   ?string         - the signing error when signing failed
func getWorkerRecordMetadata(subnetwork_id: string) -> ?RecordMetadata, ?string:
  now_sec <- Peer.timestamp_sec()
  -- The host is recorded as the relay of the worker.
  relay_id = ?[HOST_PEER_ID]
  metadata_bytes <- Registry.get_record_metadata_bytes(subnetwork_id, INIT_PEER_ID, now_sec, "", INIT_PEER_ID, relay_id, nil, nil)
  on INIT_PEER_ID:
    Sig "sig"
    sign_result <- Sig.sign(metadata_bytes)

  result: ?RecordMetadata
  error: *string
  if sign_result.success == true:
    result <- Registry.create_record_metadata(subnetwork_id, INIT_PEER_ID, now_sec, "", INIT_PEER_ID, relay_id, nil, nil, sign_result.signature!)
  else:
    error <<- sign_result.error!

  <- result, error
|
|
|
|
-- Signs a record described by `metadata` and `timestamp_created`.
-- The record bytes come from Registry; the signature is produced by the "sig"
-- service on INIT_PEER_ID. Returns the SignResult of that signing call.
func getWorkerRecordSignature(metadata: RecordMetadata, timestamp_created: u64) -> SignResult:
  -- Stream used to carry the result out of the `on INIT_PEER_ID` block.
  sign_results: *SignResult
  record_bytes <- Registry.get_record_bytes(metadata, timestamp_created)

  on INIT_PEER_ID:
    Sig "sig"
    sign_results <- Sig.sign(record_bytes)

  <- sign_results!
|
|
|
|
-- Registers the worker (INIT_PEER_ID) in the given subnetwork.
-- Steps, each of which aborts the registration on failure:
--   1. build and sign the record metadata (getWorkerRecordMetadata)
--   2. sign the record itself (getWorkerRecordSignature)
--   3. fetch the subnetwork key (getResourceHelper)
--   4. republish the key and put the record via the registry-api helpers
--   5. repeat step 4 on every neighbour of `subnetwork_id` in parallel and
--      wait for the replication acknowledgements
-- Returns:
--   bool   - true when the replication wait succeeded, false otherwise
--   *Error - every error collected along the way; the "timeout exceeded"
--            message is present only when the replication wait itself failed
func registerWorker(subnetwork_id: string) -> bool, *Error:
  success: *bool
  error: *Error

  metadata, err <- getWorkerRecordMetadata(subnetwork_id)
  if metadata == nil:
    success <<- false
    error <<- err!
  else:
    t <- Peer.timestamp_sec()
    sig_result = getWorkerRecordSignature(metadata!, t)
    if sig_result.success == false:
      error <<- sig_result.error!
      success <<- false
    else:
      key, error_get <- getResourceHelper(subnetwork_id)

      if key == nil:
        appendErrors(error, error_get)
        success <<- false
      else:
        republish_result <- republishKey(key!)
        if republish_result.success == false:
          error <<- republish_result.error
          success <<- false
        else:
          p_res <- putRecord(metadata!, t, sig_result.signature!)
          if p_res.success == false:
            error <<- p_res.error
            success <<- false
          else:
            nodes <- getNeighbors(subnetwork_id)

            -- One `true` is pushed per neighbour that stored the record.
            successful: *bool
            for n <- nodes par:
              on n:
                try:
                  republish_res <- republishKey(key!)
                  if republish_res.success == false:
                    error <<- republish_res.error
                  else:
                    put_res <- putRecord(metadata!, t, sig_result.signature!)
                    if put_res.success:
                      successful <<- true
                    else:
                      error <<- put_res.error

            replicated <- wait(successful, INITIAL_REPLICATION_FACTOR, DEFAULT_TIMEOUT)
            -- The timeout message is emitted only here, so it is never
            -- attached to failures that have a different cause.
            if replicated == false:
              error <<- "worker hasn't registered: timeout exceeded"
            success <<- replicated

  succ = success!
  <- succ, error
|
|
|
|
-- Resolves the records registered for the subnetwork of `deal_id`.
-- The key id is derived from `deal_id` and the "insecure_sig" peer id, the
-- neighbours of that key id are queried in parallel, and the collected
-- records are merged by Registry.
-- All Sig / Registry / neighbourhood lookups run on HOST_PEER_ID, the same
-- way createSubnetwork performs them, so the calling peer does not need to
-- provide these services itself.
-- Returns:
--   ?[]Record - merged records on success, empty otherwise
--   *Error    - every error collected along the way
func resolveSubnetwork(deal_id: string) -> ?[]Record, *Error:
  result: ?[]Record
  records: *[]Record
  error: *Error
  -- One `true` is pushed per node that returned its records.
  successful: *bool
  on HOST_PEER_ID:
    peer_id <- getInsecuredPeerId()
    key_id <- Registry.get_key_id(deal_id, peer_id)

    nodes <- getNeighbors(key_id)
    for n <- nodes par:
      on n:
        try:
          -- The timestamp is taken on the queried node itself.
          t <- Peer.timestamp_sec()
          get_result <- Registry.get_records(key_id, t)
          if get_result.success:
            records <<- get_result.result
            successful <<- true
          else:
            error <<- get_result.error

    -- `wait` is defined elsewhere; presumably it yields false when fewer than
    -- MIN_ACK successes arrive within DEFAULT_TIMEOUT.
    success <- wait(successful, MIN_ACK, DEFAULT_TIMEOUT)
    if success == false:
      error <<- "timeout exceeded"
    else:
      merged <- Registry.merge(records)
      if merged.success == false:
        error <<- merged.error
      else:
        result <<- merged.result
  <- result, error
|