rewrite indexer to be a listener of eventBus

This commit is contained in:
Anton Kaliaev 2017-11-15 15:07:08 -06:00
parent cd4be1f308
commit 29cd1a1b8f
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
15 changed files with 141 additions and 161 deletions

View File

@ -173,20 +173,6 @@ func NewNode(config *cfg.Config,
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
state.SetLogger(stateLogger) state.SetLogger(stateLogger)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
txIndexer = kv.NewTxIndex(store)
default:
txIndexer = &null.TxIndex{}
}
state.TxIndexer = txIndexer
// Generate node PrivKey // Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
@ -293,6 +279,30 @@ func NewNode(config *cfg.Config,
bcReactor.SetEventBus(eventBus) bcReactor.SetEventBus(eventBus)
consensusReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
txIndexer = kv.NewTxIndex(store)
default:
txIndexer = &null.TxIndex{}
}
// subscribe for all transactions and index them by tags
ch := make(chan interface{})
eventBus.Subscribe(context.Background(), "tx_index", types.EventQueryTx, ch)
go func() {
for event := range ch {
// XXX: may be not perfomant to write one event at a time
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
txIndexer.Index(&txResult)
}
}()
// run the profile server // run the profile server
profileHost := config.ProfListenAddress profileHost := config.ProfListenAddress
if profileHost != "" { if profileHost != "" {

View File

@ -100,7 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx // make sure this is the proper tx
require.EqualValues(tx, txe.Tx) require.EqualValues(tx, txe.Tx)
require.True(txe.Code.IsOK()) require.True(txe.Result.Code.IsOK())
} }
} }
@ -132,6 +132,6 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx // make sure this is the proper tx
require.EqualValues(tx, txe.Tx) require.EqualValues(tx, txe.Tx)
require.True(txe.Code.IsOK()) require.True(txe.Result.Code.IsOK())
} }
} }

View File

