comments; fix locks

This commit is contained in:
Ethan Buchman 2016-01-27 00:27:24 -05:00
parent ec8fd017a5
commit 85874a3765
4 changed files with 168 additions and 116 deletions

View File

@ -4,50 +4,38 @@ import (
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter"
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-events" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-events"
"github.com/tendermint/netmon/types"
tmtypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/types"
) )
/*
Each chain-validator gets an eventmeter, which maintains the websocket connection to that validator.
Certain pre-defined events may update the netmon state: latency pongs and new blocks.
TODO: handle config changes for new validators and for validators changing ip/port
*/
// implements eventmeter.EventCallbackFunc // implements eventmeter.EventCallbackFunc
func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter.EventCallbackFunc { // updates validator and possibly chain with new block
func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc {
return func(metric *eventmeter.EventMetric, data events.EventData) { return func(metric *eventmeter.EventMetric, data events.EventData) {
block := data.(tmtypes.EventDataNewBlock).Block block := data.(tmtypes.EventDataNewBlock).Block
tn.mtx.Lock() // these functions are thread safe
defer tn.mtx.Unlock() // we should run them concurrently
// grab chain and validator
chain := tn.Chains[chainID]
val, _ := chain.Config.GetValidatorByID(valID)
// update height for validator // update height for validator
val.Status.BlockHeight = block.Header.Height val.NewBlock(block)
// possibly update height and mean block time for chain // possibly update height and mean block time for chain
if block.Header.Height > chain.Status.Height { chainState.NewBlock(block)
chain.Status.NewBlock(block)
}
} }
} }
// implements eventmeter.EventLatencyFunc // implements eventmeter.EventLatencyFunc
func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.LatencyCallbackFunc { func (tn *TendermintNetwork) latencyCallback(chain *types.ChainState, val *types.ValidatorState) eventmeter.LatencyCallbackFunc {
return func(latency float64) { return func(latency float64) {
tn.mtx.Lock() oldLatency := val.UpdateLatency(latency)
defer tn.mtx.Unlock() chain.UpdateLatency(oldLatency, latency)
// grab chain and validator
chain := tn.Chains[chainID]
val, _ := chain.Config.GetValidatorByID(valID)
// update latency for this validator and avg latency for chain
mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators)
mean = (mean - val.Status.Latency + latency) / float64(chain.Status.NumValidators)
val.Status.Latency = latency
chain.Status.MeanLatency = mean
// TODO: possibly update active nodes and uptime for chain
chain.Status.ActiveValidators = chain.Status.NumValidators // XXX
} }
} }

View File

