p2p metric, make height and totalTxs gauges

This commit is contained in:
Anton Kaliaev
2018-06-15 15:10:25 +04:00
parent 0cb50c05fc
commit 19699d644f
8 changed files with 125 additions and 89 deletions

View File

@ -42,7 +42,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in // Next: we need to set a switch in order for peers to be added in
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), p2p.NopMetrics())
// Lastly: let's add some blocks in // Lastly: let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {

View File

@ -38,7 +38,7 @@ func TestByzantine(t *testing.T) {
switches := make([]*p2p.Switch, N) switches := make([]*p2p.Switch, N)
p2pLogger := logger.With("module", "p2p") p2pLogger := logger.With("module", "p2p")
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
switches[i] = p2p.NewSwitch(config.P2P) switches[i] = p2p.NewSwitch(config.P2P, p2p.NopMetrics())
switches[i].SetLogger(p2pLogger.With("validator", i)) switches[i].SetLogger(p2pLogger.With("validator", i))
} }

View File

@ -6,7 +6,8 @@ import "github.com/go-kit/kit/metrics/discard"
// Metrics contains metrics exposed by this package. // Metrics contains metrics exposed by this package.
// see MetricsProvider for descriptions. // see MetricsProvider for descriptions.
type Metrics struct { type Metrics struct {
Height metrics.Counter Height metrics.Gauge
Rounds metrics.Gauge Rounds metrics.Gauge
Validators metrics.Gauge Validators metrics.Gauge
@ -19,21 +20,28 @@ type Metrics struct {
BlockIntervalSeconds metrics.Histogram BlockIntervalSeconds metrics.Histogram
NumTxs metrics.Gauge NumTxs metrics.Gauge
TotalTxs metrics.Counter
BlockSizeBytes metrics.Gauge BlockSizeBytes metrics.Gauge
TotalTxs metrics.Gauge
} }
// NopMetrics returns no-op Metrics. // NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics { func NopMetrics() *Metrics {
return &Metrics{ return &Metrics{
Height: discard.NewCounter(), Height: discard.NewGauge(),
Rounds: discard.NewGauge(),
Validators: discard.NewGauge(), Validators: discard.NewGauge(),
ValidatorsPower: discard.NewGauge(),
MissingValidators: discard.NewGauge(), MissingValidators: discard.NewGauge(),
MissingValidatorsPower: discard.NewGauge(),
ByzantineValidators: discard.NewGauge(), ByzantineValidators: discard.NewGauge(),
ByzantineValidatorsPower: discard.NewGauge(),
BlockIntervalSeconds: discard.NewHistogram(), BlockIntervalSeconds: discard.NewHistogram(),
NumTxs: discard.NewGauge(), NumTxs: discard.NewGauge(),
TotalTxs: discard.NewCounter(),
BlockSizeBytes: discard.NewGauge(), BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
} }
} }

View File

@ -391,7 +391,7 @@ func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *t
// internal functions for managing the state // internal functions for managing the state
func (cs *ConsensusState) updateHeight(height int64) { func (cs *ConsensusState) updateHeight(height int64) {
cs.metrics.Height.Add(float64(height - cs.Height)) cs.metrics.Height.Set(float64(height))
cs.Height = height cs.Height = height
} }
@ -697,7 +697,6 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
} }
logger.Info(cmn.Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) logger.Info(cmn.Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
cs.metrics.Rounds.Set(float64(round))
// Increment validators if necessary // Increment validators if necessary
validators := cs.Validators validators := cs.Validators
@ -724,6 +723,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
cs.metrics.Rounds.Set(float64(round))
// Wait for txs to be available in the mempool // Wait for txs to be available in the mempool
// before we enterPropose in round 0. If the last block changed the app hash, // before we enterPropose in round 0. If the last block changed the app hash,
@ -1282,6 +1282,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
fail.Fail() // XXX fail.Fail() // XXX
// must be called before we update state
cs.recordMetrics(height, block) cs.recordMetrics(height, block)
// NewHeightStep! // NewHeightStep!
@ -1332,9 +1333,8 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
} }
cs.metrics.NumTxs.Set(float64(block.NumTxs)) cs.metrics.NumTxs.Set(float64(block.NumTxs))
cs.metrics.TotalTxs.Add(float64(block.NumTxs))
cs.metrics.BlockSizeBytes.Set(float64(block.Size())) cs.metrics.BlockSizeBytes.Set(float64(block.Size()))
cs.metrics.TotalTxs.Set(float64(block.TotalTxs))
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------

View File