@ -104,7 +104,7 @@ func TestABCIQuery(t *testing.T) {
k, v, tx := MakeTxKV() k, v, tx := MakeTxKV()
bres, err := c.BroadcastTxCommit(tx) bres, err := c.BroadcastTxCommit(tx)
require.Nil(t, err, "%d: %+v", i, err) require.Nil(t, err, "%d: %+v", i, err)
apph := bres.Height + 1 // this is where the tx will be applied to the state apph := int(bres.Height) + 1 // this is where the tx will be applied to the state
// wait before querying // wait before querying
client.WaitForHeight(c, apph, nil) client.WaitForHeight(c, apph, nil)
@ -136,7 +136,7 @@ func TestAppCalls(t *testing.T) {
bres, err := c.BroadcastTxCommit(tx) bres, err := c.BroadcastTxCommit(tx)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(bres.DeliverTx.Code.IsOK()) require.True(bres.DeliverTx.Code.IsOK())
txh := bres.Height txh := int(bres.Height)
apph := txh + 1 // this is where the tx will be applied to the state apph := txh + 1 // this is where the tx will be applied to the state
// wait before querying // wait before querying
@ -153,7 +153,7 @@ func TestAppCalls(t *testing.T) {
// ptx, err := c.Tx(bres.Hash, true) // ptx, err := c.Tx(bres.Hash, true)
ptx, err := c.Tx(bres.Hash, true) ptx, err := c.Tx(bres.Hash, true)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
assert.Equal(txh, ptx.Height) assert.EqualValues(txh, ptx.Height)
assert.EqualValues(tx, ptx.Tx) assert.EqualValues(tx, ptx.Tx)
// and we can even check the block is added // and we can even check the block is added
@ -280,9 +280,9 @@ func TestTx(t *testing.T) {
require.NotNil(err) require.NotNil(err)
} else { } else {
require.Nil(err, "%+v", err) require.Nil(err, "%+v", err)
assert.Equal(txHeight, ptx.Height) assert.EqualValues(txHeight, ptx.Height)
assert.EqualValues(tx, ptx.Tx) assert.EqualValues(tx, ptx.Tx)
assert.Equal(0, ptx.Index) assert.Zero(ptx.Index)
assert.True(ptx.TxResult.Code.IsOK()) assert.True(ptx.TxResult.Code.IsOK())
// time to verify the proof // time to verify the proof

View File

@ -154,7 +154,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
deliverTxResCh := make(chan interface{}) deliverTxResCh := make(chan interface{})
q := types.EventQueryTx(tx) q := types.EventQueryTxFor(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx") err = errors.Wrap(err, "failed to subscribe to tx")
@ -192,9 +192,9 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx)
// The tx was included in a block. // The tx was included in a block.
deliverTxR := &abci.ResponseDeliverTx{ deliverTxR := &abci.ResponseDeliverTx{
Code: deliverTxRes.Code, Code: deliverTxRes.Result.Code,
Data: deliverTxRes.Data, Data: deliverTxRes.Result.Data,
Log: deliverTxRes.Log, Log: deliverTxRes.Result.Log,
} }
logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR) logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{

View File

@ -82,13 +82,13 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return nil, fmt.Errorf("Tx (%X) not found", hash) return nil, fmt.Errorf("Tx (%X) not found", hash)
} }
height := int(r.Height) // XXX height := r.Height
index := int(r.Index) index := r.Index
var proof types.TxProof var proof types.TxProof
if prove { if prove {
block := blockStore.LoadBlock(height) block := blockStore.LoadBlock(int(height))
proof = block.Data.Txs.Proof(index) proof = block.Data.Txs.Proof(int(index))
} }
return &ctypes.ResultTx{ return &ctypes.ResultTx{

View File

@ -107,12 +107,12 @@ type ResultBroadcastTxCommit struct {
CheckTx abci.Result `json:"check_tx"` CheckTx abci.Result `json:"check_tx"`
DeliverTx abci.Result `json:"deliver_tx"` DeliverTx abci.Result `json:"deliver_tx"`
Hash data.Bytes `json:"hash"` Hash data.Bytes `json:"hash"`
Height int `json:"height"` Height uint64 `json:"height"`
} }
type ResultTx struct { type ResultTx struct {
Height int `json:"height"` Height uint64 `json:"height"`
Index int `json:"index"` Index uint32 `json:"index"`
TxResult abci.Result `json:"tx_result"` TxResult abci.Result `json:"tx_result"`
Tx types.Tx `json:"tx"` Tx types.Tx `json:"tx"`
Proof types.TxProof `json:"proof,omitempty"` Proof types.TxProof `json:"proof,omitempty"`

View File

@ -8,7 +8,6 @@ import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -54,47 +53,25 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
// TODO: make use of this info // TODO: make use of this info
// Blocks may include invalid txs. // Blocks may include invalid txs.
// reqDeliverTx := req.(abci.RequestDeliverTx) // reqDeliverTx := req.(abci.RequestDeliverTx)
txError := ""
txResult := r.DeliverTx txResult := r.DeliverTx
if txResult.Code == abci.CodeType_OK { if txResult.Code == abci.CodeType_OK {
validTxs++ validTxs++
} else { } else {
logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log)
invalidTxs++ invalidTxs++
txError = txResult.Code.String()
} }
abciResponses.DeliverTx[txIndex] = txResult
txIndex++
// NOTE: if we count we can access the tx from the block instead of // NOTE: if we count we can access the tx from the block instead of
// pulling it from the req // pulling it from the req
tx := types.Tx(req.GetDeliverTx().Tx) txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: uint64(block.Height),
Index: uint32(txIndex),
Tx: types.Tx(req.GetDeliverTx().Tx),
Result: *txResult,
}})
tags := make(map[string]interface{}) abciResponses.DeliverTx[txIndex] = txResult
for _, t := range txResult.Tags { txIndex++
// basic validation
if t.Key == "" {
logger.Info("Got tag with an empty key (skipping)", "tag", t, "tx", tx)
continue
}
if t.ValueString != "" {
tags[t.Key] = t.ValueString
} else {
tags[t.Key] = t.ValueInt
}
}
txEventPublisher.PublishEventTx(types.EventDataTx{
Height: block.Height,
Tx: tx,
Data: txResult.Data,
Code: txResult.Code,
Log: txResult.Log,
Tags: tags,
Error: txError,
})
} }
} }
proxyAppConn.SetResponseCallback(proxyCb) proxyAppConn.SetResponseCallback(proxyCb)
@ -227,7 +204,6 @@ func (s *State) validateBlock(block *types.Block) error {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ApplyBlock validates & executes the block, updates state w/ ABCI responses, // ApplyBlock validates & executes the block, updates state w/ ABCI responses,
// then commits and updates the mempool atomically, then saves state. // then commits and updates the mempool atomically, then saves state.
// Transaction results are optionally indexed.
// ApplyBlock validates the block against the state, executes it against the app, // ApplyBlock validates the block against the state, executes it against the app,
// commits it, and saves the block and state. It's the only function that needs to be called // commits it, and saves the block and state. It's the only function that needs to be called
@ -242,9 +218,6 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn
fail.Fail() // XXX fail.Fail() // XXX
// index txs. This could run in the background
s.indexTxs(abciResponses)
// save the results before we commit // save the results before we commit
s.SaveABCIResponses(abciResponses) s.SaveABCIResponses(abciResponses)
@ -293,26 +266,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
return mempool.Update(block.Height, block.Txs) return mempool.Update(block.Height, block.Txs)
} }
func (s *State) indexTxs(abciResponses *ABCIResponses) {
// save the tx results using the TxIndexer
// NOTE: these may be overwriting, but the values should be the same.
batch := txindex.NewBatch(len(abciResponses.DeliverTx))
for i, d := range abciResponses.DeliverTx {
tx := abciResponses.txs[i]
if err := batch.Add(types.TxResult{
Height: uint64(abciResponses.Height),
Index: uint32(i),
Tx: tx,
Result: *d,
}); err != nil {
s.logger.Error("Error with batch.Add", "err", err)
}
}
if err := s.TxIndexer.AddBatch(batch); err != nil {
s.logger.Error("Error adding batch", "err", err)
}
}
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit). // It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {

