mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-09 21:32:14 +00:00
fill in metrics
This commit is contained in:
parent
673e346ba4
commit
1af021846e
53
handlers/callbacks.go
Normal file
53
handlers/callbacks.go
Normal file
@ -0,0 +1,53 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"github.com/tendermint/go-event-meter"
|
||||
"github.com/tendermint/go-events"
|
||||
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// implements eventmeter.EventCallbackFunc
|
||||
func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter.EventCallbackFunc {
|
||||
return func(metric *eventmeter.EventMetric, data events.EventData) {
|
||||
block := data.(tmtypes.EventDataNewBlock).Block
|
||||
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
|
||||
// grab chain and validator
|
||||
chain := tn.Chains[chainID]
|
||||
val, _ := chain.Config.GetValidatorByID(valID)
|
||||
|
||||
// update height for validator
|
||||
val.BlockHeight = block.Header.Height
|
||||
|
||||
// possibly update height and mean block time for chain
|
||||
if block.Header.Height > chain.Status.Height {
|
||||
chain.Status.NewBlock(block)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// implements eventmeter.EventLatencyFunc
|
||||
func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.LatencyCallbackFunc {
|
||||
return func(latency float64) {
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
|
||||
// grab chain and validator
|
||||
chain := tn.Chains[chainID]
|
||||
val, _ := chain.Config.GetValidatorByID(valID)
|
||||
|
||||
// update latency for this validator and avg latency for chain
|
||||
mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators)
|
||||
mean = (mean - val.Latency + latency) / float64(chain.Status.NumValidators)
|
||||
val.Latency = latency
|
||||
chain.Status.MeanLatency = mean
|
||||
|
||||
// TODO: possibly update active nodes and uptime for chain
|
||||
chain.Status.ActiveValidators = chain.Status.NumValidators // XXX
|
||||
|
||||
}
|
||||
}
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/tendermint/go-wire"
|
||||
|
||||
"github.com/tendermint/netmon/types"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
type NetMonResult interface {
|
||||
@ -17,7 +18,7 @@ type NetMonResult interface {
|
||||
// for wire.readReflect
|
||||
var _ = wire.RegisterInterface(
|
||||
struct{ NetMonResult }{},
|
||||
wire.ConcreteType{&types.ChainAndValidatorIDs{}, 0x01},
|
||||
wire.ConcreteType{&types.ChainAndValidatorSetIDs{}, 0x01},
|
||||
wire.ConcreteType{&types.ChainState{}, 0x02},
|
||||
wire.ConcreteType{&types.Validator{}, 0x03},
|
||||
wire.ConcreteType{&eventmeter.EventMetric{}, 0x04},
|
||||
@ -47,20 +48,21 @@ func NewTendermintNetwork(chains ...*types.ChainState) *TendermintNetwork {
|
||||
//------------
|
||||
// Public Methods
|
||||
|
||||
func (tn *TendermintNetwork) RegisterChain(chain *types.ChainState) {
|
||||
func (tn *TendermintNetwork) Stop() {
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
tn.Chains[chain.Config.ID] = chain
|
||||
}
|
||||
|
||||
func (tn *TendermintNetwork) Stop() {
|
||||
// TODO: for each chain, stop each validator
|
||||
for _, c := range tn.Chains {
|
||||
for _, v := range c.Config.Validators {
|
||||
v.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------
|
||||
// RPC funcs
|
||||
|
||||
func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) {
|
||||
// Returns sorted lists of all chains and validator sets
|
||||
func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) {
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
chains := make([]string, len(tn.Chains))
|
||||
@ -77,7 +79,7 @@ func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorIDs, error) {
|
||||
}
|
||||
sort.StringSlice(chains).Sort()
|
||||
sort.StringSlice(valSets).Sort()
|
||||
return &types.ChainAndValidatorIDs{
|
||||
return &types.ChainAndValidatorSetIDs{
|
||||
ChainIDs: chains,
|
||||
ValidatorSetIDs: valSets,
|
||||
}, nil
|
||||
@ -91,10 +93,40 @@ func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unknown chain %s", chainID)
|
||||
}
|
||||
fmt.Println("CHAIN:", chain)
|
||||
return chain, nil
|
||||
}
|
||||
|
||||
func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) (*types.ChainState, error) {
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
|
||||
chainState := &types.ChainState{
|
||||
Config: chainConfig,
|
||||
Status: types.NewBlockchainStatus(),
|
||||
}
|
||||
chainState.Status.NumValidators = len(chainConfig.Validators)
|
||||
|
||||
chainState.Config.PopulateValIDMap()
|
||||
|
||||
// start the event meter and listen for new blocks on each validator
|
||||
for _, v := range chainConfig.Validators {
|
||||
|
||||
if err := v.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Validator.ID))
|
||||
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Validator.ID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get/set the validator's pub key
|
||||
v.PubKey()
|
||||
}
|
||||
tn.Chains[chainState.Config.ID] = chainState
|
||||
return chainState, nil
|
||||
}
|
||||
|
||||
func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) {
|
||||
tn.mtx.Lock()
|
||||
defer tn.mtx.Unlock()
|
||||
|
@ -2,6 +2,7 @@ package handlers
|
||||
|
||||
import (
|
||||
rpc "github.com/tendermint/go-rpc/server"
|
||||
"github.com/tendermint/netmon/types"
|
||||
)
|
||||
|
||||
func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc {
|
||||
@ -12,6 +13,7 @@ func Routes(network *TendermintNetwork) map[string]*rpc.RPCFunc {
|
||||
|
||||
"status": rpc.NewRPCFunc(StatusResult(network), ""),
|
||||
"blockchain": rpc.NewRPCFunc(GetChainResult(network), "chain"),
|
||||
"register_chain": rpc.NewRPCFunc(RegisterChainResult(network), "chainConfig"),
|
||||
"validator_set": rpc.NewRPCFunc(GetValidatorSetResult(network), "valsetID"),
|
||||
"validator": rpc.NewRPCFunc(GetValidatorResult(network), "valSetID,valID"),
|
||||
|
||||
@ -33,6 +35,12 @@ func GetChainResult(network *TendermintNetwork) interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterChainResult(network *TendermintNetwork) interface{} {
|
||||
return func(chainConfig *types.BlockchainConfig) (NetMonResult, error) {
|
||||
return network.RegisterChain(chainConfig)
|
||||
}
|
||||
}
|
||||
|
||||
func GetValidatorSetResult(network *TendermintNetwork) interface{} {
|
||||
return func(valSetID string) (NetMonResult, error) {
|
||||
return network.GetValidatorSet(valSetID)
|
||||
|
5
main.go
5
main.go
@ -58,14 +58,14 @@ func cmdMonitor(c *cli.Context) {
|
||||
}
|
||||
chainConfigFile := args[0]
|
||||
|
||||
chainState, err := types.LoadChainFromFile(chainConfigFile)
|
||||
chainConfig, err := types.LoadChainFromFile(chainConfigFile)
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
||||
// the main object that watches for changes and serves the rpc requests
|
||||
network := handlers.NewTendermintNetwork()
|
||||
network.RegisterChain(chainState)
|
||||
network.RegisterChain(chainConfig)
|
||||
|
||||
// the routes are functions on the network object
|
||||
routes := handlers.Routes(network)
|
||||
@ -80,7 +80,6 @@ func cmdMonitor(c *cli.Context) {
|
||||
}
|
||||
|
||||
TrapSignal(func() {
|
||||
// TODO: clean shutdown server, maybe persist last state
|
||||
network.Stop()
|
||||
})
|
||||
|
||||
|
@ -6,15 +6,56 @@ import (
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/tendermint/go-event-meter"
|
||||
"github.com/tendermint/go-events"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
//------------------------------------------------
|
||||
// blockchain types
|
||||
|
||||
// State of a chain
|
||||
// Known chain and validator set IDs (from which anything else can be found)
|
||||
type ChainAndValidatorSetIDs struct {
|
||||
ChainIDs []string `json:"chain_ids"`
|
||||
ValidatorSetIDs []string `json:"validator_set_ids"`
|
||||
}
|
||||
|
||||
// Basic chain and network metrics
|
||||
type BlockchainStatus struct {
|
||||
// Blockchain Info
|
||||
Height int `json:"height"`
|
||||
BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ?
|
||||
MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"`
|
||||
TxThroughput float64 `json:"tx_throughput" wire:"unsafe"`
|
||||
|
||||
blockTimeMeter metrics.Meter
|
||||
txThroughputMeter metrics.Meter
|
||||
|
||||
// Network Info
|
||||
NumValidators int `json:"num_validators"`
|
||||
ActiveValidators int `json:"active_validators"`
|
||||
ActiveNodes int `json:"active_nodes"`
|
||||
MeanLatency float64 `json:"mean_latency" wire:"unsafe"`
|
||||
Uptime float64 `json:"uptime" wire:"unsafe"`
|
||||
|
||||
// TODO: charts for block time, latency (websockets/event-meter ?)
|
||||
}
|
||||
|
||||
func NewBlockchainStatus() *BlockchainStatus {
|
||||
return &BlockchainStatus{
|
||||
blockTimeMeter: metrics.NewMeter(),
|
||||
txThroughputMeter: metrics.NewMeter(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) {
|
||||
s.Height = block.Header.Height
|
||||
s.blockTimeMeter.Mark(1)
|
||||
s.txThroughputMeter.Mark(int64(block.Header.NumTxs))
|
||||
s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean()
|
||||
s.TxThroughput = s.txThroughputMeter.RateMean()
|
||||
}
|
||||
|
||||
// Main chain state
|
||||
// Returned over RPC but also used to manage state
|
||||
type ChainState struct {
|
||||
Config *BlockchainConfig `json:"config"`
|
||||
@ -24,10 +65,10 @@ type ChainState struct {
|
||||
// basic chain config
|
||||
// threadsafe
|
||||
type BlockchainConfig struct {
|
||||
mtx sync.Mutex
|
||||
|
||||
ID string `json:"id"`
|
||||
ValSetID string `json:"val_set_id"`
|
||||
|
||||
mtx sync.Mutex
|
||||
Validators []*ChainValidator `json:"validators"`
|
||||
valIDMap map[string]int // map IDs to indices
|
||||
}
|
||||
@ -52,7 +93,7 @@ func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ChainValidator, err
|
||||
return bc.Validators[valIndex], nil
|
||||
}
|
||||
|
||||
func LoadChainFromFile(configFile string) (*ChainState, error) {
|
||||
func LoadChainFromFile(configFile string) (*BlockchainConfig, error) {
|
||||
|
||||
b, err := ioutil.ReadFile(configFile)
|
||||
if err != nil {
|
||||
@ -66,24 +107,5 @@ func LoadChainFromFile(configFile string) (*ChainState, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chainState := &ChainState{Config: chainConfig}
|
||||
|
||||
// start the event meter and listen for new blocks on each validator
|
||||
for _, v := range chainConfig.Validators {
|
||||
|
||||
if err := v.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), func(metric *eventmeter.EventMetric, data events.EventData) {
|
||||
// TODO: update chain status with block and metric
|
||||
// chainState.NewBlock(data.(tmtypes.EventDataNewBlock).Block)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get/set the validator's pub key
|
||||
v.PubKey()
|
||||
}
|
||||
return chainState, nil
|
||||
return chainConfig, nil
|
||||
}
|
||||
|
@ -1,45 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/tendermint/go-crypto"
|
||||
)
|
||||
|
||||
//---------------------------------------------
|
||||
// simple types
|
||||
|
||||
// Known chain and validator set IDs (from which anything else can be found)
|
||||
type ChainAndValidatorIDs struct {
|
||||
ChainIDs []string `json:"chain_ids"`
|
||||
ValidatorSetIDs []string `json:"validator_set_ids"`
|
||||
}
|
||||
|
||||
// basic chain status/metrics
|
||||
type BlockchainStatus struct {
|
||||
Height int `json:"height"`
|
||||
MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"`
|
||||
TxThroughput float64 `json:"tx_throughput" wire:"unsafe"`
|
||||
|
||||
BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ?
|
||||
}
|
||||
|
||||
// validator set (independent of chains)
|
||||
type ValidatorSet struct {
|
||||
Validators []*Validator `json:"validators"`
|
||||
}
|
||||
|
||||
func (vs *ValidatorSet) Validator(valID string) (*Validator, error) {
|
||||
for _, v := range vs.Validators {
|
||||
if v.ID == valID {
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("Unknwon validator %s", valID)
|
||||
}
|
||||
|
||||
// validator (independent of chain)
|
||||
type Validator struct {
|
||||
ID string `json:"id"`
|
||||
PubKey crypto.PubKey `json:"pub_key"`
|
||||
Chains []string `json:"chains"`
|
||||
}
|
26
types/val.go
26
types/val.go
@ -16,6 +16,27 @@ import (
|
||||
//------------------------------------------------
|
||||
// validator types
|
||||
|
||||
// validator set (independent of chains)
|
||||
type ValidatorSet struct {
|
||||
Validators []*Validator `json:"validators"`
|
||||
}
|
||||
|
||||
func (vs *ValidatorSet) Validator(valID string) (*Validator, error) {
|
||||
for _, v := range vs.Validators {
|
||||
if v.ID == valID {
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("Unknwon validator %s", valID)
|
||||
}
|
||||
|
||||
// validator (independent of chain)
|
||||
type Validator struct {
|
||||
ID string `json:"id"`
|
||||
PubKey crypto.PubKey `json:"pub_key"`
|
||||
Chains []string `json:"chains"`
|
||||
}
|
||||
|
||||
// Validator on a chain
|
||||
// Responsible for communication with the validator
|
||||
// Returned over RPC but also used to manage state
|
||||
@ -24,9 +45,12 @@ type ChainValidator struct {
|
||||
Addr string `json:"addr"` // do we want multiple addrs?
|
||||
Index int `json:"index"`
|
||||
|
||||
Latency float64 `json:"latency" wire:"unsafe"`
|
||||
BlockHeight int `json:"block_height"`
|
||||
|
||||
em *eventmeter.EventMeter // holds a ws connection to the val
|
||||
client *client.ClientURI // rpc client
|
||||
Latency float64 `json:"latency" wire:"unsafe"`
|
||||
|
||||
}
|
||||
|
||||
// Start a new event meter, including the websocket connection
|
||||
|
Loading…
x
Reference in New Issue
Block a user