@ -20,8 +20,9 @@ var _ = wire.RegisterInterface(
struct{ NetMonResult }{}, struct{ NetMonResult }{},
wire.ConcreteType{&types.ChainAndValidatorSetIDs{}, 0x01}, wire.ConcreteType{&types.ChainAndValidatorSetIDs{}, 0x01},
wire.ConcreteType{&types.ChainState{}, 0x02}, wire.ConcreteType{&types.ChainState{}, 0x02},
wire.ConcreteType{&types.Validator{}, 0x03}, wire.ConcreteType{&types.ValidatorSet{}, 0x10},
wire.ConcreteType{&eventmeter.EventMetric{}, 0x04}, wire.ConcreteType{&types.Validator{}, 0x11},
wire.ConcreteType{&eventmeter.EventMetric{}, 0x20},
) )
//--------------------------------------------- //---------------------------------------------
@ -33,15 +34,11 @@ type TendermintNetwork struct {
ValSets map[string]*types.ValidatorSet `json:"validator_sets"` ValSets map[string]*types.ValidatorSet `json:"validator_sets"`
} }
// TODO: populate validator sets func NewTendermintNetwork() *TendermintNetwork {
func NewTendermintNetwork(chains ...*types.ChainState) *TendermintNetwork {
network := &TendermintNetwork{ network := &TendermintNetwork{
Chains: make(map[string]*types.ChainState), Chains: make(map[string]*types.ChainState),
ValSets: make(map[string]*types.ValidatorSet), ValSets: make(map[string]*types.ValidatorSet),
} }
for _, chain := range chains {
network.Chains[chain.Config.ID] = chain
}
return network return network
} }
@ -58,8 +55,12 @@ func (tn *TendermintNetwork) Stop() {
} }
} }
//------------ //-----------------------------------------------------------
// RPC funcs // RPC funcs
//-----------------------------------------------------------
//------------------
// Status
// Returns sorted lists of all chains and validator sets // Returns sorted lists of all chains and validator sets
func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) { func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) {
@ -86,6 +87,11 @@ func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) {
} }
// NOTE: returned values should not be manipulated by callers as they are pointers to the state!
//------------------
// Blockchains
// Get the current state of a chain
func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) { func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) {
tn.mtx.Lock() tn.mtx.Lock()
defer tn.mtx.Unlock() defer tn.mtx.Unlock()
@ -96,9 +102,10 @@ func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error)
return chain, nil return chain, nil
} }
// Register a new chain on the network.
// For each validator, start a websocket connection to listen for new block events and record latency
func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) (*types.ChainState, error) { func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) (*types.ChainState, error) {
tn.mtx.Lock() // Don't bother locking until we touch the TendermintNetwork object
defer tn.mtx.Unlock()
chainState := &types.ChainState{ chainState := &types.ChainState{
Config: chainConfig, Config: chainConfig,
@ -106,6 +113,7 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig)
} }
chainState.Status.NumValidators = len(chainConfig.Validators) chainState.Status.NumValidators = len(chainConfig.Validators)
// so we can easily lookup validators by id rather than index
chainState.Config.PopulateValIDMap() chainState.Config.PopulateValIDMap()
// start the event meter and listen for new blocks on each validator // start the event meter and listen for new blocks on each validator
@ -115,19 +123,27 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig)
if err := v.Start(); err != nil { if err := v.Start(); err != nil {
return nil, err return nil, err
} }
v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Config.Validator.ID))
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Config.Validator.ID)) v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v))
err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v))
if err != nil { if err != nil {
return nil, err return nil, err
} }
// get/set the validator's pub key // get/set the validator's pub key
// TODO: possibly remove? why should we depend on this here?
v.PubKey() v.PubKey()
} }
tn.mtx.Lock()
defer tn.mtx.Unlock()
tn.Chains[chainState.Config.ID] = chainState tn.Chains[chainState.Config.ID] = chainState
return chainState, nil return chainState, nil
} }
//------------------
// Validators
func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) { func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) {
tn.mtx.Lock() tn.mtx.Lock()
defer tn.mtx.Unlock() defer tn.mtx.Unlock()
@ -159,6 +175,9 @@ func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Valida
return val, nil return val, nil
} }
//------------------
// Event metering
func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error { func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error {
tn.mtx.Lock() tn.mtx.Lock()
defer tn.mtx.Unlock() defer tn.mtx.Unlock()
@ -190,6 +209,7 @@ func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmet
return val.EventMeter().GetMetric(eventID) return val.EventMeter().GetMetric(eventID)
} }
// assumes lock is held
func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ValidatorState, error) { func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ValidatorState, error) {
chain, ok := tn.Chains[chainID] chain, ok := tn.Chains[chainID]
if !ok { if !ok {

View File

@ -1,9 +1,7 @@
package types package types
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"sync" "sync"
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/rcrowley/go-metrics" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/rcrowley/go-metrics"
@ -12,75 +10,50 @@ import (
//------------------------------------------------ //------------------------------------------------
// blockchain types // blockchain types
//------------------------------------------------
// Known chain and validator set IDs (from which anything else can be found) // Known chain and validator set IDs (from which anything else can be found)
// Returned by the Status RPC
type ChainAndValidatorSetIDs struct { type ChainAndValidatorSetIDs struct {
ChainIDs []string `json:"chain_ids"` ChainIDs []string `json:"chain_ids"`
ValidatorSetIDs []string `json:"validator_set_ids"` ValidatorSetIDs []string `json:"validator_set_ids"`
} }
// Basic chain and network metrics //------------------------------------------------
type BlockchainStatus struct { // chain state
// Blockchain Info
Height int `json:"height"`
BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ?
MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"`
TxThroughput float64 `json:"tx_throughput" wire:"unsafe"`
blockTimeMeter metrics.Meter
txThroughputMeter metrics.Meter
// Network Info
NumValidators int `json:"num_validators"`
ActiveValidators int `json:"active_validators"`
ActiveNodes int `json:"active_nodes"`
MeanLatency float64 `json:"mean_latency" wire:"unsafe"`
Uptime float64 `json:"uptime" wire:"unsafe"`
// TODO: charts for block time, latency (websockets/event-meter ?)
}
func NewBlockchainStatus() *BlockchainStatus {
return &BlockchainStatus{
blockTimeMeter: metrics.NewMeter(),
txThroughputMeter: metrics.NewMeter(),
}
}
func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) {
s.Height = block.Header.Height
s.blockTimeMeter.Mark(1)
s.txThroughputMeter.Mark(int64(block.Header.NumTxs))
s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean()
s.TxThroughput = s.txThroughputMeter.RateMean()
}
// Main chain state // Main chain state
// Returned over RPC but also used to manage state // Returned over RPC; also used to manage state
type ChainState struct { type ChainState struct {
Config *BlockchainConfig `json:"config"` Config *BlockchainConfig `json:"config"`
Status *BlockchainStatus `json:"status"` Status *BlockchainStatus `json:"status"`
} }
// chain config without ValidatorState func (cs *ChainState) NewBlock(block *tmtypes.Block) {
type BlockchainBaseConfig struct { cs.Status.NewBlock(block)
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
Validators []*ValidatorConfig `json:"validators"`
} }
// basic chain config func (cs *ChainState) UpdateLatency(oldLatency, newLatency float64) {
// threadsafe cs.Status.UpdateLatency(oldLatency, newLatency)
type BlockchainConfig struct { }
ID string `json:"id"`
ValSetID string `json:"val_set_id"`
//------------------------------------------------
// Blockchain Config: id, validator config
// Chain Config
type BlockchainConfig struct {
// should be fixed for life of chain
ID string `json:"id"`
ValSetID string `json:"val_set_id"` // NOTE: do we really commit to one val set per chain?
// handles live validator states (latency, last block, etc)
// and validator set changes
mtx sync.Mutex mtx sync.Mutex
Validators []*ValidatorState `json:"validators"` // TODO: this should be ValidatorConfig and the state in BlockchainStatus Validators []*ValidatorState `json:"validators"` // TODO: this should be ValidatorConfig and the state in BlockchainStatus
valIDMap map[string]int // map IDs to indices valIDMap map[string]int // map IDs to indices
} }
// So we can fetch validator by id // So we can fetch validator by id rather than index
func (bc *BlockchainConfig) PopulateValIDMap() { func (bc *BlockchainConfig) PopulateValIDMap() {
bc.mtx.Lock() bc.mtx.Lock()
defer bc.mtx.Unlock() defer bc.mtx.Unlock()
@ -100,19 +73,61 @@ func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ValidatorState, err
return bc.Validators[valIndex], nil return bc.Validators[valIndex], nil
} }
func LoadChainFromFile(configFile string) (*BlockchainConfig, error) { //------------------------------------------------
// BlockchainStatus
b, err := ioutil.ReadFile(configFile) // Basic blockchain metrics
if err != nil { type BlockchainStatus struct {
return nil, err mtx sync.Mutex
}
// for now we start with one blockchain loaded from file; // Blockchain Info
// eventually more can be uploaded or created through endpoints Height int `json:"height"`
chainConfig := new(BlockchainConfig) BlockchainSize int64 `json:"blockchain_size"`
if err := json.Unmarshal(b, chainConfig); err != nil { MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"`
return nil, err TxThroughput float64 `json:"tx_throughput" wire:"unsafe"`
}
return chainConfig, nil blockTimeMeter metrics.Meter
txThroughputMeter metrics.Meter
// Network Info
NumValidators int `json:"num_validators"`
ActiveValidators int `json:"active_validators"`
ActiveNodes int `json:"active_nodes"`
MeanLatency float64 `json:"mean_latency" wire:"unsafe"`
Uptime float64 `json:"uptime" wire:"unsafe"`
// What else can we get / do we want?
// TODO: charts for block time, latency (websockets/event-meter ?)
}
// NewBlockchainStatus returns a BlockchainStatus with freshly created
// block-time and tx-throughput meters. All other fields start at their
// zero values.
func NewBlockchainStatus() *BlockchainStatus {
	status := new(BlockchainStatus)
	status.blockTimeMeter = metrics.NewMeter()
	status.txThroughputMeter = metrics.NewMeter()
	return status
}
// NewBlock folds a newly observed block into the chain metrics.
//
// Blocks whose height does not exceed the recorded height are ignored. For a
// higher block, the height is advanced, both meters are marked (one block, and
// the block's transaction count), and the derived MeanBlockTime and
// TxThroughput values are refreshed. The status mutex is held throughout.
func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Nothing to do unless this block advances the chain.
	height := block.Header.Height
	if height <= s.Height {
		return
	}

	s.Height = height
	s.blockTimeMeter.Mark(1)
	s.txThroughputMeter.Mark(int64(block.Header.NumTxs))

	// Mean block time is the inverse of the mean block rate.
	s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean()
	s.TxThroughput = s.txThroughputMeter.RateMean()
}
func (s *BlockchainStatus) UpdateLatency(oldLatency, newLatency float64) {
s.mtx.Lock()
defer s.mtx.Unlock()
// update latency for this validator and avg latency for chain
mean := s.MeanLatency * float64(s.NumValidators)
mean = (mean - oldLatency + newLatency) / float64(s.NumValidators)
s.MeanLatency = mean
// TODO: possibly update active nodes and uptime for chain
s.ActiveValidators = s.NumValidators // XXX
} }

