extract tags from DeliverTx/Result

and send them along with predefined
This commit is contained in:
Anton Kaliaev
2017-11-09 17:35:46 -05:00
parent f233cde9a9
commit a52cdbfe43
8 changed files with 28 additions and 13 deletions

View File

@ -392,6 +392,7 @@ func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result {
r.Code, r.Code,
r.Data, r.Data,
r.Log, r.Log,
r.Tags,
} }
} }

2
glide.lock generated
View File

@ -98,7 +98,7 @@ imports:
- leveldb/table - leveldb/table
- leveldb/util - leveldb/util
- name: github.com/tendermint/abci - name: github.com/tendermint/abci
version: 76ef8a0697c6179220a74c479b36c27a5b53008a version: 6b47155e08732f46dafdcef185d23f0ff9ff24a5
subpackages: subpackages:
- client - client
- example/counter - example/counter

View File

@ -18,7 +18,7 @@ import:
- package: github.com/spf13/viper - package: github.com/spf13/viper
version: v1.0.0 version: v1.0.0
- package: github.com/tendermint/abci - package: github.com/tendermint/abci
version: ~0.7.0 version: 6b47155e08732f46dafdcef185d23f0ff9ff24a5
subpackages: subpackages:
- client - client
- example/dummy - example/dummy

View File

@ -75,6 +75,7 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
Data: txResult.Data, Data: txResult.Data,
Code: txResult.Code, Code: txResult.Code,
Log: txResult.Log, Log: txResult.Log,
Tags: txResult.Tags,
Error: txError, Error: txError,
} }
txEventPublisher.PublishEventTx(event) txEventPublisher.PublishEventTx(event)

View File

@ -78,8 +78,8 @@ func TestABCIResponsesSaveLoad(t *testing.T) {
// build mock responses // build mock responses
block := makeBlock(2, state) block := makeBlock(2, state)
abciResponses := NewABCIResponses(block) abciResponses := NewABCIResponses(block)
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Tags: []*abci.KVPair{}}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Tags: []*abci.KVPair{}}
abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{
{ {
PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(),

View File

@ -17,7 +17,7 @@ func TestTxIndex(t *testing.T) {
indexer := &TxIndex{store: db.NewMemDB()} indexer := &TxIndex{store: db.NewMemDB()}
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
hash := tx.Hash() hash := tx.Hash()
batch := txindex.NewBatch(1) batch := txindex.NewBatch(1)
@ -34,7 +34,7 @@ func TestTxIndex(t *testing.T) {
func benchmarkTxIndex(txsCount int, b *testing.B) { func benchmarkTxIndex(txsCount int, b *testing.B) {
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
dir, err := ioutil.TempDir("", "tx_index_db") dir, err := ioutil.TempDir("", "tx_index_db")
if err != nil { if err != nil {

View File

@ -82,7 +82,19 @@ func (b *EventBus) PublishEventVote(vote EventDataVote) error {
func (b *EventBus) PublishEventTx(tx EventDataTx) error { func (b *EventBus) PublishEventTx(tx EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) tags := make(map[string]interface{})
for _, t := range tx.Tags {
// TODO [@melekes]: validate, but where?
if t.ValueString != "" {
tags[t.Key] = t.ValueString
} else {
tags[t.Key] = t.ValueInt
}
}
// predefined tags should come last
tags[EventTypeKey] = EventTx
tags[TxHashKey] = fmt.Sprintf("%X", tx.Tx.Hash())
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, tags)
return nil return nil
} }

View File

@ -110,12 +110,13 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx // All txs fire EventDataTx
type EventDataTx struct { type EventDataTx struct {
Height int `json:"height"` Height int `json:"height"`
Tx Tx `json:"tx"` Tx Tx `json:"tx"`
Data data.Bytes `json:"data"` Data data.Bytes `json:"data"`
Log string `json:"log"` Log string `json:"log"`
Code abci.CodeType `json:"code"` Code abci.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now Tags []*abci.KVPair `json:"tags"`
Error string `json:"error"` // this is redundant information for now
} }
type EventDataProposalHeartbeat struct { type EventDataProposalHeartbeat struct {