View File

@ -3,13 +3,11 @@ package state
import ( import (
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/abci/example/dummy" "github.com/tendermint/abci/example/dummy"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -31,8 +29,6 @@ func TestApplyBlock(t *testing.T) {
state := state() state := state()
state.SetLogger(log.TestingLogger()) state.SetLogger(log.TestingLogger())
indexer := &dummyIndexer{0}
state.TxIndexer = indexer
// make block // make block
block := makeBlock(1, state) block := makeBlock(1, state)
@ -40,7 +36,6 @@ func TestApplyBlock(t *testing.T) {
err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{})
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works
// TODO check state and mempool // TODO check state and mempool
} }
@ -75,16 +70,3 @@ func makeBlock(num int, state *State) *types.Block {
prevBlockID, valHash, state.AppHash, testPartSize) prevBlockID, valHash, state.AppHash, testPartSize)
return block return block
} }
// dummyIndexer increments counter every time we index transaction.
type dummyIndexer struct {
Indexed int
}
func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) {
return nil, nil
}
func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error {
indexer.Indexed += batch.Size()
return nil
}

View File

@ -15,8 +15,6 @@ import (
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -61,9 +59,6 @@ type State struct {
// AppHash is updated after Commit // AppHash is updated after Commit
AppHash []byte AppHash []byte
// TxIndexer indexes transactions
TxIndexer txindex.TxIndexer `json:"-"`
logger log.Logger logger log.Logger
} }
@ -95,7 +90,7 @@ func loadState(db dbm.DB, key []byte) *State {
return nil return nil
} }
s := &State{db: db, TxIndexer: &null.TxIndex{}} s := &State{db: db}
r, n, err := bytes.NewReader(buf), new(int), new(error) r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err) wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil { if *err != nil {
@ -114,8 +109,6 @@ func (s *State) SetLogger(l log.Logger) {
} }
// Copy makes a copy of the State for mutating. // Copy makes a copy of the State for mutating.
// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same
// underlying TxIndexer.
func (s *State) Copy() *State { func (s *State) Copy() *State {
return &State{ return &State{
db: s.db, db: s.db,
@ -125,7 +118,6 @@ func (s *State) Copy() *State {
Validators: s.Validators.Copy(), Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(), LastValidators: s.LastValidators.Copy(),
AppHash: s.AppHash, AppHash: s.AppHash,
TxIndexer: s.TxIndexer,
LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, LastHeightValidatorsChanged: s.LastHeightValidatorsChanged,
logger: s.logger, logger: s.logger,
ChainID: s.ChainID, ChainID: s.ChainID,
@ -368,7 +360,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
} }
} }
// we do not need indexer during replay and in tests
return &State{ return &State{
db: db, db: db,
@ -381,7 +372,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
Validators: types.NewValidatorSet(validators), Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil), LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash, AppHash: genDoc.AppHash,
TxIndexer: &null.TxIndex{},
LastHeightValidatorsChanged: 1, LastHeightValidatorsChanged: 1,
}, nil }, nil
} }