@ -90,14 +90,14 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
) )
} }
// MetricsProvider returns a consensus Metrics. // MetricsProvider returns a consensus and p2p Metrics.
type MetricsProvider func() *cs.Metrics type MetricsProvider func() (*cs.Metrics, *p2p.Metrics)
// DefaultMetrics returns a consensus Metrics build using Prometheus client // DefaultMetricsProvider returns a consensus and p2p Metrics build using
// library. // Prometheus client library.
func DefaultMetricsProvider() *cs.Metrics { func DefaultMetricsProvider() (*cs.Metrics, *p2p.Metrics) {
return &cs.Metrics{ return &cs.Metrics{
Height: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus", Subsystem: "consensus",
Name: "height", Name: "height",
Help: "Height of the chain.", Help: "Height of the chain.",
@ -151,17 +151,22 @@ func DefaultMetricsProvider() *cs.Metrics {
Name: "num_txs", Name: "num_txs",
Help: "Number of transactions.", Help: "Number of transactions.",
}, []string{}), }, []string{}),
TotalTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: "consensus",
Name: "total_txs",
Help: "Total number of transactions.",
}, []string{}),
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus", Subsystem: "consensus",
Name: "block_size_bytes", Name: "block_size_bytes",
Help: "Size of the block.", Help: "Size of the block.",
}, []string{}), }, []string{}),
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Name: "total_txs",
Help: "Total number of transactions.",
}, []string{}),
}, &p2p.Metrics{
Peers: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "p2p",
Name: "peers",
Help: "Number of peers.",
}, []string{}),
} }
} }
@ -321,8 +326,9 @@ func NewNode(config *cfg.Config,
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
csMetrics, p2pMetrics := metricsProvider()
// Make ConsensusReactor // Make ConsensusReactor
csMetrics := metricsProvider()
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(), consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
blockExec, blockStore, mempool, evidencePool, csMetrics) blockExec, blockStore, mempool, evidencePool, csMetrics)
consensusState.SetLogger(consensusLogger) consensusState.SetLogger(consensusLogger)
@ -334,7 +340,7 @@ func NewNode(config *cfg.Config,
p2pLogger := logger.With("module", "p2p") p2pLogger := logger.With("module", "p2p")
sw := p2p.NewSwitch(config.P2P) sw := p2p.NewSwitch(config.P2P, p2pMetrics)
sw.SetLogger(p2pLogger) sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("BLOCKCHAIN", bcReactor)

17
p2p/metrics.go Normal file
View File

@ -0,0 +1,17 @@
package p2p
import "github.com/go-kit/kit/metrics"
import "github.com/go-kit/kit/metrics/discard"
// Metrics contains metrics exposed by this package.
// see MetricsProvider for descriptions.
type Metrics struct {
Peers metrics.Gauge
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Peers: discard.NewGauge(),
}
}

View File

@ -73,10 +73,12 @@ type Switch struct {
mConfig conn.MConnConfig mConfig conn.MConnConfig
rng *cmn.Rand // seed for randomizing dial times and orders rng *cmn.Rand // seed for randomizing dial times and orders
metrics *Metrics
} }
// NewSwitch creates a new Switch with the given config. // NewSwitch creates a new Switch with the given config.
func NewSwitch(cfg *config.P2PConfig) *Switch { func NewSwitch(cfg *config.P2PConfig, metrics *Metrics) *Switch {
sw := &Switch{ sw := &Switch{
config: cfg, config: cfg,
reactors: make(map[string]Reactor), reactors: make(map[string]Reactor),
@ -85,6 +87,7 @@ func NewSwitch(cfg *config.P2PConfig) *Switch {
peers: NewPeerSet(), peers: NewPeerSet(),
dialing: cmn.NewCMap(), dialing: cmn.NewCMap(),
reconnecting: cmn.NewCMap(), reconnecting: cmn.NewCMap(),
metrics: metrics,
} }
// Ensure we have a completely undeterministic PRNG. // Ensure we have a completely undeterministic PRNG.
@ -279,6 +282,7 @@ func (sw *Switch) StopPeerGracefully(peer Peer) {
func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
sw.peers.Remove(peer) sw.peers.Remove(peer)
sw.metrics.Peers.Add(float64(-1))
peer.Stop() peer.Stop()
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason) reactor.RemovePeer(peer, reason)
@ -623,6 +627,7 @@ func (sw *Switch) addPeer(pc peerConn) error {
if err := sw.peers.Add(peer); err != nil { if err := sw.peers.Add(peer); err != nil {
return err return err
} }
sw.metrics.Peers.Add(float64(1))
sw.Logger.Info("Added peer", "peer", peer) sw.Logger.Info("Added peer", "peer", peer)
return nil return nil

View File

@ -137,7 +137,7 @@ func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitc
nodeKey := &NodeKey{ nodeKey := &NodeKey{
PrivKey: crypto.GenPrivKeyEd25519(), PrivKey: crypto.GenPrivKeyEd25519(),
} }
sw := NewSwitch(cfg) sw := NewSwitch(cfg, NopMetrics())
sw.SetLogger(log.TestingLogger()) sw.SetLogger(log.TestingLogger())
sw = initSwitch(i, sw) sw = initSwitch(i, sw)
ni := NodeInfo{ ni := NodeInfo{