listen for blocks, not headers

to be able to record txs throughput
This commit is contained in:
Anton Kaliaev 2019-07-29 10:40:48 +04:00
parent 851f1edfd9
commit 58c84a209f
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
3 changed files with 8 additions and 8 deletions

View File

@ -82,7 +82,7 @@ func (m *Monitor) Monitor(n *Node) error {
m.Nodes = append(m.Nodes, n)
m.mtx.Unlock()
blockCh := make(chan tmtypes.Header, 10)
blockCh := make(chan *tmtypes.Block, 10)
n.SendBlocksTo(blockCh)
blockLatencyCh := make(chan float64, 10)
n.SendBlockLatenciesTo(blockLatencyCh)
@ -165,7 +165,7 @@ func (m *Monitor) Stop() {
}
// main loop where we listen for events from the node
func (m *Monitor) listen(nodeName string, blockCh <-chan tmtypes.Header, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
func (m *Monitor) listen(nodeName string, blockCh <-chan *tmtypes.Block, blockLatencyCh <-chan float64, disconnectCh <-chan bool, quit <-chan struct{}) {
logger := m.logger.With("node", nodeName)
for {

View File

@ -69,7 +69,7 @@ func NewNetwork() *Network {
}
}
func (n *Network) NewBlock(b tmtypes.Header) {
func (n *Network) NewBlock(b *tmtypes.Block) {
n.mu.Lock()
defer n.mu.Unlock()
@ -85,7 +85,7 @@ func (n *Network) NewBlock(b tmtypes.Header) {
} else {
n.AvgBlockTime = 0.0
}
// n.txThroughputMeter.Mark(int64(b.NumTxs)) TODO:
n.txThroughputMeter.Mark(int64(len(b.Data.Txs)))
n.AvgTxThroughput = n.txThroughputMeter.Rate1()
}

View File

@ -35,7 +35,7 @@ type Node struct {
// rpcClient is an client for making RPC calls to TM
rpcClient rpc_client.HTTPClient
blockCh chan<- tmtypes.Header
blockCh chan<- *tmtypes.Block
blockLatencyCh chan<- float64
disconnectCh chan<- bool
@ -79,7 +79,7 @@ func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
}
}
func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
func (n *Node) SendBlocksTo(ch chan<- *tmtypes.Block) {
n.blockCh = ch
}
@ -103,7 +103,7 @@ func (n *Node) Start() error {
}
n.em.RegisterLatencyCallback(latencyCallback(n))
err := n.em.Subscribe(tmtypes.EventQueryNewBlockHeader.String(), newBlockCallback(n))
err := n.em.Subscribe(tmtypes.EventQueryNewBlock.String(), newBlockCallback(n))
if err != nil {
return err
}
@ -128,7 +128,7 @@ func (n *Node) Stop() {
// implements eventmeter.EventCallbackFunc
func newBlockCallback(n *Node) em.EventCallbackFunc {
return func(metric *em.EventMetric, data interface{}) {
block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlockHeader).Header
block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlock).Block
n.Height = block.Height
n.logger.Info("new block", "height", block.Height)