View File

@ -3,7 +3,7 @@ package types
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "sync"
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-crypto" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-crypto"
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter"
@ -11,10 +11,15 @@ import (
client "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-rpc/client" client "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-rpc/client"
"github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-wire" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-wire"
ctypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/types"
) )
//------------------------------------------------ //------------------------------------------------
// validator types // validator types
//------------------------------------------------
//------------------------------------------------
// simple validator set and validator (just crypto, no network)
// validator set (independent of chains) // validator set (independent of chains)
type ValidatorSet struct { type ValidatorSet struct {
@ -38,25 +43,19 @@ type Validator struct {
Chains []string `json:"chains,omitempty"` // TODO: put this elsewhere (?) Chains []string `json:"chains,omitempty"` // TODO: put this elsewhere (?)
} }
type ValidatorConfig struct { //------------------------------------------------
Validator *Validator `json:"validator"` // Live validator on a chain
P2PAddr string `json:"p2p_addr"`
RPCAddr string `json:"rpc_addr"`
Index int `json:"index,omitempty"`
}
type ValidatorStatus struct {
Latency float64 `json:"latency" wire:"unsafe"`
BlockHeight int `json:"block_height"`
}
// Validator on a chain // Validator on a chain
// Responsible for communication with the validator
// Returned over RPC but also used to manage state // Returned over RPC but also used to manage state
// Responsible for communication with the validator
type ValidatorState struct { type ValidatorState struct {
Config *ValidatorConfig `json:"config"` Config *ValidatorConfig `json:"config"`
Status *ValidatorStatus `json:"status"` Status *ValidatorStatus `json:"status"`
// Currently we get IPs and dial,
// but should reverse so the nodes dial the netmon,
// both for node privacy and easier reconfig (validators changing ip/port)
em *eventmeter.EventMeter // holds a ws connection to the val em *eventmeter.EventMeter // holds a ws connection to the val
client *client.ClientURI // rpc client client *client.ClientURI // rpc client
} }
@ -81,8 +80,23 @@ func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter {
return vs.em return vs.em
} }
// NewBlock records the height of the given block as this validator's latest
// block height. The write is made while holding the status mutex.
func (vs *ValidatorState) NewBlock(block *tmtypes.Block) {
	status := vs.Status
	status.mtx.Lock()
	defer status.mtx.Unlock()

	status.BlockHeight = block.Header.Height
}
// UpdateLatency stores latency as this validator's current latency and
// returns the value it replaced. The swap is made while holding the status
// mutex.
func (vs *ValidatorState) UpdateLatency(latency float64) float64 {
	status := vs.Status
	status.mtx.Lock()
	defer status.mtx.Unlock()

	previous := status.Latency
	status.Latency = latency
	return previous
}
// Return the validators pubkey. If it's not yet set, get it from the node // Return the validators pubkey. If it's not yet set, get it from the node
// TODO: proof that it's the node's key // TODO: proof that it's the node's key
// XXX: Is this necessary? Why would it not be set
func (vs *ValidatorState) PubKey() crypto.PubKey { func (vs *ValidatorState) PubKey() crypto.PubKey {
if vs.Config.Validator.PubKey != nil { if vs.Config.Validator.PubKey != nil {
return vs.Config.Validator.PubKey return vs.Config.Validator.PubKey
@ -99,6 +113,20 @@ func (vs *ValidatorState) PubKey() crypto.PubKey {
return vs.Config.Validator.PubKey return vs.Config.Validator.PubKey
} }
// ValidatorConfig is the per-chain configuration of a validator: the
// validator itself plus the addresses recorded for it.
type ValidatorConfig struct {
	// NOTE(review): no code visible here locks this mutex; presumably it is
	// meant to guard config changes (e.g. a validator changing ip/port) - confirm.
	mtx       sync.Mutex
	Validator *Validator `json:"validator"`
	P2PAddr   string     `json:"p2p_addr"` // p2p address of the validator's node
	RPCAddr   string     `json:"rpc_addr"` // rpc address of the validator's node
	// presumably the validator's position in the chain's validator list - TODO confirm
	Index int `json:"index,omitempty"`
}
// ValidatorStatus holds the live measurements kept for a validator.
type ValidatorStatus struct {
	// mtx is held by ValidatorState.NewBlock and ValidatorState.UpdateLatency
	// while they write BlockHeight and Latency.
	mtx sync.Mutex
	// Latency is the most recent latency reported for the validator
	// (units are not visible here - TODO confirm).
	Latency float64 `json:"latency" wire:"unsafe"`
	// BlockHeight is the height of the last block reported via NewBlock.
	BlockHeight int `json:"block_height"`
}
//--------------------------------------------------- //---------------------------------------------------
// utilities // utilities
@ -112,7 +140,8 @@ func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
} }
event, ok := (*result).(*ctypes.ResultEvent) event, ok := (*result).(*ctypes.ResultEvent)
if !ok { if !ok {
return "", nil, fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result)) return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe)
// fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result))
} }
return event.Name, event.Data, nil return event.Name, event.Data, nil