View File

@ -9,12 +9,12 @@ import (
// TxIndexer interface defines methods to index and search transactions. // TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface { type TxIndexer interface {
// AddBatch analyzes, indexes or stores a batch of transactions. // AddBatch analyzes, indexes and stores a batch of transactions.
// NOTE: We do not specify Index method for analyzing a single transaction
// here because it bears heavy performance losses. Almost all advanced indexers
// support batching.
AddBatch(b *Batch) error AddBatch(b *Batch) error
// Index analyzes, indexes and stores a single transaction.
Index(result *types.TxResult) error
// Get returns the transaction specified by hash or nil if the transaction is not indexed // Get returns the transaction specified by hash or nil if the transaction is not indexed
// or stored. // or stored.
Get(hash []byte) (*types.TxResult, error) Get(hash []byte) (*types.TxResult, error)

View File

@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
db "github.com/tendermint/tmlibs/db" db "github.com/tendermint/tmlibs/db"
@ -56,3 +56,10 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch.Write() storeBatch.Write()
return nil return nil
} }
// Index writes a single transaction into the TxIndex storage.
func (txi *TxIndex) Index(result *types.TxResult) error {
rawBytes := wire.BinaryBytes(result)
txi.store.Set(result.Tx.Hash(), rawBytes)
return nil
}

View File

@ -30,6 +30,17 @@ func TestTxIndex(t *testing.T) {
loadedTxResult, err := indexer.Get(hash) loadedTxResult, err := indexer.Get(hash)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, txResult, loadedTxResult) assert.Equal(t, txResult, loadedTxResult)
tx2 := types.Tx("BYE BYE WORLD")
txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
hash2 := tx2.Hash()
err = indexer.Index(txResult2)
require.Nil(t, err)
loadedTxResult2, err := indexer.Get(hash2)
require.Nil(t, err)
assert.Equal(t, txResult2, loadedTxResult2)
} }
func benchmarkTxIndex(txsCount int, b *testing.B) { func benchmarkTxIndex(txsCount int, b *testing.B) {

View File

@ -19,3 +19,8 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
return nil return nil
} }
// Index is a noop and always returns nil.
func (txi *TxIndex) Index(result *types.TxResult) error {
return nil
}

View File

