mirror of
https://github.com/fluencelabs/registry.git
synced 2025-04-24 17:52:14 +00:00
73 lines
2.4 KiB
Plaintext
73 lines
2.4 KiB
Plaintext
import Op, Debug, Peer, Kademlia from "@fluencelabs/aqua-lib/builtin.aqua"
|
|
import Spell from "@fluencelabs/spell/spell_service.aqua"
|
|
import Compare from "@fluencelabs/aqua-lib/math.aqua"
|
|
|
|
import "../aqua/registry-service.aqua"
|
|
import "../aqua/registry-api.aqua"
|
|
import "@fluencelabs/aqua-lib/binary.aqua"
|
|
import "@fluencelabs/trust-graph/trust-graph.aqua"
|
|
|
|
-- Scheduling settings passed to `spell`.
-- Each field is the minimum difference between the current
-- `PeerTimeTrunc.timestamp_sec()` value and the stored last-run value before
-- the matching job runs again:
--   expired_interval   -> `clear_expired` job
--   renew_interval     -> `renew` job
--   replicate_interval -> `replicate` job
-- Intervals share the unit of `timestamp_sec()` (presumably seconds; confirm
-- against the peer builtin).
data SpellConfig:
    expired_interval: u32
    renew_interval: u32
    replicate_interval: u32
|
|
|
|
-- Workaround: re-declares the "peer" service so that `timestamp_sec` is typed
-- as returning u32. Aqua offers no way to truncate a wider value, so the
-- narrower type is stated directly in this declaration instead.
service PeerTimeTrunc("peer"):
    timestamp_sec() -> u32
|
|
|
|
-- Appends `msg` to the "logs" string list of the spell identified by
-- `spell_id`.
func log_info(spell_id: string, msg: string):
    -- resolve the Spell service to the given spell id
    Spell spell_id
    -- push the message onto that spell's "logs" list
    Spell.list_push_string("logs", msg)
|
|
|
|
|
|
-- Job: clears expired records by delegating to `Registry.clear_expired`,
-- passing `now` as the reference timestamp.
func clear_expired(now: u32):
    Registry.clear_expired(now)
|
|
|
|
-- Job: updates stale local records.
-- Fetches the records reported by `Registry.get_stale_local_records(now)` and,
-- for each of them in parallel, requests a signature with `now` and puts the
-- record again with that signature.
func renew(now: u32):
    stale <- Registry.get_stale_local_records(now)
    for rec <- stale.result par:
        sig <- getRecordSignature(rec.metadata, now)
        -- `!` takes the first element of the returned `signature` collection
        putRecord(rec.metadata, now, sig.signature!)
|
|
|
|
-- Job: takes the entries returned by `Registry.evict_stale(now)` and
-- republishes each one to the Kademlia neighborhood of its key.
-- Entries, and the nodes within each neighborhood, are processed in parallel.
func replicate(now: u32):
    evicted <- Registry.evict_stale(now)
    for entry <- evicted.results par:
        key_b58 <- Op.string_to_b58(entry.key.id)
        neighbors <- Kademlia.neighborhood(key_b58, nil, nil)
        for node <- neighbors par:
            on node:
                -- everything below runs on the target node and uses that
                -- node's own timestamp rather than `now`
                node_now <- Peer.timestamp_sec()

                -- republish the key together with its owner's weight
                owner_weight <- TrustGraph.get_weight(entry.key.owner_peer_id, node_now)
                Registry.republish_key(entry.key, owner_weight, node_now)

                -- collect one weight per record, in record order, then
                -- republish all records of this entry in a single call
                weights: *WeightResult
                for rec <- entry.records:
                    weights <- TrustGraph.get_weight(rec.metadata.issued_by, node_now)
                Registry.republish_records(entry.records, weights, node_now)
|
|
|
|
-- Spell entry point: runs the `clear_expired`, `renew` and `replicate` jobs,
-- each at most once per its configured interval.
--
-- config: per-job intervals, compared against differences of
--         `PeerTimeTrunc.timestamp_sec()` values.
--
-- The last-run timestamp of every job is kept in the spell's u32 storage
-- under the job's name ("clear_expired", "renew", "replicate").
func spell(config: SpellConfig):
    Spell "registry-spell"

    -- writes a message to the "logs" list of this spell
    log = (msg: string):
        log_info("registry-spell", msg)

    -- Runs `job(now)` when no last-run value could be read for `key`, or when
    -- at least `interval` has passed since it; afterwards stores `now` as the
    -- new last-run value.
    check_and_run = (key: string, now: u32, interval: u32, job: u32 -> ()):
        last_run <- Spell.get_u32(key)
        -- NOTE(review): `now - last_run.num` is a u32 subtraction; behaviour
        -- when the stored value is greater than `now` is not visible here.
        need_to_run <- or(not(last_run.success), Compare.gte(now - last_run.num, interval))
        if need_to_run == true:
            -- " job" carries a leading space so that the message reads
            -- "Running <key> job" rather than "Running <key>job"
            log(Op.concat_strings(Op.concat_strings("Running ", key), " job"))
            job(now)
            Spell.set_u32(key, now)

    on HOST_PEER_ID:
        -- a single timestamp is shared by all three checks
        now <- PeerTimeTrunc.timestamp_sec()
        check_and_run("clear_expired", now, config.expired_interval, clear_expired)
        check_and_run("renew", now, config.renew_interval, renew)
        check_and_run("replicate", now, config.replicate_interval, replicate)
|
|
|