simplify indexer service main loop

This commit is contained in:
Anton Kaliaev 2018-05-14 11:10:59 +04:00
parent 6f7333fd5f
commit 5e3a23df6d
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
2 changed files with 12 additions and 19 deletions

View File

@ -343,6 +343,7 @@ func NewNode(config *cfg.Config,
} }
indexerService := txindex.NewIndexerService(txIndexer, eventBus) indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
// run the profile server // run the profile server
profileHost := config.ProfListenAddress profileHost := config.ProfListenAddress

View File

@ -41,32 +41,24 @@ func (is *IndexerService) OnStart() error {
} }
go func() { go func() {
var numTxs, got int64
var batch *Batch
for { for {
select { e, ok := <-blockHeadersCh
case e, ok := <-blockHeadersCh: if !ok {
return
}
header := e.(types.EventDataNewBlockHeader).Header
batch := NewBatch(header.NumTxs)
for i := int64(0); i < header.NumTxs; i++ {
e, ok := <-txsCh
if !ok { if !ok {
is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i)
return return
} }
numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs
batch = NewBatch(numTxs)
case e, ok := <-txsCh:
if !ok {
return
}
if batch == nil {
panic("Expected pubsub to send block header first, but got tx event")
}
txResult := e.(types.EventDataTx).TxResult txResult := e.(types.EventDataTx).TxResult
batch.Add(&txResult) batch.Add(&txResult)
got++
if numTxs == got {
is.idr.AddBatch(batch)
batch = nil
got = 0
}
} }
is.idr.AddBatch(batch)
is.Logger.Info("Indexed block", "height", header.Height)
} }
}() }()
return nil return nil