@ -67,67 +67,95 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
//--- block, tx, and vote events //--- block, tx, and vote events
func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error {
return b.Publish(EventNewBlock, TMEventData{block}) return b.Publish(EventNewBlock, TMEventData{event})
} }
func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, TMEventData{header}) return b.Publish(EventNewBlockHeader, TMEventData{event})
} }
func (b *EventBus) PublishEventVote(vote EventDataVote) error { func (b *EventBus) PublishEventVote(event EventDataVote) error {
return b.Publish(EventVote, TMEventData{vote}) return b.Publish(EventVote, TMEventData{event})
} }
func (b *EventBus) PublishEventTx(tx EventDataTx) error { // PublishEventTx publishes tx event with tags from Result. Note it will add
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
// will be overwritten.
func (b *EventBus) PublishEventTx(event EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
tags := tx.Tags
// add predefined tags (they should overwrite any existing tags) tags := make(map[string]interface{})
// validate and fill tags from tx result
for _, tag := range event.Result.Tags {
// basic validation
if tag.Key == "" {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx)
continue
}
if tag.ValueString != "" {
tags[tag.Key] = tag.ValueString
} else {
tags[tag.Key] = tag.ValueInt
}
}
// add predefined tags
if tag, ok := tags[EventTypeKey]; ok {
b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag)
}
tags[EventTypeKey] = EventTx tags[EventTypeKey] = EventTx
tags[TxHashKey] = fmt.Sprintf("%X", tx.Tx.Hash())
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, tags) if tag, ok := tags[TxHashKey]; ok {
b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag)
}
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags)
return nil return nil
} }
func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, TMEventData{ph}) return b.Publish(EventProposalHeartbeat, TMEventData{event})
} }
//--- EventDataRoundState events //--- EventDataRoundState events
func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error {
return b.Publish(EventNewRoundStep, TMEventData{rs}) return b.Publish(EventNewRoundStep, TMEventData{event})
} }
func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, TMEventData{rs}) return b.Publish(EventTimeoutPropose, TMEventData{event})
} }
func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error {
return b.Publish(EventTimeoutWait, TMEventData{rs}) return b.Publish(EventTimeoutWait, TMEventData{event})
} }
func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error {
return b.Publish(EventNewRound, TMEventData{rs}) return b.Publish(EventNewRound, TMEventData{event})
} }
func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error {
return b.Publish(EventCompleteProposal, TMEventData{rs}) return b.Publish(EventCompleteProposal, TMEventData{event})
} }
func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { func (b *EventBus) PublishEventPolka(event EventDataRoundState) error {
return b.Publish(EventPolka, TMEventData{rs}) return b.Publish(EventPolka, TMEventData{event})
} }
func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error {
return b.Publish(EventUnlock, TMEventData{rs}) return b.Publish(EventUnlock, TMEventData{event})
} }
func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { func (b *EventBus) PublishEventRelock(event EventDataRoundState) error {
return b.Publish(EventRelock, TMEventData{rs}) return b.Publish(EventRelock, TMEventData{event})
} }
func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { func (b *EventBus) PublishEventLock(event EventDataRoundState) error {
return b.Publish(EventLock, TMEventData{rs}) return b.Publish(EventLock, TMEventData{event})
} }

View File

@ -3,7 +3,6 @@ package types
import ( import (
"fmt" "fmt"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
tmpubsub "github.com/tendermint/tmlibs/pubsub" tmpubsub "github.com/tendermint/tmlibs/pubsub"
tmquery "github.com/tendermint/tmlibs/pubsub/query" tmquery "github.com/tendermint/tmlibs/pubsub/query"
@ -110,13 +109,7 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx // All txs fire EventDataTx
type EventDataTx struct { type EventDataTx struct {
Height int `json:"height"` TxResult
Tx Tx `json:"tx"`
Data data.Bytes `json:"data"`
Log string `json:"log"`
Code abci.CodeType `json:"code"`
Tags map[string]interface{} `json:"tags"`
Error string `json:"error"` // this is redundant information for now
} }
type EventDataProposalHeartbeat struct { type EventDataProposalHeartbeat struct {
@ -168,9 +161,10 @@ var (
EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) EventQueryTimeoutWait = queryForEvent(EventTimeoutWait)
EventQueryVote = queryForEvent(EventVote) EventQueryVote = queryForEvent(EventVote)
EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat)
EventQueryTx = queryForEvent(EventTx)
) )
func EventQueryTx(tx Tx) tmpubsub.Query { func EventQueryTxFor(tx Tx) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash()))
} }